tests: refactor. Replace literal constant w/ named constant.
[vpp.git] / test / test_abf.py
1 #!/usr/bin/env python
2
3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, VppIpTable
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14
15 from vpp_object import VppObject
16
17 NUM_PKTS = 67
18
19
20 def find_abf_policy(test, id):
21     policies = test.vapi.abf_policy_dump()
22     for p in policies:
23         if id == p.policy.policy_id:
24             return True
25     return False
26
27
28 def find_abf_itf_attach(test, id, sw_if_index):
29     attachs = test.vapi.abf_itf_attach_dump()
30     for a in attachs:
31         if id == a.attach.policy_id and \
32            sw_if_index == a.attach.sw_if_index:
33             return True
34     return False
35
36
37 class VppAbfPolicy(VppObject):
38
39     def __init__(self,
40                  test,
41                  policy_id,
42                  acl,
43                  paths):
44         self._test = test
45         self.policy_id = policy_id
46         self.acl = acl
47         self.paths = paths
48
49     def encode_paths(self):
50         br_paths = []
51         for p in self.paths:
52             lstack = []
53             for l in p.nh_labels:
54                 if type(l) == VppMplsLabel:
55                     lstack.append(l.encode())
56                 else:
57                     lstack.append({'label': l, 'ttl': 255})
58             n_labels = len(lstack)
59             while (len(lstack) < 16):
60                 lstack.append({})
61             br_paths.append({'next_hop': p.nh_addr,
62                              'weight': 1,
63                              'afi': p.proto,
64                              'sw_if_index': 0xffffffff,
65                              'preference': 0,
66                              'table_id': p.nh_table_id,
67                              'next_hop_id': p.next_hop_id,
68                              'is_udp_encap': p.is_udp_encap,
69                              'n_labels': n_labels,
70                              'label_stack': lstack})
71         return br_paths
72
73     def add_vpp_config(self):
74         self._test.vapi.abf_policy_add_del(
75             1,
76             {'policy_id': self.policy_id,
77              'acl_index': self.acl.acl_index,
78              'n_paths': len(self.paths),
79              'paths': self.encode_paths()})
80         self._test.registry.register(self, self._test.logger)
81
82     def remove_vpp_config(self):
83         self._test.vapi.abf_policy_add_del(
84             0,
85             {'policy_id': self.policy_id,
86              'acl_index': self.acl.acl_index,
87              'n_paths': len(self.paths),
88              'paths': self.encode_paths()})
89
90     def query_vpp_config(self):
91         return find_abf_policy(self._test, self.policy_id)
92
93     def object_id(self):
94         return ("abf-policy-%d" % self.policy_id)
95
96
97 class VppAbfAttach(VppObject):
98
99     def __init__(self,
100                  test,
101                  policy_id,
102                  sw_if_index,
103                  priority,
104                  is_ipv6=0):
105         self._test = test
106         self.policy_id = policy_id
107         self.sw_if_index = sw_if_index
108         self.priority = priority
109         self.is_ipv6 = is_ipv6
110
111     def add_vpp_config(self):
112         self._test.vapi.abf_itf_attach_add_del(
113             1,
114             {'policy_id': self.policy_id,
115              'sw_if_index': self.sw_if_index,
116              'priority': self.priority,
117              'is_ipv6': self.is_ipv6})
118         self._test.registry.register(self, self._test.logger)
119
120     def remove_vpp_config(self):
121         self._test.vapi.abf_itf_attach_add_del(
122             0,
123             {'policy_id': self.policy_id,
124              'sw_if_index': self.sw_if_index,
125              'priority': self.priority,
126              'is_ipv6': self.is_ipv6})
127
128     def query_vpp_config(self):
129         return find_abf_itf_attach(self._test,
130                                    self.policy_id,
131                                    self.sw_if_index)
132
133     def object_id(self):
134         return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
135
136
137 class TestAbf(VppTestCase):
138     """ ABF Test Case """
139
140     @classmethod
141     def setUpClass(cls):
142         super(TestAbf, cls).setUpClass()
143
144     @classmethod
145     def tearDownClass(cls):
146         super(TestAbf, cls).tearDownClass()
147
148     def setUp(self):
149         super(TestAbf, self).setUp()
150
151         self.create_pg_interfaces(range(5))
152
153         for i in self.pg_interfaces[:4]:
154             i.admin_up()
155             i.config_ip4()
156             i.resolve_arp()
157             i.config_ip6()
158             i.resolve_ndp()
159
160     def tearDown(self):
161         for i in self.pg_interfaces:
162             i.unconfig_ip4()
163             i.unconfig_ip6()
164             i.ip6_disable()
165             i.admin_down()
166         super(TestAbf, self).tearDown()
167
168     def test_abf4(self):
169         """ IPv4 ACL Based Forwarding
170         """
171
172         #
173         # We are not testing the various matching capabilities
174         # of ACLs, that's done elsewhere. Here ware are testing
175         # the application of ACLs to a forwarding path to achieve
176         # ABF
177         # So we construct just a few ACLs to ensure the ABF policies
178         # are correctly constructed and used. And a few path types
179         # to test the API path decoding.
180         #
181
182         #
183         # Rule 1
184         #
185         rule_1 = ({'is_permit': 1,
186                    'is_ipv6': 0,
187                    'proto': 17,
188                    'srcport_or_icmptype_first': 1234,
189                    'srcport_or_icmptype_last': 1234,
190                    'src_ip_prefix_len': 32,
191                    'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
192                    'dstport_or_icmpcode_first': 1234,
193                    'dstport_or_icmpcode_last': 1234,
194                    'dst_ip_prefix_len': 32,
195                    'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
196         acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
197
198         #
199         # ABF policy for ACL 1 - path via interface 1
200         #
201         abf_1 = VppAbfPolicy(self, 10, acl_1,
202                              [VppRoutePath(self.pg1.remote_ip4,
203                                            self.pg1.sw_if_index)])
204         abf_1.add_vpp_config()
205
206         #
207         # Attach the policy to input interface Pg0
208         #
209         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
210         attach_1.add_vpp_config()
211
212         #
213         # fire in packet matching the ACL src,dst. If it's forwarded
214         # then the ABF was successful, since default routing will drop it
215         #
216         p_1 = (Ether(src=self.pg0.remote_mac,
217                      dst=self.pg0.local_mac) /
218                IP(src="1.1.1.1", dst="1.1.1.2") /
219                UDP(sport=1234, dport=1234) /
220                Raw('\xa5' * 100))
221         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
222
223         #
224         # Attach a 'better' priority policy to the same interface
225         #
226         abf_2 = VppAbfPolicy(self, 11, acl_1,
227                              [VppRoutePath(self.pg2.remote_ip4,
228                                            self.pg2.sw_if_index)])
229         abf_2.add_vpp_config()
230         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
231         attach_2.add_vpp_config()
232
233         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
234
235         #
236         # Attach a policy with priority in the middle
237         #
238         abf_3 = VppAbfPolicy(self, 12, acl_1,
239                              [VppRoutePath(self.pg3.remote_ip4,
240                                            self.pg3.sw_if_index)])
241         abf_3.add_vpp_config()
242         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
243         attach_3.add_vpp_config()
244
245         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
246
247         #
248         # remove the best priority
249         #
250         attach_2.remove_vpp_config()
251         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
252
253         #
254         # Attach one of the same policies to Pg1
255         #
256         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
257         attach_4.add_vpp_config()
258
259         p_2 = (Ether(src=self.pg1.remote_mac,
260                      dst=self.pg1.local_mac) /
261                IP(src="1.1.1.1", dst="1.1.1.2") /
262                UDP(sport=1234, dport=1234) /
263                Raw('\xa5' * 100))
264         self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
265
266         #
267         # detach the policy from PG1, now expect traffic to be dropped
268         #
269         attach_4.remove_vpp_config()
270
271         self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
272
273         #
274         # Swap to route via a next-hop in the non-default table
275         #
276         table_20 = VppIpTable(self, 20)
277         table_20.add_vpp_config()
278
279         self.pg4.set_table_ip4(table_20.table_id)
280         self.pg4.admin_up()
281         self.pg4.config_ip4()
282         self.pg4.resolve_arp()
283
284         abf_13 = VppAbfPolicy(self, 13, acl_1,
285                               [VppRoutePath(self.pg4.remote_ip4,
286                                             0xffffffff,
287                                             nh_table_id=table_20.table_id)])
288         abf_13.add_vpp_config()
289         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
290         attach_5.add_vpp_config()
291
292         self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
293
294         self.pg4.unconfig_ip4()
295         self.pg4.set_table_ip4(0)
296
297     def test_abf6(self):
298         """ IPv6 ACL Based Forwarding
299         """
300
301         #
302         # Simple test for matching IPv6 packets
303         #
304
305         #
306         # Rule 1
307         #
308         rule_1 = ({'is_permit': 1,
309                    'is_ipv6': 1,
310                    'proto': 17,
311                    'srcport_or_icmptype_first': 1234,
312                    'srcport_or_icmptype_last': 1234,
313                    'src_ip_prefix_len': 128,
314                    'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
315                    'dstport_or_icmpcode_first': 1234,
316                    'dstport_or_icmpcode_last': 1234,
317                    'dst_ip_prefix_len': 128,
318                    'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
319         acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
320                                           r=[rule_1])
321
322         #
323         # ABF policy for ACL 1 - path via interface 1
324         #
325         abf_1 = VppAbfPolicy(self, 10, acl_1,
326                              [VppRoutePath("3001::1",
327                                            0xffffffff,
328                                            proto=DpoProto.DPO_PROTO_IP6)])
329         abf_1.add_vpp_config()
330
331         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
332                                 45, is_ipv6=True)
333         attach_1.add_vpp_config()
334
335         #
336         # a packet matching the rule
337         #
338         p = (Ether(src=self.pg0.remote_mac,
339                    dst=self.pg0.local_mac) /
340              IPv6(src="2001::2", dst="2001::1") /
341              UDP(sport=1234, dport=1234) /
342              Raw('\xa5' * 100))
343
344         #
345         # packets are dropped because there is no route to the policy's
346         # next hop
347         #
348         self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
349
350         #
351         # add a route resolving the next-hop
352         #
353         route = VppIpRoute(self, "3001::1", 32,
354                            [VppRoutePath(self.pg1.remote_ip6,
355                                          self.pg1.sw_if_index,
356                                          proto=DpoProto.DPO_PROTO_IP6)],
357                            is_ip6=1)
358         route.add_vpp_config()
359
360         #
361         # now expect packets forwarded.
362         #
363         self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
364
365
366 if __name__ == '__main__':
367     unittest.main(testRunner=VppTestRunner)