2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp, mactobinary
23 from vpp_mac import VppMacAddress
24 from vpp_ip import VppIpAddress
# Exercises ARP (IPv4) and ND (IPv6) termination on an L2 bridge domain.
# NOTE(review): several tests read state left behind by earlier ones
# (the class attribute `hosts`, an existing bridge domain 1), so they
# appear to rely on running in name order -- confirm with the runner.
class TestL2bdArpTerm(VppTestCase):
    """ L2BD arp termination Test Case """
@classmethod
def setUpClass(cls):
    """
    Perform standard class setup (defined by class method setUpClass in
    class VppTestCase) before running the test case, set test case related
    variables and configure VPP.

    Creates ``n_bd * ifs_per_bd`` packet-generator interfaces and stores
    ``ifs_per_bd`` on the class so that bd_swifs() can map a bridge
    domain ID onto its interfaces.
    """
    super(TestL2bdArpTerm, cls).setUpClass()

    try:
        # Create pg interfaces
        # NOTE(review): n_bd was not bound in the visible source; 1 is
        # assumed because every test only uses bridge domain 1 -- confirm.
        n_bd = 1
        cls.ifs_per_bd = ifs_per_bd = 3
        n_ifs = n_bd * ifs_per_bd
        cls.create_pg_interfaces(range(n_ifs))

        # Set up all interfaces
        # NOTE(review): loop body reconstructed as admin_up() -- confirm.
        for i in cls.pg_interfaces:
            i.admin_up()
    except Exception:
        # Undo the base-class setup if configuration fails part-way, then
        # let the original error propagate.
        super(TestL2bdArpTerm, cls).tearDownClass()
        raise
def setUp(self):
    """
    Clear trace and packet infos before running each test.
    """
    self.reset_packet_infos()
    super(TestL2bdArpTerm, self).setUp()
def tearDown(self):
    """
    Show various debug prints after each test.
    """
    super(TestL2bdArpTerm, self).tearDown()
    # NOTE(review): guard reconstructed from a one-line gap in the source;
    # it avoids issuing CLI commands to a VPP instance that has already
    # died -- confirm `vpp_dead` is the attribute the framework exposes.
    if not self.vpp_dead:
        self.logger.info(self.vapi.ppcli("show l2fib verbose"))
        self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
    """
    Add or delete IP-to-MAC termination entries in a bridge domain.

    :param entries: Iterable of hosts exposing ``mac`` and ``ip4``/``ip6``.
    :param int bd_id: Bridge domain ID.
    :param int is_add: 1 to add the entries, 0 to delete them.
    :param int is_ipv6: 0 to use each host's ``ip4``, otherwise ``ip6``.
    """
    for e in entries:
        ip = e.ip4 if is_ipv6 == 0 else e.ip6
        # NOTE(review): the two trailing keyword arguments were missing
        # from the visible source and are reconstructed from this
        # method's own parameters -- confirm against the vapi signature.
        self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
                                    mac=VppMacAddress(e.mac).encode(),
                                    ip=VppIpAddress(ip).encode(),
                                    is_ipv6=is_ipv6,
                                    is_add=is_add)
def mac_list(cls, b6_range):
    """
    Build one MAC address string per value in ``b6_range``.

    Each value becomes the last octet (two hex digits) of a MAC that
    starts with ``00:00:ca:fe:00``.
    """
    template = "00:00:ca:fe:00:%02x"
    macs = []
    for last_octet in b6_range:
        macs.append(template % last_octet)
    return macs
def ip4_host(cls, subnet, host, mac):
    """
    Return a Host with the given MAC and the IPv4 address
    ``172.17.1<subnet>.<host>`` (subnet is zero-padded to two digits).
    """
    return Host(mac=mac,
                ip4="172.17.1%02u.%u" % (subnet, host))
