4 from scapy.layers.l2 import Ether
5 from scapy.layers.inet import IP, ICMP
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from remote_test import RemoteClass, RemoteVppTestCase
9 from vpp_memif import remove_all_memif_vpp_config, \
10 VppSocketFilename, VppMemif
11 from vpp_ip_route import VppIpRoute, VppRoutePath
12 from vpp_papi import VppEnum
# Test case class for memif, derived from VppTestCase. The methods visible
# further down cover socket filename add/del, interface create/delete,
# connecting a memif pair and an ICMP ping through a memif.
15 class TestMemif(VppTestCase):
16 """ Memif Test Case """
# NOTE(review): the `def` line for this body is not visible here (embedded
# numbering jumps 16 -> 24); the super().setUpClass() call below suggests it
# is the setUpClass classmethod - confirm against the full file.
# Order visible here: the remote test object is created and started before
# the base-class setUpClass runs, then the remote side is set up with this
# test's tempdir.
24 # fork new process before client connects to VPP
25 cls.remote_test = RemoteClass(RemoteVppTestCase)
26 cls.remote_test.start_remote()
# request timeout on the remote object is set to 10 (units not shown here)
27 cls.remote_test.set_request_timeout(10)
28 super(TestMemif, cls).setUpClass()
29 cls.remote_test.setUpClass(cls.tempdir)
# range(1): a single pg interface (index 0) is requested
30 cls.create_pg_interfaces(range(1))
# NOTE(review): no loop body follows this header in the visible lines
# (numbering jumps 31 -> 37) - confirm the per-interface setup.
31 for pg in cls.pg_interfaces:
# Class-level teardown: calls tearDownClass() and quit_remote() on the remote
# test object, iterates the pg interfaces, then calls the base-class
# tearDownClass.
37 def tearDownClass(cls):
38 cls.remote_test.tearDownClass()
39 cls.remote_test.quit_remote()
# NOTE(review): the body of this loop is not visible (numbering jumps
# 40 -> 44) - confirm what is undone per pg interface.
40 for pg in cls.pg_interfaces:
44 super(TestMemif, cls).tearDownClass()
# NOTE(review): the enclosing `def` is not visible here; the
# super().tearDown() call suggests this is tearDown(self) - confirm.
# Calls remove_all_memif_vpp_config for this test case and for the remote
# test object, then the base-class tearDown.
47 remove_all_memif_vpp_config(self)
48 remove_all_memif_vpp_config(self.remote_test)
49 super(TestMemif, self).tearDown()
# Helper that compares entries of `dump` against the given socket_id and
# filename.
# NOTE(review): the statement binding `d` (presumably a loop over `dump`) and
# the return statements are not visible here - confirm the return contract
# before relying on it.
51 def _check_socket_filename(self, dump, socket_id, filename):
# an entry matches only when both the id and the filename are equal
53 if (d.socket_id == socket_id) and (
54 d.socket_filename == filename):
# Test: the default socket filename (id 0, "<tempdir>/memif.sock") is checked
# in the dump at the start and again after the additional sockets have been
# removed.
# NOTE(review): several lines of this test are not visible (the embedded
# numbering has gaps). The openers of the calls closed by the extra ")" on
# lines 65, 71, 78, 85, 94 and 102, and the creation of `memif_sockets`, are
# missing - confirm against the full file.
58 def test_memif_socket_filename_add_del(self):
59 """ Memif socket filename add/del """
61 # dump default socket filename
62 dump = self.vapi.memif_socket_filename_dump()
64 self._check_socket_filename(
65 dump, 0, "%s/memif.sock" % self.tempdir))
71 self, 1, "%s/memif1.sock" % self.tempdir))
72 # default path (test tempdir)
78 add_default_folder=True))
79 # create new folder in default folder
85 add_default_folder=True))
# each socket object is queried and its own socket_filename is checked
# NOTE(review): the remaining arguments of the check are not visible
87 for sock in memif_sockets:
89 dump = sock.query_vpp_config()
91 self._check_socket_filename(
94 sock.socket_filename))
# remove every socket that this test handled
96 for sock in memif_sockets:
97 sock.remove_vpp_config()
# re-dump and check the default entry (id 0) once more
99 dump = self.vapi.memif_socket_filename_dump()
101 self._check_socket_filename(
102 dump, 0, "%s/memif.sock" % self.tempdir))
def _create_delete_test_one_interface(self, memif):
    """ Add one memif, check its queried details, remove it again

    :param memif: object offering add_vpp_config(), query_vpp_config()
        and remove_vpp_config(), plus the sw_if_index, role, mode and
        socket_id attributes that are compared here.
    """
    memif.add_vpp_config()

    details = memif.query_vpp_config()
    self.assertTrue(details)
    # the queried entry has to mirror what is held on the memif object
    for field in ("sw_if_index", "role", "mode"):
        self.assertEqual(getattr(details, field), getattr(memif, field))
    # socket_id is compared only when one is set on the memif object
    if memif.socket_id is not None:
        self.assertEqual(details.socket_id, memif.socket_id)

    memif.remove_vpp_config()

    # once removed, the query has to come back falsy
    self.assertFalse(memif.query_vpp_config())
