PLRsearch: Use stat trackers to shorten Integrator
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module holding PLRsearch class."""
15
16 import logging
17 import math
18 import multiprocessing
19 import time
20
21 import dill
22 # TODO: Inform pylint about scipy (of correct version) being available.
23 from scipy.special import erfcx, erfc
24
25 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
26 # then switch to absolute imports within PLRsearch package.
27 # Current usage of relative imports is just a short term workaround.
28 import Integrator  # pylint: disable=relative-import
29 from log_plus import log_plus, log_minus  # pylint: disable=relative-import
30 import stat_trackers  # pylint: disable=relative-import
31
32
33 class PLRsearch(object):
34     """A class to encapsulate data relevant for the search method.
35
36     The context is performance testing of packet processing systems.
37     The system, when being offered a steady stream of packets,
38     can process some of them successfully, other are considered "lost".
39
40     See docstring of the search method for algorithm description.
41
42     Two constants are stored as class fields for speed.
43
44     Method othed than search (and than __init__)
45     are just internal code structure.
46     TODO: Those method names should start with underscore then.
47     """
48
49     xerfcx_limit = math.pow(math.acos(0), -0.5)
50     log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
51
52     def __init__(
53             self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
54             trial_number_offset=0, timeout=1800.0, trace_enabled=False):
55         """Store rate measurer and additional parameters.
56
57         TODO: Copy AbstractMeasurer from MLRsearch.
58
59         :param measurer: The measurer to call when searching.
60         :param trial_duration_per_trial: Each trial has larger duration
61             than the previous trial. This is the increment, in seconds.
62         :param packet_loss_ratio_target: The algorithm tries to estimate
63             the offered load leading to this ratio on average.
64             Trial ratio is number of packets lost divided by packets offered.
65         :param trial_number_offset: The "first" trial number will be 1+this.
66             Use this to ensure first iterations have enough time to compute
67             reasonable estimates for later trials to use.
68         :param timeout: The search ends if it lasts more than this many seconds.
69         :type measurer: MLRsearch.AbstractMeasurer
70         :type trial_duration_per_trial: float
71         :type packet_loss_ratio_target: float
72         :type trial_number_offset: int
73         :type timeout: float
74         """
75         self.measurer = measurer
76         self.trial_duration_per_trial = float(trial_duration_per_trial)
77         self.packet_loss_ratio_target = float(packet_loss_ratio_target)
78         self.trial_number_offset = int(trial_number_offset)
79         self.timeout = float(timeout)
80         self.trace_enabled = bool(trace_enabled)
81
82     def search(self, min_rate, max_rate):
83         """Perform the search, return average and stdev for throughput estimate.
84
85         Considering measurer and packet_loss_ratio_target (see __init__),
86         find such an offered load (called critical load) that is expected
87         to hit the target loss ratio in the limit of very long trial duration.
88         As the system is probabilistic (and test duration is finite),
89         the critical ratio is only estimated.
90         Return the average and standard deviation of the estimate.
91
92         In principle, this algorithm performs trial measurements,
93         each with varied offered load (which is constant during the trial).
94         During each measurement, Bayesian inference is performed
95         on all the measurement results so far.
96         When timeout is up, the last estimate is returned,
97         else another trial is performed.
98
99         It is assumed that the system under test, even though not deterministic,
100         still follows the rule of large numbers. In another words,
101         any growing set of measurements at a particular offered load
102         will converge towards unique (for the given load) packet loss ratio.
103         This means there is a deterministic (but unknown) function
104         mapping the offered load to average loss ratio.
105         This function is called loss ratio function.
106         This also assumes the average loss ratio
107         does not depend on trial duration.
108
109         The actual probability distribution of loss counts, achieving
110         the average ratio on trials of various duration
111         can be complicated (and can depend on offered load), but simply assuming
112         Poisson distribution will make the algorithm converge.
113         Binomial distribution would be more precise,
114         but Poisson is more practical, as it effectively gives
115         less information content to high ratio results.
116
117         Even when applying other assumptions on the loss ratio function
118         (increasing function, limit zero ratio when load goes to zero,
119         global upper limit on rate of packets processed), there are still
120         too many different shapes of possible loss functions,
121         which makes full Bayesian reasoning intractable.
122
123         This implementation radically simplifies things by examining
124         only two shapes, each with finitely many (in this case just two)
125         parameters. In other words, two fitting functions
126         (each with two parameters and one argument).
127         When restricting model space to one of the two fitting functions,
128         the Bayesian inference becomes tractable (even though it needs
129         numerical integration from Integrator class).
130
131         The first measurement is done at the middle between
132         min_rate and max_rate, to help with convergence
133         if max_rate measurements give loss below target.
134         TODO: Fix overflow error and use min_rate instead of the middle.
135
136         The second measurement is done at max_rate, next few measurements
137         have offered load of previous load minus excess loss rate.
138         This simple rule is found to be good when offered loads
139         so far are way above the critical rate. After few measurements,
140         inference from fitting functions converges faster that this initial
141         "optimistic" procedure.
142
143         Offered loads close to (limiting) critical rate are the most useful,
144         as linear approximation of the fitting function
145         becomes good enough there (thus reducing the impact
146         of the overall shape of fitting function).
147         After several trials, usually one of the fitting functions
148         has better predictions than the other one, but the algorithm
149         does not track that. Simply, it uses the estimate average,
150         alternating between the functions.
151         Multiple workarounds are applied to try and avoid measurements
152         both in zero loss region and in big loss region,
153         as their results tend to make the critical load estimate worse.
154
155         The returned average and stdev is a combination of the two fitting
156         estimates.
157
158         :param min_rate: Avoid measuring at offered loads below this,
159             in packets per second.
160         :param max_rate: Avoid measuring at offered loads above this,
161             in packets per second.
162         :type min_rate: float
163         :type max_rate: float
164         :returns: Average and stdev of critical load estimate.
165         :rtype: 2-tuple of floats
166         """
167         stop_time = time.time() + self.timeout
168         min_rate = float(min_rate)
169         max_rate = float(max_rate)
170         logging.info("Started search with min_rate %(min)r, max_rate %(max)r",
171                      {"min": min_rate, "max": max_rate})
172         trial_result_list = list()
173         trial_number = self.trial_number_offset
174         focus_trackers = (None, None)
175         transmit_rate = (min_rate + max_rate) / 2.0
176         lossy_loads = [max_rate]
177         zeros = [0, 0]  # Cosecutive zero loss, separately for stretch and erf.
178         while 1:
179             trial_number += 1
180             logging.info("Trial %(number)r", {"number": trial_number})
181             results = self.measure_and_compute(
182                 self.trial_duration_per_trial * trial_number, transmit_rate,
183                 trial_result_list, min_rate, max_rate, focus_trackers)
184             measurement, average, stdev, avg1, avg2, focus_trackers = results
185             index = trial_number % 2
186             zeros[index] += 1
187             # TODO: Ratio of fill rate to drain rate seems to have
188             # exponential impact. Make it configurable, or is 4:3 good enough?
189             if measurement.loss_fraction >= self.packet_loss_ratio_target:
190                 for _ in range(4 * zeros[index]):
191                     lossy_loads.append(measurement.target_tr)
192             if measurement.loss_count > 0:
193                 zeros[index] = 0
194             lossy_loads.sort()
195             if stop_time <= time.time():
196                 return average, stdev
197             trial_result_list.append(measurement)
198             if (trial_number - self.trial_number_offset) <= 1:
199                 next_load = max_rate
200             elif (trial_number - self.trial_number_offset) <= 3:
201                 next_load = (measurement.receive_rate / (
202                     1.0 - self.packet_loss_ratio_target))
203             else:
204                 index = (trial_number + 1) % 2
205                 next_load = (avg1, avg2)[index]
206                 if zeros[index] > 0:
207                     if lossy_loads[0] > next_load:
208                         diminisher = math.pow(2.0, 1 - zeros[index])
209                         next_load = lossy_loads[0] + diminisher * next_load
210                         next_load /= (1.0 + diminisher)
211                     # On zero measurement, we need to drain obsoleted low losses
212                     # even if we did not use them to increase next_load,
213                     # in order to get to usable loses with higher load.
214                     if len(lossy_loads) > 3:
215                         lossy_loads = lossy_loads[3:]
216                 logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
217                               {"z": zeros, "o": (avg1, avg2)[index],
218                                "n": next_load, "s": lossy_loads})
219             transmit_rate = min(max_rate, max(min_rate, next_load))
220
221     @staticmethod
222     def lfit_stretch(trace, load, mrr, spread):
223         """Stretch-based fitting function.
224
225         Return the logarithm of average packet loss per second
226         when the load (argument) is offered to a system with given
227         mrr and spread (parameters).
228         Stretch function is 1/(1+Exp[-x]). The average itself is definite
229         integral from zero to load, of shifted and x-scaled stretch function.
230         As the integrator is sensitive to discontinuities,
231         and it calls this function at large areas of parameter space,
232         the implementation has to avoid rounding errors, overflows,
233         and correctly approximate underflows.
234
235         TODO: Explain how the high-level description
236         has been converted into an implementation full of ifs.
237
238         :param trace: A multiprocessing-friendly logging function (closure).
239         :param load: Offered load (positive), in packets per second.
240         :param mrr: Parameter of this fitting function, equal to limiting
241             (positive) average number of packets received (as opposed to lost)
242             when offered load is many spreads more than mrr.
243         :param spread: The x-scaling parameter (positive). No nice semantics,
244             roughly corresponds to size of "tail" for loads below mrr.
245         :type trace: function (str, object) -> NoneType
246         :type load: float
247         :type mrr: float
248         :type spread: float
249         :returns: Logarithm of average number of packets lost per second.
250         :rtype: float
251         """
252         # TODO: What is the fastest way to use such values?
253         log_2 = math.log(2)
254         log_3 = math.log(3)
255         log_spread = math.log(spread)
256         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
257         chi = (load - mrr) / spread
258         chi0 = -mrr / spread
259         trace("stretch: load", load)
260         trace("mrr", mrr)
261         trace("spread", spread)
262         trace("chi", chi)
263         trace("chi0", chi0)
264         if chi > 0:
265             log_lps = math.log(
266                 load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
267             trace("big loss direct log_lps", log_lps)
268         else:
269             two_positive = log_plus(chi, 2 * chi0 - log_2)
270             two_negative = log_plus(chi0, 2 * chi - log_2)
271             if two_positive <= two_negative:
272                 log_lps = log_minus(chi, chi0) + log_spread
273                 trace("small loss crude log_lps", log_lps)
274                 return log_lps
275             two = log_minus(two_positive, two_negative)
276             three_positive = log_plus(two_positive, 3 * chi - log_3)
277             three_negative = log_plus(two_negative, 3 * chi0 - log_3)
278             three = log_minus(three_positive, three_negative)
279             if two == three:
280                 log_lps = two + log_spread
281                 trace("small loss approx log_lps", log_lps)
282             else:
283                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
284                 log_lps += log_spread
285                 trace("small loss direct log_lps", log_lps)
286         return log_lps
287
288     @staticmethod
289     def lfit_erf(trace, load, mrr, spread):
290         """Erf-based fitting function.
291
292         Return the logarithm of average packet loss per second
293         when the load (argument) is offered to a system with given
294         mrr and spread (parameters).
295         Erf function is Primitive function to normal distribution density.
296         The average itself is definite integral from zero to load,
297         of shifted and x-scaled erf function.
298         As the integrator is sensitive to discontinuities,
299         and it calls this function at large areas of parameter space,
300         the implementation has to avoid rounding errors, overflows,
301         and correctly approximate underflows.
302
303         TODO: Explain how the high-level description
304         has been converted into an implementation full of ifs.
305
306         :param trace: A multiprocessing-friendly logging function (closure).
307         :param load: Offered load (positive), in packets per second.
308         :param mrr: Parameter of this fitting function, equal to limiting
309             (positive) average number of packets received (as opposed to lost)
310             when offered load is many spreads more than mrr.
311         :param spread: The x-scaling parameter (positive). No nice semantics,
312             roughly corresponds to size of "tail" for loads below mrr.
313         :type trace: function (str, object) -> NoneType
314         :type load: float
315         :type mrr: float
316         :type spread: float
317         :returns: Logarithm of average number of packets lost per second.
318         :rtype: float
319         """
320         # Beware, this chi has the sign opposite to the stretch function chi.
321         # TODO: The stretch sign is just to have less minuses. Worth changing?
322         chi = (mrr - load) / spread
323         chi0 = mrr / spread
324         trace("Erf: load", load)
325         trace("mrr", mrr)
326         trace("spread", spread)
327         trace("chi", chi)
328         trace("chi0", chi0)
329         if chi >= -1.0:
330             trace("positive, b roughly bigger than m", None)
331             if chi > math.exp(10):
332                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
333                 trace("approximated first", first)
334             else:
335                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
336                 trace("exact first", first)
337             first -= chi * chi
338             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
339             second -= chi0 * chi0
340             intermediate = log_minus(first, second)
341             trace("first", first)
342         else:
343             trace("negative, b roughly smaller than m", None)
344             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
345             exp_first *= math.exp(-chi * chi)
346             exp_first -= 2 * chi
347             # TODO: Why has the following line chi there (as opposed to chi0)?
348             # In general the functions would be more readable if they explicitly
349             #     return math.log(func(chi) - func(chi0))
350             # for some function "func", at least for some branches.
351             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
352             second -= chi0 * chi0
353             intermediate = math.log(exp_first - math.exp(second))
354             trace("exp_first", exp_first)
355         trace("second", second)
356         trace("intermediate", intermediate)
357         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
358         trace("result", result)
359         return result
360
361     @staticmethod
362     def find_critical_rate(
363             trace, lfit_func, min_rate, max_rate, loss_ratio_target,
364             mrr, spread):
365         """Given ratio target and parameters, return the achieving offered load.
366
367         This is basically an inverse function to lfit_func
368         when parameters are fixed.
369         Instead of implementing effective implementation
370         of the inverse function, this implementation uses
371         brute force binary search. It is bisecting (nim_rate, max_rate) interval
372         until the critical load is found (or interval becomes degenerate).
373         This implementation assures min and max rate limits are honored.
374
375         TODO: Use some method with faster convergence?
376
377         :param trace: A multiprocessing-friendly logging function (closure).
378         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
379         :param min_rate: Lower bound for binary search [pps].
380         :param max_rate: Upper bound for binary search [pps].
381         :param loss_ratio_target: Fitting function should return loss rate
382             giving this ratio at the returned load and parameters [1].
383         :param mrr: The mrr parameter for the fitting function [pps].
384         :param spread: The spread parameter for the fittinmg function [pps].
385         :type trace: function (str, object) -> None
386         :type lfit_func: Function from 3 floats to float.
387         :type min_rate: float
388         :type max_rate: float
389         :type log_lps_target: float
390         :type mrr: float
391         :type spread: float
392         :returns: Load [pps] which achieves the target with given parameters.
393         :rtype: float
394         """
395         trace("Finding critical rate for loss_ratio_target", loss_ratio_target)
396         rate_lo = min_rate
397         rate_hi = max_rate
398         loss_ratio = -1
399         while loss_ratio != loss_ratio_target:
400             rate = (rate_hi + rate_lo) / 2.0
401             if rate == rate_hi or rate == rate_lo:
402                 break
403             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
404             loss_ratio = loss_rate / rate
405             if loss_ratio > loss_ratio_target:
406                 trace("halving down", rate)
407                 rate_hi = rate
408             elif loss_ratio < loss_ratio_target:
409                 trace("halving up", rate)
410                 rate_lo = rate
411         trace("found", rate)
412         return rate
413
414     @staticmethod
415     def log_weight(trace, lfit_func, trial_result_list, mrr, spread):
416         """Return log of weight of trial results by the function and parameters.
417
418         Integrator assumes uniform distribution, but over different parameters.
419         Weight and likelihood are used interchangeably here anyway.
420
421         Each trial has an offered load, a duration and a loss count.
422         Fitting function is used to compute the average loss per second.
423         Poisson distribution (with average loss per trial) is used
424         to get likelihood of one trial result, the overal likelihood
425         is a product of all trial likelihoods.
426         As likelihoods can be extremely small, logarithms are tracked instead.
427
428         TODO: Copy ReceiveRateMeasurement from MLRsearch.
429
430         :param trace: A multiprocessing-friendly logging function (closure).
431         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
432         :param result_list: List of trial measurement results.
433         :param mrr: The mrr parameter for the fitting function.
434         :param spread: The spread parameter for the fittinmg function.
435         :type trace: function (str, object) -> None
436         :type lfit_func: Function from 3 floats to float.
437         :type result_list: list of MLRsearch.ReceiveRateMeasurement
438         :type mrr: float
439         :type spread: float
440         :returns: Logarithm of result weight for given function and parameters.
441         :rtype: float
442         """
443         log_likelihood = 0.0
444         trace("log_weight for mrr", mrr)
445         trace("spread", spread)
446         for result in trial_result_list:
447             trace("for tr", result.target_tr)
448             trace("lc", result.loss_count)
449             trace("d", result.duration)
450             log_avg_loss_per_second = lfit_func(
451                 trace, result.target_tr, mrr, spread)
452             log_avg_loss_per_trial = (
453                 log_avg_loss_per_second + math.log(result.duration))
454             # Poisson probability computation works nice for logarithms.
455             log_trial_likelihood = (
456                 result.loss_count * log_avg_loss_per_trial
457                 - math.exp(log_avg_loss_per_trial))
458             log_trial_likelihood -= math.lgamma(1 + result.loss_count)
459             log_likelihood += log_trial_likelihood
460             trace("avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
461             trace("log_trial_likelihood", log_trial_likelihood)
462         return log_likelihood
463
464     # TODO: Refactor (somehow) so pylint stops complaining about
465     # too many local variables.
466     def measure_and_compute(
467             self, trial_duration, transmit_rate, trial_result_list,
468             min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
469         """Perform both measurement and computation at once.
470
471         High level steps: Prepare and launch computation worker processes,
472         perform the measurement, stop computation and combine results.
473
474         Integrator needs a specific function to process (-1, 1) parameters.
475         As our fitting functions use dimensional parameters,
476         so a transformation is performed, resulting in a specific prior
477         distribution over the dimensional parameters.
478         Maximal rate (line rate) is needed for that transformation.
479
480         Two fitting functions are used, computation is started
481         on temporary worker process per fitting function. After the measurement,
482         average and stdev of the critical rate (not log) of each worker
483         are combined and returned. Raw averages are also returned,
484         offered load for next iteration is chosen based on them.
485         The idea is that one fitting function might be fitting much better,
486         measurements at its avg are best for relevant results (for both),
487         but we do not know which fitting function it is.
488
489         Focus trackers are updated in-place. If a focus tracker in None,
490         new instance is created.
491
492         TODO: Define class for result object, so that fields are documented.
493         TODO: Re-use processes, instead creating on each computation?
494         TODO: As only one result is needed fresh, figure out a way
495         how to keep the other worker running. This will alow shorter
496         duration per trial. Special handling at first and last measurement
497         will be needed (to properly initialize and to properly combine results).
498
499         :param trial_duration: Length of the measurement in seconds.
500         :param transmit_rate: Offered load in packets per second.
501         :param trial_result_list: Results of previous measurements.
502         :param min_rate: Practical minimum of possible ofered load.
503         :param max_rate: Practical maximum of possible ofered load.
504         :param focus_trackers: Pair of trackers initialized
505             to speed up the numeric computation.
506         :param max_samples: Limit for integrator samples, for debugging.
507         :type trial_duration: float
508         :type transmit_rate: float
509         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
510         :type min_rate: float
511         :type max_rate: float
512         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
513         :type max_samples: None or int
514         :returns: Measurement and computation results.
515         :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers.
516         """
517         logging.debug(
518             "measure_and_compute started with self %(self)r, trial_duration "
519             + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
520             + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
521             {"self": self, "dur": trial_duration, "tr": transmit_rate,
522              "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
523              "ms": max_samples})
524         # Preparation phase.
525         dimension = 2
526         stretch_focus_tracker, erf_focus_tracker = focus_trackers
527         if stretch_focus_tracker is None:
528             stretch_focus_tracker = stat_trackers.VectorStatTracker(dimension)
529             stretch_focus_tracker.unit_reset()
530         if erf_focus_tracker is None:
531             erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
532             erf_focus_tracker.unit_reset()
533         old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
534         def start_computing(fitting_function, focus_tracker):
535             """Just a block of code to be used for each fitting function.
536
537             Define function for integrator, create process and pipe ends,
538             start computation, return the boss pipe end.
539
540             :param fitting_function: lfit_erf or lfit_stretch.
541             :param bias_avg: Tuple of floats to start searching around.
542             :param bias_cov: Covariance matrix defining initial focus shape.
543             :type fitting_function: Function from 3 floats to float.
544             :type bias_avg: 2-tuple of floats
545             :type bias_cov: 2-tuple of 2-tuples of floats
546             :returns: Boss end of communication pipe.
547             :rtype: multiprocessing.Connection
548             """
549             def value_logweight_func(trace, x_mrr, x_spread):
550                 """Return log of critical rate and log of likelihood.
551
552                 This is a closure. The ancestor function got
553                 trial_result_list as a parameter, and we are accessing it.
554                 As integrator has strict conditions on function signature,
555                 trial_result_list cannot be an explicit argument
556                 of the current function.
557                 This is also why we have to define this closure
558                 at each invocation of the ancestor function anew.
559
560                 The dimensional spread parameter is the (dimensional) mrr
561                 raised to the power of x_spread scaled to interval (0, 1).
562                 The dimensional mrr parameter distribution has shape of
563                 1/(1+x^2), but x==1 corresponds to max_rate
564                 and 1.0 pps is added to avoid numerical problems in fitting
565                 functions.
566
567                 TODO: x^-2 (for x>1.0) might be simpler/nicer prior.
568
569                 :param trace: Multiprocessing-safe logging function (closure).
570                 :param x_mrr: The first dimensionless param
571                     from (-1, 1) interval.
572                 :param x_spread: The second dimensionless param
573                     from (-1, 1) interval.
574                 :type trace: function (str, object) -> None
575                 :type x_mrr: float
576                 :type x_spread: float
577                 :returns: Log of critical rate [pps] and log of likelihood.
578                 :rtype: 2-tuple of float
579                 """
580                 mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
581                 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
582                 logweight = self.log_weight(
583                     trace, fitting_function, trial_result_list, mrr, spread)
584                 value = math.log(self.find_critical_rate(
585                     trace, fitting_function, min_rate, max_rate,
586                     self.packet_loss_ratio_target, mrr, spread))
587                 return value, logweight
588             dilled_function = dill.dumps(value_logweight_func)
589             boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
590             boss_pipe_end.send(
591                 (dimension, dilled_function, focus_tracker, max_samples))
592             worker = multiprocessing.Process(
593                 target=Integrator.try_estimate_nd, args=(
594                     worker_pipe_end, 10.0, self.trace_enabled))
595             worker.daemon = True
596             worker.start()
597             return boss_pipe_end
598         erf_pipe = start_computing(
599             self.lfit_erf, erf_focus_tracker)
600         stretch_pipe = start_computing(
601             self.lfit_stretch, stretch_focus_tracker)
602         # Measurement phase.
603         measurement = self.measurer.measure(trial_duration, transmit_rate)
604         # Processing phase.
605         def stop_computing(name, pipe):
606             """Just a block of code to be used for each worker.
607
608             Send stop object, poll for result, then either
609             unpack response, log messages and return, or raise traceback.
610
611             TODO: Define class/structure for the return value?
612
613             :param name: Human friendly worker identifier for logging purposes.
614             :param pipe: Boss end of connection towards worker to stop.
615             :type name: str
616             :type pipe: multiprocessing.Connection
617             :returns: Computed value tracker, actual focus tracker,
618                 and number of samples used for this iteration.
619             :rtype: 3-tuple of tracker, tracker and int
620             """
621             pipe.send(None)
622             if not pipe.poll(10.0):
623                 raise RuntimeError(
624                     "Worker {name} did not finish!".format(name=name))
625             result_or_traceback = pipe.recv()
626             try:
627                 value_tracker, focus_tracker, debug_list, trace_list, sampls = (
628                     result_or_traceback)
629             except ValueError:
630                 raise RuntimeError(
631                     "Worker {name} failed with the following traceback:\n{tr}"
632                     .format(name=name, tr=result_or_traceback))
633             logging.info("Logs from worker %(name)r:", {"name": name})
634             for message in debug_list:
635                 logging.info(message)
636             for message in trace_list:
637                 logging.debug(message)
638             logging.debug("trackers: value %(val)r focus %(foc)r", {
639                 "val": value_tracker, "foc": focus_tracker})
640             return value_tracker, focus_tracker, sampls
641         stretch_value_tracker, stretch_focus_tracker, stretch_samples = (
642             stop_computing("stretch", stretch_pipe))
643         erf_value_tracker, erf_focus_tracker, erf_samples = (
644             stop_computing("erf", erf_pipe))
645         stretch_avg = stretch_value_tracker.average
646         erf_avg = erf_value_tracker.average
647         # TODO: Take into account secondary stats.
648         stretch_stdev = math.exp(stretch_value_tracker.log_variance / 2)
649         erf_stdev = math.exp(erf_value_tracker.log_variance / 2)
650         avg = math.exp((stretch_avg + erf_avg) / 2.0)
651         var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
652         var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
653         stdev = avg * math.sqrt(var)
654         focus_trackers = (stretch_focus_tracker, erf_focus_tracker)
655         logging.info(
656             "measure_and_compute finished with trial result %(res)r "
657             "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r "
658             "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r "
659             "erf samples %(es)r",
660             {"res": measurement, "avg": avg, "stdev": stdev,
661              "a1": math.exp(stretch_avg), "a2": math.exp(erf_avg),
662              "nt": focus_trackers, "ot": old_trackers, "ss": stretch_samples,
663              "es": erf_samples})
664         return (
665             measurement, avg, stdev, math.exp(stretch_avg),
666             math.exp(erf_avg), focus_trackers)