Python3: resources and libraries
[csit.git] / resources / libraries / python / LispUtil.py
index 556ae1b..a77f4ad 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
 
 """Lisp utilities library."""
 
-from resources.libraries.python.parsers.JsonParser import JsonParser
-from resources.libraries.python.topology import Topology
-from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal
+from ipaddress import IPv4Address, IPv6Address
+from robot.api import logger
 
+from resources.libraries.python.L2Util import L2Util
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
+from resources.libraries.python.topology import Topology
 
-class LispUtil(object):
+class LispUtil:
     """Implements keywords for Lisp tests."""
 
-    def __init__(self):
-        pass
-
     @staticmethod
     def vpp_show_lisp_state(node):
         """Get lisp state from VPP node.
 
         :param node: VPP node.
         :type node: dict
-        :return: Lisp gpe state.
-        :rtype: list
+        :returns: Lisp gpe state.
+        :rtype: dict
         """
+        cmd = u"show_lisp_status"
+        err_msg = f"Failed to get LISP status on host {node['host']}"
 
-        vat = VatExecutor()
-        vat.execute_script_json_out('lisp/show_lisp_status.vat',
-                                    node)
-        return JsonParser().parse_data(vat.get_script_stdout())
+        with PapiSocketExecutor(node) as papi_exec:
+            reply = papi_exec.add(cmd).get_reply(err_msg)
+
+        data = dict()
+        data[u"feature_status"] = u"enabled" if reply[u"feature_status"] \
+            else u"disabled"
+        data[u"gpe_status"] = u"enabled" if reply[u"gpe_status"] \
+            else u"disabled"
+        return data
 
     @staticmethod
     def vpp_show_lisp_locator_set(node, items_filter):
@@ -45,20 +51,33 @@ class LispUtil(object):
 
         :param node: VPP node.
         :param items_filter: Filter which specifies which items should be
-        retrieved - local, remote, empty string = both.
+            retrieved - local, remote, empty string = both.
         :type node: dict
         :type items_filter: str
-        :return: Lisp locator_set data as python list.
+        :returns: Lisp locator_set data as python list.
         :rtype: list
         """
+        ifilter = {u"_": 0, u"_local": 1, u"_remote": 2}
+        args = dict(
+            filter=ifilter[u"_" + items_filter]
+        )
+
+        cmd = u"lisp_locator_set_dump"
+        err_msg = f"Failed to get LISP locator set on host {node['host']}"
 
         try:
-            with VatTerminal(node) as vat:
-                response = vat.vat_terminal_exec_cmd_from_template(
-                    'lisp/show_lisp_locator_set.vat', filter=items_filter)
-            return response[0]
-        except ValueError:
-            return []
+            with PapiSocketExecutor(node) as papi_exec:
+                details = papi_exec.add(cmd, **args).get_details(err_msg)
+            data = list()
+            for locator in details:
+                data.append(
+                    {u"ls_name": locator[u"ls_name"].rstrip(b'\0'),
+                     u"ls_index": locator[u"ls_index"]}
+                )
+            return data
+        except (ValueError, LookupError) as err:
+            logger.warn(f"Failed to get LISP locator set {err}")
+            return list()
 
     @staticmethod
     def vpp_show_lisp_eid_table(node):
@@ -66,13 +85,38 @@ class LispUtil(object):
 
         :param node: VPP node.
         :type node: dict
-        :return: Lisp eid table as python list.
+        :returns: Lisp eid table as python list.
         :rtype: list
         """
