2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
56 Clear trace and packet infos before running each test.
58 self.reset_packet_infos()
59 super(TestL2bdArpTerm, self).setUp()
63 Show various debug prints after each test.
65 super(TestL2bdArpTerm, self).tearDown()
67 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
68 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
70 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
72 ip = e.ip4 if is_ipv6 == 0 else e.ip6
73 self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
def mac_list(cls, b6_range):
    """Build test MAC address strings that differ only in the last octet.

    :param b6_range: Iterable of integers used as the final octet.
    :returns: List of ``00:00:ca:fe:00:XX`` strings, one per input value,
        where ``XX`` is the value in lowercase hex, zero-padded to at
        least two digits.
    """
    macs = []
    for last_octet in b6_range:
        macs.append("00:00:ca:fe:00:%02x" % last_octet)
    return macs
84 def ip4_host(cls, subnet, host, mac):
86 ip4="172.17.1%02u.%u" % (subnet, host))
def ip4_hosts(cls, subnet, start, mac_list):
    """Create one IPv4 host per MAC, numbering the hosts consecutively.

    :param subnet: Subnet value handed through to ``cls.ip4_host``.
    :param start: Host number used for the first MAC; each following MAC
        gets the next number.
    :param mac_list: Sequence of MAC address strings.
    :returns: Set of the objects returned by ``cls.ip4_host``.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip4_host(subnet, start + offset, mac))
    return hosts
94 def ip6_host(cls, subnet, host, mac):
96 ip6="fd01:%x::%x" % (subnet, host))
def ip6_hosts(cls, subnet, start, mac_list):
    """Create one IPv6 host per MAC, numbering the hosts consecutively.

    :param subnet: Subnet value handed through to ``cls.ip6_host``.
    :param start: Host number used for the first MAC; each following MAC
        gets the next number.
    :param mac_list: Sequence of MAC address strings.
    :returns: Set of the objects returned by ``cls.ip6_host``.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip6_host(subnet, start + offset, mac))
    return hosts
104 def bd_swifs(cls, b):
107 return [cls.pg_interfaces[j] for j in range(start, start + n)]
109 def bd_add_del(self, bd_id=1, is_add=1):
111 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
112 for swif in self.bd_swifs(bd_id):
113 swif_idx = swif.sw_if_index
114 self.vapi.sw_interface_set_l2_bridge(
115 swif_idx, bd_id=bd_id, enable=is_add)
117 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
120 def arp_req(cls, src_host, host):
121 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
123 hwsrc=src_host.bin_mac,
def arp_reqs(cls, src_host, entries):
    """Build one ARP request from ``src_host`` for every entry.

    :param src_host: Host used as the sender of every request.
    :param entries: Iterable of hosts to ask about.
    :returns: List of the packets returned by ``cls.arp_req``, in
        iteration order of ``entries``.
    """
    requests = []
    for target in entries:
        requests.append(cls.arp_req(src_host, target))
    return requests
def garp_req(cls, host):
    """Build an ARP request in which ``host`` asks about itself.

    :param host: Host used both as sender and as the queried host.
    :returns: The packet returned by ``cls.arp_req``.
    """
    sender = target = host
    return cls.arp_req(sender, target)
def garp_reqs(cls, entries):
    """Build a self-addressed ARP request (see ``garp_req``) per entry.

    :param entries: Iterable of hosts.
    :returns: List of the packets returned by ``cls.garp_req``, in
        iteration order of ``entries``.
    """
    requests = []
    for host in entries:
        requests.append(cls.garp_req(host))
    return requests
139 def arp_resp_host(self, src_host, arp_resp):
140 ether = arp_resp[Ether]
141 self.assertEqual(ether.dst, src_host.mac)
144 self.assertEqual(arp.hwtype, 1)
145 self.assertEqual(arp.ptype, 0x800)
146 self.assertEqual(arp.hwlen, 6)
147 self.assertEqual(arp.plen, 4)
148 arp_opts = {"who-has": 1, "is-at": 2}
149 self.assertEqual(arp.op, arp_opts["is-at"])
150 self.assertEqual(arp.hwdst, src_host.mac)
151 self.assertEqual(arp.pdst, src_host.ip4)
152 return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """Check every captured ARP reply and collect the hosts they announce.

    :param src_host: Host that sent the requests; passed to
        ``arp_resp_host`` for each packet.
    :param pkts: Iterable of captured packets.
    :returns: Set of the values returned by ``arp_resp_host``.
    """
    answered = set()
    for reply in pkts:
        answered.add(self.arp_resp_host(src_host, reply))
    return answered
