1 from abc import abstractmethod, ABCMeta
4 from util import Host, mk_ll_addr
5 from vpp_neighbor import VppNeighbor
8 class VppInterface(object):
9 """Generic VPP interface."""
10 __metaclass__ = ABCMeta
13 def sw_if_index(self):
14 """Interface index assigned by VPP."""
15 return self._sw_if_index
19 """MAC-address of the remote interface "connected" to this interface"""
20 return self._remote_hosts[0].mac
24 """MAC-address of the VPP interface."""
25 return self._local_mac
29 """Local IPv4 address on VPP interface (string)."""
30 return self._local_ip4
34 """Local IPv4 address - raw, suitable as API parameter."""
35 return socket.inet_pton(socket.AF_INET, self._local_ip4)
39 """IPv4 address of remote peer "connected" to this interface."""
40 return self._remote_hosts[0].ip4
43 def remote_ip4n(self):
44 """IPv4 address of remote peer - raw, suitable as API parameter."""
45 return socket.inet_pton(socket.AF_INET, self.remote_ip4)
49 """Local IPv6 address on VPP interface (string)."""
50 return self._local_ip6
54 """Local IPv6 address - raw, suitable as API parameter."""
55 return socket.inet_pton(socket.AF_INET6, self.local_ip6)
58 def local_ip6_ll(self):
59 """Local IPv6 linnk-local address on VPP interface (string)."""
60 return self._local_ip6_ll
63 def local_ip6n_ll(self):
64 """Local IPv6 link-local address - raw, suitable as API parameter."""
65 return self.local_ip6n_ll
69 """IPv6 address of remote peer "connected" to this interface."""
70 return self._remote_hosts[0].ip6
73 def remote_ip6n(self):
74 """IPv6 address of remote peer - raw, suitable as API parameter"""
75 return socket.inet_pton(socket.AF_INET6, self.remote_ip6)
79 """Name of the interface."""
84 """RAW result of sw_interface_dump for this interface."""
89 """Test case creating this interface."""
93 def remote_hosts(self):
94 """Remote hosts list"""
95 return self._remote_hosts
98 def remote_hosts(self, value):
100 :param list value: List of remote hosts.
102 self._remote_hosts = value
103 self._hosts_by_mac = {}
104 self._hosts_by_ip4 = {}
105 self._hosts_by_ip6 = {}
106 for host in self._remote_hosts:
107 self._hosts_by_mac[host.mac] = host
108 self._hosts_by_ip4[host.ip4] = host
109 self._hosts_by_ip6[host.ip6] = host
111 def host_by_mac(self, mac):
113 :param mac: MAC address to find host by.
114 :return: Host object assigned to interface.
116 return self._hosts_by_mac[mac]
118 def host_by_ip4(self, ip):
120 :param ip: IPv4 address to find host by.
121 :return: Host object assigned to interface.
123 return self._hosts_by_ip4[ip]
125 def host_by_ip6(self, ip):
127 :param ip: IPv6 address to find host by.
128 :return: Host object assigned to interface.
130 return self._hosts_by_ip6[ip]
132 def generate_remote_hosts(self, count=1):
133 """Generate and add remote hosts for the interface.
135 :param int count: Number of generated remote hosts.
137 self._remote_hosts = []
138 self._hosts_by_mac = {}
139 self._hosts_by_ip4 = {}
140 self._hosts_by_ip6 = {}
142 2, count + 2): # 0: network address, 1: local vpp address
143 mac = "02:%02x:00:00:ff:%02x" % (self.sw_if_index, i)
144 ip4 = "172.16.%u.%u" % (self.sw_if_index, i)
145 ip6 = "fd01:%x::%x" % (self.sw_if_index, i)
146 ip6_ll = mk_ll_addr(mac)
147 host = Host(mac, ip4, ip6, ip6_ll)
148 self._remote_hosts.append(host)
149 self._hosts_by_mac[mac] = host
150 self._hosts_by_ip4[ip4] = host
151 self._hosts_by_ip6[ip6] = host
154 def __init__(self, test):
157 self._remote_hosts = []
158 self._hosts_by_mac = {}
159 self._hosts_by_ip4 = {}
160 self._hosts_by_ip6 = {}
162 self.generate_remote_hosts()
164 self._local_ip4 = "172.16.%u.1" % self.sw_if_index
165 self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
166 self.local_ip4_prefix_len = 24
167 self.has_ip4_config = False
168 self.ip4_table_id = 0
170 self._local_ip6 = "fd01:%x::1" % self.sw_if_index
171 self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
172 self.local_ip6_prefix_len = 64
173 self.has_ip6_config = False
174 self.ip6_table_id = 0
176 r = self.test.vapi.sw_interface_dump()
178 if intf.sw_if_index == self.sw_if_index:
179 self._name = intf.interface_name.split(b'\0', 1)[0]
181 ':'.join(intf.l2_address.encode('hex')[i:i + 2]
182 for i in range(0, 12, 2))
187 "Could not find interface with sw_if_index %d "
188 "in interface dump %s" %
189 (self.sw_if_index, repr(r)))
190 self._local_ip6_ll = mk_ll_addr(self.local_mac)
191 self._local_ip6n_ll = socket.inet_pton(socket.AF_INET6,
194 def config_ip4(self):
195 """Configure IPv4 address on the VPP interface."""
196 self.test.vapi.sw_interface_add_del_address(
197 self.sw_if_index, self.local_ip4n, self.local_ip4_prefix_len)
198 self.has_ip4_config = True
200 def unconfig_ip4(self):
201 """Remove IPv4 address on the VPP interface."""
203 if self.has_ip4_config:
204 self.test.vapi.sw_interface_add_del_address(
207 self.local_ip4_prefix_len,
209 except AttributeError:
210 self.has_ip4_config = False
211 self.has_ip4_config = False
213 def configure_ipv4_neighbors(self):
214 """For every remote host assign neighbor's MAC to IPv4 addresses.
216 :param vrf_id: The FIB table / VRF ID. (Default value = 0)
218 for host in self._remote_hosts:
219 macn = host.mac.replace(":", "").decode('hex')
221 self.test.vapi.ip_neighbor_add_del(
222 self.sw_if_index, macn, ipn)
224 def config_ip6(self):
225 """Configure IPv6 address on the VPP interface."""
226 self.test.vapi.sw_interface_add_del_address(
227 self.sw_if_index, self._local_ip6n, self.local_ip6_prefix_len,
229 self.has_ip6_config = True
231 def unconfig_ip6(self):
232 """Remove IPv6 address on the VPP interface."""
234 if self.has_ip6_config:
235 self.test.vapi.sw_interface_add_del_address(
238 self.local_ip6_prefix_len,
240 except AttributeError:
241 self.has_ip6_config = False
242 self.has_ip6_config = False
244 def configure_ipv6_neighbors(self):
245 """For every remote host assign neighbor's MAC to IPv6 addresses.
247 :param vrf_id: The FIB table / VRF ID. (Default value = 0)
249 for host in self._remote_hosts:
250 macn = host.mac.replace(":", "").decode('hex')
252 self.test.vapi.ip_neighbor_add_del(
253 self.sw_if_index, macn, ipn, is_ipv6=1)
256 """Unconfigure IPv6 and IPv4 address on the VPP interface."""
260 def set_table_ip4(self, table_id):
261 """Set the interface in a IPv4 Table.
263 .. note:: Must be called before configuring IP4 addresses.
265 self.ip4_table_id = table_id
266 self.test.vapi.sw_interface_set_table(
267 self.sw_if_index, 0, self.ip4_table_id)
269 def set_table_ip6(self, table_id):
270 """Set the interface in a IPv6 Table.
272 .. note:: Must be called before configuring IP6 addresses.
274 self.ip6_table_id = table_id
275 self.test.vapi.sw_interface_set_table(
276 self.sw_if_index, 1, self.ip6_table_id)
278 def disable_ipv6_ra(self):
279 """Configure IPv6 RA suppress on the VPP interface."""
280 self.test.vapi.sw_interface_ra_suppress(self.sw_if_index)
282 def ip6_ra_config(self, no=0, suppress=0, send_unicast=0):
283 """Configure IPv6 RA suppress on the VPP interface."""
284 self.test.vapi.ip6_sw_interface_ra_config(self.sw_if_index,
289 def ip6_ra_prefix(self, address, address_length, is_no=0,
290 off_link=0, no_autoconfig=0, use_default=0):
291 """Configure IPv6 RA suppress on the VPP interface."""
292 self.test.vapi.ip6_sw_interface_ra_prefix(self.sw_if_index,
297 no_autoconfig=no_autoconfig,
298 use_default=use_default)
301 """Put interface ADMIN-UP."""
302 self.test.vapi.sw_interface_set_flags(self.sw_if_index,
305 def admin_down(self):
306 """Put interface ADMIN-down."""
307 self.test.vapi.sw_interface_set_flags(self.sw_if_index,
310 def ip6_enable(self):
311 """IPv6 Enable interface"""
312 self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
315 def ip6_disable(self):
316 """Put interface ADMIN-DOWN."""
317 self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
320 def add_sub_if(self, sub_if):
321 """Register a sub-interface with this interface.
323 :param sub_if: sub-interface
325 if not hasattr(self, 'sub_if'):
328 if isinstance(self.sub_if, list):
329 self.sub_if.append(sub_if)
333 def enable_mpls(self):
334 """Enable MPLS on the VPP interface."""
335 self.test.vapi.sw_interface_enable_disable_mpls(
338 def disable_mpls(self):
339 """Enable MPLS on the VPP interface."""
340 self.test.vapi.sw_interface_enable_disable_mpls(
343 def is_ip4_entry_in_fib_dump(self, dump):
345 if i.address == self.local_ip4n and \
346 i.address_length == self.local_ip4_prefix_len and \
347 i.table_id == self.ip4_table_id:
351 def set_unnumbered(self, ip_sw_if_index):
352 """ Set the interface to unnumbered via ip_sw_if_index """
353 self.test.vapi.sw_interface_set_unnumbered(
357 def unset_unnumbered(self, ip_sw_if_index):
358 """ Unset the interface to unnumbered via ip_sw_if_index """
359 self.test.vapi.sw_interface_set_unnumbered(
364 def set_proxy_arp(self, enable=1):
365 """ Set the interface to enable/disable Proxy ARP """
366 self.test.vapi.proxy_arp_intfc_enable_disable(