-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2024 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
import multiprocessing
import time
+from collections import namedtuple
+
import dill
-# TODO: Inform pylint about scipy (of correct version) being available.
+
from scipy.special import erfcx, erfc
# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
# then switch to absolute imports within PLRsearch package.
# Current usage of relative imports is just a short term workaround.
-import Integrator # pylint: disable=relative-import
-from log_plus import log_plus, log_minus # pylint: disable=relative-import
-import stat_trackers # pylint: disable=relative-import
+from . import Integrator
+from . import stat_trackers
+from .log_plus import log_plus, log_minus
-class PLRsearch(object):
+class PLRsearch:
"""A class to encapsulate data relevant for the search method.
The context is performance testing of packet processing systems.
Two constants are stored as class fields for speed.
- Method othed than search (and than __init__)
+ Method other than search (and than __init__)
are just internal code structure.
+
TODO: Those method names should start with underscore then.
"""
log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
def __init__(
- self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
- trial_number_offset=0, timeout=1800.0, trace_enabled=False):
+ self,
+ measurer,
+ trial_duration_per_trial,
+ packet_loss_ratio_target,
+ trial_number_offset=0,
+ timeout=7200.0,
+ trace_enabled=False,
+ ):
"""Store rate measurer and additional parameters.
+ The measurer must never report negative loss count.
+
TODO: Copy AbstractMeasurer from MLRsearch.
:param measurer: The measurer to call when searching.
:type min_rate: float
:type max_rate: float
:returns: Average and stdev of critical load estimate.
- :rtype: 2-tuple of floats
+ :rtype: 2-tuple of float
"""
stop_time = time.time() + self.timeout
min_rate = float(min_rate)
max_rate = float(max_rate)
- logging.info("Started search with min_rate %(min)r, max_rate %(max)r",
- {"min": min_rate, "max": max_rate})
+ logging.info(
+ f"Started search with min_rate {min_rate!r}, "
+ f"max_rate {max_rate!r}"
+ )
trial_result_list = list()
trial_number = self.trial_number_offset
focus_trackers = (None, None)
transmit_rate = (min_rate + max_rate) / 2.0
lossy_loads = [max_rate]
- zeros = [0, 0] # Cosecutive zero loss, separately for stretch and erf.
+ zeros = 0 # How many consecutive zero loss results are happening.
while 1:
trial_number += 1
- logging.info("Trial %(number)r", {"number": trial_number})
+ logging.info(f"Trial {trial_number!r}")
results = self.measure_and_compute(
- self.trial_duration_per_trial * trial_number, transmit_rate,
- trial_result_list, min_rate, max_rate, focus_trackers)
+ self.trial_duration_per_trial * trial_number,
+ transmit_rate,
+ trial_result_list,
+ min_rate,
+ max_rate,
+ focus_trackers,
+ )
measurement, average, stdev, avg1, avg2, focus_trackers = results
- index = trial_number % 2
- zeros[index] += 1
+ zeros += 1
# TODO: Ratio of fill rate to drain rate seems to have
# exponential impact. Make it configurable, or is 4:3 good enough?
- if measurement.loss_fraction >= self.packet_loss_ratio_target:
- for _ in range(4 * zeros[index]):
- lossy_loads.append(measurement.target_tr)
- if measurement.loss_count > 0:
- zeros[index] = 0
+ if measurement.loss_ratio >= self.packet_loss_ratio_target:
+ for _ in range(4 * zeros):
+ lossy_loads.append(measurement.intended_load)
+ if measurement.loss_ratio > 0.0:
+ zeros = 0
lossy_loads.sort()
if stop_time <= time.time():
return average, stdev
if (trial_number - self.trial_number_offset) <= 1:
next_load = max_rate
elif (trial_number - self.trial_number_offset) <= 3:
- next_load = (measurement.receive_rate / (
- 1.0 - self.packet_loss_ratio_target))
+ next_load = measurement.relative_forwarding_rate / (
+ 1.0 - self.packet_loss_ratio_target
+ )
else:
- index = (trial_number + 1) % 2
- next_load = (avg1, avg2)[index]
- if zeros[index] > 0:
+ next_load = (avg1 + avg2) / 2.0
+ if zeros > 0:
if lossy_loads[0] > next_load:
- diminisher = math.pow(2.0, 1 - zeros[index])
+ diminisher = math.pow(2.0, 1 - zeros)
next_load = lossy_loads[0] + diminisher * next_load
- next_load /= (1.0 + diminisher)
+ next_load /= 1.0 + diminisher
# On zero measurement, we need to drain obsoleted low losses
# even if we did not use them to increase next_load,
- # in order to get to usable loses with higher load.
+ # in order to get to usable loses at higher loads.
if len(lossy_loads) > 3:
lossy_loads = lossy_loads[3:]
- logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
- {"z": zeros, "o": (avg1, avg2)[index],
- "n": next_load, "s": lossy_loads})
+ logging.debug(
+ f"Zeros {zeros!r} orig {(avg1 + avg2) / 2.0!r} "
+ f"next {next_load!r} loads {lossy_loads!r}"
+ )
transmit_rate = min(max_rate, max(min_rate, next_load))
@staticmethod
trace("chi0", chi0)
if chi > 0:
log_lps = math.log(
- load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
+ load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
+ )
trace("big loss direct log_lps", log_lps)
else:
two_positive = log_plus(chi, 2 * chi0 - log_2)
@staticmethod
def find_critical_rate(
- trace, lfit_func, min_rate, max_rate, loss_ratio_target,
- mrr, spread):
+ trace, lfit_func, min_rate, max_rate, loss_ratio_target, mrr, spread
+ ):
"""Given ratio target and parameters, return the achieving offered load.
This is basically an inverse function to lfit_func
:type lfit_func: Function from 3 floats to float.
:type min_rate: float
:type max_rate: float
- :type log_lps_target: float
+ :type loss_ratio_target: float
:type mrr: float
:type spread: float
:returns: Load [pps] which achieves the target with given parameters.
loss_ratio = -1
while loss_ratio != loss_ratio_target:
rate = (rate_hi + rate_lo) / 2.0
- if rate == rate_hi or rate == rate_lo:
+ if rate in (rate_hi, rate_lo):
break
loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
loss_ratio = loss_rate / rate
Integrator assumes uniform distribution, but over different parameters.
Weight and likelihood are used interchangeably here anyway.
- Each trial has an offered load, a duration and a loss count.
- Fitting function is used to compute the average loss per second.
- Poisson distribution (with average loss per trial) is used
+ Each trial has an intended load, a sent count and a loss count
+ (probably counting unsent packets as loss, as they signal
+ the load is too high for the traffic generator).
+ The fitting function is used to compute the average loss rate.
+ Geometric distribution (with average loss per trial) is used
to get likelihood of one trial result, the overal likelihood
is a product of all trial likelihoods.
As likelihoods can be extremely small, logarithms are tracked instead.
- TODO: Copy ReceiveRateMeasurement from MLRsearch.
+ The current implementation does not use direct loss rate
+ from the fitting function, as the input and output units may not match
+ (e.g. intended load in TCP transactions, loss in packets).
+ Instead, the expected average loss is scaled according to the number
+ of packets actually sent.
+
+ TODO: Copy MeasurementResult from MLRsearch.
:param trace: A multiprocessing-friendly logging function (closure).
:param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
- :param result_list: List of trial measurement results.
+ :param trial_result_list: List of trial measurement results.
:param mrr: The mrr parameter for the fitting function.
- :param spread: The spread parameter for the fittinmg function.
+ :param spread: The spread parameter for the fitting function.
:type trace: function (str, object) -> None
:type lfit_func: Function from 3 floats to float.
- :type result_list: list of MLRsearch.ReceiveRateMeasurement
+ :type trial_result_list: list of MLRsearch.MeasurementResult
:type mrr: float
:type spread: float
:returns: Logarithm of result weight for given function and parameters.
trace("log_weight for mrr", mrr)
trace("spread", spread)
for result in trial_result_list:
- trace("for tr", result.target_tr)
+ trace("for tr", result.intended_load)
trace("lc", result.loss_count)
- trace("d", result.duration)
- log_avg_loss_per_second = lfit_func(
- trace, result.target_tr, mrr, spread)
- log_avg_loss_per_trial = (
- log_avg_loss_per_second + math.log(result.duration))
- # Poisson probability computation works nice for logarithms.
- log_trial_likelihood = (
- result.loss_count * log_avg_loss_per_trial
- - math.exp(log_avg_loss_per_trial))
- log_trial_likelihood -= math.lgamma(1 + result.loss_count)
+ trace("d", result.intended_duration)
+ # _rel_ values use units of intended_load (transactions per second).
+ log_avg_rel_loss_per_second = lfit_func(
+ trace, result.intended_load, mrr, spread
+ )
+ # _abs_ values use units of loss count (maybe packets).
+ # There can be multiple packets per transaction.
+ log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
+ result.offered_count / result.intended_load
+ )
+ # Geometric probability computation for logarithms.
+ log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
+ log_trial_likelihood *= -result.loss_count
+ log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
log_likelihood += log_trial_likelihood
- trace("avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
+ trace("avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
trace("log_trial_likelihood", log_trial_likelihood)
return log_likelihood
- # TODO: Refactor (somehow) so pylint stops complaining about
- # too many local variables.
def measure_and_compute(
- self, trial_duration, transmit_rate, trial_result_list,
- min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
+ self,
+ trial_duration,
+ transmit_rate,
+ trial_result_list,
+ min_rate,
+ max_rate,
+ focus_trackers=(None, None),
+ max_samples=None,
+ ):
"""Perform both measurement and computation at once.
High level steps: Prepare and launch computation worker processes,
:param max_samples: Limit for integrator samples, for debugging.
:type trial_duration: float
:type transmit_rate: float
- :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+ :type trial_result_list: list of MLRsearch.MeasurementResult
:type min_rate: float
:type max_rate: float
:type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
:type max_samples: None or int
:returns: Measurement and computation results.
- :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers.
+ :rtype: _ComputeResult
"""
logging.debug(
- "measure_and_compute started with self %(self)r, trial_duration "
- + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
- + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
- {"self": self, "dur": trial_duration, "tr": transmit_rate,
- "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
- "ms": max_samples})
+ f"measure_and_compute started with self {self!r}, trial_duration "
+ f"{trial_duration!r}, transmit_rate {transmit_rate!r}, "
+ f"trial_result_list {trial_result_list!r}, max_rate {max_rate!r}, "
+ f"focus_trackers {focus_trackers!r}, max_samples {max_samples!r}"
+ )
# Preparation phase.
dimension = 2
stretch_focus_tracker, erf_focus_tracker = focus_trackers
erf_focus_tracker = stat_trackers.VectorStatTracker(dimension)
erf_focus_tracker.unit_reset()
old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy()
+
def start_computing(fitting_function, focus_tracker):
"""Just a block of code to be used for each fitting function.
start computation, return the boss pipe end.
:param fitting_function: lfit_erf or lfit_stretch.
- :param bias_avg: Tuple of floats to start searching around.
- :param bias_cov: Covariance matrix defining initial focus shape.
+ :param focus_tracker: Tracker initialized to speed up the numeric
+ computation.
:type fitting_function: Function from 3 floats to float.
- :type bias_avg: 2-tuple of floats
- :type bias_cov: 2-tuple of 2-tuples of floats
+ :type focus_tracker: None or stat_trackers.VectorStatTracker
:returns: Boss end of communication pipe.
:rtype: multiprocessing.Connection
"""
+
+ boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
+ # Starting the worker first. Contrary to documentation
+ # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
+ # sending of large object without active listener on the other side
+ # results in a deadlock, not in a ValueError.
+ # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send
+ worker = multiprocessing.Process(
+ target=Integrator.try_estimate_nd,
+ args=(worker_pipe_end, 5.0, self.trace_enabled),
+ )
+ worker.daemon = True
+ worker.start()
+
+ # Only now it is safe to send the function to compute with.
def value_logweight_func(trace, x_mrr, x_spread):
"""Return log of critical rate and log of likelihood.
mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
logweight = self.log_weight(
- trace, fitting_function, trial_result_list, mrr, spread)
- value = math.log(self.find_critical_rate(
- trace, fitting_function, min_rate, max_rate,
- self.packet_loss_ratio_target, mrr, spread))
+ trace, fitting_function, trial_result_list, mrr, spread
+ )
+ value = math.log(
+ self.find_critical_rate(
+ trace,
+ fitting_function,
+ min_rate,
+ max_rate,
+ self.packet_loss_ratio_target,
+ mrr,
+ spread,
+ )
+ )
return value, logweight
+
dilled_function = dill.dumps(value_logweight_func)
- boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
boss_pipe_end.send(
- (dimension, dilled_function, focus_tracker, max_samples))
- worker = multiprocessing.Process(
- target=Integrator.try_estimate_nd, args=(
- worker_pipe_end, 10.0, self.trace_enabled))
- worker.daemon = True
- worker.start()
+ (dimension, dilled_function, focus_tracker, max_samples)
+ )
return boss_pipe_end
- erf_pipe = start_computing(
- self.lfit_erf, erf_focus_tracker)
- stretch_pipe = start_computing(
- self.lfit_stretch, stretch_focus_tracker)
+
+ erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
+ stretch_pipe = start_computing(self.lfit_stretch, stretch_focus_tracker)
+
# Measurement phase.
measurement = self.measurer.measure(trial_duration, transmit_rate)
+
# Processing phase.
def stop_computing(name, pipe):
"""Just a block of code to be used for each worker.
:type pipe: multiprocessing.Connection
:returns: Computed value tracker, actual focus tracker,
and number of samples used for this iteration.
- :rtype: 3-tuple of tracker, tracker and int
+ :rtype: _PartialResult
"""
- pipe.send(None)
+ # If worker encountered an exception, we get it in the recv below,
+ # but send will report a broken pipe.
+ # EAFP says we should ignore the error (instead of polling first).
+ # https://devblogs.microsoft.com/python
+ # /idiomatic-python-eafp-versus-lbyl/
+ try:
+ pipe.send(None)
+ except BrokenPipeError:
+ pass
if not pipe.poll(10.0):
- raise RuntimeError(
- "Worker {name} did not finish!".format(name=name))
+ raise RuntimeError(f"Worker {name} did not finish!")
result_or_traceback = pipe.recv()
try:
- value_tracker, focus_tracker, debug_list, trace_list, sampls = (
- result_or_traceback)
+ (
+ value_tracker,
+ focus_tracker,
+ debug_list,
+ trace_list,
+ sampls,
+ ) = result_or_traceback
except ValueError:
raise RuntimeError(
- "Worker {name} failed with the following traceback:\n{tr}"
- .format(name=name, tr=result_or_traceback))
- logging.info("Logs from worker %(name)r:", {"name": name})
+ f"Worker {name} failed with the following traceback:\n"
+ f"{result_or_traceback}"
+ )
+ logging.info(f"Logs from worker {name!r}:")
for message in debug_list:
logging.info(message)
for message in trace_list:
logging.debug(message)
- logging.debug("trackers: value %(val)r focus %(foc)r", {
- "val": value_tracker, "foc": focus_tracker})
- return value_tracker, focus_tracker, sampls
- stretch_value_tracker, stretch_focus_tracker, stretch_samples = (
- stop_computing("stretch", stretch_pipe))
- erf_value_tracker, erf_focus_tracker, erf_samples = (
- stop_computing("erf", erf_pipe))
- stretch_avg = stretch_value_tracker.average
- erf_avg = erf_value_tracker.average
- # TODO: Take into account secondary stats.
- stretch_stdev = math.exp(stretch_value_tracker.log_variance / 2)
- erf_stdev = math.exp(erf_value_tracker.log_variance / 2)
- avg = math.exp((stretch_avg + erf_avg) / 2.0)
- var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
- var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
- stdev = avg * math.sqrt(var)
- focus_trackers = (stretch_focus_tracker, erf_focus_tracker)
+ logging.debug(
+ f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
+ )
+ return _PartialResult(value_tracker, focus_tracker, sampls)
+
+ stretch_result = stop_computing("stretch", stretch_pipe)
+ erf_result = stop_computing("erf", erf_pipe)
+ result = PLRsearch._get_result(measurement, stretch_result, erf_result)
logging.info(
- "measure_and_compute finished with trial result %(res)r "
- "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r "
- "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r "
- "erf samples %(es)r",
- {"res": measurement, "avg": avg, "stdev": stdev,
- "a1": math.exp(stretch_avg), "a2": math.exp(erf_avg),
- "nt": focus_trackers, "ot": old_trackers, "ss": stretch_samples,
- "es": erf_samples})
- return (
- measurement, avg, stdev, math.exp(stretch_avg),
- math.exp(erf_avg), focus_trackers)
+ f"measure_and_compute finished with trial result "
+ f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
+ f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
+ f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
+ f"stretch samples {stretch_result.samples!r} erf samples "
+ f"{erf_result.samples!r}"
+ )
+ return result
+
+ @staticmethod
+ def _get_result(measurement, stretch_result, erf_result):
+ """Process and collate results from measure_and_compute.
+
+ Turn logarithm based values to exponential ones,
+ combine averages and stdevs of two fitting functions into a whole.
+
+ :param measurement: The trial measurement obtained during computation.
+ :param stretch_result: Computation output for stretch fitting function.
+ :param erf_result: Computation output for erf fitting function.
+ :type measurement: MeasurementResult
+ :type stretch_result: _PartialResult
+ :type erf_result: _PartialResult
+ :returns: Combined results.
+ :rtype: _ComputeResult
+ """
+ stretch_avg = stretch_result.value_tracker.average
+ erf_avg = erf_result.value_tracker.average
+ stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
+ erf_var = erf_result.value_tracker.get_pessimistic_variance()
+ avg_log = (stretch_avg + erf_avg) / 2.0
+ var_log = (stretch_var + erf_var) / 2.0
+ var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
+ stdev_log = math.sqrt(var_log)
+ low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
+ avg = (low + upp) / 2
+ stdev = avg - low
+ trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
+ sea = math.exp(stretch_avg)
+ eea = math.exp(erf_avg)
+ return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
+
+
+# Named tuples, for multiple local variables to be passed as return value.
+_PartialResult = namedtuple(
+ "_PartialResult", "value_tracker focus_tracker samples"
+)
+"""Two stat trackers and sample counter.
+
+:param value_tracker: Tracker for the value (critical load) being integrated.
+:param focus_tracker: Tracker for focusing integration inputs (sample points).
+:param samples: How many samples were used for the computation.
+:type value_tracker: stat_trackers.ScalarDualStatTracker
+:type focus_tracker: stat_trackers.VectorStatTracker
+:type samples: int
+"""
+
+_ComputeResult = namedtuple(
+ "_ComputeResult",
+ "measurement avg stdev stretch_exp_avg erf_exp_avg trackers",
+)
+"""Measurement, 4 computation result values, pair of trackers.
+
+:param measurement: The trial measurement result obtained during computation.
+:param avg: Overall average of critical rate estimate.
+:param stdev: Overall standard deviation of critical rate estimate.
+:param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
+:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
+:param trackers: Pair of focus trackers to start next iteration with.
+:type measurement: MeasurementResult
+:type avg: float
+:type stdev: float
+:type stretch_exp_avg: float
+:type erf_exp_avg: float
+:type trackers: 2-tuple of stat_trackers.VectorStatTracker
+"""