Python3: resources and libraries
[csit.git] / resources / libraries / python / VPPUtil.py
index 6268e36..72325d8 100644 (file)
 
 """VPP util library."""
 
-import binascii
-
 from robot.api import logger
 
 from resources.libraries.python.Constants import Constants
 from resources.libraries.python.DUTSetup import DUTSetup
-from resources.libraries.python.PapiExecutor import PapiExecutor
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
 from resources.libraries.python.ssh import exec_cmd_no_error
 from resources.libraries.python.topology import NodeType
-from resources.libraries.python.VatExecutor import VatExecutor
 
 
-class VPPUtil(object):
+class VPPUtil:
     """General class for any VPP related methods/functions."""
 
     @staticmethod
@@ -41,20 +38,20 @@ class VPPUtil(object):
         :type additional_cmds: tuple
         """
         def_setting_tb_displayed = {
-            'IPv6 FIB': 'ip6 fib',
-            'IPv4 FIB': 'ip fib',
-            'Interface IP': 'int addr',
-            'Interfaces': 'int',
-            'ARP': 'ip arp',
-            'Errors': 'err'
+            u"IPv6 FIB": u"ip6 fib",
+            u"IPv4 FIB": u"ip fib",
+            u"Interface IP": u"int addr",
+            u"Interfaces": u"int",
+            u"ARP": u"ip arp",
+            u"Errors": u"err"
         }
 
         if additional_cmds:
             for cmd in additional_cmds:
-                def_setting_tb_displayed['Custom Setting: {}'.format(cmd)] = cmd
+                def_setting_tb_displayed[f"Custom Setting: {cmd}"] = cmd
 
         for _, cmd in def_setting_tb_displayed.items():
-            command = 'vppctl sh {cmd}'.format(cmd=cmd)
+            command = f"vppctl sh {cmd}"
             exec_cmd_no_error(node, command, timeout=30, sudo=True)
 
     @staticmethod
@@ -74,7 +71,7 @@ class VPPUtil(object):
         :type nodes: dict
         """
         for node in nodes.values():
-            if node['type'] == NodeType.DUT:
+            if node[u"type"] == NodeType.DUT:
                 VPPUtil.restart_vpp_service(node)
 
     @staticmethod
@@ -94,7 +91,7 @@ class VPPUtil(object):
         :type nodes: dict
         """
         for node in nodes.values():
-            if node['type'] == NodeType.DUT:
+            if node[u"type"] == NodeType.DUT:
                 VPPUtil.stop_vpp_service(node)
 
     @staticmethod
@@ -104,9 +101,8 @@ class VPPUtil(object):
         :param node: Topology node.
         :type node: dict
         """
-        cmd = 'command -v vpp'
-        exec_cmd_no_error(
-            node, cmd, message='VPP is not installed!')
+        cmd = u"command -v vpp"
+        exec_cmd_no_error(node, cmd, message=u"VPP is not installed!")
 
     @staticmethod
     def verify_vpp_started(node):
@@ -115,11 +111,16 @@ class VPPUtil(object):
         :param node: Topology node.
         :type node: dict
         """
-        cmd = ('vppctl show pci 2>&1 | '
-               'fgrep -v "Connection refused" | '
-               'fgrep -v "No such file or directory"')
+        cmd = u"echo \"show pci\" | sudo socat - UNIX-CONNECT:/run/vpp/cli.sock"
+        exec_cmd_no_error(
+            node, cmd, sudo=False, message=u"VPP failed to start!", retries=120
+        )
+
+        cmd = u"vppctl show pci 2>&1 | fgrep -v \"Connection refused\" | " \
+              u"fgrep -v \"No such file or directory\""
         exec_cmd_no_error(
-            node, cmd, sudo=True, message='VPP failed to start!', retries=120)
+            node, cmd, sudo=True, message=u"VPP failed to start!", retries=120
+        )
 
     @staticmethod
     def verify_vpp(node):
@@ -132,10 +133,11 @@ class VPPUtil(object):
         """
         VPPUtil.verify_vpp_installed(node)
         try:
-            # Verify responsivness of vppctl.
+            # Verify responsiveness of vppctl.
             VPPUtil.verify_vpp_started(node)
-            # Verify responsivness of PAPI.
+            # Verify responsiveness of PAPI.
             VPPUtil.show_log(node)
+            VPPUtil.vpp_show_version(node)
         finally:
             DUTSetup.get_service_logs(node, Constants.VPP_UNIT)
 
@@ -147,33 +149,23 @@ class VPPUtil(object):
         :type nodes: dict
         """
         for node in nodes.values():
-            if node['type'] == NodeType.DUT:
+            if node[u"type"] == NodeType.DUT:
                 VPPUtil.verify_vpp(node)
 
     @staticmethod
-    def vpp_show_version(node, verbose=True):
+    def vpp_show_version(node):
         """Run "show_version" PAPI command.
 
         :param node: Node to run command on.
