X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FLispUtil.py;h=a77f4ad8544601adcac8ced89e6d2ea83871da4a;hb=5fc9a9c0129adb984c2823f2f7c691e8bb0b7798;hp=114cd72202d5e8d8c726a496b585d295faebf0cf;hpb=63a426bfd056dc93cd11bf0743b03a13b8489e28;p=csit.git diff --git a/resources/libraries/python/LispUtil.py b/resources/libraries/python/LispUtil.py index 114cd72202..a77f4ad854 100644 --- a/resources/libraries/python/LispUtil.py +++ b/resources/libraries/python/LispUtil.py @@ -13,19 +13,16 @@ """Lisp utilities library.""" -from robot.api import logger from ipaddress import IPv4Address, IPv6Address +from robot.api import logger -from resources.libraries.python.topology import Topology -from resources.libraries.python.PapiExecutor import PapiSocketExecutor from resources.libraries.python.L2Util import L2Util +from resources.libraries.python.PapiExecutor import PapiSocketExecutor +from resources.libraries.python.topology import Topology -class LispUtil(object): +class LispUtil: """Implements keywords for Lisp tests.""" - def __init__(self): - pass - @staticmethod def vpp_show_lisp_state(node): """Get lisp state from VPP node. @@ -35,17 +32,17 @@ class LispUtil(object): :returns: Lisp gpe state. :rtype: dict """ - cmd = 'show_lisp_status' - err_msg = "Failed to get LISP status on host {host}".format( - host=node['host']) + cmd = u"show_lisp_status" + err_msg = f"Failed to get LISP status on host {node['host']}" with PapiSocketExecutor(node) as papi_exec: reply = papi_exec.add(cmd).get_reply(err_msg) data = dict() - data["feature_status"] = "enabled" if reply["feature_status"] else \ - "disabled" - data["gpe_status"] = "enabled" if reply["gpe_status"] else "disabled" + data[u"feature_status"] = u"enabled" if reply[u"feature_status"] \ + else u"disabled" + data[u"gpe_status"] = u"enabled" if reply[u"gpe_status"] \ + else u"disabled" return data @staticmethod @@ -60,24 +57,27 @@ class LispUtil(object): :returns: Lisp locator_set data as python list. :rtype: list """ + ifilter = {u"_": 0, u"_local": 1, u"_remote": 2} + args = dict( + filter=ifilter[u"_" + items_filter] + ) - ifilter = {"_": 0, "_local": 1, "_remote": 2} - args = dict(filter=ifilter["_" + items_filter]) - - cmd = 'lisp_locator_set_dump' - err_msg = "Failed to get LISP locator set on host {host}".format( - host=node['host']) + cmd = u"lisp_locator_set_dump" + err_msg = f"Failed to get LISP locator set on host {node['host']}" try: with PapiSocketExecutor(node) as papi_exec: details = papi_exec.add(cmd, **args).get_details(err_msg) - data = [] + data = list() for locator in details: - data.append({"ls_name": locator["ls_name"].rstrip('\x00'), - "ls_index": locator["ls_index"]}) + data.append( + {u"ls_name": locator[u"ls_name"].rstrip(b'\0'), + u"ls_index": locator[u"ls_index"]} + ) return data - except (ValueError, LookupError): - return [] + except (ValueError, LookupError) as err: + logger.warn(f"Failed to get LISP locator set {err}") + return list() @staticmethod def vpp_show_lisp_eid_table(node): @@ -88,31 +88,34 @@ class LispUtil(object): :returns: Lisp eid table as python list. :rtype: list """ - - cmd = 'lisp_eid_table_dump' - err_msg = "Failed to get LISP eid table on host {host}".format( - host=node['host']) + cmd = u"lisp_eid_table_dump" + err_msg = f"Failed to get LISP eid table on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: details = papi_exec.add(cmd).get_details(err_msg) - data = [] + data = list() for eid_details in details: - eid = 'Bad eid type' - if eid_details["eid_type"] == 0: - prefix = str(eid_details["eid_prefix_len"]) - eid = str(IPv4Address(eid_details["eid"][0:4])) + "/" + prefix - elif eid_details["eid_type"] == 1: - prefix = str(eid_details["eid_prefix_len"]) - eid = str(IPv6Address(eid_details["eid"])) + "/" + prefix - elif eid_details["eid_type"] == 2: - eid = str(L2Util.bin_to_mac(eid_details["eid"][0:6])) - data.append({"action": eid_details["action"], - "is_local": eid_details["is_local"], - "eid": eid, - "vni": eid_details["vni"], - "ttl": eid_details["ttl"], - "authoritative": eid_details["authoritative"]}) + eid = u"Bad eid type" + if eid_details[u"eid_type"] == 0: + prefix = str(eid_details[u"eid_prefix_len"]) + eid = str(IPv4Address(eid_details[u"eid"][0:4])) + u"/" + \ + prefix + elif eid_details[u"eid_type"] == 1: + prefix = str(eid_details[u"eid_prefix_len"]) + eid = str(IPv6Address(eid_details[u"eid"])) + u"/" + prefix + elif eid_details[u"eid_type"] == 2: + eid = str(L2Util.bin_to_mac(eid_details[u"eid"][0:6])) + data.append( + { + u"action": eid_details[u"action"], + u"is_local": eid_details[u"is_local"], + u"eid": eid, + u"vni": eid_details[u"vni"], + u"ttl": eid_details[u"ttl"], + u"authoritative": eid_details[u"authoritative"] + } + ) return data @staticmethod @@ -124,22 +127,20 @@ class LispUtil(object): :returns: Lisp map resolver as python list. :rtype: list """ - - cmd = 'lisp_map_resolver_dump' - err_msg = "Failed to get LISP map resolver on host {host}".format( - host=node['host']) + cmd = u"lisp_map_resolver_dump" + err_msg = f"Failed to get LISP map resolver on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: details = papi_exec.add(cmd).get_details(err_msg) - data = [] + data = list() for resolver in details: - address = 'Bad is_ipv6 flag' - if resolver["is_ipv6"] == 0: - address = str(IPv4Address(resolver["ip_address"][0:4])) - elif resolver["is_ipv6"] == 1: - address = str(IPv6Address(resolver["ip_address"])) - data.append({"map resolver": address}) + address = u"Bad is_ipv6 flag" + if resolver[u"is_ipv6"] == 0: + address = str(IPv4Address(resolver[u"ip_address"][0:4])) + elif resolver[u"is_ipv6"] == 1: + address = str(IPv6Address(resolver[u"ip_address"])) + data.append({u"map resolver": address}) return data @staticmethod @@ -151,16 +152,15 @@ class LispUtil(object): :returns: LISP Map Register as python dict. :rtype: dict """ - - cmd = 'show_lisp_map_register_state' - err_msg = "Failed to get LISP map register state on host {host}".format( - host=node['host']) + cmd = u"show_lisp_map_register_state" + err_msg = f"Failed to get LISP map register state on host " \ + f"{node[u'host']}" with PapiSocketExecutor(node) as papi_exec: reply = papi_exec.add(cmd).get_reply(err_msg) data = dict() - data["state"] = "enabled" if reply["is_enabled"] else "disabled" + data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled" logger.info(data) return data @@ -173,16 +173,15 @@ class LispUtil(object): :returns: LISP Map Request mode as python dict. :rtype: dict """ - - cmd = 'show_lisp_map_request_mode' - err_msg = "Failed to get LISP map request mode on host {host}".format( - host=node['host']) + cmd = u"show_lisp_map_request_mode" + err_msg = f"Failed to get LISP map request mode on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: reply = papi_exec.add(cmd).get_reply(err_msg) data = dict() - data["map_request_mode"] = "src-dst" if reply["mode"] else "dst-only" + data[u"map_request_mode"] = u"src-dst" if reply[u"mode"] \ + else u"dst-only" logger.info(data) return data @@ -195,22 +194,20 @@ class LispUtil(object): :returns: LISP Map Server as python list. :rtype: list """ - - cmd = 'lisp_map_server_dump' - err_msg = "Failed to get LISP map server on host {host}".format( - host=node['host']) + cmd = u"lisp_map_server_dump" + err_msg = f"Failed to get LISP map server on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: details = papi_exec.add(cmd).get_details(err_msg) - data = [] + data = list() for server in details: - address = 'Bad is_ipv6 flag' - if server["is_ipv6"] == 0: - address = str(IPv4Address(server["ip_address"][0:4])) - elif server["is_ipv6"] == 1: - address = str(IPv6Address(server["ip_address"])) - data.append({"map-server": address}) + address = u"Bad is_ipv6 flag" + if server[u"is_ipv6"] == 0: + address = str(IPv4Address(server[u"ip_address"][0:4])) + elif server[u"is_ipv6"] == 1: + address = str(IPv6Address(server[u"ip_address"])) + data.append({u"map-server": address}) logger.info(data) return data @@ -223,24 +220,21 @@ class LispUtil(object): :returns: LISP PETR configuration as python dict. :rtype: dict """ - -# Note: VAT is returning ipv6 address instead of ipv4 - - cmd = 'show_lisp_use_petr' - err_msg = "Failed to get LISP petr config on host {host}".format( - host=node['host']) + # Note: VAT is returning ipv6 address instead of ipv4 + cmd = u"show_lisp_use_petr" + err_msg = f"Failed to get LISP petr config on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: reply = papi_exec.add(cmd).get_reply(err_msg) data = dict() - data["status"] = "enabled" if reply["status"] else "disabled" - address = 'Bad is_ip4 flag' - if reply["is_ip4"] == 0: - address = str(IPv6Address(reply["address"])) - elif reply["is_ip4"] == 1: - address = str(IPv4Address(reply["address"][0:4])) - data["address"] = address + data[u"status"] = u"enabled" if reply[u"status"] else u"disabled" + address = u"Bad is_ip4 flag" + if reply[u"is_ip4"] == 0: + address = str(IPv6Address(reply[u"address"])) + elif reply[u"is_ip4"] == 1: + address = str(IPv4Address(reply[u"address"][0:4])) + data[u"address"] = address logger.info(data) return data @@ -253,16 +247,14 @@ class LispUtil(object): :returns: LISP RLOC configuration as python dict. :rtype: dict """ - - cmd = 'show_lisp_rloc_probe_state' - err_msg = "Failed to get LISP rloc config on host {host}".format( - host=node['host']) + cmd = u"show_lisp_rloc_probe_state" + err_msg = f"Failed to get LISP rloc config on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: reply = papi_exec.add(cmd).get_reply(err_msg) data = dict() - data["state"] = "enabled" if reply["is_enabled"] else "disabled" + data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled" logger.info(data) return data @@ -275,16 +267,14 @@ class LispUtil(object): :returns: Lisp PITR config data. :rtype: dict """ - - cmd = 'show_lisp_pitr' - err_msg = "Failed to get LISP pitr on host {host}".format( - host=node['host']) + cmd = u"show_lisp_pitr" + err_msg = f"Failed to get LISP pitr on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: reply = papi_exec.add(cmd).get_reply(err_msg) data = dict() - data["status"] = "enabled" if reply["status"] else "disabled" + data[u"status"] = u"enabled" if reply[u"status"] else u"disabled" return data @staticmethod @@ -296,19 +286,20 @@ class LispUtil(object): :type lisp_val1: list :type lisp_val2: list """ - len1 = len(lisp_val1) len2 = len(lisp_val2) + if len1 != len2: - raise RuntimeError('Values are not same. ' - 'Value 1 {} \n' - 'Value 2 {}.'.format(lisp_val1, - lisp_val2)) + raise RuntimeError( + f"Values are not same. Value 1 {lisp_val1} \n" + f"Value 2 {lisp_val2}." + ) for tmp in lisp_val1: if tmp not in lisp_val2: - raise RuntimeError('Value {} not found in vpp:\n' - '{}'.format(tmp, lisp_val2)) + raise RuntimeError( + f"Value {tmp} not found in vpp:\n{lisp_val2}" + ) def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2): """Fail if the lisp values are not equal. @@ -318,11 +309,12 @@ class LispUtil(object): :type locator_set1: list :type locator_set2: list """ + locator_set_list = list() - locator_set_list = [] for item in locator_set1: if item not in locator_set_list: locator_set_list.append(item) + self.lisp_should_be_equal(locator_set_list, locator_set2) @staticmethod @@ -338,16 +330,15 @@ class LispUtil(object): from VAT. :rtype: tuple """ - topo = Topology() + locator_set_list = list() + locator_set_list_vat = list() - locator_set_list = [] - locator_set_list_vat = [] i = 0 for num in range(0, int(locator_set_number)): - locator_list = [] - for interface in node['interfaces'].values(): - link = interface.get('link') + locator_list = list() + for interface in list(node[u"interfaces"].values()): + link = interface.get(u"link") i += 1 if link is None: continue @@ -355,18 +346,24 @@ class LispUtil(object): if_name = topo.get_interface_by_link_name(node, link) sw_if_index = topo.get_interface_sw_index(node, if_name) if if_name is not None: - locator = {'locator-index': sw_if_index, - 'priority': i, - 'weight': i} + locator = { + u"locator-index": sw_if_index, + u"priority": i, + u"weight": i + } locator_list.append(locator) - l_name = 'ls{0}'.format(num) - locator_set = {'locator-set': l_name, - 'locator': locator_list} + l_name = f"ls{num}" + locator_set = { + u"locator-set": l_name, + u"locator": locator_list + } locator_set_list.append(locator_set) - locator_set_vat = {"ls_name": l_name, - "ls_index": num} + locator_set_vat = { + u"ls_name": l_name, + u"ls_index": num + } locator_set_list_vat.append(locator_set_vat) return locator_set_list, locator_set_list_vat @@ -384,15 +381,15 @@ class LispUtil(object): from VAT. :rtype: tuple """ - topo = Topology() - locator_set_list = [] - locator_set_list_vat = [] + locator_set_list = list() + locator_set_list_vat = list() + i = 0 for num in range(0, int(locator_set_number)): locator_list = [] - for interface in node['interfaces'].values(): - link = interface.get('link') + for interface in list(node[u"interfaces"].values()): + link = interface.get(u"link") i += 1 if link is None: continue @@ -400,17 +397,23 @@ class LispUtil(object): if_name = topo.get_interface_by_link_name(node, link) sw_if_index = topo.get_interface_sw_index(node, if_name) if if_name is not None: - l_name = 'ls{0}'.format(num) - locator = {'locator-index': sw_if_index, - 'priority': i, - 'weight': i} + l_name = f"ls{num}" + locator = { + u"locator-index": sw_if_index, + u"priority": i, + u"weight": i + } locator_list.append(locator) - locator_set = {'locator-set': l_name, - 'locator': locator_list} + locator_set = { + u"locator-set": l_name, + u"locator": locator_list + } locator_set_list.append(locator_set) - locator_set_vat = {"ls_name": l_name, - "ls_index": num} + locator_set_vat = { + u"ls_name": l_name, + u"ls_index": num + } locator_set_list_vat.append(locator_set_vat) return locator_set_list, locator_set_list_vat @@ -421,5 +424,4 @@ class LispUtil(object): :param lisp_params: Should be empty list. :type lisp_params: list """ - self.lisp_should_be_equal([], lisp_params)