159 o1 = int(ip / 16777216) % 256
160 o2 = int(ip / 65536) % 256
161 o3 = int(ip / 256) % 256
163 return '%s.%s.%s.%s' % (o1, o2, o3, o4)
def arp_event_host(self, e):
    """Convert an ARP event into the Host it describes.

    :param e: Event object exposing ``new_mac`` (the raw MAC octets) and
        ``address`` (numeric IPv4 address, decoded with ``inttoip4``).
    :returns: ``Host`` whose ``mac`` is colon-separated lowercase hex and
        whose ``ip4`` is the string produced by ``inttoip4``.
    """
    # Iterating bytes yields ints on Python 3 but 1-character strings on
    # Python 2; an unconditional ord() raises TypeError for the former.
    octets = [b if isinstance(b, int) else ord(b) for b in e.new_mac]
    return Host(mac=':'.join('%02x' % b for b in octets),
                ip4=self.inttoip4(e.address))
def arp_event_hosts(self, evs):
    """Convert a batch of ARP events into a set of hosts.

    :param evs: Iterable of event objects accepted by ``arp_event_host``.
    :returns: Set of the values returned by ``arp_event_host``.
    """
    reported = set()
    for event in evs:
        reported.add(self.arp_event_host(event))
    return reported
def nd_event_host(self, e):
    """Convert an IPv6 ND event into the Host it describes.

    :param e: Event object exposing ``new_mac`` (the raw MAC octets) and
        ``address`` (value passed to ``inet_ntop`` with ``AF_INET6``).
    :returns: ``Host`` whose ``mac`` is colon-separated lowercase hex and
        whose ``ip6`` is the textual form returned by ``inet_ntop``.
    """
    # Iterating bytes yields ints on Python 3 but 1-character strings on
    # Python 2; an unconditional ord() raises TypeError for the former.
    octets = [b if isinstance(b, int) else ord(b) for b in e.new_mac]
    return Host(mac=':'.join('%02x' % b for b in octets),
                ip6=inet_ntop(AF_INET6, e.address))
def nd_event_hosts(self, evs):
    """Convert a batch of IPv6 ND events into a set of hosts.

    :param evs: Iterable of event objects accepted by ``nd_event_host``.
    :returns: Set of the values returned by ``nd_event_host``.
    """
    reported = set()
    for event in evs:
        reported.add(self.nd_event_host(event))
    return reported
def ns_req(cls, src_host, host):
    """Build a neighbor solicitation sent by ``src_host`` for ``host``.

    The frame is addressed to the broadcast MAC. The IPv6 destination is
    derived with ``in6_getnsma`` from the fixed address ``fd10::ffff``,
    not from the target being solicited.

    :param src_host: Sender; supplies ``mac`` and ``ip6``.
    :param host: Host whose ``ip6`` is the solicited target.
    :returns: Ether / IPv6 / ICMPv6ND_NS / ICMPv6NDOptSrcLLAddr packet.
    """
    ip6_dst = inet_ntop(
        AF_INET6, in6_getnsma(inet_pton(AF_INET6, "fd10::ffff")))
    eth_hdr = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    ip6_hdr = IPv6(dst=ip6_dst, src=src_host.ip6)
    solicitation = ICMPv6ND_NS(tgt=host.ip6)
    src_ll_opt = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return eth_hdr / ip6_hdr / solicitation / src_ll_opt
def ns_reqs_dst(cls, entries, dst_host):
    """Build one neighbor solicitation per entry, all asking for ``dst_host``.

    :param entries: Iterable of hosts, each used as the sender of one
        request.
    :param dst_host: Host solicited by every request.
    :returns: List of the packets returned by ``cls.ns_req``, in
        iteration order of ``entries``.
    """
    requests = []
    for sender in entries:
        requests.append(cls.ns_req(sender, dst_host))
    return requests
def ns_reqs_src(cls, src_host, entries):
    """Build one neighbor solicitation from ``src_host`` for every entry.

    :param src_host: Host used as the sender of every request.
    :param entries: Iterable of hosts to solicit.
    :returns: List of the packets returned by ``cls.ns_req``, in
        iteration order of ``entries``.
    """
    requests = []
    for target in entries:
        requests.append(cls.ns_req(src_host, target))
    return requests
