2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp, mactobinary
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
56 Clear trace and packet infos before running each test.
58 self.reset_packet_infos()
59 super(TestL2bdArpTerm, self).setUp()
63 Show various debug prints after each test.
65 super(TestL2bdArpTerm, self).tearDown()
67 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
68 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
70 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
72 ip = e.ip4n if is_ipv6 == 0 else e.ip6n
73 self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
def mac_list(cls, b6_range):
    """Build one MAC address string per value in ``b6_range``.

    All addresses share the ``00:00:ca:fe:00`` prefix; each value is
    rendered as the zero-padded hexadecimal last octet.

    :param b6_range: Iterable of integers to use as the last octet.
    :returns: List of MAC address strings, in iteration order.
    """
    macs = []
    for last_octet in b6_range:
        macs.append("00:00:ca:fe:00:%02x" % last_octet)
    return macs
def ip4_host(cls, subnet, host, mac):
    """Create a ``Host`` with an IPv4 address of ``172.17.1<subnet>.<host>``.

    :param subnet: Number formatted (two digits, zero padded) into the
        third octet after the leading ``1``.
    :param host: Number used as the fourth octet.
    :param mac: MAC address string of the host.
    :returns: ``Host`` carrying ``mac`` and the generated address.
    """
    # NOTE(review): the opening of this call was not visible in the reviewed
    # text; Host(mac=..., ip4=...) mirrors the other Host() calls in the file.
    address = "172.17.1%02u.%u" % (subnet, host)
    return Host(mac=mac, ip4=address)
def ip4_hosts(cls, subnet, start, mac_list):
    """Create a set of IPv4 hosts, one per MAC address.

    The n-th MAC (counting from zero) is paired with host number
    ``start + n`` in ``subnet`` via ``cls.ip4_host``.

    :param subnet: Subnet number handed to ``ip4_host``.
    :param start: Host number given to the first MAC.
    :param mac_list: Sequence of MAC address strings.
    :returns: Set of the objects returned by ``ip4_host``.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip4_host(subnet, start + offset, mac))
    return hosts
def ip6_host(cls, subnet, host, mac):
    """Create a ``Host`` with an IPv6 address of ``fd01:<subnet>::<host>``.

    :param subnet: Number rendered in hex as the second address group.
    :param host: Number rendered in hex as the last address group.
    :param mac: MAC address string of the host.
    :returns: ``Host`` carrying ``mac`` and the generated address.
    """
    # NOTE(review): the opening of this call was not visible in the reviewed
    # text; Host(mac=..., ip6=...) mirrors the other Host() calls in the file.
    address = "fd01:%x::%x" % (subnet, host)
    return Host(mac=mac, ip6=address)
def ip6_hosts(cls, subnet, start, mac_list):
    """Create a set of IPv6 hosts, one per MAC address.

    The n-th MAC (counting from zero) is paired with host number
    ``start + n`` in ``subnet`` via ``cls.ip6_host``.

    :param subnet: Subnet number handed to ``ip6_host``.
    :param start: Host number given to the first MAC.
    :param mac_list: Sequence of MAC address strings.
    :returns: Set of the objects returned by ``ip6_host``.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip6_host(subnet, start + offset, mac))
    return hosts
def bd_swifs(cls, b):
    """Return the pg interfaces that belong to bridge domain ``b``.

    :param b: Bridge domain ID.
    :returns: List of ``cls.ifs_per_bd`` entries of ``cls.pg_interfaces``.
    """
    # NOTE(review): the statements binding ``n`` and ``start`` were not
    # visible in the reviewed text. This assumes bridge domain IDs are
    # 1-based and each owns a contiguous group of ``ifs_per_bd`` interfaces
    # -- confirm against the class setup.
    n = cls.ifs_per_bd
    start = (b - 1) * n
    # Index explicitly (rather than slice) so an out-of-range bridge domain
    # fails loudly with IndexError instead of yielding a short list.
    return [cls.pg_interfaces[j] for j in range(start, start + n)]
def bd_add_del(self, bd_id=1, is_add=1):
    """Create or remove a bridge domain together with its interfaces.

    When adding, the bridge domain is created first and its interfaces are
    then put into it. When deleting, the interfaces are taken out first and
    the bridge domain is removed last.

    :param bd_id: Bridge domain ID.
    :param is_add: Truthy to create, falsy to delete.
    """
    # NOTE(review): the two guards below were not visible in the reviewed
    # text (both bridge_domain_add_del calls appeared unconditional); the
    # create-first / delete-last ordering is the reconstruction.
    if is_add:
        self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)

    for swif in self.bd_swifs(bd_id):
        self.vapi.sw_interface_set_l2_bridge(
            swif.sw_if_index, bd_id=bd_id, enable=is_add)

    if not is_add:
        self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
