2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
56 Clear trace and packet infos before running each test.
58 self.reset_packet_infos()
59 super(TestL2bdArpTerm, self).setUp()
63 Show various debug prints after each test.
65 super(TestL2bdArpTerm, self).tearDown()
67 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
68 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
70 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
72 ip = e.ip4 if is_ipv6 == 0 else e.ip6
73 self.vapi.bd_ip_mac_add_del(bd_id=bd_id, is_add=is_add, ip=ip,
def mac_list(cls, b6_range):
    """Build test MAC addresses that differ only in their last byte.

    :param b6_range: Iterable of integers; each one is rendered as the
        two-digit hexadecimal sixth byte of a MAC address.
    :returns: List of ``00:00:ca:fe:00:xx`` strings in iteration order.
    """
    template = "00:00:ca:fe:00:%02x"
    return [template % last_byte for last_byte in b6_range]
81 def ip4_host(cls, subnet, host, mac):
83 ip4="172.17.1%02u.%u" % (subnet, host))
def ip4_hosts(cls, subnet, start, mac_list):
    """Create a set of IPv4 hosts with consecutive host numbers.

    :param subnet: Subnet number, forwarded unchanged to ``cls.ip4_host``.
    :param start: Host number assigned to the first MAC address.
    :param mac_list: Iterable of MAC addresses; the j-th address is paired
        with host number ``start + j``.  Any iterable is accepted, not only
        sequences supporting ``len()`` and indexing.
    :returns: Set holding the ``cls.ip4_host`` result for every pair.
    """
    return {cls.ip4_host(subnet, start + offset, mac)
            for offset, mac in enumerate(mac_list)}
91 def ip6_host(cls, subnet, host, mac):
93 ip6="fd01:%x::%x" % (subnet, host))
def ip6_hosts(cls, subnet, start, mac_list):
    """Create a set of IPv6 hosts with consecutive host numbers.

    :param subnet: Subnet number, forwarded unchanged to ``cls.ip6_host``.
    :param start: Host number assigned to the first MAC address.
    :param mac_list: Iterable of MAC addresses; the j-th address is paired
        with host number ``start + j``.  Any iterable is accepted, not only
        sequences supporting ``len()`` and indexing.
    :returns: Set holding the ``cls.ip6_host`` result for every pair.
    """
    return {cls.ip6_host(subnet, start + offset, mac)
            for offset, mac in enumerate(mac_list)}
101 def bd_swifs(cls, b):
104 return [cls.pg_interfaces[j] for j in range(start, start + n)]
106 def bd_add_del(self, bd_id=1, is_add=1):
108 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
109 for swif in self.bd_swifs(bd_id):
110 swif_idx = swif.sw_if_index
111 self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=swif_idx,
112 bd_id=bd_id, enable=is_add)
114 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
117 def arp_req(cls, src_host, host):
118 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
120 hwsrc=src_host.bin_mac,
def arp_reqs(cls, src_host, entries):
    """Build one ARP request from ``src_host`` for every entry.

    :param src_host: Host used as the sender of every request.
    :param entries: Iterable of hosts to ask for.
    :returns: List of ``cls.arp_req(src_host, entry)`` results, in order.
    """
    requests = []
    for target in entries:
        requests.append(cls.arp_req(src_host, target))
    return requests
def garp_req(cls, host):
    """Build a gratuitous ARP request for ``host``.

    The same host is passed to ``cls.arp_req`` as both sender and target.
    """
    sender = target = host
    return cls.arp_req(sender, target)
def garp_reqs(cls, entries):
    """Build one gratuitous ARP request per entry.

    :param entries: Iterable of hosts.
    :returns: List of ``cls.garp_req(entry)`` results, in iteration order.
    """
    return list(map(cls.garp_req, entries))
136 def arp_resp_host(self, src_host, arp_resp):
137 ether = arp_resp[Ether]
138 self.assertEqual(ether.dst, src_host.mac)
141 self.assertEqual(arp.hwtype, 1)
142 self.assertEqual(arp.ptype, 0x800)
143 self.assertEqual(arp.hwlen, 6)
144 self.assertEqual(arp.plen, 4)
145 arp_opts = {"who-has": 1, "is-at": 2}
146 self.assertEqual(arp.op, arp_opts["is-at"])
147 self.assertEqual(arp.hwdst, src_host.mac)
148 self.assertEqual(arp.pdst, src_host.ip4)
149 return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """Validate ARP replies and collect the hosts they describe.

    :param src_host: Host that issued the requests; forwarded to
        ``arp_resp_host`` for every packet.
    :param pkts: Iterable of captured reply packets.
    :returns: Set of the values returned by ``arp_resp_host``.
    """
    answered = set()
    for reply in pkts:
        answered.add(self.arp_resp_host(src_host, reply))
    return answered
