- elif ndr_hi.duration < state.duration:
- logging.info("ndr hi maximal re-measure")
- state = self._measure_and_update_state(
- state, state.maximum_transmit_rate)
- continue
- if pdr_hi.loss_fraction <= state.packet_loss_ratio:
- if pdr_hi.target_tr < state.maximum_transmit_rate:
- new_tr = min(
- state.maximum_transmit_rate,
- self.expand_up(
- pdr_rel_width, self.doublings, pdr_hi.target_tr))
- logging.info("pdr hi external %s", new_tr)
- state = self._measure_and_update_state(state, new_tr)
- continue
- elif pdr_hi.duration < state.duration:
- logging.info("ndr hi maximal re-measure")
- state = self._measure_and_update_state(
- state, state.maximum_transmit_rate)
- continue
- # If we are hitting maximum_transmit_rate,
- # it is still worth narrowing width,
- # hoping large enough loss fraction will happen.
- # But if we are hitting the minimal rate (at current duration),
- # no additional measurement will help with that,
- # so we can stop narrowing in this phase.
- if (ndr_lo.target_tr <= state.minimum_transmit_rate
- and ndr_lo.loss_fraction > 0.0):
- ndr_rel_width = 0.0
- if (pdr_lo.target_tr <= state.minimum_transmit_rate
- and pdr_lo.loss_fraction > state.packet_loss_ratio):
- pdr_rel_width = 0.0
- if ndr_rel_width > state.width_goal:
- # We have to narrow NDR width first, as NDR internal search
- # can invalidate PDR (but not vice versa).
- new_tr = self.half_step_up(ndr_rel_width, ndr_lo.target_tr)
- logging.info("Bisecting for NDR at %s", new_tr)
- state = self._measure_and_update_state(state, new_tr)
- continue
- if pdr_rel_width > state.width_goal:
- # PDR iternal search.
- new_tr = self.half_step_up(pdr_rel_width, pdr_lo.target_tr)
- logging.info("Bisecting for PDR at %s", new_tr)
- state = self._measure_and_update_state(state, new_tr)
- continue
- # We do not need to improve width, but there still might be
- # some measurements with smaller duration.
- # We need to re-measure with full duration, possibly
- # creating invalid bounds to resolve (thus broadening width).
- if ndr_lo.duration < state.duration:
- logging.info("re-measuring NDR lower bound")
- self._measure_and_update_state(state, ndr_lo.target_tr)
- continue
- if pdr_lo.duration < state.duration:
- logging.info("re-measuring PDR lower bound")
- self._measure_and_update_state(state, pdr_lo.target_tr)
- continue
- # Except when lower bounds have high loss fraction, in that case
- # we do not need to re-measure _upper_ bounds.
- if ndr_hi.duration < state.duration and ndr_rel_width > 0.0:
- logging.info("re-measuring NDR upper bound")
- self._measure_and_update_state(state, ndr_hi.target_tr)
- continue
- if pdr_hi.duration < state.duration and pdr_rel_width > 0.0:
- logging.info("re-measuring PDR upper bound")
- self._measure_and_update_state(state, pdr_hi.target_tr)
- continue
- # Widths are narrow (or lower bound minimal), bound measurements
- # are long enough, we can return.
- logging.info("phase done")
- break
- return state
+ # We have transmit rate to measure at.
+ # We do not check duration versus stop_time here,
+ # as some measurers can be unpredictably faster
+ # than what duration suggests.
+ measurement = self.measurer.measure(
+ duration=state.duration,
+ transmit_rate=new_tr,
+ )
+ database.add(measurement)
+ # Restart ratio handling on updated database.
+ break
+ else:
+ # No ratio needs measuring, we are done with this phase.
+ self.debug(u"Phase done.")
+ break
+ # We have broken out of the for loop.
+ if failing_fast:
+ # Abort the while loop early.
+ break
+ # Not failing fast but database got updated, restart the while loop.
+ else:
+ # Time is up.
+ raise RuntimeError(u"Optimized search takes too long.")
+ # Min rate is not valid, but returning what we have
+ # so next duration can recover.
+
+ @staticmethod
+ def improves(new_bound, lower_bound, upper_bound):
+ """Return whether new bound improves upon old bounds.
+
+ To improve, new_bound has to be not None,
+ and between the old bounds (where the bound is not None).
+
+ This piece of logic is commonly used, when we know old bounds
+ from a primary source (e.g. current duration database)
+ and new bound from a secondary source (e.g. previous duration database).
+ Having a function allows "if improves(..):" construction to save space.
+
+ :param new_bound: The bound we consider applying.
+ :param lower_bound: Known bound, new_bound has to be higher to apply.
+ :param upper_bound: Known bound, new_bound has to be lower to apply.
+ :type new_bound: Optional[ReceiveRateMeasurement]
+ :type lower_bound: Optional[ReceiveRateMeasurement]
+ :type upper_bound: Optional[ReceiveRateMeasurement]
+ :returns: Whether we can apply the new bound.
+ :rtype: bool
+ """
+ if new_bound is None:
+ return False
+ if lower_bound is not None:
+ if new_bound.target_tr <= lower_bound.target_tr:
+ return False
+ if upper_bound is not None:
+ if new_bound.target_tr >= upper_bound.target_tr:
+ return False
+ return True
+
+ def _select_for_ratio(self, ratio):
+ """Return None or new target_tr to measure at.
+
+ Returning None means either we have narrow enough valid interval
+ for this ratio, or we are hitting min rate and should fail early.
+
+ :param ratio: Loss ratio to ensure narrow valid bounds for.
+ :type ratio: float
+ :returns: The next target transmit rate to measure at.
+ :rtype: Optional[float]
+ :raises RuntimeError: If database inconsistency is detected.
+ """
+ state = self.state
+ data = state.database
+ bounds = data.get_bounds(ratio)
+ cur_lo1, cur_hi1, pre_lo, pre_hi, cur_lo2, cur_hi2 = bounds
+ pre_lo_improves = self.improves(pre_lo, cur_lo1, cur_hi1)
+ pre_hi_improves = self.improves(pre_hi, cur_lo1, cur_hi1)
+ if pre_lo_improves and pre_hi_improves:
+ # We allowed larger width for previous phase
+ # as single bisect here guarantees only one re-measurement.
+ new_tr = self._bisect(pre_lo, pre_hi)
+ if new_tr is not None:
+ self.debug(f"Initial bisect for {ratio}, tr: {new_tr}")
+ return new_tr
+ if pre_lo_improves:
+ new_tr = pre_lo.target_tr
+ self.debug(f"Re-measuring lower bound for {ratio}, tr: {new_tr}")
+ return new_tr
+ if pre_hi_improves:
+ new_tr = pre_hi.target_tr
+ self.debug(f"Re-measuring upper bound for {ratio}, tr: {new_tr}")
+ return new_tr
+ if cur_lo1 is None and cur_hi1 is None:
+ raise RuntimeError(u"No results found in databases!")
+ if cur_lo1 is None:
+ # Upper bound exists (cur_hi1).
+ # We already tried previous lower bound.
+ # So, we want to extend down.
+ new_tr = self._extend_down(
+ cur_hi1, cur_hi2, pre_hi, second_needed=False
+ )
+ self.debug(
+ f"Extending down for {ratio}:"
+ f" old {cur_hi1.target_tr} new {new_tr}"
+ )
+ return new_tr
+ if cur_hi1 is None:
+ # Lower bound exists (cur_lo1).
+ # We already tried previous upper bound.
+ # So, we want to extend up.
+ new_tr = self._extend_up(cur_lo1, cur_lo2, pre_lo)
+ self.debug(
+ f"Extending up for {ratio}:"
+ f" old {cur_lo1.target_tr} new {new_tr}"
+ )
+ return new_tr
+ # Both bounds exist (cur_lo1 and cur_hi1).
+ # cur_lo1 might have been selected for this ratio (we are bisecting)
+ # or for previous ratio (we are extending down for this ratio).
+ # Compute both estimates and choose the higher value.
+ bisected_tr = self._bisect(cur_lo1, cur_hi1)
+ extended_tr = self._extend_down(
+ cur_hi1, cur_hi2, pre_hi, second_needed=True
+ )
+ # Only if both are not None we need to decide.
+ if bisected_tr and extended_tr and extended_tr > bisected_tr:
+ self.debug(
+ f"Extending down for {ratio}:"
+ f" old {cur_hi1.target_tr} new {extended_tr}"
+ )
+ new_tr = extended_tr
+ else:
+ self.debug(
+ f"Bisecting for {ratio}: lower {cur_lo1.target_tr},"
+ f" upper {cur_hi1.target_tr}, new {bisected_tr}"
+ )
+ new_tr = bisected_tr
+ return new_tr
+
+ def _extend_down(self, cur_hi1, cur_hi2, pre_hi, second_needed=False):
+ """Return extended width below, or None if hitting min rate.
+
+ If no second tightest (nor previous) upper bound is available,
+ the behavior is governed by second_needed argument.
+ If true, return None, if false, start from width goal.
+ This is useful, as if a bisect is possible,
+ we want to give it a chance.
+
+ :param cur_hi1: Tightest upper bound for current duration. Has to exist.
+ :param cur_hi2: Second tightest current upper bound, may not exist.
+ :param pre_hi: Tightest upper bound, previous duration, may not exist.
+ :param second_needed: Whether second tightest bound is required.
+ :type cur_hi1: ReceiveRateMeasurement
+ :type cur_hi2: Optional[ReceiveRateMeasurement]
+ :type pre_hi: Optional[ReceiveRateMeasurement]
+ :type second_needed: bool
+ :returns: The next target transmit rate to measure at.
+ :rtype: Optional[float]
+ """
+ state = self.state
+ old_tr = cur_hi1.target_tr
+ next_bound = cur_hi2
+ if self.improves(pre_hi, cur_hi1, cur_hi2):
+ next_bound = pre_hi
+ if next_bound is None and second_needed:
+ return None
+ old_width = state.width_goal
+ if next_bound is not None:
+ old_width = ReceiveRateInterval(cur_hi1, next_bound).rel_tr_width
+ old_width = max(old_width, state.width_goal)
+ new_tr = multiple_step_down(
+ old_tr, old_width, self.expansion_coefficient
+ )
+ new_tr = max(new_tr, state.min_rate)
+ if new_tr >= old_tr:
+ self.debug(u"Extend down hits max rate.")
+ return None
+ return new_tr
+
+ def _extend_up(self, cur_lo1, cur_lo2, pre_lo):
+ """Return extended width above, or None if hitting max rate.
+
+ :param cur_lo1: Tightest lower bound for current duration. Has to exist.
+ :param cur_lo2: Second tightest current lower bound, may not exist.
+ :param pre_lo: Tightest lower bound, previous duration, may not exist.
+ :type cur_lo1: ReceiveRateMeasurement
+ :type cur_lo2: Optional[ReceiveRateMeasurement]
+ :type pre_lo: Optional[ReceiveRateMeasurement]
+ :returns: The next target transmit rate to measure at.
+ :rtype: Optional[float]
+ """
+ state = self.state
+ old_tr = cur_lo1.target_tr
+ next_bound = cur_lo2
+ if self.improves(pre_lo, cur_lo2, cur_lo1):
+ next_bound = pre_lo
+ old_width = state.width_goal
+ if next_bound is not None:
+ old_width = ReceiveRateInterval(cur_lo1, next_bound).rel_tr_width
+ old_width = max(old_width, state.width_goal)
+ new_tr = multiple_step_up(old_tr, old_width, self.expansion_coefficient)
+ new_tr = min(new_tr, state.max_rate)
+ if new_tr <= old_tr:
+ self.debug(u"Extend up hits max rate.")
+ return None
+ return new_tr
+
+ def _bisect(self, lower_bound, upper_bound):
+ """Return middle rate or None if width is narrow enough.
+
+ :param lower_bound: Measurement to use as a lower bound. Has to exist.
+ :param upper_bound: Measurement to use as an upper bound. Has to exist.
+ :type lower_bound: ReceiveRateMeasurement
+ :type upper_bound: ReceiveRateMeasurement
+ :returns: The next target transmit rate to measure at.
+ :rtype: Optional[float]
+ :raises RuntimeError: If database inconsistency is detected.
+ """
+ state = self.state
+ width = ReceiveRateInterval(lower_bound, upper_bound).rel_tr_width
+ if width <= state.width_goal:
+ self.debug(u"No more bisects needed.")
+ return None
+ new_tr = half_step_up(lower_bound.target_tr, width, state.width_goal)
+ return new_tr