style(PLRsearch): format according to black
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
index 37ee468..e0eea23 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2020 Cisco and/or its affiliates.
+# Copyright (c) 2024 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -53,10 +53,18 @@ class PLRsearch:
     log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
 
     def __init__(
     log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
 
     def __init__(
-            self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
-            trial_number_offset=0, timeout=1800.0, trace_enabled=False):
+        self,
+        measurer,
+        trial_duration_per_trial,
+        packet_loss_ratio_target,
+        trial_number_offset=0,
+        timeout=7200.0,
+        trace_enabled=False,
+    ):
         """Store rate measurer and additional parameters.
 
         """Store rate measurer and additional parameters.
 
+        The measurer must never report negative loss count.
+
         TODO: Copy AbstractMeasurer from MLRsearch.
 
         :param measurer: The measurer to call when searching.
         TODO: Copy AbstractMeasurer from MLRsearch.
 
         :param measurer: The measurer to call when searching.
@@ -184,17 +192,21 @@ class PLRsearch:
             trial_number += 1
             logging.info(f"Trial {trial_number!r}")
             results = self.measure_and_compute(
             trial_number += 1
             logging.info(f"Trial {trial_number!r}")
             results = self.measure_and_compute(
-                self.trial_duration_per_trial * trial_number, transmit_rate,
-                trial_result_list, min_rate, max_rate, focus_trackers
+                self.trial_duration_per_trial * trial_number,
+                transmit_rate,
+                trial_result_list,
+                min_rate,
+                max_rate,
+                focus_trackers,
             )
             measurement, average, stdev, avg1, avg2, focus_trackers = results
             zeros += 1
             # TODO: Ratio of fill rate to drain rate seems to have
             # exponential impact. Make it configurable, or is 4:3 good enough?
             )
             measurement, average, stdev, avg1, avg2, focus_trackers = results
             zeros += 1
             # TODO: Ratio of fill rate to drain rate seems to have
             # exponential impact. Make it configurable, or is 4:3 good enough?
-            if measurement.loss_fraction >= self.packet_loss_ratio_target:
+            if measurement.loss_ratio >= self.packet_loss_ratio_target:
                 for _ in range(4 * zeros):
                 for _ in range(4 * zeros):
-                    lossy_loads.append(measurement.target_tr)
-            if measurement.loss_count > 0:
+                    lossy_loads.append(measurement.intended_load)
+            if measurement.loss_ratio > 0.0:
                 zeros = 0
             lossy_loads.sort()
             if stop_time <= time.time():
                 zeros = 0
             lossy_loads.sort()
             if stop_time <= time.time():
@@ -203,15 +215,16 @@ class PLRsearch:
             if (trial_number - self.trial_number_offset) <= 1:
                 next_load = max_rate
             elif (trial_number - self.trial_number_offset) <= 3:
             if (trial_number - self.trial_number_offset) <= 1:
                 next_load = max_rate
             elif (trial_number - self.trial_number_offset) <= 3:
-                next_load = (measurement.receive_rate / (
-                    1.0 - self.packet_loss_ratio_target))
+                next_load = measurement.relative_forwarding_rate / (
+                    1.0 - self.packet_loss_ratio_target
+                )
             else:
                 next_load = (avg1 + avg2) / 2.0
                 if zeros > 0:
                     if lossy_loads[0] > next_load:
                         diminisher = math.pow(2.0, 1 - zeros)
                         next_load = lossy_loads[0] + diminisher * next_load
             else:
                 next_load = (avg1 + avg2) / 2.0
                 if zeros > 0:
                     if lossy_loads[0] > next_load:
                         diminisher = math.pow(2.0, 1 - zeros)
                         next_load = lossy_loads[0] + diminisher * next_load
-                        next_load /= (1.0 + diminisher)
+                        next_load /= 1.0 + diminisher
                     # On zero measurement, we need to drain obsoleted low losses
                     # even if we did not use them to increase next_load,
                     # in order to get to usable losses at higher loads.
                     # On zero measurement, we need to drain obsoleted low losses
                     # even if we did not use them to increase next_load,
                     # in order to get to usable losses at higher loads.
