2 """ L2BD ARP term Test """
6 from socket import AF_INET6, inet_pton, inet_ntop
8 from scapy.layers.l2 import Ether, ARP
9 from scapy.utils6 import (
13 from scapy.layers.inet6 import (
22 from framework import VppTestCase
23 from asfframework import VppTestRunner
27 class TestL2bdArpTerm(VppTestCase):
28 """L2BD arp termination Test Case"""
33 Perform standard class setup (defined by class method setUpClass in
34 class VppTestCase) before running the test case, set test case related
35 variables and configure VPP.
37 super(TestL2bdArpTerm, cls).setUpClass()
40 # Create pg interfaces
42 cls.ifs_per_bd = ifs_per_bd = 3
43 n_ifs = n_bd * ifs_per_bd
44 cls.create_pg_interfaces(range(n_ifs))
46 # Set up all interfaces
47 for i in cls.pg_interfaces:
53 super(TestL2bdArpTerm, cls).tearDownClass()
@classmethod
def tearDownClass(cls):
    """Run the inherited class-level teardown.

    This override adds no behaviour of its own; it only forwards to the
    parent class. unittest invokes the hook on the class rather than on an
    instance, which is why it is declared as a classmethod.
    """
    super(TestL2bdArpTerm, cls).tearDownClass()
62 Clear trace and packet infos before running each test.
64 self.reset_packet_infos()
65 super(TestL2bdArpTerm, self).setUp()
69 Show various debug prints after each test.
71 super(TestL2bdArpTerm, self).tearDown()
def show_commands_at_teardown(self):
    """Log the L2 FIB and, when available, bridge-domain 1 details."""
    log = self.logger.info
    log(self.vapi.ppcli("show l2fib verbose"))

    # Many tests delete bridge-domain 1 as their last step, so its details
    # are only requested while the FIB dump for it still returns something.
    bd1_fib = self.vapi.l2_fib_table_dump(bd_id=1)
    if not bd1_fib:
        return
    log(self.vapi.ppcli("show bridge-domain 1 detail"))
def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
    """Add or delete IP-to-MAC bindings of a bridge domain.

    One ``bd_ip_mac_add_del`` API call is issued per entry.

    :param entries: Iterable of host objects; ``mac`` and, depending on
        *is_ipv6*, ``ip4`` or ``ip6`` are read from each of them.
    :param int bd_id: Bridge domain ID placed in every API entry.
    :param is_add: Passed through unchanged as the API's ``is_add``.
    :param is_ipv6: 0 selects each entry's ``ip4``; any other value
        selects ``ip6``.
    """
    for e in entries:
        ip = e.ip4 if is_ipv6 == 0 else e.ip6
        self.vapi.bd_ip_mac_add_del(
            is_add=is_add, entry={"bd_id": bd_id, "ip": ip, "mac": e.mac}
        )
def mac_list(cls, b6_range):
    """Build MAC address strings that differ only in their last byte.

    :param b6_range: Iterable of integers, one per address, used as the
        sixth byte (rendered as two lower-case hex digits).
    :returns: List of ``00:00:ca:fe:00:xx`` strings in input order.
    """
    template = "00:00:ca:fe:00:%02x"
    return [template % last_byte for last_byte in b6_range]
def ip4_host(cls, subnet, host, mac):
    """Create a Host with the given MAC and a generated IPv4 address.

    The address has the form ``172.17.1SS.H``: *subnet* is zero-padded to
    two digits and appended to a literal ``1`` in the third octet, *host*
    becomes the fourth octet.
    """
    address = "172.17.1%02u.%u" % (subnet, host)
    return Host(mac=mac, ip4=address)
def ip4_hosts(cls, subnet, start, mac_list):
    """Create a set of IPv4 hosts, one per MAC address.

    Host numbers are assigned consecutively, beginning at *start*, in the
    order of *mac_list*.

    :param subnet: Subnet number forwarded to ``ip4_host``.
    :param start: Host number given to the first MAC.
    :param mac_list: MAC address strings.
    :returns: Set of the objects produced by ``ip4_host`` (callers combine
        these sets with ``-``, ``|`` and ``^``).
    """
    return {
        cls.ip4_host(subnet, start + offset, mac)
        for offset, mac in enumerate(mac_list)
    }
def ip6_host(cls, subnet, host, mac):
    """Create a Host with the given MAC and a generated IPv6 address.

    The address has the form ``fd01:S::H`` with *subnet* and *host* both
    rendered in hexadecimal.
    """
    address = "fd01:%x::%x" % (subnet, host)
    return Host(mac=mac, ip6=address)
def ip6_hosts(cls, subnet, start, mac_list):
    """Create a set of IPv6 hosts, one per MAC address.

    Host numbers are assigned consecutively, beginning at *start*, in the
    order of *mac_list*.

    :param subnet: Subnet number forwarded to ``ip6_host``.
    :param start: Host number given to the first MAC.
    :param mac_list: MAC address strings.
    :returns: Set of the objects produced by ``ip6_host`` (callers combine
        these sets with ``-`` and ``^``).
    """
    return {
        cls.ip6_host(subnet, start + offset, mac)
        for offset, mac in enumerate(mac_list)
    }
112 def bd_swifs(cls, b):
# Return, as a list, the entries of ``cls.pg_interfaces`` at indices
# ``start`` .. ``start + n - 1``; callers pass a bridge-domain ID as ``b``.
# NOTE(review): the statements binding ``start`` and ``n`` are not visible
# here (the embedded numbering jumps from 112 to 115). Presumably ``n`` is the
# number of interfaces per bridge domain and ``start`` is derived from ``b``
# -- confirm against the complete file.
115 return [cls.pg_interfaces[j] for j in range(start, start + n)]
117 def bd_add_del(self, bd_id=1, is_add=1):
# Create or remove bridge domain ``bd_id`` and attach/detach its interfaces.
#
# Visible steps, in order:
#   1. bridge_domain_add_del_v2 with flood, uu_flood, forward and learn all 1;
#   2. sw_interface_set_l2_bridge for an interface returned by
#      bd_swifs(bd_id), with ``enable`` set to ``is_add``;
#   3. bridge_domain_add_del_v2 with only ``bd_id`` and ``is_add``.
#
# NOTE(review): the embedded numbering skips 118, 121, 126 and 127. Presumably
# 118 and 127 are conditionals on ``is_add`` that select step 1 when adding
# and step 3 when deleting, and 121/126 close the two calls -- confirm against
# the complete file. The extent of the ``for`` body is likewise not
# recoverable from this indentation-less view.
119 self.vapi.bridge_domain_add_del_v2(
120 bd_id=bd_id, is_add=is_add, flood=1, uu_flood=1, forward=1, learn=1
122 for swif in self.bd_swifs(bd_id):
123 swif_idx = swif.sw_if_index
124 self.vapi.sw_interface_set_l2_bridge(
125 rx_sw_if_index=swif_idx, bd_id=bd_id, enable=is_add
128 self.vapi.bridge_domain_add_del_v2(bd_id=bd_id, is_add=is_add)
def arp_req(cls, src_host, host):
    """Build a broadcast ARP who-has request from *src_host* for *host*.

    :param src_host: Sender; its ``mac`` is the Ethernet source, its
        ``bin_mac`` the ARP sender hardware address and its ``ip4`` the ARP
        sender protocol address.
    :param host: Target; its ``ip4`` is the address being asked for.
    :returns: ``Ether / ARP`` packet.
    """
    return Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / ARP(
        op="who-has", hwsrc=src_host.bin_mac, pdst=host.ip4, psrc=src_host.ip4
    )
def arp_reqs(cls, src_host, entries):
    """Build one ARP request from *src_host* per entry, in input order."""
    requests = []
    for target in entries:
        requests.append(cls.arp_req(src_host, target))
    return requests
def garp_req(cls, host):
    """Build a gratuitous ARP for *host*.

    This is an ordinary ``arp_req`` in which *host* is both the sender and
    the target.
    """
    sender = target = host
    return cls.arp_req(sender, target)
def garp_reqs(cls, entries):
    """Build one gratuitous ARP per entry, in input order."""
    packets = []
    for host in entries:
        packets.append(cls.garp_req(host))
    return packets
def arp_resp_host(self, src_host, arp_resp):
    """Check an ARP reply addressed to *src_host*; return the replying host.

    :param src_host: Host that sent the request; the reply must be
        addressed to its ``mac`` (Ethernet and ARP level) and ``ip4``.
    :param arp_resp: Captured packet with ``Ether`` and ``ARP`` layers.
    :returns: Host built from the reply's sender hardware and protocol
        addresses.
    :raises AssertionError: When any checked field has an unexpected value.
    """
    ether = arp_resp[Ether]
    self.assertEqual(ether.dst, src_host.mac)

    arp = arp_resp[ARP]
    # Fixed header values of an Ethernet/IPv4 ARP packet.
    self.assertEqual(arp.hwtype, 1)
    self.assertEqual(arp.ptype, 0x800)
    self.assertEqual(arp.hwlen, 6)
    self.assertEqual(arp.plen, 4)
    arp_opts = {"who-has": 1, "is-at": 2}
    self.assertEqual(arp.op, arp_opts["is-at"])
    # The reply has to be directed back at the requester.
    self.assertEqual(arp.hwdst, src_host.mac)
    self.assertEqual(arp.pdst, src_host.ip4)
    return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """Validate every ARP reply in *pkts*; return the set of replying hosts."""
    responders = set()
    for pkt in pkts:
        responders.add(self.arp_resp_host(src_host, pkt))
    return responders
168 o1 = int(ip / 16777216) % 256
169 o2 = int(ip / 65536) % 256
170 o3 = int(ip / 256) % 256
172 return "%s.%s.%s.%s" % (o1, o2, o3, o4)
def arp_event_host(self, e):
    """Convert an event into a Host using the event's ``mac`` and ``ip``.

    Both attributes are converted with ``str()``; the address is stored as
    the Host's ``ip4``.
    """
    mac_text = str(e.mac)
    ip_text = str(e.ip)
    return Host(mac_text, ip4=ip_text)
def arp_event_hosts(self, evs):
    """Convert every event in *evs* with ``arp_event_host``; return a set."""
    reported = set()
    for event in evs:
        reported.add(self.arp_event_host(event))
    return reported
def nd_event_host(self, e):
    """Convert an event into a Host using the event's ``mac`` and ``ip``.

    Both attributes are converted with ``str()``; the address is stored as
    the Host's ``ip6``.
    """
    mac_text = str(e.mac)
    ip_text = str(e.ip)
    return Host(mac_text, ip6=ip_text)
def nd_event_hosts(self, evs):
    """Convert every event in *evs* with ``nd_event_host``; return a set."""
    reported = set()
    for event in evs:
        reported.add(self.nd_event_host(event))
    return reported
def ns_req(cls, src_host, host):
    """Build an ICMPv6 Neighbor Solicitation from *src_host* for *host*.

    The frame goes to the Ethernet broadcast address. The IPv6 destination
    is derived with ``in6_getnsma`` from the fixed address ``fd10::ffff``;
    it does not depend on *host*.

    :param src_host: Sender; its ``mac`` is used as Ethernet source and in
        the source link-layer address option, its ``ip6`` as IPv6 source.
    :param host: Target; its ``ip6`` becomes the solicited target address.
    :returns: ``Ether / IPv6 / ICMPv6ND_NS / ICMPv6NDOptSrcLLAddr`` packet.
    """
    nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
    d = inet_ntop(AF_INET6, nsma)
    return (
        Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
        / IPv6(dst=d, src=src_host.ip6)
        / ICMPv6ND_NS(tgt=host.ip6)
        / ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    )
def ns_reqs_dst(cls, entries, dst_host):
    """Build one NS per entry, each sent by that entry and targeting *dst_host*."""
    solicitations = []
    for sender in entries:
        solicitations.append(cls.ns_req(sender, dst_host))
    return solicitations
def ns_reqs_src(cls, src_host, entries):
    """Build one NS per entry, each sent by *src_host* and targeting that entry."""
    solicitations = []
    for target in entries:
        solicitations.append(cls.ns_req(src_host, target))
    return solicitations
def na_resp_host(self, src_host, rx):
    """Check a Neighbor Advertisement sent to *src_host*; return its host.

    :param src_host: Host that sent the solicitation; the reply must be
        addressed to its ``mac`` and ``ip6``.
    :param rx: Captured packet.
    :returns: Host built from the advertisement's ``lladdr`` and ``tgt``.
    :raises AssertionError: When the addressing is wrong or the NA layer or
        its destination link-layer address option is absent.
    """
    self.assertEqual(rx[Ether].dst, src_host.mac)
    # Compare in normalised form so equivalent spellings of one address match.
    self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(src_host.ip6))

    self.assertTrue(rx.haslayer(ICMPv6ND_NA))
    self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))

    na = rx[ICMPv6ND_NA]
    # NOTE(review): ``lladdr`` is presumably resolved through the option
    # layer carried behind the NA header -- confirm with the scapy docs.
    return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """Validate every NA in *pkts*; return the set of advertising hosts."""
    responders = set()
    for pkt in pkts:
        responders.add(self.na_resp_host(src_host, pkt))
    return responders
def set_bd_flags(self, bd_id, **args):
    """
    Enable/disable defined feature(s) of the bridge domain.

    One ``bridge_flags`` API call is made per keyword argument, in the
    order the arguments were given.

    :param int bd_id: Bridge domain ID.
    :param args: Feature/status pairs given as keyword arguments. Allowed
        features: learn, forward, flood, uu_flood and arp_term. A false
        status disables the feature, a true status enables it.
    :raise: ValueError in case of unknown feature in the input. Features
        listed before the unknown one have already been applied by then.
    """
    feature_bits = {
        "learn": 1 << 0,
        "forward": 1 << 1,
        "flood": 1 << 2,
        "uu_flood": 1 << 3,
        "arp_term": 1 << 4,
    }
    for flag, status in args.items():
        if flag not in feature_bits:
            raise ValueError("Unknown feature used: %s" % flag)
        is_set = 1 if status else 0
        self.vapi.bridge_flags(
            bd_id=bd_id, is_set=is_set, flags=feature_bits[flag]
        )
    self.logger.info("Bridge domain ID %d updated" % bd_id)
245 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
# Send ARP requests into a bridge domain and check who gets answered for.
#
# src_host:   host the requests are sent from (passed to arp_reqs and to the
#             reply validation in arp_resp_hosts).
# req_hosts:  hosts that are asked for.
# resp_hosts: set of hosts expected to be answered for; its size is also the
#             number of packets fetched with get_capture.
# bd_id:      bridge domain whose interfaces (bd_swifs) are used.
#
# The same request stream is queued on the bridge-domain interfaces, capture
# is enabled on all pg interfaces, and the captured replies are converted to
# a host set whose symmetric difference with resp_hosts must be empty.
#
# NOTE(review): the embedded numbering skips 252-253 between enabling capture
# and reading it; presumably the statement that actually starts the packet
# generator is among the omitted lines -- confirm against the complete file.
# Loop extents are not recoverable from this indentation-less view either.
246 reqs = self.arp_reqs(src_host, req_hosts)
248 for swif in self.bd_swifs(bd_id):
249 swif.add_stream(reqs)
251 self.pg_enable_capture(self.pg_interfaces)
254 for swif in self.bd_swifs(bd_id):
255 resp_pkts = swif.get_capture(len(resp_hosts))
256 resps = self.arp_resp_hosts(src_host, resp_pkts)
257 self.assertEqual(len(resps ^ resp_hosts), 0)
259 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
# Send Neighbor Solicitations into a bridge domain and check who gets
# answered for.
#
# src_host:   host the solicitations are sent from (passed to ns_reqs_src and
#             to the reply validation in na_resp_hosts).
# req_hosts:  hosts that are solicited.
# resp_hosts: set of hosts expected to be advertised; its size is also the
#             number of packets fetched with get_capture.
# bd_id:      bridge domain whose interfaces (bd_swifs) are used.
#
# The same request stream is queued on the bridge-domain interfaces, capture
# is enabled on all pg interfaces, and the captured replies are converted to
# a host set whose symmetric difference with resp_hosts must be empty.
#
# NOTE(review): the embedded numbering skips 266-267 between enabling capture
# and reading it; presumably the statement that actually starts the packet
# generator is among the omitted lines -- confirm against the complete file.
# Loop extents are not recoverable from this indentation-less view either.
260 reqs = self.ns_reqs_src(src_host, req_hosts)
262 for swif in self.bd_swifs(bd_id):
263 swif.add_stream(reqs)
265 self.pg_enable_capture(self.pg_interfaces)
268 for swif in self.bd_swifs(bd_id):
269 resp_pkts = swif.get_capture(len(resp_hosts))
270 resps = self.na_resp_hosts(src_host, resp_pkts)
271 self.assertEqual(len(resps ^ resp_hosts), 0)
def test_l2bd_arp_term_01(self):
    """L2BD arp term - add 5 hosts, verify arp responses"""
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    # Create bridge domain 1 with ARP termination on; flooding, unknown
    # unicast flooding and learning off.
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)

    # NOTE(review): the docstring says 5 hosts; range(1, 5) yields 4 MACs.
    known_hosts = self.ip4_hosts(4, 1, self.mac_list(range(1, 5)))
    self.add_del_arp_term_hosts(known_hosts, is_add=1)

    # Every registered host is asked for and must be answered for.
    self.verify_arp(requester, known_hosts, known_hosts)

    # Kept on the class: the following tests read ``self.hosts``.
    type(self).hosts = known_hosts
