Improve PLRsearch yet again 75/21475/1
authorVratko Polak <vrpolak@cisco.com>
Thu, 22 Aug 2019 13:04:34 +0000 (15:04 +0200)
committerVratko Polak <vrpolak@cisco.com>
Fri, 23 Aug 2019 11:02:39 +0000 (11:02 +0000)
Logic improvements for rls1908 and post-pylint fixes:

+ Reduce search time to 30 minutes.
+ Use average instead alternating loads.
+ Rework log/exp avg/stdev from two estimates.
+ Introduce and use pessimistic variance of dual trackers.
+ Introduce safe_exp to use when None does not skip code.
+ Use dot relative imports (instead of disabling pylint).
+ Complete docstrings for simpler functions.
+ Append docstrings to named tuples.
+ Somewhat unify docstrings related to the same arguments.
+ Slightly improve intentation.
+ State named tuples as rtype where used.
+ Add returns and rtype to __repr__ where missing.
+ Return what docstring say (or update them).
+ Explicit copying for vector and matrix values.

Change-Id: I884c68b4839c5df5e8bef95e463666599603a0ff
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
(cherry picked from commit fbbc47359e3f7b59bbd5a84d85c673374933a50a)

resources/libraries/python/PLRsearch/Integrator.py
resources/libraries/python/PLRsearch/PLRsearch.py
resources/libraries/python/PLRsearch/__init__.py
resources/libraries/python/PLRsearch/log_plus.py
resources/libraries/python/PLRsearch/stat_trackers.py
resources/libraries/robot/performance/performance_utils.robot

index 035afd8..86181ea 100644 (file)
@@ -28,13 +28,31 @@ from numpy import random
 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
 # then switch to absolute imports within PLRsearch package.
 # Current usage of relative imports is just a short term workaround.
-import stat_trackers  # pylint: disable=relative-import
+from . import stat_trackers
 
 
 def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
-    """Call estimate_nd but catch any exception and send traceback."""
+    """Call estimate_nd but catch any exception and send traceback.
+
+    This function does not return anything, computation result
+    is sent via the communication pipe instead.
+
+    TODO: Move scale_coeff to a field of data class
+    with constructor/factory hiding the default value,
+    and receive its instance via pipe, instead of argument.
+
+    :param communication_pipe: Endpoint for communication with parent process.
+    :param scale_coeff: Float number to tweak convergence speed with.
+    :param trace_enabled: Whether to emit trace level debugs.
+        Keeping trace disabled improves speed and saves memory.
+        Enable trace only when debugging the computation itself.
+    :type communication_pipe: multiprocessing.Connection
+    :type scale_coeff: float
+    :type trace_enabled: bool
+    :raises BaseException: Anything raised by interpreter or estimate_nd.
+    """
     try:
-        return estimate_nd(communication_pipe, scale_coeff, trace_enabled)
+        estimate_nd(communication_pipe, scale_coeff, trace_enabled)
     except BaseException:
         # Any subclass could have caused estimate_nd to stop before sending,
         # so we have to catch them all.
@@ -46,7 +64,22 @@ def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
 
 
 def generate_sample(averages, covariance_matrix, dimension, scale_coeff):
-    """Generate next sample for estimate_nd"""
+    """Generate next sample for estimate_nd.
+
+    Arguments control the multivariate normal "focus".
+    Keep generating until the sample point fits into unit area.
+
+    :param averages: Coordinates of the focus center.
+    :param covariance_matrix: Matrix controlling the spread around the average.
+    :param dimension: If N is dimension, average is N vector and matrix is NxN.
+    :param scale_coeff: Coefficient to conformally multiply the spread.
+    :type averages: Indexable of N floats
+    :type covariance_matrix: Indexable of N indexables of N floats
+    :type dimension: int
+    :type scale_coeff: float
+    :returns: The generated sample point.
+    :rtype: N-tuple of float
+    """
     covariance_matrix = copy.deepcopy(covariance_matrix)
     for first in range(dimension):
         for second in range(dimension):
@@ -142,13 +175,12 @@ def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
     In they are not enabled, trace_list will be empty.
     It is recommended to edit some lines manually to debug_list if needed.
 
-    :param communication_pipe: Pipe to comunicate with boss process.
+    :param communication_pipe: Endpoint for communication with parent process.
     :param scale_coeff: Float number to tweak convergence speed with.
     :param trace_enabled: Whether trace list should be populated at all.
