# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 """Module holding PLRsearch class."""
18 import multiprocessing
22 # TODO: Inform pylint about scipy (of correct version) being available.
23 from scipy.special import erfcx, erfc
25 # TODO: The preferred way to consume this code is via a pip package.
26 # If your project copies code instead, make sure your pylint check does not
27 # require these imports to be absolute and descending from your superpackage.
29 from log_plus import log_plus, log_minus


class PLRsearch(object):
    """A class to encapsulate data relevant for the search method.

    The context is performance testing of packet processing systems.
    The system, when being offered a steady stream of packets,
    can process some of them successfully, others are considered "lost".

    See docstring of the search method for algorithm description.

    Two constants are stored as class fields for speed.

    Methods other than search (and __init__)
    are just internal code structure.
    TODO: Those method names should start with an underscore then.

    TODO: Figure out how to replace #print with logging
    without slowing down too much.
    """
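
    # Illustrative usage sketch (hypothetical measurer object, not part of
    # this module; any object with a measure(duration, transmit_rate) method
    # returning a ReceiveRateMeasurement-like result should work):
    #
    #     search = PLRsearch(
    #         measurer, trial_duration_per_trial=5.0,
    #         packet_loss_ratio_target=1e-7, timeout=1800.0)
    #     average, stdev = search.search(min_rate=100.0, max_rate=14880952.0)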
    xerfcx_limit = math.pow(math.acos(0), -0.5)
    log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))

    def __init__(
            self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
            trial_number_offset=0, timeout=60.0):
57 """Store rate measurer and additional parameters.
59 Also declare packet_loss_per_second_target field (float),
60 to be initialized later.
62 TODO: Copy AbstractMeasurer from MLRsearch.
64 :param measurer: The measurer to call when searching.
65 :param trial_duration_per_trial: Each trial has larger duration
66 than the previous trial. This is the increment, in seconds.
67 :param packet_loss_ratio_target: The algorithm tries to estimate
68 the offered load leading to this ratio on average.
69 Trial ratio is number of packets lost divided by packets offered.
70 :param trial_number_offset: The "first" trial number will be 1+this.
71 Use this to ensure first iterations have enough time to compute
72 reasonable estimates for later trials to use.
73 :param timeout: The search ends if it lasts more than this many seconds.
74 :type measurer: MLRsearch.AbstractMeasurer
75 :type trial_duration_per_trial: float
76 :type packet_loss_ratio_target: float
77 :type trial_number_offset: int
        self.measurer = measurer
        self.trial_duration_per_trial = trial_duration_per_trial
        self.packet_loss_ratio_target = packet_loss_ratio_target
        self.trial_number_offset = trial_number_offset
        self.timeout = timeout

    def search(self, min_rate, max_rate):
87 """Perform the search, return average and stdev for throughput estimate.
89 Considering measurer and packet_loss_ratio_target (see __init__),
90 find such an offered load (called critical load) that is expected
91 to hit the target loss ratio in the limit of very long trial duration.
92 As the system is probabilistic (and test duration is finite),
93 the critical ratio is only estimated.
94 Return the average and standard deviation of the estimate.
96 In principle, this algorithm performs trial measurements,
97 each with varied offered load (which is constant during the trial).
98 During each measurement, Bayesian inference is performed
99 on all the measurement results so far.
100 When timeout is up, the last estimate is returned,
101 else another trial is performed.
103 It is assumed that the system under test, even though not deterministic,
104 still follows the rule of large numbers. In another words,
105 any growing set of measurements at a particular offered load
106 will converge towards unique (for the given load) packet loss ratio.
107 This means there is a deterministic (but unknown) function
108 mapping the offered load to average loss ratio.
109 This function is called loss function.
110 This also assumes the average loss ratio
111 does not depend on trial duration.
113 The actual probability distribution of loss counts, achieving
114 the average ratio on trials of various duration
115 can be complicated (and can depend on offered load), but simply assuming
116 Poisson distribution will make the algorithm converge.
117 Binomial distribution would be more precise,
118 but Poisson is more practical, as it effectively gives
119 less information content to high ratio results.
121 Even when applying other assumptions on the loss function
122 (increasing function, limit zero ratio when load goes to zero,
123 global upper limit on rate of packets processed), there are still
124 too many different shapes of possible loss functions,
125 which makes full Bayesian reasoning intractable.
127 This implementation radically simplifies things by examining
128 only two shapes, each with finitely many (in this case just two)
129 parameters. In other words, two fitting functions
130 (each with two parameters and one argument).
131 When restricting model space to one of the two fitting functions,
132 the Bayesian inference becomes tractable (even though it needs
133 numerical integration from Integrator class).
135 The first measurement is done at min_rate to help with convergence
136 if max_rate measurements give loss below target.
137 FIXME: Fix overflow error and really use min_rate.
138 The second measurement is done at max_rate, next few measurements
139 have offered load of previous load minus excess loss ratio.
140 This simple rule is found to be good when offered loads
141 so far are way above the critical rate. After few measurements,
142 inference from fitting functions converges faster that the initial
143 "optimistic" procedure.
145 Offered loads close to (limiting) critical rate are the most useful,
146 as linear approximation of the fitting function
147 becomes good enough there (thus reducing the impact
148 of the overall shape of fitting function).
149 After several trials, usually one of the fitting functions
150 has better predictions than the other one, but the algorithm
151 does not track that. Simply, it uses the estimate average,
152 alternating between the functions.
154 The returned average and stdev is a combination of the two fitting
157 TODO: If measurement at max_rate is already below target loss rate,
158 the algorithm always measures at max_rate, and likelihood
159 no longer has single maximum, leading to "random" estimates.
160 Find a way to avoid that.
162 :param min_rate: Avoid measuring at offered loads below this,
163 in packets per second.
164 :param max_rate: Avoid measuring at offered loads above this,
165 in packets per second.
166 :type min_rate: float
167 :type max_rate: float
168 :returns: Average and stdev of critical load estimate.
169 :rtype: 2-tuple of floats
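        # Illustrative example of the early-trial load heuristic (made-up
        # numbers): with packet_loss_ratio_target = 1e-7, a trial offering
        # 10.0 Mpps that receives 9.5 Mpps leads to a next offered load of
        # 9.5e6 / (1.0 - 1e-7), i.e. roughly the receive rate of that trial.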
        stop_time = time.time() + self.timeout
        min_rate = float(min_rate)
        max_rate = float(max_rate)
        trial_result_list = list()
        trial_number = self.trial_number_offset
        integrator_data = (None, None, None, None)
        message = "Trial %(number)r computed avg %(avg)r stdev %(stdev)r"
        message += " stretch %(a1)r erf %(a2)r difference %(diff)r"
        transmit_rate = (min_rate + max_rate) / 2.0
        while 1:
            trial_number += 1
            trial_duration = trial_number * self.trial_duration_per_trial
            results = self.measure_and_compute(
                trial_duration, transmit_rate, trial_result_list, max_rate,
                integrator_data)
            measurement, average, stdev, avg1, avg2, integrator_data = results
            logging.info(message, {
                "number": trial_number, "avg": average, "stdev": stdev,
                "a1": avg1, "a2": avg2, "diff": avg2 - avg1})
            if stop_time <= time.time():
                return average, stdev
            trial_result_list.append(measurement)
            if (trial_number - self.trial_number_offset) <= 1:
                next_load = max_rate
            elif (trial_number - self.trial_number_offset) <= 3:
                next_load = (measurement.receive_rate
                             / (1.0 - self.packet_loss_ratio_target))
            else:
                next_load = avg1 if trial_number % 2 else avg2
            transmit_rate = min(max_rate, max(min_rate, next_load))

    @staticmethod
    def lfit_stretch(load, mrr, spread):
        """Stretch-based fitting function.

        Return the logarithm of average packet loss per second
        when the load (argument) is offered to a system with given
        mrr and spread (parameters).
        Stretch function is 1/(1+Exp[-x]). The average itself is the definite
        integral from zero to load, of the shifted and x-scaled stretch function.
        As the integrator is sensitive to discontinuities,
        and it calls this function over large areas of parameter space,
        the implementation has to avoid rounding errors, overflows,
        and correctly approximate underflows.

        TODO: Explain how the high-level description
        has been converted into an implementation full of ifs.

        :param load: Offered load (positive), in packets per second.
        :param mrr: Parameter of this fitting function, equal to limiting
            (positive) average number of packets received (as opposed to lost)
            when offered load is many spreads more than mrr.
        :param spread: The x-scaling parameter (positive). No nice semantics,
            roughly corresponds to size of "tail" for loads below mrr.
        :type load: float
        :type mrr: float
        :type spread: float
        :returns: Logarithm of average number of packets lost per second.
        :rtype: float
        """
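        # Sketch of the quantity being approximated below (a reading aid,
        # not part of the original derivation): with chi = (load - mrr) / spread
        # and chi0 = -mrr / spread, the average loss per second is
        #     lps = spread * (log_plus(0, chi) - log_plus(0, chi0)),
        # i.e. the integral of 1 / (1 + exp(-(x - mrr) / spread)) over (0, load).
        # The branches only differ in how the logarithm of that quantity
        # is evaluated without overflow or cancellation.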
        # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
        chi = (load - mrr) / spread
        chi0 = -mrr / spread
        # print "load", load, "mrr", mrr, "spread", spread, "chi", chi
        if chi > 0:
            log_lps = math.log(
                load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
            # print "big loss direct log_lps", log_lps
        else:
            approx = (math.exp(chi) - math.exp(2 * chi) / 2) * spread
            if approx == 0.0:
                log_lps = chi
                # print "small loss crude log_lps", log_lps
                return log_lps
            third = math.exp(3 * chi) / 3 * spread
            if approx + third != approx + 2 * third:
                log_lps = math.log(
                    (log_plus(0, chi) - log_plus(0, chi0)) * spread)
                # print "small loss direct log_lps", log_lps
            else:
                log_lps = math.log(
                    approx - (math.exp(chi0) - math.exp(2 * chi0) / 2) * spread)
                # print "small loss approx log_lps", log_lps
        return log_lps

    @staticmethod
    def lfit_erf(load, mrr, spread):
        """Erf-based fitting function.

        Return the logarithm of average packet loss per second
        when the load (argument) is offered to a system with given
        mrr and spread (parameters).
        The erf function is the primitive function of the normal distribution
        density. The average itself is the definite integral from zero to load,
        of the shifted and x-scaled erf function.
        As the integrator is sensitive to discontinuities,
        and it calls this function over large areas of parameter space,
        the implementation has to avoid rounding errors, overflows,
        and correctly approximate underflows.

        TODO: Explain how the high-level description
        has been converted into an implementation full of ifs.

        :param load: Offered load (positive), in packets per second.
        :param mrr: Parameter of this fitting function, equal to limiting
            (positive) average number of packets received (as opposed to lost)
            when offered load is many spreads more than mrr.
        :param spread: The x-scaling parameter (positive). No nice semantics,
            roughly corresponds to size of "tail" for loads below mrr.
        :type load: float
        :type mrr: float
        :type spread: float
        :returns: Logarithm of average number of packets lost per second.
        :rtype: float
        """
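        # Reading aid (not from the original): this is the same integral idea
        # as in lfit_stretch, with the logistic curve replaced by an erf-based
        # sigmoid; the two branches below are numerically stable evaluations
        # of the logarithm of that integral for chi of either sign.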
        # Beware, this chi has the sign opposite to the stretch function chi.
        # TODO: The stretch sign is just to have less minuses. Worth changing?
        chi = (mrr - load) / spread
        chi0 = mrr / spread
        # print "load", load, "mrr", mrr, "spread", spread,
        # print "chi", chi, "chi0", chi0
        if chi > 0:
            # print "positive, b ~> m"
            if chi > math.exp(10):
                first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
                # print "approximated"
            else:
                first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
                # print "exact"
            first -= chi * chi
            second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
            second -= chi0 * chi0
            intermediate = log_minus(first, second)
            # print "first", first, "second", second,
            # print "intermediate", intermediate
        else:
            # print "negative, b ~< m"
            exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
            exp_first *= math.exp(-chi * chi)
            exp_first -= 2 * chi
            # TODO: Why does the following line have chi there (as opposed to chi0)?
            # In general the functions would be more readable if they explicitly
            #     return math.log(func(chi) - func(chi0))
            # for some function "func", at least for some branches.
            second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
            second -= chi0 * chi0
            intermediate = math.log(exp_first - math.exp(second))
            # print "exp_first", exp_first, "second", second,
            # print "intermediate", intermediate
        result = intermediate + math.log(spread) - math.log(erfc(-chi0))
        # print "lfit erf result", result
        return result

    @staticmethod
    def find_critical_rate(lfit_func, log_lps_target, mrr, spread):
        """Given lps target and parameters, return the achieving offered load.

        This is basically an inverse function to lfit_func
        when parameters are fixed.
        Instead of an effective implementation of the inverse function,
        this implementation uses brute force binary search.
        The search starts at an arbitrary offered load,
        uses exponential search to find the other bound,
        and then bisects until the load is found (or the interval
        becomes degenerate).

        TODO: Use at least some method with faster convergence.

        :param lfit_func: Fitting function, typically lfit_stretch or lfit_erf.
        :param log_lps_target: Fitting function should return this
            at the returned load and parameters.
        :param mrr: The mrr parameter for the fitting function.
        :param spread: The spread parameter for the fitting function.
        :type lfit_func: Function from 3 floats to float.
        :type log_lps_target: float
        :type mrr: float
        :type spread: float
        :returns: Load [pps] which achieves the target with given parameters.
        :rtype: float
        """
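        # Illustrative call (made-up parameter values):
        #     load = PLRsearch.find_critical_rate(
        #         PLRsearch.lfit_stretch, math.log(0.5), 1e7, 1e5)
        # returns the offered load at which the stretch function with
        # mrr=1e7 and spread=1e5 predicts 0.5 lost packets per second.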
        # TODO: Should we make the initial rate configurable?
        rate = 10000000.0
        log_loss = lfit_func(rate, mrr, spread)
        if log_loss == log_lps_target:
            return rate
        # Exponential search part.
        if log_loss > log_lps_target:
            rate_hi = rate
            while 1:
                rate_lo = rate_hi / 2.0
                log_loss = lfit_func(rate_lo, mrr, spread)
                if log_loss > log_lps_target:
                    rate_hi = rate_lo
                    continue
                if log_loss == log_lps_target:
                    return rate_lo
                break
        else:
            rate_lo = rate
            while 1:
                rate_hi = rate_lo * 2.0
                log_loss = lfit_func(rate_hi, mrr, spread)
                if log_loss < log_lps_target:
                    rate_lo = rate_hi
                    continue
                if log_loss == log_lps_target:
                    return rate_hi
                break
        # Binary search part.
        while rate_hi != rate_lo:
            rate = (rate_hi + rate_lo) / 2.0
            log_loss = lfit_func(rate, mrr, spread)
            if rate == rate_hi or rate == rate_lo or log_loss == log_lps_target:
                # print "found", rate
                return rate
            if log_loss > log_lps_target:
                rate_hi = rate
            else:
                rate_lo = rate

    @staticmethod
    def log_weight(lfit_func, trial_result_list, mrr, spread):
        """Return log of weight of trial results by the function and parameters.

        Integrator assumes uniform distribution, but over different parameters.
        Weight and likelihood are used interchangeably here anyway.

        Each trial has an offered load, a duration and a loss count.
        The fitting function is used to compute the average loss per second.
        A Poisson distribution (with average loss per trial) is used
        to get the likelihood of one trial result,
        the overall likelihood is the product.
        As likelihoods can be extremely small, logarithms are tracked instead.

        TODO: Copy ReceiveRateMeasurement from MLRsearch.

        :param lfit_func: Fitting function, typically lfit_stretch or lfit_erf.
        :param trial_result_list: List of trial measurement results.
        :param mrr: The mrr parameter for the fitting function.
        :param spread: The spread parameter for the fitting function.
        :type lfit_func: Function from 3 floats to float.
        :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
        :type mrr: float
        :type spread: float
        :returns: Logarithm of result weight for given function and parameters.
        :rtype: float
        """
        log_likelihood = 0.0
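        # Worked example of one term (made-up numbers): if a trial expects
        # 4.0 lost packets on average and actually lost 2, its log likelihood
        # is 2 * log(4.0) - 4.0 - lgamma(3) ~= 2.773 - 4.0 - 0.693 ~= -1.92.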
        for result in trial_result_list:
            # print "DEBUG for tr", result.target_tr,
            # print "lc", result.loss_count, "d", result.duration
            log_avg_loss_per_second = lfit_func(result.target_tr, mrr, spread)
            log_avg_loss_per_trial = (
                log_avg_loss_per_second + math.log(result.duration))
            # Poisson probability computation works nicely with logarithms.
            log_trial_likelihood = (
                result.loss_count * log_avg_loss_per_trial
                - math.exp(log_avg_loss_per_trial))
            log_trial_likelihood -= math.lgamma(1 + result.loss_count)
            log_likelihood += log_trial_likelihood
            # print "log_avg_loss_per_second", log_avg_loss_per_second
            # print "log_avg_loss_per_trial", log_avg_loss_per_trial
            # print "avg_loss_per_trial", math.exp(log_avg_loss_per_trial)
            # print "log_trial_likelihood", log_trial_likelihood
            # print "log_likelihood", log_likelihood
        # print "returning log_likelihood", log_likelihood
        return log_likelihood

    # FIXME: Refactor (somehow) so pylint stops complaining about
    # too many local variables.
    # Some work already done in subsequent changes.
    def measure_and_compute(
            self, trial_duration, transmit_rate,
            trial_result_list, max_rate, integrator_data):
445 """Perform both measurement and computation at once.
447 High level steps: Prepare and launch computation worker processes,
448 perform the measurement, stop computation and combine results.
450 Integrator needs a specific function to process (-1, 1) parameters.
451 As our fitting functions use dimensional parameters,
452 so a transformation is performed, resulting in a specific prior
453 distribution over the dimensional parameters.
454 Maximal rate (line rate) is needed for that transformation.
456 Two fitting functions are used, computation is started
457 on temporary worker process per fitting function. After the measurement,
458 Average and stdev of the critical rate (not log) of each worker
459 are combined and returned. Raw averages are also returned,
460 offered load for next iteration is chosen from them.
461 The idea is that one fitting function might be fitting much better,
462 measurements at its avg are best for relevant results (for both),
463 but we do not know which fitting function it is.
465 TODO: Define class for integrator data, so that fields are documented.
466 TODO: Define class for result object, so that fields are documented.
467 TODO: More processes to the slower fitting function?
468 TODO: Re-use processes, instead creating on each computation.
469 TODO: As only one result is needed fresh, figure out a way
470 how to keep the other worker running. This will alow shorter
471 duration per trial. Special handling at first and last measurement
472 will be needed (to properly initialize and to properly combine results).
474 :param trial_duration: Length of the measurement in seconds.
475 :param transmit_rate: Offered load in packets per second.
476 :param trial_result_list: Results of previous measurements.
477 :param max_rate: Theoretic maximum of possible ofered load.
478 :param integrator_data: Hints to speed up the numeric computation.
479 :type trial_duration: float
480 :type transmit_rate: float
481 :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
482 :type max_rate: float
483 :type integrator_data: 4-tuple of gaussian positions and covariances
484 :returns: Measurement and computation results.
485 :rtype: 6-tuple: ReceiveRateMeasurement, floats, integrator data.
        dimension = 2
        stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov = (
            integrator_data)
        packet_loss_per_second_target = (
            transmit_rate * self.packet_loss_ratio_target)
        def start_computing(fitting_function, bias_avg, bias_cov):
            """Just a block of code to be used for each fitting function.

            Define function for integrator, create process and pipe ends,
            start computation, return the boss pipe end.

            :param fitting_function: lfit_erf or lfit_stretch.
            :param bias_avg: Tuple of floats to start searching around.
            :param bias_cov: Covariance matrix defining initial focus shape.
            :type fitting_function: Function from 3 floats to float.
            :type bias_avg: 2-tuple of floats
            :type bias_cov: 2-tuple of 2-tuples of floats
            :returns: Boss end of communication pipe.
            :rtype: multiprocessing.Connection
            """
            def value_logweight_func(x_mrr, x_spread):
                """Return log of critical rate and log of likelihood.

                This is a closure. The enclosing function got
                trial_result_list as a parameter, and we are accessing it.
                As the integrator has strict conditions on the function
                signature, trial_result_list cannot be an explicit argument
                of the current function.
                This is also why we have to define this closure anew
                at each invocation of the enclosing function.

                The dimensional spread parameter is the (dimensional) mrr
                raised to the power of x_spread scaled to interval (0, 1).
                The dimensional mrr parameter distribution has the shape of
                1/(1+x^2), but x==1 corresponds to max_rate
                and 1.0 pps is added to avoid numerical problems in fitting
                functions.

                TODO: x^-2 (for x>1.0) might be a simpler/nicer prior.

                :param x_mrr: The first dimensionless param
                    from (-1, 1) interval.
                :param x_spread: The second dimensionless param
                    from (-1, 1) interval.
                :returns: Log of critical rate [pps] and log of likelihood.
                :rtype: 2-tuple of float
                """
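                # Examples of the transform below (not from the original):
                # x_mrr = 0.0 maps to mrr = 0.5 * max_rate + 1.0,
                # x_mrr -> 1.0 maps to mrr -> 1.0 pps,
                # x_spread = -1.0 maps to spread = 1.0,
                # x_spread = 1.0 maps to spread = mrr.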
                mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
                spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
                # print "mrr", mrr, "spread", spread
                logweight = self.log_weight(
                    fitting_function, trial_result_list, mrr, spread)
                value = math.log(self.find_critical_rate(
                    fitting_function,
                    math.log(packet_loss_per_second_target), mrr, spread))
                return value, logweight

            dilled_function = dill.dumps(value_logweight_func)
            boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
            boss_pipe_end.send(
                (dimension, dilled_function, bias_avg, bias_cov))
            worker = multiprocessing.Process(
                target=Integrator.try_estimate_nd, args=(worker_pipe_end,))
            worker.start()
            return boss_pipe_end

        erf_pipe = start_computing(
            self.lfit_erf, erf_bias_avg, erf_bias_cov)
        stretch_pipe = start_computing(
            self.lfit_stretch, stretch_bias_avg, stretch_bias_cov)

        measurement = self.measurer.measure(trial_duration, transmit_rate)

        erf_pipe.send(None)
        stretch_pipe.send(None)
        if not stretch_pipe.poll(1.0):
            raise RuntimeError("Stretch worker did not finish!")
        result_or_traceback = stretch_pipe.recv()
        try:
            (stretch_avg, stretch_stdev, stretch_bias_avg,
             stretch_bias_cov, debug_list, _) = result_or_traceback
        except ValueError:
            raise RuntimeError(
                "Stretch worker failed with the following traceback:\n{tr}"
                .format(tr=result_or_traceback))
        logging.info("Logs from stretch worker:")
        for message in debug_list:
            logging.debug(message)
        if not erf_pipe.poll(1.0):
            raise RuntimeError("Erf worker did not finish!")
        result_or_traceback = erf_pipe.recv()
        try:
            (erf_avg, erf_stdev, erf_bias_avg,
             erf_bias_cov, debug_list, _) = result_or_traceback
        except ValueError:
            raise RuntimeError(
                "Erf worker failed with the following traceback:\n{tr}"
                .format(tr=result_or_traceback))
        logging.info("Logs from erf worker:")
        for message in debug_list:
            logging.debug(message)
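        # The combination below treats the two log-domain estimates as an
        # equal-weight mixture: the mean is the average of the two means,
        # the variance is the average of the two variances plus the spread
        # of the two means, and the stdev is mapped back from the log domain
        # by multiplying with the (linear) average.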
        avg = math.exp((stretch_avg + erf_avg) / 2.0)
        var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
        var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
        stdev = avg * math.sqrt(var)
        integrator_data = (
            stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov)
        return (
            measurement, avg, stdev, math.exp(stretch_avg),
            math.exp(erf_avg), integrator_data)