-# Copyright (c) 2016 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
def get_latency(self):
"""Return min/avg/max latency.
- :return: Latency stats.
+ :returns: Latency stats.
:rtype: list
"""
pass
@abstractmethod
def measure_loss(self, rate, frame_size, loss_acceptance,
- loss_acceptance_type, traffic_type):
+ loss_acceptance_type, traffic_type, skip_warmup=False):
"""Send traffic from TG and measure count of dropped frames.
:param rate: Offered traffic load.
:param loss_acceptance: Permitted drop ratio or frames count.
:param loss_acceptance_type: Type of permitted loss.
:param traffic_type: Traffic profile ([2,3]-node-L[2,3], ...).
+ :param skip_warmup: Start TRex without warmup traffic if true.
:type rate: int
:type frame_size: str
:type loss_acceptance: float
:type loss_acceptance_type: LossAcceptanceType
:type traffic_type: str
- :return: Drop threshold exceeded? (True/False)
- :rtype bool
+ :type traffic_type: bool
+ :returns: Drop threshold exceeded? (True/False)
+ :rtype: bool
"""
pass
:param min_rate: Lower value of search boundaries.
:type max_rate: float
:type min_rate: float
- :return: nothing
+ :returns: nothing
+ :raises ValueError: If min rate is lower than 0 or higher than max rate.
"""
if float(min_rate) <= 0:
raise ValueError("min_rate must be higher than 0")
:param loss_acceptance: Loss acceptance treshold for PDR search.
:type loss_acceptance: str
- :return: nothing
+ :returns: nothing
+ :raises ValueError: If loss acceptance is lower than zero.
"""
if float(loss_acceptance) < 0:
raise ValueError("Loss acceptance must be higher or equal 0")
def get_loss_acceptance(self):
"""Return configured loss acceptance treshold.
- :return: Loss acceptance treshold.
+ :returns: Loss acceptance treshold.
:rtype: float
"""
return self._loss_acceptance
def set_loss_acceptance_type_percentage(self):
"""Set loss acceptance treshold type to percentage.
- :return: nothing
+ :returns: nothing
"""
self._loss_acceptance_type = LossAcceptanceType.PERCENTAGE
def set_loss_acceptance_type_frames(self):
"""Set loss acceptance treshold type to frames.
- :return: nothing
+ :returns: nothing
"""
self._loss_acceptance_type = LossAcceptanceType.FRAMES
"""Return true if loss acceptance treshold type is percentage,
false otherwise.
- :return: True if loss acceptance treshold type is percentage.
+ :returns: True if loss acceptance treshold type is percentage.
:rtype: boolean
"""
return self._loss_acceptance_type == LossAcceptanceType.PERCENTAGE
:param step_rate: Linear search step size.
:type step_rate: float
- :return: nothing
+ :returns: nothing
"""
self._rate_linear_step = float(step_rate)
def set_search_rate_type_percentage(self):
"""Set rate type to percentage of linerate.
- :return: nothing
+ :returns: nothing
"""
self._set_search_rate_type(RateType.PERCENTAGE)
def set_search_rate_type_bps(self):
"""Set rate type to bits per second.
- :return: nothing
+ :returns: nothing
"""
self._set_search_rate_type(RateType.BITS_PER_SECOND)
def set_search_rate_type_pps(self):
"""Set rate type to packets per second.
- :return: nothing
+ :returns: nothing
"""
self._set_search_rate_type(RateType.PACKETS_PER_SECOND)
:param rate_type: Type of rate to set.
:type rate_type: RateType
- :return: nothing
+ :returns: nothing
+ :raises Exception: If rate type is unknown.
"""
if rate_type not in RateType:
raise Exception("rate_type unknown: {}".format(rate_type))
:param frame_size: Size of frames.
:type frame_size: str
- :return: nothing
+ :returns: nothing
"""
self._frame_size = frame_size
:param duration: Number of seconds for traffic to run.
:type duration: int
- :return: nothing
+ :returns: nothing
"""
self._duration = int(duration)
def get_duration(self):
"""Return configured duration of single traffic run.
- :return: Number of seconds for traffic to run.
+ :returns: Number of seconds for traffic to run.
:rtype: int
"""
return self._duration
:param convergence: Treshold value number.
:type convergence: float
- :return: nothing
+ :returns: nothing
"""
self._binary_convergence_threshold = float(convergence)
def get_binary_convergence_threshold(self):
"""Get convergence for binary search.
- :return: Treshold value number.
+ :returns: Treshold value number.
:rtype: float
"""
return self._binary_convergence_threshold
def get_rate_type_str(self):
"""Return rate type representation.
- :return: String representation of rate type.
+ :returns: String representation of rate type.
:rtype: str
+ :raises ValueError: If rate type is unknown.
"""
if self._rate_type == RateType.PERCENTAGE:
return "%"
:param max_attempts: Number of traffic runs.
:type max_attempts: int
- :return: nothing
+ :returns: nothing
+ :raises ValueError: If max attempts is lower than zero.
"""
if int(max_attempts) > 0:
self._max_attempts = int(max_attempts)
else:
- raise ValueError("Max attempt must by greater then zero")
+ raise ValueError("Max attempt must by greater than zero")
def get_max_attempts(self):
"""Return maximum number of traffic runs during one rate step.
- :return: Number of traffic runs.
+ :returns: Number of traffic runs.
:rtype: int
"""
return self._max_attempts
def set_search_result_type_best_of_n(self):
"""Set type of search result evaluation to Best of N.
- :return: nothing
+ :returns: nothing
"""
self._set_search_result_type(SearchResultType.BEST_OF_N)
def set_search_result_type_worst_of_n(self):
"""Set type of search result evaluation to Worst of N.
- :return: nothing
+ :returns: nothing
"""
self._set_search_result_type(SearchResultType.WORST_OF_N)
:param search_type: Type of search result evaluation to set.
:type search_type: SearchResultType
- :return: nothing
+ :returns: nothing
+ :raises ValueError: If search type is unknown.
"""
if search_type not in SearchResultType:
raise ValueError("search_type unknown: {}".format(search_type))
:param res_list: List of return values from all runs at one rate step.
:type res_list: list
- :return: True if at least one run is True, False otherwise.
+ :returns: True if at least one run is True, False otherwise.
:rtype: boolean
"""
# Return True if any element of the iterable is True.
:param res_list: List of return values from all runs at one rate step.
:type res_list: list
- :return: False if at least one run is False, True otherwise.
+ :returns: False if at least one run is False, True otherwise.
:rtype: boolean
"""
# Return False if not all elements of the iterable are True.
- return not all(res_list)
+ return all(res_list)
def _get_res_based_on_search_type(self, res_list):
"""Return result of search based on search evaluation type.
:param res_list: List of return values from all runs at one rate step.
:type res_list: list
- :return: Boolean based on search result type.
+ :returns: Boolean based on search result type.
:rtype: boolean
+ :raises ValueError: If search result type is unknown.
"""
if self._search_result_type == SearchResultType.BEST_OF_N:
return self._get_best_of_n(res_list)
:param traffic_type: Traffic profile.
:type start_rate: float
:type traffic_type: str
- :return: nothing
+ :returns: nothing
+ :raises ValueError: If start rate is not in range.
"""
if not self._rate_min <= float(start_rate) <= self._rate_max:
if prev_rate is None:
self._search_result = SearchResults.FAILURE
self._search_result_rate = None
- return
# else we found the rate, which is value from previous run
else:
self._search_result = SearchResults.SUCCESS
self._search_result_rate = prev_rate
- return
+ return
# there was no loss / loss below acceptance criteria
elif res:
prev_rate = rate
def verify_search_result(self):
"""Fail if search was not successful.
- :return: Result rate and latency stats.
+ :returns: Result rate and latency stats.
:rtype: tuple
+ :raises Exception: If search failed.
"""
if self._search_result == SearchResults.FAILURE:
raise Exception('Search FAILED')
SearchResults.SUSPICIOUS]:
return self._search_result_rate, self.get_latency()
- def binary_search(self, b_min, b_max, traffic_type, skip_max_rate=False):
+ def binary_search(self, b_min, b_max, traffic_type, skip_max_rate=False,
+ skip_warmup=False):
"""Binary search of rate with loss below acceptance criteria.
:param b_min: Min range rate.
:param b_max: Max range rate.
:param traffic_type: Traffic profile.
:param skip_max_rate: Start with max rate first
+ :param skip_warmup: Start TRex without warmup traffic if true.
:type b_min: float
:type b_max: float
:type traffic_type: str
:type skip_max_rate: bool
- :return: nothing
+ :type skip_warmup: bool
+ :returns: nothing
+ :raises ValueError: If input values are not valid.
"""
if not self._rate_min <= float(b_min) <= self._rate_max:
if float(b_max) < float(b_min):
raise ValueError("Min rate is greater than max rate")
- # binary search
- if skip_max_rate:
- # rate is half of interval + start of interval
- rate = ((float(b_max) - float(b_min)) / 2) + float(b_min)
- else:
- # rate is max of interval
- rate = float(b_max)
+ # rate is half of interval + start of interval if not using max rate
+ rate = ((float(b_max) - float(b_min)) / 2) + float(b_min) \
+ if skip_max_rate else float(b_max)
+
# rate diff with previous run
rate_diff = abs(self._last_binary_rate - rate)
# convergence criterium
if float(rate_diff) < float(self._binary_convergence_threshold):
- if not self._search_result_rate:
- self._search_result = SearchResults.FAILURE
- else:
- self._search_result = SearchResults.SUCCESS
+ self._search_result = SearchResults.SUCCESS \
+ if self._search_result_rate else SearchResults.FAILURE
return
self._last_binary_rate = rate
res.append(self.measure_loss(rate, self._frame_size,
self._loss_acceptance,
self._loss_acceptance_type,
- traffic_type))
+ traffic_type, skip_warmup=skip_warmup))
res = self._get_res_based_on_search_type(res)
# loss occurred and it was above acceptance criteria
if not res:
- self.binary_search(b_min, rate, traffic_type, True)
+ self.binary_search(b_min, rate, traffic_type, True, True)
# there was no loss / loss below acceptance criteria
else:
self._search_result_rate = rate
- self.binary_search(rate, b_max, traffic_type, True)
+ self.binary_search(rate, b_max, traffic_type, True, True)
def combined_search(self, start_rate, traffic_type):
"""Combined search of rate with loss below acceptance criteria.
:param traffic_type: Traffic profile.
:type start_rate: float
:type traffic_type: str
- :return: nothing
+ :returns: nothing
+ :raises RuntimeError: If linear search failed.
"""
self.linear_search(start_rate, traffic_type)
:type num_b: float
:type rel_tol: float
:type abs_tol: float
- :return: Returns True if num_a is close in value to num_b or equal.
+ :returns: Returns True if num_a is close in value to num_b or equal.
False otherwise.
:rtype: boolean
+ :raises ValueError: If input values are not valid.
"""
if num_a == num_b:
return abs(num_b - num_a) <= max(rel_tol * max(abs(num_a), abs(num_b)),
abs_tol)
-