aboutsummaryrefslogtreecommitdiff
path: root/pipeline/combine_status.py
diff options
context:
space:
mode:
Diffstat (limited to 'pipeline/combine_status.py')
-rwxr-xr-xpipeline/combine_status.py298
1 files changed, 298 insertions, 0 deletions
diff --git a/pipeline/combine_status.py b/pipeline/combine_status.py
new file mode 100755
index 0000000..4fbb36a
--- /dev/null
+++ b/pipeline/combine_status.py
@@ -0,0 +1,298 @@
+#!/usr/bin/python
+"""Summarize the results of many RAPPOR analysis runs.
+
+Takes a list of STATUS.txt files on stdin, and reads the corresponding spec.txt
+and log.txt files. Writes a CSV to stdout. Row key is (metric, date).
+"""
+
+import collections
+import csv
+import json
+import os
+import re
+import sys
+
+
+# Parse bash 'time' output:
+# real 0m11.578s
+
+# TODO: Parse the time from metrics.json instead.
+TIMING_RE = re.compile(
+ r'real \s+ (\d+) m ([\d.]+) s', re.VERBOSE)
+
+# TODO: Could have decode-dist and decode-assoc output the PID?
+PID_RE = re.compile(
+ r'write_pid.py: PID (\d+)') # not VERBOSE, spaces are literal
+
+
def ParseMemCsv(f):
  """Compute per-process summary stats for memory from mem-track output.

  Args:
    f: iterable of CSV lines with columns (timestamp, pid, rss_kib, peak_kib,
        size_kib), sampled over 5 second intervals.  The first row is a
        header.  peak/size cells may be empty strings and are then skipped.

  Returns:
    dict of pid (string) -> (vm5_peak_kib, vm5_mean_kib), where

    vm5_peak_kib = max(vm_peak_kib)  # over 5 second intervals.  Since it uses
      the kernel, it's accurate except for tasks that spike in their last 4
      seconds.
    vm5_mean_kib = mean(vm_size_kib)  # over 5 second intervals, floored to a
      whole KiB; None if the PID never reported a size sample.
  """
  peak_by_pid = collections.defaultdict(list)
  size_by_pid = collections.defaultdict(list)

  # Parse columns we care about, grouping samples by PID.
  c = csv.reader(f)
  for i, row in enumerate(c):
    if i == 0:
      continue  # skip header
    # looks like timestamp, pid, then (rss, peak, size)
    _, pid, _, peak, size = row
    if peak != '':
      peak_by_pid[pid].append(int(peak))
    if size != '':
      size_by_pid[pid].append(int(size))

  mem_by_pid = {}

  # Now compute summaries.  Only PIDs with at least one peak sample appear in
  # the result, matching the original key set.
  for pid, peaks in peak_by_pid.items():
    vm5_peak_kib = max(peaks)

    sizes = size_by_pid[pid]
    if sizes:
      # Explicit floor division: matches Python 2's int/int behavior and is
      # also correct under Python 3.
      vm5_mean_kib = sum(sizes) // len(sizes)
    else:
      # A PID with peaks but no size samples used to raise ZeroDivisionError.
      vm5_mean_kib = None

    mem_by_pid[pid] = (vm5_peak_kib, vm5_mean_kib)

  return mem_by_pid
+
+
def CheckJobId(job_id, parts):
  """Sanity check: a job ID is either a date (starts with '201') or a smoke
  test.  Raises RuntimeError otherwise; parts is included for context."""
  looks_valid = job_id.startswith('201') or job_id.startswith('smoke')
  if looks_valid:
    return
  raise RuntimeError(
      "Expected job ID to start with '201' or 'smoke': got %r (%s)" %
      (job_id, parts))
+
+
def ReadStatus(f):
  """Return the first token of f's first line: OK, TIMEOUT, or FAIL."""
  first_line = f.readline()
  tokens = first_line.strip().split()
  return tokens[0]