-
-        vat = VatExecutor()
-        vat.execute_script_json_out('lisp/show_lisp_eid_table.vat', node)
-        return JsonParser().parse_data(vat.get_script_stdout())
+        cmd = u"lisp_eid_table_dump"
+        err_msg = f"Failed to get LISP eid table on host {node[u'host']}"
+
+        with PapiSocketExecutor(node) as papi_exec:
+            details = papi_exec.add(cmd).get_details(err_msg)
+
+        data = list()
+        for eid_details in details:
+            eid = u"Bad eid type"
+            if eid_details[u"eid_type"] == 0:
+                prefix = str(eid_details[u"eid_prefix_len"])
+                eid = str(IPv4Address(eid_details[u"eid"][0:4])) + u"/" + \
+                      prefix
+            elif eid_details[u"eid_type"] == 1:
+                prefix = str(eid_details[u"eid_prefix_len"])
+                eid = str(IPv6Address(eid_details[u"eid"])) + u"/" + prefix
+            elif eid_details[u"eid_type"] == 2:
+                eid = str(L2Util.bin_to_mac(eid_details[u"eid"][0:6]))
+            data.append(
+                {
+                    u"action": eid_details[u"action"],
+                    u"is_local": eid_details[u"is_local"],
+                    u"eid": eid,
+                    u"vni": eid_details[u"vni"],
+                    u"ttl": eid_details[u"ttl"],
+                    u"authoritative": eid_details[u"authoritative"]
+                }
+            )
+        return data
 
     @staticmethod
     def vpp_show_lisp_map_resolver(node):
@@ -80,13 +124,158 @@ class LispUtil(object):
 
         :param node: VPP node.
         :type node: dict
-        :return: Lisp map resolver as python list.
+        :returns: Lisp map resolver as python list.
         :rtype: list
         """
+        cmd = u"lisp_map_resolver_dump"
+        err_msg = f"Failed to get LISP map resolver on host {node[u'host']}"
+
+        with PapiSocketExecutor(node) as papi_exec:
+            details = papi_exec.add(cmd).get_details(err_msg)
+
+        data = list()
+        for resolver in details:
+            address = u"Bad is_ipv6 flag"
+            if resolver[u"is_ipv6"] == 0:
+                address = str(IPv4Address(resolver[u"ip_address"][0:4]))
+            elif resolver[u"is_ipv6"] == 1:
+                address = str(IPv6Address(resolver[u"ip_address"]))
+            data.append({u"map resolver": address})
+        return data
+
+    @staticmethod
+    def vpp_show_lisp_map_register(node):
+        """Get LISP Map Register from VPP node.
+
+        :param node: VPP node.
+        :type node: dict
+        :returns: LISP Map Register as python dict.
+        :rtype: dict
+        """
+        cmd = u"show_lisp_map_register_state"
+        err_msg = f"Failed to get LISP map register state on host " \
+            f"{node[u'host']}"
+
+        with PapiSocketExecutor(node) as papi_exec:
+            reply = papi_exec.add(cmd).get_reply(err_msg)
+
+        data = dict()
+        data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled"
+        logger.info(data)
+        return data
+
+    @staticmethod
+    def vpp_show_lisp_map_request_mode(node):
+        """Get LISP Map Request mode from VPP node.
+
+        :param node: VPP node.
+        :type node: dict
+        :returns: LISP Map Request mode as python dict.
+        :rtype: dict
+        """
+        cmd = u"show_lisp_map_request_mode"
+        err_msg = f"Failed to get LISP map request mode on host {node[u'host']}"
 
-        vat = VatExecutor()
-        vat.execute_script_json_out('lisp/show_lisp_map_resolver.vat', node)
-        return JsonParser().parse_data(vat.get_script_stdout())
+        with PapiSocketExecutor(node) as papi_exec:
+            reply = papi_exec.add(cmd).get_reply(err_msg)
+
+        data = dict()
+        data[u"map_request_mode"] = u"src-dst" if reply[u"mode"] \
+            else u"dst-only"
+        logger.info(data)
+        return data
+
+    @staticmethod
+    def vpp_show_lisp_map_server(node):
+        """Get LISP Map Server from VPP node.
+
+        :param node: VPP node.
+        :type node: dict
+        :returns: LISP Map Server as python list.
+        :rtype: list
+        """
+        cmd = u"lisp_map_server_dump"
+        err_msg = f"Failed to get LISP map server on host {node[u'host']}"
+
+        with PapiSocketExecutor(node) as papi_exec:
+            details = papi_exec.add(cmd).get_details(err_msg)
+
+        data = list()
+        for server in details:
+            address = u"Bad is_ipv6 flag"
+            if server[u"is_ipv6"] == 0:
+                address = str(IPv4Address(server[u"ip_address"][0:4]))
+            elif server[u"is_ipv6"] == 1:
+                address = str(IPv6Address(server[u"ip_address"]))
+            data.append({u"map-server": address})
+        logger.info(data)
+        return data
+
+    @staticmethod
+    def vpp_show_lisp_petr_config(node):
+        """Get LISP PETR configuration from VPP node.
+
+        :param node: VPP node.
+        :type node: dict
+        :returns: LISP PETR configuration as python dict.
+        :rtype: dict
+        """
+        # Note: VAT is returning ipv6 address instead of ipv4
+        cmd = u"show_lisp_use_petr"
+        err_msg = f"Failed to get LISP petr config on host {node[u'host']}"
+
+        with PapiSocketExecutor(node) as papi_exec:
+            reply = papi_exec.add(cmd).get_reply(err_msg)
+
+        data = dict()
+        data[u"status"] = u"enabled" if reply[u"status"] else u"disabled"
+        address = u"Bad is_ip4 flag"
+        if reply[u"is_ip4"] == 0:
+            address = str(IPv6Address(reply[u"address"]))
+        elif reply[u"is_ip4"] == 1:
+            address = str(IPv4Address(reply[u"address"][0:4]))
+        data[u"address"] = address
+        logger.info(data)
+        return data
+
+    @staticmethod
+    def vpp_show_lisp_rloc_config(node):
+        """Get LISP RLOC configuration from VPP node.
+
+        :param node: VPP node.
+        :type node: dict
+        :returns: LISP RLOC configuration as python dict.
+        :rtype: dict
+        """
+        cmd = u"show_lisp_rloc_probe_state"
+        err_msg = f"Failed to get LISP rloc config on host {node[u'host']}"
+
+        with PapiSocketExecutor(node) as papi_exec:
+            reply = papi_exec.add(cmd).get_reply(err_msg)
+
+        data = dict()
+        data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled"
+        logger.info(data)
+        return data
+
+    @staticmethod
+    def vpp_show_lisp_pitr(node):
+        """Get Lisp PITR feature config from VPP node.
+
+        :param node: VPP node.
+        :type node: dict
+        :returns: Lisp PITR config data.
+        :rtype: dict
+        """
+        cmd = u"show_lisp_pitr"
+        err_msg = f"Failed to get LISP pitr on host {node[u'host']}"
+
+        with PapiSocketExecutor(node) as papi_exec:
+            reply = papi_exec.add(cmd).get_reply(err_msg)
+
+        data = dict()
+        data[u"status"] = u"enabled" if reply[u"status"] else u"disabled"
+        return data
 
     @staticmethod
     def lisp_should_be_equal(lisp_val1, lisp_val2):
@@ -97,19 +286,20 @@ class LispUtil(object):
         :type lisp_val1: list
         :type lisp_val2: list
         """