def test_l2bd_arp_term_02(self):
    """L2BD arp term - delete 3 hosts, verify arp responses"""
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    # NOTE(review): the docstring says 3 hosts; range(1, 3) yields 2 MACs.
    removed = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(removed, is_add=0)

    # All previously stored hosts are asked for; only those that were not
    # removed are expected to be answered for.
    still_known = self.hosts - removed
    self.verify_arp(requester, self.hosts, still_known)

    type(self).hosts = still_known
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
    """L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses"""
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    # Recreate bridge domain 1 with ARP termination on; flooding, unknown
    # unicast flooding and learning off.
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)

    # NOTE(review): the docstring says 3 hosts; range(1, 3) yields 2 MACs.
    restored = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(restored, is_add=1)

    # The stored hosts and the restored ones are all asked for; only the
    # restored ones are expected to be answered for.
    asked = self.hosts | restored
    self.verify_arp(requester, asked, restored)

    type(self).hosts = restored
def test_l2bd_arp_term_04(self):
    """L2BD arp term - 2 IP4 addrs per host"""
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    # Hosts in subnet 5 built from MACs 01 and 02; presumably these MACs
    # already own a subnet-4 address from the preceding test.
    second_addrs = self.ip4_hosts(5, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(second_addrs, is_add=1)

    # Stored hosts plus the new ones must all be answered for.
    everyone = self.hosts | second_addrs
    self.verify_arp(requester, everyone, everyone)

    type(self).hosts = everyone
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
    """L2BD arp term - create and update 10 IP4-mac pairs"""
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")

    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)

    # First generation: ten subnet-5 addresses bound to one set of MACs.
    first_gen = self.ip4_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(first_gen, is_add=1)
    self.verify_arp(requester, first_gen, first_gen)

    # Second generation: the same ten addresses with different MACs.
    second_gen = self.ip4_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(second_gen, is_add=1)

    # Requests are still built from the first generation; the replies have
    # to match the second one.
    self.verify_arp(requester, first_gen, second_gen)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
    """L2BD arp/ND term - hosts with both ip4/ip6"""
    requester_v4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
    requester_v6 = self.ip6_host(50, 50, "00:00:11:22:33:44")

    self.bd_add_del(1, is_add=1)
    # Flooding is switched on here to make sure that the requests are
    # nevertheless not flooded.
    self.set_bd_flags(1, arp_term=True, flood=True, uu_flood=False, learn=False)

    # One MAC list, used once for IPv6 and once for IPv4 hosts.
    shared_macs = self.mac_list(range(10, 20))
    v6_hosts = self.ip6_hosts(5, 1, shared_macs)
    v4_hosts = self.ip4_hosts(5, 1, shared_macs)

    self.add_del_arp_term_hosts(v4_hosts, is_add=1)
    self.add_del_arp_term_hosts(v6_hosts, is_add=1, is_ipv6=1)

    self.verify_arp(requester_v4, v4_hosts, v4_hosts)
    self.verify_nd(requester_v6, v6_hosts, v6_hosts)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
    """L2BD ND term - Add and Del hosts, verify ND replies"""
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")

    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)

    # Register ten IPv6 hosts and check that all of them are answered for.
    registered = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(registered, is_add=1, is_ipv6=1)
    self.verify_nd(requester, registered, registered)

    # Remove the first five again; all ten are still solicited, only the
    # remaining ones are expected to be answered for.
    removed = self.ip6_hosts(5, 1, self.mac_list(range(10, 15)))
    self.add_del_arp_term_hosts(removed, is_add=0, is_ipv6=1)
    self.verify_nd(requester, registered, registered - removed)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
    """L2BD ND term - Add and update IP+mac, verify ND replies"""
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")

    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)

    # First generation: ten IPv6 addresses bound to one set of MACs.
    first_gen = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(first_gen, is_add=1, is_ipv6=1)
    self.verify_nd(requester, first_gen, first_gen)

    # Second generation: the same ten addresses with different MACs.
    second_gen = self.ip6_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(second_gen, is_add=1, is_ipv6=1)

    # Solicitations are still built from the first generation; the replies
    # have to match the second one.
    self.verify_nd(requester, first_gen, second_gen)

    self.bd_add_del(1, is_add=0)
