Improve PLRsearch yet again
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
index 4205818..b7c9344 100644 (file)
@@ -43,6 +43,7 @@ class PLRsearch(object):
 
     Method othed than search (and than __init__)
     are just internal code structure.
+
     TODO: Those method names should start with underscore then.
     """
 
@@ -162,7 +163,7 @@ class PLRsearch(object):
         :type min_rate: float
         :type max_rate: float
         :returns: Average and stdev of critical load estimate.
-        :rtype: 2-tuple of floats
+        :rtype: 2-tuple of float
         """
         stop_time = time.time() + self.timeout
         min_rate = float(min_rate)
@@ -174,7 +175,7 @@ class PLRsearch(object):
         focus_trackers = (None, None)
         transmit_rate = (min_rate + max_rate) / 2.0
         lossy_loads = [max_rate]
-        zeros = [0, 0]  # Cosecutive zero loss, separately for stretch and erf.
+        zeros = 0  # How many cosecutive zero loss results are happening.
         while 1:
             trial_number += 1
             logging.info("Trial %(number)r", {"number": trial_number})
@@ -182,15 +183,14 @@ class PLRsearch(object):
                 self.trial_duration_per_trial * trial_number, transmit_rate,
                 trial_result_list, min_rate, max_rate, focus_trackers)
             measurement, average, stdev, avg1, avg2, focus_trackers = results
-            index = trial_number % 2
-            zeros[index] += 1
+            zeros += 1
             # TODO: Ratio of fill rate to drain rate seems to have
             # exponential impact. Make it configurable, or is 4:3 good enough?
             if measurement.loss_fraction >= self.packet_loss_ratio_target:
-                for _ in range(4 * zeros[index]):
+                for _ in range(4 * zeros):
                     lossy_loads.append(measurement.target_tr)
             if measurement.loss_count > 0:
-                zeros[index] = 0
+                zeros = 0
             lossy_loads.sort()
             if stop_time <= time.time():
                 return average, stdev
@@ -201,20 +201,19 @@ class PLRsearch(object):
                 next_load = (measurement.receive_rate / (
                     1.0 - self.packet_loss_ratio_target))
             else:
-                index = (trial_number + 1) % 2
-                next_load = (avg1, avg2)[index]
-                if zeros[index] > 0:
+                next_load = (avg1 + avg2) / 2.0
+                if zeros > 0:
                     if lossy_loads[0] > next_load:
-                        diminisher = math.pow(2.0, 1 - zeros[index])
+                        diminisher = math.pow(2.0, 1 - zeros)
                         next_load = lossy_loads[0] + diminisher * next_load
                         next_load /= (1.0 + diminisher)
                     # On zero measurement, we need to drain obsoleted low losses
                     # even if we did not use them to increase next_load,
-                    # in order to get to usable loses with higher load.
+                    # in order to get to usable loses at higher loads.
                     if len(lossy_loads) > 3:
                         lossy_loads = lossy_loads[3:]
                 logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
-                              {"z": zeros, "o": (avg1, avg2)[index],
+                              {"z": zeros, "o": (avg1 + avg2) / 2.0,
                                "n": next_load, "s": lossy_loads})
             transmit_rate = min(max_rate, max(min_rate, next_load))
 
@@ -510,12 +509,12 @@ class PLRsearch(object):
         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
         :type max_samples: None or int
         :returns: Measurement and computation results.
-        :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers.
+        :rtype: _ComputeResult
         """
         logging.debug(
             "measure_and_compute started with self %(self)r, trial_duration "
-            "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
-            "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
+            "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
+            "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
             {"self": self, "dur": trial_duration, "tr": transmit_rate,
              "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
              "ms": max_samples})
@@ -620,7 +619,7 @@ class PLRsearch(object):
             :type pipe: multiprocessing.Connection
             :returns: Computed value tracker, actual focus tracker,
                 and number of samples used for this iteration.
-            :rtype: 3-tuple of tracker, tracker and int
+            :rtype: _PartialResult
             """
             pipe.send(None)
             if not pipe.poll(10.0):
@@ -660,23 +659,65 @@ class PLRsearch(object):
 
     @staticmethod
     def _get_result(measurement, stretch_result, erf_result):
-        """Collate results from measure_and_compute"""
+        """Process and collate results from measure_and_compute.
+
+        Turn logarithm based values to exponential ones,
+        combine averages and stdevs of two fitting functions into a whole.
+
+        :param measurement: The trial measurement obtained during computation.
+        :param stretch_result: Computation output for stretch fitting function.
+        :param erf_result: Computation output for erf fitting function.
+        :type measurement: ReceiveRateMeasurement
+        :type stretch_result: _PartialResult
+        :type erf_result: _PartialResult
+        :returns: Combined results.
+        :rtype: _ComputeResult
+        """
         stretch_avg = stretch_result.value_tracker.average
         erf_avg = erf_result.value_tracker.average
-        # TODO: Take into account secondary stats.
-        stretch_stdev = math.exp(stretch_result.value_tracker.log_variance / 2)
-        erf_stdev = math.exp(erf_result.value_tracker.log_variance / 2)
-        avg = math.exp((stretch_avg + erf_avg) / 2.0)
-        var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
-        var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
-        stdev = avg * math.sqrt(var)
+        stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
+        erf_var = erf_result.value_tracker.get_pessimistic_variance()
+        avg_log = (stretch_avg + erf_avg) / 2.0
+        var_log = (stretch_var + erf_var) / 2.0
+        var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
+        stdev_log = math.sqrt(var_log)
+        low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
+        avg = (low + upp) / 2
+        stdev = avg - low
         trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
         sea = math.exp(stretch_avg)
         eea = math.exp(erf_avg)
         return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
 
 
-_PartialResult = namedtuple('_PartialResult',
-                            'value_tracker focus_tracker samples')
-_ComputeResult = namedtuple('_ComputeResult', 'measurement avg stdev ' +
-                            'stretch_exp_avg erf_exp_avg trackers')
+# Named tuples, for multiple local variables to be passed as return value.
+_PartialResult = namedtuple(
+    "_PartialResult", "value_tracker focus_tracker samples")
+"""Two stat trackers and sample counter.
+
+:param value_tracker: Tracker for the value (critical load) being integrated.
+:param focus_tracker: Tracker for focusing integration inputs (sample points).
+:param samples: How many samples were used for the computation.
+:type value_tracker: stat_trackers.ScalarDualStatTracker
+:type focus_tracker: stat_trackers.VectorStatTracker
+:type samples: int
+"""
+
+_ComputeResult = namedtuple(
+    "_ComputeResult",
+    "measurement avg stdev stretch_exp_avg erf_exp_avg trackers")
+"""Measurement, 4 computation result values, pair of trackers.
+
+:param measurement: The trial measurement result obtained during computation.
+:param avg: Overall average of critical rate estimate.
+:param stdev: Overall standard deviation of critical rate estimate.
+:param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
+:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
+:param trackers: Pair of focus trackers to start next iteration with.
+:type measurement: ReceiveRateMeasurement
+:type avg: float
+:type stdev: float
+:type stretch_exp_avg: float
+:type erf_exp_avg: float
+:type trackers: 2-tuple of stat_trackers.VectorStatTracker
+"""