1 from abc import abstractmethod, ABCMeta
7 class VppInterface(object):
8 """Generic VPP interface."""
9 __metaclass__ = ABCMeta
def sw_if_index(self):
    """Return the interface index that VPP assigned to this interface."""
    assigned_index = self._sw_if_index
    return assigned_index
18 """MAC-address of the remote interface "connected" to this interface."""
19 return self._remote_hosts[0].mac
23 """MAC-address of the VPP interface."""
24 return self._local_mac
28 """Local IPv4 address on VPP interface (string)."""
29 return self._local_ip4
33 """Local IPv4 address - raw, suitable as API parameter."""
34 return socket.inet_pton(socket.AF_INET, self._local_ip4)
38 """IPv4 address of remote peer "connected" to this interface."""
39 return self._remote_hosts[0].ip4
def remote_ip4n(self):
    """Return the remote peer's IPv4 address in packed binary form.

    The textual ``remote_ip4`` value is converted with
    ``socket.inet_pton`` so that it is suitable as an API parameter.
    """
    textual_address = self.remote_ip4
    return socket.inet_pton(socket.AF_INET, textual_address)
48 """Local IPv6 address on VPP interface (string)."""
49 return self._local_ip6
53 """Local IPv6 address - raw, suitable as API parameter."""
54 return socket.inet_pton(socket.AF_INET6, self.local_ip6)
58 """IPv6 address of remote peer "connected" to this interface."""
59 return self._remote_hosts[0].ip6
def remote_ip6n(self):
    """Return the remote peer's IPv6 address in packed binary form.

    The textual ``remote_ip6`` value is converted with
    ``socket.inet_pton`` so that it is suitable as an API parameter.
    """
    textual_address = self.remote_ip6
    return socket.inet_pton(socket.AF_INET6, textual_address)
68 """Name of the interface."""
73 """RAW result of sw_interface_dump for this interface."""
78 """Test case creating this interface."""
def remote_hosts(self):
    """Return the list of remote hosts currently stored on this interface."""
    registered_hosts = self._remote_hosts
    return registered_hosts
def remote_hosts(self, value):
    """Replace the remote hosts and rebuild the lookup tables.

    The three lookup dictionaries (by MAC, by IPv4 and by IPv6 address)
    are recreated from scratch, so hosts from a previous assignment are
    no longer reachable through them.

    :param list value: List of remote hosts.
    """
    self._remote_hosts = value
    # Bind the fresh tables to short local names; the instance attributes
    # and the locals refer to the same dictionaries.
    by_mac = self._hosts_by_mac = {}
    by_ip4 = self._hosts_by_ip4 = {}
    by_ip6 = self._hosts_by_ip6 = {}
    # A single pass keeps this usable with one-shot iterables as well.
    for peer in value:
        by_mac[peer.mac] = peer
        by_ip4[peer.ip4] = peer
        by_ip6[peer.ip6] = peer
def host_by_mac(self, mac):
    """Look up a remote host by its MAC address.

    :param mac: MAC address to find host by.
    :return: Host object assigned to interface.
    :raises KeyError: If no host is stored under the given MAC address.
    """
    mac_table = self._hosts_by_mac
    return mac_table[mac]
def host_by_ip4(self, ip):
    """Look up a remote host by its IPv4 address.

    :param ip: IPv4 address to find host by.
    :return: Host object assigned to interface.
    :raises KeyError: If no host is stored under the given IPv4 address.
    """
    ip4_table = self._hosts_by_ip4
    return ip4_table[ip]
def host_by_ip6(self, ip):
    """Look up a remote host by its IPv6 address.

    :param ip: IPv6 address to find host by.
    :return: Host object assigned to interface.
    :raises KeyError: If no host is stored under the given IPv6 address.
    """
    ip6_table = self._hosts_by_ip6
    return ip6_table[ip]
def generate_remote_hosts(self, count=1):
    """Generate and add remote hosts for the interface.

    Any previously stored hosts and lookup tables are discarded first.
    Each generated host gets a MAC, IPv4 and IPv6 address derived from
    this interface's ``sw_if_index`` and a per-host number.

    :param int count: Number of generated remote hosts.
    """
    hosts = self._remote_hosts = []
    by_mac = self._hosts_by_mac = {}
    by_ip4 = self._hosts_by_ip4 = {}
    by_ip6 = self._hosts_by_ip6 = {}
    # Host numbers start at 2 - 0: network address, 1: local vpp address
    for host_number in range(2, count + 2):
        ids = (self.sw_if_index, host_number)
        mac = "02:%02x:00:00:ff:%02x" % ids
        ip4 = "172.16.%u.%u" % ids
        ip6 = "fd01:%04x::%04x" % ids
        peer = Host(mac, ip4, ip6)
        hosts.append(peer)
        by_mac[mac] = peer
        by_ip4[ip4] = peer
        by_ip6[ip6] = peer
