5 from config import config
6 from framework import VppTestCase
7 from asfframework import VppTestRunner
8 from vpp_ip_route import (
13 from vpp_acl import AclRule, VppAcl
15 from scapy.packet import Raw
16 from scapy.layers.l2 import Ether
17 from scapy.layers.inet import IP, UDP
18 from scapy.layers.inet6 import IPv6
19 from ipaddress import IPv4Network, IPv6Network
21 from vpp_object import VppObject
26 def find_abf_policy(test, id):
27 policies = test.vapi.abf_policy_dump()
29 if id == p.policy.policy_id:
34 def find_abf_itf_attach(test, id, sw_if_index):
35 attachs = test.vapi.abf_itf_attach_dump()
37 if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index:
42 class VppAbfPolicy(VppObject):
43 def __init__(self, test, policy_id, acl, paths):
45 self.policy_id = policy_id
48 self.encoded_paths = []
49 for path in self.paths:
50 self.encoded_paths.append(path.encode())
52 def add_vpp_config(self):
53 self._test.vapi.abf_policy_add_del(
56 "policy_id": self.policy_id,
57 "acl_index": self.acl.acl_index,
58 "n_paths": len(self.paths),
59 "paths": self.encoded_paths,
62 self._test.registry.register(self, self._test.logger)
64 def remove_vpp_config(self):
65 self._test.vapi.abf_policy_add_del(
68 "policy_id": self.policy_id,
69 "acl_index": self.acl.acl_index,
70 "n_paths": len(self.paths),
71 "paths": self.encoded_paths,
75 def query_vpp_config(self):
76 return find_abf_policy(self._test, self.policy_id)
79 return "abf-policy-%d" % self.policy_id
82 class VppAbfAttach(VppObject):
83 def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
85 self.policy_id = policy_id
86 self.sw_if_index = sw_if_index
87 self.priority = priority
88 self.is_ipv6 = is_ipv6
90 def add_vpp_config(self):
91 self._test.vapi.abf_itf_attach_add_del(
94 "policy_id": self.policy_id,
95 "sw_if_index": self.sw_if_index,
96 "priority": self.priority,
97 "is_ipv6": self.is_ipv6,
100 self._test.registry.register(self, self._test.logger)
102 def remove_vpp_config(self):
103 self._test.vapi.abf_itf_attach_add_del(
106 "policy_id": self.policy_id,
107 "sw_if_index": self.sw_if_index,
108 "priority": self.priority,
109 "is_ipv6": self.is_ipv6,
113 def query_vpp_config(self):
114 return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)
117 return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
121 "acl" in config.excluded_plugins,
122 "Exclude ABF plugin tests due to absence of ACL plugin",
124 @unittest.skipIf("abf" in config.excluded_plugins, "Exclude ABF plugin tests")
125 class TestAbf(VppTestCase):
130 super(TestAbf, cls).setUpClass()
133 def tearDownClass(cls):
134 super(TestAbf, cls).tearDownClass()
137 super(TestAbf, self).setUp()
139 self.create_pg_interfaces(range(5))
141 for i in self.pg_interfaces[:4]:
149 for i in self.pg_interfaces:
153 super(TestAbf, self).tearDown()
156 """IPv4 ACL Based Forwarding"""
159 # We are not testing the various matching capabilities
160 # of ACLs, that's done elsewhere. Here ware are testing
161 # the application of ACLs to a forwarding path to achieve
163 # So we construct just a few ACLs to ensure the ABF policies
164 # are correctly constructed and used. And a few path types
165 # to test the API path decoding.
175 src_prefix=IPv4Network("1.1.1.1/32"),
176 dst_prefix=IPv4Network("1.1.1.2/32"),
178 acl_1 = VppAcl(self, rules=[rule_1])
179 acl_1.add_vpp_config()
182 # ABF policy for ACL 1 - path via interface 1
184 abf_1 = VppAbfPolicy(
185 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
187 abf_1.add_vpp_config()
190 # Attach the policy to input interface Pg0
192 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
193 attach_1.add_vpp_config()
196 # fire in packet matching the ACL src,dst. If it's forwarded
197 # then the ABF was successful, since default routing will drop it
200 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
201 / IP(src="1.1.1.1", dst="1.1.1.2")
202 / UDP(sport=1234, dport=1234)
205 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
208 # Attach a 'better' priority policy to the same interface
210 abf_2 = VppAbfPolicy(
211 self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
213 abf_2.add_vpp_config()
214 attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
215 attach_2.add_vpp_config()
217 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
220 # Attach a policy with priority in the middle
222 abf_3 = VppAbfPolicy(
223 self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
225 abf_3.add_vpp_config()
226 attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
227 attach_3.add_vpp_config()
229 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
232 # remove the best priority
234 attach_2.remove_vpp_config()
235 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
238 # Attach one of the same policies to Pg1
240 attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
241 attach_4.add_vpp_config()
244 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
245 / IP(src="1.1.1.1", dst="1.1.1.2")
246 / UDP(sport=1234, dport=1234)
249 self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
252 # detach the policy from PG1, now expect traffic to be dropped
254 attach_4.remove_vpp_config()
256 self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
259 # Swap to route via a next-hop in the non-default table
261 table_20 = VppIpTable(self, 20)
262 table_20.add_vpp_config()
264 self.pg4.set_table_ip4(table_20.table_id)
266 self.pg4.config_ip4()
267 self.pg4.resolve_arp()
269 abf_13 = VppAbfPolicy(
275 self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
279 abf_13.add_vpp_config()
280 attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
281 attach_5.add_vpp_config()
283 self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
285 self.pg4.unconfig_ip4()
286 self.pg4.set_table_ip4(0)
289 """IPv6 ACL Based Forwarding"""
292 # Simple test for matching IPv6 packets
302 src_prefix=IPv6Network("2001::2/128"),
303 dst_prefix=IPv6Network("2001::1/128"),
305 acl_1 = VppAcl(self, rules=[rule_1])
306 acl_1.add_vpp_config()
309 # ABF policy for ACL 1 - path via interface 1
311 abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
312 abf_1.add_vpp_config()
314 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
315 attach_1.add_vpp_config()
318 # a packet matching the rule
321 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
322 / IPv6(src="2001::2", dst="2001::1")
323 / UDP(sport=1234, dport=1234)
328 # packets are dropped because there is no route to the policy's
331 self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
334 # add a route resolving the next-hop
340 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
342 route.add_vpp_config()
345 # now expect packets forwarded.
347 self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
349 def test_abf4_deny(self):
350 """IPv4 ACL Deny Rule"""
356 pg0_subnet = ipaddress.ip_network(self.pg0.local_ip4_prefix, strict=False)
357 pg2_subnet = ipaddress.ip_network(self.pg2.local_ip4_prefix, strict=False)
358 pg3_subnet = ipaddress.ip_network(self.pg3.local_ip4_prefix, strict=False)
363 src_prefix=IPv4Network(pg0_subnet),
364 dst_prefix=IPv4Network(pg3_subnet),
366 rule_permit = AclRule(
370 src_prefix=IPv4Network(pg0_subnet),
371 dst_prefix=IPv4Network(pg2_subnet),
373 acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
374 acl_1.add_vpp_config()
377 # ABF policy for ACL 1 - path via interface 1
379 abf_1 = VppAbfPolicy(
380 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
382 abf_1.add_vpp_config()
385 # Attach the policy to input interface Pg0
387 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
388 attach_1.add_vpp_config()
391 # a packet matching the deny rule
394 Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
395 / IP(src=self.pg0.remote_ip4, dst=self.pg3.remote_ip4)
396 / UDP(sport=1234, dport=1234)
399 self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
402 # a packet matching the permit rule
405 Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
406 / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4)
407 / UDP(sport=1234, dport=1234)
410 self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
412 def test_abf6_deny(self):
413 """IPv6 ACL Deny Rule"""
419 pg0_subnet = ipaddress.ip_network(self.pg0.local_ip6_prefix, strict=False)
420 pg2_subnet = ipaddress.ip_network(self.pg2.local_ip6_prefix, strict=False)
421 pg3_subnet = ipaddress.ip_network(self.pg3.local_ip6_prefix, strict=False)
426 src_prefix=IPv6Network(pg0_subnet),
427 dst_prefix=IPv6Network(pg3_subnet),
429 rule_permit = AclRule(
433 src_prefix=IPv6Network(pg0_subnet),
434 dst_prefix=IPv6Network(pg2_subnet),
436 acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
437 acl_1.add_vpp_config()
440 # ABF policy for ACL 1 - path via interface 1
442 abf_1 = VppAbfPolicy(
443 self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
445 abf_1.add_vpp_config()
448 # Attach the policy to input interface Pg0
450 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50, is_ipv6=1)
451 attach_1.add_vpp_config()
454 # a packet matching the deny rule
457 Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
458 / IPv6(src=self.pg0.remote_ip6, dst=self.pg3.remote_ip6)
459 / UDP(sport=1234, dport=1234)
462 self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
465 # a packet matching the permit rule
468 Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
469 / IPv6(src=self.pg0.remote_ip6, dst=self.pg2.remote_ip6)
470 / UDP(sport=1234, dport=1234)
473 self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
476 if __name__ == "__main__":
477 unittest.main(testRunner=VppTestRunner)