PLRsearch: Update docscrings
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
1 # Copyright (c) 2021 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module holding PLRsearch class."""
15
16 import logging
17 import math
18 import multiprocessing
19 import time
20
21 from collections import namedtuple
22
23 import dill
24
25 from scipy.special import erfcx, erfc
26
27 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
28 # then switch to absolute imports within PLRsearch package.
29 # Current usage of relative imports is just a short term workaround.
30 from . import Integrator
31 from . import stat_trackers
32 from .log_plus import log_plus, log_minus
33
34
35 class PLRsearch:
36     """A class to encapsulate data relevant for the search method.
37
38     The context is performance testing of packet processing systems.
39     The system, when being offered a steady stream of packets,
40     can process some of them successfully, other are considered "lost".
41
42     See docstring of the search method for algorithm description.
43
44     Two constants are stored as class fields for speed.
45
46     Method other than search (and than __init__)
47     are just internal code structure.
48
49     TODO: Those method names should start with underscore then.
50     """
51
52     xerfcx_limit = math.pow(math.acos(0), -0.5)
53     log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
54
55     def __init__(
56             self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
57             trial_number_offset=0, timeout=7200.0, trace_enabled=False):
58         """Store rate measurer and additional parameters.
59
60         The measurer must never report negative loss count.
61
62         TODO: Copy AbstractMeasurer from MLRsearch.
63
64         :param measurer: The measurer to call when searching.
65         :param trial_duration_per_trial: Each trial has larger duration
66             than the previous trial. This is the increment, in seconds.
67         :param packet_loss_ratio_target: The algorithm tries to estimate
68             the offered load leading to this ratio on average.
69             Trial ratio is number of packets lost divided by packets offered.
70         :param trial_number_offset: The "first" trial number will be 1+this.
71             Use this to ensure first iterations have enough time to compute
72             reasonable estimates for later trials to use.
73         :param timeout: The search ends if it lasts more than this many seconds.
74         :type measurer: MLRsearch.AbstractMeasurer
75         :type trial_duration_per_trial: float
76         :type packet_loss_ratio_target: float
77         :type trial_number_offset: int
78         :type timeout: float
79         """
80         self.measurer = measurer
81         self.trial_duration_per_trial = float(trial_duration_per_trial)
82         self.packet_loss_ratio_target = float(packet_loss_ratio_target)
83         self.trial_number_offset = int(trial_number_offset)
84         self.timeout = float(timeout)
85         self.trace_enabled = bool(trace_enabled)
86
87     def search(self, min_rate, max_rate):
88         """Perform the search, return average and stdev for throughput estimate.
89
90         Considering measurer and packet_loss_ratio_target (see __init__),
91         find such an offered load (called critical load) that is expected
92         to hit the target loss ratio in the limit of very long trial duration.
93         As the system is probabilistic (and test duration is finite),
94         the critical ratio is only estimated.
95         Return the average and standard deviation of the estimate.
96
97         In principle, this algorithm performs trial measurements,
98         each with varied offered load (which is constant during the trial).
99         During each measurement, Bayesian inference is performed
100         on all the measurement results so far.
101         When timeout is up, the last estimate is returned,
102         else another trial is performed.
103
104         It is assumed that the system under test, even though not deterministic,
105         still follows the rule of large numbers. In another words,
106         any growing set of measurements at a particular offered load
107         will converge towards unique (for the given load) packet loss ratio.
108         This means there is a deterministic (but unknown) function
109         mapping the offered load to average loss ratio.
110         This function is called loss ratio function.
111         This also assumes the average loss ratio
112         does not depend on trial duration.
113
114         The actual probability distribution of loss counts, achieving
115         the average ratio on trials of various duration
116         can be complicated (and can depend on offered load), but simply assuming
117         Poisson distribution will make the algorithm converge.
118         Binomial distribution would be more precise,
119         but Poisson is more practical, as it effectively gives
120         less information content to high ratio results.
121
122         Even when applying other assumptions on the loss ratio function
123         (increasing function, limit zero ratio when load goes to zero,
124         global upper limit on rate of packets processed), there are still
125         too many different shapes of possible loss functions,
126         which makes full Bayesian reasoning intractable.
127
128         This implementation radically simplifies things by examining
129         only two shapes, each with finitely many (in this case just two)
130         parameters. In other words, two fitting functions
131         (each with two parameters and one argument).
132         When restricting model space to one of the two fitting functions,
133         the Bayesian inference becomes tractable (even though it needs
134         numerical integration from Integrator class).
135
136         The first measurement is done at the middle between
137         min_rate and max_rate, to help with convergence
138         if max_rate measurements give loss below target.
139         TODO: Fix overflow error and use min_rate instead of the middle.
140
141         The second measurement is done at max_rate, next few measurements
142         have offered load of previous load minus excess loss rate.
143         This simple rule is found to be good when offered loads
144         so far are way above the critical rate. After few measurements,
145         inference from fitting functions converges faster that this initial
146         "optimistic" procedure.
147
148         Offered loads close to (limiting) critical rate are the most useful,
149         as linear approximation of the fitting function
150         becomes good enough there (thus reducing the impact
151         of the overall shape of fitting function).
152         After several trials, usually one of the fitting functions
153         has better predictions than the other one, but the algorithm
154         does not track that. Simply, it uses the estimate average,
155         alternating between the functions.
156         Multiple workarounds are applied to try and avoid measurements
157         both in zero loss region and in big loss region,
158         as their results tend to make the critical load estimate worse.
159
160         The returned average and stdev is a combination of the two fitting
161         estimates.
162
163         :param min_rate: Avoid measuring at offered loads below this,
164             in packets per second.
165         :param max_rate: Avoid measuring at offered loads above this,
166             in packets per second.
167         :type min_rate: float
168         :type max_rate: float
169         :returns: Average and stdev of critical load estimate.
170         :rtype: 2-tuple of float
171         """
172         stop_time = time.time() + self.timeout
173         min_rate = float(min_rate)
174         max_rate = float(max_rate)
175         logging.info(
176             f"Started search with min_rate {min_rate!r}, "
177             f"max_rate {max_rate!r}"
178         )
179         trial_result_list = list()
180         trial_number = self.trial_number_offset
181         focus_trackers = (None, None)
182         transmit_rate = (min_rate + max_rate) / 2.0
183         lossy_loads = [max_rate]
184         zeros = 0  # How many consecutive zero loss results are happening.
185         while 1:
186             trial_number += 1
187             logging.info(f"Trial {trial_number!r}")
188             results = self.measure_and_compute(
189                 self.trial_duration_per_trial * trial_number, transmit_rate,
190                 trial_result_list, min_rate, max_rate, focus_trackers
191             )
192             measurement, average, stdev, avg1, avg2, focus_trackers = results
193             zeros += 1
194             # TODO: Ratio of fill rate to drain rate seems to have
195             # exponential impact. Make it configurable, or is 4:3 good enough?
196             if measurement.loss_ratio >= self.packet_loss_ratio_target:
197                 for _ in range(4 * zeros):
198                     lossy_loads.append(measurement.target_tr)
199             if measurement.loss_count > 0:
200                 zeros = 0
201             lossy_loads.sort()
202             if stop_time <= time.time():
203                 return average, stdev
204             trial_result_list.append(measurement)
205             if (trial_number - self.trial_number_offset) <= 1:
206                 next_load = max_rate
207             elif (trial_number - self.trial_number_offset) <= 3:
208                 next_load = (measurement.relative_receive_rate / (
209                     1.0 - self.packet_loss_ratio_target))
210             else:
211                 next_load = (avg1 + avg2) / 2.0
212                 if zeros > 0:
213                     if lossy_loads[0] > next_load:
214                         diminisher = math.pow(2.0, 1 - zeros)
215                         next_load = lossy_loads[0] + diminisher * next_load
216                         next_load /= (1.0 + diminisher)
217                     # On zero measurement, we need to drain obsoleted low losses
218                     # even if we did not use them to increase next_load,
219                     # in order to get to usable loses at higher loads.
220                     if len(lossy_loads) > 3:
221                         lossy_loads = lossy_loads[3:]
222                 logging.debug(
223                     f"Zeros {zeros!r} orig {(avg1 + avg2) / 2.0!r} "
224                     f"next {next_load!r} loads {lossy_loads!r}"
225                 )
226             transmit_rate = min(max_rate, max(min_rate, next_load))
227
228     @staticmethod
229     def lfit_stretch(trace, load, mrr, spread):
230         """Stretch-based fitting function.
231
232         Return the logarithm of average packet loss per second
233         when the load (argument) is offered to a system with given
234         mrr and spread (parameters).
235         Stretch function is 1/(1+Exp[-x]). The average itself is definite
236         integral from zero to load, of shifted and x-scaled stretch function.
237         As the integrator is sensitive to discontinuities,
238         and it calls this function at large areas of parameter space,
239         the implementation has to avoid rounding errors, overflows,
240         and correctly approximate underflows.
241
242         TODO: Explain how the high-level description
243         has been converted into an implementation full of ifs.
244
245         :param trace: A multiprocessing-friendly logging function (closure).
246         :param load: Offered load (positive), in packets per second.
247         :param mrr: Parameter of this fitting function, equal to limiting
248             (positive) average number of packets received (as opposed to lost)
249             when offered load is many spreads more than mrr.
250         :param spread: The x-scaling parameter (positive). No nice semantics,
251             roughly corresponds to size of "tail" for loads below mrr.
252         :type trace: function (str, object) -> NoneType
253         :type load: float
254         :type mrr: float
255         :type spread: float
256         :returns: Logarithm of average number of packets lost per second.
257         :rtype: float
258         """
259         # TODO: What is the fastest way to use such values?
260         log_2 = math.log(2)
261         log_3 = math.log(3)
262         log_spread = math.log(spread)
263         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
264         chi = (load - mrr) / spread
265         chi0 = -mrr / spread
266         trace(u"stretch: load", load)
267         trace(u"mrr", mrr)
268         trace(u"spread", spread)
269         trace(u"chi", chi)
270         trace(u"chi0", chi0)
271         if chi > 0:
272             log_lps = math.log(
273                 load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
274             )
275             trace(u"big loss direct log_lps", log_lps)
276         else:
277             two_positive = log_plus(chi, 2 * chi0 - log_2)
278             two_negative = log_plus(chi0, 2 * chi - log_2)
279             if two_positive <= two_negative:
280                 log_lps = log_minus(chi, chi0) + log_spread
281                 trace(u"small loss crude log_lps", log_lps)
282                 return log_lps
283             two = log_minus(two_positive, two_negative)
284             three_positive = log_plus(two_positive, 3 * chi - log_3)
285             three_negative = log_plus(two_negative, 3 * chi0 - log_3)
286             three = log_minus(three_positive, three_negative)
287             if two == three:
288                 log_lps = two + log_spread
289                 trace(u"small loss approx log_lps", log_lps)
290             else:
291                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
292                 log_lps += log_spread
293                 trace(u"small loss direct log_lps", log_lps)
294         return log_lps
295
296     @staticmethod
297     def lfit_erf(trace, load, mrr, spread):
298         """Erf-based fitting function.
299
300         Return the logarithm of average packet loss per second
301         when the load (argument) is offered to a system with given
302         mrr and spread (parameters).
303         Erf function is Primitive function to normal distribution density.
304         The average itself is definite integral from zero to load,
305         of shifted and x-scaled erf function.
306         As the integrator is sensitive to discontinuities,
307         and it calls this function at large areas of parameter space,
308         the implementation has to avoid rounding errors, overflows,
309         and correctly approximate underflows.
310
311         TODO: Explain how the high-level description
312         has been converted into an implementation full of ifs.
313
314         :param trace: A multiprocessing-friendly logging function (closure).
315         :param load: Offered load (positive), in packets per second.
316         :param mrr: Parameter of this fitting function, equal to limiting
317             (positive) average number of packets received (as opposed to lost)
318             when offered load is many spreads more than mrr.
319         :param spread: The x-scaling parameter (positive). No nice semantics,
320             roughly corresponds to size of "tail" for loads below mrr.
321         :type trace: function (str, object) -> NoneType
322         :type load: float
323         :type mrr: float
324         :type spread: float
325         :returns: Logarithm of average number of packets lost per second.
326         :rtype: float
327         """
328         # Beware, this chi has the sign opposite to the stretch function chi.
329         # TODO: The stretch sign is just to have less minuses. Worth changing?
330         chi = (mrr - load) / spread
331         chi0 = mrr / spread
332         trace(u"Erf: load", load)
333         trace(u"mrr", mrr)
334         trace(u"spread", spread)
335         trace(u"chi", chi)
336         trace(u"chi0", chi0)
337         if chi >= -1.0:
338             trace(u"positive, b roughly bigger than m", None)
339             if chi > math.exp(10):
340                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
341                 trace(u"approximated first", first)
342             else:
343                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
344                 trace(u"exact first", first)
345             first -= chi * chi
346             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
347             second -= chi0 * chi0
348             intermediate = log_minus(first, second)
349             trace(u"first", first)
350         else:
351             trace(u"negative, b roughly smaller than m", None)
352             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
353             exp_first *= math.exp(-chi * chi)
354             exp_first -= 2 * chi
355             # TODO: Why has the following line chi there (as opposed to chi0)?
356             # In general the functions would be more readable if they explicitly
357             #     return math.log(func(chi) - func(chi0))
358             # for some function "func", at least for some branches.
359             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
360             second -= chi0 * chi0
361             intermediate = math.log(exp_first - math.exp(second))
362             trace(u"exp_first", exp_first)
363         trace(u"second", second)
364         trace(u"intermediate", intermediate)
365         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
366         trace(u"result", result)
367         return result
368
369     @staticmethod
370     def find_critical_rate(
371             trace, lfit_func, min_rate, max_rate, loss_ratio_target,
372             mrr, spread):
373         """Given ratio target and parameters, return the achieving offered load.
374
375         This is basically an inverse function to lfit_func
376         when parameters are fixed.
377         Instead of implementing effective implementation
378         of the inverse function, this implementation uses
379         brute force binary search. It is bisecting (nim_rate, max_rate) interval
380         until the critical load is found (or interval becomes degenerate).
381         This implementation assures min and max rate limits are honored.
382
383         TODO: Use some method with faster convergence?
384
385         :param trace: A multiprocessing-friendly logging function (closure).
386         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
387         :param min_rate: Lower bound for binary search [pps].
388         :param max_rate: Upper bound for binary search [pps].
389         :param loss_ratio_target: Fitting function should return loss rate
390             giving this ratio at the returned load and parameters [1].
391         :param mrr: The mrr parameter for the fitting function [pps].
392         :param spread: The spread parameter for the fittinmg function [pps].
393         :type trace: function (str, object) -> None
394         :type lfit_func: Function from 3 floats to float.
395         :type min_rate: float
396         :type max_rate: float
397         :type loss_ratio_target: float
398         :type mrr: float
399         :type spread: float
400         :returns: Load [pps] which achieves the target with given parameters.
401         :rtype: float
402         """
403         trace("Finding critical rate for loss_ratio_target", loss_ratio_target)
404         rate_lo = min_rate
405         rate_hi = max_rate
406         loss_ratio = -1
407         while loss_ratio != loss_ratio_target:
408             rate = (rate_hi + rate_lo) / 2.0
409             if rate in (rate_hi, rate_lo):
410                 break
411             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
412             loss_ratio = loss_rate / rate
413             if loss_ratio > loss_ratio_target:
414                 trace(u"halving down", rate)
415                 rate_hi = rate
416             elif loss_ratio < loss_ratio_target:
417                 trace(u"halving up", rate)
418                 rate_lo = rate
419         trace(u"found", rate)
420         return rate
421
422     @staticmethod
423     def log_weight(trace, lfit_func, trial_result_list, mrr, spread):
424         """Return log of weight of trial results by the function and parameters.
425
426         Integrator assumes uniform distribution, but over different parameters.
427         Weight and likelihood are used interchangeably here anyway.
428
429         Each trial has an intended load, a sent count and a loss count
430         (probably counting unsent packets as loss, as they signal
431         the load is too high for the traffic generator).
432         The fitting function is used to compute the average loss rate.
433         Geometric distribution (with average loss per trial) is used
434         to get likelihood of one trial result, the overal likelihood
435         is a product of all trial likelihoods.
436         As likelihoods can be extremely small, logarithms are tracked instead.
437
438         The current implementation does not use direct loss rate
439         from the fitting function, as the input and output units may not match
440         (e.g. intended load in TCP transactions, loss in packets).
441         Instead, the expected average loss is scaled according to the number
442         of packets actually sent.
443
444         TODO: Copy ReceiveRateMeasurement from MLRsearch.
445
446         :param trace: A multiprocessing-friendly logging function (closure).
447         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
448         :param trial_result_list: List of trial measurement results.
449         :param mrr: The mrr parameter for the fitting function.
450         :param spread: The spread parameter for the fitting function.
451         :type trace: function (str, object) -> None
452         :type lfit_func: Function from 3 floats to float.
453         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
454         :type mrr: float
455         :type spread: float
456         :returns: Logarithm of result weight for given function and parameters.
457         :rtype: float
458         """
459         log_likelihood = 0.0
460         trace(u"log_weight for mrr", mrr)
461         trace(u"spread", spread)
462         for result in trial_result_list:
463             trace(u"for tr", result.target_tr)
464             trace(u"lc", result.loss_count)
465             trace(u"d", result.duration)
466             # _rel_ values use units of target_tr (transactions per second).
467             log_avg_rel_loss_per_second = lfit_func(
468                 trace, result.target_tr, mrr, spread
469             )
470             # _abs_ values use units of loss count (maybe packets).
471             # There can be multiple packets per transaction.
472             log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
473                 result.transmit_count / result.target_tr
474             )
475             # Geometric probability computation for logarithms.
476             log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
477             log_trial_likelihood *= -result.loss_count
478             log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
479             log_likelihood += log_trial_likelihood
480             trace(u"avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
481             trace(u"log_trial_likelihood", log_trial_likelihood)
482         return log_likelihood
483
484     def measure_and_compute(
485             self, trial_duration, transmit_rate, trial_result_list,
486             min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
487         """Perform both measurement and computation at once.
488
489         High level steps: Prepare and launch computation worker processes,
490         perform the measurement, stop computation and combine results.
491
492         Integrator needs a specific function to process (-1, 1) parameters.
493         As our fitting functions use dimensional parameters,
494         so a transformation is performed, resulting in a specific prior
495         distribution over the dimensional parameters.
496         Maximal rate (line rate) is needed for that transformation.
497
498         Two fitting functions are used, computation is started
499         on temporary worker process per fitting function. After the measurement,
500         average and stdev of the critical rate (not log) of each worker
501         are combined and returned. Raw averages are also returned,
502         offered load for next iteration is chosen based on them.
503         The idea is that one fitting function might be fitting much better,
504         measurements at its avg are best for relevant results (for both),
505         but we do not know which fitting function it is.
506
507         Focus trackers are updated in-place. If a focus tracker in None,
508         new instance is created.
509
510         TODO: Define class for result object, so that fields are documented.
511         TODO: Re-use processes, instead creating on each computation?
512         TODO: As only one result is needed fresh, figure out a way
513         how to keep the other worker running. This will alow shorter
514         duration per trial. Special handling at first and last measurement
515         will be needed (to properly initialize and to properly combine results).
516
517         :param trial_duration: Length of the measurement in seconds.
518         :param transmit_rate: Offered load in packets per second.
519         :param trial_result_list: Results of previous measurements.
520         :param min_rate: Practical minimum of possible ofered load.
521         :param max_rate: Practical maximum of possible ofered load.
522         :param focus_trackers: Pair of trackers initialized
523             to speed up the numeric computation.
524         :param max_samples: Limit for integrator samples, for debugging.
525         :type trial_duration: float
526         :type transmit_rate: float
527         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
528         :type min_rate: float
529         :type max_rate: float
530         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
531         :type max_samples: None or int
532         :returns: Measurement and computation results.
533         :rtype: _ComputeResult
534         """
535         logging.debug(
536             f"measure_and_compute started with self {self!r}, trial_duration "
537             f"{trial_duration!r}, transmit_rate {transmit_rate!r}, "
538             f"trial_result_list {trial_result_list!r}, max_rate {max_rate!r}, "
539             f"focus_trackers {focus_trackers!r}, max_samples {max_samples!r}"
540         )
541         # Preparation phase.
542         dimension = 2
543         stretch_focus_tracker, erf_focus_tracker = focus_trackers
544         if stretch_focus_tracker is None:
545             stretch_focus_tracker = stat_trackers.VectorStatTracker(dimension)
546             stretch_focus_tracker.unit_reset()
547         if erf_focus_tracker is None:
548             erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
549             erf_focus_tracker.unit_reset()
550         old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
551
552         def start_computing(fitting_function, focus_tracker):
553             """Just a block of code to be used for each fitting function.
554
555             Define function for integrator, create process and pipe ends,
556             start computation, return the boss pipe end.
557
558             :param fitting_function: lfit_erf or lfit_stretch.
559             :param focus_tracker: Tracker initialized to speed up the numeric
560                 computation.
561             :type fitting_function: Function from 3 floats to float.
562             :type focus_tracker: None or stat_trackers.VectorStatTracker
563             :returns: Boss end of communication pipe.
564             :rtype: multiprocessing.Connection
565             """
566
567             def value_logweight_func(trace, x_mrr, x_spread):
568                 """Return log of critical rate and log of likelihood.
569
570                 This is a closure. The ancestor function got
571                 trial_result_list as a parameter, and we are accessing it.
572                 As integrator has strict conditions on function signature,
573                 trial_result_list cannot be an explicit argument
574                 of the current function.
575                 This is also why we have to define this closure
576                 at each invocation of the ancestor function anew.
577
578                 The dimensional spread parameter is the (dimensional) mrr
579                 raised to the power of x_spread scaled to interval (0, 1).
580                 The dimensional mrr parameter distribution has shape of
581                 1/(1+x^2), but x==1 corresponds to max_rate
582                 and 1.0 pps is added to avoid numerical problems in fitting
583                 functions.
584
585                 TODO: x^-2 (for x>1.0) might be simpler/nicer prior.
586
587                 :param trace: Multiprocessing-safe logging function (closure).
588                 :param x_mrr: The first dimensionless param
589                     from (-1, 1) interval.
590                 :param x_spread: The second dimensionless param
591                     from (-1, 1) interval.
592                 :type trace: function (str, object) -> None
593                 :type x_mrr: float
594                 :type x_spread: float
595                 :returns: Log of critical rate [pps] and log of likelihood.
596                 :rtype: 2-tuple of float
597                 """
598                 mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
599                 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
600                 logweight = self.log_weight(
601                     trace, fitting_function, trial_result_list, mrr, spread
602                 )
603                 value = math.log(
604                     self.find_critical_rate(
605                         trace, fitting_function, min_rate, max_rate,
606                         self.packet_loss_ratio_target, mrr, spread
607                     )
608                 )
609                 return value, logweight
610
611             dilled_function = dill.dumps(value_logweight_func)
612             boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
613             # Do not send yet, run the worker first to avoid a deadlock.
614             # See https://stackoverflow.com/a/15716500
615             worker = multiprocessing.Process(
616                 target=Integrator.try_estimate_nd,
617                 args=(worker_pipe_end, 10.0, self.trace_enabled)
618             )
619             worker.daemon = True
620             worker.start()
621             boss_pipe_end.send(
622                 (dimension, dilled_function, focus_tracker, max_samples)
623             )
624             return boss_pipe_end
625
626         erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
627         stretch_pipe = start_computing(self.lfit_stretch, stretch_focus_tracker)
628
629         # Measurement phase.
630         measurement = self.measurer.measure(trial_duration, transmit_rate)
631
632         # Processing phase.
633         def stop_computing(name, pipe):
634             """Just a block of code to be used for each worker.
635
636             Send stop object, poll for result, then either
637             unpack response, log messages and return, or raise traceback.
638
639             TODO: Define class/structure for the return value?
640
641             :param name: Human friendly worker identifier for logging purposes.
642             :param pipe: Boss end of connection towards worker to stop.
643             :type name: str
644             :type pipe: multiprocessing.Connection
645             :returns: Computed value tracker, actual focus tracker,
646                 and number of samples used for this iteration.
647             :rtype: _PartialResult
648             """
649             # If worker encountered an exception, we get it in the recv below,
650             # but send will report a broken pipe.
651             # EAFP says we should ignore the error (instead of polling first).
652             # https://devblogs.microsoft.com/python
653             #   /idiomatic-python-eafp-versus-lbyl/
654             try:
655                 pipe.send(None)
656             except BrokenPipeError:
657                 pass
658             if not pipe.poll(10.0):
659                 raise RuntimeError(f"Worker {name} did not finish!")
660             result_or_traceback = pipe.recv()
661             try:
662                 value_tracker, focus_tracker, debug_list, trace_list, sampls = (
663                     result_or_traceback
664                 )
665             except ValueError:
666                 raise RuntimeError(
667                     f"Worker {name} failed with the following traceback:\n"
668                     f"{result_or_traceback}"
669                 )
670             logging.info(f"Logs from worker {name!r}:")
671             for message in debug_list:
672                 logging.info(message)
673             for message in trace_list:
674                 logging.debug(message)
675             logging.debug(
676                 f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
677             )
678             return _PartialResult(value_tracker, focus_tracker, sampls)
679
680         stretch_result = stop_computing(u"stretch", stretch_pipe)
681         erf_result = stop_computing(u"erf", erf_pipe)
682         result = PLRsearch._get_result(measurement, stretch_result, erf_result)
683         logging.info(
684             f"measure_and_compute finished with trial result "
685             f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
686             f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
687             f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
688             f"stretch samples {stretch_result.samples!r} erf samples "
689             f"{erf_result.samples!r}"
690         )
691         return result
692
693     @staticmethod
694     def _get_result(measurement, stretch_result, erf_result):
695         """Process and collate results from measure_and_compute.
696
697         Turn logarithm based values to exponential ones,
698         combine averages and stdevs of two fitting functions into a whole.
699
700         :param measurement: The trial measurement obtained during computation.
701         :param stretch_result: Computation output for stretch fitting function.
702         :param erf_result: Computation output for erf fitting function.
703         :type measurement: ReceiveRateMeasurement
704         :type stretch_result: _PartialResult
705         :type erf_result: _PartialResult
706         :returns: Combined results.
707         :rtype: _ComputeResult
708         """
709         stretch_avg = stretch_result.value_tracker.average
710         erf_avg = erf_result.value_tracker.average
711         stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
712         erf_var = erf_result.value_tracker.get_pessimistic_variance()
713         avg_log = (stretch_avg + erf_avg) / 2.0
714         var_log = (stretch_var + erf_var) / 2.0
715         var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
716         stdev_log = math.sqrt(var_log)
717         low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
718         avg = (low + upp) / 2
719         stdev = avg - low
720         trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
721         sea = math.exp(stretch_avg)
722         eea = math.exp(erf_avg)
723         return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
724
725
726 # Named tuples, for multiple local variables to be passed as return value.
727 _PartialResult = namedtuple(
728     u"_PartialResult", u"value_tracker focus_tracker samples"
729 )
730 """Two stat trackers and sample counter.
731
732 :param value_tracker: Tracker for the value (critical load) being integrated.
733 :param focus_tracker: Tracker for focusing integration inputs (sample points).
734 :param samples: How many samples were used for the computation.
735 :type value_tracker: stat_trackers.ScalarDualStatTracker
736 :type focus_tracker: stat_trackers.VectorStatTracker
737 :type samples: int
738 """
739
740 _ComputeResult = namedtuple(
741     u"_ComputeResult",
742     u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers"
743 )
744 """Measurement, 4 computation result values, pair of trackers.
745
746 :param measurement: The trial measurement result obtained during computation.
747 :param avg: Overall average of critical rate estimate.
748 :param stdev: Overall standard deviation of critical rate estimate.
749 :param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
750 :param erf_exp_avg: Erf fitting function estimate average, exponentiated.
751 :param trackers: Pair of focus trackers to start next iteration with.
752 :type measurement: ReceiveRateMeasurement
753 :type avg: float
754 :type stdev: float
755 :type stretch_exp_avg: float
756 :type erf_exp_avg: float
757 :type trackers: 2-tuple of stat_trackers.VectorStatTracker
758 """