120 def arp_req(cls, src_host, host):
121 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
123 hwsrc=src_host.bin_mac,
def arp_reqs(cls, src_host, entries):
    """Build one ARP request from ``src_host`` for every entry.

    :param src_host: Host passed to ``arp_req`` as the requester.
    :param entries: Iterable of hosts to ask about.
    :returns: List of the packets returned by ``cls.arp_req``.
    """
    requests = []
    for target in entries:
        requests.append(cls.arp_req(src_host, target))
    return requests
def garp_req(cls, host):
    """Build a gratuitous ARP request for ``host``.

    The packet is an ordinary ``arp_req`` in which ``host`` is both the
    requester and the host being asked about.

    :param host: Host announcing itself.
    :returns: Packet returned by ``cls.arp_req``.
    """
    requester = target = host
    return cls.arp_req(requester, target)
def garp_reqs(cls, entries):
    """Build one gratuitous ARP request per entry.

    :param entries: Iterable of hosts.
    :returns: List of the packets returned by ``cls.garp_req``.
    """
    packets = []
    for host in entries:
        packets.append(cls.garp_req(host))
    return packets
def arp_resp_host(self, src_host, arp_resp):
    """Validate an ARP reply sent to ``src_host`` and extract its sender.

    Asserts that the frame is addressed to ``src_host``, that the ARP
    header describes Ethernet/IPv4 (hwtype 1, ptype 0x800, hwlen 6,
    plen 4), that the operation is "is-at" (2), and that the target
    hardware/protocol addresses are those of ``src_host``.

    :param src_host: Host that issued the request.
    :param arp_resp: Captured packet containing Ether and ARP layers.
    :returns: ``Host`` built from the reply's sender MAC and IPv4 address.
    """
    ether = arp_resp[Ether]
    self.assertEqual(ether.dst, src_host.mac)

    # NOTE(review): the statement binding ``arp`` was not visible in the
    # reviewed text; taking the ARP layer of the reply is the reconstruction.
    arp = arp_resp[ARP]
    arp_opts = {"who-has": 1, "is-at": 2}
    expectations = (
        (arp.hwtype, 1),
        (arp.ptype, 0x800),
        (arp.hwlen, 6),
        (arp.plen, 4),
        (arp.op, arp_opts["is-at"]),
        (arp.hwdst, src_host.mac),
        (arp.pdst, src_host.ip4),
    )
    for actual, expected in expectations:
        self.assertEqual(actual, expected)
    return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """Validate every ARP reply and collect the responding hosts.

    :param src_host: Host that issued the requests.
    :param pkts: Iterable of captured reply packets.
    :returns: Set of the values returned by ``arp_resp_host``.
    """
    responders = set()
    for pkt in pkts:
        responders.add(self.arp_resp_host(src_host, pkt))
    return responders
def inttoip4(self, ip):
    """Render a 32-bit integer as a dotted-quad IPv4 string.

    The most significant byte becomes the first octet.

    :param ip: Non-negative integer address.
    :returns: String such as ``'172.17.105.1'``.
    """
    value = int(ip)
    # Integer shifts avoid the float round-trip of true division and do not
    # depend on looking local names up through locals().
    octets = [(value >> shift) & 0xFF for shift in (24, 16, 8, 0)]
    return '.'.join(str(octet) for octet in octets)
def arp_event_host(self, e):
    """Convert an ARP event into a ``Host``.

    :param e: Event object exposing ``new_mac`` (raw MAC bytes) and
        ``address`` (integer IPv4 address).
    :returns: ``Host`` with a colon-separated hex MAC and dotted-quad IPv4.
    """
    # Iterating bytes yields ints on Python 3 but 1-char strings on
    # Python 2; ord() on an int raises TypeError, so only convert strings.
    octets = ['%02x' % (b if isinstance(b, int) else ord(b))
              for b in e.new_mac]
    return Host(mac=':'.join(octets), ip4=self.inttoip4(e.address))
