e20d293d3c0f8425c9a8c826c914b12e8726167e
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module holding PLRsearch class."""
15
16 import logging
17 import math
18 import multiprocessing
19 import time
20
21 from collections import namedtuple
22
23 import dill
24
25 from scipy.special import erfcx, erfc
26
27 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
28 # then switch to absolute imports within PLRsearch package.
29 # Current usage of relative imports is just a short term workaround.
30 from . import Integrator
31 from . import stat_trackers
32 from .log_plus import log_plus, log_minus
33
34
35 class PLRsearch:
36     """A class to encapsulate data relevant for the search method.
37
38     The context is performance testing of packet processing systems.
39     The system, when being offered a steady stream of packets,
40     can process some of them successfully, other are considered "lost".
41
42     See docstring of the search method for algorithm description.
43
44     Two constants are stored as class fields for speed.
45
46     Method other than search (and than __init__)
47     are just internal code structure.
48
49     TODO: Those method names should start with underscore then.
50     """
51
52     xerfcx_limit = math.pow(math.acos(0), -0.5)
53     log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
54
55     def __init__(
56             self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
57             trial_number_offset=0, timeout=1800.0, trace_enabled=False):
58         """Store rate measurer and additional parameters.
59
60         TODO: Copy AbstractMeasurer from MLRsearch.
61
62         :param measurer: The measurer to call when searching.
63         :param trial_duration_per_trial: Each trial has larger duration
64             than the previous trial. This is the increment, in seconds.
65         :param packet_loss_ratio_target: The algorithm tries to estimate
66             the offered load leading to this ratio on average.
67             Trial ratio is number of packets lost divided by packets offered.
68         :param trial_number_offset: The "first" trial number will be 1+this.
69             Use this to ensure first iterations have enough time to compute
70             reasonable estimates for later trials to use.
71         :param timeout: The search ends if it lasts more than this many seconds.
72         :type measurer: MLRsearch.AbstractMeasurer
73         :type trial_duration_per_trial: float
74         :type packet_loss_ratio_target: float
75         :type trial_number_offset: int
76         :type timeout: float
77         """
78         self.measurer = measurer
79         self.trial_duration_per_trial = float(trial_duration_per_trial)
80         self.packet_loss_ratio_target = float(packet_loss_ratio_target)
81         self.trial_number_offset = int(trial_number_offset)
82         self.timeout = float(timeout)
83         self.trace_enabled = bool(trace_enabled)
84
85     def search(self, min_rate, max_rate):
86         """Perform the search, return average and stdev for throughput estimate.
87
88         Considering measurer and packet_loss_ratio_target (see __init__),
89         find such an offered load (called critical load) that is expected
90         to hit the target loss ratio in the limit of very long trial duration.
91         As the system is probabilistic (and test duration is finite),
92         the critical ratio is only estimated.
93         Return the average and standard deviation of the estimate.
94
95         In principle, this algorithm performs trial measurements,
96         each with varied offered load (which is constant during the trial).
97         During each measurement, Bayesian inference is performed
98         on all the measurement results so far.
99         When timeout is up, the last estimate is returned,
100         else another trial is performed.
101
102         It is assumed that the system under test, even though not deterministic,
103         still follows the rule of large numbers. In another words,
104         any growing set of measurements at a particular offered load
105         will converge towards unique (for the given load) packet loss ratio.
106         This means there is a deterministic (but unknown) function
107         mapping the offered load to average loss ratio.
108         This function is called loss ratio function.
109         This also assumes the average loss ratio
110         does not depend on trial duration.
111
112         The actual probability distribution of loss counts, achieving
113         the average ratio on trials of various duration
114         can be complicated (and can depend on offered load), but simply assuming
115         Poisson distribution will make the algorithm converge.
116         Binomial distribution would be more precise,
117         but Poisson is more practical, as it effectively gives
118         less information content to high ratio results.
119
120         Even when applying other assumptions on the loss ratio function
121         (increasing function, limit zero ratio when load goes to zero,
122         global upper limit on rate of packets processed), there are still
123         too many different shapes of possible loss functions,
124         which makes full Bayesian reasoning intractable.
125
126         This implementation radically simplifies things by examining
127         only two shapes, each with finitely many (in this case just two)
128         parameters. In other words, two fitting functions
129         (each with two parameters and one argument).
130         When restricting model space to one of the two fitting functions,
131         the Bayesian inference becomes tractable (even though it needs
132         numerical integration from Integrator class).
133
134         The first measurement is done at the middle between
135         min_rate and max_rate, to help with convergence
136         if max_rate measurements give loss below target.
137         TODO: Fix overflow error and use min_rate instead of the middle.
138
139         The second measurement is done at max_rate, next few measurements
140         have offered load of previous load minus excess loss rate.
141         This simple rule is found to be good when offered loads
142         so far are way above the critical rate. After few measurements,
143         inference from fitting functions converges faster that this initial
144         "optimistic" procedure.
145
146         Offered loads close to (limiting) critical rate are the most useful,
147         as linear approximation of the fitting function
148         becomes good enough there (thus reducing the impact
149         of the overall shape of fitting function).
150         After several trials, usually one of the fitting functions
151         has better predictions than the other one, but the algorithm
152         does not track that. Simply, it uses the estimate average,
153         alternating between the functions.
154         Multiple workarounds are applied to try and avoid measurements
155         both in zero loss region and in big loss region,
156         as their results tend to make the critical load estimate worse.
157
158         The returned average and stdev is a combination of the two fitting
159         estimates.
160
161         :param min_rate: Avoid measuring at offered loads below this,
162             in packets per second.
163         :param max_rate: Avoid measuring at offered loads above this,
164             in packets per second.
165         :type min_rate: float
166         :type max_rate: float
167         :returns: Average and stdev of critical load estimate.
168         :rtype: 2-tuple of float
169         """
170         stop_time = time.time() + self.timeout
171         min_rate = float(min_rate)
172         max_rate = float(max_rate)
173         logging.info(
174             f"Started search with min_rate {min_rate!r}, "
175             f"max_rate {max_rate!r}"
176         )
177         trial_result_list = list()
178         trial_number = self.trial_number_offset
179         focus_trackers = (None, None)
180         transmit_rate = (min_rate + max_rate) / 2.0
181         lossy_loads = [max_rate]
182         zeros = 0  # How many consecutive zero loss results are happening.
183         while 1:
184             trial_number += 1
185             logging.info(f"Trial {trial_number!r}")
186             results = self.measure_and_compute(
187                 self.trial_duration_per_trial * trial_number, transmit_rate,
188                 trial_result_list, min_rate, max_rate, focus_trackers
189             )
190             measurement, average, stdev, avg1, avg2, focus_trackers = results
191             zeros += 1
192             # TODO: Ratio of fill rate to drain rate seems to have
193             # exponential impact. Make it configurable, or is 4:3 good enough?
194             if measurement.loss_fraction >= self.packet_loss_ratio_target:
195                 for _ in range(4 * zeros):
196                     lossy_loads.append(measurement.target_tr)
197             if measurement.loss_count > 0:
198                 zeros = 0
199             lossy_loads.sort()
200             if stop_time <= time.time():
201                 return average, stdev
202             trial_result_list.append(measurement)
203             if (trial_number - self.trial_number_offset) <= 1:
204                 next_load = max_rate
205             elif (trial_number - self.trial_number_offset) <= 3:
206                 next_load = (measurement.receive_rate / (
207                     1.0 - self.packet_loss_ratio_target))
208             else:
209                 next_load = (avg1 + avg2) / 2.0
210                 if zeros > 0:
211                     if lossy_loads[0] > next_load:
212                         diminisher = math.pow(2.0, 1 - zeros)
213                         next_load = lossy_loads[0] + diminisher * next_load
214                         next_load /= (1.0 + diminisher)
215                     # On zero measurement, we need to drain obsoleted low losses
216                     # even if we did not use them to increase next_load,
217                     # in order to get to usable loses at higher loads.
218                     if len(lossy_loads) > 3:
219                         lossy_loads = lossy_loads[3:]
220                 logging.debug(
221                     f"Zeros {zeros!r} orig {(avg1 + avg2) / 2.0!r} "
222                     f"next {next_load!r} loads {lossy_loads!r}"
223                 )
224             transmit_rate = min(max_rate, max(min_rate, next_load))
225
226     @staticmethod
227     def lfit_stretch(trace, load, mrr, spread):
228         """Stretch-based fitting function.
229
230         Return the logarithm of average packet loss per second
231         when the load (argument) is offered to a system with given
232         mrr and spread (parameters).
233         Stretch function is 1/(1+Exp[-x]). The average itself is definite
234         integral from zero to load, of shifted and x-scaled stretch function.
235         As the integrator is sensitive to discontinuities,
236         and it calls this function at large areas of parameter space,
237         the implementation has to avoid rounding errors, overflows,
238         and correctly approximate underflows.
239
240         TODO: Explain how the high-level description
241         has been converted into an implementation full of ifs.
242
243         :param trace: A multiprocessing-friendly logging function (closure).
244         :param load: Offered load (positive), in packets per second.
245         :param mrr: Parameter of this fitting function, equal to limiting
246             (positive) average number of packets received (as opposed to lost)
247             when offered load is many spreads more than mrr.
248         :param spread: The x-scaling parameter (positive). No nice semantics,
249             roughly corresponds to size of "tail" for loads below mrr.
250         :type trace: function (str, object) -> NoneType
251         :type load: float
252         :type mrr: float
253         :type spread: float
254         :returns: Logarithm of average number of packets lost per second.
255         :rtype: float
256         """
257         # TODO: What is the fastest way to use such values?
258         log_2 = math.log(2)
259         log_3 = math.log(3)
260         log_spread = math.log(spread)
261         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
262         chi = (load - mrr) / spread
263         chi0 = -mrr / spread
264         trace(u"stretch: load", load)
265         trace(u"mrr", mrr)
266         trace(u"spread", spread)
267         trace(u"chi", chi)
268         trace(u"chi0", chi0)
269         if chi > 0:
270             log_lps = math.log(
271                 load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
272             )
273             trace(u"big loss direct log_lps", log_lps)
274         else:
275             two_positive = log_plus(chi, 2 * chi0 - log_2)
276             two_negative = log_plus(chi0, 2 * chi - log_2)
277             if two_positive <= two_negative:
278                 log_lps = log_minus(chi, chi0) + log_spread
279                 trace(u"small loss crude log_lps", log_lps)
280                 return log_lps
281             two = log_minus(two_positive, two_negative)
282             three_positive = log_plus(two_positive, 3 * chi - log_3)
283             three_negative = log_plus(two_negative, 3 * chi0 - log_3)
284             three = log_minus(three_positive, three_negative)
285             if two == three:
286                 log_lps = two + log_spread
287                 trace(u"small loss approx log_lps", log_lps)
288             else:
289                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
290                 log_lps += log_spread
291                 trace(u"small loss direct log_lps", log_lps)
292         return log_lps
293
294     @staticmethod
295     def lfit_erf(trace, load, mrr, spread):
296         """Erf-based fitting function.
297
298         Return the logarithm of average packet loss per second
299         when the load (argument) is offered to a system with given
300         mrr and spread (parameters).
301         Erf function is Primitive function to normal distribution density.
302         The average itself is definite integral from zero to load,
303         of shifted and x-scaled erf function.
304         As the integrator is sensitive to discontinuities,
305         and it calls this function at large areas of parameter space,
306         the implementation has to avoid rounding errors, overflows,
307         and correctly approximate underflows.
308
309         TODO: Explain how the high-level description
310         has been converted into an implementation full of ifs.
311
312         :param trace: A multiprocessing-friendly logging function (closure).
313         :param load: Offered load (positive), in packets per second.
314         :param mrr: Parameter of this fitting function, equal to limiting
315             (positive) average number of packets received (as opposed to lost)
316             when offered load is many spreads more than mrr.
317         :param spread: The x-scaling parameter (positive). No nice semantics,
318             roughly corresponds to size of "tail" for loads below mrr.
319         :type trace: function (str, object) -> NoneType
320         :type load: float
321         :type mrr: float
322         :type spread: float
323         :returns: Logarithm of average number of packets lost per second.
324         :rtype: float
325         """
326         # Beware, this chi has the sign opposite to the stretch function chi.
327         # TODO: The stretch sign is just to have less minuses. Worth changing?
328         chi = (mrr - load) / spread
329         chi0 = mrr / spread
330         trace(u"Erf: load", load)
331         trace(u"mrr", mrr)
332         trace(u"spread", spread)
333         trace(u"chi", chi)
334         trace(u"chi0", chi0)
335         if chi >= -1.0:
336             trace(u"positive, b roughly bigger than m", None)
337             if chi > math.exp(10):
338                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
339                 trace(u"approximated first", first)
340             else:
341                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
342                 trace(u"exact first", first)
343             first -= chi * chi
344             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
345             second -= chi0 * chi0
346             intermediate = log_minus(first, second)
347             trace(u"first", first)
348         else:
349             trace(u"negative, b roughly smaller than m", None)
350             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
351             exp_first *= math.exp(-chi * chi)
352             exp_first -= 2 * chi
353             # TODO: Why has the following line chi there (as opposed to chi0)?
354             # In general the functions would be more readable if they explicitly
355             #     return math.log(func(chi) - func(chi0))
356             # for some function "func", at least for some branches.
357             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
358             second -= chi0 * chi0
359             intermediate = math.log(exp_first - math.exp(second))
360             trace(u"exp_first", exp_first)
361         trace(u"second", second)
362         trace(u"intermediate", intermediate)
363         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
364         trace(u"result", result)
365         return result
366
367     @staticmethod
368     def find_critical_rate(
369             trace, lfit_func, min_rate, max_rate, loss_ratio_target,
370             mrr, spread):
371         """Given ratio target and parameters, return the achieving offered load.
372
373         This is basically an inverse function to lfit_func
374         when parameters are fixed.
375         Instead of implementing effective implementation
376         of the inverse function, this implementation uses
377         brute force binary search. It is bisecting (nim_rate, max_rate) interval
378         until the critical load is found (or interval becomes degenerate).
379         This implementation assures min and max rate limits are honored.
380
381         TODO: Use some method with faster convergence?
382
383         :param trace: A multiprocessing-friendly logging function (closure).
384         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
385         :param min_rate: Lower bound for binary search [pps].
386         :param max_rate: Upper bound for binary search [pps].
387         :param loss_ratio_target: Fitting function should return loss rate
388             giving this ratio at the returned load and parameters [1].
389         :param mrr: The mrr parameter for the fitting function [pps].
390         :param spread: The spread parameter for the fittinmg function [pps].
391         :type trace: function (str, object) -> None
392         :type lfit_func: Function from 3 floats to float.
393         :type min_rate: float
394         :type max_rate: float
395         :type loss_ratio_target: float
396         :type mrr: float
397         :type spread: float
398         :returns: Load [pps] which achieves the target with given parameters.
399         :rtype: float
400         """
401         trace("Finding critical rate for loss_ratio_target", loss_ratio_target)
402         rate_lo = min_rate
403         rate_hi = max_rate
404         loss_ratio = -1
405         while loss_ratio != loss_ratio_target:
406             rate = (rate_hi + rate_lo) / 2.0
407             if rate in (rate_hi, rate_lo):
408                 break
409             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
410             loss_ratio = loss_rate / rate
411             if loss_ratio > loss_ratio_target:
412                 trace(u"halving down", rate)
413                 rate_hi = rate
414             elif loss_ratio < loss_ratio_target:
415                 trace(u"halving up", rate)
416                 rate_lo = rate
417         trace(u"found", rate)
418         return rate
419
420     @staticmethod
421     def log_weight(trace, lfit_func, trial_result_list, mrr, spread):
422         """Return log of weight of trial results by the function and parameters.
423
424         Integrator assumes uniform distribution, but over different parameters.
425         Weight and likelihood are used interchangeably here anyway.
426
427         Each trial has an offered load, a duration and a loss count.
428         Fitting function is used to compute the average loss per second.
429         Poisson distribution (with average loss per trial) is used
430         to get likelihood of one trial result, the overal likelihood
431         is a product of all trial likelihoods.
432         As likelihoods can be extremely small, logarithms are tracked instead.
433
434         TODO: Copy ReceiveRateMeasurement from MLRsearch.
435
436         :param trace: A multiprocessing-friendly logging function (closure).
437         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
438         :param trial_result_list: List of trial measurement results.
439         :param mrr: The mrr parameter for the fitting function.
440         :param spread: The spread parameter for the fittinmg function.
441         :type trace: function (str, object) -> None
442         :type lfit_func: Function from 3 floats to float.
443         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
444         :type mrr: float
445         :type spread: float
446         :returns: Logarithm of result weight for given function and parameters.
447         :rtype: float
448         """
449         log_likelihood = 0.0
450         trace(u"log_weight for mrr", mrr)
451         trace(u"spread", spread)
452         for result in trial_result_list:
453             trace(u"for tr", result.target_tr)
454             trace(u"lc", result.loss_count)
455             trace(u"d", result.duration)
456             log_avg_loss_per_second = lfit_func(
457                 trace, result.target_tr, mrr, spread
458             )
459             log_avg_loss_per_trial = (
460                 log_avg_loss_per_second + math.log(result.duration)
461             )
462             # Poisson probability computation works nice for logarithms.
463             log_trial_likelihood = (
464                 result.loss_count * log_avg_loss_per_trial
465                 - math.exp(log_avg_loss_per_trial)
466             )
467             log_trial_likelihood -= math.lgamma(1 + result.loss_count)
468             log_likelihood += log_trial_likelihood
469             trace(u"avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
470             trace(u"log_trial_likelihood", log_trial_likelihood)
471         return log_likelihood
472
473     def measure_and_compute(
474             self, trial_duration, transmit_rate, trial_result_list,
475             min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
476         """Perform both measurement and computation at once.
477
478         High level steps: Prepare and launch computation worker processes,
479         perform the measurement, stop computation and combine results.
480
481         Integrator needs a specific function to process (-1, 1) parameters.
482         As our fitting functions use dimensional parameters,
483         so a transformation is performed, resulting in a specific prior
484         distribution over the dimensional parameters.
485         Maximal rate (line rate) is needed for that transformation.
486
487         Two fitting functions are used, computation is started
488         on temporary worker process per fitting function. After the measurement,
489         average and stdev of the critical rate (not log) of each worker
490         are combined and returned. Raw averages are also returned,
491         offered load for next iteration is chosen based on them.
492         The idea is that one fitting function might be fitting much better,
493         measurements at its avg are best for relevant results (for both),
494         but we do not know which fitting function it is.
495
496         Focus trackers are updated in-place. If a focus tracker in None,
497         new instance is created.
498
499         TODO: Define class for result object, so that fields are documented.
500         TODO: Re-use processes, instead creating on each computation?
501         TODO: As only one result is needed fresh, figure out a way
502         how to keep the other worker running. This will alow shorter
503         duration per trial. Special handling at first and last measurement
504         will be needed (to properly initialize and to properly combine results).
505
506         :param trial_duration: Length of the measurement in seconds.
507         :param transmit_rate: Offered load in packets per second.
508         :param trial_result_list: Results of previous measurements.
509         :param min_rate: Practical minimum of possible ofered load.
510         :param max_rate: Practical maximum of possible ofered load.
511         :param focus_trackers: Pair of trackers initialized
512             to speed up the numeric computation.
513         :param max_samples: Limit for integrator samples, for debugging.
514         :type trial_duration: float
515         :type transmit_rate: float
516         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
517         :type min_rate: float
518         :type max_rate: float
519         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
520         :type max_samples: None or int
521         :returns: Measurement and computation results.
522         :rtype: _ComputeResult
523         """
524         logging.debug(
525             f"measure_and_compute started with self {self!r}, trial_duration "
526             f"{trial_duration!r}, transmit_rate {transmit_rate!r}, "
527             f"trial_result_list {trial_result_list!r}, max_rate {max_rate!r}, "
528             f"focus_trackers {focus_trackers!r}, max_samples {max_samples!r}"
529         )
530         # Preparation phase.
531         dimension = 2
532         stretch_focus_tracker, erf_focus_tracker = focus_trackers
533         if stretch_focus_tracker is None:
534             stretch_focus_tracker = stat_trackers.VectorStatTracker(dimension)
535             stretch_focus_tracker.unit_reset()
536         if erf_focus_tracker is None:
537             erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
538             erf_focus_tracker.unit_reset()
539         old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
540
541         def start_computing(fitting_function, focus_tracker):
542             """Just a block of code to be used for each fitting function.
543
544             Define function for integrator, create process and pipe ends,
545             start computation, return the boss pipe end.
546
547             :param fitting_function: lfit_erf or lfit_stretch.
548             :param focus_tracker: Tracker initialized to speed up the numeric
549                 computation.
550             :type fitting_function: Function from 3 floats to float.
551             :type focus_tracker: None or stat_trackers.VectorStatTracker
552             :returns: Boss end of communication pipe.
553             :rtype: multiprocessing.Connection
554             """
555
556             def value_logweight_func(trace, x_mrr, x_spread):
557                 """Return log of critical rate and log of likelihood.
558
559                 This is a closure. The ancestor function got
560                 trial_result_list as a parameter, and we are accessing it.
561                 As integrator has strict conditions on function signature,
562                 trial_result_list cannot be an explicit argument
563                 of the current function.
564                 This is also why we have to define this closure
565                 at each invocation of the ancestor function anew.
566
567                 The dimensional spread parameter is the (dimensional) mrr
568                 raised to the power of x_spread scaled to interval (0, 1).
569                 The dimensional mrr parameter distribution has shape of
570                 1/(1+x^2), but x==1 corresponds to max_rate
571                 and 1.0 pps is added to avoid numerical problems in fitting
572                 functions.
573
574                 TODO: x^-2 (for x>1.0) might be simpler/nicer prior.
575
576                 :param trace: Multiprocessing-safe logging function (closure).
577                 :param x_mrr: The first dimensionless param
578                     from (-1, 1) interval.
579                 :param x_spread: The second dimensionless param
580                     from (-1, 1) interval.
581                 :type trace: function (str, object) -> None
582                 :type x_mrr: float
583                 :type x_spread: float
584                 :returns: Log of critical rate [pps] and log of likelihood.
585                 :rtype: 2-tuple of float
586                 """
587                 mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
588                 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
589                 logweight = self.log_weight(
590                     trace, fitting_function, trial_result_list, mrr, spread
591                 )
592                 value = math.log(
593                     self.find_critical_rate(
594                         trace, fitting_function, min_rate, max_rate,
595                         self.packet_loss_ratio_target, mrr, spread
596                     )
597                 )
598                 return value, logweight
599
600             dilled_function = dill.dumps(value_logweight_func)
601             boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
602             boss_pipe_end.send(
603                 (dimension, dilled_function, focus_tracker, max_samples)
604             )
605             worker = multiprocessing.Process(
606                 target=Integrator.try_estimate_nd,
607                 args=(worker_pipe_end, 10.0, self.trace_enabled)
608             )
609             worker.daemon = True
610             worker.start()
611             return boss_pipe_end
612
613         erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
614         stretch_pipe = start_computing(self.lfit_stretch, stretch_focus_tracker)
615
616         # Measurement phase.
617         measurement = self.measurer.measure(trial_duration, transmit_rate)
618
619         # Processing phase.
620         def stop_computing(name, pipe):
621             """Just a block of code to be used for each worker.
622
623             Send stop object, poll for result, then either
624             unpack response, log messages and return, or raise traceback.
625
626             TODO: Define class/structure for the return value?
627
628             :param name: Human friendly worker identifier for logging purposes.
629             :param pipe: Boss end of connection towards worker to stop.
630             :type name: str
631             :type pipe: multiprocessing.Connection
632             :returns: Computed value tracker, actual focus tracker,
633                 and number of samples used for this iteration.
634             :rtype: _PartialResult
635             """
636             pipe.send(None)
637             if not pipe.poll(10.0):
638                 raise RuntimeError(f"Worker {name} did not finish!")
639             result_or_traceback = pipe.recv()
640             try:
641                 value_tracker, focus_tracker, debug_list, trace_list, sampls = (
642                     result_or_traceback
643                 )
644             except ValueError:
645                 raise RuntimeError(
646                     f"Worker {name} failed with the following traceback:\n"
647                     f"{result_or_traceback}"
648                 )
649             logging.info(f"Logs from worker {name!r}:")
650             for message in debug_list:
651                 logging.info(message)
652             for message in trace_list:
653                 logging.debug(message)
654             logging.debug(
655                 f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
656             )
657             return _PartialResult(value_tracker, focus_tracker, sampls)
658
659         stretch_result = stop_computing(u"stretch", stretch_pipe)
660         erf_result = stop_computing(u"erf", erf_pipe)
661         result = PLRsearch._get_result(measurement, stretch_result, erf_result)
662         logging.info(
663             f"measure_and_compute finished with trial result "
664             f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
665             f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
666             f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
667             f"stretch samples {stretch_result.samples!r} erf samples "
668             f"{erf_result.samples!r}"
669         )
670         return result
671
672     @staticmethod
673     def _get_result(measurement, stretch_result, erf_result):
674         """Process and collate results from measure_and_compute.
675
676         Turn logarithm based values to exponential ones,
677         combine averages and stdevs of two fitting functions into a whole.
678
679         :param measurement: The trial measurement obtained during computation.
680         :param stretch_result: Computation output for stretch fitting function.
681         :param erf_result: Computation output for erf fitting function.
682         :type measurement: ReceiveRateMeasurement
683         :type stretch_result: _PartialResult
684         :type erf_result: _PartialResult
685         :returns: Combined results.
686         :rtype: _ComputeResult
687         """
688         stretch_avg = stretch_result.value_tracker.average
689         erf_avg = erf_result.value_tracker.average
690         stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
691         erf_var = erf_result.value_tracker.get_pessimistic_variance()
692         avg_log = (stretch_avg + erf_avg) / 2.0
693         var_log = (stretch_var + erf_var) / 2.0
694         var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
695         stdev_log = math.sqrt(var_log)
696         low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
697         avg = (low + upp) / 2
698         stdev = avg - low
699         trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
700         sea = math.exp(stretch_avg)
701         eea = math.exp(erf_avg)
702         return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
703
704
705 # Named tuples, for multiple local variables to be passed as return value.
706 _PartialResult = namedtuple(
707     u"_PartialResult", u"value_tracker focus_tracker samples"
708 )
709 """Two stat trackers and sample counter.
710
711 :param value_tracker: Tracker for the value (critical load) being integrated.
712 :param focus_tracker: Tracker for focusing integration inputs (sample points).
713 :param samples: How many samples were used for the computation.
714 :type value_tracker: stat_trackers.ScalarDualStatTracker
715 :type focus_tracker: stat_trackers.VectorStatTracker
716 :type samples: int
717 """
718
719 _ComputeResult = namedtuple(
720     u"_ComputeResult",
721     u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers"
722 )
723 """Measurement, 4 computation result values, pair of trackers.
724
725 :param measurement: The trial measurement result obtained during computation.
726 :param avg: Overall average of critical rate estimate.
727 :param stdev: Overall standard deviation of critical rate estimate.
728 :param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
729 :param erf_exp_avg: Erf fitting function estimate average, exponentiated.
730 :param trackers: Pair of focus trackers to start next iteration with.
731 :type measurement: ReceiveRateMeasurement
732 :type avg: float
733 :type stdev: float
734 :type stretch_exp_avg: float
735 :type erf_exp_avg: float
736 :type trackers: 2-tuple of stat_trackers.VectorStatTracker
737 """