+
+
def CombineDistTaskStatus(stdin, c_out, mem_by_pid):
  """Read dist-task STATUS.txt paths from stdin; write a CSV summary to c_out.

  Args:
    stdin: iterable of lines, each containing a path to a STATUS.txt file.
    c_out: a csv.writer-like object.  One header row is written, then one row
        per input path.  Row key is (metric, date).
    mem_by_pid: dict of PID string -> (vm5_peak_kib, vm5_mean_kib), as
        returned by ParseMemCsv, or None if no memory.csv was supplied
        (the memory columns are then left as None).
  """
  #util.log('%s', mem_by_pid)

  # For each task, parses:
  # - input path for metric name and date
  # - spec.txt for task params
  # - STATUS.txt for task success/failure
  # - metrics.json for output metrics
  # - log.txt for timing, if it ran to completion
  #   - and for structured data
  # - join with mem by PID

  header = (
      'job_id', 'params_file', 'map_file',
      'metric', 'date',
      'vm5_peak_kib', 'vm5_mean_kib',  # set when not skipped
      'seconds', 'status',
      # only set when OK
      'num_reports', 'num_rappor', 'allocated_mass',
      # only set when failed
      'fail_reason')
  c_out.writerow(header)

  for line in stdin:
    #
    # Receive a STATUS.txt path on each line of stdin, and parse it.
    #
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    # Path should look like this:
    # ~/rappor/cron/2015-05-20__19-22-01/raw/Settings.NewTabPage/2015-05-19/STATUS.txt
    parts = status_path.split('/')
    job_id = parts[-5]  # 5th component from the end, per the layout above
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the order of these 7 fields.
      # There are 3 job constants on the front.
      (num_reports, metric_name, date, counts_path, params_path,
       map_path, _) = spec_line.split()

    # NOTE: These are all constant per metric.  Could have another CSV and
    # join.  But denormalizing is OK for now.
    params_file = os.path.basename(params_path)
    map_file = os.path.basename(map_path)

    # remove extension
    params_file, _ = os.path.splitext(params_file)
    map_file, _ = os.path.splitext(map_file)

    #
    # Read the log
    #
    log_file = os.path.join(result_dir, 'log.txt')
    with open(log_file) as f:
      lines = f.readlines()

    # Search lines in reverse order for total time.  It could have output from
    # multiple 'time' statements, and we want the last one.
    seconds = None  # stays None if no timing line is found (e.g. skipped task)
    for i in xrange(len(lines) - 1, -1, -1):
      # TODO: Parse the R timing too.  Could use LOG_RECORD_RE.
      m = TIMING_RE.search(lines[i])
      if m:
        min_part, sec_part = m.groups()
        seconds = float(min_part) * 60 + float(sec_part)
        break

    # Extract stack trace by joining every log line mentioning 'Calls'.
    if status == 'FAIL':
      # Stack trace looks like: "Calls: main -> RunOne ..."
      fail_reason = ''.join(line.strip() for line in lines if 'Calls' in line)
    else:
      fail_reason = None

    # Extract PID from the log and join with the memory results.
    pid = None
    vm5_peak_kib = None
    vm5_mean_kib = None
    if mem_by_pid:
      # NOTE: this loop reuses the name 'line'; the outer stdin value was
      # already consumed above, so shadowing it here is harmless.
      for line in lines:
        m = PID_RE.match(line)
        if m:
          pid = m.group(1)
          # Could the PID not exist if the process was super short was less
          # than 5 seconds?
          try:
            vm5_peak_kib, vm5_mean_kib = mem_by_pid[pid]
          except KeyError:  # sometimes we don't add mem-track on the front
            vm5_peak_kib, vm5_mean_kib = None, None
          break
    else:
      pass  # we weren't passed memory.csv

    #
    # Read the metrics (optional: the decode may not have produced the file)
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    num_rappor = metrics.get('num_detected')  # None when metrics.json absent
    allocated_mass = metrics.get('allocated_mass')

    # Construct and write row
    row = (
        job_id, params_file, map_file,
        metric_name, date,
        vm5_peak_kib, vm5_mean_kib,
        seconds, status,
        num_reports, num_rappor, allocated_mass,
        fail_reason)

    c_out.writerow(row)
