1 from abc import abstractmethod, ABCMeta
7 class VppInterface(object):
8 """Generic VPP interface."""
9 __metaclass__ = ABCMeta
12 def sw_if_index(self):
13 """Interface index assigned by VPP."""
14 return self._sw_if_index
18 """MAC-address of the remote interface "connected" to this interface."""
19 return self._remote_hosts[0].mac
23 """MAC-address of the VPP interface."""
24 return self._local_mac
28 """Local IPv4 address on VPP interface (string)."""
29 return self._local_ip4
33 """Local IPv4 address - raw, suitable as API parameter."""
34 return socket.inet_pton(socket.AF_INET, self._local_ip4)
38 """IPv4 address of remote peer "connected" to this interface."""
39 return self._remote_hosts[0].ip4
42 def remote_ip4n(self):
43 """IPv4 address of remote peer - raw, suitable as API parameter."""
44 return socket.inet_pton(socket.AF_INET, self.remote_ip4)
48 """Local IPv6 address on VPP interface (string)."""
49 return self._local_ip6
53 """Local IPv6 address - raw, suitable as API parameter."""
54 return socket.inet_pton(socket.AF_INET6, self.local_ip6)
58 """IPv6 address of remote peer "connected" to this interface."""
59 return self._remote_hosts[0].ip6
62 def remote_ip6n(self):
63 """IPv6 address of remote peer - raw, suitable as API parameter"""
64 return socket.inet_pton(socket.AF_INET6, self.remote_ip6)
68 """Name of the interface."""
73 """RAW result of sw_interface_dump for this interface."""
78 """Test case creating this interface."""
82 def remote_hosts(self):
83 """Remote hosts list"""
84 return self._remote_hosts
87 def remote_hosts(self, value):
89 :param list value: List of remote hosts.
91 self._remote_hosts = value
92 self._hosts_by_mac = {}
93 self._hosts_by_ip4 = {}
94 self._hosts_by_ip6 = {}
95 for host in self._remote_hosts:
96 self._hosts_by_mac[host.mac] = host
97 self._hosts_by_ip4[host.ip4] = host
98 self._hosts_by_ip6[host.ip6] = host
100 def host_by_mac(self, mac):
102 :param mac: MAC address to find host by.
103 :return: Host object assigned to interface.
105 return self._hosts_by_mac[mac]
107 def host_by_ip4(self, ip):
109 :param ip: IPv4 address to find host by.
110 :return: Host object assigned to interface.
112 return self._hosts_by_ip4[ip]
114 def host_by_ip6(self, ip):
116 :param ip: IPv6 address to find host by.
117 :return: Host object assigned to interface.
119 return self._hosts_by_ip6[ip]
121 def generate_remote_hosts(self, count=1):
122 """Generate and add remote hosts for the interface.
124 :param int count: Number of generated remote hosts.
126 self._remote_hosts = []
127 self._hosts_by_mac = {}
128 self._hosts_by_ip4 = {}
129 self._hosts_by_ip6 = {}
131 2, count + 2): # 0: network address, 1: local vpp address
132 mac = "02:%02x:00:00:ff:%02x" % (self.sw_if_index, i)
133 ip4 = "172.16.%u.%u" % (self.sw_if_index, i)
134 ip6 = "fd01:%04x::%04x" % (self.sw_if_index, i)
135 host = Host(mac, ip4, ip6)
136 self._remote_hosts.append(host)
137 self._hosts_by_mac[mac] = host
138 self._hosts_by_ip4[ip4] = host
139 self._hosts_by_ip6[ip6] = host
142 def __init__(self, test):
145 self._remote_hosts = []
146 self._hosts_by_mac = {}
147 self._hosts_by_ip4 = {}
148 self._hosts_by_ip6 = {}
150 self.generate_remote_hosts()
152 self._local_ip4 = "172.16.%u.1" % self.sw_if_index
153 self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
154 self.local_ip4_prefix_len = 24
155 self.has_ip4_config = False
156 self.ip4_table_id = 0
158 self._local_ip6 = "fd01:%04x::1" % self.sw_if_index
159 self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
160 self.local_ip6_prefix_len = 64
161 self.has_ip6_config = False
162 self.ip6_table_id = 0
164 r = self.test.vapi.sw_interface_dump()
166 if intf.sw_if_index == self.sw_if_index:
167 self._name = intf.interface_name.split(b'\0', 1)[0]
169 ':'.join(intf.l2_address.encode('hex')[i:i + 2]
170 for i in range(0, 12, 2))
175 "Could not find interface with sw_if_index %d "
176 "in interface dump %s" %
177 (self.sw_if_index, repr(r)))
179 def config_ip4(self):
180 """Configure IPv4 address on the VPP interface."""
181 self.test.vapi.sw_interface_add_del_address(
182 self.sw_if_index, self.local_ip4n, self.local_ip4_prefix_len)
183 self.has_ip4_config = True
185 def unconfig_ip4(self):
186 """Remove IPv4 address on the VPP interface."""
188 if self.has_ip4_config:
189 self.test.vapi.sw_interface_add_del_address(
192 self.local_ip4_prefix_len,
194 except AttributeError:
195 self.has_ip4_config = False
196 self.has_ip4_config = False
198 def configure_ipv4_neighbors(self):
199 """For every remote host assign neighbor's MAC to IPv4 addresses."""
200 for host in self._remote_hosts:
201 macn = host.mac.replace(":", "").decode('hex')
203 self.test.vapi.ip_neighbor_add_del(self.sw_if_index, macn, ipn)
205 def config_ip6(self):
206 """Configure IPv6 address on the VPP interface."""
207 self.test.vapi.sw_interface_add_del_address(
208 self.sw_if_index, self._local_ip6n, self.local_ip6_prefix_len,
210 self.has_ip6_config = True
212 def unconfig_ip6(self):
213 """Remove IPv6 address on the VPP interface."""
215 if self.has_ip6_config:
216 self.test.vapi.sw_interface_add_del_address(
219 self.local_ip6_prefix_len,
221 except AttributeError:
222 self.has_ip6_config = False
223 self.has_ip6_config = False
226 """Unconfigure IPv6 and IPv4 address on the VPP interface."""
230 def set_table_ip4(self, table_id):
231 """Set the interface in a IPv4 Table.
233 .. note:: Must be called before configuring IP4 addresses.
235 self.ip4_table_id = table_id
236 self.test.vapi.sw_interface_set_table(
237 self.sw_if_index, 0, self.ip4_table_id)
239 def set_table_ip6(self, table_id):
240 """Set the interface in a IPv6 Table.
242 .. note:: Must be called before configuring IP6 addresses.
244 self.ip6_table_id = table_id
245 self.test.vapi.sw_interface_set_table(
246 self.sw_if_index, 1, self.ip6_table_id)
248 def disable_ipv6_ra(self):
249 """Configure IPv6 RA suppress on the VPP interface."""
250 self.test.vapi.sw_interface_ra_suppress(self.sw_if_index)
253 """Put interface ADMIN-UP."""
254 self.test.vapi.sw_interface_set_flags(self.sw_if_index, admin_up_down=1)
256 def admin_down(self):
257 """Put interface ADMIN-down."""
258 self.test.vapi.sw_interface_set_flags(self.sw_if_index, admin_up_down=0)
260 def add_sub_if(self, sub_if):
261 """Register a sub-interface with this interface.
263 :param sub_if: sub-interface
265 if not hasattr(self, 'sub_if'):
268 if isinstance(self.sub_if, list):
269 self.sub_if.append(sub_if)
273 def enable_mpls(self):
274 """Enable MPLS on the VPP interface."""
275 self.test.vapi.sw_interface_enable_disable_mpls(
278 def is_ip4_entry_in_fib_dump(self, dump):
280 if i.address == self.local_ip4n and \
281 i.address_length == self.local_ip4_prefix_len and \
282 i.table_id == self.ip4_table_id: