chore(mtu): delete some unused keywords related to mtu
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
index e20d293..0e78cc9 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2022 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -54,9 +54,11 @@ class PLRsearch:
 
     def __init__(
             self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
 
     def __init__(
             self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
-            trial_number_offset=0, timeout=1800.0, trace_enabled=False):
+            trial_number_offset=0, timeout=7200.0, trace_enabled=False):
         """Store rate measurer and additional parameters.
 
         """Store rate measurer and additional parameters.
 
+        The measurer must never report negative loss count.
+
         TODO: Copy AbstractMeasurer from MLRsearch.
 
         :param measurer: The measurer to call when searching.
         TODO: Copy AbstractMeasurer from MLRsearch.
 
         :param measurer: The measurer to call when searching.
@@ -191,7 +193,7 @@ class PLRsearch:
             zeros += 1
             # TODO: Ratio of fill rate to drain rate seems to have
             # exponential impact. Make it configurable, or is 4:3 good enough?
             zeros += 1
             # TODO: Ratio of fill rate to drain rate seems to have
             # exponential impact. Make it configurable, or is 4:3 good enough?
-            if measurement.loss_fraction >= self.packet_loss_ratio_target:
+            if measurement.loss_ratio >= self.packet_loss_ratio_target:
                 for _ in range(4 * zeros):
                     lossy_loads.append(measurement.target_tr)
             if measurement.loss_count > 0:
                 for _ in range(4 * zeros):
                     lossy_loads.append(measurement.target_tr)
             if measurement.loss_count > 0:
@@ -203,7 +205,7 @@ class PLRsearch:
             if (trial_number - self.trial_number_offset) <= 1:
                 next_load = max_rate
             elif (trial_number - self.trial_number_offset) <= 3:
             if (trial_number - self.trial_number_offset) <= 1:
                 next_load = max_rate
             elif (trial_number - self.trial_number_offset) <= 3:
-                next_load = (measurement.receive_rate / (
+                next_load = (measurement.relative_receive_rate / (
                     1.0 - self.packet_loss_ratio_target))
             else:
                 next_load = (avg1 + avg2) / 2.0
                     1.0 - self.packet_loss_ratio_target))
             else:
                 next_load = (avg1 + avg2) / 2.0
@@ -424,20 +426,28 @@ class PLRsearch:
         Integrator assumes uniform distribution, but over different parameters.
         Weight and likelihood are used interchangeably here anyway.
 
         Integrator assumes uniform distribution, but over different parameters.
         Weight and likelihood are used interchangeably here anyway.
 
-        Each trial has an offered load, a duration and a loss count.
-        Fitting function is used to compute the average loss per second.
-        Poisson distribution (with average loss per trial) is used
+        Each trial has an intended load, a sent count and a loss count
+        (probably counting unsent packets as loss, as they signal
+        the load is too high for the traffic generator).
+        The fitting function is used to compute the average loss rate.
+        Geometric distribution (with average loss per trial) is used
         to get likelihood of one trial result, the overall likelihood
         is a product of all trial likelihoods.
         As likelihoods can be extremely small, logarithms are tracked instead.
 
         to get likelihood of one trial result, the overall likelihood
         is a product of all trial likelihoods.
         As likelihoods can be extremely small, logarithms are tracked instead.
 
+        The current implementation does not use direct loss rate
+        from the fitting function, as the input and output units may not match
+        (e.g. intended load in TCP transactions, loss in packets).
+        Instead, the expected average loss is scaled according to the number
+        of packets actually sent.
+
         TODO: Copy ReceiveRateMeasurement from MLRsearch.
 
         :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
         :param trial_result_list: List of trial measurement results.
         :param mrr: The mrr parameter for the fitting function.
         TODO: Copy ReceiveRateMeasurement from MLRsearch.
 
         :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
         :param trial_result_list: List of trial measurement results.
         :param mrr: The mrr parameter for the fitting function.
-        :param spread: The spread parameter for the fittinmg function.
+        :param spread: The spread parameter for the fitting function.
         :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
         :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
