2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent test case class."""
    parent = super(TestL2bdArpTerm, cls)
    parent.tearDownClass()
60 Clear trace and packet infos before running each test.
62 self.reset_packet_infos()
63 super(TestL2bdArpTerm, self).setUp()
67 Show various debug prints after each test.
69 super(TestL2bdArpTerm, self).tearDown()
def show_commands_at_teardown(self):
    """Log the L2 FIB and, when it still has entries, bridge-domain 1."""
    log = self.logger.info
    cli = self.vapi.ppcli
    log(cli("show l2fib verbose"))
    # Many tests delete bridge-domain 1 as their last step; skip the
    # detail output when the dump for bd_id=1 comes back empty.
    if not self.vapi.l2_fib_table_dump(bd_id=1):
        return
    log(cli("show bridge-domain 1 detail"))
78 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
80 ip = e.ip4 if is_ipv6 == 0 else e.ip6
81 self.vapi.bd_ip_mac_add_del(is_add=is_add,
def mac_list(cls, b6_range):
    """Return MAC address strings whose last byte is taken from b6_range.

    :param b6_range: Iterable of integers used as the sixth MAC byte.
    :returns: List of "00:00:ca:fe:00:xx" strings, in iteration order.
    """
    template = "00:00:ca:fe:00:%02x"
    return [template % last_byte for last_byte in b6_range]
92 def ip4_host(cls, subnet, host, mac):
94 ip4="172.17.1%02u.%u" % (subnet, host))
def ip4_hosts(cls, subnet, start, mac_list):
    """Return a set of IPv4 hosts, one per MAC address.

    :param subnet: Subnet value passed through to ``cls.ip4_host``.
    :param start: Host number given to the first MAC; each following MAC
        gets the next consecutive number.
    :param mac_list: Iterable of MAC addresses.
    :returns: Set of the objects returned by ``cls.ip4_host``.
    """
    # enumerate() pairs every MAC with its offset directly, so any
    # iterable works, not only indexable sequences.
    return {cls.ip4_host(subnet, start + offset, mac)
            for offset, mac in enumerate(mac_list)}
102 def ip6_host(cls, subnet, host, mac):
104 ip6="fd01:%x::%x" % (subnet, host))
def ip6_hosts(cls, subnet, start, mac_list):
    """Return a set of IPv6 hosts, one per MAC address.

    :param subnet: Subnet value passed through to ``cls.ip6_host``.
    :param start: Host number given to the first MAC; each following MAC
        gets the next consecutive number.
    :param mac_list: Iterable of MAC addresses.
    :returns: Set of the objects returned by ``cls.ip6_host``.
    """
    # enumerate() pairs every MAC with its offset directly, so any
    # iterable works, not only indexable sequences.
    return {cls.ip6_host(subnet, start + offset, mac)
            for offset, mac in enumerate(mac_list)}
112 def bd_swifs(cls, b):
115 return [cls.pg_interfaces[j] for j in range(start, start + n)]
117 def bd_add_del(self, bd_id=1, is_add=1):
119 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
120 for swif in self.bd_swifs(bd_id):
121 swif_idx = swif.sw_if_index
122 self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=swif_idx,
123 bd_id=bd_id, enable=is_add)
125 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
128 def arp_req(cls, src_host, host):
129 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
131 hwsrc=src_host.bin_mac,
def arp_reqs(cls, src_host, entries):
    """Build one ARP request from src_host for each entry.

    :returns: List of the packets produced by ``cls.arp_req``, in the
        iteration order of ``entries``.
    """
    build = cls.arp_req
    return [build(src_host, target) for target in entries]
def garp_req(cls, host):
    """Build an ARP request in which host is both sender and target."""
    sender = target = host
    return cls.arp_req(sender, target)
def garp_reqs(cls, entries):
    """Build one request via ``cls.garp_req`` for each entry, in order."""
    return list(map(cls.garp_req, entries))
147 def arp_resp_host(self, src_host, arp_resp):
148 ether = arp_resp[Ether]
149 self.assertEqual(ether.dst, src_host.mac)
152 self.assertEqual(arp.hwtype, 1)
153 self.assertEqual(arp.ptype, 0x800)
154 self.assertEqual(arp.hwlen, 6)
155 self.assertEqual(arp.plen, 4)
156 arp_opts = {"who-has": 1, "is-at": 2}
157 self.assertEqual(arp.op, arp_opts["is-at"])
158 self.assertEqual(arp.hwdst, src_host.mac)
159 self.assertEqual(arp.pdst, src_host.ip4)
160 return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """Check every packet with ``arp_resp_host`` and collect the results.

    :returns: Set of the values returned by ``self.arp_resp_host``.
    """
    check = self.arp_resp_host
    return {check(src_host, reply) for reply in pkts}
167 o1 = int(ip / 16777216) % 256
168 o2 = int(ip / 65536) % 256
169 o3 = int(ip / 256) % 256
171 return '%s.%s.%s.%s' % (o1, o2, o3, o4)
def arp_event_host(self, e):
    """Turn an event into a Host built from its stringified mac and ip.

    :param e: Event object exposing ``mac`` and ``ip`` attributes.
    :returns: ``Host`` with the MAC as first argument and ``ip4`` set.
    """
    mac_text = str(e.mac)
    ip4_text = str(e.ip)
    return Host(mac_text, ip4=ip4_text)
def arp_event_hosts(self, evs):
    """Return the set of hosts built by ``arp_event_host`` for each event."""
    return set(map(self.arp_event_host, evs))
def nd_event_host(self, e):
    """Turn an event into a Host built from its stringified mac and ip.

    :param e: Event object exposing ``mac`` and ``ip`` attributes.
    :returns: ``Host`` with the MAC as first argument and ``ip6`` set.
    """
    mac_text = str(e.mac)
    ip6_text = str(e.ip)
    return Host(mac_text, ip6=ip6_text)
def nd_event_hosts(self, evs):
    """Return the set of hosts built by ``nd_event_host`` for each event."""
    return set(map(self.nd_event_host, evs))
def ns_req(cls, src_host, host):
    """Build an ICMPv6 neighbor solicitation from src_host for host.ip6.

    :param src_host: Sender; its ``mac`` and ``ip6`` fill the Ethernet
        source, IPv6 source and source link-layer address option.
    :param host: Host whose ``ip6`` is the solicitation target.
    :returns: Ether / IPv6 / ICMPv6ND_NS / ICMPv6NDOptSrcLLAddr packet.
    """
    # NOTE(review): the IPv6 destination is derived from the fixed
    # address "fd10::ffff", not from host.ip6 - confirm this is intended.
    nsma_bytes = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
    dst_addr = inet_ntop(AF_INET6, nsma_bytes)

    eth = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    ip6 = IPv6(dst=dst_addr, src=src_host.ip6)
    solicit = ICMPv6ND_NS(tgt=host.ip6)
    lladdr_opt = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return eth / ip6 / solicit / lladdr_opt
def ns_reqs_dst(cls, entries, dst_host):
    """Build one request per entry, each entry acting as the sender.

    :returns: List of ``cls.ns_req(entry, dst_host)`` results, in the
        iteration order of ``entries``.
    """
    build = cls.ns_req
    return [build(sender, dst_host) for sender in entries]
def ns_reqs_src(cls, src_host, entries):
    """Build one request per entry, all sent by src_host.

    :returns: List of ``cls.ns_req(src_host, entry)`` results, in the
        iteration order of ``entries``.
    """
    build = cls.ns_req
    return [build(src_host, target) for target in entries]
202 def na_resp_host(self, src_host, rx):
203 self.assertEqual(rx[Ether].dst, src_host.mac)
204 self.assertEqual(in6_ptop(rx[IPv6].dst),
205 in6_ptop(src_host.ip6))
207 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
208 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
211 return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """Check every packet with ``na_resp_host`` and collect the results.

    :returns: Set of the values returned by ``self.na_resp_host``.
    """
    check = self.na_resp_host
    return {check(src_host, reply) for reply in pkts}
216 def set_bd_flags(self, bd_id, **args):
218 Enable/disable defined feature(s) of the bridge domain.
220 :param int bd_id: Bridge domain ID.
221 :param list args: List of feature/status pairs. Allowed features: \
222 learn, forward, flood, uu_flood and arp_term. Status False means \
223 disable, status True means enable the feature.
224 :raise: ValueError in case of unknown feature in the input.
228 feature_bitmap = 1 << 0
229 elif flag == "forward":
230 feature_bitmap = 1 << 1
231 elif flag == "flood":
232 feature_bitmap = 1 << 2
233 elif flag == "uu_flood":
234 feature_bitmap = 1 << 3
235 elif flag == "arp_term":
236 feature_bitmap = 1 << 4
238 raise ValueError("Unknown feature used: %s" % flag)
239 is_set = 1 if args[flag] else 0
240 self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set,
241 flags=feature_bitmap)
242 self.logger.info("Bridge domain ID %d updated" % bd_id)
244 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
245 reqs = self.arp_reqs(src_host, req_hosts)
247 for swif in self.bd_swifs(bd_id):
248 swif.add_stream(reqs)
250 self.pg_enable_capture(self.pg_interfaces)
253 for swif in self.bd_swifs(bd_id):
254 resp_pkts = swif.get_capture(len(resp_hosts))
255 resps = self.arp_resp_hosts(src_host, resp_pkts)
256 self.assertEqual(len(resps ^ resp_hosts), 0)
258 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
259 reqs = self.ns_reqs_src(src_host, req_hosts)
261 for swif in self.bd_swifs(bd_id):
262 swif.add_stream(reqs)
264 self.pg_enable_capture(self.pg_interfaces)
267 for swif in self.bd_swifs(bd_id):
268 resp_pkts = swif.get_capture(len(resp_hosts))
269 resps = self.na_resp_hosts(src_host, resp_pkts)
270 self.assertEqual(len(resps ^ resp_hosts), 0)
272 def test_l2bd_arp_term_01(self):
273 """ L2BD arp term - add 4 hosts, verify arp responses
275 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
276 self.bd_add_del(1, is_add=1)
277 self.set_bd_flags(1, arp_term=True, flood=False,
278 uu_flood=False, learn=False)
279 macs = self.mac_list(range(1, 5))
280 hosts = self.ip4_hosts(4, 1, macs)
281 self.add_del_arp_term_hosts(hosts, is_add=1)
283 self.verify_arp(src_host, hosts, hosts)
284 type(self).hosts = hosts
286 def test_l2bd_arp_term_02(self):
287 """ L2BD arp term - delete 2 hosts, verify arp responses
289 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
290 macs = self.mac_list(range(1, 3))
291 deleted = self.ip4_hosts(4, 1, macs)
292 self.add_del_arp_term_hosts(deleted, is_add=0)
293 remaining = self.hosts - deleted
294 self.verify_arp(src_host, self.hosts, remaining)
295 type(self).hosts = remaining
296 self.bd_add_del(1, is_add=0)
298 def test_l2bd_arp_term_03(self):
299 """ L2BD arp term - recreate BD1, readd 2 hosts, verify arp responses
301 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
302 self.bd_add_del(1, is_add=1)
303 self.set_bd_flags(1, arp_term=True, flood=False,
304 uu_flood=False, learn=False)
305 macs = self.mac_list(range(1, 3))
306 readded = self.ip4_hosts(4, 1, macs)
307 self.add_del_arp_term_hosts(readded, is_add=1)
308 self.verify_arp(src_host, self.hosts | readded, readded)
309 type(self).hosts = readded
311 def test_l2bd_arp_term_04(self):
312 """ L2BD arp term - 2 IP4 addrs per host
314 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
315 macs = self.mac_list(range(1, 3))
316 sub5_hosts = self.ip4_hosts(5, 1, macs)
317 self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
318 hosts = self.hosts | sub5_hosts
319 self.verify_arp(src_host, hosts, hosts)
320 type(self).hosts = hosts
321 self.bd_add_del(1, is_add=0)
323 def test_l2bd_arp_term_05(self):
324 """ L2BD arp term - create and update 10 IP4-mac pairs
326 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
327 self.bd_add_del(1, is_add=1)
328 self.set_bd_flags(1, arp_term=True, flood=False,
329 uu_flood=False, learn=False)
330 macs1 = self.mac_list(range(10, 20))
331 hosts1 = self.ip4_hosts(5, 1, macs1)
332 self.add_del_arp_term_hosts(hosts1, is_add=1)
333 self.verify_arp(src_host, hosts1, hosts1)
334 macs2 = self.mac_list(range(20, 30))
335 hosts2 = self.ip4_hosts(5, 1, macs2)
336 self.add_del_arp_term_hosts(hosts2, is_add=1)
337 self.verify_arp(src_host, hosts1, hosts2)
338 self.bd_add_del(1, is_add=0)
340 def test_l2bd_arp_term_06(self):
341 """ L2BD arp/ND term - hosts with both ip4/ip6
343 src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
344 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
345 self.bd_add_del(1, is_add=1)
346 # enable flooding on the BD to check that requests are still not flooded
347 self.set_bd_flags(1, arp_term=True, flood=True,
348 uu_flood=False, learn=False)
349 macs = self.mac_list(range(10, 20))
350 hosts6 = self.ip6_hosts(5, 1, macs)
351 hosts4 = self.ip4_hosts(5, 1, macs)
352 self.add_del_arp_term_hosts(hosts4, is_add=1)
353 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
354 self.verify_arp(src_host4, hosts4, hosts4)
355 self.verify_nd(src_host6, hosts6, hosts6)
356 self.bd_add_del(1, is_add=0)
358 def test_l2bd_arp_term_07(self):
359 """ L2BD ND term - Add and Del hosts, verify ND replies
361 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
362 self.bd_add_del(1, is_add=1)
363 self.set_bd_flags(1, arp_term=True, flood=False,
364 uu_flood=False, learn=False)
365 macs = self.mac_list(range(10, 20))
366 hosts6 = self.ip6_hosts(5, 1, macs)
367 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
368 self.verify_nd(src_host6, hosts6, hosts6)
369 del_macs = self.mac_list(range(10, 15))
370 deleted = self.ip6_hosts(5, 1, del_macs)
371 self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
372 self.verify_nd(src_host6, hosts6, hosts6 - deleted)
373 self.bd_add_del(1, is_add=0)
375 def test_l2bd_arp_term_08(self):
376 """ L2BD ND term - Add and update IP+mac, verify ND replies
378 src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
379 self.bd_add_del(1, is_add=1)
380 self.set_bd_flags(1, arp_term=True, flood=False,
381 uu_flood=False, learn=False)
382 macs1 = self.mac_list(range(10, 20))
383 hosts = self.ip6_hosts(5, 1, macs1)
384 self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
385 self.verify_nd(src_host, hosts, hosts)
386 macs2 = self.mac_list(range(20, 30))
387 updated = self.ip6_hosts(5, 1, macs2)
388 self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
389 self.verify_nd(src_host, hosts, updated)
390 self.bd_add_del(1, is_add=0)
392 def test_l2bd_arp_term_09(self):
393 """ L2BD arp term - send garps, verify arp event reports
395 self.vapi.want_ip4_arp_events()
396 self.bd_add_del(1, is_add=1)
397 self.set_bd_flags(1, arp_term=True, flood=False,
398 uu_flood=False, learn=False)
399 macs = self.mac_list(range(90, 95))
400 hosts = self.ip4_hosts(5, 1, macs)
402 garps = self.garp_reqs(hosts)
403 self.bd_swifs(1)[0].add_stream(garps)
405 self.pg_enable_capture(self.pg_interfaces)
407 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
408 for i in range(len(hosts))]
409 ev_hosts = self.arp_event_hosts(evs)
410 self.assertEqual(len(ev_hosts ^ hosts), 0)
412 def test_l2bd_arp_term_10(self):
413 """ L2BD arp term - send duplicate garps, verify suppression
415 macs = self.mac_list(range(70, 71))
416 hosts = self.ip4_hosts(6, 1, macs)
418 """ send the packet 5 times expect one event
420 garps = self.garp_reqs(hosts) * 5
421 self.bd_swifs(1)[0].add_stream(garps)
423 self.pg_enable_capture(self.pg_interfaces)
425 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
426 for i in range(len(hosts))]
427 ev_hosts = self.arp_event_hosts(evs)
428 self.assertEqual(len(ev_hosts ^ hosts), 0)
430 def test_l2bd_arp_term_11(self):
431 """ L2BD arp term - disable ip4 arp events, send garps, verify no events
433 self.vapi.want_ip4_arp_events(enable_disable=0)
434 macs = self.mac_list(range(90, 95))
435 hosts = self.ip4_hosts(5, 1, macs)
437 garps = self.garp_reqs(hosts)
438 self.bd_swifs(1)[0].add_stream(garps)
440 self.pg_enable_capture(self.pg_interfaces)
443 self.assertEqual(len(self.vapi.collect_events()), 0)
444 self.bd_add_del(1, is_add=0)
446 def test_l2bd_arp_term_12(self):
447 """ L2BD ND term - send NS packets verify reports
449 self.vapi.want_ip6_nd_events(ip="::")
450 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
451 self.bd_add_del(1, is_add=1)
452 self.set_bd_flags(1, arp_term=True, flood=False,
453 uu_flood=False, learn=False)
454 macs = self.mac_list(range(10, 15))
455 hosts = self.ip6_hosts(5, 1, macs)
456 reqs = self.ns_reqs_dst(hosts, dst_host)
457 self.bd_swifs(1)[0].add_stream(reqs)
459 self.pg_enable_capture(self.pg_interfaces)
461 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
462 for i in range(len(hosts))]
463 ev_hosts = self.nd_event_hosts(evs)
464 self.assertEqual(len(ev_hosts ^ hosts), 0)
466 def test_l2bd_arp_term_13(self):
467 """ L2BD ND term - send duplicate ns, verify suppression
469 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
470 macs = self.mac_list(range(10, 11))
471 hosts = self.ip6_hosts(5, 1, macs)
472 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
473 self.bd_swifs(1)[0].add_stream(reqs)
475 self.pg_enable_capture(self.pg_interfaces)
477 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
478 for i in range(len(hosts))]
479 ev_hosts = self.nd_event_hosts(evs)
480 self.assertEqual(len(ev_hosts ^ hosts), 0)
482 def test_l2bd_arp_term_14(self):
483 """ L2BD ND term - disable ip6 nd events, send ns, verify no events
485 self.vapi.want_ip6_nd_events(enable_disable=0, ip="::")
486 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
487 macs = self.mac_list(range(10, 15))
488 hosts = self.ip6_hosts(5, 1, macs)
489 reqs = self.ns_reqs_dst(hosts, dst_host)
490 self.bd_swifs(1)[0].add_stream(reqs)
492 self.pg_enable_capture(self.pg_interfaces)
495 self.assertEqual(len(self.vapi.collect_events()), 0)
496 self.bd_add_del(1, is_add=0)
if __name__ == '__main__':
    # Imported here because unittest is only needed when this file is run
    # directly as a script.
    import unittest

    # Run the tests in this module with the VPP test runner.
    unittest.main(testRunner=VppTestRunner)