Improve PLRsearch yet again
[csit.git] / resources / libraries / python / PLRsearch / PLRsearch.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Module holding PLRsearch class."""
15
16 import logging
17 import math
18 import multiprocessing
19 import time
20 from collections import namedtuple
21
22 import dill
23 from scipy.special import erfcx, erfc
24
25 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
26 # then switch to absolute imports within PLRsearch package.
27 # Current usage of relative imports is just a short term workaround.
28 from . import Integrator
29 from .log_plus import log_plus, log_minus
30 from . import stat_trackers
31
32
33 class PLRsearch(object):
34     """A class to encapsulate data relevant for the search method.
35
36     The context is performance testing of packet processing systems.
37     The system, when being offered a steady stream of packets,
38     can process some of them successfully, other are considered "lost".
39
40     See docstring of the search method for algorithm description.
41
42     Two constants are stored as class fields for speed.
43
44     Method othed than search (and than __init__)
45     are just internal code structure.
46
47     TODO: Those method names should start with underscore then.
48     """
49
50     xerfcx_limit = math.pow(math.acos(0), -0.5)
51     log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
52
53     def __init__(
54             self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
55             trial_number_offset=0, timeout=1800.0, trace_enabled=False):
56         """Store rate measurer and additional parameters.
57
58         TODO: Copy AbstractMeasurer from MLRsearch.
59
60         :param measurer: The measurer to call when searching.
61         :param trial_duration_per_trial: Each trial has larger duration
62             than the previous trial. This is the increment, in seconds.
63         :param packet_loss_ratio_target: The algorithm tries to estimate
64             the offered load leading to this ratio on average.
65             Trial ratio is number of packets lost divided by packets offered.
66         :param trial_number_offset: The "first" trial number will be 1+this.
67             Use this to ensure first iterations have enough time to compute
68             reasonable estimates for later trials to use.
69         :param timeout: The search ends if it lasts more than this many seconds.
70         :type measurer: MLRsearch.AbstractMeasurer
71         :type trial_duration_per_trial: float
72         :type packet_loss_ratio_target: float
73         :type trial_number_offset: int
74         :type timeout: float
75         """
76         self.measurer = measurer
77         self.trial_duration_per_trial = float(trial_duration_per_trial)
78         self.packet_loss_ratio_target = float(packet_loss_ratio_target)
79         self.trial_number_offset = int(trial_number_offset)
80         self.timeout = float(timeout)
81         self.trace_enabled = bool(trace_enabled)
82
83     def search(self, min_rate, max_rate):
84         """Perform the search, return average and stdev for throughput estimate.
85
86         Considering measurer and packet_loss_ratio_target (see __init__),
87         find such an offered load (called critical load) that is expected
88         to hit the target loss ratio in the limit of very long trial duration.
89         As the system is probabilistic (and test duration is finite),
90         the critical ratio is only estimated.
91         Return the average and standard deviation of the estimate.
92
93         In principle, this algorithm performs trial measurements,
94         each with varied offered load (which is constant during the trial).
95         During each measurement, Bayesian inference is performed
96         on all the measurement results so far.
97         When timeout is up, the last estimate is returned,
98         else another trial is performed.
99
100         It is assumed that the system under test, even though not deterministic,
101         still follows the rule of large numbers. In another words,
102         any growing set of measurements at a particular offered load
103         will converge towards unique (for the given load) packet loss ratio.
104         This means there is a deterministic (but unknown) function
105         mapping the offered load to average loss ratio.
106         This function is called loss ratio function.
107         This also assumes the average loss ratio
108         does not depend on trial duration.
109
110         The actual probability distribution of loss counts, achieving
111         the average ratio on trials of various duration
112         can be complicated (and can depend on offered load), but simply assuming
113         Poisson distribution will make the algorithm converge.
114         Binomial distribution would be more precise,
115         but Poisson is more practical, as it effectively gives
116         less information content to high ratio results.
117
118         Even when applying other assumptions on the loss ratio function
119         (increasing function, limit zero ratio when load goes to zero,
120         global upper limit on rate of packets processed), there are still
121         too many different shapes of possible loss functions,
122         which makes full Bayesian reasoning intractable.
123
124         This implementation radically simplifies things by examining
125         only two shapes, each with finitely many (in this case just two)
126         parameters. In other words, two fitting functions
127         (each with two parameters and one argument).
128         When restricting model space to one of the two fitting functions,
129         the Bayesian inference becomes tractable (even though it needs
130         numerical integration from Integrator class).
131
132         The first measurement is done at the middle between
133         min_rate and max_rate, to help with convergence
134         if max_rate measurements give loss below target.
135         TODO: Fix overflow error and use min_rate instead of the middle.
136
137         The second measurement is done at max_rate, next few measurements
138         have offered load of previous load minus excess loss rate.
139         This simple rule is found to be good when offered loads
140         so far are way above the critical rate. After few measurements,
141         inference from fitting functions converges faster that this initial
142         "optimistic" procedure.
143
144         Offered loads close to (limiting) critical rate are the most useful,
145         as linear approximation of the fitting function
146         becomes good enough there (thus reducing the impact
147         of the overall shape of fitting function).
148         After several trials, usually one of the fitting functions
149         has better predictions than the other one, but the algorithm
150         does not track that. Simply, it uses the estimate average,
151         alternating between the functions.
152         Multiple workarounds are applied to try and avoid measurements
153         both in zero loss region and in big loss region,
154         as their results tend to make the critical load estimate worse.
155
156         The returned average and stdev is a combination of the two fitting
157         estimates.
158
159         :param min_rate: Avoid measuring at offered loads below this,
160             in packets per second.
161         :param max_rate: Avoid measuring at offered loads above this,
162             in packets per second.
163         :type min_rate: float
164         :type max_rate: float
165         :returns: Average and stdev of critical load estimate.
166         :rtype: 2-tuple of float
167         """
168         stop_time = time.time() + self.timeout
169         min_rate = float(min_rate)
170         max_rate = float(max_rate)
171         logging.info("Started search with min_rate %(min)r, max_rate %(max)r",
172                      {"min": min_rate, "max": max_rate})
173         trial_result_list = list()
174         trial_number = self.trial_number_offset
175         focus_trackers = (None, None)
176         transmit_rate = (min_rate + max_rate) / 2.0
177         lossy_loads = [max_rate]
178         zeros = 0  # How many cosecutive zero loss results are happening.
179         while 1:
180             trial_number += 1
181             logging.info("Trial %(number)r", {"number": trial_number})
182             results = self.measure_and_compute(
183                 self.trial_duration_per_trial * trial_number, transmit_rate,
184                 trial_result_list, min_rate, max_rate, focus_trackers)
185             measurement, average, stdev, avg1, avg2, focus_trackers = results
186             zeros += 1
187             # TODO: Ratio of fill rate to drain rate seems to have
188             # exponential impact. Make it configurable, or is 4:3 good enough?
189             if measurement.loss_fraction >= self.packet_loss_ratio_target:
190                 for _ in range(4 * zeros):
191                     lossy_loads.append(measurement.target_tr)
192             if measurement.loss_count > 0:
193                 zeros = 0
194             lossy_loads.sort()
195             if stop_time <= time.time():
196                 return average, stdev
197             trial_result_list.append(measurement)
198             if (trial_number - self.trial_number_offset) <= 1:
199                 next_load = max_rate
200             elif (trial_number - self.trial_number_offset) <= 3:
201                 next_load = (measurement.receive_rate / (
202                     1.0 - self.packet_loss_ratio_target))
203             else:
204                 next_load = (avg1 + avg2) / 2.0
205                 if zeros > 0:
206                     if lossy_loads[0] > next_load:
207                         diminisher = math.pow(2.0, 1 - zeros)
208                         next_load = lossy_loads[0] + diminisher * next_load
209                         next_load /= (1.0 + diminisher)
210                     # On zero measurement, we need to drain obsoleted low losses
211                     # even if we did not use them to increase next_load,
212                     # in order to get to usable loses at higher loads.
213                     if len(lossy_loads) > 3:
214                         lossy_loads = lossy_loads[3:]
215                 logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
216                               {"z": zeros, "o": (avg1 + avg2) / 2.0,
217                                "n": next_load, "s": lossy_loads})
218             transmit_rate = min(max_rate, max(min_rate, next_load))
219
220     @staticmethod
221     def lfit_stretch(trace, load, mrr, spread):
222         """Stretch-based fitting function.
223
224         Return the logarithm of average packet loss per second
225         when the load (argument) is offered to a system with given
226         mrr and spread (parameters).
227         Stretch function is 1/(1+Exp[-x]). The average itself is definite
228         integral from zero to load, of shifted and x-scaled stretch function.
229         As the integrator is sensitive to discontinuities,
230         and it calls this function at large areas of parameter space,
231         the implementation has to avoid rounding errors, overflows,
232         and correctly approximate underflows.
233
234         TODO: Explain how the high-level description
235         has been converted into an implementation full of ifs.
236
237         :param trace: A multiprocessing-friendly logging function (closure).
238         :param load: Offered load (positive), in packets per second.
239         :param mrr: Parameter of this fitting function, equal to limiting
240             (positive) average number of packets received (as opposed to lost)
241             when offered load is many spreads more than mrr.
242         :param spread: The x-scaling parameter (positive). No nice semantics,
243             roughly corresponds to size of "tail" for loads below mrr.
244         :type trace: function (str, object) -> NoneType
245         :type load: float
246         :type mrr: float
247         :type spread: float
248         :returns: Logarithm of average number of packets lost per second.
249         :rtype: float
250         """
251         # TODO: What is the fastest way to use such values?
252         log_2 = math.log(2)
253         log_3 = math.log(3)
254         log_spread = math.log(spread)
255         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
256         chi = (load - mrr) / spread
257         chi0 = -mrr / spread
258         trace("stretch: load", load)
259         trace("mrr", mrr)
260         trace("spread", spread)
261         trace("chi", chi)
262         trace("chi0", chi0)
263         if chi > 0:
264             log_lps = math.log(
265                 load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
266             trace("big loss direct log_lps", log_lps)
267         else:
268             two_positive = log_plus(chi, 2 * chi0 - log_2)
269             two_negative = log_plus(chi0, 2 * chi - log_2)
270             if two_positive <= two_negative:
271                 log_lps = log_minus(chi, chi0) + log_spread
272                 trace("small loss crude log_lps", log_lps)
273                 return log_lps
274             two = log_minus(two_positive, two_negative)
275             three_positive = log_plus(two_positive, 3 * chi - log_3)
276             three_negative = log_plus(two_negative, 3 * chi0 - log_3)
277             three = log_minus(three_positive, three_negative)
278             if two == three:
279                 log_lps = two + log_spread
280                 trace("small loss approx log_lps", log_lps)
281             else:
282                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
283                 log_lps += log_spread
284                 trace("small loss direct log_lps", log_lps)
285         return log_lps
286
287     @staticmethod
288     def lfit_erf(trace, load, mrr, spread):
289         """Erf-based fitting function.
290
291         Return the logarithm of average packet loss per second
292         when the load (argument) is offered to a system with given
293         mrr and spread (parameters).
294         Erf function is Primitive function to normal distribution density.
295         The average itself is definite integral from zero to load,
296         of shifted and x-scaled erf function.
297         As the integrator is sensitive to discontinuities,
298         and it calls this function at large areas of parameter space,
299         the implementation has to avoid rounding errors, overflows,
300         and correctly approximate underflows.
301
302         TODO: Explain how the high-level description
303         has been converted into an implementation full of ifs.
304
305         :param trace: A multiprocessing-friendly logging function (closure).
306         :param load: Offered load (positive), in packets per second.
307         :param mrr: Parameter of this fitting function, equal to limiting
308             (positive) average number of packets received (as opposed to lost)
309             when offered load is many spreads more than mrr.
310         :param spread: The x-scaling parameter (positive). No nice semantics,
311             roughly corresponds to size of "tail" for loads below mrr.
312         :type trace: function (str, object) -> NoneType
313         :type load: float
314         :type mrr: float
315         :type spread: float
316         :returns: Logarithm of average number of packets lost per second.
317         :rtype: float
318         """
319         # Beware, this chi has the sign opposite to the stretch function chi.
320         # TODO: The stretch sign is just to have less minuses. Worth changing?
321         chi = (mrr - load) / spread
322         chi0 = mrr / spread
323         trace("Erf: load", load)
324         trace("mrr", mrr)
325         trace("spread", spread)
326         trace("chi", chi)
327         trace("chi0", chi0)
328         if chi >= -1.0:
329             trace("positive, b roughly bigger than m", None)
330             if chi > math.exp(10):
331                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
332                 trace("approximated first", first)
333             else:
334                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
335                 trace("exact first", first)
336             first -= chi * chi
337             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
338             second -= chi0 * chi0
339             intermediate = log_minus(first, second)
340             trace("first", first)
341         else:
342             trace("negative, b roughly smaller than m", None)
343             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
344             exp_first *= math.exp(-chi * chi)
345             exp_first -= 2 * chi
346             # TODO: Why has the following line chi there (as opposed to chi0)?
347             # In general the functions would be more readable if they explicitly
348             #     return math.log(func(chi) - func(chi0))
349             # for some function "func", at least for some branches.
350             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
351             second -= chi0 * chi0
352             intermediate = math.log(exp_first - math.exp(second))
353             trace("exp_first", exp_first)
354         trace("second", second)
355         trace("intermediate", intermediate)
356         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
357         trace("result", result)
358         return result
359
360     @staticmethod
361     def find_critical_rate(
362             trace, lfit_func, min_rate, max_rate, loss_ratio_target,
363             mrr, spread):
364         """Given ratio target and parameters, return the achieving offered load.
365
366         This is basically an inverse function to lfit_func
367         when parameters are fixed.
368         Instead of implementing effective implementation
369         of the inverse function, this implementation uses
370         brute force binary search. It is bisecting (nim_rate, max_rate) interval
371         until the critical load is found (or interval becomes degenerate).
372         This implementation assures min and max rate limits are honored.
373
374         TODO: Use some method with faster convergence?
375
376         :param trace: A multiprocessing-friendly logging function (closure).
377         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
378         :param min_rate: Lower bound for binary search [pps].
379         :param max_rate: Upper bound for binary search [pps].
380         :param loss_ratio_target: Fitting function should return loss rate
381             giving this ratio at the returned load and parameters [1].
382         :param mrr: The mrr parameter for the fitting function [pps].
383         :param spread: The spread parameter for the fittinmg function [pps].
384         :type trace: function (str, object) -> None
385         :type lfit_func: Function from 3 floats to float.
386         :type min_rate: float
387         :type max_rate: float
388         :type log_lps_target: float
389         :type mrr: float
390         :type spread: float
391         :returns: Load [pps] which achieves the target with given parameters.
392         :rtype: float
393         """
394         trace("Finding critical rate for loss_ratio_target", loss_ratio_target)
395         rate_lo = min_rate
396         rate_hi = max_rate
397         loss_ratio = -1
398         while loss_ratio != loss_ratio_target:
399             rate = (rate_hi + rate_lo) / 2.0
400             if rate == rate_hi or rate == rate_lo:
401                 break
402             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
403             loss_ratio = loss_rate / rate
404             if loss_ratio > loss_ratio_target:
405                 trace("halving down", rate)
406                 rate_hi = rate
407             elif loss_ratio < loss_ratio_target:
408                 trace("halving up", rate)
409                 rate_lo = rate
410         trace("found", rate)
411         return rate
412
413     @staticmethod
414     def log_weight(trace, lfit_func, trial_result_list, mrr, spread):
415         """Return log of weight of trial results by the function and parameters.
416
417         Integrator assumes uniform distribution, but over different parameters.
418         Weight and likelihood are used interchangeably here anyway.
419
420         Each trial has an offered load, a duration and a loss count.
421         Fitting function is used to compute the average loss per second.
422         Poisson distribution (with average loss per trial) is used
423         to get likelihood of one trial result, the overal likelihood
424         is a product of all trial likelihoods.
425         As likelihoods can be extremely small, logarithms are tracked instead.
426
427         TODO: Copy ReceiveRateMeasurement from MLRsearch.
428
429         :param trace: A multiprocessing-friendly logging function (closure).
430         :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
431         :param result_list: List of trial measurement results.
432         :param mrr: The mrr parameter for the fitting function.
433         :param spread: The spread parameter for the fittinmg function.
434         :type trace: function (str, object) -> None
435         :type lfit_func: Function from 3 floats to float.
436         :type result_list: list of MLRsearch.ReceiveRateMeasurement
437         :type mrr: float
438         :type spread: float
439         :returns: Logarithm of result weight for given function and parameters.
440         :rtype: float
441         """
442         log_likelihood = 0.0
443         trace("log_weight for mrr", mrr)
444         trace("spread", spread)
445         for result in trial_result_list:
446             trace("for tr", result.target_tr)
447             trace("lc", result.loss_count)
448             trace("d", result.duration)
449             log_avg_loss_per_second = lfit_func(
450                 trace, result.target_tr, mrr, spread)
451             log_avg_loss_per_trial = (
452                 log_avg_loss_per_second + math.log(result.duration))
453             # Poisson probability computation works nice for logarithms.
454             log_trial_likelihood = (
455                 result.loss_count * log_avg_loss_per_trial
456                 - math.exp(log_avg_loss_per_trial))
457             log_trial_likelihood -= math.lgamma(1 + result.loss_count)
458             log_likelihood += log_trial_likelihood
459             trace("avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
460             trace("log_trial_likelihood", log_trial_likelihood)
461         return log_likelihood
462
463     def measure_and_compute(
464             self, trial_duration, transmit_rate, trial_result_list,
465             min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
466         """Perform both measurement and computation at once.
467
468         High level steps: Prepare and launch computation worker processes,
469         perform the measurement, stop computation and combine results.
470
471         Integrator needs a specific function to process (-1, 1) parameters.
472         As our fitting functions use dimensional parameters,
473         so a transformation is performed, resulting in a specific prior
474         distribution over the dimensional parameters.
475         Maximal rate (line rate) is needed for that transformation.
476
477         Two fitting functions are used, computation is started
478         on temporary worker process per fitting function. After the measurement,
479         average and stdev of the critical rate (not log) of each worker
480         are combined and returned. Raw averages are also returned,
481         offered load for next iteration is chosen based on them.
482         The idea is that one fitting function might be fitting much better,
483         measurements at its avg are best for relevant results (for both),
484         but we do not know which fitting function it is.
485
486         Focus trackers are updated in-place. If a focus tracker in None,
487         new instance is created.
488
489         TODO: Define class for result object, so that fields are documented.
490         TODO: Re-use processes, instead creating on each computation?
491         TODO: As only one result is needed fresh, figure out a way
492         how to keep the other worker running. This will alow shorter
493         duration per trial. Special handling at first and last measurement
494         will be needed (to properly initialize and to properly combine results).
495
496         :param trial_duration: Length of the measurement in seconds.
497         :param transmit_rate: Offered load in packets per second.
498         :param trial_result_list: Results of previous measurements.
499         :param min_rate: Practical minimum of possible ofered load.
500         :param max_rate: Practical maximum of possible ofered load.
501         :param focus_trackers: Pair of trackers initialized
502             to speed up the numeric computation.
503         :param max_samples: Limit for integrator samples, for debugging.
504         :type trial_duration: float
505         :type transmit_rate: float
506         :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
507         :type min_rate: float
508         :type max_rate: float
509         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
510         :type max_samples: None or int
511         :returns: Measurement and computation results.
512         :rtype: _ComputeResult
513         """
514         logging.debug(
515             "measure_and_compute started with self %(self)r, trial_duration "
516             "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
517             "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
518             {"self": self, "dur": trial_duration, "tr": transmit_rate,
519              "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
520              "ms": max_samples})
521         # Preparation phase.
522         dimension = 2
523         stretch_focus_tracker, erf_focus_tracker = focus_trackers
524         if stretch_focus_tracker is None:
525             stretch_focus_tracker = stat_trackers.VectorStatTracker(dimension)
526             stretch_focus_tracker.unit_reset()
527         if erf_focus_tracker is None:
528             erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
529             erf_focus_tracker.unit_reset()
530         old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
531
532         def start_computing(fitting_function, focus_tracker):
533             """Just a block of code to be used for each fitting function.
534
535             Define function for integrator, create process and pipe ends,
536             start computation, return the boss pipe end.
537
538             :param fitting_function: lfit_erf or lfit_stretch.
539             :param bias_avg: Tuple of floats to start searching around.
540             :param bias_cov: Covariance matrix defining initial focus shape.
541             :type fitting_function: Function from 3 floats to float.
542             :type bias_avg: 2-tuple of floats
543             :type bias_cov: 2-tuple of 2-tuples of floats
544             :returns: Boss end of communication pipe.
545             :rtype: multiprocessing.Connection
546             """
547
548             def value_logweight_func(trace, x_mrr, x_spread):
549                 """Return log of critical rate and log of likelihood.
550
551                 This is a closure. The ancestor function got
552                 trial_result_list as a parameter, and we are accessing it.
553                 As integrator has strict conditions on function signature,
554                 trial_result_list cannot be an explicit argument
555                 of the current function.
556                 This is also why we have to define this closure
557                 at each invocation of the ancestor function anew.
558
559                 The dimensional spread parameter is the (dimensional) mrr
560                 raised to the power of x_spread scaled to interval (0, 1).
561                 The dimensional mrr parameter distribution has shape of
562                 1/(1+x^2), but x==1 corresponds to max_rate
563                 and 1.0 pps is added to avoid numerical problems in fitting
564                 functions.
565
566                 TODO: x^-2 (for x>1.0) might be simpler/nicer prior.
567
568                 :param trace: Multiprocessing-safe logging function (closure).
569                 :param x_mrr: The first dimensionless param
570                     from (-1, 1) interval.
571                 :param x_spread: The second dimensionless param
572                     from (-1, 1) interval.
573                 :type trace: function (str, object) -> None
574                 :type x_mrr: float
575                 :type x_spread: float
576                 :returns: Log of critical rate [pps] and log of likelihood.
577                 :rtype: 2-tuple of float
578                 """
579                 mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
580                 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
581                 logweight = self.log_weight(
582                     trace, fitting_function, trial_result_list, mrr, spread)
583                 value = math.log(self.find_critical_rate(
584                     trace, fitting_function, min_rate, max_rate,
585                     self.packet_loss_ratio_target, mrr, spread))
586                 return value, logweight
587
588             dilled_function = dill.dumps(value_logweight_func)
589             boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
590             boss_pipe_end.send(
591                 (dimension, dilled_function, focus_tracker, max_samples))
592             worker = multiprocessing.Process(
593                 target=Integrator.try_estimate_nd, args=(
594                     worker_pipe_end, 10.0, self.trace_enabled))
595             worker.daemon = True
596             worker.start()
597             return boss_pipe_end
598
599         erf_pipe = start_computing(
600             self.lfit_erf, erf_focus_tracker)
601         stretch_pipe = start_computing(
602             self.lfit_stretch, stretch_focus_tracker)
603
604         # Measurement phase.
605         measurement = self.measurer.measure(trial_duration, transmit_rate)
606
607         # Processing phase.
608         def stop_computing(name, pipe):
609             """Just a block of code to be used for each worker.
610
611             Send stop object, poll for result, then either
612             unpack response, log messages and return, or raise traceback.
613
614             TODO: Define class/structure for the return value?
615
616             :param name: Human friendly worker identifier for logging purposes.
617             :param pipe: Boss end of connection towards worker to stop.
618             :type name: str
619             :type pipe: multiprocessing.Connection
620             :returns: Computed value tracker, actual focus tracker,
621                 and number of samples used for this iteration.
622             :rtype: _PartialResult
623             """
624             pipe.send(None)
625             if not pipe.poll(10.0):
626                 raise RuntimeError(
627                     "Worker {name} did not finish!".format(name=name))
628             result_or_traceback = pipe.recv()
629             try:
630                 value_tracker, focus_tracker, debug_list, trace_list, sampls = (
631                     result_or_traceback)
632             except ValueError:
633                 raise RuntimeError(
634                     "Worker {name} failed with the following traceback:\n{tr}"
635                     .format(name=name, tr=result_or_traceback))
636             logging.info("Logs from worker %(name)r:", {"name": name})
637             for message in debug_list:
638                 logging.info(message)
639             for message in trace_list:
640                 logging.debug(message)
641             logging.debug("trackers: value %(val)r focus %(foc)r", {
642                 "val": value_tracker, "foc": focus_tracker})
643             return _PartialResult(value_tracker, focus_tracker, sampls)
644
645         stretch_result = stop_computing("stretch", stretch_pipe)
646         erf_result = stop_computing("erf", erf_pipe)
647         result = PLRsearch._get_result(measurement, stretch_result, erf_result)
648         logging.info(
649             "measure_and_compute finished with trial result %(res)r "
650             "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r "
651             "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r "
652             "erf samples %(es)r",
653             {"res": result.measurement,
654              "avg": result.avg, "stdev": result.stdev,
655              "a1": result.stretch_exp_avg, "a2": result.erf_exp_avg,
656              "nt": result.trackers, "ot": old_trackers,
657              "ss": stretch_result.samples, "es": erf_result.samples})
658         return result
659
660     @staticmethod
661     def _get_result(measurement, stretch_result, erf_result):
662         """Process and collate results from measure_and_compute.
663
664         Turn logarithm based values to exponential ones,
665         combine averages and stdevs of two fitting functions into a whole.
666
667         :param measurement: The trial measurement obtained during computation.
668         :param stretch_result: Computation output for stretch fitting function.
669         :param erf_result: Computation output for erf fitting function.
670         :type measurement: ReceiveRateMeasurement
671         :type stretch_result: _PartialResult
672         :type erf_result: _PartialResult
673         :returns: Combined results.
674         :rtype: _ComputeResult
675         """
676         stretch_avg = stretch_result.value_tracker.average
677         erf_avg = erf_result.value_tracker.average
678         stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
679         erf_var = erf_result.value_tracker.get_pessimistic_variance()
680         avg_log = (stretch_avg + erf_avg) / 2.0
681         var_log = (stretch_var + erf_var) / 2.0
682         var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
683         stdev_log = math.sqrt(var_log)
684         low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
685         avg = (low + upp) / 2
686         stdev = avg - low
687         trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
688         sea = math.exp(stretch_avg)
689         eea = math.exp(erf_avg)
690         return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
691
692
693 # Named tuples, for multiple local variables to be passed as return value.
694 _PartialResult = namedtuple(
695     "_PartialResult", "value_tracker focus_tracker samples")
696 """Two stat trackers and sample counter.
697
698 :param value_tracker: Tracker for the value (critical load) being integrated.
699 :param focus_tracker: Tracker for focusing integration inputs (sample points).
700 :param samples: How many samples were used for the computation.
701 :type value_tracker: stat_trackers.ScalarDualStatTracker
702 :type focus_tracker: stat_trackers.VectorStatTracker
703 :type samples: int
704 """
705
706 _ComputeResult = namedtuple(
707     "_ComputeResult",
708     "measurement avg stdev stretch_exp_avg erf_exp_avg trackers")
709 """Measurement, 4 computation result values, pair of trackers.
710
711 :param measurement: The trial measurement result obtained during computation.
712 :param avg: Overall average of critical rate estimate.
713 :param stdev: Overall standard deviation of critical rate estimate.
714 :param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
715 :param erf_exp_avg: Erf fitting function estimate average, exponentiated.
716 :param trackers: Pair of focus trackers to start next iteration with.
717 :type measurement: ReceiveRateMeasurement
718 :type avg: float
719 :type stdev: float
720 :type stretch_exp_avg: float
721 :type erf_exp_avg: float
722 :type trackers: 2-tuple of stat_trackers.VectorStatTracker
723 """