acl: revert acl: api cleanup
[vpp.git] / src / plugins / abf / test / test_abf.py
1 #!/usr/bin/env python3
2
3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
9     VppIpTable, FibPathProto
10
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP, UDP
14 from scapy.layers.inet6 import IPv6
15
16 from vpp_object import VppObject
17
18 NUM_PKTS = 67
19
20
21 def find_abf_policy(test, id):
22     policies = test.vapi.abf_policy_dump()
23     for p in policies:
24         if id == p.policy.policy_id:
25             return True
26     return False
27
28
29 def find_abf_itf_attach(test, id, sw_if_index):
30     attachs = test.vapi.abf_itf_attach_dump()
31     for a in attachs:
32         if id == a.attach.policy_id and \
33            sw_if_index == a.attach.sw_if_index:
34             return True
35     return False
36
37
38 class VppAbfPolicy(VppObject):
39
40     def __init__(self,
41                  test,
42                  policy_id,
43                  acl,
44                  paths):
45         self._test = test
46         self.policy_id = policy_id
47         self.acl = acl
48         self.paths = paths
49         self.encoded_paths = []
50         for path in self.paths:
51             self.encoded_paths.append(path.encode())
52
53     def add_vpp_config(self):
54         self._test.vapi.abf_policy_add_del(
55             1,
56             {'policy_id': self.policy_id,
57              'acl_index': self.acl.acl_index,
58              'n_paths': len(self.paths),
59              'paths': self.encoded_paths})
60         self._test.registry.register(self, self._test.logger)
61
62     def remove_vpp_config(self):
63         self._test.vapi.abf_policy_add_del(
64             0,
65             {'policy_id': self.policy_id,
66              'acl_index': self.acl.acl_index,
67              'n_paths': len(self.paths),
68              'paths': self.encoded_paths})
69
70     def query_vpp_config(self):
71         return find_abf_policy(self._test, self.policy_id)
72
73     def object_id(self):
74         return ("abf-policy-%d" % self.policy_id)
75
76
77 class VppAbfAttach(VppObject):
78
79     def __init__(self,
80                  test,
81                  policy_id,
82                  sw_if_index,
83                  priority,
84                  is_ipv6=0):
85         self._test = test
86         self.policy_id = policy_id
87         self.sw_if_index = sw_if_index
88         self.priority = priority
89         self.is_ipv6 = is_ipv6
90
91     def add_vpp_config(self):
92         self._test.vapi.abf_itf_attach_add_del(
93             1,
94             {'policy_id': self.policy_id,
95              'sw_if_index': self.sw_if_index,
96              'priority': self.priority,
97              'is_ipv6': self.is_ipv6})
98         self._test.registry.register(self, self._test.logger)
99
100     def remove_vpp_config(self):
101         self._test.vapi.abf_itf_attach_add_del(
102             0,
103             {'policy_id': self.policy_id,
104              'sw_if_index': self.sw_if_index,
105              'priority': self.priority,
106              'is_ipv6': self.is_ipv6})
107
108     def query_vpp_config(self):
109         return find_abf_itf_attach(self._test,
110                                    self.policy_id,
111                                    self.sw_if_index)
112
113     def object_id(self):
114         return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
115
116
117 class TestAbf(VppTestCase):
118     """ ABF Test Case """
119
120     @classmethod
121     def setUpClass(cls):
122         super(TestAbf, cls).setUpClass()
123
124     @classmethod
125     def tearDownClass(cls):
126         super(TestAbf, cls).tearDownClass()
127
128     def setUp(self):
129         super(TestAbf, self).setUp()
130
131         self.create_pg_interfaces(range(5))
132
133         for i in self.pg_interfaces[:4]:
134             i.admin_up()
135             i.config_ip4()
136             i.resolve_arp()
137             i.config_ip6()
138             i.resolve_ndp()
139
140     def tearDown(self):
141         for i in self.pg_interfaces:
142             i.unconfig_ip4()
143             i.unconfig_ip6()
144             i.admin_down()
145         super(TestAbf, self).tearDown()
146
147     def test_abf4(self):
148         """ IPv4 ACL Based Forwarding
149         """
150
151         #
152         # We are not testing the various matching capabilities
153         # of ACLs, that's done elsewhere. Here ware are testing
154         # the application of ACLs to a forwarding path to achieve
155         # ABF
156         # So we construct just a few ACLs to ensure the ABF policies
157         # are correctly constructed and used. And a few path types
158         # to test the API path decoding.
159         #
160
161         #
162         # Rule 1
163         #
164         rule_1 = ({'is_permit': 1,
165                    'is_ipv6': 0,
166                    'proto': 17,
167                    'srcport_or_icmptype_first': 1234,
168                    'srcport_or_icmptype_last': 1234,
169                    'src_ip_prefix_len': 32,
170                    'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
171                    'dstport_or_icmpcode_first': 1234,
172                    'dstport_or_icmpcode_last': 1234,
173                    'dst_ip_prefix_len': 32,
174                    'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
175         acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
176
177         #
178         # ABF policy for ACL 1 - path via interface 1
179         #
180         abf_1 = VppAbfPolicy(self, 10, acl_1,
181                              [VppRoutePath(self.pg1.remote_ip4,
182                                            self.pg1.sw_if_index)])
183         abf_1.add_vpp_config()
184
185         #
186         # Attach the policy to input interface Pg0
187         #
188         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
189         attach_1.add_vpp_config()
190
191         #
192         # fire in packet matching the ACL src,dst. If it's forwarded
193         # then the ABF was successful, since default routing will drop it
194         #
195         p_1 = (Ether(src=self.pg0.remote_mac,
196                      dst=self.pg0.local_mac) /
197                IP(src="1.1.1.1", dst="1.1.1.2") /
198                UDP(sport=1234, dport=1234) /
199                Raw(b'\xa5' * 100))
200         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
201
202         #
203         # Attach a 'better' priority policy to the same interface
204         #
205         abf_2 = VppAbfPolicy(self, 11, acl_1,
206                              [VppRoutePath(self.pg2.remote_ip4,
207                                            self.pg2.sw_if_index)])
208         abf_2.add_vpp_config()
209         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
210         attach_2.add_vpp_config()
211
212         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
213
214         #
215         # Attach a policy with priority in the middle
216         #
217         abf_3 = VppAbfPolicy(self, 12, acl_1,
218                              [VppRoutePath(self.pg3.remote_ip4,
219                                            self.pg3.sw_if_index)])
220         abf_3.add_vpp_config()
221         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
222         attach_3.add_vpp_config()
223
224         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
225
226         #
227         # remove the best priority
228         #
229         attach_2.remove_vpp_config()
230         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
231
232         #
233         # Attach one of the same policies to Pg1
234         #
235         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
236         attach_4.add_vpp_config()
237
238         p_2 = (Ether(src=self.pg1.remote_mac,
239                      dst=self.pg1.local_mac) /
240                IP(src="1.1.1.1", dst="1.1.1.2") /
241                UDP(sport=1234, dport=1234) /
242                Raw(b'\xa5' * 100))
243         self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
244
245         #
246         # detach the policy from PG1, now expect traffic to be dropped
247         #
248         attach_4.remove_vpp_config()
249
250         self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
251
252         #
253         # Swap to route via a next-hop in the non-default table
254         #
255         table_20 = VppIpTable(self, 20)
256         table_20.add_vpp_config()
257
258         self.pg4.set_table_ip4(table_20.table_id)
259         self.pg4.admin_up()
260         self.pg4.config_ip4()
261         self.pg4.resolve_arp()
262
263         abf_13 = VppAbfPolicy(self, 13, acl_1,
264                               [VppRoutePath(self.pg4.remote_ip4,
265                                             0xffffffff,
266                                             nh_table_id=table_20.table_id)])
267         abf_13.add_vpp_config()
268         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
269         attach_5.add_vpp_config()
270
271         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
272
273         self.pg4.unconfig_ip4()
274         self.pg4.set_table_ip4(0)
275
276     def test_abf6(self):
277         """ IPv6 ACL Based Forwarding
278         """
279
280         #
281         # Simple test for matching IPv6 packets
282         #
283
284         #
285         # Rule 1
286         #
287         rule_1 = ({'is_permit': 1,
288                    'is_ipv6': 1,
289                    'proto': 17,
290                    'srcport_or_icmptype_first': 1234,
291                    'srcport_or_icmptype_last': 1234,
292                    'src_ip_prefix_len': 128,
293                    'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
294                    'dstport_or_icmpcode_first': 1234,
295                    'dstport_or_icmpcode_last': 1234,
296                    'dst_ip_prefix_len': 128,
297                    'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
298         acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
299                                           r=[rule_1])
300
301         #
302         # ABF policy for ACL 1 - path via interface 1
303         #
304         abf_1 = VppAbfPolicy(self, 10, acl_1,
305                              [VppRoutePath("3001::1",
306                                            0xffffffff)])
307         abf_1.add_vpp_config()
308
309         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
310                                 45, is_ipv6=True)
311         attach_1.add_vpp_config()
312
313         #
314         # a packet matching the rule
315         #
316         p = (Ether(src=self.pg0.remote_mac,
317                    dst=self.pg0.local_mac) /
318              IPv6(src="2001::2", dst="2001::1") /
319              UDP(sport=1234, dport=1234) /
320              Raw(b'\xa5' * 100))
321
322         #
323         # packets are dropped because there is no route to the policy's
324         # next hop
325         #
326         self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
327
328         #
329         # add a route resolving the next-hop
330         #
331         route = VppIpRoute(self, "3001::1", 32,
332                            [VppRoutePath(self.pg1.remote_ip6,
333                                          self.pg1.sw_if_index)])
334         route.add_vpp_config()
335
336         #
337         # now expect packets forwarded.
338         #
339         self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
340
341
342 if __name__ == '__main__':
343     unittest.main(testRunner=VppTestRunner)