-# Copyright (c) 2016 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@unique
class SearchDirection(Enum):
"""Direction of linear search."""
-
TOP_DOWN = 1
BOTTOM_UP = 2
@unique
class SearchResults(Enum):
"""Result of the drop rate search."""
-
SUCCESS = 1
FAILURE = 2
SUSPICIOUS = 3
@unique
class RateType(Enum):
"""Type of rate units."""
-
PERCENTAGE = 1
PACKETS_PER_SECOND = 2
BITS_PER_SECOND = 3
@unique
class LossAcceptanceType(Enum):
"""Type of the loss acceptance criteria."""
-
FRAMES = 1
PERCENTAGE = 2
@unique
class SearchResultType(Enum):
"""Type of search result evaluation."""
-
BEST_OF_N = 1
WORST_OF_N = 2
-class DropRateSearch(object):
+class DropRateSearch(metaclass=ABCMeta):
"""Abstract class with search algorithm implementation."""
- __metaclass__ = ABCMeta
-
def __init__(self):
# duration of traffic run (binary, linear)
self._duration = 60
# permitted values: LossAcceptanceType
self._loss_acceptance_type = LossAcceptanceType.FRAMES
# size of frames to send
- self._frame_size = "64"
- # binary convergence criterium type is self._rate_type
+ self._frame_size = u"64"
+ # binary convergence criterion type is self._rate_type
self._binary_convergence_threshold = 5000
# numbers of traffic runs during one rate step
self._max_attempts = 1
:returns: Latency stats.
:rtype: list
"""
- pass
@abstractmethod
- def measure_loss(self, rate, frame_size, loss_acceptance,
- loss_acceptance_type, traffic_type, skip_warmup=False):
+ def measure_loss(
+ self, rate, frame_size, loss_acceptance, loss_acceptance_type,
+ traffic_profile, skip_warmup=False):
"""Send traffic from TG and measure count of dropped frames.
:param rate: Offered traffic load.
:param frame_size: Size of frame.
:param loss_acceptance: Permitted drop ratio or frames count.
:param loss_acceptance_type: Type of permitted loss.
- :param traffic_type: Traffic profile ([2,3]-node-L[2,3], ...).
+ :param traffic_profile: Module name to use for traffic generation.
:param skip_warmup: Start TRex without warmup traffic if true.
- :type rate: int
+ :type rate: float
:type frame_size: str
:type loss_acceptance: float
:type loss_acceptance_type: LossAcceptanceType
- :type traffic_type: str
- :type traffic_type: bool
+ :type traffic_profile: str
+ :type skip_warmup: bool
:returns: Drop threshold exceeded? (True/False)
- :rtype bool
+ :rtype: bool
"""
- pass
def set_search_rate_boundaries(self, max_rate, min_rate):
"""Set search boundaries: min,max.
:type max_rate: float
:type min_rate: float
:returns: nothing
- :raises: ValueError if min rate is lower than 0 and higher than max rate
+ :raises ValueError: If min rate is lower than 0 or higher than max rate.
"""
if float(min_rate) <= 0:
- raise ValueError("min_rate must be higher than 0")
+ msg = u"min_rate must be higher than 0"
elif float(min_rate) > float(max_rate):
- raise ValueError("min_rate must be lower than max_rate")
+ msg = u"min_rate must be lower than max_rate"
else:
self._rate_max = float(max_rate)
self._rate_min = float(min_rate)
+ return
+ raise ValueError(msg)
def set_loss_acceptance(self, loss_acceptance):
- """Set loss acceptance treshold for PDR search.
+ """Set loss acceptance threshold for PDR search.
- :param loss_acceptance: Loss acceptance treshold for PDR search.
+ :param loss_acceptance: Loss acceptance threshold for PDR search.
:type loss_acceptance: str
:returns: nothing
- :raises: ValueError if loss acceptance is lower than zero
+ :raises ValueError: If loss acceptance is lower than zero.
"""
- if float(loss_acceptance) < 0:
- raise ValueError("Loss acceptance must be higher or equal 0")
- else:
+ if float(loss_acceptance) >= 0:
self._loss_acceptance = float(loss_acceptance)
+ else:
+ raise ValueError(u"Loss acceptance must be higher or equal 0")
def get_loss_acceptance(self):
- """Return configured loss acceptance treshold.
+ """Return configured loss acceptance threshold.
- :returns: Loss acceptance treshold.
+ :returns: Loss acceptance threshold.
:rtype: float
"""
return self._loss_acceptance
def set_loss_acceptance_type_percentage(self):
- """Set loss acceptance treshold type to percentage.
+ """Set loss acceptance threshold type to percentage.
:returns: nothing
"""
self._loss_acceptance_type = LossAcceptanceType.PERCENTAGE
def set_loss_acceptance_type_frames(self):
- """Set loss acceptance treshold type to frames.
+ """Set loss acceptance threshold type to frames.
:returns: nothing
"""
self._loss_acceptance_type = LossAcceptanceType.FRAMES
def loss_acceptance_type_is_percentage(self):
- """Return true if loss acceptance treshold type is percentage,
+ """Return true if loss acceptance threshold type is percentage,
false otherwise.
- :returns: True if loss acceptance treshold type is percentage.
+ :returns: True if loss acceptance threshold type is percentage.
:rtype: boolean
"""
return self._loss_acceptance_type == LossAcceptanceType.PERCENTAGE
:param rate_type: Type of rate to set.
:type rate_type: RateType
:returns: nothing
- :raises: Exception if rate type is unknown
+ :raises Exception: If rate type is unknown.
"""
- if rate_type not in RateType:
- raise Exception("rate_type unknown: {}".format(rate_type))
- else:
+ if rate_type in RateType:
self._rate_type = rate_type
+ else:
+ raise Exception(f"rate_type unknown: {rate_type}")
def set_search_frame_size(self, frame_size):
"""Set size of frames to send.
def set_binary_convergence_threshold(self, convergence):
"""Set convergence for binary search.
- :param convergence: Treshold value number.
+ :param convergence: Threshold value number.
:type convergence: float
:returns: nothing
"""
def get_binary_convergence_threshold(self):
"""Get convergence for binary search.
- :returns: Treshold value number.
+ :returns: Threshold value number.
:rtype: float
"""
return self._binary_convergence_threshold
:returns: String representation of rate type.
:rtype: str
- :raises: ValueError if rate type is unknown
+ :raises ValueError: If rate type is unknown.
"""
if self._rate_type == RateType.PERCENTAGE:
- return "%"
+ retval = u"%"
elif self._rate_type == RateType.BITS_PER_SECOND:
- return "bps"
+ retval = u"bps"
elif self._rate_type == RateType.PACKETS_PER_SECOND:
- return "pps"
+ retval = u"pps"
else:
- raise ValueError("RateType unknown")
+ raise ValueError(u"RateType unknown")
+ return retval
def set_max_attempts(self, max_attempts):
"""Set maximum number of traffic runs during one rate step.
:param max_attempts: Number of traffic runs.
:type max_attempts: int
:returns: nothing
- :raises: ValueError if max attempts is lower than zero
+ :raises ValueError: If max attempts is lower than zero.
"""
if int(max_attempts) > 0:
self._max_attempts = int(max_attempts)
else:
- raise ValueError("Max attempt must by greater than zero")
+ raise ValueError(u"Max attempt must by greater than zero")
def get_max_attempts(self):
"""Return maximum number of traffic runs during one rate step.
:param search_type: Type of search result evaluation to set.
:type search_type: SearchResultType
:returns: nothing
- :raises: ValueError if search type is unknown
+ :raises ValueError: If search type is unknown.
"""
- if search_type not in SearchResultType:
- raise ValueError("search_type unknown: {}".format(search_type))
- else:
+ if search_type in SearchResultType:
self._search_result_type = search_type
+ else:
+ raise ValueError(f"search_type unknown: {search_type}")
@staticmethod
def _get_best_of_n(res_list):
:type res_list: list
:returns: Boolean based on search result type.
:rtype: boolean
- :raises: ValueError if search result type is unknown
+ :raises ValueError: If search result type is unknown.
"""
if self._search_result_type == SearchResultType.BEST_OF_N:
- return self._get_best_of_n(res_list)
+ retval = self._get_best_of_n(res_list)
elif self._search_result_type == SearchResultType.WORST_OF_N:
- return self._get_worst_of_n(res_list)
+ retval = self._get_worst_of_n(res_list)
else:
- raise ValueError("Unknown search result type")
+ raise ValueError(u"Unknown search result type")
+ return retval
- def linear_search(self, start_rate, traffic_type):
+ def linear_search(self, start_rate, traffic_profile):
"""Linear search of rate with loss below acceptance criteria.
:param start_rate: Initial rate.
- :param traffic_type: Traffic profile.
+ :param traffic_profile: Module name to use for traffic generation.
:type start_rate: float
- :type traffic_type: str
+ :type traffic_profile: str
:returns: nothing
- :raises: ValueError if start rate is not in range
+ :raises ValueError: If start rate is not in range.
"""
-
if not self._rate_min <= float(start_rate) <= self._rate_max:
- raise ValueError("Start rate is not in min,max range")
+ raise ValueError(u"Start rate is not in min,max range")
rate = float(start_rate)
# the last but one step
while True:
res = []
for dummy in range(self._max_attempts):
- res.append(self.measure_loss(rate, self._frame_size,
- self._loss_acceptance,
- self._loss_acceptance_type,
- traffic_type))
+ res.append(
+ self.measure_loss(
+ rate, self._frame_size, self._loss_acceptance,
+ self._loss_acceptance_type, traffic_profile
+ )
+ )
res = self._get_res_based_on_search_type(res)
- if self._search_linear_direction == SearchDirection.BOTTOM_UP:
- # loss occurred and it was above acceptance criteria
- if not res:
- # if this is first run then we didn't find drop rate
- if prev_rate is None:
- self._search_result = SearchResults.FAILURE
- self._search_result_rate = None
- # else we found the rate, which is value from previous run
- else:
- self._search_result = SearchResults.SUCCESS
- self._search_result_rate = prev_rate
- return
- # there was no loss / loss below acceptance criteria
- elif res:
- prev_rate = rate
- rate += self._rate_linear_step
- if rate > self._rate_max:
- if prev_rate != self._rate_max:
- # one last step with rate set to _rate_max
- rate = self._rate_max
- continue
- else:
- self._search_result = SearchResults.SUCCESS
- self._search_result_rate = prev_rate
- return
- else:
- continue
- else:
- raise RuntimeError("Unknown search result")
-
- elif self._search_linear_direction == SearchDirection.TOP_DOWN:
+ if self._search_linear_direction == SearchDirection.TOP_DOWN:
# loss occurred, decrease rate
if not res:
prev_rate = rate
# one last step with rate set to _rate_min
rate = self._rate_min
continue
- else:
- self._search_result = SearchResults.FAILURE
- self._search_result_rate = None
- return
- else:
- continue
+ self._search_result = SearchResults.FAILURE
+ self._search_result_rate = None
+ return
+ continue
# no loss => non/partial drop rate found
elif res:
self._search_result = SearchResults.SUCCESS
self._search_result_rate = rate
return
- else:
- raise RuntimeError("Unknown search result")
- else:
- raise Exception("Unknown search direction")
-
- raise Exception("Wrong codepath")
+ raise RuntimeError(u"Unknown search result")
+ raise Exception(u"Unknown search direction")
def verify_search_result(self):
"""Fail if search was not successful.
:returns: Result rate and latency stats.
:rtype: tuple
- :raises: Exception if search failed
+ :raises Exception: If search failed.
"""
- if self._search_result == SearchResults.FAILURE:
- raise Exception('Search FAILED')
- elif self._search_result in [SearchResults.SUCCESS,
- SearchResults.SUSPICIOUS]:
+ if self._search_result in \
+ [SearchResults.SUCCESS, SearchResults.SUSPICIOUS]:
return self._search_result_rate, self.get_latency()
+ raise Exception(u"Search FAILED")
- def binary_search(self, b_min, b_max, traffic_type, skip_max_rate=False,
- skip_warmup=False):
+ def binary_search(
+ self, b_min, b_max, traffic_profile, skip_max_rate=False,
+ skip_warmup=False):
"""Binary search of rate with loss below acceptance criteria.
:param b_min: Min range rate.
:param b_max: Max range rate.
- :param traffic_type: Traffic profile.
+ :param traffic_profile: Module name to use for traffic generation.
:param skip_max_rate: Start with max rate first
:param skip_warmup: Start TRex without warmup traffic if true.
:type b_min: float
:type b_max: float
- :type traffic_type: str
+ :type traffic_profile: str
:type skip_max_rate: bool
:type skip_warmup: bool
:returns: nothing
- :raises: ValueError if input values are not valid
+ :raises ValueError: If input values are not valid.
"""
-
if not self._rate_min <= float(b_min) <= self._rate_max:
- raise ValueError("Min rate is not in min,max range")
+ raise ValueError(u"Min rate is not in min,max range")
if not self._rate_min <= float(b_max) <= self._rate_max:
- raise ValueError("Max rate is not in min,max range")
+ raise ValueError(u"Max rate is not in min,max range")
if float(b_max) < float(b_min):
- raise ValueError("Min rate is greater than max rate")
+ raise ValueError(u"Min rate is greater than max rate")
# rate is half of interval + start of interval if not using max rate
rate = ((float(b_max) - float(b_min)) / 2) + float(b_min) \
res = []
for dummy in range(self._max_attempts):
- res.append(self.measure_loss(rate, self._frame_size,
- self._loss_acceptance,
- self._loss_acceptance_type,
- traffic_type, skip_warmup=skip_warmup))
+ res.append(self.measure_loss(
+ rate, self._frame_size, self._loss_acceptance,
+ self._loss_acceptance_type, traffic_profile,
+ skip_warmup=skip_warmup
+ ))
res = self._get_res_based_on_search_type(res)
# loss occurred and it was above acceptance criteria
if not res:
- self.binary_search(b_min, rate, traffic_type, True, True)
+ self.binary_search(b_min, rate, traffic_profile, True, True)
# there was no loss / loss below acceptance criteria
else:
self._search_result_rate = rate
- self.binary_search(rate, b_max, traffic_type, True, True)
+ self.binary_search(rate, b_max, traffic_profile, True, True)
- def combined_search(self, start_rate, traffic_type):
+ def combined_search(self, start_rate, traffic_profile):
"""Combined search of rate with loss below acceptance criteria.
:param start_rate: Initial rate.
- :param traffic_type: Traffic profile.
+ :param traffic_profile: Module name to use for traffic generation.
:type start_rate: float
- :type traffic_type: str
+ :type traffic_profile: str
:returns: nothing
- :raises: RuntimeError if linear search failed
+ :raises RuntimeError: If linear search failed.
"""
+ self.linear_search(start_rate, traffic_profile)
- self.linear_search(start_rate, traffic_type)
-
- if self._search_result in [SearchResults.SUCCESS,
- SearchResults.SUSPICIOUS]:
+ if self._search_result in \
+ [SearchResults.SUCCESS, SearchResults.SUSPICIOUS]:
b_min = self._search_result_rate
b_max = self._search_result_rate + self._rate_linear_step
self._search_result_rate = None
# we will use binary search to refine search in one linear step
- self.binary_search(b_min, b_max, traffic_type, True)
+ self.binary_search(b_min, b_max, traffic_profile, True)
+
- # linear and binary search succeed
- if self._search_result == SearchResults.SUCCESS:
- return
# linear search succeed but binary failed or suspicious
- else:
+ if self._search_result != SearchResults.SUCCESS:
self._search_result = SearchResults.SUSPICIOUS
self._search_result_rate = temp_rate
+ # linear and binary search succeed
+ else:
+ return
else:
- raise RuntimeError("Linear search FAILED")
+ raise RuntimeError(u"Linear search FAILED")
@staticmethod
def floats_are_close_equal(num_a, num_b, rel_tol=1e-9, abs_tol=0.0):
:param num_a: First number to compare.
:param num_b: Second number to compare.
- :param rel_tol=1e-9: The relative tolerance.
- :param abs_tol=0.0: The minimum absolute tolerance level.
+ :param rel_tol: The relative tolerance.
+ :param abs_tol: The minimum absolute tolerance level. (Optional,
+ default value: 0.0)
:type num_a: float
:type num_b: float
:type rel_tol: float
:type abs_tol: float
:returns: Returns True if num_a is close in value to num_b or equal.
- False otherwise.
+ False otherwise.
:rtype: boolean
- :raises: ValueError if input values are not valid
+ :raises ValueError: If input values are not valid.
"""
-
if num_a == num_b:
return True
if rel_tol < 0.0 or abs_tol < 0.0:
- raise ValueError('Error tolerances must be non-negative')
+ raise ValueError(u"Error tolerances must be non-negative")
- return abs(num_b - num_a) <= max(rel_tol * max(abs(num_a), abs(num_b)),
- abs_tol)
+ return abs(num_b - num_a) <= max(
+ rel_tol * max(abs(num_a), abs(num_b)), abs_tol
+ )