X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FPLRsearch%2FPLRsearch.py;h=cdfd3081494ac9c246162e60d6011029bcb2165f;hb=9780b57a9640e9ab40e40ec122ac80e09cd74c79;hp=db870c55dc94236869acd9b9ad4d4dc45c9f7c7f;hpb=752a4c3304581fa375f520fdb15a9f87604e11be;p=csit.git

diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py
index db870c55dc..cdfd308149 100644
--- a/resources/libraries/python/PLRsearch/PLRsearch.py
+++ b/resources/libraries/python/PLRsearch/PLRsearch.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -18,19 +18,21 @@
 import math
 import multiprocessing
 import time
+from collections import namedtuple
+
 import dill
-# TODO: Inform pylint about scipy (of correct version) being available.
+
 from scipy.special import erfcx, erfc
 
 # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
 # then switch to absolute imports within PLRsearch package.
 # Current usage of relative imports is just a short term workaround.
-import Integrator  # pylint: disable=relative-import
-from log_plus import log_plus, log_minus  # pylint: disable=relative-import
-import stat_trackers  # pylint: disable=relative-import
+from . import Integrator
+from . import stat_trackers
+from .log_plus import log_plus, log_minus
 
 
-class PLRsearch(object):
+class PLRsearch:
     """A class to encapsulate data relevant for the search method.
 
     The context is performance testing of packet processing systems.
@@ -41,8 +43,9 @@
 
     Two constants are stored as class fields for speed.
 
-    Method othed than search (and than __init__)
+    Methods other than search (and __init__)
     are just internal code structure.
+    TODO: Those method names should start with underscore then.
     """
 
@@ -51,9 +54,11 @@
 
     def __init__(
            self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
-            trial_number_offset=0, timeout=1800.0, trace_enabled=False):
+            trial_number_offset=0, timeout=7200.0, trace_enabled=False):
         """Store rate measurer and additional parameters.
 
+        The measurer must never report negative loss count.
+
         TODO: Copy AbstractMeasurer from MLRsearch.
 
         :param measurer: The measurer to call when searching.
@@ -162,35 +167,37 @@
         :type min_rate: float
         :type max_rate: float
         :returns: Average and stdev of critical load estimate.
-        :rtype: 2-tuple of floats
+        :rtype: 2-tuple of float
         """
         stop_time = time.time() + self.timeout
         min_rate = float(min_rate)
         max_rate = float(max_rate)
-        logging.info("Started search with min_rate %(min)r, max_rate %(max)r",
-                     {"min": min_rate, "max": max_rate})
+        logging.info(
+            f"Started search with min_rate {min_rate!r}, "
+            f"max_rate {max_rate!r}"
+        )
         trial_result_list = list()
         trial_number = self.trial_number_offset
         focus_trackers = (None, None)
         transmit_rate = (min_rate + max_rate) / 2.0
         lossy_loads = [max_rate]
-        zeros = [0, 0]  # Cosecutive zero loss, separately for stretch and erf.
+        zeros = 0  # How many consecutive zero loss results happened so far.
         while 1:
             trial_number += 1
-            logging.info("Trial %(number)r", {"number": trial_number})
+            logging.info(f"Trial {trial_number!r}")
             results = self.measure_and_compute(
                 self.trial_duration_per_trial * trial_number, transmit_rate,
-                trial_result_list, min_rate, max_rate, focus_trackers)
+                trial_result_list, min_rate, max_rate, focus_trackers
+            )
             measurement, average, stdev, avg1, avg2, focus_trackers = results
-            index = trial_number % 2
-            zeros[index] += 1
+            zeros += 1
             # TODO: Ratio of fill rate to drain rate seems to have
             # exponential impact. Make it configurable, or is 4:3 good enough?
-            if measurement.loss_fraction >= self.packet_loss_ratio_target:
-                for _ in range(4 * zeros[index]):
+            if measurement.loss_ratio >= self.packet_loss_ratio_target:
+                for _ in range(4 * zeros):
                     lossy_loads.append(measurement.target_tr)
             if measurement.loss_count > 0:
-                zeros[index] = 0
+                zeros = 0
             lossy_loads.sort()
             if stop_time <= time.time():
                 return average, stdev
@@ -198,24 +205,24 @@
             if (trial_number - self.trial_number_offset) <= 1:
                 next_load = max_rate
             elif (trial_number - self.trial_number_offset) <= 3:
-                next_load = (measurement.receive_rate / (
+                next_load = (measurement.relative_receive_rate / (
                     1.0 - self.packet_loss_ratio_target))
             else:
-                index = (trial_number + 1) % 2
-                next_load = (avg1, avg2)[index]
-                if zeros[index] > 0:
+                next_load = (avg1 + avg2) / 2.0
+                if zeros > 0:
                     if lossy_loads[0] > next_load:
-                        diminisher = math.pow(2.0, 1 - zeros[index])
+                        diminisher = math.pow(2.0, 1 - zeros)
                         next_load = lossy_loads[0] + diminisher * next_load
                         next_load /= (1.0 + diminisher)
                     # On zero measurement, we need to drain obsoleted low losses
                     # even if we did not use them to increase next_load,
-                    # in order to get to usable loses with higher load.
+                    # in order to get to usable losses at higher loads.
                     if len(lossy_loads) > 3:
                         lossy_loads = lossy_loads[3:]
-            logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
-                          {"z": zeros, "o": (avg1, avg2)[index],
-                           "n": next_load, "s": lossy_loads})
+            logging.debug(
+                f"Zeros {zeros!r} orig {(avg1 + avg2) / 2.0!r} "
+                f"next {next_load!r} loads {lossy_loads!r}"
+            )
             transmit_rate = min(max_rate, max(min_rate, next_load))
 
     @staticmethod
@@ -256,21 +263,22 @@
         # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
         chi = (load - mrr) / spread
         chi0 = -mrr / spread
-        trace("stretch: load", load)
-        trace("mrr", mrr)
-        trace("spread", spread)
-        trace("chi", chi)
-        trace("chi0", chi0)
+        trace(u"stretch: load", load)
+        trace(u"mrr", mrr)
+        trace(u"spread", spread)
+        trace(u"chi", chi)
+        trace(u"chi0", chi0)
         if chi > 0:
             log_lps = math.log(
-                load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
-            trace("big loss direct log_lps", log_lps)
+                load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
+            )
+            trace(u"big loss direct log_lps", log_lps)
         else:
             two_positive = log_plus(chi, 2 * chi0 - log_2)
             two_negative = log_plus(chi0, 2 * chi - log_2)
             if two_positive <= two_negative:
                 log_lps = log_minus(chi, chi0) + log_spread
-                trace("small loss crude log_lps", log_lps)
+                trace(u"small loss crude log_lps", log_lps)
                 return log_lps
             two = log_minus(two_positive, two_negative)
             three_positive = log_plus(two_positive, 3 * chi - log_3)
@@ -278,11 +286,11 @@
             three = log_minus(three_positive, three_negative)
             if two == three:
                 log_lps = two + log_spread
-                trace("small loss approx log_lps", log_lps)
+                trace(u"small loss approx log_lps", log_lps)
             else:
                 log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
                 log_lps += log_spread
-                trace("small loss direct log_lps", log_lps)
+                trace(u"small loss direct log_lps", log_lps)
         return log_lps
 
     @staticmethod
@@ -321,26 +329,26 @@
         # TODO: The stretch sign is just to have less minuses. Worth changing?
         chi = (mrr - load) / spread
         chi0 = mrr / spread
-        trace("Erf: load", load)
-        trace("mrr", mrr)
-        trace("spread", spread)
-        trace("chi", chi)
-        trace("chi0", chi0)
+        trace(u"Erf: load", load)
+        trace(u"mrr", mrr)
+        trace(u"spread", spread)
+        trace(u"chi", chi)
+        trace(u"chi0", chi0)
         if chi >= -1.0:
-            trace("positive, b roughly bigger than m", None)
+            trace(u"positive, b roughly bigger than m", None)
             if chi > math.exp(10):
                 first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
-                trace("approximated first", first)
+                trace(u"approximated first", first)
             else:
                 first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
-                trace("exact first", first)
+                trace(u"exact first", first)
             first -= chi * chi
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = log_minus(first, second)
-            trace("first", first)
+            trace(u"first", first)
         else:
-            trace("negative, b roughly smaller than m", None)
+            trace(u"negative, b roughly smaller than m", None)
             exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
             exp_first *= math.exp(-chi * chi)
             exp_first -= 2 * chi
@@ -351,11 +359,11 @@
             second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
             second -= chi0 * chi0
             intermediate = math.log(exp_first - math.exp(second))
-            trace("exp_first", exp_first)
-            trace("second", second)
-            trace("intermediate", intermediate)
+            trace(u"exp_first", exp_first)
+            trace(u"second", second)
+            trace(u"intermediate", intermediate)
         result = intermediate + math.log(spread) - math.log(erfc(-chi0))
-        trace("result", result)
+        trace(u"result", result)
         return result
 
     @staticmethod
@@ -386,7 +394,7 @@
         :type lfit_func: Function from 3 floats to float.
         :type min_rate: float
         :type max_rate: float
-        :type log_lps_target: float
+        :type loss_ratio_target: float
         :type mrr: float
         :type spread: float
         :returns: Load [pps] which achieves the target with given parameters.
@@ -398,17 +406,17 @@
         loss_ratio = -1
         while loss_ratio != loss_ratio_target:
             rate = (rate_hi + rate_lo) / 2.0
-            if rate == rate_hi or rate == rate_lo:
+            if rate in (rate_hi, rate_lo):
                 break
             loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
             loss_ratio = loss_rate / rate
             if loss_ratio > loss_ratio_target:
-                trace("halving down", rate)
+                trace(u"halving down", rate)
                 rate_hi = rate
             elif loss_ratio < loss_ratio_target:
-                trace("halving up", rate)
+                trace(u"halving up", rate)
                 rate_lo = rate
-        trace("found", rate)
+        trace(u"found", rate)
         return rate
 
     @staticmethod
@@ -429,40 +437,42 @@
         :param trace: A multiprocessing-friendly logging function (closure).
         :param lfit_func: Fitting function, typically lfit_stretch or lfit_erf.
-        :param result_list: List of trial measurement results.
+        :param trial_result_list: List of trial measurement results.
         :param mrr: The mrr parameter for the fitting function.
-        :param spread: The spread parameter for the fittinmg function.
+        :param spread: The spread parameter for the fitting function.
         :type trace: function (str, object) -> None
         :type lfit_func: Function from 3 floats to float.
-        :type result_list: list of MLRsearch.ReceiveRateMeasurement
+        :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
         :type mrr: float
         :type spread: float
         :returns: Logarithm of result weight for given function and parameters.
         :rtype: float
         """
         log_likelihood = 0.0
-        trace("log_weight for mrr", mrr)
-        trace("spread", spread)
+        trace(u"log_weight for mrr", mrr)
+        trace(u"spread", spread)
         for result in trial_result_list:
-            trace("for tr", result.target_tr)
-            trace("lc", result.loss_count)
-            trace("d", result.duration)
-            log_avg_loss_per_second = lfit_func(
-                trace, result.target_tr, mrr, spread)
-            log_avg_loss_per_trial = (
-                log_avg_loss_per_second + math.log(result.duration))
-            # Poisson probability computation works nice for logarithms.
-            log_trial_likelihood = (
-                result.loss_count * log_avg_loss_per_trial
-                - math.exp(log_avg_loss_per_trial))
-            log_trial_likelihood -= math.lgamma(1 + result.loss_count)
+            trace(u"for tr", result.target_tr)
+            trace(u"lc", result.loss_count)
+            trace(u"d", result.duration)
+            # _rel_ values use units of target_tr (transactions per second).
+            log_avg_rel_loss_per_second = lfit_func(
+                trace, result.target_tr, mrr, spread
+            )
+            # _abs_ values use units of loss count (maybe packets).
+            # There can be multiple packets per transaction.
+            log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
+                result.transmit_count / result.target_tr
+            )
+            # Geometric probability computation for logarithms.
+            log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
+            log_trial_likelihood *= -result.loss_count
+            log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
             log_likelihood += log_trial_likelihood
-            trace("avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
-            trace("log_trial_likelihood", log_trial_likelihood)
+            trace(u"avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
+            trace(u"log_trial_likelihood", log_trial_likelihood)
         return log_likelihood
 
-    # TODO: Refactor (somehow) so pylint stops complaining about
-    # too many local variables.
     def measure_and_compute(
             self, trial_duration, transmit_rate, trial_result_list, min_rate,
             max_rate, focus_trackers=(None, None), max_samples=None):
@@ -512,15 +522,14 @@
         :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
         :type max_samples: None or int
         :returns: Measurement and computation results.
-        :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers.
+        :rtype: _ComputeResult
         """
         logging.debug(
-            "measure_and_compute started with self %(self)r, trial_duration "
-            + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
-            + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
-            {"self": self, "dur": trial_duration, "tr": transmit_rate,
-             "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
-             "ms": max_samples})
+            f"measure_and_compute started with self {self!r}, trial_duration "
+            f"{trial_duration!r}, transmit_rate {transmit_rate!r}, "
+            f"trial_result_list {trial_result_list!r}, max_rate {max_rate!r}, "
+            f"focus_trackers {focus_trackers!r}, max_samples {max_samples!r}"
+        )
         # Preparation phase.
         dimension = 2
         stretch_focus_tracker, erf_focus_tracker = focus_trackers
@@ -531,6 +540,7 @@
             erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
             erf_focus_tracker.unit_reset()
         old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
+
         def start_computing(fitting_function, focus_tracker):
             """Just a block of code to be used for each fitting function.
 
@@ -538,14 +548,14 @@
             start computation, return the boss pipe end.
 
             :param fitting_function: lfit_erf or lfit_stretch.
-            :param bias_avg: Tuple of floats to start searching around.
-            :param bias_cov: Covariance matrix defining initial focus shape.
+            :param focus_tracker: Tracker initialized to speed up the numeric
+                computation.
             :type fitting_function: Function from 3 floats to float.
-            :type bias_avg: 2-tuple of floats
-            :type bias_cov: 2-tuple of 2-tuples of floats
+            :type focus_tracker: None or stat_trackers.VectorStatTracker
             :returns: Boss end of communication pipe.
             :rtype: multiprocessing.Connection
             """
+
             def value_logweight_func(trace, x_mrr, x_spread):
                 """Return log of critical rate and log of likelihood.
 
@@ -580,27 +590,37 @@
                 mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
                 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
                 logweight = self.log_weight(
-                    trace, fitting_function, trial_result_list, mrr, spread)
-                value = math.log(self.find_critical_rate(
-                    trace, fitting_function, min_rate, max_rate,
-                    self.packet_loss_ratio_target, mrr, spread))
+                    trace, fitting_function, trial_result_list, mrr, spread
+                )
+                value = math.log(
+                    self.find_critical_rate(
+                        trace, fitting_function, min_rate, max_rate,
+                        self.packet_loss_ratio_target, mrr, spread
+                    )
+                )
                 return value, logweight
+
             dilled_function = dill.dumps(value_logweight_func)
             boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
-            boss_pipe_end.send(
-                (dimension, dilled_function, focus_tracker, max_samples))
+            # Do not send yet, run the worker first to avoid a deadlock.
+            # See https://stackoverflow.com/a/15716500
             worker = multiprocessing.Process(
-                target=Integrator.try_estimate_nd, args=(
-                    worker_pipe_end, 10.0, self.trace_enabled))
+                target=Integrator.try_estimate_nd,
+                args=(worker_pipe_end, 10.0, self.trace_enabled)
+            )
             worker.daemon = True
             worker.start()
+            boss_pipe_end.send(
+                (dimension, dilled_function, focus_tracker, max_samples)
+            )
             return boss_pipe_end
-        erf_pipe = start_computing(
-            self.lfit_erf, erf_focus_tracker)
-        stretch_pipe = start_computing(
-            self.lfit_stretch, stretch_focus_tracker)
+
+        erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
+        stretch_pipe = start_computing(self.lfit_stretch, stretch_focus_tracker)
+
         # Measurement phase.
         measurement = self.measurer.measure(trial_duration, transmit_rate)
+
         # Processing phase.
         def stop_computing(name, pipe):
             """Just a block of code to be used for each worker.
 
@@ -616,51 +636,115 @@
             :type pipe: multiprocessing.Connection
             :returns: Computed value tracker, actual focus tracker, and number
                 of samples used for this iteration.
-            :rtype: 3-tuple of tracker, tracker and int
+            :rtype: _PartialResult
             """
-            pipe.send(None)
+            # If worker encountered an exception, we get it in the recv below,
+            # but send will report a broken pipe.
+            # EAFP says we should ignore the error (instead of polling first).
+            # https://devblogs.microsoft.com/python
+            # /idiomatic-python-eafp-versus-lbyl/
+            try:
+                pipe.send(None)
+            except BrokenPipeError:
+                pass
             if not pipe.poll(10.0):
-                raise RuntimeError(
-                    "Worker {name} did not finish!".format(name=name))
+                raise RuntimeError(f"Worker {name} did not finish!")
             result_or_traceback = pipe.recv()
             try:
                 value_tracker, focus_tracker, debug_list, trace_list, sampls = (
-                    result_or_traceback)
+                    result_or_traceback
+                )
             except ValueError:
                 raise RuntimeError(
-                    "Worker {name} failed with the following traceback:\n{tr}"
-                    .format(name=name, tr=result_or_traceback))
-            logging.info("Logs from worker %(name)r:", {"name": name})
+                    f"Worker {name} failed with the following traceback:\n"
+                    f"{result_or_traceback}"
+                )
+            logging.info(f"Logs from worker {name!r}:")
             for message in debug_list:
                 logging.info(message)
             for message in trace_list:
                 logging.debug(message)
-            logging.debug("trackers: value %(val)r focus %(foc)r", {
-                "val": value_tracker, "foc": focus_tracker})
-            return value_tracker, focus_tracker, sampls
-        stretch_value_tracker, stretch_focus_tracker, stretch_samples = (
-            stop_computing("stretch", stretch_pipe))
-        erf_value_tracker, erf_focus_tracker, erf_samples = (
-            stop_computing("erf", erf_pipe))
-        stretch_avg = stretch_value_tracker.average
-        erf_avg = erf_value_tracker.average
-        # TODO: Take into account secondary stats.
-        stretch_stdev = math.exp(stretch_value_tracker.log_variance / 2)
-        erf_stdev = math.exp(erf_value_tracker.log_variance / 2)
-        avg = math.exp((stretch_avg + erf_avg) / 2.0)
-        var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
-        var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
-        stdev = avg * math.sqrt(var)
-        focus_trackers = (stretch_focus_tracker, erf_focus_tracker)
+            logging.debug(
+                f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
+            )
+            return _PartialResult(value_tracker, focus_tracker, sampls)
+
+        stretch_result = stop_computing(u"stretch", stretch_pipe)
+        erf_result = stop_computing(u"erf", erf_pipe)
+        result = PLRsearch._get_result(measurement, stretch_result, erf_result)
         logging.info(
-            "measure_and_compute finished with trial result %(res)r "
-            "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r "
-            "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r "
-            "erf samples %(es)r",
-            {"res": measurement, "avg": avg, "stdev": stdev,
-             "a1": math.exp(stretch_avg), "a2": math.exp(erf_avg),
-             "nt": focus_trackers, "ot": old_trackers, "ss": stretch_samples,
-             "es": erf_samples})
-        return (
-            measurement, avg, stdev, math.exp(stretch_avg),
-            math.exp(erf_avg), focus_trackers)
+            f"measure_and_compute finished with trial result "
+            f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
+            f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
+            f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
+            f"stretch samples {stretch_result.samples!r} erf samples "
+            f"{erf_result.samples!r}"
+        )
+        return result
+
+    @staticmethod
+    def _get_result(measurement, stretch_result, erf_result):
+        """Process and collate results from measure_and_compute.
+
+        Turn logarithm based values to exponential ones,
+        combine averages and stdevs of two fitting functions into a whole.
+
+        :param measurement: The trial measurement obtained during computation.
+        :param stretch_result: Computation output for stretch fitting function.
+        :param erf_result: Computation output for erf fitting function.
+        :type measurement: ReceiveRateMeasurement
+        :type stretch_result: _PartialResult
+        :type erf_result: _PartialResult
+        :returns: Combined results.
+        :rtype: _ComputeResult
+        """
+        stretch_avg = stretch_result.value_tracker.average
+        erf_avg = erf_result.value_tracker.average
+        stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
+        erf_var = erf_result.value_tracker.get_pessimistic_variance()
+        avg_log = (stretch_avg + erf_avg) / 2.0
+        var_log = (stretch_var + erf_var) / 2.0
+        var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
+        stdev_log = math.sqrt(var_log)
+        low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
+        avg = (low + upp) / 2
+        stdev = avg - low
+        trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
+        sea = math.exp(stretch_avg)
+        eea = math.exp(erf_avg)
+        return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
+
+
+# Named tuples, for multiple local variables to be passed as return value.
+_PartialResult = namedtuple(
+    u"_PartialResult", u"value_tracker focus_tracker samples"
+)
+"""Two stat trackers and sample counter.
+
+:param value_tracker: Tracker for the value (critical load) being integrated.
+:param focus_tracker: Tracker for focusing integration inputs (sample points).
+:param samples: How many samples were used for the computation.
+:type value_tracker: stat_trackers.ScalarDualStatTracker
+:type focus_tracker: stat_trackers.VectorStatTracker
+:type samples: int
+"""

+_ComputeResult = namedtuple(
+    u"_ComputeResult",
+    u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers"
+)
+"""Measurement, 4 computation result values, pair of trackers.
+
+:param measurement: The trial measurement result obtained during computation.
+:param avg: Overall average of critical rate estimate.
+:param stdev: Overall standard deviation of critical rate estimate.
+:param stretch_exp_avg: Stretch fitting function estimate average, exponentiated.
+:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
+:param trackers: Pair of focus trackers to start next iteration with.
+:type measurement: ReceiveRateMeasurement
+:type avg: float
+:type stdev: float
+:type stretch_exp_avg: float
+:type erf_exp_avg: float
+:type trackers: 2-tuple of stat_trackers.VectorStatTracker
+"""
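
A note on the combination arithmetic in _get_result above: the stretch and erf
estimates are treated as an equal-weight mixture of two distributions in the
logarithmic domain, so the combined variance is the average of the component
variances plus the variance of the component means around their midpoint
(Var = E[Var] + Var[E]). A minimal standalone sketch of that identity follows;
the function and variable names are illustrative, not part of the module.

import math

def combine_estimates(avg1, var1, avg2, var2):
    # Mixture mean: plain average of the two component means.
    avg = (avg1 + avg2) / 2.0
    # Mixture variance: average of component variances,
    # plus squared half-distance between the component means.
    var = (var1 + var2) / 2.0
    var += (avg1 - avg2) * (avg1 - avg2) / 4.0
    return avg, var

# Example with two log-domain estimates, as _get_result would see them.
avg_log, var_log = combine_estimates(20.0, 0.04, 20.6, 0.09)
stdev_log = math.sqrt(var_log)
# Exponentiation back to load units follows the same steps as _get_result.
low = math.exp(avg_log - stdev_log)
upp = math.exp(avg_log + stdev_log)
avg, stdev = (low + upp) / 2.0, (upp - low) / 2.0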
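
A note on the "Geometric probability computation for logarithms" step in
log_weight: with average absolute loss m per trial, the geometric distribution
on {0, 1, 2, ...} with mean m has P(k) = m**k / (1 + m)**(k + 1), and the three
log_plus lines in the diff compute its logarithm stably. A self-contained
sketch, assuming a local log_plus that mirrors the helper imported from
.log_plus:

import math

def log_plus(first, second):
    # Stable log(exp(first) + exp(second)).
    if first < second:
        first, second = second, first
    return first + math.log1p(math.exp(second - first))

def log_geometric_pmf(loss_count, log_avg_loss):
    # Same three steps as in log_weight:
    # log P(k) = -k * log((1 + m) / m) - log(1 + m), with m = exp(log_avg_loss).
    log_trial_likelihood = log_plus(0.0, -log_avg_loss)
    log_trial_likelihood *= -loss_count
    log_trial_likelihood -= log_plus(0.0, +log_avg_loss)
    return log_trial_likelihood

# Cross-check against the direct formula at a benign operating point.
avg_loss, loss_count = 2.5, 3
direct = loss_count * math.log(avg_loss / (1.0 + avg_loss))
direct -= math.log(1.0 + avg_loss)
assert abs(log_geometric_pmf(loss_count, math.log(avg_loss)) - direct) < 1e-9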
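
For context, a hedged sketch of how the class might be driven outside CSIT.
The import path, the toy measurer, and its loss curve are assumptions made for
illustration; real callers pass an MLRsearch measurer whose results expose the
attributes read above (target_tr, transmit_count, loss_count, loss_ratio,
relative_receive_rate, duration). A real search keeps measuring until the
timeout expires, so the timeout here is kept unrealistically short.

from collections import namedtuple

# Assumes resources/libraries/python is on PYTHONPATH.
from PLRsearch.PLRsearch import PLRsearch

# Hypothetical measurement type, carrying just the fields PLRsearch reads.
Measurement = namedtuple(
    u"Measurement",
    u"target_tr transmit_count loss_count loss_ratio"
    u" relative_receive_rate duration"
)

class ToyMeasurer:
    """Deterministic toy measurer with a soft loss knee near 2 Mpps."""

    critical = 2e6  # Pretended critical load, packets per second.

    def measure(self, duration, transmit_rate):
        transmit_count = int(duration * transmit_rate)
        ratio = max(0.0, 1.0 - self.critical / transmit_rate) ** 2
        loss_count = int(transmit_count * ratio)
        return Measurement(
            target_tr=transmit_rate,
            transmit_count=transmit_count,
            loss_count=loss_count,
            loss_ratio=loss_count / transmit_count,
            relative_receive_rate=(transmit_count - loss_count) / duration,
            duration=duration,
        )

search = PLRsearch(
    measurer=ToyMeasurer(),
    trial_duration_per_trial=1.0,
    packet_loss_ratio_target=1e-7,
    timeout=60.0,
)
average, stdev = search.search(min_rate=1e4, max_rate=1e7)
print(f"critical load estimate: {average} +- {stdev}")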