3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import (
15 from vpp_acl import AclRule, VppAcl
17 from scapy.packet import Raw
18 from scapy.layers.l2 import Ether
19 from scapy.layers.inet import IP, UDP
20 from scapy.layers.inet6 import IPv6
21 from ipaddress import IPv4Network, IPv6Network
23 from vpp_object import VppObject
def find_abf_policy(test, id):
    """Report whether VPP currently holds an ABF policy with the given ID.

    :param test: object exposing the VPP binary API as ``test.vapi``.
    :param id: policy ID to look for. The name shadows the builtin but is
        kept so existing callers are unaffected.
    :returns: True if any entry returned by ``abf_policy_dump()`` has a
        matching ``policy.policy_id``, otherwise False (including when
        the dump is empty).
    """
    policies = test.vapi.abf_policy_dump()
    return any(id == p.policy.policy_id for p in policies)
def find_abf_itf_attach(test, id, sw_if_index):
    """Report whether a policy is currently attached to an interface.

    :param test: object exposing the VPP binary API as ``test.vapi``.
    :param id: policy ID of the attachment to look for. The name shadows
        the builtin but is kept so existing callers are unaffected.
    :param sw_if_index: software interface index of the attachment.
    :returns: True if any entry returned by ``abf_itf_attach_dump()``
        matches both the policy ID and the interface index, otherwise
        False (including when the dump is empty).
    """
    attachs = test.vapi.abf_itf_attach_dump()
    return any(
        id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index
        for a in attachs
    )
class VppAbfPolicy(VppObject):
    """Test-side handle for an ABF policy: an ACL plus forwarding paths."""

    def __init__(self, test, policy_id, acl, paths):
        """Record the policy parameters and pre-encode its paths.

        :param test: owning test case; provides ``vapi``, ``registry``
            and ``logger``.
        :param policy_id: numeric ID identifying the policy in VPP.
        :param acl: ACL object whose ``acl_index`` the policy refers to.
        :param paths: iterable of path objects, each offering ``encode()``.
        """
        self._test = test
        self.policy_id = policy_id
        self.acl = acl
        self.paths = paths
        # Encode once here; add and remove both send the same path list.
        self.encoded_paths = [path.encode() for path in self.paths]

    def _policy(self):
        """Build the policy description sent with both add and delete."""
        return {
            "policy_id": self.policy_id,
            "acl_index": self.acl.acl_index,
            "n_paths": len(self.paths),
            "paths": self.encoded_paths,
        }

    def add_vpp_config(self):
        """Create the policy in VPP and register this object with the test."""
        # NOTE(review): the leading add/del flag is not visible in this
        # listing; 1 = add is assumed from the `_add_del` naming - confirm.
        self._test.vapi.abf_policy_add_del(1, self._policy())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Delete the policy from VPP."""
        # NOTE(review): 0 = delete is assumed, mirroring add_vpp_config.
        self._test.vapi.abf_policy_add_del(0, self._policy())

    def query_vpp_config(self):
        """Return whether VPP's policy dump contains this policy ID."""
        return find_abf_policy(self._test, self.policy_id)

    # NOTE(review): this method's header is missing from the listing; the
    # name is assumed to be VppObject's `object_id` hook - confirm.
    def object_id(self):
        """Return the string identifying this policy."""
        return "abf-policy-%d" % self.policy_id
class VppAbfAttach(VppObject):
    """Test-side handle for attaching an ABF policy to an interface."""

    def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
        """Record the attachment parameters.

        :param test: owning test case; provides ``vapi``, ``registry``
            and ``logger``.
        :param policy_id: ID of the ABF policy to attach.
        :param sw_if_index: software index of the interface to attach to.
        :param priority: priority value passed through to VPP unchanged.
        :param is_ipv6: address-family flag passed through to VPP
            unchanged (defaults to 0).
        """
        self._test = test
        self.policy_id = policy_id
        self.sw_if_index = sw_if_index
        self.priority = priority
        self.is_ipv6 = is_ipv6

    def _attach(self):
        """Build the attachment description sent with both add and delete."""
        return {
            "policy_id": self.policy_id,
            "sw_if_index": self.sw_if_index,
            "priority": self.priority,
            "is_ipv6": self.is_ipv6,
        }

    def add_vpp_config(self):
        """Create the attachment in VPP and register this object with the test."""
        # NOTE(review): the leading add/del flag is not visible in this
        # listing; 1 = add is assumed from the `_add_del` naming - confirm.
        self._test.vapi.abf_itf_attach_add_del(1, self._attach())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Delete the attachment from VPP."""
        # NOTE(review): 0 = delete is assumed, mirroring add_vpp_config.
        self._test.vapi.abf_itf_attach_add_del(0, self._attach())

    def query_vpp_config(self):
        """Return whether VPP's attach dump contains this policy/interface pair."""
        return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)

    # NOTE(review): this method's header is missing from the listing; the
    # name is assumed to be VppObject's `object_id` hook - confirm.
    def object_id(self):
        """Return the string identifying this attachment."""
        return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