# Asserts that the memif link comes up (wait_for_link_up is called with 5),
# then checks ring_size and buffer_size of the queried details.
122 def _connect_test_one_interface(self, memif):
123 self.assertTrue(memif.wait_for_link_up(5))
124 dump = memif.query_vpp_config()
# slave role: queried sizes are compared with the values on the memif object
126 if memif.role == VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE:
127 self.assertEqual(dump.ring_size, memif.ring_size)
128 self.assertEqual(dump.buffer_size, memif.buffer_size)
# NOTE(review): the embedded numbering jumps 128 -> 130; presumably an
# `else:` sits here so that the fixed expectations below (ring_size 1,
# buffer_size 0) apply only to the non-slave role - confirm.
130 self.assertEqual(dump.ring_size, 1)
131 self.assertEqual(dump.buffer_size, 0)
# Adds both memif objects, runs the single-interface connect check on each,
# then removes both again.
133 def _connect_test_interface_pair(self, memif0, memif1):
134 memif0.add_vpp_config()
135 memif1.add_vpp_config()
# NOTE(review): the embedded numbering jumps 135 -> 140, so statements between
# adding the interfaces and checking them are not visible here - confirm
# (the checks below wait for link up).
140 self._connect_test_one_interface(memif0)
141 self._connect_test_one_interface(memif1)
143 memif0.remove_vpp_config()
144 memif1.remove_vpp_config()
# Test: runs the create/delete helper on the same memif object twice, first
# with the slave role it is built with, then with the role switched to master.
# NOTE(review): the opener of the expression that binds `memif` is not visible
# here (numbering jumps 147 -> 151); only its role and mode arguments are
# shown - confirm the remaining arguments.
146 def test_memif_create_delete(self):
147 """ Memif create/delete interface """
151 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
152 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
153 self._create_delete_test_one_interface(memif)
# same object, role flipped to master for the second pass
154 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
155 self._create_delete_test_one_interface(memif)
# Test: for every entry of memif_sockets, the socket is added, its socket_id
# is assigned to the memif object, and the create/delete helper is run once
# per role (slave, then master).
# NOTE(review): the initialisation of `memif_sockets`, the constructor names
# inside the three append calls and the opener of the `memif` construction are
# not visible here (gaps in the embedded numbering) - confirm against the full
# file.
157 def test_memif_create_custom_socket(self):
158 """ Memif create with non-default socket filename """
162 memif_sockets.append(
164 self, 1, "%s/memif1.sock" % self.tempdir))
165 # default path (test tempdir)
166 memif_sockets.append(
171 add_default_folder=True))
172 # create new folder in default folder
173 memif_sockets.append(
178 add_default_folder=True))
182 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
183 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
# NOTE(review): with indentation stripped, the extent of this loop body is
# inferred; every following line uses `sock` or the memif bound to it, so
# presumably all of them belong to the loop - confirm.
185 for sock in memif_sockets:
186 sock.add_vpp_config()
187 memif.socket_id = sock.socket_id
188 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
189 self._create_delete_test_one_interface(memif)
190 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
191 self._create_delete_test_one_interface(memif)
# Test: checks a local/remote memif pair twice - first local slave with
# remote master, then with the two roles swapped.
# NOTE(review): the opener of the local `memif` construction and the trailing
# arguments of both memif constructions are not visible here (gaps in the
# embedded numbering) - confirm against the full file.
193 def test_memif_connect(self):
194 """ Memif connect """
197 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
198 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
# socket filename object created against the remote test object with id 1,
# using "<tempdir>/memif.sock" built from this test's tempdir
203 remote_socket = VppSocketFilename(self.remote_test, 1,
204 "%s/memif.sock" % self.tempdir)
205 remote_socket.add_vpp_config()
207 remote_memif = VppMemif(
209 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
210 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
216 self._connect_test_interface_pair(memif, remote_memif)
# swap the roles and run the same pair check again
218 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
219 remote_memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
221 self._connect_test_interface_pair(memif, remote_memif)
# Builds ICMP echo-request packets: Ethernet from pg.remote_mac to
# pg.local_mac, IP from pg.remote_ip4 to the memif's ip_prefix network
# address, ICMP id taken from memif.if_id and seq from `i`.
# NOTE(review): the statement binding `i` (presumably a loop driven by `num`)
# and whatever collects and returns the packets are not visible here (numbering
# jumps 223 -> 226 and stops at 229) - confirm.
223 def _create_icmp(self, pg, memif, num):
226 pkt = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
227 IP(src=pg.remote_ip4,
228 dst=str(memif.ip_prefix.network_address)) /
229 ICMP(id=memif.if_id, type='echo-request', seq=i))
# Checks one received packet: the IP source is the memif's ip_prefix network
# address, the destination is pg.remote_ip4, and the ICMP fields match the
# expected reply for sequence number `seq`.
# NOTE(review): `ip` and `icmp` are not assigned in the visible lines
# (embedded lines 234 and 238 are missing); presumably they are taken from
# `rx` - confirm.
233 def _verify_icmp(self, pg, memif, rx, seq):
235 self.assertEqual(ip.src, str(memif.ip_prefix.network_address))
236 self.assertEqual(ip.dst, pg.remote_ip4)
# IP protocol number 1 is ICMP
237 self.assertEqual(ip.proto, 1)
239 self.assertEqual(icmp.type, 0) # echo-reply
240 self.assertEqual(icmp.id, memif.if_id)
241 self.assertEqual(icmp.seq, seq)
# Test: sets up a local slave memif and a remote master memif, waits for both
# links, adds a route on the remote side, sends ICMP echo-requests in through
# pg0 and verifies the captured packets with _verify_icmp.
# NOTE(review): many lines of this test are not visible (gaps in the embedded
# numbering): the opener of the local `memif` construction, the trailing
# arguments of both memif constructions, any local-side configuration between
# embedded lines 261 and 265, the rest of the route path arguments, the
# definition of `packet_num`, and the loop binding `c` and `seq` - confirm
# against the full file.
243 def test_memif_ping(self):
248 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
249 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
# socket filename object created against the remote test object with id 1,
# using "<tempdir>/memif.sock" built from this test's tempdir
251 remote_socket = VppSocketFilename(self.remote_test, 1,
252 "%s/memif.sock" % self.tempdir)
253 remote_socket.add_vpp_config()
255 remote_memif = VppMemif(
257 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
258 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
261 memif.add_vpp_config()
265 remote_memif.add_vpp_config()
266 remote_memif.config_ip4()
267 remote_memif.admin_up()
# both ends must report link up (wait_for_link_up is called with 5)
269 self.assertTrue(memif.wait_for_link_up(5))
270 self.assertTrue(remote_memif.wait_for_link_up(5))
# route object is created against the remote test object for pg0's local ip4
# subnet with 24 (presumably the prefix length), via the local memif's address
272 # add routing to remote vpp
273 route = VppIpRoute(self.remote_test, self.pg0._local_ip4_subnet, 24,
274 [VppRoutePath(memif.ip_prefix.network_address,
278 route.add_vpp_config()
280 # create ICMP echo-request from local pg to remote memif
282 pkts = self._create_icmp(self.pg0, remote_memif, packet_num)
284 self.pg0.add_stream(pkts)
285 self.pg_enable_capture(self.pg_interfaces)
# packet_num packets are requested from pg0's capture, with timeout=2
287 capture = self.pg0.get_capture(packet_num, timeout=2)
290 self._verify_icmp(self.pg0, remote_memif, c, seq)
# the route added above is removed again at the end of the test
293 route.remove_vpp_config()
# Script entry point: hand this module's tests to unittest with the VPP
# test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)