+ cmd = f"ip addr add {ip_addr}/{prefix} dev {interface}"
+
+ exec_cmd_no_error(node, cmd, timeout=5, sudo=True)
+
+ @staticmethod
+ def delete_linux_interface_ip(
+ node, interface, ip_addr, prefix_length, namespace=None):
+ """Delete IP address from interface in linux.
+
+ :param node: VPP/TG node.
+ :param interface: Interface in namespace.
+ :param ip_addr: IP to be deleted from interface.
+ :param prefix_length: IP prefix length.
+ :param namespace: Execute command in namespace. Optional
+ :type node: dict
+ :type interface: str
+ :type ip_addr: str
+ :type prefix_length: int
+ :type namespace: str
+ :raises RuntimeError: IP could not be deleted.
+ """
+ # TODO: Refactor command execution in namespaces into central
+ # methods (e.g. Namespace.exec_cmd_in_namespace)
+ if namespace is not None:
+ cmd = f"ip netns exec {namespace} ip addr del " \
+ f"{ip_addr}/{prefix_length} dev {interface}"
+ else:
+ cmd = f"ip addr del {ip_addr}/{prefix_length} dev {interface}"
+
+ exec_cmd_no_error(node, cmd, timeout=5, sudo=True)
+
+ @staticmethod
+ def linux_interface_has_ip(
+ node, interface, ip_addr, prefix_length, namespace=None):
+ """Return True if interface in linux has IP address.
+
+ :param node: VPP/TG node.
+ :param interface: Interface in namespace.
+ :param ip_addr: IP to be queried on interface.
+ :param prefix_length: IP prefix length.
+ :param namespace: Execute command in namespace. Optional
+ :type node: dict
+ :type interface: str
+ :type ip_addr: str
+ :type prefix_length: int
+ :type namespace: str
+ :rtype boolean
+ :raises RuntimeError: Request fails.
+ """
+ ip_addr_with_prefix = f"{ip_addr}/{prefix_length}"
+ if namespace is not None:
+ cmd = f"ip netns exec {namespace} ip addr show dev {interface}"
+ else:
+ cmd = f"ip addr show dev {interface}"
+
+ cmd += u" | grep 'inet ' | awk -e '{print $2}'"
+ cmd += f" | grep '{ip_addr_with_prefix}'"
+ _, stdout, _ = exec_cmd(node, cmd, timeout=5, sudo=True)
+
+ has_ip = stdout.rstrip()
+ return bool(has_ip == ip_addr_with_prefix)
+
+ @staticmethod
+ def add_linux_route(node, ip_addr, prefix, gateway, namespace=None):
+ """Add linux route in namespace.
+
+ :param node: Node where to execute command.
+ :param ip_addr: Route destination IP address.
+ :param prefix: IP prefix.
+ :param namespace: Execute command in namespace. Optional.
+ :param gateway: Gateway address.
+ :type node: dict
+ :type ip_addr: str
+ :type prefix: int
+ :type gateway: str
+ :type namespace: str
+ """
+ if namespace is not None:
+ cmd = f"ip netns exec {namespace} ip route add {ip_addr}/{prefix}" \
+ f" via {gateway}"
+ else:
+ cmd = f"ip route add {ip_addr}/{prefix} via {gateway}"
+
+ exec_cmd_no_error(node, cmd, sudo=True)
+
+ @staticmethod
+ def vpp_interface_set_ip_address(
+ node, interface, address, prefix_length=None):
+ """Set IP address to VPP interface.
+
+ :param node: VPP node.
+ :param interface: Interface name.
+ :param address: IP address.
+ :param prefix_length: Prefix length.
+ :type node: dict
+ :type interface: str
+ :type address: str
+ :type prefix_length: int
+ """
+ ip_addr = ip_address(address)
+
+ cmd = u"sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(node, interface),
+ is_add=True,
+ del_all=False,
+ prefix=IPUtil.create_prefix_object(
+ ip_addr,
+ prefix_length if prefix_length else 128
+ if ip_addr.version == 6 else 32
+ )
+ )
+ err_msg = f"Failed to add IP address on interface {interface}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+
+ @staticmethod
+ def vpp_interface_set_ip_addresses(node, interface, ip_addr_list,
+ prefix_length=None):
+ """Set IP addresses to VPP interface.
+
+ :param node: VPP node.
+ :param interface: Interface name.
+ :param ip_addr_list: IP addresses.
+ :param prefix_length: Prefix length.
+ :type node: dict
+ :type interface: str
+ :type ip_addr_list: list
+ :type prefix_length: int
+ """
+ for ip_addr in ip_addr_list:
+ IPUtil.vpp_interface_set_ip_address(node, interface, ip_addr,
+ prefix_length)
+
+ @staticmethod
+ def vpp_add_ip_neighbor(node, iface_key, ip_addr, mac_address):
+ """Add IP neighbor on DUT node.
+
+ :param node: VPP node.
+ :param iface_key: Interface key.
+ :param ip_addr: IP address of the interface.
+ :param mac_address: MAC address of the interface.
+ :type node: dict
+ :type iface_key: str
+ :type ip_addr: str
+ :type mac_address: str
+ """
+ dst_ip = ip_address(ip_addr)
+
+ neighbor = dict(
+ sw_if_index=Topology.get_interface_sw_index(node, iface_key),
+ flags=0,
+ mac_address=str(mac_address),
+ ip_address=str(dst_ip)
+ )
+ cmd = u"ip_neighbor_add_del"
+ args = dict(
+ is_add=True,
+ neighbor=neighbor
+ )
+ err_msg = f"Failed to add IP neighbor on interface {iface_key}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+
+ @staticmethod
+ def create_prefix_object(ip_addr, addr_len):
+ """Create prefix object.
+
+ :param ip_addr: IPv4 or IPv6 address.
+ :param addr_len: Length of IP address.
+ :type ip_addr: IPv4Address or IPv6Address
+ :type addr_len: int
+ :returns: Prefix object.
+ :rtype: dict
+ """
+ addr = IPAddress.create_ip_address_object(ip_addr)
+
+ return dict(
+ len=int(addr_len),
+ address=addr
+ )
+
+ @staticmethod
+ def compose_vpp_route_structure(node, network, prefix_len, **kwargs):
+ """Create route object for ip_route_add_del api call.
+
+ :param node: VPP node.
+ :param network: Route destination network address.
+ :param prefix_len: Route destination network prefix length.
+ :param kwargs: Optional key-value arguments:
+
+ gateway: Route gateway address. (str)
+ interface: Route interface. (str)
+ vrf: VRF table ID. (int)
+ count: number of IP addresses to add starting from network IP (int)
+ local: The route is local with same prefix (increment is 1).
+ If None, then is not used. (bool)
+ lookup_vrf: VRF table ID for lookup. (int)
+ multipath: Enable multipath routing. (bool)
+ weight: Weight value for unequal cost multipath routing. (int)
+
+ :type node: dict
+ :type network: str
+ :type prefix_len: int
+ :type kwargs: dict
+ :returns: route parameter basic structure
+ :rtype: dict
+ """
+ interface = kwargs.get(u"interface", u"")
+ gateway = kwargs.get(u"gateway", u"")
+
+ net_addr = ip_address(network)
+
+ prefix = IPUtil.create_prefix_object(net_addr, prefix_len)
+
+ paths = list()
+ n_hop = dict(
+ address=IPAddress.union_addr(ip_address(gateway)) if gateway else 0,
+ via_label=MPLS_LABEL_INVALID,
+ obj_id=Constants.BITWISE_NON_ZERO
+ )
+ path = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(node, interface)
+ if interface else Constants.BITWISE_NON_ZERO,
+ table_id=int(kwargs.get(u"lookup_vrf", 0)),
+ rpf_id=Constants.BITWISE_NON_ZERO,
+ weight=int(kwargs.get(u"weight", 1)),
+ preference=1,
+ type=getattr(
+ FibPathType, u"FIB_PATH_TYPE_LOCAL"
+ if kwargs.get(u"local", False)
+ else u"FIB_PATH_TYPE_NORMAL"
+ ).value,
+ flags=getattr(FibPathFlags, u"FIB_PATH_FLAG_NONE").value,
+ proto=getattr(
+ FibPathNhProto, u"FIB_PATH_NH_PROTO_IP6"
+ if net_addr.version == 6
+ else u"FIB_PATH_NH_PROTO_IP4"
+ ).value,
+ nh=n_hop,
+ n_labels=0,
+ label_stack=list(0 for _ in range(16))
+ )
+ paths.append(path)
+
+ route = dict(
+ table_id=int(kwargs.get(u"vrf", 0)),
+ prefix=prefix,
+ n_paths=len(paths),
+ paths=paths
+ )
+ return route
+
+ @staticmethod
+ def vpp_route_add(node, network, prefix_len, **kwargs):
+ """Add route to the VPP node.
+
+ :param node: VPP node.
+ :param network: Route destination network address.
+ :param prefix_len: Route destination network prefix length.
+ :param kwargs: Optional key-value arguments:
+
+ gateway: Route gateway address. (str)
+ interface: Route interface. (str)
+ vrf: VRF table ID. (int)
+ count: number of IP addresses to add starting from network IP (int)
+ local: The route is local with same prefix (increment is 1).
+ If None, then is not used. (bool)
+ lookup_vrf: VRF table ID for lookup. (int)
+ multipath: Enable multipath routing. (bool)
+ weight: Weight value for unequal cost multipath routing. (int)
+
+ :type node: dict
+ :type network: str
+ :type prefix_len: int
+ :type kwargs: dict
+ """
+ count = kwargs.get(u"count", 1)
+
+ if count > 100:
+ gateway = kwargs.get(u"gateway", '')
+ interface = kwargs.get(u"interface", '')
+ vrf = kwargs.get(u"vrf", None)
+ multipath = kwargs.get(u"multipath", False)
+
+ with VatTerminal(node, json_param=False) as vat:
+
+ vat.vat_terminal_exec_cmd_from_template(
+ u"vpp_route_add.vat",
+ network=network,
+ prefix_length=prefix_len,
+ via=f"via {gateway}" if gateway else u"",
+ sw_if_index=f"sw_if_index "
+ f"{InterfaceUtil.get_interface_index(node, interface)}"
+ if interface else u"",
+ vrf=f"vrf {vrf}" if vrf else u"",
+ count=f"count {count}" if count else u"",
+ multipath=u"multipath" if multipath else u""
+ )
+ return
+
+ net_addr = ip_address(network)
+ cmd = u"ip_route_add_del"
+ args = dict(
+ is_add=True,
+ is_multipath=kwargs.get(u"multipath", False),
+ route=None
+ )
+ err_msg = f"Failed to add route(s) on host {node[u'host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ for i in range(kwargs.get(u"count", 1)):
+ args[u"route"] = IPUtil.compose_vpp_route_structure(
+ node, net_addr + i, prefix_len, **kwargs
+ )
+ history = bool(not 1 < i < kwargs.get(u"count", 1))
+ papi_exec.add(cmd, history=history, **args)
+ papi_exec.get_replies(err_msg)
+
+ @staticmethod
+ def flush_ip_addresses(node, interface):
+ """Flush all IP addresses from specified interface.
+
+ :param node: VPP node.
+ :param interface: Interface name.
+ :type node: dict
+ :type interface: str
+ """
+ cmd = u"sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(node, interface),
+ is_add=False,
+ del_all=True
+ )
+ err_msg = f"Failed to flush IP address on interface {interface}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+
+ @staticmethod
+ def add_fib_table(node, table_id, ipv6=False):
+ """Create new FIB table according to ID.
+
+ :param node: Node to add FIB on.
+ :param table_id: FIB table ID.
+ :param ipv6: Is this an IPv6 table
+ :type node: dict
+ :type table_id: int
+ :type ipv6: bool
+ """
+ cmd = u"ip_table_add_del"
+ table = dict(
+ table_id=int(table_id),
+ is_ip6=ipv6
+ )
+ args = dict(
+ table=table,
+ is_add=True
+ )
+ err_msg = f"Failed to add FIB table on host {node[u'host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_reply(err_msg)