acl: API cleanup
[vpp.git] / src / plugins / abf / test / test_abf.py
1 #!/usr/bin/env python3
2
3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
9     VppIpTable, FibPathProto
10 from vpp_acl import AclRule, VppAcl
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from ipaddress import IPv4Network, IPv6Network
17
18 from vpp_object import VppObject
19
20 NUM_PKTS = 67
21
22
23 def find_abf_policy(test, id):
24     policies = test.vapi.abf_policy_dump()
25     for p in policies:
26         if id == p.policy.policy_id:
27             return True
28     return False
29
30
31 def find_abf_itf_attach(test, id, sw_if_index):
32     attachs = test.vapi.abf_itf_attach_dump()
33     for a in attachs:
34         if id == a.attach.policy_id and \
35            sw_if_index == a.attach.sw_if_index:
36             return True
37     return False
38
39
40 class VppAbfPolicy(VppObject):
41
42     def __init__(self,
43                  test,
44                  policy_id,
45                  acl,
46                  paths):
47         self._test = test
48         self.policy_id = policy_id
49         self.acl = acl
50         self.paths = paths
51         self.encoded_paths = []
52         for path in self.paths:
53             self.encoded_paths.append(path.encode())
54
55     def add_vpp_config(self):
56         self._test.vapi.abf_policy_add_del(
57             1,
58             {'policy_id': self.policy_id,
59              'acl_index': self.acl.acl_index,
60              'n_paths': len(self.paths),
61              'paths': self.encoded_paths})
62         self._test.registry.register(self, self._test.logger)
63
64     def remove_vpp_config(self):
65         self._test.vapi.abf_policy_add_del(
66             0,
67             {'policy_id': self.policy_id,
68              'acl_index': self.acl.acl_index,
69              'n_paths': len(self.paths),
70              'paths': self.encoded_paths})
71
72     def query_vpp_config(self):
73         return find_abf_policy(self._test, self.policy_id)
74
75     def object_id(self):
76         return ("abf-policy-%d" % self.policy_id)
77
78
79 class VppAbfAttach(VppObject):
80
81     def __init__(self,
82                  test,
83                  policy_id,
84                  sw_if_index,
85                  priority,
86                  is_ipv6=0):
87         self._test = test
88         self.policy_id = policy_id
89         self.sw_if_index = sw_if_index
90         self.priority = priority
91         self.is_ipv6 = is_ipv6
92
93     def add_vpp_config(self):
94         self._test.vapi.abf_itf_attach_add_del(
95             1,
96             {'policy_id': self.policy_id,
97              'sw_if_index': self.sw_if_index,
98              'priority': self.priority,
99              'is_ipv6': self.is_ipv6})
100         self._test.registry.register(self, self._test.logger)
101
102     def remove_vpp_config(self):
103         self._test.vapi.abf_itf_attach_add_del(
104             0,
105             {'policy_id': self.policy_id,
106              'sw_if_index': self.sw_if_index,
107              'priority': self.priority,
108              'is_ipv6': self.is_ipv6})
109
110     def query_vpp_config(self):
111         return find_abf_itf_attach(self._test,
112                                    self.policy_id,
113                                    self.sw_if_index)
114
115     def object_id(self):
116         return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
117
118
119 class TestAbf(VppTestCase):
120     """ ABF Test Case """
121
122     @classmethod
123     def setUpClass(cls):
124         super(TestAbf, cls).setUpClass()
125
126     @classmethod
127     def tearDownClass(cls):
128         super(TestAbf, cls).tearDownClass()
129
130     def setUp(self):
131         super(TestAbf, self).setUp()
132
133         self.create_pg_interfaces(range(5))
134
135         for i in self.pg_interfaces[:4]:
136             i.admin_up()
137             i.config_ip4()
138             i.resolve_arp()
139             i.config_ip6()
140             i.resolve_ndp()
141
142     def tearDown(self):
143         for i in self.pg_interfaces:
144             i.unconfig_ip4()
145             i.unconfig_ip6()
146             i.admin_down()
147         super(TestAbf, self).tearDown()
148
149     def test_abf4(self):
150         """ IPv4 ACL Based Forwarding
151         """
152
153         #
154         # We are not testing the various matching capabilities
155         # of ACLs, that's done elsewhere. Here ware are testing
156         # the application of ACLs to a forwarding path to achieve
157         # ABF
158         # So we construct just a few ACLs to ensure the ABF policies
159         # are correctly constructed and used. And a few path types
160         # to test the API path decoding.
161         #
162
163         #
164         # Rule 1
165         #
166         rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
167                          src_prefix=IPv4Network("1.1.1.1/32"),
168                          dst_prefix=IPv4Network("1.1.1.2/32"))
169         acl_1 = VppAcl(self, rules=[rule_1])
170         acl_1.add_vpp_config()
171
172         #
173         # ABF policy for ACL 1 - path via interface 1
174         #
175         abf_1 = VppAbfPolicy(self, 10, acl_1,
176                              [VppRoutePath(self.pg1.remote_ip4,
177                                            self.pg1.sw_if_index)])
178         abf_1.add_vpp_config()
179
180         #
181         # Attach the policy to input interface Pg0
182         #
183         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
184         attach_1.add_vpp_config()
185
186         #
187         # fire in packet matching the ACL src,dst. If it's forwarded
188         # then the ABF was successful, since default routing will drop it
189         #
190         p_1 = (Ether(src=self.pg0.remote_mac,
191                      dst=self.pg0.local_mac) /
192                IP(src="1.1.1.1", dst="1.1.1.2") /
193                UDP(sport=1234, dport=1234) /
194                Raw(b'\xa5' * 100))
195         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
196
197         #
198         # Attach a 'better' priority policy to the same interface
199         #
200         abf_2 = VppAbfPolicy(self, 11, acl_1,
201                              [VppRoutePath(self.pg2.remote_ip4,
202                                            self.pg2.sw_if_index)])
203         abf_2.add_vpp_config()
204         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
205         attach_2.add_vpp_config()
206
207         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
208
209         #
210         # Attach a policy with priority in the middle
211         #
212         abf_3 = VppAbfPolicy(self, 12, acl_1,
213                              [VppRoutePath(self.pg3.remote_ip4,
214                                            self.pg3.sw_if_index)])
215         abf_3.add_vpp_config()
216         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
217         attach_3.add_vpp_config()
218
219         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
220
221         #
222         # remove the best priority
223         #
224         attach_2.remove_vpp_config()
225         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
226
227         #
228         # Attach one of the same policies to Pg1
229         #
230         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
231         attach_4.add_vpp_config()
232
233         p_2 = (Ether(src=self.pg1.remote_mac,
234                      dst=self.pg1.local_mac) /
235                IP(src="1.1.1.1", dst="1.1.1.2") /
236                UDP(sport=1234, dport=1234) /
237                Raw(b'\xa5' * 100))
238         self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
239
240         #
241         # detach the policy from PG1, now expect traffic to be dropped
242         #
243         attach_4.remove_vpp_config()
244
245         self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
246
247         #
248         # Swap to route via a next-hop in the non-default table
249         #
250         table_20 = VppIpTable(self, 20)
251         table_20.add_vpp_config()
252
253         self.pg4.set_table_ip4(table_20.table_id)
254         self.pg4.admin_up()
255         self.pg4.config_ip4()
256         self.pg4.resolve_arp()
257
258         abf_13 = VppAbfPolicy(self, 13, acl_1,
259                               [VppRoutePath(self.pg4.remote_ip4,
260                                             0xffffffff,
261                                             nh_table_id=table_20.table_id)])
262         abf_13.add_vpp_config()
263         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
264         attach_5.add_vpp_config()
265
266         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
267
268         self.pg4.unconfig_ip4()
269         self.pg4.set_table_ip4(0)
270
271     def test_abf6(self):
272         """ IPv6 ACL Based Forwarding
273         """
274
275         #
276         # Simple test for matching IPv6 packets
277         #
278
279         #
280         # Rule 1
281         #
282         rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
283                          src_prefix=IPv6Network("2001::2/128"),
284                          dst_prefix=IPv6Network("2001::1/128"))
285         acl_1 = VppAcl(self, rules=[rule_1])
286         acl_1.add_vpp_config()
287
288         #
289         # ABF policy for ACL 1 - path via interface 1
290         #
291         abf_1 = VppAbfPolicy(self, 10, acl_1,
292                              [VppRoutePath("3001::1",
293                                            0xffffffff)])
294         abf_1.add_vpp_config()
295
296         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
297                                 45, is_ipv6=True)
298         attach_1.add_vpp_config()
299
300         #
301         # a packet matching the rule
302         #
303         p = (Ether(src=self.pg0.remote_mac,
304                    dst=self.pg0.local_mac) /
305              IPv6(src="2001::2", dst="2001::1") /
306              UDP(sport=1234, dport=1234) /
307              Raw(b'\xa5' * 100))
308
309         #
310         # packets are dropped because there is no route to the policy's
311         # next hop
312         #
313         self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
314
315         #
316         # add a route resolving the next-hop
317         #
318         route = VppIpRoute(self, "3001::1", 32,
319                            [VppRoutePath(self.pg1.remote_ip6,
320                                          self.pg1.sw_if_index)])
321         route.add_vpp_config()
322
323         #
324         # now expect packets forwarded.
325         #
326         self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
327
328
329 if __name__ == '__main__':
330     unittest.main(testRunner=VppTestRunner)