2 from socket import AF_INET, AF_INET6, inet_pton
5 from scapy.packet import Raw
6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import IP, UDP
9 from framework import VppTestCase, VppTestRunner
10 from vpp_interface import VppInterface
11 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
14 class VppPipe(VppInterface):
21 return self.result.pipe_sw_if_index[1]
25 return self.result.pipe_sw_if_index[0]
27 def __init__(self, test, instance=0xffffffff):
28 super(VppPipe, self).__init__(test)
30 self.instance = instance
32 def add_vpp_config(self):
33 self.result = self._test.vapi.pipe_create(
34 0 if self.instance == 0xffffffff else 1,
36 self.set_sw_if_index(self.result.sw_if_index)
# Remove this pipe from VPP: calls the test's vapi.pipe_delete with the
# sw_if_index held in self.result (self.result is assigned from
# vapi.pipe_create in add_vpp_config, so that must have run first).
38 def remove_vpp_config(self):
39 self._test.vapi.pipe_delete(
40 self.result.sw_if_index)
43 return "pipe-%d" % (self._sw_if_index)
45 def query_vpp_config(self):
46 pipes = self._test.vapi.pipe_dump()
48 if p.sw_if_index == self.result.sw_if_index:
52 def set_unnumbered(self, ip_sw_if_index, is_add=True):
53 res = self._test.vapi.sw_interface_set_unnumbered(ip_sw_if_index,
55 res = self._test.vapi.sw_interface_set_unnumbered(ip_sw_if_index,
59 class TestPipe(VppTestCase):
64 super(TestPipe, cls).setUpClass()
67 def tearDownClass(cls):
68 super(TestPipe, cls).tearDownClass()
71 super(TestPipe, self).setUp()
73 self.create_pg_interfaces(range(4))
75 for i in self.pg_interfaces:
79 for i in self.pg_interfaces:
82 super(TestPipe, self).tearDown()
87 pipes = [VppPipe(self), VppPipe(self, 10)]
94 # L2 cross-connect pipe0 east with pg0 and west with pg1
96 self.vapi.sw_interface_set_l2_xconnect(self.pg0.sw_if_index,
99 self.vapi.sw_interface_set_l2_xconnect(pipes[0].east,
100 self.pg0.sw_if_index,
102 self.vapi.sw_interface_set_l2_xconnect(self.pg1.sw_if_index,
105 self.vapi.sw_interface_set_l2_xconnect(pipes[0].west,
106 self.pg1.sw_if_index,
109 # test bi-directional L2 flow pg0<->pg1
110 p = (Ether(src=self.pg0.remote_mac,
111 dst=self.pg1.remote_mac) /
114 UDP(sport=1234, dport=1234) /
117 self.send_and_expect(self.pg0, p * 65, self.pg1)
118 self.send_and_expect(self.pg1, p * 65, self.pg0)
121 # Attach ACL to ensure features are run on the pipe
123 rule_1 = ({'is_permit': 0,
126 'srcport_or_icmptype_first': 1234,
127 'srcport_or_icmptype_last': 1234,
128 'src_ip_prefix_len': 32,
129 'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
130 'dstport_or_icmpcode_first': 1234,
131 'dstport_or_icmpcode_last': 1234,
132 'dst_ip_prefix_len': 32,
133 'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
134 acl = self.vapi.acl_add_replace(acl_index=4294967295,
137 # Apply the ACL on the pipe on output
138 self.vapi.acl_interface_set_acl_list(pipes[0].east,
141 self.send_and_assert_no_replies(self.pg0, p * 65)
142 self.send_and_expect(self.pg1, p * 65, self.pg0)
144 # remove from output and apply on input
145 self.vapi.acl_interface_set_acl_list(pipes[0].east,
148 self.vapi.acl_interface_set_acl_list(pipes[0].west,
151 self.send_and_assert_no_replies(self.pg0, p * 65)
152 self.send_and_expect(self.pg1, p * 65, self.pg0)
153 self.vapi.acl_interface_set_acl_list(pipes[0].west,
156 self.send_and_expect(self.pg0, p * 65, self.pg1)
157 self.send_and_expect(self.pg1, p * 65, self.pg0)
160 # L3 routes in two separate tables so a pipe can be used to L3
164 tables.append(VppIpTable(self, 1))
165 tables.append(VppIpTable(self, 2))
170 self.pg2.set_table_ip4(1)
171 self.pg2.config_ip4()
172 self.pg2.resolve_arp()
173 self.pg3.set_table_ip4(2)
174 self.pg3.config_ip4()
175 self.pg3.resolve_arp()
178 routes.append(VppIpRoute(self, "1.1.1.1", 32,
179 [VppRoutePath(self.pg3.remote_ip4,
180 self.pg3.sw_if_index)],
182 routes.append(VppIpRoute(self, "1.1.1.1", 32,
183 [VppRoutePath("0.0.0.0", pipes[1].east)],
185 routes.append(VppIpRoute(self, "1.1.1.2", 32,
186 [VppRoutePath("0.0.0.0", pipes[1].west)],
188 routes.append(VppIpRoute(self, "1.1.1.2", 32,
189 [VppRoutePath(self.pg2.remote_ip4,
190 self.pg2.sw_if_index)],
196 p_east = (Ether(src=self.pg2.remote_mac,
197 dst=self.pg2.local_mac) /
200 UDP(sport=1234, dport=1234) /
203 # bind the pipe ends to the correct tables
204 self.vapi.sw_interface_set_table(pipes[1].west, 0, 2)
205 self.vapi.sw_interface_set_table(pipes[1].east, 0, 1)
207 # IP is not enabled on the pipes at this point
208 self.send_and_assert_no_replies(self.pg2, p_east * 65)
210 # IP enable the Pipes by making them unnumbered
211 pipes[0].set_unnumbered(self.pg2.sw_if_index)
212 pipes[1].set_unnumbered(self.pg3.sw_if_index)
214 self.send_and_expect(self.pg2, p_east * 65, self.pg3)
216 # and the return path
217 p_west = (Ether(src=self.pg3.remote_mac,
218 dst=self.pg3.local_mac) /
221 UDP(sport=1234, dport=1234) /
223 self.send_and_expect(self.pg3, p_west * 65, self.pg2)
226 # Use ACLs to test features run on the Pipes
228 self.vapi.acl_interface_set_acl_list(pipes[1].east,
231 self.send_and_assert_no_replies(self.pg2, p_east * 65)
232 self.send_and_expect(self.pg3, p_west * 65, self.pg2)
234 # remove from output and apply on input
235 self.vapi.acl_interface_set_acl_list(pipes[1].east,
238 self.vapi.acl_interface_set_acl_list(pipes[1].west,
241 self.send_and_assert_no_replies(self.pg2, p_east * 65)
242 self.send_and_expect(self.pg3, p_west * 65, self.pg2)
243 self.vapi.acl_interface_set_acl_list(pipes[1].west,
246 self.send_and_expect(self.pg2, p_east * 65, self.pg3)
247 self.send_and_expect(self.pg3, p_west * 65, self.pg2)
249 # cleanup (so the tables delete)
250 self.pg2.unconfig_ip4()
251 self.pg2.set_table_ip4(0)
252 self.pg3.unconfig_ip4()
253 self.pg3.set_table_ip4(0)
254 self.vapi.sw_interface_set_table(pipes[1].west, 0, 0)
255 self.vapi.sw_interface_set_table(pipes[1].east, 0, 0)
# Script entry point: when the module is executed directly, hand control to
# unittest.main using VppTestRunner as the test runner.
# NOTE(review): no `import unittest` is visible in this listing - confirm the
# full file imports it, otherwise this line raises NameError.
258 if __name__ == '__main__':
259 unittest.main(testRunner=VppTestRunner)