tests: replace pycodestyle with black
[vpp.git] / test / test_abf.py
1 #!/usr/bin/env python3
2
3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import (
9     VppIpRoute,
10     VppRoutePath,
11     VppMplsLabel,
12     VppIpTable,
13     FibPathProto,
14 )
15 from vpp_acl import AclRule, VppAcl
16
17 from scapy.packet import Raw
18 from scapy.layers.l2 import Ether
19 from scapy.layers.inet import IP, UDP
20 from scapy.layers.inet6 import IPv6
21 from ipaddress import IPv4Network, IPv6Network
22
23 from vpp_object import VppObject
24
25 NUM_PKTS = 67
26
27
28 def find_abf_policy(test, id):
29     policies = test.vapi.abf_policy_dump()
30     for p in policies:
31         if id == p.policy.policy_id:
32             return True
33     return False
34
35
36 def find_abf_itf_attach(test, id, sw_if_index):
37     attachs = test.vapi.abf_itf_attach_dump()
38     for a in attachs:
39         if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index:
40             return True
41     return False
42
43
44 class VppAbfPolicy(VppObject):
45     def __init__(self, test, policy_id, acl, paths):
46         self._test = test
47         self.policy_id = policy_id
48         self.acl = acl
49         self.paths = paths
50         self.encoded_paths = []
51         for path in self.paths:
52             self.encoded_paths.append(path.encode())
53
54     def add_vpp_config(self):
55         self._test.vapi.abf_policy_add_del(
56             1,
57             {
58                 "policy_id": self.policy_id,
59                 "acl_index": self.acl.acl_index,
60                 "n_paths": len(self.paths),
61                 "paths": self.encoded_paths,
62             },
63         )
64         self._test.registry.register(self, self._test.logger)
65
66     def remove_vpp_config(self):
67         self._test.vapi.abf_policy_add_del(
68             0,
69             {
70                 "policy_id": self.policy_id,
71                 "acl_index": self.acl.acl_index,
72                 "n_paths": len(self.paths),
73                 "paths": self.encoded_paths,
74             },
75         )
76
77     def query_vpp_config(self):
78         return find_abf_policy(self._test, self.policy_id)
79
80     def object_id(self):
81         return "abf-policy-%d" % self.policy_id
82
83
84 class VppAbfAttach(VppObject):
85     def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
86         self._test = test
87         self.policy_id = policy_id
88         self.sw_if_index = sw_if_index
89         self.priority = priority
90         self.is_ipv6 = is_ipv6
91
92     def add_vpp_config(self):
93         self._test.vapi.abf_itf_attach_add_del(
94             1,
95             {
96                 "policy_id": self.policy_id,
97                 "sw_if_index": self.sw_if_index,
98                 "priority": self.priority,
99                 "is_ipv6": self.is_ipv6,
100             },
101         )
102         self._test.registry.register(self, self._test.logger)
103
104     def remove_vpp_config(self):
105         self._test.vapi.abf_itf_attach_add_del(
106             0,
107             {
108                 "policy_id": self.policy_id,
109                 "sw_if_index": self.sw_if_index,
110                 "priority": self.priority,
111                 "is_ipv6": self.is_ipv6,
112             },
113         )
114
115     def query_vpp_config(self):
116         return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)
117
118     def object_id(self):
119         return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
120
121
122 class TestAbf(VppTestCase):
123     """ABF Test Case"""
124
125     @classmethod
126     def setUpClass(cls):
127         super(TestAbf, cls).setUpClass()
128
129     @classmethod
130     def tearDownClass(cls):
131         super(TestAbf, cls).tearDownClass()
132
133     def setUp(self):
134         super(TestAbf, self).setUp()
135
136         self.create_pg_interfaces(range(5))
137
138         for i in self.pg_interfaces[:4]:
139             i.admin_up()
140             i.config_ip4()
141             i.resolve_arp()
142             i.config_ip6()
143             i.resolve_ndp()
144
145     def tearDown(self):
146         for i in self.pg_interfaces:
147             i.unconfig_ip4()
148             i.unconfig_ip6()
149             i.admin_down()
150         super(TestAbf, self).tearDown()
151
152     def test_abf4(self):
153         """IPv4 ACL Based Forwarding"""
154
155         #
156         # We are not testing the various matching capabilities
157         # of ACLs, that's done elsewhere. Here ware are testing
158         # the application of ACLs to a forwarding path to achieve
159         # ABF
160         # So we construct just a few ACLs to ensure the ABF policies
161         # are correctly constructed and used. And a few path types
162         # to test the API path decoding.
163         #
164
165         #
166         # Rule 1
167         #
168         rule_1 = AclRule(
169             is_permit=1,
170             proto=17,
171             ports=1234,
172             src_prefix=IPv4Network("1.1.1.1/32"),
173             dst_prefix=IPv4Network("1.1.1.2/32"),
174         )
175         acl_1 = VppAcl(self, rules=[rule_1])
176         acl_1.add_vpp_config()
177
178         #
179         # ABF policy for ACL 1 - path via interface 1
180         #
181         abf_1 = VppAbfPolicy(
182             self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
183         )
184         abf_1.add_vpp_config()
185
186         #
187         # Attach the policy to input interface Pg0
188         #
189         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
190         attach_1.add_vpp_config()
191
192         #
193         # fire in packet matching the ACL src,dst. If it's forwarded
194         # then the ABF was successful, since default routing will drop it
195         #
196         p_1 = (
197             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
198             / IP(src="1.1.1.1", dst="1.1.1.2")
199             / UDP(sport=1234, dport=1234)
200             / Raw(b"\xa5" * 100)
201         )
202         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
203
204         #
205         # Attach a 'better' priority policy to the same interface
206         #
207         abf_2 = VppAbfPolicy(
208             self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
209         )
210         abf_2.add_vpp_config()
211         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
212         attach_2.add_vpp_config()
213
214         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
215
216         #
217         # Attach a policy with priority in the middle
218         #
219         abf_3 = VppAbfPolicy(
220             self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
221         )
222         abf_3.add_vpp_config()
223         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
224         attach_3.add_vpp_config()
225
226         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
227
228         #
229         # remove the best priority
230         #
231         attach_2.remove_vpp_config()
232         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
233
234         #
235         # Attach one of the same policies to Pg1
236         #
237         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
238         attach_4.add_vpp_config()
239
240         p_2 = (
241             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
242             / IP(src="1.1.1.1", dst="1.1.1.2")
243             / UDP(sport=1234, dport=1234)
244             / Raw(b"\xa5" * 100)
245         )
246         self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
247
248         #
249         # detach the policy from PG1, now expect traffic to be dropped
250         #
251         attach_4.remove_vpp_config()
252
253         self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
254
255         #
256         # Swap to route via a next-hop in the non-default table
257         #
258         table_20 = VppIpTable(self, 20)
259         table_20.add_vpp_config()
260
261         self.pg4.set_table_ip4(table_20.table_id)
262         self.pg4.admin_up()
263         self.pg4.config_ip4()
264         self.pg4.resolve_arp()
265
266         abf_13 = VppAbfPolicy(
267             self,
268             13,
269             acl_1,
270             [
271                 VppRoutePath(
272                     self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
273                 )
274             ],
275         )
276         abf_13.add_vpp_config()
277         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
278         attach_5.add_vpp_config()
279
280         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
281
282         self.pg4.unconfig_ip4()
283         self.pg4.set_table_ip4(0)
284
285     def test_abf6(self):
286         """IPv6 ACL Based Forwarding"""
287
288         #
289         # Simple test for matching IPv6 packets
290         #
291
292         #
293         # Rule 1
294         #
295         rule_1 = AclRule(
296             is_permit=1,
297             proto=17,
298             ports=1234,
299             src_prefix=IPv6Network("2001::2/128"),
300             dst_prefix=IPv6Network("2001::1/128"),
301         )
302         acl_1 = VppAcl(self, rules=[rule_1])
303         acl_1.add_vpp_config()
304
305         #
306         # ABF policy for ACL 1 - path via interface 1
307         #
308         abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
309         abf_1.add_vpp_config()
310
311         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
312         attach_1.add_vpp_config()
313
314         #
315         # a packet matching the rule
316         #
317         p = (
318             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
319             / IPv6(src="2001::2", dst="2001::1")
320             / UDP(sport=1234, dport=1234)
321             / Raw(b"\xa5" * 100)
322         )
323
324         #
325         # packets are dropped because there is no route to the policy's
326         # next hop
327         #
328         self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
329
330         #
331         # add a route resolving the next-hop
332         #
333         route = VppIpRoute(
334             self,
335             "3001::1",
336             32,
337             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
338         )
339         route.add_vpp_config()
340
341         #
342         # now expect packets forwarded.
343         #
344         self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
345
346
347 if __name__ == "__main__":
348     unittest.main(testRunner=VppTestRunner)