2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6, inet_pton, inet_ntop
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils6 import (
21 from scapy.layers.inet6 import (
29 ICMPv6MRD_Solicitation,
32 ICMPv6NDOptPrefixInfo,
39 from framework import VppTestCase, VppTestRunner
40 from util import Host, ppp
43 class TestL2bdArpTerm(VppTestCase):
44 """L2BD arp termination Test Case"""
49 Perform standard class setup (defined by class method setUpClass in
50 class VppTestCase) before running the test case, set test case related
51 variables and configure VPP.
53 super(TestL2bdArpTerm, cls).setUpClass()
56 # Create pg interfaces
58 cls.ifs_per_bd = ifs_per_bd = 3
59 n_ifs = n_bd * ifs_per_bd
60 cls.create_pg_interfaces(range(n_ifs))
62 # Set up all interfaces
63 for i in cls.pg_interfaces:
69 super(TestL2bdArpTerm, cls).tearDownClass()
73 def tearDownClass(cls):
74 super(TestL2bdArpTerm, cls).tearDownClass()
78 Clear trace and packet infos before running each test.
80 self.reset_packet_infos()
81 super(TestL2bdArpTerm, self).setUp()
85 Show various debug prints after each test.
87 super(TestL2bdArpTerm, self).tearDown()
89 def show_commands_at_teardown(self):
90 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
91 # many tests delete bridge-domain 1 as the last task. don't output
92 # the details of a non-existent bridge-domain.
93 if self.vapi.l2_fib_table_dump(bd_id=1):
94 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
96 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
98 ip = e.ip4 if is_ipv6 == 0 else e.ip6
99 self.vapi.bd_ip_mac_add_del(
100 is_add=is_add, entry={"bd_id": bd_id, "ip": ip, "mac": e.mac}
104 def mac_list(cls, b6_range):
105 return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
108 def ip4_host(cls, subnet, host, mac):
109 return Host(mac=mac, ip4="172.17.1%02u.%u" % (subnet, host))
112 def ip4_hosts(cls, subnet, start, mac_list):
114 cls.ip4_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list))
118 def ip6_host(cls, subnet, host, mac):
119 return Host(mac=mac, ip6="fd01:%x::%x" % (subnet, host))
122 def ip6_hosts(cls, subnet, start, mac_list):
124 cls.ip6_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list))
128 def bd_swifs(cls, b):
131 return [cls.pg_interfaces[j] for j in range(start, start + n)]
133 def bd_add_del(self, bd_id=1, is_add=1):
135 self.vapi.bridge_domain_add_del_v2(
136 bd_id=bd_id, is_add=is_add, flood=1, uu_flood=1, forward=1, learn=1
138 for swif in self.bd_swifs(bd_id):
139 swif_idx = swif.sw_if_index
140 self.vapi.sw_interface_set_l2_bridge(
141 rx_sw_if_index=swif_idx, bd_id=bd_id, enable=is_add
144 self.vapi.bridge_domain_add_del_v2(bd_id=bd_id, is_add=is_add)
147 def arp_req(cls, src_host, host):
148 return Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / ARP(
149 op="who-has", hwsrc=src_host.bin_mac, pdst=host.ip4, psrc=src_host.ip4
153 def arp_reqs(cls, src_host, entries):
154 return [cls.arp_req(src_host, e) for e in entries]
157 def garp_req(cls, host):
158 return cls.arp_req(host, host)
161 def garp_reqs(cls, entries):
162 return [cls.garp_req(e) for e in entries]
164 def arp_resp_host(self, src_host, arp_resp):
165 ether = arp_resp[Ether]
166 self.assertEqual(ether.dst, src_host.mac)
169 self.assertEqual(arp.hwtype, 1)
170 self.assertEqual(arp.ptype, 0x800)
171 self.assertEqual(arp.hwlen, 6)
172 self.assertEqual(arp.plen, 4)
173 arp_opts = {"who-has": 1, "is-at": 2}
174 self.assertEqual(arp.op, arp_opts["is-at"])
175 self.assertEqual(arp.hwdst, src_host.mac)
176 self.assertEqual(arp.pdst, src_host.ip4)
177 return Host(mac=arp.hwsrc, ip4=arp.psrc)
179 def arp_resp_hosts(self, src_host, pkts):
180 return {self.arp_resp_host(src_host, p) for p in pkts}
184 o1 = int(ip / 16777216) % 256
185 o2 = int(ip / 65536) % 256
186 o3 = int(ip / 256) % 256
188 return "%s.%s.%s.%s" % (o1, o2, o3, o4)
190 def arp_event_host(self, e):
191 return Host(str(e.mac), ip4=str(e.ip))
193 def arp_event_hosts(self, evs):
194 return {self.arp_event_host(e) for e in evs}
196 def nd_event_host(self, e):
197 return Host(str(e.mac), ip6=str(e.ip))
199 def nd_event_hosts(self, evs):
200 return {self.nd_event_host(e) for e in evs}
203 def ns_req(cls, src_host, host):
204 nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
205 d = inet_ntop(AF_INET6, nsma)
207 Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
208 / IPv6(dst=d, src=src_host.ip6)
209 / ICMPv6ND_NS(tgt=host.ip6)
210 / ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
214 def ns_reqs_dst(cls, entries, dst_host):
215 return [cls.ns_req(e, dst_host) for e in entries]
218 def ns_reqs_src(cls, src_host, entries):
219 return [cls.ns_req(src_host, e) for e in entries]
221 def na_resp_host(self, src_host, rx):
222 self.assertEqual(rx[Ether].dst, src_host.mac)
223 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(src_host.ip6))
225 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
226 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
229 return Host(mac=na.lladdr, ip6=na.tgt)
231 def na_resp_hosts(self, src_host, pkts):
232 return {self.na_resp_host(src_host, p) for p in pkts}
234 def set_bd_flags(self, bd_id, **args):
236 Enable/disable defined feature(s) of the bridge domain.
238 :param int bd_id: Bridge domain ID.
239 :param list args: List of feature/status pairs. Allowed features: \
240 learn, forward, flood, uu_flood and arp_term. Status False means \
241 disable, status True means enable the feature.
242 :raise: ValueError in case of unknown feature in the input.
246 feature_bitmap = 1 << 0
247 elif flag == "forward":
248 feature_bitmap = 1 << 1
249 elif flag == "flood":
250 feature_bitmap = 1 << 2
251 elif flag == "uu_flood":
252 feature_bitmap = 1 << 3
253 elif flag == "arp_term":
254 feature_bitmap = 1 << 4
256 raise ValueError("Unknown feature used: %s" % flag)
257 is_set = 1 if args[flag] else 0
258 self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set, flags=feature_bitmap)
259 self.logger.info("Bridge domain ID %d updated" % bd_id)
261 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
262 reqs = self.arp_reqs(src_host, req_hosts)
264 for swif in self.bd_swifs(bd_id):
265 swif.add_stream(reqs)
267 self.pg_enable_capture(self.pg_interfaces)
270 for swif in self.bd_swifs(bd_id):
271 resp_pkts = swif.get_capture(len(resp_hosts))
272 resps = self.arp_resp_hosts(src_host, resp_pkts)
273 self.assertEqual(len(resps ^ resp_hosts), 0)
275 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
276 reqs = self.ns_reqs_src(src_host, req_hosts)
278 for swif in self.bd_swifs(bd_id):
279 swif.add_stream(reqs)
281 self.pg_enable_capture(self.pg_interfaces)
284 for swif in self.bd_swifs(bd_id):
285 resp_pkts = swif.get_capture(len(resp_hosts))
286 resps = self.na_resp_hosts(src_host, resp_pkts)
287 self.assertEqual(len(resps ^ resp_hosts), 0)
289 def test_l2bd_arp_term_01(self):
290 """L2BD arp term - add 5 hosts, verify arp responses"""
291 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
292 self.bd_add_del(1, is_add=1)
293 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
294 macs = self.mac_list(range(1, 5))
295 hosts = self.ip4_hosts(4, 1, macs)
296 self.add_del_arp_term_hosts(hosts, is_add=1)
298 self.verify_arp(src_host, hosts, hosts)
299 type(self).hosts = hosts
301 def test_l2bd_arp_term_02(self):
302 """L2BD arp term - delete 3 hosts, verify arp responses"""
303 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
304 macs = self.mac_list(range(1, 3))
305 deleted = self.ip4_hosts(4, 1, macs)
306 self.add_del_arp_term_hosts(deleted, is_add=0)
307 remaining = self.hosts - deleted
308 self.verify_arp(src_host, self.hosts, remaining)
309 type(self).hosts = remaining
310 self.bd_add_del(1, is_add=0)
312 def test_l2bd_arp_term_03(self):
313 """L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses"""
314 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
315 self.bd_add_del(1, is_add=1)
316 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
317 macs = self.mac_list(range(1, 3))
318 readded = self.ip4_hosts(4, 1, macs)
319 self.add_del_arp_term_hosts(readded, is_add=1)
320 self.verify_arp(src_host, self.hosts | readded, readded)
321 type(self).hosts = readded
323 def test_l2bd_arp_term_04(self):
324 """L2BD arp term - 2 IP4 addrs per host"""
325 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
326 macs = self.mac_list(range(1, 3))
327 sub5_hosts = self.ip4_hosts(5, 1, macs)
328 self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
329 hosts = self.hosts | sub5_hosts
330 self.verify_arp(src_host, hosts, hosts)
331 type(self).hosts = hosts
332 self.bd_add_del(1, is_add=0)
334 def test_l2bd_arp_term_05(self):
335 """L2BD arp term - create and update 10 IP4-mac pairs"""
336 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
337 self.bd_add_del(1, is_add=1)
338 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
339 macs1 = self.mac_list(range(10, 20))
340 hosts1 = self.ip4_hosts(5, 1, macs1)
341 self.add_del_arp_term_hosts(hosts1, is_add=1)
342 self.verify_arp(src_host, hosts1, hosts1)
343 macs2 = self.mac_list(range(20, 30))
344 hosts2 = self.ip4_hosts(5, 1, macs2)
345 self.add_del_arp_term_hosts(hosts2, is_add=1)
346 self.verify_arp(src_host, hosts1, hosts2)
347 self.bd_add_del(1, is_add=0)
349 def test_l2bd_arp_term_06(self):
350 """L2BD arp/ND term - hosts with both ip4/ip6"""
351 src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
352 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
353 self.bd_add_del(1, is_add=1)
354 # enable flood to make sure requests are not flooded
355 self.set_bd_flags(1, arp_term=True, flood=True, uu_flood=False, learn=False)
356 macs = self.mac_list(range(10, 20))
357 hosts6 = self.ip6_hosts(5, 1, macs)
358 hosts4 = self.ip4_hosts(5, 1, macs)
359 self.add_del_arp_term_hosts(hosts4, is_add=1)
360 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
361 self.verify_arp(src_host4, hosts4, hosts4)
362 self.verify_nd(src_host6, hosts6, hosts6)
363 self.bd_add_del(1, is_add=0)
365 def test_l2bd_arp_term_07(self):
366 """L2BD ND term - Add and Del hosts, verify ND replies"""
367 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
368 self.bd_add_del(1, is_add=1)
369 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
370 macs = self.mac_list(range(10, 20))
371 hosts6 = self.ip6_hosts(5, 1, macs)
372 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
373 self.verify_nd(src_host6, hosts6, hosts6)
374 del_macs = self.mac_list(range(10, 15))
375 deleted = self.ip6_hosts(5, 1, del_macs)
376 self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
377 self.verify_nd(src_host6, hosts6, hosts6 - deleted)
378 self.bd_add_del(1, is_add=0)
380 def test_l2bd_arp_term_08(self):
381 """L2BD ND term - Add and update IP+mac, verify ND replies"""
382 src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
383 self.bd_add_del(1, is_add=1)
384 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
385 macs1 = self.mac_list(range(10, 20))
386 hosts = self.ip6_hosts(5, 1, macs1)
387 self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
388 self.verify_nd(src_host, hosts, hosts)
389 macs2 = self.mac_list(range(20, 30))
390 updated = self.ip6_hosts(5, 1, macs2)
391 self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
392 self.verify_nd(src_host, hosts, updated)
393 self.bd_add_del(1, is_add=0)
395 def test_l2bd_arp_term_09(self):
396 """L2BD arp term - send garps, verify arp event reports"""
397 self.vapi.want_l2_arp_term_events(enable=1)
398 self.bd_add_del(1, is_add=1)
399 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
400 macs = self.mac_list(range(90, 95))
401 hosts = self.ip4_hosts(5, 1, macs)
403 garps = self.garp_reqs(hosts)
404 self.bd_swifs(1)[0].add_stream(garps)
406 self.pg_enable_capture(self.pg_interfaces)
409 self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
411 ev_hosts = self.arp_event_hosts(evs)
412 self.assertEqual(len(ev_hosts ^ hosts), 0)
414 def test_l2bd_arp_term_10(self):
415 """L2BD arp term - send duplicate garps, verify suppression"""
416 macs = self.mac_list(range(70, 71))
417 hosts = self.ip4_hosts(6, 1, macs)
419 """ send the packet 5 times expect one event
421 garps = self.garp_reqs(hosts) * 5
422 self.bd_swifs(1)[0].add_stream(garps)
424 self.pg_enable_capture(self.pg_interfaces)
427 self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
429 ev_hosts = self.arp_event_hosts(evs)
430 self.assertEqual(len(ev_hosts ^ hosts), 0)
432 def test_l2bd_arp_term_11(self):
433 """L2BD arp term - disable ip4 arp events,send garps, verify no events"""
434 self.vapi.want_l2_arp_term_events(enable=0)
435 macs = self.mac_list(range(90, 95))
436 hosts = self.ip4_hosts(5, 1, macs)
438 garps = self.garp_reqs(hosts)
439 self.bd_swifs(1)[0].add_stream(garps)
441 self.pg_enable_capture(self.pg_interfaces)
444 self.assertEqual(len(self.vapi.collect_events()), 0)
445 self.bd_add_del(1, is_add=0)
447 def test_l2bd_arp_term_12(self):
448 """L2BD ND term - send NS packets verify reports"""
449 self.vapi.want_l2_arp_term_events(enable=1)
450 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
451 self.bd_add_del(1, is_add=1)
452 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
453 macs = self.mac_list(range(10, 15))
454 hosts = self.ip6_hosts(5, 1, macs)
455 reqs = self.ns_reqs_dst(hosts, dst_host)
456 self.bd_swifs(1)[0].add_stream(reqs)
458 self.pg_enable_capture(self.pg_interfaces)
461 self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
463 ev_hosts = self.nd_event_hosts(evs)
464 self.assertEqual(len(ev_hosts ^ hosts), 0)
466 def test_l2bd_arp_term_13(self):
467 """L2BD ND term - send duplicate ns, verify suppression"""
468 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
469 macs = self.mac_list(range(16, 17))
470 hosts = self.ip6_hosts(5, 1, macs)
471 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
472 self.bd_swifs(1)[0].add_stream(reqs)
474 self.pg_enable_capture(self.pg_interfaces)
477 self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
479 ev_hosts = self.nd_event_hosts(evs)
480 self.assertEqual(len(ev_hosts ^ hosts), 0)
482 def test_l2bd_arp_term_14(self):
483 """L2BD ND term - disable ip4 arp events,send ns, verify no events"""
484 self.vapi.want_l2_arp_term_events(enable=0)
485 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
486 macs = self.mac_list(range(10, 15))
487 hosts = self.ip6_hosts(5, 1, macs)
488 reqs = self.ns_reqs_dst(hosts, dst_host)
489 self.bd_swifs(1)[0].add_stream(reqs)
491 self.pg_enable_capture(self.pg_interfaces)
494 self.assertEqual(len(self.vapi.collect_events()), 0)
495 self.bd_add_del(1, is_add=0)
498 if __name__ == "__main__":
499 unittest.main(testRunner=VppTestRunner)