2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp, mactobinary
23 from vpp_mac import VppMacAddress
24 from vpp_ip import VppIpAddress
27 class TestL2bdArpTerm(VppTestCase):
28 """ L2BD arp termination Test Case """
33 Perform standard class setup (defined by class method setUpClass in
34 class VppTestCase) before running the test case, set test case related
35 variables and configure VPP.
37 super(TestL2bdArpTerm, cls).setUpClass()
40 # Create pg interfaces
42 cls.ifs_per_bd = ifs_per_bd = 3
43 n_ifs = n_bd * ifs_per_bd
44 cls.create_pg_interfaces(range(n_ifs))
46 # Set up all interfaces
47 for i in cls.pg_interfaces:
53 super(TestL2bdArpTerm, cls).tearDownClass()
58 Clear trace and packet infos before running each test.
60 self.reset_packet_infos()
61 super(TestL2bdArpTerm, self).setUp()
65 Show various debug prints after each test.
67 super(TestL2bdArpTerm, self).tearDown()
69 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
70 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
    """
    Add or remove IP-to-MAC bindings of a bridge domain.

    :param entries: Iterable of host records exposing ``mac`` and, depending
        on ``is_ipv6``, ``ip4`` or ``ip6``.
    :param int bd_id: Bridge domain ID the bindings belong to.
    :param int is_add: 1 to add the bindings, 0 to delete them.
    :param int is_ipv6: 0 selects each entry's ``ip4``; any other value
        selects its ``ip6``.
    """
    for e in entries:
        ip = e.ip4 if is_ipv6 == 0 else e.ip6
        # NOTE(review): this call was left unterminated in the original;
        # is_ipv6 and is_add are forwarded on the assumption that
        # bd_ip_mac_add_del accepts both keywords - confirm against the
        # vapi wrapper.
        self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
                                    mac=VppMacAddress(e.mac).encode(),
                                    ip=VppIpAddress(ip).encode(),
                                    is_ipv6=is_ipv6,
                                    is_add=is_add)
def mac_list(cls, b6_range):
    """
    Build MAC address strings that differ only in their last octet.

    :param b6_range: Iterable of integers used as the sixth MAC byte.
    :returns: List of ``00:00:ca:fe:00:XX`` strings, one per input value,
        with XX rendered as at least two lower-case hex digits.
    """
    template = "00:00:ca:fe:00:%02x"
    macs = []
    for last_octet in b6_range:
        macs.append(template % last_octet)
    return macs
def ip4_host(cls, subnet, host, mac):
    """
    Build a host record with an IPv4 address.

    The address is formatted as ``172.17.1SS.H`` where SS is ``subnet``
    zero-padded to two digits and H is ``host``.

    :param int subnet: Subnet number placed in the third octet.
    :param int host: Host number placed in the fourth octet.
    :param str mac: MAC address of the host.
    :returns: Host record for the given MAC and derived address.
    """
    # NOTE(review): the opening of this Host(...) call was missing in the
    # original; mac and bin_mac are supplied because the request builders in
    # this file read src_host.mac and src_host.bin_mac - confirm.
    return Host(mac=mac, bin_mac=mactobinary(mac),
                ip4="172.17.1%02u.%u" % (subnet, host))
def ip4_hosts(cls, subnet, start, mac_list):
    """
    Build a set of IPv4 host records, one per MAC address.

    :param subnet: Subnet number handed to ip4_host for every record.
    :param int start: Host number given to the first MAC; each following
        MAC gets the next number.
    :param mac_list: Sequence of MAC addresses.
    :returns: Set of the records returned by ip4_host.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip4_host(subnet, start + offset, mac))
    return hosts
def ip6_host(cls, subnet, host, mac):
    """
    Build a host record with an IPv6 address.

    The address is formatted as ``fd01:S::H`` where S and H are ``subnet``
    and ``host`` in lower-case hex.

    :param int subnet: Subnet number placed in the second address group.
    :param int host: Host number placed in the last address group.
    :param str mac: MAC address of the host.
    :returns: Host record for the given MAC and derived address.
    """
    # NOTE(review): the opening of this Host(...) call was missing in the
    # original; mac and bin_mac are supplied to mirror what the request
    # builders in this file read from a host record - confirm.
    return Host(mac=mac, bin_mac=mactobinary(mac),
                ip6="fd01:%x::%x" % (subnet, host))
def ip6_hosts(cls, subnet, start, mac_list):
    """
    Build a set of IPv6 host records, one per MAC address.

    :param subnet: Subnet number handed to ip6_host for every record.
    :param int start: Host number given to the first MAC; each following
        MAC gets the next number.
    :param mac_list: Sequence of MAC addresses.
    :returns: Set of the records returned by ip6_host.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip6_host(subnet, start + offset, mac))
    return hosts
