nat: improve nat44-ed outside address distribution
[vpp.git] / test / test_abf.py
1 #!/usr/bin/env python3
2
3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import (
9     VppIpRoute,
10     VppRoutePath,
11     VppMplsLabel,
12     VppIpTable,
13     FibPathProto,
14 )
15 from vpp_acl import AclRule, VppAcl
16
17 from scapy.packet import Raw
18 from scapy.layers.l2 import Ether
19 from scapy.layers.inet import IP, UDP
20 from scapy.layers.inet6 import IPv6
21 from ipaddress import IPv4Network, IPv6Network
22
23 from vpp_object import VppObject
24
25 NUM_PKTS = 67
26
27
28 def find_abf_policy(test, id):
29     policies = test.vapi.abf_policy_dump()
30     for p in policies:
31         if id == p.policy.policy_id:
32             return True
33     return False
34
35
36 def find_abf_itf_attach(test, id, sw_if_index):
37     attachs = test.vapi.abf_itf_attach_dump()
38     for a in attachs:
39         if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index:
40             return True
41     return False
42
43
44 class VppAbfPolicy(VppObject):
45     def __init__(self, test, policy_id, acl, paths):
46         self._test = test
47         self.policy_id = policy_id
48         self.acl = acl
49         self.paths = paths
50         self.encoded_paths = []
51         for path in self.paths:
52             self.encoded_paths.append(path.encode())
53
54     def add_vpp_config(self):
55         self._test.vapi.abf_policy_add_del(
56             1,
57             {
58                 "policy_id": self.policy_id,
59                 "acl_index": self.acl.acl_index,
60                 "n_paths": len(self.paths),
61                 "paths": self.encoded_paths,
62             },
63         )
64         self._test.registry.register(self, self._test.logger)
65
66     def remove_vpp_config(self):
67         self._test.vapi.abf_policy_add_del(
68             0,
69             {
70                 "policy_id": self.policy_id,
71                 "acl_index": self.acl.acl_index,
72                 "n_paths": len(self.paths),
73                 "paths": self.encoded_paths,
74             },
75         )
76
77     def query_vpp_config(self):
78         return find_abf_policy(self._test, self.policy_id)
79
80     def object_id(self):
81         return "abf-policy-%d" % self.policy_id
82
83
84 class VppAbfAttach(VppObject):
85     def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
86         self._test = test
87         self.policy_id = policy_id
88         self.sw_if_index = sw_if_index
89         self.priority = priority
90         self.is_ipv6 = is_ipv6
91
92     def add_vpp_config(self):
93         self._test.vapi.abf_itf_attach_add_del(
94             1,
95             {
96                 "policy_id": self.policy_id,
97                 "sw_if_index": self.sw_if_index,
98                 "priority": self.priority,
99                 "is_ipv6": self.is_ipv6,
100             },
101         )
102         self._test.registry.register(self, self._test.logger)
103
104     def remove_vpp_config(self):
105         self._test.vapi.abf_itf_attach_add_del(
106             0,
107             {
108                 "policy_id": self.policy_id,
109                 "sw_if_index": self.sw_if_index,
110                 "priority": self.priority,
111                 "is_ipv6": self.is_ipv6,
112             },
113         )
114
115     def query_vpp_config(self):
116         return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)
117
118     def object_id(self):
119         return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
120
121
122 class TestAbf(VppTestCase):
123     """ABF Test Case"""
124
125     @classmethod
126     def setUpClass(cls):
127         super(TestAbf, cls).setUpClass()
128
129     @classmethod
130     def tearDownClass(cls):
131         super(TestAbf, cls).tearDownClass()
132
133     def setUp(self):
134         super(TestAbf, self).setUp()
135
136         self.create_pg_interfaces(range(5))
137
138         for i in self.pg_interfaces[:4]:
139             i.admin_up()
140             i.config_ip4()
141             i.resolve_arp()
142             i.config_ip6()
143             i.resolve_ndp()
144
145     def tearDown(self):
146         for i in self.pg_interfaces:
147             i.unconfig_ip4()
148             i.unconfig_ip6()
149             i.admin_down()
150         super(TestAbf, self).tearDown()
151
152     def test_abf4(self):
153         """IPv4 ACL Based Forwarding"""
154
155         #
156         # We are not testing the various matching capabilities
157         # of ACLs, that's done elsewhere. Here ware are testing
158         # the application of ACLs to a forwarding path to achieve
159         # ABF
160         # So we construct just a few ACLs to ensure the ABF policies
161         # are correctly constructed and used. And a few path types
162         # to test the API path decoding.
163         #
164
165         #
166         # Rule 1
167         #
168         rule_1 = AclRule(
169             is_permit=1,
170             proto=17,
171             ports=1234,
172             src_prefix=IPv4Network("1.1.1.1/32"),
173             dst_prefix=IPv4Network("1.1.1.2/32"),
174         )
175         acl_1 = VppAcl(self, rules=[rule_1])
176         acl_1.add_vpp_config()
177
178         #
179         # ABF policy for ACL 1 - path via interface 1
180         #
181         abf_1 = VppAbfPolicy(
182             self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
183         )
184         abf_1.add_vpp_config()
185
186         #
187         # Attach the policy to input interface Pg0
188         #
189         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
190         attach_1.add_vpp_config()
191
192         #
193         # fire in packet matching the ACL src,dst. If it's forwarded
194         # then the ABF was successful, since default routing will drop it
195         #
196         p_1 = (
197             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
198             / IP(src="1.1.1.1", dst="1.1.1.2")
199             / UDP(sport=1234, dport=1234)
200             / Raw(b"\xa5" * 100)
201         )
202         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
203
204         #
205         # Attach a 'better' priority policy to the same interface
206         #
207         abf_2 = VppAbfPolicy(
208             self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
209         )
210         abf_2.add_vpp_config()
211         attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
212         attach_2.add_vpp_config()
213
214         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
215
216         #
217         # Attach a policy with priority in the middle
218         #
219         abf_3 = VppAbfPolicy(
220             self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
221         )
222         abf_3.add_vpp_config()
223         attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
224         attach_3.add_vpp_config()
225
226         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
227
228         #
229         # remove the best priority
230         #
231         attach_2.remove_vpp_config()
232         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
233
234         #
235         # Attach one of the same policies to Pg1
236         #
237         attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
238         attach_4.add_vpp_config()
239
240         p_2 = (
241             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
242             / IP(src="1.1.1.1", dst="1.1.1.2")
243             / UDP(sport=1234, dport=1234)
244             / Raw(b"\xa5" * 100)
245         )
246         self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
247
248         #
249         # detach the policy from PG1, now expect traffic to be dropped
250         #
251         attach_4.remove_vpp_config()
252
253         self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
254
255         #
256         # Swap to route via a next-hop in the non-default table
257         #
258         table_20 = VppIpTable(self, 20)
259         table_20.add_vpp_config()
260
261         self.pg4.set_table_ip4(table_20.table_id)
262         self.pg4.admin_up()
263         self.pg4.config_ip4()
264         self.pg4.resolve_arp()
265
266         abf_13 = VppAbfPolicy(
267             self,
268             13,
269             acl_1,
270             [
271                 VppRoutePath(
272                     self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
273                 )
274             ],
275         )
276         abf_13.add_vpp_config()
277         attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
278         attach_5.add_vpp_config()
279
280         self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
281
282         self.pg4.unconfig_ip4()
283         self.pg4.set_table_ip4(0)
284
285     def test_abf6(self):
286         """IPv6 ACL Based Forwarding"""
287
288         #
289         # Simple test for matching IPv6 packets
290         #
291
292         #
293         # Rule 1
294         #
295         rule_1 = AclRule(
296             is_permit=1,
297             proto=17,
298             ports=1234,
299             src_prefix=IPv6Network("2001::2/128"),
300             dst_prefix=IPv6Network("2001::1/128"),
301         )
302         acl_1 = VppAcl(self, rules=[rule_1])
303         acl_1.add_vpp_config()
304
305         #
306         # ABF policy for ACL 1 - path via interface 1
307         #
308         abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
309         abf_1.add_vpp_config()
310
311         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
312         attach_1.add_vpp_config()
313
314         #
315         # a packet matching the rule
316         #
317         p = (
318             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
319             / IPv6(src="2001::2", dst="2001::1")
320             / UDP(sport=1234, dport=1234)
321             / Raw(b"\xa5" * 100)
322         )
323
324         #
325         # packets are dropped because there is no route to the policy's
326         # next hop
327         #
328         self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
329
330         #
331         # add a route resolving the next-hop
332         #
333         route = VppIpRoute(
334             self,
335             "3001::1",
336             32,
337             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
338         )
339         route.add_vpp_config()
340
341         #
342         # now expect packets forwarded.
343         #
344         self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
345
346     def test_abf4_deny(self):
347         """IPv4 ACL Deny Rule"""
348         import ipaddress
349
350         #
351         # Rules 1/2
352         #
353         pg0_subnet = ipaddress.ip_network(self.pg0.local_ip4_prefix, strict=False)
354         pg2_subnet = ipaddress.ip_network(self.pg2.local_ip4_prefix, strict=False)
355         pg3_subnet = ipaddress.ip_network(self.pg3.local_ip4_prefix, strict=False)
356         rule_deny = AclRule(
357             is_permit=0,
358             proto=17,
359             ports=1234,
360             src_prefix=IPv4Network(pg0_subnet),
361             dst_prefix=IPv4Network(pg3_subnet),
362         )
363         rule_permit = AclRule(
364             is_permit=1,
365             proto=17,
366             ports=1234,
367             src_prefix=IPv4Network(pg0_subnet),
368             dst_prefix=IPv4Network(pg2_subnet),
369         )
370         acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
371         acl_1.add_vpp_config()
372
373         #
374         # ABF policy for ACL 1 - path via interface 1
375         #
376         abf_1 = VppAbfPolicy(
377             self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
378         )
379         abf_1.add_vpp_config()
380
381         #
382         # Attach the policy to input interface Pg0
383         #
384         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
385         attach_1.add_vpp_config()
386
387         #
388         # a packet matching the deny rule
389         #
390         p_deny = (
391             Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
392             / IP(src=self.pg0.remote_ip4, dst=self.pg3.remote_ip4)
393             / UDP(sport=1234, dport=1234)
394             / Raw(b"\xa5" * 100)
395         )
396         self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
397
398         #
399         # a packet matching the permit rule
400         #
401         p_permit = (
402             Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
403             / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4)
404             / UDP(sport=1234, dport=1234)
405             / Raw(b"\xa5" * 100)
406         )
407         self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
408
409     def test_abf6_deny(self):
410         """IPv6 ACL Deny Rule"""
411         import ipaddress
412
413         #
414         # Rules 1/2
415         #
416         pg0_subnet = ipaddress.ip_network(self.pg0.local_ip6_prefix, strict=False)
417         pg2_subnet = ipaddress.ip_network(self.pg2.local_ip6_prefix, strict=False)
418         pg3_subnet = ipaddress.ip_network(self.pg3.local_ip6_prefix, strict=False)
419         rule_deny = AclRule(
420             is_permit=0,
421             proto=17,
422             ports=1234,
423             src_prefix=IPv6Network(pg0_subnet),
424             dst_prefix=IPv6Network(pg3_subnet),
425         )
426         rule_permit = AclRule(
427             is_permit=1,
428             proto=17,
429             ports=1234,
430             src_prefix=IPv6Network(pg0_subnet),
431             dst_prefix=IPv6Network(pg2_subnet),
432         )
433         acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
434         acl_1.add_vpp_config()
435
436         #
437         # ABF policy for ACL 1 - path via interface 1
438         #
439         abf_1 = VppAbfPolicy(
440             self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
441         )
442         abf_1.add_vpp_config()
443
444         #
445         # Attach the policy to input interface Pg0
446         #
447         attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50, is_ipv6=1)
448         attach_1.add_vpp_config()
449
450         #
451         # a packet matching the deny rule
452         #
453         p_deny = (
454             Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
455             / IPv6(src=self.pg0.remote_ip6, dst=self.pg3.remote_ip6)
456             / UDP(sport=1234, dport=1234)
457             / Raw(b"\xa5" * 100)
458         )
459         self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
460
461         #
462         # a packet matching the permit rule
463         #
464         p_permit = (
465             Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
466             / IPv6(src=self.pg0.remote_ip6, dst=self.pg2.remote_ip6)
467             / UDP(sport=1234, dport=1234)
468             / Raw(b"\xa5" * 100)
469         )
470         self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
471
472
473 if __name__ == "__main__":
474     unittest.main(testRunner=VppTestRunner)