# Copyright (c) 2024 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 """Module holding PLRsearch class."""
import logging
import math
import multiprocessing
import time

from collections import namedtuple

import dill

from scipy.special import erfcx, erfc
# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
# then switch to absolute imports within PLRsearch package.
# Current usage of relative imports is just a short term workaround.
from . import Integrator
from . import stat_trackers
from .log_plus import log_plus, log_minus
36 """A class to encapsulate data relevant for the search method.
38 The context is performance testing of packet processing systems.
39 The system, when being offered a steady stream of packets,
40 can process some of them successfully, other are considered "lost".
42 See docstring of the search method for algorithm description.
44 Two constants are stored as class fields for speed.
46 Method other than search (and than __init__)
47 are just internal code structure.
49 TODO: Those method names should start with underscore then.
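    # Descriptive note (not from the original comments): the two constants
    # below are pre-computed here for speed and are used only by lfit_erf;
    # log_xerfcx_10 caches the log-domain value backing the large-argument
    # approximation branch (chi > math.exp(10)).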
    xerfcx_limit = math.pow(math.acos(0), -0.5)
    log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))

    def __init__(
        self,
        measurer,
        trial_duration_per_trial,
        packet_loss_ratio_target,
        trial_number_offset=0,
        timeout=7200.0,
        trace_enabled=False,
    ):
64 """Store rate measurer and additional parameters.
66 The measurer must never report negative loss count.
68 TODO: Copy AbstractMeasurer from MLRsearch.
70 :param measurer: The measurer to call when searching.
71 :param trial_duration_per_trial: Each trial has larger duration
72 than the previous trial. This is the increment, in seconds.
73 :param packet_loss_ratio_target: The algorithm tries to estimate
74 the offered load leading to this ratio on average.
75 Trial ratio is number of packets lost divided by packets offered.
76 :param trial_number_offset: The "first" trial number will be 1+this.
77 Use this to ensure first iterations have enough time to compute
78 reasonable estimates for later trials to use.
79 :param timeout: The search ends if it lasts more than this many seconds.
80 :type measurer: MLRsearch.AbstractMeasurer
81 :type trial_duration_per_trial: float
82 :type packet_loss_ratio_target: float
83 :type trial_number_offset: int
        self.measurer = measurer
        self.trial_duration_per_trial = float(trial_duration_per_trial)
        self.packet_loss_ratio_target = float(packet_loss_ratio_target)
        self.trial_number_offset = int(trial_number_offset)
        self.timeout = float(timeout)
        self.trace_enabled = bool(trace_enabled)

    def search(self, min_rate, max_rate):
94 """Perform the search, return average and stdev for throughput estimate.
96 Considering measurer and packet_loss_ratio_target (see __init__),
97 find such an offered load (called critical load) that is expected
98 to hit the target loss ratio in the limit of very long trial duration.
99 As the system is probabilistic (and test duration is finite),
100 the critical ratio is only estimated.
101 Return the average and standard deviation of the estimate.
103 In principle, this algorithm performs trial measurements,
104 each with varied offered load (which is constant during the trial).
105 During each measurement, Bayesian inference is performed
106 on all the measurement results so far.
107 When timeout is up, the last estimate is returned,
108 else another trial is performed.
110 It is assumed that the system under test, even though not deterministic,
111 still follows the rule of large numbers. In another words,
112 any growing set of measurements at a particular offered load
113 will converge towards unique (for the given load) packet loss ratio.
114 This means there is a deterministic (but unknown) function
115 mapping the offered load to average loss ratio.
116 This function is called loss ratio function.
117 This also assumes the average loss ratio
118 does not depend on trial duration.
120 The actual probability distribution of loss counts, achieving
121 the average ratio on trials of various duration
122 can be complicated (and can depend on offered load), but simply assuming
123 Poisson distribution will make the algorithm converge.
124 Binomial distribution would be more precise,
125 but Poisson is more practical, as it effectively gives
126 less information content to high ratio results.
128 Even when applying other assumptions on the loss ratio function
129 (increasing function, limit zero ratio when load goes to zero,
130 global upper limit on rate of packets processed), there are still
131 too many different shapes of possible loss functions,
132 which makes full Bayesian reasoning intractable.
134 This implementation radically simplifies things by examining
135 only two shapes, each with finitely many (in this case just two)
136 parameters. In other words, two fitting functions
137 (each with two parameters and one argument).
138 When restricting model space to one of the two fitting functions,
139 the Bayesian inference becomes tractable (even though it needs
140 numerical integration from Integrator class).
142 The first measurement is done at the middle between
143 min_rate and max_rate, to help with convergence
144 if max_rate measurements give loss below target.
145 TODO: Fix overflow error and use min_rate instead of the middle.
147 The second measurement is done at max_rate, next few measurements
148 have offered load of previous load minus excess loss rate.
149 This simple rule is found to be good when offered loads
150 so far are way above the critical rate. After few measurements,
151 inference from fitting functions converges faster that this initial
152 "optimistic" procedure.
154 Offered loads close to (limiting) critical rate are the most useful,
155 as linear approximation of the fitting function
156 becomes good enough there (thus reducing the impact
157 of the overall shape of fitting function).
158 After several trials, usually one of the fitting functions
159 has better predictions than the other one, but the algorithm
160 does not track that. Simply, it uses the estimate average,
161 alternating between the functions.
162 Multiple workarounds are applied to try and avoid measurements
163 both in zero loss region and in big loss region,
164 as their results tend to make the critical load estimate worse.
166 The returned average and stdev is a combination of the two fitting
169 :param min_rate: Avoid measuring at offered loads below this,
170 in packets per second.
171 :param max_rate: Avoid measuring at offered loads above this,
172 in packets per second.
173 :type min_rate: float
174 :type max_rate: float
175 :returns: Average and stdev of critical load estimate.
176 :rtype: 2-tuple of float
        stop_time = time.time() + self.timeout
        min_rate = float(min_rate)
        max_rate = float(max_rate)
            f"Started search with min_rate {min_rate!r}, "
            f"max_rate {max_rate!r}"
        trial_result_list = []
        trial_number = self.trial_number_offset
        focus_trackers = (None, None)
        transmit_rate = (min_rate + max_rate) / 2.0
        lossy_loads = [max_rate]
        zeros = 0  # How many consecutive zero loss results are happening.
            logging.info(f"Trial {trial_number!r}")
            results = self.measure_and_compute(
                self.trial_duration_per_trial * trial_number,
            measurement, average, stdev, avg1, avg2, focus_trackers = results
            # Workaround for unsent packets and other anomalies.
            measurement.plr_loss_count = min(
                measurement.intended_count,
                int(measurement.intended_count * measurement.loss_ratio + 0.9),
            )
                f"loss ratio {measurement.plr_loss_count}"
                f" / {measurement.intended_count}"
            # TODO: Ratio of fill rate to drain rate seems to have
            # exponential impact. Make it configurable, or is 4:3 good enough?
            if measurement.plr_loss_count >= (
                measurement.intended_count * self.packet_loss_ratio_target
            ):
                for _ in range(4 * zeros):
                    lossy_loads.append(measurement.intended_load)
                logging.debug("High enough loss, lossy loads added.")
                    f"Not a high loss, zero counter bumped to {zeros}."
            if stop_time <= time.time():
                return average, stdev
            trial_result_list.append(measurement)
            if (trial_number - self.trial_number_offset) <= 1:
            elif (trial_number - self.trial_number_offset) <= 3:
                next_load = measurement.relative_forwarding_rate / (
                    1.0 - self.packet_loss_ratio_target
                )
                next_load = (avg1 + avg2) / 2.0
                    if lossy_loads[0] > next_load:
                        diminisher = math.pow(2.0, 1 - zeros)
                        next_load = lossy_loads[0] + diminisher * next_load
                        next_load /= 1.0 + diminisher
                    # On zero measurement, we need to drain obsoleted low losses
                    # even if we did not use them to increase next_load,
                    # in order to get to usable losses at higher loads.
                    if len(lossy_loads) > 3:
                        lossy_loads = lossy_loads[3:]
                    f"Zeros {zeros!r} orig {(avg1 + avg2) / 2.0!r} "
                    f"next {next_load!r} loads {lossy_loads!r}"
            transmit_rate = min(max_rate, max(min_rate, next_load))
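
    # A minimal usage sketch (hypothetical measurer object and rates, shown
    # only to illustrate the search() entry point; not part of the algorithm):
    #
    #     searcher = PLRsearch(
    #         measurer=my_measurer,  # an MLRsearch.AbstractMeasurer-like object
    #         trial_duration_per_trial=120.0,
    #         packet_loss_ratio_target=1e-7,
    #     )
    #     average, stdev = searcher.search(min_rate=1e4, max_rate=2e6)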

    @staticmethod
    def lfit_stretch(trace, load, mrr, spread):
256 """Stretch-based fitting function.
258 Return the logarithm of average packet loss per second
259 when the load (argument) is offered to a system with given
260 mrr and spread (parameters).
261 Stretch function is 1/(1+Exp[-x]). The average itself is definite
262 integral from zero to load, of shifted and x-scaled stretch function.
263 As the integrator is sensitive to discontinuities,
264 and it calls this function at large areas of parameter space,
265 the implementation has to avoid rounding errors, overflows,
266 and correctly approximate underflows.
268 TODO: Explain how the high-level description
269 has been converted into an implementation full of ifs.
271 :param trace: A multiprocessing-friendly logging function (closure).
272 :param load: Offered load (positive), in packets per second.
273 :param mrr: Parameter of this fitting function, equal to limiting
274 (positive) average number of packets received (as opposed to lost)
275 when offered load is many spreads more than mrr.
276 :param spread: The x-scaling parameter (positive). No nice semantics,
277 roughly corresponds to size of "tail" for loads below mrr.
278 :type trace: function (str, object) -> NoneType
282 :returns: Logarithm of average number of packets lost per second.
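        # Reference note (derived from the docstring above): with
        # chi = (load - mrr) / spread and chi0 = -mrr / spread, the closed
        # form being evaluated is
        #     lps = spread * (log(1 + exp(chi)) - log(1 + exp(chi0)))
        # and the branches below only differ in how they evaluate or
        # approximate log(lps) without overflow or catastrophic rounding.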
        # TODO: What is the fastest way to use such values?
        log_2 = math.log(2)
        log_3 = math.log(3)
        log_spread = math.log(spread)
        # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
        chi = (load - mrr) / spread
        chi0 = -mrr / spread
        trace("stretch: load", load)
        trace("spread", spread)
                load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
            trace("big loss direct log_lps", log_lps)
            two_positive = log_plus(chi, 2 * chi0 - log_2)
            two_negative = log_plus(chi0, 2 * chi - log_2)
            if two_positive <= two_negative:
                log_lps = log_minus(chi, chi0) + log_spread
                trace("small loss crude log_lps", log_lps)
            two = log_minus(two_positive, two_negative)
            three_positive = log_plus(two_positive, 3 * chi - log_3)
            three_negative = log_plus(two_negative, 3 * chi0 - log_3)
            three = log_minus(three_positive, three_negative)
                log_lps = two + log_spread
                trace("small loss approx log_lps", log_lps)
                log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
                log_lps += log_spread
                trace("small loss direct log_lps", log_lps)

    @staticmethod
    def lfit_erf(trace, load, mrr, spread):
324 """Erf-based fitting function.
326 Return the logarithm of average packet loss per second
327 when the load (argument) is offered to a system with given
328 mrr and spread (parameters).
329 Erf function is Primitive function to normal distribution density.
330 The average itself is definite integral from zero to load,
331 of shifted and x-scaled erf function.
332 As the integrator is sensitive to discontinuities,
333 and it calls this function at large areas of parameter space,
334 the implementation has to avoid rounding errors, overflows,
335 and correctly approximate underflows.
337 TODO: Explain how the high-level description
338 has been converted into an implementation full of ifs.
340 :param trace: A multiprocessing-friendly logging function (closure).
341 :param load: Offered load (positive), in packets per second.
342 :param mrr: Parameter of this fitting function, equal to limiting
343 (positive) average number of packets received (as opposed to lost)
344 when offered load is many spreads more than mrr.
345 :param spread: The x-scaling parameter (positive). No nice semantics,
346 roughly corresponds to size of "tail" for loads below mrr.
347 :type trace: function (str, object) -> NoneType
351 :returns: Logarithm of average number of packets lost per second.
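        # Numerical note: erfcx(x) == exp(x * x) * erfc(x) (scipy's scaled
        # complementary error function), so the expressions below keep the
        # huge exp(chi * chi) factors implicit and subtract chi * chi terms
        # in the log domain instead, which avoids overflow for large |chi|.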
        # Beware, this chi has the sign opposite to the stretch function chi.
        # TODO: The stretch sign is just to have less minuses. Worth changing?
        chi = (mrr - load) / spread
        chi0 = mrr / spread
        trace("Erf: load", load)
        trace("spread", spread)
            trace("positive, b roughly bigger than m", None)
            if chi > math.exp(10):
                first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
                trace("approximated first", first)
                first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
                trace("exact first", first)
            second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
            second -= chi0 * chi0
            intermediate = log_minus(first, second)
            trace("first", first)
            trace("negative, b roughly smaller than m", None)
            exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
            exp_first *= math.exp(-chi * chi)
            # TODO: Why has the following line chi there (as opposed to chi0)?
            # In general the functions would be more readable if they explicitly
            # return math.log(func(chi) - func(chi0))
            # for some function "func", at least for some branches.
            second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
            second -= chi0 * chi0
            intermediate = math.log(exp_first - math.exp(second))
            trace("exp_first", exp_first)
            trace("second", second)
            trace("intermediate", intermediate)
        result = intermediate + math.log(spread) - math.log(erfc(-chi0))
        trace("result", result)

    @staticmethod
    def find_critical_rate(
        trace, lfit_func, min_rate, max_rate, loss_ratio_target, mrr, spread
    ):
399 """Given ratio target and parameters, return the achieving offered load.
401 This is basically an inverse function to lfit_func
402 when parameters are fixed.
403 Instead of implementing effective implementation
404 of the inverse function, this implementation uses
405 brute force binary search. It is bisecting (nim_rate, max_rate) interval
406 until the critical load is found (or interval becomes degenerate).
407 This implementation assures min and max rate limits are honored.
409 TODO: Use some method with faster convergence?
411 :param trace: A multiprocessing-friendly logging function (closure).
412 :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
413 :param min_rate: Lower bound for binary search [pps].
414 :param max_rate: Upper bound for binary search [pps].
415 :param loss_ratio_target: Fitting function should return loss rate
416 giving this ratio at the returned load and parameters [1].
417 :param mrr: The mrr parameter for the fitting function [pps].
418 :param spread: The spread parameter for the fittinmg function [pps].
419 :type trace: function (str, object) -> None
420 :type lfit_func: Function from 3 floats to float.
421 :type min_rate: float
422 :type max_rate: float
423 :type loss_ratio_target: float
426 :returns: Load [pps] which achieves the target with given parameters.
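        # Bisection sketch: each iteration halves the (rate_lo, rate_hi)
        # interval around the critical load; the loop ends either when the
        # fitted loss ratio hits the target exactly, or when the midpoint
        # collides with one of the bounds (float resolution exhausted).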
429 trace("Finding critical rate for loss_ratio_target", loss_ratio_target)
433 while loss_ratio != loss_ratio_target:
434 rate = (rate_hi + rate_lo) / 2.0
435 if rate in (rate_hi, rate_lo):
437 loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
438 loss_ratio = loss_rate / rate
439 if loss_ratio > loss_ratio_target:
440 trace("halving down", rate)
442 elif loss_ratio < loss_ratio_target:
443 trace("halving up", rate)

    @staticmethod
    def log_weight(trace, lfit_func, trial_result_list, mrr, spread):
450 """Return log of weight of trial results by the function and parameters.
452 Integrator assumes uniform distribution, but over different parameters.
453 Weight and likelihood are used interchangeably here anyway.
455 Each trial has an intended load, a sent count and a loss count
456 (probably counting unsent packets as loss, as they signal
457 the load is too high for the traffic generator).
458 The fitting function is used to compute the average loss rate.
459 Geometric distribution (with average loss per trial) is used
460 to get likelihood of one trial result, the overal likelihood
461 is a product of all trial likelihoods.
462 As likelihoods can be extremely small, logarithms are tracked instead.
464 The current implementation does not use direct loss rate
465 from the fitting function, as the input and output units may not match
466 (e.g. intended load in TCP transactions, loss in packets).
467 Instead, the expected average loss is scaled according to the number
468 of packets actually sent.
470 TODO: Copy MeasurementResult from MLRsearch.
472 :param trace: A multiprocessing-friendly logging function (closure).
473 :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
474 :param trial_result_list: List of trial measurement results.
475 :param mrr: The mrr parameter for the fitting function.
476 :param spread: The spread parameter for the fitting function.
477 :type trace: function (str, object) -> None
478 :type lfit_func: Function from 3 floats to float.
479 :type trial_result_list: list of MLRsearch.MeasurementResult
482 :returns: Logarithm of result weight for given function and parameters.
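        # With avg denoting the average (absolute) loss per trial computed
        # below, the geometric likelihood used for a trial that lost c packets
        # is
        #     P(c) = (avg / (1 + avg)) ** c / (1 + avg)
        # which has mean avg; the three log_trial_likelihood statements below
        # evaluate log(P(c)) without leaving the log domain.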
486 trace("log_weight for mrr", mrr)
487 trace("spread", spread)
488 for result in trial_result_list:
489 trace("for tr", result.intended_load)
490 trace("plc", result.plr_loss_count)
491 trace("d", result.intended_duration)
492 # _rel_ values use units of intended_load (transactions per second).
493 log_avg_rel_loss_per_second = lfit_func(
494 trace, result.intended_load, mrr, spread
496 # _abs_ values use units of loss count (maybe packets).
497 # There can be multiple packets per transaction.
498 log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
499 result.offered_count / result.intended_load
501 # Geometric probability computation for logarithms.
502 log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
503 log_trial_likelihood *= -result.plr_loss_count
504 log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
505 log_likelihood += log_trial_likelihood
506 trace("avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
507 trace("log_trial_likelihood", log_trial_likelihood)
508 return log_likelihood

    def measure_and_compute(
        self,
        trial_duration,
        transmit_rate,
        trial_result_list,
        min_rate,
        max_rate,
        focus_trackers=(None, None),
        max_samples=None,
    ):
520 """Perform both measurement and computation at once.
522 High level steps: Prepare and launch computation worker processes,
523 perform the measurement, stop computation and combine results.
525 Integrator needs a specific function to process (-1, 1) parameters.
526 As our fitting functions use dimensional parameters,
527 so a transformation is performed, resulting in a specific prior
528 distribution over the dimensional parameters.
529 Maximal rate (line rate) is needed for that transformation.
531 Two fitting functions are used, computation is started
532 on temporary worker process per fitting function. After the measurement,
533 average and stdev of the critical rate (not log) of each worker
534 are combined and returned. Raw averages are also returned,
535 offered load for next iteration is chosen based on them.
536 The idea is that one fitting function might be fitting much better,
537 measurements at its avg are best for relevant results (for both),
538 but we do not know which fitting function it is.
540 Focus trackers are updated in-place. If a focus tracker in None,
541 new instance is created.
543 TODO: Define class for result object, so that fields are documented.
544 TODO: Re-use processes, instead creating on each computation?
545 TODO: As only one result is needed fresh, figure out a way
546 how to keep the other worker running. This will alow shorter
547 duration per trial. Special handling at first and last measurement
548 will be needed (to properly initialize and to properly combine results).
550 :param trial_duration: Length of the measurement in seconds.
551 :param transmit_rate: Offered load in packets per second.
552 :param trial_result_list: Results of previous measurements.
553 :param min_rate: Practical minimum of possible ofered load.
554 :param max_rate: Practical maximum of possible ofered load.
555 :param focus_trackers: Pair of trackers initialized
556 to speed up the numeric computation.
557 :param max_samples: Limit for integrator samples, for debugging.
558 :type trial_duration: float
559 :type transmit_rate: float
560 :type trial_result_list: list of MLRsearch.MeasurementResult
561 :type min_rate: float
562 :type max_rate: float
563 :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
564 :type max_samples: None or int
565 :returns: Measurement and computation results.
566 :rtype: _ComputeResult
569 f"measure_and_compute started with self {self!r}, trial_duration "
570 f"{trial_duration!r}, transmit_rate {transmit_rate!r}, "
571 f"trial_result_list {trial_result_list!r}, max_rate {max_rate!r}, "
572 f"focus_trackers {focus_trackers!r}, max_samples {max_samples!r}"
576 stretch_focus_tracker, erf_focus_tracker = focus_trackers
577 if stretch_focus_tracker is None:
578 stretch_focus_tracker = stat_trackers.VectorStatTracker(dimension)
579 stretch_focus_tracker.unit_reset()
580 if erf_focus_tracker is None:
581 erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
582 erf_focus_tracker.unit_reset()
583 old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()

        def start_computing(fitting_function, focus_tracker):
586 """Just a block of code to be used for each fitting function.
588 Define function for integrator, create process and pipe ends,
589 start computation, return the boss pipe end.
591 :param fitting_function: lfit_erf or lfit_stretch.
592 :param focus_tracker: Tracker initialized to speed up the numeric
594 :type fitting_function: Function from 3 floats to float.
595 :type focus_tracker: None or stat_trackers.VectorStatTracker
596 :returns: Boss end of communication pipe.
597 :rtype: multiprocessing.Connection
            boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
            # Starting the worker first. Contrary to documentation
            # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
            # sending a large object without an active listener on the other side
            # results in a deadlock, not in a ValueError.
            # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send
            worker = multiprocessing.Process(
                target=Integrator.try_estimate_nd,
                args=(worker_pipe_end, 5.0, self.trace_enabled),
            )
            worker.start()

            # Only now it is safe to send the function to compute with.
            def value_logweight_func(trace, x_mrr, x_spread):
615 """Return log of critical rate and log of likelihood.
617 This is a closure. The ancestor function got
618 trial_result_list as a parameter, and we are accessing it.
619 As integrator has strict conditions on function signature,
620 trial_result_list cannot be an explicit argument
621 of the current function.
622 This is also why we have to define this closure
623 at each invocation of the ancestor function anew.
625 The dimensional spread parameter is the (dimensional) mrr
626 raised to the power of x_spread scaled to interval (0, 1).
627 The dimensional mrr parameter distribution has shape of
628 1/(1+x^2), but x==1 corresponds to max_rate
629 and 1.0 pps is added to avoid numerical problems in fitting
632 TODO: x^-2 (for x>1.0) might be simpler/nicer prior.
634 :param trace: Multiprocessing-safe logging function (closure).
635 :param x_mrr: The first dimensionless param
636 from (-1, 1) interval.
637 :param x_spread: The second dimensionless param
638 from (-1, 1) interval.
639 :type trace: function (str, object) -> None
641 :type x_spread: float
642 :returns: Log of critical rate [pps] and log of likelihood.
643 :rtype: 2-tuple of float
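                # Mapping note: (x_spread + 1.0) / 2.0 maps (-1, 1) onto (0, 1),
                # so spread == mrr ** u for some u in (0, 1); spread therefore
                # stays between 1.0 and mrr, matching the docstring above.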
                mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
                spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
                logweight = self.log_weight(
                    trace, fitting_function, trial_result_list, mrr, spread
                )
                value = math.log(
                    self.find_critical_rate(
                        trace, fitting_function, min_rate, max_rate,
                        self.packet_loss_ratio_target, mrr, spread,
                    )
                )
                return value, logweight
            dilled_function = dill.dumps(value_logweight_func)
            boss_pipe_end.send(
                (dimension, dilled_function, focus_tracker, max_samples)
            )
            return boss_pipe_end

        erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
        stretch_pipe = start_computing(self.lfit_stretch, stretch_focus_tracker)
        measurement = self.measurer.measure(trial_duration, transmit_rate)

        def stop_computing(name, pipe):
677 """Just a block of code to be used for each worker.
679 Send stop object, poll for result, then either
680 unpack response, log messages and return, or raise traceback.
682 TODO: Define class/structure for the return value?
684 :param name: Human friendly worker identifier for logging purposes.
685 :param pipe: Boss end of connection towards worker to stop.
687 :type pipe: multiprocessing.Connection
688 :returns: Computed value tracker, actual focus tracker,
689 and number of samples used for this iteration.
690 :rtype: _PartialResult
            # If worker encountered an exception, we get it in the recv below,
            # but send will report a broken pipe.
            # EAFP says we should ignore the error (instead of polling first).
            # https://devblogs.microsoft.com/python
            # /idiomatic-python-eafp-versus-lbyl/
            except BrokenPipeError:
                pass
            if not pipe.poll(10.0):
                raise RuntimeError(f"Worker {name} did not finish!")
            result_or_traceback = pipe.recv()
                ) = result_or_traceback
            except ValueError as exc:
                    f"Worker {name} failed with the following traceback:\n"
                    f"{result_or_traceback}"
            logging.info(f"Logs from worker {name!r}:")
            for message in debug_list:
                logging.info(message)
            for message in trace_list:
                logging.debug(message)
                f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
            return _PartialResult(value_tracker, focus_tracker, sampls)

        stretch_result = stop_computing("stretch", stretch_pipe)
        erf_result = stop_computing("erf", erf_pipe)
        result = PLRsearch._get_result(measurement, stretch_result, erf_result)
            f"measure_and_compute finished with trial result "
            f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
            f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
            f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
            f"stretch samples {stretch_result.samples!r} erf samples "
            f"{erf_result.samples!r}"
        return result

    @staticmethod
    def _get_result(measurement, stretch_result, erf_result):
742 """Process and collate results from measure_and_compute.
744 Turn logarithm based values to exponential ones,
745 combine averages and stdevs of two fitting functions into a whole.
747 :param measurement: The trial measurement obtained during computation.
748 :param stretch_result: Computation output for stretch fitting function.
749 :param erf_result: Computation output for erf fitting function.
750 :type measurement: MeasurementResult
751 :type stretch_result: _PartialResult
752 :type erf_result: _PartialResult
753 :returns: Combined results.
754 :rtype: _ComputeResult
        stretch_avg = stretch_result.value_tracker.average
        erf_avg = erf_result.value_tracker.average
        stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
        erf_var = erf_result.value_tracker.get_pessimistic_variance()
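        # The two log-domain estimates are combined as an equal-weight mixture:
        # the mixture variance is the average of the two variances plus the
        # variance of the two averages, (stretch_avg - erf_avg) ** 2 / 4.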
        avg_log = (stretch_avg + erf_avg) / 2.0
        var_log = (stretch_var + erf_var) / 2.0
        var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
        stdev_log = math.sqrt(var_log)
        low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
        avg = (low + upp) / 2
        stdev = avg - low
        trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
        sea = math.exp(stretch_avg)
        eea = math.exp(erf_avg)
        return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)


# Named tuples, for multiple local variables to be passed as return value.
_PartialResult = namedtuple(
    "_PartialResult", "value_tracker focus_tracker samples"
)
"""Two stat trackers and a sample counter.

:param value_tracker: Tracker for the value (critical load) being integrated.
:param focus_tracker: Tracker for focusing integration inputs (sample points).
:param samples: How many samples were used for the computation.
:type value_tracker: stat_trackers.ScalarDualStatTracker
:type focus_tracker: stat_trackers.VectorStatTracker
:type samples: int
"""

_ComputeResult = namedtuple(
    "_ComputeResult",
    "measurement avg stdev stretch_exp_avg erf_exp_avg trackers",
)
"""Measurement, 4 computation result values, pair of trackers.

:param measurement: The trial measurement result obtained during computation.
:param avg: Overall average of critical rate estimate.
:param stdev: Overall standard deviation of critical rate estimate.
:param stretch_exp_avg: Stretch fitting function estimate average, exponentiated.
:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
:param trackers: Pair of focus trackers to start next iteration with.
:type measurement: MeasurementResult
:type avg: float
:type stdev: float
:type stretch_exp_avg: float
:type erf_exp_avg: float
:type trackers: 2-tuple of stat_trackers.VectorStatTracker
"""