3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, VppIpTable
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
15 from vpp_object import VppObject
# Look for an ABF policy with the given id in the dump returned by
# test.vapi.abf_policy_dump(), matching on p.policy.policy_id.
# NOTE(review): the gutter jumps 19 -> 21 and 21 -> 26, so the loop header
# that binds `p` and the return statements are not part of this listing;
# presumably this iterates the dump and returns a bool - confirm against
# the full file.
# NOTE(review): the parameter name `id` shadows the builtin; it is part of
# the call signature, so it is left unchanged here.
18 def find_abf_policy(test, id):
# One API dump call per lookup.
19 policies = test.vapi.abf_policy_dump()
21 if id == p.policy.policy_id:
# Look for an ABF attachment in the dump returned by
# test.vapi.abf_itf_attach_dump(). A match needs BOTH the policy id and the
# interface index to agree, so the same policy attached to a different
# interface does not count.
# NOTE(review): the gutter jumps 27 -> 29 and 30 -> 35, so the loop header
# that binds `a` and the return statements are not part of this listing;
# presumably this iterates the dump and returns a bool - confirm against
# the full file.
26 def find_abf_itf_attach(test, id, sw_if_index):
27 attachs = test.vapi.abf_itf_attach_dump()
29 if id == a.attach.policy_id and \
30 sw_if_index == a.attach.sw_if_index:
# Test-side model of an ABF policy: a policy id, an ACL and a list of
# forwarding paths, pushed to and removed from VPP via abf_policy_add_del.
# NOTE(review): the constructor header and most of its assignments (gutter
# 36-42, 44-45) are not part of this listing; self.acl, self.paths and
# self._test are read below and are presumably set there - confirm.
35 class VppAbfPolicy(VppObject):
43 self.policy_id = policy_id
# Build the list of path dictionaries handed to the API in the 'paths'
# field. NOTE(review): the loop headers over the paths and over each path's
# labels (gutter 48-51) and the final return (gutter 69) are not shown.
47 def encode_paths(self):
# A label is either a VppMplsLabel, encoded by its own encode(), or a bare
# value wrapped with a TTL of 255.
# NOTE(review): the exact type() comparison does not match subclasses of
# VppMplsLabel; isinstance() would - confirm which is intended.
52 if type(l) == VppMplsLabel:
53 lstack.append(l.encode())
55 lstack.append({'label': l, 'ttl': 255})
# The real label count is captured before the stack is padded.
56 n_labels = len(lstack)
# Pad the stack until it holds 16 entries. NOTE(review): the loop body
# (gutter 58) is not shown; presumably it appends filler entries.
57 while (len(lstack) < 16):
59 br_paths.append({'next_hop': p.nh_addr,
# NOTE(review): every encoded path carries sw_if_index 0xffffffff; no
# interface attribute of the path is read in the lines shown. Presumably
# the path is resolved by next-hop and table only - confirm this is the
# intended encoding when a VppRoutePath is built with an interface.
62 'sw_if_index': 0xffffffff,
64 'table_id': p.nh_table_id,
65 'next_hop_id': p.next_hop_id,
66 'is_udp_encap': p.is_udp_encap,
68 'label_stack': lstack})
# Send the policy to VPP and register this object with the test registry.
# NOTE(review): gutter 73 (the argument before the dictionary, presumably
# the add/delete flag) is not shown.
71 def add_vpp_config(self):
72 self._test.vapi.abf_policy_add_del(
74 {'policy_id': self.policy_id,
75 'acl_index': self.acl.acl_index,
76 'n_paths': len(self.paths),
77 'paths': self.encode_paths()})
78 self._test.registry.register(self, self._test.logger)
# Same API call with the same policy dictionary as add_vpp_config; nothing
# is registered here. NOTE(review): gutter 82 (presumably the add/delete
# flag) is not shown.
80 def remove_vpp_config(self):
81 self._test.vapi.abf_policy_add_del(
83 {'policy_id': self.policy_id,
84 'acl_index': self.acl.acl_index,
85 'n_paths': len(self.paths),
86 'paths': self.encode_paths()})
# Ask VPP whether the policy is present, by id, via find_abf_policy.
88 def query_vpp_config(self):
89 return find_abf_policy(self._test, self.policy_id)
# Name string built from the policy id. NOTE(review): the enclosing method
# header (gutter 91) is not shown.
92 return ("abf-policy-%d" % self.policy_id)
# Test-side model of attaching an ABF policy to an interface with a
# priority, pushed to and removed from VPP via abf_itf_attach_add_del.
# NOTE(review): the constructor header (gutter 96-103) is not part of this
# listing; self._test is read below and is presumably set there - confirm.
95 class VppAbfAttach(VppObject):
104 self.policy_id = policy_id
105 self.sw_if_index = sw_if_index
106 self.priority = priority
107 self.is_ipv6 = is_ipv6
# Send the attachment to VPP and register this object with the test
# registry. NOTE(review): gutter 111 (the argument before the dictionary,
# presumably the add/delete flag) is not shown.
109 def add_vpp_config(self):
110 self._test.vapi.abf_itf_attach_add_del(
112 {'policy_id': self.policy_id,
113 'sw_if_index': self.sw_if_index,
114 'priority': self.priority,
115 'is_ipv6': self.is_ipv6})
116 self._test.registry.register(self, self._test.logger)
# Same API call with the same attachment dictionary as add_vpp_config;
# nothing is registered here. NOTE(review): gutter 120 (presumably the
# add/delete flag) is not shown.
118 def remove_vpp_config(self):
119 self._test.vapi.abf_itf_attach_add_del(
121 {'policy_id': self.policy_id,
122 'sw_if_index': self.sw_if_index,
123 'priority': self.priority,
124 'is_ipv6': self.is_ipv6})
# Ask VPP whether the attachment is present via find_abf_itf_attach.
# NOTE(review): the remaining arguments of the call (gutter 128-129) are
# not shown.
126 def query_vpp_config(self):
127 return find_abf_itf_attach(self._test,
# Name string built from the policy id and the interface index.
# NOTE(review): the enclosing method header (gutter 131) is not shown.
132 return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
# End-to-end tests for ACL Based Forwarding (ABF): an ACL selects traffic
# on an input interface and the attached policy's paths decide where the
# matching packets are sent.
# NOTE(review): the method headers and several continuation lines are not
# part of this listing (the gutter numbers skip them), so the comments
# below describe only the lines that are visible.
135 class TestAbf(VppTestCase):
136 """ ABF Test Case """
139 super(TestAbf, self).setUp()
# Five packet-generator interfaces are created; pg0 to pg4 are used below.
141 self.create_pg_interfaces(range(5))
# Only the first four interfaces go through this loop; pg4 is configured
# later, inside the IPv4 test, after it is moved to table 20.
# NOTE(review): the loop body (gutter 144-149) is not shown.
143 for i in self.pg_interfaces[:4]:
# NOTE(review): the loop body (gutter 152-155) is not shown; presumably
# the per-interface cleanup that precedes the base-class tearDown.
151 for i in self.pg_interfaces:
156 super(TestAbf, self).tearDown()
159 """ IPv4 ACL Based Forwarding
163 # We are not testing the various matching capabilities
164 # of ACLs, that's done elsewhere. Here ware are testing
165 # the application of ACLs to a forwarding path to achieve
167 # So we construct just a few ACLs to ensure the ABF policies
168 # are correctly constructed and used. And a few path types
169 # to test the API path decoding.
# One permit rule: exact /32 source and destination addresses, with the
# source and destination port ranges both pinned to 1234.
# NOTE(review): gutter 176-177 (between is_permit and the port fields)
# are not shown; presumably the address-family and protocol fields.
175 rule_1 = ({'is_permit': 1,
178 'srcport_or_icmptype_first': 1234,
179 'srcport_or_icmptype_last': 1234,
180 'src_ip_prefix_len': 32,
181 'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
182 'dstport_or_icmpcode_first': 1234,
183 'dstport_or_icmpcode_last': 1234,
184 'dst_ip_prefix_len': 32,
185 'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
# 4294967295 is 0xffffffff. NOTE(review): presumably the API's sentinel
# for "create a new ACL" rather than replacing an existing index - confirm.
186 acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
189 # ABF policy for ACL 1 - path via interface 1
191 abf_1 = VppAbfPolicy(self, 10, acl_1,
192 [VppRoutePath(self.pg1.remote_ip4,
193 self.pg1.sw_if_index)])
194 abf_1.add_vpp_config()
197 # Attach the policy to input interface Pg0
# Policy 10 is attached with priority 50.
199 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
200 attach_1.add_vpp_config()
203 # fire in packet matching the ACL src,dst. If it's forwarded
204 # then the ABF was successful, since default routing will drop it
# NOTE(review): the last line of the packet expression (gutter 210) is not
# shown.
206 p_1 = (Ether(src=self.pg0.remote_mac,
207 dst=self.pg0.local_mac) /
208 IP(src="1.1.1.1", dst="1.1.1.2") /
209 UDP(sport=1234, dport=1234) /
# 65 copies of the matching packet go in on pg0 and are expected on pg1.
211 self.send_and_expect(self.pg0, p_1*65, self.pg1)
214 # Attach a 'better' priority policy to the same interface
# Policy 11 (path via pg2) is attached with priority 40 next to the
# priority-50 attachment; the traffic is then expected on pg2, so this
# test treats the numerically lower priority as the one that wins.
216 abf_2 = VppAbfPolicy(self, 11, acl_1,
217 [VppRoutePath(self.pg2.remote_ip4,
218 self.pg2.sw_if_index)])
219 abf_2.add_vpp_config()
220 attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
221 attach_2.add_vpp_config()
223 self.send_and_expect(self.pg0, p_1*65, self.pg2)
226 # Attach a policy with priority in the middle
# Priority 45 sits between 40 and 50, so the expected output stays on pg2.
228 abf_3 = VppAbfPolicy(self, 12, acl_1,
229 [VppRoutePath(self.pg3.remote_ip4,
230 self.pg3.sw_if_index)])
231 abf_3.add_vpp_config()
232 attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
233 attach_3.add_vpp_config()
235 self.send_and_expect(self.pg0, p_1*65, self.pg2)
238 # remove the best priority
# With priority 40 gone, priority 45 (policy 12, via pg3) is expected to
# take over ahead of priority 50.
240 attach_2.remove_vpp_config()
241 self.send_and_expect(self.pg0, p_1*65, self.pg3)
244 # Attach one of the same policies to Pg1
# Policy 12 is now attached to two input interfaces at once (pg0 and pg1).
246 attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
247 attach_4.add_vpp_config()
# Same addresses and ports as p_1, but with pg1's Ethernet addresses.
# NOTE(review): the last line of the packet expression (gutter 253) is not
# shown.
249 p_2 = (Ether(src=self.pg1.remote_mac,
250 dst=self.pg1.local_mac) /
251 IP(src="1.1.1.1", dst="1.1.1.2") /
252 UDP(sport=1234, dport=1234) /
254 self.send_and_expect(self.pg1, p_2 * 65, self.pg3)
257 # detach the policy from PG1, now expect traffic to be dropped
# Negative check: once the attachment is removed, the same traffic on pg1
# must produce no output, i.e. the policy no longer applies on that
# interface while it stays attached to pg0.
259 attach_4.remove_vpp_config()
261 self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached")
264 # Swap to route via a next-hop in the non-default table
266 table_20 = VppIpTable(self, 20)
267 table_20.add_vpp_config()
# pg4 is bound to table 20 before its IPv4 address is configured and ARP
# is resolved.
269 self.pg4.set_table_ip4(table_20.table_id)
271 self.pg4.config_ip4()
272 self.pg4.resolve_arp()
# The path names the next-hop's table explicitly via nh_table_id.
# NOTE(review): gutter 276 (the argument between the next-hop address and
# nh_table_id) is not shown.
274 abf_13 = VppAbfPolicy(self, 13, acl_1,
275 [VppRoutePath(self.pg4.remote_ip4,
277 nh_table_id=table_20.table_id)])
278 abf_13.add_vpp_config()
# Priority 30 is lower than every attachment made so far on pg0, and the
# traffic is expected on pg4.
279 attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
280 attach_5.add_vpp_config()
282 self.send_and_expect(self.pg0, p_1*65, self.pg4)
# Undo the pg4 setup: remove its IPv4 config and put it back in table 0.
284 self.pg4.unconfig_ip4()
285 self.pg4.set_table_ip4(0)
288 """ IPv6 ACL Based Forwarding
292 # Simple test for matching IPv6 packets
# One permit rule: exact /128 source and destination addresses, with the
# source and destination port ranges both pinned to 1234.
# NOTE(review): gutter 299-300 (between is_permit and the port fields)
# are not shown; presumably the address-family and protocol fields.
298 rule_1 = ({'is_permit': 1,
301 'srcport_or_icmptype_first': 1234,
302 'srcport_or_icmptype_last': 1234,
303 'src_ip_prefix_len': 128,
304 'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
305 'dstport_or_icmpcode_first': 1234,
306 'dstport_or_icmpcode_last': 1234,
307 'dst_ip_prefix_len': 128,
308 'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
# NOTE(review): the rest of this call (gutter 310) is not shown.
309 acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
313 # ABF policy for ACL 1 - path via interface 1
# The path's next-hop is the literal address 3001::1, which only becomes
# reachable once the route further down is added.
# NOTE(review): gutter 317 (the argument between the next-hop and proto)
# is not shown.
315 abf_1 = VppAbfPolicy(self, 10, acl_1,
316 [VppRoutePath("3001::1",
318 proto=DpoProto.DPO_PROTO_IP6)])
319 abf_1.add_vpp_config()
# NOTE(review): the remaining arguments (gutter 322, presumably the
# priority and the IPv6 flag) are not shown.
321 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
323 attach_1.add_vpp_config()
326 # a packet matching the rule
# NOTE(review): the last line of the packet expression (gutter 332) is not
# shown.
328 p = (Ether(src=self.pg0.remote_mac,
329 dst=self.pg0.local_mac) /
330 IPv6(src="2001::2", dst="2001::1") /
331 UDP(sport=1234, dport=1234) /
335 # packets are dropped because there is no route to the policy's
# NOTE(review): this negative check injects on pg1, while the policy is
# attached to pg0 and the packet's Ethernet header uses pg0's addresses.
# The "no route" result would presumably also hold with a resolving route
# in place, so the check may not exercise the unresolved next-hop; sending
# on pg0 would - confirm against the intent of the test.
338 self.send_and_assert_no_replies(self.pg1, p * 65, "no route")
341 # add a route resolving the next-hop
# The route for 3001::1 with prefix length 32 points at pg1.
# NOTE(review): the final argument of the call (gutter 347) is not shown.
343 route = VppIpRoute(self, "3001::1", 32,
344 [VppRoutePath(self.pg1.remote_ip6,
345 self.pg1.sw_if_index,
346 proto=DpoProto.DPO_PROTO_IP6)],
348 route.add_vpp_config()
351 # now expect packets forwarded.
353 self.send_and_expect(self.pg0, p * 65, self.pg1)
# Script entry point: run this module's tests with VppTestRunner.
# NOTE(review): no `import unittest` is visible in this listing (the gutter
# skips 4-5 in the import section); confirm the full file imports it.
356 if __name__ == '__main__':
357 unittest.main(testRunner=VppTestRunner)