2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
def tearDownClass(cls):
    """ Delegate class-level teardown to the parent test case class. """
    super(TestL2bdArpTerm, cls).tearDownClass()
60 Clear trace and packet infos before running each test.
62 self.reset_packet_infos()
63 super(TestL2bdArpTerm, self).setUp()
67 Show various debug prints after each test.
69 super(TestL2bdArpTerm, self).tearDown()
def show_commands_at_teardown(self):
    """ Log the L2 FIB and, when available, bridge-domain 1 details. """
    l2fib_output = self.vapi.ppcli("show l2fib verbose")
    self.logger.info(l2fib_output)
    # Many tests delete bridge-domain 1 as their last step, so the detail
    # output is skipped when the l2fib dump for bd 1 comes back empty.
    if not self.vapi.l2_fib_table_dump(bd_id=1):
        return
    bd_output = self.vapi.ppcli("show bridge-domain 1 detail")
    self.logger.info(bd_output)
78 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
80 ip = e.ip4 if is_ipv6 == 0 else e.ip6
81 self.vapi.bd_ip_mac_add_del(bd_id=bd_id, is_add=is_add, ip=ip,
def mac_list(cls, b6_range):
    """ Return MAC strings 00:00:ca:fe:00:XX, one per value in b6_range.

    :param b6_range: Iterable of integers used (as two-digit hex) for the
        last byte of each MAC address.
    :returns: List of MAC address strings in iteration order.
    """
    macs = []
    for last_byte in b6_range:
        macs.append("00:00:ca:fe:00:%02x" % last_byte)
    return macs
89 def ip4_host(cls, subnet, host, mac):
91 ip4="172.17.1%02u.%u" % (subnet, host))
def ip4_hosts(cls, subnet, start, mac_list):
    """ Build a set of IPv4 hosts, one per MAC address.

    :param subnet: Subnet value handed through to ip4_host.
    :param start: Host number given to the first MAC; each following MAC
        gets the next consecutive number.
    :param mac_list: Iterable of MAC addresses.
    :returns: Set of the objects returned by ip4_host.
    """
    # enumerate pairs each MAC with its offset directly, so no indexing
    # (and no len()) is needed on mac_list.
    return {cls.ip4_host(subnet, start + offset, mac)
            for offset, mac in enumerate(mac_list)}
99 def ip6_host(cls, subnet, host, mac):
101 ip6="fd01:%x::%x" % (subnet, host))
def ip6_hosts(cls, subnet, start, mac_list):
    """ Build a set of IPv6 hosts, one per MAC address.

    :param subnet: Subnet value handed through to ip6_host.
    :param start: Host number given to the first MAC; each following MAC
        gets the next consecutive number.
    :param mac_list: Iterable of MAC addresses.
    :returns: Set of the objects returned by ip6_host.
    """
    # enumerate pairs each MAC with its offset directly, so no indexing
    # (and no len()) is needed on mac_list.
    return {cls.ip6_host(subnet, start + offset, mac)
            for offset, mac in enumerate(mac_list)}
109 def bd_swifs(cls, b):
112 return [cls.pg_interfaces[j] for j in range(start, start + n)]
114 def bd_add_del(self, bd_id=1, is_add=1):
116 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
117 for swif in self.bd_swifs(bd_id):
118 swif_idx = swif.sw_if_index
119 self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=swif_idx,
120 bd_id=bd_id, enable=is_add)
122 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
125 def arp_req(cls, src_host, host):
126 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
128 hwsrc=src_host.bin_mac,
def arp_reqs(cls, src_host, entries):
    """ Build one ARP request from src_host per entry, in iteration order.

    :param src_host: Host used as the sender of every request.
    :param entries: Iterable of hosts to ask about.
    :returns: List of the packets produced by arp_req.
    """
    requests = []
    for target in entries:
        requests.append(cls.arp_req(src_host, target))
    return requests
def garp_req(cls, host):
    """ Build an ARP request in which host is both sender and target.

    The name suggests this is meant as a gratuitous ARP; the packet itself
    is whatever arp_req returns for (host, host).
    """
    return cls.arp_req(host, host)
def garp_reqs(cls, entries):
    """ Build one garp_req packet per entry, in iteration order.

    :param entries: Iterable of hosts.
    :returns: List of the packets produced by garp_req.
    """
    packets = []
    for host in entries:
        packets.append(cls.garp_req(host))
    return packets
