X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FLispUtil.py;h=a77f4ad8544601adcac8ced89e6d2ea83871da4a;hp=2926d01dfbe376af30486a97ff1c5e1d9e52f863;hb=d68951ac245150eeefa6e0f4156e4c1b5c9e9325;hpb=cf561a6e3d4c4fbd78ab6c9d0a9aa817bb3300fc diff --git a/resources/libraries/python/LispUtil.py b/resources/libraries/python/LispUtil.py index 2926d01dfb..a77f4ad854 100644 --- a/resources/libraries/python/LispUtil.py +++ b/resources/libraries/python/LispUtil.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cisco and/or its affiliates. +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -13,17 +13,16 @@ """Lisp utilities library.""" -from resources.libraries.python.parsers.JsonParser import JsonParser -from resources.libraries.python.topology import Topology -from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal +from ipaddress import IPv4Address, IPv6Address +from robot.api import logger +from resources.libraries.python.L2Util import L2Util +from resources.libraries.python.PapiExecutor import PapiSocketExecutor +from resources.libraries.python.topology import Topology -class LispUtil(object): +class LispUtil: """Implements keywords for Lisp tests.""" - def __init__(self): - pass - @staticmethod def vpp_show_lisp_state(node): """Get lisp state from VPP node. @@ -31,13 +30,20 @@ class LispUtil(object): :param node: VPP node. :type node: dict :returns: Lisp gpe state. - :rtype: list + :rtype: dict """ + cmd = u"show_lisp_status" + err_msg = f"Failed to get LISP status on host {node['host']}" - vat = VatExecutor() - vat.execute_script_json_out('lisp/show_lisp_status.vat', - node) - return JsonParser().parse_data(vat.get_script_stdout()) + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data[u"feature_status"] = u"enabled" if reply[u"feature_status"] \ + else u"disabled" + data[u"gpe_status"] = u"enabled" if reply[u"gpe_status"] \ + else u"disabled" + return data @staticmethod def vpp_show_lisp_locator_set(node, items_filter): @@ -45,20 +51,33 @@ class LispUtil(object): :param node: VPP node. :param items_filter: Filter which specifies which items should be - retrieved - local, remote, empty string = both. + retrieved - local, remote, empty string = both. :type node: dict :type items_filter: str :returns: Lisp locator_set data as python list. :rtype: list """ + ifilter = {u"_": 0, u"_local": 1, u"_remote": 2} + args = dict( + filter=ifilter[u"_" + items_filter] + ) + + cmd = u"lisp_locator_set_dump" + err_msg = f"Failed to get LISP locator set on host {node['host']}" try: - with VatTerminal(node) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'lisp/show_lisp_locator_set.vat', filter=items_filter) - return response[0] - except ValueError: - return [] + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd, **args).get_details(err_msg) + data = list() + for locator in details: + data.append( + {u"ls_name": locator[u"ls_name"].rstrip(b'\0'), + u"ls_index": locator[u"ls_index"]} + ) + return data + except (ValueError, LookupError) as err: + logger.warn(f"Failed to get LISP locator set {err}") + return list() @staticmethod def vpp_show_lisp_eid_table(node): @@ -69,10 +88,35 @@ class LispUtil(object): :returns: Lisp eid table as python list. :rtype: list """ - - vat = VatExecutor() - vat.execute_script_json_out('lisp/show_lisp_eid_table.vat', node) - return JsonParser().parse_data(vat.get_script_stdout()) + cmd = u"lisp_eid_table_dump" + err_msg = f"Failed to get LISP eid table on host {node[u'host']}" + + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd).get_details(err_msg) + + data = list() + for eid_details in details: + eid = u"Bad eid type" + if eid_details[u"eid_type"] == 0: + prefix = str(eid_details[u"eid_prefix_len"]) + eid = str(IPv4Address(eid_details[u"eid"][0:4])) + u"/" + \ + prefix + elif eid_details[u"eid_type"] == 1: + prefix = str(eid_details[u"eid_prefix_len"]) + eid = str(IPv6Address(eid_details[u"eid"])) + u"/" + prefix + elif eid_details[u"eid_type"] == 2: + eid = str(L2Util.bin_to_mac(eid_details[u"eid"][0:6])) + data.append( + { + u"action": eid_details[u"action"], + u"is_local": eid_details[u"is_local"], + u"eid": eid, + u"vni": eid_details[u"vni"], + u"ttl": eid_details[u"ttl"], + u"authoritative": eid_details[u"authoritative"] + } + ) + return data @staticmethod def vpp_show_lisp_map_resolver(node): @@ -83,10 +127,155 @@ class LispUtil(object): :returns: Lisp map resolver as python list. :rtype: list """ + cmd = u"lisp_map_resolver_dump" + err_msg = f"Failed to get LISP map resolver on host {node[u'host']}" + + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd).get_details(err_msg) + + data = list() + for resolver in details: + address = u"Bad is_ipv6 flag" + if resolver[u"is_ipv6"] == 0: + address = str(IPv4Address(resolver[u"ip_address"][0:4])) + elif resolver[u"is_ipv6"] == 1: + address = str(IPv6Address(resolver[u"ip_address"])) + data.append({u"map resolver": address}) + return data + + @staticmethod + def vpp_show_lisp_map_register(node): + """Get LISP Map Register from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP Map Register as python dict. + :rtype: dict + """ + cmd = u"show_lisp_map_register_state" + err_msg = f"Failed to get LISP map register state on host " \ + f"{node[u'host']}" + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled" + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_map_request_mode(node): + """Get LISP Map Request mode from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP Map Request mode as python dict. + :rtype: dict + """ + cmd = u"show_lisp_map_request_mode" + err_msg = f"Failed to get LISP map request mode on host {node[u'host']}" - vat = VatExecutor() - vat.execute_script_json_out('lisp/show_lisp_map_resolver.vat', node) - return JsonParser().parse_data(vat.get_script_stdout()) + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data[u"map_request_mode"] = u"src-dst" if reply[u"mode"] \ + else u"dst-only" + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_map_server(node): + """Get LISP Map Server from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP Map Server as python list. + :rtype: list + """ + cmd = u"lisp_map_server_dump" + err_msg = f"Failed to get LISP map server on host {node[u'host']}" + + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd).get_details(err_msg) + + data = list() + for server in details: + address = u"Bad is_ipv6 flag" + if server[u"is_ipv6"] == 0: + address = str(IPv4Address(server[u"ip_address"][0:4])) + elif server[u"is_ipv6"] == 1: + address = str(IPv6Address(server[u"ip_address"])) + data.append({u"map-server": address}) + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_petr_config(node): + """Get LISP PETR configuration from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP PETR configuration as python dict. + :rtype: dict + """ + # Note: VAT is returning ipv6 address instead of ipv4 + cmd = u"show_lisp_use_petr" + err_msg = f"Failed to get LISP petr config on host {node[u'host']}" + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data[u"status"] = u"enabled" if reply[u"status"] else u"disabled" + address = u"Bad is_ip4 flag" + if reply[u"is_ip4"] == 0: + address = str(IPv6Address(reply[u"address"])) + elif reply[u"is_ip4"] == 1: + address = str(IPv4Address(reply[u"address"][0:4])) + data[u"address"] = address + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_rloc_config(node): + """Get LISP RLOC configuration from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP RLOC configuration as python dict. + :rtype: dict + """ + cmd = u"show_lisp_rloc_probe_state" + err_msg = f"Failed to get LISP rloc config on host {node[u'host']}" + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled" + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_pitr(node): + """Get Lisp PITR feature config from VPP node. + + :param node: VPP node. + :type node: dict + :returns: Lisp PITR config data. + :rtype: dict + """ + cmd = u"show_lisp_pitr" + err_msg = f"Failed to get LISP pitr on host {node[u'host']}" + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data[u"status"] = u"enabled" if reply[u"status"] else u"disabled" + return data @staticmethod def lisp_should_be_equal(lisp_val1, lisp_val2): @@ -97,19 +286,20 @@ class LispUtil(object): :type lisp_val1: list :type lisp_val2: list """ - len1 = len(lisp_val1) len2 = len(lisp_val2) + if len1 != len2: - raise RuntimeError('Values are not same. ' - 'Value 1 {} \n' - 'Value 2 {}.'.format(lisp_val1, - lisp_val2)) + raise RuntimeError( + f"Values are not same. Value 1 {lisp_val1} \n" + f"Value 2 {lisp_val2}." + ) for tmp in lisp_val1: if tmp not in lisp_val2: - raise RuntimeError('Value {} not found in vpp:\n' - '{}'.format(tmp, lisp_val2)) + raise RuntimeError( + f"Value {tmp} not found in vpp:\n{lisp_val2}" + ) def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2): """Fail if the lisp values are not equal. @@ -119,11 +309,12 @@ class LispUtil(object): :type locator_set1: list :type locator_set2: list """ + locator_set_list = list() - locator_set_list = [] for item in locator_set1: if item not in locator_set_list: locator_set_list.append(item) + self.lisp_should_be_equal(locator_set_list, locator_set2) @staticmethod @@ -136,19 +327,18 @@ class LispUtil(object): :type node: dict :type locator_set_number: str :returns: list of lisp locator_set, list of lisp locator_set expected - from VAT. + from VAT. :rtype: tuple """ - topo = Topology() + locator_set_list = list() + locator_set_list_vat = list() - locator_set_list = [] - locator_set_list_vat = [] i = 0 for num in range(0, int(locator_set_number)): - locator_list = [] - for interface in node['interfaces'].values(): - link = interface.get('link') + locator_list = list() + for interface in list(node[u"interfaces"].values()): + link = interface.get(u"link") i += 1 if link is None: continue @@ -156,18 +346,24 @@ class LispUtil(object): if_name = topo.get_interface_by_link_name(node, link) sw_if_index = topo.get_interface_sw_index(node, if_name) if if_name is not None: - locator = {'locator-index': sw_if_index, - 'priority': i, - 'weight': i} + locator = { + u"locator-index": sw_if_index, + u"priority": i, + u"weight": i + } locator_list.append(locator) - l_name = 'ls{0}'.format(num) - locator_set = {'locator-set': l_name, - 'locator': locator_list} + l_name = f"ls{num}" + locator_set = { + u"locator-set": l_name, + u"locator": locator_list + } locator_set_list.append(locator_set) - locator_set_vat = {"ls_name": l_name, - "ls_index": num} + locator_set_vat = { + u"ls_name": l_name, + u"ls_index": num + } locator_set_list_vat.append(locator_set_vat) return locator_set_list, locator_set_list_vat @@ -182,18 +378,18 @@ class LispUtil(object): :type node: dict :type locator_set_number: str :returns: list of lisp locator_set, list of lisp locator_set expected - from VAT. + from VAT. :rtype: tuple """ - topo = Topology() - locator_set_list = [] - locator_set_list_vat = [] + locator_set_list = list() + locator_set_list_vat = list() + i = 0 for num in range(0, int(locator_set_number)): locator_list = [] - for interface in node['interfaces'].values(): - link = interface.get('link') + for interface in list(node[u"interfaces"].values()): + link = interface.get(u"link") i += 1 if link is None: continue @@ -201,17 +397,23 @@ class LispUtil(object): if_name = topo.get_interface_by_link_name(node, link) sw_if_index = topo.get_interface_sw_index(node, if_name) if if_name is not None: - l_name = 'ls{0}'.format(num) - locator = {'locator-index': sw_if_index, - 'priority': i, - 'weight': i} + l_name = f"ls{num}" + locator = { + u"locator-index": sw_if_index, + u"priority": i, + u"weight": i + } locator_list.append(locator) - locator_set = {'locator-set': l_name, - 'locator': locator_list} + locator_set = { + u"locator-set": l_name, + u"locator": locator_list + } locator_set_list.append(locator_set) - locator_set_vat = {"ls_name": l_name, - "ls_index": num} + locator_set_vat = { + u"ls_name": l_name, + u"ls_index": num + } locator_set_list_vat.append(locator_set_vat) return locator_set_list, locator_set_list_vat @@ -222,5 +424,4 @@ class LispUtil(object): :param lisp_params: Should be empty list. :type lisp_params: list """ - self.lisp_should_be_equal([], lisp_params)