4 from scapy.layers.l2 import Ether
5 from scapy.layers.inet import IP, ICMP
7 from framework import VppTestCase, VppTestRunner
8 from framework import tag_run_solo
9 from remote_test import RemoteClass, RemoteVppTestCase
10 from vpp_memif import remove_all_memif_vpp_config, VppSocketFilename, VppMemif
11 from vpp_ip_route import VppIpRoute, VppRoutePath
12 from vpp_papi import VppEnum
16 class TestMemif(VppTestCase):
19 remote_class = RemoteVppTestCase
22 def get_cpus_required(cls):
23 return super().get_cpus_required() + cls.remote_class.get_cpus_required()
26 def assign_cpus(cls, cpus):
27 remote_cpus = cpus[: cls.remote_class.get_cpus_required()]
28 my_cpus = cpus[cls.remote_class.get_cpus_required() :]
29 cls.remote_class.assign_cpus(remote_cpus)
30 super().assign_cpus(my_cpus)
34 # fork new process before client connects to VPP
35 cls.remote_test = RemoteClass(cls.remote_class)
36 cls.remote_test.start_remote()
37 cls.remote_test.set_request_timeout(10)
38 super(TestMemif, cls).setUpClass()
39 cls.remote_test.setUpClass(cls.tempdir)
40 cls.create_pg_interfaces(range(1))
41 for pg in cls.pg_interfaces:
47 def tearDownClass(cls):
48 cls.remote_test.tearDownClass()
49 cls.remote_test.quit_remote()
50 for pg in cls.pg_interfaces:
54 super(TestMemif, cls).tearDownClass()
57 remove_all_memif_vpp_config(self)
58 remove_all_memif_vpp_config(self.remote_test)
59 super(TestMemif, self).tearDown()
61 def _check_socket_filename(self, dump, socket_id, filename):
63 if (d.socket_id == socket_id) and (d.socket_filename == filename):
67 def test_memif_socket_filename_add_del(self):
68 """Memif socket filename add/del"""
70 # dump default socket filename
71 dump = self.vapi.memif_socket_filename_dump()
73 self._check_socket_filename(dump, 0, "%s/memif.sock" % self.tempdir)
79 VppSocketFilename(self, 1, "%s/memif1.sock" % self.tempdir)
81 # default path (test tempdir)
83 VppSocketFilename(self, 2, "memif2.sock", add_default_folder=True)
85 # create new folder in default folder
87 VppSocketFilename(self, 3, "sock/memif3.sock", add_default_folder=True)
90 for sock in memif_sockets:
92 dump = sock.query_vpp_config()
94 self._check_socket_filename(dump, sock.socket_id, sock.socket_filename)
97 for sock in memif_sockets:
98 sock.remove_vpp_config()
100 dump = self.vapi.memif_socket_filename_dump()
102 self._check_socket_filename(dump, 0, "%s/memif.sock" % self.tempdir)
105 def _create_delete_test_one_interface(self, memif):
106 memif.add_vpp_config()
108 dump = memif.query_vpp_config()
110 self.assertTrue(dump)
111 self.assertEqual(dump.sw_if_index, memif.sw_if_index)
112 self.assertEqual(dump.role, memif.role)
113 self.assertEqual(dump.mode, memif.mode)
114 if memif.socket_id is not None:
115 self.assertEqual(dump.socket_id, memif.socket_id)
117 memif.remove_vpp_config()
119 dump = memif.query_vpp_config()
121 self.assertFalse(dump)
123 def _connect_test_one_interface(self, memif):
124 self.assertTrue(memif.wait_for_link_up(5))
125 dump = memif.query_vpp_config()
127 if memif.role == VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE:
128 self.assertEqual(dump.ring_size, memif.ring_size)
129 self.assertEqual(dump.buffer_size, memif.buffer_size)
131 self.assertEqual(dump.ring_size, 1)
132 self.assertEqual(dump.buffer_size, 0)
134 def _connect_test_interface_pair(self, memif0, memif1):
135 memif0.add_vpp_config()
136 memif1.add_vpp_config()
141 self._connect_test_one_interface(memif0)
142 self._connect_test_one_interface(memif1)
144 memif0.remove_vpp_config()
145 memif1.remove_vpp_config()
147 def test_memif_create_delete(self):
148 """Memif create/delete interface"""
152 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
153 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
155 self._create_delete_test_one_interface(memif)
156 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
157 self._create_delete_test_one_interface(memif)
159 def test_memif_create_custom_socket(self):
160 """Memif create with non-default socket filename"""
164 memif_sockets.append(
165 VppSocketFilename(self, 1, "%s/memif1.sock" % self.tempdir)
167 # default path (test tempdir)
168 memif_sockets.append(
169 VppSocketFilename(self, 2, "memif2.sock", add_default_folder=True)
171 # create new folder in default folder
172 memif_sockets.append(
173 VppSocketFilename(self, 3, "sock/memif3.sock", add_default_folder=True)
178 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
179 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
182 for sock in memif_sockets:
183 sock.add_vpp_config()
184 memif.socket_id = sock.socket_id
185 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
186 self._create_delete_test_one_interface(memif)
187 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
188 self._create_delete_test_one_interface(memif)
190 def test_memif_connect(self):
194 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
195 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
201 remote_socket = VppSocketFilename(
202 self.remote_test, 1, "%s/memif.sock" % self.tempdir
204 remote_socket.add_vpp_config()
206 remote_memif = VppMemif(
208 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
209 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
216 self._connect_test_interface_pair(memif, remote_memif)
218 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
219 remote_memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
221 self._connect_test_interface_pair(memif, remote_memif)
223 def _create_icmp(self, pg, memif, num):
227 Ether(dst=pg.local_mac, src=pg.remote_mac)
228 / IP(src=pg.remote_ip4, dst=str(memif.ip_prefix.network_address))
229 / ICMP(id=memif.if_id, type="echo-request", seq=i)
234 def _verify_icmp(self, pg, memif, rx, seq):
236 self.assertEqual(ip.src, str(memif.ip_prefix.network_address))
237 self.assertEqual(ip.dst, pg.remote_ip4)
238 self.assertEqual(ip.proto, 1)
240 self.assertEqual(icmp.type, 0) # echo-reply
241 self.assertEqual(icmp.id, memif.if_id)
242 self.assertEqual(icmp.seq, seq)
244 def test_memif_ping(self):
249 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
250 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
253 remote_socket = VppSocketFilename(
254 self.remote_test, 1, "%s/memif.sock" % self.tempdir
256 remote_socket.add_vpp_config()
258 remote_memif = VppMemif(
260 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
261 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
265 memif.add_vpp_config()
269 remote_memif.add_vpp_config()
270 remote_memif.config_ip4()
271 remote_memif.admin_up()
273 self.assertTrue(memif.wait_for_link_up(5))
274 self.assertTrue(remote_memif.wait_for_link_up(5))
276 # add routing to remote vpp
279 self.pg0._local_ip4_subnet,
281 [VppRoutePath(memif.ip_prefix.network_address, 0xFFFFFFFF)],
285 route.add_vpp_config()
287 # create ICMP echo-request from local pg to remote memif
289 pkts = self._create_icmp(self.pg0, remote_memif, packet_num)
291 self.pg0.add_stream(pkts)
292 self.pg_enable_capture(self.pg_interfaces)
294 capture = self.pg0.get_capture(packet_num, timeout=2)
297 self._verify_icmp(self.pg0, remote_memif, c, seq)
300 route.remove_vpp_config()
302 def test_memif_admin_up_down_up(self):
303 """Memif admin up/down/up"""
306 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
307 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
313 remote_socket = VppSocketFilename(
314 self.remote_test, 1, "%s/memif.sock" % self.tempdir
316 remote_socket.add_vpp_config()
318 remote_memif = VppMemif(
320 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
321 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
328 memif.add_vpp_config()
329 remote_memif.add_vpp_config()
332 remote_memif.admin_up()
334 remote_memif.admin_down()
336 remote_memif.admin_up()
338 self._connect_test_one_interface(memif)
339 self._connect_test_one_interface(remote_memif)
341 memif.remove_vpp_config()
342 remote_memif.remove_vpp_config()
343 remote_socket.remove_vpp_config()
346 if __name__ == "__main__":
347 unittest.main(testRunner=VppTestRunner)