144 def arp_resp_host(self, src_host, arp_resp):
145 ether = arp_resp[Ether]
146 self.assertEqual(ether.dst, src_host.mac)
149 self.assertEqual(arp.hwtype, 1)
150 self.assertEqual(arp.ptype, 0x800)
151 self.assertEqual(arp.hwlen, 6)
152 self.assertEqual(arp.plen, 4)
153 arp_opts = {"who-has": 1, "is-at": 2}
154 self.assertEqual(arp.op, arp_opts["is-at"])
155 self.assertEqual(arp.hwdst, src_host.mac)
156 self.assertEqual(arp.pdst, src_host.ip4)
157 return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """ Check every ARP reply and collect the responding hosts.

    :param src_host: Host that sent the requests.
    :param pkts: Iterable of captured reply packets.
    :returns: Set of the hosts returned by arp_resp_host for each packet.
    """
    responders = set()
    for pkt in pkts:
        responders.add(self.arp_resp_host(src_host, pkt))
    return responders
164 o1 = int(ip / 16777216) % 256
165 o2 = int(ip / 65536) % 256
166 o3 = int(ip / 256) % 256
168 return '%s.%s.%s.%s' % (o1, o2, o3, o4)
def arp_event_host(self, e):
    """ Turn an ARP event into a Host built from its mac and ip fields.

    Both fields are converted with str() before being handed to Host; the
    ip is stored as the host's ip4 address.
    """
    mac_text = str(e.mac)
    ip_text = str(e.ip)
    return Host(mac_text, ip4=ip_text)
def arp_event_hosts(self, evs):
    """ Convert an iterable of ARP events into a set of hosts.

    :param evs: Iterable of events accepted by arp_event_host.
    :returns: Set of the hosts returned by arp_event_host.
    """
    reported = set()
    for event in evs:
        reported.add(self.arp_event_host(event))
    return reported
def nd_event_host(self, e):
    """ Turn an ND event into a Host built from its mac and ip fields.

    Both fields are converted with str() before being handed to Host; the
    ip is stored as the host's ip6 address.
    """
    mac_text = str(e.mac)
    ip_text = str(e.ip)
    return Host(mac_text, ip6=ip_text)
def nd_event_hosts(self, evs):
    """ Convert an iterable of ND events into a set of hosts.

    :param evs: Iterable of events accepted by nd_event_host.
    :returns: Set of the hosts returned by nd_event_host.
    """
    reported = set()
    for event in evs:
        reported.add(self.nd_event_host(event))
    return reported
def ns_req(cls, src_host, host):
    """ Build a Neighbor Solicitation from src_host targeting host.ip6.

    The IPv6 destination is the result of in6_getnsma() applied to the
    fixed address "fd10::ffff" (not to the target), and the Ethernet
    destination is the broadcast MAC. src_host's MAC is used both as the
    Ethernet source and in the source link-layer address option.

    :param src_host: Host sending the solicitation (mac and ip6 are read).
    :param host: Host whose ip6 is placed in the NS target field.
    :returns: The assembled scapy packet.
    """
    packed_nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
    ip6_dst = inet_ntop(AF_INET6, packed_nsma)
    eth_hdr = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    ip6_hdr = IPv6(dst=ip6_dst, src=src_host.ip6)
    ns_hdr = ICMPv6ND_NS(tgt=host.ip6)
    lladdr_opt = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return eth_hdr / ip6_hdr / ns_hdr / lladdr_opt
def ns_reqs_dst(cls, entries, dst_host):
    """ Build one NS per entry, each sent by that entry and asking for dst_host.

    :param entries: Iterable of hosts used as the senders.
    :param dst_host: Host every solicitation targets.
    :returns: List of the packets produced by ns_req, in iteration order.
    """
    solicitations = []
    for sender in entries:
        solicitations.append(cls.ns_req(sender, dst_host))
    return solicitations
def ns_reqs_src(cls, src_host, entries):
    """ Build one NS per entry, all sent by src_host and asking for the entry.

    :param src_host: Host used as the sender of every solicitation.
    :param entries: Iterable of hosts to solicit.
    :returns: List of the packets produced by ns_req, in iteration order.
    """
    solicitations = []
    for target in entries:
        solicitations.append(cls.ns_req(src_host, target))
    return solicitations
