Revert "fix(IPsecUtil): Delete keywords no longer used"
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
1 # Copyright (c) 2024 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module holding PLRsearch class."""
15
16 import logging
17 import math
18 import multiprocessing
19 import time
20
21 from collections import namedtuple
22
23 import dill
24
25 from scipy.special import erfcx, erfc
26
27 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
28 # then switch to absolute imports within PLRsearch package.
29 # Current usage of relative imports is just a short term workaround.
30 from . import Integrator
31 from . import stat_trackers
32 from .log_plus import log_plus, log_minus
33
34
35 class PLRsearch:
36     """A class to encapsulate data relevant for the search method.
37
38     The context is performance testing of packet processing systems.
39     The system, when being offered a steady stream of packets,
40     can process some of them successfully, other are considered "lost".
41
42     See docstring of the search method for algorithm description.
43
44     Two constants are stored as class fields for speed.
45
46     Method other than search (and than __init__)
47     are just internal code structure.
48
49     TODO: Those method names should start with underscore then.
50     """
51
52     xerfcx_limit = math.pow(math.acos(0), -0.5)
53     log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
54
55     def __init__(
56         self,
57         measurer,
58         trial_duration_per_trial,
59         packet_loss_ratio_target,
60         trial_number_offset=0,
61         timeout=7200.0,
62         trace_enabled=False,
63     ):
64         """Store rate measurer and additional parameters.
65
66         The measurer must never report negative loss count.
67
68         TODO: Copy AbstractMeasurer from MLRsearch.
69
70         :param measurer: The measurer to call when searching.
71         :param trial_duration_per_trial: Each trial has larger duration
72             than the previous trial. This is the increment, in seconds.
73         :param packet_loss_ratio_target: The algorithm tries to estimate
74             the offered load leading to this ratio on average.
75             Trial ratio is number of packets lost divided by packets offered.
76         :param trial_number_offset: The "first" trial number will be 1+this.
77             Use this to ensure first iterations have enough time to compute
78             reasonable estimates for later trials to use.
79         :param timeout: The search ends if it lasts more than this many seconds.
80         :type measurer: MLRsearch.AbstractMeasurer
81         :type trial_duration_per_trial: float
82         :type packet_loss_ratio_target: float
83         :type trial_number_offset: int
84         :type timeout: float
85         """
86         self.measurer = measurer
87         self.trial_duration_per_trial = float(trial_duration_per_trial)
88         self.packet_loss_ratio_target = float(packet_loss_ratio_target)
89         self.trial_number_offset = int(trial_number_offset)
90         self.timeout = float(timeout)
91         self.trace_enabled = bool(trace_enabled)
92
93     def search(self, min_rate, max_rate):
94         """Perform the search, return average and stdev for throughput estimate.
95
96         Considering measurer and packet_loss_ratio_target (see __init__),
97         find such an offered load (called critical load) that is expected
98         to hit the target loss ratio in the limit of very long trial duration.
99         As the system is probabilistic (and test duration is finite),
100         the critical ratio is only estimated.
101         Return the average and standard deviation of the estimate.
102
103         In principle, this algorithm performs trial measurements,
104         each with varied offered load (which is constant during the trial).
105         During each measurement, Bayesian inference is performed
106         on all the measurement results so far.
107         When timeout is up, the last estimate is returned,
108         else another trial is performed.
109
110         It is assumed that the system under test, even though not deterministic,
111         still follows the rule of large numbers. In another words,
112         any growing set of measurements at a particular offered load
113         will converge towards unique (for the given load) packet loss ratio.
114         This means there is a deterministic (but unknown) function
115         mapping the offered load to average loss ratio.
116         This function is called loss ratio function.
117         This also assumes the average loss ratio
118         does not depend on trial duration.
119
120         The actual probability distribution of loss counts, achieving
121         the average ratio on trials of various duration
122         can be complicated (and can depend on offered load), but simply assuming
123         Poisson distribution will make the algorithm converge.
124         Binomial distribution would be more precise,
125         but Poisson is more practical, as it effectively gives
126         less information content to high ratio results.
127
128         Even when applying other assumptions on the loss ratio function
129         (increasing function, limit zero ratio when load goes to zero,
130         global upper limit on rate of packets processed), there are still
131         too many different shapes of possible loss functions,
132         which makes full Bayesian reasoning intractable.
133
134         This implementation radically simplifies things by examining
135         only two shapes, each with finitely many (in this case just two)
136         parameters. In other words, two fitting functions
137         (each with two parameters and one argument).
138         When restricting model space to one of the two fitting functions,
139         the Bayesian inference becomes tractable (even though it needs
140         numerical integration from Integrator class).
141
142         The first measurement is done at the middle between
143         min_rate and max_rate, to help with convergence
144         if max_rate measurements give loss below target.
145         TODO: Fix overflow error and use min_rate instead of the middle.
146
147         The second measurement is done at max_rate, next few measurements
148         have offered load of previous load minus excess loss rate.
149         This simple rule is found to be good when offered loads
150         so far are way above the critical rate. After few measurements,
151         inference from fitting functions converges faster that this initial
152         "optimistic" procedure.
153
154         Offered loads close to (limiting) critical rate are the most useful,
155         as linear approximation of the fitting function
156         becomes good enough there (thus reducing the impact
157         of the overall shape of fitting function).
158         After several trials, usually one of the fitting functions
159         has better predictions than the other one, but the algorithm
160         does not track that. Simply, it uses the estimate average,
161         alternating between the functions.
162         Multiple workarounds are applied to try and avoid measurements
163         both in zero loss region and in big loss region,
164         as their results tend to make the critical load estimate worse.
165
166         The returned average and stdev is a combination of the two fitting
167         estimates.
168
169         :param min_rate: Avoid measuring at offered loads below this,
170             in packets per second.
171         :param max_rate: Avoid measuring at offered loads above this,
172             in packets per second.
173         :type min_rate: float
174         :type max_rate: float
175         :returns: Average and stdev of critical load estimate.
176         :rtype: 2-tuple of float
177         """
178         stop_time = time.time() + self.timeout
179         min_rate = float(min_rate)
180         max_rate = float(max_rate)
181         logging.info(
182             f"Started search with min_rate {min_rate!r}, "
183             f"max_rate {max_rate!r}"
184         )
185         trial_result_list = []
186         trial_number = self.trial_number_offset
187         focus_trackers = (None, None)
188         transmit_rate = (min_rate + max_rate) / 2.0
189         lossy_loads = [max_rate]
190         zeros = 0  # How many consecutive zero loss results are happening.
191         while 1:
192             trial_number += 1
193             logging.info(f"Trial {trial_number!r}")
194             results = self.measure_and_compute(
195                 self.trial_duration_per_trial * trial_number,
196                 transmit_rate,
197                 trial_result_list,
198                 min_rate,
199                 max_rate,
200                 focus_trackers,
201             )
202             measurement, average, stdev, avg1, avg2, focus_trackers = results
203             # Workaround for unsent packets and other anomalies.
204             measurement.plr_loss_count = min(
205                 measurement.intended_count,
206                 int(measurement.intended_count * measurement.loss_ratio + 0.9),
207             )
208             logging.debug(
209                 f"loss ratio {measurement.plr_loss_count}"
210                 f" / {measurement.intended_count}"
211             )
212             zeros += 1
213             # TODO: Ratio of fill rate to drain rate seems to have
214             # exponential impact. Make it configurable, or is 4:3 good enough?
215             if measurement.plr_loss_count >= (
216                 measurement.intended_count * self.packet_loss_ratio_target
217             ):
218                 for _ in range(4 * zeros):
219                     lossy_loads.append(measurement.intended_load)
220                 lossy_loads.sort()
221                 zeros = 0
222                 logging.debug("High enough loss, lossy loads added.")
223             else:
224                 logging.debug(
225                     f"Not a high loss, zero counter bumped to {zeros}."
226                 )
227             if stop_time <= time.time():
228                 return average, stdev
229             trial_result_list.append(measurement)
230             if (trial_number - self.trial_number_offset) <= 1:
231                 next_load = max_rate
232             elif (trial_number - self.trial_number_offset) <= 3:
233                 next_load = measurement.relative_forwarding_rate / (
234                     1.0 - self.packet_loss_ratio_target
235                 )
236             else:
237                 next_load = (avg1 + avg2) / 2.0
238                 if zeros > 0:
239                     if lossy_loads[0] > next_load:
240                         diminisher = math.pow(2.0, 1 - zeros)
241                         next_load = lossy_loads[0] + diminisher * next_load
242                         next_load /= 1.0 + diminisher
243                     # On zero measurement, we need to drain obsoleted low losses
244                     # even if we did not use them to increase next_load,
245                     # in order to get to usable loses at higher loads.
246                     if len(lossy_loads) > 3:
247                         lossy_loads = lossy_loads[3:]
248                 logging.debug(
249                     f"Zeros {zeros!r} orig {(avg1 + avg2) / 2.0!r} "
250                     f"next {next_load!r} loads {lossy_loads!r}"
251                 )
252             transmit_rate = min(max_rate, max(min_rate, next_load))
253
254     @staticmethod
255     def lfit_stretch(trace, load, mrr, spread):
256         """Stretch-based fitting function.
257
258         Return the logarithm of average packet loss per second
259         when the load (argument) is offered to a system with given
260         mrr and spread (parameters).
261         Stretch function is 1/(1+Exp[-x]). The average itself is definite
262         integral from zero to load, of shifted and x-scaled stretch function.
263         As the integrator is sensitive to discontinuities,
264         and it calls this function at large areas of parameter space,
265         the implementation has to avoid rounding errors, overflows,
266         and correctly approximate underflows.
267
268         TODO: Explain how the high-level description
269         has been converted into an implementation full of ifs.
270
271         :param trace: A multiprocessing-friendly logging function (closure).
272         :param load: Offered load (positive), in packets per second.
273         :param mrr: Parameter of this fitting function, equal to limiting
274             (positive) average number of packets received (as opposed to lost)
275             when offered load is many spreads more than mrr.
276         :param spread: The x-scaling parameter (positive). No nice semantics,
277             roughly corresponds to size of "tail" for loads below mrr.
278         :type trace: function (str, object) -> NoneType
279         :type load: float
280         :type mrr: float
281         :type spread: float
282         :returns: Logarithm of average number of packets lost per second.
283         :rtype: float
284         """
285         # TODO: What is the fastest way to use such values?
286         log_2 = math.log(2)
287         log_3 = math.log(3)
288         log_spread = math.log(spread)
289         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
290         chi = (load - mrr) / spread
291         chi0 = -mrr / spread
292         trace("stretch: load", load)
293         trace("mrr", mrr)
294         trace("spread", spread)
295         trace("chi", chi)
296         trace("chi0", chi0)
297         if chi > 0:
298             log_lps = math.log(
299                 load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
300             )
301             trace("big loss direct log_lps", log_lps)
302         else:
303             two_positive = log_plus(chi, 2 * chi0 - log_2)
304             two_negative = log_plus(chi0, 2 * chi - log_2)
305             if two_positive <= two_negative:
306                 log_lps = log_minus(chi, chi0) + log_spread
307                 trace("small loss crude log_lps", log_lps)
308                 return log_lps
309             two = log_minus(two_positive, two_negative)
310             three_positive = log_plus(two_positive, 3 * chi - log_3)
311             three_negative = log_plus(two_negative, 3 * chi0 - log_3)
312             three = log_minus(three_positive, three_negative)
313             if two == three:
314                 log_lps = two + log_spread
315                 trace("small loss approx log_lps", log_lps)
316             else:
317                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
318                 log_lps += log_spread
319                 trace("small loss direct log_lps", log_lps)
320         return log_lps
321
322     @staticmethod
323     def lfit_erf(trace, load, mrr, spread):
324         """Erf-based fitting function.
325
326         Return the logarithm of average packet loss per second
327         when the load (argument) is offered to a system with given
328         mrr and spread (parameters).
329         Erf function is Primitive function to normal distribution density.
330         The average itself is definite integral from zero to load,
331         of shifted and x-scaled erf function.
332         As the integrator is sensitive to discontinuities,
333         and it calls this function at large areas of parameter space,
334         the implementation has to avoid rounding errors, overflows,
335         and correctly approximate underflows.
336
337         TODO: Explain how the high-level description
338         has been converted into an implementation full of ifs.
339
340         :param trace: A multiprocessing-friendly logging function (closure).
341         :param load: Offered load (positive), in packets per second.
342         :param mrr: Parameter of this fitting function, equal to limiting
343             (positive) average number of packets received (as opposed to lost)
344             when offered load is many spreads more than mrr.
345         :param spread: The x-scaling parameter (positive). No nice semantics,
346             roughly corresponds to size of "tail" for loads below mrr.
347         :type trace: function (str, object) -> NoneType
348         :type load: float
349         :type mrr: float
350         :type spread: float
351         :returns: Logarithm of average number of packets lost per second.
352         :rtype: float
353         """
354         # Beware, this chi has the sign opposite to the stretch function chi.
355         # TODO: The stretch sign is just to have less minuses. Worth changing?
356         chi = (mrr - load) / spread
357         chi0 = mrr / spread
358         trace("Erf: load", load)
359         trace("mrr", mrr)
360         trace("spread", spread)
361         trace("chi", chi)
362         trace("chi0", chi0)
363         if chi >= -1.0:
364             trace("positive, b roughly bigger than m", None)
365             if chi > math.exp(10):
366                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
367                 trace("approximated first", first)
368             else:
369                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
370                 trace("exact first", first)
371             first -= chi * chi
372             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
373             second -= chi0 * chi0
374             intermediate = log_minus(first, second)
375             trace("first", first)
376         else:
377             trace("negative, b roughly smaller than m", None)
378             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
379             exp_first *= math.exp(-chi * chi)
380             exp_first -= 2 * chi
381             # TODO: Why has the following line chi there (as opposed to chi0)?
382             # In general the functions would be more readable if they explicitly
383             #     return math.log(func(chi) - func(chi0))
384             # for some function "func", at least for some branches.
385             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
386             second -= chi0 * chi0
387             intermediate = math.log(exp_first - math.exp(second))
388             trace("exp_first", exp_first)
389         trace("second", second)
390         trace("intermediate", intermediate)
391         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
392         trace("result", result)
393         return result
394
395     @staticmethod
396     def find_critical_rate(
397         trace, lfit_func, min_rate, max_rate, loss_ratio_target, mrr, spread
398     ):
399         """Given ratio target and parameters, return the achieving offered load.
400
401         This is basically an inverse function to lfit_func
402         when parameters are fixed.
403         Instead of implementing effective implementation
404         of the inverse function, this implementation uses
405         brute force binary search. It is bisecting (nim_rate, max_rate) interval
406         until the critical load is found (or interval becomes degenerate).
407         This implementation assures min and max rate limits are honored.
408
409         TODO: Use some method with faster convergence?
410
411         :param trace: A multiprocessing-friendly logging function (closure).
412         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
413         :param min_rate: Lower bound for binary search [pps].
414         :param max_rate: Upper bound for binary search [pps].
415         :param loss_ratio_target: Fitting function should return loss rate
416             giving this ratio at the returned load and parameters [1].
417         :param mrr: The mrr parameter for the fitting function [pps].
418         :param spread: The spread parameter for the fittinmg function [pps].
419         :type trace: function (str, object) -> None
420         :type lfit_func: Function from 3 floats to float.
421         :type min_rate: float
422         :type max_rate: float
423         :type loss_ratio_target: float
424         :type mrr: float
425         :type spread: float
426         :returns: Load [pps] which achieves the target with given parameters.
427         :rtype: float
428         """
429         trace("Finding critical rate for loss_ratio_target", loss_ratio_target)
430         rate_lo = min_rate
431         rate_hi = max_rate
432         loss_ratio = -1
433         while loss_ratio != loss_ratio_target:
434             rate = (rate_hi + rate_lo) / 2.0
435             if rate in (rate_hi, rate_lo):
436                 break
437             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
438             loss_ratio = loss_rate / rate
439             if loss_ratio > loss_ratio_target:
440                 trace("halving down", rate)
441                 rate_hi = rate
442             elif loss_ratio < loss_ratio_target:
443                 trace("halving up", rate)
444                 rate_lo = rate
445         trace("found", rate)
446         return rate
447
448     @staticmethod
449     def log_weight(trace, lfit_func, trial_result_list, mrr, spread):
450         """Return log of weight of trial results by the function and parameters.
451
452         Integrator assumes uniform distribution, but over different parameters.
453         Weight and likelihood are used interchangeably here anyway.
454
455         Each trial has an intended load, a sent count and a loss count
456         (probably counting unsent packets as loss, as they signal
457         the load is too high for the traffic generator).
458         The fitting function is used to compute the average loss rate.
459         Geometric distribution (with average loss per trial) is used
460         to get likelihood of one trial result, the overal likelihood
461         is a product of all trial likelihoods.
462         As likelihoods can be extremely small, logarithms are tracked instead.
463
464         The current implementation does not use direct loss rate
465         from the fitting function, as the input and output units may not match
466         (e.g. intended load in TCP transactions, loss in packets).
467         Instead, the expected average loss is scaled according to the number
468         of packets actually sent.
469
470         TODO: Copy MeasurementResult from MLRsearch.
471
472         :param trace: A multiprocessing-friendly logging function (closure).
473         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
474         :param trial_result_list: List of trial measurement results.
475         :param mrr: The mrr parameter for the fitting function.
476         :param spread: The spread parameter for the fitting function.
477         :type trace: function (str, object) -> None
478         :type lfit_func: Function from 3 floats to float.
479         :type trial_result_list: list of MLRsearch.MeasurementResult
480         :type mrr: float
481         :type spread: float
482         :returns: Logarithm of result weight for given function and parameters.
483         :rtype: float
484         """
485         log_likelihood = 0.0
486         trace("log_weight for mrr", mrr)
487         trace("spread", spread)
488         for result in trial_result_list:
489             trace("for tr", result.intended_load)
490             trace("plc", result.plr_loss_count)
491             trace("d", result.intended_duration)
492             # _rel_ values use units of intended_load (transactions per second).
493             log_avg_rel_loss_per_second = lfit_func(
494                 trace, result.intended_load, mrr, spread
495             )
496             # _abs_ values use units of loss count (maybe packets).
497             # There can be multiple packets per transaction.
498             log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
499                 result.offered_count / result.intended_load
500             )
501             # Geometric probability computation for logarithms.
502             log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
503             log_trial_likelihood *= -result.plr_loss_count
504             log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
505             log_likelihood += log_trial_likelihood
506             trace("avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
507             trace("log_trial_likelihood", log_trial_likelihood)
508         return log_likelihood
509
510     def measure_and_compute(
511         self,
512         trial_duration,
513         transmit_rate,
514         trial_result_list,
515         min_rate,
516         max_rate,
517         focus_trackers=(None, None),
518         max_samples=None,
519     ):
520         """Perform both measurement and computation at once.
521
522         High level steps: Prepare and launch computation worker processes,
523         perform the measurement, stop computation and combine results.
524
525         Integrator needs a specific function to process (-1, 1) parameters.
526         As our fitting functions use dimensional parameters,
527         so a transformation is performed, resulting in a specific prior
528         distribution over the dimensional parameters.
529         Maximal rate (line rate) is needed for that transformation.
530
531         Two fitting functions are used, computation is started
532         on temporary worker process per fitting function. After the measurement,
533         average and stdev of the critical rate (not log) of each worker
534         are combined and returned. Raw averages are also returned,
535         offered load for next iteration is chosen based on them.
536         The idea is that one fitting function might be fitting much better,
537         measurements at its avg are best for relevant results (for both),
538         but we do not know which fitting function it is.
539
540         Focus trackers are updated in-place. If a focus tracker in None,
541         new instance is created.
542
543         TODO: Define class for result object, so that fields are documented.
544         TODO: Re-use processes, instead creating on each computation?
545         TODO: As only one result is needed fresh, figure out a way
546         how to keep the other worker running. This will alow shorter
547         duration per trial. Special handling at first and last measurement
548         will be needed (to properly initialize and to properly combine results).
549
550         :param trial_duration: Length of the measurement in seconds.
551         :param transmit_rate: Offered load in packets per second.
552         :param trial_result_list: Results of previous measurements.
553         :param min_rate: Practical minimum of possible ofered load.
554         :param max_rate: Practical maximum of possible ofered load.
555         :param focus_trackers: Pair of trackers initialized
556             to speed up the numeric computation.
557         :param max_samples: Limit for integrator samples, for debugging.
558         :type trial_duration: float
559         :type transmit_rate: float
560         :type trial_result_list: list of MLRsearch.MeasurementResult
561         :type min_rate: float
562         :type max_rate: float
563         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
564         :type max_samples: None or int
565         :returns: Measurement and computation results.
566         :rtype: _ComputeResult
567         """
568         logging.debug(
569             f"measure_and_compute started with self {self!r}, trial_duration "
570             f"{trial_duration!r}, transmit_rate {transmit_rate!r}, "
571             f"trial_result_list {trial_result_list!r}, max_rate {max_rate!r}, "
572             f"focus_trackers {focus_trackers!r}, max_samples {max_samples!r}"
573         )
574         # Preparation phase.
575         dimension = 2
576         stretch_focus_tracker, erf_focus_tracker = focus_trackers
577         if stretch_focus_tracker is None:
578             stretch_focus_tracker = stat_trackers.VectorStatTracker(dimension)
579             stretch_focus_tracker.unit_reset()
580         if erf_focus_tracker is None:
581             erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
582             erf_focus_tracker.unit_reset()
583         old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
584
585         def start_computing(fitting_function, focus_tracker):
586             """Just a block of code to be used for each fitting function.
587
588             Define function for integrator, create process and pipe ends,
589             start computation, return the boss pipe end.
590
591             :param fitting_function: lfit_erf or lfit_stretch.
592             :param focus_tracker: Tracker initialized to speed up the numeric
593                 computation.
594             :type fitting_function: Function from 3 floats to float.
595             :type focus_tracker: None or stat_trackers.VectorStatTracker
596             :returns: Boss end of communication pipe.
597             :rtype: multiprocessing.Connection
598             """
599
600             boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
601             # Starting the worker first. Contrary to documentation
602             # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
603             # sending of large object without active listener on the other side
604             # results in a deadlock, not in a ValueError.
605             # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send
606             worker = multiprocessing.Process(
607                 target=Integrator.try_estimate_nd,
608                 args=(worker_pipe_end, 5.0, self.trace_enabled),
609             )
610             worker.daemon = True
611             worker.start()
612
613             # Only now it is safe to send the function to compute with.
614             def value_logweight_func(trace, x_mrr, x_spread):
615                 """Return log of critical rate and log of likelihood.
616
617                 This is a closure. The ancestor function got
618                 trial_result_list as a parameter, and we are accessing it.
619                 As integrator has strict conditions on function signature,
620                 trial_result_list cannot be an explicit argument
621                 of the current function.
622                 This is also why we have to define this closure
623                 at each invocation of the ancestor function anew.
624
625                 The dimensional spread parameter is the (dimensional) mrr
626                 raised to the power of x_spread scaled to interval (0, 1).
627                 The dimensional mrr parameter distribution has shape of
628                 1/(1+x^2), but x==1 corresponds to max_rate
629                 and 1.0 pps is added to avoid numerical problems in fitting
630                 functions.
631
632                 TODO: x^-2 (for x>1.0) might be simpler/nicer prior.
633
634                 :param trace: Multiprocessing-safe logging function (closure).
635                 :param x_mrr: The first dimensionless param
636                     from (-1, 1) interval.
637                 :param x_spread: The second dimensionless param
638                     from (-1, 1) interval.
639                 :type trace: function (str, object) -> None
640                 :type x_mrr: float
641                 :type x_spread: float
642                 :returns: Log of critical rate [pps] and log of likelihood.
643                 :rtype: 2-tuple of float
644                 """
645                 mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
646                 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
647                 logweight = self.log_weight(
648                     trace, fitting_function, trial_result_list, mrr, spread
649                 )
650                 value = math.log(
651                     self.find_critical_rate(
652                         trace,
653                         fitting_function,
654                         min_rate,
655                         max_rate,
656                         self.packet_loss_ratio_target,
657                         mrr,
658                         spread,
659                     )
660                 )
661                 return value, logweight
662
663             dilled_function = dill.dumps(value_logweight_func)
664             boss_pipe_end.send(
665                 (dimension, dilled_function, focus_tracker, max_samples)
666             )
667             return boss_pipe_end
668
669         erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
670         stretch_pipe = start_computing(self.lfit_stretch, stretch_focus_tracker)
671
672         # Measurement phase.
673         measurement = self.measurer.measure(trial_duration, transmit_rate)
674
675         # Processing phase.
676         def stop_computing(name, pipe):
677             """Just a block of code to be used for each worker.
678
679             Send stop object, poll for result, then either
680             unpack response, log messages and return, or raise traceback.
681
682             TODO: Define class/structure for the return value?
683
684             :param name: Human friendly worker identifier for logging purposes.
685             :param pipe: Boss end of connection towards worker to stop.
686             :type name: str
687             :type pipe: multiprocessing.Connection
688             :returns: Computed value tracker, actual focus tracker,
689                 and number of samples used for this iteration.
690             :rtype: _PartialResult
691             """
692             # If worker encountered an exception, we get it in the recv below,
693             # but send will report a broken pipe.
694             # EAFP says we should ignore the error (instead of polling first).
695             # https://devblogs.microsoft.com/python
696             #   /idiomatic-python-eafp-versus-lbyl/
697             try:
698                 pipe.send(None)
699             except BrokenPipeError:
700                 pass
701             if not pipe.poll(10.0):
702                 raise RuntimeError(f"Worker {name} did not finish!")
703             result_or_traceback = pipe.recv()
704             try:
705                 (
706                     value_tracker,
707                     focus_tracker,
708                     debug_list,
709                     trace_list,
710                     sampls,
711                 ) = result_or_traceback
712             except ValueError as exc:
713                 raise RuntimeError(
714                     f"Worker {name} failed with the following traceback:\n"
715                     f"{result_or_traceback}"
716                 ) from exc
717             logging.info(f"Logs from worker {name!r}:")
718             for message in debug_list:
719                 logging.info(message)
720             for message in trace_list:
721                 logging.debug(message)
722             logging.debug(
723                 f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
724             )
725             return _PartialResult(value_tracker, focus_tracker, sampls)
726
727         stretch_result = stop_computing("stretch", stretch_pipe)
728         erf_result = stop_computing("erf", erf_pipe)
729         result = PLRsearch._get_result(measurement, stretch_result, erf_result)
730         logging.info(
731             f"measure_and_compute finished with trial result "
732             f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
733             f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
734             f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
735             f"stretch samples {stretch_result.samples!r} erf samples "
736             f"{erf_result.samples!r}"
737         )
738         return result
739
740     @staticmethod
741     def _get_result(measurement, stretch_result, erf_result):
742         """Process and collate results from measure_and_compute.
743
744         Turn logarithm based values to exponential ones,
745         combine averages and stdevs of two fitting functions into a whole.
746
747         :param measurement: The trial measurement obtained during computation.
748         :param stretch_result: Computation output for stretch fitting function.
749         :param erf_result: Computation output for erf fitting function.
750         :type measurement: MeasurementResult
751         :type stretch_result: _PartialResult
752         :type erf_result: _PartialResult
753         :returns: Combined results.
754         :rtype: _ComputeResult
755         """
756         stretch_avg = stretch_result.value_tracker.average
757         erf_avg = erf_result.value_tracker.average
758         stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
759         erf_var = erf_result.value_tracker.get_pessimistic_variance()
760         avg_log = (stretch_avg + erf_avg) / 2.0
761         var_log = (stretch_var + erf_var) / 2.0
762         var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
763         stdev_log = math.sqrt(var_log)
764         low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
765         avg = (low + upp) / 2
766         stdev = avg - low
767         trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
768         sea = math.exp(stretch_avg)
769         eea = math.exp(erf_avg)
770         return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
771
772
773 # Named tuples, for multiple local variables to be passed as return value.
774 _PartialResult = namedtuple(
775     "_PartialResult", "value_tracker focus_tracker samples"
776 )
777 """Two stat trackers and sample counter.
778
779 :param value_tracker: Tracker for the value (critical load) being integrated.
780 :param focus_tracker: Tracker for focusing integration inputs (sample points).
781 :param samples: How many samples were used for the computation.
782 :type value_tracker: stat_trackers.ScalarDualStatTracker
783 :type focus_tracker: stat_trackers.VectorStatTracker
784 :type samples: int
785 """
786
787 _ComputeResult = namedtuple(
788     "_ComputeResult",
789     "measurement avg stdev stretch_exp_avg erf_exp_avg trackers",
790 )
791 """Measurement, 4 computation result values, pair of trackers.
792
793 :param measurement: The trial measurement result obtained during computation.
794 :param avg: Overall average of critical rate estimate.
795 :param stdev: Overall standard deviation of critical rate estimate.
796 :param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
797 :param erf_exp_avg: Erf fitting function estimate average, exponentiated.
798 :param trackers: Pair of focus trackers to start next iteration with.
799 :type measurement: MeasurementResult
800 :type avg: float
801 :type stdev: float
802 :type stretch_exp_avg: float
803 :type erf_exp_avg: float
804 :type trackers: 2-tuple of stat_trackers.VectorStatTracker
805 """