3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
9 VppIpTable, FibPathProto
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP, UDP
14 from scapy.layers.inet6 import IPv6
16 from vpp_object import VppObject
21 def find_abf_policy(test, id):
22 policies = test.vapi.abf_policy_dump()
24 if id == p.policy.policy_id:
29 def find_abf_itf_attach(test, id, sw_if_index):
30 attachs = test.vapi.abf_itf_attach_dump()
32 if id == a.attach.policy_id and \
33 sw_if_index == a.attach.sw_if_index:
38 class VppAbfPolicy(VppObject):
46 self.policy_id = policy_id
49 self.encoded_paths = []
50 for path in self.paths:
51 self.encoded_paths.append(path.encode())
53 def add_vpp_config(self):
54 self._test.vapi.abf_policy_add_del(
56 {'policy_id': self.policy_id,
57 'acl_index': self.acl.acl_index,
58 'n_paths': len(self.paths),
59 'paths': self.encoded_paths})
60 self._test.registry.register(self, self._test.logger)
62 def remove_vpp_config(self):
63 self._test.vapi.abf_policy_add_del(
65 {'policy_id': self.policy_id,
66 'acl_index': self.acl.acl_index,
67 'n_paths': len(self.paths),
68 'paths': self.encoded_paths})
70 def query_vpp_config(self):
71 return find_abf_policy(self._test, self.policy_id)
74 return ("abf-policy-%d" % self.policy_id)
77 class VppAbfAttach(VppObject):
86 self.policy_id = policy_id
87 self.sw_if_index = sw_if_index
88 self.priority = priority
89 self.is_ipv6 = is_ipv6
91 def add_vpp_config(self):
92 self._test.vapi.abf_itf_attach_add_del(
94 {'policy_id': self.policy_id,
95 'sw_if_index': self.sw_if_index,
96 'priority': self.priority,
97 'is_ipv6': self.is_ipv6})
98 self._test.registry.register(self, self._test.logger)
100 def remove_vpp_config(self):
101 self._test.vapi.abf_itf_attach_add_del(
103 {'policy_id': self.policy_id,
104 'sw_if_index': self.sw_if_index,
105 'priority': self.priority,
106 'is_ipv6': self.is_ipv6})
108 def query_vpp_config(self):
109 return find_abf_itf_attach(self._test,
114 return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
117 class TestAbf(VppTestCase):
118 """ ABF Test Case """
122 super(TestAbf, cls).setUpClass()
125 def tearDownClass(cls):
126 super(TestAbf, cls).tearDownClass()
129 super(TestAbf, self).setUp()
131 self.create_pg_interfaces(range(5))
133 for i in self.pg_interfaces[:4]:
141 for i in self.pg_interfaces:
145 super(TestAbf, self).tearDown()
148 """ IPv4 ACL Based Forwarding
152 # We are not testing the various matching capabilities
153 # of ACLs, that's done elsewhere. Here ware are testing
154 # the application of ACLs to a forwarding path to achieve
156 # So we construct just a few ACLs to ensure the ABF policies
157 # are correctly constructed and used. And a few path types
158 # to test the API path decoding.
164 rule_1 = ({'is_permit': 1,
167 'srcport_or_icmptype_first': 1234,
168 'srcport_or_icmptype_last': 1234,
169 'src_ip_prefix_len': 32,
170 'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
171 'dstport_or_icmpcode_first': 1234,
172 'dstport_or_icmpcode_last': 1234,
173 'dst_ip_prefix_len': 32,
174 'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
175 acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
178 # ABF policy for ACL 1 - path via interface 1
180 abf_1 = VppAbfPolicy(self, 10, acl_1,
181 [VppRoutePath(self.pg1.remote_ip4,
182 self.pg1.sw_if_index)])
183 abf_1.add_vpp_config()
186 # Attach the policy to input interface Pg0
188 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
189 attach_1.add_vpp_config()
192 # fire in packet matching the ACL src,dst. If it's forwarded
193 # then the ABF was successful, since default routing will drop it
195 p_1 = (Ether(src=self.pg0.remote_mac,
196 dst=self.pg0.local_mac) /
197 IP(src="1.1.1.1", dst="1.1.1.2") /
198 UDP(sport=1234, dport=1234) /
200 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
203 # Attach a 'better' priority policy to the same interface
205 abf_2 = VppAbfPolicy(self, 11, acl_1,
206 [VppRoutePath(self.pg2.remote_ip4,
207 self.pg2.sw_if_index)])
208 abf_2.add_vpp_config()
209 attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
210 attach_2.add_vpp_config()
212 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
215 # Attach a policy with priority in the middle
217 abf_3 = VppAbfPolicy(self, 12, acl_1,
218 [VppRoutePath(self.pg3.remote_ip4,
219 self.pg3.sw_if_index)])
220 abf_3.add_vpp_config()
221 attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
222 attach_3.add_vpp_config()
224 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
227 # remove the best priority
229 attach_2.remove_vpp_config()
230 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
233 # Attach one of the same policies to Pg1
235 attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
236 attach_4.add_vpp_config()
238 p_2 = (Ether(src=self.pg1.remote_mac,
239 dst=self.pg1.local_mac) /
240 IP(src="1.1.1.1", dst="1.1.1.2") /
241 UDP(sport=1234, dport=1234) /
243 self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
246 # detach the policy from PG1, now expect traffic to be dropped
248 attach_4.remove_vpp_config()
250 self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
253 # Swap to route via a next-hop in the non-default table
255 table_20 = VppIpTable(self, 20)
256 table_20.add_vpp_config()
258 self.pg4.set_table_ip4(table_20.table_id)
260 self.pg4.config_ip4()
261 self.pg4.resolve_arp()
263 abf_13 = VppAbfPolicy(self, 13, acl_1,
264 [VppRoutePath(self.pg4.remote_ip4,
266 nh_table_id=table_20.table_id)])
267 abf_13.add_vpp_config()
268 attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
269 attach_5.add_vpp_config()
271 self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
273 self.pg4.unconfig_ip4()
274 self.pg4.set_table_ip4(0)
277 """ IPv6 ACL Based Forwarding
281 # Simple test for matching IPv6 packets
287 rule_1 = ({'is_permit': 1,
290 'srcport_or_icmptype_first': 1234,
291 'srcport_or_icmptype_last': 1234,
292 'src_ip_prefix_len': 128,
293 'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
294 'dstport_or_icmpcode_first': 1234,
295 'dstport_or_icmpcode_last': 1234,
296 'dst_ip_prefix_len': 128,
297 'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
298 acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
302 # ABF policy for ACL 1 - path via interface 1
304 abf_1 = VppAbfPolicy(self, 10, acl_1,
305 [VppRoutePath("3001::1",
307 abf_1.add_vpp_config()
309 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
311 attach_1.add_vpp_config()
314 # a packet matching the rule
316 p = (Ether(src=self.pg0.remote_mac,
317 dst=self.pg0.local_mac) /
318 IPv6(src="2001::2", dst="2001::1") /
319 UDP(sport=1234, dport=1234) /
323 # packets are dropped because there is no route to the policy's
326 self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
329 # add a route resolving the next-hop
331 route = VppIpRoute(self, "3001::1", 32,
332 [VppRoutePath(self.pg1.remote_ip6,
333 self.pg1.sw_if_index)])
334 route.add_vpp_config()
337 # now expect packets forwarded.
339 self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
342 if __name__ == '__main__':
343 unittest.main(testRunner=VppTestRunner)