+ def ip_to_int(ip_str):
+ """Convert IP address from string format (e.g. 10.0.0.1) to integer
+ representation (167772161).
+
+ :param ip_str: IP address in string representation.
+ :type ip_str: str
+ :returns: Integer representation of IP address.
+ :rtype: int
+ """
+ return int(ip_address(unicode(ip_str)))
+
+ @staticmethod
+ def int_to_ip(ip_int):
+ """Convert IP address from integer representation (e.g. 167772161) to
+ string format (10.0.0.1).
+
+ :param ip_int: IP address in integer representation.
+ :type ip_int: int
+ :returns: String representation of IP address.
+ :rtype: str
+ """
+ return str(ip_address(ip_int))
+
+ @staticmethod
+ def vpp_get_interface_ip_addresses(node, interface, ip_version):
+ """Get list of IP addresses from an interface on a VPP node.
+
+ :param node: VPP node.
+ :param interface: Name of an interface on the VPP node.
+ :param ip_version: IP protocol version (ipv4 or ipv6).
+ :type node: dict
+ :type interface: str
+ :type ip_version: str
+ :returns: List of dictionaries, each containing IP address, subnet
+ prefix length and also the subnet mask for ipv4 addresses.
+ Note: A single interface may have multiple IP addresses assigned.
+ :rtype: list
+ """
+ sw_if_index = InterfaceUtil.get_interface_index(node, interface)
+
+ data = list()
+ if sw_if_index:
+ is_ipv6 = 1 if ip_version == 'ipv6' else 0
+
+ cmd = 'ip_address_dump'
+ cmd_reply = 'ip_address_details'
+ args = dict(sw_if_index=sw_if_index,
+ is_ipv6=is_ipv6)
+ err_msg = 'Failed to get L2FIB dump on host {host}'.format(
+ host=node['host'])
+
+ with PapiExecutor(node) as papi_exec:
+ papi_resp = papi_exec.add(cmd, **args).get_dump(err_msg)
+
+ for item in papi_resp.reply[0]['api_reply']:
+ item[cmd_reply]['ip'] = item[cmd_reply]['prefix'].split('/')[0]
+ item[cmd_reply]['prefix_length'] = int(
+ item[cmd_reply]['prefix'].split('/')[1])
+ item[cmd_reply]['is_ipv6'] = is_ipv6
+ item[cmd_reply]['netmask'] = \
+ str(IPv6Network(unicode('::/{pl}'.format(
+ pl=item[cmd_reply]['prefix_length']))).netmask) \
+ if is_ipv6 \
+ else str(IPv4Network(unicode('0.0.0.0/{pl}'.format(
+ pl=item[cmd_reply]['prefix_length']))).netmask)
+ data.append(item[cmd_reply])
+
+ return data
+
+ @staticmethod
+ def get_interface_vrf_table(node, interface, ip_version='ipv4'):
+ """Get vrf ID for the given interface.
+
+ :param node: VPP node.
+ :param interface: Name or sw_if_index of a specific interface.
+ :type node: dict
+ :param ip_version: IP protocol version (ipv4 or ipv6).
+ :type interface: str or int
+ :type ip_version: str
+ :returns: vrf ID of the specified interface.
+ :rtype: int
+ """
+ sw_if_index = InterfaceUtil.get_interface_index(node, interface)
+
+ is_ipv6 = 1 if ip_version == 'ipv6' else 0
+
+ cmd = 'sw_interface_get_table'
+ args = dict(sw_if_index=sw_if_index,
+ is_ipv6=is_ipv6)
+ err_msg = 'Failed to get VRF id assigned to interface {ifc}'.format(
+ ifc=interface)
+
+ with PapiExecutor(node) as papi_exec:
+ papi_resp = papi_exec.add(cmd, **args).get_replies(err_msg). \
+ verify_reply(err_msg=err_msg)
+
+ return papi_resp['vrf_id']
+
+ @staticmethod
+ def vpp_ip_source_check_setup(node, if_name):
+ """Setup Reverse Path Forwarding source check on interface.
+
+ :param node: VPP node.
+ :param if_name: Interface name to setup RPF source check.
+ :type node: dict
+ :type if_name: str
+ """
+ cmd = 'ip_source_check_interface_add_del'
+ args = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(node, if_name),
+ is_add=1,
+ loose=0)
+ err_msg = 'Failed to enable source check on interface {ifc}'.format(
+ ifc=if_name)
+ with PapiExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_replies(err_msg). \
+ verify_reply(err_msg=err_msg)
+
+ @staticmethod
+ def vpp_ip_probe(node, interface, addr):