2 from abc import abstractmethod, ABCMeta
6 from util import Host, mk_ll_addr, mactobinary
9 class VppInterface(object):
10 """Generic VPP interface."""
11 __metaclass__ = ABCMeta
def sw_if_index(self):
    """Return the interface index that VPP assigned to this interface."""
    assigned_index = self._sw_if_index
    return assigned_index
20 """MAC-address of the remote interface "connected" to this interface"""
21 return self._remote_hosts[0].mac
25 """MAC-address of the VPP interface."""
26 return self._local_mac
30 return self._local_addr
def remote_addr(self):
    """Return the stored mapping of remote peer addresses.

    The mapping is the ``_remote_addr`` attribute, keyed by address family
    (``socket.AF_INET`` / ``socket.AF_INET6``).
    """
    per_family = self._remote_addr
    return per_family
def local_addr_n(self):
    """Return the stored mapping of raw (packed) local addresses.

    The mapping is the ``_local_addr_n`` attribute, keyed by address family
    (``socket.AF_INET`` / ``socket.AF_INET6``).
    """
    per_family = self._local_addr_n
    return per_family
def remote_addr_n(self):
    """Return the stored mapping of raw (packed) remote peer addresses.

    The mapping is the ``_remote_addr_n`` attribute, keyed by address family
    (``socket.AF_INET`` / ``socket.AF_INET6``).
    """
    per_family = self._remote_addr_n
    return per_family
46 """Local IPv4 address on VPP interface (string)."""
47 return self._local_ip4
51 """Local IPv4 address - raw, suitable as API parameter."""
52 return socket.inet_pton(socket.AF_INET, self._local_ip4)
56 """IPv4 address of remote peer "connected" to this interface."""
57 return self._remote_hosts[0].ip4
def remote_ip4n(self):
    """IPv4 address of remote peer - raw, suitable as API parameter.

    Packs ``self.remote_ip4`` with ``socket.inet_pton``.
    """
    peer_address = self.remote_ip4
    return socket.inet_pton(socket.AF_INET, peer_address)
66 """Local IPv6 address on VPP interface (string)."""
67 return self._local_ip6
71 """Local IPv6 address - raw, suitable as API parameter."""
72 return socket.inet_pton(socket.AF_INET6, self.local_ip6)
76 """IPv6 address of remote peer "connected" to this interface."""
77 return self._remote_hosts[0].ip6
def remote_ip6n(self):
    """IPv6 address of remote peer - raw, suitable as API parameter.

    Packs ``self.remote_ip6`` with ``socket.inet_pton``.
    """
    peer_address = self.remote_ip6
    return socket.inet_pton(socket.AF_INET6, peer_address)
def local_ip6_ll(self):
    """Return the local IPv6 link-local address of the VPP interface (string)."""
    link_local = self._local_ip6_ll
    return link_local
def local_ip6n_ll(self):
    """Return the local IPv6 link-local address in raw form (API parameter)."""
    packed_link_local = self._local_ip6n_ll
    return packed_link_local
def remote_ip6_ll(self):
    """Return the link-local IPv6 address of the remote peer
    "connected" to this interface."""
    peer_link_local = self._remote_ip6_ll
    return peer_link_local
def remote_ip6n_ll(self):
    """Return the link-local IPv6 address of the remote peer
    in raw form, suitable as API parameter."""
    packed_peer_link_local = self._remote_ip6n_ll
    return packed_peer_link_local
108 """Name of the interface."""
113 """RAW result of sw_interface_dump for this interface."""
118 """Test case creating this interface."""
def remote_hosts(self):
    """Return the list of remote hosts stored for this interface."""
    hosts = self._remote_hosts
    return hosts
def remote_hosts(self, value):
    """Replace the remote hosts and rebuild the MAC/IPv4/IPv6 lookup tables.

    :param list value: List of remote hosts.
    """
    self._remote_hosts = value
    # Bind the fresh tables to locals so the loop fills them directly.
    by_mac = self._hosts_by_mac = {}
    by_ip4 = self._hosts_by_ip4 = {}
    by_ip6 = self._hosts_by_ip6 = {}
    for peer in value:
        by_mac[peer.mac] = peer
        by_ip4[peer.ip4] = peer
        by_ip6[peer.ip6] = peer