-        :param verbose: Show version, compile date and compile location if True
-            otherwise show only version.
         :type node: dict
-        :type verbose: bool
         :returns: VPP version.
         :rtype: str
         """
-        with PapiExecutor(node) as papi_exec:
-            data = papi_exec.add('show_version').execute_should_pass().\
-                verify_reply()
-        version = ('VPP version:      {ver}\n'.
-                   format(ver=data['version'].rstrip('\0x00')))
-        if verbose:
-            version += ('Compile date:     {date}\n'
-                        'Compile location: {cl}\n '.
-                        format(date=data['build_date'].rstrip('\0x00'),
-                               cl=data['build_directory'].rstrip('\0x00')))
-        logger.info(version)
-        return data['version'].rstrip('\0x00')
+        cmd = u"show_version"
+        with PapiSocketExecutor(node) as papi_exec:
+            reply = papi_exec.add(cmd).get_reply()
+        logger.info(f"VPP version: {reply[u'version']}\n")
+        return f"{reply[u'version']}"
 
     @staticmethod
     def show_vpp_version_on_all_duts(nodes):
@@ -183,7 +175,7 @@ class VPPUtil(object):
         :type nodes: dict
         """
         for node in nodes.values():
-            if node['type'] == NodeType.DUT:
+            if node[u"type"] == NodeType.DUT:
                 VPPUtil.vpp_show_version(node)
 
     @staticmethod
@@ -194,73 +186,74 @@ class VPPUtil(object):
         :type node: dict
         """
 
-        cmd = 'sw_interface_dump'
-        cmd_reply = 'sw_interface_details'
-        args = dict(name_filter_valid=0, name_filter='')
-        err_msg = 'Failed to get interface dump on host {host}'.format(
-            host=node['host'])
-        with PapiExecutor(node) as papi_exec:
-            papi_resp = papi_exec.add(cmd, **args).execute_should_pass(err_msg)
-
-        papi_if_dump = papi_resp.reply[0]['api_reply']
-
-        if_data = list()
-        for item in papi_if_dump:
-            data = item[cmd_reply]
-            data['interface_name'] = data['interface_name'].rstrip('\x00')
-            data['tag'] = data['tag'].rstrip('\x00')
-            data['l2_address'] = str(':'.join(binascii.hexlify(
-                data['l2_address'])[i:i + 2] for i in range(0, 12, 2)).
-                                     decode('ascii'))
-            if_data.append(data)
+        cmd = u"sw_interface_dump"
+        args = dict(
+            name_filter_valid=False,
+            name_filter=u""
+        )
+        err_msg = f"Failed to get interface dump on host {node[u'host']}"
+        with PapiSocketExecutor(node) as papi_exec:
+            details = papi_exec.add(cmd, **args).get_details(err_msg)
+
+        for if_dump in details:
+            if_dump[u"l2_address"] = str(if_dump[u"l2_address"])
+            if_dump[u"b_dmac"] = str(if_dump[u"b_dmac"])
+            if_dump[u"b_smac"] = str(if_dump[u"b_smac"])
+            if_dump[u"flags"] = if_dump[u"flags"].value
+            if_dump[u"type"] = if_dump[u"type"].value
+            if_dump[u"link_duplex"] = if_dump[u"link_duplex"].value
+            if_dump[u"sub_if_flags"] = if_dump[u"sub_if_flags"].value \
+                if hasattr(if_dump[u"sub_if_flags"], u"value") \
+                else int(if_dump[u"sub_if_flags"])
         # TODO: return only base data
-        logger.trace('Interface data of host {host}:\n{if_data}'.format(
-            host=node['host'], if_data=if_data))
+        logger.trace(f"Interface data of host {node[u'host']}:\n{details}")
 
     @staticmethod
-    def vpp_show_crypto_device_mapping(node):
-        """Run "show crypto device mapping" CLI command.
-
-        :param node: Node to run command on.
-        :type node: dict
-        """
-        vat = VatExecutor()
-        vat.execute_script("show_crypto_device_mapping.vat", node,
-                           json_out=False)
-
-    @staticmethod
-    def vpp_enable_traces_on_dut(node):
+    def vpp_enable_traces_on_dut(node, fail_on_error=False):
         """Enable vpp packet traces on the DUT node.
 
         :param node: DUT node to set up.
+        :param fail_on_error: If True, keyword fails if an error occurs,
+            otherwise passes.
         :type node: dict