def bd_swifs(cls, b):
    """
    Return the pg interfaces that belong to a bridge domain.

    :param int b: Bridge domain ID.
    :returns: List of ``ifs_per_bd`` consecutive entries of
        ``pg_interfaces``.
    """
    # NOTE(review): n and start were unbound in the original. This assumes
    # bridge domain IDs start at 1 and that each domain owns ifs_per_bd
    # consecutive pg interfaces - confirm against the class setup.
    n = cls.ifs_per_bd
    start = (b - 1) * n
    return [cls.pg_interfaces[j] for j in range(start, start + n)]
def bd_add_del(self, bd_id=1, is_add=1):
    """
    Create a bridge domain and attach its interfaces, or undo both.

    :param int bd_id: Bridge domain ID.
    :param int is_add: Truthy to create the domain and put the interfaces
        returned by bd_swifs into it; falsy to take them out and delete
        the domain.
    """
    if is_add:
        # Create the domain first so the interfaces have something to join.
        self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
    for swif in self.bd_swifs(bd_id):
        swif_idx = swif.sw_if_index
        self.vapi.sw_interface_set_l2_bridge(
            swif_idx, bd_id=bd_id, enable=is_add)
    if not is_add:
        # Delete the domain only after its interfaces have been detached.
        self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
def arp_req(cls, src_host, host):
    """
    Build a broadcast ARP request sent by src_host asking for host.

    :param src_host: Requesting host record (``mac``, ``bin_mac``, ``ip4``).
    :param host: Host record whose ``ip4`` is being resolved.
    :returns: Ether/ARP packet.
    """
    # NOTE(review): only the hwsrc argument of the ARP layer survived in the
    # original. op/pdst/psrc are restored to match what arp_resp_host
    # checks in the reply (reply pdst == src_host.ip4, reply psrc is the
    # resolved host) - confirm.
    return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
            ARP(op="who-has",
                hwsrc=src_host.bin_mac,
                pdst=host.ip4,
                psrc=src_host.ip4))
def arp_reqs(cls, src_host, entries):
    """
    Build one ARP request from src_host for every entry.

    :param src_host: Requesting host record.
    :param entries: Iterable of host records to ask for.
    :returns: List of the packets produced by arp_req, in iteration order.
    """
    requests = []
    for target in entries:
        requests.append(cls.arp_req(src_host, target))
    return requests
def garp_req(cls, host):
    """
    Build a gratuitous ARP request: host asks for its own address.

    :param host: Host record used as both requester and target.
    :returns: Packet produced by arp_req.
    """
    requester = target = host
    return cls.arp_req(requester, target)
def garp_reqs(cls, entries):
    """
    Build one gratuitous ARP request per entry.

    :param entries: Iterable of host records.
    :returns: List of the packets produced by garp_req, in iteration order.
    """
    packets = []
    for announcer in entries:
        packets.append(cls.garp_req(announcer))
    return packets
