2 """ L2BD ARP term Test """
8 from socket import AF_INET, AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp, mactobinary
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
56 Clear trace and packet infos before running each test.
58 self.reset_packet_infos()
59 super(TestL2bdArpTerm, self).setUp()
63 Show various debug prints after each test.
65 super(TestL2bdArpTerm, self).tearDown()
67 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
68 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
70 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
72 ip = e.ip4n if is_ipv6 == 0 else e.ip6n
73 self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
def mac_list(cls, b6_range):
    """Build MAC address strings that differ only in their last octet.

    :param b6_range: Iterable of integers, each used as the final octet.
    :returns: List of "00:00:ca:fe:00:xx" strings in iteration order.
    """
    template = "00:00:ca:fe:00:%02x"
    macs = []
    for last_octet in b6_range:
        macs.append(template % last_octet)
    return macs
84 def ip4_host(cls, subnet, host, mac):
86 ip4="172.17.1%02u.%u" % (subnet, host))
def ip4_hosts(cls, subnet, start, mac_list):
    """Create one IPv4 host per MAC address.

    Host numbers are handed out consecutively, beginning at ``start``,
    in the order the MAC addresses appear in ``mac_list``.

    :param subnet: Subnet number passed through to ``ip4_host``.
    :param start: Host number given to the first MAC address.
    :param mac_list: Sequence of MAC address strings.
    :returns: Set of the objects produced by ``ip4_host``.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip4_host(subnet, start + offset, mac))
    return hosts
94 def ip6_host(cls, subnet, host, mac):
96 ip6="fd01:%x::%x" % (subnet, host))
def ip6_hosts(cls, subnet, start, mac_list):
    """Create one IPv6 host per MAC address.

    Host numbers are handed out consecutively, beginning at ``start``,
    in the order the MAC addresses appear in ``mac_list``.

    :param subnet: Subnet number passed through to ``ip6_host``.
    :param start: Host number given to the first MAC address.
    :param mac_list: Sequence of MAC address strings.
    :returns: Set of the objects produced by ``ip6_host``.
    """
    hosts = set()
    for offset, mac in enumerate(mac_list):
        hosts.add(cls.ip6_host(subnet, start + offset, mac))
    return hosts
104 def bd_swifs(cls, b):
107 return [cls.pg_interfaces[j] for j in range(start, start + n)]
109 def bd_add_del(self, bd_id=1, is_add=1):
111 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
112 for swif in self.bd_swifs(bd_id):
113 swif_idx = swif.sw_if_index
114 self.vapi.sw_interface_set_l2_bridge(
115 swif_idx, bd_id=bd_id, enable=is_add)
117 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
120 def arp_req(cls, src_host, host):
121 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
123 hwsrc=src_host.bin_mac,
def arp_reqs(cls, src_host, entries):
    """Build one ARP request from ``src_host`` for every entry.

    :param src_host: Host that sends all of the requests.
    :param entries: Iterable of hosts being asked about.
    :returns: List of packets from ``arp_req``, in iteration order.
    """
    requests = []
    for target in entries:
        requests.append(cls.arp_req(src_host, target))
    return requests
def garp_req(cls, host):
    """Build a gratuitous ARP request for ``host``.

    The request is an ordinary ARP request in which the sender and the
    host being asked about are the same.

    :param host: Host announcing itself.
    :returns: Packet produced by ``arp_req``.
    """
    sender = target = host
    return cls.arp_req(sender, target)
def garp_reqs(cls, entries):
    """Build a gratuitous ARP request for every host in ``entries``.

    :param entries: Iterable of hosts.
    :returns: List of packets from ``garp_req``, in iteration order.
    """
    requests = []
    for host in entries:
        requests.append(cls.garp_req(host))
    return requests
139 def arp_resp_host(self, src_host, arp_resp):
140 ether = arp_resp[Ether]
141 self.assertEqual(ether.dst, src_host.mac)
144 self.assertEqual(arp.hwtype, 1)
145 self.assertEqual(arp.ptype, 0x800)
146 self.assertEqual(arp.hwlen, 6)
147 self.assertEqual(arp.plen, 4)
148 arp_opts = {"who-has": 1, "is-at": 2}
149 self.assertEqual(arp.op, arp_opts["is-at"])
150 self.assertEqual(arp.hwdst, src_host.mac)
151 self.assertEqual(arp.pdst, src_host.ip4)
152 return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """Validate ARP replies and collect the hosts they describe.

    :param src_host: Host that issued the requests.
    :param pkts: Iterable of captured reply packets.
    :returns: Set of the objects returned by ``arp_resp_host``.
    """
    responders = set()
    for reply in pkts:
        responders.add(self.arp_resp_host(src_host, reply))
    return responders
def inttoip4(self, ip):
    """Format an integer as a dotted-decimal IPv4 address string.

    The most significant of the four bytes becomes the first octet,
    e.g. 0xAC110401 -> '172.17.4.1'.

    :param ip: IPv4 address as a non-negative integer.
    :returns: String of four decimal octets separated by dots.
    """
    value = int(ip)
    # Shifts and masks keep the arithmetic in exact integers and make
    # sure every one of the four octets, including the lowest, is produced.
    return '.'.join(str((value >> shift) & 0xFF) for shift in (24, 16, 8, 0))
def arp_event_host(self, e):
    """Convert an ARP event into a Host record.

    :param e: Event exposing ``new_mac`` (the MAC address as a sequence
        of raw bytes) and ``address`` (the IPv4 address as an integer).
    :returns: Host whose ``mac`` is colon-separated lower-case hex and
        whose ``ip4`` is the dotted-decimal form of ``e.address``.
    """
    # Iterating a byte string yields 1-character strings on Python 2 but
    # ints on Python 3; accept both so ord() is never handed an int.
    octets = [b if isinstance(b, int) else ord(b) for b in e.new_mac]
    mac = ':'.join('%02x' % octet for octet in octets)
    return Host(mac=mac, ip4=self.inttoip4(e.address))
def arp_event_hosts(self, evs):
    """Convert a batch of ARP events into a set of hosts.

    :param evs: Iterable of ARP events.
    :returns: Set of the objects returned by ``arp_event_host``.
    """
    reported = set()
    for event in evs:
        reported.add(self.arp_event_host(event))
    return reported
def nd_event_host(self, e):
    """Convert an IPv6 neighbor-discovery event into a Host record.

    :param e: Event exposing ``new_mac`` (the MAC address as a sequence
        of raw bytes) and ``address`` (the packed IPv6 address).
    :returns: Host whose ``mac`` is colon-separated lower-case hex and
        whose ``ip6`` is the textual form of ``e.address``.
    """
    # Iterating a byte string yields 1-character strings on Python 2 but
    # ints on Python 3; accept both so ord() is never handed an int.
    octets = [b if isinstance(b, int) else ord(b) for b in e.new_mac]
    mac = ':'.join('%02x' % octet for octet in octets)
    return Host(mac=mac, ip6=inet_ntop(AF_INET6, e.address))
def nd_event_hosts(self, evs):
    """Convert a batch of neighbor-discovery events into a set of hosts.

    :param evs: Iterable of ND events.
    :returns: Set of the objects returned by ``nd_event_host``.
    """
    reported = set()
    for event in evs:
        reported.add(self.nd_event_host(event))
    return reported
def ns_req(cls, src_host, host):
    """Build a Neighbor Solicitation sent by ``src_host`` for ``host``.

    The frame goes to the Ethernet broadcast address. The IPv6
    destination is derived by ``in6_getnsma`` from the fixed address
    fd10::ffff, not from the target host.

    :param src_host: Sender; supplies the source MAC, source IPv6
        address and the source link-layer address option.
    :param host: Host whose IPv6 address is the solicitation target.
    :returns: Ether / IPv6 / ICMPv6ND_NS / ICMPv6NDOptSrcLLAddr packet.
    """
    packed_group = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
    ip6_dst = inet_ntop(AF_INET6, packed_group)
    ether = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    ip6 = IPv6(dst=ip6_dst, src=src_host.ip6)
    solicitation = ICMPv6ND_NS(tgt=host.ip6)
    ll_option = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return ether / ip6 / solicitation / ll_option
def ns_reqs_dst(cls, entries, dst_host):
    """Build one Neighbor Solicitation per sender, all for ``dst_host``.

    :param entries: Iterable of hosts, each acting as a sender.
    :param dst_host: Host every solicitation asks about.
    :returns: List of packets from ``ns_req``, in iteration order.
    """
    solicitations = []
    for sender in entries:
        solicitations.append(cls.ns_req(sender, dst_host))
    return solicitations
def ns_reqs_src(cls, src_host, entries):
    """Build one Neighbor Solicitation from ``src_host`` per target.

    :param src_host: Host that sends every solicitation.
    :param entries: Iterable of hosts being asked about.
    :returns: List of packets from ``ns_req``, in iteration order.
    """
    solicitations = []
    for target in entries:
        solicitations.append(cls.ns_req(src_host, target))
    return solicitations
195 def na_resp_host(self, src_host, rx):
196 self.assertEqual(rx[Ether].dst, src_host.mac)
197 self.assertEqual(in6_ptop(rx[IPv6].dst),
198 in6_ptop(src_host.ip6))
200 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
201 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
204 return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """Validate Neighbor Advertisements and collect the hosts they describe.

    :param src_host: Host that issued the solicitations.
    :param pkts: Iterable of captured reply packets.
    :returns: Set of the objects returned by ``na_resp_host``.
    """
    responders = set()
    for reply in pkts:
        responders.add(self.na_resp_host(src_host, reply))
    return responders
209 def set_bd_flags(self, bd_id, **args):
211 Enable/disable defined feature(s) of the bridge domain.
213 :param int bd_id: Bridge domain ID.
214 :param list args: List of feature/status pairs. Allowed features: \
215 learn, forward, flood, uu_flood and arp_term. Status False means \
216 disable, status True means enable the feature.
217 :raise: ValueError in case of unknown feature in the input.
221 feature_bitmap = 1 << 0
222 elif flag == "forward":
223 feature_bitmap = 1 << 1
224 elif flag == "flood":
225 feature_bitmap = 1 << 2
226 elif flag == "uu_flood":
227 feature_bitmap = 1 << 3
228 elif flag == "arp_term":
229 feature_bitmap = 1 << 4
231 raise ValueError("Unknown feature used: %s" % flag)
232 is_set = 1 if args[flag] else 0
233 self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
234 self.logger.info("Bridge domain ID %d updated" % bd_id)
236 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
237 reqs = self.arp_reqs(src_host, req_hosts)
239 for swif in self.bd_swifs(bd_id):
240 swif.add_stream(reqs)
242 self.pg_enable_capture(self.pg_interfaces)
245 for swif in self.bd_swifs(bd_id):
246 resp_pkts = swif.get_capture(len(resp_hosts))
247 resps = self.arp_resp_hosts(src_host, resp_pkts)
248 self.assertEqual(len(resps ^ resp_hosts), 0)
250 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
251 reqs = self.ns_reqs_src(src_host, req_hosts)
253 for swif in self.bd_swifs(bd_id):
254 swif.add_stream(reqs)
256 self.pg_enable_capture(self.pg_interfaces)
259 for swif in self.bd_swifs(bd_id):
260 resp_pkts = swif.get_capture(len(resp_hosts))
261 resps = self.na_resp_hosts(src_host, resp_pkts)
262 self.assertEqual(len(resps ^ resp_hosts), 0)
def test_l2bd_arp_term_01(self):
    """ L2BD arp term - add 4 hosts, verify arp responses

    Creates bridge domain 1 with ARP termination enabled and flood,
    uu_flood and learn disabled, registers four subnet-4 hosts and
    checks that a request for each one is answered. The host set is
    stored on the class for the tests that follow.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    known = self.ip4_hosts(4, 1, self.mac_list(range(1, 5)))
    self.add_del_arp_term_hosts(known, is_add=1)
    self.verify_arp(requester, known, known)
    # Shared with later tests through a class attribute.
    self.__class__.hosts = known
