self._search_result = None
self._search_result_rate = None
+ @abstractmethod
+ def get_latency(self):
+ """Return min/avg/max latency.
+
+ :returns: Latency stats.
+ :rtype: list
+ """
+ pass
+
@abstractmethod
def measure_loss(self, rate, frame_size, loss_acceptance,
loss_acceptance_type, traffic_type):
:type loss_acceptance: float
:type loss_acceptance_type: LossAcceptanceType
:type traffic_type: str
- :return: Drop threshold exceeded? (True/False)
+ :returns: Drop threshold exceeded? (True/False)
:rtype bool
"""
pass
:param min_rate: Lower value of search boundaries.
:type max_rate: float
:type min_rate: float
- :return: nothing
+ :returns: nothing
+ :raises: ValueError if min rate is lower than 0 and higher than max rate
"""
if float(min_rate) <= 0:
raise ValueError("min_rate must be higher than 0")
self._rate_max = float(max_rate)
self._rate_min = float(min_rate)
+ def set_loss_acceptance(self, loss_acceptance):
+ """Set loss acceptance treshold for PDR search.
+
+ :param loss_acceptance: Loss acceptance treshold for PDR search.
+ :type loss_acceptance: str
+ :returns: nothing
+ :raises: ValueError if loss acceptance is lower than zero
+ """
+ if float(loss_acceptance) < 0:
+ raise ValueError("Loss acceptance must be higher or equal 0")
+ else:
+ self._loss_acceptance = float(loss_acceptance)
+
+ def get_loss_acceptance(self):
+ """Return configured loss acceptance treshold.
+
+ :returns: Loss acceptance treshold.
+ :rtype: float
+ """
+ return self._loss_acceptance
+
+ def set_loss_acceptance_type_percentage(self):
+ """Set loss acceptance treshold type to percentage.
+
+ :returns: nothing
+ """
+ self._loss_acceptance_type = LossAcceptanceType.PERCENTAGE
+
+ def set_loss_acceptance_type_frames(self):
+ """Set loss acceptance treshold type to frames.
+
+ :returns: nothing
+ """
+ self._loss_acceptance_type = LossAcceptanceType.FRAMES
+
+ def loss_acceptance_type_is_percentage(self):
+ """Return true if loss acceptance treshold type is percentage,
+ false otherwise.
+
+ :returns: True if loss acceptance treshold type is percentage.
+ :rtype: boolean
+ """
+ return self._loss_acceptance_type == LossAcceptanceType.PERCENTAGE
+
def set_search_linear_step(self, step_rate):
"""Set step size for linear search.
:param step_rate: Linear search step size.
:type step_rate: float
- :return: nothing
+ :returns: nothing
"""
self._rate_linear_step = float(step_rate)
def set_search_rate_type_percentage(self):
"""Set rate type to percentage of linerate.
- :return: nothing
+ :returns: nothing
"""
self._set_search_rate_type(RateType.PERCENTAGE)
def set_search_rate_type_bps(self):
"""Set rate type to bits per second.
- :return: nothing
+ :returns: nothing
"""
self._set_search_rate_type(RateType.BITS_PER_SECOND)
def set_search_rate_type_pps(self):
"""Set rate type to packets per second.
- :return: nothing
+ :returns: nothing
"""
self._set_search_rate_type(RateType.PACKETS_PER_SECOND)
:param rate_type: Type of rate to set.
:type rate_type: RateType
- :return: nothing
+ :returns: nothing
+ :raises: Exception if rate type is unknown
"""
if rate_type not in RateType:
raise Exception("rate_type unknown: {}".format(rate_type))
:param frame_size: Size of frames.
:type frame_size: str
- :return: nothing
+ :returns: nothing
"""
self._frame_size = frame_size
:param duration: Number of seconds for traffic to run.
:type duration: int
- :return: nothing
+ :returns: nothing
"""
self._duration = int(duration)
def get_duration(self):
"""Return configured duration of single traffic run.
- :return: Number of seconds for traffic to run.
+ :returns: Number of seconds for traffic to run.
:rtype: int
"""
return self._duration
:param convergence: Treshold value number.
:type convergence: float
- :return: nothing
+ :returns: nothing
"""
self._binary_convergence_threshold = float(convergence)
def get_binary_convergence_threshold(self):
"""Get convergence for binary search.
- :return: Treshold value number.
+ :returns: Treshold value number.
:rtype: float
"""
return self._binary_convergence_threshold
def get_rate_type_str(self):
"""Return rate type representation.
- :return: String representation of rate type.
+ :returns: String representation of rate type.
:rtype: str
+ :raises: ValueError if rate type is unknown
"""
if self._rate_type == RateType.PERCENTAGE:
return "%"
:param max_attempts: Number of traffic runs.
:type max_attempts: int
- :return: nothing
+ :returns: nothing
+ :raises: ValueError if max attempts is lower than zero
"""
if int(max_attempts) > 0:
self._max_attempts = int(max_attempts)
else:
- raise ValueError("Max attempt must by greater then zero")
+ raise ValueError("Max attempt must by greater than zero")
def get_max_attempts(self):
"""Return maximum number of traffic runs during one rate step.
- :return: Number of traffic runs.
+ :returns: Number of traffic runs.
:rtype: int
"""
return self._max_attempts
def set_search_result_type_best_of_n(self):
"""Set type of search result evaluation to Best of N.
- :return: nothing
+ :returns: nothing
"""
self._set_search_result_type(SearchResultType.BEST_OF_N)
def set_search_result_type_worst_of_n(self):
"""Set type of search result evaluation to Worst of N.
- :return: nothing
+ :returns: nothing
"""
self._set_search_result_type(SearchResultType.WORST_OF_N)
:param search_type: Type of search result evaluation to set.
:type search_type: SearchResultType
- :return: nothing
+ :returns: nothing
+ :raises: ValueError if search type is unknown
"""
if search_type not in SearchResultType:
raise ValueError("search_type unknown: {}".format(search_type))
:param res_list: List of return values from all runs at one rate step.
:type res_list: list
- :return: True if at least one run is True, False otherwise.
+ :returns: True if at least one run is True, False otherwise.
:rtype: boolean
"""
# Return True if any element of the iterable is True.
:param res_list: List of return values from all runs at one rate step.
:type res_list: list
- :return: False if at least one run is False, True otherwise.
+ :returns: False if at least one run is False, True otherwise.
:rtype: boolean
"""
# Return False if not all elements of the iterable are True.
- return not all(res_list)
+ return all(res_list)
def _get_res_based_on_search_type(self, res_list):
"""Return result of search based on search evaluation type.
:param res_list: List of return values from all runs at one rate step.
:type res_list: list
- :return: Boolean based on search result type.
+ :returns: Boolean based on search result type.
:rtype: boolean
+ :raises: ValueError if search result type is unknown
"""
if self._search_result_type == SearchResultType.BEST_OF_N:
return self._get_best_of_n(res_list)
else:
raise ValueError("Unknown search result type")
-
def linear_search(self, start_rate, traffic_type):
"""Linear search of rate with loss below acceptance criteria.
:param start_rate: Initial rate.
:param traffic_type: Traffic profile.
:type start_rate: float
- :param traffic_type: str
- :return: nothing
+ :type traffic_type: str
+ :returns: nothing
+ :raises: ValueError if start rate is not in range
"""
if not self._rate_min <= float(start_rate) <= self._rate_max:
res = self._get_res_based_on_search_type(res)
if self._search_linear_direction == SearchDirection.BOTTOM_UP:
- # loss occured and it was above acceptance criteria
+ # loss occurred and it was above acceptance criteria
if not res:
# if this is first run then we didn't find drop rate
if prev_rate is None:
raise RuntimeError("Unknown search result")
elif self._search_linear_direction == SearchDirection.TOP_DOWN:
- # loss occured, decrease rate
+ # loss occurred, decrease rate
if not res:
prev_rate = rate
rate -= self._rate_linear_step
def verify_search_result(self):
"""Fail if search was not successful.
- :return: Result rate.
- :rtype: float
+ :returns: Result rate and latency stats.
+ :rtype: tuple
+ :raises: Exception if search failed
"""
if self._search_result == SearchResults.FAILURE:
raise Exception('Search FAILED')
elif self._search_result in [SearchResults.SUCCESS,
SearchResults.SUSPICIOUS]:
- return self._search_result_rate
+ return self._search_result_rate, self.get_latency()
- def binary_search(self, b_min, b_max, traffic_type):
+ def binary_search(self, b_min, b_max, traffic_type, skip_max_rate=False):
"""Binary search of rate with loss below acceptance criteria.
:param b_min: Min range rate.
:param b_max: Max range rate.
:param traffic_type: Traffic profile.
+ :param skip_max_rate: Start with max rate first
:type b_min: float
:type b_max: float
:type traffic_type: str
- :return: nothing
+ :type skip_max_rate: bool
+ :returns: nothing
+ :raises: ValueError if input values are not valid
"""
if not self._rate_min <= float(b_min) <= self._rate_max:
if not self._rate_min <= float(b_max) <= self._rate_max:
raise ValueError("Max rate is not in min,max range")
if float(b_max) < float(b_min):
- raise ValueError("Min rate is greater then max rate")
+ raise ValueError("Min rate is greater than max rate")
# binary search
- # rate is half of interval + start of interval
- rate = ((float(b_max) - float(b_min)) / 2) + float(b_min)
+ if skip_max_rate:
+ # rate is half of interval + start of interval
+ rate = ((float(b_max) - float(b_min)) / 2) + float(b_min)
+ else:
+ # rate is max of interval
+ rate = float(b_max)
# rate diff with previous run
rate_diff = abs(self._last_binary_rate - rate)
res = self._get_res_based_on_search_type(res)
- # loss occured and it was above acceptance criteria
+ # loss occurred and it was above acceptance criteria
if not res:
- self.binary_search(b_min, rate, traffic_type)
+ self.binary_search(b_min, rate, traffic_type, True)
# there was no loss / loss below acceptance criteria
else:
self._search_result_rate = rate
- self.binary_search(rate, b_max, traffic_type)
+ self.binary_search(rate, b_max, traffic_type, True)
def combined_search(self, start_rate, traffic_type):
"""Combined search of rate with loss below acceptance criteria.
:param traffic_type: Traffic profile.
:type start_rate: float
:type traffic_type: str
- :return: nothing
+ :returns: nothing
+ :raises: RuntimeError if linear search failed
"""
self.linear_search(start_rate, traffic_type)
self._search_result_rate = None
# we will use binary search to refine search in one linear step
- self.binary_search(b_min, b_max, traffic_type)
+ self.binary_search(b_min, b_max, traffic_type, True)
# linear and binary search succeed
if self._search_result == SearchResults.SUCCESS:
:type num_b: float
:type rel_tol: float
:type abs_tol: float
- :return: Returns True if num_a is close in value to num_b or equal.
+ :returns: Returns True if num_a is close in value to num_b or equal.
False otherwise.
:rtype: boolean
+ :raises: ValueError if input values are not valid
"""
if num_a == num_b: