3 from scapy.layers.l2 import Ether
4 from scapy.layers.inet import IP, ICMP
6 from framework import VppTestCase
7 from asfframework import (
13 from remote_test import RemoteClass, RemoteVppTestCase
14 from vpp_memif import remove_all_memif_vpp_config, VppSocketFilename, VppMemif
15 from vpp_ip_route import VppIpRoute, VppRoutePath
16 from vpp_papi import VppEnum
21 class TestMemif(VppTestCase):
24 remote_class = RemoteVppTestCase
27 def get_cpus_required(cls):
28 return super().get_cpus_required() + cls.remote_class.get_cpus_required()
31 def assign_cpus(cls, cpus):
32 remote_cpus = cpus[: cls.remote_class.get_cpus_required()]
33 my_cpus = cpus[cls.remote_class.get_cpus_required() :]
34 cls.remote_class.assign_cpus(remote_cpus)
35 super().assign_cpus(my_cpus)
39 # fork new process before client connects to VPP
40 cls.remote_test = RemoteClass(cls.remote_class)
41 cls.remote_test.start_remote()
42 cls.remote_test.set_request_timeout(10)
43 super(TestMemif, cls).setUpClass()
44 if is_distro_debian11 == True and not hasattr(cls, "vpp"):
45 cls.remote_test.quit_remote()
47 cls.remote_test.setUpClass(cls.tempdir)
48 cls.create_pg_interfaces(range(1))
49 for pg in cls.pg_interfaces:
55 def tearDownClass(cls):
56 cls.remote_test.tearDownClass()
57 cls.remote_test.quit_remote()
58 for pg in cls.pg_interfaces:
62 super(TestMemif, cls).tearDownClass()
65 remove_all_memif_vpp_config(self)
66 remove_all_memif_vpp_config(self.remote_test)
67 super(TestMemif, self).tearDown()
69 def _check_socket_filename(self, dump, socket_id, filename):
71 if (d.socket_id == socket_id) and (d.socket_filename == filename):
75 def test_memif_socket_filename_add_del(self):
76 """Memif socket filename add/del"""
78 # dump default socket filename
79 dump = self.vapi.memif_socket_filename_dump()
81 self._check_socket_filename(dump, 0, "%s/memif.sock" % self.tempdir)
87 VppSocketFilename(self, 1, "%s/memif1.sock" % self.tempdir)
89 # default path (test tempdir)
91 VppSocketFilename(self, 2, "memif2.sock", add_default_folder=True)
93 # create new folder in default folder
95 VppSocketFilename(self, 3, "sock/memif3.sock", add_default_folder=True)
98 for sock in memif_sockets:
100 dump = sock.query_vpp_config()
102 self._check_socket_filename(dump, sock.socket_id, sock.socket_filename)
105 for sock in memif_sockets:
106 sock.remove_vpp_config()
108 dump = self.vapi.memif_socket_filename_dump()
110 self._check_socket_filename(dump, 0, "%s/memif.sock" % self.tempdir)
113 def _create_delete_test_one_interface(self, memif):
114 memif.add_vpp_config()
116 dump = memif.query_vpp_config()
118 self.assertTrue(dump)
119 self.assertEqual(dump.sw_if_index, memif.sw_if_index)
120 self.assertEqual(dump.role, memif.role)
121 self.assertEqual(dump.mode, memif.mode)
122 if memif.socket_id is not None:
123 self.assertEqual(dump.socket_id, memif.socket_id)
125 memif.remove_vpp_config()
127 dump = memif.query_vpp_config()
129 self.assertFalse(dump)
131 def _connect_test_one_interface(self, memif):
132 self.assertTrue(memif.wait_for_link_up(5))
133 dump = memif.query_vpp_config()
135 if memif.role == VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE:
136 self.assertEqual(dump.ring_size, memif.ring_size)
137 self.assertEqual(dump.buffer_size, memif.buffer_size)
139 self.assertEqual(dump.ring_size, 1)
140 self.assertEqual(dump.buffer_size, 0)
142 def _connect_test_interface_pair(self, memif0, memif1):
143 memif0.add_vpp_config()
144 memif1.add_vpp_config()
149 self._connect_test_one_interface(memif0)
150 self._connect_test_one_interface(memif1)
152 memif0.remove_vpp_config()
153 memif1.remove_vpp_config()
155 def test_memif_create_delete(self):
156 """Memif create/delete interface"""
160 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
161 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
163 self._create_delete_test_one_interface(memif)
164 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
165 self._create_delete_test_one_interface(memif)
167 def test_memif_create_custom_socket(self):
168 """Memif create with non-default socket filename"""
172 memif_sockets.append(
173 VppSocketFilename(self, 1, "%s/memif1.sock" % self.tempdir)
175 # default path (test tempdir)
176 memif_sockets.append(
177 VppSocketFilename(self, 2, "memif2.sock", add_default_folder=True)
179 # create new folder in default folder
180 memif_sockets.append(
181 VppSocketFilename(self, 3, "sock/memif3.sock", add_default_folder=True)
186 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
187 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
190 for sock in memif_sockets:
191 sock.add_vpp_config()
192 memif.socket_id = sock.socket_id
193 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
194 self._create_delete_test_one_interface(memif)
195 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
196 self._create_delete_test_one_interface(memif)
198 def test_memif_connect(self):
202 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
203 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
209 remote_socket = VppSocketFilename(
210 self.remote_test, 1, "%s/memif.sock" % self.tempdir
212 remote_socket.add_vpp_config()
214 remote_memif = VppMemif(
216 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
217 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
224 self._connect_test_interface_pair(memif, remote_memif)
226 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
227 remote_memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
229 self._connect_test_interface_pair(memif, remote_memif)
231 def _create_icmp(self, pg, memif, num):
235 Ether(dst=pg.local_mac, src=pg.remote_mac)
236 / IP(src=pg.remote_ip4, dst=str(memif.ip_prefix.network_address))
237 / ICMP(id=memif.if_id, type="echo-request", seq=i)
242 def _verify_icmp(self, pg, memif, rx, seq):
244 self.assertEqual(ip.src, str(memif.ip_prefix.network_address))
245 self.assertEqual(ip.dst, pg.remote_ip4)
246 self.assertEqual(ip.proto, 1)
248 self.assertEqual(icmp.type, 0) # echo-reply
249 self.assertEqual(icmp.id, memif.if_id)
250 self.assertEqual(icmp.seq, seq)
252 def test_memif_ping(self):
257 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
258 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
261 remote_socket = VppSocketFilename(
262 self.remote_test, 1, "%s/memif.sock" % self.tempdir
264 remote_socket.add_vpp_config()
266 remote_memif = VppMemif(
268 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
269 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
273 memif.add_vpp_config()
277 remote_memif.add_vpp_config()
278 remote_memif.config_ip4()
279 remote_memif.admin_up()
281 self.assertTrue(memif.wait_for_link_up(5))
282 self.assertTrue(remote_memif.wait_for_link_up(5))
284 # add routing to remote vpp
287 self.pg0._local_ip4_subnet,
289 [VppRoutePath(memif.ip_prefix.network_address, 0xFFFFFFFF)],
293 route.add_vpp_config()
295 # create ICMP echo-request from local pg to remote memif
297 pkts = self._create_icmp(self.pg0, remote_memif, packet_num)
299 self.pg0.add_stream(pkts)
300 self.pg_enable_capture(self.pg_interfaces)
302 capture = self.pg0.get_capture(packet_num, timeout=2)
305 self._verify_icmp(self.pg0, remote_memif, c, seq)
308 route.remove_vpp_config()
310 def test_memif_admin_up_down_up(self):
311 """Memif admin up/down/up"""
314 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
315 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
321 remote_socket = VppSocketFilename(
322 self.remote_test, 1, "%s/memif.sock" % self.tempdir
324 remote_socket.add_vpp_config()
326 remote_memif = VppMemif(
328 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
329 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
336 memif.add_vpp_config()
337 remote_memif.add_vpp_config()
340 remote_memif.admin_up()
342 remote_memif.admin_down()
344 remote_memif.admin_up()
346 self._connect_test_one_interface(memif)
347 self._connect_test_one_interface(remote_memif)
349 memif.remove_vpp_config()
350 remote_memif.remove_vpp_config()
351 remote_socket.remove_vpp_config()
354 if __name__ == "__main__":
355 unittest.main(testRunner=VppTestRunner)