def arp_resp_host(self, src_host, arp_resp):
    """
    Validate one ARP reply and return the host it describes.

    Asserts that the frame and the ARP layer are addressed to src_host and
    that the ARP header describes an Ethernet/IPv4 "is-at" reply.

    :param src_host: Host record that sent the request (``mac``, ``ip4``).
    :param arp_resp: Captured packet containing Ether and ARP layers.
    :returns: Host built from the reply's sender MAC and sender IP.
    """
    ether = arp_resp[Ether]
    self.assertEqual(ether.dst, src_host.mac)

    # The original used `arp` below without ever binding it.
    arp = arp_resp[ARP]
    self.assertEqual(arp.hwtype, 1)
    self.assertEqual(arp.ptype, 0x800)
    self.assertEqual(arp.hwlen, 6)
    self.assertEqual(arp.plen, 4)
    arp_opts = {"who-has": 1, "is-at": 2}
    self.assertEqual(arp.op, arp_opts["is-at"])
    self.assertEqual(arp.hwdst, src_host.mac)
    self.assertEqual(arp.pdst, src_host.ip4)
    return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """
    Validate a batch of ARP replies and collect the hosts they describe.

    :param src_host: Host record that sent the requests.
    :param pkts: Iterable of captured reply packets.
    :returns: Set of the records returned by arp_resp_host.
    """
    responders = set()
    for reply in pkts:
        responders.add(self.arp_resp_host(src_host, reply))
    return responders
161 o1 = int(ip / 16777216) % 256
162 o2 = int(ip / 65536) % 256
163 o3 = int(ip / 256) % 256
165 return '%s.%s.%s.%s' % (o1, o2, o3, o4)
def arp_event_host(self, e):
    """
    Convert an ARP event into a host record.

    :param e: Event exposing ``new_mac`` (raw MAC bytes) and ``address``
        (passed to inttoip4).
    :returns: Host with a colon-separated lower-case hex MAC and the
        address string produced by inttoip4.
    """
    # Iterating a Python 2 str yields 1-char strings while Python 3 bytes
    # yield ints; calling ord() unconditionally fails on the latter.
    octets = [b if isinstance(b, int) else ord(b) for b in e.new_mac]
    return Host(mac=':'.join(['%02x' % octet for octet in octets]),
                ip4=self.inttoip4(e.address))
def arp_event_hosts(self, evs):
    """
    Convert a batch of ARP events into host records.

    :param evs: Iterable of ARP events.
    :returns: Set of the records returned by arp_event_host.
    """
    reported = set()
    for event in evs:
        reported.add(self.arp_event_host(event))
    return reported
def nd_event_host(self, e):
    """
    Convert an ND event into a host record.

    :param e: Event exposing ``new_mac`` (raw MAC bytes) and ``address``
        (packed IPv6 address handed to inet_ntop).
    :returns: Host with a colon-separated lower-case hex MAC and the
        textual IPv6 address.
    """
    # Iterating a Python 2 str yields 1-char strings while Python 3 bytes
    # yield ints; calling ord() unconditionally fails on the latter.
    octets = [b if isinstance(b, int) else ord(b) for b in e.new_mac]
    return Host(mac=':'.join(['%02x' % octet for octet in octets]),
                ip6=inet_ntop(AF_INET6, e.address))
def nd_event_hosts(self, evs):
    """
    Convert a batch of ND events into host records.

    :param evs: Iterable of ND events.
    :returns: Set of the records returned by nd_event_host.
    """
    reported = set()
    for event in evs:
        reported.add(self.nd_event_host(event))
    return reported
def ns_req(cls, src_host, host):
    """
    Build a neighbor solicitation sent by src_host asking for host.

    The frame is sent to the Ethernet broadcast address; the IPv6
    destination is the solicited-node multicast address derived from the
    fixed address ``fd10::ffff``.

    :param src_host: Soliciting host record (``mac``, ``ip6``).
    :param host: Host record whose ``ip6`` is the solicitation target.
    :returns: Ether/IPv6/ICMPv6ND_NS packet carrying a source link-layer
        address option.
    """
    packed_group = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
    group = inet_ntop(AF_INET6, packed_group)

    frame = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    header = IPv6(dst=group, src=src_host.ip6)
    solicitation = ICMPv6ND_NS(tgt=host.ip6)
    sender_lladdr = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return frame / header / solicitation / sender_lladdr
