-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# limitations under the License.
"""Common IP utilities library."""
-
import re
from enum import IntEnum
from resources.libraries.python.Constants import Constants
from resources.libraries.python.InterfaceUtil import InterfaceUtil
+from resources.libraries.python.IPAddress import IPAddress
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
from resources.libraries.python.ssh import exec_cmd_no_error, exec_cmd
from resources.libraries.python.topology import Topology
from resources.libraries.python.VatExecutor import VatTerminal
+from resources.libraries.python.Namespaces import Namespaces
# from vpp/src/vnet/vnet/mpls/mpls_types.h
MPLS_LABEL_INVALID = MPLS_IETF_MAX_LABEL + 1
-class AddressFamily(IntEnum):
- """IP address family."""
- ADDRESS_IP4 = 0
- ADDRESS_IP6 = 1
-
-
class FibPathType(IntEnum):
"""FIB path types."""
FIB_PATH_TYPE_NORMAL = 0
class FibPathFlags(IntEnum):
"""FIB path flags."""
FIB_PATH_FLAG_NONE = 0
- FIB_PATH_FLAG_RESOLVE_VIA_ATTACHED = 1 # pylint: disable=invalid-name
+ # TODO: Name too long for pylint, fix in VPP.
+ FIB_PATH_FLAG_RESOLVE_VIA_ATTACHED = 1
FIB_PATH_FLAG_RESOLVE_VIA_HOST = 2
if not sw_if_index:
return list()
- is_ipv6 = 1 if ip_version == u"ipv6" else 0
-
cmd = u"ip_address_dump"
args = dict(
sw_if_index=sw_if_index,
- is_ipv6=is_ipv6
+ is_ipv6=bool(ip_version == u"ipv6")
)
err_msg = f"Failed to get L2FIB dump on host {node[u'host']}"
raise AssertionError(f"IP addresses are not equal: {ip1} != {ip2}")
@staticmethod
- def setup_network_namespace(
- node, namespace_name, interface_name, ip_addr, prefix):
+ def setup_network_namespace(node, namespace_name, interface_name,
+ ip_addr_list, prefix_length):
"""Setup namespace on given node and attach interface and IP to
this namespace. Applicable also on TG node.
:param node: VPP node.
:param namespace_name: Namespace name.
:param interface_name: Interface name.
- :param ip_addr: IP address of namespace's interface.
- :param prefix: IP address prefix length.
+ :param ip_addr_list: List of IP addresses of namespace's interface.
+ :param prefix_length: IP address prefix length.
:type node: dict
:type namespace_name: str
:type interface_name: str
- :type ip_addr: str
- :type prefix: int
+ :type ip_addr_list: list
+ :type prefix_length: int
"""
- cmd = f"ip netns add {namespace_name}"
- exec_cmd_no_error(node, cmd, sudo=True)
+ Namespaces.create_namespace(node, namespace_name)
- cmd = f"ip link set dev {interface_name} up netns {namespace_name}"
+ cmd = f"ip netns exec {namespace_name} ip link set {interface_name} up"
exec_cmd_no_error(node, cmd, sudo=True)
- cmd = f"ip netns exec {namespace_name} ip addr add {ip_addr}/{prefix}" \
- f" dev {interface_name}"
- exec_cmd_no_error(node, cmd, sudo=True)
+ for ip_addr in ip_addr_list:
+ cmd = f"ip netns exec {namespace_name} ip addr add " \
+ f"{ip_addr}/{prefix_length} dev {interface_name}"
+ exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
def linux_enable_forwarding(node, ip_ver=u"ipv4"):
exec_cmd_no_error(node, cmd, timeout=5, sudo=True)
+ @staticmethod
+ def delete_linux_interface_ip(
+ node, interface, ip_addr, prefix_length, namespace=None):
+ """Delete IP address from interface in linux.
+
+ :param node: VPP/TG node.
+ :param interface: Interface in namespace.
+ :param ip_addr: IP to be deleted from interface.
+ :param prefix_length: IP prefix length.
+ :param namespace: Execute command in namespace. Optional
+ :type node: dict
+ :type interface: str
+ :type ip_addr: str
+ :type prefix_length: int
+ :type namespace: str
+ :raises RuntimeError: IP could not be deleted.
+ """
+ # TODO: Refactor command execution in namespaces into central
+ # methods (e.g. Namespace.exec_cmd_in_namespace)
+ if namespace is not None:
+ cmd = f"ip netns exec {namespace} ip addr del " \
+ f"{ip_addr}/{prefix_length} dev {interface}"
+ else:
+ cmd = f"ip addr del {ip_addr}/{prefix_length} dev {interface}"
+
+ exec_cmd_no_error(node, cmd, timeout=5, sudo=True)
+
+ @staticmethod
+ def linux_interface_has_ip(
+ node, interface, ip_addr, prefix_length, namespace=None):
+ """Return True if interface in linux has IP address.
+
+ :param node: VPP/TG node.
+ :param interface: Interface in namespace.
+ :param ip_addr: IP to be queried on interface.
+ :param prefix_length: IP prefix length.
+ :param namespace: Execute command in namespace. Optional
+ :type node: dict
+ :type interface: str
+ :type ip_addr: str
+ :type prefix_length: int
+ :type namespace: str
+ :rtype boolean
+ :raises RuntimeError: Request fails.
+ """
+ ip_addr_with_prefix = f"{ip_addr}/{prefix_length}"
+ if namespace is not None:
+ cmd = f"ip netns exec {namespace} ip addr show dev {interface}"
+ else:
+ cmd = f"ip addr show dev {interface}"
+
+ cmd += u" | grep 'inet ' | awk -e '{print $2}'"
+ cmd += f" | grep '{ip_addr_with_prefix}'"
+ _, stdout, _ = exec_cmd(node, cmd, timeout=5, sudo=True)
+
+ has_ip = stdout.rstrip()
+ return bool(has_ip == ip_addr_with_prefix)
+
@staticmethod
def add_linux_route(node, ip_addr, prefix, gateway, namespace=None):
"""Add linux route in namespace.
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
+ @staticmethod
+ def vpp_interface_set_ip_addresses(node, interface, ip_addr_list,
+ prefix_length=None):
+ """Set IP addresses to VPP interface.
+
+ :param node: VPP node.
+ :param interface: Interface name.
+ :param ip_addr_list: IP addresses.
+ :param prefix_length: Prefix length.
+ :type node: dict
+ :type interface: str
+ :type ip_addr_list: list
+ :type prefix_length: int
+ """
+ for ip_addr in ip_addr_list:
+ IPUtil.vpp_interface_set_ip_address(node, interface, ip_addr,
+ prefix_length)
+
@staticmethod
def vpp_add_ip_neighbor(node, iface_key, ip_addr, mac_address):
"""Add IP neighbor on DUT node.
)
cmd = u"ip_neighbor_add_del"
args = dict(
- is_add=1,
+ is_add=True,
neighbor=neighbor
)
err_msg = f"Failed to add IP neighbor on interface {iface_key}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
- @staticmethod
- def union_addr(ip_addr):
- """Creates union IP address.
-
- :param ip_addr: IPv4 or IPv6 address.
- :type ip_addr: IPv4Address or IPv6Address
- :returns: Union IP address.
- :rtype: dict
- """
- return dict(ip6=ip_addr.packed) if ip_addr.version == 6 \
- else dict(ip4=ip_addr.packed)
-
- @staticmethod
- def create_ip_address_object(ip_addr):
- """Create IP address object.
-
- :param ip_addr: IPv4 or IPv6 address
- :type ip_addr: IPv4Address or IPv6Address
- :returns: IP address object.
- :rtype: dict
- """
- return dict(
- af=getattr(
- AddressFamily, u"ADDRESS_IP6" if ip_addr.version == 6
- else u"ADDRESS_IP4"
- ).value,
- un=IPUtil.union_addr(ip_addr)
- )
-
@staticmethod
def create_prefix_object(ip_addr, addr_len):
"""Create prefix object.
:param ip_addr: IPv4 or IPv6 address.
- :para, addr_len: Length of IP address.
+ :param addr_len: Length of IP address.
:type ip_addr: IPv4Address or IPv6Address
:type addr_len: int
:returns: Prefix object.
:rtype: dict
"""
- addr = IPUtil.create_ip_address_object(ip_addr)
+ addr = IPAddress.create_ip_address_object(ip_addr)
return dict(
len=int(addr_len),
paths = list()
n_hop = dict(
- address=IPUtil.union_addr(ip_address(gateway)) if gateway else 0,
+ address=IPAddress.union_addr(ip_address(gateway)) if gateway else 0,
via_label=MPLS_LABEL_INVALID,
obj_id=Constants.BITWISE_NON_ZERO
)
net_addr = ip_address(network)
cmd = u"ip_route_add_del"
args = dict(
- is_add=1,
- is_multipath=int(kwargs.get(u"multipath", False)),
+ is_add=True,
+ is_multipath=kwargs.get(u"multipath", False),
route=None
)
err_msg = f"Failed to add route(s) on host {node[u'host']}"
cmd = u"ip_table_add_del"
table = dict(
table_id=int(table_id),
- is_ip6=int(ipv6)
+ is_ip6=ipv6
)
args = dict(
table=table,
- is_add=1
+ is_add=True
)
err_msg = f"Failed to add FIB table on host {node[u'host']}"