+        :type fail_on_error: bool
         """
-        vat = VatExecutor()
-        vat.execute_script("enable_dpdk_traces.vat", node, json_out=False)
-        vat.execute_script("enable_vhost_user_traces.vat", node, json_out=False)
-        vat.execute_script("enable_memif_traces.vat", node, json_out=False)
+        cmds = [
+            u"trace add dpdk-input 50",
+            u"trace add vhost-user-input 50",
+            u"trace add memif-input 50",
+            u"trace add avf-input 50"
+        ]
+
+        for cmd in cmds:
+            try:
+                PapiSocketExecutor.run_cli_cmd(node, cmd)
+            except AssertionError:
+                if fail_on_error:
+                    raise
 
     @staticmethod
-    def vpp_enable_traces_on_all_duts(nodes):
+    def vpp_enable_traces_on_all_duts(nodes, fail_on_error=False):
         """Enable vpp packet traces on all DUTs in the given topology.
 
         :param nodes: Nodes in the topology.
+        :param fail_on_error: If True, keyword fails if an error occurs,
+            otherwise passes.
         :type nodes: dict
+        :type fail_on_error: bool
         """
         for node in nodes.values():
-            if node['type'] == NodeType.DUT:
-                VPPUtil.vpp_enable_traces_on_dut(node)
+            if node[u"type"] == NodeType.DUT:
+                VPPUtil.vpp_enable_traces_on_dut(node, fail_on_error)
 
     @staticmethod
-    def vpp_enable_elog_traces_on_dut(node):
-        """Enable API/CLI/Barrier traces on the DUT node.
+    def vpp_enable_elog_traces(node):
+        """Enable API/CLI/Barrier traces on the specified topology node.
 
-        :param node: DUT node to set up.
+        :param node: Topology node.
         :type node: dict
         """
-        vat = VatExecutor()
-        vat.execute_script("elog_trace_api_cli_barrier.vat", node,
-                           json_out=False)
+        PapiSocketExecutor.run_cli_cmd(node, "elog trace api cli barrier")
 
     @staticmethod
     def vpp_enable_elog_traces_on_all_duts(nodes):
@@ -270,18 +263,17 @@ class VPPUtil(object):
         :type nodes: dict
         """
         for node in nodes.values():
-            if node['type'] == NodeType.DUT:
-                VPPUtil.vpp_enable_elog_traces_on_dut(node)
+            if node[u"type"] == NodeType.DUT:
+                VPPUtil.vpp_enable_elog_traces(node)
 
     @staticmethod
-    def show_event_logger_on_dut(node):
-        """Show event logger on the DUT node.
+    def show_event_logger(node):
+        """Show event logger on the specified topology node.
 
-        :param node: DUT node to show traces on.
+        :param node: Topology node.
         :type node: dict
         """
-        vat = VatExecutor()
-        vat.execute_script("show_event_logger.vat", node, json_out=False)
+        PapiSocketExecutor.run_cli_cmd(node, u"show event-logger")
 
     @staticmethod
     def show_event_logger_on_all_duts(nodes):
@@ -291,21 +283,28 @@ class VPPUtil(object):
         :type nodes: dict
         """
         for node in nodes.values():
-            if node['type'] == NodeType.DUT:
-                VPPUtil.show_event_logger_on_dut(node)
+            if node[u"type"] == NodeType.DUT:
+                VPPUtil.show_event_logger(node)
 
     @staticmethod
     def show_log(node):
-        """Show log on the specified topology node.
+        """Show logging on the specified topology node.
 
         :param node: Topology node.
         :type node: dict
-        :returns: VPP log data.
-        :rtype: list
         """
-        with PapiExecutor(node) as papi_exec:
-            return papi_exec.add('cli_inband', cmd='show log').get_replies().\
-                verify_reply()["reply"]
+        PapiSocketExecutor.run_cli_cmd(node, u"show logging")
+
+    @staticmethod
+    def show_log_on_all_duts(nodes):
+        """Show logging on all DUTs in the given topology.
+
+        :param nodes: Nodes in the topology.
+        :type nodes: dict
+        """
+        for node in nodes.values():
+            if node[u"type"] == NodeType.DUT:
+                VPPUtil.show_log(node)
 
     @staticmethod
     def vpp_show_threads(node):
@@ -316,6 +315,19 @@ class VPPUtil(object):
         :returns: VPP thread data.
         :rtype: list
         """
-        with PapiExecutor(node) as papi_exec:
-            return papi_exec.add('show_threads').execute_should_pass().\
-                verify_reply()["thread_data"]
+        cmd = u"show_threads"
+        with PapiSocketExecutor(node) as papi_exec:
+            reply = papi_exec.add(cmd).get_reply()
+
+        threads_data = list()
+        for thread in reply[u"thread_data"]:
+            thread_data = list()
+            for item in thread:
+                if isinstance(item, str):
+                    item = item.rstrip('\x00')
+                thread_data.append(item)
+            threads_data.append(thread_data)
+
+        logger.trace(f"show threads:\n{threads_data}")
+
+        return threads_data