from abc import abstractmethod, ABCMeta
import socket
-from logging import info
from util import Host
@property
def remote_mac(self):
- """MAC-address of the remote interface "connected" to this interface."""
+ """MAC-address of the remote interface "connected" to this interface"""
return self._remote_hosts[0].mac
@property
def host_by_mac(self, mac):
"""
- :param ip: MAC address to find host by.
+ :param mac: MAC address to find host by.
:return: Host object assigned to interface.
"""
return self._hosts_by_mac[mac]
self._hosts_by_mac = {}
self._hosts_by_ip4 = {}
self._hosts_by_ip6 = {}
- for i in range(2, count+2): # 0: network address, 1: local vpp address
+ for i in range(
+ 2, count + 2): # 0: network address, 1: local vpp address
mac = "02:%02x:00:00:ff:%02x" % (self.sw_if_index, i)
ip4 = "172.16.%u.%u" % (self.sw_if_index, i)
- ip6 = "fd01:%04x::%04x" % (self.sw_if_index, i)
+ ip6 = "fd01:%x::%x" % (self.sw_if_index, i)
host = Host(mac, ip4, ip6)
self._remote_hosts.append(host)
self._hosts_by_mac[mac] = host
self._hosts_by_ip4[ip4] = host
self._hosts_by_ip6[ip6] = host
- def post_init_setup(self):
- """Additional setup run after creating an interface object."""
+ @abstractmethod
+ def __init__(self, test):
+ self._test = test
+
+ self._remote_hosts = []
+ self._hosts_by_mac = {}
+ self._hosts_by_ip4 = {}
+ self._hosts_by_ip6 = {}
self.generate_remote_hosts()
self._local_ip4 = "172.16.%u.1" % self.sw_if_index
self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
+ self.local_ip4_prefix_len = 24
+ self.has_ip4_config = False
+ self.ip4_table_id = 0
- self._local_ip6 = "fd01:%04x::1" % self.sw_if_index
+ self._local_ip6 = "fd01:%x::1" % self.sw_if_index
self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
+ self.local_ip6_prefix_len = 64
+ self.has_ip6_config = False
+ self.ip6_table_id = 0
r = self.test.vapi.sw_interface_dump()
for intf in r:
if intf.sw_if_index == self.sw_if_index:
self._name = intf.interface_name.split(b'\0', 1)[0]
- self._local_mac = ':'.join(intf.l2_address.encode('hex')[i:i + 2]
- for i in range(0, 12, 2))
+ self._local_mac = \
+ ':'.join(intf.l2_address.encode('hex')[i:i + 2]
+ for i in range(0, 12, 2))
self._dump = intf
break
else:
"in interface dump %s" %
(self.sw_if_index, repr(r)))
- @abstractmethod
- def __init__(self, test, index):
- self._test = test
- self.post_init_setup()
- info("New %s, MAC=%s, remote_ip4=%s, local_ip4=%s" %
- (self.__name__, self.remote_mac, self.remote_ip4, self.local_ip4))
-
def config_ip4(self):
"""Configure IPv4 address on the VPP interface."""
- addr = self.local_ip4n
- addr_len = 24
self.test.vapi.sw_interface_add_del_address(
- self.sw_if_index, addr, addr_len)
-
- def configure_ipv4_neighbors(self):
- """For every remote host assign neighbor's MAC to IPv4 addresses."""
+ self.sw_if_index, self.local_ip4n, self.local_ip4_prefix_len)
+ self.has_ip4_config = True
+
+ def unconfig_ip4(self):
+ """Remove IPv4 address on the VPP interface."""
+ try:
+ if self.has_ip4_config:
+ self.test.vapi.sw_interface_add_del_address(
+ self.sw_if_index,
+ self.local_ip4n,
+ self.local_ip4_prefix_len,
+ is_add=0)
+ except AttributeError:
+ self.has_ip4_config = False
+ self.has_ip4_config = False
+
+ def configure_ipv4_neighbors(self, vrf_id=0):
+ """For every remote host assign neighbor's MAC to IPv4 addresses.
+
+ :param vrf_id: The FIB table / VRF ID. (Default value = 0)
+ """
for host in self._remote_hosts:
macn = host.mac.replace(":", "").decode('hex')
ipn = host.ip4n
- self.test.vapi.ip_neighbor_add_del(self.sw_if_index, macn, ipn)
+ self.test.vapi.ip_neighbor_add_del(
+ self.sw_if_index, macn, ipn, vrf_id)
def config_ip6(self):
"""Configure IPv6 address on the VPP interface."""
- addr = self._local_ip6n
- addr_len = 64
self.test.vapi.sw_interface_add_del_address(
- self.sw_if_index, addr, addr_len, is_ipv6=1)
+ self.sw_if_index, self._local_ip6n, self.local_ip6_prefix_len,
+ is_ipv6=1)
+ self.has_ip6_config = True
+
+ def unconfig_ip6(self):
+ """Remove IPv6 address on the VPP interface."""
+ try:
+ if self.has_ip6_config:
+ self.test.vapi.sw_interface_add_del_address(
+ self.sw_if_index,
+ self.local_ip6n,
+ self.local_ip6_prefix_len,
+ is_ipv6=1, is_add=0)
+ except AttributeError:
+ self.has_ip6_config = False
+ self.has_ip6_config = False
+
+ def configure_ipv6_neighbors(self):
+ """For every remote host assign neighbor's MAC to IPv6 address."""
+ for host in self._remote_hosts:
+ macn = host.mac.replace(":", "").decode('hex')
+ self.test.vapi.ip_neighbor_add_del(
+ self.sw_if_index, macn, host.ip6n, is_ipv6=1)
+
+ def unconfig(self):
+ """Unconfigure IPv6 and IPv4 address on the VPP interface."""
+ self.unconfig_ip4()
+ self.unconfig_ip6()
def set_table_ip4(self, table_id):
"""Set the interface in a IPv4 Table.
- .. note:: Must be called before configuring IP4 addresses."""
+ .. note:: Must be called before configuring IP4 addresses.
+ """
+ self.ip4_table_id = table_id
self.test.vapi.sw_interface_set_table(
- self.sw_if_index, 0, table_id)
+ self.sw_if_index, 0, self.ip4_table_id)
def set_table_ip6(self, table_id):
"""Set the interface in a IPv6 Table.
.. note:: Must be called before configuring IP6 addresses.
"""
+ self.ip6_table_id = table_id
self.test.vapi.sw_interface_set_table(
- self.sw_if_index, 1, table_id)
+ self.sw_if_index, 1, self.ip6_table_id)
def disable_ipv6_ra(self):
"""Configure IPv6 RA suppress on the VPP interface."""
self.test.vapi.sw_interface_ra_suppress(self.sw_if_index)
+ def ip6_ra_config(self, suppress=0, send_unicast=0):
+ """Configure IPv6 RA suppress on the VPP interface."""
+ self.test.vapi.ip6_sw_interface_ra_config(self.sw_if_index,
+ suppress,
+ send_unicast)
+
def admin_up(self):
"""Put interface ADMIN-UP."""
- self.test.vapi.sw_interface_set_flags(self.sw_if_index, admin_up_down=1)
+ self.test.vapi.sw_interface_set_flags(self.sw_if_index,
+ admin_up_down=1)
+
+ def admin_down(self):
+ """Put interface ADMIN-down."""
+ self.test.vapi.sw_interface_set_flags(self.sw_if_index,
+ admin_up_down=0)
+
+ def ip6_enable(self):
+ """IPv6 Enable interface"""
+ self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
+ enable=1)
+
+ def ip6_disable(self):
+ """Put interface ADMIN-DOWN."""
+ self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
+ enable=0)
def add_sub_if(self, sub_if):
"""Register a sub-interface with this interface.
"""Enable MPLS on the VPP interface."""
self.test.vapi.sw_interface_enable_disable_mpls(
self.sw_if_index)
+
+ def is_ip4_entry_in_fib_dump(self, dump):
+ for i in dump:
+ if i.address == self.local_ip4n and \
+ i.address_length == self.local_ip4_prefix_len and \
+ i.table_id == self.ip4_table_id:
+ return True
+ return False