Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_abf.py
1 #!/usr/bin/env python
2
3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, VppIpTable
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14
15 from vpp_object import VppObject
16
17
18 def find_abf_policy(test, id):
19     policies = test.vapi.abf_policy_dump()
20     for p in policies:
21         if id == p.policy.policy_id:
22             return True
23     return False
24
25
26 def find_abf_itf_attach(test, id, sw_if_index):
27     attachs = test.vapi.abf_itf_attach_dump()
28     for a in attachs:
29         if id == a.attach.policy_id and \
30            sw_if_index == a.attach.sw_if_index:
31             return True
32     return False
33
34
35 class VppAbfPolicy(VppObject):
36
37     def __init__(self,
38                  test,
39                  policy_id,
40                  acl,
41                  paths):
42         self._test = test
43         self.policy_id = policy_id
44         self.acl = acl
45         self.paths = paths
46
47     def encode_paths(self):
48         br_paths = []
49         for p in self.paths:
50             lstack = []
51             for l in p.nh_labels:
52                 if type(l) == VppMplsLabel:
53                     lstack.append(l.encode())
54                 else:
55                     lstack.append({'label': l, 'ttl': 255})
56             n_labels = len(lstack)
57             while (len(lstack) < 16):
58                 lstack.append({})
59             br_paths.append({'next_hop': p.nh_addr,
60                              'weight': 1,
61                              'afi': p.proto,
62                              'sw_if_index': 0xffffffff,
63                              'preference': 0,
64                              'table_id': p.nh_table_id,
65                              'next_hop_id': p.next_hop_id,
66                              'is_udp_encap': p.is_udp_encap,
67                              'n_labels': n_labels,
68                              'label_stack': lstack})
69         return br_paths
70
71     def add_vpp_config(self):
72         self._test.vapi.abf_policy_add_del(
73             1,
74             {'policy_id': self.policy_id,
75              'acl_index': self.acl.acl_index,
76              'n_paths': len(self.paths),
77              'paths': self.encode_paths()})
78         self._test.registry.register(self, self._test.logger)
79
80     def remove_vpp_config(self):
81         self._test.vapi.abf_policy_add_del(
82             0,
83             {'policy_id': self.policy_id,
84              'acl_index': self.acl.acl_index,
85              'n_paths': len(self.paths),
86              'paths': self.encode_paths()})
87
88     def query_vpp_config(self):
89         return find_abf_policy(self._test, self.policy_id)
90
91     def object_id(self):
92         return ("abf-policy-%d" % self.policy_id)
93
94
95 class VppAbfAttach(VppObject):
96
97     def __init__(self,
98                  test,
99                  policy_id,
100                  sw_if_index,
101                  priority,
102                  is_ipv6=0):
103         self._test = test
104         self.policy_id = policy_id
105         self.sw_if_index = sw_if_index
106         self.priority = priority
107         self.is_ipv6 = is_ipv6
108
109     def add_vpp_config(self):
110         self._test.vapi.abf_itf_attach_add_del(
111             1,
112             {'policy_id': self.policy_id,
113              'sw_if_index': self.sw_if_index,
114              'priority': self.priority,
115              'is_ipv6': self.is_ipv6})
116         self._test.registry.register(self, self._test.logger)
117
118     def remove_vpp_config(self):
119         self._test.vapi.abf_itf_attach_add_del(
120             0,
121             {'policy_id': self.policy_id,
122              'sw_if_index': self.sw_if_index,
123              'priority': self.priority,
124              'is_ipv6': self.is_ipv6})
125
126     def query_vpp_config(self):
127         return find_abf_itf_attach(self._test,
128                                    self.policy_id,
129                                    self.sw_if_index)
130
131     def object_id(self):
132         return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
133
134
135 class TestAbf(VppTestCase):
136     """ ABF Test Case """
137
138     @classmethod
139     def setUpClass(cls):
140         super(TestAbf, cls).setUpClass()
141
142     @classmethod
143     def tearDownClass(cls):
144         super(TestAbf, cls).tearDownClass()
145
146     def setUp(self):
147         super(TestAbf, self).setUp()
148
149         self.create_pg_interfaces(range(5))
150
151         for i in self.pg_interfaces[:4]:
152             i.admin_up()
153             i.config_ip4()
154             i.resolve_arp()
155             i.config_ip6()
156             i.resolve_ndp()
157
158     def tearDown(self):
159         for i in self.pg_interfaces:
160             i.unconfig_ip4()
161             i.unconfig_ip6()
162             i.ip6_disable()
163             i.admin_down()
164         super(TestAbf, self).tearDown()
165
166     def test_abf4(self):
167         """ IPv4 ACL Based Forwarding
168         """
169
170         #
171         # We are not testing the various matching capabilities
172         # of ACLs, that's done elsewhere. Here ware are testing
173         # the application of ACLs to a forwarding path to achieve
174         # ABF
175         # So we construct just a few ACLs to ensure the ABF policies
176         # are correctly constructed and used. And a few path types
177         # to test the API path decoding.
178         #
179
180         #
181         # Rule 1
182         #
183         rule_1 = ({'is_permit': 1,
184                    'is_ipv6': 0,
185                    'proto': 17,
186                    'srcport_or_icmptype_first': 1234,
187                    'srcport_or_icmptype_last': 1234,
188                    'src_ip_prefix_len': 32,
189                    'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
190                    'dstport_or_icmpcode_first': 1234,
191                    'dstport_or_icmpcode_last': 1234,
192                    'dst_ip_prefix_len': 32,
193                    'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
194         acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
195
196         #
197         # ABF policy for ACL 1 - path via interface 1
198         #
199         abf_1 = VppAbfPolicy(self, 10, acl_1,
200                              [VppRoutePath(self.pg1.remote_ip4,
201                                            self.pg1.sw_if_index)])
202         abf_1.add_vpp_config()
203
204         #
205         # Attach the policy to input interface Pg0
206         #
207         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
208         attach_1.add_vpp_config()
209
210         #
211         # fire in packet matching the ACL src,dst. If it's forwarded
212         # then the ABF was successful, since default routing will drop it
213         #
214         p_1 = (Ether(src=self.pg0.remote_mac,
215                      dst=self.pg0.local_mac) /
216                IP(src="1.1.1.1", dst="1.1.1.2") /
217                UDP(sport=1234, dport=1234) /
218                Raw('\xa5' * 100))
219         self.send_and_expect(self.pg0, p_1*65, self.pg1)
220
221         #
222         # Attach a 'better' priority policy to the same interface
223         #
224         abf_2 = VppAbfPolicy(self, 11, acl_1,
225                              [VppRoutePath(self.pg2.remote_ip4,
226                                            self.pg2.sw_if_index)])
227         abf_2.add_vpp_config()
228         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
229         attach_2.add_vpp_config()
230
231         self.send_and_expect(self.pg0, p_1*65, self.pg2)
232
233         #
234         # Attach a policy with priority in the middle
235         #
236         abf_3 = VppAbfPolicy(self, 12, acl_1,
237                              [VppRoutePath(self.pg3.remote_ip4,
238                                            self.pg3.sw_if_index)])
239         abf_3.add_vpp_config()
240         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
241         attach_3.add_vpp_config()
242
243         self.send_and_expect(self.pg0, p_1*65, self.pg2)
244
245         #
246         # remove the best priority
247         #
248         attach_2.remove_vpp_config()
249         self.send_and_expect(self.pg0, p_1*65, self.pg3)
250
251         #
252         # Attach one of the same policies to Pg1
253         #
254         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
255         attach_4.add_vpp_config()
256
257         p_2 = (Ether(src=self.pg1.remote_mac,
258                      dst=self.pg1.local_mac) /
259                IP(src="1.1.1.1", dst="1.1.1.2") /
260                UDP(sport=1234, dport=1234) /
261                Raw('\xa5' * 100))
262         self.send_and_expect(self.pg1, p_2 * 65, self.pg3)
263
264         #
265         # detach the policy from PG1, now expect traffic to be dropped
266         #
267         attach_4.remove_vpp_config()
268
269         self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached")
270
271         #
272         # Swap to route via a next-hop in the non-default table
273         #
274         table_20 = VppIpTable(self, 20)
275         table_20.add_vpp_config()
276
277         self.pg4.set_table_ip4(table_20.table_id)
278         self.pg4.admin_up()
279         self.pg4.config_ip4()
280         self.pg4.resolve_arp()
281
282         abf_13 = VppAbfPolicy(self, 13, acl_1,
283                               [VppRoutePath(self.pg4.remote_ip4,
284                                             0xffffffff,
285                                             nh_table_id=table_20.table_id)])
286         abf_13.add_vpp_config()
287         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
288         attach_5.add_vpp_config()
289
290         self.send_and_expect(self.pg0, p_1*65, self.pg4)
291
292         self.pg4.unconfig_ip4()
293         self.pg4.set_table_ip4(0)
294
295     def test_abf6(self):
296         """ IPv6 ACL Based Forwarding
297         """
298
299         #
300         # Simple test for matching IPv6 packets
301         #
302
303         #
304         # Rule 1
305         #
306         rule_1 = ({'is_permit': 1,
307                    'is_ipv6': 1,
308                    'proto': 17,
309                    'srcport_or_icmptype_first': 1234,
310                    'srcport_or_icmptype_last': 1234,
311                    'src_ip_prefix_len': 128,
312                    'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
313                    'dstport_or_icmpcode_first': 1234,
314                    'dstport_or_icmpcode_last': 1234,
315                    'dst_ip_prefix_len': 128,
316                    'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
317         acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
318                                           r=[rule_1])
319
320         #
321         # ABF policy for ACL 1 - path via interface 1
322         #
323         abf_1 = VppAbfPolicy(self, 10, acl_1,
324                              [VppRoutePath("3001::1",
325                                            0xffffffff,
326                                            proto=DpoProto.DPO_PROTO_IP6)])
327         abf_1.add_vpp_config()
328
329         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
330                                 45, is_ipv6=True)
331         attach_1.add_vpp_config()
332
333         #
334         # a packet matching the rule
335         #
336         p = (Ether(src=self.pg0.remote_mac,
337                    dst=self.pg0.local_mac) /
338              IPv6(src="2001::2", dst="2001::1") /
339              UDP(sport=1234, dport=1234) /
340              Raw('\xa5' * 100))
341
342         #
343         # packets are dropped because there is no route to the policy's
344         # next hop
345         #
346         self.send_and_assert_no_replies(self.pg1, p * 65, "no route")
347
348         #
349         # add a route resolving the next-hop
350         #
351         route = VppIpRoute(self, "3001::1", 32,
352                            [VppRoutePath(self.pg1.remote_ip6,
353                                          self.pg1.sw_if_index,
354                                          proto=DpoProto.DPO_PROTO_IP6)],
355                            is_ip6=1)
356         route.add_vpp_config()
357
358         #
359         # now expect packets forwarded.
360         #
361         self.send_and_expect(self.pg0, p * 65, self.pg1)
362
363
364 if __name__ == '__main__':
365     unittest.main(testRunner=VppTestRunner)