VPP-1508 python3 tests locals()
[vpp.git] / test / test_l2bd_arp_term.py
1 #!/usr/bin/env python
2 """ L2BD ARP term Test """
3
4 import unittest
5 import random
6 import copy
7
8 from socket import AF_INET, AF_INET6
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15     in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17     ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18     ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
20
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp, mactobinary
23 from vpp_mac import VppMacAddress
24 from vpp_ip import VppIpAddress
25
26
27 class TestL2bdArpTerm(VppTestCase):
28     """ L2BD arp termination Test Case """
29
30     @classmethod
31     def setUpClass(cls):
32         """
33         Perform standard class setup (defined by class method setUpClass in
34         class VppTestCase) before running the test case, set test case related
35         variables and configure VPP.
36         """
37         super(TestL2bdArpTerm, cls).setUpClass()
38
39         try:
40             # Create pg interfaces
41             n_bd = 1
42             cls.ifs_per_bd = ifs_per_bd = 3
43             n_ifs = n_bd * ifs_per_bd
44             cls.create_pg_interfaces(range(n_ifs))
45
46             # Set up all interfaces
47             for i in cls.pg_interfaces:
48                 i.admin_up()
49
50             cls.hosts = set()
51
52         except Exception:
53             super(TestL2bdArpTerm, cls).tearDownClass()
54             raise
55
56     def setUp(self):
57         """
58         Clear trace and packet infos before running each test.
59         """
60         self.reset_packet_infos()
61         super(TestL2bdArpTerm, self).setUp()
62
63     def tearDown(self):
64         """
65         Show various debug prints after each test.
66         """
67         super(TestL2bdArpTerm, self).tearDown()
68         if not self.vpp_dead:
69             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
70             self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
71
72     def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
73         for e in entries:
74             ip = e.ip4 if is_ipv6 == 0 else e.ip6
75             self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
76                                         mac=VppMacAddress(e.mac).encode(),
77                                         ip=VppIpAddress(ip).encode(),
78                                         is_ipv6=is_ipv6,
79                                         is_add=is_add)
80
81     @classmethod
82     def mac_list(cls, b6_range):
83         return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
84
85     @classmethod
86     def ip4_host(cls, subnet, host, mac):
87         return Host(mac=mac,
88                     ip4="172.17.1%02u.%u" % (subnet, host))
89
90     @classmethod
91     def ip4_hosts(cls, subnet, start, mac_list):
92         return {cls.ip4_host(subnet, start + j, mac_list[j])
93                 for j in range(len(mac_list))}
94
95     @classmethod
96     def ip6_host(cls, subnet, host, mac):
97         return Host(mac=mac,
98                     ip6="fd01:%x::%x" % (subnet, host))
99
100     @classmethod
101     def ip6_hosts(cls, subnet, start, mac_list):
102         return {cls.ip6_host(subnet, start + j, mac_list[j])
103                 for j in range(len(mac_list))}
104
105     @classmethod
106     def bd_swifs(cls, b):
107         n = cls.ifs_per_bd
108         start = (b - 1) * n
109         return [cls.pg_interfaces[j] for j in range(start, start + n)]
110
111     def bd_add_del(self, bd_id=1, is_add=1):
112         if is_add:
113             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
114         for swif in self.bd_swifs(bd_id):
115             swif_idx = swif.sw_if_index
116             self.vapi.sw_interface_set_l2_bridge(
117                 swif_idx, bd_id=bd_id, enable=is_add)
118         if not is_add:
119             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
120
121     @classmethod
122     def arp_req(cls, src_host, host):
123         return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
124                 ARP(op="who-has",
125                     hwsrc=src_host.bin_mac,
126                     pdst=host.ip4,
127                     psrc=src_host.ip4))
128
129     @classmethod
130     def arp_reqs(cls, src_host, entries):
131         return [cls.arp_req(src_host, e) for e in entries]
132
133     @classmethod
134     def garp_req(cls, host):
135         return cls.arp_req(host, host)
136
137     @classmethod
138     def garp_reqs(cls, entries):
139         return [cls.garp_req(e) for e in entries]
140
141     def arp_resp_host(self, src_host, arp_resp):
142         ether = arp_resp[Ether]
143         self.assertEqual(ether.dst, src_host.mac)
144
145         arp = arp_resp[ARP]
146         self.assertEqual(arp.hwtype, 1)
147         self.assertEqual(arp.ptype, 0x800)
148         self.assertEqual(arp.hwlen, 6)
149         self.assertEqual(arp.plen, 4)
150         arp_opts = {"who-has": 1, "is-at": 2}
151         self.assertEqual(arp.op, arp_opts["is-at"])
152         self.assertEqual(arp.hwdst, src_host.mac)
153         self.assertEqual(arp.pdst, src_host.ip4)
154         return Host(mac=arp.hwsrc, ip4=arp.psrc)
155
156     def arp_resp_hosts(self, src_host, pkts):
157         return {self.arp_resp_host(src_host, p) for p in pkts}
158
159     @staticmethod
160     def inttoip4(ip):
161         o1 = int(ip / 16777216) % 256
162         o2 = int(ip / 65536) % 256
163         o3 = int(ip / 256) % 256
164         o4 = int(ip) % 256
165         return '%s.%s.%s.%s' % (o1, o2, o3, o4)
166
167     def arp_event_host(self, e):
168         return Host(mac=':'.join(['%02x' % ord(char) for char in e.new_mac]),
169                     ip4=self.inttoip4(e.address))
170
171     def arp_event_hosts(self, evs):
172         return {self.arp_event_host(e) for e in evs}
173
174     def nd_event_host(self, e):
175         return Host(mac=':'.join(['%02x' % ord(char) for char in e.new_mac]),
176                     ip6=inet_ntop(AF_INET6, e.address))
177
178     def nd_event_hosts(self, evs):
179         return {self.nd_event_host(e) for e in evs}
180
181     @classmethod
182     def ns_req(cls, src_host, host):
183         nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
184         d = inet_ntop(AF_INET6, nsma)
185         return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
186                 IPv6(dst=d, src=src_host.ip6) /
187                 ICMPv6ND_NS(tgt=host.ip6) /
188                 ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
189
190     @classmethod
191     def ns_reqs_dst(cls, entries, dst_host):
192         return [cls.ns_req(e, dst_host) for e in entries]
193
194     @classmethod
195     def ns_reqs_src(cls, src_host, entries):
196         return [cls.ns_req(src_host, e) for e in entries]
197
198     def na_resp_host(self, src_host, rx):
199         self.assertEqual(rx[Ether].dst, src_host.mac)
200         self.assertEqual(in6_ptop(rx[IPv6].dst),
201                          in6_ptop(src_host.ip6))
202
203         self.assertTrue(rx.haslayer(ICMPv6ND_NA))
204         self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
205
206         na = rx[ICMPv6ND_NA]
207         return Host(mac=na.lladdr, ip6=na.tgt)
208
209     def na_resp_hosts(self, src_host, pkts):
210         return {self.na_resp_host(src_host, p) for p in pkts}
211
212     def set_bd_flags(self, bd_id, **args):
213         """
214         Enable/disable defined feature(s) of the bridge domain.
215
216         :param int bd_id: Bridge domain ID.
217         :param list args: List of feature/status pairs. Allowed features: \
218         learn, forward, flood, uu_flood and arp_term. Status False means \
219         disable, status True means enable the feature.
220         :raise: ValueError in case of unknown feature in the input.
221         """
222         for flag in args:
223             if flag == "learn":
224                 feature_bitmap = 1 << 0
225             elif flag == "forward":
226                 feature_bitmap = 1 << 1
227             elif flag == "flood":
228                 feature_bitmap = 1 << 2
229             elif flag == "uu_flood":
230                 feature_bitmap = 1 << 3
231             elif flag == "arp_term":
232                 feature_bitmap = 1 << 4
233             else:
234                 raise ValueError("Unknown feature used: %s" % flag)
235             is_set = 1 if args[flag] else 0
236             self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
237         self.logger.info("Bridge domain ID %d updated" % bd_id)
238
239     def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
240         reqs = self.arp_reqs(src_host, req_hosts)
241
242         for swif in self.bd_swifs(bd_id):
243             swif.add_stream(reqs)
244
245         self.pg_enable_capture(self.pg_interfaces)
246         self.pg_start()
247
248         for swif in self.bd_swifs(bd_id):
249             resp_pkts = swif.get_capture(len(resp_hosts))
250             resps = self.arp_resp_hosts(src_host, resp_pkts)
251             self.assertEqual(len(resps ^ resp_hosts), 0)
252
253     def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
254         reqs = self.ns_reqs_src(src_host, req_hosts)
255
256         for swif in self.bd_swifs(bd_id):
257             swif.add_stream(reqs)
258
259         self.pg_enable_capture(self.pg_interfaces)
260         self.pg_start()
261
262         for swif in self.bd_swifs(bd_id):
263             resp_pkts = swif.get_capture(len(resp_hosts))
264             resps = self.na_resp_hosts(src_host, resp_pkts)
265             self.assertEqual(len(resps ^ resp_hosts), 0)
266
267     def test_l2bd_arp_term_01(self):
268         """ L2BD arp term - add 5 hosts, verify arp responses
269         """
270         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
271         self.bd_add_del(1, is_add=1)
272         self.set_bd_flags(1, arp_term=True, flood=False,
273                           uu_flood=False, learn=False)
274         macs = self.mac_list(range(1, 5))
275         hosts = self.ip4_hosts(4, 1, macs)
276         self.add_del_arp_term_hosts(hosts, is_add=1)
277
278         self.verify_arp(src_host, hosts, hosts)
279         type(self).hosts = hosts
280
281     def test_l2bd_arp_term_02(self):
282         """ L2BD arp term - delete 3 hosts, verify arp responses
283         """
284         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
285         macs = self.mac_list(range(1, 3))
286         deleted = self.ip4_hosts(4, 1, macs)
287         self.add_del_arp_term_hosts(deleted, is_add=0)
288         remaining = self.hosts - deleted
289         self.verify_arp(src_host, self.hosts, remaining)
290         type(self).hosts = remaining
291         self.bd_add_del(1, is_add=0)
292
293     def test_l2bd_arp_term_03(self):
294         """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
295         """
296         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
297         self.bd_add_del(1, is_add=1)
298         self.set_bd_flags(1, arp_term=True, flood=False,
299                           uu_flood=False, learn=False)
300         macs = self.mac_list(range(1, 3))
301         readded = self.ip4_hosts(4, 1, macs)
302         self.add_del_arp_term_hosts(readded, is_add=1)
303         self.verify_arp(src_host, self.hosts | readded, readded)
304         type(self).hosts = readded
305
306     def test_l2bd_arp_term_04(self):
307         """ L2BD arp term - 2 IP4 addrs per host
308         """
309         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
310         macs = self.mac_list(range(1, 3))
311         sub5_hosts = self.ip4_hosts(5, 1, macs)
312         self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
313         hosts = self.hosts | sub5_hosts
314         self.verify_arp(src_host, hosts, hosts)
315         type(self).hosts = hosts
316         self.bd_add_del(1, is_add=0)
317
318     def test_l2bd_arp_term_05(self):
319         """ L2BD arp term - create and update 10 IP4-mac pairs
320         """
321         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
322         self.bd_add_del(1, is_add=1)
323         self.set_bd_flags(1, arp_term=True, flood=False,
324                           uu_flood=False, learn=False)
325         macs1 = self.mac_list(range(10, 20))
326         hosts1 = self.ip4_hosts(5, 1, macs1)
327         self.add_del_arp_term_hosts(hosts1, is_add=1)
328         self.verify_arp(src_host, hosts1, hosts1)
329         macs2 = self.mac_list(range(20, 30))
330         hosts2 = self.ip4_hosts(5, 1, macs2)
331         self.add_del_arp_term_hosts(hosts2, is_add=1)
332         self.verify_arp(src_host, hosts1, hosts2)
333         self.bd_add_del(1, is_add=0)
334
335     def test_l2bd_arp_term_06(self):
336         """ L2BD arp/ND term - hosts with both ip4/ip6
337         """
338         src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
339         src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
340         self.bd_add_del(1, is_add=1)
341         # enable flood to make sure requests are not flooded
342         self.set_bd_flags(1, arp_term=True, flood=True,
343                           uu_flood=False, learn=False)
344         macs = self.mac_list(range(10, 20))
345         hosts6 = self.ip6_hosts(5, 1, macs)
346         hosts4 = self.ip4_hosts(5, 1, macs)
347         self.add_del_arp_term_hosts(hosts4, is_add=1)
348         self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
349         self.verify_arp(src_host4, hosts4, hosts4)
350         self.verify_nd(src_host6, hosts6, hosts6)
351         self.bd_add_del(1, is_add=0)
352
353     def test_l2bd_arp_term_07(self):
354         """ L2BD ND term - Add and Del hosts, verify ND replies
355         """
356         src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
357         self.bd_add_del(1, is_add=1)
358         self.set_bd_flags(1, arp_term=True, flood=False,
359                           uu_flood=False, learn=False)
360         macs = self.mac_list(range(10, 20))
361         hosts6 = self.ip6_hosts(5, 1, macs)
362         self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
363         self.verify_nd(src_host6, hosts6, hosts6)
364         del_macs = self.mac_list(range(10, 15))
365         deleted = self.ip6_hosts(5, 1, del_macs)
366         self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
367         self.verify_nd(src_host6, hosts6, hosts6 - deleted)
368         self.bd_add_del(1, is_add=0)
369
370     def test_l2bd_arp_term_08(self):
371         """ L2BD ND term - Add and update IP+mac, verify ND replies
372         """
373         src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
374         self.bd_add_del(1, is_add=1)
375         self.set_bd_flags(1, arp_term=True, flood=False,
376                           uu_flood=False, learn=False)
377         macs1 = self.mac_list(range(10, 20))
378         hosts = self.ip6_hosts(5, 1, macs1)
379         self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
380         self.verify_nd(src_host, hosts, hosts)
381         macs2 = self.mac_list(range(20, 30))
382         updated = self.ip6_hosts(5, 1, macs2)
383         self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
384         self.verify_nd(src_host, hosts, updated)
385         self.bd_add_del(1, is_add=0)
386
387     def test_l2bd_arp_term_09(self):
388         """ L2BD arp term - send garps, verify arp event reports
389         """
390         self.vapi.want_ip4_arp_events()
391         self.bd_add_del(1, is_add=1)
392         self.set_bd_flags(1, arp_term=True, flood=False,
393                           uu_flood=False, learn=False)
394         macs = self.mac_list(range(90, 95))
395         hosts = self.ip4_hosts(5, 1, macs)
396
397         garps = self.garp_reqs(hosts)
398         self.bd_swifs(1)[0].add_stream(garps)
399
400         self.pg_enable_capture(self.pg_interfaces)
401         self.pg_start()
402         evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
403                for i in range(len(hosts))]
404         ev_hosts = self.arp_event_hosts(evs)
405         self.assertEqual(len(ev_hosts ^ hosts), 0)
406
407     def test_l2bd_arp_term_10(self):
408         """ L2BD arp term - send duplicate garps, verify suppression
409         """
410         macs = self.mac_list(range(70, 71))
411         hosts = self.ip4_hosts(6, 1, macs)
412
413         """ send the packet 5 times expect one event
414         """
415         garps = self.garp_reqs(hosts) * 5
416         self.bd_swifs(1)[0].add_stream(garps)
417
418         self.pg_enable_capture(self.pg_interfaces)
419         self.pg_start()
420         evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
421                for i in range(len(hosts))]
422         ev_hosts = self.arp_event_hosts(evs)
423         self.assertEqual(len(ev_hosts ^ hosts), 0)
424
425     def test_l2bd_arp_term_11(self):
426         """ L2BD arp term - disable ip4 arp events,send garps, verify no events
427         """
428         self.vapi.want_ip4_arp_events(enable_disable=0)
429         macs = self.mac_list(range(90, 95))
430         hosts = self.ip4_hosts(5, 1, macs)
431
432         garps = self.garp_reqs(hosts)
433         self.bd_swifs(1)[0].add_stream(garps)
434
435         self.pg_enable_capture(self.pg_interfaces)
436         self.pg_start()
437         self.sleep(1)
438         self.assertEqual(len(self.vapi.collect_events()), 0)
439         self.bd_add_del(1, is_add=0)
440
441     def test_l2bd_arp_term_12(self):
442         """ L2BD ND term - send NS packets verify reports
443         """
444         self.vapi.want_ip6_nd_events(address=inet_pton(AF_INET6, "::0"))
445         dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
446         self.bd_add_del(1, is_add=1)
447         self.set_bd_flags(1, arp_term=True, flood=False,
448                           uu_flood=False, learn=False)
449         macs = self.mac_list(range(10, 15))
450         hosts = self.ip6_hosts(5, 1, macs)
451         reqs = self.ns_reqs_dst(hosts, dst_host)
452         self.bd_swifs(1)[0].add_stream(reqs)
453
454         self.pg_enable_capture(self.pg_interfaces)
455         self.pg_start()
456         evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
457                for i in range(len(hosts))]
458         ev_hosts = self.nd_event_hosts(evs)
459         self.assertEqual(len(ev_hosts ^ hosts), 0)
460
461     def test_l2bd_arp_term_13(self):
462         """ L2BD ND term - send duplicate ns, verify suppression
463         """
464         dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
465         macs = self.mac_list(range(10, 11))
466         hosts = self.ip6_hosts(5, 1, macs)
467         reqs = self.ns_reqs_dst(hosts, dst_host) * 5
468         self.bd_swifs(1)[0].add_stream(reqs)
469
470         self.pg_enable_capture(self.pg_interfaces)
471         self.pg_start()
472         evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
473                for i in range(len(hosts))]
474         ev_hosts = self.nd_event_hosts(evs)
475         self.assertEqual(len(ev_hosts ^ hosts), 0)
476
477     def test_l2bd_arp_term_14(self):
478         """ L2BD ND term - disable ip4 arp events,send ns, verify no events
479         """
480         self.vapi.want_ip6_nd_events(enable_disable=0,
481                                      address=inet_pton(AF_INET6, "::0"))
482         dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
483         macs = self.mac_list(range(10, 15))
484         hosts = self.ip6_hosts(5, 1, macs)
485         reqs = self.ns_reqs_dst(hosts, dst_host)
486         self.bd_swifs(1)[0].add_stream(reqs)
487
488         self.pg_enable_capture(self.pg_interfaces)
489         self.pg_start()
490         self.sleep(1)
491         self.assertEqual(len(self.vapi.collect_events()), 0)
492         self.bd_add_del(1, is_add=0)
493
494
495 if __name__ == '__main__':
496     unittest.main(testRunner=VppTestRunner)