3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
6 from config import config
8 from framework import VppTestCase, VppTestRunner
9 from vpp_ip import DpoProto
10 from vpp_ip_route import (
17 from vpp_acl import AclRule, VppAcl
19 from scapy.packet import Raw
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP
22 from scapy.layers.inet6 import IPv6
23 from ipaddress import IPv4Network, IPv6Network
25 from vpp_object import VppObject
30 def find_abf_policy(test, id):
31 policies = test.vapi.abf_policy_dump()
33 if id == p.policy.policy_id:
38 def find_abf_itf_attach(test, id, sw_if_index):
39 attachs = test.vapi.abf_itf_attach_dump()
41 if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index:
46 class VppAbfPolicy(VppObject):
47 def __init__(self, test, policy_id, acl, paths):
49 self.policy_id = policy_id
52 self.encoded_paths = []
53 for path in self.paths:
54 self.encoded_paths.append(path.encode())
56 def add_vpp_config(self):
57 self._test.vapi.abf_policy_add_del(
60 "policy_id": self.policy_id,
61 "acl_index": self.acl.acl_index,
62 "n_paths": len(self.paths),
63 "paths": self.encoded_paths,
66 self._test.registry.register(self, self._test.logger)
68 def remove_vpp_config(self):
69 self._test.vapi.abf_policy_add_del(
72 "policy_id": self.policy_id,
73 "acl_index": self.acl.acl_index,
74 "n_paths": len(self.paths),
75 "paths": self.encoded_paths,
79 def query_vpp_config(self):
80 return find_abf_policy(self._test, self.policy_id)
83 return "abf-policy-%d" % self.policy_id
86 class VppAbfAttach(VppObject):
87 def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
89 self.policy_id = policy_id
90 self.sw_if_index = sw_if_index
91 self.priority = priority
92 self.is_ipv6 = is_ipv6
94 def add_vpp_config(self):
95 self._test.vapi.abf_itf_attach_add_del(
98 "policy_id": self.policy_id,
99 "sw_if_index": self.sw_if_index,
100 "priority": self.priority,
101 "is_ipv6": self.is_ipv6,
104 self._test.registry.register(self, self._test.logger)
106 def remove_vpp_config(self):
107 self._test.vapi.abf_itf_attach_add_del(
110 "policy_id": self.policy_id,
111 "sw_if_index": self.sw_if_index,
112 "priority": self.priority,
113 "is_ipv6": self.is_ipv6,
117 def query_vpp_config(self):
118 return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)
121 return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
125 "acl" in config.excluded_plugins,
126 "Exclude ABF plugin tests due to absence of ACL plugin",
128 @unittest.skipIf("abf" in config.excluded_plugins, "Exclude ABF plugin tests")
129 class TestAbf(VppTestCase):
134 super(TestAbf, cls).setUpClass()
137 def tearDownClass(cls):
138 super(TestAbf, cls).tearDownClass()
141 super(TestAbf, self).setUp()
143 self.create_pg_interfaces(range(5))
145 for i in self.pg_interfaces[:4]:
153 for i in self.pg_interfaces:
157 super(TestAbf, self).tearDown()
160 """IPv4 ACL Based Forwarding"""
163 # We are not testing the various matching capabilities
164 # of ACLs, that's done elsewhere. Here we are testing
165 # the application of ACLs to a forwarding path to achieve
167 # So we construct just a few ACLs to ensure the ABF policies
168 # are correctly constructed and used. And a few path types
169 # to test the API path decoding.
179 src_prefix=IPv4Network("1.1.1.1/32"),
180 dst_prefix=IPv4Network("1.1.1.2/32"),
182 acl_1 = VppAcl(self, rules=[rule_1])
183 acl_1.add_vpp_config()
186 # ABF policy for ACL 1 - path via interface 1
188 abf_1 = VppAbfPolicy(
189 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
191 abf_1.add_vpp_config()
194 # Attach the policy to input interface Pg0
196 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
197 attach_1.add_vpp_config()
200 # fire in a packet matching the ACL src,dst. If it's forwarded
201 # then the ABF was successful, since default routing will drop it
204 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
205 / IP(src="1.1.1.1", dst="1.1.1.2")
206 / UDP(sport=1234, dport=1234)
209 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
212 # Attach a 'better' priority policy to the same interface
214 abf_2 = VppAbfPolicy(
215 self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
217 abf_2.add_vpp_config()
218 attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
219 attach_2.add_vpp_config()
221 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
224 # Attach a policy with priority in the middle
226 abf_3 = VppAbfPolicy(
227 self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
229 abf_3.add_vpp_config()
230 attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
231 attach_3.add_vpp_config()
233 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
236 # remove the best priority
238 attach_2.remove_vpp_config()
239 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
242 # Attach one of the same policies to Pg1
244 attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
245 attach_4.add_vpp_config()
248 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
249 / IP(src="1.1.1.1", dst="1.1.1.2")
250 / UDP(sport=1234, dport=1234)
253 self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
256 # detach the policy from PG1, now expect traffic to be dropped
258 attach_4.remove_vpp_config()
260 self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
263 # Swap to route via a next-hop in the non-default table
265 table_20 = VppIpTable(self, 20)
266 table_20.add_vpp_config()
268 self.pg4.set_table_ip4(table_20.table_id)
270 self.pg4.config_ip4()
271 self.pg4.resolve_arp()
273 abf_13 = VppAbfPolicy(
279 self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
283 abf_13.add_vpp_config()
284 attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
285 attach_5.add_vpp_config()
287 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
289 self.pg4.unconfig_ip4()
290 self.pg4.set_table_ip4(0)
293 """IPv6 ACL Based Forwarding"""
296 # Simple test for matching IPv6 packets
306 src_prefix=IPv6Network("2001::2/128"),
307 dst_prefix=IPv6Network("2001::1/128"),
309 acl_1 = VppAcl(self, rules=[rule_1])
310 acl_1.add_vpp_config()
313 # ABF policy for ACL 1 - path via the next-hop 3001::1
315 abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
316 abf_1.add_vpp_config()
318 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
319 attach_1.add_vpp_config()
322 # a packet matching the rule
325 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
326 / IPv6(src="2001::2", dst="2001::1")
327 / UDP(sport=1234, dport=1234)
332 # packets are dropped because there is no route to the policy's
335 self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
338 # add a route resolving the next-hop
344 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
346 route.add_vpp_config()
349 # now expect packets forwarded.
351 self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
353 def test_abf4_deny(self):
354 """IPv4 ACL Deny Rule"""
360 pg0_subnet = ipaddress.ip_network(self.pg0.local_ip4_prefix, strict=False)
361 pg2_subnet = ipaddress.ip_network(self.pg2.local_ip4_prefix, strict=False)
362 pg3_subnet = ipaddress.ip_network(self.pg3.local_ip4_prefix, strict=False)
367 src_prefix=IPv4Network(pg0_subnet),
368 dst_prefix=IPv4Network(pg3_subnet),
370 rule_permit = AclRule(
374 src_prefix=IPv4Network(pg0_subnet),
375 dst_prefix=IPv4Network(pg2_subnet),
377 acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
378 acl_1.add_vpp_config()
381 # ABF policy for ACL 1 - path via interface 1
383 abf_1 = VppAbfPolicy(
384 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
386 abf_1.add_vpp_config()
389 # Attach the policy to input interface Pg0
391 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
392 attach_1.add_vpp_config()
395 # a packet matching the deny rule
398 Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
399 / IP(src=self.pg0.remote_ip4, dst=self.pg3.remote_ip4)
400 / UDP(sport=1234, dport=1234)
403 self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
406 # a packet matching the permit rule
409 Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
410 / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4)
411 / UDP(sport=1234, dport=1234)
414 self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
416 def test_abf6_deny(self):
417 """IPv6 ACL Deny Rule"""
423 pg0_subnet = ipaddress.ip_network(self.pg0.local_ip6_prefix, strict=False)
424 pg2_subnet = ipaddress.ip_network(self.pg2.local_ip6_prefix, strict=False)
425 pg3_subnet = ipaddress.ip_network(self.pg3.local_ip6_prefix, strict=False)
430 src_prefix=IPv6Network(pg0_subnet),
431 dst_prefix=IPv6Network(pg3_subnet),
433 rule_permit = AclRule(
437 src_prefix=IPv6Network(pg0_subnet),
438 dst_prefix=IPv6Network(pg2_subnet),
440 acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
441 acl_1.add_vpp_config()
444 # ABF policy for ACL 1 - path via interface 1
446 abf_1 = VppAbfPolicy(
447 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
449 abf_1.add_vpp_config()
452 # Attach the policy to input interface Pg0
454 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50, is_ipv6=1)
455 attach_1.add_vpp_config()
458 # a packet matching the deny rule
461 Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
462 / IPv6(src=self.pg0.remote_ip6, dst=self.pg3.remote_ip6)
463 / UDP(sport=1234, dport=1234)
466 self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
469 # a packet matching the permit rule
472 Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
473 / IPv6(src=self.pg0.remote_ip6, dst=self.pg2.remote_ip6)
474 / UDP(sport=1234, dport=1234)
477 self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
# Script entry point: when this module is executed directly, hand control to
# unittest's command-line driver, using VppTestRunner as the test runner.
480 if __name__ == "__main__":
481 unittest.main(testRunner=VppTestRunner)