def ns_reqs_dst(cls, entries, dst_host):
    """
    Build one neighbor solicitation per entry, all asking for dst_host.

    :param entries: Iterable of host records acting as senders.
    :param dst_host: Host record being solicited.
    :returns: List of the packets produced by ns_req, in iteration order.
    """
    solicitations = []
    for sender in entries:
        solicitations.append(cls.ns_req(sender, dst_host))
    return solicitations
def ns_reqs_src(cls, src_host, entries):
    """
    Build one neighbor solicitation from src_host for every entry.

    :param src_host: Host record acting as sender.
    :param entries: Iterable of host records being solicited.
    :returns: List of the packets produced by ns_req, in iteration order.
    """
    solicitations = []
    for target in entries:
        solicitations.append(cls.ns_req(src_host, target))
    return solicitations
def na_resp_host(self, src_host, rx):
    """
    Validate one neighbor advertisement and return the host it describes.

    Asserts that the frame and the IPv6 header are addressed to src_host
    and that the packet carries an ICMPv6ND_NA layer with a destination
    link-layer address option.

    :param src_host: Host record that sent the solicitation.
    :param rx: Captured packet.
    :returns: Host built from the advertisement's link-layer address and
        target address.
    """
    self.assertEqual(rx[Ether].dst, src_host.mac)
    self.assertEqual(in6_ptop(rx[IPv6].dst),
                     in6_ptop(src_host.ip6))

    self.assertTrue(rx.haslayer(ICMPv6ND_NA))
    self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))

    # The original used `na` below without ever binding it.
    # NOTE(review): lladdr is read through the NA layer; presumably it is
    # resolved from the link-layer option asserted above - confirm.
    na = rx[ICMPv6ND_NA]
    return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """
    Validate a batch of neighbor advertisements and collect their hosts.

    :param src_host: Host record that sent the solicitations.
    :param pkts: Iterable of captured advertisement packets.
    :returns: Set of the records returned by na_resp_host.
    """
    responders = set()
    for advert in pkts:
        responders.add(self.na_resp_host(src_host, advert))
    return responders
def set_bd_flags(self, bd_id, **args):
    """
    Enable/disable defined feature(s) of the bridge domain.

    :param int bd_id: Bridge domain ID.
    :param args: Feature/status keyword pairs. Allowed features:
        learn, forward, flood, uu_flood and arp_term. A falsy status
        disables the feature, a truthy status enables it.
    :raise: ValueError in case of unknown feature in the input. Features
        that precede the unknown one in iteration order have already been
        applied by then.
    """
    # Bit assigned to each feature in the bitmap handed to bridge_flags.
    feature_bits = {
        "learn": 1 << 0,
        "forward": 1 << 1,
        "flood": 1 << 2,
        "uu_flood": 1 << 3,
        "arp_term": 1 << 4,
    }
    for flag in args:
        if flag not in feature_bits:
            raise ValueError("Unknown feature used: %s" % flag)
        feature_bitmap = feature_bits[flag]
        is_set = 1 if args[flag] else 0
        # One API call per feature: each call sets or clears a single bit.
        self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
    self.logger.info("Bridge domain ID %d updated" % bd_id)
def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
    """
    Send ARP requests into a bridge domain and check who answers.

    Every interface of the bridge domain sends one request per host in
    req_hosts; each interface must then capture exactly len(resp_hosts)
    replies whose senders form the set resp_hosts.

    :param src_host: Host record the requests are sent from.
    :param req_hosts: Host records to ask for.
    :param set resp_hosts: Host records expected to be answered.
    :param int bd_id: Bridge domain ID.
    """
    reqs = self.arp_reqs(src_host, req_hosts)

    for swif in self.bd_swifs(bd_id):
        swif.add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The original never started the packet generator, so the queued
    # streams were not sent before the captures were read.
    self.pg_start()

    for swif in self.bd_swifs(bd_id):
        resp_pkts = swif.get_capture(len(resp_hosts))
        resps = self.arp_resp_hosts(src_host, resp_pkts)
        self.assertEqual(len(resps ^ resp_hosts), 0)