142 def __init__(self, test):
145 self._remote_hosts = []
146 self._hosts_by_mac = {}
147 self._hosts_by_ip4 = {}
148 self._hosts_by_ip6 = {}
150 self.generate_remote_hosts()
152 self._local_ip4 = "172.16.%u.1" % self.sw_if_index
153 self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
155 self._local_ip6 = "fd01:%04x::1" % self.sw_if_index
156 self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
158 r = self.test.vapi.sw_interface_dump()
160 if intf.sw_if_index == self.sw_if_index:
161 self._name = intf.interface_name.split(b'\0', 1)[0]
163 ':'.join(intf.l2_address.encode('hex')[i:i + 2]
164 for i in range(0, 12, 2))
169 "Could not find interface with sw_if_index %d "
170 "in interface dump %s" %
171 (self.sw_if_index, repr(r)))
def config_ip4(self):
    """Configure IPv4 address on the VPP interface.

    Passes the raw local IPv4 address (``local_ip4n``) together with this
    interface's ``sw_if_index`` to ``sw_interface_add_del_address`` and
    then sets ``has_ip4_config`` to True.
    """
    addr = self.local_ip4n
    # NOTE(review): ``addr_len`` is not assigned in the lines visible
    # here - confirm where the prefix length is defined.
    self.test.vapi.sw_interface_add_del_address(
        self.sw_if_index, addr, addr_len)
    # Remembered so that unconfig_ip4 knows there is an address to remove.
    self.has_ip4_config = True
181 def unconfig_ip4(self):
182 """Remove IPv4 address on the VPP interface"""
184 if (self.has_ip4_config):
185 self.test.vapi.sw_interface_add_del_address(
189 except AttributeError:
190 self.has_ip4_config = False
191 self.has_ip4_config = False
def configure_ipv4_neighbors(self):
    """For every remote host assign neighbor's MAC to IPv4 addresses.

    Calls ``ip_neighbor_add_del`` once per entry in ``_remote_hosts``.
    """
    for host in self._remote_hosts:
        # Strip the colons and turn the hex digits into raw bytes.
        # The 'hex' codec on str.decode is available on Python 2 only.
        macn = host.mac.replace(":", "").decode('hex')
        # NOTE(review): ``ipn`` is not assigned in the lines visible
        # here - presumably the host's raw IPv4 address; confirm.
        self.test.vapi.ip_neighbor_add_del(self.sw_if_index, macn, ipn)
def config_ip6(self):
    """Configure IPv6 address on the VPP interface.

    Passes the cached raw local IPv6 address (``_local_ip6n``) together
    with this interface's ``sw_if_index`` to
    ``sw_interface_add_del_address`` with ``is_ipv6=1`` and then sets
    ``has_ip6_config`` to True.
    """
    addr = self._local_ip6n
    # NOTE(review): ``addr_len`` is not assigned in the lines visible
    # here - confirm where the prefix length is defined.
    self.test.vapi.sw_interface_add_del_address(
        self.sw_if_index, addr, addr_len, is_ipv6=1)
    # Remembered so that unconfig_ip6 knows there is an address to remove.
    self.has_ip6_config = True
208 def unconfig_ip6(self):
209 """Remove IPv6 address on the VPP interface"""
211 if (self.has_ip6_config):
212 self.test.vapi.sw_interface_add_del_address(
215 64, is_ipv6=1, is_add=0)
216 except AttributeError:
217 self.has_ip6_config = False
218 self.has_ip6_config = False
221 """Unconfigure IPv6 and IPv4 address on the VPP interface"""
def set_table_ip4(self, table_id):
    """Set the interface in a IPv4 Table.

    .. note:: Must be called before configuring IP4 addresses.

    :param table_id: Table identifier handed to
        ``sw_interface_set_table``.
    """
    ipv4_selector = 0  # set_table_ip6 passes 1 in this position
    set_table = self.test.vapi.sw_interface_set_table
    set_table(self.sw_if_index, ipv4_selector, table_id)
def set_table_ip6(self, table_id):
    """Set the interface in a IPv6 Table.

    .. note:: Must be called before configuring IP6 addresses.

    :param table_id: Table identifier handed to
        ``sw_interface_set_table``.
    """
    ipv6_selector = 1  # set_table_ip4 passes 0 in this position
    set_table = self.test.vapi.sw_interface_set_table
    set_table(self.sw_if_index, ipv6_selector, table_id)
def disable_ipv6_ra(self):
    """Configure IPv6 RA suppress on the VPP interface.

    Calls ``sw_interface_ra_suppress`` with this interface's
    ``sw_if_index``.
    """
    suppress_ra = self.test.vapi.sw_interface_ra_suppress
    suppress_ra(self.sw_if_index)
245 """Put interface ADMIN-UP."""
246 self.test.vapi.sw_interface_set_flags(self.sw_if_index, admin_up_down=1)
248 def add_sub_if(self, sub_if):
249 """Register a sub-interface with this interface.
251 :param sub_if: sub-interface
253 if not hasattr(self, 'sub_if'):
256 if isinstance(self.sub_if, list):
257 self.sub_if.append(sub_if)
261 def enable_mpls(self):
262 """Enable MPLS on the VPP interface."""
263 self.test.vapi.sw_interface_enable_disable_mpls(