2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
56 Clear trace and packet infos before running each test.
58 self.reset_packet_infos()
59 super(TestL2bdArpTerm, self).setUp()
63 Show various debug prints after each test.
65 super(TestL2bdArpTerm, self).tearDown()
67 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
68 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
70 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
72 ip = e.ip4 if is_ipv6 == 0 else e.ip6
73 self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
80 def mac_list(cls, b6_range):
81 return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
84 def ip4_host(cls, subnet, host, mac):
86 ip4="172.17.1%02u.%u" % (subnet, host))
89 def ip4_hosts(cls, subnet, start, mac_list):
90 return {cls.ip4_host(subnet, start + j, mac_list[j])
91 for j in range(len(mac_list))}
94 def ip6_host(cls, subnet, host, mac):
96 ip6="fd01:%x::%x" % (subnet, host))
99 def ip6_hosts(cls, subnet, start, mac_list):
100 return {cls.ip6_host(subnet, start + j, mac_list[j])
101 for j in range(len(mac_list))}
104 def bd_swifs(cls, b):
107 return [cls.pg_interfaces[j] for j in range(start, start + n)]
109 def bd_add_del(self, bd_id=1, is_add=1):
111 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
112 for swif in self.bd_swifs(bd_id):
113 swif_idx = swif.sw_if_index
114 self.vapi.sw_interface_set_l2_bridge(
115 swif_idx, bd_id=bd_id, enable=is_add)
117 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
120 def arp_req(cls, src_host, host):
121 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
123 hwsrc=src_host.bin_mac,
128 def arp_reqs(cls, src_host, entries):
129 return [cls.arp_req(src_host, e) for e in entries]
132 def garp_req(cls, host):
133 return cls.arp_req(host, host)
136 def garp_reqs(cls, entries):
137 return [cls.garp_req(e) for e in entries]
139 def arp_resp_host(self, src_host, arp_resp):
140 ether = arp_resp[Ether]
141 self.assertEqual(ether.dst, src_host.mac)
144 self.assertEqual(arp.hwtype, 1)
145 self.assertEqual(arp.ptype, 0x800)
146 self.assertEqual(arp.hwlen, 6)
147 self.assertEqual(arp.plen, 4)
148 arp_opts = {"who-has": 1, "is-at": 2}
149 self.assertEqual(arp.op, arp_opts["is-at"])
150 self.assertEqual(arp.hwdst, src_host.mac)
151 self.assertEqual(arp.pdst, src_host.ip4)
152 return Host(mac=arp.hwsrc, ip4=arp.psrc)
154 def arp_resp_hosts(self, src_host, pkts):
155 return {self.arp_resp_host(src_host, p) for p in pkts}
159 o1 = int(ip / 16777216) % 256
160 o2 = int(ip / 65536) % 256
161 o3 = int(ip / 256) % 256
163 return '%s.%s.%s.%s' % (o1, o2, o3, o4)
165 def arp_event_host(self, e):
166 return Host(str(e.mac), ip4=str(e.ip))
168 def arp_event_hosts(self, evs):
169 return {self.arp_event_host(e) for e in evs}
171 def nd_event_host(self, e):
172 return Host(str(e.mac), ip6=str(e.ip))
174 def nd_event_hosts(self, evs):
175 return {self.nd_event_host(e) for e in evs}
178 def ns_req(cls, src_host, host):
179 nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
180 d = inet_ntop(AF_INET6, nsma)
181 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
182 IPv6(dst=d, src=src_host.ip6) /
183 ICMPv6ND_NS(tgt=host.ip6) /
184 ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
187 def ns_reqs_dst(cls, entries, dst_host):
188 return [cls.ns_req(e, dst_host) for e in entries]
191 def ns_reqs_src(cls, src_host, entries):
192 return [cls.ns_req(src_host, e) for e in entries]
194 def na_resp_host(self, src_host, rx):
195 self.assertEqual(rx[Ether].dst, src_host.mac)
196 self.assertEqual(in6_ptop(rx[IPv6].dst),
197 in6_ptop(src_host.ip6))
199 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
200 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
203 return Host(mac=na.lladdr, ip6=na.tgt)
205 def na_resp_hosts(self, src_host, pkts):
206 return {self.na_resp_host(src_host, p) for p in pkts}
208 def set_bd_flags(self, bd_id, **args):
210 Enable/disable defined feature(s) of the bridge domain.
212 :param int bd_id: Bridge domain ID.
213 :param list args: List of feature/status pairs. Allowed features: \
214 learn, forward, flood, uu_flood and arp_term. Status False means \
215 disable, status True means enable the feature.
216 :raise: ValueError in case of unknown feature in the input.
220 feature_bitmap = 1 << 0
221 elif flag == "forward":
222 feature_bitmap = 1 << 1
223 elif flag == "flood":
224 feature_bitmap = 1 << 2
225 elif flag == "uu_flood":
226 feature_bitmap = 1 << 3
227 elif flag == "arp_term":
228 feature_bitmap = 1 << 4
230 raise ValueError("Unknown feature used: %s" % flag)
231 is_set = 1 if args[flag] else 0
232 self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
233 self.logger.info("Bridge domain ID %d updated" % bd_id)
235 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
236 reqs = self.arp_reqs(src_host, req_hosts)
238 for swif in self.bd_swifs(bd_id):
239 swif.add_stream(reqs)
241 self.pg_enable_capture(self.pg_interfaces)
244 for swif in self.bd_swifs(bd_id):
245 resp_pkts = swif.get_capture(len(resp_hosts))
246 resps = self.arp_resp_hosts(src_host, resp_pkts)
247 self.assertEqual(len(resps ^ resp_hosts), 0)
249 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
250 reqs = self.ns_reqs_src(src_host, req_hosts)
252 for swif in self.bd_swifs(bd_id):
253 swif.add_stream(reqs)
255 self.pg_enable_capture(self.pg_interfaces)
258 for swif in self.bd_swifs(bd_id):
259 resp_pkts = swif.get_capture(len(resp_hosts))
260 resps = self.na_resp_hosts(src_host, resp_pkts)
261 self.assertEqual(len(resps ^ resp_hosts), 0)
263 def test_l2bd_arp_term_01(self):
264 """ L2BD arp term - add 5 hosts, verify arp responses
266 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
267 self.bd_add_del(1, is_add=1)
268 self.set_bd_flags(1, arp_term=True, flood=False,
269 uu_flood=False, learn=False)
270 macs = self.mac_list(range(1, 5))
271 hosts = self.ip4_hosts(4, 1, macs)
272 self.add_del_arp_term_hosts(hosts, is_add=1)
274 self.verify_arp(src_host, hosts, hosts)
275 type(self).hosts = hosts
277 def test_l2bd_arp_term_02(self):
278 """ L2BD arp term - delete 3 hosts, verify arp responses
280 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
281 macs = self.mac_list(range(1, 3))
282 deleted = self.ip4_hosts(4, 1, macs)
283 self.add_del_arp_term_hosts(deleted, is_add=0)
284 remaining = self.hosts - deleted
285 self.verify_arp(src_host, self.hosts, remaining)
286 type(self).hosts = remaining
287 self.bd_add_del(1, is_add=0)
289 def test_l2bd_arp_term_03(self):
290 """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
292 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
293 self.bd_add_del(1, is_add=1)
294 self.set_bd_flags(1, arp_term=True, flood=False,
295 uu_flood=False, learn=False)
296 macs = self.mac_list(range(1, 3))
297 readded = self.ip4_hosts(4, 1, macs)
298 self.add_del_arp_term_hosts(readded, is_add=1)
299 self.verify_arp(src_host, self.hosts | readded, readded)
300 type(self).hosts = readded
302 def test_l2bd_arp_term_04(self):
303 """ L2BD arp term - 2 IP4 addrs per host
305 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
306 macs = self.mac_list(range(1, 3))
307 sub5_hosts = self.ip4_hosts(5, 1, macs)
308 self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
309 hosts = self.hosts | sub5_hosts
310 self.verify_arp(src_host, hosts, hosts)
311 type(self).hosts = hosts
312 self.bd_add_del(1, is_add=0)
314 def test_l2bd_arp_term_05(self):
315 """ L2BD arp term - create and update 10 IP4-mac pairs
317 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
318 self.bd_add_del(1, is_add=1)
319 self.set_bd_flags(1, arp_term=True, flood=False,
320 uu_flood=False, learn=False)
321 macs1 = self.mac_list(range(10, 20))
322 hosts1 = self.ip4_hosts(5, 1, macs1)
323 self.add_del_arp_term_hosts(hosts1, is_add=1)
324 self.verify_arp(src_host, hosts1, hosts1)
325 macs2 = self.mac_list(range(20, 30))
326 hosts2 = self.ip4_hosts(5, 1, macs2)
327 self.add_del_arp_term_hosts(hosts2, is_add=1)
328 self.verify_arp(src_host, hosts1, hosts2)
329 self.bd_add_del(1, is_add=0)
331 def test_l2bd_arp_term_06(self):
332 """ L2BD arp/ND term - hosts with both ip4/ip6
334 src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
335 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
336 self.bd_add_del(1, is_add=1)
337 # enable flood to make sure requests are not flooded
338 self.set_bd_flags(1, arp_term=True, flood=True,
339 uu_flood=False, learn=False)
340 macs = self.mac_list(range(10, 20))
341 hosts6 = self.ip6_hosts(5, 1, macs)
342 hosts4 = self.ip4_hosts(5, 1, macs)
343 self.add_del_arp_term_hosts(hosts4, is_add=1)
344 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
345 self.verify_arp(src_host4, hosts4, hosts4)
346 self.verify_nd(src_host6, hosts6, hosts6)
347 self.bd_add_del(1, is_add=0)
349 def test_l2bd_arp_term_07(self):
350 """ L2BD ND term - Add and Del hosts, verify ND replies
352 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
353 self.bd_add_del(1, is_add=1)
354 self.set_bd_flags(1, arp_term=True, flood=False,
355 uu_flood=False, learn=False)
356 macs = self.mac_list(range(10, 20))
357 hosts6 = self.ip6_hosts(5, 1, macs)
358 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
359 self.verify_nd(src_host6, hosts6, hosts6)
360 del_macs = self.mac_list(range(10, 15))
361 deleted = self.ip6_hosts(5, 1, del_macs)
362 self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
363 self.verify_nd(src_host6, hosts6, hosts6 - deleted)
364 self.bd_add_del(1, is_add=0)
366 def test_l2bd_arp_term_08(self):
367 """ L2BD ND term - Add and update IP+mac, verify ND replies
369 src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
370 self.bd_add_del(1, is_add=1)
371 self.set_bd_flags(1, arp_term=True, flood=False,
372 uu_flood=False, learn=False)
373 macs1 = self.mac_list(range(10, 20))
374 hosts = self.ip6_hosts(5, 1, macs1)
375 self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
376 self.verify_nd(src_host, hosts, hosts)
377 macs2 = self.mac_list(range(20, 30))
378 updated = self.ip6_hosts(5, 1, macs2)
379 self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
380 self.verify_nd(src_host, hosts, updated)
381 self.bd_add_del(1, is_add=0)
383 def test_l2bd_arp_term_09(self):
384 """ L2BD arp term - send garps, verify arp event reports
386 self.vapi.want_ip4_arp_events()
387 self.bd_add_del(1, is_add=1)
388 self.set_bd_flags(1, arp_term=True, flood=False,
389 uu_flood=False, learn=False)
390 macs = self.mac_list(range(90, 95))
391 hosts = self.ip4_hosts(5, 1, macs)
393 garps = self.garp_reqs(hosts)
394 self.bd_swifs(1)[0].add_stream(garps)
396 self.pg_enable_capture(self.pg_interfaces)
398 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
399 for i in range(len(hosts))]
400 ev_hosts = self.arp_event_hosts(evs)
401 self.assertEqual(len(ev_hosts ^ hosts), 0)
403 def test_l2bd_arp_term_10(self):
404 """ L2BD arp term - send duplicate garps, verify suppression
406 macs = self.mac_list(range(70, 71))
407 hosts = self.ip4_hosts(6, 1, macs)
409 """ send the packet 5 times expect one event
411 garps = self.garp_reqs(hosts) * 5
412 self.bd_swifs(1)[0].add_stream(garps)
414 self.pg_enable_capture(self.pg_interfaces)
416 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
417 for i in range(len(hosts))]
418 ev_hosts = self.arp_event_hosts(evs)
419 self.assertEqual(len(ev_hosts ^ hosts), 0)
421 def test_l2bd_arp_term_11(self):
422 """ L2BD arp term - disable ip4 arp events,send garps, verify no events
424 self.vapi.want_ip4_arp_events(enable_disable=0)
425 macs = self.mac_list(range(90, 95))
426 hosts = self.ip4_hosts(5, 1, macs)
428 garps = self.garp_reqs(hosts)
429 self.bd_swifs(1)[0].add_stream(garps)
431 self.pg_enable_capture(self.pg_interfaces)
434 self.assertEqual(len(self.vapi.collect_events()), 0)
435 self.bd_add_del(1, is_add=0)
437 def test_l2bd_arp_term_12(self):
438 """ L2BD ND term - send NS packets verify reports
440 self.vapi.want_ip6_nd_events(ip="::")
441 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
442 self.bd_add_del(1, is_add=1)
443 self.set_bd_flags(1, arp_term=True, flood=False,
444 uu_flood=False, learn=False)
445 macs = self.mac_list(range(10, 15))
446 hosts = self.ip6_hosts(5, 1, macs)
447 reqs = self.ns_reqs_dst(hosts, dst_host)
448 self.bd_swifs(1)[0].add_stream(reqs)
450 self.pg_enable_capture(self.pg_interfaces)
452 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
453 for i in range(len(hosts))]
454 ev_hosts = self.nd_event_hosts(evs)
455 self.assertEqual(len(ev_hosts ^ hosts), 0)
457 def test_l2bd_arp_term_13(self):
458 """ L2BD ND term - send duplicate ns, verify suppression
460 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
461 macs = self.mac_list(range(10, 11))
462 hosts = self.ip6_hosts(5, 1, macs)
463 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
464 self.bd_swifs(1)[0].add_stream(reqs)
466 self.pg_enable_capture(self.pg_interfaces)
468 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
469 for i in range(len(hosts))]
470 ev_hosts = self.nd_event_hosts(evs)
471 self.assertEqual(len(ev_hosts ^ hosts), 0)
473 def test_l2bd_arp_term_14(self):
474 """ L2BD ND term - disable ip4 arp events,send ns, verify no events
476 self.vapi.want_ip6_nd_events(enable_disable=0, ip="::")
477 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
478 macs = self.mac_list(range(10, 15))
479 hosts = self.ip6_hosts(5, 1, macs)
480 reqs = self.ns_reqs_dst(hosts, dst_host)
481 self.bd_swifs(1)[0].add_stream(reqs)
483 self.pg_enable_capture(self.pg_interfaces)
486 self.assertEqual(len(self.vapi.collect_events()), 0)
487 self.bd_add_del(1, is_add=0)
490 if __name__ == '__main__':
491 unittest.main(testRunner=VppTestRunner)