+
+
def CombineAssocTaskStatus(stdin, c_out):
  """Read assoc-task STATUS.txt paths from stdin; write a CSV summary to c_out.

  Args:
    stdin: iterable of lines, each containing a path to a STATUS.txt file.
    c_out: a csv.writer-like object.  One header row is written, then one row
        per input path.
  """
  header = (
      'job_id', 'metric', 'date', 'status', 'num_reports',
      'total_elapsed_seconds', 'em_elapsed_seconds', 'var1', 'var2', 'd1',
      'd2')

  c_out.writerow(header)

  for line in stdin:
    status_path = line.strip()

    with open(status_path) as f:
      status = ReadStatus(f)

    # NOTE(review): assoc results appear to live one directory deeper than
    # dist results, hence -6 here vs. -5 in CombineDistTaskStatus -- confirm
    # against the actual output layout.
    parts = status_path.split('/')
    job_id = parts[-6]
    CheckJobId(job_id, parts)

    #
    # Parse the job spec
    #
    result_dir = os.path.dirname(status_path)
    spec_file = os.path.join(result_dir, 'assoc-spec.txt')
    with open(spec_file) as f:
      spec_line = f.readline()
      # See backfill.sh analyze-one for the field order.  The first 5 fields
      # are job params.  Only metric_name, date, var1 and var2 are used below;
      # the rest are unpacked for documentation purposes.
      (_, _, _, _, _,
       dummy_num_reports, metric_name, date, reports, var1, var2, map1,
       output_dir) = spec_line.split()

    #
    # Parse decode-assoc metrics (optional: may not have been written)
    #
    metrics = {}
    metrics_file = os.path.join(result_dir, 'assoc-metrics.json')
    if os.path.isfile(metrics_file):
      with open(metrics_file) as f:
        metrics = json.load(f)

    # After we run it we have the actual number of reports.  All of these are
    # None when assoc-metrics.json is absent.
    num_reports = metrics.get('num_reports')
    total_elapsed_seconds = metrics.get('total_elapsed_time')
    em_elapsed_seconds = metrics.get('em_elapsed_time')
    estimate_dimensions = metrics.get('estimate_dimensions')
    if estimate_dimensions:
      d1, d2 = estimate_dimensions
    else:
      d1, d2 = (0, 0)  # unknown

    row = (
        job_id, metric_name, date, status, num_reports, total_elapsed_seconds,
        em_elapsed_seconds, var1, var2, d1, d2)
    c_out.writerow(row)
+
+
def main(argv):
  """Dispatch to the dist or assoc summarizer.

  Usage: combine_status.py ACTION [MEMORY_CSV]

  Args:
    argv: full argument vector.  argv[1] is the action ('dist' or 'assoc');
        argv[2] is an optional memory.csv from mem-track, used only by 'dist'.

  Raises:
    RuntimeError: if the action is missing or invalid.
  """
  # Report a missing action the same way as an invalid one, instead of
  # crashing with a raw IndexError.
  if len(argv) < 2:
    raise RuntimeError('Expected an action: dist or assoc')
  action = argv[1]

  try:
    mem_csv = argv[2]
  except IndexError:
    mem_by_pid = None  # memory.csv is optional
  else:
    with open(mem_csv) as f:
      mem_by_pid = ParseMemCsv(f)

  if action == 'dist':
    c_out = csv.writer(sys.stdout)
    CombineDistTaskStatus(sys.stdin, c_out, mem_by_pid)

  elif action == 'assoc':
    c_out = csv.writer(sys.stdout)
    CombineAssocTaskStatus(sys.stdin, c_out)

  else:
    raise RuntimeError('Invalid action %r' % action)
+
+
if __name__ == '__main__':
  try:
    main(sys.argv)
  except RuntimeError as e:
    # 'except ... as' (Python 2.6+) and an explicit stderr write replace the
    # Python-2-only 'except E, e' / 'print >>' forms; output is identical
    # ('FATAL: ...' plus newline on stderr, exit status 1).
    sys.stderr.write('FATAL: %s\n' % e)
    sys.exit(1)