156 o1 = int(ip / 16777216) % 256
157 o2 = int(ip / 65536) % 256
158 o3 = int(ip / 256) % 256
160 return '%s.%s.%s.%s' % (o1, o2, o3, o4)
def arp_event_host(self, e):
    """Convert an ARP event into a ``Host``.

    :param e: Event object exposing ``mac`` and ``ip`` attributes.
    :returns: ``Host`` built from the string forms of ``e.mac`` (first
        positional argument) and ``e.ip`` (``ip4`` keyword).
    """
    mac_text = str(e.mac)
    ip_text = str(e.ip)
    return Host(mac_text, ip4=ip_text)
def arp_event_hosts(self, evs):
    """Convert a batch of ARP events into a set of hosts.

    :param evs: Iterable of events accepted by ``arp_event_host``.
    :returns: Set of the converted hosts (duplicates collapse).
    """
    return set(map(self.arp_event_host, evs))
def nd_event_host(self, e):
    """Convert an ND event into a ``Host``.

    :param e: Event object exposing ``mac`` and ``ip`` attributes.
    :returns: ``Host`` built from the string forms of ``e.mac`` (first
        positional argument) and ``e.ip`` (``ip6`` keyword).
    """
    mac_text = str(e.mac)
    ip_text = str(e.ip)
    return Host(mac_text, ip6=ip_text)
def nd_event_hosts(self, evs):
    """Convert a batch of ND events into a set of hosts.

    :param evs: Iterable of events accepted by ``nd_event_host``.
    :returns: Set of the converted hosts (duplicates collapse).
    """
    return set(map(self.nd_event_host, evs))
def ns_req(cls, src_host, host):
    """Build a Neighbor Solicitation sent by ``src_host`` asking for ``host``.

    :param src_host: Sender; its ``mac`` is used as the Ethernet source and
        in the source link-layer address option, its ``ip6`` as the IPv6
        source.
    :param host: Host whose ``ip6`` becomes the solicitation target.
    :returns: Ether / IPv6 / ICMPv6ND_NS / ICMPv6NDOptSrcLLAddr packet.
    """
    # NOTE(review): the IPv6 destination is derived from the fixed address
    # "fd10::ffff", not from host.ip6 - confirm this is intentional.
    fixed_addr = inet_pton(AF_INET6, "fd10::ffff")
    ip6_dst = inet_ntop(AF_INET6, in6_getnsma(fixed_addr))

    eth = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    ip6 = IPv6(dst=ip6_dst, src=src_host.ip6)
    solicitation = ICMPv6ND_NS(tgt=host.ip6)
    lladdr_opt = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return eth / ip6 / solicitation / lladdr_opt
def ns_reqs_dst(cls, entries, dst_host):
    """Build Neighbor Solicitations from many senders for one target.

    :param entries: Iterable of hosts, each used as the sender of one request.
    :param dst_host: Host every request asks for.
    :returns: List of ``cls.ns_req(entry, dst_host)`` results, in order.
    """
    solicitations = []
    for sender in entries:
        solicitations.append(cls.ns_req(sender, dst_host))
    return solicitations
def ns_reqs_src(cls, src_host, entries):
    """Build Neighbor Solicitations from one sender for many targets.

    :param src_host: Host used as the sender of every request.
    :param entries: Iterable of hosts to ask for.
    :returns: List of ``cls.ns_req(src_host, entry)`` results, in order.
    """
    solicitations = []
    for target in entries:
        solicitations.append(cls.ns_req(src_host, target))
    return solicitations
191 def na_resp_host(self, src_host, rx):
192 self.assertEqual(rx[Ether].dst, src_host.mac)
193 self.assertEqual(in6_ptop(rx[IPv6].dst),
194 in6_ptop(src_host.ip6))
196 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
197 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
200 return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """Validate Neighbor Advertisements and collect the hosts they describe.

    :param src_host: Host that issued the solicitations; forwarded to
        ``na_resp_host`` for every packet.
    :param pkts: Iterable of captured reply packets.
    :returns: Set of the values returned by ``na_resp_host``.
    """
    answered = set()
    for reply in pkts:
        answered.add(self.na_resp_host(src_host, reply))
    return answered
