- erf_pipe.send(None)
- stretch_pipe.send(None)
- if not stretch_pipe.poll(1.0):
- raise RuntimeError("Stretch worker did not finish!")
- result_or_traceback = stretch_pipe.recv()
- try:
- (stretch_avg, stretch_stdev, stretch_bias_avg,
- stretch_bias_cov, debug_list, _) = result_or_traceback
- except ValueError:
- raise RuntimeError(
- "Stretch worker failed with the following traceback:\n{tr}"
- .format(tr=result_or_traceback))
- logging.info("Logs from stretch worker:")
- for message in debug_list:
- logging.debug(message)
- if not erf_pipe.poll(1.0):
- raise RuntimeError("Erf worker did not finish!")
- result_or_traceback = erf_pipe.recv()
- try:
- (erf_avg, erf_stdev, erf_bias_avg,
- erf_bias_cov, debug_list, _) = result_or_traceback
- except ValueError:
- raise RuntimeError(
- "Erf worker failed with the following traceback:\n{tr}"
- .format(tr=result_or_traceback))
- logging.info("Logs from erf worker:")
- for message in debug_list:
- logging.debug(message)
+ def stop_computing(name, pipe):
+ """Just a block of code to be used for each worker.
+
+ Send stop object, poll for result, then either
+ unpack response, log messages and return, or raise traceback.
+
+ TODO: Define class/structure for the return value?
+
+ :param name: Human friendly worker identifier for logging purposes.
+ :param pipe: Boss end of connection towards worker to stop.
+ :type name: str
+ :type pipe: multiprocessing.Connection
+ :returns: Computed value tracker, actual focus tracker,
+ and number of samples used for this iteration.
+ :rtype: 3-tuple of tracker, tracker and int
+ """
+ # None is the sentinel object telling the worker to stop and report.
+ pipe.send(None)
+ # Give the worker up to 10 seconds to post its result before giving up.
+ if not pipe.poll(10.0):
+ raise RuntimeError(
+ "Worker {name} did not finish!".format(name=name))
+ result_or_traceback = pipe.recv()
+ # A healthy worker sends a 5-tuple; on failure it sends a single
+ # traceback string instead, so the unpack below raises ValueError.
+ # NOTE(review): a 5-character traceback string would unpack without
+ # error — presumably impossible in practice, but worth confirming.
+ try:
+ value_tracker, focus_tracker, debug_list, trace_list, sampls = (
+ result_or_traceback)
+ except ValueError:
+ raise RuntimeError(
+ "Worker {name} failed with the following traceback:\n{tr}"
+ .format(name=name, tr=result_or_traceback))
+ # Replay log messages collected in the worker process, now that we
+ # are back in the boss process where the logging config lives.
+ logging.info("Logs from worker %(name)r:", {"name": name})
+ for message in debug_list:
+ logging.info(message)
+ for message in trace_list:
+ logging.debug(message)
+ logging.debug("trackers: value %(val)r focus %(foc)r", {
+ "val": value_tracker, "foc": focus_tracker})
+ return _PartialResult(value_tracker, focus_tracker, sampls)
+
+ stretch_result = stop_computing("stretch", stretch_pipe)
+ erf_result = stop_computing("erf", erf_pipe)
+ result = PLRsearch._get_result(measurement, stretch_result, erf_result)
+ logging.info(
+ "measure_and_compute finished with trial result %(res)r "
+ "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r "
+ "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r "
+ "erf samples %(es)r",
+ {"res": result.measurement,
+ "avg": result.avg, "stdev": result.stdev,
+ "a1": result.stretch_exp_avg, "a2": result.erf_exp_avg,
+ "nt": result.trackers, "ot": old_trackers,
+ "ss": stretch_result.samples, "es": erf_result.samples})
+ return result
+
+ @staticmethod
+ def _get_result(measurement, stretch_result, erf_result):
+ """Collate results from measure_and_compute"""
+ stretch_avg = stretch_result.value_tracker.average
+ erf_avg = erf_result.value_tracker.average
+ # TODO: Take into account secondary stats.
+ stretch_stdev = math.exp(stretch_result.value_tracker.log_variance / 2)
+ erf_stdev = math.exp(erf_result.value_tracker.log_variance / 2)