# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module defining MultipleLossRatioSearch class.""" import logging import math import time from .MeasurementDatabase import MeasurementDatabase from .ProgressState import ProgressState from .ReceiveRateInterval import ReceiveRateInterval from .WidthArithmetics import ( multiply_relative_width, step_down, step_up, multiple_step_down, multiple_step_up, half_step_up, ) class MultipleLossRatioSearch: """Optimized binary search algorithm for finding bounds for multiple ratios. This is unofficially a subclass of AbstractSearchAlgorithm, but constructor signature is different. Traditional binary search algorithm needs initial interval (lower and upper bound), and returns final interval after bisecting (until some exit condition is met). The exit condition is usually related to the interval width, (upper bound value minus lower bound value). The optimized algorithm contains several improvements aimed to reduce overall search time. One improvement is searching for multiple intervals at once. The intervals differ by the target loss ratio. Lower bound has to have equal or smaller loss ratio, upper bound has to have larger. Next improvement is that the initial interval does not need to be valid. Imagine initial interval (10, 11) where loss at 11 is smaller than the searched ratio. The algorithm will try (11, 13) interval next, and if 13 is still smaller, (13, 17) and so on, doubling width until the upper bound is valid. The part when interval expands is called external search, the part when interval is bisected is called internal search. Next improvement is that trial measurements at small trial duration can be used to find a reasonable interval for full trial duration search. This results in more trials performed, but smaller overall duration in general. Next improvement is bisecting in logarithmic quantities, so that exit criteria can be independent of measurement units. Next improvement is basing the initial interval on receive rates. Final improvement is exiting early if the minimal value is not a valid lower bound. The complete search consist of several phases, each phase performing several trial measurements. Initial phase creates initial interval based on receive rates at maximum rate and at maximum receive rate (MRR). Final phase and preceding intermediate phases are performing external and internal search steps, each resulting interval is the starting point for the next phase. The resulting intervals of final phase is the result of the whole algorithm. Each non-initial phase uses its own trial duration. Any non-initial phase stops searching (for all ratios independently) when minimum is not a valid lower bound (at current duration), or all of the following is true: Both bounds are valid, bounds are measured at the current phase trial duration, interval width is less than the width goal for current phase. TODO: Review and update this docstring according to rst docs. """ def __init__( self, measurer, final_relative_width=0.005, final_trial_duration=30.0, initial_trial_duration=1.0, number_of_intermediate_phases=2, timeout=600.0, debug=None, expansion_coefficient=2.0): """Store the measurer object and additional arguments. :param measurer: Rate provider to use by this search object. :param final_relative_width: Final lower bound transmit rate cannot be more distant that this multiple of upper bound [1]. :param final_trial_duration: Trial duration for the final phase [s]. :param initial_trial_duration: Trial duration for the initial phase and also for the first intermediate phase [s]. :param number_of_intermediate_phases: Number of intermediate phases to perform before the final phase [1]. :param timeout: The search will fail itself when not finished before this overall time [s]. :param debug: Callable to use instead of logging.debug(). :param expansion_coefficient: External search multiplies width by this. :type measurer: AbstractMeasurer.AbstractMeasurer :type final_relative_width: float :type final_trial_duration: float :type initial_trial_duration: float :type number_of_intermediate_phases: int :type timeout: float :type debug: Optional[Callable[[str], None]] :type expansion_coefficient: float """ self.measurer = measurer self.final_trial_duration = float(final_trial_duration) self.final_relative_width = float(final_relative_width) self.number_of_intermediate_phases = int(number_of_intermediate_phases) self.initial_trial_duration = float(initial_trial_duration) self.timeout = float(timeout) self.state = None self.debug = logging.debug if debug is None else debug self.expansion_coefficient = float(expansion_coefficient) def narrow_down_intervals(self, min_rate, max_rate, packet_loss_ratios): """Perform initial phase, create state object, proceed with next phases. The current implementation requires the ratios so be unique and sorted. Also non-empty. :param min_rate: Minimal target transmit rate [tps]. :param max_rate: Maximal target transmit rate [tps]. :param packet_loss_ratios: Target ratios of packets loss to locate. :type min_rate: float :type max_rate: float :type packet_loss_ratios: Iterable[float] :returns: Structure containing narrowed down intervals and their measurements. :rtype: List[ReceiveRateInterval] :raises RuntimeError: If total duration is larger than timeout. Or if ratios list is (empty or) not sorted or unique. """ min_rate = float(min_rate) max_rate = float(max_rate) packet_loss_ratios = [float(ratio) for ratio in packet_loss_ratios] if len(packet_loss_ratios) < 1: raise RuntimeError(u"At least one ratio is required!") if packet_loss_ratios != sorted(set(packet_loss_ratios)): raise RuntimeError(u"Input ratios have to be sorted and unique!") measurements = list() self.debug(f"First measurement at max rate: {max_rate}") measured = self.measurer.measure( duration=self.initial_trial_duration, transmit_rate=max_rate, ) measurements.append(measured) initial_width_goal = self.final_relative_width for _ in range(self.number_of_intermediate_phases): initial_width_goal = multiply_relative_width( initial_width_goal, 2.0 ) max_lo = step_down(max_rate, initial_width_goal) mrr = max(min_rate, min(max_lo, measured.relative_receive_rate)) self.debug(f"Second measurement at mrr: {mrr}") measured = self.measurer.measure( duration=self.initial_trial_duration, transmit_rate=mrr, ) measurements.append(measured) # Attempt to get narrower width. if measured.loss_ratio > packet_loss_ratios[0]: max_lo = step_down(mrr, initial_width_goal) mrr2 = min(max_lo, measured.relative_receive_rate) else: mrr2 = step_up(mrr, initial_width_goal) if min_rate < mrr2 < max_rate: self.debug(f"Third measurement at mrr2: {mrr2}") measured = self.measurer.measure( duration=self.initial_trial_duration, transmit_rate=mrr2, ) measurements.append(measured) # If mrr2 > mrr and mrr2 got zero loss, # it is better to do external search from mrr2 up. # To prevent bisection between mrr2 and max_rate, # we simply remove the max_rate measurement. # Similar logic applies to higher loss ratio goals. # Overall, with mrr2 measurement done, we never need # the first measurement done at max rate. measurements = measurements[1:] database = MeasurementDatabase(measurements) stop_time = time.monotonic() + self.timeout self.state = ProgressState( database, self.number_of_intermediate_phases, self.final_trial_duration, self.final_relative_width, packet_loss_ratios, min_rate, max_rate, stop_time ) self.ndrpdr() return self.state.database.get_results(ratio_list=packet_loss_ratios) def ndrpdr(self): """Perform trials for this phase. State is updated in-place. Recursion to smaller durations is performed (if not performed yet). :raises RuntimeError: If total duration is larger than timeout. """ state = self.state if state.phases > 0: # We need to finish preceding intermediate phases first. saved_phases = state.phases state.phases -= 1 # Preceding phases have shorter duration. saved_duration = state.duration duration_multiplier = state.duration / self.initial_trial_duration phase_exponent = float(state.phases) / saved_phases state.duration = self.initial_trial_duration * math.pow( duration_multiplier, phase_exponent ) # Shorter durations do not need that narrow widths. saved_width = state.width_goal state.width_goal = multiply_relative_width(saved_width, 2.0) # Recurse. self.ndrpdr() # Restore the state for current phase. state.width_goal = saved_width state.duration = saved_duration state.phases = saved_phases # Not needed, but just in case. self.debug( f"Starting phase with {state.duration} duration" f" and {state.width_goal} relative width goal." ) failing_fast = False database = state.database database.set_current_duration(state.duration) while time.monotonic() < state.stop_time: for index, ratio in enumerate(state.packet_loss_ratios): new_tr = self._select_for_ratio(ratio) if new_tr is None: # Either this ratio is fine, or min rate got invalid result. # If fine, we will continue to handle next ratio. if index > 0: # First ratio passed, all next have a valid lower bound. continue lower_bound, _, _, _, _, _ = database.get_bounds(ratio) if lower_bound is None: failing_fast = True self.debug(u"No valid lower bound for this iteration.") break # First ratio is fine. continue # We have transmit rate to measure at. # We do not check duration versus stop_time here, # as some measurers can be unpredictably faster # than what duration suggests. measurement = self.measurer.measure( duration=state.duration, transmit_rate=new_tr, ) database.add(measurement) # Restart ratio handling on updated database. break else: # No ratio needs measuring, we are done with this phase. self.debug(u"Phase done.") break # We have broken out of the for loop. if failing_fast: # Abort the while loop early. break # Not failing fast but database got updated, restart the while loop. else: # Time is up. raise RuntimeError(u"Optimized search takes too long.") # Min rate is not valid, but returning what we have # so next duration can recover. @staticmethod def improves(new_bound, lower_bound, upper_bound): """Return whether new bound improves upon old bounds. To improve, new_bound has to be not None, and between the old bounds (where the bound is not None). This piece of logic is commonly used, when we know old bounds from a primary source (e.g. current duration database) and new bound from a secondary source (e.g. previous duration database). Having a function allows "if improves(..):" construction to save space. :param new_bound: The bound we consider applying. :param lower_bound: Known bound, new_bound has to be higher to apply. :param upper_bound: Known bound, new_bound has to be lower to apply. :type new_bound: Optional[ReceiveRateMeasurement] :type lower_bound: Optional[ReceiveRateMeasurement] :type upper_bound: Optional[ReceiveRateMeasurement] :returns: Whether we can apply the new bound. :rtype: bool """ if new_bound is None: return False if lower_bound is not None: if new_bound.target_tr <= lower_bound.target_tr: return False if upper_bound is not None: if new_bound.target_tr >= upper_bound.target_tr: return False return True def _select_for_ratio(self, ratio): """Return None or new target_tr to measure at. Returning None means either we have narrow enough valid interval for this ratio, or we are hitting min rate and should fail early. :param ratio: Loss ratio to ensure narrow valid bounds for. :type ratio: float :returns: The next target transmit rate to measure at. :rtype: Optional[float] :raises RuntimeError: If database inconsistency is detected. """ state = self.state data = state.database bounds = data.get_bounds(ratio) cur_lo1, cur_hi1, pre_lo, pre_hi, cur_lo2, cur_hi2 = bounds pre_lo_improves = self.improves(pre_lo, cur_lo1, cur_hi1) pre_hi_improves = self.improves(pre_hi, cur_lo1, cur_hi1) # TODO: Detect also the other case for initial bisect, see below. if pre_lo_improves and pre_hi_improves: # We allowed larger width for previous phase # as single bisect here guarantees only one re-measurement. new_tr = self._bisect(pre_lo, pre_hi) if new_tr is not None: self.debug(f"Initial bisect for {ratio}, tr: {new_tr}") return new_tr if pre_lo_improves: new_tr = pre_lo.target_tr self.debug(f"Re-measuring lower bound for {ratio}, tr: {new_tr}") return new_tr if pre_hi_improves: # This can also happen when we did not do initial bisect # for this ratio yet, but the previous duration lower bound # for this ratio got already re-measured as previous duration # upper bound for previous ratio. new_tr = pre_hi.target_tr self.debug(f"Re-measuring upper bound for {ratio}, tr: {new_tr}") return new_tr if cur_lo1 is None and cur_hi1 is None: raise RuntimeError(u"No results found in databases!") if cur_lo1 is None: # Upper bound exists (cur_hi1). # We already tried previous lower bound. # So, we want to extend down. new_tr = self._extend_down( cur_hi1, cur_hi2, pre_hi, second_needed=False ) self.debug( f"Extending down for {ratio}:" f" old {cur_hi1.target_tr} new {new_tr}" ) return new_tr if cur_hi1 is None: # Lower bound exists (cur_lo1). # We already tried previous upper bound. # So, we want to extend up. new_tr = self._extend_up(cur_lo1, cur_lo2, pre_lo) self.debug( f"Extending up for {ratio}:" f" old {cur_lo1.target_tr} new {new_tr}" ) return new_tr # Both bounds exist (cur_lo1 and cur_hi1). # cur_lo1 might have been selected for this ratio (we are bisecting) # or for previous ratio (we are extending down for this ratio). # Compute both estimates and choose the higher value. bisected_tr = self._bisect(cur_lo1, cur_hi1) extended_tr = self._extend_down( cur_hi1, cur_hi2, pre_hi, second_needed=True ) # Only if both are not None we need to decide. if bisected_tr and extended_tr and extended_tr > bisected_tr: self.debug( f"Extending down for {ratio}:" f" old {cur_hi1.target_tr} new {extended_tr}" ) new_tr = extended_tr else: self.debug( f"Bisecting for {ratio}: lower {cur_lo1.target_tr}," f" upper {cur_hi1.target_tr}, new {bisected_tr}" ) new_tr = bisected_tr return new_tr def _extend_down(self, cur_hi1, cur_hi2, pre_hi, second_needed=False): """Return extended width below, or None if hitting min rate. If no second tightest (nor previous) upper bound is available, the behavior is governed by second_needed argument. If true, return None. If false, start from width goal. This is useful, as if a bisect is possible, we want to give it a chance. :param cur_hi1: Tightest upper bound for current duration. Has to exist. :param cur_hi2: Second tightest current upper bound, may not exist. :param pre_hi: Tightest upper bound, previous duration, may not exist. :param second_needed: Whether second tightest bound is required. :type cur_hi1: ReceiveRateMeasurement :type cur_hi2: Optional[ReceiveRateMeasurement] :type pre_hi: Optional[ReceiveRateMeasurement] :type second_needed: bool :returns: The next target transmit rate to measure at. :rtype: Optional[float] """ state = self.state old_tr = cur_hi1.target_tr if state.min_rate >= old_tr: self.debug(u"Extend down hits min rate.") return None next_bound = cur_hi2 if self.improves(pre_hi, cur_hi1, cur_hi2): next_bound = pre_hi if next_bound is None and second_needed: return None old_width = state.width_goal if next_bound is not None: old_width = ReceiveRateInterval(cur_hi1, next_bound).rel_tr_width old_width = max(old_width, state.width_goal) new_tr = multiple_step_down( old_tr, old_width, self.expansion_coefficient ) new_tr = max(new_tr, state.min_rate) return new_tr def _extend_up(self, cur_lo1, cur_lo2, pre_lo): """Return extended width above, or None if hitting max rate. :param cur_lo1: Tightest lower bound for current duration. Has to exist. :param cur_lo2: Second tightest current lower bound, may not exist. :param pre_lo: Tightest lower bound, previous duration, may not exist. :type cur_lo1: ReceiveRateMeasurement :type cur_lo2: Optional[ReceiveRateMeasurement] :type pre_lo: Optional[ReceiveRateMeasurement] :returns: The next target transmit rate to measure at. :rtype: Optional[float] """ state = self.state old_tr = cur_lo1.target_tr if state.max_rate <= old_tr: self.debug(u"Extend up hits max rate.") return None next_bound = cur_lo2 if self.improves(pre_lo, cur_lo2, cur_lo1): next_bound = pre_lo old_width = state.width_goal if next_bound is not None: old_width = ReceiveRateInterval(cur_lo1, next_bound).rel_tr_width old_width = max(old_width, state.width_goal) new_tr = multiple_step_up(old_tr, old_width, self.expansion_coefficient) new_tr = min(new_tr, state.max_rate) return new_tr def _bisect(self, lower_bound, upper_bound): """Return middle rate or None if width is narrow enough. :param lower_bound: Measurement to use as a lower bound. Has to exist. :param upper_bound: Measurement to use as an upper bound. Has to exist. :type lower_bound: ReceiveRateMeasurement :type upper_bound: ReceiveRateMeasurement :returns: The next target transmit rate to measure at. :rtype: Optional[float] :raises RuntimeError: If database inconsistency is detected. """ state = self.state width = ReceiveRateInterval(lower_bound, upper_bound).rel_tr_width if width <= state.width_goal: self.debug(u"No more bisects needed.") return None new_tr = half_step_up(lower_bound.target_tr, width, state.width_goal) return new_tr