-
         len1 = len(lisp_val1)
         len2 = len(lisp_val2)
+
         if len1 != len2:
-            raise RuntimeError('Values are not same. '
-                               'Value 1 {} \n'
-                               'Value 2 {}.'.format(lisp_val1,
-                                                    lisp_val2))
+            raise RuntimeError(
+                f"Values are not same. Value 1 {lisp_val1} \n"
+                f"Value 2 {lisp_val2}."
+            )
 
         for tmp in lisp_val1:
             if tmp not in lisp_val2:
-                raise RuntimeError('Value {} not found in vpp:\n'
-                                   '{}'.format(tmp, lisp_val2))
+                raise RuntimeError(
+                    f"Value {tmp} not found in vpp:\n{lisp_val2}"
+                )
 
     def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2):
         """Fail if the lisp values are not equal.
@@ -119,11 +309,12 @@ class LispUtil(object):
         :type locator_set1: list
         :type locator_set2: list
         """
+        locator_set_list = list()
 
-        locator_set_list = []
         for item in locator_set1:
             if item not in locator_set_list:
                 locator_set_list.append(item)
+
         self.lisp_should_be_equal(locator_set_list, locator_set2)
 
     @staticmethod
@@ -135,20 +326,19 @@ class LispUtil(object):
         :param locator_set_number: Generate n locator_set.
         :type node: dict
         :type locator_set_number: str