def ip4_hosts(cls, subnet, start, mac_list):
    """
    Return a set of IPv4 hosts, one per MAC in ``mac_list``.

    The j-th MAC is paired with host number ``start + j`` in ``subnet``.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip4_host(subnet, start + offset, mac))
    return hosts
def ip6_host(cls, subnet, host, mac):
    """
    Return a Host with the given MAC and the IPv6 address
    ``fd01:<subnet>::<host>`` (both parts formatted as hex).
    """
    return Host(mac=mac,
                ip6="fd01:%x::%x" % (subnet, host))
def ip6_hosts(cls, subnet, start, mac_list):
    """
    Return a set of IPv6 hosts, one per MAC in ``mac_list``.

    The j-th MAC is paired with host number ``start + j`` in ``subnet``.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip6_host(subnet, start + offset, mac))
    return hosts
def bd_swifs(cls, b):
    """
    Return the pg interfaces that belong to bridge domain ``b``.

    NOTE(review): ``n`` and ``start`` were unbound in the visible source.
    They are reconstructed assuming bridge domain IDs start at 1 and each
    domain owns ``ifs_per_bd`` consecutive interfaces -- confirm.
    """
    n = cls.ifs_per_bd
    start = (b - 1) * n
    return [cls.pg_interfaces[j] for j in range(start, start + n)]
def bd_add_del(self, bd_id=1, is_add=1):
    """
    Create or delete a bridge domain together with its interface bindings.

    On add the domain is created first and its interfaces are then
    attached; on delete the interfaces are detached first and the domain
    is removed afterwards. The domain API is called exactly once.

    :param int bd_id: Bridge domain ID.
    :param int is_add: 1 to create, 0 to delete.
    """
    if is_add:
        self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
    for swif in self.bd_swifs(bd_id):
        swif_idx = swif.sw_if_index
        self.vapi.sw_interface_set_l2_bridge(
            swif_idx, bd_id=bd_id, enable=is_add)
    if not is_add:
        self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
def arp_req(cls, src_host, host):
    """
    Build a broadcast ARP request sent by ``src_host``.

    NOTE(review): only the ``hwsrc`` argument of the ARP layer was present
    in the visible source; the remaining fields are reconstructed as a
    who-has request for ``host.ip4`` -- confirm.
    """
    return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
            ARP(op="who-has",
                hwsrc=src_host.bin_mac,
                pdst=host.ip4,
                psrc=src_host.ip4))
def arp_reqs(cls, src_host, entries):
    """Return one ARP request from ``src_host`` per host in ``entries``."""
    requests = []
    for target in entries:
        requests.append(cls.arp_req(src_host, target))
    return requests
def garp_req(cls, host):
    """Return an ARP request in which ``host`` is both sender and target."""
    sender = target = host
    return cls.arp_req(sender, target)
def garp_reqs(cls, entries):
    """Return one gratuitous ARP request per host in ``entries``."""
    requests = []
    for host in entries:
        requests.append(cls.garp_req(host))
    return requests
def arp_resp_host(self, src_host, arp_resp):
    """
    Validate one ARP reply addressed to ``src_host`` and return its sender.

    Asserts the Ethernet destination, the ARP header constants, the
    ``is-at`` opcode and that the reply targets ``src_host``.

    :param src_host: Host that issued the request.
    :param arp_resp: Captured packet with Ether and ARP layers.
    :returns: Host built from the reply's ``hwsrc`` and ``psrc``.
    """
    ether = arp_resp[Ether]
    self.assertEqual(ether.dst, src_host.mac)

    # `arp` was unbound in the visible source; it has to be the ARP layer
    # of the reply for the field checks below to make sense.
    arp = arp_resp[ARP]
    self.assertEqual(arp.hwtype, 1)
    self.assertEqual(arp.ptype, 0x800)
    self.assertEqual(arp.hwlen, 6)
    self.assertEqual(arp.plen, 4)
    arp_opts = {"who-has": 1, "is-at": 2}
    self.assertEqual(arp.op, arp_opts["is-at"])
    self.assertEqual(arp.hwdst, src_host.mac)
    self.assertEqual(arp.pdst, src_host.ip4)
    return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """Validate every ARP reply in ``pkts`` and return the set of senders."""
    responders = set()
    for pkt in pkts:
        responders.add(self.arp_resp_host(src_host, pkt))
    return responders
