3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
9 VppIpTable, FibPathProto
10 from vpp_acl import AclRule, VppAcl
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP
15 from scapy.layers.inet6 import IPv6
16 from ipaddress import IPv4Network, IPv6Network
18 from vpp_object import VppObject
23 def find_abf_policy(test, id):
24 policies = test.vapi.abf_policy_dump()
26 if id == p.policy.policy_id:
31 def find_abf_itf_attach(test, id, sw_if_index):
32 attachs = test.vapi.abf_itf_attach_dump()
34 if id == a.attach.policy_id and \
35 sw_if_index == a.attach.sw_if_index:
40 class VppAbfPolicy(VppObject):
48 self.policy_id = policy_id
51 self.encoded_paths = []
52 for path in self.paths:
53 self.encoded_paths.append(path.encode())
55 def add_vpp_config(self):
56 self._test.vapi.abf_policy_add_del(
58 {'policy_id': self.policy_id,
59 'acl_index': self.acl.acl_index,
60 'n_paths': len(self.paths),
61 'paths': self.encoded_paths})
62 self._test.registry.register(self, self._test.logger)
64 def remove_vpp_config(self):
65 self._test.vapi.abf_policy_add_del(
67 {'policy_id': self.policy_id,
68 'acl_index': self.acl.acl_index,
69 'n_paths': len(self.paths),
70 'paths': self.encoded_paths})
72 def query_vpp_config(self):
73 return find_abf_policy(self._test, self.policy_id)
76 return ("abf-policy-%d" % self.policy_id)
79 class VppAbfAttach(VppObject):
88 self.policy_id = policy_id
89 self.sw_if_index = sw_if_index
90 self.priority = priority
91 self.is_ipv6 = is_ipv6
93 def add_vpp_config(self):
94 self._test.vapi.abf_itf_attach_add_del(
96 {'policy_id': self.policy_id,
97 'sw_if_index': self.sw_if_index,
98 'priority': self.priority,
99 'is_ipv6': self.is_ipv6})
100 self._test.registry.register(self, self._test.logger)
102 def remove_vpp_config(self):
103 self._test.vapi.abf_itf_attach_add_del(
105 {'policy_id': self.policy_id,
106 'sw_if_index': self.sw_if_index,
107 'priority': self.priority,
108 'is_ipv6': self.is_ipv6})
110 def query_vpp_config(self):
111 return find_abf_itf_attach(self._test,
116 return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
119 class TestAbf(VppTestCase):
120 """ ABF Test Case """
124 super(TestAbf, cls).setUpClass()
127 def tearDownClass(cls):
128 super(TestAbf, cls).tearDownClass()
131 super(TestAbf, self).setUp()
133 self.create_pg_interfaces(range(5))
135 for i in self.pg_interfaces[:4]:
143 for i in self.pg_interfaces:
147 super(TestAbf, self).tearDown()
150 """ IPv4 ACL Based Forwarding
154 # We are not testing the various matching capabilities
155 # of ACLs, that's done elsewhere. Here ware are testing
156 # the application of ACLs to a forwarding path to achieve
158 # So we construct just a few ACLs to ensure the ABF policies
159 # are correctly constructed and used. And a few path types
160 # to test the API path decoding.
166 rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
167 src_prefix=IPv4Network("1.1.1.1/32"),
168 dst_prefix=IPv4Network("1.1.1.2/32"))
169 acl_1 = VppAcl(self, rules=[rule_1])
170 acl_1.add_vpp_config()
173 # ABF policy for ACL 1 - path via interface 1
175 abf_1 = VppAbfPolicy(self, 10, acl_1,
176 [VppRoutePath(self.pg1.remote_ip4,
177 self.pg1.sw_if_index)])
178 abf_1.add_vpp_config()
181 # Attach the policy to input interface Pg0
183 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
184 attach_1.add_vpp_config()
187 # fire in packet matching the ACL src,dst. If it's forwarded
188 # then the ABF was successful, since default routing will drop it
190 p_1 = (Ether(src=self.pg0.remote_mac,
191 dst=self.pg0.local_mac) /
192 IP(src="1.1.1.1", dst="1.1.1.2") /
193 UDP(sport=1234, dport=1234) /
195 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
198 # Attach a 'better' priority policy to the same interface
200 abf_2 = VppAbfPolicy(self, 11, acl_1,
201 [VppRoutePath(self.pg2.remote_ip4,
202 self.pg2.sw_if_index)])
203 abf_2.add_vpp_config()
204 attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
205 attach_2.add_vpp_config()
207 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
210 # Attach a policy with priority in the middle
212 abf_3 = VppAbfPolicy(self, 12, acl_1,
213 [VppRoutePath(self.pg3.remote_ip4,
214 self.pg3.sw_if_index)])
215 abf_3.add_vpp_config()
216 attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
217 attach_3.add_vpp_config()
219 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
222 # remove the best priority
224 attach_2.remove_vpp_config()
225 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
228 # Attach one of the same policies to Pg1
230 attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
231 attach_4.add_vpp_config()
233 p_2 = (Ether(src=self.pg1.remote_mac,
234 dst=self.pg1.local_mac) /
235 IP(src="1.1.1.1", dst="1.1.1.2") /
236 UDP(sport=1234, dport=1234) /
238 self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
241 # detach the policy from PG1, now expect traffic to be dropped
243 attach_4.remove_vpp_config()
245 self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
248 # Swap to route via a next-hop in the non-default table
250 table_20 = VppIpTable(self, 20)
251 table_20.add_vpp_config()
253 self.pg4.set_table_ip4(table_20.table_id)
255 self.pg4.config_ip4()
256 self.pg4.resolve_arp()
258 abf_13 = VppAbfPolicy(self, 13, acl_1,
259 [VppRoutePath(self.pg4.remote_ip4,
261 nh_table_id=table_20.table_id)])
262 abf_13.add_vpp_config()
263 attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
264 attach_5.add_vpp_config()
266 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
268 self.pg4.unconfig_ip4()
269 self.pg4.set_table_ip4(0)
272 """ IPv6 ACL Based Forwarding
276 # Simple test for matching IPv6 packets
282 rule_1 = AclRule(is_permit=1, proto=17, ports=1234,
283 src_prefix=IPv6Network("2001::2/128"),
284 dst_prefix=IPv6Network("2001::1/128"))
285 acl_1 = VppAcl(self, rules=[rule_1])
286 acl_1.add_vpp_config()
289 # ABF policy for ACL 1 - path via interface 1
291 abf_1 = VppAbfPolicy(self, 10, acl_1,
292 [VppRoutePath("3001::1",
294 abf_1.add_vpp_config()
296 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
298 attach_1.add_vpp_config()
301 # a packet matching the rule
303 p = (Ether(src=self.pg0.remote_mac,
304 dst=self.pg0.local_mac) /
305 IPv6(src="2001::2", dst="2001::1") /
306 UDP(sport=1234, dport=1234) /
310 # packets are dropped because there is no route to the policy's
313 self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
316 # add a route resolving the next-hop
318 route = VppIpRoute(self, "3001::1", 32,
319 [VppRoutePath(self.pg1.remote_ip6,
320 self.pg1.sw_if_index)])
321 route.add_vpp_config()
324 # now expect packets forwarded.
326 self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
329 if __name__ == '__main__':
330 unittest.main(testRunner=VppTestRunner)