205 def set_bd_flags(self, bd_id, **args):
207 Enable/disable defined feature(s) of the bridge domain.
209 :param int bd_id: Bridge domain ID.
210 :param list args: List of feature/status pairs. Allowed features: \
211 learn, forward, flood, uu_flood and arp_term. Status False means \
212 disable, status True means enable the feature.
213 :raise: ValueError in case of unknown feature in the input.
217 feature_bitmap = 1 << 0
218 elif flag == "forward":
219 feature_bitmap = 1 << 1
220 elif flag == "flood":
221 feature_bitmap = 1 << 2
222 elif flag == "uu_flood":
223 feature_bitmap = 1 << 3
224 elif flag == "arp_term":
225 feature_bitmap = 1 << 4
227 raise ValueError("Unknown feature used: %s" % flag)
228 is_set = 1 if args[flag] else 0
229 self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set,
230 flags=feature_bitmap)
231 self.logger.info("Bridge domain ID %d updated" % bd_id)
def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
    """Send ARP requests into a bridge domain and check the replies.

    :param src_host: Host the requests are sent from; passed to
        ``arp_reqs`` and ``arp_resp_hosts``.
    :param req_hosts: Hosts to ask for (one request each).
    :param resp_hosts: Set of hosts the replies captured on each interface
        are expected to describe.
    :param int bd_id: Bridge domain whose interfaces are used.
    """
    reqs = self.arp_reqs(src_host, req_hosts)

    # The same request list is queued on every interface of the domain.
    for swif in self.bd_swifs(bd_id):
        swif.add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): no statement is visible in this listing between enabling
    # capture and reading it back; a short line (e.g. the step that starts
    # the packet generator) may have been elided - confirm against the file.

    for swif in self.bd_swifs(bd_id):
        # Ask each interface for as many packets as there are expected hosts.
        resp_pkts = swif.get_capture(len(resp_hosts))
        resps = self.arp_resp_hosts(src_host, resp_pkts)
        # An empty symmetric difference means both sets are equal.
        self.assertEqual(len(resps ^ resp_hosts), 0)
def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
    """Send Neighbor Solicitations into a bridge domain and check replies.

    :param src_host: Host the solicitations are sent from; passed to
        ``ns_reqs_src`` and ``na_resp_hosts``.
    :param req_hosts: Hosts to ask for (one solicitation each).
    :param resp_hosts: Set of hosts the replies captured on each interface
        are expected to describe.
    :param int bd_id: Bridge domain whose interfaces are used.
    """
    reqs = self.ns_reqs_src(src_host, req_hosts)

    # The same request list is queued on every interface of the domain.
    for swif in self.bd_swifs(bd_id):
        swif.add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): no statement is visible in this listing between enabling
    # capture and reading it back; a short line (e.g. the step that starts
    # the packet generator) may have been elided - confirm against the file.

    for swif in self.bd_swifs(bd_id):
        # Ask each interface for as many packets as there are expected hosts.
        resp_pkts = swif.get_capture(len(resp_hosts))
        resps = self.na_resp_hosts(src_host, resp_pkts)
        # An empty symmetric difference means both sets are equal.
        self.assertEqual(len(resps ^ resp_hosts), 0)
def test_l2bd_arp_term_01(self):
    """ L2BD arp term - add 5 hosts, verify arp responses

    Creates bridge domain 1 with ARP termination on and flood, uu_flood
    and learn off, registers the IPv4 hosts and checks that each one is
    answered.  Note that ``range(1, 5)`` yields four MAC addresses, so
    four hosts are registered.  The host set is stored on the class for
    the tests that follow.
    """
    # Requester used as the sender of all ARP requests.
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(1, 5))
    hosts = self.ip4_hosts(4, 1, macs)
    self.add_del_arp_term_hosts(hosts, is_add=1)
    # NOTE(review): the listing skips a line number here; it is presumably
    # a blank line - confirm against the full file.
    self.verify_arp(src_host, hosts, hosts)
    # Shared through the class so later tests can read self.hosts.
    type(self).hosts = hosts
def test_l2bd_arp_term_02(self):
    """ L2BD arp term - delete 3 hosts, verify arp responses

    Removes the IPv4 entries built from ``mac_list(range(1, 3))`` (two MAC
    addresses), queries every host stored on the class and expects replies
    only for the entries that were not removed.  The surviving set is
    stored back on the class and bridge domain 1 is deleted.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    removed = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(removed, is_add=0)

    known = self.hosts
    survivors = known - removed
    self.verify_arp(requester, known, survivors)

    setattr(type(self), "hosts", survivors)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
    """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses

    Creates bridge domain 1 again, registers the IPv4 entries built from
    ``mac_list(range(1, 3))`` (two MAC addresses), then queries the union
    of the stored hosts and the re-added ones and expects replies only
    for the re-added entries.  The re-added set replaces the class-level
    host set.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    bd_flags = dict(arp_term=True, flood=False, uu_flood=False, learn=False)
    self.set_bd_flags(1, **bd_flags)

    restored = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(restored, is_add=1)

    queried = self.hosts | restored
    self.verify_arp(requester, queried, restored)
    setattr(type(self), "hosts", restored)
