2 from socket import AF_INET, AF_INET6, inet_pton
4 from framework import VppTestCase, VppTestRunner
5 from vpp_interface import VppInterface
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP
13 class VppPipe(VppInterface):
20 return self.result.pipe_sw_if_index[1]
24 return self.result.pipe_sw_if_index[0]
# Build a test-side wrapper for a VPP pipe interface.
# `test` is forwarded to the VppInterface initialiser. `instance` is stored on
# the object and defaults to 0xffffffff; add_vpp_config passes 0 as the first
# pipe_create argument for that default value and 1 for anything else.
# NOTE(review): the inner numbering jumps from 27 to 29, so a statement between
# the super() call and the assignment may be missing from this excerpt - confirm
# against the full file.
26 def __init__(self, test, instance=0xffffffff):
27 super(VppPipe, self).__init__(test)
29 self.instance = instance
# Create the pipe through the test's VAPI handle and keep the reply in
# self.result. The first pipe_create argument is 0 when self.instance is the
# 0xffffffff default and 1 otherwise. Afterwards the reply's parent_sw_if_index
# is handed to set_sw_if_index().
# NOTE(review): the remaining pipe_create argument(s) and the closing
# parenthesis are not visible in this excerpt (inner line 34 is missing) -
# confirm against the full file.
31 def add_vpp_config(self):
32 self.result = self._test.vapi.pipe_create(
33 0 if self.instance == 0xffffffff else 1,
35 self.set_sw_if_index(self.result.parent_sw_if_index)
def remove_vpp_config(self):
    """Delete this pipe from VPP.

    Issues ``pipe_delete`` through the test's VAPI handle, passing the
    ``parent_sw_if_index`` held in ``self.result``.
    """
    parent_index = self.result.parent_sw_if_index
    self._test.vapi.pipe_delete(parent_index)
42 return self.object_id()
45 return "pipe-%d" % (self._sw_if_index)
# Look this pipe up in VPP: fetch the pipe_dump() reply and compare an entry's
# parent_sw_if_index with the one held in self.result.
# NOTE(review): the loop header that binds `p` and the return statements are
# not visible in this excerpt (inner lines 49, 51 and 52 are missing);
# presumably the method reports whether a matching entry exists - confirm
# against the full file.
47 def query_vpp_config(self):
48 pipes = self._test.vapi.pipe_dump()
50 if p.parent_sw_if_index == self.result.parent_sw_if_index:
def set_unnumbered(self, ip_sw_if_index, is_add=True):
    """Apply an unnumbered setting to both ends of the pipe.

    Calls ``sw_interface_set_unnumbered`` on the test's VAPI handle twice:
    first for the east end, then for the west end. The replies are not
    used, so they are no longer bound to a throw-away local.

    :param ip_sw_if_index: forwarded unchanged as the second argument of
        each VAPI call (callers in this file pass a pg interface's
        ``sw_if_index``).
    :param is_add: forwarded unchanged as the third argument; defaults
        to True.
    """
    vapi = self._test.vapi
    # East is configured before west, matching the original call order.
    vapi.sw_interface_set_unnumbered(self.east, ip_sw_if_index, is_add)
    vapi.sw_interface_set_unnumbered(self.west, ip_sw_if_index, is_add)
61 class TestPipe(VppTestCase):
65 super(TestPipe, self).setUp()
67 self.create_pg_interfaces(range(4))
69 for i in self.pg_interfaces:
73 for i in self.pg_interfaces:
76 super(TestPipe, self).tearDown()
82 pipes.append(VppPipe(self))
83 pipes.append(VppPipe(self, 10))
90 # L2 cross-connect pipe0 east with pg0 and west with pg1
92 self.vapi.sw_interface_set_l2_xconnect(self.pg0.sw_if_index,
95 self.vapi.sw_interface_set_l2_xconnect(pipes[0].east,
98 self.vapi.sw_interface_set_l2_xconnect(self.pg1.sw_if_index,
101 self.vapi.sw_interface_set_l2_xconnect(pipes[0].west,
102 self.pg1.sw_if_index,
105 # test bi-directional L2 flow pg0<->pg1
106 p = (Ether(src=self.pg0.remote_mac,
107 dst=self.pg1.remote_mac) /
110 UDP(sport=1234, dport=1234) /
113 self.send_and_expect(self.pg0, p * 65, self.pg1)
114 self.send_and_expect(self.pg1, p * 65, self.pg0)
117 # Attach ACL to ensure features are run on the pipe
119 rule_1 = ({'is_permit': 0,
122 'srcport_or_icmptype_first': 1234,
123 'srcport_or_icmptype_last': 1234,
124 'src_ip_prefix_len': 32,
125 'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
126 'dstport_or_icmpcode_first': 1234,
127 'dstport_or_icmpcode_last': 1234,
128 'dst_ip_prefix_len': 32,
129 'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
130 acl = self.vapi.acl_add_replace(acl_index=4294967295,
133 # Apply the ACL on the pipe on output
134 self.vapi.acl_interface_set_acl_list(pipes[0].east,
137 self.send_and_assert_no_replies(self.pg0, p * 65)
138 self.send_and_expect(self.pg1, p * 65, self.pg0)
140 # remove from output and apply on input
141 self.vapi.acl_interface_set_acl_list(pipes[0].east,
144 self.vapi.acl_interface_set_acl_list(pipes[0].west,
147 self.send_and_assert_no_replies(self.pg0, p * 65)
148 self.send_and_expect(self.pg1, p * 65, self.pg0)
149 self.vapi.acl_interface_set_acl_list(pipes[0].west,
152 self.send_and_expect(self.pg0, p * 65, self.pg1)
153 self.send_and_expect(self.pg1, p * 65, self.pg0)
156 # L3 routes in two separate tables so a pipe can be used to L3
160 tables.append(VppIpTable(self, 1))
161 tables.append(VppIpTable(self, 2))
166 self.pg2.set_table_ip4(1)
167 self.pg2.config_ip4()
168 self.pg2.resolve_arp()
169 self.pg3.set_table_ip4(2)
170 self.pg3.config_ip4()
171 self.pg3.resolve_arp()
174 routes.append(VppIpRoute(self, "1.1.1.1", 32,
175 [VppRoutePath(self.pg3.remote_ip4,
176 self.pg3.sw_if_index)],
178 routes.append(VppIpRoute(self, "1.1.1.1", 32,
179 [VppRoutePath("0.0.0.0", pipes[1].east)],
181 routes.append(VppIpRoute(self, "1.1.1.2", 32,
182 [VppRoutePath("0.0.0.0", pipes[1].west)],
184 routes.append(VppIpRoute(self, "1.1.1.2", 32,
185 [VppRoutePath(self.pg2.remote_ip4,
186 self.pg2.sw_if_index)],
192 p_east = (Ether(src=self.pg2.remote_mac,
193 dst=self.pg2.local_mac) /
196 UDP(sport=1234, dport=1234) /
199 # bind the pipe ends to the correct tables
200 self.vapi.sw_interface_set_table(pipes[1].west, 0, 2)
201 self.vapi.sw_interface_set_table(pipes[1].east, 0, 1)
203 # IP is not enabled on the pipes at this point
204 self.send_and_assert_no_replies(self.pg2, p_east * 65)
206 # IP enable the Pipes by making them unnumbered
207 pipes[0].set_unnumbered(self.pg2.sw_if_index)
208 pipes[1].set_unnumbered(self.pg3.sw_if_index)
210 self.send_and_expect(self.pg2, p_east * 65, self.pg3)
212 # and the return path
213 p_west = (Ether(src=self.pg3.remote_mac,
214 dst=self.pg3.local_mac) /
217 UDP(sport=1234, dport=1234) /
219 self.send_and_expect(self.pg3, p_west * 65, self.pg2)
222 # Use ACLs to test features run on the Pipes
224 self.vapi.acl_interface_set_acl_list(pipes[1].east,
227 self.send_and_assert_no_replies(self.pg2, p_east * 65)
228 self.send_and_expect(self.pg3, p_west * 65, self.pg2)
230 # remove from output and apply on input
231 self.vapi.acl_interface_set_acl_list(pipes[1].east,
234 self.vapi.acl_interface_set_acl_list(pipes[1].west,
237 self.send_and_assert_no_replies(self.pg2, p_east * 65)
238 self.send_and_expect(self.pg3, p_west * 65, self.pg2)
239 self.vapi.acl_interface_set_acl_list(pipes[1].west,
242 self.send_and_expect(self.pg2, p_east * 65, self.pg3)
243 self.send_and_expect(self.pg3, p_west * 65, self.pg2)
245 # cleanup (so the tables delete)
246 self.pg2.unconfig_ip4()
247 self.pg2.set_table_ip4(0)
248 self.pg3.unconfig_ip4()
249 self.pg3.set_table_ip4(0)
250 self.vapi.sw_interface_set_table(pipes[1].west, 0, 0)
251 self.vapi.sw_interface_set_table(pipes[1].east, 0, 0)
# Script entry point: hand control to unittest using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)