def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
    """
    Send neighbor solicitations into a bridge domain and check who answers.

    Every interface of the bridge domain sends one solicitation per host in
    req_hosts; each interface must then capture exactly len(resp_hosts)
    advertisements whose senders form the set resp_hosts.

    :param src_host: Host record the solicitations are sent from.
    :param req_hosts: Host records to solicit.
    :param set resp_hosts: Host records expected to be advertised.
    :param int bd_id: Bridge domain ID.
    """
    reqs = self.ns_reqs_src(src_host, req_hosts)

    for swif in self.bd_swifs(bd_id):
        swif.add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The original never started the packet generator, so the queued
    # streams were not sent before the captures were read.
    self.pg_start()

    for swif in self.bd_swifs(bd_id):
        resp_pkts = swif.get_capture(len(resp_hosts))
        resps = self.na_resp_hosts(src_host, resp_pkts)
        self.assertEqual(len(resps ^ resp_hosts), 0)
def test_l2bd_arp_term_01(self):
    """ L2BD arp term - add 5 hosts, verify arp responses

    Creates bridge domain 1 with arp_term on and flood, uu_flood and learn
    off, registers the subnet-4 hosts built from mac_list(range(1, 5))
    (four MACs) and expects an ARP reply for each of them. The host set is
    then stored on the class as ``hosts``.
    """
    querier = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(
        1, arp_term=True, flood=False, uu_flood=False, learn=False)

    known = self.ip4_hosts(4, 1, self.mac_list(range(1, 5)))
    self.add_del_arp_term_hosts(known, is_add=1)
    self.verify_arp(querier, known, known)

    test_cls = type(self)
    test_cls.hosts = known
def test_l2bd_arp_term_02(self):
    """ L2BD arp term - delete 3 hosts, verify arp responses

    Removes the subnet-4 hosts built from mac_list(range(1, 3)) (two MACs),
    asks for every host in ``self.hosts`` and expects replies only for the
    ones that were not removed. The remainder is stored on the class as
    ``hosts`` and bridge domain 1 is deleted.
    """
    querier = self.ip4_host(50, 50, "00:00:11:22:33:44")

    removed = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(removed, is_add=0)

    survivors = self.hosts - removed
    self.verify_arp(querier, self.hosts, survivors)

    test_cls = type(self)
    test_cls.hosts = survivors
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
    """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses

    Recreates bridge domain 1, registers the subnet-4 hosts built from
    mac_list(range(1, 3)) (two MACs), asks for those plus everything in
    ``self.hosts`` and expects replies only for the newly registered ones.
    The registered set is stored on the class as ``hosts``.
    """
    querier = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(
        1, arp_term=True, flood=False, uu_flood=False, learn=False)

    restored = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(restored, is_add=1)

    asked = self.hosts | restored
    self.verify_arp(querier, asked, restored)

    test_cls = type(self)
    test_cls.hosts = restored
