-# Copyright (c) 2020 Cisco and/or its affiliates.
+# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
def __init__(
self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
- trial_number_offset=0, timeout=1800.0, trace_enabled=False):
+ trial_number_offset=0, timeout=7200.0, trace_enabled=False):
"""Store rate measurer and additional parameters.
+ The measurer must never report negative loss count.
+
TODO: Copy AbstractMeasurer from MLRsearch.
:param measurer: The measurer to call when searching.
zeros += 1
# TODO: Ratio of fill rate to drain rate seems to have
# exponential impact. Make it configurable, or is 4:3 good enough?
- if measurement.loss_fraction >= self.packet_loss_ratio_target:
+ if measurement.loss_ratio >= self.packet_loss_ratio_target:
for _ in range(4 * zeros):
lossy_loads.append(measurement.target_tr)
if measurement.loss_count > 0:
if (trial_number - self.trial_number_offset) <= 1:
next_load = max_rate
elif (trial_number - self.trial_number_offset) <= 3:
- next_load = (measurement.receive_rate / (
+ next_load = (measurement.relative_receive_rate / (
1.0 - self.packet_loss_ratio_target))
else:
next_load = (avg1 + avg2) / 2.0
Integrator assumes uniform distribution, but over different parameters.
Weight and likelihood are used interchangeably here anyway.
- Each trial has an offered load, a duration and a loss count.
- Fitting function is used to compute the average loss per second.
- Poisson distribution (with average loss per trial) is used
+ Each trial has an intended load, a sent count and a loss count
+ (probably counting unsent packets as loss, as they signal
+ the load is too high for the traffic generator).
+ The fitting function is used to compute the average loss rate.
+ Geometric distribution (with average loss per trial) is used
to get likelihood of one trial result, the overal likelihood
is a product of all trial likelihoods.
As likelihoods can be extremely small, logarithms are tracked instead.
+ The current implementation does not use direct loss rate
+ from the fitting function, as the input and output units may not match
+ (e.g. intended load in TCP transactions, loss in packets).
+ Instead, the expected average loss is scaled according to the number
+ of packets actually sent.
+
TODO: Copy ReceiveRateMeasurement from MLRsearch.
:param trace: A multiprocessing-friendly logging function (closure).
:param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
:param trial_result_list: List of trial measurement results.
:param mrr: The mrr parameter for the fitting function.
- :param spread: The spread parameter for the fittinmg function.
+ :param spread: The spread parameter for the fitting function.
:type trace: function (str, object) -> None
:type lfit_func: Function from 3 floats to float.
:type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
trace(u"for tr", result.target_tr)
trace(u"lc", result.loss_count)
trace(u"d", result.duration)
- log_avg_loss_per_second = lfit_func(
+ # _rel_ values use units of target_tr (transactions per second).
+ log_avg_rel_loss_per_second = lfit_func(
trace, result.target_tr, mrr, spread
)
- log_avg_loss_per_trial = (
- log_avg_loss_per_second + math.log(result.duration)
- )
- # Poisson probability computation works nice for logarithms.
- log_trial_likelihood = (
- result.loss_count * log_avg_loss_per_trial
- - math.exp(log_avg_loss_per_trial)
+ # _abs_ values use units of loss count (maybe packets).
+ # There can be multiple packets per transaction.
+ log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
+ result.transmit_count / result.target_tr
)
- log_trial_likelihood -= math.lgamma(1 + result.loss_count)
+ # Geometric probability computation for logarithms.
+ log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
+ log_trial_likelihood *= -result.loss_count
+ log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
log_likelihood += log_trial_likelihood
- trace(u"avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
+ trace(u"avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
trace(u"log_trial_likelihood", log_trial_likelihood)
return log_likelihood
:rtype: multiprocessing.Connection
"""
+ boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
+ # Starting the worker first. Contrary to documentation
+ # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
+ # sending of large object without active listener on the other side
+ # results in a deadlock, not in a ValueError.
+ # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send
+ worker = multiprocessing.Process(
+ target=Integrator.try_estimate_nd,
+ args=(worker_pipe_end, 10.0, self.trace_enabled)
+ )
+ worker.daemon = True
+ worker.start()
+
+ # Only now it is safe to send the function to compute with.
def value_logweight_func(trace, x_mrr, x_spread):
"""Return log of critical rate and log of likelihood.
return value, logweight
dilled_function = dill.dumps(value_logweight_func)
- boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
boss_pipe_end.send(
(dimension, dilled_function, focus_tracker, max_samples)
)
- worker = multiprocessing.Process(
- target=Integrator.try_estimate_nd,
- args=(worker_pipe_end, 10.0, self.trace_enabled)
- )
- worker.daemon = True
- worker.start()
return boss_pipe_end
erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)