@@ -453,20 +463,21 @@ class PLRsearch:
             trace(u"for tr", result.target_tr)
             trace(u"lc", result.loss_count)
             trace(u"d", result.duration)
             trace(u"for tr", result.target_tr)
             trace(u"lc", result.loss_count)
             trace(u"d", result.duration)
-            log_avg_loss_per_second = lfit_func(
+            # _rel_ values use units of target_tr (transactions per second).
+            log_avg_rel_loss_per_second = lfit_func(
                 trace, result.target_tr, mrr, spread
             )
                 trace, result.target_tr, mrr, spread
             )
-            log_avg_loss_per_trial = (
-                log_avg_loss_per_second + math.log(result.duration)
-            )
-            # Poisson probability computation works nice for logarithms.
-            log_trial_likelihood = (
-                result.loss_count * log_avg_loss_per_trial
-                - math.exp(log_avg_loss_per_trial)
+            # _abs_ values use units of loss count (maybe packets).
+            # There can be multiple packets per transaction.
+            log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
+                result.transmit_count / result.target_tr
             )
             )
-            log_trial_likelihood -= math.lgamma(1 + result.loss_count)
+            # Geometric probability computation for logarithms.
+            log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
+            log_trial_likelihood *= -result.loss_count
+            log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
             log_likelihood += log_trial_likelihood
             log_likelihood += log_trial_likelihood
-            trace(u"avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
+            trace(u"avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
             trace(u"log_trial_likelihood", log_trial_likelihood)
         return log_likelihood
 
             trace(u"log_trial_likelihood", log_trial_likelihood)
         return log_likelihood
 
@@ -553,6 +564,20 @@ class PLRsearch:
             :rtype: multiprocessing.Connection
             """
 
             :rtype: multiprocessing.Connection
             """
 
+            boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
+            # Starting the worker first. Contrary to documentation
+            # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
+            # sending of large object without active listener on the other side
+            # results in a deadlock, not in a ValueError.
+            # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send
+            worker = multiprocessing.Process(
+                target=Integrator.try_estimate_nd,
+                args=(worker_pipe_end, 10.0, self.trace_enabled)
+            )
+            worker.daemon = True
+            worker.start()
+
+            # Only now it is safe to send the function to compute with.
             def value_logweight_func(trace, x_mrr, x_spread):
                 """Return log of critical rate and log of likelihood.
 
             def value_logweight_func(trace, x_mrr, x_spread):
                 """Return log of critical rate and log of likelihood.
 
@@ -598,16 +623,9 @@ class PLRsearch:
                 return value, logweight
 
             dilled_function = dill.dumps(value_logweight_func)
                 return value, logweight
 
             dilled_function = dill.dumps(value_logweight_func)
-            boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
             boss_pipe_end.send(
                 (dimension, dilled_function, focus_tracker, max_samples)
             )
             boss_pipe_end.send(
                 (dimension, dilled_function, focus_tracker, max_samples)
             )
-            worker = multiprocessing.Process(
-                target=Integrator.try_estimate_nd,
-                args=(worker_pipe_end, 10.0, self.trace_enabled)
-            )
-            worker.daemon = True
-            worker.start()
             return boss_pipe_end
 
         erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
             return boss_pipe_end
 
         erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
@@ -633,7 +651,15 @@ class PLRsearch:
                 and number of samples used for this iteration.
             :rtype: _PartialResult
             """
                 and number of samples used for this iteration.
             :rtype: _PartialResult
             """
-            pipe.send(None)
+            # If worker encountered an exception, we get it in the recv below,
+            # but send will report a broken pipe.
+            # EAFP says we should ignore the error (instead of polling first).
+            # https://devblogs.microsoft.com/python
+            #   /idiomatic-python-eafp-versus-lbyl/
+            try:
+                pipe.send(None)
+            except BrokenPipeError:
+                pass
             if not pipe.poll(10.0):
                 raise RuntimeError(f"Worker {name} did not finish!")
             result_or_traceback = pipe.recv()
             if not pipe.poll(10.0):
                 raise RuntimeError(f"Worker {name} did not finish!")
             result_or_traceback = pipe.recv()