Add: Use containers for shared TG
[csit.git] / resources / libraries / python / TrafficGenerator.py
index 9e0a1d2..e93da04 100644 (file)
 
 """Performance testing traffic generator library."""
 
+import time
+
 from robot.api import logger
 from robot.libraries.BuiltIn import BuiltIn
 
 from .DropRateSearch import DropRateSearch
 from .Constants import Constants
-from .ssh import SSH
+from .ssh import SSH, exec_cmd_no_error
 from .topology import NodeType
 from .topology import NodeSubTypeTG
 from .topology import Topology
@@ -30,6 +32,31 @@ from .PLRsearch.PLRsearch import PLRsearch
 __all__ = ['TGDropRateSearchImpl', 'TrafficGenerator', 'OptimizedSearch']
 
 
+def check_subtype(node):
+    """Return supported subtype of given node, or raise an exception.
+
+    Currently only one subtype is supported,
+    but we want our code to be ready for other ones.
+
+    :param node: Topology node to check. Can be None.
+    :type node: dict or NoneType
+    :returns: Subtype detected.
+    :rtype: NodeSubTypeTG
+    :raises RuntimeError: If node is not supported, message explains how.
+    """
+    if node.get('type') is None:
+        raise RuntimeError('Node type is not defined')
+    elif node['type'] != NodeType.TG:
+        raise RuntimeError('Node type is {typ!r}, not a TG'.format(
+            typ=node['type']))
+    elif node.get('subtype') is None:
+        raise RuntimeError('TG subtype is not defined')
+    elif node['subtype'] == NodeSubTypeTG.TREX:
+        return NodeSubTypeTG.TREX
+    raise RuntimeError('TG subtype {sub!r} is not supported'.format(
+        sub=node['subtype']))
+
+
 class TGDropRateSearchImpl(DropRateSearch):
     """Drop Rate Search implementation."""
 
@@ -62,34 +89,24 @@ class TGDropRateSearchImpl(DropRateSearch):
         # to be able to use trex_stl-*()
         tg_instance = BuiltIn().get_library_instance(
             'resources.libraries.python.TrafficGenerator')
-
-        if tg_instance.node['subtype'] is None:
-            raise RuntimeError('TG subtype not defined')
-        elif tg_instance.node['subtype'] == NodeSubTypeTG.TREX:
+        subtype = check_subtype(tg_instance.node)
+        if subtype == NodeSubTypeTG.TREX:
             unit_rate = str(rate) + self.get_rate_type_str()
             if skip_warmup:
-                tg_instance.trex_stl_start_remote_exec(self.get_duration(),
-                                                       unit_rate, frame_size,
-                                                       traffic_profile,
-                                                       warmup_time=0.0)
+                tg_instance.trex_stl_start_remote_exec(
+                    self.get_duration(), unit_rate, frame_size, traffic_profile,
+                    warmup_time=0.0)
             else:
-                tg_instance.trex_stl_start_remote_exec(self.get_duration(),
-                                                       unit_rate, frame_size,
-                                                       traffic_profile)
+                tg_instance.trex_stl_start_remote_exec(
+                    self.get_duration(), unit_rate, frame_size, traffic_profile)
             loss = tg_instance.get_loss()
             sent = tg_instance.get_sent()
             if self.loss_acceptance_type_is_percentage():
                 loss = (float(loss) / float(sent)) * 100
