2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6, inet_pton, inet_ntop
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils6 import (
21 from scapy.layers.inet6 import (
29 ICMPv6MRD_Solicitation,
32 ICMPv6NDOptPrefixInfo,
39 from framework import VppTestCase, VppTestRunner
40 from util import Host, ppp
43 class TestL2bdArpTerm(VppTestCase):
44 """L2BD arp termination Test Case"""
49 Perform standard class setup (defined by class method setUpClass in
50 class VppTestCase) before running the test case, set test case related
51 variables and configure VPP.
53 super(TestL2bdArpTerm, cls).setUpClass()
56 # Create pg interfaces
58 cls.ifs_per_bd = ifs_per_bd = 3
59 n_ifs = n_bd * ifs_per_bd
60 cls.create_pg_interfaces(range(n_ifs))
62 # Set up all interfaces
63 for i in cls.pg_interfaces:
69 super(TestL2bdArpTerm, cls).tearDownClass()
73 def tearDownClass(cls):
74 super(TestL2bdArpTerm, cls).tearDownClass()
78 Clear trace and packet infos before running each test.
80 self.reset_packet_infos()
81 super(TestL2bdArpTerm, self).setUp()
85 Show various debug prints after each test.
87 super(TestL2bdArpTerm, self).tearDown()
89 def show_commands_at_teardown(self):
90 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
91 # many tests delete bridge-domain 1 as the last task. don't output
92 # the details of a non-existent bridge-domain.
93 if self.vapi.l2_fib_table_dump(bd_id=1):
94 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
96 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
98 ip = e.ip4 if is_ipv6 == 0 else e.ip6
99 self.vapi.bd_ip_mac_add_del(
100 is_add=is_add, entry={"bd_id": bd_id, "ip": ip, "mac": e.mac}
104 def mac_list(cls, b6_range):
105 return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
108 def ip4_host(cls, subnet, host, mac):
109 return Host(mac=mac, ip4="172.17.1%02u.%u" % (subnet, host))
112 def ip4_hosts(cls, subnet, start, mac_list):
114 cls.ip4_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list))
118 def ip6_host(cls, subnet, host, mac):
119 return Host(mac=mac, ip6="fd01:%x::%x" % (subnet, host))
122 def ip6_hosts(cls, subnet, start, mac_list):
124 cls.ip6_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list))
128 def bd_swifs(cls, b):
131 return [cls.pg_interfaces[j] for j in range(start, start + n)]
133 def bd_add_del(self, bd_id=1, is_add=1):
135 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
136 for swif in self.bd_swifs(bd_id):
137 swif_idx = swif.sw_if_index
138 self.vapi.sw_interface_set_l2_bridge(
139 rx_sw_if_index=swif_idx, bd_id=bd_id, enable=is_add
142 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
145 def arp_req(cls, src_host, host):
146 return Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / ARP(
147 op="who-has", hwsrc=src_host.bin_mac, pdst=host.ip4, psrc=src_host.ip4
151 def arp_reqs(cls, src_host, entries):
152 return [cls.arp_req(src_host, e) for e in entries]
155 def garp_req(cls, host):
156 return cls.arp_req(host, host)
159 def garp_reqs(cls, entries):
160 return [cls.garp_req(e) for e in entries]
162 def arp_resp_host(self, src_host, arp_resp):
163 ether = arp_resp[Ether]
164 self.assertEqual(ether.dst, src_host.mac)
167 self.assertEqual(arp.hwtype, 1)
168 self.assertEqual(arp.ptype, 0x800)
169 self.assertEqual(arp.hwlen, 6)
170 self.assertEqual(arp.plen, 4)
171 arp_opts = {"who-has": 1, "is-at": 2}
172 self.assertEqual(arp.op, arp_opts["is-at"])
173 self.assertEqual(arp.hwdst, src_host.mac)
174 self.assertEqual(arp.pdst, src_host.ip4)
175 return Host(mac=arp.hwsrc, ip4=arp.psrc)
177 def arp_resp_hosts(self, src_host, pkts):
178 return {self.arp_resp_host(src_host, p) for p in pkts}
182 o1 = int(ip / 16777216) % 256
183 o2 = int(ip / 65536) % 256
184 o3 = int(ip / 256) % 256
186 return "%s.%s.%s.%s" % (o1, o2, o3, o4)
188 def arp_event_host(self, e):
189 return Host(str(e.mac), ip4=str(e.ip))
191 def arp_event_hosts(self, evs):
192 return {self.arp_event_host(e) for e in evs}
194 def nd_event_host(self, e):
195 return Host(str(e.mac), ip6=str(e.ip))
197 def nd_event_hosts(self, evs):
198 return {self.nd_event_host(e) for e in evs}
201 def ns_req(cls, src_host, host):
202 nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
203 d = inet_ntop(AF_INET6, nsma)
205 Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
206 / IPv6(dst=d, src=src_host.ip6)
207 / ICMPv6ND_NS(tgt=host.ip6)
208 / ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
212 def ns_reqs_dst(cls, entries, dst_host):
213 return [cls.ns_req(e, dst_host) for e in entries]
216 def ns_reqs_src(cls, src_host, entries):
217 return [cls.ns_req(src_host, e) for e in entries]
219 def na_resp_host(self, src_host, rx):
220 self.assertEqual(rx[Ether].dst, src_host.mac)
221 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(src_host.ip6))
223 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
224 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
227 return Host(mac=na.lladdr, ip6=na.tgt)
229 def na_resp_hosts(self, src_host, pkts):
230 return {self.na_resp_host(src_host, p) for p in pkts}
232 def set_bd_flags(self, bd_id, **args):
234 Enable/disable defined feature(s) of the bridge domain.
236 :param int bd_id: Bridge domain ID.
237 :param list args: List of feature/status pairs. Allowed features: \
238 learn, forward, flood, uu_flood and arp_term. Status False means \
239 disable, status True means enable the feature.
240 :raise: ValueError in case of unknown feature in the input.
244 feature_bitmap = 1 << 0
245 elif flag == "forward":
246 feature_bitmap = 1 << 1
247 elif flag == "flood":
248 feature_bitmap = 1 << 2
249 elif flag == "uu_flood":
250 feature_bitmap = 1 << 3
251 elif flag == "arp_term":
252 feature_bitmap = 1 << 4
254 raise ValueError("Unknown feature used: %s" % flag)
255 is_set = 1 if args[flag] else 0
256 self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set, flags=feature_bitmap)
257 self.logger.info("Bridge domain ID %d updated" % bd_id)
259 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
260 reqs = self.arp_reqs(src_host, req_hosts)
262 for swif in self.bd_swifs(bd_id):
263 swif.add_stream(reqs)
265 self.pg_enable_capture(self.pg_interfaces)
268 for swif in self.bd_swifs(bd_id):
269 resp_pkts = swif.get_capture(len(resp_hosts))
270 resps = self.arp_resp_hosts(src_host, resp_pkts)
271 self.assertEqual(len(resps ^ resp_hosts), 0)
273 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
274 reqs = self.ns_reqs_src(src_host, req_hosts)
276 for swif in self.bd_swifs(bd_id):
277 swif.add_stream(reqs)
279 self.pg_enable_capture(self.pg_interfaces)
282 for swif in self.bd_swifs(bd_id):
283 resp_pkts = swif.get_capture(len(resp_hosts))
284 resps = self.na_resp_hosts(src_host, resp_pkts)
285 self.assertEqual(len(resps ^ resp_hosts), 0)
287 def test_l2bd_arp_term_01(self):
288 """L2BD arp term - add 5 hosts, verify arp responses"""
289 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
290 self.bd_add_del(1, is_add=1)
291 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
292 macs = self.mac_list(range(1, 5))
293 hosts = self.ip4_hosts(4, 1, macs)
294 self.add_del_arp_term_hosts(hosts, is_add=1)
296 self.verify_arp(src_host, hosts, hosts)
297 type(self).hosts = hosts
299 def test_l2bd_arp_term_02(self):
300 """L2BD arp term - delete 3 hosts, verify arp responses"""
301 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
302 macs = self.mac_list(range(1, 3))
303 deleted = self.ip4_hosts(4, 1, macs)
304 self.add_del_arp_term_hosts(deleted, is_add=0)
305 remaining = self.hosts - deleted
306 self.verify_arp(src_host, self.hosts, remaining)
307 type(self).hosts = remaining
308 self.bd_add_del(1, is_add=0)
310 def test_l2bd_arp_term_03(self):
311 """L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses"""
312 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
313 self.bd_add_del(1, is_add=1)
314 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
315 macs = self.mac_list(range(1, 3))
316 readded = self.ip4_hosts(4, 1, macs)
317 self.add_del_arp_term_hosts(readded, is_add=1)
318 self.verify_arp(src_host, self.hosts | readded, readded)
319 type(self).hosts = readded
321 def test_l2bd_arp_term_04(self):
322 """L2BD arp term - 2 IP4 addrs per host"""
323 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
324 macs = self.mac_list(range(1, 3))
325 sub5_hosts = self.ip4_hosts(5, 1, macs)
326 self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
327 hosts = self.hosts | sub5_hosts
328 self.verify_arp(src_host, hosts, hosts)
329 type(self).hosts = hosts
330 self.bd_add_del(1, is_add=0)
332 def test_l2bd_arp_term_05(self):
333 """L2BD arp term - create and update 10 IP4-mac pairs"""
334 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
335 self.bd_add_del(1, is_add=1)
336 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
337 macs1 = self.mac_list(range(10, 20))
338 hosts1 = self.ip4_hosts(5, 1, macs1)
339 self.add_del_arp_term_hosts(hosts1, is_add=1)
340 self.verify_arp(src_host, hosts1, hosts1)
341 macs2 = self.mac_list(range(20, 30))
342 hosts2 = self.ip4_hosts(5, 1, macs2)
343 self.add_del_arp_term_hosts(hosts2, is_add=1)
344 self.verify_arp(src_host, hosts1, hosts2)
345 self.bd_add_del(1, is_add=0)
347 def test_l2bd_arp_term_06(self):
348 """L2BD arp/ND term - hosts with both ip4/ip6"""
349 src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
350 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
351 self.bd_add_del(1, is_add=1)
352 # enable flood to make sure requests are not flooded
353 self.set_bd_flags(1, arp_term=True, flood=True, uu_flood=False, learn=False)
354 macs = self.mac_list(range(10, 20))
355 hosts6 = self.ip6_hosts(5, 1, macs)
356 hosts4 = self.ip4_hosts(5, 1, macs)
357 self.add_del_arp_term_hosts(hosts4, is_add=1)
358 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
359 self.verify_arp(src_host4, hosts4, hosts4)
360 self.verify_nd(src_host6, hosts6, hosts6)
361 self.bd_add_del(1, is_add=0)
363 def test_l2bd_arp_term_07(self):
364 """L2BD ND term - Add and Del hosts, verify ND replies"""
365 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
366 self.bd_add_del(1, is_add=1)
367 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
368 macs = self.mac_list(range(10, 20))
369 hosts6 = self.ip6_hosts(5, 1, macs)
370 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
371 self.verify_nd(src_host6, hosts6, hosts6)
372 del_macs = self.mac_list(range(10, 15))
373 deleted = self.ip6_hosts(5, 1, del_macs)
374 self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
375 self.verify_nd(src_host6, hosts6, hosts6 - deleted)
376 self.bd_add_del(1, is_add=0)
378 def test_l2bd_arp_term_08(self):
379 """L2BD ND term - Add and update IP+mac, verify ND replies"""
380 src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
381 self.bd_add_del(1, is_add=1)
382 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
383 macs1 = self.mac_list(range(10, 20))
384 hosts = self.ip6_hosts(5, 1, macs1)
385 self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
386 self.verify_nd(src_host, hosts, hosts)
387 macs2 = self.mac_list(range(20, 30))
388 updated = self.ip6_hosts(5, 1, macs2)
389 self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
390 self.verify_nd(src_host, hosts, updated)
391 self.bd_add_del(1, is_add=0)
393 def test_l2bd_arp_term_09(self):
394 """L2BD arp term - send garps, verify arp event reports"""
395 self.vapi.want_l2_arp_term_events(enable=1)
396 self.bd_add_del(1, is_add=1)
397 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
398 macs = self.mac_list(range(90, 95))
399 hosts = self.ip4_hosts(5, 1, macs)
401 garps = self.garp_reqs(hosts)
402 self.bd_swifs(1)[0].add_stream(garps)
404 self.pg_enable_capture(self.pg_interfaces)
407 self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
409 ev_hosts = self.arp_event_hosts(evs)
410 self.assertEqual(len(ev_hosts ^ hosts), 0)
412 def test_l2bd_arp_term_10(self):
413 """L2BD arp term - send duplicate garps, verify suppression"""
414 macs = self.mac_list(range(70, 71))
415 hosts = self.ip4_hosts(6, 1, macs)
417 """ send the packet 5 times expect one event
419 garps = self.garp_reqs(hosts) * 5
420 self.bd_swifs(1)[0].add_stream(garps)
422 self.pg_enable_capture(self.pg_interfaces)
425 self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
427 ev_hosts = self.arp_event_hosts(evs)
428 self.assertEqual(len(ev_hosts ^ hosts), 0)
430 def test_l2bd_arp_term_11(self):
431 """L2BD arp term - disable ip4 arp events,send garps, verify no events"""
432 self.vapi.want_l2_arp_term_events(enable=0)
433 macs = self.mac_list(range(90, 95))
434 hosts = self.ip4_hosts(5, 1, macs)
436 garps = self.garp_reqs(hosts)
437 self.bd_swifs(1)[0].add_stream(garps)
439 self.pg_enable_capture(self.pg_interfaces)
442 self.assertEqual(len(self.vapi.collect_events()), 0)
443 self.bd_add_del(1, is_add=0)
445 def test_l2bd_arp_term_12(self):
446 """L2BD ND term - send NS packets verify reports"""
447 self.vapi.want_l2_arp_term_events(enable=1)
448 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
449 self.bd_add_del(1, is_add=1)
450 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
451 macs = self.mac_list(range(10, 15))
452 hosts = self.ip6_hosts(5, 1, macs)
453 reqs = self.ns_reqs_dst(hosts, dst_host)
454 self.bd_swifs(1)[0].add_stream(reqs)
456 self.pg_enable_capture(self.pg_interfaces)
459 self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
461 ev_hosts = self.nd_event_hosts(evs)
462 self.assertEqual(len(ev_hosts ^ hosts), 0)
464 def test_l2bd_arp_term_13(self):
465 """L2BD ND term - send duplicate ns, verify suppression"""
466 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
467 macs = self.mac_list(range(16, 17))
468 hosts = self.ip6_hosts(5, 1, macs)
469 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
470 self.bd_swifs(1)[0].add_stream(reqs)
472 self.pg_enable_capture(self.pg_interfaces)
475 self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
477 ev_hosts = self.nd_event_hosts(evs)
478 self.assertEqual(len(ev_hosts ^ hosts), 0)
480 def test_l2bd_arp_term_14(self):
481 """L2BD ND term - disable ip4 arp events,send ns, verify no events"""
482 self.vapi.want_l2_arp_term_events(enable=0)
483 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
484 macs = self.mac_list(range(10, 15))
485 hosts = self.ip6_hosts(5, 1, macs)
486 reqs = self.ns_reqs_dst(hosts, dst_host)
487 self.bd_swifs(1)[0].add_stream(reqs)
489 self.pg_enable_capture(self.pg_interfaces)
492 self.assertEqual(len(self.vapi.collect_events()), 0)
493 self.bd_add_del(1, is_add=0)
496 if __name__ == "__main__":
497 unittest.main(testRunner=VppTestRunner)