Dpdk in VM: Increase num_mbufs
[csit.git] resources/libraries/python/PLRsearch/PLRsearch.py
index db870c5..cdfd308 100644
@@ -1,4 +1,4 @@
-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -18,19 +18,21 @@ import math
 import multiprocessing
 import time
 
+from collections import namedtuple
+
 import dill
-# TODO: Inform pylint about scipy (of correct version) being available.
+
 from scipy.special import erfcx, erfc
 
 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
 # then switch to absolute imports within PLRsearch package.
 # Current usage of relative imports is just a short term workaround.
-import Integrator  # pylint: disable=relative-import
-from log_plus import log_plus, log_minus  # pylint: disable=relative-import
-import stat_trackers  # pylint: disable=relative-import
+from . import Integrator
+from . import stat_trackers
+from .log_plus import log_plus, log_minus
 
 
-class PLRsearch(object):
+class PLRsearch:
     """A class to encapsulate data relevant for the search method.
 
     The context is performance testing of packet processing systems.
     """A class to encapsulate data relevant for the search method.
 
     The context is performance testing of packet processing systems.
@@ -41,8 +43,9 @@ class PLRsearch(object):
 
     Two constants are stored as class fields for speed.
 
-    Method othed than search (and than __init__)
+    Methods other than search (and __init__)
     are just internal code structure.
+
     TODO: Those method names should start with underscore then.
     """
 
@@ -51,9 +54,11 @@ class PLRsearch(object):
 
     def __init__(
             self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
-            trial_number_offset=0, timeout=1800.0, trace_enabled=False):
+            trial_number_offset=0, timeout=7200.0, trace_enabled=False):
         """Store rate measurer and additional parameters.
 
         """Store rate measurer and additional parameters.
 
+        The measurer must never report a negative loss count.
+
         TODO: Copy AbstractMeasurer from MLRsearch.
 
         :param measurer: The measurer to call when searching.
@@ -162,35 +167,37 @@ class PLRsearch(object):
         :type min_rate: float
         :type max_rate: float
         :returns: Average and stdev of critical load estimate.
-        :rtype: 2-tuple of floats
+        :rtype: 2-tuple of float
         """
         stop_time = time.time() + self.timeout
         min_rate = float(min_rate)
         max_rate = float(max_rate)
         """
         stop_time = time.time() + self.timeout
         min_rate = float(min_rate)
         max_rate = float(max_rate)
-        logging.info("Started search with min_rate %(min)r, max_rate %(max)r",
-                     {"min": min_rate, "max": max_rate})
+        logging.info(
+            f"Started search with min_rate {min_rate!r}, "
+            f"max_rate {max_rate!r}"
+        )
         trial_result_list = list()
         trial_number = self.trial_number_offset
         focus_trackers = (None, None)
         transmit_rate = (min_rate + max_rate) / 2.0
         lossy_loads = [max_rate]
-        zeros = [0, 0]  # Cosecutive zero loss, separately for stretch and erf.
+        zeros = 0  # Number of consecutive zero-loss trial results so far.
         while 1:
             trial_number += 1
-            logging.info("Trial %(number)r", {"number": trial_number})
+            logging.info(f"Trial {trial_number!r}")
             results = self.measure_and_compute(
                 self.trial_duration_per_trial * trial_number, transmit_rate,
-                trial_result_list, min_rate, max_rate, focus_trackers)
+                trial_result_list, min_rate, max_rate, focus_trackers
+            )
             measurement, average, stdev, avg1, avg2, focus_trackers = results
-            index = trial_number % 2
-            zeros[index] += 1
+            zeros += 1
             # TODO: Ratio of fill rate to drain rate seems to have
             # exponential impact. Make it configurable, or is 4:3 good enough?
-            if measurement.loss_fraction >= self.packet_loss_ratio_target:
-                for _ in range(4 * zeros[index]):
+            if measurement.loss_ratio >= self.packet_loss_ratio_target:
+                for _ in range(4 * zeros):
                     lossy_loads.append(measurement.target_tr)
             if measurement.loss_count > 0:
-                zeros[index] = 0
+                zeros = 0
             lossy_loads.sort()
             if stop_time <= time.time():
                 return average, stdev
@@ -198,24 +205,24 @@ class PLRsearch(object):
             if (trial_number - self.trial_number_offset) <= 1:
                 next_load = max_rate
             elif (trial_number - self.trial_number_offset) <= 3:
-                next_load = (measurement.receive_rate / (
+                next_load = (measurement.relative_receive_rate / (
                     1.0 - self.packet_loss_ratio_target))
             else:
-                index = (trial_number + 1) % 2
-                next_load = (avg1, avg2)[index]
-                if zeros[index] > 0:
+                next_load = (avg1 + avg2) / 2.0
+                if zeros > 0:
                     if lossy_loads[0] > next_load:
-                        diminisher = math.pow(2.0, 1 - zeros[index])
+                        diminisher = math.pow(2.0, 1 - zeros)
                         next_load = lossy_loads[0] + diminisher * next_load
                         next_load /= (1.0 + diminisher)
                     # On zero measurement, we need to drain obsoleted low losses
                     # even if we did not use them to increase next_load,
-                    # in order to get to usable loses with higher load.
+                    # in order to get to usable losses at higher loads.
                     if len(lossy_loads) > 3:
                         lossy_loads = lossy_loads[3:]
-                logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
-                              {"z": zeros, "o": (avg1, avg2)[index],
-                               "n": next_load, "s": lossy_loads})
+                logging.debug(
+                    f"Zeros {zeros!r} orig {(avg1 + avg2) / 2.0!r} "
+                    f"next {next_load!r} loads {lossy_loads!r}"
+                )
             transmit_rate = min(max_rate, max(min_rate, next_load))
 
     @staticmethod
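
The diminisher update above is a weighted average: next_load becomes
(lossy_loads[0] + diminisher * next_load) / (1 + diminisher), with
diminisher = 2**(1 - zeros). On the first zero-loss trial both inputs get
equal weight; each further zero-loss trial halves the weight of the fitted
suggestion, pulling the next trial load toward the lowest load that recently
saw loss. A standalone numeric sketch (values hypothetical):

import math

zeros = 3  # Third consecutive zero-loss trial.
fitted_load = 2.0e6  # Load suggested by the fitting functions [pps].
lowest_lossy = 3.0e6  # Smallest recently lossy load [pps].
diminisher = math.pow(2.0, 1 - zeros)  # 0.25 here.
next_load = (lowest_lossy + diminisher * fitted_load) / (1.0 + diminisher)
print(next_load)  # 2.8e6, already much closer to the lossy load.
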
@@ -256,21 +263,22 @@ class PLRsearch(object):
         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
         chi = (load - mrr) / spread
         chi0 = -mrr / spread
-        trace("stretch: load", load)
-        trace("mrr", mrr)
-        trace("spread", spread)
-        trace("chi", chi)
-        trace("chi0", chi0)
+        trace(u"stretch: load", load)
+        trace(u"mrr", mrr)
+        trace(u"spread", spread)
+        trace(u"chi", chi)
+        trace(u"chi0", chi0)
         if chi > 0:
             log_lps = math.log(
-                load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
-            trace("big loss direct log_lps", log_lps)
+                load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
+            )
+            trace(u"big loss direct log_lps", log_lps)
         else:
             two_positive = log_plus(chi, 2 * chi0 - log_2)
             two_negative = log_plus(chi0, 2 * chi - log_2)
             if two_positive <= two_negative:
                 log_lps = log_minus(chi, chi0) + log_spread
-                trace("small loss crude log_lps", log_lps)
+                trace(u"small loss crude log_lps", log_lps)
                 return log_lps
             two = log_minus(two_positive, two_negative)
             three_positive = log_plus(two_positive, 3 * chi - log_3)
@@ -278,11 +286,11 @@ class PLRsearch(object):
             three = log_minus(three_positive, three_negative)
             if two == three:
                 log_lps = two + log_spread
-                trace("small loss approx log_lps", log_lps)
+                trace(u"small loss approx log_lps", log_lps)
             else:
                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
                 log_lps += log_spread
-                trace("small loss direct log_lps", log_lps)
+                trace(u"small loss direct log_lps", log_lps)
         return log_lps
 
     @staticmethod
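
All branches of lfit_stretch compute the logarithm of the same closed form,
lps(load) = spread * (log(1 + exp(chi)) - log(1 + exp(chi0))); the
log_plus/log_minus rewrites only keep the evaluation finite where exp(chi)
would overflow, and the small-loss branch switches to a short series when
direct subtraction would cancel. A naive sketch of that closed form, usable
as a cross-check at moderate loads (function name hypothetical):

import math

def naive_stretch_lps(load, mrr, spread):
    """Stretch fitting function, average loss per second, naive form."""
    chi = (load - mrr) / spread
    chi0 = -mrr / spread
    return spread * (math.log1p(math.exp(chi)) - math.log1p(math.exp(chi0)))

lfit_stretch returns the logarithm of this quantity.
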
@@ -321,26 +329,26 @@ class PLRsearch(object):
         # TODO: The stretch sign is just to have less minuses. Worth changing?
         chi = (mrr - load) / spread
         chi0 = mrr / spread
-        trace("Erf: load", load)
-        trace("mrr", mrr)
-        trace("spread", spread)
-        trace("chi", chi)
-        trace("chi0", chi0)
+        trace(u"Erf: load", load)
+        trace(u"mrr", mrr)
+        trace(u"spread", spread)
+        trace(u"chi", chi)
+        trace(u"chi0", chi0)
         if chi >= -1.0:
-            trace("positive, b roughly bigger than m", None)
+            trace(u"positive, b roughly bigger than m", None)
             if chi > math.exp(10):
                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
-                trace("approximated first", first)
+                trace(u"approximated first", first)
             else:
                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
-                trace("exact first", first)
+                trace(u"exact first", first)
             first -= chi * chi
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = log_minus(first, second)
-            trace("first", first)
+            trace(u"first", first)
         else:
-            trace("negative, b roughly smaller than m", None)
+            trace(u"negative, b roughly smaller than m", None)
             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
             exp_first *= math.exp(-chi * chi)
             exp_first -= 2 * chi
@@ -351,11 +359,11 @@ class PLRsearch(object):
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = math.log(exp_first - math.exp(second))
-            trace("exp_first", exp_first)
-        trace("second", second)
-        trace("intermediate", intermediate)
+            trace(u"exp_first", exp_first)
+        trace(u"second", second)
+        trace(u"intermediate", intermediate)
         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
-        trace("result", result)
+        trace(u"result", result)
         return result
 
     @staticmethod
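
lfit_erf leans on scipy's erfcx, the scaled complementary error function
erfcx(x) = exp(x*x) * erfc(x). The scaling matters because for large x the
two factors underflow and overflow separately while their product stays
modest; x * erfcx(x) tends to 1/sqrt(pi), which is presumably what the class
constant xerfcx_limit holds. A quick sanity sketch:

import math
from scipy.special import erfc, erfcx

x = 5.0
assert math.isclose(erfcx(x), math.exp(x * x) * erfc(x), rel_tol=1e-12)
print(erfcx(30.0))  # ~0.0188, although erfc(30.0) alone underflows to 0.0.
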
@@ -386,7 +394,7 @@ class PLRsearch(object):
         :type lfit_func: Function from 3 floats to float.
         :type min_rate: float
         :type max_rate: float
-        :type log_lps_target: float
+        :type loss_ratio_target: float
         :type mrr: float
         :type spread: float
         :returns: Load [pps] which achieves the target with given parameters.
@@ -398,17 +406,17 @@ class PLRsearch(object):
         loss_ratio = -1
         while loss_ratio != loss_ratio_target:
             rate = (rate_hi + rate_lo) / 2.0
-            if rate == rate_hi or rate == rate_lo:
+            if rate in (rate_hi, rate_lo):
                 break
             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
             loss_ratio = loss_rate / rate
             if loss_ratio > loss_ratio_target:
-                trace("halving down", rate)
+                trace(u"halving down", rate)
                 rate_hi = rate
             elif loss_ratio < loss_ratio_target:
-                trace("halving up", rate)
+                trace(u"halving up", rate)
                 rate_lo = rate
-        trace("found", rate)
+        trace(u"found", rate)
         return rate
 
     @staticmethod
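
The rate in (rate_hi, rate_lo) guard stops the bisection in
find_critical_rate once the midpoint is no longer a float distinct from both
bounds, the natural floating-point termination condition. A tiny sketch
(math.nextafter needs Python 3.9+):

import math

lo = 1.0
hi = math.nextafter(lo, math.inf)  # The next representable float above lo.
mid = (lo + hi) / 2.0
assert mid in (lo, hi)  # Nothing fits strictly between adjacent floats.
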
@@ -429,40 +437,42 @@ class PLRsearch(object):
 
         :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_stretch or lfit_erf.
-        :param result_list: List of trial measurement results.
+        :param trial_result_list: List of trial measurement results.
         :param mrr: The mrr parameter for the fitting function.
-        :param spread: The spread parameter for the fittinmg function.
+        :param spread: The spread parameter for the fitting function.
         :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
-        :type result_list: list of MLRsearch.ReceiveRateMeasurement
+        :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
         :type mrr: float
         :type spread: float
         :returns: Logarithm of result weight for given function and parameters.
         :rtype: float
         """
         log_likelihood = 0.0
-        trace("log_weight for mrr", mrr)
-        trace("spread", spread)
+        trace(u"log_weight for mrr", mrr)
+        trace(u"spread", spread)
         for result in trial_result_list:
-            trace("for tr", result.target_tr)
-            trace("lc", result.loss_count)
-            trace("d", result.duration)
-            log_avg_loss_per_second = lfit_func(
-                trace, result.target_tr, mrr, spread)
-            log_avg_loss_per_trial = (
-                log_avg_loss_per_second + math.log(result.duration))
-            # Poisson probability computation works nice for logarithms.
-            log_trial_likelihood = (
-                result.loss_count * log_avg_loss_per_trial
-                - math.exp(log_avg_loss_per_trial))
-            log_trial_likelihood -= math.lgamma(1 + result.loss_count)
+            trace(u"for tr", result.target_tr)
+            trace(u"lc", result.loss_count)
+            trace(u"d", result.duration)
+            # _rel_ values use units of target_tr (transactions per second).
+            log_avg_rel_loss_per_second = lfit_func(
+                trace, result.target_tr, mrr, spread
+            )
+            # _abs_ values use units of loss count (maybe packets).
+            # There can be multiple packets per transaction.
+            log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
+                result.transmit_count / result.target_tr
+            )
+            # Geometric probability computation for logarithms.
+            log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
+            log_trial_likelihood *= -result.loss_count
+            log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
             log_likelihood += log_trial_likelihood
-            trace("avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
-            trace("log_trial_likelihood", log_trial_likelihood)
+            trace(u"avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
+            trace(u"log_trial_likelihood", log_trial_likelihood)
         return log_likelihood
 
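
The geometric block in log_weight evaluates, with
a = exp(log_avg_abs_loss_per_trial) and p = a / (1 + a), the expression
log(p**c * (1 - p)) for c = loss_count; that is the log-likelihood of a
geometric distribution, replacing the Poisson likelihood of the removed
code. A self-contained identity check (log_plus redefined naively, just for
the sketch):

import math

def log_plus(first, second):
    """log(exp(first) + exp(second)); naive version, adequate here."""
    return math.log(math.exp(first) + math.exp(second))

avg_loss = 2.5  # Average absolute loss per trial, arbitrary example value.
count = 3  # Observed loss count.
p = avg_loss / (1.0 + avg_loss)
direct = count * math.log(p) + math.log(1.0 - p)
log_avg = math.log(avg_loss)
via_logs = -count * log_plus(0.0, -log_avg) - log_plus(0.0, +log_avg)
assert math.isclose(direct, via_logs, rel_tol=1e-12)
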
-    # TODO: Refactor (somehow) so pylint stops complaining about
-    # too many local variables.
     def measure_and_compute(
             self, trial_duration, transmit_rate, trial_result_list,
             min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
@@ -512,15 +522,14 @@ class PLRsearch(object):
         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
         :type max_samples: None or int
         :returns: Measurement and computation results.
-        :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers.
+        :rtype: _ComputeResult
         """
         logging.debug(
         """
         logging.debug(
-            "measure_and_compute started with self %(self)r, trial_duration "
-            + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
-            + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
-            {"self": self, "dur": trial_duration, "tr": transmit_rate,
-             "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
-             "ms": max_samples})
+            f"measure_and_compute started with self {self!r}, trial_duration "
+            f"{trial_duration!r}, transmit_rate {transmit_rate!r}, "
+            f"trial_result_list {trial_result_list!r}, max_rate {max_rate!r}, "
+            f"focus_trackers {focus_trackers!r}, max_samples {max_samples!r}"
+        )
         # Preparation phase.
         dimension = 2
         stretch_focus_tracker, erf_focus_tracker = focus_trackers
@@ -531,6 +540,7 @@ class PLRsearch(object):
             erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
             erf_focus_tracker.unit_reset()
         old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
+
         def start_computing(fitting_function, focus_tracker):
             """Just a block of code to be used for each fitting function.
 
@@ -538,14 +548,14 @@ class PLRsearch(object):
             start computation, return the boss pipe end.
 
             :param fitting_function: lfit_erf or lfit_stretch.
-            :param bias_avg: Tuple of floats to start searching around.
-            :param bias_cov: Covariance matrix defining initial focus shape.
+            :param focus_tracker: Tracker initialized to speed up the numeric
+                computation.
             :type fitting_function: Function from 3 floats to float.
-            :type bias_avg: 2-tuple of floats
-            :type bias_cov: 2-tuple of 2-tuples of floats
+            :type focus_tracker: None or stat_trackers.VectorStatTracker
             :returns: Boss end of communication pipe.
             :rtype: multiprocessing.Connection
             """
+
             def value_logweight_func(trace, x_mrr, x_spread):
                 """Return log of critical rate and log of likelihood.
 
@@ -580,27 +590,37 @@ class PLRsearch(object):
                 mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
                 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
                 logweight = self.log_weight(
-                    trace, fitting_function, trial_result_list, mrr, spread)
-                value = math.log(self.find_critical_rate(
-                    trace, fitting_function, min_rate, max_rate,
-                    self.packet_loss_ratio_target, mrr, spread))
+                    trace, fitting_function, trial_result_list, mrr, spread
+                )
+                value = math.log(
+                    self.find_critical_rate(
+                        trace, fitting_function, min_rate, max_rate,
+                        self.packet_loss_ratio_target, mrr, spread
+                    )
+                )
                 return value, logweight
+
             dilled_function = dill.dumps(value_logweight_func)
             boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
-            boss_pipe_end.send(
-                (dimension, dilled_function, focus_tracker, max_samples))
+            # Do not send yet, run the worker first to avoid a deadlock.
+            # See https://stackoverflow.com/a/15716500
             worker = multiprocessing.Process(
-                target=Integrator.try_estimate_nd, args=(
-                    worker_pipe_end, 10.0, self.trace_enabled))
+                target=Integrator.try_estimate_nd,
+                args=(worker_pipe_end, 10.0, self.trace_enabled)
+            )
             worker.daemon = True
             worker.start()
+            boss_pipe_end.send(
+                (dimension, dilled_function, focus_tracker, max_samples)
+            )
             return boss_pipe_end
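
The reordering above (worker.start() before boss_pipe_end.send(...)) matters
because Connection.send can block once the OS pipe buffer fills, so sending
a large payload with no reader alive deadlocks. A minimal sketch of the safe
ordering (names hypothetical):

import multiprocessing

def _echo_size(conn):
    """Worker: receive one payload, reply with its length."""
    payload = conn.recv()
    conn.send(len(payload))
    conn.close()

if __name__ == "__main__":
    boss_end, worker_end = multiprocessing.Pipe()
    worker = multiprocessing.Process(target=_echo_size, args=(worker_end,))
    worker.daemon = True
    worker.start()  # The reader exists before the big send below.
    boss_end.send("x" * 10_000_000)  # Far larger than typical pipe buffers.
    print(boss_end.recv())  # 10000000
    worker.join()
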
-        erf_pipe = start_computing(
-            self.lfit_erf, erf_focus_tracker)
-        stretch_pipe = start_computing(
-            self.lfit_stretch, stretch_focus_tracker)
+
+        erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
+        stretch_pipe = start_computing(self.lfit_stretch, stretch_focus_tracker)
+
         # Measurement phase.
         measurement = self.measurer.measure(trial_duration, transmit_rate)
+
         # Processing phase.
         def stop_computing(name, pipe):
             """Just a block of code to be used for each worker.
@@ -616,51 +636,115 @@ class PLRsearch(object):
             :type pipe: multiprocessing.Connection
             :returns: Computed value tracker, actual focus tracker,
                 and number of samples used for this iteration.
-            :rtype: 3-tuple of tracker, tracker and int
+            :rtype: _PartialResult
             """
             """
-            pipe.send(None)
+            # If worker encountered an exception, we get it in the recv below,
+            # but send will report a broken pipe.
+            # EAFP says we should ignore the error (instead of polling first).
+            # https://devblogs.microsoft.com/python
+            #   /idiomatic-python-eafp-versus-lbyl/
+            try:
+                pipe.send(None)
+            except BrokenPipeError:
+                pass
             if not pipe.poll(10.0):
-                raise RuntimeError(
-                    "Worker {name} did not finish!".format(name=name))
+                raise RuntimeError(f"Worker {name} did not finish!")
             result_or_traceback = pipe.recv()
             try:
                 value_tracker, focus_tracker, debug_list, trace_list, sampls = (
-                    result_or_traceback)
+                    result_or_traceback
+                )
             except ValueError:
                 raise RuntimeError(
-                    "Worker {name} failed with the following traceback:\n{tr}"
-                    .format(name=name, tr=result_or_traceback))
-            logging.info("Logs from worker %(name)r:", {"name": name})
+                    f"Worker {name} failed with the following traceback:\n"
+                    f"{result_or_traceback}"
+                )
+            logging.info(f"Logs from worker {name!r}:")
             for message in debug_list:
                 logging.info(message)
             for message in trace_list:
                 logging.debug(message)
-            logging.debug("trackers: value %(val)r focus %(foc)r", {
-                "val": value_tracker, "foc": focus_tracker})
-            return value_tracker, focus_tracker, sampls
-        stretch_value_tracker, stretch_focus_tracker, stretch_samples = (
-            stop_computing("stretch", stretch_pipe))
-        erf_value_tracker, erf_focus_tracker, erf_samples = (
-            stop_computing("erf", erf_pipe))
-        stretch_avg = stretch_value_tracker.average
-        erf_avg = erf_value_tracker.average
-        # TODO: Take into account secondary stats.
-        stretch_stdev = math.exp(stretch_value_tracker.log_variance / 2)
-        erf_stdev = math.exp(erf_value_tracker.log_variance / 2)
-        avg = math.exp((stretch_avg + erf_avg) / 2.0)
-        var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
-        var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
-        stdev = avg * math.sqrt(var)
-        focus_trackers = (stretch_focus_tracker, erf_focus_tracker)
+            logging.debug(
+                f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
+            )
+            return _PartialResult(value_tracker, focus_tracker, sampls)
+
+        stretch_result = stop_computing(u"stretch", stretch_pipe)
+        erf_result = stop_computing(u"erf", erf_pipe)
+        result = PLRsearch._get_result(measurement, stretch_result, erf_result)
         logging.info(
-            "measure_and_compute finished with trial result %(res)r "
-            "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r "
-            "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r "
-            "erf samples %(es)r",
-            {"res": measurement, "avg": avg, "stdev": stdev,
-             "a1": math.exp(stretch_avg), "a2": math.exp(erf_avg),
-             "nt": focus_trackers, "ot": old_trackers, "ss": stretch_samples,
-             "es": erf_samples})
-        return (
-            measurement, avg, stdev, math.exp(stretch_avg),
-            math.exp(erf_avg), focus_trackers)
+            f"measure_and_compute finished with trial result "
+            f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
+            f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
+            f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
+            f"stretch samples {stretch_result.samples!r} erf samples "
+            f"{erf_result.samples!r}"
+        )
+        return result
+
+    @staticmethod
+    def _get_result(measurement, stretch_result, erf_result):
+        """Process and collate results from measure_and_compute.
+
+        Turn logarithm based values to exponential ones,
+        combine averages and stdevs of two fitting functions into a whole.
+
+        :param measurement: The trial measurement obtained during computation.
+        :param stretch_result: Computation output for stretch fitting function.
+        :param erf_result: Computation output for erf fitting function.
+        :type measurement: ReceiveRateMeasurement
+        :type stretch_result: _PartialResult
+        :type erf_result: _PartialResult
+        :returns: Combined results.
+        :rtype: _ComputeResult
+        """
+        stretch_avg = stretch_result.value_tracker.average
+        erf_avg = erf_result.value_tracker.average
+        stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
+        erf_var = erf_result.value_tracker.get_pessimistic_variance()
+        avg_log = (stretch_avg + erf_avg) / 2.0
+        var_log = (stretch_var + erf_var) / 2.0
+        var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
+        stdev_log = math.sqrt(var_log)
+        low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
+        avg = (low + upp) / 2
+        stdev = avg - low
+        trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
+        sea = math.exp(stretch_avg)
+        eea = math.exp(erf_avg)
+        return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
+
+
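
_get_result combines the two estimates with the equal-weight mixture formula
(overall variance = mean of variances plus variance of the means) in the log
domain, then maps the log-domain mean and stdev to linear units through a
symmetric exp() interval. A numeric sketch with hypothetical inputs:

import math

stretch_avg, erf_avg = math.log(2.0e6), math.log(3.0e6)
stretch_var, erf_var = 0.04, 0.09
avg_log = (stretch_avg + erf_avg) / 2.0
var_log = (stretch_var + erf_var) / 2.0
var_log += (stretch_avg - erf_avg) ** 2 / 4.0
stdev_log = math.sqrt(var_log)
low = math.exp(avg_log - stdev_log)
upp = math.exp(avg_log + stdev_log)
avg, stdev = (low + upp) / 2.0, (upp - low) / 2.0  # stdev == avg - low.
print(f"avg {avg:.3e} stdev {stdev:.3e}")
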
+# Named tuples, for multiple local variables to be passed as return value.
+_PartialResult = namedtuple(
+    u"_PartialResult", u"value_tracker focus_tracker samples"
+)
+"""Two stat trackers and sample counter.
+
+:param value_tracker: Tracker for the value (critical load) being integrated.
+:param focus_tracker: Tracker for focusing integration inputs (sample points).
+:param samples: How many samples were used for the computation.
+:type value_tracker: stat_trackers.ScalarDualStatTracker
+:type focus_tracker: stat_trackers.VectorStatTracker
+:type samples: int
+"""
+
+_ComputeResult = namedtuple(
+    u"_ComputeResult",
+    u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers"
+)
+"""Measurement, 4 computation result values, pair of trackers.
+
+:param measurement: The trial measurement result obtained during computation.
+:param avg: Overall average of critical rate estimate.
+:param stdev: Overall standard deviation of critical rate estimate.
+:param stretch_exp_avg: Stretch fitting function estimate average, exponentiated.
+:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
+:param trackers: Pair of focus trackers to start next iteration with.
+:type measurement: ReceiveRateMeasurement
+:type avg: float
+:type stdev: float
+:type stretch_exp_avg: float
+:type erf_exp_avg: float
+:type trackers: 2-tuple of stat_trackers.VectorStatTracker
+"""