122 class TestAbf(VppTestCase):
127 super(TestAbf, cls).setUpClass()
130 def tearDownClass(cls):
131 super(TestAbf, cls).tearDownClass()
134 super(TestAbf, self).setUp()
136 self.create_pg_interfaces(range(5))
138 for i in self.pg_interfaces[:4]:
146 for i in self.pg_interfaces:
150 super(TestAbf, self).tearDown()
153 """IPv4 ACL Based Forwarding"""
156 # We are not testing the various matching capabilities
157 # of ACLs, that's done elsewhere. Here we are testing
158 # the application of ACLs to a forwarding path to achieve
160 # So we construct just a few ACLs to ensure the ABF policies
161 # are correctly constructed and used. And a few path types
162 # to test the API path decoding.
172 src_prefix=IPv4Network("1.1.1.1/32"),
173 dst_prefix=IPv4Network("1.1.1.2/32"),
175 acl_1 = VppAcl(self, rules=[rule_1])
176 acl_1.add_vpp_config()
179 # ABF policy for ACL 1 - path via interface 1
181 abf_1 = VppAbfPolicy(
182 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
184 abf_1.add_vpp_config()
187 # Attach the policy to input interface Pg0
189 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
190 attach_1.add_vpp_config()
193 # fire in packet matching the ACL src,dst. If it's forwarded
194 # then the ABF was successful, since default routing will drop it
197 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
198 / IP(src="1.1.1.1", dst="1.1.1.2")
199 / UDP(sport=1234, dport=1234)
202 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
205 # Attach a 'better' priority policy to the same interface
207 abf_2 = VppAbfPolicy(
208 self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
210 abf_2.add_vpp_config()
211 attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
212 attach_2.add_vpp_config()
214 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
217 # Attach a policy with priority in the middle
219 abf_3 = VppAbfPolicy(
220 self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
222 abf_3.add_vpp_config()
223 attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
224 attach_3.add_vpp_config()
226 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
229 # remove the best priority
231 attach_2.remove_vpp_config()
232 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
235 # Attach one of the same policies to Pg1
237 attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
238 attach_4.add_vpp_config()
241 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
242 / IP(src="1.1.1.1", dst="1.1.1.2")
243 / UDP(sport=1234, dport=1234)
246 self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
249 # detach the policy from PG1, now expect traffic to be dropped
251 attach_4.remove_vpp_config()
253 self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
256 # Swap to route via a next-hop in the non-default table
258 table_20 = VppIpTable(self, 20)
259 table_20.add_vpp_config()
261 self.pg4.set_table_ip4(table_20.table_id)
263 self.pg4.config_ip4()
264 self.pg4.resolve_arp()
266 abf_13 = VppAbfPolicy(
272 self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
276 abf_13.add_vpp_config()
277 attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
278 attach_5.add_vpp_config()
280 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
282 self.pg4.unconfig_ip4()
283 self.pg4.set_table_ip4(0)
286 """IPv6 ACL Based Forwarding"""
289 # Simple test for matching IPv6 packets
299 src_prefix=IPv6Network("2001::2/128"),
300 dst_prefix=IPv6Network("2001::1/128"),
302 acl_1 = VppAcl(self, rules=[rule_1])
303 acl_1.add_vpp_config()
306 # ABF policy for ACL 1 - path via interface 1
308 abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
309 abf_1.add_vpp_config()
311 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
312 attach_1.add_vpp_config()
315 # a packet matching the rule
318 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
319 / IPv6(src="2001::2", dst="2001::1")
320 / UDP(sport=1234, dport=1234)
325 # packets are dropped because there is no route to the policy's
328 self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
331 # add a route resolving the next-hop
337 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
339 route.add_vpp_config()
342 # now expect packets forwarded.
344 self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
346 def test_abf4_deny(self):
347 """IPv4 ACL Deny Rule"""
353 pg0_subnet = ipaddress.ip_network(self.pg0.local_ip4_prefix, strict=False)
354 pg2_subnet = ipaddress.ip_network(self.pg2.local_ip4_prefix, strict=False)
355 pg3_subnet = ipaddress.ip_network(self.pg3.local_ip4_prefix, strict=False)
360 src_prefix=IPv4Network(pg0_subnet),
361 dst_prefix=IPv4Network(pg3_subnet),
363 rule_permit = AclRule(
367 src_prefix=IPv4Network(pg0_subnet),
368 dst_prefix=IPv4Network(pg2_subnet),
370 acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
371 acl_1.add_vpp_config()
374 # ABF policy for ACL 1 - path via interface 1
376 abf_1 = VppAbfPolicy(
377 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
379 abf_1.add_vpp_config()
382 # Attach the policy to input interface Pg0
384 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
385 attach_1.add_vpp_config()
388 # a packet matching the deny rule
391 Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
392 / IP(src=self.pg0.remote_ip4, dst=self.pg3.remote_ip4)
393 / UDP(sport=1234, dport=1234)
396 self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
399 # a packet matching the permit rule
402 Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
403 / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4)
404 / UDP(sport=1234, dport=1234)
407 self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
409 def test_abf6_deny(self):
410 """IPv6 ACL Deny Rule"""
416 pg0_subnet = ipaddress.ip_network(self.pg0.local_ip6_prefix, strict=False)
417 pg2_subnet = ipaddress.ip_network(self.pg2.local_ip6_prefix, strict=False)
418 pg3_subnet = ipaddress.ip_network(self.pg3.local_ip6_prefix, strict=False)
423 src_prefix=IPv6Network(pg0_subnet),
424 dst_prefix=IPv6Network(pg3_subnet),
426 rule_permit = AclRule(
430 src_prefix=IPv6Network(pg0_subnet),
431 dst_prefix=IPv6Network(pg2_subnet),
433 acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
434 acl_1.add_vpp_config()
437 # ABF policy for ACL 1 - path via interface 1
439 abf_1 = VppAbfPolicy(
440 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
442 abf_1.add_vpp_config()
445 # Attach the policy to input interface Pg0
447 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50, is_ipv6=1)
448 attach_1.add_vpp_config()
451 # a packet matching the deny rule
454 Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
455 / IPv6(src=self.pg0.remote_ip6, dst=self.pg3.remote_ip6)
456 / UDP(sport=1234, dport=1234)
459 self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
462 # a packet matching the permit rule
465 Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
466 / IPv6(src=self.pg0.remote_ip6, dst=self.pg2.remote_ip6)
467 / UDP(sport=1234, dport=1234)
470 self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    # Imported here because no module-level `import unittest` is visible
    # in this listing; a duplicate import would be harmless.
    import unittest

    unittest.main(testRunner=VppTestRunner)