-# Copyright (c) 2016 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""Lisp utilities library."""
-from resources.libraries.python.parsers.JsonParser import JsonParser
-from resources.libraries.python.topology import Topology
-from resources.libraries.python.VatExecutor import VatExecutor
+from robot.api import logger
+from ipaddress import IPv4Address, IPv6Address
+from resources.libraries.python.topology import Topology
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
+from resources.libraries.python.L2Util import L2Util
class LispUtil(object):
"""Implements keywords for Lisp tests."""
pass
@staticmethod
- def vpp_show_lisp_locator_set(node):
+ def vpp_show_lisp_state(node):
+ """Get lisp state from VPP node.
+
+ :param node: VPP node.
+ :type node: dict
+ :returns: Lisp gpe state.
+ :rtype: dict
+ """
+ cmd = 'show_lisp_status'
+ err_msg = "Failed to get LISP status on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+
+ data = dict()
+ data["feature_status"] = "enabled" if reply["feature_status"] else \
+ "disabled"
+ data["gpe_status"] = "enabled" if reply["gpe_status"] else "disabled"
+ return data
+
+ @staticmethod
+ def vpp_show_lisp_locator_set(node, items_filter):
"""Get lisp locator_set from VPP node.
:param node: VPP node.
+ :param items_filter: Filter which specifies which items should be
+ retrieved - local, remote, empty string = both.
:type node: dict
- :return: Lisp locator_set data as python list.
+ :type items_filter: str
+ :returns: Lisp locator_set data as python list.
:rtype: list
"""
- vat = VatExecutor()
- vat.execute_script_json_out('lisp/show_lisp_locator_set.vat', node)
- return JsonParser().parse_data(vat.get_script_stdout())
+ ifilter = {"_": 0, "_local": 1, "_remote": 2}
+ args = dict(filter=ifilter["_" + items_filter])
+
+ cmd = 'lisp_locator_set_dump'
+ err_msg = "Failed to get LISP locator set on host {host}".format(
+ host=node['host'])
+
+ try:
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd, **args).get_details(err_msg)
+ data = []
+ for locator in details:
+ data.append({"ls_name": locator["ls_name"].rstrip('\x00'),
+ "ls_index": locator["ls_index"]})
+ return data
+ except (ValueError, LookupError):
+ return []
@staticmethod
- def vpp_show_lisp_local_eid_table(node):
- """Get lisp local eid table from VPP node.
+ def vpp_show_lisp_eid_table(node):
+ """Get lisp eid table from VPP node.
:param node: VPP node.
:type node: dict
- :return: Lisp eid table as python list.
+ :returns: Lisp eid table as python list.
:rtype: list
"""
- vat = VatExecutor()
- vat.execute_script_json_out('lisp/show_lisp_local_eid_table.vat', node)
- return JsonParser().parse_data(vat.get_script_stdout())
+ cmd = 'lisp_eid_table_dump'
+ err_msg = "Failed to get LISP eid table on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd).get_details(err_msg)
+
+ data = []
+ for eid_details in details:
+ eid = 'Bad eid type'
+ if eid_details["eid_type"] == 0:
+ prefix = str(eid_details["eid_prefix_len"])
+ eid = str(IPv4Address(eid_details["eid"][0:4])) + "/" + prefix
+ elif eid_details["eid_type"] == 1:
+ prefix = str(eid_details["eid_prefix_len"])
+ eid = str(IPv6Address(eid_details["eid"])) + "/" + prefix
+ elif eid_details["eid_type"] == 2:
+ eid = str(L2Util.bin_to_mac(eid_details["eid"][0:6]))
+ data.append({"action": eid_details["action"],
+ "is_local": eid_details["is_local"],
+ "eid": eid,
+ "vni": eid_details["vni"],
+ "ttl": eid_details["ttl"],
+ "authoritative": eid_details["authoritative"]})
+ return data
@staticmethod
def vpp_show_lisp_map_resolver(node):
:param node: VPP node.
:type node: dict
- :return: Lisp map resolver as python list.
+ :returns: Lisp map resolver as python list.
:rtype: list
"""
- vat = VatExecutor()
- vat.execute_script_json_out('lisp/show_lisp_map_resolver.vat', node)
- return JsonParser().parse_data(vat.get_script_stdout())
+ cmd = 'lisp_map_resolver_dump'
+ err_msg = "Failed to get LISP map resolver on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd).get_details(err_msg)
+
+ data = []
+ for resolver in details:
+ address = 'Bad is_ipv6 flag'
+ if resolver["is_ipv6"] == 0:
+ address = str(IPv4Address(resolver["ip_address"][0:4]))
+ elif resolver["is_ipv6"] == 1:
+ address = str(IPv6Address(resolver["ip_address"]))
+ data.append({"map resolver": address})
+ return data
+
+ @staticmethod
+ def vpp_show_lisp_map_register(node):
+ """Get LISP Map Register from VPP node.
+
+ :param node: VPP node.
+ :type node: dict
+ :returns: LISP Map Register as python dict.
+ :rtype: dict
+ """
+
+ cmd = 'show_lisp_map_register_state'
+ err_msg = "Failed to get LISP map register state on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+
+ data = dict()
+ data["state"] = "enabled" if reply["is_enabled"] else "disabled"
+ logger.info(data)
+ return data
+
+ @staticmethod
+ def vpp_show_lisp_map_request_mode(node):
+ """Get LISP Map Request mode from VPP node.
+
+ :param node: VPP node.
+ :type node: dict
+ :returns: LISP Map Request mode as python dict.
+ :rtype: dict
+ """
+
+ cmd = 'show_lisp_map_request_mode'
+ err_msg = "Failed to get LISP map request mode on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+
+ data = dict()
+ data["map_request_mode"] = "src-dst" if reply["mode"] else "dst-only"
+ logger.info(data)
+ return data
+
+ @staticmethod
+ def vpp_show_lisp_map_server(node):
+ """Get LISP Map Server from VPP node.
+
+ :param node: VPP node.
+ :type node: dict
+ :returns: LISP Map Server as python list.
+ :rtype: list
+ """
+
+ cmd = 'lisp_map_server_dump'
+ err_msg = "Failed to get LISP map server on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd).get_details(err_msg)
+
+ data = []
+ for server in details:
+ address = 'Bad is_ipv6 flag'
+ if server["is_ipv6"] == 0:
+ address = str(IPv4Address(server["ip_address"][0:4]))
+ elif server["is_ipv6"] == 1:
+ address = str(IPv6Address(server["ip_address"]))
+ data.append({"map-server": address})
+ logger.info(data)
+ return data
+
+ @staticmethod
+ def vpp_show_lisp_petr_config(node):
+ """Get LISP PETR configuration from VPP node.
+
+ :param node: VPP node.
+ :type node: dict
+ :returns: LISP PETR configuration as python dict.
+ :rtype: dict
+ """
+
+# Note: VAT is returning ipv6 address instead of ipv4
+
+ cmd = 'show_lisp_use_petr'
+ err_msg = "Failed to get LISP petr config on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+
+ data = dict()
+ data["status"] = "enabled" if reply["status"] else "disabled"
+ address = 'Bad is_ip4 flag'
+ if reply["is_ip4"] == 0:
+ address = str(IPv6Address(reply["address"]))
+ elif reply["is_ip4"] == 1:
+ address = str(IPv4Address(reply["address"][0:4]))
+ data["address"] = address
+ logger.info(data)
+ return data
+
+ @staticmethod
+ def vpp_show_lisp_rloc_config(node):
+ """Get LISP RLOC configuration from VPP node.
+
+ :param node: VPP node.
+ :type node: dict
+ :returns: LISP RLOC configuration as python dict.
+ :rtype: dict
+ """
+
+ cmd = 'show_lisp_rloc_probe_state'
+ err_msg = "Failed to get LISP rloc config on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+
+ data = dict()
+ data["state"] = "enabled" if reply["is_enabled"] else "disabled"
+ logger.info(data)
+ return data
+
+ @staticmethod
+ def vpp_show_lisp_pitr(node):
+ """Get Lisp PITR feature config from VPP node.
+
+ :param node: VPP node.
+ :type node: dict
+ :returns: Lisp PITR config data.
+ :rtype: dict
+ """
+
+ cmd = 'show_lisp_pitr'
+ err_msg = "Failed to get LISP pitr on host {host}".format(
+ host=node['host'])
+
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cmd).get_reply(err_msg)
+
+ data = dict()
+ data["status"] = "enabled" if reply["status"] else "disabled"
+ return data
@staticmethod
def lisp_should_be_equal(lisp_val1, lisp_val2):
for tmp in lisp_val1:
if tmp not in lisp_val2:
- raise RuntimeError('Value {} is not find in vpp:\n'
+ raise RuntimeError('Value {} not found in vpp:\n'
'{}'.format(tmp, lisp_val2))
def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2):
:param locator_set1: Generate lisp value.
:param locator_set2: Lisp value from VPP.
- :type locator_set1: dict
+ :type locator_set1: list
:type locator_set2: list
"""
- reset_list = []
locator_set_list = []
- for locator_set_type, item in locator_set1.iteritems():
- if locator_set_type == 'normal':
- self.lisp_should_be_equal(item, locator_set2)
- elif locator_set_type == 'reset':
- for locator_list in reversed(item):
- name = locator_list.get('locator-set')
- if name not in locator_set_list:
- reset_list.insert(0, locator_list)
- locator_set_list.append(name)
- self.lisp_should_be_equal(reset_list, locator_set2)
- else:
- raise ValueError('Unknown locator_set_type value: '
- '{}'.format(locator_set_type))
+ for item in locator_set1:
+ if item not in locator_set_list:
+ locator_set_list.append(item)
+ self.lisp_should_be_equal(locator_set_list, locator_set2)
@staticmethod
- def generate_lisp_locator_set_data(node, locator_set_number):
+ def generate_unique_lisp_locator_set_data(node, locator_set_number):
"""Generate a list of lisp locator_set we want set to VPP and
- then check if is set correct.
-
- "normal" type of data set locator_set just once.
+ then check if it is set correctly. All locator_sets are unique.
:param node: VPP node.
:param locator_set_number: Generate n locator_set.
:type node: dict
:type locator_set_number: str
- :return: dict of lisp locator_set.
- :rtype: dict
+ :returns: list of lisp locator_set, list of lisp locator_set expected
+ from VAT.
+ :rtype: tuple
"""
topo = Topology()
locator_set_list = []
+ locator_set_list_vat = []
i = 0
for num in range(0, int(locator_set_number)):
+ locator_list = []
for interface in node['interfaces'].values():
link = interface.get('link')
i += 1
if_name = topo.get_interface_by_link_name(node, link)
sw_if_index = topo.get_interface_sw_index(node, if_name)
if if_name is not None:
- l_name = 'ls{0}'.format(num)
- locator_set = {'locator-set': l_name,
- 'locator': sw_if_index,
- 'priority': i,
- 'weight': i}
- locator_set_list.append(locator_set)
+ locator = {'locator-index': sw_if_index,
+ 'priority': i,
+ 'weight': i}
+ locator_list.append(locator)
+
+ l_name = 'ls{0}'.format(num)
+ locator_set = {'locator-set': l_name,
+ 'locator': locator_list}
+ locator_set_list.append(locator_set)
+
+ locator_set_vat = {"ls_name": l_name,
+ "ls_index": num}
+ locator_set_list_vat.append(locator_set_vat)
- loc_type = {'normal': locator_set_list}
- return loc_type
+ return locator_set_list, locator_set_list_vat
@staticmethod
- def generate_lisp_locator_set_reset_data(node, locator_set_number):
+ def generate_duplicate_lisp_locator_set_data(node, locator_set_number):
"""Generate a list of lisp locator_set we want set to VPP and
- then check if is set correct.
-
- "reset" type of data set locator_set multiple times,
- use to test reset locator_set in vpp.
+ then check if it is set correctly. Some locator_sets are duplicated.
:param node: VPP node.
:param locator_set_number: Generate n locator_set.
:type node: dict
:type locator_set_number: str
- :return: dict of lisp locator_set.
- :rtype: dict
+ :returns: list of lisp locator_set, list of lisp locator_set expected
+ from VAT.
+ :rtype: tuple
"""
topo = Topology()
-
locator_set_list = []
+ locator_set_list_vat = []
+ i = 0
for num in range(0, int(locator_set_number)):
+ locator_list = []
for interface in node['interfaces'].values():
link = interface.get('link')
+ i += 1
if link is None:
continue
sw_if_index = topo.get_interface_sw_index(node, if_name)
if if_name is not None:
l_name = 'ls{0}'.format(num)
+ locator = {'locator-index': sw_if_index,
+ 'priority': i,
+ 'weight': i}
+ locator_list.append(locator)
locator_set = {'locator-set': l_name,
- 'locator': sw_if_index,
- 'priority': 1,
- 'weight': 1}
+ 'locator': locator_list}
locator_set_list.append(locator_set)
- loc_type = {'reset': locator_set_list}
- return loc_type
-
- @staticmethod
- def generate_lisp_local_eid_data(ipv4_num, ipv6_num):
- """Generate a list of lisp local eid we want set to VPP and
- then check if is set correct.
-
- :param ipv4_num: Generate n ipv4 eid address.
- :param ipv6_num: Generate n ipv6 eid address.
- :type ipv4_num: str
- :type ipv6_num: str
- :return: list of lisp local eid.
- :rtype: list
- """
-
- eid_table = []
- for num in range(0, int(ipv4_num)):
- addrr = '192.168.{}.1'.format(num)
- eid = {'eid address': addrr,
- 'eid prefix len': 24,
- 'locator-set': 'ls1'}
- eid_table.append(eid)
-
- for num in range(0, int(ipv6_num)):
- addrr = '10:{}::1'.format(num + 1)
- eid = {'eid address': addrr,
- 'eid prefix len': 32,
- 'locator-set': 'ls1'}
- eid_table.append(eid)
-
- return eid_table
-
- @staticmethod
- def generate_lisp_map_resolver_data(ipv4_num, ipv6_num):
- """Generate a list of lisp map resolvers we want set to VPP and
- then check if is set correct.
-
- :param ipv4_num: Generate n ipv4 map resolver address.
- :param ipv6_num: Generate n ipv6 map resolver address.
- :type ipv4_num: str
- :type ipv6_num: str
- :return: list of lisp map resolver.
- :rtype: list
- """
-
- map_resolver = []
- for i in range(0, int(ipv4_num)):
- addr = '192.169.{}.1'.format(i)
- resolver = {'map resolver': addr}
- map_resolver.append(resolver)
-
- for i in range(0, int(ipv6_num)):
- addr = '12:{}::1'.format(i + 1)
- resolver = {'map resolver': addr}
- map_resolver.append(resolver)
+ locator_set_vat = {"ls_name": l_name,
+ "ls_index": num}
+ locator_set_list_vat.append(locator_set_vat)
- return map_resolver
+ return locator_set_list, locator_set_list_vat
def lisp_is_empty(self, lisp_params):
"""Check if the input param are empty.