build: add ability to disable some plugins from packaging and tests
[vpp.git] / test / test_abf.py
1 #!/usr/bin/env python3
2
3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4 import unittest
5
6 from config import config
7
8 from framework import VppTestCase, VppTestRunner
9 from vpp_ip import DpoProto
10 from vpp_ip_route import (
11     VppIpRoute,
12     VppRoutePath,
13     VppMplsLabel,
14     VppIpTable,
15     FibPathProto,
16 )
17 from vpp_acl import AclRule, VppAcl
18
19 from scapy.packet import Raw
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP
22 from scapy.layers.inet6 import IPv6
23 from ipaddress import IPv4Network, IPv6Network
24
25 from vpp_object import VppObject
26
# Size of each traffic burst: the tests below send `packet * NUM_PKTS`
# for every forwarding or drop check.
NUM_PKTS = 67
28
29
def find_abf_policy(test, id):
    """Report whether an ABF policy with the given id shows up in a dump.

    :param test: object exposing ``vapi.abf_policy_dump()``.
    :param id: policy id compared against each entry's
        ``policy.policy_id``.
    :returns: True if at least one dumped entry matches, else False.
    """
    return any(
        id == entry.policy.policy_id for entry in test.vapi.abf_policy_dump()
    )
36
37
def find_abf_itf_attach(test, id, sw_if_index):
    """Report whether a policy/interface attachment shows up in a dump.

    :param test: object exposing ``vapi.abf_itf_attach_dump()``.
    :param id: policy id compared against each entry's
        ``attach.policy_id``.
    :param sw_if_index: interface index compared against each entry's
        ``attach.sw_if_index``.
    :returns: True if some dumped entry matches both values, else False.
    """
    return any(
        id == entry.attach.policy_id and sw_if_index == entry.attach.sw_if_index
        for entry in test.vapi.abf_itf_attach_dump()
    )
44
45
class VppAbfPolicy(VppObject):
    """Test-side handle for an ABF policy.

    Holds the policy id, the ACL object whose ``acl_index`` is sent to
    VPP, and the forwarding paths (encoded once at construction time via
    each path's ``encode()``).
    """

    def __init__(self, test, policy_id, acl, paths):
        self._test = test
        self.policy_id = policy_id
        self.acl = acl
        self.paths = paths
        self.encoded_paths = [route_path.encode() for route_path in self.paths]

    def add_vpp_config(self):
        """Send the policy with a leading 1 and register it with the test."""
        policy = {
            "policy_id": self.policy_id,
            "acl_index": self.acl.acl_index,
            "n_paths": len(self.paths),
            "paths": self.encoded_paths,
        }
        # first argument 1 - presumably the API's add flag
        self._test.vapi.abf_policy_add_del(1, policy)
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Send the same policy description with a leading 0."""
        policy = {
            "policy_id": self.policy_id,
            "acl_index": self.acl.acl_index,
            "n_paths": len(self.paths),
            "paths": self.encoded_paths,
        }
        # first argument 0 - presumably the API's delete flag
        self._test.vapi.abf_policy_add_del(0, policy)

    def query_vpp_config(self):
        """Return True if a policy with this id appears in the policy dump."""
        return find_abf_policy(self._test, self.policy_id)

    def object_id(self):
        """Return the identifier string "abf-policy-<policy_id>"."""
        return "abf-policy-%d" % (self.policy_id,)
84
85
class VppAbfAttach(VppObject):
    """Test-side handle for the attachment of an ABF policy to an interface.

    Stores the policy id, the interface's sw_if_index, the attachment
    priority and the is_ipv6 flag, and passes them unchanged to the
    ``abf_itf_attach_add_del`` API.
    """

    def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
        self._test = test
        self.policy_id = policy_id
        self.sw_if_index = sw_if_index
        self.priority = priority
        self.is_ipv6 = is_ipv6

    def add_vpp_config(self):
        """Send the attachment with a leading 1 and register it with the test."""
        attach = {
            "policy_id": self.policy_id,
            "sw_if_index": self.sw_if_index,
            "priority": self.priority,
            "is_ipv6": self.is_ipv6,
        }
        # first argument 1 - presumably the API's add flag
        self._test.vapi.abf_itf_attach_add_del(1, attach)
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Send the same attachment description with a leading 0."""
        attach = {
            "policy_id": self.policy_id,
            "sw_if_index": self.sw_if_index,
            "priority": self.priority,
            "is_ipv6": self.is_ipv6,
        }
        # first argument 0 - presumably the API's delete flag
        self._test.vapi.abf_itf_attach_add_del(0, attach)

    def query_vpp_config(self):
        """Return True if this policy/interface pair appears in the attach dump."""
        return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)

    def object_id(self):
        """Return the identifier string "abf-attach-<policy_id>-<sw_if_index>"."""
        return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