def test_l2bd_arp_term_04(self):
    """ L2BD arp term - 2 IP4 addrs per host

    Registers a second set of IPv4 entries (subnet 5) that reuses the MAC
    addresses from ``mac_list(range(1, 3))``, then checks that every entry
    in the union of the stored hosts and the new ones is answered.  The
    union is stored on the class and bridge domain 1 is deleted.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    second_addrs = self.ip4_hosts(5, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(second_addrs, is_add=1)

    everyone = self.hosts | second_addrs
    self.verify_arp(requester, everyone, everyone)

    setattr(type(self), "hosts", everyone)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
    """ L2BD arp term - create and update 10 IP4-mac pairs

    Registers ten IPv4 entries and verifies them, then registers the same
    ten addresses with a different set of MAC addresses and verifies that
    requests for the original hosts are answered with the updated ones.
    Bridge domain 1 is deleted at the end.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    bd_flags = dict(arp_term=True, flood=False, uu_flood=False, learn=False)
    self.set_bd_flags(1, **bd_flags)

    # First generation: MAC last bytes 10..19.
    initial = self.ip4_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(initial, is_add=1)
    self.verify_arp(requester, initial, initial)

    # Second generation: same subnet and host numbers, MAC last bytes 20..29.
    replaced = self.ip4_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(replaced, is_add=1)
    self.verify_arp(requester, initial, replaced)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
    """ L2BD arp/ND term - hosts with both ip4/ip6

    Registers IPv4 and IPv6 entries that share the same ten MAC addresses
    and verifies both the ARP and the ND replies.  Bridge domain 1 is
    deleted at the end.
    """
    requester4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
    requester6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)

    # Flooding is switched on here on purpose: the requests are still
    # expected to be answered rather than flooded.
    bd_flags = dict(arp_term=True, flood=True, uu_flood=False, learn=False)
    self.set_bd_flags(1, **bd_flags)

    shared_macs = self.mac_list(range(10, 20))
    v6_hosts = self.ip6_hosts(5, 1, shared_macs)
    v4_hosts = self.ip4_hosts(5, 1, shared_macs)

    self.add_del_arp_term_hosts(v4_hosts, is_add=1)
    self.add_del_arp_term_hosts(v6_hosts, is_add=1, is_ipv6=1)

    self.verify_arp(requester4, v4_hosts, v4_hosts)
    self.verify_nd(requester6, v6_hosts, v6_hosts)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
    """ L2BD ND term - Add and Del hosts, verify ND replies

    Registers ten IPv6 entries and verifies them, removes the entries for
    MAC last bytes 10..14, then solicits all ten again and expects replies
    only for the entries that were not removed.  Bridge domain 1 is
    deleted at the end.
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    bd_flags = dict(arp_term=True, flood=False, uu_flood=False, learn=False)
    self.set_bd_flags(1, **bd_flags)

    registered = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(registered, is_add=1, is_ipv6=1)
    self.verify_nd(requester, registered, registered)

    removed = self.ip6_hosts(5, 1, self.mac_list(range(10, 15)))
    self.add_del_arp_term_hosts(removed, is_add=0, is_ipv6=1)
    survivors = registered - removed
    self.verify_nd(requester, registered, survivors)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
    """ L2BD ND term - Add and update IP+mac, verify ND replies

    Registers ten IPv6 entries and verifies them, then registers the same
    ten addresses with a different set of MAC addresses and verifies that
    solicitations for the original hosts are answered with the updated
    ones.  Bridge domain 1 is deleted at the end.
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    bd_flags = dict(arp_term=True, flood=False, uu_flood=False, learn=False)
    self.set_bd_flags(1, **bd_flags)

    # First generation: MAC last bytes 10..19.
    initial = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(initial, is_add=1, is_ipv6=1)
    self.verify_nd(requester, initial, initial)

    # Second generation: same subnet and host numbers, MAC last bytes 20..29.
    replaced = self.ip6_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(replaced, is_add=1, is_ipv6=1)
    self.verify_nd(requester, initial, replaced)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_09(self):
    """ L2BD arp term - send garps, verify arp event reports

    Subscribes to IPv4 ARP events, creates bridge domain 1, sends one
    gratuitous ARP per host on the first interface of the domain and
    expects the reported events to describe exactly those hosts.
    """
    self.vapi.want_ip4_arp_events()
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(90, 95))
    hosts = self.ip4_hosts(5, 1, macs)

    garps = self.garp_reqs(hosts)
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the listing skips a line number here; a short statement
    # (presumably the one starting the packet generator) may be elided.
    # One event is awaited per host.
    evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
           for i in range(len(hosts))]
    ev_hosts = self.arp_event_hosts(evs)
    # An empty symmetric difference means both sets are equal.
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_10(self):
    """ L2BD arp term - send duplicate garps, verify suppression

    Sends the gratuitous ARP of a single host five times and waits for as
    many events as there are hosts (one).  No bridge domain is created
    here; bridge domain 1 is used as it exists when the test starts.
    """
    macs = self.mac_list(range(70, 71))
    hosts = self.ip4_hosts(6, 1, macs)

    """ send the packet 5 times expect one event
    """
    # List repetition: the same request objects appear five times.
    garps = self.garp_reqs(hosts) * 5
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the listing skips a line number here; a short statement
    # (presumably the one starting the packet generator) may be elided.
    evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
           for i in range(len(hosts))]
    ev_hosts = self.arp_event_hosts(evs)
    # An empty symmetric difference means both sets are equal.
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_11(self):
    """ L2BD arp term - disable ip4 arp events,send garps, verify no events

    Unsubscribes from IPv4 ARP events, sends one gratuitous ARP per host
    on the first interface of bridge domain 1 and expects no events to be
    collected.  Bridge domain 1 is deleted at the end.
    """
    self.vapi.want_ip4_arp_events(enable_disable=0)
    macs = self.mac_list(range(90, 95))
    hosts = self.ip4_hosts(5, 1, macs)

    garps = self.garp_reqs(hosts)
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the listing skips two line numbers here; short
    # statements (presumably starting the packet generator and/or waiting)
    # may be elided - confirm against the full file.
    self.assertEqual(len(self.vapi.collect_events()), 0)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_12(self):
    """ L2BD ND term - send NS packets verify reports

    Subscribes to IPv6 ND events, creates bridge domain 1, sends one
    Neighbor Solicitation from each host (all asking for the same
    destination host) on the first interface of the domain and expects
    the reported events to describe exactly the sending hosts.
    """
    self.vapi.want_ip6_nd_events(ip="::")
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(10, 15))
    hosts = self.ip6_hosts(5, 1, macs)
    reqs = self.ns_reqs_dst(hosts, dst_host)
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the listing skips a line number here; a short statement
    # (presumably the one starting the packet generator) may be elided.
    # One event is awaited per host.
    evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
           for i in range(len(hosts))]
    ev_hosts = self.nd_event_hosts(evs)
    # An empty symmetric difference means both sets are equal.
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_13(self):
    """ L2BD ND term - send duplicate ns, verify suppression

    Sends the Neighbor Solicitation of a single host five times and waits
    for as many events as there are hosts (one).  No bridge domain is
    created here; bridge domain 1 is used as it exists when the test
    starts.
    """
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    macs = self.mac_list(range(10, 11))
    hosts = self.ip6_hosts(5, 1, macs)
    # List repetition: the same request objects appear five times.
    reqs = self.ns_reqs_dst(hosts, dst_host) * 5
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the listing skips a line number here; a short statement
    # (presumably the one starting the packet generator) may be elided.
    evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
           for i in range(len(hosts))]
    ev_hosts = self.nd_event_hosts(evs)
    # An empty symmetric difference means both sets are equal.
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_14(self):
    """ L2BD ND term - disable ip4 arp events,send ns, verify no events

    Despite the wording of the title, the subscription that is turned off
    here is the IPv6 ND one (``want_ip6_nd_events(enable_disable=0)``).
    Neighbor Solicitations are then sent on the first interface of bridge
    domain 1 and no events are expected.  Bridge domain 1 is deleted at
    the end.
    """
    self.vapi.want_ip6_nd_events(enable_disable=0, ip="::")
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    macs = self.mac_list(range(10, 15))
    hosts = self.ip6_hosts(5, 1, macs)
    reqs = self.ns_reqs_dst(hosts, dst_host)
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # NOTE(review): the listing skips two line numbers here; short
    # statements (presumably starting the packet generator and/or waiting)
    # may be elided - confirm against the full file.
    self.assertEqual(len(self.vapi.collect_events()), 0)
    self.bd_add_del(1, is_add=0)
# When executed as a script, run the tests with VppTestRunner instead of
# unittest's default runner.
# NOTE(review): no ``import unittest`` is visible in this listing; it is
# presumably on one of the elided import lines - confirm.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)