diff options
Diffstat (limited to 'analysis/R/association.R')
-rwxr-xr-x | analysis/R/association.R | 491 |
1 files changed, 491 insertions, 0 deletions
diff --git a/analysis/R/association.R b/analysis/R/association.R new file mode 100755 index 0000000..d1c7b5e --- /dev/null +++ b/analysis/R/association.R @@ -0,0 +1,491 @@ +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +library(parallel) # mclapply + +source.rappor <- function(rel_path) { + abs_path <- paste0(Sys.getenv("RAPPOR_REPO", ""), rel_path) + source(abs_path) +} + +source.rappor("analysis/R/util.R") # for Log +source.rappor("analysis/R/decode.R") # for ComputeCounts + +# +# Tools used to estimate variable distributions of up to three variables +# in RAPPOR. This contains the functions relevant to estimating joint +# distributions. + +GetOtherProbs <- function(counts, map_by_cohort, marginal, params, pstar, + qstar) { + # Computes the marginal for the "other" category. + # + # Args: + # counts: m x (k+1) matrix with counts of each bit for each + # cohort (m=#cohorts total, k=# bits in bloom filter), first column + # stores the total counts + # map_by_cohort: list of matrices encoding locations of hashes for each + # string "other" category) + # marginal: object containing the estimated frequencies of known strings + # as well as the strings themselves, variance, etc. + # params: RAPPOR encoding parameters + # + # Returns: + # List of vectors of probabilities that each bit was set by the "other" + # category. The list is indexed by cohort. + + N <- sum(counts[, 1]) + + # Counts of known strings to remove from each cohort. + known_counts <- ceiling(marginal$proportion * N / params$m) + sum_known <- sum(known_counts) + + # Select only the strings we care about from each cohort. + # NOTE: drop = FALSE necessary if there is one candidate + candidate_map <- lapply(map_by_cohort, function(map_for_cohort) { + map_for_cohort[, marginal$string, drop = FALSE] + }) + + # If no strings were found, all nonzero counts were set by "other" + if (length(marginal) == 0) { + probs_other <- apply(counts, 1, function(cohort_row) { + cohort_row[-1] / cohort_row[1] + }) + return(as.list(as.data.frame(probs_other))) + } + + # Counts set by known strings without noise considerations. + known_counts_by_cohort <- sapply(candidate_map, function(map_for_cohort) { + as.vector(as.matrix(map_for_cohort) %*% known_counts) + }) + + # Protect against R's matrix/vector confusion. This ensures + # known_counts_by_cohort is a matrix in the k=1 case. + dim(known_counts_by_cohort) <- c(params$m, params$k) + + # Counts set by known vals zero bits adjusting by p plus true bits + # adjusting by q. + known_counts_by_cohort <- (sum_known - known_counts_by_cohort) * pstar + + known_counts_by_cohort * qstar + + # Add the left hand sums to make it a m x (k+1) "counts" matrix + known_counts_by_cohort <- cbind(sum_known, known_counts_by_cohort) + + # Counts set by the "other" category. + reduced_counts <- counts - known_counts_by_cohort + reduced_counts[reduced_counts < 0] <- 0 + probs_other <- apply(reduced_counts, 1, function(cohort_row) { + cohort_row[-1] / cohort_row[1] + }) + + # Protect against R's matrix/vector confusion. + dim(probs_other) <- c(params$k, params$m) + + probs_other[probs_other > 1] <- 1 + probs_other[is.nan(probs_other)] <- 0 + probs_other[is.infinite(probs_other)] <- 0 + + # Convert it from a k x m matrix to a list indexed by m cohorts. + # as.data.frame makes each cohort a column, which can be indexed by + # probs_other[[cohort]]. + result <- as.list(as.data.frame(probs_other)) + + result +} + +GetCondProbBooleanReports <- function(reports, pstar, qstar, num_cores) { + # Compute conditional probabilities given a set of Boolean reports. + # + # Args: + # reports: RAPPOR reports as a list of bit arrays (of length 1, because + # this is a boolean report) + # pstar, qstar: standard params computed from from rappor parameters + # num_cores: number of cores to pass to mclapply to parallelize apply + # + # Returns: + # Conditional probability of all boolean reports corresponding to + # candidates (TRUE, FALSE) + + # The values below are p(report=1|value=TRUE), p(report=1|value=FALSE) + cond_probs_for_1 <- c(qstar, pstar) + # The values below are p(report=0|value=TRUE), p(report=0|value=FALSE) + cond_probs_for_0 <- c(1 - qstar, 1 - pstar) + + cond_report_dist <- mclapply(reports, function(report) { + if (report[[1]] == 1) { + cond_probs_for_1 + } else { + cond_probs_for_0 + } + }, mc.cores = num_cores) + cond_report_dist +} + +GetCondProbStringReports <- function(reports, cohorts, map, m, pstar, qstar, + marginal, prob_other = NULL, num_cores) { + # Wrapper around GetCondProb. Given a set of reports, cohorts, map and + # parameters m, p*, and q*, it first computes bit indices by cohort, and + # then applies GetCondProb individually to each report. + # + # Args: + # reports: RAPPOR reports as a list of bit arrays + # cohorts: cohorts corresponding to these reports as a list + # map: map file + # m, pstar, qstar: standard params computed from from rappor parameters + # marginal: list containing marginal estimates (output of Decode) + # prob_other: vector of length k, indicating how often each bit in the + # Bloom filter was set by a string in the "other" category. + # + # Returns: + # Conditional probability of all reports given each of the strings in + # marginal$string + + # Get bit indices that are set per candidate per cohort + bit_indices_by_cohort <- lapply(1:m, function(cohort) { + map_for_cohort <- map$map_by_cohort[[cohort]] + # Find the bits set by the candidate strings + bit_indices <- lapply(marginal$string, function(x) { + which(map_for_cohort[, x]) + }) + bit_indices + }) + + # Apply GetCondProb over all reports + cond_report_dist <- mclapply(seq(length(reports)), function(i) { + cohort <- cohorts[i] + #Log('Report %d, cohort %d', i, cohort) + bit_indices <- bit_indices_by_cohort[[cohort]] + GetCondProb(reports[[i]], pstar, qstar, bit_indices, + prob_other = prob_other[[cohort]]) + }, mc.cores = num_cores) + cond_report_dist +} + + +GetCondProb <- function(report, pstar, qstar, bit_indices, prob_other = NULL) { + # Given the observed bit array, estimate P(report | true value). + # Probabilities are estimated for all truth values. + # + # Args: + # report: A single observed RAPPOR report (binary vector of length k). + # params: RAPPOR parameters. + # bit_indices: list with one entry for each candidate. Each entry is an + # integer vector of length h, specifying which bits are set for the + # candidate in the report's cohort. + # prob_other: vector of length k, indicating how often each bit in the + # Bloom filter was set by a string in the "other" category. + # + # Returns: + # Conditional probability of report given each of the strings in + # candidate_strings + ones <- sum(report) + zeros <- length(report) - ones + probs <- ifelse(report == 1, pstar, 1 - pstar) + + # Find the likelihood of report given each candidate string + prob_obs_vals <- sapply(bit_indices, function(x) { + prod(c(probs[-x], ifelse(report[x] == 1, qstar, 1 - qstar))) + }) + + # Account for the "other" category + if (!is.null(prob_other)) { + prob_other <- prod(c(prob_other[which(report == 1)], + (1 - prob_other)[which(report == 0)])) + c(prob_obs_vals, prob_other) + } else { + prob_obs_vals + } +} + +UpdatePij <- function(pij, cond_prob) { + # Update the probability matrix based on the EM algorithm. + # + # Args: + # pij: conditional distribution of x (vector) + # cond_prob: conditional distribution computed previously + # + # Returns: + # Updated pijs from em algorithm (maximization) + + # NOTE: Not using mclapply here because we have a faster C++ implementation. + # mclapply spawns multiple processes, and each process can take up 3 GB+ or 5 + # GB+ of memory. + wcp <- lapply(cond_prob, function(x) { + z <- x * pij + z <- z / sum(z) + z[is.nan(z)] <- 0 + z + }) + Reduce("+", wcp) / length(wcp) +} + +ComputeVar <- function(cond_prob, est) { + # Computes the variance of the estimated pij's. + # + # Args: + # cond_prob: conditional distribution computed previously + # est: estimated pij's + # + # Returns: + # Variance of the estimated pij's + + inform <- Reduce("+", lapply(cond_prob, function(x) { + (outer(as.vector(x), as.vector(x))) / (sum(x * est))^2 + })) + var_cov <- solve(inform) + sd <- matrix(sqrt(diag(var_cov)), dim(cond_prob[[1]])) + list(var_cov = var_cov, sd = sd, inform = inform) +} + +EM <- function(cond_prob, starting_pij = NULL, estimate_var = FALSE, + max_em_iters = 1000, epsilon = 10^-6, verbose = FALSE) { + # Performs estimation. + # + # Args: + # cond_prob: conditional distribution computed previously + # starting_pij: estimated pij's + # estimate_var: flags whether we should estimate the variance + # of our computed distribution + # max_em_iters: maximum number of EM iterations + # epsilon: convergence parameter + # verbose: flags whether to display error data + # + # Returns: + # Estimated pij's, variance, error params + + pij <- list() + state_space <- dim(cond_prob[[1]]) + if (is.null(starting_pij)) { + pij[[1]] <- array(1 / prod(state_space), state_space) + } else { + pij[[1]] <- starting_pij + } + + i <- 0 # visible outside loop + if (nrow(pij[[1]]) > 0) { + # Run EM + for (i in 1:max_em_iters) { + pij[[i + 1]] <- UpdatePij(pij[[i]], cond_prob) + dif <- max(abs(pij[[i + 1]] - pij[[i]])) + if (dif < epsilon) { + break + } + Log('EM iteration %d, dif = %e', i, dif) + } + } + # Compute the variance of the estimate. + est <- pij[[length(pij)]] + if (estimate_var) { + var_cov <- ComputeVar(cond_prob, est) + sd <- var_cov$sd + inform <- var_cov$inform + var_cov <- var_cov$var_cov + } else { + var_cov <- NULL + inform <- NULL + sd <- NULL + } + list(est = est, sd = sd, var_cov = var_cov, hist = pij, num_em_iters = i) +} + +TestIndependence <- function(est, inform) { + # Tests the degree of independence between variables. + # + # Args: + # est: esimated pij values + # inform: information matrix + # + # Returns: + # Chi-squared statistic for whether two variables are independent + + expec <- outer(apply(est, 1, sum), apply(est, 2, sum)) + diffs <- matrix(est - expec, ncol = 1) + stat <- t(diffs) %*% inform %*% diffs + df <- (nrow(est) - 1) * (ncol(est) - 1) + list(stat = stat, pval = pchisq(stat, df, lower = FALSE)) +} + +UpdateJointConditional <- function(cond_report_dist, joint_conditional = NULL) { + # Updates the joint conditional distribution of d variables, where + # num_variables is chosen by the client. Since variables are conditionally + # independent of one another, this is basically an outer product. + # + # Args: + # joint_conditional: The current state of the joint conditional + # distribution. This is a list with as many elements as there + # are reports. + # cond_report_dist: The conditional distribution of variable x, which will + # be outer-producted with the current joint conditional. + # + # Returns: + # A list of same length as joint_conditional containing the joint + # conditional distribution of all variables. If I want + # P(X'=x',Y=y'|X=x,Y=y), I will look at + # joint_conditional[x,x',y,y']. + + if (is.null(joint_conditional)) { + lapply(cond_report_dist, function(x) array(x)) + } else { + mapply("outer", joint_conditional, cond_report_dist, + SIMPLIFY = FALSE) + } +} + +ComputeDistributionEM <- function(reports, report_cohorts, maps, + ignore_other = FALSE, + params = NULL, + params_list = NULL, + marginals = NULL, + estimate_var = FALSE, + num_cores = 10, + em_iter_func = EM, + max_em_iters = 1000) { + # Computes the distribution of num_variables variables, where + # num_variables is chosen by the client, using the EM algorithm. + # + # Args: + # reports: A list of num_variables elements, each a 2-dimensional array + # containing the counts of each bin for each report + # report_cohorts: A num_variables-element list; the ith element is an array + # containing the cohort of jth report for ith variable. + # maps: A num_variables-element list containing the map for each variable + # ignore_other: A boolean describing whether to compute the "other" category + # params: RAPPOR encoding parameters. If set, all variables are assumed to + # be encoded with these parameters. + # params_list: A list of num_variables elements, each of which is the + # RAPPOR encoding parameters for a variable (a list itself). If set, + # it must be the same length as 'reports'. + # marginals: List of estimated marginals for each variable + # estimate_var: A flag telling whether to estimate the variance. + # em_iter_func: Function that implements the iterative EM algorithm. + + # Handle the case that the client wants to find the joint distribution of too + # many variables. + num_variables <- length(reports) + + if (is.null(params) && is.null(params_list)) { + stop("Either params or params_list must be passed") + } + + Log('Computing joint conditional') + + # Compute the counts for each variable and then do conditionals. + joint_conditional = NULL + found_strings <- list() + + for (j in (1:num_variables)) { + Log('Processing var %d', j) + + var_report <- reports[[j]] + var_cohort <- report_cohorts[[j]] + var_map <- maps[[j]] + if (!is.null(params)) { + var_params <- params + } else { + var_params <- params_list[[j]] + } + + var_counts <- NULL + if (is.null(marginals)) { + Log('\tSumming bits to gets observed counts') + var_counts <- ComputeCounts(var_report, var_cohort, var_params) + + Log('\tDecoding marginal') + marginal <- Decode(var_counts, var_map$all_cohorts_map, var_params, + quiet = TRUE)$fit + Log('\tMarginal for var %d has %d values:', j, nrow(marginal)) + print(marginal[, c('estimate', 'proportion')]) # rownames are the string + cat('\n') + + if (nrow(marginal) == 0) { + Log('ERROR: Nothing decoded for variable %d', j) + return (NULL) + } + } else { + marginal <- marginals[[j]] + } + found_strings[[j]] <- marginal$string + + p <- var_params$p + q <- var_params$q + f <- var_params$f + # pstar and qstar needed to compute other probabilities as well as for + # inputs to GetCondProb{Boolean, String}Reports subsequently + pstar <- (1 - f / 2) * p + (f / 2) * q + qstar <- (1 - f / 2) * q + (f / 2) * p + k <- var_params$k + + # Ignore other probability if either ignore_other is set or k == 1 + # (Boolean RAPPOR) + if (ignore_other || (k == 1)) { + prob_other <- vector(mode = "list", length = var_params$m) + } else { + # Compute the probability of the "other" category + if (is.null(var_counts)) { + var_counts <- ComputeCounts(var_report, var_cohort, var_params) + } + prob_other <- GetOtherProbs(var_counts, var_map$map_by_cohort, marginal, + var_params, pstar, qstar) + found_strings[[j]] <- c(found_strings[[j]], "Other") + } + + # Get the joint conditional distribution + Log('\tGetCondProb for each report (%d cores)', num_cores) + + # TODO(pseudorandom): check RAPPOR type more systematically instead of by + # checking if k == 1 + if (k == 1) { + cond_report_dist <- GetCondProbBooleanReports(var_report, pstar, qstar, + num_cores) + } else { + cond_report_dist <- GetCondProbStringReports(var_report, + var_cohort, var_map, var_params$m, pstar, qstar, + marginal, prob_other, num_cores) + } + + Log('\tUpdateJointConditional') + + # Update the joint conditional distribution of all variables + joint_conditional <- UpdateJointConditional(cond_report_dist, + joint_conditional) + } + + N <- length(joint_conditional) + dimensions <- dim(joint_conditional[[1]]) + # e.g. 2 x 3 + dimensions_str <- paste(dimensions, collapse = ' x ') + total_entries <- prod(c(N, dimensions)) + + Log('Starting EM with N = %d matrices of size %s (%d entries)', + N, dimensions_str, total_entries) + + start_time <- proc.time()[['elapsed']] + + # Run expectation maximization to find joint distribution + em <- em_iter_func(joint_conditional, max_em_iters=max_em_iters, + epsilon = 10 ^ -6, verbose = FALSE, + estimate_var = estimate_var) + + em_elapsed_time <- proc.time()[['elapsed']] - start_time + + dimnames(em$est) <- found_strings + # Return results in a usable format + list(fit = em$est, + sd = em$sd, + em_elapsed_time = em_elapsed_time, + num_em_iters = em$num_em_iters, + # This last field is implementation-specific; it can be used for + # interactive debugging. + em = em) +} |