def arp_event_hosts(self, evs):
    """Convert a batch of ARP events into a set of hosts.

    :param evs: Iterable of event objects.
    :returns: Set of the values returned by ``arp_event_host``.
    """
    reported = set()
    for event in evs:
        reported.add(self.arp_event_host(event))
    return reported
def ns_req(cls, src_host, host):
    """Build a neighbor solicitation from ``src_host`` asking for ``host``.

    The frame is sent to the Ethernet broadcast address. The IPv6
    destination is whatever ``in6_getnsma`` derives from the fixed address
    ``fd10::ffff`` (it does not depend on ``host``); the solicitation
    targets ``host.ip6`` and carries ``src_host.mac`` as the source
    link-layer address option.

    :param src_host: Requesting host (``mac`` and ``ip6`` are used).
    :param host: Host being solicited (``ip6`` is used).
    :returns: Ether / IPv6 / ICMPv6ND_NS / ICMPv6NDOptSrcLLAddr packet.
    """
    raw_nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
    ether = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    ipv6 = IPv6(dst=inet_ntop(AF_INET6, raw_nsma), src=src_host.ip6)
    solicitation = ICMPv6ND_NS(tgt=host.ip6)
    lladdr_opt = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return ether / ipv6 / solicitation / lladdr_opt
def ns_reqs(cls, src_host, entries):
    """Build one neighbor solicitation from ``src_host`` per entry.

    :param src_host: Host passed to ``ns_req`` as the requester.
    :param entries: Iterable of hosts to solicit.
    :returns: List of the packets returned by ``cls.ns_req``.
    """
    solicitations = []
    for target in entries:
        solicitations.append(cls.ns_req(src_host, target))
    return solicitations
def na_resp_host(self, src_host, rx):
    """Validate a neighbor advertisement and extract the advertised host.

    Asserts that the frame and the IPv6 packet are addressed to
    ``src_host`` and that both an ICMPv6ND_NA layer and an
    ICMPv6NDOptDstLLAddr option are present.

    :param src_host: Host that issued the solicitation.
    :param rx: Captured packet.
    :returns: ``Host`` built from the advertised link-layer address and
        target address.
    """
    self.assertEqual(rx[Ether].dst, src_host.mac)
    # Compare in canonical printable form so equivalent spellings match.
    self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(src_host.ip6))

    for layer in (ICMPv6ND_NA, ICMPv6NDOptDstLLAddr):
        self.assertTrue(rx.haslayer(layer))

    # NOTE(review): the statement binding ``na`` was not visible in the
    # reviewed text; taking the ICMPv6ND_NA layer is the reconstruction.
    na = rx[ICMPv6ND_NA]
    return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """Validate every neighbor advertisement and collect the hosts.

    :param src_host: Host that issued the solicitations.
    :param pkts: Iterable of captured advertisement packets.
    :returns: Set of the values returned by ``na_resp_host``.
    """
    advertised = set()
    for pkt in pkts:
        advertised.add(self.na_resp_host(src_host, pkt))
    return advertised
def set_bd_flags(self, bd_id, **args):
    """
    Enable/disable defined feature(s) of the bridge domain.

    :param int bd_id: Bridge domain ID.
    :param args: Keyword arguments mapping a feature name to its desired
        status. Allowed features: learn, forward, flood, uu_flood and
        arp_term. A falsy status disables the feature, a truthy status
        enables it.
    :raise: ValueError in case of unknown feature in the input. Features
        processed before the unknown one have already been applied.
    """
    feature_bits = {
        "learn": 1 << 0,
        "forward": 1 << 1,
        "flood": 1 << 2,
        "uu_flood": 1 << 3,
        "arp_term": 1 << 4,
    }
    for flag, status in args.items():
        if flag not in feature_bits:
            raise ValueError("Unknown feature used: %s" % flag)
        is_set = 1 if status else 0
        self.vapi.bridge_flags(bd_id, is_set, feature_bits[flag])
    # NOTE(review): original indentation was not visible; logging once after
    # all flags are applied is assumed.
    self.logger.info("Bridge domain ID %d updated" % bd_id)