-        Default: False
-    :type communication_pipe: multiprocessing.Connection (or compatible)
+    :type communication_pipe: multiprocessing.Connection
     :type scale_coeff: float
-    :type trace_enabled: boolean
+    :type trace_enabled: bool
     :raises OverflowError: If one sample dominates the rest too much.
         Or if value_logweight_function does not handle
         some part of parameter space carefully enough.
@@ -201,10 +233,9 @@ def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
     while not communication_pipe.poll():
         if max_samples and samples >= max_samples:
             break
-        sample_point = generate_sample(param_focus_tracker.averages,
-                                       param_focus_tracker.covariance_matrix,
-                                       dimension,
-                                       scale_coeff)
+        sample_point = generate_sample(
+            param_focus_tracker.averages, param_focus_tracker.covariance_matrix,
+            dimension, scale_coeff)
         trace("sample_point", sample_point)
         samples += 1
         trace("samples", samples)
index 4205818..b7c9344 100644 (file)
@@ -43,6 +43,7 @@ class PLRsearch(object):
 
     Method othed than search (and than __init__)
     are just internal code structure.
+
     TODO: Those method names should start with underscore then.
     """
 
@@ -162,7 +163,7 @@ class PLRsearch(object):
         :type min_rate: float
         :type max_rate: float
         :returns: Average and stdev of critical load estimate.
-        :rtype: 2-tuple of floats
+        :rtype: 2-tuple of float
         """
         stop_time = time.time() + self.timeout
         min_rate = float(min_rate)
@@ -174,7 +175,7 @@ class PLRsearch(object):
         focus_trackers = (None, None)
         transmit_rate = (min_rate + max_rate) / 2.0
         lossy_loads = [max_rate]
-        zeros = [0, 0]  # Cosecutive zero loss, separately for stretch and erf.
+        zeros = 0  # How many cosecutive zero loss results are happening.
         while 1:
             trial_number += 1
             logging.info("Trial %(number)r", {"number": trial_number})
@@ -182,15 +183,14 @@ class PLRsearch(object):
                 self.trial_duration_per_trial * trial_number, transmit_rate,
                 trial_result_list, min_rate, max_rate, focus_trackers)
             measurement, average, stdev, avg1, avg2, focus_trackers = results
-            index = trial_number % 2
-            zeros[index] += 1
+            zeros += 1
             # TODO: Ratio of fill rate to drain rate seems to have
             # exponential impact. Make it configurable, or is 4:3 good enough?
             if measurement.loss_fraction >= self.packet_loss_ratio_target:
-                for _ in range(4 * zeros[index]):
+                for _ in range(4 * zeros):
                     lossy_loads.append(measurement.target_tr)
             if measurement.loss_count > 0:
-                zeros[index] = 0
+                zeros = 0
             lossy_loads.sort()
             if stop_time <= time.time():
                 return average, stdev
@@ -201,20 +201,19 @@ class PLRsearch(object):
                 next_load = (measurement.receive_rate / (
                     1.0 - self.packet_loss_ratio_target))
             else:
-                index = (trial_number + 1) % 2
-                next_load = (avg1, avg2)[index]
-                if zeros[index] > 0:
+                next_load = (avg1 + avg2) / 2.0
+                if zeros > 0:
                     if lossy_loads[0] > next_load:
-                        diminisher = math.pow(2.0, 1 - zeros[index])
+                        diminisher = math.pow(2.0, 1 - zeros)
                         next_load = lossy_loads[0] + diminisher * next_load
                         next_load /= (1.0 + diminisher)
                     # On zero measurement, we need to drain obsoleted low losses
                     # even if we did not use them to increase next_load,
-                    # in order to get to usable loses with higher load.
+                    # in order to get to usable loses at higher loads.
                     if len(lossy_loads) > 3:
                         lossy_loads = lossy_loads[3:]
                 logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
-                              {"z": zeros, "o": (avg1, avg2)[index],
+                              {"z": zeros, "o": (avg1 + avg2) / 2.0,
                                "n": next_load, "s": lossy_loads})
             transmit_rate = min(max_rate, max(min_rate, next_load))
 
@@ -510,12 +509,12 @@ class PLRsearch(object):
         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
         :type max_samples: None or int
         :returns: Measurement and computation results.
-        :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers.
+        :rtype: _ComputeResult
         """
         logging.debug(
             "measure_and_compute started with self %(self)r, trial_duration "
-            "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
-            "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
+            "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
+            "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
             {"self": self, "dur": trial_duration, "tr": transmit_rate,
              "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
              "ms": max_samples})
@@ -620,7 +619,7 @@ class PLRsearch(object):
             :type pipe: multiprocessing.Connection
             :returns: Computed value tracker, actual focus tracker,
                 and number of samples used for this iteration.
-            :rtype: 3-tuple of tracker, tracker and int
+            :rtype: _PartialResult
             """
             pipe.send(None)
             if not pipe.poll(10.0):
@@ -660,23 +659,65 @@ class PLRsearch(object):
 
     @staticmethod
     def _get_result(measurement, stretch_result, erf_result):
-        """Collate results from measure_and_compute"""
+        """Process and collate results from measure_and_compute.
+
+        Turn logarithm based values to exponential ones,
+        combine averages and stdevs of two fitting functions into a whole.
+
+        :param measurement: The trial measurement obtained during computation.
+        :param stretch_result: Computation output for stretch fitting function.
+        :param erf_result: Computation output for erf fitting function.
+        :type measurement: ReceiveRateMeasurement
+        :type stretch_result: _PartialResult
+        :type erf_result: _PartialResult
+        :returns: Combined results.
+        :rtype: _ComputeResult
+        """
         stretch_avg = stretch_result.value_tracker.average
         erf_avg = erf_result.value_tracker.average
-        # TODO: Take into account secondary stats.
-        stretch_stdev = math.exp(stretch_result.value_tracker.log_variance / 2)
-        erf_stdev = math.exp(erf_result.value_tracker.log_variance / 2)
-        avg = math.exp((stretch_avg + erf_avg) / 2.0)
-        var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
-        var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
-        stdev = avg * math.sqrt(var)
+        stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
+        erf_var = erf_result.value_tracker.get_pessimistic_variance()
+        avg_log = (stretch_avg + erf_avg) / 2.0
+        var_log = (stretch_var + erf_var) / 2.0
+        var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
+        stdev_log = math.sqrt(var_log)
+        low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
+        avg = (low + upp) / 2
+        stdev = avg - low
         trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
         sea = math.exp(stretch_avg)
         eea = math.exp(erf_avg)
         return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
 
 
-_PartialResult = namedtuple('_PartialResult',
-                            'value_tracker focus_tracker samples')
-_ComputeResult = namedtuple('_ComputeResult', 'measurement avg stdev ' +
-                            'stretch_exp_avg erf_exp_avg trackers')
+# Named tuples, for multiple local variables to be passed as return value.
+_PartialResult = namedtuple(
+    "_PartialResult", "value_tracker focus_tracker samples")
+"""Two stat trackers and sample counter.
+
+:param value_tracker: Tracker for the value (critical load) being integrated.
+:param focus_tracker: Tracker for focusing integration inputs (sample points).
+:param samples: How many samples were used for the computation.
+:type value_tracker: stat_trackers.ScalarDualStatTracker
+:type focus_tracker: stat_trackers.VectorStatTracker
+:type samples: int
+"""
+
+_ComputeResult = namedtuple(
+    "_ComputeResult",
+    "measurement avg stdev stretch_exp_avg erf_exp_avg trackers")
+"""Measurement, 4 computation result values, pair of trackers.
+
+:param measurement: The trial measurement result obtained during computation.
+:param avg: Overall average of critical rate estimate.
+:param stdev: Overall standard deviation of critical rate estimate.
+:param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
+:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
+:param trackers: Pair of focus trackers to start next iteration with.
+:type measurement: ReceiveRateMeasurement
+:type avg: float
+:type stdev: float
+:type stretch_exp_avg: float
+:type erf_exp_avg: float
+:type trackers: 2-tuple of stat_trackers.VectorStatTracker
+"""
index bce7038..6d1559d 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2018 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
index 3f21cc7..1c802a5 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2018 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -87,3 +87,16 @@ def log_minus(first, second):
         raise RuntimeError("log_minus: non-positive number to log")
     else:
         return first + math.log(factor)
+
+
+def safe_exp(log_value):
+    """Return exponential of the argument, or zero if the argument is None.
+
+    :param log_value: The value to exponentiate.
+    :type log_value: NoneType or float
+    :returns: The exponentiated value.
+    :rtype: float
+    """
+    if log_value is None:
+        return 0.0
+    return math.exp(log_value)
index 168b09a..58ad98f 100644 (file)
@@ -29,7 +29,7 @@ import numpy
 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
 # then switch to absolute imports within PLRsearch package.
 # Current usage of relative imports is just a short term workaround.
