8 from util import Host, mk_ll_addr
9 from vpp_papi import mac_ntop
12 @six.add_metaclass(abc.ABCMeta)
13 class VppInterface(object):
14 """Generic VPP interface."""
def sw_if_index(self):
    """Return the interface index that VPP assigned to this interface."""
    index = self._sw_if_index
    return index
23 """MAC-address of the remote interface "connected" to this interface"""
24 return self._remote_hosts[0].mac
28 """MAC-address of the VPP interface."""
29 return self._local_mac
33 return self._local_addr
def remote_addr(self):
    """Remote peer addresses, keyed by socket address family."""
    by_family = self._remote_addr
    return by_family
def local_addr_n(self):
    """Local addresses in raw form, keyed by socket address family."""
    by_family = self._local_addr_n
    return by_family
def remote_addr_n(self):
    """Remote peer addresses in raw form, keyed by socket address family."""
    by_family = self._remote_addr_n
    return by_family
49 """Local IPv4 address on VPP interface (string)."""
50 return self._local_ip4
54 """Local IPv4 address - raw, suitable as API parameter."""
55 return socket.inet_pton(socket.AF_INET, self._local_ip4)
59 """IPv4 address of remote peer "connected" to this interface."""
60 return self._remote_hosts[0].ip4
def remote_ip4n(self):
    """Remote peer IPv4 address packed into raw form for API parameters."""
    family = socket.AF_INET
    return socket.inet_pton(family, self.remote_ip4)
69 """Local IPv6 address on VPP interface (string)."""
70 return self._local_ip6
74 """Local IPv6 address - raw, suitable as API parameter."""
75 return socket.inet_pton(socket.AF_INET6, self.local_ip6)
79 """IPv6 address of remote peer "connected" to this interface."""
80 return self._remote_hosts[0].ip6
def remote_ip6n(self):
    """Remote peer IPv6 address packed into raw form for API parameters."""
    family = socket.AF_INET6
    return socket.inet_pton(family, self.remote_ip6)
def local_ip6_ll(self):
    """Link-local IPv6 address of the VPP interface, as a string."""
    link_local = self._local_ip6_ll
    return link_local
def local_ip6n_ll(self):
    """Link-local IPv6 address of the VPP interface in raw form,
    suitable as an API parameter."""
    raw_link_local = self._local_ip6n_ll
    return raw_link_local
def remote_ip6_ll(self):
    """Link-local IPv6 address of the remote peer "connected" to this
    interface."""
    link_local = self._remote_ip6_ll
    return link_local
def remote_ip6n_ll(self):
    """Link-local IPv6 address of the remote peer in raw form,
    suitable as an API parameter."""
    raw_link_local = self._remote_ip6n_ll
    return raw_link_local
111 """Name of the interface."""
116 """RAW result of sw_interface_dump for this interface."""
121 """Test case creating this interface."""
def remote_hosts(self):
    """List of remote hosts attached to this interface."""
    hosts = self._remote_hosts
    return hosts
def remote_hosts(self, value):
    """Replace the remote hosts and rebuild the MAC/IPv4/IPv6 lookup tables.

    :param list value: List of remote hosts.
    """
    self._remote_hosts = value
    # Bind each fresh table to a local alias so the loop fills the very
    # dicts that are stored on the instance.
    by_mac = self._hosts_by_mac = {}
    by_ip4 = self._hosts_by_ip4 = {}
    by_ip6 = self._hosts_by_ip6 = {}
    for peer in value:
        by_mac[peer.mac] = peer
        by_ip4[peer.ip4] = peer
        by_ip6[peer.ip6] = peer
def host_by_mac(self, mac):
    """Look up a remote host of this interface by MAC address.

    :param mac: MAC address to find host by.
    :return: Host object assigned to interface.
    """
    table = self._hosts_by_mac
    return table[mac]
def host_by_ip4(self, ip):
    """Look up a remote host of this interface by IPv4 address.

    :param ip: IPv4 address to find host by.
    :return: Host object assigned to interface.
    """
    table = self._hosts_by_ip4
    return table[ip]
def host_by_ip6(self, ip):
    """Look up a remote host of this interface by IPv6 address.

    :param ip: IPv6 address to find host by.
    :return: Host object assigned to interface.
    """
    table = self._hosts_by_ip6
    return table[ip]
def generate_remote_hosts(self, count=1):
    """Generate and add remote hosts for the interface.

    Previously stored remote hosts and their lookup tables are discarded
    before the new hosts are created.

    :param int count: Number of generated remote hosts.
    """
    hosts = self._remote_hosts = []
    by_mac = self._hosts_by_mac = {}
    by_ip4 = self._hosts_by_ip4 = {}
    by_ip6 = self._hosts_by_ip6 = {}
    # Host numbers start at 2 -- 0: network address, 1: local vpp address
    for host_num in range(2, count + 2):
        ids = (self.sw_if_index, host_num)
        mac = "02:%02x:00:00:ff:%02x" % ids
        ip4 = "172.16.%u.%u" % ids
        ip6 = "fd01:%x::%x" % ids
        peer = Host(mac, ip4, ip6, mk_ll_addr(mac))
        hosts.append(peer)
        by_mac[mac] = peer
        by_ip4[ip4] = peer
        by_ip6[ip6] = peer
