ACL based forwarding
[vpp.git] / test / test_abf.py
1 #!/usr/bin/env python
2
3 from framework import VppTestCase, VppTestRunner
4 from vpp_udp_encap import *
5 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, DpoProto
6
7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether, ARP
9 from scapy.layers.inet import IP, UDP
10 from scapy.layers.inet6 import IPv6
11 from scapy.contrib.mpls import MPLS
12
13 from vpp_object import *
14 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
15
16
def find_abf_policy(test, id):
    """Report whether VPP currently holds an ABF policy with the given ID.

    :param test: test-case object whose ``vapi`` exposes ``abf_policy_dump()``
    :param id: policy ID to look for
    :returns: True if any dumped entry's ``policy.policy_id`` equals ``id``,
              otherwise False
    """
    dumped = test.vapi.abf_policy_dump()
    return any(id == entry.policy.policy_id for entry in dumped)
23
24
def find_abf_itf_attach(test, id, sw_if_index):
    """Report whether a policy is attached to an interface in VPP.

    :param test: test-case object whose ``vapi`` exposes
                 ``abf_itf_attach_dump()``
    :param id: policy ID of the attachment to look for
    :param sw_if_index: interface index of the attachment to look for
    :returns: True if a dumped entry matches both the policy ID and the
              interface index, otherwise False
    """
    for entry in test.vapi.abf_itf_attach_dump():
        # Check the policy first; only a policy match needs its interface
        # compared as well.
        if id != entry.attach.policy_id:
            continue
        if sw_if_index == entry.attach.sw_if_index:
            return True
    return False
32
33
class VppAbfPolicy(VppObject):
    """Test-side handle for an ABF policy configured in VPP.

    A policy ties an ACL to a list of forwarding paths. The object can add
    itself to VPP, remove itself again and report whether VPP currently
    holds a policy with its ID.
    """

    def __init__(self,
                 test,
                 policy_id,
                 acl,
                 paths):
        """
        :param test: test case providing ``vapi``, ``registry`` and ``logger``
        :param policy_id: numeric ID of the policy
        :param acl: object exposing ``acl_index`` (the tests in this file
                    pass the reply of ``acl_add_replace``)
        :param paths: list of path objects (``VppRoutePath`` in this file)
        """
        self._test = test
        self.policy_id = policy_id
        self.acl = acl
        self.paths = paths

    def encode_paths(self):
        """Build the API representation of every path in the policy.

        :returns: list with one dict per path, in the order of ``self.paths``
        """
        br_paths = []
        for path in self.paths:
            # A label is either a VppMplsLabel, which encodes itself, or a
            # bare label value that is sent with a TTL of 255.
            # isinstance() also accepts subclasses of VppMplsLabel.
            lstack = [label.encode() if isinstance(label, VppMplsLabel)
                      else {'label': label, 'ttl': 255}
                      for label in path.nh_labels]
            n_labels = len(lstack)
            # Pad the stack with empty entries up to 16 elements; n_labels
            # carries the real count. Presumably 16 is the fixed size of
            # the label array in the API message - TODO confirm.
            lstack.extend({} for _ in range(16 - n_labels))
            br_paths.append({'next_hop': path.nh_addr,
                             'weight': 1,
                             'afi': path.proto,
                             # the path's own interface is not encoded;
                             # 0xffffffff is always sent
                             'sw_if_index': 0xffffffff,
                             'preference': 0,
                             'table_id': path.nh_table_id,
                             'next_hop_id': path.next_hop_id,
                             'is_udp_encap': path.is_udp_encap,
                             'n_labels': n_labels,
                             'label_stack': lstack})
        return br_paths

    def _encode_policy(self):
        """Return the policy dict shared by the add and delete requests."""
        return {'policy_id': self.policy_id,
                'acl_index': self.acl.acl_index,
                'n_paths': len(self.paths),
                'paths': self.encode_paths()}

    def add_vpp_config(self):
        """Add the policy to VPP and register it with the test registry."""
        self._test.vapi.abf_policy_add_del(1, self._encode_policy())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Delete the policy from VPP."""
        self._test.vapi.abf_policy_add_del(0, self._encode_policy())

    def query_vpp_config(self):
        """Return True if VPP's policy dump contains this policy ID."""
        return find_abf_policy(self._test, self.policy_id)

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return the identifier string for this policy."""
        return ("abf-policy-%d" % self.policy_id)
95
96
class VppAbfAttach(VppObject):
    """Test-side handle for the attachment of an ABF policy to an interface.

    The object can create the attachment in VPP, remove it again and report
    whether VPP's attachment dump contains it.
    """

    def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
        """
        :param test: test case providing ``vapi``, ``registry`` and ``logger``
        :param policy_id: ID of the policy being attached
        :param sw_if_index: index of the interface the policy is attached to
        :param priority: priority value sent with the attachment
        :param is_ipv6: IPv6 flag sent with the attachment (default 0)
        """
        self._test = test
        self.policy_id = policy_id
        self.sw_if_index = sw_if_index
        self.priority = priority
        self.is_ipv6 = is_ipv6

    def _attach_request(self):
        """Return the attachment dict shared by the add and delete calls."""
        request = {}
        request['policy_id'] = self.policy_id
        request['sw_if_index'] = self.sw_if_index
        request['priority'] = self.priority
        request['is_ipv6'] = self.is_ipv6
        return request

    def add_vpp_config(self):
        """Create the attachment in VPP and register it with the registry."""
        owner = self._test
        owner.vapi.abf_itf_attach_add_del(1, self._attach_request())
        owner.registry.register(self, owner.logger)

    def remove_vpp_config(self):
        """Remove the attachment from VPP."""
        self._test.vapi.abf_itf_attach_add_del(0, self._attach_request())

    def query_vpp_config(self):
        """Return True if VPP's attachment dump contains this attachment."""
        return find_abf_itf_attach(
            self._test, self.policy_id, self.sw_if_index)

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return the identifier string for this attachment."""
        ids = (self.policy_id, self.sw_if_index)
        return ("abf-attach-%d-%d" % ids)
138
139
class TestAbf(VppTestCase):
    """ ABF Test Case """

    def setUp(self):
        # Four packet-generator interfaces, each brought up with IPv4 and
        # IPv6 addressing and with its neighbour resolved.
        super(TestAbf, self).setUp()

        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        # Undo the per-interface configuration applied in setUp.
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.ip6_disable()
            i.admin_down()
        super(TestAbf, self).tearDown()

    def test_abf4(self):
        """ IPv4 ACL Based Forwarding
        """

        #
        # We are not testing the various matching capabilities
        # of ACLs, that's done elsewhere. Here we are testing
        # the application of ACLs to a forwarding path to achieve
        # ABF.
        # So we construct just a few ACLs to ensure the ABF policies
        # are correctly constructed and used. And a few path types
        # to test the API path decoding.
        #

        #
        # Rule 1
        #
        rule_1 = ({'is_permit': 1,
                   'is_ipv6': 0,
                   'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1234,
                   'src_ip_prefix_len': 32,
                   'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_prefix_len': 32,
                   'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(self, 10, acl_1,
                             [VppRoutePath(self.pg1.remote_ip4,
                                           self.pg1.sw_if_index)])
        abf_1.add_vpp_config()

        #
        # Attach the policy to input interface Pg0
        #
        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
        attach_1.add_vpp_config()

        #
        # fire in packet matching the ACL src,dst. If it's forwarded
        # then the ABF was successful, since default routing will drop it
        #
        p_1 = (Ether(src=self.pg0.remote_mac,
                     dst=self.pg0.local_mac) /
               IP(src="1.1.1.1", dst="1.1.1.2") /
               UDP(sport=1234, dport=1234) /
               Raw('\xa5' * 100))
        self.send_and_expect(self.pg0, p_1*65, self.pg1)

        #
        # Attach a 'better' priority policy to the same interface
        # (the expectations below show the lower value, 40, winning over 50)
        #
        abf_2 = VppAbfPolicy(self, 11, acl_1,
                             [VppRoutePath(self.pg2.remote_ip4,
                                           self.pg2.sw_if_index)])
        abf_2.add_vpp_config()
        attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
        attach_2.add_vpp_config()

        self.send_and_expect(self.pg0, p_1*65, self.pg2)

        #
        # Attach a policy with priority in the middle; traffic is still
        # expected to follow the priority 40 policy via Pg2
        #
        abf_3 = VppAbfPolicy(self, 12, acl_1,
                             [VppRoutePath(self.pg3.remote_ip4,
                                           self.pg3.sw_if_index)])
        abf_3.add_vpp_config()
        attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
        attach_3.add_vpp_config()

        self.send_and_expect(self.pg0, p_1*65, self.pg2)

        #
        # remove the best priority
        #
        attach_2.remove_vpp_config()
        self.send_and_expect(self.pg0, p_1*65, self.pg3)

        #
        # Attach one of the same policies to Pg1
        #
        attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
        attach_4.add_vpp_config()

        p_2 = (Ether(src=self.pg1.remote_mac,
                     dst=self.pg1.local_mac) /
               IP(src="1.1.1.1", dst="1.1.1.2") /
               UDP(sport=1234, dport=1234) /
               Raw('\xa5' * 100))
        self.send_and_expect(self.pg1, p_2 * 65, self.pg3)

        #
        # detach the policy from PG1, now expect traffic to be dropped
        #
        attach_4.remove_vpp_config()

        self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached")

    def test_abf6(self):
        """ IPv6 ACL Based Forwarding
        """

        #
        # Simple test for matching IPv6 packets
        #

        #
        # Rule 1
        #
        rule_1 = ({'is_permit': 1,
                   'is_ipv6': 1,
                   'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1234,
                   'src_ip_prefix_len': 128,
                   'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_prefix_len': 128,
                   'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
                                          r=[rule_1])

        #
        # ABF policy for ACL 1 - path via next-hop 3001::1 with no
        # interface given; nothing resolves that next-hop yet
        #
        abf_1 = VppAbfPolicy(self, 10, acl_1,
                             [VppRoutePath("3001::1",
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        abf_1.add_vpp_config()

        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
                                45, is_ipv6=True)
        attach_1.add_vpp_config()

        #
        # a packet matching the rule
        #
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src="2001::2", dst="2001::1") /
             UDP(sport=1234, dport=1234) /
             Raw('\xa5' * 100))

        #
        # packets are dropped because there is no route to the policy's
        # next hop. Inject on Pg0, the interface the policy is attached
        # to (and whose MACs the packet carries); sending on any other
        # interface would not exercise the policy at all.
        #
        self.send_and_assert_no_replies(self.pg0, p * 65, "no route")

        #
        # add a route resolving the next-hop
        #
        route = VppIpRoute(self, "3001::1", 32,
                           [VppRoutePath(self.pg1.remote_ip6,
                                         self.pg1.sw_if_index,
                                         proto=DpoProto.DPO_PROTO_IP6)],
                           is_ip6=1)
        route.add_vpp_config()

        #
        # now expect packets forwarded.
        #
        self.send_and_expect(self.pg0, p * 65, self.pg1)
335
336
if __name__ == '__main__':
    # The file never imports unittest explicitly; import it here so that
    # running the module as a script does not depend on a star import
    # happening to expose the name.
    import unittest

    unittest.main(testRunner=VppTestRunner)