122
123
@unittest.skipIf(
    "acl" in config.excluded_plugins,
    "Exclude ABF plugin tests due to absence of ACL plugin",
)
@unittest.skipIf("abf" in config.excluded_plugins, "Exclude ABF plugin tests")
class TestAbf(VppTestCase):
    """ABF Test Case"""

    # The whole class is skipped when either the "acl" or the "abf" plugin
    # is listed in config.excluded_plugins (see the decorators above).

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # Five packet-generator interfaces are created, but only pg0..pg3
        # are brought up and addressed here; pg4 is configured by
        # test_abf4 itself, inside a non-default table.
        self.create_pg_interfaces(range(5))

        for i in self.pg_interfaces[:4]:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        # Unconfigure every interface, including pg4, before the
        # framework's own tear-down runs.
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super().tearDown()

    def test_abf4(self):
        """IPv4 ACL Based Forwarding"""

        #
        # We are not testing the various matching capabilities
        # of ACLs, that's done elsewhere. Here we are testing
        # the application of ACLs to a forwarding path to achieve
        # ABF
        # So we construct just a few ACLs to ensure the ABF policies
        # are correctly constructed and used. And a few path types
        # to test the API path decoding.
        #

        #
        # Rule 1
        #
        rule_1 = AclRule(
            is_permit=1,
            proto=17,
            ports=1234,
            src_prefix=IPv4Network("1.1.1.1/32"),
            dst_prefix=IPv4Network("1.1.1.2/32"),
        )
        acl_1 = VppAcl(self, rules=[rule_1])
        acl_1.add_vpp_config()

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(
            self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        )
        abf_1.add_vpp_config()

        #
        # Attach the policy to input interface Pg0
        #
        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
        attach_1.add_vpp_config()

        #
        # fire in packet matching the ACL src,dst. If it's forwarded
        # then the ABF was successful, since default routing will drop it
        #
        p_1 = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)

        #
        # Attach a 'better' priority policy to the same interface
        # (priority 40 here versus 50 above)
        #
        abf_2 = VppAbfPolicy(
            self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
        )
        abf_2.add_vpp_config()
        attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
        attach_2.add_vpp_config()

        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)

        #
        # Attach a policy with priority in the middle; traffic must
        # still follow the priority-40 policy out of pg2
        #
        abf_3 = VppAbfPolicy(
            self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
        )
        abf_3.add_vpp_config()
        attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
        attach_3.add_vpp_config()

        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)

        #
        # remove the best priority
        #
        attach_2.remove_vpp_config()
        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)

        #
        # Attach one of the same policies to Pg1
        #
        attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
        attach_4.add_vpp_config()

        p_2 = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)

        #
        # detach the policy from PG1, now expect traffic to be dropped
        #
        attach_4.remove_vpp_config()

        self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")

        #
        # Swap to route via a next-hop in the non-default table
        #
        table_20 = VppIpTable(self, 20)
        table_20.add_vpp_config()

        self.pg4.set_table_ip4(table_20.table_id)
        self.pg4.admin_up()
        self.pg4.config_ip4()
        self.pg4.resolve_arp()

        abf_13 = VppAbfPolicy(
            self,
            13,
            acl_1,
            [
                VppRoutePath(
                    self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
                )
            ],
        )
        abf_13.add_vpp_config()
        attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
        attach_5.add_vpp_config()

        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)

        # put pg4 back into the default table before the test ends
        self.pg4.unconfig_ip4()
        self.pg4.set_table_ip4(0)

    def test_abf6(self):
        """IPv6 ACL Based Forwarding"""

        #
        # Simple test for matching IPv6 packets
        #

        #
        # Rule 1
        #
        rule_1 = AclRule(
            is_permit=1,
            proto=17,
            ports=1234,
            src_prefix=IPv6Network("2001::2/128"),
            dst_prefix=IPv6Network("2001::1/128"),
        )
        acl_1 = VppAcl(self, rules=[rule_1])
        acl_1.add_vpp_config()

        #
        # ABF policy for ACL 1 - recursive path via next-hop 3001::1,
        # with no interface given (0xFFFFFFFF)
        #
        abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
        abf_1.add_vpp_config()

        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
        attach_1.add_vpp_config()

        #
        # a packet matching the rule
        #
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src="2001::2", dst="2001::1")
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        #
        # packets are dropped because there is no route to the policy's
        # next hop. They must be injected on pg0: that is where the policy
        # is attached and what the Ethernet header above is built for.
        # Injecting on pg1 would never exercise the policy at all.
        #
        self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS, "no route")

        #
        # add a route resolving the next-hop
        #
        route = VppIpRoute(
            self,
            "3001::1",
            32,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        )
        route.add_vpp_config()

        #
        # now expect packets forwarded.
        #
        self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)

    def test_abf4_deny(self):
        """IPv4 ACL Deny Rule"""
        import ipaddress

        #
        # Rules 1/2
        #
        pg0_subnet = ipaddress.ip_network(self.pg0.local_ip4_prefix, strict=False)
        pg2_subnet = ipaddress.ip_network(self.pg2.local_ip4_prefix, strict=False)
        pg3_subnet = ipaddress.ip_network(self.pg3.local_ip4_prefix, strict=False)
        rule_deny = AclRule(
            is_permit=0,
            proto=17,
            ports=1234,
            src_prefix=IPv4Network(pg0_subnet),
            dst_prefix=IPv4Network(pg3_subnet),
        )
        rule_permit = AclRule(
            is_permit=1,
            proto=17,
            ports=1234,
            src_prefix=IPv4Network(pg0_subnet),
            dst_prefix=IPv4Network(pg2_subnet),
        )
        acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
        acl_1.add_vpp_config()

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(
            self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        )
        abf_1.add_vpp_config()

        #
        # Attach the policy to input interface Pg0
        #
        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
        attach_1.add_vpp_config()

        #
        # a packet matching the deny rule: it is expected to be forwarded
        # to its real destination behind pg3 rather than out of the
        # policy's pg1 path. The frame is addressed to pg0's own MAC, as
        # for any routed packet entering on pg0.
        #
        p_deny = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg3.remote_ip4)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)

        #
        # a packet matching the permit rule: it is destined to pg2's
        # subnet but expected out of pg1, i.e. the policy path
        #
        p_permit = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)

    def test_abf6_deny(self):
        """IPv6 ACL Deny Rule"""
        import ipaddress

        #
        # Rules 1/2
        #
        pg0_subnet = ipaddress.ip_network(self.pg0.local_ip6_prefix, strict=False)
        pg2_subnet = ipaddress.ip_network(self.pg2.local_ip6_prefix, strict=False)
        pg3_subnet = ipaddress.ip_network(self.pg3.local_ip6_prefix, strict=False)
        rule_deny = AclRule(
            is_permit=0,
            proto=17,
            ports=1234,
            src_prefix=IPv6Network(pg0_subnet),
            dst_prefix=IPv6Network(pg3_subnet),
        )
        rule_permit = AclRule(
            is_permit=1,
            proto=17,
            ports=1234,
            src_prefix=IPv6Network(pg0_subnet),
            dst_prefix=IPv6Network(pg2_subnet),
        )
        acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
        acl_1.add_vpp_config()

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(
            self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
        )
        abf_1.add_vpp_config()

        #
        # Attach the policy to input interface Pg0
        #
        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50, is_ipv6=1)
        attach_1.add_vpp_config()

        #
        # a packet matching the deny rule: it is expected to be forwarded
        # to its real destination behind pg3 rather than out of the
        # policy's pg1 path. The frame is addressed to pg0's own MAC, as
        # for any routed packet entering on pg0.
        #
        p_deny = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg3.remote_ip6)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)

        #
        # a packet matching the permit rule: it is destined to pg2's
        # subnet but expected out of pg1, i.e. the policy path
        #
        p_permit = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg2.remote_ip6)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
478
479
# When executed directly as a script, run this module's tests through
# unittest using the framework-provided VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)