4 from scapy.layers.l2 import Ether
5 from scapy.layers.inet import IP, ICMP
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from framework import tag_run_solo
9 from remote_test import RemoteClass, RemoteVppTestCase
10 from vpp_memif import remove_all_memif_vpp_config, \
11 VppSocketFilename, VppMemif
12 from vpp_ip_route import VppIpRoute, VppRoutePath
13 from vpp_papi import VppEnum
# Test case for memif; it subclasses VppTestCase.
# remote_class is the class the visible class-level methods query for its
# CPU requirements and wrap in a RemoteClass (here RemoteVppTestCase).
17 class TestMemif(VppTestCase):
18 """ Memif Test Case """
19 remote_class = RemoteVppTestCase
def get_cpus_required(cls):
    """Return the CPU count needed by this test case plus its remote class.

    The parent implementation's requirement is added to the value reported
    by ``cls.remote_class.get_cpus_required()``.
    """
    # Parent first, remote second: same evaluation order as the sum it
    # replaces.
    own_need = super().get_cpus_required()
    remote_need = cls.remote_class.get_cpus_required()
    return own_need + remote_need
def assign_cpus(cls, cpus):
    """Split *cpus* between the remote class and this test case.

    The leading ``cls.remote_class.get_cpus_required()`` entries are handed
    to the remote class; everything after them goes to the parent
    implementation.
    """
    peer = cls.remote_class
    # Both slices are taken before either assign_cpus call runs.
    for_peer = cpus[:peer.get_cpus_required()]
    for_self = cpus[peer.get_cpus_required():]
    peer.assign_cpus(for_peer)
    super().assign_cpus(for_self)
# NOTE(review): the enclosing `def` line is not visible in this view; the
# super(TestMemif, cls).setUpClass() call below suggests this is the body of
# setUpClass -- confirm.
35 # fork new process before client connects to VPP
36 cls.remote_test = RemoteClass(cls.remote_class)
37 cls.remote_test.start_remote()
38 cls.remote_test.set_request_timeout(10)
# Parent class setup runs after the remote has been started.
39 super(TestMemif, cls).setUpClass()
# Set up the remote side, passing it this test case's tempdir.
40 cls.remote_test.setUpClass(cls.tempdir)
# range(1) yields a single index, so one pg interface is requested.
41 cls.create_pg_interfaces(range(1))
# NOTE(review): the body of this loop is not visible in this view.
42 for pg in cls.pg_interfaces:
# Class-level teardown: tear down and quit the remote test first, then
# delegate to the parent class teardown.
48 def tearDownClass(cls):
49 cls.remote_test.tearDownClass()
50 cls.remote_test.quit_remote()
# NOTE(review): the body of this loop is not visible in this view.
51 for pg in cls.pg_interfaces:
55 super(TestMemif, cls).tearDownClass()
# NOTE(review): the enclosing `def` line is not visible in this view; the
# super(TestMemif, self).tearDown() call suggests this is the body of
# tearDown -- confirm.
# remove_all_memif_vpp_config is applied to this test and to the remote test
# before the parent teardown runs.
58 remove_all_memif_vpp_config(self)
59 remove_all_memif_vpp_config(self.remote_test)
60 super(TestMemif, self).tearDown()
# Helper that compares an entry's socket_id and socket_filename against the
# expected socket_id / filename arguments.
# NOTE(review): the statement binding `d` and any return statements are not
# visible in this view; presumably `d` iterates over `dump` -- confirm.
62 def _check_socket_filename(self, dump, socket_id, filename):
64 if (d.socket_id == socket_id) and (
65 d.socket_filename == filename):
# NOTE(review): several lines of this test are not visible in this view
# (call openers, the construction of `memif_sockets`); the comments below
# describe only what the visible lines show.
69 def test_memif_socket_filename_add_del(self):
70 """ Memif socket filename add/del """
72 # dump default socket filename
73 dump = self.vapi.memif_socket_filename_dump()
# Socket id 0 is checked against "<tempdir>/memif.sock".
# NOTE(review): the extra closing parenthesis implies an enclosing call
# (presumably an assertion) that is not visible.
75 self._check_socket_filename(
76 dump, 0, "%s/memif.sock" % self.tempdir))
# NOTE(review): tail of a call whose opening lines are not visible; it takes
# socket id 1 and "<tempdir>/memif1.sock".
82 self, 1, "%s/memif1.sock" % self.tempdir))
83 # default path (test tempdir)
89 add_default_folder=True))
90 # create new folder in default folder
96 add_default_folder=True))
# For each socket, query its VPP config and check it against the socket's
# own socket_filename.
# NOTE(review): the lines between the visible ones in this loop are missing.
98 for sock in memif_sockets:
100 dump = sock.query_vpp_config()
102 self._check_socket_filename(
105 sock.socket_filename))
# Remove every socket created above.
107 for sock in memif_sockets:
108 sock.remove_vpp_config()
# Dump again and re-check the id 0 / "<tempdir>/memif.sock" entry.
110 dump = self.vapi.memif_socket_filename_dump()
112 self._check_socket_filename(
113 dump, 0, "%s/memif.sock" % self.tempdir))
def _create_delete_test_one_interface(self, memif):
    """Add *memif* to VPP, verify what VPP reports for it, then remove it.

    After the add, the queried details must be truthy and must match the
    object's ``sw_if_index``, ``role`` and ``mode``; ``socket_id`` is
    compared only when the object has one set. After the removal, a second
    query must come back falsy.
    """
    memif.add_vpp_config()

    details = memif.query_vpp_config()
    self.assertTrue(details)
    # Same comparisons, in the same order, as three explicit assertEqual
    # calls on these attributes.
    for field in ("sw_if_index", "role", "mode"):
        self.assertEqual(getattr(details, field), getattr(memif, field))
    if memif.socket_id is not None:
        self.assertEqual(details.socket_id, memif.socket_id)

    memif.remove_vpp_config()

    after_removal = memif.query_vpp_config()
    self.assertFalse(after_removal)
