2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6, inet_pton, inet_ntop
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
14 in6_mactoifaceid, in6_ismaddr
15 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
16 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
17 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
18 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
20 from framework import VppTestCase, VppTestRunner
21 from util import Host, ppp
24 class TestL2bdArpTerm(VppTestCase):
25 """ L2BD arp termination Test Case """
30 Perform standard class setup (defined by class method setUpClass in
31 class VppTestCase) before running the test case, set test case related
32 variables and configure VPP.
34 super(TestL2bdArpTerm, cls).setUpClass()
37 # Create pg interfaces
39 cls.ifs_per_bd = ifs_per_bd = 3
40 n_ifs = n_bd * ifs_per_bd
41 cls.create_pg_interfaces(range(n_ifs))
43 # Set up all interfaces
44 for i in cls.pg_interfaces:
50 super(TestL2bdArpTerm, cls).tearDownClass()
54 def tearDownClass(cls):
55 super(TestL2bdArpTerm, cls).tearDownClass()
59 Clear trace and packet infos before running each test.
61 self.reset_packet_infos()
62 super(TestL2bdArpTerm, self).setUp()
66 Show various debug prints after each test.
68 super(TestL2bdArpTerm, self).tearDown()
70 def show_commands_at_teardown(self):
71 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
72 # many tests delete bridge-domain 1 as the last task. don't output
73 # the details of a non-existent bridge-domain.
74 if self.vapi.l2_fib_table_dump(bd_id=1):
75 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
77 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
79 ip = e.ip4 if is_ipv6 == 0 else e.ip6
80 self.vapi.bd_ip_mac_add_del(is_add=is_add,
87 def mac_list(cls, b6_range):
88 return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
91 def ip4_host(cls, subnet, host, mac):
93 ip4="172.17.1%02u.%u" % (subnet, host))
96 def ip4_hosts(cls, subnet, start, mac_list):
97 return {cls.ip4_host(subnet, start + j, mac_list[j])
98 for j in range(len(mac_list))}
101 def ip6_host(cls, subnet, host, mac):
103 ip6="fd01:%x::%x" % (subnet, host))
106 def ip6_hosts(cls, subnet, start, mac_list):
107 return {cls.ip6_host(subnet, start + j, mac_list[j])
108 for j in range(len(mac_list))}
111 def bd_swifs(cls, b):
114 return [cls.pg_interfaces[j] for j in range(start, start + n)]
116 def bd_add_del(self, bd_id=1, is_add=1):
118 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
119 for swif in self.bd_swifs(bd_id):
120 swif_idx = swif.sw_if_index
121 self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=swif_idx,
122 bd_id=bd_id, enable=is_add)
124 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
127 def arp_req(cls, src_host, host):
128 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
130 hwsrc=src_host.bin_mac,
135 def arp_reqs(cls, src_host, entries):
136 return [cls.arp_req(src_host, e) for e in entries]
139 def garp_req(cls, host):
140 return cls.arp_req(host, host)
143 def garp_reqs(cls, entries):
144 return [cls.garp_req(e) for e in entries]
146 def arp_resp_host(self, src_host, arp_resp):
147 ether = arp_resp[Ether]
148 self.assertEqual(ether.dst, src_host.mac)
151 self.assertEqual(arp.hwtype, 1)
152 self.assertEqual(arp.ptype, 0x800)
153 self.assertEqual(arp.hwlen, 6)
154 self.assertEqual(arp.plen, 4)
155 arp_opts = {"who-has": 1, "is-at": 2}
156 self.assertEqual(arp.op, arp_opts["is-at"])
157 self.assertEqual(arp.hwdst, src_host.mac)
158 self.assertEqual(arp.pdst, src_host.ip4)
159 return Host(mac=arp.hwsrc, ip4=arp.psrc)
161 def arp_resp_hosts(self, src_host, pkts):
162 return {self.arp_resp_host(src_host, p) for p in pkts}
166 o1 = int(ip / 16777216) % 256
167 o2 = int(ip / 65536) % 256
168 o3 = int(ip / 256) % 256
170 return '%s.%s.%s.%s' % (o1, o2, o3, o4)
172 def arp_event_host(self, e):
173 return Host(str(e.mac), ip4=str(e.ip))
175 def arp_event_hosts(self, evs):
176 return {self.arp_event_host(e) for e in evs}
178 def nd_event_host(self, e):
179 return Host(str(e.mac), ip6=str(e.ip))
181 def nd_event_hosts(self, evs):
182 return {self.nd_event_host(e) for e in evs}
185 def ns_req(cls, src_host, host):
186 nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
187 d = inet_ntop(AF_INET6, nsma)
188 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
189 IPv6(dst=d, src=src_host.ip6) /
190 ICMPv6ND_NS(tgt=host.ip6) /
191 ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
194 def ns_reqs_dst(cls, entries, dst_host):
195 return [cls.ns_req(e, dst_host) for e in entries]
198 def ns_reqs_src(cls, src_host, entries):
199 return [cls.ns_req(src_host, e) for e in entries]
201 def na_resp_host(self, src_host, rx):
202 self.assertEqual(rx[Ether].dst, src_host.mac)
203 self.assertEqual(in6_ptop(rx[IPv6].dst),
204 in6_ptop(src_host.ip6))
206 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
207 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
210 return Host(mac=na.lladdr, ip6=na.tgt)
212 def na_resp_hosts(self, src_host, pkts):
213 return {self.na_resp_host(src_host, p) for p in pkts}
215 def set_bd_flags(self, bd_id, **args):
217 Enable/disable defined feature(s) of the bridge domain.
219 :param int bd_id: Bridge domain ID.
220 :param list args: List of feature/status pairs. Allowed features: \
221 learn, forward, flood, uu_flood and arp_term. Status False means \
222 disable, status True means enable the feature.
223 :raise: ValueError in case of unknown feature in the input.
227 feature_bitmap = 1 << 0
228 elif flag == "forward":
229 feature_bitmap = 1 << 1
230 elif flag == "flood":
231 feature_bitmap = 1 << 2
232 elif flag == "uu_flood":
233 feature_bitmap = 1 << 3
234 elif flag == "arp_term":
235 feature_bitmap = 1 << 4
237 raise ValueError("Unknown feature used: %s" % flag)
238 is_set = 1 if args[flag] else 0
239 self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set,
240 flags=feature_bitmap)
241 self.logger.info("Bridge domain ID %d updated" % bd_id)
243 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
244 reqs = self.arp_reqs(src_host, req_hosts)
246 for swif in self.bd_swifs(bd_id):
247 swif.add_stream(reqs)
249 self.pg_enable_capture(self.pg_interfaces)
252 for swif in self.bd_swifs(bd_id):
253 resp_pkts = swif.get_capture(len(resp_hosts))
254 resps = self.arp_resp_hosts(src_host, resp_pkts)
255 self.assertEqual(len(resps ^ resp_hosts), 0)
257 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
258 reqs = self.ns_reqs_src(src_host, req_hosts)
260 for swif in self.bd_swifs(bd_id):
261 swif.add_stream(reqs)
263 self.pg_enable_capture(self.pg_interfaces)
266 for swif in self.bd_swifs(bd_id):
267 resp_pkts = swif.get_capture(len(resp_hosts))
268 resps = self.na_resp_hosts(src_host, resp_pkts)
269 self.assertEqual(len(resps ^ resp_hosts), 0)
271 def test_l2bd_arp_term_01(self):
272 """ L2BD arp term - add 5 hosts, verify arp responses
274 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
275 self.bd_add_del(1, is_add=1)
276 self.set_bd_flags(1, arp_term=True, flood=False,
277 uu_flood=False, learn=False)
278 macs = self.mac_list(range(1, 5))
279 hosts = self.ip4_hosts(4, 1, macs)
280 self.add_del_arp_term_hosts(hosts, is_add=1)
282 self.verify_arp(src_host, hosts, hosts)
283 type(self).hosts = hosts
285 def test_l2bd_arp_term_02(self):
286 """ L2BD arp term - delete 3 hosts, verify arp responses
288 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
289 macs = self.mac_list(range(1, 3))
290 deleted = self.ip4_hosts(4, 1, macs)
291 self.add_del_arp_term_hosts(deleted, is_add=0)
292 remaining = self.hosts - deleted
293 self.verify_arp(src_host, self.hosts, remaining)
294 type(self).hosts = remaining
295 self.bd_add_del(1, is_add=0)
297 def test_l2bd_arp_term_03(self):
298 """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
300 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
301 self.bd_add_del(1, is_add=1)
302 self.set_bd_flags(1, arp_term=True, flood=False,
303 uu_flood=False, learn=False)
304 macs = self.mac_list(range(1, 3))
305 readded = self.ip4_hosts(4, 1, macs)
306 self.add_del_arp_term_hosts(readded, is_add=1)
307 self.verify_arp(src_host, self.hosts | readded, readded)
308 type(self).hosts = readded
310 def test_l2bd_arp_term_04(self):
311 """ L2BD arp term - 2 IP4 addrs per host
313 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
314 macs = self.mac_list(range(1, 3))
315 sub5_hosts = self.ip4_hosts(5, 1, macs)
316 self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
317 hosts = self.hosts | sub5_hosts
318 self.verify_arp(src_host, hosts, hosts)
319 type(self).hosts = hosts
320 self.bd_add_del(1, is_add=0)
322 def test_l2bd_arp_term_05(self):
323 """ L2BD arp term - create and update 10 IP4-mac pairs
325 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
326 self.bd_add_del(1, is_add=1)
327 self.set_bd_flags(1, arp_term=True, flood=False,
328 uu_flood=False, learn=False)
329 macs1 = self.mac_list(range(10, 20))
330 hosts1 = self.ip4_hosts(5, 1, macs1)
331 self.add_del_arp_term_hosts(hosts1, is_add=1)
332 self.verify_arp(src_host, hosts1, hosts1)
333 macs2 = self.mac_list(range(20, 30))
334 hosts2 = self.ip4_hosts(5, 1, macs2)
335 self.add_del_arp_term_hosts(hosts2, is_add=1)
336 self.verify_arp(src_host, hosts1, hosts2)
337 self.bd_add_del(1, is_add=0)
339 def test_l2bd_arp_term_06(self):
340 """ L2BD arp/ND term - hosts with both ip4/ip6
342 src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
343 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
344 self.bd_add_del(1, is_add=1)
345 # enable flood to make sure requests are not flooded
346 self.set_bd_flags(1, arp_term=True, flood=True,
347 uu_flood=False, learn=False)
348 macs = self.mac_list(range(10, 20))
349 hosts6 = self.ip6_hosts(5, 1, macs)
350 hosts4 = self.ip4_hosts(5, 1, macs)
351 self.add_del_arp_term_hosts(hosts4, is_add=1)
352 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
353 self.verify_arp(src_host4, hosts4, hosts4)
354 self.verify_nd(src_host6, hosts6, hosts6)
355 self.bd_add_del(1, is_add=0)
357 def test_l2bd_arp_term_07(self):
358 """ L2BD ND term - Add and Del hosts, verify ND replies
360 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
361 self.bd_add_del(1, is_add=1)
362 self.set_bd_flags(1, arp_term=True, flood=False,
363 uu_flood=False, learn=False)
364 macs = self.mac_list(range(10, 20))
365 hosts6 = self.ip6_hosts(5, 1, macs)
366 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
367 self.verify_nd(src_host6, hosts6, hosts6)
368 del_macs = self.mac_list(range(10, 15))
369 deleted = self.ip6_hosts(5, 1, del_macs)
370 self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
371 self.verify_nd(src_host6, hosts6, hosts6 - deleted)
372 self.bd_add_del(1, is_add=0)
374 def test_l2bd_arp_term_08(self):
375 """ L2BD ND term - Add and update IP+mac, verify ND replies
377 src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
378 self.bd_add_del(1, is_add=1)
379 self.set_bd_flags(1, arp_term=True, flood=False,
380 uu_flood=False, learn=False)
381 macs1 = self.mac_list(range(10, 20))
382 hosts = self.ip6_hosts(5, 1, macs1)
383 self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
384 self.verify_nd(src_host, hosts, hosts)
385 macs2 = self.mac_list(range(20, 30))
386 updated = self.ip6_hosts(5, 1, macs2)
387 self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
388 self.verify_nd(src_host, hosts, updated)
389 self.bd_add_del(1, is_add=0)
391 def test_l2bd_arp_term_09(self):
392 """ L2BD arp term - send garps, verify arp event reports
394 self.vapi.want_l2_arp_term_events(enable=1)
395 self.bd_add_del(1, is_add=1)
396 self.set_bd_flags(1, arp_term=True, flood=False,
397 uu_flood=False, learn=False)
398 macs = self.mac_list(range(90, 95))
399 hosts = self.ip4_hosts(5, 1, macs)
401 garps = self.garp_reqs(hosts)
402 self.bd_swifs(1)[0].add_stream(garps)
404 self.pg_enable_capture(self.pg_interfaces)
406 evs = [self.vapi.wait_for_event(1, "l2_arp_term_event")
407 for i in range(len(hosts))]
408 ev_hosts = self.arp_event_hosts(evs)
409 self.assertEqual(len(ev_hosts ^ hosts), 0)
411 def test_l2bd_arp_term_10(self):
412 """ L2BD arp term - send duplicate garps, verify suppression
414 macs = self.mac_list(range(70, 71))
415 hosts = self.ip4_hosts(6, 1, macs)
417 """ send the packet 5 times expect one event
419 garps = self.garp_reqs(hosts) * 5
420 self.bd_swifs(1)[0].add_stream(garps)
422 self.pg_enable_capture(self.pg_interfaces)
424 evs = [self.vapi.wait_for_event(1, "l2_arp_term_event")
425 for i in range(len(hosts))]
426 ev_hosts = self.arp_event_hosts(evs)
427 self.assertEqual(len(ev_hosts ^ hosts), 0)
429 def test_l2bd_arp_term_11(self):
430 """ L2BD arp term - disable ip4 arp events,send garps, verify no events
432 self.vapi.want_l2_arp_term_events(enable=0)
433 macs = self.mac_list(range(90, 95))
434 hosts = self.ip4_hosts(5, 1, macs)
436 garps = self.garp_reqs(hosts)
437 self.bd_swifs(1)[0].add_stream(garps)
439 self.pg_enable_capture(self.pg_interfaces)
442 self.assertEqual(len(self.vapi.collect_events()), 0)
443 self.bd_add_del(1, is_add=0)
445 def test_l2bd_arp_term_12(self):
446 """ L2BD ND term - send NS packets verify reports
448 self.vapi.want_l2_arp_term_events(enable=1)
449 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
450 self.bd_add_del(1, is_add=1)
451 self.set_bd_flags(1, arp_term=True, flood=False,
452 uu_flood=False, learn=False)
453 macs = self.mac_list(range(10, 15))
454 hosts = self.ip6_hosts(5, 1, macs)
455 reqs = self.ns_reqs_dst(hosts, dst_host)
456 self.bd_swifs(1)[0].add_stream(reqs)
458 self.pg_enable_capture(self.pg_interfaces)
460 evs = [self.vapi.wait_for_event(2, "l2_arp_term_event")
461 for i in range(len(hosts))]
462 ev_hosts = self.nd_event_hosts(evs)
463 self.assertEqual(len(ev_hosts ^ hosts), 0)
465 def test_l2bd_arp_term_13(self):
466 """ L2BD ND term - send duplicate ns, verify suppression
468 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
469 macs = self.mac_list(range(16, 17))
470 hosts = self.ip6_hosts(5, 1, macs)
471 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
472 self.bd_swifs(1)[0].add_stream(reqs)
474 self.pg_enable_capture(self.pg_interfaces)
476 evs = [self.vapi.wait_for_event(2, "l2_arp_term_event")
477 for i in range(len(hosts))]
478 ev_hosts = self.nd_event_hosts(evs)
479 self.assertEqual(len(ev_hosts ^ hosts), 0)
481 def test_l2bd_arp_term_14(self):
482 """ L2BD ND term - disable ip4 arp events,send ns, verify no events
484 self.vapi.want_l2_arp_term_events(enable=0)
485 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
486 macs = self.mac_list(range(10, 15))
487 hosts = self.ip6_hosts(5, 1, macs)
488 reqs = self.ns_reqs_dst(hosts, dst_host)
489 self.bd_swifs(1)[0].add_stream(reqs)
491 self.pg_enable_capture(self.pg_interfaces)
494 self.assertEqual(len(self.vapi.collect_events()), 0)
495 self.bd_add_del(1, is_add=0)
498 if __name__ == '__main__':
499 unittest.main(testRunner=VppTestRunner)