-
-            logger.trace("comparing: {} < {} {}".format(loss,
-                                                        loss_acceptance,
-                                                        loss_acceptance_type))
-            if float(loss) > float(loss_acceptance):
-                return False
-            else:
-                return True
-        else:
-            raise NotImplementedError("TG subtype not supported")
+            logger.trace("comparing: {los} < {acc} {typ}".format(
+                los=loss, acc=loss_acceptance, typ=loss_acceptance_type))
+            return float(loss) <= float(loss_acceptance)
+        return False
 
     def get_latency(self):
         """Returns min/avg/max latency.
@@ -114,18 +131,25 @@ class TrafficGenerator(AbstractMeasurer):
     ROBOT_LIBRARY_SCOPE = 'TEST SUITE'
 
     def __init__(self):
+        # TODO: Number of fields will be reduced with CSIT-1378.
+        self._node = None
+        # T-REX interface order mapping
+        self._ifaces_reordered = False
+        # Result holding fields, to be removed.
         self._result = None
         self._loss = None
         self._sent = None
         self._latency = None
         self._received = None
-        self._node = None
-        # T-REX interface order mapping
-        self._ifaces_reordered = False
-        # Parameters not given by measure().
+        # Measurement input fields, needed for async stop result.
+        self._start_time = None
+        self._rate = None
+        # Other input parameters, not knowable from measure() signature.
         self.frame_size = None
         self.traffic_profile = None
         self.warmup_time = None
+        # TODO: Rename "xstats" to something opaque, so TRex is not privileged?
+        self._xstats = (None, None)
 
     @property
     def node(self):
@@ -199,13 +223,12 @@ class TrafficGenerator(AbstractMeasurer):
         :returns: nothing
         :raises RuntimeError: In case of issue during initialization.
         """
-        if tg_node['type'] != NodeType.TG:
-            raise RuntimeError('Node type is not a TG')
-        self._node = tg_node
+        subtype = check_subtype(tg_node)
+        if subtype == NodeSubTypeTG.TREX:
+            self._node = tg_node
 
-        if tg_node['subtype'] == NodeSubTypeTG.TREX:
             ssh = SSH()