-        :return: list of lisp locator_set, list of lisp locator_set expected
-        from VAT.
+        :returns: list of lisp locator_set, list of lisp locator_set expected
+            from VAT.
         :rtype: tuple
         """
-
         topo = Topology()
+        locator_set_list = list()
+        locator_set_list_vat = list()
 
-        locator_set_list = []
-        locator_set_list_vat = []
         i = 0
         for num in range(0, int(locator_set_number)):
-            locator_list = []
-            for interface in node['interfaces'].values():
-                link = interface.get('link')
+            locator_list = list()
+            for interface in list(node[u"interfaces"].values()):
+                link = interface.get(u"link")
                 i += 1
                 if link is None:
                     continue
@@ -156,18 +346,24 @@ class LispUtil(object):
                 if_name = topo.get_interface_by_link_name(node, link)
                 sw_if_index = topo.get_interface_sw_index(node, if_name)
                 if if_name is not None:
-                    locator = {'locator-index': sw_if_index,
-                               'priority': i,
-                               'weight': i}
+                    locator = {
+                        u"locator-index": sw_if_index,
+                        u"priority": i,
+                        u"weight": i
+                    }
                     locator_list.append(locator)
 
-            l_name = 'ls{0}'.format(num)
-            locator_set = {'locator-set': l_name,
-                           'locator': locator_list}
+            l_name = f"ls{num}"
+            locator_set = {
+                u"locator-set": l_name,
+                u"locator": locator_list
+            }
             locator_set_list.append(locator_set)
 
-            locator_set_vat = {"ls_name": l_name,
-                               "ls_index": num}
+            locator_set_vat = {
+                u"ls_name": l_name,
+                u"ls_index": num
+            }
             locator_set_list_vat.append(locator_set_vat)
 
         return locator_set_list, locator_set_list_vat
@@ -181,19 +377,19 @@ class LispUtil(object):
         :param locator_set_number: Generate n locator_set.
         :type node: dict
         :type locator_set_number: str
-        :return: list of lisp locator_set, list of lisp locator_set expected
-        from VAT.
+        :returns: list of lisp locator_set, list of lisp locator_set expected
+            from VAT.
         :rtype: tuple
         """
-
         topo = Topology()
-        locator_set_list = []
-        locator_set_list_vat = []
+        locator_set_list = list()
+        locator_set_list_vat = list()
+
         i = 0
         for num in range(0, int(locator_set_number)):
             locator_list = []
-            for interface in node['interfaces'].values():
-                link = interface.get('link')
+            for interface in list(node[u"interfaces"].values()):
+                link = interface.get(u"link")
                 i += 1
                 if link is None:
                     continue
@@ -201,17 +397,23 @@ class LispUtil(object):
                 if_name = topo.get_interface_by_link_name(node, link)
                 sw_if_index = topo.get_interface_sw_index(node, if_name)
                 if if_name is not None:
-                    l_name = 'ls{0}'.format(num)
-                    locator = {'locator-index': sw_if_index,
-                               'priority': i,
-                               'weight': i}
+                    l_name = f"ls{num}"
+                    locator = {
+                        u"locator-index": sw_if_index,
+                        u"priority": i,
+                        u"weight": i
+                    }
                     locator_list.append(locator)
-                    locator_set = {'locator-set': l_name,
-                                   'locator': locator_list}
+                    locator_set = {
+                        u"locator-set": l_name,
+                        u"locator": locator_list
+                    }
                     locator_set_list.append(locator_set)
 
-                    locator_set_vat = {"ls_name": l_name,
-                               "ls_index": num}
+                    locator_set_vat = {
+                        u"ls_name": l_name,
+                        u"ls_index": num
+                    }
                     locator_set_list_vat.append(locator_set_vat)
 
         return locator_set_list, locator_set_list_vat
@@ -222,5 +424,4 @@ class LispUtil(object):
         :param lisp_params: Should be empty list.
         :type lisp_params: list
         """
-
         self.lisp_should_be_equal([], lisp_params)