def host_by_mac(self, mac):
    """Look up a remote host by its MAC address.

    :param mac: MAC address to find host by.
    :return: Host object assigned to interface.
    :raises KeyError: if no host is registered under that MAC address.
    """
    lookup = self._hosts_by_mac
    return lookup[mac]
def host_by_ip4(self, ip):
    """Look up a remote host by its IPv4 address.

    :param ip: IPv4 address to find host by.
    :return: Host object assigned to interface.
    :raises KeyError: if no host is registered under that IPv4 address.
    """
    lookup = self._hosts_by_ip4
    return lookup[ip]
def host_by_ip6(self, ip):
    """Look up a remote host by its IPv6 address.

    :param ip: IPv6 address to find host by.
    :return: Host object assigned to interface.
    :raises KeyError: if no host is registered under that IPv6 address.
    """
    lookup = self._hosts_by_ip6
    return lookup[ip]
def generate_remote_hosts(self, count=1):
    """Generate and add remote hosts for the interface.

    Previously stored hosts and lookup tables are discarded first. Each
    generated host gets a MAC, IPv4, IPv6 and link-local IPv6 address
    derived from ``self.sw_if_index`` and a per-host number starting at 2.

    :param int count: Number of generated remote hosts.
    """
    hosts = self._remote_hosts = []
    by_mac = self._hosts_by_mac = {}
    by_ip4 = self._hosts_by_ip4 = {}
    by_ip6 = self._hosts_by_ip6 = {}

    # 0: network address, 1: local vpp address
    first_host_id = 2
    for host_id in range(first_host_id, first_host_id + count):
        mac = "02:%02x:00:00:ff:%02x" % (self.sw_if_index, host_id)
        ip4 = "172.16.%u.%u" % (self.sw_if_index, host_id)
        ip6 = "fd01:%x::%x" % (self.sw_if_index, host_id)
        ip6_ll = mk_ll_addr(mac)
        peer = Host(mac, ip4, ip6, ip6_ll)
        hosts.append(peer)
        by_mac[mac] = peer
        by_ip4[ip4] = peer
        by_ip6[ip6] = peer
