- erf_pipe.send(None)
- stretch_pipe.send(None)
- if not stretch_pipe.poll(1.0):
- raise RuntimeError("Stretch worker did not finish!")
- result_or_traceback = stretch_pipe.recv()
- try:
- (stretch_avg, stretch_stdev, stretch_bias_avg,
- stretch_bias_cov, debug_list, _) = result_or_traceback
- except ValueError:
- raise RuntimeError(
- "Stretch worker failed with the following traceback:\n{tr}"
- .format(tr=result_or_traceback))
- logging.info("Logs from stretch worker:")
- for message in debug_list:
- logging.debug(message)
- if not erf_pipe.poll(1.0):
- raise RuntimeError("Erf worker did not finish!")
- result_or_traceback = erf_pipe.recv()
- try:
- (erf_avg, erf_stdev, erf_bias_avg,
- erf_bias_cov, debug_list, _) = result_or_traceback
- except ValueError:
- raise RuntimeError(
- "Erf worker failed with the following traceback:\n{tr}"
- .format(tr=result_or_traceback))
- logging.info("Logs from erf worker:")
- for message in debug_list:
- logging.debug(message)
- avg = math.exp((stretch_avg + erf_avg) / 2.0)
- var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
- var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
- stdev = avg * math.sqrt(var)
- integrator_data = (
- stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov)
- return (
- measurement, avg, stdev, math.exp(stretch_avg),
- math.exp(erf_avg), integrator_data)
+ def stop_computing(name, pipe):
+ """Just a block of code to be used for each worker.
+
+ Send stop object, poll for result, then either
+ unpack response, log messages and return, or raise traceback.
+
+ TODO: Define class/structure for the return value?
+
+ :param name: Human friendly worker identifier for logging purposes.
+ :param pipe: Boss end of connection towards worker to stop.
+ :type name: str
+ :type pipe: multiprocessing.Connection
+ :returns: Computed value tracker, actual focus tracker,
+ and number of samples used for this iteration.
+ :rtype: _PartialResult
+ """
+ # If worker encountered an exception, we get it in the recv below,
+ # but send will report a broken pipe.
+ # EAFP says we should ignore the error (instead of polling first).
+ # https://devblogs.microsoft.com/python
+ # /idiomatic-python-eafp-versus-lbyl/
+ try:
+ pipe.send(None)
+ except BrokenPipeError:
+ pass
+ if not pipe.poll(10.0):
+ raise RuntimeError(f"Worker {name} did not finish!")
+ result_or_traceback = pipe.recv()
+ try:
+ value_tracker, focus_tracker, debug_list, trace_list, sampls = (
+ result_or_traceback
+ )
+ except ValueError:
+ raise RuntimeError(
+ f"Worker {name} failed with the following traceback:\n"
+ f"{result_or_traceback}"
+ )
+ logging.info(f"Logs from worker {name!r}:")
+ for message in debug_list:
+ logging.info(message)
+ for message in trace_list:
+ logging.debug(message)
+ logging.debug(
+ f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
+ )
+ return _PartialResult(value_tracker, focus_tracker, sampls)
+
+ stretch_result = stop_computing(u"stretch", stretch_pipe)
+ erf_result = stop_computing(u"erf", erf_pipe)
+ result = PLRsearch._get_result(measurement, stretch_result, erf_result)
+ logging.info(
+ f"measure_and_compute finished with trial result "
+ f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
+ f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
+ f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
+ f"stretch samples {stretch_result.samples!r} erf samples "
+ f"{erf_result.samples!r}"
+ )
+ return result
+
+ @staticmethod
+ def _get_result(measurement, stretch_result, erf_result):
+ """Process and collate results from measure_and_compute.
+
+ Turn logarithm based values to exponential ones,
+ combine averages and stdevs of two fitting functions into a whole.
+
+ :param measurement: The trial measurement obtained during computation.
+ :param stretch_result: Computation output for stretch fitting function.
+ :param erf_result: Computation output for erf fitting function.
+ :type measurement: ReceiveRateMeasurement
+ :type stretch_result: _PartialResult
+ :type erf_result: _PartialResult
+ :returns: Combined results.
+ :rtype: _ComputeResult
+ """
+ stretch_avg = stretch_result.value_tracker.average
+ erf_avg = erf_result.value_tracker.average
+ stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
+ erf_var = erf_result.value_tracker.get_pessimistic_variance()
+ avg_log = (stretch_avg + erf_avg) / 2.0
+ var_log = (stretch_var + erf_var) / 2.0
+ var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
+ stdev_log = math.sqrt(var_log)
+ low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
+ avg = (low + upp) / 2
+ stdev = avg - low
+ trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
+ sea = math.exp(stretch_avg)
+ eea = math.exp(erf_avg)
+ return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
+
+
+# Named tuples, for multiple local variables to be passed as return value.
+_PartialResult = namedtuple(
+ u"_PartialResult", u"value_tracker focus_tracker samples"
+)
+"""Two stat trackers and sample counter.
+
+:param value_tracker: Tracker for the value (critical load) being integrated.
+:param focus_tracker: Tracker for focusing integration inputs (sample points).
+:param samples: How many samples were used for the computation.
+:type value_tracker: stat_trackers.ScalarDualStatTracker
+:type focus_tracker: stat_trackers.VectorStatTracker
+:type samples: int
+"""
+
+_ComputeResult = namedtuple(
+ u"_ComputeResult",
+ u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers"
+)
+"""Measurement, 4 computation result values, pair of trackers.
+
+:param measurement: The trial measurement result obtained during computation.
+:param avg: Overall average of critical rate estimate.
+:param stdev: Overall standard deviation of critical rate estimate.
+:param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
+:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
+:param trackers: Pair of focus trackers to start next iteration with.
+:type measurement: ReceiveRateMeasurement
+:type avg: float
+:type stdev: float
+:type stretch_exp_avg: float
+:type erf_exp_avg: float
+:type trackers: 2-tuple of stat_trackers.VectorStatTracker
+"""