TEST:add l2bd nd term tests
[vpp.git] / test / test_l2bd_arp_term.py
1 #!/usr/bin/env python
2 """ L2BD ARP term Test """
3
4 import unittest
5 import random
6 import copy
7
8 from socket import AF_INET6
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15     in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17     ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18     ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
20
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp, mactobinary
23
24
25 class TestL2bdArpTerm(VppTestCase):
26     """ L2BD arp termination Test Case """
27
28     @classmethod
29     def setUpClass(cls):
30         """
31         Perform standard class setup (defined by class method setUpClass in
32         class VppTestCase) before running the test case, set test case related
33         variables and configure VPP.
34         """
35         super(TestL2bdArpTerm, cls).setUpClass()
36
37         try:
38             # Create pg interfaces
39             n_bd = 1
40             cls.ifs_per_bd = ifs_per_bd = 3
41             n_ifs = n_bd * ifs_per_bd
42             cls.create_pg_interfaces(range(n_ifs))
43
44             # Set up all interfaces
45             for i in cls.pg_interfaces:
46                 i.admin_up()
47
48             cls.hosts = set()
49
50         except Exception:
51             super(TestL2bdArpTerm, cls).tearDownClass()
52             raise
53
54     def setUp(self):
55         """
56         Clear trace and packet infos before running each test.
57         """
58         self.reset_packet_infos()
59         super(TestL2bdArpTerm, self).setUp()
60
61     def tearDown(self):
62         """
63         Show various debug prints after each test.
64         """
65         super(TestL2bdArpTerm, self).tearDown()
66         if not self.vpp_dead:
67             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
68             self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
69
70     def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
71         for e in entries:
72             ip = e.ip4n if is_ipv6 == 0 else e.ip6n
73             self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
74                                         mac=e.bin_mac,
75                                         ip=ip,
76                                         is_ipv6=is_ipv6,
77                                         is_add=is_add)
78
79     @classmethod
80     def mac_list(cls, b6_range):
81         return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
82
83     @classmethod
84     def ip4_host(cls, subnet, host, mac):
85         return Host(mac=mac,
86                     ip4="172.17.1%02u.%u" % (subnet, host))
87
88     @classmethod
89     def ip4_hosts(cls, subnet, start, mac_list):
90         return {cls.ip4_host(subnet, start + j, mac_list[j])
91                 for j in range(len(mac_list))}
92
93     @classmethod
94     def ip6_host(cls, subnet, host, mac):
95         return Host(mac=mac,
96                     ip6="fd01:%x::%x" % (subnet, host))
97
98     @classmethod
99     def ip6_hosts(cls, subnet, start, mac_list):
100         return {cls.ip6_host(subnet, start + j, mac_list[j])
101                 for j in range(len(mac_list))}
102
103     @classmethod
104     def bd_swifs(cls, b):
105         n = cls.ifs_per_bd
106         start = (b - 1) * n
107         return [cls.pg_interfaces[j] for j in range(start, start + n)]
108
109     def bd_add_del(self, bd_id=1, is_add=1):
110         if is_add:
111             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
112         for swif in self.bd_swifs(bd_id):
113             swif_idx = swif.sw_if_index
114             self.vapi.sw_interface_set_l2_bridge(
115                 swif_idx, bd_id=bd_id, enable=is_add)
116         if not is_add:
117             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
118
119     @classmethod
120     def arp(cls, src_host, host):
121         return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
122                 ARP(op="who-has",
123                     hwsrc=src_host.bin_mac,
124                     pdst=host.ip4,
125                     psrc=src_host.ip4))
126
127     @classmethod
128     def arp_reqs(cls, src_host, entries):
129         return [cls.arp(src_host, e) for e in entries]
130
131     def arp_resp_host(self, src_host, arp_resp):
132         ether = arp_resp[Ether]
133         self.assertEqual(ether.dst, src_host.mac)
134
135         arp = arp_resp[ARP]
136         self.assertEqual(arp.hwtype, 1)
137         self.assertEqual(arp.ptype, 0x800)
138         self.assertEqual(arp.hwlen, 6)
139         self.assertEqual(arp.plen, 4)
140         arp_opts = {"who-has": 1, "is-at": 2}
141         self.assertEqual(arp.op, arp_opts["is-at"])
142         self.assertEqual(arp.hwdst, src_host.mac)
143         self.assertEqual(arp.pdst, src_host.ip4)
144         return Host(mac=arp.hwsrc, ip4=arp.psrc)
145
146     def arp_resp_hosts(self, src_host, pkts):
147         return {self.arp_resp_host(src_host, p) for p in pkts}
148
149     @classmethod
150     def ns_req(cls, src_host, host):
151         nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
152         d = inet_ntop(AF_INET6, nsma)
153         return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
154                 IPv6(dst=d, src=src_host.ip6) /
155                 ICMPv6ND_NS(tgt=host.ip6) /
156                 ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
157
158     @classmethod
159     def ns_reqs(cls, src_host, entries):
160         return [cls.ns_req(src_host, e) for e in entries]
161
162     def na_resp_host(self, src_host, rx):
163         self.assertEqual(rx[Ether].dst, src_host.mac)
164         self.assertEqual(in6_ptop(rx[IPv6].dst),
165                          in6_ptop(src_host.ip6))
166
167         self.assertTrue(rx.haslayer(ICMPv6ND_NA))
168         self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
169
170         na = rx[ICMPv6ND_NA]
171         return Host(mac=na.lladdr, ip6=na.tgt)
172
173     def na_resp_hosts(self, src_host, pkts):
174         return {self.na_resp_host(src_host, p) for p in pkts}
175
176     def set_bd_flags(self, bd_id, **args):
177         """
178         Enable/disable defined feature(s) of the bridge domain.
179
180         :param int bd_id: Bridge domain ID.
181         :param list args: List of feature/status pairs. Allowed features: \
182         learn, forward, flood, uu_flood and arp_term. Status False means \
183         disable, status True means enable the feature.
184         :raise: ValueError in case of unknown feature in the input.
185         """
186         for flag in args:
187             if flag == "learn":
188                 feature_bitmap = 1 << 0
189             elif flag == "forward":
190                 feature_bitmap = 1 << 1
191             elif flag == "flood":
192                 feature_bitmap = 1 << 2
193             elif flag == "uu_flood":
194                 feature_bitmap = 1 << 3
195             elif flag == "arp_term":
196                 feature_bitmap = 1 << 4
197             else:
198                 raise ValueError("Unknown feature used: %s" % flag)
199             is_set = 1 if args[flag] else 0
200             self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
201         self.logger.info("Bridge domain ID %d updated" % bd_id)
202
203     def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
204         reqs = self.arp_reqs(src_host, req_hosts)
205
206         for swif in self.bd_swifs(bd_id):
207             swif.add_stream(reqs)
208
209         self.pg_enable_capture(self.pg_interfaces)
210         self.pg_start()
211
212         for swif in self.bd_swifs(bd_id):
213             resp_pkts = swif.get_capture(len(resp_hosts))
214             resps = self.arp_resp_hosts(src_host, resp_pkts)
215             self.assertEqual(len(resps ^ resp_hosts), 0)
216
217     def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
218         reqs = self.ns_reqs(src_host, req_hosts)
219
220         for swif in self.bd_swifs(bd_id):
221             swif.add_stream(reqs)
222
223         self.pg_enable_capture(self.pg_interfaces)
224         self.pg_start()
225
226         for swif in self.bd_swifs(bd_id):
227             resp_pkts = swif.get_capture(len(resp_hosts))
228             resps = self.na_resp_hosts(src_host, resp_pkts)
229             self.assertEqual(len(resps ^ resp_hosts), 0)
230
231     def test_l2bd_arp_term_01(self):
232         """ L2BD arp term - add 5 hosts, verify arp responses
233         """
234         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
235         self.bd_add_del(1, is_add=1)
236         self.set_bd_flags(1, arp_term=True, flood=False,
237                           uu_flood=False, learn=False)
238         macs = self.mac_list(range(1, 5))
239         hosts = self.ip4_hosts(4, 1, macs)
240         self.add_del_arp_term_hosts(hosts, is_add=1)
241         self.verify_arp(src_host, hosts, hosts)
242         type(self).hosts = hosts
243
244     def test_l2bd_arp_term_02(self):
245         """ L2BD arp term - delete 3 hosts, verify arp responses
246         """
247         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
248         macs = self.mac_list(range(1, 3))
249         deleted = self.ip4_hosts(4, 1, macs)
250         self.add_del_arp_term_hosts(deleted, is_add=0)
251         remaining = self.hosts - deleted
252         self.verify_arp(src_host, self.hosts, remaining)
253         type(self).hosts = remaining
254         self.bd_add_del(1, is_add=0)
255
256     def test_l2bd_arp_term_03(self):
257         """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
258         """
259         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
260         self.bd_add_del(1, is_add=1)
261         self.set_bd_flags(1, arp_term=True, flood=False,
262                           uu_flood=False, learn=False)
263         macs = self.mac_list(range(1, 3))
264         readded = self.ip4_hosts(4, 1, macs)
265         self.add_del_arp_term_hosts(readded, is_add=1)
266         self.verify_arp(src_host, self.hosts | readded, readded)
267         type(self).hosts = readded
268
269     def test_l2bd_arp_term_04(self):
270         """ L2BD arp term - 2 IP4 addrs per host
271         """
272         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
273         macs = self.mac_list(range(1, 3))
274         sub5_hosts = self.ip4_hosts(5, 1, macs)
275         self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
276         hosts = self.hosts | sub5_hosts
277         self.verify_arp(src_host, hosts, hosts)
278         type(self).hosts = hosts
279         self.bd_add_del(1, is_add=0)
280
281     def test_l2bd_arp_term_05(self):
282         """ L2BD arp term - create and update 10 IP4-mac pairs
283         """
284         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
285         self.bd_add_del(1, is_add=1)
286         self.set_bd_flags(1, arp_term=True, flood=False,
287                           uu_flood=False, learn=False)
288         macs1 = self.mac_list(range(10, 20))
289         hosts1 = self.ip4_hosts(5, 1, macs1)
290         self.add_del_arp_term_hosts(hosts1, is_add=1)
291         self.verify_arp(src_host, hosts1, hosts1)
292         macs2 = self.mac_list(range(20, 30))
293         hosts2 = self.ip4_hosts(5, 1, macs2)
294         self.add_del_arp_term_hosts(hosts2, is_add=1)
295         self.verify_arp(src_host, hosts1, hosts2)
296         self.bd_add_del(1, is_add=0)
297
298     def test_l2bd_arp_term_06(self):
299         """ L2BD arp/ND term - hosts with both ip4/ip6
300         """
301         src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
302         src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
303         self.bd_add_del(1, is_add=1)
304         # enable flood to make sure requests are not flooded
305         self.set_bd_flags(1, arp_term=True, flood=True,
306                           uu_flood=False, learn=False)
307         macs = self.mac_list(range(10, 20))
308         hosts6 = self.ip6_hosts(5, 1, macs)
309         hosts4 = self.ip4_hosts(5, 1, macs)
310         self.add_del_arp_term_hosts(hosts4, is_add=1)
311         self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
312         self.verify_arp(src_host4, hosts4, hosts4)
313         self.verify_nd(src_host6, hosts6, hosts6)
314         self.bd_add_del(1, is_add=0)
315
316     def test_l2bd_arp_term_07(self):
317         """ L2BD ND term - Add and Del hosts, verify ND replies
318         """
319         src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
320         self.bd_add_del(1, is_add=1)
321         self.set_bd_flags(1, arp_term=True, flood=False,
322                           uu_flood=False, learn=False)
323         macs = self.mac_list(range(10, 20))
324         hosts6 = self.ip6_hosts(5, 1, macs)
325         self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
326         self.verify_nd(src_host6, hosts6, hosts6)
327         del_macs = self.mac_list(range(10, 15))
328         deleted = self.ip6_hosts(5, 1, del_macs)
329         self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
330         self.verify_nd(src_host6, hosts6, hosts6 - deleted)
331         self.bd_add_del(1, is_add=0)
332
333     def test_l2bd_arp_term_08(self):
334         """ L2BD ND term - Add and update IP+mac, verify ND replies
335         """
336         src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
337         self.bd_add_del(1, is_add=1)
338         self.set_bd_flags(1, arp_term=True, flood=False,
339                           uu_flood=False, learn=False)
340         macs1 = self.mac_list(range(10, 20))
341         hosts = self.ip6_hosts(5, 1, macs1)
342         self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
343         self.verify_nd(src_host, hosts, hosts)
344         macs2 = self.mac_list(range(20, 30))
345         updated = self.ip6_hosts(5, 1, macs2)
346         self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
347         self.verify_nd(src_host, hosts, updated)
348         self.bd_add_del(1, is_add=0)
349
350
351 if __name__ == '__main__':
352     unittest.main(testRunner=VppTestRunner)