196 def na_resp_host(self, src_host, rx):
197 self.assertEqual(rx[Ether].dst, src_host.mac)
198 self.assertEqual(in6_ptop(rx[IPv6].dst),
199 in6_ptop(src_host.ip6))
201 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
202 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
205 return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """Check every captured NA reply and collect the hosts they announce.

    :param src_host: Host that sent the solicitations; passed to
        ``na_resp_host`` for each packet.
    :param pkts: Iterable of captured packets.
    :returns: Set of the values returned by ``na_resp_host``.
    """
    answered = set()
    for reply in pkts:
        answered.add(self.na_resp_host(src_host, reply))
    return answered
210 def set_bd_flags(self, bd_id, **args):
212 Enable/disable defined feature(s) of the bridge domain.
214 :param int bd_id: Bridge domain ID.
215 :param list args: List of feature/status pairs. Allowed features: \
216 learn, forward, flood, uu_flood and arp_term. Status False means \
217 disable, status True means enable the feature.
218 :raise: ValueError in case of unknown feature in the input.
222 feature_bitmap = 1 << 0
223 elif flag == "forward":
224 feature_bitmap = 1 << 1
225 elif flag == "flood":
226 feature_bitmap = 1 << 2
227 elif flag == "uu_flood":
228 feature_bitmap = 1 << 3
229 elif flag == "arp_term":
230 feature_bitmap = 1 << 4
232 raise ValueError("Unknown feature used: %s" % flag)
233 is_set = 1 if args[flag] else 0
234 self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
235 self.logger.info("Bridge domain ID %d updated" % bd_id)
237 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
238 reqs = self.arp_reqs(src_host, req_hosts)
240 for swif in self.bd_swifs(bd_id):
241 swif.add_stream(reqs)
243 self.pg_enable_capture(self.pg_interfaces)
246 for swif in self.bd_swifs(bd_id):
247 resp_pkts = swif.get_capture(len(resp_hosts))
248 resps = self.arp_resp_hosts(src_host, resp_pkts)
249 self.assertEqual(len(resps ^ resp_hosts), 0)
251 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
252 reqs = self.ns_reqs_src(src_host, req_hosts)
254 for swif in self.bd_swifs(bd_id):
255 swif.add_stream(reqs)
257 self.pg_enable_capture(self.pg_interfaces)
260 for swif in self.bd_swifs(bd_id):
261 resp_pkts = swif.get_capture(len(resp_hosts))
262 resps = self.na_resp_hosts(src_host, resp_pkts)
263 self.assertEqual(len(resps ^ resp_hosts), 0)
def test_l2bd_arp_term_01(self):
    """ L2BD arp term - add 4 hosts, verify arp responses

    Creates bridge domain 1 with ARP termination enabled and flooding and
    learning disabled, registers the hosts and checks that each one is
    answered.
    """
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    # range(1, 5) excludes 5: four MACs (..:01 to ..:04), hence four hosts.
    macs = self.mac_list(range(1, 5))
    hosts = self.ip4_hosts(4, 1, macs)
    self.add_del_arp_term_hosts(hosts, is_add=1)

    self.verify_arp(src_host, hosts, hosts)
    # Kept on the class because later test methods read ``self.hosts``.
    type(self).hosts = hosts
def test_l2bd_arp_term_02(self):
    """ L2BD arp term - delete 2 hosts, verify arp responses

    Removes part of the class-level ``hosts`` set, checks that only the
    remaining entries are answered, then tears down bridge domain 1.
    """
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    # range(1, 3) excludes 3: two MACs (..:01 and ..:02), hence two hosts.
    macs = self.mac_list(range(1, 3))
    deleted = self.ip4_hosts(4, 1, macs)
    self.add_del_arp_term_hosts(deleted, is_add=0)
    remaining = self.hosts - deleted
    # Ask about every previously known host; only the remaining ones
    # are expected to reply.
    self.verify_arp(src_host, self.hosts, remaining)
    type(self).hosts = remaining
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
    """ L2BD arp term - recreate BD1, readd 2 hosts, verify arp responses

    Re-creates bridge domain 1, registers the hosts again and checks that
    only the re-added entries are answered.
    """
    src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    # range(1, 3) excludes 3: two MACs (..:01 and ..:02), hence two hosts.
    macs = self.mac_list(range(1, 3))
    readded = self.ip4_hosts(4, 1, macs)
    self.add_del_arp_term_hosts(readded, is_add=1)
    # Requests also cover the hosts stored on the class; replies are
    # expected for the re-added ones only.
    self.verify_arp(src_host, self.hosts | readded, readded)
    type(self).hosts = readded
