PLRsearch: Use stat trackers to shorten Integrator 15/19515/1
author     Vratko Polak <vrpolak@cisco.com>
Wed, 2 Jan 2019 13:18:50 +0000 (14:18 +0100)
committer  Vratko Polak <vrpolak@cisco.com>
Fri, 10 May 2019 14:28:16 +0000 (16:28 +0200)
+ Extract several tracker classes and use them in Integrator (sketch below).
+ Apply next_rate workarounds to focus more on the critical region.
+ Rewrite stretch function, as the previous implementation was wrong.
+ Rework logging:
++ Use injected trace() function in Integrator.
++ Inject function that skips trace logging on default initialization.
++ Use the same multiprocessing-safe passing, but separate queue.
+ Set duration to 120m in Robot, but keep at 30m in Python.
+ Apply minor changes to make computations more reproducible:
++ Attempt to log everything needed for reproducibility in one line.
++ Log samples used, and make it settable as upper limit.
++ Use repr output in TRex scripts, to avoid rounding when copypasting.
+- Numpy seems to be sharing PRNG between processes.
+-- An edit to disable one thread is needed for full reproducibility.
+-- Such an edit is prepared, but will not be merged anytime soon.
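
For orientation, a minimal sketch of the log-weighted running update that the
extracted scalar tracker performs is shown below. It mirrors the inline logic
this commit removes from Integrator.py (the deleted lines in the first diff);
the class name SketchScalarTracker and the exact method layout are
illustrative, not the committed stat_trackers.py code.

    import math

    from log_plus import log_plus  # CSIT helper; None stands in for log(0.0).

    class SketchScalarTracker(object):
        """Track weighted average and log-variance of scalar samples."""

        def __init__(self):
            self.log_sum_weight = None  # None means no sample seen yet.
            self.average = 0.0
            self.log_variance = None

        def add(self, value, log_weight=0.0):
            """Update the estimates with one weighted sample, return self."""
            old_log_sum_weight = self.log_sum_weight
            self.log_sum_weight = log_plus(old_log_sum_weight, log_weight)
            if old_log_sum_weight is None:
                self.average = value
                return self
            sample_ratio = math.exp(log_weight - self.log_sum_weight)
            shift = value - self.average
            self.average += shift * sample_ratio
            if abs(shift) > 0.0:
                self.log_variance = log_plus(
                    self.log_variance, 2 * math.log(abs(shift))
                    + log_weight - self.log_sum_weight)
            if self.log_variance is not None:
                self.log_variance -= self.log_sum_weight - old_log_sum_weight
            return self

The real classes (ScalarStatTracker, ScalarDualStatTracker, VectorStatTracker)
extend this idea; the vector variant also tracks a covariance matrix, which
Integrator uses as the sampling focus.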

Change-Id: Icf56429b34ab6de2d18a12e6c676e2d242b3ba4c
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
resources/libraries/python/PLRsearch/Integrator.py
resources/libraries/python/PLRsearch/PLRsearch.py
resources/libraries/python/PLRsearch/stat_trackers.py [new file with mode: 0644]
resources/libraries/python/TrafficGenerator.py
resources/libraries/robot/performance/performance_utils.robot
resources/tools/trex/trex_stateless_profile.py

diff --git a/resources/libraries/python/PLRsearch/Integrator.py b/resources/libraries/python/PLRsearch/Integrator.py
index f8846fd..82abe5f 100644 (file)
 
 See log_plus for an explanation why None acts as a special case "float" number.
 
-TODO: Separate optimizations specific to PLRsearch
-      and distribute as a standalone package so other projects may reuse.
+TODO: Separate optimizations specific to PLRsearch and distribute the rest
+      as a standalone package so other projects may reuse it.
 """
 
-import math
+import copy
 import traceback
 
 import dill
-import numpy
+from numpy import random
 
-# TODO: The preferred way to consume this code is via a pip package.
-# If your project copies code instead, make sure your pylint check does not
-# require these imports to be absolute and descending from your superpackage.
-from log_plus import log_plus
+# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
+# then switch to absolute imports within PLRsearch package.
+# Current usage of relative imports is just a short term workaround.
+import stat_trackers  # pylint: disable=relative-import
 
 
-def try_estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
+def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
     """Call estimate_nd but catch any exception and send traceback."""
     try:
         return estimate_nd(communication_pipe, scale_coeff, trace_enabled)
@@ -45,10 +45,10 @@ def try_estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
         raise
 
 
-# FIXME: Pylint reports multiple complexity violations.
+# TODO: Pylint reports multiple complexity violations.
 # Refactor the code, using less (but structured) variables
 # and function calls for (relatively) loosely coupled code blocks.
-def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
+def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
     """Use Bayesian inference from control queue, put result to result queue.
 
     TODO: Use a logging framework that works in a user friendly way.
@@ -58,7 +58,7 @@ def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
     Anyway, the current implementation with trace_enabled looks ugly.)
 
     The result is average and standard deviation for posterior distribution
-    of a single dependent (positive scalar) value.
+    of a single dependent (scalar, float) value.
     The prior is assumed to be uniform on (-1, 1) for every parameter.
     Number of parameters and the function for computing
     the dependent value and likelihood both come from input.
@@ -80,28 +80,26 @@ def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
     thus integration algorithm returns also the distribution data,
     to be used as initial focus in next iteration.
 
-    After some number of samples (depends on dimension),
-    the algorithm starts tracking few most likely samples to base the Gaussian
-    distribution around, mixing with estimates from observed samples.
-    The idea is that even when (usually) one of the samples dominates,
-    first few are treated as if equally likely, to get reasonable focus.
-    During the "find the maximum" phase, this set of best samples
-    frequently takes a wrong shape (compared to observed samples
-    in equilibrium). Therefore scale_coeff argument is left for humans to tweak,
+    There are workarounds in place that allow the old or default focus tracker
+    to be updated reasonably, even when the initial samples
+    of a new iteration have way smaller (or larger) weights.
+
+    During the "find the maximum" phase, the focus tracker frequently takes
+    a wrong shape (compared to observed samples in equilibrium).
+    Therefore the scale_coeff argument is left for humans to tweak,
     so the convergence is reliable and quick.
-    Any data (other than correctly weighted samples) used to keep
-    distribution shape reasonable is called "bias", regardles of
-    whether it comes from input hint, or from tracking top samples.
 
     Until the distribution locates itself roughly around
     the maximum likelihood point, the integration results are probably wrong.
     That means some minimal time is needed for the result to become reliable.
+
+    TODO: The following is not currently implemented.
     The reported standard deviation attempts to signal inconsistency
     (when one sample has a dominating weight compared to the rest of the samples),
     but some human supervision is strongly encouraged.
 
     To facilitate running in worker processes, arguments and results
-    are communicated via pipe. The computation does not start
+    are communicated via pipe. The computation does not start
     until arguments appear in the pipe, the computation stops
     when another item (stop object) is detected in the pipe
     (and result is put to pipe).
@@ -115,17 +113,16 @@ def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
     - dilled_function: Function (serialized using dill), which:
     - - Takes the dimension number of float parameters from (-1, 1).
     - - Returns float 2-tuple of dependent value and parameter log-likelihood.
-    - param_hint_avg: Dimension-tuple of floats to start searching around.
-    - param_hint_cov: Covariance matrix defining initial focus shape.
+    - param_focus_tracker: VectorStatTracker to use for initial focus.
+    - max_samples: None or a limit for samples to use.
 
     Output/result object (sent to pipe queue)
-    is a 6-tuple of the following fields:
-    - value_avg: Float estimate of posterior average dependent value.
-    - value_stdev: Float estimate of posterior standard deviation of the value.
-    - param_importance_avg: Float tuple, center of Gaussian to use next.
-    - param_importance_cov: Float covariance matrix of the Gaussian to use next.
+    is a 5-tuple of the following fields:
+    - value_tracker: ScalarDualStatTracker estimate of value posterior.
+    - param_focus_tracker: VectorStatTracker to use for initial focus next.
     - debug_list: List of debug strings to log at main process.
     - trace_list: List of trace strings to pass to main process if enabled.
+    - samples: Number of samples used in computation (to make it reproducible).
     Trace strings are very verbose; it is not recommended to enable them.
     If they are not enabled, trace_list will be empty.
     It is recommended to manually edit selected lines into debug_list if needed.
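
The input and output objects described above travel over a multiprocessing
pipe. For orientation, the whole exchange from the boss side looks roughly
like the sketch below; the toy integrand, the sleep standing in for the trial
measurement, and the literal scale_coeff and timeout values are illustrative.
PLRsearch.py (second diff) wraps the same steps in its start_computing and
stop_computing helpers.

    import multiprocessing
    import time

    import dill

    import Integrator

    def value_logweight_func(trace, x_mrr, x_spread):
        """Toy integrand: value is x_mrr, log-weight favors the origin."""
        return x_mrr, -(x_mrr * x_mrr + x_spread * x_spread)

    boss_end, worker_end = multiprocessing.Pipe()
    # Input object: the 4-tuple described above; None means default focus.
    boss_end.send((2, dill.dumps(value_logweight_func), None, None))
    worker = multiprocessing.Process(
        target=Integrator.try_estimate_nd, args=(worker_end, 8.0, False))
    worker.daemon = True
    worker.start()
    time.sleep(1.0)  # PLRsearch performs the trial measurement here instead.
    boss_end.send(None)  # Stop object: any further item ends the sampling loop.
    if boss_end.poll(10.0):
        # Result object: the 5-tuple described above.
        (value_tracker, param_focus_tracker, debug_list, trace_list,
         samples) = boss_end.recv()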
@@ -144,11 +141,13 @@ def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
         (due to rounding errors). Try changing scale_coeff.
     """
 
-    # Block until input object appears.
-    dimension, dilled_function, param_hint_avg, param_hint_cov = (
-        communication_pipe.recv())
     debug_list = list()
     trace_list = list()
+    # Block until input object appears.
+    dimension, dilled_function, param_focus_tracker, max_samples = (
+        communication_pipe.recv())
+    debug_list.append("Called with param_focus_tracker {tracker!r}"
+                      .format(tracker=param_focus_tracker))
     def trace(name, value):
         """
         Add a variable (name and value) to trace list (if enabled).
@@ -165,77 +164,37 @@ def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
         if trace_enabled:
             trace_list.append(name + " " + repr(value))
     value_logweight_function = dill.loads(dilled_function)
-    len_top = (dimension + 2) * (dimension + 1) / 2
-    top_weight_param = list()
     samples = 0
-    log_sum_weight = None
     # Importance sampling produces samples of higher weight (important)
     # more frequently, and corrects that by adding weight bonus
     # for the less frequently (unimportant) samples.
     # But "corrected_weight" is too close to "weight" to be readable,
     # so "importance" is used instead, even if it runs contrary to what
     # important region is.
-    log_sum_importance = None
-    log_importance_best = None
-    value_avg = 0.0
-    # 1x1 dimensional covariance matrix is just variance.
-    # As variance is never negative, we can track logarithm.
-    value_log_variance = None
-    # Here "secondary" means "excluding the weightest sample".
-    log_secondary_sum_importance = None
-    value_secondary_avg = 0.0
-    value_log_secondary_variance = None
-    param_sampled_avg = [0.0 for first in range(dimension)]
-    # TODO: Examine whether we can gain speed by tracking triangle only.
-    # Covariance matrix can contain negative element (off-diagonal),
-    # so no logarithm here. This can lead to zeroes on diagonal,
-    # but we have biasing to make sure it does not hurt much.
-    param_sampled_cov = [[0.0 for first in range(dimension)]
-                         for second in range(dimension)]
-    # The next two variables do NOT need to be initialized here,
-    # but pylint is not a mathematician enough to understand that.
-    param_top_avg = [0.0 for first in range(dimension)]
-    param_top_cov = [[0.0 for first in range(dimension)]
-                     for second in range(dimension)]
-    if not (param_hint_avg and param_hint_cov):
-        # First call has Nones instead of useful hints.
-        param_hint_avg = [0.0 for first in range(dimension)]
-        param_hint_cov = [
-            [1.0 if first == second else 0.0 for first in range(dimension)]
-            for second in range(dimension)]
+    value_tracker = stat_trackers.ScalarDualStatTracker()
+    param_sampled_tracker = stat_trackers.VectorStatTracker(dimension).reset()
+    if not param_focus_tracker:
+        # First call has None instead of a real (even empty) tracker.
+        param_focus_tracker = stat_trackers.VectorStatTracker(dimension)
+        param_focus_tracker.unit_reset()
+    else:
+        # The focus tracker probably has too high a weight.
+        param_focus_tracker.log_sum_weight = None
+    # TODO: Teach pylint that the used version of numpy.random does have this member.
+    random.seed(0)
     while not communication_pipe.poll():
-        # Compute focus data.
-        if len(top_weight_param) < len_top:
-            # Not enough samples for reasonable top, use hint bias.
-            param_focus_avg = param_hint_avg
-            param_focus_cov = param_hint_cov
-        else:
-            # We have both top samples and overall samples.
-            # Mix them according to how much the weightest sample dominates.
-            log_top_weight = top_weight_param[0][0]
-            log_weight_norm = log_plus(log_sum_weight, log_top_weight)
-            top_ratio = math.exp(log_top_weight - log_weight_norm)
-            sampled_ratio = math.exp(log_sum_weight - log_weight_norm)
-            trace("log_top_weight", log_top_weight)
-            trace("log_sum_weight", log_sum_weight)
-            trace("top_ratio", top_ratio)
-            trace("sampled_ratio", sampled_ratio)
-            param_focus_avg = [
-                sampled_ratio * param_sampled_avg[first]
-                + top_ratio * param_top_avg[first]
-                for first in range(dimension)]
-            param_focus_cov = [[
-                scale_coeff * (
-                    sampled_ratio * param_sampled_cov[first][second]
-                    + top_ratio * param_top_cov[first][second])
-                for first in range(dimension)] for second in range(dimension)]
-        trace("param_focus_avg", param_focus_avg)
-        trace("param_focus_cov", param_focus_cov)
+        if max_samples and samples >= max_samples:
+            break
         # Generate next sample.
+        averages = param_focus_tracker.averages
+        covariance_matrix = copy.deepcopy(param_focus_tracker.covariance_matrix)
+        for first in range(dimension):
+            for second in range(dimension):
+                covariance_matrix[first][second] *= scale_coeff
         while 1:
-            # TODO: Inform pylint that correct version of numpy is available.
-            sample_point = numpy.random.multivariate_normal(
-                param_focus_avg, param_focus_cov, 1)[0]
+            # TODO: Teach pylint that numpy.random does also have this member.
+            sample_point = random.multivariate_normal(
+                averages, covariance_matrix, 1)[0].tolist()
             # Multivariate Gauss can fall outside (-1, 1) interval
             for first in range(dimension):
                 sample_coordinate = sample_point[first]
@@ -245,134 +204,30 @@ def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
                 break
         trace("sample_point", sample_point)
         samples += 1
-        value, log_weight = value_logweight_function(*sample_point)
+        trace("samples", samples)
+        value, log_weight = value_logweight_function(trace, *sample_point)
         trace("value", value)
         trace("log_weight", log_weight)
-        # Update bias related statistics.
-        log_sum_weight = log_plus(log_sum_weight, log_weight)
-        if len(top_weight_param) < len_top:
-            top_weight_param.append((log_weight, sample_point))
-        # Hack: top_weight_param[-1] is either the smallest,
-        # or the just appended to len_top-1 item list.
-        if (len(top_weight_param) >= len_top
-                and log_weight >= top_weight_param[-1][0]):
-            top_weight_param = top_weight_param[:-1]
-            top_weight_param.append((log_weight, sample_point))
-            top_weight_param.sort(key=lambda item: -item[0])
-            trace("top_weight_param", top_weight_param)
-            # top_weight_param has changed, recompute biases.
-            param_top_avg = top_weight_param[0][1]
-            param_top_cov = [[0.0 for first in range(dimension)]
-                             for second in range(dimension)]
-            top_item_count = 1
-            for _, near_top_param in top_weight_param[1:]:
-                top_item_count += 1
-                next_item_ratio = 1.0 / top_item_count
-                previous_items_ratio = 1.0 - next_item_ratio
-                param_shift = [
-                    near_top_param[first] - param_top_avg[first]
-                    for first in range(dimension)]
-                # Do not move center from the weightest sample.
-                for second in range(dimension):
-                    for first in range(dimension):
-                        param_top_cov[first][second] += (
-                            param_shift[first] * param_shift[second]
-                            * next_item_ratio)
-                        param_top_cov[first][second] *= previous_items_ratio
-            trace("param_top_avg", param_top_avg)
-            trace("param_top_cov", param_top_cov)
+        trace("focus tracker before adding", param_focus_tracker)
+        # Update focus related statistics.
+        param_distance = param_focus_tracker.add_without_dominance_get_distance(
+            sample_point, log_weight)
         # The code above looked at weight (not importance).
         # The code below looks at importance (not weight).
-        param_shift = [sample_point[first] - param_focus_avg[first]
-                       for first in range(dimension)]
-        rarity_gradient = numpy.linalg.solve(param_focus_cov, param_shift)
-        rarity_step = numpy.vdot(param_shift, rarity_gradient)
-        log_rarity = rarity_step / 2.0
+        log_rarity = param_distance / 2.0
         trace("log_rarity", log_rarity)
-        trace("samples", samples)
         log_importance = log_weight + log_rarity
         trace("log_importance", log_importance)
+        value_tracker.add(value, log_importance)
         # Update sampled statistics.
-        old_log_sum_importance = log_sum_importance
-        log_sum_importance = log_plus(old_log_sum_importance, log_importance)
-        trace("new log_sum_weight", log_sum_weight)
-        trace("log_sum_importance", log_sum_importance)
-        if old_log_sum_importance is None:
-            param_sampled_avg = list(sample_point)
-            value_avg = value
-            # Other value related quantities stay None.
-            continue
-        previous_samples_ratio = math.exp(
-            old_log_sum_importance - log_sum_importance)
-        new_sample_ratio = math.exp(log_importance - log_sum_importance)
-        param_shift = [sample_point[first] - param_sampled_avg[first]
-                       for first in range(dimension)]
-        value_shift = value - value_avg
-        for first in range(dimension):
-            param_sampled_avg[first] += param_shift[first] * new_sample_ratio
-        old_value_avg = value_avg
-        value_avg += value_shift * new_sample_ratio
-        value_absolute_shift = abs(value_shift)
-        for second in range(dimension):
-            for first in range(dimension):
-                param_sampled_cov[first][second] += (
-                    param_shift[first] * param_shift[second] * new_sample_ratio)
-                param_sampled_cov[first][second] *= previous_samples_ratio
-        trace("param_sampled_avg", param_sampled_avg)
-        trace("param_sampled_cov", param_sampled_cov)
-        update_secondary_stats = True
-        if log_importance_best is None or log_importance > log_importance_best:
-            log_importance_best = log_importance
-            log_secondary_sum_importance = old_log_sum_importance
-            value_secondary_avg = old_value_avg
-            value_log_secondary_variance = value_log_variance
-            update_secondary_stats = False
-            # TODO: Update all primary quantities before secondary ones.
-            # (As opposed to current hybrid code.)
-        if value_absolute_shift > 0.0:
-            value_log_variance = log_plus(
-                value_log_variance, 2 * math.log(value_absolute_shift)
-                + log_importance - log_sum_importance)
-        if value_log_variance is not None:
-            value_log_variance -= log_sum_importance - old_log_sum_importance
-        if not update_secondary_stats:
-            continue
-        # TODO: Pylint says the following variable name is bad.
-        # Make sure the refactor uses shorter names.
-        old_log_secondary_sum_importance = log_secondary_sum_importance
-        log_secondary_sum_importance = log_plus(
-            old_log_secondary_sum_importance, log_importance)
-        if old_log_secondary_sum_importance is None:
-            value_secondary_avg = value
-            continue
-        new_sample_secondary_ratio = math.exp(
-            log_importance - log_secondary_sum_importance)
-        # TODO: No would-be variable named old_value_secondary_avg
-        # appears in subsequent computations. Probably means there is a bug.
-        value_secondary_shift = value - value_secondary_avg
-        value_secondary_absolute_shift = abs(value_secondary_shift)
-        value_secondary_avg += (
-            value_secondary_shift * new_sample_secondary_ratio)
-        if value_secondary_absolute_shift > 0.0:
-            value_log_secondary_variance = log_plus(
-                value_log_secondary_variance, (
-                    2 * math.log(value_secondary_absolute_shift)
-                    + log_importance - log_secondary_sum_importance))
-        if value_log_secondary_variance is not None:
-            value_log_secondary_variance -= (
-                log_secondary_sum_importance - old_log_secondary_sum_importance)
+        param_sampled_tracker.add_get_shift(sample_point, log_importance)
     debug_list.append("integrator used " + str(samples) + " samples")
-    debug_list.append(
-        "value_avg " + str(value_avg)
-        + " param_sampled_avg " + repr(param_sampled_avg)
-        + " param_sampled_cov " + repr(param_sampled_cov)
-        + " value_log_variance " + str(value_log_variance)
-        + " value_log_secondary_variance " + str(value_log_secondary_variance))
-    value_stdev = math.exp(
-        (2 * value_log_variance - value_log_secondary_variance) / 2.0)
-    debug_list.append("top_weight_param[0] " + repr(top_weight_param[0]))
-    # Intentionally returning param_focus_avg and param_focus_cov,
-    # instead of possibly hyper-focused bias or sampled.
+    debug_list.append(" ".join([
+        "value_avg", str(value_tracker.average),
+        "param_sampled_avg", repr(param_sampled_tracker.averages),
+        "param_sampled_cov", repr(param_sampled_tracker.covariance_matrix),
+        "value_log_variance", str(value_tracker.log_variance),
+        "value_log_secondary_variance",
+        str(value_tracker.secondary.log_variance)]))
     communication_pipe.send(
-        (value_avg, value_stdev, param_focus_avg, param_focus_cov, debug_list,
-         trace_list))
+        (value_tracker, param_focus_tracker, debug_list, trace_list, samples))
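
The log_rarity bonus applied in the sampling loop above is half of a
Mahalanobis-style squared distance of the sample from the focus distribution.
The new code obtains that distance from add_without_dominance_get_distance;
the removed code computed it explicitly. A condensed reconstruction of that
computation, with made-up numbers (averages, covariance and log_weight are
illustrative only):

    import numpy

    focus_averages = [0.1, -0.2]                   # Focus tracker averages.
    focus_covariance = [[0.04, 0.0], [0.0, 0.09]]  # Focus tracker covariance.
    sample_point = [0.3, 0.1]        # Drawn from the (scaled) focus Gaussian.
    shift = [s - a for s, a in zip(sample_point, focus_averages)]
    # Squared distance of the sample with respect to the focus shape.
    gradient = numpy.linalg.solve(focus_covariance, shift)
    distance = numpy.vdot(shift, gradient)
    log_rarity = distance / 2.0   # Bonus for samples unlikely to be drawn.
    log_weight = -3.5             # Log-likelihood from the fitting function.
    log_importance = log_weight + log_rarity  # What the value tracker averages.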
diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py
index f1b3f74..db870c5 100644 (file)
@@ -22,15 +22,16 @@ import dill
 # TODO: Inform pylint about scipy (of correct version) being available.
 from scipy.special import erfcx, erfc
 
-# TODO: The preferred way to consume this code is via a pip package.
-# If your project copies code instead, make sure your pylint check does not
-# require these imports to be absolute and descending from your superpackage.
-import Integrator
-from log_plus import log_plus, log_minus
+# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
+# then switch to absolute imports within PLRsearch package.
+# Current usage of relative imports is just a short term workaround.
+import Integrator  # pylint: disable=relative-import
+from log_plus import log_plus, log_minus  # pylint: disable=relative-import
+import stat_trackers  # pylint: disable=relative-import
 
 
 class PLRsearch(object):
-    """A class to encapsulate data relevant for search method.
+    """A class to encapsulate data relevant for the search method.
 
     The context is performance testing of packet processing systems.
     The system, when being offered a steady stream of packets,
@@ -43,9 +44,6 @@ class PLRsearch(object):
     Methods other than search (and __init__)
     are just internal code structure.
     TODO: Those method names should start with an underscore then.
-
-    TODO: Figure out how to replace #print with logging
-    without slowing down too much.
     """
 
     xerfcx_limit = math.pow(math.acos(0), -0.5)
@@ -53,12 +51,9 @@ class PLRsearch(object):
 
     def __init__(
             self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
-            trial_number_offset=0, timeout=60.0):
+            trial_number_offset=0, timeout=1800.0, trace_enabled=False):
         """Store rate measurer and additional parameters.
 
-        Also declare packet_loss_per_second_target field (float),
-        to be initialized later.
-
         TODO: Copy AbstractMeasurer from MLRsearch.
 
         :param measurer: The measurer to call when searching.
@@ -78,10 +73,11 @@ class PLRsearch(object):
         :type timeout: float
         """
         self.measurer = measurer
-        self.trial_duration_per_trial = trial_duration_per_trial
-        self.packet_loss_ratio_target = packet_loss_ratio_target
-        self.trial_number_offset = trial_number_offset
-        self.timeout = timeout
+        self.trial_duration_per_trial = float(trial_duration_per_trial)
+        self.packet_loss_ratio_target = float(packet_loss_ratio_target)
+        self.trial_number_offset = int(trial_number_offset)
+        self.timeout = float(timeout)
+        self.trace_enabled = bool(trace_enabled)
 
     def search(self, min_rate, max_rate):
         """Perform the search, return average and stdev for throughput estimate.
@@ -106,7 +102,7 @@ class PLRsearch(object):
         will converge towards unique (for the given load) packet loss ratio.
         This means there is a deterministic (but unknown) function
         mapping the offered load to average loss ratio.
-        This function is called loss function.
+        This function is called the loss ratio function.
         This also assumes the average loss ratio
         does not depend on trial duration.
 
@@ -118,7 +114,7 @@ class PLRsearch(object):
         but Poisson is more practical, as it effectively gives
         less information content to high ratio results.
 
-        Even when applying other assumptions on the loss function
+        Even when applying other assumptions on the loss ratio function
         (increasing function, limit zero ratio when load goes to zero,
         global upper limit on rate of packets processed), there are still
         too many different shapes of possible loss functions,
@@ -132,14 +128,16 @@ class PLRsearch(object):
         the Bayesian inference becomes tractable (even though it needs
         numerical integration from Integrator class).
 
-        The first measurement is done at min_rate to help with convergence
+        The first measurement is done at the middle between
+        min_rate and max_rate, to help with convergence
         if max_rate measurements give loss below target.
-        FIXME: Fix overflow error and really use min_rate.
+        TODO: Fix overflow error and use min_rate instead of the middle.
+
         The second measurement is done at max_rate, next few measurements
-        have offered load of previous load minus excess loss ratio.
+        have an offered load of the previous load minus the excess loss rate.
         This simple rule is found to be good when offered loads
         so far are way above the critical rate. After few measurements,
-        inference from fitting functions converges faster that the initial
+        inference from fitting functions converges faster than this initial
         "optimistic" procedure.
 
         Offered loads close to (limiting) critical rate are the most useful,
@@ -150,15 +148,13 @@ class PLRsearch(object):
         has better predictions than the other one, but the algorithm
         does not track that. Simply, it uses the estimate average,
         alternating between the functions.
+        Multiple workarounds are applied to try and avoid measurements
+        both in the zero loss region and in the big loss region,
+        as their results tend to make the critical load estimate worse.
 
         The returned average and stdev is a combination of the two fitting
         estimates.
 
-        TODO: If measurement at max_rate is already below target loss rate,
-        the algorithm always measures at max_rate, and likelihood
-        no longer has single maximum, leading to "random" estimates.
-        Find a way to avoid that.
-
         :param min_rate: Avoid measuring at offered loads below this,
             in packets per second.
         :param max_rate: Avoid measuring at offered loads above this,
@@ -171,36 +167,59 @@ class PLRsearch(object):
         stop_time = time.time() + self.timeout
         min_rate = float(min_rate)
         max_rate = float(max_rate)
+        logging.info("Started search with min_rate %(min)r, max_rate %(max)r",
+                     {"min": min_rate, "max": max_rate})
         trial_result_list = list()
         trial_number = self.trial_number_offset
-        integrator_data = (None, None, None, None)
-        message = "Trial %(number)r computed avg %(avg)r stdev %(stdev)r"
-        message += " stretch %(a1)r erf %(a2)r difference %(diff)r"
+        focus_trackers = (None, None)
         transmit_rate = (min_rate + max_rate) / 2.0
+        lossy_loads = [max_rate]
+        zeros = [0, 0]  # Consecutive zero loss, separately for stretch and erf.
         while 1:
             trial_number += 1
-            trial_duration = trial_number * self.trial_duration_per_trial
+            logging.info("Trial %(number)r", {"number": trial_number})
             results = self.measure_and_compute(
-                trial_duration, transmit_rate, trial_result_list, max_rate,
-                integrator_data)
-            measurement, average, stdev, avg1, avg2, integrator_data = results
-            logging.info(message, {
-                "number": trial_number, "avg": average, "stdev": stdev,
-                "a1": avg1, "a2": avg2, "diff": avg2 - avg1})
+                self.trial_duration_per_trial * trial_number, transmit_rate,
+                trial_result_list, min_rate, max_rate, focus_trackers)
+            measurement, average, stdev, avg1, avg2, focus_trackers = results
+            index = trial_number % 2
+            zeros[index] += 1
+            # TODO: Ratio of fill rate to drain rate seems to have
+            # exponential impact. Make it configurable, or is 4:3 good enough?
+            if measurement.loss_fraction >= self.packet_loss_ratio_target:
+                for _ in range(4 * zeros[index]):
+                    lossy_loads.append(measurement.target_tr)
+            if measurement.loss_count > 0:
+                zeros[index] = 0
+            lossy_loads.sort()
             if stop_time <= time.time():
                 return average, stdev
             trial_result_list.append(measurement)
             if (trial_number - self.trial_number_offset) <= 1:
                 next_load = max_rate
             elif (trial_number - self.trial_number_offset) <= 3:
-                next_load = (measurement.receive_rate
-                             / (1.0 - self.packet_loss_ratio_target))
+                next_load = (measurement.receive_rate / (
+                    1.0 - self.packet_loss_ratio_target))
             else:
-                next_load = avg1 if trial_number % 2 else avg2
+                index = (trial_number + 1) % 2
+                next_load = (avg1, avg2)[index]
+                if zeros[index] > 0:
+                    if lossy_loads[0] > next_load:
+                        diminisher = math.pow(2.0, 1 - zeros[index])
+                        next_load = lossy_loads[0] + diminisher * next_load
+                        next_load /= (1.0 + diminisher)
+                    # On a zero measurement, we need to drain obsolete low losses
+                    # even if we did not use them to increase next_load,
+                    # in order to get to usable losses with a higher load.
+                    if len(lossy_loads) > 3:
+                        lossy_loads = lossy_loads[3:]
+                logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
+                              {"z": zeros, "o": (avg1, avg2)[index],
+                               "n": next_load, "s": lossy_loads})
             transmit_rate = min(max_rate, max(min_rate, next_load))
 
     @staticmethod
-    def lfit_stretch(load, mrr, spread):
+    def lfit_stretch(trace, load, mrr, spread):
         """Stretch-based fitting function.
 
         Return the logarithm of average packet loss per second
@@ -216,45 +235,58 @@ class PLRsearch(object):
         TODO: Explain how the high-level description
         has been converted into an implementation full of ifs.
 
+        :param trace: A multiprocessing-friendly logging function (closure).
         :param load: Offered load (positive), in packets per second.
         :param mrr: Parameter of this fitting function, equal to limiting
             (positive) average number of packets received (as opposed to lost)
             when offered load is many spreads more than mrr.
         :param spread: The x-scaling parameter (positive). No nice semantics,
             roughly corresponds to size of "tail" for loads below mrr.
+        :type trace: function (str, object) -> NoneType
         :type load: float
         :type mrr: float
         :type spread: float
         :returns: Logarithm of average number of packets lost per second.
         :rtype: float
         """
+        # TODO: What is the fastest way to use such values?
+        log_2 = math.log(2)
+        log_3 = math.log(3)
+        log_spread = math.log(spread)
         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
         chi = (load - mrr) / spread
         chi0 = -mrr / spread
-#        print "load", load, "mrr", mrr, "spread", spread, "chi", chi
+        trace("stretch: load", load)
+        trace("mrr", mrr)
+        trace("spread", spread)
+        trace("chi", chi)
+        trace("chi0", chi0)
         if chi > 0:
             log_lps = math.log(
                 load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
-#            print "big loss direct log_lps", log_lps
+            trace("big loss direct log_lps", log_lps)
         else:
-            approx = (math.exp(chi) - math.exp(2 * chi) / 2) * spread
-            if approx == 0.0:
-                log_lps = chi
-#                print "small loss crude log_lps", log_lps
+            two_positive = log_plus(chi, 2 * chi0 - log_2)
+            two_negative = log_plus(chi0, 2 * chi - log_2)
+            if two_positive <= two_negative:
+                log_lps = log_minus(chi, chi0) + log_spread
+                trace("small loss crude log_lps", log_lps)
                 return log_lps
-            third = math.exp(3 * chi) / 3 * spread
-            if approx + third != approx + 2 * third:
-                log_lps = math.log(
-                    (log_plus(0, chi) - log_plus(0, chi0)) * spread)
-#                print "small loss direct log_lps", log_lps
+            two = log_minus(two_positive, two_negative)
+            three_positive = log_plus(two_positive, 3 * chi - log_3)
+            three_negative = log_plus(two_negative, 3 * chi0 - log_3)
+            three = log_minus(three_positive, three_negative)
+            if two == three:
+                log_lps = two + log_spread
+                trace("small loss approx log_lps", log_lps)
             else:
-                log_lps = math.log(
-                    approx - (math.exp(chi0) - math.exp(2 * chi0)) * spread)
-#                print "small loss approx log_lps", log_lps
+                log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
+                log_lps += log_spread
+                trace("small loss direct log_lps", log_lps)
         return log_lps
 
     @staticmethod
-    def lfit_erf(load, mrr, spread):
+    def lfit_erf(trace, load, mrr, spread):
         """Erf-based fitting function.
 
         Return the logarithm of average packet loss per second
@@ -271,12 +303,14 @@ class PLRsearch(object):
         TODO: Explain how the high-level description
         has been converted into an implementation full of ifs.
 
+        :param trace: A multiprocessing-friendly logging function (closure).
         :param load: Offered load (positive), in packets per second.
         :param mrr: Parameter of this fitting function, equal to limiting
             (positive) average number of packets received (as opposed to lost)
             when offered load is many spreads more than mrr.
         :param spread: The x-scaling parameter (positive). No nice semantics,
             roughly corresponds to size of "tail" for loads below mrr.
+        :type trace: function (str, object) -> NoneType
         :type load: float
         :type mrr: float
         :type spread: float
@@ -287,24 +321,26 @@ class PLRsearch(object):
         # TODO: The stretch sign is just to have fewer minuses. Worth changing?
         chi = (mrr - load) / spread
         chi0 = mrr / spread
-#        print "load", load, "mrr", mrr, "spread", spread,
-#        print "chi", chi, "chi0", chi0
+        trace("Erf: load", load)
+        trace("mrr", mrr)
+        trace("spread", spread)
+        trace("chi", chi)
+        trace("chi0", chi0)
         if chi >= -1.0:
-#            print "positive, b ~> m"
+            trace("positive, b roughly bigger than m", None)
             if chi > math.exp(10):
                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
-#                print "approximated"
+                trace("approximated first", first)
             else:
                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
-#                print "exact"
+                trace("exact first", first)
             first -= chi * chi
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = log_minus(first, second)
-#            print "first", first, "second", second,
-#            print "intermediate", intermediate
+            trace("first", first)
         else:
-#            print "negative, b ~< m"
+            trace("negative, b roughly smaller than m", None)
             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
             exp_first *= math.exp(-chi * chi)
             exp_first -= 2 * chi
@@ -315,82 +351,68 @@ class PLRsearch(object):
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = math.log(exp_first - math.exp(second))
-#            print "exp_first", exp_first, "second", second,
-#            print "intermediate", intermediate
+            trace("exp_first", exp_first)
+        trace("second", second)
+        trace("intermediate", intermediate)
         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
-#        print "lfit erf result", result
+        trace("result", result)
         return result
 
     @staticmethod
-    def find_critical_rate(lfit_func, log_lps_target, mrr, spread):
-        """Given lps target and parameters, return the achieving offered load.
+    def find_critical_rate(
+            trace, lfit_func, min_rate, max_rate, loss_ratio_target,
+            mrr, spread):
+        """Given ratio target and parameters, return the achieving offered load.
 
         This is basically an inverse function to lfit_func
         when parameters are fixed.
         Instead of implementing effective implementation
         of the inverse function, this implementation uses
-        brute force binary search.
-        The search starts at an arbitrary offered load,
-        uses exponential search to find the other bound
-        and then bisecting until the load is found (or interval
-        becoming degenerate).
+        brute force binary search. It bisects the (min_rate, max_rate) interval
+        until the critical load is found (or the interval becomes degenerate).
+        This implementation ensures the min and max rate limits are honored.
 
-        TODO: Use at least some method with faster convergence.
+        TODO: Use some method with faster convergence?
 
+        :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
-        :param log_lps_target: Fitting function should return this
-            at the returned load and parameters.
-        :param mrr: The mrr parameter for the fitting function.
-        :param spread: The spread parameter for the fittinmg function.
+        :param min_rate: Lower bound for binary search [pps].
+        :param max_rate: Upper bound for binary search [pps].
+        :param loss_ratio_target: Fitting function should return loss rate
+            giving this ratio at the returned load and parameters [1].
+        :param mrr: The mrr parameter for the fitting function [pps].
+        :param spread: The spread parameter for the fitting function [pps].
+        :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
+        :type min_rate: float
+        :type max_rate: float
         :type log_lps_target: float
         :type mrr: float
         :type spread: float
         :returns: Load [pps] which achieves the target with given parameters.
         :rtype: float
         """
-        # TODO: Should we make the initial rate configurable?
-        rate = 10000000.0
-        log_loss = lfit_func(rate, mrr, spread)
-        if log_loss == log_lps_target:
-            return rate
-        # Exponential search part.
-        if log_loss > log_lps_target:
-            rate_hi = rate
-            while 1:
-                rate_lo = rate_hi / 2.0
-                log_loss = lfit_func(rate_lo, mrr, spread)
-                if log_loss > log_lps_target:
-                    rate_hi = rate_lo
-                    continue
-                if log_loss == log_lps_target:
-                    return rate_lo
-                break
-        else:
-            rate_lo = rate
-            while 1:
-                rate_hi = rate_lo * 2.0
-                log_loss = lfit_func(rate_hi, mrr, spread)
-                if log_loss < log_lps_target:
-                    rate_lo = rate_hi
-                    continue
-                if log_loss == log_lps_target:
-                    return rate_hi
-                break
-        # Binary search part.
-        while rate_hi != rate_lo:
+        trace("Finding critical rate for loss_ratio_target", loss_ratio_target)
+        rate_lo = min_rate
+        rate_hi = max_rate
+        loss_ratio = -1
+        while loss_ratio != loss_ratio_target:
             rate = (rate_hi + rate_lo) / 2.0
-            log_loss = lfit_func(rate, mrr, spread)
-            if rate == rate_hi or rate == rate_lo or log_loss == log_lps_target:
-#                print "found", rate
-                return rate
-            if log_loss > log_lps_target:
+            if rate == rate_hi or rate == rate_lo:
+                break
+            loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
+            loss_ratio = loss_rate / rate
+            if loss_ratio > loss_ratio_target:
+                trace("halving down", rate)
                 rate_hi = rate
-            else:
+            elif loss_ratio < loss_ratio_target:
+                trace("halving up", rate)
                 rate_lo = rate
+        trace("found", rate)
+        return rate
 
     @staticmethod
-    def log_weight(lfit_func, trial_result_list, mrr, spread):
+    def log_weight(trace, lfit_func, trial_result_list, mrr, spread):
         """Return log of weight of trial results by the function and parameters.
 
         Integrator assumes uniform distribution, but over different parameters.
@@ -399,15 +421,18 @@ class PLRsearch(object):
         Each trial has an offered load, a duration and a loss count.
         Fitting function is used to compute the average loss per second.
         Poisson distribution (with average loss per trial) is used
-        to get likelihood of one trial result, the overal likelihood is product.
+        to get the likelihood of one trial result; the overall likelihood
+        is a product of all trial likelihoods.
         As likelihoods can be extremely small, logarithms are tracked instead.
 
         TODO: Copy ReceiveRateMeasurement from MLRsearch.
 
+        :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
         :param result_list: List of trial measurement results.
         :param mrr: The mrr parameter for the fitting function.
         :param spread: The spread parameter for the fitting function.
+        :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
         :type result_list: list of MLRsearch.ReceiveRateMeasurement
         :type mrr: float
@@ -416,10 +441,14 @@ class PLRsearch(object):
         :rtype: float
         """
         log_likelihood = 0.0
+        trace("log_weight for mrr", mrr)
+        trace("spread", spread)
         for result in trial_result_list:
-#            print "DEBUG for tr", result.target_tr,
-#            print "lc", result.loss_count, "d", result.duration
-            log_avg_loss_per_second = lfit_func(result.target_tr, mrr, spread)
+            trace("for tr", result.target_tr)
+            trace("lc", result.loss_count)
+            trace("d", result.duration)
+            log_avg_loss_per_second = lfit_func(
+                trace, result.target_tr, mrr, spread)
             log_avg_loss_per_trial = (
                 log_avg_loss_per_second + math.log(result.duration))
             # Poisson probability computation works nice for logarithms.
@@ -428,20 +457,15 @@ class PLRsearch(object):
                 - math.exp(log_avg_loss_per_trial))
             log_trial_likelihood -= math.lgamma(1 + result.loss_count)
             log_likelihood += log_trial_likelihood
-#            print "log_avg_loss_per_second", log_avg_loss_per_second
-#            print "log_avg_loss_per_trial", log_avg_loss_per_trial
-#            print "avg_loss_per_trial", math.exp(log_avg_loss_per_trial)
-#            print "log_trial_likelihood", log_trial_likelihood
-#            print "log_likelihood", log_likelihood
-#        print "returning log_likelihood", log_likelihood
+            trace("avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
+            trace("log_trial_likelihood", log_trial_likelihood)
         return log_likelihood
 
-    # FIXME: Refactor (somehow) so pylint stops complaining about
+    # TODO: Refactor (somehow) so pylint stops complaining about
     # too many local variables.
-    # Some work already done in subsequent changes.
     def measure_and_compute(
-            self, trial_duration, transmit_rate,
-            trial_result_list, max_rate, integrator_data):
+            self, trial_duration, transmit_rate, trial_result_list,
+            min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
         """Perform both measurement and computation at once.
 
         High level steps: Prepare and launch computation worker processes,
@@ -455,17 +479,18 @@ class PLRsearch(object):
 
         Two fitting functions are used, computation is started
         on temporary worker process per fitting function. After the measurement,
-        Average and stdev of the critical rate (not log) of each worker
+        average and stdev of the critical rate (not log) of each worker
         are combined and returned. Raw averages are also returned,
-        offered load for next iteration is chosen from them.
+        and the offered load for the next iteration is chosen based on them.
         The idea is that one fitting function might be fitting much better,
         measurements at its avg are best for relevant results (for both),
         but we do not know which fitting function it is.
 
-        TODO: Define class for integrator data, so that fields are documented.
+        Focus trackers are updated in-place. If a focus tracker is None,
+        a new instance is created.
+
         TODO: Define class for result object, so that fields are documented.
-        TODO: More processes to the slower fitting function?
-        TODO: Re-use processes, instead creating on each computation.
+        TODO: Re-use processes, instead of creating them on each computation?
         TODO: As only one result is needed fresh, figure out a way
         to keep the other worker running. This will allow shorter
         duration per trial. Special handling at first and last measurement
@@ -474,23 +499,39 @@ class PLRsearch(object):
         :param trial_duration: Length of the measurement in seconds.
         :param transmit_rate: Offered load in packets per second.
         :param trial_result_list: Results of previous measurements.
-        :param max_rate: Theoretic maximum of possible ofered load.
-        :param integrator_data: Hints to speed up the numeric computation.
+        :param min_rate: Practical minimum of possible offered load.
+        :param max_rate: Practical maximum of possible offered load.
+        :param focus_trackers: Pair of trackers initialized
+            to speed up the numeric computation.
+        :param max_samples: Limit for integrator samples, for debugging.
         :type trial_duration: float
         :type transmit_rate: float
         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+        :type min_rate: float
         :type max_rate: float
-        :type integrator_data: 4-tuple of gaussian positions and covariances
+        :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
+        :type max_samples: None or int
         :returns: Measurement and computation results.
-        :rtype: 6-tuple: ReceiveRateMeasurement, floats, integrator data.
+        :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers.
         """
+        logging.debug(
+            "measure_and_compute started with self %(self)r, trial_duration "
+            + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
+            + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
+            {"self": self, "dur": trial_duration, "tr": transmit_rate,
+             "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
+             "ms": max_samples})
         # Preparation phase.
         dimension = 2
-        stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov = (
-            integrator_data)
-        packet_loss_per_second_target = (
-            transmit_rate * self.packet_loss_ratio_target)
-        def start_computing(fitting_function, bias_avg, bias_cov):
+        stretch_focus_tracker, erf_focus_tracker = focus_trackers
+        if stretch_focus_tracker is None:
+            stretch_focus_tracker = stat_trackers.VectorStatTracker(dimension)
+            stretch_focus_tracker.unit_reset()
+        if erf_focus_tracker is None:
+            erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
+            erf_focus_tracker.unit_reset()
+        old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
+        def start_computing(fitting_function, focus_tracker):
             """Just a block of code to be used for each fitting function.
 
             Define function for integrator, create process and pipe ends,
@@ -505,7 +546,7 @@ class PLRsearch(object):
             :returns: Boss end of communication pipe.
             :rtype: multiprocessing.Connection
             """
-            def value_logweight_func(x_mrr, x_spread):
+            def value_logweight_func(trace, x_mrr, x_spread):
                 """Return log of critical rate and log of likelihood.
 
                 This is a closure. The ancestor function got
@@ -525,72 +566,101 @@ class PLRsearch(object):
 
                 TODO: x^-2 (for x>1.0) might be simpler/nicer prior.
 
+                :param trace: Multiprocessing-safe logging function (closure).
                 :param x_mrr: The first dimensionless param
                     from (-1, 1) interval.
                 :param x_spread: The second dimensionless param
                     from (-1, 1) interval.
+                :type trace: function (str, object) -> None
+                :type x_mrr: float
+                :type x_spread: float
                 :returns: Log of critical rate [pps] and log of likelihood.
                 :rtype: 2-tuple of float
                 """
                 mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
                 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
-#                print "mrr", mrr, "spread", spread
                 logweight = self.log_weight(
-                    fitting_function, trial_result_list, mrr, spread)
+                    trace, fitting_function, trial_result_list, mrr, spread)
                 value = math.log(self.find_critical_rate(
-                    fitting_function,
-                    math.log(packet_loss_per_second_target), mrr, spread))
+                    trace, fitting_function, min_rate, max_rate,
+                    self.packet_loss_ratio_target, mrr, spread))
                 return value, logweight
             dilled_function = dill.dumps(value_logweight_func)
             boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
             boss_pipe_end.send(
-                (dimension, dilled_function, bias_avg, bias_cov))
+                (dimension, dilled_function, focus_tracker, max_samples))
             worker = multiprocessing.Process(
-                target=Integrator.try_estimate_nd, args=(worker_pipe_end,))
+                target=Integrator.try_estimate_nd, args=(
+                    worker_pipe_end, 10.0, self.trace_enabled))
             worker.daemon = True
             worker.start()
             return boss_pipe_end
         erf_pipe = start_computing(
-            self.lfit_erf, erf_bias_avg, erf_bias_cov)
+            self.lfit_erf, erf_focus_tracker)
         stretch_pipe = start_computing(
-            self.lfit_stretch, stretch_bias_avg, stretch_bias_cov)
+            self.lfit_stretch, stretch_focus_tracker)
         # Measurement phase.
         measurement = self.measurer.measure(trial_duration, transmit_rate)
         # Processing phase.
-        erf_pipe.send(None)
-        stretch_pipe.send(None)
-        if not stretch_pipe.poll(1.0):
-            raise RuntimeError("Stretch worker did not finish!")
-        result_or_traceback = stretch_pipe.recv()
-        try:
-            (stretch_avg, stretch_stdev, stretch_bias_avg,
-             stretch_bias_cov, debug_list, _) = result_or_traceback
-        except ValueError:
-            raise RuntimeError(
-                "Stretch worker failed with the following traceback:\n{tr}"
-                .format(tr=result_or_traceback))
-        logging.info("Logs from stretch worker:")
-        for message in debug_list:
-            logging.debug(message)
-        if not erf_pipe.poll(1.0):
-            raise RuntimeError("Erf worker did not finish!")
-        result_or_traceback = erf_pipe.recv()
-        try:
-            (erf_avg, erf_stdev, erf_bias_avg,
-             erf_bias_cov, debug_list, _) = result_or_traceback
-        except ValueError:
-            raise RuntimeError(
-                "Erf worker failed with the following traceback:\n{tr}"
-                .format(tr=result_or_traceback))
-        logging.info("Logs from erf worker:")
-        for message in debug_list:
-            logging.debug(message)
+        def stop_computing(name, pipe):
+            """Just a block of code to be used for each worker.
+
+            Send stop object, poll for result, then either
+            unpack response, log messages and return, or raise traceback.
+
+            TODO: Define class/structure for the return value?
+
+            :param name: Human friendly worker identifier for logging purposes.
+            :param pipe: Boss end of connection towards worker to stop.
+            :type name: str
+            :type pipe: multiprocessing.Connection
+            :returns: Computed value tracker, actual focus tracker,
+                and number of samples used for this iteration.
+            :rtype: 3-tuple of tracker, tracker and int
+            """
+            pipe.send(None)
+            if not pipe.poll(10.0):
+                raise RuntimeError(
+                    "Worker {name} did not finish!".format(name=name))
+            result_or_traceback = pipe.recv()
+            try:
+                value_tracker, focus_tracker, debug_list, trace_list, sampls = (
+                    result_or_traceback)
+            except ValueError:
+                raise RuntimeError(
+                    "Worker {name} failed with the following traceback:\n{tr}"
+                    .format(name=name, tr=result_or_traceback))
+            logging.info("Logs from worker %(name)r:", {"name": name})
+            for message in debug_list:
+                logging.info(message)
+            for message in trace_list:
+                logging.debug(message)
+            logging.debug("trackers: value %(val)r focus %(foc)r", {
+                "val": value_tracker, "foc": focus_tracker})
+            return value_tracker, focus_tracker, sampls
+        stretch_value_tracker, stretch_focus_tracker, stretch_samples = (
+            stop_computing("stretch", stretch_pipe))
+        erf_value_tracker, erf_focus_tracker, erf_samples = (
+            stop_computing("erf", erf_pipe))
+        stretch_avg = stretch_value_tracker.average
+        erf_avg = erf_value_tracker.average
+        # TODO: Take into account secondary stats.
+        stretch_stdev = math.exp(stretch_value_tracker.log_variance / 2)
+        erf_stdev = math.exp(erf_value_tracker.log_variance / 2)
         avg = math.exp((stretch_avg + erf_avg) / 2.0)
         var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
         var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
         stdev = avg * math.sqrt(var)
-        integrator_data = (
-            stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov)
+        focus_trackers = (stretch_focus_tracker, erf_focus_tracker)
+        logging.info(
+            "measure_and_compute finished with trial result %(res)r "
+            "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r "
+            "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r "
+            "erf samples %(es)r",
+            {"res": measurement, "avg": avg, "stdev": stdev,
+             "a1": math.exp(stretch_avg), "a2": math.exp(erf_avg),
+             "nt": focus_trackers, "ot": old_trackers, "ss": stretch_samples,
+             "es": erf_samples})
         return (
             measurement, avg, stdev, math.exp(stretch_avg),
-            math.exp(erf_avg), integrator_data)
+            math.exp(erf_avg), focus_trackers)
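The combination above treats the two fitted models as an equal-weight mixture in logarithm space: avg is the geometric mean of the two per-model averages, var is exactly the variance of that two-component mixture of log values, and multiplying by avg converts the log-space stdev into an absolute stdev to first order. A quick numeric sketch (not part of the change, with made-up values) cross-checking the formulas against direct sampling:

    import math
    import random

    # Hypothetical log-space estimates of the critical rate (made-up numbers).
    stretch_avg, stretch_stdev = math.log(2.0e6), 0.05
    erf_avg, erf_stdev = math.log(2.2e6), 0.04

    # The same combination as in measure_and_compute above.
    avg = math.exp((stretch_avg + erf_avg) / 2.0)
    var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
    var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
    stdev = avg * math.sqrt(var)

    # Monte Carlo cross-check: sample the equal-weight mixture in log space,
    # exponentiate, and compare moments (agreement is only first order in var).
    samples = [math.exp(random.gauss(*random.choice(
        [(stretch_avg, stretch_stdev), (erf_avg, erf_stdev)])))
               for _ in range(100000)]
    mc_avg = sum(samples) / len(samples)
    mc_var = sum((s - mc_avg) ** 2 for s in samples) / len(samples)
    print("combined %r +- %r, sampled %r +- %r"
          % (avg, stdev, mc_avg, math.sqrt(mc_var)))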
diff --git a/resources/libraries/python/PLRsearch/stat_trackers.py b/resources/libraries/python/PLRsearch/stat_trackers.py
new file mode 100644 (file)
index 0000000..168b09a
--- /dev/null
@@ -0,0 +1,372 @@
+# Copyright (c) 2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module for tracking average and variance for data of weighted samples.
+
+See log_plus for an explanation why None acts as a special case "float" number.
+
+TODO: Current implementation sets zero averages on empty tracker.
+Should we set None instead?
+
+TODO: Implement __str__ for easier debugging (__repr__ is already provided).
+"""
+
+import copy
+import math
+
+import numpy
+
+# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
+# then switch to absolute imports within PLRsearch package.
+# Current usage of relative imports is just a short term workaround.
+from log_plus import log_plus  # pylint: disable=relative-import
+
+
+class ScalarStatTracker(object):
+    """Class for tracking one-dimensional samples.
+
+    Variance of one-dimensional data cannot be negative,
+    so this class avoids possible underflows by tracking logarithm
+    of the variance instead.
+
+    Similarly, sum of weights is also tracked as a logarithm.
+    """
+
+    def __init__(self, log_sum_weight=None, average=0.0, log_variance=None):
+        """Initialize new tracker instance, empty by default.
+
+        :param log_sum_weight: Natural logarithm of sum of weights
+            of samples so far. Default: None (as log of 0.0).
+        :param average: Weighted average of the samples. Default: 0.0
+        :param log_variance: Natural logarithm of variance.
+            Default: None (as log of 0.0).
+        :type log_sum_weight: float or None
+        :type average: float
+        :type log_variance: float or None
+        """
+        self.log_sum_weight = log_sum_weight
+        self.average = average
+        self.log_variance = log_variance
+
+    def __repr__(self):
+        """Return string, which interpreted constructs state of self."""
+        return ("ScalarStatTracker(log_sum_weight={lsw!r},average={a!r},"
+                "log_variance={lv!r})".format(
+                    lsw=self.log_sum_weight, a=self.average,
+                    lv=self.log_variance))
+
+    def copy(self):
+        """Return new ScalarStatTracker instance with the same state as self.
+
+        The point of this method is the return type, instances of subclasses
+        can get rid of their additional data this way.
+
+        :returns: New instance with the same core state.
+        :rtype: ScalarStatTracker
+        """
+        return ScalarStatTracker(
+            self.log_sum_weight, self.average, self.log_variance)
+
+    def add(self, scalar_value, log_weight=0.0):
+        """Return updated stats corresponding to addition of another sample.
+
+        :param scalar_value: The scalar value of the sample.
+        :param log_weight: Natural logarithm of weight of the sample.
+            Default: 0.0 (as log of 1.0).
+        :type scalar_value: float
+        :type log_weight: float
+        :returns: Updated self.
+        :rtype: ScalarStatTracker
+        """
+        old_log_sum_weight = self.log_sum_weight
+        if old_log_sum_weight is None:
+            # First sample.
+            self.log_sum_weight = log_weight
+            self.average = scalar_value
+            return self
+        old_average = self.average
+        log_variance = self.log_variance
+        new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
+        log_sample_ratio = log_weight - new_log_sum_weight
+        sample_ratio = math.exp(log_sample_ratio)
+        shift = scalar_value - old_average
+        new_average = old_average + shift * sample_ratio
+        # The log_variance is updated in-place (instead of new_ vs old_)
+        # because the None-handling branches below may leave it unchanged.
+        absolute_shift = abs(shift)
+        if absolute_shift > 0.0:
+            log_square_shift = 2 * math.log(absolute_shift)
+            log_variance = log_plus(
+                log_variance, log_square_shift + log_sample_ratio)
+        if log_variance is not None:
+            log_variance += old_log_sum_weight - new_log_sum_weight
+        self.log_sum_weight = new_log_sum_weight
+        self.average = new_average
+        self.log_variance = log_variance
+        return self
+
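For orientation, a minimal usage sketch of ScalarStatTracker (an editorial aside, not part of the patch; it assumes stat_trackers is importable from the working directory). With the default unit weights the tracker reproduces the ordinary population mean and variance:

    import math
    from stat_trackers import ScalarStatTracker

    data = [1.0, 2.0, 4.0]
    tracker = ScalarStatTracker()
    for value in data:
        tracker.add(value)  # log_weight defaults to 0.0, i.e. weight 1.0.
    mean = sum(data) / len(data)
    variance = sum((x - mean) ** 2 for x in data) / len(data)
    assert abs(tracker.average - mean) < 1e-9
    assert abs(math.exp(tracker.log_variance) - variance) < 1e-9
    assert abs(math.exp(tracker.log_sum_weight) - len(data)) < 1e-9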
+
+class ScalarDualStatTracker(ScalarStatTracker):
+    """Class for tracking one-dimensional samples, offering dual stats.
+
+    It means that instead of just primary stats (identical to what
+    ScalarStatTracker offers, that is why this is its subclass),
+    this tracker also offers secondary stats.
+    Secondary stats are scalar stats that track the same data,
+    except that the most weighty (or later if tied) sample is not considered.
+
+    Users can analyze how the secondary stats differ from the primary ones,
+    and based on that decide whether they have processed "enough" data.
+    One typical use is for a Monte Carlo integrator to decide whether
+    the partial sums so far are reliable enough.
+    """
+
+    def __init__(
+            self, log_sum_weight=None, average=0.0, log_variance=None,
+            log_sum_secondary_weight=None, secondary_average=0.0,
+            log_secondary_variance=None, max_log_weight=None):
+        """Initialize new tracker instance, empty by default.
+
+        :param log_sum_weight: Natural logarithm of sum of weights
+            of samples so far. Default: None (as log of 0.0).
+        :param average: Weighted average of the samples. Default: 0.0
+        :param log_variance: Natural logarithm of variance.
+            Default: None (as log of 0.0).
+        :param log_sum_secondary_weight: Natural logarithm of sum of weights
+            of samples so far, except the most weighty one.
+            Default: None (as log of 0.0).
+        :param secondary_average: Weighted average of the samples,
+            except the most weighty one. Default: 0.0
+        :param log_secondary_variance: Natural logarithm of variance of samples,
+            except the most weighty one. Default: None (as log of 0.0).
+        :param max_log_weight: Natural logarithm of weight of sample
+            counted in primary but not secondary stats.
+            Default: None (as log of 0.0).
+        :type log_sum_weight: float or None
+        :type average: float
+        :type log_variance: float or None
+        :type log_sum_secondary_weight: float or None
+        :type secondary_average: float
+        :type log_secondary_variance: float or None
+        :type max_log_weight: float or None
+        """
+        # Not using super() as the constructor signature is different,
+        # so in case of diamond inheritance, a mismatch would be probable.
+        ScalarStatTracker.__init__(self, log_sum_weight, average, log_variance)
+        self.secondary = ScalarStatTracker(
+            log_sum_secondary_weight, secondary_average, log_secondary_variance)
+        self.max_log_weight = max_log_weight
+
+    def __repr__(self):
+        """Return string, which interpreted constructs state of self."""
+        sec = self.secondary
+        return (
+            "ScalarDualStatTracker(log_sum_weight={lsw!r},average={a!r},"
+            "log_variance={lv!r},log_sum_secondary_weight={lssw!r},"
+            "secondary_average={sa!r},log_secondary_variance={lsv!r},"
+            "max_log_weight={mlw!r})".format(
+                lsw=self.log_sum_weight, a=self.average, lv=self.log_variance,
+                lssw=sec.log_sum_weight, sa=sec.average, lsv=sec.log_variance,
+                mlw=self.max_log_weight))
+
+    def add(self, scalar_value, log_weight=0.0):
+        """Return updated both stats after addition of another sample.
+
+        :param scalar_value: The scalar value of the sample.
+        :param log_weight: Natural logarithm of weight of the sample.
+            Default: 0.0 (as log of 1.0).
+        :type scalar_value: float
+        :type log_weight: float
+        :returns: Updated self.
+        :rtype: ScalarDualStatTracker
+        """
+        # Using super() as copy() and add() are not expected to change
+        # signature, so this way diamond inheritance will be supported.
+        primary = super(ScalarDualStatTracker, self)
+        if self.max_log_weight is None or log_weight >= self.max_log_weight:
+            self.max_log_weight = log_weight
+            self.secondary = primary.copy()
+        else:
+            self.secondary.add(scalar_value, log_weight)
+        primary.add(scalar_value, log_weight)
+        return self
+
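A rough sketch (not from the patch) of how the dual stats are meant to be consumed: compare the primary and secondary averages, and treat a small relative difference as a sign that no single sample dominates the estimate. The sample values, log weights and the 1% threshold below are arbitrary illustrations:

    from stat_trackers import ScalarDualStatTracker

    tracker = ScalarDualStatTracker()
    for value, log_weight in [(1.0, 0.0), (1.1, 0.5), (0.9, 0.2), (1.05, 0.4)]:
        tracker.add(value, log_weight)
    primary_avg = tracker.average
    secondary_avg = tracker.secondary.average
    converged = abs(primary_avg - secondary_avg) <= 0.01 * abs(primary_avg)
    print("primary %r secondary %r converged %r"
          % (primary_avg, secondary_avg, converged))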
+
+class VectorStatTracker(object):
+    """Class for tracking multi-dimensional samples.
+
+    Contrary to one-dimensional data, a multi-dimensional covariance matrix
+    contains off-diagonal elements which can be negative,
+    so the logarithm trick cannot be used to guard it against underflow.
+
+    But the sum of weights is still never negative, so it is tracked as a logarithm.
+
+    The code assumes every method gets arguments with correct dimensionality
+    as set in constructor. No checks are performed (for performance reasons).
+
+    TODO: Should we provide a subclass with the dimensionality checks?
+    """
+
+    def __init__(
+            self, dimension=2, log_sum_weight=None, averages=None,
+            covariance_matrix=None):
+        """Initialize new tracker instance, two-dimenstional empty by default.
+
+        If any of latter two arguments is None, it means
+        the tracker state is invalid. Use reset method
+        to create empty tracker of constructed dimentionality.
+
+        :param dimension: Number of scalar components of samples.
+        :param log_sum_weight: Natural logarithm of sum of weights
+            of samples so far. Default: None (as log of 0.0).
+        :param averages: Weighted average of the samples.
+            Default: None
+        :param covariance_matrix: Covariance matrix elements.
+            Default: None
+        :type dimension: int
+        :type log_sum_weight: float or None
+        :type averages: None or tuple of float
+        :type covariance_matrix: None or tuple of tuple of float
+        """
+        self.dimension = dimension
+        self.log_sum_weight = log_sum_weight
+        self.averages = averages
+        self.covariance_matrix = covariance_matrix
+
+    def __repr__(self):
+        """Return string, which interpreted constructs state of self.
+
+        :returns: Expression contructing an equivalent instance.
+        :rtype: str"""
+        return (
+            "VectorStatTracker(dimension={d!r},log_sum_weight={lsw!r},"
+            "averages={a!r},covariance_matrix={cm!r})".format(
+                d=self.dimension, lsw=self.log_sum_weight, a=self.averages,
+                cm=self.covariance_matrix))
+
+    def copy(self):
+        """Return new instance with the same state as self.
+
+        The main usage is to simplify debugging. This method allows storing
+        the old state, while self continues to track towards a new state.
+
+        :returns: Created tracker instance.
+        :rtype: VectorStatTracker
+        """
+        return VectorStatTracker(
+            self.dimension, self.log_sum_weight, self.averages,
+            self.covariance_matrix)
+
+    def reset(self):
+        """Return state set to empty data of proper dimensionality.
+
+        :returns: Updated self.
+        :rtype: VectorStatTracker
+        """
+        self.averages = [0.0 for _ in range(self.dimension)]
+        # TODO: Examine whether we can gain speed by tracking triangle only.
+        self.covariance_matrix = [[0.0 for _ in range(self.dimension)]
+                                  for _ in range(self.dimension)]
+        # Note: List comprehensions produce real (indexable) lists
+        # even in Python 3, so no list() wrapping is needed when porting.
+        return self
+
+    def unit_reset(self):
+        """Reset state, but use a unit (identity) matrix as the covariance.
+
+        :returns: Updated self.
+        :rtype: VectorStatTracker
+        """
+        self.reset()
+        for index in range(self.dimension):
+            self.covariance_matrix[index][index] = 1.0
+        return self
+
+    def add_get_shift(self, vector_value, log_weight=0.0):
+        """Return shift and update state to addition of another sample.
+
+        Shift is the vector from old average to new sample.
+        For most callers, returning shift is more useful than returning self.
+
+        :param vector_value: The value of the sample.
+        :param log_weight: Natural logarithm of weight of the sample.
+            Default: 0.0 (as log of 1.0).
+        :type vector_value: iterable of float
+        :type log_weight: float
+        :returns: Shift vector from the old average to the new sample.
+        :rtype: list of float
+        """
+        dimension = self.dimension
+        old_log_sum_weight = self.log_sum_weight
+        old_averages = self.averages
+        if not old_averages:
+            shift = [0.0 for index in range(dimension)]
+        else:
+            shift = [vector_value[index] - old_averages[index]
+                     for index in range(dimension)]
+        if old_log_sum_weight is None:
+            # First sample.
+            self.log_sum_weight = log_weight
+            self.averages = [vector_value[index] for index in range(dimension)]
+            # Not touching covariance matrix.
+            return shift
+        covariance_matrix = self.covariance_matrix
+        new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
+        data_ratio = math.exp(old_log_sum_weight - new_log_sum_weight)
+        sample_ratio = math.exp(log_weight - new_log_sum_weight)
+        new_averages = [old_averages[index] + shift[index] * sample_ratio
+                        for index in range(dimension)]
+        # It is easier to update covariance matrix in-place.
+        for second in range(dimension):
+            for first in range(dimension):
+                element = covariance_matrix[first][second]
+                element += shift[first] * shift[second] * sample_ratio
+                element *= data_ratio
+                covariance_matrix[first][second] = element
+        self.log_sum_weight = new_log_sum_weight
+        self.averages = new_averages
+        # self.covariance_matrix still points to the object we updated in-place.
+        return shift
+
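A small cross-check sketch (editorial, not part of the patch): with equal unit weights, add_get_shift should reproduce the population mean and covariance that numpy computes directly (bias=True selects the same normalization by the sum of weights):

    import numpy
    from stat_trackers import VectorStatTracker

    data = [(1.0, 2.0), (2.0, 4.0), (4.0, 5.0)]
    tracker = VectorStatTracker(dimension=2)
    tracker.reset()
    for sample in data:
        tracker.add_get_shift(sample)  # Default log_weight 0.0 means weight 1.0.
    assert numpy.allclose(tracker.averages, numpy.mean(data, axis=0))
    assert numpy.allclose(
        tracker.covariance_matrix,
        numpy.cov(numpy.array(data), rowvar=False, bias=True))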
+    # TODO: There are some uses for a vector tracker that does not track
+    # the average, but the most weighty (latest if tied) value,
+    # and computes the covariance matrix centered around that.
+    # But perhaps the following method would work similarly well?
+    def add_without_dominance_get_distance(self, vector_value, log_weight=0.0):
+        """Update stats, avoid having the sample dominate, return old distance.
+
+        If the weight of the incoming sample is far bigger
+        than the weight of all the previous data together,
+        the covariance matrix would suffer from underflows.
+        To avoid that, this method manipulates both weights
+        before calling add().
+
+        The old covariance matrix (before update) can define a metric.
+        Shift is a vector, so the metric can be used to compute a distance
+        between old average and new sample.
+
+        :param vector_value: The value of the sample.
+        :param log_weight: Natural logarithm of weight of the sample.
+            Default: 0.0 (as log of 1.0).
+        :type vector_value: iterable of float
+        :type log_weight: float
+        :returns: Squared distance from the old average to the new sample,
+            as measured by the old covariance metric.
+        :rtype: float
+        """
+        lsw = self.log_sum_weight
+        if lsw is not None and lsw < log_weight - 1.0:
+            lsw = (lsw + log_weight) / 2.0
+            log_weight = lsw
+            self.log_sum_weight = lsw
+        old_metric = copy.deepcopy(self.covariance_matrix)
+        shift = self.add_get_shift(vector_value, log_weight)
+        gradient = numpy.linalg.solve(old_metric, shift)
+        distance = numpy.vdot(shift, gradient)
+        return distance
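An illustrative sketch (not in the patch): the value returned by the method above equals the squared Mahalanobis-style distance of the new sample from the previous average, measured in the covariance metric captured before the update. The sample values are made up:

    import numpy
    from stat_trackers import VectorStatTracker

    tracker = VectorStatTracker(dimension=2)
    tracker.unit_reset()
    tracker.add_get_shift((0.0, 0.0))
    tracker.add_get_shift((1.0, 1.0))
    old_cov = numpy.array(tracker.covariance_matrix)
    old_avg = numpy.array(tracker.averages)
    sample = numpy.array([3.0, -1.0])
    distance = tracker.add_without_dominance_get_distance(sample)
    shift = sample - old_avg
    assert numpy.isclose(distance, shift.dot(numpy.linalg.solve(old_cov, shift)))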
index 5f92888..da364cb 100644 (file)
@@ -425,8 +425,8 @@ class TrafficGenerator(AbstractMeasurer):
         command = (
             "sh -c '{tool}/resources/tools/trex/trex_stateless_profile.py"
             " --profile {prof}/resources/traffic_profiles/trex/{traffic}.py"
-            " --duration {duration} --frame_size {frame_size} --rate {rate}"
-            " --warmup_time {warmup} --port_0 {p_0} --port_1 {p_1}").format(
+            " --duration {duration!r} --frame_size {frame_size} --rate {rate!r}"
+            " --warmup_time {warmup!r} --port_0 {p_0} --port_1 {p_1}").format(
                 tool=Constants.REMOTE_FW_DIR, prof=Constants.REMOTE_FW_DIR,
                 traffic=traffic_profile, duration=duration,
                 frame_size=frame_size, rate=rate, warmup=warmup_time, p_0=p_0,
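For context on the !r conversions (an editorial aside, not part of the change): on Python 2.7, which these scripts run under, plain string formatting of a float keeps only 12 significant digits, while repr() produces a value that parses back exactly, so a rate copypasted from the log stays precise. A tiny illustration with a made-up rate:

    rate = 14880952.380952381  # Hypothetical packets-per-second value.
    print("plain: {0} repr: {0!r}".format(rate))
    # On Python 2.7 the plain form prints 14880952.381,
    # the repr form prints 14880952.380952381 and round-trips:
    assert float(repr(rate)) == rate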
@@ -698,8 +698,8 @@ class OptimizedSearch(object):
     @staticmethod
     def perform_soak_search(
             frame_size, traffic_profile, minimum_transmit_rate,
-            maximum_transmit_rate, plr_target=1e-7, tdpt=0.2,
-            initial_count=50, timeout=1800.0):
+            maximum_transmit_rate, plr_target=1e-7, tdpt=0.1,
+            initial_count=50, timeout=1800.0, trace_enabled=False):
         """Setup initialized TG, perform soak search, return avg and stdev.
 
         :param frame_size: Frame size identifier or value [B].
@@ -734,6 +734,7 @@ class OptimizedSearch(object):
         algorithm = PLRsearch(
             measurer=tg_instance, trial_duration_per_trial=tdpt,
             packet_loss_ratio_target=plr_target,
-            trial_number_offset=initial_count, timeout=timeout)
+            trial_number_offset=initial_count, timeout=timeout,
+            trace_enabled=trace_enabled)
         result = algorithm.search(minimum_transmit_rate, maximum_transmit_rate)
         return result
index dd3aa64..154674e 100644 (file)
 | | ...
 | | ... | *Example:*
 | | ...
-| | ... | \| Find critical load usingPLR search \| \${1e-7} \| \${1800} \|
+| | ... | \| Find critical load using PLR search \| \${1e-7} \| \${120} \|
 | | ...
-| | [Arguments] | ${packet_loss_ratio}=${1e-7} | ${timeout}=${1800.0}
+| | [Arguments] | ${packet_loss_ratio}=${1e-7} | ${timeout}=${7200.0}
 | | ...
 | | ${min_rate} = | Set Variable | ${20000}
 | | ${average} | ${stdev} = | Perform soak search | ${frame_size}
index 61b244e..9b62975 100755 (executable)
@@ -245,11 +245,11 @@ def simple_burst(profile_file, duration, framesize, rate, warmup_time, port_0,
         else:
             if client:
                 client.disconnect()
-            print("rate={0}, totalReceived={1}, totalSent={2}, "
+            print("rate={0!r}, totalReceived={1}, totalSent={2}, "
                   "frameLoss={3}, latencyStream0(usec)={4}, "
-                  "latencyStream1(usec)={5}".
+                  "latencyStream1(usec)={5}, targetDuration={d!r}".
                   format(rate, total_rcvd, total_sent, lost_a + lost_b,
-                         lat_a, lat_b))
+                         lat_a, lat_b, d=duration))
 
 
 def main():