186 def __init__(self, test):
189 self._remote_hosts = []
190 self._hosts_by_mac = {}
191 self._hosts_by_ip4 = {}
192 self._hosts_by_ip6 = {}
def set_mac(self, mac):
    """Change the MAC address of the VPP interface.

    Updates the cached local MAC and the link-local IPv6 address derived
    from it (string and raw form), then sets the new MAC in VPP.

    :param mac: New MAC address; ``str(mac)`` is cached as the local MAC
        and ``mac.packed`` is what gets passed to the VPP API.
    """
    self._local_mac = str(mac)
    self._local_ip6_ll = mk_ll_addr(self._local_mac)
    # Keep the raw link-local address in step with the string form;
    # otherwise local_ip6n_ll would still describe the previous MAC.
    self._local_ip6n_ll = socket.inet_pton(socket.AF_INET6,
                                           self._local_ip6_ll)
    self.test.vapi.sw_interface_set_mac_address(
        self.sw_if_index, mac.packed)
200 def set_sw_if_index(self, sw_if_index):
201 self._sw_if_index = sw_if_index
203 self.generate_remote_hosts()
205 self._local_ip4 = "172.16.%u.1" % self.sw_if_index
206 self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
207 self._local_ip4_subnet = "172.16.%u.0" % self.sw_if_index
208 self._local_ip4n_subnet = socket.inet_pton(socket.AF_INET,
209 self._local_ip4_subnet)
210 self._local_ip4_bcast = "172.16.%u.255" % self.sw_if_index
211 self._local_ip4n_bcast = socket.inet_pton(socket.AF_INET,
212 self._local_ip4_bcast)
213 self.local_ip4_prefix_len = 24
214 self.has_ip4_config = False
215 self.ip4_table_id = 0
217 self._local_ip6 = "fd01:%x::1" % self.sw_if_index
218 self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
219 self.local_ip6_prefix_len = 64
220 self.has_ip6_config = False
221 self.ip6_table_id = 0
223 self._local_addr = {socket.AF_INET: self.local_ip4,
224 socket.AF_INET6: self.local_ip6}
225 self._local_addr_n = {socket.AF_INET: self.local_ip4n,
226 socket.AF_INET6: self.local_ip6n}
227 self._remote_addr = {socket.AF_INET: self.remote_ip4,
228 socket.AF_INET6: self.remote_ip6}
229 self._remote_addr_n = {socket.AF_INET: self.remote_ip4n,
230 socket.AF_INET6: self.remote_ip6n}
232 r = self.test.vapi.sw_interface_dump()
234 if intf.sw_if_index == self.sw_if_index:
235 self._name = intf.interface_name.split(b'\0',
237 self._local_mac = mac_ntop(intf.l2_address)
242 "Could not find interface with sw_if_index %d "
243 "in interface dump %s" %
244 (self.sw_if_index, moves.reprlib.repr(r)))
245 self._local_ip6_ll = mk_ll_addr(self.local_mac)
246 self._local_ip6n_ll = socket.inet_pton(socket.AF_INET6,
248 self._remote_ip6_ll = mk_ll_addr(self.remote_mac)
249 self._remote_ip6n_ll = socket.inet_pton(socket.AF_INET6,
def config_ip4(self):
    """Add the local IPv4 address to the VPP interface and record that
    IPv4 is configured."""
    vapi = self.test.vapi
    vapi.sw_interface_add_del_address(
        self.sw_if_index,
        self.local_ip4n,
        self.local_ip4_prefix_len)
    self.has_ip4_config = True
258 def unconfig_ip4(self):
259 """Remove IPv4 address on the VPP interface."""
261 if self.has_ip4_config:
262 self.test.vapi.sw_interface_add_del_address(
265 self.local_ip4_prefix_len,
267 except AttributeError:
268 self.has_ip4_config = False
269 self.has_ip4_config = False
271 def configure_ipv4_neighbors(self):
272 """For every remote host assign neighbor's MAC to IPv4 addresses.
274 :param vrf_id: The FIB table / VRF ID. (Default value = 0)
276 for host in self._remote_hosts:
277 self.test.vapi.ip_neighbor_add_del(self.sw_if_index,
281 def config_ip6(self):
282 """Configure IPv6 address on the VPP interface."""
283 self.test.vapi.sw_interface_add_del_address(
284 self.sw_if_index, self._local_ip6n, self.local_ip6_prefix_len,
286 self.has_ip6_config = True
288 def unconfig_ip6(self):
289 """Remove IPv6 address on the VPP interface."""
291 if self.has_ip6_config:
292 self.test.vapi.sw_interface_add_del_address(
295 self.local_ip6_prefix_len,
297 except AttributeError:
298 self.has_ip6_config = False
299 self.has_ip6_config = False
301 def configure_ipv6_neighbors(self):
302 """For every remote host assign neighbor's MAC to IPv6 addresses.
304 :param vrf_id: The FIB table / VRF ID. (Default value = 0)
306 for host in self._remote_hosts:
307 self.test.vapi.ip_neighbor_add_del(self.sw_if_index,
312 """Unconfigure IPv6 and IPv4 address on the VPP interface."""
def set_table_ip4(self, table_id):
    """Put the interface into an IPv4 table.

    .. note:: Must be called before configuring IP4 addresses.

    :param table_id: ID of the table; also remembered as ``ip4_table_id``.
    """
    self.ip4_table_id = table_id
    set_table = self.test.vapi.sw_interface_set_table
    # Second argument is 0 here and 1 in set_table_ip6.
    set_table(self.sw_if_index, 0, self.ip4_table_id)
def set_table_ip6(self, table_id):
    """Put the interface into an IPv6 table.

    .. note:: Must be called before configuring IP6 addresses.

    :param table_id: ID of the table; also remembered as ``ip6_table_id``.
    """
    self.ip6_table_id = table_id
    set_table = self.test.vapi.sw_interface_set_table
    # Second argument is 1 here and 0 in set_table_ip4.
    set_table(self.sw_if_index, 1, self.ip6_table_id)
def disable_ipv6_ra(self):
    """Turn on IPv6 RA suppress for the VPP interface."""
    suppress_ra = self.test.vapi.sw_interface_ra_suppress
    suppress_ra(self.sw_if_index)
338 def ip6_ra_config(self, no=0, suppress=0, send_unicast=0):
339 """Configure IPv6 RA suppress on the VPP interface."""
340 self.test.vapi.ip6_sw_interface_ra_config(self.sw_if_index,
345 def ip6_ra_prefix(self, address, address_length, is_no=0,
346 off_link=0, no_autoconfig=0, use_default=0):
347 """Configure IPv6 RA suppress on the VPP interface."""
348 self.test.vapi.ip6_sw_interface_ra_prefix(self.sw_if_index,
353 no_autoconfig=no_autoconfig,
354 use_default=use_default)
357 """Put interface ADMIN-UP."""
358 self.test.vapi.sw_interface_set_flags(self.sw_if_index,
361 def admin_down(self):
362 """Put interface ADMIN-down."""
363 self.test.vapi.sw_interface_set_flags(self.sw_if_index,
366 def ip6_enable(self):
367 """IPv6 Enable interface"""
368 self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
371 def ip6_disable(self):
372 """Put interface ADMIN-DOWN."""
373 self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
376 def add_sub_if(self, sub_if):
377 """Register a sub-interface with this interface.
379 :param sub_if: sub-interface
381 if not hasattr(self, 'sub_if'):
384 if isinstance(self.sub_if, list):
385 self.sub_if.append(sub_if)
389 def enable_mpls(self):
390 """Enable MPLS on the VPP interface."""
391 self.test.vapi.sw_interface_enable_disable_mpls(
394 def disable_mpls(self):
395 """Enable MPLS on the VPP interface."""
396 self.test.vapi.sw_interface_enable_disable_mpls(
399 def is_ip4_entry_in_fib_dump(self, dump):
401 if i.address == self.local_ip4n and \
402 i.address_length == self.local_ip4_prefix_len and \
403 i.table_id == self.ip4_table_id:
407 def set_unnumbered(self, ip_sw_if_index):
408 """ Set the interface to unnumbered via ip_sw_if_index """
409 self.test.vapi.sw_interface_set_unnumbered(
413 def unset_unnumbered(self, ip_sw_if_index):
414 """ Unset the interface to unnumbered via ip_sw_if_index """
415 self.test.vapi.sw_interface_set_unnumbered(
420 def set_proxy_arp(self, enable=1):
421 """ Set the interface to enable/disable Proxy ARP """
422 self.test.vapi.proxy_arp_intfc_enable_disable(
def query_vpp_config(self):
    """Fetch a fresh sw_interface_dump and report whether this interface
    is present in it."""
    vapi = self.test.vapi
    interfaces = vapi.sw_interface_dump()
    return self.is_interface_config_in_dump(interfaces)
430 def get_interface_config_from_dump(self, dump):
432 if i.interface_name.rstrip(' \t\r\n\0') == self.name and \
433 i.sw_if_index == self.sw_if_index:
def is_interface_config_in_dump(self, dump):
    """Return True if get_interface_config_from_dump finds an entry for
    this interface in *dump* (i.e. its result is not None)."""
    found = self.get_interface_config_from_dump(dump)
    return found is not None
441 def assert_interface_state(self, admin_up_down, link_up_down,
444 event = self.test.vapi.wait_for_event(timeout=1,
445 name='sw_interface_event')
446 self.test.assert_equal(event.sw_if_index, self.sw_if_index,
448 self.test.assert_equal(event.admin_up_down, admin_up_down,
450 self.test.assert_equal(event.link_up_down, link_up_down,
452 dump = self.test.vapi.sw_interface_dump()
453 if_state = self.get_interface_config_from_dump(dump)
454 self.test.assert_equal(if_state.admin_up_down, admin_up_down,
456 self.test.assert_equal(if_state.link_up_down, link_up_down,