def test_l2bd_arp_term_04(self):
    """ L2BD arp term - 2 IP4 addrs per host

    Registers subnet-5 addresses for the MACs from mac_list(range(1, 3)),
    then asks for those together with ``self.hosts`` and expects a reply
    for every one. The combined set is stored on the class as ``hosts`` and
    bridge domain 1 is deleted.
    """
    querier = self.ip4_host(50, 50, "00:00:11:22:33:44")

    second_addrs = self.ip4_hosts(5, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(second_addrs, is_add=1)

    everyone = self.hosts | second_addrs
    self.verify_arp(querier, everyone, everyone)

    test_cls = type(self)
    test_cls.hosts = everyone
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
    """ L2BD arp term - create and update 10 IP4-mac pairs

    Registers ten subnet-5 addresses with the MACs from
    mac_list(range(10, 20)) and verifies the replies, then registers the
    same addresses with the MACs from mac_list(range(20, 30)) and expects
    the replies to carry the new MACs. Bridge domain 1 is deleted at the
    end.
    """
    querier = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(
        1, arp_term=True, flood=False, uu_flood=False, learn=False)

    # First binding of the addresses.
    initial = self.ip4_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(initial, is_add=1)
    self.verify_arp(querier, initial, initial)

    # Same addresses, different MACs.
    rebound = self.ip4_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(rebound, is_add=1)
    self.verify_arp(querier, initial, rebound)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
    """ L2BD arp/ND term - hosts with both ip4/ip6

    Registers an IPv4 and an IPv6 address for each MAC from
    mac_list(range(10, 20)) and verifies both ARP and ND replies. Bridge
    domain 1 is deleted at the end.
    """
    querier4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
    querier6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    # Flooding is switched on here so the test also shows that the
    # requests are not flooded.
    self.set_bd_flags(
        1, arp_term=True, flood=True, uu_flood=False, learn=False)

    shared_macs = self.mac_list(range(10, 20))
    v6_hosts = self.ip6_hosts(5, 1, shared_macs)
    v4_hosts = self.ip4_hosts(5, 1, shared_macs)
    self.add_del_arp_term_hosts(v4_hosts, is_add=1)
    self.add_del_arp_term_hosts(v6_hosts, is_add=1, is_ipv6=1)

    self.verify_arp(querier4, v4_hosts, v4_hosts)
    self.verify_nd(querier6, v6_hosts, v6_hosts)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
    """ L2BD ND term - Add and Del hosts, verify ND replies

    Registers ten subnet-5 IPv6 hosts (MACs from mac_list(range(10, 20)))
    and verifies the ND replies, then removes the five built from
    mac_list(range(10, 15)) and expects replies only for the rest. Bridge
    domain 1 is deleted at the end.
    """
    querier = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(
        1, arp_term=True, flood=False, uu_flood=False, learn=False)

    registered = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(registered, is_add=1, is_ipv6=1)
    self.verify_nd(querier, registered, registered)

    removed = self.ip6_hosts(5, 1, self.mac_list(range(10, 15)))
    self.add_del_arp_term_hosts(removed, is_add=0, is_ipv6=1)
    self.verify_nd(querier, registered, registered - removed)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
    """ L2BD ND term - Add and update IP+mac, verify ND replies

    Registers ten subnet-5 IPv6 addresses with the MACs from
    mac_list(range(10, 20)) and verifies the ND replies, then registers
    the same addresses with the MACs from mac_list(range(20, 30)) and
    expects the replies to carry the new MACs. Bridge domain 1 is deleted
    at the end.
    """
    querier = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(
        1, arp_term=True, flood=False, uu_flood=False, learn=False)

    # First binding of the addresses.
    initial = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(initial, is_add=1, is_ipv6=1)
    self.verify_nd(querier, initial, initial)

    # Same addresses, different MACs.
    rebound = self.ip6_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(rebound, is_add=1, is_ipv6=1)
    self.verify_nd(querier, initial, rebound)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_09(self):
    """ L2BD arp term - send garps, verify arp event reports

    Subscribes to ip4 ARP events, creates bridge domain 1, sends one
    gratuitous ARP per host on the domain's first interface and expects
    one event per host describing exactly those hosts. The bridge domain
    is left in place.
    """
    self.vapi.want_ip4_arp_events()
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(90, 95))
    hosts = self.ip4_hosts(5, 1, macs)

    garps = self.garp_reqs(hosts)
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # The original never started the packet generator, so the queued
    # stream was not sent and no event could arrive.
    self.pg_start()

    evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
           for _ in range(len(hosts))]
    ev_hosts = self.arp_event_hosts(evs)
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_10(self):
    """ L2BD arp term - send duplicate garps, verify suppression

    Sends the same gratuitous ARP five times on the first interface of
    bridge domain 1 and waits for a single event describing the host.
    The test neither creates the bridge domain nor subscribes to events
    itself.
    NOTE(review): nothing here checks that no further events follow the
    first one - confirm whether that is intended.
    """
    macs = self.mac_list(range(70, 71))
    hosts = self.ip4_hosts(6, 1, macs)

    # send the packet 5 times expect one event
    garps = self.garp_reqs(hosts) * 5
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # The original never started the packet generator, so the queued
    # stream was not sent and no event could arrive.
    self.pg_start()

    evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
           for _ in range(len(hosts))]
    ev_hosts = self.arp_event_hosts(evs)
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_11(self):
    """ L2BD arp term - disable ip4 arp events,send garps, verify no events

    Unsubscribes from ip4 ARP events, sends gratuitous ARPs on the first
    interface of bridge domain 1 and expects collect_events() to return
    nothing. Bridge domain 1 is deleted at the end.
    """
    self.vapi.want_ip4_arp_events(enable_disable=0)
    macs = self.mac_list(range(90, 95))
    hosts = self.ip4_hosts(5, 1, macs)

    garps = self.garp_reqs(hosts)
    self.bd_swifs(1)[0].add_stream(garps)

    self.pg_enable_capture(self.pg_interfaces)
    # The original never started the packet generator, which made the
    # "no events" check pass without any packet having been sent.
    self.pg_start()
    # NOTE(review): consider waiting briefly before collecting so that a
    # late event cannot slip past this check.
    self.assertEqual(len(self.vapi.collect_events()), 0)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_12(self):
    """ L2BD ND term - send NS packets verify reports

    Subscribes to ip6 ND events for address ``::0``, creates bridge
    domain 1, sends one neighbor solicitation per host on the domain's
    first interface and expects one event per host describing exactly
    those hosts. The bridge domain is left in place.
    """
    self.vapi.want_ip6_nd_events(address=inet_pton(AF_INET6, "::0"))
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    macs = self.mac_list(range(10, 15))
    hosts = self.ip6_hosts(5, 1, macs)
    reqs = self.ns_reqs_dst(hosts, dst_host)
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The original never started the packet generator, so the queued
    # stream was not sent and no event could arrive.
    self.pg_start()

    evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
           for _ in range(len(hosts))]
    ev_hosts = self.nd_event_hosts(evs)
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_13(self):
    """ L2BD ND term - send duplicate ns, verify suppression

    Sends the same neighbor solicitation five times on the first interface
    of bridge domain 1 and waits for a single event describing the sender.
    The test neither creates the bridge domain nor subscribes to events
    itself.
    NOTE(review): nothing here checks that no further events follow the
    first one - confirm whether that is intended.
    """
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    macs = self.mac_list(range(10, 11))
    hosts = self.ip6_hosts(5, 1, macs)
    reqs = self.ns_reqs_dst(hosts, dst_host) * 5
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The original never started the packet generator, so the queued
    # stream was not sent and no event could arrive.
    self.pg_start()

    evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
           for _ in range(len(hosts))]
    ev_hosts = self.nd_event_hosts(evs)
    self.assertEqual(len(ev_hosts ^ hosts), 0)
def test_l2bd_arp_term_14(self):
    """ L2BD ND term - disable ip6 nd events,send ns, verify no events

    Unsubscribes from ip6 ND events, sends neighbor solicitations on the
    first interface of bridge domain 1 and expects collect_events() to
    return nothing. Bridge domain 1 is deleted at the end.
    """
    self.vapi.want_ip6_nd_events(enable_disable=0,
                                 address=inet_pton(AF_INET6, "::0"))
    dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
    macs = self.mac_list(range(10, 15))
    hosts = self.ip6_hosts(5, 1, macs)
    reqs = self.ns_reqs_dst(hosts, dst_host)
    self.bd_swifs(1)[0].add_stream(reqs)

    self.pg_enable_capture(self.pg_interfaces)
    # The original never started the packet generator, which made the
    # "no events" check pass without any packet having been sent.
    self.pg_start()
    # NOTE(review): consider waiting briefly before collecting so that a
    # late event cannot slip past this check.
    self.assertEqual(len(self.vapi.collect_events()), 0)
    self.bd_add_del(1, is_add=0)
if __name__ == '__main__':
    # unittest is not among the imports at the top of this file, so it is
    # imported here, where it is needed.
    import unittest

    unittest.main(testRunner=VppTestRunner)