379 def test_l2bd_arp_term_09(self):
380 """L2BD arp term - send garps, verify arp event reports"""
# Enables l2_arp_term events, creates bridge domain 1 with ARP termination on
# (flood, uu_flood and learn off), then queues one gratuitous ARP per host on
# the first bridge-domain interface. One "l2_arp_term_event" is awaited per
# host; the hosts rebuilt from those events must equal the hosts sent (empty
# symmetric difference).
#
# NOTE(review): the embedded numbering skips 391-392 and 394. The binding of
# ``evs`` (presumably a list built around the wait_for_event expression) and,
# presumably, the statement that starts the packet generator are among the
# omitted lines -- confirm against the complete file.
381 self.vapi.want_l2_arp_term_events(enable=1)
382 self.bd_add_del(1, is_add=1)
383 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
384 macs = self.mac_list(range(90, 95))
385 hosts = self.ip4_hosts(5, 1, macs)
387 garps = self.garp_reqs(hosts)
388 self.bd_swifs(1)[0].add_stream(garps)
390 self.pg_enable_capture(self.pg_interfaces)
393 self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
395 ev_hosts = self.arp_event_hosts(evs)
396 self.assertEqual(len(ev_hosts ^ hosts), 0)
398 def test_l2bd_arp_term_10(self):
399 """L2BD arp term - send duplicate garps, verify suppression"""
# Builds a single host (one MAC, subnet 6) and queues its gratuitous ARP five
# times on the first bridge-domain interface. Only len(hosts) == 1 event is
# awaited, and the host rebuilt from it must equal the host sent.
#
# No bridge domain is created here; presumably the test relies on bridge
# domain 1 and the event subscription left in place by an earlier test.
#
# NOTE(review): the embedded numbering skips 404, 409-410 and 412. The end of
# the string literal opened on 403, the binding of ``evs`` and, presumably,
# the statement that starts the packet generator are among the omitted lines
# -- confirm against the complete file.
400 macs = self.mac_list(range(70, 71))
401 hosts = self.ip4_hosts(6, 1, macs)
403 """ send the packet 5 times expect one event
405 garps = self.garp_reqs(hosts) * 5
406 self.bd_swifs(1)[0].add_stream(garps)
408 self.pg_enable_capture(self.pg_interfaces)
411 self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
413 ev_hosts = self.arp_event_hosts(evs)
414 self.assertEqual(len(ev_hosts ^ hosts), 0)
416 def test_l2bd_arp_term_11(self):
417 """L2BD arp term - disable ip4 arp events,send garps, verify no events"""
# Disables l2_arp_term events, queues one gratuitous ARP per host on the
# first bridge-domain interface and asserts that collect_events() returns
# nothing. Bridge domain 1 is deleted at the end.
#
# No bridge domain is created here; presumably the test relies on bridge
# domain 1 left in place by an earlier test.
#
# NOTE(review): the embedded numbering skips 426-427 between enabling capture
# and collecting events; presumably the statement that starts the packet
# generator (and possibly a wait) is among the omitted lines -- confirm
# against the complete file.
418 self.vapi.want_l2_arp_term_events(enable=0)
419 macs = self.mac_list(range(90, 95))
420 hosts = self.ip4_hosts(5, 1, macs)
422 garps = self.garp_reqs(hosts)
423 self.bd_swifs(1)[0].add_stream(garps)
425 self.pg_enable_capture(self.pg_interfaces)
428 self.assertEqual(len(self.vapi.collect_events()), 0)
429 self.bd_add_del(1, is_add=0)
431 def test_l2bd_arp_term_12(self):
432 """L2BD ND term - send NS packets verify reports"""
# Enables l2_arp_term events, creates bridge domain 1 with ARP termination on
# (flood, uu_flood and learn off), then queues one Neighbor Solicitation per
# host -- each sent by that host and targeting dst_host -- on the first
# bridge-domain interface. One "l2_arp_term_event" is awaited per host; the
# hosts rebuilt from those events must equal the hosts sent.
#
# NOTE(review): the embedded numbering skips 443-444 and 446. The binding of
# ``evs`` and, presumably, the statement that starts the packet generator are
# among the omitted lines -- confirm against the complete file.
433 self.vapi.want_l2_arp_term_events(enable=1)
434 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
435 self.bd_add_del(1, is_add=1)
436 self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
437 macs = self.mac_list(range(10, 15))
438 hosts = self.ip6_hosts(5, 1, macs)
439 reqs = self.ns_reqs_dst(hosts, dst_host)
440 self.bd_swifs(1)[0].add_stream(reqs)
442 self.pg_enable_capture(self.pg_interfaces)
445 self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
447 ev_hosts = self.nd_event_hosts(evs)
448 self.assertEqual(len(ev_hosts ^ hosts), 0)
450 def test_l2bd_arp_term_13(self):
451 """L2BD ND term - send duplicate ns, verify suppression"""
# Builds a single IPv6 host (one MAC) and queues its Neighbor Solicitation
# for dst_host five times on the first bridge-domain interface. Only
# len(hosts) == 1 event is awaited, and the host rebuilt from it must equal
# the host sent.
#
# No bridge domain is created here; presumably the test relies on bridge
# domain 1 and the event subscription left in place by an earlier test.
#
# NOTE(review): the embedded numbering skips 459-460 and 462. The binding of
# ``evs`` and, presumably, the statement that starts the packet generator are
# among the omitted lines -- confirm against the complete file.
452 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
453 macs = self.mac_list(range(16, 17))
454 hosts = self.ip6_hosts(5, 1, macs)
455 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
456 self.bd_swifs(1)[0].add_stream(reqs)
458 self.pg_enable_capture(self.pg_interfaces)
461 self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
463 ev_hosts = self.nd_event_hosts(evs)
464 self.assertEqual(len(ev_hosts ^ hosts), 0)
466 def test_l2bd_arp_term_14(self):
467 """L2BD ND term - disable ip4 arp events,send ns, verify no events"""
# Disables l2_arp_term events, queues one Neighbor Solicitation per host
# (each targeting dst_host) on the first bridge-domain interface and asserts
# that collect_events() returns nothing. Bridge domain 1 is deleted at the
# end.
#
# No bridge domain is created here; presumably the test relies on bridge
# domain 1 left in place by an earlier test.
#
# NOTE(review): the embedded numbering skips 476-477 between enabling capture
# and collecting events; presumably the statement that starts the packet
# generator (and possibly a wait) is among the omitted lines -- confirm
# against the complete file.
468 self.vapi.want_l2_arp_term_events(enable=0)
469 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
470 macs = self.mac_list(range(10, 15))
471 hosts = self.ip6_hosts(5, 1, macs)
472 reqs = self.ns_reqs_dst(hosts, dst_host)
473 self.bd_swifs(1)[0].add_stream(reqs)
475 self.pg_enable_capture(self.pg_interfaces)
478 self.assertEqual(len(self.vapi.collect_events()), 0)
479 self.bd_add_del(1, is_add=0)
if __name__ == "__main__":
    # Executed directly: hand the module's tests to unittest, with
    # VppTestRunner as the runner class.
    unittest.main(testRunner=VppTestRunner)