@@ -261,22 +274,22 @@ class PLRsearch:
         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
         chi = (load - mrr) / spread
         chi0 = -mrr / spread
         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
         chi = (load - mrr) / spread
         chi0 = -mrr / spread
-        trace(u"stretch: load", load)
-        trace(u"mrr", mrr)
-        trace(u"spread", spread)
-        trace(u"chi", chi)
-        trace(u"chi0", chi0)
+        trace("stretch: load", load)
+        trace("mrr", mrr)
+        trace("spread", spread)
+        trace("chi", chi)
+        trace("chi0", chi0)
         if chi > 0:
             log_lps = math.log(
                 load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
             )
         if chi > 0:
             log_lps = math.log(
                 load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
             )
-            trace(u"big loss direct log_lps", log_lps)
+            trace("big loss direct log_lps", log_lps)
         else:
             two_positive = log_plus(chi, 2 * chi0 - log_2)
             two_negative = log_plus(chi0, 2 * chi - log_2)
             if two_positive <= two_negative:
                 log_lps = log_minus(chi, chi0) + log_spread
         else:
             two_positive = log_plus(chi, 2 * chi0 - log_2)
             two_negative = log_plus(chi0, 2 * chi - log_2)
             if two_positive <= two_negative:
                 log_lps = log_minus(chi, chi0) + log_spread
-                trace(u"small loss crude log_lps", log_lps)
+                trace("small loss crude log_lps", log_lps)
                 return log_lps
             two = log_minus(two_positive, two_negative)
             three_positive = log_plus(two_positive, 3 * chi - log_3)
                 return log_lps
             two = log_minus(two_positive, two_negative)
             three_positive = log_plus(two_positive, 3 * chi - log_3)
@@ -284,11 +297,11 @@ class PLRsearch:
             three = log_minus(three_positive, three_negative)
             if two == three:
                 log_lps = two + log_spread
             three = log_minus(three_positive, three_negative)
             if two == three:
                 log_lps = two + log_spread
-                trace(u"small loss approx log_lps", log_lps)
+                trace("small loss approx log_lps", log_lps)
             else:
                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
                 log_lps += log_spread
             else:
                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
                 log_lps += log_spread
-                trace(u"small loss direct log_lps", log_lps)
+                trace("small loss direct log_lps", log_lps)
         return log_lps
 
     @staticmethod
         return log_lps
 
     @staticmethod
@@ -327,26 +340,26 @@ class PLRsearch:
         # TODO: The stretch sign is just to have less minuses. Worth changing?
         chi = (mrr - load) / spread
         chi0 = mrr / spread
         # TODO: The stretch sign is just to have less minuses. Worth changing?
         chi = (mrr - load) / spread
         chi0 = mrr / spread
-        trace(u"Erf: load", load)
-        trace(u"mrr", mrr)
-        trace(u"spread", spread)
-        trace(u"chi", chi)
-        trace(u"chi0", chi0)
+        trace("Erf: load", load)
+        trace("mrr", mrr)
+        trace("spread", spread)
+        trace("chi", chi)
+        trace("chi0", chi0)
         if chi >= -1.0:
         if chi >= -1.0:
-            trace(u"positive, b roughly bigger than m", None)
+            trace("positive, b roughly bigger than m", None)
             if chi > math.exp(10):
                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
             if chi > math.exp(10):
                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
-                trace(u"approximated first", first)
+                trace("approximated first", first)
             else:
                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
             else:
                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
-                trace(u"exact first", first)
+                trace("exact first", first)
             first -= chi * chi
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = log_minus(first, second)
             first -= chi * chi
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = log_minus(first, second)
-            trace(u"first", first)
+            trace("first", first)
         else:
         else:
-            trace(u"negative, b roughly smaller than m", None)
+            trace("negative, b roughly smaller than m", None)
             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
             exp_first *= math.exp(-chi * chi)
             exp_first -= 2 * chi
             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
             exp_first *= math.exp(-chi * chi)
             exp_first -= 2 * chi
@@ -357,17 +370,17 @@ class PLRsearch:
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = math.log(exp_first - math.exp(second))
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = math.log(exp_first - math.exp(second))
-            trace(u"exp_first", exp_first)
-        trace(u"second", second)
-        trace(u"intermediate", intermediate)
+            trace("exp_first", exp_first)
+        trace("second", second)
+        trace("intermediate", intermediate)
         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