199 def na_resp_host(self, src_host, rx):
200 self.assertEqual(rx[Ether].dst, src_host.mac)
201 self.assertEqual(in6_ptop(rx[IPv6].dst),
202 in6_ptop(src_host.ip6))
204 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
205 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
208 return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """ Check every NA reply and collect the responding hosts.

    :param src_host: Host that sent the solicitations.
    :param pkts: Iterable of captured reply packets.
    :returns: Set of the hosts returned by na_resp_host for each packet.
    """
    responders = set()
    for pkt in pkts:
        responders.add(self.na_resp_host(src_host, pkt))
    return responders
213 def set_bd_flags(self, bd_id, **args):
215 Enable/disable defined feature(s) of the bridge domain.
217 :param int bd_id: Bridge domain ID.
218 :param list args: List of feature/status pairs. Allowed features: \
219 learn, forward, flood, uu_flood and arp_term. Status False means \
220 disable, status True means enable the feature.
221 :raise: ValueError in case of unknown feature in the input.
225 feature_bitmap = 1 << 0
226 elif flag == "forward":
227 feature_bitmap = 1 << 1
228 elif flag == "flood":
229 feature_bitmap = 1 << 2
230 elif flag == "uu_flood":
231 feature_bitmap = 1 << 3
232 elif flag == "arp_term":
233 feature_bitmap = 1 << 4
235 raise ValueError("Unknown feature used: %s" % flag)
236 is_set = 1 if args[flag] else 0
237 self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set,
238 flags=feature_bitmap)
239 self.logger.info("Bridge domain ID %d updated" % bd_id)
241 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
242 reqs = self.arp_reqs(src_host, req_hosts)
244 for swif in self.bd_swifs(bd_id):
245 swif.add_stream(reqs)
247 self.pg_enable_capture(self.pg_interfaces)
250 for swif in self.bd_swifs(bd_id):
251 resp_pkts = swif.get_capture(len(resp_hosts))
252 resps = self.arp_resp_hosts(src_host, resp_pkts)
253 self.assertEqual(len(resps ^ resp_hosts), 0)
255 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
256 reqs = self.ns_reqs_src(src_host, req_hosts)
258 for swif in self.bd_swifs(bd_id):
259 swif.add_stream(reqs)
261 self.pg_enable_capture(self.pg_interfaces)
264 for swif in self.bd_swifs(bd_id):
265 resp_pkts = swif.get_capture(len(resp_hosts))
266 resps = self.na_resp_hosts(src_host, resp_pkts)
267 self.assertEqual(len(resps ^ resp_hosts), 0)
def test_l2bd_arp_term_01(self):
    """ L2BD arp term - add 4 hosts, verify arp responses

    Sets up bridge domain 1 with arp_term enabled and flood, uu_flood and
    learn disabled, registers one IPv4 host per generated MAC and checks
    that ARP requests for all of them are answered.
    """
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    # range(1, 5) yields four MACs, hence four hosts
    macs = self.mac_list(range(1, 5))
    hosts = self.ip4_hosts(4, 1, macs)
    self.add_del_arp_term_hosts(hosts, is_add=1)

    self.verify_arp(src_host, hosts, hosts)
    # stored on the class so later test methods can read it as self.hosts
    type(self).hosts = hosts
def test_l2bd_arp_term_02(self):
    """ L2BD arp term - delete 2 hosts, verify arp responses

    Relies on self.hosts left behind by an earlier test: removes the hosts
    for the first two MACs, then requests all previous hosts and expects
    replies only for the ones that remain. Bridge domain 1 is removed at
    the end.
    """
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    # range(1, 3) yields two MACs, hence two hosts to delete
    macs = self.mac_list(range(1, 3))
    deleted = self.ip4_hosts(4, 1, macs)
    self.add_del_arp_term_hosts(deleted, is_add=0)
    remaining = self.hosts - deleted
    self.verify_arp(src_host, self.hosts, remaining)
    # publish the reduced set for the following test methods
    type(self).hosts = remaining
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
    """ L2BD arp term - recreate BD1, readd 2 hosts, verify arp responses

    Requests are sent for the union of self.hosts (left by an earlier
    test) and the re-added hosts; only the re-added hosts are expected to
    answer.
    """
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    # range(1, 3) yields two MACs, hence two hosts to re-add
    macs = self.mac_list(range(1, 3))
    readded = self.ip4_hosts(4, 1, macs)
    self.add_del_arp_term_hosts(readded, is_add=1)
    self.verify_arp(src_host, self.hosts | readded, readded)
    # publish the re-added set for the following test methods
    type(self).hosts = readded
