ethernet: check destination mac for L3 in ethernet-input node
[vpp.git] / test / test_abf.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from config import config
6 from framework import VppTestCase
7 from asfframework import VppTestRunner
8 from vpp_ip_route import (
9     VppIpRoute,
10     VppRoutePath,
11     VppIpTable,
12 )
13 from vpp_acl import AclRule, VppAcl
14
15 from scapy.packet import Raw
16 from scapy.layers.l2 import Ether
17 from scapy.layers.inet import IP, UDP
18 from scapy.layers.inet6 import IPv6
19 from ipaddress import IPv4Network, IPv6Network
20
21 from vpp_object import VppObject
22
23 NUM_PKTS = 67
24
25
def find_abf_policy(test, id):
    """Report whether VPP's ABF policy dump contains the given policy ID.

    Every entry returned by ``test.vapi.abf_policy_dump()`` is inspected and
    its ``policy.policy_id`` compared against ``id``.

    :param test: object whose ``vapi`` attribute provides ``abf_policy_dump``
    :param id: policy ID to search for
    :returns: True when some dumped policy carries that ID, False otherwise
    """
    dumped = test.vapi.abf_policy_dump()
    return any(id == entry.policy.policy_id for entry in dumped)
32
33
def find_abf_itf_attach(test, id, sw_if_index):
    """Report whether a policy/interface attachment appears in VPP's dump.

    Every entry returned by ``test.vapi.abf_itf_attach_dump()`` is inspected;
    an entry matches when both its ``attach.policy_id`` equals ``id`` and its
    ``attach.sw_if_index`` equals ``sw_if_index``.

    :param test: object whose ``vapi`` attribute provides
        ``abf_itf_attach_dump``
    :param id: policy ID of the attachment to search for
    :param sw_if_index: interface index of the attachment to search for
    :returns: True when a matching attachment is dumped, False otherwise
    """
    dumped = test.vapi.abf_itf_attach_dump()
    return any(
        id == entry.attach.policy_id and sw_if_index == entry.attach.sw_if_index
        for entry in dumped
    )
40
41
class VppAbfPolicy(VppObject):
    """Test-side handle for a single ABF policy.

    A policy ties an ACL to a list of route paths. The paths are encoded once
    in the constructor and that same encoded list is sent to VPP for both the
    add and the delete request.
    """

    def __init__(self, test, policy_id, acl, paths):
        """Remember the policy parameters and pre-encode its paths.

        :param test: the running test case (provides ``vapi``, ``registry``
            and ``logger``)
        :param policy_id: numeric ID of the policy
        :param acl: ACL object; its ``acl_index`` is read when the request is
            sent
        :param paths: iterable of path objects offering ``encode()``
        """
        self._test = test
        self.policy_id = policy_id
        self.acl = acl
        self.paths = paths
        self.encoded_paths = [route_path.encode() for route_path in self.paths]

    def _policy_dict(self):
        """Build the policy description handed to ``abf_policy_add_del``."""
        return {
            "policy_id": self.policy_id,
            "acl_index": self.acl.acl_index,
            "n_paths": len(self.paths),
            "paths": self.encoded_paths,
        }

    def add_vpp_config(self):
        """Send the policy with the add flag (1) and register this object."""
        self._test.vapi.abf_policy_add_del(1, self._policy_dict())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Send the same policy description with the add flag cleared (0)."""
        self._test.vapi.abf_policy_add_del(0, self._policy_dict())

    def query_vpp_config(self):
        """Return True when the policy ID shows up in VPP's policy dump."""
        return find_abf_policy(self._test, self.policy_id)

    def object_id(self):
        """Return the string identifying this policy, built from its ID."""
        return "abf-policy-%d" % self.policy_id
80
81
class VppAbfAttach(VppObject):
    """Test-side handle for the attachment of an ABF policy to an interface."""

    def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
        """Remember the attachment parameters; nothing is sent to VPP yet.

        :param test: the running test case (provides ``vapi``, ``registry``
            and ``logger``)
        :param policy_id: ID of the policy being attached
        :param sw_if_index: index of the interface the policy is attached to
        :param priority: priority value passed through to VPP
        :param is_ipv6: flag passed through to VPP as ``is_ipv6``
        """
        self._test = test
        self.policy_id = policy_id
        self.sw_if_index = sw_if_index
        self.priority = priority
        self.is_ipv6 = is_ipv6

    def _attach_dict(self):
        """Build the attachment description for ``abf_itf_attach_add_del``."""
        return {
            "policy_id": self.policy_id,
            "sw_if_index": self.sw_if_index,
            "priority": self.priority,
            "is_ipv6": self.is_ipv6,
        }

    def add_vpp_config(self):
        """Send the attachment with the add flag (1) and register this object."""
        self._test.vapi.abf_itf_attach_add_del(1, self._attach_dict())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Send the same attachment description with the add flag cleared (0)."""
        self._test.vapi.abf_itf_attach_add_del(0, self._attach_dict())

    def query_vpp_config(self):
        """Return True when this policy/interface pair is in VPP's dump."""
        return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)

    def object_id(self):
        """Return the identifying string built from policy ID and interface."""
        return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
118
119
# The whole class is skipped when either the "acl" or the "abf" plugin is
# listed in config.excluded_plugins.
@unittest.skipIf(
    "acl" in config.excluded_plugins,
    "Exclude ABF plugin tests due to absence of ACL plugin",
)
@unittest.skipIf("abf" in config.excluded_plugins, "Exclude ABF plugin tests")
class TestAbf(VppTestCase):
    """ABF Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestAbf, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestAbf, cls).tearDownClass()

    def setUp(self):
        super(TestAbf, self).setUp()

        # Five packet-generator interfaces are created, but only pg0..pg3 are
        # brought up and addressed here. pg4 is left untouched so that
        # test_abf4 can place it in a non-default table before configuring it.
        self.create_pg_interfaces(range(5))

        for i in self.pg_interfaces[:4]:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        # Undo the addressing on every interface, pg4 included.
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestAbf, self).tearDown()

    def test_abf4(self):
        """IPv4 ACL Based Forwarding"""

        #
        # We are not testing the various matching capabilities
        # of ACLs, that's done elsewhere. Here we are testing
        # the application of ACLs to a forwarding path to achieve
        # ABF
        # So we construct just a few ACLs to ensure the ABF policies
        # are correctly constructed and used. And a few path types
        # to test the API path decoding.
        #

        #
        # Rule 1
        #
        rule_1 = AclRule(
            is_permit=1,
            proto=17,
            ports=1234,
            src_prefix=IPv4Network("1.1.1.1/32"),
            dst_prefix=IPv4Network("1.1.1.2/32"),
        )
        acl_1 = VppAcl(self, rules=[rule_1])
        acl_1.add_vpp_config()

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(
            self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        )
        abf_1.add_vpp_config()

        #
        # Attach the policy to input interface Pg0
        #
        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
        attach_1.add_vpp_config()

        #
        # fire in packet matching the ACL src,dst. If it's forwarded
        # then the ABF was successful, since default routing will drop it
        #
        p_1 = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)

        #
        # Attach a 'better' priority policy to the same interface
        # (priority 40 is expected to win over 50: traffic moves to pg2)
        #
        abf_2 = VppAbfPolicy(
            self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
        )
        abf_2.add_vpp_config()
        attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
        attach_2.add_vpp_config()

        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)

        #
        # Attach a policy with priority in the middle
        # (45 sits between 40 and 50, so pg2 is still expected)
        #
        abf_3 = VppAbfPolicy(
            self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
        )
        abf_3.add_vpp_config()
        attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
        attach_3.add_vpp_config()

        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)

        #
        # remove the best priority
        #
        attach_2.remove_vpp_config()
        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)

        #
        # Attach one of the same policies to Pg1
        #
        attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
        attach_4.add_vpp_config()

        p_2 = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IP(src="1.1.1.1", dst="1.1.1.2")
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)

        #
        # detach the policy from PG1, now expect traffic to be dropped
        #
        attach_4.remove_vpp_config()

        self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")

        #
        # Swap to route via a next-hop in the non-default table
        #
        table_20 = VppIpTable(self, 20)
        table_20.add_vpp_config()

        # pg4 is moved into table 20 before it is addressed
        self.pg4.set_table_ip4(table_20.table_id)
        self.pg4.admin_up()
        self.pg4.config_ip4()
        self.pg4.resolve_arp()

        # The path names no interface (0xFFFFFFFF), only a next-hop and the
        # table in which to look it up.
        abf_13 = VppAbfPolicy(
            self,
            13,
            acl_1,
            [
                VppRoutePath(
                    self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
                )
            ],
        )
        abf_13.add_vpp_config()
        attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
        attach_5.add_vpp_config()

        self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)

        # Return pg4 to the default table once its address is removed.
        self.pg4.unconfig_ip4()
        self.pg4.set_table_ip4(0)

    def test_abf6(self):
        """IPv6 ACL Based Forwarding"""

        #
        # Simple test for matching IPv6 packets
        #

        #
        # Rule 1
        #
        rule_1 = AclRule(
            is_permit=1,
            proto=17,
            ports=1234,
            src_prefix=IPv6Network("2001::2/128"),
            dst_prefix=IPv6Network("2001::1/128"),
        )
        acl_1 = VppAcl(self, rules=[rule_1])
        acl_1.add_vpp_config()

        #
        # ABF policy for ACL 1 - path via next-hop 3001::1 with no
        # interface given (0xFFFFFFFF), so a route must resolve it
        #
        abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
        abf_1.add_vpp_config()

        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
        attach_1.add_vpp_config()

        #
        # a packet matching the rule
        #
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src="2001::2", dst="2001::1")
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        #
        # packets are dropped because there is no route to the policy's
        # next hop. The packet is injected on pg0: that is the interface
        # the policy is attached to and the one whose MAC the frame is
        # addressed to, so the drop being checked is the ABF one rather
        # than an unrelated drop on a different interface.
        #
        self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS, "no route")

        #
        # add a route resolving the next-hop
        #
        route = VppIpRoute(
            self,
            "3001::1",
            32,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        )
        route.add_vpp_config()

        #
        # now expect packets forwarded.
        #
        self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)

    def test_abf4_deny(self):
        """IPv4 ACL Deny Rule"""
        import ipaddress

        #
        # Rules 1/2
        #
        pg0_subnet = ipaddress.ip_network(self.pg0.local_ip4_prefix, strict=False)
        pg2_subnet = ipaddress.ip_network(self.pg2.local_ip4_prefix, strict=False)
        pg3_subnet = ipaddress.ip_network(self.pg3.local_ip4_prefix, strict=False)
        rule_deny = AclRule(
            is_permit=0,
            proto=17,
            ports=1234,
            src_prefix=IPv4Network(pg0_subnet),
            dst_prefix=IPv4Network(pg3_subnet),
        )
        rule_permit = AclRule(
            is_permit=1,
            proto=17,
            ports=1234,
            src_prefix=IPv4Network(pg0_subnet),
            dst_prefix=IPv4Network(pg2_subnet),
        )
        acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
        acl_1.add_vpp_config()

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(
            self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        )
        abf_1.add_vpp_config()

        #
        # Attach the policy to input interface Pg0
        #
        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
        attach_1.add_vpp_config()

        #
        # a packet matching the deny rule: it is expected to bypass the
        # policy and arrive on pg3, the interface owning its destination
        #
        p_deny = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg3.remote_ip4)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)

        #
        # a packet matching the permit rule: addressed towards pg2 but
        # expected on pg1, the policy's path
        #
        p_permit = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)

    def test_abf6_deny(self):
        """IPv6 ACL Deny Rule"""
        import ipaddress

        #
        # Rules 1/2
        #
        pg0_subnet = ipaddress.ip_network(self.pg0.local_ip6_prefix, strict=False)
        pg2_subnet = ipaddress.ip_network(self.pg2.local_ip6_prefix, strict=False)
        pg3_subnet = ipaddress.ip_network(self.pg3.local_ip6_prefix, strict=False)
        rule_deny = AclRule(
            is_permit=0,
            proto=17,
            ports=1234,
            src_prefix=IPv6Network(pg0_subnet),
            dst_prefix=IPv6Network(pg3_subnet),
        )
        rule_permit = AclRule(
            is_permit=1,
            proto=17,
            ports=1234,
            src_prefix=IPv6Network(pg0_subnet),
            dst_prefix=IPv6Network(pg2_subnet),
        )
        acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
        acl_1.add_vpp_config()

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(
            self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
        )
        abf_1.add_vpp_config()

        #
        # Attach the policy to input interface Pg0
        #
        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50, is_ipv6=1)
        attach_1.add_vpp_config()

        #
        # a packet matching the deny rule: it is expected to bypass the
        # policy and arrive on pg3, the interface owning its destination
        #
        p_deny = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg3.remote_ip6)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)

        #
        # a packet matching the permit rule: addressed towards pg2 but
        # expected on pg1, the policy's path
        #
        p_permit = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg2.remote_ip6)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
474
475
# When the module is executed directly, run its tests through unittest
# using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)