X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FLispUtil.py;h=114cd72202d5e8d8c726a496b585d295faebf0cf;hb=bab0b570345ceb6ffeaec9e47a50e62d7303387e;hp=157702ade0082ba7a89a8dccb33592814a6a68e7;hpb=721f39743c31003ccbdad3c27ffcc3145bfccf90;p=csit.git diff --git a/resources/libraries/python/LispUtil.py b/resources/libraries/python/LispUtil.py index 157702ade0..114cd72202 100644 --- a/resources/libraries/python/LispUtil.py +++ b/resources/libraries/python/LispUtil.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cisco and/or its affiliates. +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -13,10 +13,12 @@ """Lisp utilities library.""" -from resources.libraries.python.parsers.JsonParser import JsonParser -from resources.libraries.python.topology import Topology -from resources.libraries.python.VatExecutor import VatExecutor +from robot.api import logger +from ipaddress import IPv4Address, IPv6Address +from resources.libraries.python.topology import Topology +from resources.libraries.python.PapiExecutor import PapiSocketExecutor +from resources.libraries.python.L2Util import L2Util class LispUtil(object): """Implements keywords for Lisp tests.""" @@ -30,42 +32,88 @@ class LispUtil(object): :param node: VPP node. :type node: dict - :return: Lisp gpe state. - :rtype: list + :returns: Lisp gpe state. + :rtype: dict """ + cmd = 'show_lisp_status' + err_msg = "Failed to get LISP status on host {host}".format( + host=node['host']) - vat = VatExecutor() - vat.execute_script_json_out('lisp/show_lisp_status.vat', - node) - return JsonParser().parse_data(vat.get_script_stdout()) + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data["feature_status"] = "enabled" if reply["feature_status"] else \ + "disabled" + data["gpe_status"] = "enabled" if reply["gpe_status"] else "disabled" + return data @staticmethod - def vpp_show_lisp_locator_set(node): + def vpp_show_lisp_locator_set(node, items_filter): """Get lisp locator_set from VPP node. :param node: VPP node. + :param items_filter: Filter which specifies which items should be + retrieved - local, remote, empty string = both. :type node: dict - :return: Lisp locator_set data as python list. + :type items_filter: str + :returns: Lisp locator_set data as python list. :rtype: list """ - vat = VatExecutor() - vat.execute_script_json_out('lisp/show_lisp_locator_set.vat', node) - return JsonParser().parse_data(vat.get_script_stdout()) + ifilter = {"_": 0, "_local": 1, "_remote": 2} + args = dict(filter=ifilter["_" + items_filter]) + + cmd = 'lisp_locator_set_dump' + err_msg = "Failed to get LISP locator set on host {host}".format( + host=node['host']) + + try: + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd, **args).get_details(err_msg) + data = [] + for locator in details: + data.append({"ls_name": locator["ls_name"].rstrip('\x00'), + "ls_index": locator["ls_index"]}) + return data + except (ValueError, LookupError): + return [] @staticmethod - def vpp_show_lisp_local_eid_table(node): - """Get lisp local eid table from VPP node. + def vpp_show_lisp_eid_table(node): + """Get lisp eid table from VPP node. :param node: VPP node. :type node: dict - :return: Lisp eid table as python list. + :returns: Lisp eid table as python list. :rtype: list """ - vat = VatExecutor() - vat.execute_script_json_out('lisp/show_lisp_local_eid_table.vat', node) - return JsonParser().parse_data(vat.get_script_stdout()) + cmd = 'lisp_eid_table_dump' + err_msg = "Failed to get LISP eid table on host {host}".format( + host=node['host']) + + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd).get_details(err_msg) + + data = [] + for eid_details in details: + eid = 'Bad eid type' + if eid_details["eid_type"] == 0: + prefix = str(eid_details["eid_prefix_len"]) + eid = str(IPv4Address(eid_details["eid"][0:4])) + "/" + prefix + elif eid_details["eid_type"] == 1: + prefix = str(eid_details["eid_prefix_len"]) + eid = str(IPv6Address(eid_details["eid"])) + "/" + prefix + elif eid_details["eid_type"] == 2: + eid = str(L2Util.bin_to_mac(eid_details["eid"][0:6])) + data.append({"action": eid_details["action"], + "is_local": eid_details["is_local"], + "eid": eid, + "vni": eid_details["vni"], + "ttl": eid_details["ttl"], + "authoritative": eid_details["authoritative"]}) + return data @staticmethod def vpp_show_lisp_map_resolver(node): @@ -73,13 +121,171 @@ class LispUtil(object): :param node: VPP node. :type node: dict - :return: Lisp map resolver as python list. + :returns: Lisp map resolver as python list. + :rtype: list + """ + + cmd = 'lisp_map_resolver_dump' + err_msg = "Failed to get LISP map resolver on host {host}".format( + host=node['host']) + + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd).get_details(err_msg) + + data = [] + for resolver in details: + address = 'Bad is_ipv6 flag' + if resolver["is_ipv6"] == 0: + address = str(IPv4Address(resolver["ip_address"][0:4])) + elif resolver["is_ipv6"] == 1: + address = str(IPv6Address(resolver["ip_address"])) + data.append({"map resolver": address}) + return data + + @staticmethod + def vpp_show_lisp_map_register(node): + """Get LISP Map Register from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP Map Register as python dict. + :rtype: dict + """ + + cmd = 'show_lisp_map_register_state' + err_msg = "Failed to get LISP map register state on host {host}".format( + host=node['host']) + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data["state"] = "enabled" if reply["is_enabled"] else "disabled" + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_map_request_mode(node): + """Get LISP Map Request mode from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP Map Request mode as python dict. + :rtype: dict + """ + + cmd = 'show_lisp_map_request_mode' + err_msg = "Failed to get LISP map request mode on host {host}".format( + host=node['host']) + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data["map_request_mode"] = "src-dst" if reply["mode"] else "dst-only" + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_map_server(node): + """Get LISP Map Server from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP Map Server as python list. :rtype: list """ - vat = VatExecutor() - vat.execute_script_json_out('lisp/show_lisp_map_resolver.vat', node) - return JsonParser().parse_data(vat.get_script_stdout()) + cmd = 'lisp_map_server_dump' + err_msg = "Failed to get LISP map server on host {host}".format( + host=node['host']) + + with PapiSocketExecutor(node) as papi_exec: + details = papi_exec.add(cmd).get_details(err_msg) + + data = [] + for server in details: + address = 'Bad is_ipv6 flag' + if server["is_ipv6"] == 0: + address = str(IPv4Address(server["ip_address"][0:4])) + elif server["is_ipv6"] == 1: + address = str(IPv6Address(server["ip_address"])) + data.append({"map-server": address}) + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_petr_config(node): + """Get LISP PETR configuration from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP PETR configuration as python dict. + :rtype: dict + """ + +# Note: VAT is returning ipv6 address instead of ipv4 + + cmd = 'show_lisp_use_petr' + err_msg = "Failed to get LISP petr config on host {host}".format( + host=node['host']) + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data["status"] = "enabled" if reply["status"] else "disabled" + address = 'Bad is_ip4 flag' + if reply["is_ip4"] == 0: + address = str(IPv6Address(reply["address"])) + elif reply["is_ip4"] == 1: + address = str(IPv4Address(reply["address"][0:4])) + data["address"] = address + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_rloc_config(node): + """Get LISP RLOC configuration from VPP node. + + :param node: VPP node. + :type node: dict + :returns: LISP RLOC configuration as python dict. + :rtype: dict + """ + + cmd = 'show_lisp_rloc_probe_state' + err_msg = "Failed to get LISP rloc config on host {host}".format( + host=node['host']) + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data["state"] = "enabled" if reply["is_enabled"] else "disabled" + logger.info(data) + return data + + @staticmethod + def vpp_show_lisp_pitr(node): + """Get Lisp PITR feature config from VPP node. + + :param node: VPP node. + :type node: dict + :returns: Lisp PITR config data. + :rtype: dict + """ + + cmd = 'show_lisp_pitr' + err_msg = "Failed to get LISP pitr on host {host}".format( + host=node['host']) + + with PapiSocketExecutor(node) as papi_exec: + reply = papi_exec.add(cmd).get_reply(err_msg) + + data = dict() + data["status"] = "enabled" if reply["status"] else "disabled" + return data @staticmethod def lisp_should_be_equal(lisp_val1, lisp_val2): @@ -113,23 +319,10 @@ class LispUtil(object): :type locator_set2: list """ - # Remove duplicate value which is not set in vpp node. locator_set_list = [] - tmp_list = list(locator_set1) - while len(tmp_list): - locator_set = tmp_list.pop(0) - locator_set_name = locator_set.get('locator-set') - for tmp_loc_set in tmp_list: - tmp_loc_set_name = tmp_loc_set.get('locator-set') - if locator_set_name == tmp_loc_set_name: - locator_set = tmp_loc_set - tmp_list.remove(tmp_loc_set) - locator_set_list.append(locator_set) - - for locator_set in locator_set2: - if 'locator-set-index' in locator_set: - del locator_set['locator-set-index'] - + for item in locator_set1: + if item not in locator_set_list: + locator_set_list.append(item) self.lisp_should_be_equal(locator_set_list, locator_set2) @staticmethod @@ -141,13 +334,15 @@ class LispUtil(object): :param locator_set_number: Generate n locator_set. :type node: dict :type locator_set_number: str - :return: list of lisp locator_set. - :rtype: list + :returns: list of lisp locator_set, list of lisp locator_set expected + from VAT. + :rtype: tuple """ topo = Topology() locator_set_list = [] + locator_set_list_vat = [] i = 0 for num in range(0, int(locator_set_number)): locator_list = [] @@ -170,7 +365,11 @@ class LispUtil(object): 'locator': locator_list} locator_set_list.append(locator_set) - return locator_set_list + locator_set_vat = {"ls_name": l_name, + "ls_index": num} + locator_set_list_vat.append(locator_set_vat) + + return locator_set_list, locator_set_list_vat @staticmethod def generate_duplicate_lisp_locator_set_data(node, locator_set_number): @@ -181,12 +380,14 @@ class LispUtil(object): :param locator_set_number: Generate n locator_set. :type node: dict :type locator_set_number: str - :return: list of lisp locator_set. - :rtype: list + :returns: list of lisp locator_set, list of lisp locator_set expected + from VAT. + :rtype: tuple """ topo = Topology() locator_set_list = [] + locator_set_list_vat = [] i = 0 for num in range(0, int(locator_set_number)): locator_list = [] @@ -208,7 +409,11 @@ class LispUtil(object): 'locator': locator_list} locator_set_list.append(locator_set) - return locator_set_list + locator_set_vat = {"ls_name": l_name, + "ls_index": num} + locator_set_list_vat.append(locator_set_vat) + + return locator_set_list, locator_set_list_vat def lisp_is_empty(self, lisp_params): """Check if the input param are empty.