X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FMLRsearch%2FMultipleLossRatioSearch.py;fp=resources%2Flibraries%2Fpython%2FMLRsearch%2FMultipleLossRatioSearch.py;h=0000000000000000000000000000000000000000;hb=e5dbe10d9599b9a53fa07e6fadfaf427ba6d69e3;hp=0e6c8cfa58d2ccc5148b0876c99b27c3a8478fb7;hpb=c6dfb6c09c5dafd1d522f96b4b86c5ec5efc1c83;p=csit.git diff --git a/resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py b/resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py deleted file mode 100644 index 0e6c8cfa58..0000000000 --- a/resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py +++ /dev/null @@ -1,485 +0,0 @@ -# Copyright (c) 2021 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Module defining MultipleLossRatioSearch class.""" - -import logging -import math -import time - -from .MeasurementDatabase import MeasurementDatabase -from .ProgressState import ProgressState -from .ReceiveRateInterval import ReceiveRateInterval -from .WidthArithmetics import ( - multiply_relative_width, - step_down, - step_up, - multiple_step_down, - multiple_step_up, - half_step_up, -) - - -class MultipleLossRatioSearch: - """Optimized binary search algorithm for finding bounds for multiple ratios. - - This is unofficially a subclass of AbstractSearchAlgorithm, - but constructor signature is different. - - Traditional binary search algorithm needs initial interval - (lower and upper bound), and returns final interval after bisecting - (until some exit condition is met). - The exit condition is usually related to the interval width, - (upper bound value minus lower bound value). - - The optimized algorithm contains several improvements - aimed to reduce overall search time. - - One improvement is searching for multiple intervals at once. - The intervals differ by the target loss ratio. Lower bound - has to have equal or smaller loss ratio, upper bound has to have larger. - - Next improvement is that the initial interval does not need to be valid. - Imagine initial interval (10, 11) where loss at 11 is smaller - than the searched ratio. - The algorithm will try (11, 13) interval next, and if 13 is still smaller, - (13, 17) and so on, doubling width until the upper bound is valid. - The part when interval expands is called external search, - the part when interval is bisected is called internal search. - - Next improvement is that trial measurements at small trial duration - can be used to find a reasonable interval for full trial duration search. - This results in more trials performed, but smaller overall duration - in general. - - Next improvement is bisecting in logarithmic quantities, - so that exit criteria can be independent of measurement units. - - Next improvement is basing the initial interval on receive rates. - - Final improvement is exiting early if the minimal value - is not a valid lower bound. - - The complete search consist of several phases, - each phase performing several trial measurements. - Initial phase creates initial interval based on receive rates - at maximum rate and at maximum receive rate (MRR). - Final phase and preceding intermediate phases are performing - external and internal search steps, - each resulting interval is the starting point for the next phase. - The resulting intervals of final phase is the result of the whole algorithm. - - Each non-initial phase uses its own trial duration. - Any non-initial phase stops searching (for all ratios independently) - when minimum is not a valid lower bound (at current duration), - or all of the following is true: - Both bounds are valid, bounds are measured at the current phase - trial duration, interval width is less than the width goal - for current phase. - - TODO: Review and update this docstring according to rst docs. - """ - - def __init__( - self, measurer, final_relative_width=0.005, - final_trial_duration=30.0, initial_trial_duration=1.0, - number_of_intermediate_phases=2, timeout=600.0, debug=None, - expansion_coefficient=2.0): - """Store the measurer object and additional arguments. - - :param measurer: Rate provider to use by this search object. - :param final_relative_width: Final lower bound transmit rate - cannot be more distant that this multiple of upper bound [1]. - :param final_trial_duration: Trial duration for the final phase [s]. - :param initial_trial_duration: Trial duration for the initial phase - and also for the first intermediate phase [s]. - :param number_of_intermediate_phases: Number of intermediate phases - to perform before the final phase [1]. - :param timeout: The search will fail itself when not finished - before this overall time [s]. - :param debug: Callable to use instead of logging.debug(). - :param expansion_coefficient: External search multiplies width by this. - :type measurer: AbstractMeasurer.AbstractMeasurer - :type final_relative_width: float - :type final_trial_duration: float - :type initial_trial_duration: float - :type number_of_intermediate_phases: int - :type timeout: float - :type debug: Optional[Callable[[str], None]] - :type expansion_coefficient: float - """ - self.measurer = measurer - self.final_trial_duration = float(final_trial_duration) - self.final_relative_width = float(final_relative_width) - self.number_of_intermediate_phases = int(number_of_intermediate_phases) - self.initial_trial_duration = float(initial_trial_duration) - self.timeout = float(timeout) - self.state = None - self.debug = logging.debug if debug is None else debug - self.expansion_coefficient = float(expansion_coefficient) - - def narrow_down_intervals(self, min_rate, max_rate, packet_loss_ratios): - """Perform initial phase, create state object, proceed with next phases. - - The current implementation requires the ratios so be unique and sorted. - Also non-empty. - - :param min_rate: Minimal target transmit rate [tps]. - :param max_rate: Maximal target transmit rate [tps]. - :param packet_loss_ratios: Target ratios of packets loss to locate. - :type min_rate: float - :type max_rate: float - :type packet_loss_ratios: Iterable[float] - :returns: Structure containing narrowed down intervals - and their measurements. - :rtype: List[ReceiveRateInterval] - :raises RuntimeError: If total duration is larger than timeout. - Or if ratios list is (empty or) not sorted or unique. - """ - min_rate = float(min_rate) - max_rate = float(max_rate) - packet_loss_ratios = [float(ratio) for ratio in packet_loss_ratios] - if len(packet_loss_ratios) < 1: - raise RuntimeError(u"At least one ratio is required!") - if packet_loss_ratios != sorted(set(packet_loss_ratios)): - raise RuntimeError(u"Input ratios have to be sorted and unique!") - measurements = list() - self.debug(f"First measurement at max rate: {max_rate}") - measured = self.measurer.measure( - duration=self.initial_trial_duration, - transmit_rate=max_rate, - ) - measurements.append(measured) - initial_width_goal = self.final_relative_width - for _ in range(self.number_of_intermediate_phases): - initial_width_goal = multiply_relative_width( - initial_width_goal, 2.0 - ) - max_lo = step_down(max_rate, initial_width_goal) - mrr = max(min_rate, min(max_lo, measured.relative_receive_rate)) - self.debug(f"Second measurement at mrr: {mrr}") - measured = self.measurer.measure( - duration=self.initial_trial_duration, - transmit_rate=mrr, - ) - measurements.append(measured) - # Attempt to get narrower width. - if measured.loss_ratio > packet_loss_ratios[0]: - max_lo = step_down(mrr, initial_width_goal) - mrr2 = min(max_lo, measured.relative_receive_rate) - else: - mrr2 = step_up(mrr, initial_width_goal) - if min_rate < mrr2 < max_rate: - self.debug(f"Third measurement at mrr2: {mrr2}") - measured = self.measurer.measure( - duration=self.initial_trial_duration, - transmit_rate=mrr2, - ) - measurements.append(measured) - # If mrr2 > mrr and mrr2 got zero loss, - # it is better to do external search from mrr2 up. - # To prevent bisection between mrr2 and max_rate, - # we simply remove the max_rate measurement. - # Similar logic applies to higher loss ratio goals. - # Overall, with mrr2 measurement done, we never need - # the first measurement done at max rate. - measurements = measurements[1:] - database = MeasurementDatabase(measurements) - stop_time = time.monotonic() + self.timeout - self.state = ProgressState( - database, self.number_of_intermediate_phases, - self.final_trial_duration, self.final_relative_width, - packet_loss_ratios, min_rate, max_rate, stop_time - ) - self.ndrpdr() - return self.state.database.get_results(ratio_list=packet_loss_ratios) - - def ndrpdr(self): - """Perform trials for this phase. State is updated in-place. - - Recursion to smaller durations is performed (if not performed yet). - - :raises RuntimeError: If total duration is larger than timeout. - """ - state = self.state - if state.phases > 0: - # We need to finish preceding intermediate phases first. - saved_phases = state.phases - state.phases -= 1 - # Preceding phases have shorter duration. - saved_duration = state.duration - duration_multiplier = state.duration / self.initial_trial_duration - phase_exponent = float(state.phases) / saved_phases - state.duration = self.initial_trial_duration * math.pow( - duration_multiplier, phase_exponent - ) - # Shorter durations do not need that narrow widths. - saved_width = state.width_goal - state.width_goal = multiply_relative_width(saved_width, 2.0) - # Recurse. - self.ndrpdr() - # Restore the state for current phase. - state.width_goal = saved_width - state.duration = saved_duration - state.phases = saved_phases # Not needed, but just in case. - self.debug( - f"Starting phase with {state.duration} duration" - f" and {state.width_goal} relative width goal." - ) - failing_fast = False - database = state.database - database.set_current_duration(state.duration) - while time.monotonic() < state.stop_time: - for index, ratio in enumerate(state.packet_loss_ratios): - new_tr = self._select_for_ratio(ratio) - if new_tr is None: - # Either this ratio is fine, or min rate got invalid result. - # If fine, we will continue to handle next ratio. - if index > 0: - # First ratio passed, all next have a valid lower bound. - continue - lower_bound, _, _, _, _, _ = database.get_bounds(ratio) - if lower_bound is None: - failing_fast = True - self.debug(u"No valid lower bound for this iteration.") - break - # First ratio is fine. - continue - # We have transmit rate to measure at. - # We do not check duration versus stop_time here, - # as some measurers can be unpredictably faster - # than what duration suggests. - measurement = self.measurer.measure( - duration=state.duration, - transmit_rate=new_tr, - ) - database.add(measurement) - # Restart ratio handling on updated database. - break - else: - # No ratio needs measuring, we are done with this phase. - self.debug(u"Phase done.") - break - # We have broken out of the for loop. - if failing_fast: - # Abort the while loop early. - break - # Not failing fast but database got updated, restart the while loop. - else: - # Time is up. - raise RuntimeError(u"Optimized search takes too long.") - # Min rate is not valid, but returning what we have - # so next duration can recover. - - @staticmethod - def improves(new_bound, lower_bound, upper_bound): - """Return whether new bound improves upon old bounds. - - To improve, new_bound has to be not None, - and between the old bounds (where the bound is not None). - - This piece of logic is commonly used, when we know old bounds - from a primary source (e.g. current duration database) - and new bound from a secondary source (e.g. previous duration database). - Having a function allows "if improves(..):" construction to save space. - - :param new_bound: The bound we consider applying. - :param lower_bound: Known bound, new_bound has to be higher to apply. - :param upper_bound: Known bound, new_bound has to be lower to apply. - :type new_bound: Optional[ReceiveRateMeasurement] - :type lower_bound: Optional[ReceiveRateMeasurement] - :type upper_bound: Optional[ReceiveRateMeasurement] - :returns: Whether we can apply the new bound. - :rtype: bool - """ - if new_bound is None: - return False - if lower_bound is not None: - if new_bound.target_tr <= lower_bound.target_tr: - return False - if upper_bound is not None: - if new_bound.target_tr >= upper_bound.target_tr: - return False - return True - - def _select_for_ratio(self, ratio): - """Return None or new target_tr to measure at. - - Returning None means either we have narrow enough valid interval - for this ratio, or we are hitting min rate and should fail early. - - :param ratio: Loss ratio to ensure narrow valid bounds for. - :type ratio: float - :returns: The next target transmit rate to measure at. - :rtype: Optional[float] - :raises RuntimeError: If database inconsistency is detected. - """ - state = self.state - data = state.database - bounds = data.get_bounds(ratio) - cur_lo1, cur_hi1, pre_lo, pre_hi, cur_lo2, cur_hi2 = bounds - pre_lo_improves = self.improves(pre_lo, cur_lo1, cur_hi1) - pre_hi_improves = self.improves(pre_hi, cur_lo1, cur_hi1) - # TODO: Detect also the other case for initial bisect, see below. - if pre_lo_improves and pre_hi_improves: - # We allowed larger width for previous phase - # as single bisect here guarantees only one re-measurement. - new_tr = self._bisect(pre_lo, pre_hi) - if new_tr is not None: - self.debug(f"Initial bisect for {ratio}, tr: {new_tr}") - return new_tr - if pre_lo_improves: - new_tr = pre_lo.target_tr - self.debug(f"Re-measuring lower bound for {ratio}, tr: {new_tr}") - return new_tr - if pre_hi_improves: - # This can also happen when we did not do initial bisect - # for this ratio yet, but the previous duration lower bound - # for this ratio got already re-measured as previous duration - # upper bound for previous ratio. - new_tr = pre_hi.target_tr - self.debug(f"Re-measuring upper bound for {ratio}, tr: {new_tr}") - return new_tr - if cur_lo1 is None and cur_hi1 is None: - raise RuntimeError(u"No results found in databases!") - if cur_lo1 is None: - # Upper bound exists (cur_hi1). - # We already tried previous lower bound. - # So, we want to extend down. - new_tr = self._extend_down( - cur_hi1, cur_hi2, pre_hi, second_needed=False - ) - self.debug( - f"Extending down for {ratio}:" - f" old {cur_hi1.target_tr} new {new_tr}" - ) - return new_tr - if cur_hi1 is None: - # Lower bound exists (cur_lo1). - # We already tried previous upper bound. - # So, we want to extend up. - new_tr = self._extend_up(cur_lo1, cur_lo2, pre_lo) - self.debug( - f"Extending up for {ratio}:" - f" old {cur_lo1.target_tr} new {new_tr}" - ) - return new_tr - # Both bounds exist (cur_lo1 and cur_hi1). - # cur_lo1 might have been selected for this ratio (we are bisecting) - # or for previous ratio (we are extending down for this ratio). - # Compute both estimates and choose the higher value. - bisected_tr = self._bisect(cur_lo1, cur_hi1) - extended_tr = self._extend_down( - cur_hi1, cur_hi2, pre_hi, second_needed=True - ) - # Only if both are not None we need to decide. - if bisected_tr and extended_tr and extended_tr > bisected_tr: - self.debug( - f"Extending down for {ratio}:" - f" old {cur_hi1.target_tr} new {extended_tr}" - ) - new_tr = extended_tr - else: - self.debug( - f"Bisecting for {ratio}: lower {cur_lo1.target_tr}," - f" upper {cur_hi1.target_tr}, new {bisected_tr}" - ) - new_tr = bisected_tr - return new_tr - - def _extend_down(self, cur_hi1, cur_hi2, pre_hi, second_needed=False): - """Return extended width below, or None if hitting min rate. - - If no second tightest (nor previous) upper bound is available, - the behavior is governed by second_needed argument. - If true, return None. If false, start from width goal. - This is useful, as if a bisect is possible, - we want to give it a chance. - - :param cur_hi1: Tightest upper bound for current duration. Has to exist. - :param cur_hi2: Second tightest current upper bound, may not exist. - :param pre_hi: Tightest upper bound, previous duration, may not exist. - :param second_needed: Whether second tightest bound is required. - :type cur_hi1: ReceiveRateMeasurement - :type cur_hi2: Optional[ReceiveRateMeasurement] - :type pre_hi: Optional[ReceiveRateMeasurement] - :type second_needed: bool - :returns: The next target transmit rate to measure at. - :rtype: Optional[float] - """ - state = self.state - old_tr = cur_hi1.target_tr - if state.min_rate >= old_tr: - self.debug(u"Extend down hits min rate.") - return None - next_bound = cur_hi2 - if self.improves(pre_hi, cur_hi1, cur_hi2): - next_bound = pre_hi - if next_bound is None and second_needed: - return None - old_width = state.width_goal - if next_bound is not None: - old_width = ReceiveRateInterval(cur_hi1, next_bound).rel_tr_width - old_width = max(old_width, state.width_goal) - new_tr = multiple_step_down( - old_tr, old_width, self.expansion_coefficient - ) - new_tr = max(new_tr, state.min_rate) - return new_tr - - def _extend_up(self, cur_lo1, cur_lo2, pre_lo): - """Return extended width above, or None if hitting max rate. - - :param cur_lo1: Tightest lower bound for current duration. Has to exist. - :param cur_lo2: Second tightest current lower bound, may not exist. - :param pre_lo: Tightest lower bound, previous duration, may not exist. - :type cur_lo1: ReceiveRateMeasurement - :type cur_lo2: Optional[ReceiveRateMeasurement] - :type pre_lo: Optional[ReceiveRateMeasurement] - :returns: The next target transmit rate to measure at. - :rtype: Optional[float] - """ - state = self.state - old_tr = cur_lo1.target_tr - if state.max_rate <= old_tr: - self.debug(u"Extend up hits max rate.") - return None - next_bound = cur_lo2 - if self.improves(pre_lo, cur_lo2, cur_lo1): - next_bound = pre_lo - old_width = state.width_goal - if next_bound is not None: - old_width = ReceiveRateInterval(cur_lo1, next_bound).rel_tr_width - old_width = max(old_width, state.width_goal) - new_tr = multiple_step_up(old_tr, old_width, self.expansion_coefficient) - new_tr = min(new_tr, state.max_rate) - return new_tr - - def _bisect(self, lower_bound, upper_bound): - """Return middle rate or None if width is narrow enough. - - :param lower_bound: Measurement to use as a lower bound. Has to exist. - :param upper_bound: Measurement to use as an upper bound. Has to exist. - :type lower_bound: ReceiveRateMeasurement - :type upper_bound: ReceiveRateMeasurement - :returns: The next target transmit rate to measure at. - :rtype: Optional[float] - :raises RuntimeError: If database inconsistency is detected. - """ - state = self.state - width = ReceiveRateInterval(lower_bound, upper_bound).rel_tr_width - if width <= state.width_goal: - self.debug(u"No more bisects needed.") - return None - new_tr = half_step_up(lower_bound.target_tr, width, state.width_goal) - return new_tr