-            ssh.connect(tg_node)
+            ssh.connect(self._node)
 
             (ret, _, _) = ssh.exec_command(
                 "sudo -E sh -c '{0}/resources/tools/trex/"
@@ -215,10 +238,10 @@ class TrafficGenerator(AbstractMeasurer):
             if int(ret) != 0:
                 raise RuntimeError('TRex installation failed.')
 
-            if1_pci = Topology().get_interface_pci_addr(tg_node, tg_if1)
-            if2_pci = Topology().get_interface_pci_addr(tg_node, tg_if2)
-            if1_addr = Topology().get_interface_mac(tg_node, tg_if1)
-            if2_addr = Topology().get_interface_mac(tg_node, tg_if2)
+            if1_pci = Topology().get_interface_pci_addr(self._node, tg_if1)
+            if2_pci = Topology().get_interface_pci_addr(self._node, tg_if2)
+            if1_addr = Topology().get_interface_mac(self._node, tg_if1)
+            if2_addr = Topology().get_interface_mac(self._node, tg_if2)
 
             if osi_layer == 'L2':
                 if1_adj_addr = if2_addr
@@ -229,8 +252,8 @@ class TrafficGenerator(AbstractMeasurer):
                 if2_adj_addr = Topology().get_interface_mac(tg_if2_adj_node,
                                                             tg_if2_adj_if)
             elif osi_layer == 'L7':
-                if1_addr = Topology().get_interface_ip4(tg_node, tg_if1)
-                if2_addr = Topology().get_interface_ip4(tg_node, tg_if2)
+                if1_addr = Topology().get_interface_ip4(self._node, tg_if1)
+                if2_addr = Topology().get_interface_ip4(self._node, tg_if2)
                 if1_adj_addr = Topology().get_interface_ip4(tg_if1_adj_node,
                                                             tg_if1_adj_if)
                 if2_adj_addr = Topology().get_interface_ip4(tg_if2_adj_node,
@@ -252,8 +275,7 @@ class TrafficGenerator(AbstractMeasurer):
             if osi_layer == 'L2' or osi_layer == 'L3':
                 (ret, _, _) = ssh.exec_command(
                     "sudo sh -c 'cat << EOF > /etc/trex_cfg.yaml\n"
-                    "- port_limit: 2\n"
-                    "  version: 2\n"
+                    "- version: 2\n"
                     "  interfaces: [\"{0}\",\"{1}\"]\n"
                     "  port_info:\n"
                     "      - dest_mac: [{2}]\n"
@@ -269,8 +291,7 @@ class TrafficGenerator(AbstractMeasurer):
             elif osi_layer == 'L7':
                 (ret, _, _) = ssh.exec_command(
                     "sudo sh -c 'cat << EOF > /etc/trex_cfg.yaml\n"
-                    "- port_limit: 2\n"
-                    "  version: 2\n"
+                    "- version: 2\n"
                     "  interfaces: [\"{0}\",\"{1}\"]\n"
                     "  port_info:\n"
                     "      - ip: [{2}]\n"
@@ -286,49 +307,66 @@ class TrafficGenerator(AbstractMeasurer):
             if int(ret) != 0:
                 raise RuntimeError('TRex config generation error')
 
-            for _ in range(0, 3):
-                # kill TRex only if it is already running
-                ssh.exec_command(
-                    "sh -c 'pgrep t-rex && sudo pkill t-rex && sleep 3'")
+            self._startup_trex(osi_layer)
 
-                # configure TRex
-                (ret, _, _) = ssh.exec_command(
-                    "sh -c 'cd {0}/scripts/ && sudo ./trex-cfg'"\
-                    .format(Constants.TREX_INSTALL_DIR))
-                if int(ret) != 0:
-                    raise RuntimeError('trex-cfg failed')
-
-                # start TRex
-                if osi_layer == 'L2' or osi_layer == 'L3':
-                    (ret, _, _) = ssh.exec_command(
-                        "sh -c 'cd {0}/scripts/ && "
-                        "sudo nohup ./t-rex-64 -i -c 7 --iom 0 > /tmp/trex.log "
-                        "2>&1 &' > /dev/null"\
-                        .format(Constants.TREX_INSTALL_DIR))
-                elif osi_layer == 'L7':
-                    (ret, _, _) = ssh.exec_command(
-                        "sh -c 'cd {0}/scripts/ && "
-                        "sudo nohup ./t-rex-64 --astf -i -c 7 --iom 0 > "
-                        "/tmp/trex.log 2>&1 &' > /dev/null"\
-                        .format(Constants.TREX_INSTALL_DIR))
-                else:
-                    raise ValueError("Unknown Test Type")
-                if int(ret) != 0:
-                    ssh.exec_command("sh -c 'cat /tmp/trex.log'")
-                    raise RuntimeError('t-rex-64 startup failed')
-
-                # get TRex server info
-                (ret, _, _) = ssh.exec_command(
-                    "sh -c 'sleep 3; "
-                    "{0}/resources/tools/trex/trex_server_info.py'"\
-                    .format(Constants.REMOTE_FW_DIR),
-                    timeout=120)
-                if int(ret) == 0:
-                    # If we get info TRex is running
-                    return
-            # after max retries TRex is still not responding to API
-            # critical error occurred
-            raise RuntimeError('t-rex-64 startup failed')
+    def _startup_trex(self, osi_layer):
+        """Startup sequence for the TRex traffic generator.
+
+        :param osi_layer: 'L2', 'L3' or 'L7' - OSI Layer testing type.
+        :type osi_layer: str
+        :raises RuntimeError: If node subtype is not a TREX or startup failed.
+        """
+        # No need to check subtype, we know it is TREX.
+        for _ in range(0, 3):
+            # Kill TRex only if it is already running.
+            cmd = "sh -c 'pgrep t-rex && pkill t-rex && sleep 3 || true'"
+            exec_cmd_no_error(
+                self._node, cmd, sudo=True, message='Kill TRex failed!')
+
+            # Configure TRex.
+            ports = ''
+            for port in self._node['interfaces'].values():
+                ports += ' {pci}'.format(pci=port.get('pci_address'))
+
+            cmd = ("sh -c 'cd {dir}/scripts/ && "
+                   "./dpdk_nic_bind.py -u {ports} || true'"
+                   .format(dir=Constants.TREX_INSTALL_DIR, ports=ports))
+            exec_cmd_no_error(
+                self._node, cmd, sudo=True,
+                message='Unbind PCI ports from driver failed!')
+
+            cmd = ("sh -c 'cd {dir}/scripts/ && ./trex-cfg'"
+                   .format(dir=Constants.TREX_INSTALL_DIR))
+            exec_cmd_no_error(
+                self._node, cmd, sudo=True, message='Config TRex failed!')
+
+            # Start TRex.
+            cmd = ("sh -c 'cd {dir}/scripts/ && "
+                   "nohup ./t-rex-64 {mode} --prefix $(hostname)"
+                   " -i -c 7 > /tmp/trex.log 2>&1 &' > /dev/null"
+                   .format(dir=Constants.TREX_INSTALL_DIR,
+                           mode='--astf' if osi_layer == 'L7' else ''))
+            try:
+                exec_cmd_no_error(self._node, cmd, sudo=True)
+            except RuntimeError:
+                cmd = "sh -c 'cat /tmp/trex.log'"
+                exec_cmd_no_error(self._node, cmd, sudo=True,
+                                  message='Get TRex logs failed!')
+                raise RuntimeError('Start TRex failed!')
+
+            # Test if TRex starts successfuly.
+            cmd = ("sh -c '{dir}/resources/tools/trex/trex_server_info.py'"
+                   .format(dir=Constants.REMOTE_FW_DIR))
+            try:
+                exec_cmd_no_error(
+                    self._node, cmd, sudo=True, message='Test TRex failed!',
+                    retries=20)
+            except RuntimeError:
+                continue
+            return
+        # After max retries TRex is still not responding to API critical error
+        # occurred.
+        raise RuntimeError('Start TRex failed after multiple retries!')
 
     @staticmethod
     def is_trex_running(node):
@@ -340,8 +378,7 @@ class TrafficGenerator(AbstractMeasurer):
         :rtype: bool
         :raises RuntimeError: If node type is not a TG.
         """
-        if node['type'] != NodeType.TG:
-            raise RuntimeError('Node type is not a TG')
+        # No need to check subtype, we know it is TREX.
 
         ssh = SSH()
         ssh.connect(node)
@@ -358,9 +395,8 @@ class TrafficGenerator(AbstractMeasurer):
         :raises RuntimeError: If node type is not a TG,
             or if TRex teardown fails.
         """
-        if node['type'] != NodeType.TG:
-            raise RuntimeError('Node type is not a TG')
-        if node['subtype'] == NodeSubTypeTG.TREX:
+        subtype = check_subtype(node)
+        if subtype == NodeSubTypeTG.TREX:
             ssh = SSH()
             ssh.connect(node)
             (ret, _, _) = ssh.exec_command(
@@ -368,31 +404,64 @@ class TrafficGenerator(AbstractMeasurer):
             if int(ret) != 0:
                 raise RuntimeError('pkill t-rex failed')
 
-    @staticmethod
-    def trex_stl_stop_remote_exec(node):
+    def _parse_traffic_results(self, stdout):
+        """Parse stdout of scripts into fields of self.
+
+        Block of code to reuse, by sync start, or stop after async.
+        TODO: Is the output TG subtype dependent?
+
+        :param stdout: Text containing the standard output.
+        :type stdout: str
+        """
+        # last line from console output
+        line = stdout.splitlines()[-1]
+        self._result = line
+        logger.info('TrafficGen result: {0}'.format(self._result))
+        self._received = self._result.split(', ')[1].split('=')[1]
+        self._sent = self._result.split(', ')[2].split('=')[1]
+        self._loss = self._result.split(', ')[3].split('=')[1]
+        self._latency = []
+        self._latency.append(self._result.split(', ')[4].split('=')[1])
+        self._latency.append(self._result.split(', ')[5].split('=')[1])
+
+    def trex_stl_stop_remote_exec(self, node):
         """Execute script on remote node over ssh to stop running traffic.
 
+        Internal state is updated with measurement results.
+
         :param node: TRex generator node.
         :type node: dict
-        :returns: Nothing
         :raises RuntimeError: If stop traffic script fails.
         """
+        # No need to check subtype, we know it is TREX.
         ssh = SSH()
         ssh.connect(node)
 
-        (ret, _, _) = ssh.exec_command(
-            "sh -c '{}/resources/tools/trex/"
-            "trex_stateless_stop.py'".format(Constants.REMOTE_FW_DIR))
+        x_args = ""
+        for index, value in enumerate(self._xstats):
+            if value is not None:
+                # Nested quoting is fun.
+                value = value.replace("'", "\"")
+                x_args += " --xstat{i}='\"'\"'{v}'\"'\"'".format(
+                    i=index, v=value)
+        (ret, stdout, _) = ssh.exec_command(
+            "sh -c '{d}/resources/tools/trex/trex_stateless_stop.py{a}'"\
+            .format(d=Constants.REMOTE_FW_DIR, a=x_args))
 
         if int(ret) != 0:
             raise RuntimeError('TRex stateless runtime error')
 
+        self._parse_traffic_results(stdout)
+
     def trex_stl_start_remote_exec(
             self, duration, rate, frame_size, traffic_profile, async_call=False,
             latency=True, warmup_time=5.0, unidirection=False, tx_port=0,
             rx_port=1):
         """Execute script on remote node over ssh to start traffic.
 
+        In sync mode, measurement results are stored internally.
+        In async mode, initial data including xstats are stored internally.
+
         :param duration: Time expresed in seconds for how long to send traffic.
         :param rate: Traffic rate expressed with units (pps, %)
         :param frame_size: L2 frame size to send (without padding and IPG).
@@ -418,6 +487,7 @@ class TrafficGenerator(AbstractMeasurer):
         :type rx_port: int
         :raises RuntimeError: In case of TG driver issue.
         """
+        # No need to check subtype, we know it is TREX.
         ssh = SSH()
         ssh.connect(self._node)
         reorder = self._ifaces_reordered  # Just to make the next line fit.
@@ -440,7 +510,7 @@ class TrafficGenerator(AbstractMeasurer):
                 frame_size=frame_size, rate=rate, warmup=warmup_time, p_0=p_0,
                 p_1=p_1)
         if async_call:
-            command += " --async"
+            command += " --async_start"
         if latency:
             command += " --latency"
         if unidirection:
@@ -454,32 +524,37 @@ class TrafficGenerator(AbstractMeasurer):
             raise RuntimeError('TRex stateless runtime error')
         elif async_call:
             #no result
+            self._start_time = time.time()
+            self._rate = float(rate[:-3]) if "pps" in rate else float(rate)
             self._received = None
             self._sent = None
             self._loss = None
             self._latency = None
+            xstats = [None, None]
+            index = 0
+            for line in stdout.splitlines():
+                if "Xstats snapshot {i}: ".format(i=index) in line:
+                    xstats[index] = line[19:]
+                    index += 1
+                if index == 2:
+                    break
+            self._xstats = tuple(xstats)
         else:
-            # last line from console output
-            line = stdout.splitlines()[-1]
-            self._result = line
-            logger.info('TrafficGen result: {0}'.format(self._result))
-            self._received = self._result.split(', ')[1].split('=')[1]
-            self._sent = self._result.split(', ')[2].split('=')[1]
-            self._loss = self._result.split(', ')[3].split('=')[1]
-            self._latency = []
-            self._latency.append(self._result.split(', ')[4].split('=')[1])
-            self._latency.append(self._result.split(', ')[5].split('=')[1])
+            self._parse_traffic_results(stdout)
+            self._start_time = None
+            self._rate = None
 
     def stop_traffic_on_tg(self):
         """Stop all traffic on TG.
 
-        :returns: Nothing
+        :returns: Structure containing the result of the measurement.
+        :rtype: ReceiveRateMeasurement
         :raises RuntimeError: If TG is not set.
         """
-        if self._node is None:
-            raise RuntimeError("TG is not set")
-        if self._node['subtype'] == NodeSubTypeTG.TREX:
+        subtype = check_subtype(self._node)
+        if subtype == NodeSubTypeTG.TREX:
             self.trex_stl_stop_remote_exec(self._node)
+        return self.get_measurement_result()
 
     def send_traffic_on_tg(
             self, duration, rate, frame_size, traffic_profile, warmup_time=5,
@@ -487,6 +562,11 @@ class TrafficGenerator(AbstractMeasurer):
             rx_port=1):
         """Send traffic from all configured interfaces on TG.
 
+        In async mode, xstats is stored internally,
+        to enable getting correct result when stopping the traffic.
+        In both modes, stdout is returned,
+        but _parse_traffic_results only works in sync output.
+
         Note that bidirectional traffic also contains flows
         transmitted from rx_port and received in tx_port.
         But some tests use asymmetric traffic, so those arguments are relevant.
@@ -530,22 +610,11 @@ class TrafficGenerator(AbstractMeasurer):
             or if subtype is not specified.
         :raises NotImplementedError: If TG is not supported.
         """
-
-        node = self._node
-        if node is None:
-            raise RuntimeError("TG is not set")
-
-        if node['type'] != NodeType.TG:
-            raise RuntimeError('Node type is not a TG')
-
-        if node['subtype'] is None:
-            raise RuntimeError('TG subtype not defined')
-        elif node['subtype'] == NodeSubTypeTG.TREX:
+        subtype = check_subtype(self._node)
+        if subtype == NodeSubTypeTG.TREX:
             self.trex_stl_start_remote_exec(
                 duration, rate, frame_size, traffic_profile, async_call,
                 latency, warmup_time, unidirection, tx_port, rx_port)
-        else:
-            raise NotImplementedError("TG subtype not supported")
 
         return self._result
 
@@ -612,6 +681,36 @@ class TrafficGenerator(AbstractMeasurer):
         self.traffic_profile = str(traffic_profile)
         self.warmup_time = float(warmup_time)
 
+    def get_measurement_result(self, duration=None, transmit_rate=None):
+        """Return the result of last measurement as ReceiveRateMeasurement.
+
+        Separate function, as measurements can end either by time
+        or by explicit call, this is the common block at the end.
+
+        TODO: Fail on running or already reported measurement.
+
+        :param duration: Measurement duration [s] if known beforehand.
+            For explicitly stopped measurement it is estimated.
+        :param transmit_rate: Target aggregate transmit rate [pps].
+            If not given, computed assuming it was bidirectional.
+        :type duration: float or NoneType
+        :type transmit_rate: float or NoneType
+        :returns: Structure containing the result of the measurement.
+        :rtype: ReceiveRateMeasurement
+        """
+        if duration is None:
+            duration = time.time() - self._start_time
+            self._start_time = None
+        if transmit_rate is None:
+            # Assuming bi-directional traffic here.
+            transmit_rate = self._rate * 2.0
+        transmit_count = int(self.get_sent())
+        loss_count = int(self.get_loss())
+        measurement = ReceiveRateMeasurement(
+            duration, transmit_rate, transmit_count, loss_count)
+        measurement.latency = self.get_latency_int()
+        return measurement
+
     def measure(self, duration, transmit_rate):
         """Run bi-directional measurement, parse and return results.
 
@@ -627,17 +726,12 @@ class TrafficGenerator(AbstractMeasurer):
         """
         duration = float(duration)
         transmit_rate = float(transmit_rate)
-        # Trex needs target Tr per stream, but reports aggregate Tx and Dx.
+        # TG needs target Tr per stream, but reports aggregate Tx and Dx.
         unit_rate = str(transmit_rate / 2.0) + "pps"
         self.send_traffic_on_tg(
             duration, unit_rate, self.frame_size, self.traffic_profile,
             warmup_time=self.warmup_time, latency=True)
-        transmit_count = int(self.get_sent())
-        loss_count = int(self.get_loss())
-        measurement = ReceiveRateMeasurement(
-            duration, transmit_rate, transmit_count, loss_count)
-        measurement.latency = self.get_latency_int()
-        return measurement
+        return self.get_measurement_result(duration, transmit_rate)
 
 
 class OptimizedSearch(object):