-        trace(u"result", result)
+        trace("result", result)
         return result
 
     @staticmethod
     def find_critical_rate(
         return result
 
     @staticmethod
     def find_critical_rate(
-            trace, lfit_func, min_rate, max_rate, loss_ratio_target,
-            mrr, spread):
+        trace, lfit_func, min_rate, max_rate, loss_ratio_target, mrr, spread
+    ):
         """Given ratio target and parameters, return the achieving offered load.
 
         This is basically an inverse function to lfit_func
         """Given ratio target and parameters, return the achieving offered load.
 
         This is basically an inverse function to lfit_func
@@ -409,12 +422,12 @@ class PLRsearch:
             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
             loss_ratio = loss_rate / rate
             if loss_ratio > loss_ratio_target:
             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
             loss_ratio = loss_rate / rate
             if loss_ratio > loss_ratio_target:
-                trace(u"halving down", rate)
+                trace("halving down", rate)
                 rate_hi = rate
             elif loss_ratio < loss_ratio_target:
                 rate_hi = rate
             elif loss_ratio < loss_ratio_target:
-                trace(u"halving up", rate)
+                trace("halving up", rate)
                 rate_lo = rate
                 rate_lo = rate
-        trace(u"found", rate)
+        trace("found", rate)
         return rate
 
     @staticmethod
         return rate
 
     @staticmethod
@@ -424,55 +437,71 @@ class PLRsearch:
         Integrator assumes uniform distribution, but over different parameters.
         Weight and likelihood are used interchangeably here anyway.
 
         Integrator assumes uniform distribution, but over different parameters.
         Weight and likelihood are used interchangeably here anyway.
 
-        Each trial has an offered load, a duration and a loss count.
-        Fitting function is used to compute the average loss per second.
-        Poisson distribution (with average loss per trial) is used
+        Each trial has an intended load, a sent count and a loss count
+        (probably counting unsent packets as loss, as they signal
+        the load is too high for the traffic generator).
+        The fitting function is used to compute the average loss rate.
+        Geometric distribution (with average loss per trial) is used
         to get likelihood of one trial result, the overall likelihood
         is a product of all trial likelihoods.
         As likelihoods can be extremely small, logarithms are tracked instead.
 
         to get likelihood of one trial result, the overall likelihood
         is a product of all trial likelihoods.
         As likelihoods can be extremely small, logarithms are tracked instead.
 
-        TODO: Copy ReceiveRateMeasurement from MLRsearch.
+        The current implementation does not use direct loss rate
+        from the fitting function, as the input and output units may not match
+        (e.g. intended load in TCP transactions, loss in packets).
+        Instead, the expected average loss is scaled according to the number
+        of packets actually sent.
+
+        TODO: Copy MeasurementResult from MLRsearch.
 
         :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
         :param trial_result_list: List of trial measurement results.
         :param mrr: The mrr parameter for the fitting function.
 
         :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
         :param trial_result_list: List of trial measurement results.
         :param mrr: The mrr parameter for the fitting function.
-        :param spread: The spread parameter for the fittinmg function.
+        :param spread: The spread parameter for the fitting function.
         :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
         :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
-        :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+        :type trial_result_list: list of MLRsearch.MeasurementResult
         :type mrr: float
         :type spread: float
         :returns: Logarithm of result weight for given function and parameters.
         :rtype: float
         """
         log_likelihood = 0.0
         :type mrr: float
         :type spread: float
         :returns: Logarithm of result weight for given function and parameters.
         :rtype: float
         """
         log_likelihood = 0.0
-        trace(u"log_weight for mrr", mrr)
-        trace(u"spread", spread)
+        trace("log_weight for mrr", mrr)
+        trace("spread", spread)
         for result in trial_result_list:
         for result in trial_result_list:
-            trace(u"for tr", result.target_tr)
-            trace(u"lc", result.loss_count)
-            trace(u"d", result.duration)
-            log_avg_loss_per_second = lfit_func(
-                trace, result.target_tr, mrr, spread
+            trace("for tr", result.intended_load)
+            trace("lc", result.loss_count)
+            trace("d", result.intended_duration)
+            # _rel_ values use units of intended_load (transactions per second).
+            log_avg_rel_loss_per_second = lfit_func(
+                trace, result.intended_load, mrr, spread
             )
             )
-            log_avg_loss_per_trial = (
-                log_avg_loss_per_second + math.log(result.duration)
+            # _abs_ values use units of loss count (maybe packets).
+            # There can be multiple packets per transaction.
+            log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
+                result.offered_count / result.intended_load
             )
             )
-            # Poisson probability computation works nice for logarithms.
-            log_trial_likelihood = (
-                result.loss_count * log_avg_loss_per_trial
-                - math.exp(log_avg_loss_per_trial)
-            )
-            log_trial_likelihood -= math.lgamma(1 + result.loss_count)
+            # Geometric probability computation for logarithms.
+            log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
+            log_trial_likelihood *= -result.loss_count
+            log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
             log_likelihood += log_trial_likelihood
             log_likelihood += log_trial_likelihood
