- erf_pipe.send(None)
- stretch_pipe.send(None)
- if not stretch_pipe.poll(1.0):
- raise RuntimeError("Stretch worker did not finish!")
- result_or_traceback = stretch_pipe.recv()
- try:
- (stretch_avg, stretch_stdev, stretch_bias_avg,
- stretch_bias_cov, debug_list, _) = result_or_traceback
- except ValueError:
- raise RuntimeError(
- "Stretch worker failed with the following traceback:\n{tr}"
- .format(tr=result_or_traceback))
- logging.info("Logs from stretch worker:")
- for message in debug_list:
- logging.debug(message)
- if not erf_pipe.poll(1.0):
- raise RuntimeError("Erf worker did not finish!")
- result_or_traceback = erf_pipe.recv()
- try:
- (erf_avg, erf_stdev, erf_bias_avg,
- erf_bias_cov, debug_list, _) = result_or_traceback
- except ValueError:
- raise RuntimeError(
- "Erf worker failed with the following traceback:\n{tr}"
- .format(tr=result_or_traceback))
- logging.info("Logs from erf worker:")
- for message in debug_list:
- logging.debug(message)
+ def stop_computing(name, pipe):
+ """Just a block of code to be used for each worker.
+
+ Send stop object, poll for result, then either
+ unpack response, log messages and return, or raise traceback.
+
+ TODO: Define class/structure for the return value?
+
+ :param name: Human friendly worker identifier for logging purposes.
+ :param pipe: Boss end of connection towards worker to stop.
+ :type name: str
+ :type pipe: multiprocessing.Connection
+ :returns: Computed value tracker, actual focus tracker,
+ and number of samples used for this iteration.
+ :rtype: 3-tuple of tracker, tracker and int
+ """
+ pipe.send(None)
+ if not pipe.poll(10.0):
+ raise RuntimeError(
+ "Worker {name} did not finish!".format(name=name))
+ result_or_traceback = pipe.recv()
+ try:
+ value_tracker, focus_tracker, debug_list, trace_list, sampls = (
+ result_or_traceback)
+ except ValueError:
+ raise RuntimeError(
+ "Worker {name} failed with the following traceback:\n{tr}"
+ .format(name=name, tr=result_or_traceback))
+ logging.info("Logs from worker %(name)r:", {"name": name})
+ for message in debug_list:
+ logging.info(message)
+ for message in trace_list:
+ logging.debug(message)
+ logging.debug("trackers: value %(val)r focus %(foc)r", {
+ "val": value_tracker, "foc": focus_tracker})
+ return value_tracker, focus_tracker, sampls
+ stretch_value_tracker, stretch_focus_tracker, stretch_samples = (
+ stop_computing("stretch", stretch_pipe))
+ erf_value_tracker, erf_focus_tracker, erf_samples = (
+ stop_computing("erf", erf_pipe))
+ stretch_avg = stretch_value_tracker.average
+ erf_avg = erf_value_tracker.average
+ # TODO: Take into account secondary stats.
+ stretch_stdev = math.exp(stretch_value_tracker.log_variance / 2)
+ erf_stdev = math.exp(erf_value_tracker.log_variance / 2)