def inttoip4(self, ip):
    """
    Convert a 32-bit integer to a dotted-quad IPv4 string.

    The most significant byte becomes the first octet.

    :param int ip: IPv4 address as an integer.
    :returns: Address in ``a.b.c.d`` form.
    """
    # The lowest octet was never computed in the visible source, which made
    # the formatting step fail; all four are now extracted the same way.
    octets = [(ip >> shift) & 0xff for shift in (24, 16, 8, 0)]
    return '.'.join(str(octet) for octet in octets)
def arp_event_host(self, e):
    """
    Convert an ARP event into a Host.

    :param e: Event exposing ``new_mac`` and an integer ``address``.
    :returns: Host with a colon-separated hex MAC and dotted-quad ip4.
    """
    # bytearray() yields integers on both Python 2 and Python 3, whereas
    # ord() on the items of a Python 3 bytes object raises TypeError.
    # NOTE(review): assumes new_mac is a raw byte string -- confirm.
    mac = ':'.join('%02x' % octet for octet in bytearray(e.new_mac))
    return Host(mac=mac, ip4=self.inttoip4(e.address))
def arp_event_hosts(self, evs):
    """Return the set of Hosts described by the ARP events in ``evs``."""
    hosts = set()
    for event in evs:
        hosts.add(self.arp_event_host(event))
    return hosts
def nd_event_host(self, e):
    """
    Convert an ND event into a Host.

    :param e: Event exposing ``new_mac`` and a packed IPv6 ``address``.
    :returns: Host with a colon-separated hex MAC and textual ip6.
    """
    # bytearray() yields integers on both Python 2 and Python 3, whereas
    # ord() on the items of a Python 3 bytes object raises TypeError.
    # NOTE(review): assumes new_mac is a raw byte string -- confirm.
    mac = ':'.join('%02x' % octet for octet in bytearray(e.new_mac))
    return Host(mac=mac, ip6=inet_ntop(AF_INET6, e.address))
def nd_event_hosts(self, evs):
    """Return the set of Hosts described by the ND events in ``evs``."""
    hosts = set()
    for event in evs:
        hosts.add(self.nd_event_host(event))
    return hosts
def ns_req(cls, src_host, host):
    """
    Build a neighbor solicitation from ``src_host`` asking for ``host.ip6``.

    The frame is sent to the Ethernet broadcast address, and the IPv6
    destination is the solicited-node multicast address derived from the
    fixed address ``fd10::ffff`` (not from the target).
    """
    packed_ref = inet_pton(AF_INET6, "fd10::ffff")
    solicited_node = inet_ntop(AF_INET6, in6_getnsma(packed_ref))
    l2 = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    l3 = IPv6(dst=solicited_node, src=src_host.ip6)
    solicitation = ICMPv6ND_NS(tgt=host.ip6)
    src_ll_option = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return l2 / l3 / solicitation / src_ll_option
def ns_reqs_dst(cls, entries, dst_host):
    """Return one NS per host in ``entries``, each soliciting ``dst_host``."""
    requests = []
    for sender in entries:
        requests.append(cls.ns_req(sender, dst_host))
    return requests
def ns_reqs_src(cls, src_host, entries):
    """Return one NS from ``src_host`` per target host in ``entries``."""
    requests = []
    for target in entries:
        requests.append(cls.ns_req(src_host, target))
    return requests
