+import socket
import unittest
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, ICMP
+import six
from framework import VppTestCase, VppTestRunner, running_extended_tests
from remote_test import RemoteClass, RemoteVppTestCase
-from vpp_memif import *
+from vpp_memif import MEMIF_MODE, MEMIF_ROLE, remove_all_memif_vpp_config, \
+ VppSocketFilename, VppMemif
+from vpp_ip_route import VppIpRoute, VppRoutePath
class TestMemif(VppTestCase):
def _check_socket_filename(self, dump, socket_id, filename):
for d in dump:
if (d.socket_id == socket_id) and (
- d.socket_filename.rstrip("\0") == filename):
+ d.socket_filename.rstrip(b"\0") == filename):
return True
return False
def test_memif_socket_filename_add_del(self):
- """ Memif socket filenale add/del """
+ """ Memif socket filename add/del """
# dump default socket filename
dump = self.vapi.memif_socket_filename_dump()
self.assertTrue(
self._check_socket_filename(
- dump, 0, "/run/vpp/memif.sock"))
+ dump, 0, b"%s/memif.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8')))
memif_sockets = []
# existing path
memif_sockets.append(
VppSocketFilename(
- self, 1, "/run/vpp/memif1.sock"))
- # default path ("/run/vpp")
+ self, 1, b"%s/memif1.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8')))
+ # default path (test tempdir)
memif_sockets.append(
VppSocketFilename(
self,
2,
- "memif2.sock",
+ b"memif2.sock",
add_default_folder=True))
# create new folder in default folder
memif_sockets.append(
VppSocketFilename(
self,
3,
- "sock/memif3.sock",
+ b"sock/memif3.sock",
add_default_folder=True))
for sock in memif_sockets:
dump = self.vapi.memif_socket_filename_dump()
self.assertTrue(
self._check_socket_filename(
- dump, 0, "/run/vpp/memif.sock"))
+ dump, 0, b"%s/memif.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8')))
def _create_delete_test_one_interface(self, memif):
memif.add_vpp_config()
self._create_delete_test_one_interface(memif)
def test_memif_create_custom_socket(self):
- """ Memif create with non-default socket filname """
+ """ Memif create with non-default socket filename """
memif_sockets = []
# existing path
memif_sockets.append(
VppSocketFilename(
- self, 1, "/run/vpp/memif1.sock"))
- # default path ("/run/vpp")
+ self, 1, b"%s/memif1.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8')))
+ # default path (test tempdir)
memif_sockets.append(
VppSocketFilename(
self,
2,
- "memif2.sock",
+ b"memif2.sock",
add_default_folder=True))
# create new folder in default folder
memif_sockets.append(
VppSocketFilename(
self,
3,
- "sock/memif3.sock",
+ b"sock/memif3.sock",
add_default_folder=True))
memif = VppMemif(self, MEMIF_ROLE.SLAVE, MEMIF_MODE.ETHERNET)
def test_memif_connect(self):
""" Memif connect """
- memif = VppMemif(
- self,
- MEMIF_ROLE.SLAVE,
- MEMIF_MODE.ETHERNET,
- ring_size=1024,
- buffer_size=2048)
- remote_memif = VppMemif(
- self.remote_test,
- MEMIF_ROLE.MASTER,
- MEMIF_MODE.ETHERNET,
- ring_size=1024,
- buffer_size=2048)
+ memif = VppMemif(self, MEMIF_ROLE.SLAVE, MEMIF_MODE.ETHERNET,
+ ring_size=1024, buffer_size=2048)
+
+ remote_socket = VppSocketFilename(self.remote_test, 1,
+ b"%s/memif.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8'))
+ remote_socket.add_vpp_config()
+
+ remote_memif = VppMemif(self.remote_test, MEMIF_ROLE.MASTER,
+ MEMIF_MODE.ETHERNET, socket_id=1,
+ ring_size=1024, buffer_size=2048)
self._connect_test_interface_pair(memif, remote_memif)
def test_memif_ping(self):
""" Memif ping """
- memif = VppMemif(self, MEMIF_ROLE.MASTER, MEMIF_MODE.ETHERNET)
- remote_memif = VppMemif(self.remote_test, MEMIF_ROLE.SLAVE,
- MEMIF_MODE.ETHERNET)
+
+ memif = VppMemif(self, MEMIF_ROLE.SLAVE, MEMIF_MODE.ETHERNET)
+
+ remote_socket = VppSocketFilename(self.remote_test, 1,
+ b"%s/memif.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8'))
+ remote_socket.add_vpp_config()
+
+ remote_memif = VppMemif(self.remote_test, MEMIF_ROLE.MASTER,
+ MEMIF_MODE.ETHERNET, socket_id=1)
memif.add_vpp_config()
memif.config_ip4()
self.assertTrue(remote_memif.wait_for_link_up(5))
# add routing to remote vpp
- dst_addr = socket.inet_pton(socket.AF_INET, self.pg0._local_ip4_subnet)
- dst_addr_len = 24
- next_hop_addr = socket.inet_pton(socket.AF_INET, memif.ip4_addr)
- self.remote_test.vapi.ip_add_del_route(
- dst_addr, dst_addr_len, next_hop_addr)
+ route = VppIpRoute(self.remote_test, self.pg0._local_ip4_subnet, 24,
+ [VppRoutePath(memif.ip4_addr, 0xffffffff)],
+ register=False)
+
+ route.add_vpp_config()
# create ICMP echo-request from local pg to remote memif
packet_num = 10
self._verify_icmp(self.pg0, remote_memif, c, seq)
seq += 1
+ route.remove_vpp_config()
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)