2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
56 Clear trace and packet infos before running each test.
58 self.reset_packet_infos()
59 super(TestL2bdArpTerm, self).setUp()
63 Show various debug prints after each test.
65 super(TestL2bdArpTerm, self).tearDown()
67 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
68 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
70 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
72 ip = e.ip4 if is_ipv6 == 0 else e.ip6
73 self.vapi.bd_ip_mac_add_del(bd_id=bd_id, is_add=is_add, ip=ip,
77 def mac_list(cls, b6_range):
78 return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
81 def ip4_host(cls, subnet, host, mac):
83 ip4="172.17.1%02u.%u" % (subnet, host))
86 def ip4_hosts(cls, subnet, start, mac_list):
87 return {cls.ip4_host(subnet, start + j, mac_list[j])
88 for j in range(len(mac_list))}
91 def ip6_host(cls, subnet, host, mac):
93 ip6="fd01:%x::%x" % (subnet, host))
96 def ip6_hosts(cls, subnet, start, mac_list):
97 return {cls.ip6_host(subnet, start + j, mac_list[j])
98 for j in range(len(mac_list))}
101 def bd_swifs(cls, b):
104 return [cls.pg_interfaces[j] for j in range(start, start + n)]
106 def bd_add_del(self, bd_id=1, is_add=1):
108 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
109 for swif in self.bd_swifs(bd_id):
110 swif_idx = swif.sw_if_index
111 self.vapi.sw_interface_set_l2_bridge(
112 swif_idx, bd_id=bd_id, enable=is_add)
114 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
117 def arp_req(cls, src_host, host):
118 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
120 hwsrc=src_host.bin_mac,
125 def arp_reqs(cls, src_host, entries):
126 return [cls.arp_req(src_host, e) for e in entries]
129 def garp_req(cls, host):
130 return cls.arp_req(host, host)
133 def garp_reqs(cls, entries):
134 return [cls.garp_req(e) for e in entries]
136 def arp_resp_host(self, src_host, arp_resp):
137 ether = arp_resp[Ether]
138 self.assertEqual(ether.dst, src_host.mac)
141 self.assertEqual(arp.hwtype, 1)
142 self.assertEqual(arp.ptype, 0x800)
143 self.assertEqual(arp.hwlen, 6)
144 self.assertEqual(arp.plen, 4)
145 arp_opts = {"who-has": 1, "is-at": 2}
146 self.assertEqual(arp.op, arp_opts["is-at"])
147 self.assertEqual(arp.hwdst, src_host.mac)
148 self.assertEqual(arp.pdst, src_host.ip4)
149 return Host(mac=arp.hwsrc, ip4=arp.psrc)
151 def arp_resp_hosts(self, src_host, pkts):
152 return {self.arp_resp_host(src_host, p) for p in pkts}
156 o1 = int(ip / 16777216) % 256
157 o2 = int(ip / 65536) % 256
158 o3 = int(ip / 256) % 256
160 return '%s.%s.%s.%s' % (o1, o2, o3, o4)
162 def arp_event_host(self, e):
163 return Host(str(e.mac), ip4=str(e.ip))
165 def arp_event_hosts(self, evs):
166 return {self.arp_event_host(e) for e in evs}
168 def nd_event_host(self, e):
169 return Host(str(e.mac), ip6=str(e.ip))
171 def nd_event_hosts(self, evs):
172 return {self.nd_event_host(e) for e in evs}
175 def ns_req(cls, src_host, host):
176 nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
177 d = inet_ntop(AF_INET6, nsma)
178 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
179 IPv6(dst=d, src=src_host.ip6) /
180 ICMPv6ND_NS(tgt=host.ip6) /
181 ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
184 def ns_reqs_dst(cls, entries, dst_host):
185 return [cls.ns_req(e, dst_host) for e in entries]
188 def ns_reqs_src(cls, src_host, entries):
189 return [cls.ns_req(src_host, e) for e in entries]
191 def na_resp_host(self, src_host, rx):
192 self.assertEqual(rx[Ether].dst, src_host.mac)
193 self.assertEqual(in6_ptop(rx[IPv6].dst),
194 in6_ptop(src_host.ip6))
196 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
197 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
200 return Host(mac=na.lladdr, ip6=na.tgt)
202 def na_resp_hosts(self, src_host, pkts):
203 return {self.na_resp_host(src_host, p) for p in pkts}
205 def set_bd_flags(self, bd_id, **args):
207 Enable/disable defined feature(s) of the bridge domain.
209 :param int bd_id: Bridge domain ID.
210 :param list args: List of feature/status pairs. Allowed features: \
211 learn, forward, flood, uu_flood and arp_term. Status False means \
212 disable, status True means enable the feature.
213 :raise: ValueError in case of unknown feature in the input.
217 feature_bitmap = 1 << 0
218 elif flag == "forward":
219 feature_bitmap = 1 << 1
220 elif flag == "flood":
221 feature_bitmap = 1 << 2
222 elif flag == "uu_flood":
223 feature_bitmap = 1 << 3
224 elif flag == "arp_term":
225 feature_bitmap = 1 << 4
227 raise ValueError("Unknown feature used: %s" % flag)
228 is_set = 1 if args[flag] else 0
229 self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
230 self.logger.info("Bridge domain ID %d updated" % bd_id)
232 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
233 reqs = self.arp_reqs(src_host, req_hosts)
235 for swif in self.bd_swifs(bd_id):
236 swif.add_stream(reqs)
238 self.pg_enable_capture(self.pg_interfaces)
241 for swif in self.bd_swifs(bd_id):
242 resp_pkts = swif.get_capture(len(resp_hosts))
243 resps = self.arp_resp_hosts(src_host, resp_pkts)
244 self.assertEqual(len(resps ^ resp_hosts), 0)
246 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
247 reqs = self.ns_reqs_src(src_host, req_hosts)
249 for swif in self.bd_swifs(bd_id):
250 swif.add_stream(reqs)
252 self.pg_enable_capture(self.pg_interfaces)
255 for swif in self.bd_swifs(bd_id):
256 resp_pkts = swif.get_capture(len(resp_hosts))
257 resps = self.na_resp_hosts(src_host, resp_pkts)
258 self.assertEqual(len(resps ^ resp_hosts), 0)
260 def test_l2bd_arp_term_01(self):
261 """ L2BD arp term - add 5 hosts, verify arp responses
263 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
264 self.bd_add_del(1, is_add=1)
265 self.set_bd_flags(1, arp_term=True, flood=False,
266 uu_flood=False, learn=False)
267 macs = self.mac_list(range(1, 5))
268 hosts = self.ip4_hosts(4, 1, macs)
269 self.add_del_arp_term_hosts(hosts, is_add=1)
271 self.verify_arp(src_host, hosts, hosts)
272 type(self).hosts = hosts
274 def test_l2bd_arp_term_02(self):
275 """ L2BD arp term - delete 3 hosts, verify arp responses
277 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
278 macs = self.mac_list(range(1, 3))
279 deleted = self.ip4_hosts(4, 1, macs)
280 self.add_del_arp_term_hosts(deleted, is_add=0)
281 remaining = self.hosts - deleted
282 self.verify_arp(src_host, self.hosts, remaining)
283 type(self).hosts = remaining
284 self.bd_add_del(1, is_add=0)
286 def test_l2bd_arp_term_03(self):
287 """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
289 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
290 self.bd_add_del(1, is_add=1)
291 self.set_bd_flags(1, arp_term=True, flood=False,
292 uu_flood=False, learn=False)
293 macs = self.mac_list(range(1, 3))
294 readded = self.ip4_hosts(4, 1, macs)
295 self.add_del_arp_term_hosts(readded, is_add=1)
296 self.verify_arp(src_host, self.hosts | readded, readded)
297 type(self).hosts = readded
299 def test_l2bd_arp_term_04(self):
300 """ L2BD arp term - 2 IP4 addrs per host
302 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
303 macs = self.mac_list(range(1, 3))
304 sub5_hosts = self.ip4_hosts(5, 1, macs)
305 self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
306 hosts = self.hosts | sub5_hosts
307 self.verify_arp(src_host, hosts, hosts)
308 type(self).hosts = hosts
309 self.bd_add_del(1, is_add=0)
311 def test_l2bd_arp_term_05(self):
312 """ L2BD arp term - create and update 10 IP4-mac pairs
314 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
315 self.bd_add_del(1, is_add=1)
316 self.set_bd_flags(1, arp_term=True, flood=False,
317 uu_flood=False, learn=False)
318 macs1 = self.mac_list(range(10, 20))
319 hosts1 = self.ip4_hosts(5, 1, macs1)
320 self.add_del_arp_term_hosts(hosts1, is_add=1)
321 self.verify_arp(src_host, hosts1, hosts1)
322 macs2 = self.mac_list(range(20, 30))
323 hosts2 = self.ip4_hosts(5, 1, macs2)
324 self.add_del_arp_term_hosts(hosts2, is_add=1)
325 self.verify_arp(src_host, hosts1, hosts2)
326 self.bd_add_del(1, is_add=0)
328 def test_l2bd_arp_term_06(self):
329 """ L2BD arp/ND term - hosts with both ip4/ip6
331 src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
332 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
333 self.bd_add_del(1, is_add=1)
334 # enable flood to make sure requests are not flooded
335 self.set_bd_flags(1, arp_term=True, flood=True,
336 uu_flood=False, learn=False)
337 macs = self.mac_list(range(10, 20))
338 hosts6 = self.ip6_hosts(5, 1, macs)
339 hosts4 = self.ip4_hosts(5, 1, macs)
340 self.add_del_arp_term_hosts(hosts4, is_add=1)
341 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
342 self.verify_arp(src_host4, hosts4, hosts4)
343 self.verify_nd(src_host6, hosts6, hosts6)
344 self.bd_add_del(1, is_add=0)
346 def test_l2bd_arp_term_07(self):
347 """ L2BD ND term - Add and Del hosts, verify ND replies
349 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
350 self.bd_add_del(1, is_add=1)
351 self.set_bd_flags(1, arp_term=True, flood=False,
352 uu_flood=False, learn=False)
353 macs = self.mac_list(range(10, 20))
354 hosts6 = self.ip6_hosts(5, 1, macs)
355 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
356 self.verify_nd(src_host6, hosts6, hosts6)
357 del_macs = self.mac_list(range(10, 15))
358 deleted = self.ip6_hosts(5, 1, del_macs)
359 self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
360 self.verify_nd(src_host6, hosts6, hosts6 - deleted)
361 self.bd_add_del(1, is_add=0)
363 def test_l2bd_arp_term_08(self):
364 """ L2BD ND term - Add and update IP+mac, verify ND replies
366 src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
367 self.bd_add_del(1, is_add=1)
368 self.set_bd_flags(1, arp_term=True, flood=False,
369 uu_flood=False, learn=False)
370 macs1 = self.mac_list(range(10, 20))
371 hosts = self.ip6_hosts(5, 1, macs1)
372 self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
373 self.verify_nd(src_host, hosts, hosts)
374 macs2 = self.mac_list(range(20, 30))
375 updated = self.ip6_hosts(5, 1, macs2)
376 self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
377 self.verify_nd(src_host, hosts, updated)
378 self.bd_add_del(1, is_add=0)
380 def test_l2bd_arp_term_09(self):
381 """ L2BD arp term - send garps, verify arp event reports
383 self.vapi.want_ip4_arp_events()
384 self.bd_add_del(1, is_add=1)
385 self.set_bd_flags(1, arp_term=True, flood=False,
386 uu_flood=False, learn=False)
387 macs = self.mac_list(range(90, 95))
388 hosts = self.ip4_hosts(5, 1, macs)
390 garps = self.garp_reqs(hosts)
391 self.bd_swifs(1)[0].add_stream(garps)
393 self.pg_enable_capture(self.pg_interfaces)
395 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
396 for i in range(len(hosts))]
397 ev_hosts = self.arp_event_hosts(evs)
398 self.assertEqual(len(ev_hosts ^ hosts), 0)
400 def test_l2bd_arp_term_10(self):
401 """ L2BD arp term - send duplicate garps, verify suppression
403 macs = self.mac_list(range(70, 71))
404 hosts = self.ip4_hosts(6, 1, macs)
406 """ send the packet 5 times expect one event
408 garps = self.garp_reqs(hosts) * 5
409 self.bd_swifs(1)[0].add_stream(garps)
411 self.pg_enable_capture(self.pg_interfaces)
413 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
414 for i in range(len(hosts))]
415 ev_hosts = self.arp_event_hosts(evs)
416 self.assertEqual(len(ev_hosts ^ hosts), 0)
418 def test_l2bd_arp_term_11(self):
419 """ L2BD arp term - disable ip4 arp events,send garps, verify no events
421 self.vapi.want_ip4_arp_events(enable_disable=0)
422 macs = self.mac_list(range(90, 95))
423 hosts = self.ip4_hosts(5, 1, macs)
425 garps = self.garp_reqs(hosts)
426 self.bd_swifs(1)[0].add_stream(garps)
428 self.pg_enable_capture(self.pg_interfaces)
431 self.assertEqual(len(self.vapi.collect_events()), 0)
432 self.bd_add_del(1, is_add=0)
434 def test_l2bd_arp_term_12(self):
435 """ L2BD ND term - send NS packets verify reports
437 self.vapi.want_ip6_nd_events(ip="::")
438 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
439 self.bd_add_del(1, is_add=1)
440 self.set_bd_flags(1, arp_term=True, flood=False,
441 uu_flood=False, learn=False)
442 macs = self.mac_list(range(10, 15))
443 hosts = self.ip6_hosts(5, 1, macs)
444 reqs = self.ns_reqs_dst(hosts, dst_host)
445 self.bd_swifs(1)[0].add_stream(reqs)
447 self.pg_enable_capture(self.pg_interfaces)
449 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
450 for i in range(len(hosts))]
451 ev_hosts = self.nd_event_hosts(evs)
452 self.assertEqual(len(ev_hosts ^ hosts), 0)
454 def test_l2bd_arp_term_13(self):
455 """ L2BD ND term - send duplicate ns, verify suppression
457 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
458 macs = self.mac_list(range(10, 11))
459 hosts = self.ip6_hosts(5, 1, macs)
460 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
461 self.bd_swifs(1)[0].add_stream(reqs)
463 self.pg_enable_capture(self.pg_interfaces)
465 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
466 for i in range(len(hosts))]
467 ev_hosts = self.nd_event_hosts(evs)
468 self.assertEqual(len(ev_hosts ^ hosts), 0)
470 def test_l2bd_arp_term_14(self):
471 """ L2BD ND term - disable ip4 arp events,send ns, verify no events
473 self.vapi.want_ip6_nd_events(enable_disable=0, ip="::")
474 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
475 macs = self.mac_list(range(10, 15))
476 hosts = self.ip6_hosts(5, 1, macs)
477 reqs = self.ns_reqs_dst(hosts, dst_host)
478 self.bd_swifs(1)[0].add_stream(reqs)
480 self.pg_enable_capture(self.pg_interfaces)
483 self.assertEqual(len(self.vapi.collect_events()), 0)
484 self.bd_add_del(1, is_add=0)
487 if __name__ == '__main__':
488 unittest.main(testRunner=VppTestRunner)