tests: replace pycodestyle with black
[vpp.git] / test / test_l2bd_arp_term.py
1 #!/usr/bin/env python3
2 """ L2BD ARP term Test """
3
4 import unittest
5 import random
6 import copy
7
8 from socket import AF_INET, AF_INET6, inet_pton, inet_ntop
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils6 import (
14     in6_getnsma,
15     in6_getnsmac,
16     in6_ptop,
17     in6_islladdr,
18     in6_mactoifaceid,
19     in6_ismaddr,
20 )
21 from scapy.layers.inet6 import (
22     IPv6,
23     UDP,
24     ICMPv6ND_NS,
25     ICMPv6ND_RS,
26     ICMPv6ND_RA,
27     ICMPv6NDOptSrcLLAddr,
28     getmacbyip6,
29     ICMPv6MRD_Solicitation,
30     ICMPv6NDOptMTU,
31     ICMPv6NDOptSrcLLAddr,
32     ICMPv6NDOptPrefixInfo,
33     ICMPv6ND_NA,
34     ICMPv6NDOptDstLLAddr,
35     ICMPv6DestUnreach,
36     icmp6types,
37 )
38
39 from framework import VppTestCase, VppTestRunner
40 from util import Host, ppp
41
42
43 class TestL2bdArpTerm(VppTestCase):
44     """L2BD arp termination Test Case"""
45
46     @classmethod
47     def setUpClass(cls):
48         """
49         Perform standard class setup (defined by class method setUpClass in
50         class VppTestCase) before running the test case, set test case related
51         variables and configure VPP.
52         """
53         super(TestL2bdArpTerm, cls).setUpClass()
54
55         try:
56             # Create pg interfaces
57             n_bd = 1
58             cls.ifs_per_bd = ifs_per_bd = 3
59             n_ifs = n_bd * ifs_per_bd
60             cls.create_pg_interfaces(range(n_ifs))
61
62             # Set up all interfaces
63             for i in cls.pg_interfaces:
64                 i.admin_up()
65
66             cls.hosts = set()
67
68         except Exception:
69             super(TestL2bdArpTerm, cls).tearDownClass()
70             raise
71
72     @classmethod
73     def tearDownClass(cls):
74         super(TestL2bdArpTerm, cls).tearDownClass()
75
76     def setUp(self):
77         """
78         Clear trace and packet infos before running each test.
79         """
80         self.reset_packet_infos()
81         super(TestL2bdArpTerm, self).setUp()
82
83     def tearDown(self):
84         """
85         Show various debug prints after each test.
86         """
87         super(TestL2bdArpTerm, self).tearDown()
88
89     def show_commands_at_teardown(self):
90         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
91         # many tests delete bridge-domain 1 as the last task.  don't output
92         # the details of a non-existent bridge-domain.
93         if self.vapi.l2_fib_table_dump(bd_id=1):
94             self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
95
96     def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
97         for e in entries:
98             ip = e.ip4 if is_ipv6 == 0 else e.ip6
99             self.vapi.bd_ip_mac_add_del(
100                 is_add=is_add, entry={"bd_id": bd_id, "ip": ip, "mac": e.mac}
101             )
102
103     @classmethod
104     def mac_list(cls, b6_range):
105         return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
106
107     @classmethod
108     def ip4_host(cls, subnet, host, mac):
109         return Host(mac=mac, ip4="172.17.1%02u.%u" % (subnet, host))
110
111     @classmethod
112     def ip4_hosts(cls, subnet, start, mac_list):
113         return {
114             cls.ip4_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list))
115         }
116
117     @classmethod
118     def ip6_host(cls, subnet, host, mac):
119         return Host(mac=mac, ip6="fd01:%x::%x" % (subnet, host))
120
121     @classmethod
122     def ip6_hosts(cls, subnet, start, mac_list):
123         return {
124             cls.ip6_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list))
125         }
126
127     @classmethod
128     def bd_swifs(cls, b):
129         n = cls.ifs_per_bd
130         start = (b - 1) * n
131         return [cls.pg_interfaces[j] for j in range(start, start + n)]
132
133     def bd_add_del(self, bd_id=1, is_add=1):
134         if is_add:
135             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
136         for swif in self.bd_swifs(bd_id):
137             swif_idx = swif.sw_if_index
138             self.vapi.sw_interface_set_l2_bridge(
139                 rx_sw_if_index=swif_idx, bd_id=bd_id, enable=is_add
140             )
141         if not is_add:
142             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
143
144     @classmethod
145     def arp_req(cls, src_host, host):
146         return Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / ARP(
147             op="who-has", hwsrc=src_host.bin_mac, pdst=host.ip4, psrc=src_host.ip4
148         )
149
150     @classmethod
151     def arp_reqs(cls, src_host, entries):
152         return [cls.arp_req(src_host, e) for e in entries]
153
154     @classmethod
155     def garp_req(cls, host):
156         return cls.arp_req(host, host)
157
158     @classmethod
159     def garp_reqs(cls, entries):
160         return [cls.garp_req(e) for e in entries]
161
162     def arp_resp_host(self, src_host, arp_resp):
163         ether = arp_resp[Ether]
164         self.assertEqual(ether.dst, src_host.mac)
165
166         arp = arp_resp[ARP]
167         self.assertEqual(arp.hwtype, 1)
168         self.assertEqual(arp.ptype, 0x800)
169         self.assertEqual(arp.hwlen, 6)
170         self.assertEqual(arp.plen, 4)
171         arp_opts = {"who-has": 1, "is-at": 2}
172         self.assertEqual(arp.op, arp_opts["is-at"])
173         self.assertEqual(arp.hwdst, src_host.mac)
174         self.assertEqual(arp.pdst, src_host.ip4)
175         return Host(mac=arp.hwsrc, ip4=arp.psrc)
176
177     def arp_resp_hosts(self, src_host, pkts):
178         return {self.arp_resp_host(src_host, p) for p in pkts}
179
180     @staticmethod
181     def inttoip4(ip):
182         o1 = int(ip / 16777216) % 256
183         o2 = int(ip / 65536) % 256
184         o3 = int(ip / 256) % 256
185         o4 = int(ip) % 256
186         return "%s.%s.%s.%s" % (o1, o2, o3, o4)
187
188     def arp_event_host(self, e):
189         return Host(str(e.mac), ip4=str(e.ip))
190
191     def arp_event_hosts(self, evs):
192         return {self.arp_event_host(e) for e in evs}
193
194     def nd_event_host(self, e):
195         return Host(str(e.mac), ip6=str(e.ip))
196
197     def nd_event_hosts(self, evs):
198         return {self.nd_event_host(e) for e in evs}
199
200     @classmethod
201     def ns_req(cls, src_host, host):
202         nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
203         d = inet_ntop(AF_INET6, nsma)
204         return (
205             Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
206             / IPv6(dst=d, src=src_host.ip6)
207             / ICMPv6ND_NS(tgt=host.ip6)
208             / ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
209         )
210
211     @classmethod
212     def ns_reqs_dst(cls, entries, dst_host):
213         return [cls.ns_req(e, dst_host) for e in entries]
214
215     @classmethod
216     def ns_reqs_src(cls, src_host, entries):
217         return [cls.ns_req(src_host, e) for e in entries]
218
219     def na_resp_host(self, src_host, rx):
220         self.assertEqual(rx[Ether].dst, src_host.mac)
221         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(src_host.ip6))
222
223         self.assertTrue(rx.haslayer(ICMPv6ND_NA))
224         self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
225
226         na = rx[ICMPv6ND_NA]
227         return Host(mac=na.lladdr, ip6=na.tgt)
228
229     def na_resp_hosts(self, src_host, pkts):
230         return {self.na_resp_host(src_host, p) for p in pkts}
231
232     def set_bd_flags(self, bd_id, **args):
233         """
234         Enable/disable defined feature(s) of the bridge domain.
235
236         :param int bd_id: Bridge domain ID.
237         :param list args: List of feature/status pairs. Allowed features: \
238         learn, forward, flood, uu_flood and arp_term. Status False means \
239         disable, status True means enable the feature.
240         :raise: ValueError in case of unknown feature in the input.
241         """
242         for flag in args:
243             if flag == "learn":
244                 feature_bitmap = 1 << 0
245             elif flag == "forward":
246                 feature_bitmap = 1 << 1
247             elif flag == "flood":
248                 feature_bitmap = 1 << 2
249             elif flag == "uu_flood":
250                 feature_bitmap = 1 << 3
251             elif flag == "arp_term":
252                 feature_bitmap = 1 << 4
253             else:
254                 raise ValueError("Unknown feature used: %s" % flag)
255             is_set = 1 if args[flag] else 0
256             self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set, flags=feature_bitmap)
257         self.logger.info("Bridge domain ID %d updated" % bd_id)
258
259     def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
260         reqs = self.arp_reqs(src_host, req_hosts)
261
262         for swif in self.bd_swifs(bd_id):
263             swif.add_stream(reqs)
264
265         self.pg_enable_capture(self.pg_interfaces)
266         self.pg_start()
267
268         for swif in self.bd_swifs(bd_id):
269             resp_pkts = swif.get_capture(len(resp_hosts))
270             resps = self.arp_resp_hosts(src_host, resp_pkts)
271             self.assertEqual(len(resps ^ resp_hosts), 0)
272
273     def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
274         reqs = self.ns_reqs_src(src_host, req_hosts)
275
276         for swif in self.bd_swifs(bd_id):
277             swif.add_stream(reqs)
278
279         self.pg_enable_capture(self.pg_interfaces)
280         self.pg_start()
281
282         for swif in self.bd_swifs(bd_id):
283             resp_pkts = swif.get_capture(len(resp_hosts))
284             resps = self.na_resp_hosts(src_host, resp_pkts)
285             self.assertEqual(len(resps ^ resp_hosts), 0)
286
287     def test_l2bd_arp_term_01(self):
288         """L2BD arp term - add 5 hosts, verify arp responses"""
289         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
290         self.bd_add_del(1, is_add=1)
291         self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
292         macs = self.mac_list(range(1, 5))
293         hosts = self.ip4_hosts(4, 1, macs)
294         self.add_del_arp_term_hosts(hosts, is_add=1)
295
296         self.verify_arp(src_host, hosts, hosts)
297         type(self).hosts = hosts
298
299     def test_l2bd_arp_term_02(self):
300         """L2BD arp term - delete 3 hosts, verify arp responses"""
301         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
302         macs = self.mac_list(range(1, 3))
303         deleted = self.ip4_hosts(4, 1, macs)
304         self.add_del_arp_term_hosts(deleted, is_add=0)
305         remaining = self.hosts - deleted
306         self.verify_arp(src_host, self.hosts, remaining)
307         type(self).hosts = remaining
308         self.bd_add_del(1, is_add=0)
309
310     def test_l2bd_arp_term_03(self):
311         """L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses"""
312         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
313         self.bd_add_del(1, is_add=1)
314         self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
315         macs = self.mac_list(range(1, 3))
316         readded = self.ip4_hosts(4, 1, macs)
317         self.add_del_arp_term_hosts(readded, is_add=1)
318         self.verify_arp(src_host, self.hosts | readded, readded)
319         type(self).hosts = readded
320
321     def test_l2bd_arp_term_04(self):
322         """L2BD arp term - 2 IP4 addrs per host"""
323         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
324         macs = self.mac_list(range(1, 3))
325         sub5_hosts = self.ip4_hosts(5, 1, macs)
326         self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
327         hosts = self.hosts | sub5_hosts
328         self.verify_arp(src_host, hosts, hosts)
329         type(self).hosts = hosts
330         self.bd_add_del(1, is_add=0)
331
332     def test_l2bd_arp_term_05(self):
333         """L2BD arp term - create and update 10 IP4-mac pairs"""
334         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
335         self.bd_add_del(1, is_add=1)
336         self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
337         macs1 = self.mac_list(range(10, 20))
338         hosts1 = self.ip4_hosts(5, 1, macs1)
339         self.add_del_arp_term_hosts(hosts1, is_add=1)
340         self.verify_arp(src_host, hosts1, hosts1)
341         macs2 = self.mac_list(range(20, 30))
342         hosts2 = self.ip4_hosts(5, 1, macs2)
343         self.add_del_arp_term_hosts(hosts2, is_add=1)
344         self.verify_arp(src_host, hosts1, hosts2)
345         self.bd_add_del(1, is_add=0)
346
347     def test_l2bd_arp_term_06(self):
348         """L2BD arp/ND term - hosts with both ip4/ip6"""
349         src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
350         src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
351         self.bd_add_del(1, is_add=1)
352         # enable flood to make sure requests are not flooded
353         self.set_bd_flags(1, arp_term=True, flood=True, uu_flood=False, learn=False)
354         macs = self.mac_list(range(10, 20))
355         hosts6 = self.ip6_hosts(5, 1, macs)
356         hosts4 = self.ip4_hosts(5, 1, macs)
357         self.add_del_arp_term_hosts(hosts4, is_add=1)
358         self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
359         self.verify_arp(src_host4, hosts4, hosts4)
360         self.verify_nd(src_host6, hosts6, hosts6)
361         self.bd_add_del(1, is_add=0)
362
363     def test_l2bd_arp_term_07(self):
364         """L2BD ND term - Add and Del hosts, verify ND replies"""
365         src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
366         self.bd_add_del(1, is_add=1)
367         self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
368         macs = self.mac_list(range(10, 20))
369         hosts6 = self.ip6_hosts(5, 1, macs)
370         self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
371         self.verify_nd(src_host6, hosts6, hosts6)
372         del_macs = self.mac_list(range(10, 15))
373         deleted = self.ip6_hosts(5, 1, del_macs)
374         self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
375         self.verify_nd(src_host6, hosts6, hosts6 - deleted)
376         self.bd_add_del(1, is_add=0)
377
378     def test_l2bd_arp_term_08(self):
379         """L2BD ND term - Add and update IP+mac, verify ND replies"""
380         src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
381         self.bd_add_del(1, is_add=1)
382         self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
383         macs1 = self.mac_list(range(10, 20))
384         hosts = self.ip6_hosts(5, 1, macs1)
385         self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
386         self.verify_nd(src_host, hosts, hosts)
387         macs2 = self.mac_list(range(20, 30))
388         updated = self.ip6_hosts(5, 1, macs2)
389         self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
390         self.verify_nd(src_host, hosts, updated)
391         self.bd_add_del(1, is_add=0)
392
393     def test_l2bd_arp_term_09(self):
394         """L2BD arp term - send garps, verify arp event reports"""
395         self.vapi.want_l2_arp_term_events(enable=1)
396         self.bd_add_del(1, is_add=1)
397         self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
398         macs = self.mac_list(range(90, 95))
399         hosts = self.ip4_hosts(5, 1, macs)
400
401         garps = self.garp_reqs(hosts)
402         self.bd_swifs(1)[0].add_stream(garps)
403
404         self.pg_enable_capture(self.pg_interfaces)
405         self.pg_start()
406         evs = [
407             self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
408         ]
409         ev_hosts = self.arp_event_hosts(evs)
410         self.assertEqual(len(ev_hosts ^ hosts), 0)
411
412     def test_l2bd_arp_term_10(self):
413         """L2BD arp term - send duplicate garps, verify suppression"""
414         macs = self.mac_list(range(70, 71))
415         hosts = self.ip4_hosts(6, 1, macs)
416
417         """ send the packet 5 times expect one event
418         """
419         garps = self.garp_reqs(hosts) * 5
420         self.bd_swifs(1)[0].add_stream(garps)
421
422         self.pg_enable_capture(self.pg_interfaces)
423         self.pg_start()
424         evs = [
425             self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts))
426         ]
427         ev_hosts = self.arp_event_hosts(evs)
428         self.assertEqual(len(ev_hosts ^ hosts), 0)
429
430     def test_l2bd_arp_term_11(self):
431         """L2BD arp term - disable ip4 arp events,send garps, verify no events"""
432         self.vapi.want_l2_arp_term_events(enable=0)
433         macs = self.mac_list(range(90, 95))
434         hosts = self.ip4_hosts(5, 1, macs)
435
436         garps = self.garp_reqs(hosts)
437         self.bd_swifs(1)[0].add_stream(garps)
438
439         self.pg_enable_capture(self.pg_interfaces)
440         self.pg_start()
441         self.sleep(1)
442         self.assertEqual(len(self.vapi.collect_events()), 0)
443         self.bd_add_del(1, is_add=0)
444
445     def test_l2bd_arp_term_12(self):
446         """L2BD ND term - send NS packets verify reports"""
447         self.vapi.want_l2_arp_term_events(enable=1)
448         dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
449         self.bd_add_del(1, is_add=1)
450         self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False)
451         macs = self.mac_list(range(10, 15))
452         hosts = self.ip6_hosts(5, 1, macs)
453         reqs = self.ns_reqs_dst(hosts, dst_host)
454         self.bd_swifs(1)[0].add_stream(reqs)
455
456         self.pg_enable_capture(self.pg_interfaces)
457         self.pg_start()
458         evs = [
459             self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
460         ]
461         ev_hosts = self.nd_event_hosts(evs)
462         self.assertEqual(len(ev_hosts ^ hosts), 0)
463
464     def test_l2bd_arp_term_13(self):
465         """L2BD ND term - send duplicate ns, verify suppression"""
466         dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
467         macs = self.mac_list(range(16, 17))
468         hosts = self.ip6_hosts(5, 1, macs)
469         reqs = self.ns_reqs_dst(hosts, dst_host) * 5
470         self.bd_swifs(1)[0].add_stream(reqs)
471
472         self.pg_enable_capture(self.pg_interfaces)
473         self.pg_start()
474         evs = [
475             self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts))
476         ]
477         ev_hosts = self.nd_event_hosts(evs)
478         self.assertEqual(len(ev_hosts ^ hosts), 0)
479
480     def test_l2bd_arp_term_14(self):
481         """L2BD ND term - disable ip4 arp events,send ns, verify no events"""
482         self.vapi.want_l2_arp_term_events(enable=0)
483         dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
484         macs = self.mac_list(range(10, 15))
485         hosts = self.ip6_hosts(5, 1, macs)
486         reqs = self.ns_reqs_dst(hosts, dst_host)
487         self.bd_swifs(1)[0].add_stream(reqs)
488
489         self.pg_enable_capture(self.pg_interfaces)
490         self.pg_start()
491         self.sleep(1)
492         self.assertEqual(len(self.vapi.collect_events()), 0)
493         self.bd_add_del(1, is_add=0)
494
495
496 if __name__ == "__main__":
497     unittest.main(testRunner=VppTestRunner)