feat(MLRsearch): MLRsearch v7
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
index ce65fd2..7599a9e 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2021 Cisco and/or its affiliates.
+# Copyright (c) 2023 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -195,8 +195,8 @@ class PLRsearch:
             # exponential impact. Make it configurable, or is 4:3 good enough?
             if measurement.loss_ratio >= self.packet_loss_ratio_target:
                 for _ in range(4 * zeros):
-                    lossy_loads.append(measurement.target_tr)
-            if measurement.loss_count > 0:
+                    lossy_loads.append(measurement.intended_load)
+            if measurement.loss_ratio > 0.0:
                 zeros = 0
             lossy_loads.sort()
             if stop_time <= time.time():
@@ -205,7 +205,7 @@ class PLRsearch:
             if (trial_number - self.trial_number_offset) <= 1:
                 next_load = max_rate
             elif (trial_number - self.trial_number_offset) <= 3:
-                next_load = (measurement.relative_receive_rate / (
+                next_load = (measurement.relative_forwarding_rate / (
                     1.0 - self.packet_loss_ratio_target))
             else:
                 next_load = (avg1 + avg2) / 2.0
@@ -441,7 +441,7 @@ class PLRsearch:
         Instead, the expected average loss is scaled according to the number
         of packets actually sent.
 
-        TODO: Copy ReceiveRateMeasurement from MLRsearch.
+        TODO: Copy MeasurementResult from MLRsearch.
 
         :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
@@ -450,7 +450,7 @@ class PLRsearch:
         :param spread: The spread parameter for the fitting function.
         :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
-        :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+        :type trial_result_list: list of MLRsearch.MeasurementResult
         :type mrr: float
         :type spread: float
         :returns: Logarithm of result weight for given function and parameters.
@@ -460,17 +460,17 @@ class PLRsearch:
         trace(u"log_weight for mrr", mrr)
         trace(u"spread", spread)
         for result in trial_result_list:
-            trace(u"for tr", result.target_tr)
+            trace(u"for tr", result.intended_load)
             trace(u"lc", result.loss_count)
-            trace(u"d", result.duration)
-            # _rel_ values use units of target_tr (transactions per second).
+            trace(u"d", result.intended_duration)
+            # _rel_ values use units of intended_load (transactions per second).
             log_avg_rel_loss_per_second = lfit_func(
-                trace, result.target_tr, mrr, spread
+                trace, result.intended_load, mrr, spread
             )
             # _abs_ values use units of loss count (maybe packets).
             # There can be multiple packets per transaction.
             log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
-                result.transmit_count / result.target_tr
+                result.offered_count / result.intended_load
             )
             # Geometric probability computation for logarithms.
             log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
@@ -524,7 +524,7 @@ class PLRsearch:
         :param max_samples: Limit for integrator samples, for debugging.
         :type trial_duration: float
         :type transmit_rate: float
-        :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+        :type trial_result_list: list of MLRsearch.MeasurementResult
         :type min_rate: float
         :type max_rate: float
         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
@@ -564,6 +564,20 @@ class PLRsearch:
             :rtype: multiprocessing.Connection
             """
 
+            boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
+            # Starting the worker first. Contrary to documentation
+            # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
+            # sending of large object without active listener on the other side
+            # results in a deadlock, not in a ValueError.
+            # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send
+            worker = multiprocessing.Process(
+                target=Integrator.try_estimate_nd,
+                args=(worker_pipe_end, 5.0, self.trace_enabled)
+            )
+            worker.daemon = True
+            worker.start()
+
+            # Only now it is safe to send the function to compute with.
             def value_logweight_func(trace, x_mrr, x_spread):
                 """Return log of critical rate and log of likelihood.
 
@@ -609,15 +623,6 @@ class PLRsearch:
                 return value, logweight
 
             dilled_function = dill.dumps(value_logweight_func)
-            boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
-            # Do not send yet, run the worker first to avoid a deadlock.
-            # See https://stackoverflow.com/a/15716500
-            worker = multiprocessing.Process(
-                target=Integrator.try_estimate_nd,
-                args=(worker_pipe_end, 10.0, self.trace_enabled)
-            )
-            worker.daemon = True
-            worker.start()
             boss_pipe_end.send(
                 (dimension, dilled_function, focus_tracker, max_samples)
             )
@@ -700,7 +705,7 @@ class PLRsearch:
         :param measurement: The trial measurement obtained during computation.
         :param stretch_result: Computation output for stretch fitting function.
         :param erf_result: Computation output for erf fitting function.
-        :type measurement: ReceiveRateMeasurement
+        :type measurement: MeasurementResult
         :type stretch_result: _PartialResult
         :type erf_result: _PartialResult
         :returns: Combined results.
@@ -749,7 +754,7 @@ _ComputeResult = namedtuple(
 :param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
 :param erf_exp_avg: Erf fitting function estimate average, exponentiated.
 :param trackers: Pair of focus trackers to start next iteration with.
-:type measurement: ReceiveRateMeasurement
+:type measurement: MeasurementResult
 :type avg: float
 :type stdev: float
 :type stretch_exp_avg: float