def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
    """Send ARP requests into a bridge domain and check the replies.

    One request per entry of ``req_hosts`` is built with ``src_host`` as
    requester and queued on every interface of bridge domain ``bd_id``.
    Each of those interfaces is then asked for a capture of
    ``len(resp_hosts)`` packets, and the hosts parsed from the replies
    must equal ``resp_hosts``.

    :param src_host: Host issuing the requests.
    :param req_hosts: Hosts to ask about.
    :param resp_hosts: Set of hosts expected to answer.
    :param bd_id: Bridge domain ID.
    """
    requests = self.arp_reqs(src_host, req_hosts)
    bd_ifs = self.bd_swifs(bd_id)

    for swif in bd_ifs:
        swif.add_stream(requests)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): no start call was visible between enabling capture and
    # reading it; pg_start() is assumed to be what transmits the streams.
    self.pg_start()

    for swif in bd_ifs:
        captured = swif.get_capture(len(resp_hosts))
        responders = self.arp_resp_hosts(src_host, captured)
        # Comparing the sets directly reports which hosts differ on failure.
        self.assertEqual(responders, resp_hosts)
def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
    """Send neighbor solicitations into a bridge domain and check replies.

    One solicitation per entry of ``req_hosts`` is built with ``src_host``
    as requester and queued on every interface of bridge domain ``bd_id``.
    Each of those interfaces is then asked for a capture of
    ``len(resp_hosts)`` packets, and the hosts parsed from the
    advertisements must equal ``resp_hosts``.

    :param src_host: Host issuing the solicitations.
    :param req_hosts: Hosts to solicit.
    :param resp_hosts: Set of hosts expected to answer.
    :param bd_id: Bridge domain ID.
    """
    solicitations = self.ns_reqs(src_host, req_hosts)
    bd_ifs = self.bd_swifs(bd_id)

    for swif in bd_ifs:
        swif.add_stream(solicitations)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): no start call was visible between enabling capture and
    # reading it; pg_start() is assumed to be what transmits the streams.
    self.pg_start()

    for swif in bd_ifs:
        captured = swif.get_capture(len(resp_hosts))
        responders = self.na_resp_hosts(src_host, captured)
        # Comparing the sets directly reports which hosts differ on failure.
        self.assertEqual(responders, resp_hosts)
