X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FPLRsearch%2FPLRsearch.py;h=7599a9e64daf78cb6cc78e45433c3e8682fabd23;hb=e5dbe10d9599b9a53fa07e6fadfaf427ba6d69e3;hp=ce65fd2ec82548f6cc8233fd41483e27bce2f3da;hpb=72b45cfe662107c8e1bb549df71ba51352a898ee;p=csit.git diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py index ce65fd2ec8..7599a9e64d 100644 --- a/resources/libraries/python/PLRsearch/PLRsearch.py +++ b/resources/libraries/python/PLRsearch/PLRsearch.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Cisco and/or its affiliates. +# Copyright (c) 2023 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -195,8 +195,8 @@ class PLRsearch: # exponential impact. Make it configurable, or is 4:3 good enough? if measurement.loss_ratio >= self.packet_loss_ratio_target: for _ in range(4 * zeros): - lossy_loads.append(measurement.target_tr) - if measurement.loss_count > 0: + lossy_loads.append(measurement.intended_load) + if measurement.loss_ratio > 0.0: zeros = 0 lossy_loads.sort() if stop_time <= time.time(): @@ -205,7 +205,7 @@ class PLRsearch: if (trial_number - self.trial_number_offset) <= 1: next_load = max_rate elif (trial_number - self.trial_number_offset) <= 3: - next_load = (measurement.relative_receive_rate / ( + next_load = (measurement.relative_forwarding_rate / ( 1.0 - self.packet_loss_ratio_target)) else: next_load = (avg1 + avg2) / 2.0 @@ -441,7 +441,7 @@ class PLRsearch: Instead, the expected average loss is scaled according to the number of packets actually sent. - TODO: Copy ReceiveRateMeasurement from MLRsearch. + TODO: Copy MeasurementResult from MLRsearch. :param trace: A multiprocessing-friendly logging function (closure). :param lfit_func: Fitting function, typically lfit_spread or lfit_erf. @@ -450,7 +450,7 @@ class PLRsearch: :param spread: The spread parameter for the fitting function. :type trace: function (str, object) -> None :type lfit_func: Function from 3 floats to float. - :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement + :type trial_result_list: list of MLRsearch.MeasurementResult :type mrr: float :type spread: float :returns: Logarithm of result weight for given function and parameters. @@ -460,17 +460,17 @@ class PLRsearch: trace(u"log_weight for mrr", mrr) trace(u"spread", spread) for result in trial_result_list: - trace(u"for tr", result.target_tr) + trace(u"for tr", result.intended_load) trace(u"lc", result.loss_count) - trace(u"d", result.duration) - # _rel_ values use units of target_tr (transactions per second). + trace(u"d", result.intended_duration) + # _rel_ values use units of intended_load (transactions per second). log_avg_rel_loss_per_second = lfit_func( - trace, result.target_tr, mrr, spread + trace, result.intended_load, mrr, spread ) # _abs_ values use units of loss count (maybe packets). # There can be multiple packets per transaction. log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log( - result.transmit_count / result.target_tr + result.offered_count / result.intended_load ) # Geometric probability computation for logarithms. log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial) @@ -524,7 +524,7 @@ class PLRsearch: :param max_samples: Limit for integrator samples, for debugging. :type trial_duration: float :type transmit_rate: float - :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement + :type trial_result_list: list of MLRsearch.MeasurementResult :type min_rate: float :type max_rate: float :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker @@ -564,6 +564,20 @@ class PLRsearch: :rtype: multiprocessing.Connection """ + boss_pipe_end, worker_pipe_end = multiprocessing.Pipe() + # Starting the worker first. Contrary to documentation + # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection + # sending of large object without active listener on the other side + # results in a deadlock, not in a ValueError. + # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send + worker = multiprocessing.Process( + target=Integrator.try_estimate_nd, + args=(worker_pipe_end, 5.0, self.trace_enabled) + ) + worker.daemon = True + worker.start() + + # Only now it is safe to send the function to compute with. def value_logweight_func(trace, x_mrr, x_spread): """Return log of critical rate and log of likelihood. @@ -609,15 +623,6 @@ class PLRsearch: return value, logweight dilled_function = dill.dumps(value_logweight_func) - boss_pipe_end, worker_pipe_end = multiprocessing.Pipe() - # Do not send yet, run the worker first to avoid a deadlock. - # See https://stackoverflow.com/a/15716500 - worker = multiprocessing.Process( - target=Integrator.try_estimate_nd, - args=(worker_pipe_end, 10.0, self.trace_enabled) - ) - worker.daemon = True - worker.start() boss_pipe_end.send( (dimension, dilled_function, focus_tracker, max_samples) ) @@ -700,7 +705,7 @@ class PLRsearch: :param measurement: The trial measurement obtained during computation. :param stretch_result: Computation output for stretch fitting function. :param erf_result: Computation output for erf fitting function. - :type measurement: ReceiveRateMeasurement + :type measurement: MeasurementResult :type stretch_result: _PartialResult :type erf_result: _PartialResult :returns: Combined results. @@ -749,7 +754,7 @@ _ComputeResult = namedtuple( :param stretch_exp_avg: Stretch fitting function estimate average exponentiated. :param erf_exp_avg: Erf fitting function estimate average, exponentiated. :param trackers: Pair of focus trackers to start next iteration with. -:type measurement: ReceiveRateMeasurement +:type measurement: MeasurementResult :type avg: float :type stdev: float :type stretch_exp_avg: float