3 from abc import abstractmethod, ABCMeta
7 from util import Host, mk_ll_addr
8 from vpp_mac import mactobinary, binarytomac
11 class VppInterface(object):
12 """Generic VPP interface."""
13 __metaclass__ = ABCMeta
16 def sw_if_index(self):
17 """Interface index assigned by VPP."""
18 return self._sw_if_index
22 """MAC-address of the remote interface "connected" to this interface"""
23 return self._remote_hosts[0].mac
27 """MAC-address of the VPP interface."""
28 return self._local_mac
32 return self._local_addr
35 def remote_addr(self):
36 return self._remote_addr
39 def local_addr_n(self):
40 return self._local_addr_n
43 def remote_addr_n(self):
44 return self._remote_addr_n
48 """Local IPv4 address on VPP interface (string)."""
49 return self._local_ip4
53 """Local IPv4 address - raw, suitable as API parameter."""
54 return socket.inet_pton(socket.AF_INET, self._local_ip4)
58 """IPv4 address of remote peer "connected" to this interface."""
59 return self._remote_hosts[0].ip4
62 def remote_ip4n(self):
63 """IPv4 address of remote peer - raw, suitable as API parameter."""
64 return socket.inet_pton(socket.AF_INET, self.remote_ip4)
68 """Local IPv6 address on VPP interface (string)."""
69 return self._local_ip6
73 """Local IPv6 address - raw, suitable as API parameter."""
74 return socket.inet_pton(socket.AF_INET6, self.local_ip6)
78 """IPv6 address of remote peer "connected" to this interface."""
79 return self._remote_hosts[0].ip6
82 def remote_ip6n(self):
83 """IPv6 address of remote peer - raw, suitable as API parameter"""
84 return socket.inet_pton(socket.AF_INET6, self.remote_ip6)
87 def local_ip6_ll(self):
88 """Local IPv6 link-local address on VPP interface (string)."""
89 return self._local_ip6_ll
92 def local_ip6n_ll(self):
93 """Local IPv6 link-local address - raw, suitable as API parameter."""
94 return self._local_ip6n_ll
97 def remote_ip6_ll(self):
98 """Link-local IPv6 address of remote peer
99 "connected" to this interface."""
100 return self._remote_ip6_ll
103 def remote_ip6n_ll(self):
104 """Link-local IPv6 address of remote peer
105 - raw, suitable as API parameter"""
106 return self._remote_ip6n_ll
110 """Name of the interface."""
115 """RAW result of sw_interface_dump for this interface."""
120 """Test case creating this interface."""
124 def remote_hosts(self):
125 """Remote hosts list"""
126 return self._remote_hosts
129 def remote_hosts(self, value):
131 :param list value: List of remote hosts.
133 self._remote_hosts = value
134 self._hosts_by_mac = {}
135 self._hosts_by_ip4 = {}
136 self._hosts_by_ip6 = {}
137 for host in self._remote_hosts:
138 self._hosts_by_mac[host.mac] = host
139 self._hosts_by_ip4[host.ip4] = host
140 self._hosts_by_ip6[host.ip6] = host
142 def host_by_mac(self, mac):
144 :param mac: MAC address to find host by.
145 :return: Host object assigned to interface.
147 return self._hosts_by_mac[mac]
149 def host_by_ip4(self, ip):
151 :param ip: IPv4 address to find host by.
152 :return: Host object assigned to interface.
154 return self._hosts_by_ip4[ip]
156 def host_by_ip6(self, ip):
158 :param ip: IPv6 address to find host by.
159 :return: Host object assigned to interface.
161 return self._hosts_by_ip6[ip]
163 def generate_remote_hosts(self, count=1):
164 """Generate and add remote hosts for the interface.
166 :param int count: Number of generated remote hosts.
168 self._remote_hosts = []
169 self._hosts_by_mac = {}
170 self._hosts_by_ip4 = {}
171 self._hosts_by_ip6 = {}
173 2, count + 2): # 0: network address, 1: local vpp address
174 mac = "02:%02x:00:00:ff:%02x" % (self.sw_if_index, i)
175 ip4 = "172.16.%u.%u" % (self.sw_if_index, i)
176 ip6 = "fd01:%x::%x" % (self.sw_if_index, i)
177 ip6_ll = mk_ll_addr(mac)
178 host = Host(mac, ip4, ip6, ip6_ll)
179 self._remote_hosts.append(host)
180 self._hosts_by_mac[mac] = host
181 self._hosts_by_ip4[ip4] = host
182 self._hosts_by_ip6[ip6] = host
185 def __init__(self, test):
188 self._remote_hosts = []
189 self._hosts_by_mac = {}
190 self._hosts_by_ip4 = {}
191 self._hosts_by_ip6 = {}
193 def set_mac(self, mac):
194 self._local_mac = mac
195 self._local_ip6_ll = mk_ll_addr(mac)
196 self.test.vapi.sw_interface_set_mac_address(
198 mactobinary(self._local_mac))
200 def set_sw_if_index(self, sw_if_index):
201 self._sw_if_index = sw_if_index
203 self.generate_remote_hosts()
205 self._local_ip4 = "172.16.%u.1" % self.sw_if_index
206 self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
207 self._local_ip4_subnet = "172.16.%u.0" % self.sw_if_index
208 self._local_ip4n_subnet = socket.inet_pton(socket.AF_INET,
209 self._local_ip4_subnet)
210 self._local_ip4_bcast = "172.16.%u.255" % self.sw_if_index
211 self._local_ip4n_bcast = socket.inet_pton(socket.AF_INET,
212 self._local_ip4_bcast)
213 self.local_ip4_prefix_len = 24
214 self.has_ip4_config = False
215 self.ip4_table_id = 0
217 self._local_ip6 = "fd01:%x::1" % self.sw_if_index
218 self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
219 self.local_ip6_prefix_len = 64
220 self.has_ip6_config = False
221 self.ip6_table_id = 0
223 self._local_addr = {socket.AF_INET: self.local_ip4,
224 socket.AF_INET6: self.local_ip6}
225 self._local_addr_n = {socket.AF_INET: self.local_ip4n,
226 socket.AF_INET6: self.local_ip6n}
227 self._remote_addr = {socket.AF_INET: self.remote_ip4,
228 socket.AF_INET6: self.remote_ip6}
229 self._remote_addr_n = {socket.AF_INET: self.remote_ip4n,
230 socket.AF_INET6: self.remote_ip6n}
232 r = self.test.vapi.sw_interface_dump()
234 if intf.sw_if_index == self.sw_if_index:
235 self._name = intf.interface_name.split(b'\0',
237 self._local_mac = binarytomac(intf.l2_address)
242 "Could not find interface with sw_if_index %d "
243 "in interface dump %s" %
244 (self.sw_if_index, moves.reprlib.repr(r)))
245 self._local_ip6_ll = mk_ll_addr(self.local_mac)
246 self._local_ip6n_ll = socket.inet_pton(socket.AF_INET6,
248 self._remote_ip6_ll = mk_ll_addr(self.remote_mac)
249 self._remote_ip6n_ll = socket.inet_pton(socket.AF_INET6,
252 def config_ip4(self):
253 """Configure IPv4 address on the VPP interface."""
254 self.test.vapi.sw_interface_add_del_address(
255 self.sw_if_index, self.local_ip4n, self.local_ip4_prefix_len)
256 self.has_ip4_config = True
258 def unconfig_ip4(self):
259 """Remove IPv4 address on the VPP interface."""
261 if self.has_ip4_config:
262 self.test.vapi.sw_interface_add_del_address(
265 self.local_ip4_prefix_len,
267 except AttributeError:
268 self.has_ip4_config = False
269 self.has_ip4_config = False
271 def configure_ipv4_neighbors(self):
272 """For every remote host assign neighbor's MAC to IPv4 addresses.
274 :param vrf_id: The FIB table / VRF ID. (Default value = 0)
276 for host in self._remote_hosts:
277 macn = mactobinary(host.mac)
279 self.test.vapi.ip_neighbor_add_del(
280 self.sw_if_index, macn, ipn)
282 def config_ip6(self):
283 """Configure IPv6 address on the VPP interface."""
284 self.test.vapi.sw_interface_add_del_address(
285 self.sw_if_index, self._local_ip6n, self.local_ip6_prefix_len,
287 self.has_ip6_config = True
289 def unconfig_ip6(self):
290 """Remove IPv6 address on the VPP interface."""
292 if self.has_ip6_config:
293 self.test.vapi.sw_interface_add_del_address(
296 self.local_ip6_prefix_len,
298 except AttributeError:
299 self.has_ip6_config = False
300 self.has_ip6_config = False
302 def configure_ipv6_neighbors(self):
303 """For every remote host assign neighbor's MAC to IPv6 addresses.
305 :param vrf_id: The FIB table / VRF ID. (Default value = 0)
307 for host in self._remote_hosts:
308 macn = mactobinary(host.mac)
310 self.test.vapi.ip_neighbor_add_del(
311 self.sw_if_index, macn, ipn, is_ipv6=1)
314 """Unconfigure IPv6 and IPv4 address on the VPP interface."""
318 def set_table_ip4(self, table_id):
319 """Set the interface in a IPv4 Table.
321 .. note:: Must be called before configuring IP4 addresses.
323 self.ip4_table_id = table_id
324 self.test.vapi.sw_interface_set_table(
325 self.sw_if_index, 0, self.ip4_table_id)
327 def set_table_ip6(self, table_id):
328 """Set the interface in a IPv6 Table.
330 .. note:: Must be called before configuring IP6 addresses.
332 self.ip6_table_id = table_id
333 self.test.vapi.sw_interface_set_table(
334 self.sw_if_index, 1, self.ip6_table_id)
336 def disable_ipv6_ra(self):
337 """Configure IPv6 RA suppress on the VPP interface."""
338 self.test.vapi.sw_interface_ra_suppress(self.sw_if_index)
340 def ip6_ra_config(self, no=0, suppress=0, send_unicast=0):
341 """Configure IPv6 RA suppress on the VPP interface."""
342 self.test.vapi.ip6_sw_interface_ra_config(self.sw_if_index,
347 def ip6_ra_prefix(self, address, address_length, is_no=0,
348 off_link=0, no_autoconfig=0, use_default=0):
349 """Configure IPv6 RA suppress on the VPP interface."""
350 self.test.vapi.ip6_sw_interface_ra_prefix(self.sw_if_index,
355 no_autoconfig=no_autoconfig,
356 use_default=use_default)
359 """Put interface ADMIN-UP."""
360 self.test.vapi.sw_interface_set_flags(self.sw_if_index,
363 def admin_down(self):
364 """Put interface ADMIN-down."""
365 self.test.vapi.sw_interface_set_flags(self.sw_if_index,
368 def ip6_enable(self):
369 """IPv6 Enable interface"""
370 self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
373 def ip6_disable(self):
374 """Put interface ADMIN-DOWN."""
375 self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
378 def add_sub_if(self, sub_if):
379 """Register a sub-interface with this interface.
381 :param sub_if: sub-interface
383 if not hasattr(self, 'sub_if'):
386 if isinstance(self.sub_if, list):
387 self.sub_if.append(sub_if)
391 def enable_mpls(self):
392 """Enable MPLS on the VPP interface."""
393 self.test.vapi.sw_interface_enable_disable_mpls(
396 def disable_mpls(self):
397 """Enable MPLS on the VPP interface."""
398 self.test.vapi.sw_interface_enable_disable_mpls(
401 def is_ip4_entry_in_fib_dump(self, dump):
403 if i.address == self.local_ip4n and \
404 i.address_length == self.local_ip4_prefix_len and \
405 i.table_id == self.ip4_table_id:
409 def set_unnumbered(self, ip_sw_if_index):
410 """ Set the interface to unnumbered via ip_sw_if_index """
411 self.test.vapi.sw_interface_set_unnumbered(
415 def unset_unnumbered(self, ip_sw_if_index):
416 """ Unset the interface to unnumbered via ip_sw_if_index """
417 self.test.vapi.sw_interface_set_unnumbered(
422 def set_proxy_arp(self, enable=1):
423 """ Set the interface to enable/disable Proxy ARP """
424 self.test.vapi.proxy_arp_intfc_enable_disable(
428 def query_vpp_config(self):
429 dump = self.test.vapi.sw_interface_dump()
430 return self.is_interface_config_in_dump(dump)
432 def get_interface_config_from_dump(self, dump):
434 if i.interface_name.rstrip(' \t\r\n\0') == self.name and \
435 i.sw_if_index == self.sw_if_index:
440 def is_interface_config_in_dump(self, dump):
441 return self.get_interface_config_from_dump(dump) is not None
443 def assert_interface_state(self, admin_up_down, link_up_down,
446 event = self.test.vapi.wait_for_event(timeout=1,
447 name='sw_interface_event')
448 self.test.assert_equal(event.sw_if_index, self.sw_if_index,
450 self.test.assert_equal(event.admin_up_down, admin_up_down,
452 self.test.assert_equal(event.link_up_down, link_up_down,
454 dump = self.test.vapi.sw_interface_dump()
455 if_state = self.get_interface_config_from_dump(dump)
456 self.test.assert_equal(if_state.admin_up_down, admin_up_down,
458 self.test.assert_equal(if_state.link_up_down, link_up_down,