X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FTrafficGenerator.py;h=fa645c32d4a41b0c0452fbd6fe0453c19d20e77a;hb=e2c0d2e1481f899ba82a3892694a04a127668b2b;hp=80248add038203463b2c99f0fca9d66ad10ae845;hpb=023fa41e51c966a1956bda6b915ffd894ff10e84;p=csit.git diff --git a/resources/libraries/python/TrafficGenerator.py b/resources/libraries/python/TrafficGenerator.py index 80248add03..fa645c32d4 100644 --- a/resources/libraries/python/TrafficGenerator.py +++ b/resources/libraries/python/TrafficGenerator.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Cisco and/or its affiliates. +# Copyright (c) 2023 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -13,13 +13,13 @@ """Performance testing traffic generator library.""" +import math import time from robot.api import logger from robot.libraries.BuiltIn import BuiltIn from .Constants import Constants -from .CpuUtils import CpuUtils from .DropRateSearch import DropRateSearch from .MLRsearch.AbstractMeasurer import AbstractMeasurer from .MLRsearch.MultipleLossRatioSearch import MultipleLossRatioSearch @@ -30,6 +30,8 @@ from .ssh import exec_cmd_no_error, exec_cmd from .topology import NodeType from .topology import NodeSubTypeTG from .topology import Topology +from .TRexConfigGenerator import TrexConfig +from .DUTSetup import DUTSetup as DS __all__ = [u"TGDropRateSearchImpl", u"TrafficGenerator", u"OptimizedSearch"] @@ -127,18 +129,13 @@ class TrexMode: STL = u"STL" -# TODO: Pylint says too-many-instance-attributes. class TrafficGenerator(AbstractMeasurer): """Traffic Generator.""" - # TODO: Remove "trex" from lines which could work with other TGs. - # Use one instance of TrafficGenerator for all tests in test suite ROBOT_LIBRARY_SCOPE = u"TEST SUITE" def __init__(self): - # TODO: Separate into few dataclasses/dicts. - # Pylint dislikes large unstructured state, and it is right. self._node = None self._mode = None # TG interface order mapping @@ -171,9 +168,13 @@ class TrafficGenerator(AbstractMeasurer): self.sleep_till_duration = None self.transaction_type = None self.duration_limit = None + self.ramp_up_start = None + self.ramp_up_stop = None + self.ramp_up_rate = None + self.ramp_up_duration = None + self.state_timeout = None # Transient data needed for async measurements. - self._xstats = (None, None) - # TODO: Rename "xstats" to something opaque, so T-Rex is not privileged? + self._xstats = () @property def node(self): @@ -246,101 +247,96 @@ class TrafficGenerator(AbstractMeasurer): f"{self._node[u'subtype']} not running in {expected_mode} mode!" ) - # TODO: pylint says disable=too-many-locals. - def initialize_traffic_generator( - self, tg_node, tg_if1, tg_if2, tg_if1_adj_node, tg_if1_adj_if, - tg_if2_adj_node, tg_if2_adj_if, osi_layer, tg_if1_dst_mac=None, - tg_if2_dst_mac=None): - """TG initialization. + @staticmethod + def get_tg_type(tg_node): + """Log and return the installed traffic generator type. - TODO: Document why do we need (and how do we use) _ifaces_reordered. + :param tg_node: Node from topology file. + :type tg_node: dict + :returns: Traffic generator type string. + :rtype: str + :raises RuntimeError: If command returns nonzero return code. + """ + return str(check_subtype(tg_node)) - :param tg_node: Traffic generator node. - :param tg_if1: TG - name of first interface. - :param tg_if2: TG - name of second interface. 
- :param tg_if1_adj_node: TG if1 adjecent node. - :param tg_if1_adj_if: TG if1 adjecent interface. - :param tg_if2_adj_node: TG if2 adjecent node. - :param tg_if2_adj_if: TG if2 adjecent interface. - :param osi_layer: 'L2', 'L3' or 'L7' - OSI Layer testing type. - :param tg_if1_dst_mac: Interface 1 destination MAC address. - :param tg_if2_dst_mac: Interface 2 destination MAC address. + @staticmethod + def get_tg_version(tg_node): + """Log and return the installed traffic generator version. + + :param tg_node: Node from topology file. :type tg_node: dict - :type tg_if1: str - :type tg_if2: str - :type tg_if1_adj_node: dict - :type tg_if1_adj_if: str - :type tg_if2_adj_node: dict - :type tg_if2_adj_if: str - :type osi_layer: str - :type tg_if1_dst_mac: str - :type tg_if2_dst_mac: str - :returns: nothing - :raises RuntimeError: In case of issue during initialization. + :returns: Traffic generator version string. + :rtype: str + :raises RuntimeError: If command returns nonzero return code. """ subtype = check_subtype(tg_node) if subtype == NodeSubTypeTG.TREX: - self._node = tg_node - self._mode = TrexMode.ASTF if osi_layer == u"L7" else TrexMode.STL - if1 = dict() - if2 = dict() - if1[u"pci"] = Topology().get_interface_pci_addr(self._node, tg_if1) - if2[u"pci"] = Topology().get_interface_pci_addr(self._node, tg_if2) - if1[u"addr"] = Topology().get_interface_mac(self._node, tg_if1) - if2[u"addr"] = Topology().get_interface_mac(self._node, tg_if2) - - if osi_layer == u"L2": - if1[u"adj_addr"] = if2[u"addr"] - if2[u"adj_addr"] = if1[u"addr"] - elif osi_layer in (u"L3", u"L7"): - if1[u"adj_addr"] = Topology().get_interface_mac( - tg_if1_adj_node, tg_if1_adj_if - ) - if2[u"adj_addr"] = Topology().get_interface_mac( - tg_if2_adj_node, tg_if2_adj_if - ) - else: - raise ValueError(u"Unknown OSI layer!") + command = f"cat {Constants.TREX_INSTALL_DIR}/VERSION" + message = u"Get T-Rex version failed!" + stdout, _ = exec_cmd_no_error(tg_node, command, message=message) + return stdout.strip() + return "none" - # in case of switched environment we can override MAC addresses - if tg_if1_dst_mac is not None and tg_if2_dst_mac is not None: - if1[u"adj_addr"] = tg_if1_dst_mac - if2[u"adj_addr"] = tg_if2_dst_mac + def initialize_traffic_generator(self, osi_layer, parallel_links=1): + """TG initialization. - if min(if1[u"pci"], if2[u"pci"]) != if1[u"pci"]: - if1, if2 = if2, if1 - self._ifaces_reordered = True + :param osi_layer: 'L2', 'L3' or 'L7' - OSI Layer testing type. + :param parallel_links: Number of parallel links to configure. + :type osi_layer: str + :type parallel_links: int + :raises ValueError: If OSI layer is unknown. 
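# Illustrative sketch (not part of the commit above; helper name is
# hypothetical): the OSI-layer rule documented by the new
# initialize_traffic_generator docstring. Only L2/L3/L7 are accepted,
# and only L7 selects the ASTF mode.
def select_trex_mode(osi_layer):
    """Return the T-Rex mode string implied by the OSI layer under test."""
    if osi_layer not in ("L2", "L3", "L7"):
        raise ValueError("Unknown OSI layer!")
    return "ASTF" if osi_layer == "L7" else "STL"

assert select_trex_mode("L3") == "STL"
assert select_trex_mode("L7") == "ASTF"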
+ """ + if osi_layer not in ("L2", "L3", "L7"): + raise ValueError("Unknown OSI layer!") - master_thread_id, latency_thread_id, socket, threads = \ - CpuUtils.get_affinity_trex( - self._node, tg_if1, tg_if2, - tg_dtc=Constants.TREX_CORE_COUNT) + topology = BuiltIn().get_variable_value("&{topology_info}") + self._node = topology["TG"] + subtype = check_subtype(self._node) - if osi_layer in (u"L2", u"L3", u"L7"): - exec_cmd_no_error( - self._node, - f"sh -c 'cat << EOF > /etc/trex_cfg.yaml\n" - f"- version: 2\n" - f" c: {len(threads)}\n" - f" limit_memory: {Constants.TREX_LIMIT_MEMORY}\n" - f" interfaces: [\"{if1[u'pci']}\",\"{if2[u'pci']}\"]\n" - f" port_info:\n" - f" - dest_mac: \'{if1[u'adj_addr']}\'\n" - f" src_mac: \'{if1[u'addr']}\'\n" - f" - dest_mac: \'{if2[u'adj_addr']}\'\n" - f" src_mac: \'{if2[u'addr']}\'\n" - f" platform :\n" - f" master_thread_id: {master_thread_id}\n" - f" latency_thread_id: {latency_thread_id}\n" - f" dual_if:\n" - f" - socket: {socket}\n" - f" threads: {threads}\n" - f"EOF'", - sudo=True, message=u"T-Rex config generation!" - ) - else: - raise ValueError(u"Unknown OSI layer!") + if subtype == NodeSubTypeTG.TREX: + trex_topology = list() + self._mode = TrexMode.ASTF if osi_layer == "L7" else TrexMode.STL + + for link in range(1, parallel_links*2, 2): + tg_if1_adj_addr = topology[f"TG_pf{link+1}_mac"][0] + tg_if2_adj_addr = topology[f"TG_pf{link}_mac"][0] + if osi_layer in ("L3", "L7") and "DUT1" in topology.keys(): + ifl = BuiltIn().get_variable_value("${int}") + last = topology["duts_count"] + tg_if1_adj_addr = Topology().get_interface_mac( + topology["DUT1"], + BuiltIn().get_variable_value( + f"${{DUT1_{ifl}{link}}}[0]" + ) + ) + tg_if2_adj_addr = Topology().get_interface_mac( + topology[f"DUT{last}"], + BuiltIn().get_variable_value( + f"${{DUT{last}_{ifl}{link+1}}}[0]" + ) + ) + trex_topology.append( + dict( + interface=topology[f"TG_pf{link}"][0], + dst_mac=tg_if1_adj_addr + ) + ) + trex_topology.append( + dict( + interface=topology[f"TG_pf{link+1}"][0], + dst_mac=tg_if2_adj_addr + ) + ) + if1_pci = topology[f"TG_pf{link}_pci"][0] + if2_pci = topology[f"TG_pf{link+1}_pci"][0] + if min(if1_pci, if2_pci) != if1_pci: + self._ifaces_reordered = True + trex_topology.reverse() + + TrexConfig.add_startup_configuration( + self._node, trex_topology + ) TrafficGenerator.startup_trex( self._node, osi_layer, subtype=subtype ) @@ -368,18 +364,27 @@ class TrafficGenerator(AbstractMeasurer): tg_node, cmd, sudo=True, message=u"Kill TRex failed!" ) - # Configure TRex. - ports = '' + # Prepare interfaces for TRex. + tg_port_drv = Constants.TREX_PORT_DRIVER + mlx_driver = u"" for port in tg_node[u"interfaces"].values(): - if u'Mellanox' not in port.get(u'model'): - ports += f" {port.get(u'pci_address')}" - - cmd = f"sh -c \"cd {Constants.TREX_INSTALL_DIR}/scripts/ && " \ - f"./dpdk_nic_bind.py -u {ports} || true\"" - exec_cmd_no_error( - tg_node, cmd, sudo=True, - message=u"Unbind PCI ports from driver failed!" 
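# Illustrative sketch of the index arithmetic in the parallel-links loop
# above (hypothetical helper, no Robot variables involved): which TG_pf<N>
# entries pair up into one link.
def tg_port_pairs(parallel_links):
    """Return (first, second) TG_pf indices for each parallel link."""
    return [(link, link + 1) for link in range(1, parallel_links * 2, 2)]

# One link uses TG_pf1/TG_pf2; two links add TG_pf3/TG_pf4.
assert tg_port_pairs(1) == [(1, 2)]
assert tg_port_pairs(2) == [(1, 2), (3, 4)]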
- ) + if u"Mellanox" in port.get(u"model"): + mlx_driver = port.get(u"driver") + pci_addr = port.get(u'pci_address') + cur_driver = DS.get_pci_dev_driver(tg_node, pci_addr) + if cur_driver == mlx_driver: + pass + elif not cur_driver: + DS.pci_driver_bind(tg_node, pci_addr, mlx_driver) + else: + DS.pci_driver_unbind(tg_node, pci_addr) + DS.pci_driver_bind(tg_node, pci_addr, mlx_driver) + else: + pci_addr = port.get(u'pci_address') + cur_driver = DS.get_pci_dev_driver(tg_node, pci_addr) + if cur_driver: + DS.pci_driver_unbind(tg_node, pci_addr) + DS.pci_driver_bind(tg_node, pci_addr, tg_port_drv) # Start TRex. cd_cmd = f"cd '{Constants.TREX_INSTALL_DIR}/scripts/'" @@ -404,12 +409,11 @@ class TrafficGenerator(AbstractMeasurer): raise RuntimeError(u"Start TRex failed!") # Test T-Rex API responsiveness. - cmd = u"python3" - cmd += f" {Constants.REMOTE_FW_DIR}/GPL/tools/trex/" + cmd = f"python3 {Constants.REMOTE_FW_DIR}/GPL/tools/trex/" if osi_layer in (u"L2", u"L3"): - cmd += f"trex_stl_assert.py" + cmd += u"trex_stl_assert.py" elif osi_layer == u"L7": - cmd += f"trex_astf_assert.py" + cmd += u"trex_astf_assert.py" else: raise ValueError(u"Unknown OSI layer!") try: @@ -494,11 +498,11 @@ class TrafficGenerator(AbstractMeasurer): command_line = OptionString().add(u"python3") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex" command_line.add(f"'{dirname}/trex_stl_stop.py'") - command_line.change_prefix(u"--") - for index, value in enumerate(self._xstats): + command_line.add("--xstat") + for value in self._xstats: if value is not None: - value = value.replace(u"'", u"\"") - command_line.add_equals(f"xstat{index}", f"'{value}'") + value = value.replace("'", "\"") + command_line.add(f"'{value}'") stdout, _ = exec_cmd_no_error( node, command_line, message=u"T-Rex STL runtime error!" @@ -523,7 +527,42 @@ class TrafficGenerator(AbstractMeasurer): raise ValueError(u"Unsupported T-Rex traffic profile!") self._stop_time = time.monotonic() - return self.get_measurement_result() + return self._get_measurement_result() + + def _compute_duration(self, duration, multiplier): + """Compute duration for profile driver. + + The final result is influenced by transaction scale and duration limit. + It is assumed a higher level function has already set those to self. + The duration argument is the target value from search point of view, + before the overrides are applied here. + + Minus one (signalling async traffic start) is kept. + + Completeness flag is also included. Duration limited or async trials + are not considered complete for ramp-up purposes. + + :param duration: Time expressed in seconds for how long to send traffic. + :param multiplier: Traffic rate in transactions per second. + :type duration: float + :type multiplier: float + :returns: New duration and whether it was a complete ramp-up candidate. + :rtype: float, bool + """ + if duration < 0.0: + # Keep the async -1. + return duration, False + computed_duration = duration + if self.transaction_scale: + computed_duration = self.transaction_scale / multiplier + # Log the computed duration, + # so we can compare with what telemetry suggests + # the real duration was. 
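# Illustrative sketch (hypothetical pure function) of the driver handling
# above: Mellanox ports stay on their kernel driver, every other port is
# unbound from whatever it currently has and bound to the T-Rex port driver.
# Driver names in the asserts are examples only.
def plan_rebind(is_mellanox, current_driver, mlx_driver, trex_driver):
    """Return (action, driver) steps to apply to one TG PCI device."""
    if is_mellanox:
        if current_driver == mlx_driver:
            return []
        steps = [("unbind", current_driver)] if current_driver else []
        return steps + [("bind", mlx_driver)]
    steps = [("unbind", current_driver)] if current_driver else []
    return steps + [("bind", trex_driver)]

assert plan_rebind(True, "mlx5_core", "mlx5_core", "vfio-pci") == []
assert plan_rebind(False, "i40e", None, "vfio-pci") == [
    ("unbind", "i40e"), ("bind", "vfio-pci"),
]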
+ logger.debug(f"Expected duration {computed_duration}") + if not self.duration_limit: + return computed_duration, True + limited_duration = min(computed_duration, self.duration_limit) + return limited_duration, (limited_duration == computed_duration) def trex_astf_start_remote_exec( self, duration, multiplier, async_call=False): @@ -571,19 +610,7 @@ class TrafficGenerator(AbstractMeasurer): if not isinstance(duration, (float, int)): duration = float(duration) - # Duration logic. - computed_duration = duration - if duration > 0.0: - if self.transaction_scale: - computed_duration = self.transaction_scale / multiplier - # Log the computed duration, - # so we can compare with what telemetry suggests - # the real duration was. - logger.debug(f"Expected duration {computed_duration}") - computed_duration += 0.1115 - # Else keep -1. - if self.duration_limit: - computed_duration = min(computed_duration, self.duration_limit) + computed_duration, _ = self._compute_duration(duration, multiplier) command_line = OptionString().add(u"python3") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex" @@ -595,6 +622,9 @@ class TrafficGenerator(AbstractMeasurer): ) command_line.add_with_value(u"duration", f"{computed_duration!r}") command_line.add_with_value(u"frame_size", self.frame_size) + command_line.add_with_value( + u"n_data_frames", Constants.ASTF_N_DATA_FRAMES + ) command_line.add_with_value(u"multiplier", multiplier) command_line.add_with_value(u"port_0", p_0) command_line.add_with_value(u"port_1", p_1) @@ -604,6 +634,9 @@ class TrafficGenerator(AbstractMeasurer): command_line.add_if(u"async_start", async_call) command_line.add_if(u"latency", self.use_latency) command_line.add_if(u"force", Constants.TREX_SEND_FORCE) + command_line.add_with_value( + u"delay", Constants.PERF_TRIAL_ASTF_DELAY + ) self._start_time = time.monotonic() self._rate = multiplier @@ -620,7 +653,7 @@ class TrafficGenerator(AbstractMeasurer): self._sent = None self._loss = None self._latency = None - xstats = [None, None] + xstats = [] self._l7_data = dict() self._l7_data[u"client"] = dict() self._l7_data[u"client"][u"active_flows"] = None @@ -653,10 +686,8 @@ class TrafficGenerator(AbstractMeasurer): index = 0 for line in stdout.splitlines(): if f"Xstats snapshot {index}: " in line: - xstats[index] = line[19:] + xstats.append(line[19:]) index += 1 - if index == 2: - break self._xstats = tuple(xstats) else: self._target_duration = duration @@ -687,8 +718,8 @@ class TrafficGenerator(AbstractMeasurer): p_0, p_1 = (1, 0) if self._ifaces_reordered else (0, 1) if not isinstance(duration, (float, int)): duration = float(duration) - if self.duration_limit: - duration = min(duration, self.duration_limit) + + duration, _ = self._compute_duration(duration=duration, multiplier=rate) command_line = OptionString().add(u"python3") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex" @@ -709,8 +740,8 @@ class TrafficGenerator(AbstractMeasurer): command_line.add_if(u"async_start", async_call) command_line.add_if(u"latency", self.use_latency) command_line.add_if(u"force", Constants.TREX_SEND_FORCE) + command_line.add_with_value(u"delay", Constants.PERF_TRIAL_STL_DELAY) - # TODO: This is ugly. Handle parsing better. 
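# Worked example (invented numbers) of the duration rule in _compute_duration
# above: with a fixed transaction scale the trial length follows from
# scale / rate, and duration_limit may cap it, which marks the trial as
# incomplete for ramp-up bookkeeping.
transaction_scale, multiplier, duration_limit = 1_000_000, 200_000.0, 3.0
computed = transaction_scale / multiplier   # 5.0 s needed to attempt all
limited = min(computed, duration_limit)     # 3.0 s actually requested
complete = limited == computed              # False -> no implicit ramp-up
assert (limited, complete) == (3.0, False)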
self._start_time = time.monotonic() self._rate = float(rate[:-3]) if u"pps" in rate else float(rate) stdout, _ = exec_cmd_no_error( @@ -727,14 +758,12 @@ class TrafficGenerator(AbstractMeasurer): self._loss = None self._latency = None - xstats = [None, None] + xstats = [] index = 0 for line in stdout.splitlines(): if f"Xstats snapshot {index}: " in line: - xstats[index] = line[19:] + xstats.append(line[19:]) index += 1 - if index == 2: - break self._xstats = tuple(xstats) else: self._target_duration = duration @@ -755,6 +784,10 @@ class TrafficGenerator(AbstractMeasurer): transaction_type=u"packet", duration_limit=0.0, use_latency=False, + ramp_up_rate=None, + ramp_up_duration=None, + state_timeout=240.0, + ramp_up_only=False, ): """Send traffic from all configured interfaces on TG. @@ -775,6 +808,8 @@ class TrafficGenerator(AbstractMeasurer): Bidirectional STL profiles are treated as transactions with two packets. + The return value is None for async. + :param duration: Duration of test traffic generation in seconds. :param rate: Traffic rate in transactions per second. :param frame_size: Frame size (L2) in Bytes. @@ -797,6 +832,10 @@ class TrafficGenerator(AbstractMeasurer): duration. :param use_latency: Whether to measure latency during the trial. Default: False. + :param ramp_up_rate: Rate to use in ramp-up trials [pps]. + :param ramp_up_duration: Duration of ramp-up trials [s]. + :param state_timeout: Time of life of DUT state [s]. + :param ramp_up_only: If true, do not perform main trial measurement. :type duration: float :type rate: float :type frame_size: str @@ -809,8 +848,12 @@ class TrafficGenerator(AbstractMeasurer): :type transaction_type: str :type duration_limit: float :type use_latency: bool + :type ramp_up_rate: float + :type ramp_up_duration: float + :type state_timeout: float + :type ramp_up_only: bool :returns: TG results. - :rtype: str + :rtype: ReceiveRateMeasurement or None :raises ValueError: If TG traffic profile is not supported. """ self.set_rate_provider_defaults( @@ -823,10 +866,19 @@ class TrafficGenerator(AbstractMeasurer): transaction_type=transaction_type, duration_limit=duration_limit, use_latency=use_latency, + ramp_up_rate=ramp_up_rate, + ramp_up_duration=ramp_up_duration, + state_timeout=state_timeout, + ) + return self._send_traffic_on_tg_with_ramp_up( + duration=duration, + rate=rate, + async_call=async_call, + ramp_up_only=ramp_up_only, ) - self._send_traffic_on_tg_internal(duration, rate, async_call) - def _send_traffic_on_tg_internal(self, duration, rate, async_call=False): + def _send_traffic_on_tg_internal( + self, duration, rate, async_call=False): """Send traffic from all configured interfaces on TG. This is an internal function, it assumes set_rate_provider_defaults @@ -838,6 +890,9 @@ class TrafficGenerator(AbstractMeasurer): need to specify their own values, and we do not want the measure call to overwrite them with defaults. + This function is used both for automated ramp-up trials + and for explicitly called trials. + :param duration: Duration of test traffic generation in seconds. :param rate: Traffic rate in transactions per second. :param async_call: Async mode. @@ -845,7 +900,7 @@ class TrafficGenerator(AbstractMeasurer): :type rate: float :type async_call: bool :returns: TG results. - :rtype: str + :rtype: ReceiveRateMeasurement or None :raises ValueError: If TG traffic profile is not supported. 
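# Illustrative sketch (hypothetical helper, invented profile names) of the
# dispatch performed by _send_traffic_on_tg_internal: ASTF profiles get a
# bare multiplier, STL profiles get the rate formatted as a "pps" string.
def dispatch(traffic_profile, rate):
    """Return (driver, rate_argument) chosen for a profile module name."""
    if "trex-astf" in traffic_profile:
        return "astf", float(rate)
    if "trex-stl" in traffic_profile:
        return "stl", f"{rate}pps"
    raise ValueError("Unsupported T-Rex traffic profile!")

assert dispatch("trex-stl-ethip4-ip4src254", 1000000.0) == ("stl", "1000000.0pps")
assert dispatch("trex-astf-ethip4udp-1024h", 200.0) == ("astf", 200.0)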
""" subtype = check_subtype(self._node) @@ -856,14 +911,108 @@ class TrafficGenerator(AbstractMeasurer): ) elif u"trex-stl" in self.traffic_profile: unit_rate_str = str(rate) + u"pps" - # TODO: Suport transaction_scale et al? self.trex_stl_start_remote_exec( duration, unit_rate_str, async_call ) else: raise ValueError(u"Unsupported T-Rex traffic profile!") - return self._result + return None if async_call else self._get_measurement_result() + + def _send_traffic_on_tg_with_ramp_up( + self, duration, rate, async_call=False, ramp_up_only=False): + """Send traffic from all interfaces on TG, maybe after ramp-up. + + This is an internal function, it assumes set_rate_provider_defaults + has been called to remember most values. + The reason why need to remember various values is that + the traffic can be asynchronous, and parsing needs those values. + The reason why this is a separate function from the one + which calls set_rate_provider_defaults is that some search algorithms + need to specify their own values, and we do not want the measure call + to overwrite them with defaults. + + If ramp-up tracking is detected, a computation is performed, + and if state timeout is near, trial at ramp-up rate and duration + is inserted before the main trial measurement. + + The ramp_up_only parameter forces a ramp-up without immediate + trial measurement, which is useful in case self remembers + a previous ramp-up trial that belongs to a different test (phase). + + Return None if trial is async or ramp-up only. + + :param duration: Duration of test traffic generation in seconds. + :param rate: Traffic rate in transactions per second. + :param async_call: Async mode. + :param ramp_up_only: If true, do not perform main trial measurement. + :type duration: float + :type rate: float + :type async_call: bool + :type ramp_up_only: bool + :returns: TG results. + :rtype: ReceiveRateMeasurement or None + :raises ValueError: If TG traffic profile is not supported. + """ + complete = False + if self.ramp_up_rate: + # Figure out whether we need to insert a ramp-up trial. + if ramp_up_only or self.ramp_up_start is None: + # We never ramped up yet (at least not in this test case). + ramp_up_needed = True + else: + # We ramped up before, but maybe it was too long ago. + # Adding a constant overhead to be safe. + time_now = time.monotonic() + 1.0 + computed_duration, complete = self._compute_duration( + duration=duration, + multiplier=rate, + ) + # There are two conditions for inserting ramp-up. + # If early sessions are expiring already, + # or if late sessions are to expire before measurement is over. + ramp_up_start_delay = time_now - self.ramp_up_start + ramp_up_stop_delay = time_now - self.ramp_up_stop + ramp_up_stop_delay += computed_duration + bigger_delay = max(ramp_up_start_delay, ramp_up_stop_delay) + # Final boolean decision. + ramp_up_needed = (bigger_delay >= self.state_timeout) + if ramp_up_needed: + logger.debug( + u"State may time out during next real trial, " + u"inserting a ramp-up trial." + ) + self.ramp_up_start = time.monotonic() + self._send_traffic_on_tg_internal( + duration=self.ramp_up_duration, + rate=self.ramp_up_rate, + async_call=async_call, + ) + self.ramp_up_stop = time.monotonic() + logger.debug(u"Ramp-up done.") + else: + logger.debug( + u"State will probably not time out during next real trial, " + u"no ramp-up trial needed just yet." 
+ ) + if ramp_up_only: + return None + trial_start = time.monotonic() + result = self._send_traffic_on_tg_internal( + duration=duration, + rate=rate, + async_call=async_call, + ) + trial_end = time.monotonic() + if self.ramp_up_rate: + # Optimization: No loss acts as a good ramp-up, if it was complete. + if complete and result is not None and result.loss_count == 0: + logger.debug(u"Good trial acts as a ramp-up") + self.ramp_up_start = trial_start + self.ramp_up_stop = trial_end + else: + logger.debug(u"Loss or incomplete, does not act as a ramp-up.") + return result def no_traffic_loss_occurred(self): """Fail if loss occurred in traffic run. @@ -879,14 +1028,12 @@ class TrafficGenerator(AbstractMeasurer): def fail_if_no_traffic_forwarded(self): """Fail if no traffic forwarded. - TODO: Check number of passed transactions instead. - :returns: nothing :raises Exception: If no traffic forwarded. """ if self._received is None: raise RuntimeError(u"The traffic generation has not been issued") - if self._received == u"0": + if self._received == 0: raise RuntimeError(u"No traffic forwarded") def partial_traffic_loss_accepted( @@ -1030,7 +1177,7 @@ class TrafficGenerator(AbstractMeasurer): self._l7_data[u"server"][u"tcp"][u"rx_bytes"] = \ int(self._result.get(u"server_tcp_rx_bytes", 0)) - def get_measurement_result(self): + def _get_measurement_result(self): """Return the result of last measurement as ReceiveRateMeasurement. Separate function, as measurements can end either by time @@ -1039,9 +1186,7 @@ class TrafficGenerator(AbstractMeasurer): The target_tr field of ReceiveRateMeasurement is in transactions per second. Transmit count and loss count units depend on the transaction type. Usually they are in transactions - per second, or aggregate packets per second. - - TODO: Fail on running or already reported measurement. + per second, or aggregated packets per second. :returns: Structure containing the result of the measurement. :rtype: ReceiveRateMeasurement @@ -1072,16 +1217,27 @@ class TrafficGenerator(AbstractMeasurer): if not target_duration: target_duration = approximated_duration transmit_rate = self._rate + unsent = 0 if self.transaction_type == u"packet": partial_attempt_count = self._sent - expected_attempt_count = self._sent - fail_count = self._loss + packet_rate = transmit_rate * self.ppta + # We have a float. TRex way of rounding it is not obvious. + # The biggest source of mismatch is Inter Stream Gap. + # So the code tolerates 10 usec of missing packets. + expected_attempt_count = (target_duration - 1e-5) * packet_rate + expected_attempt_count = math.ceil(expected_attempt_count) + # TRex can send more. + expected_attempt_count = max(expected_attempt_count, self._sent) + unsent = expected_attempt_count - self._sent + pass_count = self._received + fail_count = expected_attempt_count - pass_count elif self.transaction_type == u"udp_cps": if not self.transaction_scale: raise RuntimeError(u"Add support for no-limit udp_cps.") partial_attempt_count = self._l7_data[u"client"][u"sent"] # We do not care whether TG is slow, it should have attempted all. expected_attempt_count = self.transaction_scale + unsent = expected_attempt_count - partial_attempt_count pass_count = self._l7_data[u"client"][u"received"] fail_count = expected_attempt_count - pass_count elif self.transaction_type == u"tcp_cps": @@ -1091,15 +1247,19 @@ class TrafficGenerator(AbstractMeasurer): partial_attempt_count = ctca # We do not care whether TG is slow, it should have attempted all. 
expected_attempt_count = self.transaction_scale - # TODO: Is there a better packet-based counter? - pass_count = self._l7_data[u"server"][u"tcp"][u"connects"] + unsent = expected_attempt_count - partial_attempt_count + # From TCP point of view, server/connects counts full connections, + # but we are testing NAT session so client/connects counts that + # (half connections from TCP point of view). + pass_count = self._l7_data[u"client"][u"tcp"][u"connects"] fail_count = expected_attempt_count - pass_count elif self.transaction_type == u"udp_pps": if not self.transaction_scale: raise RuntimeError(u"Add support for no-limit udp_pps.") partial_attempt_count = self._sent expected_attempt_count = self.transaction_scale * self.ppta - fail_count = self._loss + (expected_attempt_count - self._sent) + unsent = expected_attempt_count - self._sent + fail_count = self._loss + unsent elif self.transaction_type == u"tcp_pps": if not self.transaction_scale: raise RuntimeError(u"Add support for no-limit tcp_pps.") @@ -1112,9 +1272,13 @@ class TrafficGenerator(AbstractMeasurer): # A simple workaround is to add absolute difference. # Probability of retransmissions exactly cancelling # packets unsent due to duration stretching is quite low. - fail_count = self._loss + abs(expected_attempt_count - self._sent) + unsent = abs(expected_attempt_count - self._sent) + fail_count = self._loss + unsent else: raise RuntimeError(f"Unknown parsing {self.transaction_type!r}") + if unsent and isinstance(self._approximated_duration, float): + # Do not report unsent for "manual". + logger.debug(f"Unsent packets/transactions: {unsent}") if fail_count < 0 and not self.negative_loss: fail_count = 0 measurement = ReceiveRateMeasurement( @@ -1156,19 +1320,16 @@ class TrafficGenerator(AbstractMeasurer): time_stop = time_start + duration if self.resetter: self.resetter() - self._send_traffic_on_tg_internal( + result = self._send_traffic_on_tg_with_ramp_up( duration=duration, rate=transmit_rate, async_call=False, ) - result = self.get_measurement_result() logger.debug(f"trial measurement result: {result!r}") # In PLRsearch, computation needs the specified time to complete. if self.sleep_till_duration: sleeptime = time_stop - time.monotonic() if sleeptime > 0.0: - # TODO: Sometimes we have time to do additional trials here, - # adapt PLRsearch to accept all the results. time.sleep(sleeptime) return result @@ -1186,6 +1347,9 @@ class TrafficGenerator(AbstractMeasurer): negative_loss=True, sleep_till_duration=False, use_latency=False, + ramp_up_rate=None, + ramp_up_duration=None, + state_timeout=240.0, ): """Store values accessed by measure(). @@ -1206,7 +1370,6 @@ class TrafficGenerator(AbstractMeasurer): :param transaction_type: An identifier specifying which counters and formulas to use when computing attempted and failed transactions. Default: "packet". - TODO: Does this also specify parsing for the measured duration? :param duration_limit: Zero or maximum limit for computed (or given) duration. :param negative_loss: If false, negative loss is reported as zero loss. @@ -1214,6 +1377,9 @@ class TrafficGenerator(AbstractMeasurer): sleep until it matches duration. Needed for PLRsearch. :param use_latency: Whether to measure latency during the trial. Default: False. + :param ramp_up_rate: Rate to use in ramp-up trials [pps]. + :param ramp_up_duration: Duration of ramp-up trials [s]. + :param state_timeout: Time of life of DUT state [s]. 
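# Worked example (invented numbers) of the packet accounting above for
# transaction_type "packet": the expected count tolerates 10 us of frames
# (inter-stream gap), is rounded up, and is never below what T-Rex reports
# as sent; any unsent remainder then counts towards the failed packets.
import math

transmit_rate = 1_000_000.0        # transactions per second
ppta = 2                           # bidirectional profile, 2 packets each
target_duration = 10.0
sent, received = 20_000_000, 19_999_000
packet_rate = transmit_rate * ppta
expected = math.ceil((target_duration - 1e-5) * packet_rate)
expected = max(expected, sent)     # T-Rex may send slightly more
unsent = expected - sent           # logged only when nonzero
fail = expected - received
assert (unsent, fail) == (0, 1_000)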
:type frame_size: str or int :type traffic_profile: str :type ppta: int @@ -1226,6 +1392,9 @@ class TrafficGenerator(AbstractMeasurer): :type negative_loss: bool :type sleep_till_duration: bool :type use_latency: bool + :type ramp_up_rate: float + :type ramp_up_duration: float + :type state_timeout: float """ self.frame_size = frame_size self.traffic_profile = str(traffic_profile) @@ -1239,13 +1408,16 @@ class TrafficGenerator(AbstractMeasurer): self.negative_loss = bool(negative_loss) self.sleep_till_duration = bool(sleep_till_duration) self.use_latency = bool(use_latency) + self.ramp_up_rate = float(ramp_up_rate) + self.ramp_up_duration = float(ramp_up_duration) + self.state_timeout = float(state_timeout) class OptimizedSearch: """Class to be imported as Robot Library, containing search keywords. Aside of setting up measurer and forwarding arguments, - the main business is to translate min/max rate from unidir to aggregate. + the main business is to translate min/max rate from unidir to aggregated. """ @staticmethod @@ -1259,8 +1431,7 @@ class OptimizedSearch: final_trial_duration=30.0, initial_trial_duration=1.0, number_of_intermediate_phases=2, - timeout=720.0, - doublings=1, + timeout=1200.0, ppta=1, resetter=None, traffic_directions=2, @@ -1268,21 +1439,25 @@ class OptimizedSearch: transaction_scale=0, transaction_type=u"packet", use_latency=False, + ramp_up_rate=None, + ramp_up_duration=None, + state_timeout=240.0, + expansion_coefficient=4.0, ): """Setup initialized TG, perform optimized search, return intervals. - If transaction_scale is nonzero, all non-init trial durations - are set to 2.0 (as they do not affect the real trial duration) + If transaction_scale is nonzero, all init and non-init trial durations + are set to 1.0 (as they do not affect the real trial duration) and zero intermediate phases are used. - The initial phase still uses 1.0 seconds, to force remeasurement. - That makes initial phase act as a warmup. + This way no re-measurement happens. + Warmup has to be handled via resetter or ramp-up mechanisms. :param frame_size: Frame size identifier or value [B]. :param traffic_profile: Module name as a traffic profile identifier. See GPL/traffic_profiles/trex for implemented modules. :param minimum_transmit_rate: Minimal load in transactions per second. :param maximum_transmit_rate: Maximal load in transactions per second. - :param packet_loss_ratio: Fraction of packets lost, for PDR [1]. + :param packet_loss_ratio: Ratio of packets lost, for PDR [1]. :param final_relative_width: Final lower bound transmit rate cannot be more distant that this multiple of upper bound [1]. :param final_trial_duration: Trial duration for the final phase [s]. @@ -1292,9 +1467,6 @@ class OptimizedSearch: to perform before the final phase [1]. :param timeout: The search will fail itself when not finished before this overall time [s]. - :param doublings: How many doublings to do in external search step. - Default 1 is suitable for fairly stable tests, - less stable tests might get better overal duration with 2 or more. :param ppta: Packets per transaction, aggregated over directions. Needed for udp_pps which does not have a good transaction counter, so we need to compute expected number of packets. @@ -1310,6 +1482,10 @@ class OptimizedSearch: transactions. Default: "packet". :param use_latency: Whether to measure latency during the trial. Default: False. + :param ramp_up_rate: Rate to use in ramp-up trials [pps]. + :param ramp_up_duration: Duration of ramp-up trials [s]. 
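# Worked example (invented rates) for final_relative_width above: per the
# parameter description, the lower bound may not end up farther from the
# upper bound than this multiple of the upper bound.
final_relative_width = 0.005       # default from the signature above
upper_bound_tr = 10_000_000.0      # transactions per second
lower_bound_tr = 9_960_000.0
relative_width = (upper_bound_tr - lower_bound_tr) / upper_bound_tr
assert relative_width <= final_relative_width   # 0.4 %, narrow enough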
+ :param state_timeout: Time of life of DUT state [s]. + :param expansion_coefficient: In external search multiply width by this. :type frame_size: str or int :type traffic_profile: str :type minimum_transmit_rate: float @@ -1320,7 +1496,6 @@ class OptimizedSearch: :type initial_trial_duration: float :type number_of_intermediate_phases: int :type timeout: float - :type doublings: int :type ppta: int :type resetter: Optional[Callable[[], None]] :type traffic_directions: int @@ -1328,9 +1503,13 @@ class OptimizedSearch: :type transaction_scale: int :type transaction_type: str :type use_latency: bool + :type ramp_up_rate: float + :type ramp_up_duration: float + :type state_timeout: float + :type expansion_coefficient: float :returns: Structure containing narrowed down NDR and PDR intervals and their measurements. - :rtype: NdrPdrResult + :rtype: List[Receiverateinterval] :raises RuntimeError: If total duration is larger than timeout. """ # we need instance of TrafficGenerator instantiated by Robot Framework @@ -1339,13 +1518,11 @@ class OptimizedSearch: u"resources.libraries.python.TrafficGenerator" ) # Overrides for fixed transaction amount. - # TODO: Move to robot code? We have two call sites, so this saves space, - # even though this is surprising for log readers. if transaction_scale: initial_trial_duration = 1.0 - final_trial_duration = 2.0 + final_trial_duration = 1.0 number_of_intermediate_phases = 0 - timeout = 3600.0 + timeout += transaction_scale * 3e-4 tg_instance.set_rate_provider_defaults( frame_size=frame_size, traffic_profile=traffic_profile, @@ -1357,6 +1534,9 @@ class OptimizedSearch: transaction_scale=transaction_scale, transaction_type=transaction_type, use_latency=use_latency, + ramp_up_rate=ramp_up_rate, + ramp_up_duration=ramp_up_duration, + state_timeout=state_timeout, ) algorithm = MultipleLossRatioSearch( measurer=tg_instance, @@ -1365,14 +1545,20 @@ class OptimizedSearch: number_of_intermediate_phases=number_of_intermediate_phases, initial_trial_duration=initial_trial_duration, timeout=timeout, - doublings=doublings, + debug=logger.debug, + expansion_coefficient=expansion_coefficient, ) - result = algorithm.narrow_down_ndr_and_pdr( + if packet_loss_ratio: + packet_loss_ratios = [0.0, packet_loss_ratio] + else: + # Happens in reconf tests. + packet_loss_ratios = [packet_loss_ratio] + results = algorithm.narrow_down_intervals( min_rate=minimum_transmit_rate, max_rate=maximum_transmit_rate, - packet_loss_ratio=packet_loss_ratio, + packet_loss_ratios=packet_loss_ratios, ) - return result + return results @staticmethod def perform_soak_search( @@ -1392,6 +1578,9 @@ class OptimizedSearch: transaction_scale=0, transaction_type=u"packet", use_latency=False, + ramp_up_rate=None, + ramp_up_duration=None, + state_timeout=240.0, ): """Setup initialized TG, perform soak search, return avg and stdev. @@ -1400,7 +1589,7 @@ class OptimizedSearch: See GPL/traffic_profiles/trex for implemented modules. :param minimum_transmit_rate: Minimal load in transactions per second. :param maximum_transmit_rate: Maximal load in transactions per second. - :param plr_target: Fraction of packets lost to achieve [1]. + :param plr_target: Ratio of packets lost to achieve [1]. :param tdpt: Trial duration per trial. The algorithm linearly increases trial duration with trial number, this is the increment between succesive trials, in seconds. @@ -1428,6 +1617,9 @@ class OptimizedSearch: transactions. Default: "packet". :param use_latency: Whether to measure latency during the trial. Default: False. 
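# Worked example (invented values) of the fixed-scale overrides and the
# loss-ratio handling above: trials shrink to 1 s, intermediate phases are
# dropped, the timeout grows slightly with the scale, and a zero PDR target
# (reconf tests) collapses the search to a single loss-ratio goal.
transaction_scale, packet_loss_ratio = 64_512, 0.005
timeout = 1200.0
initial_trial_duration = final_trial_duration = 1.0
number_of_intermediate_phases = 0
timeout += transaction_scale * 3e-4
if packet_loss_ratio:
    packet_loss_ratios = [0.0, packet_loss_ratio]
else:
    packet_loss_ratios = [packet_loss_ratio]
assert round(timeout, 2) == 1219.35
assert packet_loss_ratios == [0.0, 0.005]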
+ :param ramp_up_rate: Rate to use in ramp-up trials [pps]. + :param ramp_up_duration: Duration of ramp-up trials [s]. + :param state_timeout: Time of life of DUT state [s]. :type frame_size: str or int :type traffic_profile: str :type minimum_transmit_rate: float @@ -1443,15 +1635,16 @@ class OptimizedSearch: :type transaction_scale: int :type transaction_type: str :type use_latency: bool - :returns: Average and stdev of estimated aggregate rate giving PLR. + :type ramp_up_rate: float + :type ramp_up_duration: float + :type state_timeout: float + :returns: Average and stdev of estimated aggregated rate giving PLR. :rtype: 2-tuple of float """ tg_instance = BuiltIn().get_library_instance( u"resources.libraries.python.TrafficGenerator" ) # Overrides for fixed transaction amount. - # TODO: Move to robot code? We have a single call site - # but MLRsearch has two and we want the two to be used similarly. if transaction_scale: timeout = 7200.0 tg_instance.set_rate_provider_defaults( @@ -1466,6 +1659,9 @@ class OptimizedSearch: transaction_scale=transaction_scale, transaction_type=transaction_type, use_latency=use_latency, + ramp_up_rate=ramp_up_rate, + ramp_up_duration=ramp_up_duration, + state_timeout=state_timeout, ) algorithm = PLRsearch( measurer=tg_instance,