def test_l2bd_arp_term_04(self):
    """ L2BD arp term - 2 IP4 addrs per host

    Registers subnet-5 addresses for MACs ..:01 and ..:02 in addition to
    the hosts stored on the class, checks that every entry is answered,
    then tears down bridge domain 1.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    mac_addrs = self.mac_list(range(1, 3))
    extra_hosts = self.ip4_hosts(5, 1, mac_addrs)
    self.add_del_arp_term_hosts(extra_hosts, is_add=1)

    # Both the stored and the newly added entries must be answered.
    all_hosts = self.hosts | extra_hosts
    self.verify_arp(requester, all_hosts, all_hosts)
    type(self).hosts = all_hosts

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
    """ L2BD arp term - create and update 10 IP4-mac pairs

    Registers ten addresses, then registers the same addresses again with
    different MACs and checks that replies carry the new MACs.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # Initial bindings: MACs ..:0a to ..:13.
    first_macs = self.mac_list(range(10, 20))
    first_hosts = self.ip4_hosts(5, 1, first_macs)
    self.add_del_arp_term_hosts(first_hosts, is_add=1)
    self.verify_arp(requester, first_hosts, first_hosts)

    # Same addresses, MACs ..:14 to ..:1d.
    second_macs = self.mac_list(range(20, 30))
    second_hosts = self.ip4_hosts(5, 1, second_macs)
    self.add_del_arp_term_hosts(second_hosts, is_add=1)
    self.verify_arp(requester, first_hosts, second_hosts)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
    """ L2BD arp/ND term - hosts with both ip4/ip6

    Registers an IPv4 and an IPv6 address for each of ten MACs and checks
    both ARP and ND replies.
    """
    requester4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
    requester6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    # Flood is left enabled here so the test also shows that terminated
    # requests are answered rather than flooded.
    self.set_bd_flags(1, arp_term=True, flood=True,
                      uu_flood=False, learn=False)

    mac_addrs = self.mac_list(range(10, 20))
    v6_hosts = self.ip6_hosts(5, 1, mac_addrs)
    v4_hosts = self.ip4_hosts(5, 1, mac_addrs)
    self.add_del_arp_term_hosts(v4_hosts, is_add=1)
    self.add_del_arp_term_hosts(v6_hosts, is_add=1, is_ipv6=1)

    self.verify_arp(requester4, v4_hosts, v4_hosts)
    self.verify_nd(requester6, v6_hosts, v6_hosts)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
    """ L2BD ND term - Add and Del hosts, verify ND replies

    Registers ten IPv6 hosts, removes the first five and checks that only
    the other five are still answered.
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    all_macs = self.mac_list(range(10, 20))
    all_hosts = self.ip6_hosts(5, 1, all_macs)
    self.add_del_arp_term_hosts(all_hosts, is_add=1, is_ipv6=1)
    self.verify_nd(requester, all_hosts, all_hosts)

    # Remove the entries for MACs ..:0a to ..:0e.
    removed_macs = self.mac_list(range(10, 15))
    removed = self.ip6_hosts(5, 1, removed_macs)
    self.add_del_arp_term_hosts(removed, is_add=0, is_ipv6=1)
    self.verify_nd(requester, all_hosts, all_hosts - removed)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
    """ L2BD ND term - Add and update IP+mac, verify ND replies

    Registers ten IPv6 addresses, then registers the same addresses again
    with different MACs and checks that replies carry the new MACs.
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # Initial bindings: MACs ..:0a to ..:13.
    first_macs = self.mac_list(range(10, 20))
    first_hosts = self.ip6_hosts(5, 1, first_macs)
    self.add_del_arp_term_hosts(first_hosts, is_add=1, is_ipv6=1)
    self.verify_nd(requester, first_hosts, first_hosts)

    # Same addresses, MACs ..:14 to ..:1d.
    second_macs = self.mac_list(range(20, 30))
    second_hosts = self.ip6_hosts(5, 1, second_macs)
    self.add_del_arp_term_hosts(second_hosts, is_add=1, is_ipv6=1)
    self.verify_nd(requester, first_hosts, second_hosts)

    self.bd_add_del(1, is_add=0)
