4 from scapy.layers.l2 import Ether
5 from scapy.layers.inet import IP, ICMP
8 from framework import VppTestCase, VppTestRunner, running_extended_tests
9 from remote_test import RemoteClass, RemoteVppTestCase
10 from vpp_memif import remove_all_memif_vpp_config, \
11 VppSocketFilename, VppMemif
12 from vpp_ip_route import VppIpRoute, VppRoutePath
13 from vpp_papi import VppEnum
16 class TestMemif(VppTestCase):
17 """ Memif Test Case """
# NOTE(review): the `def` line for this method is not visible in this
# excerpt; the super() call below suggests it is setUpClass - confirm.
# Visible order of operations: create and start the remote wrapper
# around RemoteVppTestCase, set its request timeout to 10 (units are not
# shown here), run the parent class setup, run the remote side's
# setUpClass with this class's tempdir, then create pg interfaces for
# range(1), i.e. a single interface.
21 # fork new process before client connects to VPP
22 cls.remote_test = RemoteClass(RemoteVppTestCase)
23 cls.remote_test.start_remote()
24 cls.remote_test.set_request_timeout(10)
25 super(TestMemif, cls).setUpClass()
26 cls.remote_test.setUpClass(cls.tempdir)
27 cls.create_pg_interfaces(range(1))
28 for pg in cls.pg_interfaces:
# NOTE(review): the body of the loop above is not visible in this excerpt.
# Class-level teardown. Calls tearDownClass() and then quit_remote() on
# the remote test object, walks the pg interfaces, and finally defers to
# the parent class's tearDownClass().
# NOTE(review): the body of the `for pg` loop is not visible in this
# excerpt - confirm what is undone per interface.
34 def tearDownClass(cls):
35 cls.remote_test.tearDownClass()
36 cls.remote_test.quit_remote()
37 for pg in cls.pg_interfaces:
41 super(TestMemif, cls).tearDownClass()
# NOTE(review): the `def` line for this method is not visible in this
# excerpt; the super() call below suggests it is tearDown - confirm.
# Removes all memif configuration, first via this test case and then via
# the remote test object, before running the parent tearDown().
44 remove_all_memif_vpp_config(self)
45 remove_all_memif_vpp_config(self.remote_test)
46 super(TestMemif, self).tearDown()
# Helper that tests an entry `d` for a match on both socket_id and
# socket_filename against the given arguments.
# NOTE(review): the statement binding `d` (presumably a loop over `dump`)
# and the return statements are not visible in this excerpt - confirm
# the return contract before relying on it.
48 def _check_socket_filename(self, dump, socket_id, filename):
50 if (d.socket_id == socket_id) and (
51 d.socket_filename == filename):
# Visible flow: dump the socket filename table and check that socket id 0
# maps to "<tempdir>/memif.sock"; for every entry in memif_sockets query
# VPP and check its filename; remove every entry; dump again and repeat
# the check for socket id 0.
# NOTE(review): the construction of memif_sockets and the calls wrapping
# the checks (note the lines ending in "))") are only partly visible in
# this excerpt.
55 def test_memif_socket_filename_add_del(self):
56 """ Memif socket filename add/del """
58 # dump default socket filename
59 dump = self.vapi.memif_socket_filename_dump()
61 self._check_socket_filename(
62 dump, 0, "%s/memif.sock" % self.tempdir))
68 self, 1, "%s/memif1.sock" % self.tempdir))
69 # default path (test tempdir)
75 add_default_folder=True))
76 # create new folder in default folder
82 add_default_folder=True))
# check each socket in memif_sockets against what VPP reports for it
84 for sock in memif_sockets:
86 dump = sock.query_vpp_config()
88 self._check_socket_filename(
91 sock.socket_filename))
# remove the sockets again
93 for sock in memif_sockets:
94 sock.remove_vpp_config()
# after removal, socket id 0 is checked against the same default path
96 dump = self.vapi.memif_socket_filename_dump()
98 self._check_socket_filename(
99 dump, 0, "%s/memif.sock" % self.tempdir))
# Helper: add `memif` to VPP, verify that the queried config is truthy
# and that its sw_if_index, role and mode equal the object's attributes,
# then remove it and verify the query result is falsy.
# NOTE(review): indentation is lost in this listing; the removal and the
# final check are presumably unconditional (outside the `if`) - confirm.
101 def _create_delete_test_one_interface(self, memif):
102 memif.add_vpp_config()
104 dump = memif.query_vpp_config()
106 self.assertTrue(dump)
107 self.assertEqual(dump.sw_if_index, memif.sw_if_index)
108 self.assertEqual(dump.role, memif.role)
109 self.assertEqual(dump.mode, memif.mode)
# socket_id is compared only when the memif object has one set
110 if (memif.socket_id is not None):
111 self.assertEqual(dump.socket_id, memif.socket_id)
113 memif.remove_vpp_config()
# query again after removal; nothing should be reported
115 dump = memif.query_vpp_config()
117 self.assertFalse(dump)
# Helper: require wait_for_link_up(5) to succeed (5 is presumably a
# timeout in seconds - confirm), then check the queried ring_size and
# buffer_size.
# NOTE(review): two pairs of expectations are visible - the memif
# object's own values under the SLAVE-role condition, and the constants
# 1 / 0. The line separating them (presumably an `else:`) is not visible
# in this excerpt - confirm.
119 def _connect_test_one_interface(self, memif):
120 self.assertTrue(memif.wait_for_link_up(5))
121 dump = memif.query_vpp_config()
123 if memif.role == VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE:
124 self.assertEqual(dump.ring_size, memif.ring_size)
125 self.assertEqual(dump.buffer_size, memif.buffer_size)
127 self.assertEqual(dump.ring_size, 1)
128 self.assertEqual(dump.buffer_size, 0)
# Helper: add both memifs to VPP, run the single-interface connect check
# on each of them, then remove both.
# NOTE(review): the embedded numbering jumps from 132 to 137, so lines
# between the add and the checks are not visible in this excerpt
# (presumably the interfaces are brought up there) - confirm.
130 def _connect_test_interface_pair(self, memif0, memif1):
131 memif0.add_vpp_config()
132 memif1.add_vpp_config()
137 self._connect_test_one_interface(memif0)
138 self._connect_test_one_interface(memif1)
140 memif0.remove_vpp_config()
141 memif1.remove_vpp_config()
# Runs the create/delete helper twice on the same `memif` object: once as
# constructed, then again after its role is set to MASTER.
# NOTE(review): the opening of the statement that builds `memif` is not
# visible in this excerpt; only its SLAVE-role / ETHERNET-mode arguments
# are.
143 def test_memif_create_delete(self):
144 """ Memif create/delete interface """
148 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
149 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
150 self._create_delete_test_one_interface(memif)
# same object, second pass in the master role
151 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
152 self._create_delete_test_one_interface(memif)
# Builds a list of socket filename objects (three append calls are
# visible), then for a socket from that list: adds it to VPP, points the
# memif at its socket_id, and runs the create/delete helper in the SLAVE
# role and again in the MASTER role.
# NOTE(review): the initialisation of memif_sockets and the openings of
# the constructor calls are not visible in this excerpt. Indentation is
# lost as well, so the extent of the `for sock` loop body is presumed to
# run to the end of the method - confirm.
154 def test_memif_create_custom_socket(self):
155 """ Memif create with non-default socket filename """
159 memif_sockets.append(
161 self, 1, "%s/memif1.sock" % self.tempdir))
162 # default path (test tempdir)
163 memif_sockets.append(
168 add_default_folder=True))
169 # create new folder in default folder
170 memif_sockets.append(
175 add_default_folder=True))
179 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
180 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
182 for sock in memif_sockets:
183 sock.add_vpp_config()
# reuse the one memif object, retargeted at this socket
184 memif.socket_id = sock.socket_id
185 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
186 self._create_delete_test_one_interface(memif)
187 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
188 self._create_delete_test_one_interface(memif)
# Runs the interface-pair connect check twice: first with `memif` as
# SLAVE and `remote_memif` as MASTER, then with the two roles swapped.
# Before that, socket id 1 is registered through self.remote_test with
# the path "<tempdir>/memif.sock".
# NOTE(review): both VppMemif constructor calls are only partly visible
# in this excerpt, so which VPP instance each memif belongs to cannot be
# read from here (presumably `memif` is local and `remote_memif` is on
# the remote instance) - confirm.
190 def test_memif_connect(self):
191 """ Memif connect """
194 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
195 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
200 remote_socket = VppSocketFilename(self.remote_test, 1,
201 "%s/memif.sock" % self.tempdir)
202 remote_socket.add_vpp_config()
204 remote_memif = VppMemif(
206 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
207 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
213 self._connect_test_interface_pair(memif, remote_memif)
# swap the roles and run the same pair check again
215 memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER
216 remote_memif.role = VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE
218 self._connect_test_interface_pair(memif, remote_memif)
# Builds an ICMP echo-request packet: Ethernet from pg.remote_mac to
# pg.local_mac, IP from pg.remote_ip4 to memif.ip_prefix.address, and
# ICMP with id set to memif.if_id and seq set to `i`.
# NOTE(review): the statement binding `i`, the use of `num` (presumably
# the number of packets) and the return value are not visible in this
# excerpt - confirm.
220 def _create_icmp(self, pg, memif, num):
223 pkt = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
224 IP(src=pg.remote_ip4, dst=memif.ip_prefix.address) /
225 ICMP(id=memif.if_id, type='echo-request', seq=i))
# Checks one reply: IP source is the memif's address, IP destination is
# pg.remote_ip4, IP proto is 1 (the IANA number for ICMP); ICMP type is 0,
# and the ICMP id / seq equal memif.if_id / `seq`.
# NOTE(review): the assignments of `ip` and `icmp` (presumably the IP and
# ICMP layers taken from `rx`) are not visible in this excerpt - confirm.
229 def _verify_icmp(self, pg, memif, rx, seq):
231 self.assertEqual(ip.src, memif.ip_prefix.address)
232 self.assertEqual(ip.dst, pg.remote_ip4)
233 self.assertEqual(ip.proto, 1)
235 self.assertEqual(icmp.type, 0) # echo-reply
236 self.assertEqual(icmp.id, memif.if_id)
237 self.assertEqual(icmp.seq, seq)
# Visible flow: register socket id 1 through self.remote_test, add the
# memif and the remote memif to VPP, give the remote memif an IPv4 config
# and bring it up, wait for both links, install a route through
# self.remote_test for pg0's local subnet via the memif's address, send
# ICMP echo-requests from pg0 addressed to the remote memif, capture
# packet_num packets on pg0 and verify them, then remove the route.
# NOTE(review): several statements are missing from this excerpt - the
# VppMemif constructor openings, the steps between lines 257 and 261, the
# definition of packet_num, the loop binding `c` / `seq`, and the tail of
# the VppIpRoute call.
239 def test_memif_ping(self):
244 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_SLAVE,
245 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET)
247 remote_socket = VppSocketFilename(self.remote_test, 1,
248 "%s/memif.sock" % self.tempdir)
249 remote_socket.add_vpp_config()
251 remote_memif = VppMemif(
253 VppEnum.vl_api_memif_role_t.MEMIF_ROLE_API_MASTER,
254 VppEnum.vl_api_memif_mode_t.MEMIF_MODE_API_ETHERNET,
257 memif.add_vpp_config()
261 remote_memif.add_vpp_config()
262 remote_memif.config_ip4()
263 remote_memif.admin_up()
# both ends must report link up before any traffic is sent
265 self.assertTrue(memif.wait_for_link_up(5))
266 self.assertTrue(remote_memif.wait_for_link_up(5))
268 # add routing to remote vpp
# NOTE(review): 24 is presumably the prefix length and 0xffffffff the
# "no interface index" value for the path - confirm against VppIpRoute /
# VppRoutePath.
269 route = VppIpRoute(self.remote_test, self.pg0._local_ip4_subnet, 24,
270 [VppRoutePath(memif.ip_prefix.address, 0xffffffff)],
273 route.add_vpp_config()
275 # create ICMP echo-request from local pg to remote memif
277 pkts = self._create_icmp(self.pg0, remote_memif, packet_num)
279 self.pg0.add_stream(pkts)
280 self.pg_enable_capture(self.pg_interfaces)
# expect packet_num packets back on pg0
282 capture = self.pg0.get_capture(packet_num, timeout=2)
285 self._verify_icmp(self.pg0, remote_memif, c, seq)
288 route.remove_vpp_config()
# Script entry point: run the tests in this module with VppTestRunner.
# NOTE(review): no `import unittest` is visible in this excerpt - confirm
# it is present in the part of the import block that is not shown.
291 if __name__ == '__main__':
292 unittest.main(testRunner=VppTestRunner)