from .topology import NodeType
from .topology import NodeSubTypeTG
from .topology import Topology
-from .TRexConfigGenerator import TrexInitConfig
+from .TRexConfigGenerator import TrexConfig
from .DUTSetup import DUTSetup as DS
__all__ = [u"TGDropRateSearchImpl", u"TrafficGenerator", u"OptimizedSearch"]
self.ramp_up_duration = None
self.state_timeout = None
# Transient data needed for async measurements.
- self._xstats = (None, None)
+ self._xstats = ()
@property
def node(self):
message = u"Get T-Rex version failed!"
stdout, _ = exec_cmd_no_error(tg_node, command, message=message)
return stdout.strip()
- else:
- return "none"
+ return "none"
- def initialize_traffic_generator(
- self, tg_node, tg_if1, tg_if2, tg_if1_adj_node, tg_if1_adj_if,
- tg_if2_adj_node, tg_if2_adj_if, osi_layer, tg_if1_dst_mac=None,
- tg_if2_dst_mac=None):
+ def initialize_traffic_generator(self, osi_layer, parallel_links=1):
"""TG initialization.
- :param tg_node: Traffic generator node.
- :param tg_if1: TG - name of first interface.
- :param tg_if2: TG - name of second interface.
- :param tg_if1_adj_node: TG if1 adjecent node.
- :param tg_if1_adj_if: TG if1 adjecent interface.
- :param tg_if2_adj_node: TG if2 adjecent node.
- :param tg_if2_adj_if: TG if2 adjecent interface.
:param osi_layer: 'L2', 'L3' or 'L7' - OSI Layer testing type.
- :param tg_if1_dst_mac: Interface 1 destination MAC address.
- :param tg_if2_dst_mac: Interface 2 destination MAC address.
- :type tg_node: dict
- :type tg_if1: str
- :type tg_if2: str
- :type tg_if1_adj_node: dict
- :type tg_if1_adj_if: str
- :type tg_if2_adj_node: dict
- :type tg_if2_adj_if: str
+ :param parallel_links: Number of parallel links to configure.
:type osi_layer: str
- :type tg_if1_dst_mac: str
- :type tg_if2_dst_mac: str
- :returns: nothing
- :raises RuntimeError: In case of issue during initialization.
+ :type parallel_links: int
+ :raises ValueError: If OSI layer is unknown.
"""
- subtype = check_subtype(tg_node)
+ if osi_layer not in ("L2", "L3", "L7"):
+ raise ValueError("Unknown OSI layer!")
+
+ topology = BuiltIn().get_variable_value("&{topology_info}")
+ self._node = topology["TG"]
+ subtype = check_subtype(self._node)
+
if subtype == NodeSubTypeTG.TREX:
- self._node = tg_node
+ trex_topology = list()
self._mode = TrexMode.ASTF if osi_layer == "L7" else TrexMode.STL
- if osi_layer == "L2":
- tg_if1_adj_addr = Topology().get_interface_mac(tg_node, tg_if2)
- tg_if2_adj_addr = Topology().get_interface_mac(tg_node, tg_if1)
- elif osi_layer in ("L3", "L7"):
- tg_if1_adj_addr = Topology().get_interface_mac(
- tg_if1_adj_node, tg_if1_adj_if
+ for link in range(1, parallel_links*2, 2):
+ tg_if1_adj_addr = topology[f"TG_pf{link+1}_mac"][0]
+ tg_if2_adj_addr = topology[f"TG_pf{link}_mac"][0]
+ if osi_layer in ("L3", "L7") and "DUT1" in topology.keys():
+ ifl = BuiltIn().get_variable_value("${int}")
+ last = topology["duts_count"]
+ tg_if1_adj_addr = Topology().get_interface_mac(
+ topology["DUT1"],
+ BuiltIn().get_variable_value(
+ f"${{DUT1_{ifl}{link}}}[0]"
+ )
+ )
+ tg_if2_adj_addr = Topology().get_interface_mac(
+ topology[f"DUT{last}"],
+ BuiltIn().get_variable_value(
+ f"${{DUT{last}_{ifl}{link+1}}}[0]"
+ )
+ )
+
+ trex_topology.append(
+ dict(
+ interface=topology[f"TG_pf{link}"][0],
+ dst_mac=tg_if1_adj_addr
+ )
)
- tg_if2_adj_addr = Topology().get_interface_mac(
- tg_if2_adj_node, tg_if2_adj_if
+ trex_topology.append(
+ dict(
+ interface=topology[f"TG_pf{link+1}"][0],
+ dst_mac=tg_if2_adj_addr
+ )
)
- else:
- raise ValueError("Unknown OSI layer!")
-
- tg_topology = list()
- tg_topology.append(dict(interface=tg_if1, dst_mac=tg_if1_adj_addr))
- tg_topology.append(dict(interface=tg_if2, dst_mac=tg_if2_adj_addr))
- if1_pci = Topology().get_interface_pci_addr(self._node, tg_if1)
- if2_pci = Topology().get_interface_pci_addr(self._node, tg_if2)
- if min(if1_pci, if2_pci) != if1_pci:
- self._ifaces_reordered = True
- tg_topology.reverse()
-
- TrexInitConfig.init_trex_startup_configuration(tg_node, tg_topology)
- TrafficGenerator.startup_trex(tg_node, osi_layer, subtype=subtype)
+ if1_pci = topology[f"TG_pf{link}_pci"][0]
+ if2_pci = topology[f"TG_pf{link+1}_pci"][0]
+ if min(if1_pci, if2_pci) != if1_pci:
+ self._ifaces_reordered = True
+ trex_topology.reverse()
+
+ TrexConfig.add_startup_configuration(
+ self._node, trex_topology
+ )
+ TrafficGenerator.startup_trex(
+ self._node, osi_layer, subtype=subtype
+ )
@staticmethod
def startup_trex(tg_node, osi_layer, subtype=None):
:type node: dict
:raises RuntimeError: If stop traffic script fails.
"""
- command_line = OptionString().add(u"python3")
+ command_line = OptionString().add("python3")
dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex"
command_line.add(f"'{dirname}/trex_astf_stop.py'")
- command_line.change_prefix(u"--")
- for index, value in enumerate(self._xstats):
+ command_line.add("--xstat")
+ for value in self._xstats:
if value is not None:
- value = value.replace(u"'", u"\"")
- command_line.add_equals(f"xstat{index}", f"'{value}'")
+ value = value.replace("'", "\"")
+ command_line.add(f"'{value}'")
stdout, _ = exec_cmd_no_error(
node, command_line,
- message=u"T-Rex ASTF runtime error!"
+ message="T-Rex ASTF runtime error!"
)
self._parse_traffic_results(stdout)
:type node: dict
:raises RuntimeError: If stop traffic script fails.
"""
- command_line = OptionString().add(u"python3")
+ command_line = OptionString().add("python3")
dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex"
command_line.add(f"'{dirname}/trex_stl_stop.py'")
- command_line.change_prefix(u"--")
- for index, value in enumerate(self._xstats):
+ command_line.add("--xstat")
+ for value in self._xstats:
if value is not None:
- value = value.replace(u"'", u"\"")
- command_line.add_equals(f"xstat{index}", f"'{value}'")
+ value = value.replace("'", "\"")
+ command_line.add(f"'{value}'")
stdout, _ = exec_cmd_no_error(
node, command_line,
- message=u"T-Rex STL runtime error!"
+ message="T-Rex STL runtime error!"
)
self._parse_traffic_results(stdout)
self._sent = None
self._loss = None
self._latency = None
- xstats = [None, None]
+ xstats = []
self._l7_data = dict()
self._l7_data[u"client"] = dict()
self._l7_data[u"client"][u"active_flows"] = None
index = 0
for line in stdout.splitlines():
if f"Xstats snapshot {index}: " in line:
- xstats[index] = line[19:]
+ xstats.append(line[19:])
index += 1
- if index == 2:
- break
self._xstats = tuple(xstats)
else:
self._target_duration = duration
self._loss = None
self._latency = None
- xstats = [None, None]
+ xstats = []
index = 0
for line in stdout.splitlines():
if f"Xstats snapshot {index}: " in line:
- xstats[index] = line[19:]
+ xstats.append(line[19:])
index += 1
- if index == 2:
- break
self._xstats = tuple(xstats)
else:
self._target_duration = duration