def test_l2bd_arp_term_04(self):
    """ L2BD arp term - 2 IP4 addrs per host

    Adds subnet-5 addresses for the MACs from mac_list(range(1, 3)) on top
    of self.hosts (presumably the subnet-4 entries for the same MACs left
    by the previous test - depends on test ordering), verifies that every
    address is answered, then removes bridge domain 1.
    """
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    macs = self.mac_list(range(1, 3))
    sub5_hosts = self.ip4_hosts(5, 1, macs)
    self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
    hosts = self.hosts | sub5_hosts
    self.verify_arp(src_host, hosts, hosts)
    # publish the combined set on the class
    type(self).hosts = hosts
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
    """ L2BD arp term - create and update 10 IP4-mac pairs

    Registers ten IPv4 hosts, verifies them, then registers the same
    subnet/host numbers again with a different set of ten MACs and checks
    that requests for the original hosts are now answered with the new
    entries. Bridge domain 1 is removed at the end.
    """
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs1 = self.mac_list(range(10, 20))
    hosts1 = self.ip4_hosts(5, 1, macs1)
    self.add_del_arp_term_hosts(hosts1, is_add=1)
    self.verify_arp(src_host, hosts1, hosts1)
    # same subnet and start as hosts1, but a second MAC range
    macs2 = self.mac_list(range(20, 30))
    hosts2 = self.ip4_hosts(5, 1, macs2)
    self.add_del_arp_term_hosts(hosts2, is_add=1)
    # requests still name hosts1; replies are expected to match hosts2
    self.verify_arp(src_host, hosts1, hosts2)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
    """ L2BD arp/ND term - hosts with both ip4/ip6

    Registers an IPv4 and an IPv6 entry for each of ten MACs in bridge
    domain 1, verifies ARP replies for the IPv4 set and ND replies for the
    IPv6 set, then removes the bridge domain.
    """
    src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
    src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    # flood is deliberately enabled here, to make sure the requests are
    # still not flooded
    self.set_bd_flags(1, arp_term=True, flood=True,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(10, 20))
    hosts6 = self.ip6_hosts(5, 1, macs)
    hosts4 = self.ip4_hosts(5, 1, macs)
    self.add_del_arp_term_hosts(hosts4, is_add=1)
    self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
    self.verify_arp(src_host4, hosts4, hosts4)
    self.verify_nd(src_host6, hosts6, hosts6)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
    """ L2BD ND term - Add and Del hosts, verify ND replies

    Registers ten IPv6 hosts and verifies them, deletes the five built
    from mac_list(range(10, 15)), then solicits all ten again and expects
    replies only for the ones not deleted. Bridge domain 1 is removed at
    the end.
    """
    src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(10, 20))
    hosts6 = self.ip6_hosts(5, 1, macs)
    self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
    self.verify_nd(src_host6, hosts6, hosts6)
    # the first half of the MAC range is removed again
    del_macs = self.mac_list(range(10, 15))
    deleted = self.ip6_hosts(5, 1, del_macs)
    self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
    self.verify_nd(src_host6, hosts6, hosts6 - deleted)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
    """ L2BD ND term - Add and update IP+mac, verify ND replies

    Registers ten IPv6 hosts and verifies them, then registers the same
    subnet/host numbers with a second set of ten MACs and checks that
    solicitations for the original hosts are answered with the updated
    entries. Bridge domain 1 is removed at the end.
    """
    src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs1 = self.mac_list(range(10, 20))
    hosts = self.ip6_hosts(5, 1, macs1)
    self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
    self.verify_nd(src_host, hosts, hosts)
    # same subnet and start as hosts, but a second MAC range
    macs2 = self.mac_list(range(20, 30))
    updated = self.ip6_hosts(5, 1, macs2)
    self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
    # solicitations still name the original hosts; replies must match updated
    self.verify_nd(src_host, hosts, updated)
    self.bd_add_del(1, is_add=0)