183 def __init__(self, test):
186 self._remote_hosts = []
187 self._hosts_by_mac = {}
188 self._hosts_by_ip4 = {}
189 self._hosts_by_ip6 = {}
191 def set_mac(self, mac):
192 self._local_mac = mac
193 self._local_ip6_ll = mk_ll_addr(mac)
194 self.test.vapi.sw_interface_set_mac_address(
196 mactobinary(self._local_mac))
198 def set_sw_if_index(self, sw_if_index):
199 self._sw_if_index = sw_if_index
201 self.generate_remote_hosts()
203 self._local_ip4 = "172.16.%u.1" % self.sw_if_index
204 self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
205 self._local_ip4_subnet = "172.16.%u.0" % self.sw_if_index
206 self._local_ip4n_subnet = socket.inet_pton(socket.AF_INET,
207 self._local_ip4_subnet)
208 self._local_ip4_bcast = "172.16.%u.255" % self.sw_if_index
209 self._local_ip4n_bcast = socket.inet_pton(socket.AF_INET,
210 self._local_ip4_bcast)
211 self.local_ip4_prefix_len = 24
212 self.has_ip4_config = False
213 self.ip4_table_id = 0
215 self._local_ip6 = "fd01:%x::1" % self.sw_if_index
216 self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
217 self.local_ip6_prefix_len = 64
218 self.has_ip6_config = False
219 self.ip6_table_id = 0
221 self._local_addr = {socket.AF_INET: self.local_ip4,
222 socket.AF_INET6: self.local_ip6}
223 self._local_addr_n = {socket.AF_INET: self.local_ip4n,
224 socket.AF_INET6: self.local_ip6n}
225 self._remote_addr = {socket.AF_INET: self.remote_ip4,
226 socket.AF_INET6: self.remote_ip6}
227 self._remote_addr_n = {socket.AF_INET: self.remote_ip4n,
228 socket.AF_INET6: self.remote_ip6n}
230 r = self.test.vapi.sw_interface_dump()
232 if intf.sw_if_index == self.sw_if_index:
233 self._name = intf.interface_name.split(b'\0', 1)[0]
235 ':'.join(intf.l2_address.encode('hex')[i:i + 2]
236 for i in range(0, 12, 2))
241 "Could not find interface with sw_if_index %d "
242 "in interface dump %s" %
243 (self.sw_if_index, six.reprlib(r)))
244 self._local_ip6_ll = mk_ll_addr(self.local_mac)
245 self._local_ip6n_ll = socket.inet_pton(socket.AF_INET6,
247 self._remote_ip6_ll = mk_ll_addr(self.remote_mac)
248 self._remote_ip6n_ll = socket.inet_pton(socket.AF_INET6,
def config_ip4(self):
    """Configure IPv4 address on the VPP interface.

    Passes the raw local IPv4 address and its prefix length to
    ``sw_interface_add_del_address`` and then sets ``has_ip4_config``.
    """
    add_del_address = self.test.vapi.sw_interface_add_del_address
    add_del_address(
        self.sw_if_index, self.local_ip4n, self.local_ip4_prefix_len)
    self.has_ip4_config = True
257 def unconfig_ip4(self):
258 """Remove IPv4 address on the VPP interface."""
260 if self.has_ip4_config:
261 self.test.vapi.sw_interface_add_del_address(
264 self.local_ip4_prefix_len,
266 except AttributeError:
267 self.has_ip4_config = False
268 self.has_ip4_config = False
def configure_ipv4_neighbors(self):
    """For every remote host assign neighbor's MAC to IPv4 addresses.

    Calls ``ip_neighbor_add_del`` once per entry in ``self._remote_hosts``
    with this interface's ``sw_if_index``, the host's MAC in binary form
    and the host's packed IPv4 address. The method takes no parameters.
    """
    # Local import keeps this block self-contained; unhexlify behaves the
    # same on Python 2 and 3, unlike ``str.decode('hex')`` (Python 2 only).
    import binascii

    for host in self._remote_hosts:
        macn = binascii.unhexlify(host.mac.replace(":", ""))
        # Pack the textual address the same way the rest of this file does.
        ipn = socket.inet_pton(socket.AF_INET, host.ip4)
        self.test.vapi.ip_neighbor_add_del(
            self.sw_if_index, macn, ipn)
281 def config_ip6(self):
282 """Configure IPv6 address on the VPP interface."""
283 self.test.vapi.sw_interface_add_del_address(
284 self.sw_if_index, self._local_ip6n, self.local_ip6_prefix_len,
286 self.has_ip6_config = True
288 def unconfig_ip6(self):
289 """Remove IPv6 address on the VPP interface."""
291 if self.has_ip6_config:
292 self.test.vapi.sw_interface_add_del_address(
295 self.local_ip6_prefix_len,
297 except AttributeError:
298 self.has_ip6_config = False
299 self.has_ip6_config = False
def configure_ipv6_neighbors(self):
    """For every remote host assign neighbor's MAC to IPv6 addresses.

    Calls ``ip_neighbor_add_del`` with ``is_ipv6=1`` once per entry in
    ``self._remote_hosts``, passing this interface's ``sw_if_index``, the
    host's MAC in binary form and the host's packed IPv6 address. The
    method takes no parameters.
    """
    # Local import keeps this block self-contained; unhexlify behaves the
    # same on Python 2 and 3, unlike ``str.decode('hex')`` (Python 2 only).
    import binascii

    for host in self._remote_hosts:
        macn = binascii.unhexlify(host.mac.replace(":", ""))
        # Pack the textual address the same way the rest of this file does.
        ipn = socket.inet_pton(socket.AF_INET6, host.ip6)
        self.test.vapi.ip_neighbor_add_del(
            self.sw_if_index, macn, ipn, is_ipv6=1)
313 """Unconfigure IPv6 and IPv4 address on the VPP interface."""
def set_table_ip4(self, table_id):
    """Set the interface in a IPv4 Table.

    Stores ``table_id`` in ``ip4_table_id`` and passes it to
    ``sw_interface_set_table`` with 0 as the second argument.

    .. note:: Must be called before configuring IP4 addresses.
    """
    self.ip4_table_id = table_id
    vapi = self.test.vapi
    vapi.sw_interface_set_table(self.sw_if_index, 0, self.ip4_table_id)
def set_table_ip6(self, table_id):
    """Set the interface in a IPv6 Table.

    Stores ``table_id`` in ``ip6_table_id`` and passes it to
    ``sw_interface_set_table`` with 1 as the second argument.

    .. note:: Must be called before configuring IP6 addresses.
    """
    self.ip6_table_id = table_id
    vapi = self.test.vapi
    vapi.sw_interface_set_table(self.sw_if_index, 1, self.ip6_table_id)
def disable_ipv6_ra(self):
    """Configure IPv6 RA suppress on the VPP interface.

    Calls ``sw_interface_ra_suppress`` with this interface's index.
    """
    suppress_ra = self.test.vapi.sw_interface_ra_suppress
    suppress_ra(self.sw_if_index)
339 def ip6_ra_config(self, no=0, suppress=0, send_unicast=0):
340 """Configure IPv6 RA settings (no, suppress, send_unicast) on the VPP interface."""
341 self.test.vapi.ip6_sw_interface_ra_config(self.sw_if_index,
346 def ip6_ra_prefix(self, address, address_length, is_no=0,
347 off_link=0, no_autoconfig=0, use_default=0):
348 """Configure an IPv6 RA prefix on the VPP interface."""
349 self.test.vapi.ip6_sw_interface_ra_prefix(self.sw_if_index,
354 no_autoconfig=no_autoconfig,
355 use_default=use_default)
358 """Put interface ADMIN-UP."""
359 self.test.vapi.sw_interface_set_flags(self.sw_if_index,
362 def admin_down(self):
363 """Put interface ADMIN-down."""
364 self.test.vapi.sw_interface_set_flags(self.sw_if_index,
367 def ip6_enable(self):
368 """IPv6 Enable interface"""
369 self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
372 def ip6_disable(self):
373 """IPv6 Disable interface"""
374 self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
377 def add_sub_if(self, sub_if):
378 """Register a sub-interface with this interface.
380 :param sub_if: sub-interface
382 if not hasattr(self, 'sub_if'):
385 if isinstance(self.sub_if, list):
386 self.sub_if.append(sub_if)
390 def enable_mpls(self):
391 """Enable MPLS on the VPP interface."""
392 self.test.vapi.sw_interface_enable_disable_mpls(
395 def disable_mpls(self):
396 """Disable MPLS on the VPP interface."""
397 self.test.vapi.sw_interface_enable_disable_mpls(
400 def is_ip4_entry_in_fib_dump(self, dump):
402 if i.address == self.local_ip4n and \
403 i.address_length == self.local_ip4_prefix_len and \
404 i.table_id == self.ip4_table_id:
408 def set_unnumbered(self, ip_sw_if_index):
409 """ Set the interface to unnumbered via ip_sw_if_index """
410 self.test.vapi.sw_interface_set_unnumbered(
414 def unset_unnumbered(self, ip_sw_if_index):
415 """ Unset the interface to unnumbered via ip_sw_if_index """
416 self.test.vapi.sw_interface_set_unnumbered(
421 def set_proxy_arp(self, enable=1):
422 """ Set the interface to enable/disable Proxy ARP """
423 self.test.vapi.proxy_arp_intfc_enable_disable(
def query_vpp_config(self):
    """Check a fresh interface dump for this interface.

    :return: Result of ``is_interface_config_in_dump`` applied to the
        output of ``sw_interface_dump``.
    """
    interfaces = self.test.vapi.sw_interface_dump()
    present = self.is_interface_config_in_dump(interfaces)
    return present
431 def get_interface_config_from_dump(self, dump):
433 if i.interface_name.rstrip(' \t\r\n\0') == self.name and \
434 i.sw_if_index == self.sw_if_index:
def is_interface_config_in_dump(self, dump):
    """Tell whether ``get_interface_config_from_dump`` finds an entry.

    :param dump: Interface dump to search.
    :return: True when the lookup returns anything other than None.
    """
    config = self.get_interface_config_from_dump(dump)
    return config is not None
442 def assert_interface_state(self, admin_up_down, link_up_down,
445 event = self.test.vapi.wait_for_event(timeout=1,
446 name='sw_interface_event')
447 self.test.assert_equal(event.sw_if_index, self.sw_if_index,
449 self.test.assert_equal(event.admin_up_down, admin_up_down,
451 self.test.assert_equal(event.link_up_down, link_up_down,
453 dump = self.test.vapi.sw_interface_dump()
454 if_state = self.get_interface_config_from_dump(dump)
455 self.test.assert_equal(if_state.admin_up_down, admin_up_down,
457 self.test.assert_equal(if_state.link_up_down, link_up_down,