-from log_plus import log_plus  # pylint: disable=relative-import
+from .log_plus import log_plus, safe_exp
 
 
 class ScalarStatTracker(object):
@@ -59,7 +59,11 @@ class ScalarStatTracker(object):
         self.log_variance = log_variance
 
     def __repr__(self):
-        """Return string, which interpreted constructs state of self."""
+        """Return string, which interpreted constructs state of self.
+
+        :returns: Expression contructing an equivalent instance.
+        :rtype: str
+        """
         return ("ScalarStatTracker(log_sum_weight={lsw!r},average={a!r},"
                 "log_variance={lv!r})".format(
                     lsw=self.log_sum_weight, a=self.average,
@@ -168,7 +172,11 @@ class ScalarDualStatTracker(ScalarStatTracker):
         self.max_log_weight = max_log_weight
 
     def __repr__(self):
-        """Return string, which interpreted constructs state of self."""
+        """Return string, which interpreted constructs state of self.
+
+        :returns: Expression contructing an equivalent instance.
+        :rtype: str
+        """
         sec = self.secondary
         return (
             "ScalarDualStatTracker(log_sum_weight={lsw!r},average={a!r},"
@@ -202,6 +210,27 @@ class ScalarDualStatTracker(ScalarStatTracker):
         return self
 
 
+    def get_pessimistic_variance(self):
+        """Return estimate of variance reflecting weight effects.
+
+        Typical scenario is the primary tracker dominated by a single sample.
+        In worse case, secondary tracker is also dominated by
+        a single (but different) sample.
+
+        Current implementation simply returns variance of average
+        of the two trackers, as if they were independent.
+
+        :returns: Pessimistic estimate of variance (not stdev, no log).
+        :rtype: float
+        """
+        var_primary = safe_exp(self.log_variance)
+        var_secondary = safe_exp(self.secondary.log_variance)
+        var_combined = (var_primary + var_secondary) / 2
+        avg_half_diff = (self.average - self.secondary.average) / 2
+        var_combined += avg_half_diff * avg_half_diff
+        return var_combined
+
+
 class VectorStatTracker(object):
     """Class for tracking multi-dimensional samples.
 
@@ -245,7 +274,8 @@ class VectorStatTracker(object):
         """Return string, which interpreted constructs state of self.
 
         :returns: Expression contructing an equivalent instance.
-        :rtype: str"""
+        :rtype: str
+        """
         return (
             "VectorStatTracker(dimension={d!r},log_sum_weight={lsw!r},"
             "averages={a!r},covariance_matrix={cm!r})".format(
@@ -262,8 +292,8 @@ class VectorStatTracker(object):
         :rtype: VectorStatTracker
         """
         return VectorStatTracker(
-            self.dimension, self.log_sum_weight, self.averages,
-            self.covariance_matrix)
+            self.dimension, self.log_sum_weight, self.averages[:],
+            copy.deepcopy(self.covariance_matrix))
 
     def reset(self):
         """Return state set to empty data of proper dimensionality.
@@ -288,6 +318,7 @@ class VectorStatTracker(object):
         self.reset()
         for index in range(self.dimension):
             self.covariance_matrix[index][index] = 1.0
+        return self
 
     def add_get_shift(self, vector_value, log_weight=0.0):
         """Return shift and update state to addition of another sample.
@@ -300,8 +331,8 @@ class VectorStatTracker(object):
             Default: 0.0 (as log of 1.0).
         :type vector_value: iterable of float
         :type log_weight: float
-        :returns: Updated self.
-        :rtype: VectorStatTracker
+        :returns: Shift vector
+        :rtype: list of float
         """
         dimension = self.dimension
         old_log_sum_weight = self.log_sum_weight
index 7f97137..54b20cb 100644 (file)
 | | ...
 | | ... | \| Find critical load using PLR search \| \${1e-7} \| \${120} \|
 | | ...
-| | [Arguments] | ${packet_loss_ratio}=${1e-7} | ${timeout}=${7200.0}
+| | [Arguments] | ${packet_loss_ratio}=${1e-7} | ${timeout}=${1800.0}
 | | ...
 | | ${min_rate} = | Set Variable | ${20000}
 | | ${average} | ${stdev} = | Perform soak search | ${frame_size}

©2016 FD.io a Linux Foundation Collaborative Project. All Rights Reserved.
Linux Foundation is a registered trademark of The Linux Foundation. Linux is a registered trademark of Linus Torvalds.
Please see our privacy policy and terms of use.