+ subtype = check_subtype(self._node)
+ if subtype == NodeSubTypeTG.TREX:
+ # Last line from console output
+ line = stdout.splitlines()[-1]
+ results = line.split(u";")
+ if results[-1] in (u" ", u""):
+ results.pop(-1)
+ self._result = dict()
+ for result in results:
+ key, value = result.split(u"=", maxsplit=1)
+ self._result[key.strip()] = value
+ logger.info(f"TrafficGen results:\n{self._result}")
+ self._received = int(self._result.get(u"total_received"), 0)
+ self._sent = int(self._result.get(u"total_sent", 0))
+ self._loss = int(self._result.get(u"frame_loss", 0))
+ self._approximated_duration = \
+ self._result.get(u"approximated_duration", 0.0)
+ if u"manual" not in str(self._approximated_duration):
+ self._approximated_duration = float(self._approximated_duration)
+ self._latency = list()
+ self._latency.append(self._result.get(u"latency_stream_0(usec)"))
+ self._latency.append(self._result.get(u"latency_stream_1(usec)"))
+ if self._mode == TrexMode.ASTF:
+ self._l7_data = dict()
+ self._l7_data[u"client"] = dict()
+ self._l7_data[u"client"][u"sent"] = \
+ int(self._result.get(u"client_sent", 0))
+ self._l7_data[u"client"][u"received"] = \
+ int(self._result.get(u"client_received", 0))
+ self._l7_data[u"client"][u"active_flows"] = \
+ int(self._result.get(u"client_active_flows", 0))
+ self._l7_data[u"client"][u"established_flows"] = \
+ int(self._result.get(u"client_established_flows", 0))
+ self._l7_data[u"client"][u"traffic_duration"] = \
+ float(self._result.get(u"client_traffic_duration", 0.0))
+ self._l7_data[u"client"][u"err_rx_throttled"] = \
+ int(self._result.get(u"client_err_rx_throttled", 0))
+ self._l7_data[u"client"][u"err_c_nf_throttled"] = \
+ int(self._result.get(u"client_err_nf_throttled", 0))
+ self._l7_data[u"client"][u"err_flow_overflow"] = \
+ int(self._result.get(u"client_err_flow_overflow", 0))
+ self._l7_data[u"server"] = dict()
+ self._l7_data[u"server"][u"active_flows"] = \
+ int(self._result.get(u"server_active_flows", 0))
+ self._l7_data[u"server"][u"established_flows"] = \
+ int(self._result.get(u"server_established_flows", 0))
+ self._l7_data[u"server"][u"traffic_duration"] = \
+ float(self._result.get(u"server_traffic_duration", 0.0))
+ self._l7_data[u"server"][u"err_rx_throttled"] = \
+ int(self._result.get(u"client_err_rx_throttled", 0))
+ if u"udp" in self.traffic_profile:
+ self._l7_data[u"client"][u"udp"] = dict()
+ self._l7_data[u"client"][u"udp"][u"connects"] = \
+ int(self._result.get(u"client_udp_connects", 0))
+ self._l7_data[u"client"][u"udp"][u"closed_flows"] = \
+ int(self._result.get(u"client_udp_closed", 0))
+ self._l7_data[u"client"][u"udp"][u"tx_bytes"] = \
+ int(self._result.get(u"client_udp_tx_bytes", 0))
+ self._l7_data[u"client"][u"udp"][u"rx_bytes"] = \
+ int(self._result.get(u"client_udp_rx_bytes", 0))
+ self._l7_data[u"client"][u"udp"][u"tx_packets"] = \
+ int(self._result.get(u"client_udp_tx_packets", 0))
+ self._l7_data[u"client"][u"udp"][u"rx_packets"] = \
+ int(self._result.get(u"client_udp_rx_packets", 0))
+ self._l7_data[u"client"][u"udp"][u"keep_drops"] = \
+ int(self._result.get(u"client_udp_keep_drops", 0))
+ self._l7_data[u"client"][u"udp"][u"err_cwf"] = \
+ int(self._result.get(u"client_err_cwf", 0))
+ self._l7_data[u"server"][u"udp"] = dict()
+ self._l7_data[u"server"][u"udp"][u"accepted_flows"] = \
+ int(self._result.get(u"server_udp_accepts", 0))
+ self._l7_data[u"server"][u"udp"][u"closed_flows"] = \
+ int(self._result.get(u"server_udp_closed", 0))
+ self._l7_data[u"server"][u"udp"][u"tx_bytes"] = \
+ int(self._result.get(u"server_udp_tx_bytes", 0))
+ self._l7_data[u"server"][u"udp"][u"rx_bytes"] = \
+ int(self._result.get(u"server_udp_rx_bytes", 0))
+ self._l7_data[u"server"][u"udp"][u"tx_packets"] = \
+ int(self._result.get(u"server_udp_tx_packets", 0))
+ self._l7_data[u"server"][u"udp"][u"rx_packets"] = \
+ int(self._result.get(u"server_udp_rx_packets", 0))
+ elif u"tcp" in self.traffic_profile:
+ self._l7_data[u"client"][u"tcp"] = dict()
+ self._l7_data[u"client"][u"tcp"][u"initiated_flows"] = \
+ int(self._result.get(u"client_tcp_connect_inits", 0))
+ self._l7_data[u"client"][u"tcp"][u"connects"] = \
+ int(self._result.get(u"client_tcp_connects", 0))
+ self._l7_data[u"client"][u"tcp"][u"closed_flows"] = \
+ int(self._result.get(u"client_tcp_closed", 0))
+ self._l7_data[u"client"][u"tcp"][u"connattempt"] = \
+ int(self._result.get(u"client_tcp_connattempt", 0))
+ self._l7_data[u"client"][u"tcp"][u"tx_bytes"] = \
+ int(self._result.get(u"client_tcp_tx_bytes", 0))
+ self._l7_data[u"client"][u"tcp"][u"rx_bytes"] = \
+ int(self._result.get(u"client_tcp_rx_bytes", 0))
+ self._l7_data[u"server"][u"tcp"] = dict()
+ self._l7_data[u"server"][u"tcp"][u"accepted_flows"] = \
+ int(self._result.get(u"server_tcp_accepts", 0))
+ self._l7_data[u"server"][u"tcp"][u"connects"] = \
+ int(self._result.get(u"server_tcp_connects", 0))
+ self._l7_data[u"server"][u"tcp"][u"closed_flows"] = \
+ int(self._result.get(u"server_tcp_closed", 0))
+ self._l7_data[u"server"][u"tcp"][u"tx_bytes"] = \
+ int(self._result.get(u"server_tcp_tx_bytes", 0))
+ self._l7_data[u"server"][u"tcp"][u"rx_bytes"] = \
+ int(self._result.get(u"server_tcp_rx_bytes", 0))
+
+ def _get_measurement_result(self):
+ """Return the result of last measurement as ReceiveRateMeasurement.
+
+ Separate function, as measurements can end either by time
+ or by explicit call, this is the common block at the end.
+
+ The target_tr field of ReceiveRateMeasurement is in
+ transactions per second. Transmit count and loss count units
+ depend on the transaction type. Usually they are in transactions
+ per second, or aggregated packets per second.
+
+ TODO: Fail on running or already reported measurement.
+
+ :returns: Structure containing the result of the measurement.
+ :rtype: ReceiveRateMeasurement
+ """
+ try:
+ # Client duration seems to include a setup period
+ # where TRex does not send any packets yet.
+ # Server duration does not include it.
+ server_data = self._l7_data[u"server"]
+ approximated_duration = float(server_data[u"traffic_duration"])
+ except (KeyError, AttributeError, ValueError, TypeError):
+ approximated_duration = None
+ try:
+ if not approximated_duration:
+ approximated_duration = float(self._approximated_duration)
+ except ValueError: # "manual"
+ approximated_duration = None
+ if not approximated_duration:
+ if self._duration and self._duration > 0:
+ # Known recomputed or target duration.
+ approximated_duration = self._duration
+ else:
+ # It was an explicit stop.
+ if not self._stop_time:
+ raise RuntimeError(u"Unable to determine duration.")
+ approximated_duration = self._stop_time - self._start_time
+ target_duration = self._target_duration
+ if not target_duration:
+ target_duration = approximated_duration
+ transmit_rate = self._rate
+ unsent = 0
+ if self.transaction_type == u"packet":
+ partial_attempt_count = self._sent
+ packet_rate = transmit_rate * self.ppta
+ # We have a float. TRex way of rounding it is not obvious.
+ # The biggest source of mismatch is Inter Stream Gap.
+ # So the code tolerates 10 usec of missing packets.
+ expected_attempt_count = (target_duration - 1e-5) * packet_rate
+ expected_attempt_count = math.ceil(expected_attempt_count)
+ # TRex can send more.
+ expected_attempt_count = max(expected_attempt_count, self._sent)
+ unsent = expected_attempt_count - self._sent
+ pass_count = self._received
+ fail_count = expected_attempt_count - pass_count
+ elif self.transaction_type == u"udp_cps":
+ if not self.transaction_scale:
+ raise RuntimeError(u"Add support for no-limit udp_cps.")
+ partial_attempt_count = self._l7_data[u"client"][u"sent"]
+ # We do not care whether TG is slow, it should have attempted all.
+ expected_attempt_count = self.transaction_scale
+ unsent = expected_attempt_count - partial_attempt_count
+ pass_count = self._l7_data[u"client"][u"received"]
+ fail_count = expected_attempt_count - pass_count
+ elif self.transaction_type == u"tcp_cps":
+ if not self.transaction_scale:
+ raise RuntimeError(u"Add support for no-limit tcp_cps.")
+ ctca = self._l7_data[u"client"][u"tcp"][u"connattempt"]
+ partial_attempt_count = ctca
+ # We do not care whether TG is slow, it should have attempted all.
+ expected_attempt_count = self.transaction_scale
+ unsent = expected_attempt_count - partial_attempt_count
+ # From TCP point of view, server/connects counts full connections,
+ # but we are testing NAT session so client/connects counts that
+ # (half connections from TCP point of view).
+ pass_count = self._l7_data[u"client"][u"tcp"][u"connects"]
+ fail_count = expected_attempt_count - pass_count
+ elif self.transaction_type == u"udp_pps":
+ if not self.transaction_scale:
+ raise RuntimeError(u"Add support for no-limit udp_pps.")
+ partial_attempt_count = self._sent
+ expected_attempt_count = self.transaction_scale * self.ppta
+ unsent = expected_attempt_count - self._sent
+ fail_count = self._loss + unsent
+ elif self.transaction_type == u"tcp_pps":
+ if not self.transaction_scale:
+ raise RuntimeError(u"Add support for no-limit tcp_pps.")
+ partial_attempt_count = self._sent
+ expected_attempt_count = self.transaction_scale * self.ppta
+ # One loss-like scenario happens when TRex receives all packets
+ # on L2 level, but is not fast enough to process them all
+ # at L7 level, which leads to retransmissions.
+ # Those manifest as opackets larger than expected.
+ # A simple workaround is to add absolute difference.
+ # Probability of retransmissions exactly cancelling
+ # packets unsent due to duration stretching is quite low.
+ unsent = abs(expected_attempt_count - self._sent)
+ fail_count = self._loss + unsent
+ else:
+ raise RuntimeError(f"Unknown parsing {self.transaction_type!r}")
+ if unsent and isinstance(self._approximated_duration, float):
+ # Do not report unsent for "manual".
+ logger.debug(f"Unsent packets/transactions: {unsent}")
+ if fail_count < 0 and not self.negative_loss:
+ fail_count = 0
+ measurement = ReceiveRateMeasurement(
+ duration=target_duration,
+ target_tr=transmit_rate,
+ transmit_count=expected_attempt_count,
+ loss_count=fail_count,
+ approximated_duration=approximated_duration,
+ partial_transmit_count=partial_attempt_count,
+ )
+ measurement.latency = self.get_latency_int()
+ return measurement