def na_resp_host(self, src_host, rx):
    """
    Validate one neighbor advertisement sent to ``src_host``.

    Asserts the Ethernet and IPv6 destinations and that the packet carries
    both an NA and a destination link-layer address option.

    :param src_host: Host that issued the solicitation.
    :param rx: Captured packet.
    :returns: Host built from the option's ``lladdr`` and the NA ``tgt``.
    """
    self.assertEqual(rx[Ether].dst, src_host.mac)
    self.assertEqual(in6_ptop(rx[IPv6].dst),
                     in6_ptop(src_host.ip6))

    self.assertTrue(rx.haslayer(ICMPv6ND_NA))
    self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))

    # `na` was unbound in the visible source. Each field is read from the
    # layer that defines it, so no lookup through the payload is needed.
    lladdr = rx[ICMPv6NDOptDstLLAddr].lladdr
    target = rx[ICMPv6ND_NA].tgt
    return Host(mac=lladdr, ip6=target)
def na_resp_hosts(self, src_host, pkts):
    """Validate every NA in ``pkts`` and return the set of advertised hosts."""
    responders = set()
    for pkt in pkts:
        responders.add(self.na_resp_host(src_host, pkt))
    return responders
def set_bd_flags(self, bd_id, **args):
    """
    Enable/disable defined feature(s) of the bridge domain.

    One ``bridge_flags`` call is issued per supplied feature, in the order
    the keyword arguments are iterated.

    :param int bd_id: Bridge domain ID.
    :param list args: List of feature/status pairs. Allowed features: \
    learn, forward, flood, uu_flood and arp_term. Status False means \
    disable, status True means enable the feature.
    :raise: ValueError in case of unknown feature in the input.
    """
    # Bit position of each feature inside the flags bitmap.
    feature_bits = {
        "learn": 0,
        "forward": 1,
        "flood": 2,
        "uu_flood": 3,
        "arp_term": 4,
    }
    for flag in args:
        if flag not in feature_bits:
            raise ValueError("Unknown feature used: %s" % flag)
        feature_bitmap = 1 << feature_bits[flag]
        is_set = 1 if args[flag] else 0
        self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
    self.logger.info("Bridge domain ID %d updated" % bd_id)
def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
    """
    Send ARP requests into a bridge domain and check who answers.

    The same requests are queued on every interface of the bridge domain;
    each interface must then capture exactly ``len(resp_hosts)`` replies
    whose senders equal ``resp_hosts``.

    :param src_host: Host issuing the requests.
    :param req_hosts: Hosts being asked for.
    :param set resp_hosts: Hosts expected to be answered for.
    :param int bd_id: Bridge domain ID.
    """
    reqs = self.arp_reqs(src_host, req_hosts)

    for swif in self.bd_swifs(bd_id):
        swif.add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The visible source never started the packet generator, so nothing
    # would have been transmitted before the captures were read.
    self.pg_start()

    for swif in self.bd_swifs(bd_id):
        resp_pkts = swif.get_capture(len(resp_hosts))
        resps = self.arp_resp_hosts(src_host, resp_pkts)
        self.assertEqual(len(resps ^ resp_hosts), 0)
def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
    """
    Send neighbor solicitations into a bridge domain and check who answers.

    The same solicitations are queued on every interface of the bridge
    domain; each interface must then capture exactly ``len(resp_hosts)``
    advertisements whose hosts equal ``resp_hosts``.

    :param src_host: Host issuing the solicitations.
    :param req_hosts: Hosts being solicited.
    :param set resp_hosts: Hosts expected to be advertised.
    :param int bd_id: Bridge domain ID.
    """
    reqs = self.ns_reqs_src(src_host, req_hosts)

    for swif in self.bd_swifs(bd_id):
        swif.add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The visible source never started the packet generator, so nothing
    # would have been transmitted before the captures were read.
    self.pg_start()

    for swif in self.bd_swifs(bd_id):
        resp_pkts = swif.get_capture(len(resp_hosts))
        resps = self.na_resp_hosts(src_host, resp_pkts)
        self.assertEqual(len(resps ^ resp_hosts), 0)
