4 from scapy.layers.l2 import Ether
5 from scapy.layers.inet import IP, ICMP
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from framework import tag_run_solo
9 from remote_test import RemoteClass, RemoteVppTestCase
10 from vpp_memif import remove_all_memif_vpp_config, \
11 VppSocketFilename, VppMemif
12 from vpp_ip_route import VppIpRoute, VppRoutePath
13 from vpp_papi import VppEnum
17 class TestMemif(VppTestCase):
18 """ Memif Test Case """
22 # fork new process before client connects to VPP
23 cls.remote_test = RemoteClass(RemoteVppTestCase)
24 cls.remote_test.start_remote()
25 cls.remote_test.set_request_timeout(10)
26 super(TestMemif, cls).setUpClass()
27 cls.remote_test.setUpClass(cls.tempdir)
28 cls.create_pg_interfaces(range(1))
29 for pg in cls.pg_interfaces:
35 def tearDownClass(cls):
36 cls.remote_test.tearDownClass()
37 cls.remote_test.quit_remote()
38 for pg in cls.pg_interfaces:
42 super(TestMemif, cls).tearDownClass()
45 remove_all_memif_vpp_config(self)
46 remove_all_memif_vpp_config(self.remote_test)
47 super(TestMemif, self).tearDown()
49 def _check_socket_filename(self, dump, socket_id, filename):
51 if (d.socket_id == socket_id) and (
52 d.socket_filename == filename):
56 def test_memif_socket_filename_add_del(self):
57 """ Memif socket filename add/del """
59 # dump default socket filename
60 dump = self.vapi.memif_socket_filename_dump()
62 self._check_socket_filename(
63 dump, 0, "%s/memif.sock" % self.tempdir))
69 self, 1, "%s/memif1.sock" % self.tempdir))
70 # default path (test tempdir)
76 add_default_folder=True))
77 # create new folder in default folder
83 add_default_folder=True))
85 for sock in memif_sockets:
87 dump = sock.query_vpp_config()
89 self._check_socket_filename(
92 sock.socket_filename))
94 for sock in memif_sockets:
95 sock.remove_vpp_config()
97 dump = self.vapi.memif_socket_filename_dump()
99 self._check_socket_filename(
100 dump, 0, "%s/memif.sock" % self.tempdir))
102 def _create_delete_test_one_interface(self, memif):
103 memif.add_vpp_config()
105 dump = memif.query_vpp_config()
107 self.assertTrue(dump)
108 self.assertEqual(dump.sw_if_index, memif.sw_if_index)
109 self.assertEqual(dump.role, memif.role)
110 self.assertEqual(dump.mode, memif.mode)
111 if (memif.socket_id is not None):
112 self.assertEqual(dump.socket_id, memif.socket_id)
114 memif.remove_vpp_config()
116 dump = memif.query_vpp_config()
118 self.assertFalse(dump)
120 def _connect_test_one_interface(self, memif):
121 self.assertTrue(memif.wait_for_link_up(5))
122 dump = memif.query_vpp_config()
124 if memif.role == VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE:
125 self.assertEqual(dump.ring_size, memif.ring_size)
126 self.assertEqual(dump.buffer_size, memif.buffer_size)
128 self.assertEqual(dump.ring_size, 1)
129 self.assertEqual(dump.buffer_size, 0)
131 def _connect_test_interface_pair(self, memif0, memif1):
132 memif0.add_vpp_config()
133 memif1.add_vpp_config()
138 self._connect_test_one_interface(memif0)
139 self._connect_test_one_interface(memif1)
141 memif0.remove_vpp_config()
142 memif1.remove_vpp_config()
144 def test_memif_create_delete(self):
145 """ Memif create/delete interface """
149 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
150 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
151 self._create_delete_test_one_interface(memif)
152 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
153 self._create_delete_test_one_interface(memif)
155 def test_memif_create_custom_socket(self):
156 """ Memif create with non-default socket filename """
160 memif_sockets.append(
162 self, 1, "%s/memif1.sock" % self.tempdir))
163 # default path (test tempdir)
164 memif_sockets.append(
169 add_default_folder=True))
170 # create new folder in default folder
171 memif_sockets.append(
176 add_default_folder=True))
180 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
181 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
183 for sock in memif_sockets:
184 sock.add_vpp_config()
185 memif.socket_id = sock.socket_id
186 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
187 self._create_delete_test_one_interface(memif)
188 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
189 self._create_delete_test_one_interface(memif)
191 def test_memif_connect(self):
192 """ Memif connect """
195 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
196 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
201 remote_socket = VppSocketFilename(self.remote_test, 1,
202 "%s/memif.sock" % self.tempdir)
203 remote_socket.add_vpp_config()
205 remote_memif = VppMemif(
207 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
208 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
214 self._connect_test_interface_pair(memif, remote_memif)
216 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
217 remote_memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
219 self._connect_test_interface_pair(memif, remote_memif)
221 def _create_icmp(self, pg, memif, num):
224 pkt = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
225 IP(src=pg.remote_ip4,
226 dst=str(memif.ip_prefix.network_address)) /
227 ICMP(id=memif.if_id, type='echo-request', seq=i))
231 def _verify_icmp(self, pg, memif, rx, seq):
233 self.assertEqual(ip.src, str(memif.ip_prefix.network_address))
234 self.assertEqual(ip.dst, pg.remote_ip4)
235 self.assertEqual(ip.proto, 1)
237 self.assertEqual(icmp.type, 0) # echo-reply
238 self.assertEqual(icmp.id, memif.if_id)
239 self.assertEqual(icmp.seq, seq)
241 def test_memif_ping(self):
246 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
247 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
249 remote_socket = VppSocketFilename(self.remote_test, 1,
250 "%s/memif.sock" % self.tempdir)
251 remote_socket.add_vpp_config()
253 remote_memif = VppMemif(
255 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
256 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
259 memif.add_vpp_config()
263 remote_memif.add_vpp_config()
264 remote_memif.config_ip4()
265 remote_memif.admin_up()
267 self.assertTrue(memif.wait_for_link_up(5))
268 self.assertTrue(remote_memif.wait_for_link_up(5))
270 # add routing to remote vpp
271 route = VppIpRoute(self.remote_test, self.pg0._local_ip4_subnet, 24,
272 [VppRoutePath(memif.ip_prefix.network_address,
276 route.add_vpp_config()
278 # create ICMP echo-request from local pg to remote memif
280 pkts = self._create_icmp(self.pg0, remote_memif, packet_num)
282 self.pg0.add_stream(pkts)
283 self.pg_enable_capture(self.pg_interfaces)
285 capture = self.pg0.get_capture(packet_num, timeout=2)
288 self._verify_icmp(self.pg0, remote_memif, c, seq)
291 route.remove_vpp_config()
294 if __name__ == '__main__':
295 unittest.main(testRunner=VppTestRunner)