X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FPLRsearch%2FPLRsearch.py;h=ec58fbd10f61c9136c01539e229960daec9a729e;hb=HEAD;hp=0314a80efbfa3a093a65cb37ae241a4562880cfc;hpb=186e30a749b404210c49081d2b6d1edfc58ffe90;p=csit.git diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py index 0314a80efb..326aa2e2d2 100644 --- a/resources/libraries/python/PLRsearch/PLRsearch.py +++ b/resources/libraries/python/PLRsearch/PLRsearch.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 Cisco and/or its affiliates. +# Copyright (c) 2024 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -53,8 +53,14 @@ class PLRsearch: log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10))) def __init__( - self, measurer, trial_duration_per_trial, packet_loss_ratio_target, - trial_number_offset=0, timeout=7200.0, trace_enabled=False): + self, + measurer, + trial_duration_per_trial, + packet_loss_ratio_target, + trial_number_offset=0, + timeout=7200.0, + trace_enabled=False, + ): """Store rate measurer and additional parameters. The measurer must never report negative loss count. @@ -176,7 +182,7 @@ class PLRsearch: f"Started search with min_rate {min_rate!r}, " f"max_rate {max_rate!r}" ) - trial_result_list = list() + trial_result_list = [] trial_number = self.trial_number_offset focus_trackers = (None, None) transmit_rate = (min_rate + max_rate) / 2.0 @@ -186,34 +192,54 @@ class PLRsearch: trial_number += 1 logging.info(f"Trial {trial_number!r}") results = self.measure_and_compute( - self.trial_duration_per_trial * trial_number, transmit_rate, - trial_result_list, min_rate, max_rate, focus_trackers + self.trial_duration_per_trial * trial_number, + transmit_rate, + trial_result_list, + min_rate, + max_rate, + focus_trackers, ) measurement, average, stdev, avg1, avg2, focus_trackers = results + # Workaround for unsent packets and other anomalies. + measurement.plr_loss_count = min( + measurement.intended_count, + int(measurement.intended_count * measurement.loss_ratio + 0.9), + ) + logging.debug( + f"loss ratio {measurement.plr_loss_count}" + f" / {measurement.intended_count}" + ) zeros += 1 # TODO: Ratio of fill rate to drain rate seems to have # exponential impact. Make it configurable, or is 4:3 good enough? - if measurement.loss_ratio >= self.packet_loss_ratio_target: + if measurement.plr_loss_count >= ( + measurement.intended_count * self.packet_loss_ratio_target + ): for _ in range(4 * zeros): - lossy_loads.append(measurement.target_tr) - if measurement.loss_count > 0: + lossy_loads.append(measurement.intended_load) + lossy_loads.sort() zeros = 0 - lossy_loads.sort() + logging.debug("High enough loss, lossy loads added.") + else: + logging.debug( + f"Not a high loss, zero counter bumped to {zeros}." + ) if stop_time <= time.time(): return average, stdev trial_result_list.append(measurement) if (trial_number - self.trial_number_offset) <= 1: next_load = max_rate elif (trial_number - self.trial_number_offset) <= 3: - next_load = (measurement.relative_receive_rate / ( - 1.0 - self.packet_loss_ratio_target)) + next_load = measurement.relative_forwarding_rate / ( + 1.0 - self.packet_loss_ratio_target + ) else: next_load = (avg1 + avg2) / 2.0 if zeros > 0: if lossy_loads[0] > next_load: diminisher = math.pow(2.0, 1 - zeros) next_load = lossy_loads[0] + diminisher * next_load - next_load /= (1.0 + diminisher) + next_load /= 1.0 + diminisher # On zero measurement, we need to drain obsoleted low losses # even if we did not use them to increase next_load, # in order to get to usable loses at higher loads. @@ -263,22 +289,22 @@ class PLRsearch: # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization chi = (load - mrr) / spread chi0 = -mrr / spread - trace(u"stretch: load", load) - trace(u"mrr", mrr) - trace(u"spread", spread) - trace(u"chi", chi) - trace(u"chi0", chi0) + trace("stretch: load", load) + trace("mrr", mrr) + trace("spread", spread) + trace("chi", chi) + trace("chi0", chi0) if chi > 0: log_lps = math.log( load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread ) - trace(u"big loss direct log_lps", log_lps) + trace("big loss direct log_lps", log_lps) else: two_positive = log_plus(chi, 2 * chi0 - log_2) two_negative = log_plus(chi0, 2 * chi - log_2) if two_positive <= two_negative: log_lps = log_minus(chi, chi0) + log_spread - trace(u"small loss crude log_lps", log_lps) + trace("small loss crude log_lps", log_lps) return log_lps two = log_minus(two_positive, two_negative) three_positive = log_plus(two_positive, 3 * chi - log_3) @@ -286,11 +312,11 @@ class PLRsearch: three = log_minus(three_positive, three_negative) if two == three: log_lps = two + log_spread - trace(u"small loss approx log_lps", log_lps) + trace("small loss approx log_lps", log_lps) else: log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0)) log_lps += log_spread - trace(u"small loss direct log_lps", log_lps) + trace("small loss direct log_lps", log_lps) return log_lps @staticmethod @@ -329,26 +355,26 @@ class PLRsearch: # TODO: The stretch sign is just to have less minuses. Worth changing? chi = (mrr - load) / spread chi0 = mrr / spread - trace(u"Erf: load", load) - trace(u"mrr", mrr) - trace(u"spread", spread) - trace(u"chi", chi) - trace(u"chi0", chi0) + trace("Erf: load", load) + trace("mrr", mrr) + trace("spread", spread) + trace("chi", chi) + trace("chi0", chi0) if chi >= -1.0: - trace(u"positive, b roughly bigger than m", None) + trace("positive, b roughly bigger than m", None) if chi > math.exp(10): first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10) - trace(u"approximated first", first) + trace("approximated first", first) else: first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi)) - trace(u"exact first", first) + trace("exact first", first) first -= chi * chi second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0)) second -= chi0 * chi0 intermediate = log_minus(first, second) - trace(u"first", first) + trace("first", first) else: - trace(u"negative, b roughly smaller than m", None) + trace("negative, b roughly smaller than m", None) exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi) exp_first *= math.exp(-chi * chi) exp_first -= 2 * chi @@ -359,17 +385,17 @@ class PLRsearch: second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0)) second -= chi0 * chi0 intermediate = math.log(exp_first - math.exp(second)) - trace(u"exp_first", exp_first) - trace(u"second", second) - trace(u"intermediate", intermediate) + trace("exp_first", exp_first) + trace("second", second) + trace("intermediate", intermediate) result = intermediate + math.log(spread) - math.log(erfc(-chi0)) - trace(u"result", result) + trace("result", result) return result @staticmethod def find_critical_rate( - trace, lfit_func, min_rate, max_rate, loss_ratio_target, - mrr, spread): + trace, lfit_func, min_rate, max_rate, loss_ratio_target, mrr, spread + ): """Given ratio target and parameters, return the achieving offered load. This is basically an inverse function to lfit_func @@ -411,12 +437,12 @@ class PLRsearch: loss_rate = math.exp(lfit_func(trace, rate, mrr, spread)) loss_ratio = loss_rate / rate if loss_ratio > loss_ratio_target: - trace(u"halving down", rate) + trace("halving down", rate) rate_hi = rate elif loss_ratio < loss_ratio_target: - trace(u"halving up", rate) + trace("halving up", rate) rate_lo = rate - trace(u"found", rate) + trace("found", rate) return rate @staticmethod @@ -441,7 +467,7 @@ class PLRsearch: Instead, the expected average loss is scaled according to the number of packets actually sent. - TODO: Copy ReceiveRateMeasurement from MLRsearch. + TODO: Copy MeasurementResult from MLRsearch. :param trace: A multiprocessing-friendly logging function (closure). :param lfit_func: Fitting function, typically lfit_spread or lfit_erf. @@ -450,40 +476,47 @@ class PLRsearch: :param spread: The spread parameter for the fitting function. :type trace: function (str, object) -> None :type lfit_func: Function from 3 floats to float. - :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement + :type trial_result_list: list of MLRsearch.MeasurementResult :type mrr: float :type spread: float :returns: Logarithm of result weight for given function and parameters. :rtype: float """ log_likelihood = 0.0 - trace(u"log_weight for mrr", mrr) - trace(u"spread", spread) + trace("log_weight for mrr", mrr) + trace("spread", spread) for result in trial_result_list: - trace(u"for tr", result.target_tr) - trace(u"lc", result.loss_count) - trace(u"d", result.duration) - # _rel_ values use units of target_tr (transactions per second). + trace("for tr", result.intended_load) + trace("plc", result.plr_loss_count) + trace("d", result.intended_duration) + # _rel_ values use units of intended_load (transactions per second). log_avg_rel_loss_per_second = lfit_func( - trace, result.target_tr, mrr, spread + trace, result.intended_load, mrr, spread ) # _abs_ values use units of loss count (maybe packets). # There can be multiple packets per transaction. log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log( - result.transmit_count / result.target_tr + result.offered_count / result.intended_load ) # Geometric probability computation for logarithms. log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial) - log_trial_likelihood *= -result.loss_count + log_trial_likelihood *= -result.plr_loss_count log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial) log_likelihood += log_trial_likelihood - trace(u"avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial)) - trace(u"log_trial_likelihood", log_trial_likelihood) + trace("avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial)) + trace("log_trial_likelihood", log_trial_likelihood) return log_likelihood def measure_and_compute( - self, trial_duration, transmit_rate, trial_result_list, - min_rate, max_rate, focus_trackers=(None, None), max_samples=None): + self, + trial_duration, + transmit_rate, + trial_result_list, + min_rate, + max_rate, + focus_trackers=(None, None), + max_samples=None, + ): """Perform both measurement and computation at once. High level steps: Prepare and launch computation worker processes, @@ -524,7 +557,7 @@ class PLRsearch: :param max_samples: Limit for integrator samples, for debugging. :type trial_duration: float :type transmit_rate: float - :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement + :type trial_result_list: list of MLRsearch.MeasurementResult :type min_rate: float :type max_rate: float :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker @@ -572,7 +605,7 @@ class PLRsearch: # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send worker = multiprocessing.Process( target=Integrator.try_estimate_nd, - args=(worker_pipe_end, 5.0, self.trace_enabled) + args=(worker_pipe_end, 5.0, self.trace_enabled), ) worker.daemon = True worker.start() @@ -616,8 +649,13 @@ class PLRsearch: ) value = math.log( self.find_critical_rate( - trace, fitting_function, min_rate, max_rate, - self.packet_loss_ratio_target, mrr, spread + trace, + fitting_function, + min_rate, + max_rate, + self.packet_loss_ratio_target, + mrr, + spread, ) ) return value, logweight @@ -664,14 +702,18 @@ class PLRsearch: raise RuntimeError(f"Worker {name} did not finish!") result_or_traceback = pipe.recv() try: - value_tracker, focus_tracker, debug_list, trace_list, sampls = ( - result_or_traceback - ) - except ValueError: + ( + value_tracker, + focus_tracker, + debug_list, + trace_list, + sampls, + ) = result_or_traceback + except ValueError as exc: raise RuntimeError( f"Worker {name} failed with the following traceback:\n" f"{result_or_traceback}" - ) + ) from exc logging.info(f"Logs from worker {name!r}:") for message in debug_list: logging.info(message) @@ -682,8 +724,8 @@ class PLRsearch: ) return _PartialResult(value_tracker, focus_tracker, sampls) - stretch_result = stop_computing(u"stretch", stretch_pipe) - erf_result = stop_computing(u"erf", erf_pipe) + stretch_result = stop_computing("stretch", stretch_pipe) + erf_result = stop_computing("erf", erf_pipe) result = PLRsearch._get_result(measurement, stretch_result, erf_result) logging.info( f"measure_and_compute finished with trial result " @@ -705,7 +747,7 @@ class PLRsearch: :param measurement: The trial measurement obtained during computation. :param stretch_result: Computation output for stretch fitting function. :param erf_result: Computation output for erf fitting function. - :type measurement: ReceiveRateMeasurement + :type measurement: MeasurementResult :type stretch_result: _PartialResult :type erf_result: _PartialResult :returns: Combined results. @@ -730,7 +772,7 @@ class PLRsearch: # Named tuples, for multiple local variables to be passed as return value. _PartialResult = namedtuple( - u"_PartialResult", u"value_tracker focus_tracker samples" + "_PartialResult", "value_tracker focus_tracker samples" ) """Two stat trackers and sample counter. @@ -743,8 +785,8 @@ _PartialResult = namedtuple( """ _ComputeResult = namedtuple( - u"_ComputeResult", - u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers" + "_ComputeResult", + "measurement avg stdev stretch_exp_avg erf_exp_avg trackers", ) """Measurement, 4 computation result values, pair of trackers. @@ -754,7 +796,7 @@ _ComputeResult = namedtuple( :param stretch_exp_avg: Stretch fitting function estimate average exponentiated. :param erf_exp_avg: Erf fitting function estimate average, exponentiated. :param trackers: Pair of focus trackers to start next iteration with. -:type measurement: ReceiveRateMeasurement +:type measurement: MeasurementResult :type avg: float :type stdev: float :type stretch_exp_avg: float