# Check one memif after it has been connected: wait_for_link_up(5) must
# return a truthy value, then the queried ring_size / buffer_size are
# asserted.
133 def _connect_test_one_interface(self, memif):
134 self.assertTrue(memif.wait_for_link_up(5))
135 dump = memif.query_vpp_config()
# In the slave role the dump is compared with the values held by the memif
# object itself.
137 if memif.role == VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE:
138 self.assertEqual(dump.ring_size, memif.ring_size)
139 self.assertEqual(dump.buffer_size, memif.buffer_size)
# NOTE(review): a branch separator (presumably `else:`) is not visible here;
# as shown, the two assertions below would contradict the two above.
141 self.assertEqual(dump.ring_size, 1)
142 self.assertEqual(dump.buffer_size, 0)
# Add both memifs to VPP, run the single-interface connect check on each,
# then remove both.
144 def _connect_test_interface_pair(self, memif0, memif1):
145 memif0.add_vpp_config()
146 memif1.add_vpp_config()
# NOTE(review): lines between the add and the checks are not visible in this
# view.
151 self._connect_test_one_interface(memif0)
152 self._connect_test_one_interface(memif1)
154 memif0.remove_vpp_config()
155 memif1.remove_vpp_config()
157 def test_memif_create_delete(self):
158 """ Memif create/delete interface """
# NOTE(review): the start of the call that builds `memif` is not visible;
# only its slave-role and ethernet-mode arguments are.
162 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
163 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
# Run the create/delete check with the initial role, then switch the same
# object to the master role and run it again.
164 self._create_delete_test_one_interface(memif)
165 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
166 self._create_delete_test_one_interface(memif)
# NOTE(review): the initialisation of `memif_sockets`, the constructor names
# inside each append, and the start of the call that builds `memif` are not
# visible in this view.
168 def test_memif_create_custom_socket(self):
169 """ Memif create with non-default socket filename """
# First socket: id 1 at "<tempdir>/memif1.sock".
173 memif_sockets.append(
175 self, 1, "%s/memif1.sock" % self.tempdir))
176 # default path (test tempdir)
177 memif_sockets.append(
182 add_default_folder=True))
183 # create new folder in default folder
184 memif_sockets.append(
189 add_default_folder=True))
193 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
194 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
# For every socket: add it to VPP, point the memif at its socket_id, and run
# the create/delete check once per role (slave, then master).
196 for sock in memif_sockets:
197 sock.add_vpp_config()
198 memif.socket_id = sock.socket_id
199 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
200 self._create_delete_test_one_interface(memif)
201 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
202 self._create_delete_test_one_interface(memif)
# NOTE(review): the openers and remaining arguments of the calls that build
# `memif` and `remote_memif` are not visible in this view.
204 def test_memif_connect(self):
205 """ Memif connect """
208 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
209 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
# Register socket id 1 on the remote test, at "<tempdir>/memif.sock".
214 remote_socket = VppSocketFilename(self.remote_test, 1,
215 "%s/memif.sock" % self.tempdir)
216 remote_socket.add_vpp_config()
218 remote_memif = VppMemif(
220 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
221 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
# First pass: roles as constructed above.
227 self._connect_test_interface_pair(memif, remote_memif)
# Second pass: swap the roles of the two ends and run the pair check again.
229 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
230 remote_memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
232 self._connect_test_interface_pair(memif, remote_memif)
# Build an ICMP echo-request: Ether from pg.remote_mac to pg.local_mac, IP
# from pg.remote_ip4 to the network address of memif.ip_prefix, ICMP id set
# to memif.if_id.
# NOTE(review): the binding of `i` (used as the ICMP seq), any accumulation
# of packets, and the return statement are not visible in this view;
# presumably `num` packets are built in a loop -- confirm.
234 def _create_icmp(self, pg, memif, num):
237 pkt = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
238 IP(src=pg.remote_ip4,
239 dst=str(memif.ip_prefix.network_address)) /
240 ICMP(id=memif.if_id, type='echo-request', seq=i))
# Assert that a received packet is the expected reply: IP source is the
# network address of memif.ip_prefix, IP destination is pg.remote_ip4, IP
# proto is 1, and the ICMP type/id/seq match 0 / memif.if_id / seq.
# NOTE(review): the bindings of `ip` and `icmp` are not visible in this view;
# presumably they are layers taken from `rx` -- confirm.
244 def _verify_icmp(self, pg, memif, rx, seq):
246 self.assertEqual(ip.src, str(memif.ip_prefix.network_address))
247 self.assertEqual(ip.dst, pg.remote_ip4)
248 self.assertEqual(ip.proto, 1)
250 self.assertEqual(icmp.type, 0) # echo-reply
251 self.assertEqual(icmp.id, memif.if_id)
252 self.assertEqual(icmp.seq, seq)
# NOTE(review): many lines of this test are not visible in this view (the
# constructor openers, the rest of the route arguments, the bindings of
# `packet_num`, `c` and `seq`); comments describe the visible lines only.
254 def test_memif_ping(self):
259 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
260 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
# Register socket id 1 on the remote test, at "<tempdir>/memif.sock".
262 remote_socket = VppSocketFilename(self.remote_test, 1,
263 "%s/memif.sock" % self.tempdir)
264 remote_socket.add_vpp_config()
266 remote_memif = VppMemif(
268 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
269 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
272 memif.add_vpp_config()
# Remote end: add it, configure IPv4 on it and bring it admin-up.
276 remote_memif.add_vpp_config()
277 remote_memif.config_ip4()
278 remote_memif.admin_up()
# Both ends must report link-up before traffic is sent.
280 self.assertTrue(memif.wait_for_link_up(5))
281 self.assertTrue(remote_memif.wait_for_link_up(5))
283 # add routing to remote vpp
# The route is created against the remote test for pg0's local IPv4 subnet
# with prefix length 24, via the network address of the local memif's prefix.
284 route = VppIpRoute(self.remote_test, self.pg0._local_ip4_subnet, 24,
285 [VppRoutePath(memif.ip_prefix.network_address,
289 route.add_vpp_config()
291 # create ICMP echo-request from local pg to remote memif
293 pkts = self._create_icmp(self.pg0, remote_memif, packet_num)
295 self.pg0.add_stream(pkts)
296 self.pg_enable_capture(self.pg_interfaces)
# Expect packet_num packets back on pg0 (timeout=2).
298 capture = self.pg0.get_capture(packet_num, timeout=2)
# NOTE(review): `c` and `seq` are bound by lines not visible here, presumably
# a loop over `capture` -- confirm.
301 self._verify_icmp(self.pg0, remote_memif, c, seq)
304 route.remove_vpp_config()
# NOTE(review): the constructor openers and several statements between the
# visible lines are missing from this view (for example, any admin up/down
# calls on the local `memif`).
306 def test_memif_admin_up_down_up(self):
307 """ Memif admin up/down/up """
310 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
311 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
# Register socket id 1 on the remote test, at "<tempdir>/memif.sock".
316 remote_socket = VppSocketFilename(self.remote_test, 1,
317 "%s/memif.sock" % self.tempdir)
318 remote_socket.add_vpp_config()
320 remote_memif = VppMemif(
322 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
323 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
329 memif.add_vpp_config()
330 remote_memif.add_vpp_config()
# The remote end is cycled up, down, and up again before the checks.
333 remote_memif.admin_up()
335 remote_memif.admin_down()
337 remote_memif.admin_up()
# After the final admin-up, each end must pass the single-interface connect
# check.
339 self._connect_test_one_interface(memif)
340 self._connect_test_one_interface(remote_memif)
# Clean up both interfaces and the remote socket.
342 memif.remove_vpp_config()
343 remote_memif.remove_vpp_config()
344 remote_socket.remove_vpp_config()
if __name__ == '__main__':
    # Executed directly: hand control to unittest, using the framework's
    # VppTestRunner as the test runner.
    unittest.main(testRunner=VppTestRunner)