def test_l2bd_arp_term_02(self):
    """ L2BD arp term - delete 2 hosts, verify arp responses

    Removes the first two subnet-4 hosts from the class-level host set,
    requests every previously known host and expects replies only from
    those that remain. Bridge domain 1 is deleted at the end.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    removed = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(removed, is_add=0)
    previously_known = self.hosts
    still_present = previously_known - removed
    self.verify_arp(requester, previously_known, still_present)
    # Shared with later tests through a class attribute.
    self.__class__.hosts = still_present
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_03(self):
    """ L2BD arp term - recreate BD1, readd 2 hosts, verify arp responses

    Recreates bridge domain 1, registers the first two subnet-4 hosts
    again, requests those plus the class-level host set and expects
    replies only from the re-added hosts.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)
    restored = self.ip4_hosts(4, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(restored, is_add=1)
    asked_for = self.hosts | restored
    self.verify_arp(requester, asked_for, restored)
    # Shared with later tests through a class attribute.
    self.__class__.hosts = restored
def test_l2bd_arp_term_04(self):
    """ L2BD arp term - 2 IP4 addrs per host

    Registers subnet-5 addresses for the same two MAC addresses, then
    expects replies for the class-level hosts and the new ones alike.
    Bridge domain 1 is deleted at the end.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    second_addrs = self.ip4_hosts(5, 1, self.mac_list(range(1, 3)))
    self.add_del_arp_term_hosts(second_addrs, is_add=1)
    everyone = self.hosts | second_addrs
    self.verify_arp(requester, everyone, everyone)
    # Shared with later tests through a class attribute.
    self.__class__.hosts = everyone
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_05(self):
    """ L2BD arp term - create and update 10 IP4-mac pairs

    Registers ten subnet-5 addresses, verifies the replies, then
    registers the same addresses with a second set of MAC addresses and
    expects the replies to carry the new MACs.
    """
    requester = self.ip4_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    original = self.ip4_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(original, is_add=1)
    self.verify_arp(requester, original, original)

    # Same addresses, different MACs: requests are still built from
    # the original hosts, replies must match the updated ones.
    updated = self.ip4_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(updated, is_add=1)
    self.verify_arp(requester, original, updated)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_06(self):
    """ L2BD arp/ND term - hosts with both ip4/ip6

    Registers an IPv4 and an IPv6 address for each of ten MAC
    addresses and verifies both ARP and ND replies.
    """
    requester4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
    requester6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    # enable flood to make sure requests are not flooded
    self.set_bd_flags(1, arp_term=True, flood=True,
                      uu_flood=False, learn=False)

    shared_macs = self.mac_list(range(10, 20))
    v6_hosts = self.ip6_hosts(5, 1, shared_macs)
    v4_hosts = self.ip4_hosts(5, 1, shared_macs)
    self.add_del_arp_term_hosts(v4_hosts, is_add=1)
    self.add_del_arp_term_hosts(v6_hosts, is_add=1, is_ipv6=1)

    self.verify_arp(requester4, v4_hosts, v4_hosts)
    self.verify_nd(requester6, v6_hosts, v6_hosts)
    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_07(self):
    """ L2BD ND term - Add and Del hosts, verify ND replies

    Registers ten IPv6 hosts and verifies the replies, then removes the
    first five and expects replies only from the rest.
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    registered = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(registered, is_add=1, is_ipv6=1)
    self.verify_nd(requester, registered, registered)

    removed = self.ip6_hosts(5, 1, self.mac_list(range(10, 15)))
    self.add_del_arp_term_hosts(removed, is_add=0, is_ipv6=1)
    self.verify_nd(requester, registered, registered - removed)

    self.bd_add_del(1, is_add=0)
def test_l2bd_arp_term_08(self):
    """ L2BD ND term - Add and update IP+mac, verify ND replies

    Registers ten IPv6 hosts and verifies the replies, then registers
    the same addresses with a second set of MAC addresses and expects
    the replies to carry the new MACs.
    """
    requester = self.ip6_host(50, 50, "00:00:11:22:33:44")
    self.bd_add_del(1, is_add=1)
    self.set_bd_flags(1, arp_term=True, flood=False,
                      uu_flood=False, learn=False)

    original = self.ip6_hosts(5, 1, self.mac_list(range(10, 20)))
    self.add_del_arp_term_hosts(original, is_add=1, is_ipv6=1)
    self.verify_nd(requester, original, original)

    # Same addresses, different MACs: solicitations still target the
    # original hosts, replies must match the updated ones.
    replacement = self.ip6_hosts(5, 1, self.mac_list(range(20, 30)))
    self.add_del_arp_term_hosts(replacement, is_add=1, is_ipv6=1)
    self.verify_nd(requester, original, replacement)

    self.bd_add_del(1, is_add=0)
383 def test_l2bd_arp_term_09(self):
384 """ L2BD arp term - send garps, verify arp event reports
386 self.vapi.want_ip4_arp_events()
387 self.bd_add_del(1, is_add=1)
388 self.set_bd_flags(1, arp_term=True, flood=False,
389 uu_flood=False, learn=False)
390 macs = self.mac_list(range(90, 95))
391 hosts = self.ip4_hosts(5, 1, macs)
393 garps = self.garp_reqs(hosts)
394 self.bd_swifs(1)[0].add_stream(garps)
396 self.pg_enable_capture(self.pg_interfaces)
398 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
399 for i in range(len(hosts))]
400 ev_hosts = self.arp_event_hosts(evs)
401 self.assertEqual(len(ev_hosts ^ hosts), 0)
403 def test_l2bd_arp_term_10(self):
404 """ L2BD arp term - send duplicate garps, verify suppression
406 macs = self.mac_list(range(70, 71))
407 hosts = self.ip4_hosts(6, 1, macs)
409 """ send the packet 5 times expect one event
411 garps = self.garp_reqs(hosts) * 5
412 self.bd_swifs(1)[0].add_stream(garps)
414 self.pg_enable_capture(self.pg_interfaces)
416 evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
417 for i in range(len(hosts))]
418 ev_hosts = self.arp_event_hosts(evs)
419 self.assertEqual(len(ev_hosts ^ hosts), 0)
421 def test_l2bd_arp_term_11(self):
422 """ L2BD arp term - disable ip4 arp events,send garps, verify no events
424 self.vapi.want_ip4_arp_events(enable_disable=0)
425 macs = self.mac_list(range(90, 95))
426 hosts = self.ip4_hosts(5, 1, macs)
428 garps = self.garp_reqs(hosts)
429 self.bd_swifs(1)[0].add_stream(garps)
431 self.pg_enable_capture(self.pg_interfaces)
434 self.assertEqual(len(self.vapi.collect_events()), 0)
435 self.bd_add_del(1, is_add=0)
437 def test_l2bd_arp_term_12(self):
438 """ L2BD ND term - send NS packets verify reports
440 self.vapi.want_ip6_nd_events(address=inet_pton(AF_INET6, "::0"))
441 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
442 self.bd_add_del(1, is_add=1)
443 self.set_bd_flags(1, arp_term=True, flood=False,
444 uu_flood=False, learn=False)
445 macs = self.mac_list(range(10, 15))
446 hosts = self.ip6_hosts(5, 1, macs)
447 reqs = self.ns_reqs_dst(hosts, dst_host)
448 self.bd_swifs(1)[0].add_stream(reqs)
450 self.pg_enable_capture(self.pg_interfaces)
452 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
453 for i in range(len(hosts))]
454 ev_hosts = self.nd_event_hosts(evs)
455 self.assertEqual(len(ev_hosts ^ hosts), 0)
457 def test_l2bd_arp_term_13(self):
458 """ L2BD ND term - send duplicate ns, verify suppression
460 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
461 macs = self.mac_list(range(10, 11))
462 hosts = self.ip6_hosts(5, 1, macs)
463 reqs = self.ns_reqs_dst(hosts, dst_host) * 5
464 self.bd_swifs(1)[0].add_stream(reqs)
466 self.pg_enable_capture(self.pg_interfaces)
468 evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
469 for i in range(len(hosts))]
470 ev_hosts = self.nd_event_hosts(evs)
471 self.assertEqual(len(ev_hosts ^ hosts), 0)
473 def test_l2bd_arp_term_14(self):
474 """ L2BD ND term - disable ip4 arp events,send ns, verify no events
476 self.vapi.want_ip6_nd_events(enable_disable=0,
477 address=inet_pton(AF_INET6, "::0"))
478 dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
479 macs = self.mac_list(range(10, 15))
480 hosts = self.ip6_hosts(5, 1, macs)
481 reqs = self.ns_reqs_dst(hosts, dst_host)
482 self.bd_swifs(1)[0].add_stream(reqs)
484 self.pg_enable_capture(self.pg_interfaces)
487 self.assertEqual(len(self.vapi.collect_events()), 0)
488 self.bd_add_del(1, is_add=0)
# When the module is executed directly, run its tests with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)