389 def test_l2bd_arp_term_09(self):
390 """ L2BD arp term - send garps, verify arp event reports
392 self.vapi.want_ip4_arp_events()
393 self.bd_add_del(1, is_add=1)
394 self.set_bd_flags(1, arp_term=True, flood=False,
395 uu_flood=False, learn=False)
396 macs = self.mac_list(range(90, 95))
397 hosts = self.ip4_hosts(5, 1, macs)
399 garps = self.garp_reqs(hosts)
400 self.bd_swifs(1)[0].add_stream(garps)
402 self.pg_enable_capture(self.pg_interfaces)
404 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
405 for i in range(len(hosts))]
406 ev_hosts = self.arp_event_hosts(evs)
407 self.assertEqual(len(ev_hosts ^ hosts), 0)
409 def test_l2bd_arp_term_10(self):
410 """ L2BD arp term - send duplicate garps, verify suppression
412 macs = self.mac_list(range(70, 71))
413 hosts = self.ip4_hosts(6, 1, macs)
415 """ send the packet 5 times expect one event
417 garps = self.garp_reqs(hosts) * 5
418 self.bd_swifs(1)[0].add_stream(garps)
420 self.pg_enable_capture(self.pg_interfaces)
422 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
423 for i in range(len(hosts))]
424 ev_hosts = self.arp_event_hosts(evs)
425 self.assertEqual(len(ev_hosts ^ hosts), 0)
427 def test_l2bd_arp_term_11(self):
428 """ L2BD arp term - disable ip4 arp events,send garps, verify no events
430 self.vapi.want_ip4_arp_events(enable_disable=0)
431 macs = self.mac_list(range(90, 95))
432 hosts = self.ip4_hosts(5, 1, macs)
434 garps = self.garp_reqs(hosts)
435 self.bd_swifs(1)[0].add_stream(garps)
437 self.pg_enable_capture(self.pg_interfaces)
440 self.assertEqual(len(self.vapi.collect_events()), 0)
441 self.bd_add_del(1, is_add=0)
443 def test_l2bd_arp_term_12(self):
444 """ L2BD ND term - send NS packets verify reports
446 self.vapi.want_ip6_nd_events(ip="::")
447 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
448 self.bd_add_del(1, is_add=1)
449 self.set_bd_flags(1, arp_term=True, flood=False,
450 uu_flood=False, learn=False)
451 macs = self.mac_list(range(10, 15))
452 hosts = self.ip6_hosts(5, 1, macs)
453 reqs = self.ns_reqs_dst(hosts, dst_host)
454 self.bd_swifs(1)[0].add_stream(reqs)
456 self.pg_enable_capture(self.pg_interfaces)
458 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
459 for i in range(len(hosts))]
460 ev_hosts = self.nd_event_hosts(evs)
461 self.assertEqual(len(ev_hosts ^ hosts), 0)
463 def test_l2bd_arp_term_13(self):
464 """ L2BD ND term - send duplicate ns, verify suppression
466 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
467 macs = self.mac_list(range(10, 11))
468 hosts = self.ip6_hosts(5, 1, macs)
469 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
470 self.bd_swifs(1)[0].add_stream(reqs)
472 self.pg_enable_capture(self.pg_interfaces)
474 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
475 for i in range(len(hosts))]
476 ev_hosts = self.nd_event_hosts(evs)
477 self.assertEqual(len(ev_hosts ^ hosts), 0)
479 def test_l2bd_arp_term_14(self):
480 """ L2BD ND term - disable ip6 nd events, send ns, verify no events
482 self.vapi.want_ip6_nd_events(enable_disable=0, ip="::")
483 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
484 macs = self.mac_list(range(10, 15))
485 hosts = self.ip6_hosts(5, 1, macs)
486 reqs = self.ns_reqs_dst(hosts, dst_host)
487 self.bd_swifs(1)[0].add_stream(reqs)
489 self.pg_enable_capture(self.pg_interfaces)
492 self.assertEqual(len(self.vapi.collect_events()), 0)
493 self.bd_add_del(1, is_add=0)
# When executed as a script, run the tests through the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)