def test_l2bd_arp_term_01(self):
    """ L2BD arp term - add 5 hosts, verify arp responses
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    # Bridge domain 1 with ARP termination on and flooding/learning off.
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # NOTE(review): range(1, 5) yields 4 MACs, not the 5 hosts named in
    # the summary line.
    added = self.ip4_hosts(4, 1, self.mac_list(range(1, 5)))
    self.add_del_arp_term_hosts(added, is_add=1)

    # Every registered host must be answered for.
    self.verify_arp(requester, added, added)

    # Stored on the class so the following tests can build on this state.
    type(self).hosts = added
def test_l2bd_arp_term_02(self):
    """ L2BD arp term - delete 3 hosts, verify arp responses
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    # NOTE(review): range(1, 3) yields 2 MACs, not the 3 hosts named in
    # the summary line.
    removed = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(removed, is_add=0)

    # Ask about every previously known host; only the survivors may answer.
    known = self.hosts
    survivors = known - removed
    self.verify_arp(requester, known, survivors)

    type(self).hosts = survivors
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
    """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    # Recreate bridge domain 1 with the same feature flags as before.
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # NOTE(review): range(1, 3) yields 2 MACs, not the 3 hosts named in
    # the summary line.
    restored = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(restored, is_add=1)

    # Ask about the old hosts as well; only the re-added ones may answer.
    asked = self.hosts | restored
    self.verify_arp(requester, asked, restored)

    type(self).hosts = restored
def test_l2bd_arp_term_04(self):
    """ L2BD arp term - 2 IP4 addrs per host
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    # Same MACs as the hosts already registered, but in subnet 5, so each
    # MAC ends up with a second IPv4 address.
    second_addrs = self.ip4_hosts(5, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(second_addrs, is_add=1)

    everyone = self.hosts | second_addrs
    self.verify_arp(requester, everyone, everyone)

    type(self).hosts = everyone
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
    """ L2BD arp term - create and update 10 IP4-mac pairs
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # Initial bindings: ten addresses in subnet 5 with MACs 0x0a..0x13.
    initial = self.ip4_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(initial, is_add=1)
    self.verify_arp(requester, initial, initial)

    # Same ten addresses, now bound to MACs 0x14..0x1d.
    rebound = self.ip4_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(rebound, is_add=1)

    # Requests for the original addresses must be answered with new MACs.
    self.verify_arp(requester, initial, rebound)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
    """ L2BD arp/ND term - hosts with both ip4/ip6
    """
    requester4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
    requester6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    # enable flood to make sure requests are not flooded
    self.set_bd_flags(1, arp_term=True, flood=True,
                      uu_flood=False, learn=False)

    # One MAC list shared by the IPv4 and IPv6 identities of each host.
    shared_macs = self.mac_list(range(10, 20))
    v6_hosts = self.ip6_hosts(5, 1, shared_macs)
    v4_hosts = self.ip4_hosts(5, 1, shared_macs)

    self.add_del_arp_term_hosts(v4_hosts, is_add=1)
    self.add_del_arp_term_hosts(v6_hosts, is_add=1, is_ipv6=1)

    self.verify_arp(requester4, v4_hosts, v4_hosts)
    self.verify_nd(requester6, v6_hosts, v6_hosts)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
    """ L2BD ND term - Add and Del hosts, verify ND replies
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # Register ten IPv6 hosts and check that all of them are answered for.
    registered = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(registered, is_add=1, is_ipv6=1)
    self.verify_nd(requester, registered, registered)

    # Remove the first five; solicit all ten, expect only the rest.
    removed = self.ip6_hosts(5, 1, self.mac_list(range(10, 15)))
    self.add_del_arp_term_hosts(removed, is_add=0, is_ipv6=1)
    self.verify_nd(requester, registered, registered - removed)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
    """ L2BD ND term - Add and update IP+mac, verify ND replies
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # Initial bindings: ten IPv6 addresses with MACs 0x0a..0x13.
    initial = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(initial, is_add=1, is_ipv6=1)
    self.verify_nd(requester, initial, initial)

    # Same addresses, now bound to MACs 0x14..0x1d.
    rebound = self.ip6_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(rebound, is_add=1, is_ipv6=1)

    # Solicitations for the original addresses must report the new MACs.
    self.verify_nd(requester, initial, rebound)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_09(self):
    """ L2BD arp term - send garps, verify arp event reports
    """
    self.vapi.want_ip4_arp_events()
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(90, 95))
    hosts = self.ip4_hosts(5, 1, macs)

    # Gratuitous ARPs go in through the first interface of the bridge domain.
    garps = self.garp_reqs(hosts)
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): no start call was visible before the event wait;
    # pg_start() is assumed to be what transmits the stream.
    self.pg_start()

    # One event is awaited per announced host; the bridge domain is left
    # in place (no bd_add_del delete here).
    evs = [self.vapi.wait_for_event(1, "ip4_arp_event") for _ in hosts]
    ev_hosts = self.arp_event_hosts(evs)
    # Comparing the sets directly reports which hosts differ on failure.
    self.assertEqual(ev_hosts, hosts)
def test_l2bd_arp_term_10(self):
    """ L2BD arp term - send duplicate garps, verify suppression
    """
    # No bridge domain is created here; bd_swifs(1) is used as-is.
    macs = self.mac_list(range(70, 71))
    hosts = self.ip4_hosts(6, 1, macs)

    # send the packet 5 times expect one event
    garps = self.garp_reqs(hosts) * 5
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): no start call was visible before the event wait;
    # pg_start() is assumed to be what transmits the stream.
    self.pg_start()

    # Only one event per distinct host is awaited, not one per packet.
    evs = [self.vapi.wait_for_event(1, "ip4_arp_event") for _ in hosts]
    ev_hosts = self.arp_event_hosts(evs)
    # Comparing the sets directly reports which hosts differ on failure.
    self.assertEqual(ev_hosts, hosts)
def test_l2bd_arp_term_11(self):
    """ L2BD arp term - disable ip4 arp events,send garps, verify no events
    """
    self.vapi.want_ip4_arp_events(enable_disable=0)
    macs = self.mac_list(range(90, 95))
    hosts = self.ip4_hosts(5, 1, macs)

    garps = self.garp_reqs(hosts)
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the statements between enabling capture and collecting
    # events were not visible in the reviewed text. Starting the packet
    # generator and pausing briefly (so late events would be caught) is the
    # reconstruction -- confirm pg_start()/sleep() against the framework.
    self.pg_start()
    self.sleep(1)

    self.assertEqual(len(self.vapi.collect_events()), 0)
    self.bd_add_del(1, is_add=0)
if __name__ == '__main__':
    # Imported here so the entry point does not depend on a module-level
    # import that was not visible in the reviewed text; a repeated import
    # is harmless.
    import unittest

    unittest.main(testRunner=VppTestRunner)