X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FTrafficGenerator.py;h=455a21ebb8401a1f2355db8f843f335a81a9cce3;hp=80248add038203463b2c99f0fca9d66ad10ae845;hb=500dba02e62ba24f94972a0ef8f023418a7eee09;hpb=023fa41e51c966a1956bda6b915ffd894ff10e84 diff --git a/resources/libraries/python/TrafficGenerator.py b/resources/libraries/python/TrafficGenerator.py index 80248add03..455a21ebb8 100644 --- a/resources/libraries/python/TrafficGenerator.py +++ b/resources/libraries/python/TrafficGenerator.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Cisco and/or its affiliates. +# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -171,6 +171,11 @@ class TrafficGenerator(AbstractMeasurer): self.sleep_till_duration = None self.transaction_type = None self.duration_limit = None + self.ramp_up_start = None + self.ramp_up_stop = None + self.ramp_up_rate = None + self.ramp_up_duration = None + self.state_timeout = None # Transient data needed for async measurements. self._xstats = (None, None) # TODO: Rename "xstats" to something opaque, so T-Rex is not privileged? @@ -404,12 +409,11 @@ class TrafficGenerator(AbstractMeasurer): raise RuntimeError(u"Start TRex failed!") # Test T-Rex API responsiveness. - cmd = u"python3" - cmd += f" {Constants.REMOTE_FW_DIR}/GPL/tools/trex/" + cmd = f"python3 {Constants.REMOTE_FW_DIR}/GPL/tools/trex/" if osi_layer in (u"L2", u"L3"): - cmd += f"trex_stl_assert.py" + cmd += u"trex_stl_assert.py" elif osi_layer == u"L7": - cmd += f"trex_astf_assert.py" + cmd += u"trex_astf_assert.py" else: raise ValueError(u"Unknown OSI layer!") try: @@ -523,7 +527,43 @@ class TrafficGenerator(AbstractMeasurer): raise ValueError(u"Unsupported T-Rex traffic profile!") self._stop_time = time.monotonic() - return self.get_measurement_result() + return self._get_measurement_result() + + def _compute_duration(self, duration, multiplier): + """Compute duration for profile driver. + + The final result is influenced by transaction scale and duration limit. + It is assumed a higher level function has already set those to self. + The duration argument is the target value from search point of view, + before the overrides are applied here. + + Minus one (signalling async traffic start) is kept. + + Completeness flag is also included. Duration limited or async trials + are not considered complete for ramp-up purposes. + + :param duration: Time expressed in seconds for how long to send traffic. + :param multiplier: Traffic rate in transactions per second. + :type duration: float + :type multiplier: float + :returns: New duration and whether it was a complete ramp-up candidate. + :rtype: float, bool + """ + if duration < 0.0: + # Keep the async -1. + return duration, False + computed_duration = duration + if self.transaction_scale: + computed_duration = self.transaction_scale / multiplier + # Log the computed duration, + # so we can compare with what telemetry suggests + # the real duration was. + logger.debug(f"Expected duration {computed_duration}") + computed_duration += 0.1115 + if not self.duration_limit: + return computed_duration, True + limited_duration = min(computed_duration, self.duration_limit) + return limited_duration, (limited_duration == computed_duration) def trex_astf_start_remote_exec( self, duration, multiplier, async_call=False): @@ -571,19 +611,9 @@ class TrafficGenerator(AbstractMeasurer): if not isinstance(duration, (float, int)): duration = float(duration) - # Duration logic. - computed_duration = duration - if duration > 0.0: - if self.transaction_scale: - computed_duration = self.transaction_scale / multiplier - # Log the computed duration, - # so we can compare with what telemetry suggests - # the real duration was. - logger.debug(f"Expected duration {computed_duration}") - computed_duration += 0.1115 - # Else keep -1. - if self.duration_limit: - computed_duration = min(computed_duration, self.duration_limit) + # TODO: Refactor the code so duration is computed only once, + # and both the initial and the computed durations are logged. + computed_duration, _ = self._compute_duration(duration, multiplier) command_line = OptionString().add(u"python3") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex" @@ -687,8 +717,10 @@ class TrafficGenerator(AbstractMeasurer): p_0, p_1 = (1, 0) if self._ifaces_reordered else (0, 1) if not isinstance(duration, (float, int)): duration = float(duration) - if self.duration_limit: - duration = min(duration, self.duration_limit) + + # TODO: Refactor the code so duration is computed only once, + # and both the initial and the computed durations are logged. + duration, _ = self._compute_duration(duration=duration, multiplier=rate) command_line = OptionString().add(u"python3") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex" @@ -755,6 +787,10 @@ class TrafficGenerator(AbstractMeasurer): transaction_type=u"packet", duration_limit=0.0, use_latency=False, + ramp_up_rate=None, + ramp_up_duration=None, + state_timeout=300.0, + ramp_up_only=False, ): """Send traffic from all configured interfaces on TG. @@ -775,6 +811,8 @@ class TrafficGenerator(AbstractMeasurer): Bidirectional STL profiles are treated as transactions with two packets. + The return value is None for async. + :param duration: Duration of test traffic generation in seconds. :param rate: Traffic rate in transactions per second. :param frame_size: Frame size (L2) in Bytes. @@ -797,6 +835,10 @@ class TrafficGenerator(AbstractMeasurer): duration. :param use_latency: Whether to measure latency during the trial. Default: False. + :param ramp_up_rate: Rate to use in ramp-up trials [pps]. + :param ramp_up_duration: Duration of ramp-up trials [s]. + :param state_timeout: Time of life of DUT state [s]. + :param ramp_up_only: If true, do not perform main trial measurement. :type duration: float :type rate: float :type frame_size: str @@ -809,8 +851,12 @@ class TrafficGenerator(AbstractMeasurer): :type transaction_type: str :type duration_limit: float :type use_latency: bool + :type ramp_up_rate: float + :type ramp_up_duration: float + :type state_timeout: float + :type ramp_up_only: bool :returns: TG results. - :rtype: str + :rtype: ReceiveRateMeasurement or None :raises ValueError: If TG traffic profile is not supported. """ self.set_rate_provider_defaults( @@ -823,10 +869,19 @@ class TrafficGenerator(AbstractMeasurer): transaction_type=transaction_type, duration_limit=duration_limit, use_latency=use_latency, + ramp_up_rate=ramp_up_rate, + ramp_up_duration=ramp_up_duration, + state_timeout=state_timeout, + ) + return self._send_traffic_on_tg_with_ramp_up( + duration=duration, + rate=rate, + async_call=async_call, + ramp_up_only=ramp_up_only, ) - self._send_traffic_on_tg_internal(duration, rate, async_call) - def _send_traffic_on_tg_internal(self, duration, rate, async_call=False): + def _send_traffic_on_tg_internal( + self, duration, rate, async_call=False): """Send traffic from all configured interfaces on TG. This is an internal function, it assumes set_rate_provider_defaults @@ -838,6 +893,9 @@ class TrafficGenerator(AbstractMeasurer): need to specify their own values, and we do not want the measure call to overwrite them with defaults. + This function is used both for automated ramp-up trials + and for explicitly called trials. + :param duration: Duration of test traffic generation in seconds. :param rate: Traffic rate in transactions per second. :param async_call: Async mode. @@ -845,7 +903,7 @@ class TrafficGenerator(AbstractMeasurer): :type rate: float :type async_call: bool :returns: TG results. - :rtype: str + :rtype: ReceiveRateMeasurement or None :raises ValueError: If TG traffic profile is not supported. """ subtype = check_subtype(self._node) @@ -863,7 +921,103 @@ class TrafficGenerator(AbstractMeasurer): else: raise ValueError(u"Unsupported T-Rex traffic profile!") - return self._result + return None if async_call else self._get_measurement_result() + + def _send_traffic_on_tg_with_ramp_up( + self, duration, rate, async_call=False, ramp_up_only=False): + """Send traffic from all interfaces on TG, maybe after ramp-up. + + This is an internal function, it assumes set_rate_provider_defaults + has been called to remember most values. + The reason why need to remember various values is that + the traffic can be asynchronous, and parsing needs those values. + The reason why this is a separate function from the one + which calls set_rate_provider_defaults is that some search algorithms + need to specify their own values, and we do not want the measure call + to overwrite them with defaults. + + If ramp-up tracking is detected, a computation is performed, + and if state timeout is near, trial at ramp-up rate and duration + is inserted before the main trial measurement. + + The ramp_up_only parameter forces a ramp-up without immediate + trial measurement, which is useful in case self remembers + a previous ramp-up trial that belongs to a different test (phase). + + Return None if trial is async or ramp-up only. + + :param duration: Duration of test traffic generation in seconds. + :param rate: Traffic rate in transactions per second. + :param async_call: Async mode. + :param ramp_up_only: If true, do not perform main trial measurement. + :type duration: float + :type rate: float + :type async_call: bool + :type ramp_up_only: bool + :returns: TG results. + :rtype: ReceiveRateMeasurement or None + :raises ValueError: If TG traffic profile is not supported. + """ + complete = False + if self.ramp_up_rate: + # Figure out whether we need to insert a ramp-up trial. + # TODO: Give up on async_call=True? + if ramp_up_only or self.ramp_up_start is None: + # We never ramped up yet (at least not in this test case). + ramp_up_needed = True + else: + # We ramped up before, but maybe it was too long ago. + # Adding a constant overhead to be safe. + time_now = time.monotonic() + 1.0 + computed_duration, complete = self._compute_duration( + duration=duration, + multiplier=rate, + ) + # There are two conditions for inserting ramp-up. + # If early sessions are expiring already, + # or if late sessions are to expire before measurement is over. + ramp_up_start_delay = time_now - self.ramp_up_start + ramp_up_stop_delay = time_now - self.ramp_up_stop + ramp_up_stop_delay += computed_duration + bigger_delay = max(ramp_up_start_delay, ramp_up_stop_delay) + # Final boolean decision. + ramp_up_needed = (bigger_delay >= self.state_timeout) + if ramp_up_needed: + logger.debug( + u"State may time out during next real trial, " + u"inserting a ramp-up trial." + ) + self.ramp_up_start = time.monotonic() + self._send_traffic_on_tg_internal( + duration=self.ramp_up_duration, + rate=self.ramp_up_rate, + async_call=async_call, + ) + self.ramp_up_stop = time.monotonic() + logger.debug(u"Ramp-up done.") + else: + logger.debug( + u"State will probably not time out during next real trial, " + u"no ramp-up trial needed just yet." + ) + if ramp_up_only: + return None + trial_start = time.monotonic() + result = self._send_traffic_on_tg_internal( + duration=duration, + rate=rate, + async_call=async_call, + ) + trial_end = time.monotonic() + if self.ramp_up_rate: + # Optimization: No loss acts as a good ramp-up, if it was complete. + if complete and result is not None and result.loss_count == 0: + logger.debug(u"Good trial acts as a ramp-up") + self.ramp_up_start = trial_start + self.ramp_up_stop = trial_end + else: + logger.debug(u"Loss or incomplete, does not act as a ramp-up.") + return result def no_traffic_loss_occurred(self): """Fail if loss occurred in traffic run. @@ -886,7 +1040,7 @@ class TrafficGenerator(AbstractMeasurer): """ if self._received is None: raise RuntimeError(u"The traffic generation has not been issued") - if self._received == u"0": + if self._received == 0: raise RuntimeError(u"No traffic forwarded") def partial_traffic_loss_accepted( @@ -1030,7 +1184,7 @@ class TrafficGenerator(AbstractMeasurer): self._l7_data[u"server"][u"tcp"][u"rx_bytes"] = \ int(self._result.get(u"server_tcp_rx_bytes", 0)) - def get_measurement_result(self): + def _get_measurement_result(self): """Return the result of last measurement as ReceiveRateMeasurement. Separate function, as measurements can end either by time @@ -1091,8 +1245,10 @@ class TrafficGenerator(AbstractMeasurer): partial_attempt_count = ctca # We do not care whether TG is slow, it should have attempted all. expected_attempt_count = self.transaction_scale - # TODO: Is there a better packet-based counter? - pass_count = self._l7_data[u"server"][u"tcp"][u"connects"] + # From TCP point of view, server/connects counts full connections, + # but we are testing NAT session so client/connects counts that + # (half connections from TCP point of view). + pass_count = self._l7_data[u"client"][u"tcp"][u"connects"] fail_count = expected_attempt_count - pass_count elif self.transaction_type == u"udp_pps": if not self.transaction_scale: @@ -1156,12 +1312,11 @@ class TrafficGenerator(AbstractMeasurer): time_stop = time_start + duration if self.resetter: self.resetter() - self._send_traffic_on_tg_internal( + result = self._send_traffic_on_tg_with_ramp_up( duration=duration, rate=transmit_rate, async_call=False, ) - result = self.get_measurement_result() logger.debug(f"trial measurement result: {result!r}") # In PLRsearch, computation needs the specified time to complete. if self.sleep_till_duration: @@ -1186,6 +1341,9 @@ class TrafficGenerator(AbstractMeasurer): negative_loss=True, sleep_till_duration=False, use_latency=False, + ramp_up_rate=None, + ramp_up_duration=None, + state_timeout=300.0, ): """Store values accessed by measure(). @@ -1214,6 +1372,9 @@ class TrafficGenerator(AbstractMeasurer): sleep until it matches duration. Needed for PLRsearch. :param use_latency: Whether to measure latency during the trial. Default: False. + :param ramp_up_rate: Rate to use in ramp-up trials [pps]. + :param ramp_up_duration: Duration of ramp-up trials [s]. + :param state_timeout: Time of life of DUT state [s]. :type frame_size: str or int :type traffic_profile: str :type ppta: int @@ -1226,6 +1387,9 @@ class TrafficGenerator(AbstractMeasurer): :type negative_loss: bool :type sleep_till_duration: bool :type use_latency: bool + :type ramp_up_rate: float + :type ramp_up_duration: float + :type state_timeout: float """ self.frame_size = frame_size self.traffic_profile = str(traffic_profile) @@ -1239,6 +1403,9 @@ class TrafficGenerator(AbstractMeasurer): self.negative_loss = bool(negative_loss) self.sleep_till_duration = bool(sleep_till_duration) self.use_latency = bool(use_latency) + self.ramp_up_rate = float(ramp_up_rate) + self.ramp_up_duration = float(ramp_up_duration) + self.state_timeout = float(state_timeout) class OptimizedSearch: @@ -1268,6 +1435,9 @@ class OptimizedSearch: transaction_scale=0, transaction_type=u"packet", use_latency=False, + ramp_up_rate=None, + ramp_up_duration=None, + state_timeout=300.0, ): """Setup initialized TG, perform optimized search, return intervals. @@ -1310,6 +1480,9 @@ class OptimizedSearch: transactions. Default: "packet". :param use_latency: Whether to measure latency during the trial. Default: False. + :param ramp_up_rate: Rate to use in ramp-up trials [pps]. + :param ramp_up_duration: Duration of ramp-up trials [s]. + :param state_timeout: Time of life of DUT state [s]. :type frame_size: str or int :type traffic_profile: str :type minimum_transmit_rate: float @@ -1328,6 +1501,9 @@ class OptimizedSearch: :type transaction_scale: int :type transaction_type: str :type use_latency: bool + :type ramp_up_rate: float + :type ramp_up_duration: float + :type state_timeout: float :returns: Structure containing narrowed down NDR and PDR intervals and their measurements. :rtype: NdrPdrResult @@ -1345,7 +1521,7 @@ class OptimizedSearch: initial_trial_duration = 1.0 final_trial_duration = 2.0 number_of_intermediate_phases = 0 - timeout = 3600.0 + timeout += transaction_scale * 3e-4 tg_instance.set_rate_provider_defaults( frame_size=frame_size, traffic_profile=traffic_profile, @@ -1357,6 +1533,9 @@ class OptimizedSearch: transaction_scale=transaction_scale, transaction_type=transaction_type, use_latency=use_latency, + ramp_up_rate=ramp_up_rate, + ramp_up_duration=ramp_up_duration, + state_timeout=state_timeout, ) algorithm = MultipleLossRatioSearch( measurer=tg_instance, @@ -1392,6 +1571,9 @@ class OptimizedSearch: transaction_scale=0, transaction_type=u"packet", use_latency=False, + ramp_up_rate=None, + ramp_up_duration=None, + state_timeout=300.0, ): """Setup initialized TG, perform soak search, return avg and stdev. @@ -1428,6 +1610,9 @@ class OptimizedSearch: transactions. Default: "packet". :param use_latency: Whether to measure latency during the trial. Default: False. + :param ramp_up_rate: Rate to use in ramp-up trials [pps]. + :param ramp_up_duration: Duration of ramp-up trials [s]. + :param state_timeout: Time of life of DUT state [s]. :type frame_size: str or int :type traffic_profile: str :type minimum_transmit_rate: float @@ -1443,6 +1628,9 @@ class OptimizedSearch: :type transaction_scale: int :type transaction_type: str :type use_latency: bool + :type ramp_up_rate: float + :type ramp_up_duration: float + :type state_timeout: float :returns: Average and stdev of estimated aggregate rate giving PLR. :rtype: 2-tuple of float """ @@ -1453,6 +1641,8 @@ class OptimizedSearch: # TODO: Move to robot code? We have a single call site # but MLRsearch has two and we want the two to be used similarly. if transaction_scale: + # TODO: What is a good value for max scale? + # TODO: Scale the timeout with transaction scale. timeout = 7200.0 tg_instance.set_rate_provider_defaults( frame_size=frame_size, @@ -1466,6 +1656,9 @@ class OptimizedSearch: transaction_scale=transaction_scale, transaction_type=transaction_type, use_latency=use_latency, + ramp_up_rate=ramp_up_rate, + ramp_up_duration=ramp_up_duration, + state_timeout=state_timeout, ) algorithm = PLRsearch( measurer=tg_instance,