def test_l2bd_arp_term_01(self):
    """ L2BD arp term - add 5 hosts, verify arp responses
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    # NOTE(review): range(1, 5) yields four MACs although the title above
    # says five -- confirm which one is intended.
    bd_hosts = self.ip4_hosts(4, 1, self.mac_list(range(1, 5)))
    self.add_del_arp_term_hosts(bd_hosts, is_add=1)

    self.verify_arp(requester, bd_hosts, bd_hosts)
    # Stored on the class so that later tests can read it as self.hosts.
    type(self).hosts = bd_hosts
def test_l2bd_arp_term_02(self):
    """ L2BD arp term - delete 3 hosts, verify arp responses
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    # NOTE(review): range(1, 3) yields two MACs although the title above
    # says three -- confirm which one is intended.
    removed = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(removed, is_add=0)
    # Ask for every previously registered host; only the ones that were
    # not removed may be answered for.
    still_present = self.hosts - removed
    self.verify_arp(requester, self.hosts, still_present)
    type(self).hosts = still_present
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
    """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    # NOTE(review): range(1, 3) yields two MACs although the title above
    # says three -- confirm which one is intended.
    restored = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(restored, is_add=1)
    # Requests also cover the hosts remembered on the class, but only the
    # freshly re-added ones are expected to be answered for.
    self.verify_arp(requester, self.hosts | restored, restored)
    type(self).hosts = restored
def test_l2bd_arp_term_04(self):
    """ L2BD arp term - 2 IP4 addrs per host
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    # Same MACs as the hosts already stored on the class, but addresses
    # taken from subnet 5.
    second_addrs = self.ip4_hosts(5, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(second_addrs, is_add=1)
    everyone = self.hosts | second_addrs
    self.verify_arp(requester, everyone, everyone)
    type(self).hosts = everyone
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
    """ L2BD arp term - create and update 10 IP4-mac pairs
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # Initial bindings.
    original = self.ip4_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(original, is_add=1)
    self.verify_arp(requester, original, original)

    # Same addresses bound to a different set of MACs.
    rebound = self.ip4_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(rebound, is_add=1)
    # Requests still name the original hosts; replies must carry the
    # updated bindings.
    self.verify_arp(requester, original, rebound)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
    """ L2BD arp/ND term - hosts with both ip4/ip6
    """
    requester4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
    requester6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    # Flooding is switched on here so the capture counts would expose
    # any request that got flooded.
    self.set_bd_flags(1, arp_term=True, flood=True,
                      uu_flood=False, learn=False)
    shared_macs = self.mac_list(range(10, 20))
    v6_hosts = self.ip6_hosts(5, 1, shared_macs)
    v4_hosts = self.ip4_hosts(5, 1, shared_macs)
    self.add_del_arp_term_hosts(v4_hosts, is_add=1)
    self.add_del_arp_term_hosts(v6_hosts, is_add=1, is_ipv6=1)
    self.verify_arp(requester4, v4_hosts, v4_hosts)
    self.verify_nd(requester6, v6_hosts, v6_hosts)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
    """ L2BD ND term - Add and Del hosts, verify ND replies
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # Register ten hosts and check all of them are answered for.
    registered = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(registered, is_add=1, is_ipv6=1)
    self.verify_nd(requester, registered, registered)

    # Remove the first five; only the rest may still be answered for.
    removed = self.ip6_hosts(5, 1, self.mac_list(range(10, 15)))
    self.add_del_arp_term_hosts(removed, is_add=0, is_ipv6=1)
    self.verify_nd(requester, registered, registered - removed)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
    """ L2BD ND term - Add and update IP+mac, verify ND replies
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    # Initial bindings.
    original = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(original, is_add=1, is_ipv6=1)
    self.verify_nd(requester, original, original)

    # Same addresses bound to a different set of MACs.
    rebound = self.ip6_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(rebound, is_add=1, is_ipv6=1)
    # Solicitations still name the original hosts; advertisements must
    # carry the updated bindings.
    self.verify_nd(requester, original, rebound)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_09(self):
    """ L2BD arp term - send garps, verify arp event reports
    """
    self.vapi.want_ip4_arp_events()
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(90, 95))
    hosts = self.ip4_hosts(5, 1, macs)

    garps = self.garp_reqs(hosts)
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # The visible source never started the packet generator, so no
    # gratuitous ARP would have been sent and no event could arrive.
    self.pg_start()
    # One event is expected per host that announced itself.
    evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
           for _ in hosts]
    ev_hosts = self.arp_event_hosts(evs)
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_10(self):
    """ L2BD arp term - send duplicate garps, verify suppression
    """
    # NOTE(review): no bridge domain is created here; this appears to
    # reuse bridge domain 1 and the event registration left by the
    # previous test -- confirm.
    macs = self.mac_list(range(70, 71))
    hosts = self.ip4_hosts(6, 1, macs)

    # send the packet 5 times expect one event
    garps = self.garp_reqs(hosts) * 5
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # The visible source never started the packet generator, so no
    # gratuitous ARP would have been sent and no event could arrive.
    self.pg_start()
    evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
           for _ in hosts]
    ev_hosts = self.arp_event_hosts(evs)
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_11(self):
    """ L2BD arp term - disable ip4 arp events,send garps, verify no events
    """
    self.vapi.want_ip4_arp_events(enable_disable=0)
    macs = self.mac_list(range(90, 95))
    hosts = self.ip4_hosts(5, 1, macs)

    garps = self.garp_reqs(hosts)
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # The visible source never started the packet generator, which made
    # the "no events" check below pass without sending anything.
    self.pg_start()
    # NOTE(review): settle time reconstructed from a gap in the source so
    # that a wrongly generated event has a chance to arrive -- confirm.
    self.sleep(1)
    self.assertEqual(len(self.vapi.collect_events()), 0)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_12(self):
    """ L2BD ND term - send NS packets verify reports
    """
    self.vapi.want_ip6_nd_events(address=inet_pton(AF_INET6, "::0"))
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(10, 15))
    hosts = self.ip6_hosts(5, 1, macs)
    # Every host in `hosts` solicits the same destination.
    reqs = self.ns_reqs_dst(hosts, dst_host)
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The visible source never started the packet generator, so no
    # solicitation would have been sent and no event could arrive.
    self.pg_start()
    # One event is expected per soliciting host.
    evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
           for _ in hosts]
    ev_hosts = self.nd_event_hosts(evs)
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_13(self):
    """ L2BD ND term - send duplicate ns, verify suppression
    """
    # NOTE(review): no bridge domain is created here; this appears to
    # reuse bridge domain 1 and the event registration left by the
    # previous test -- confirm.
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    macs = self.mac_list(range(10, 11))
    hosts = self.ip6_hosts(5, 1, macs)
    # The same solicitation is sent five times; a single event per host
    # is expected.
    reqs = self.ns_reqs_dst(hosts, dst_host) * 5
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The visible source never started the packet generator, so no
    # solicitation would have been sent and no event could arrive.
    self.pg_start()
    evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
           for _ in hosts]
    ev_hosts = self.nd_event_hosts(evs)
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_14(self):
    """ L2BD ND term - disable ip6 nd events,send ns, verify no events
    """
    self.vapi.want_ip6_nd_events(enable_disable=0,
                                 address=inet_pton(AF_INET6, "::0"))
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    macs = self.mac_list(range(10, 15))
    hosts = self.ip6_hosts(5, 1, macs)
    reqs = self.ns_reqs_dst(hosts, dst_host)
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The visible source never started the packet generator, which made
    # the "no events" check below pass without sending anything.
    self.pg_start()
    # NOTE(review): settle time reconstructed from a gap in the source so
    # that a wrongly generated event has a chance to arrive -- confirm.
    self.sleep(1)
    self.assertEqual(len(self.vapi.collect_events()), 0)
    self.bd_add_del(1, is_add=0)
# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)