-            trace(u"avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
-            trace(u"log_trial_likelihood", log_trial_likelihood)
+            trace("avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
+            trace("log_trial_likelihood", log_trial_likelihood)
         return log_likelihood
 
     def measure_and_compute(
         return log_likelihood
 
     def measure_and_compute(
-            self, trial_duration, transmit_rate, trial_result_list,
-            min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
+        self,
+        trial_duration,
+        transmit_rate,
+        trial_result_list,
+        min_rate,
+        max_rate,
+        focus_trackers=(None, None),
+        max_samples=None,
+    ):
         """Perform both measurement and computation at once.
 
         High level steps: Prepare and launch computation worker processes,
         """Perform both measurement and computation at once.
 
         High level steps: Prepare and launch computation worker processes,
@@ -513,7 +542,7 @@ class PLRsearch:
         :param max_samples: Limit for integrator samples, for debugging.
         :type trial_duration: float
         :type transmit_rate: float
         :param max_samples: Limit for integrator samples, for debugging.
         :type trial_duration: float
         :type transmit_rate: float
-        :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+        :type trial_result_list: list of MLRsearch.MeasurementResult
         :type min_rate: float
         :type max_rate: float
         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
         :type min_rate: float
         :type max_rate: float
         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
@@ -553,6 +582,20 @@ class PLRsearch:
             :rtype: multiprocessing.Connection
             """
 
             :rtype: multiprocessing.Connection
             """
 
+            boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
+            # Starting the worker first. Contrary to documentation
+            # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
+            # sending of large object without active listener on the other side
+            # results in a deadlock, not in a ValueError.
+            # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send
+            worker = multiprocessing.Process(
+                target=Integrator.try_estimate_nd,
+                args=(worker_pipe_end, 5.0, self.trace_enabled),
+            )
+            worker.daemon = True
+            worker.start()
+
+            # Only now it is safe to send the function to compute with.
             def value_logweight_func(trace, x_mrr, x_spread):
                 """Return log of critical rate and log of likelihood.
 
             def value_logweight_func(trace, x_mrr, x_spread):
                 """Return log of critical rate and log of likelihood.
 
@@ -591,23 +634,21 @@ class PLRsearch:
                 )
                 value = math.log(
                     self.find_critical_rate(
                 )
                 value = math.log(
                     self.find_critical_rate(
-                        trace, fitting_function, min_rate, max_rate,
-                        self.packet_loss_ratio_target, mrr, spread
+                        trace,
+                        fitting_function,
+                        min_rate,
+                        max_rate,
+                        self.packet_loss_ratio_target,
+                        mrr,
+                        spread,
                     )
                 )
                 return value, logweight
 
             dilled_function = dill.dumps(value_logweight_func)
                     )
                 )
                 return value, logweight
 
             dilled_function = dill.dumps(value_logweight_func)
-            boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
             boss_pipe_end.send(
                 (dimension, dilled_function, focus_tracker, max_samples)
             )
             boss_pipe_end.send(
                 (dimension, dilled_function, focus_tracker, max_samples)
             )
-            worker = multiprocessing.Process(
-                target=Integrator.try_estimate_nd,
-                args=(worker_pipe_end, 10.0, self.trace_enabled)
-            )
-            worker.daemon = True
-            worker.start()
             return boss_pipe_end
 
         erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
             return boss_pipe_end
 
         erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
@@ -646,9 +687,13 @@ class PLRsearch:
                 raise RuntimeError(f"Worker {name} did not finish!")
             result_or_traceback = pipe.recv()
             try:
                 raise RuntimeError(f"Worker {name} did not finish!")
             result_or_traceback = pipe.recv()
             try:
-                value_tracker, focus_tracker, debug_list, trace_list, sampls = (
-                    result_or_traceback
-                )
+                (
+                    value_tracker,
+                    focus_tracker,
+                    debug_list,
+                    trace_list,
+                    sampls,
+                ) = result_or_traceback
             except ValueError:
                 raise RuntimeError(
                     f"Worker {name} failed with the following traceback:\n"
             except ValueError:
                 raise RuntimeError(
                     f"Worker {name} failed with the following traceback:\n"
@@ -664,8 +709,8 @@ class PLRsearch:
             )
             return _PartialResult(value_tracker, focus_tracker, sampls)
 
             )
             return _PartialResult(value_tracker, focus_tracker, sampls)
 
-        stretch_result = stop_computing(u"stretch", stretch_pipe)
-        erf_result = stop_computing(u"erf", erf_pipe)
+        stretch_result = stop_computing("stretch", stretch_pipe)
+        erf_result = stop_computing("erf", erf_pipe)
         result = PLRsearch._get_result(measurement, stretch_result, erf_result)
         logging.info(
             f"measure_and_compute finished with trial result "
         result = PLRsearch._get_result(measurement, stretch_result, erf_result)
         logging.info(
             f"measure_and_compute finished with trial result "
@@ -687,7 +732,7 @@ class PLRsearch:
         :param measurement: The trial measurement obtained during computation.
         :param stretch_result: Computation output for stretch fitting function.
         :param erf_result: Computation output for erf fitting function.
         :param measurement: The trial measurement obtained during computation.
         :param stretch_result: Computation output for stretch fitting function.
         :param erf_result: Computation output for erf fitting function.
-        :type measurement: ReceiveRateMeasurement
+        :type measurement: MeasurementResult
         :type stretch_result: _PartialResult
         :type erf_result: _PartialResult
         :returns: Combined results.
         :type stretch_result: _PartialResult
         :type erf_result: _PartialResult
         :returns: Combined results.
@@ -712,7 +757,7 @@ class PLRsearch:
 
 # Named tuples, for multiple local variables to be passed as return value.
 _PartialResult = namedtuple(
 
 # Named tuples, for multiple local variables to be passed as return value.
 _PartialResult = namedtuple(
-    u"_PartialResult", u"value_tracker focus_tracker samples"
+    "_PartialResult", "value_tracker focus_tracker samples"
 )
 """Two stat trackers and sample counter.
 
 )
 """Two stat trackers and sample counter.
 
@@ -725,8 +770,8 @@ _PartialResult = namedtuple(
 """
 
 _ComputeResult = namedtuple(
 """
 
 _ComputeResult = namedtuple(
-    u"_ComputeResult",
-    u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers"
+    "_ComputeResult",
+    "measurement avg stdev stretch_exp_avg erf_exp_avg trackers",
 )
 """Measurement, 4 computation result values, pair of trackers.
 
 )
 """Measurement, 4 computation result values, pair of trackers.
 
@@ -736,7 +781,7 @@ _ComputeResult = namedtuple(
 :param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
 :param erf_exp_avg: Erf fitting function estimate average, exponentiated.
 :param trackers: Pair of focus trackers to start next iteration with.
 :param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
 :param erf_exp_avg: Erf fitting function estimate average, exponentiated.
 :param trackers: Pair of focus trackers to start next iteration with.
-:type measurement: ReceiveRateMeasurement
+:type measurement: MeasurementResult
 :type avg: float
 :type stdev: float
 :type stretch_exp_avg: float
 :type avg: float
 :type stdev: float
 :type stretch_exp_avg: float