385 def test_l2bd_arp_term_09(self):
386 """ L2BD arp term - send garps, verify arp event reports
388 self.vapi.want_ip4_arp_events()
389 self.bd_add_del(1, is_add=1)
390 self.set_bd_flags(1, arp_term=True, flood=False,
391 uu_flood=False, learn=False)
392 macs = self.mac_list(range(90, 95))
393 hosts = self.ip4_hosts(5, 1, macs)
395 garps = self.garp_reqs(hosts)
396 self.bd_swifs(1)[0].add_stream(garps)
398 self.pg_enable_capture(self.pg_interfaces)
400 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
401 for i in range(len(hosts))]
402 ev_hosts = self.arp_event_hosts(evs)
403 self.assertEqual(len(ev_hosts ^ hosts), 0)
405 def test_l2bd_arp_term_10(self):
406 """ L2BD arp term - send duplicate garps, verify suppression
408 macs = self.mac_list(range(70, 71))
409 hosts = self.ip4_hosts(6, 1, macs)
411 """ send the packet 5 times expect one event
413 garps = self.garp_reqs(hosts) * 5
414 self.bd_swifs(1)[0].add_stream(garps)
416 self.pg_enable_capture(self.pg_interfaces)
418 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
419 for i in range(len(hosts))]
420 ev_hosts = self.arp_event_hosts(evs)
421 self.assertEqual(len(ev_hosts ^ hosts), 0)
423 def test_l2bd_arp_term_11(self):
424 """ L2BD arp term - disable ip4 arp events,send garps, verify no events
426 self.vapi.want_ip4_arp_events(enable_disable=0)
427 macs = self.mac_list(range(90, 95))
428 hosts = self.ip4_hosts(5, 1, macs)
430 garps = self.garp_reqs(hosts)
431 self.bd_swifs(1)[0].add_stream(garps)
433 self.pg_enable_capture(self.pg_interfaces)
436 self.assertEqual(len(self.vapi.collect_events()), 0)
437 self.bd_add_del(1, is_add=0)
439 def test_l2bd_arp_term_12(self):
440 """ L2BD ND term - send NS packets verify reports
442 self.vapi.want_ip6_nd_events(address=inet_pton(AF_INET6, "::0"))
443 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
444 self.bd_add_del(1, is_add=1)
445 self.set_bd_flags(1, arp_term=True, flood=False,
446 uu_flood=False, learn=False)
447 macs = self.mac_list(range(10, 15))
448 hosts = self.ip6_hosts(5, 1, macs)
449 reqs = self.ns_reqs_dst(hosts, dst_host)
450 self.bd_swifs(1)[0].add_stream(reqs)
452 self.pg_enable_capture(self.pg_interfaces)
454 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
455 for i in range(len(hosts))]
456 ev_hosts = self.nd_event_hosts(evs)
457 self.assertEqual(len(ev_hosts ^ hosts), 0)
459 def test_l2bd_arp_term_13(self):
460 """ L2BD ND term - send duplicate ns, verify suppression
462 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
463 macs = self.mac_list(range(10, 11))
464 hosts = self.ip6_hosts(5, 1, macs)
465 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
466 self.bd_swifs(1)[0].add_stream(reqs)
468 self.pg_enable_capture(self.pg_interfaces)
470 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
471 for i in range(len(hosts))]
472 ev_hosts = self.nd_event_hosts(evs)
473 self.assertEqual(len(ev_hosts ^ hosts), 0)
475 def test_l2bd_arp_term_14(self):
""" L2BD ND term - disable ip6 nd events, send ns, verify no events
478 self.vapi.want_ip6_nd_events(enable_disable=0,
479 address=inet_pton(AF_INET6, "::0"))
480 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
481 macs = self.mac_list(range(10, 15))
482 hosts = self.ip6_hosts(5, 1, macs)
483 reqs = self.ns_reqs_dst(hosts, dst_host)
484 self.bd_swifs(1)[0].add_stream(reqs)
486 self.pg_enable_capture(self.pg_interfaces)
489 self.assertEqual(len(self.vapi.collect_events()), 0)
490 self.bd_add_del(1, is_add=0)
# Run the test cases with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)