import subprocess
import signal
from config import config
-from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner, Worker
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
if self.process is None:
return False
try:
- logger.debug("Killing worker process (pid %d)" % self.process.pid)
+ logger.debug(f"Killing worker process (pid {self.process.pid})")
os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
self.join(timeout)
except OSError as e:
return False
-@tag_fixme_ubuntu2204
+@unittest.skipIf("quic" in config.excluded_plugins, "Exclude QUIC plugin tests")
class QUICTestCase(VppTestCase):
"""QUIC Test Case"""
timeout = 20
pre_test_sleep = 0.3
post_test_sleep = 0.3
+ server_appns = "server"
+ server_appns_secret = None
+ client_appns = "client"
+ client_appns_secret = None
@classmethod
def setUpClass(cls):
self.vppDebug = "vpp_debug" in config.vpp_build_dir
self.create_loopback_interfaces(2)
- self.uri = "quic://%s/1234" % self.loop0.local_ip4
+ self.uri = f"quic://{self.loop0.local_ip4}/1234"
table_id = 1
for i in self.lo_interfaces:
i.admin_up()
table_id += 1
# Configure namespaces
- self.vapi.app_namespace_add_del(
- namespace_id="server", sw_if_index=self.loop0.sw_if_index
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id=self.server_appns,
+ secret=self.server_appns_secret,
+ sw_if_index=self.loop0.sw_if_index,
)
- self.vapi.app_namespace_add_del(
- namespace_id="client", sw_if_index=self.loop1.sw_if_index
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id=self.client_appns,
+ secret=self.client_appns_secret,
+ sw_if_index=self.loop1.sw_if_index,
)
# Add inter-table routes
"""QUIC Echo Internal Test Case"""
test_bytes = " test-bytes"
- extra_vpp_punt_config = ["session", "{", "enable", "poll-main", "}"]
+ extra_vpp_config = ["session", "{", "enable", "poll-main", "}"]
def setUp(self):
super(QUICEchoIntTestCase, self).setUp()
- self.client_args = "uri {uri} fifo-size 64{testbytes} appns client".format(
- uri=self.uri, testbytes=self.test_bytes
+ self.client_args = (
+ f"uri {self.uri} fifo-size 64k{self.test_bytes} appns {self.client_appns} "
)
- self.server_args = "uri %s fifo-size 64 appns server" % self.uri
+ self.server_args = f"uri {self.uri} fifo-size 64k appns {self.server_appns} "
def tearDown(self):
super(QUICEchoIntTestCase, self).tearDown()
def server(self, *args):
- error = self.vapi.cli(
- "test echo server %s %s" % (self.server_args, " ".join(args))
- )
+ _args = self.server_args + " ".join(args)
+ error = self.vapi.cli(f"test echo server {_args}")
if error:
self.logger.critical(error)
self.assertNotIn("failed", error)
def client(self, *args):
- error = self.vapi.cli(
- "test echo client %s %s" % (self.client_args, " ".join(args))
- )
+ _args = self.client_args + " ".join(args)
+ error = self.vapi.cli(f"test echo client {_args}")
if error:
self.logger.critical(error)
self.assertNotIn("failed", error)
def test_quic_int_transfer(self):
"""QUIC internal transfer"""
self.server()
- self.client("no-output", "mbytes", "2")
+ self.client("mbytes", "2")
@tag_fixme_vpp_workers
def test_quic_serial_int_transfer(self):
"""QUIC serial internal transfer"""
self.server()
- self.client("no-output", "mbytes", "2")
- self.client("no-output", "mbytes", "2")
- self.client("no-output", "mbytes", "2")
- self.client("no-output", "mbytes", "2")
- self.client("no-output", "mbytes", "2")
+ self.client("mbytes", "2")
+ self.client("mbytes", "2")
+ self.client("mbytes", "2")
+ self.client("mbytes", "2")
+ self.client("mbytes", "2")
@tag_fixme_vpp_workers
def test_quic_int_multistream_transfer(self):
"""QUIC internal multi-stream transfer"""
self.server()
- self.client("nclients", "10", "mbytes", "1", "no-output")
+ self.client("nclients", "10", "mbytes", "1")
class QUICEchoExtTestCase(QUICTestCase):
vpp_worker_count = 1
server_fifo_size = "1M"
client_fifo_size = "4M"
- extra_vpp_punt_config = [
+ extra_vpp_config = [
"session",
"{",
"enable",
"poll-main",
- "evt_qs_memfd_seg",
+ "use-app-socket-api",
"wrk-mqs-segment-size",
"64M",
"event-queue-length",
]
def setUp(self):
+ self.server_appns_secret = 1234
+ self.client_appns_secret = 5678
super(QUICEchoExtTestCase, self).setUp()
common_args = [
"uri",
self.uri,
"json",
self.test_bytes,
- "socket-name",
- self.get_api_sock_path(),
"quic-setup",
self.quic_setup,
"nthreads",
"1",
"mq-size",
f"{self.evt_q_len}",
+ "use-app-socket-api",
]
self.server_echo_test_args = common_args + [
"server",
"appns",
- "server",
+ f"{self.server_appns}",
"fifo-size",
f"{self.server_fifo_size}",
+ "socket-name",
+ f"{self.tempdir}/app_ns_sockets/{self.server_appns}",
]
self.client_echo_test_args = common_args + [
"client",
"appns",
- "client",
+ f"{self.client_appns}",
"fifo-size",
f"{self.client_fifo_size}",
+ "socket-name",
+ f"{self.tempdir}/app_ns_sockets/{self.client_appns}",
]
error = self.vapi.cli("quic set fifo-size 2M")
if error:
def server(self, *args):
_args = self.server_echo_test_args + list(args)
- self.worker_server = QUICAppWorker(self.app, _args, self.logger, "server", self)
+ self.worker_server = QUICAppWorker(
+ self.app, _args, self.logger, self.server_appns, self
+ )
self.worker_server.start()
self.sleep(self.pre_test_sleep)
def client(self, *args):
_args = self.client_echo_test_args + list(args)
- self.worker_client = QUICAppWorker(self.app, _args, self.logger, "client", self)
+ self.worker_client = QUICAppWorker(
+ self.app, _args, self.logger, self.client_appns, self
+ )
self.worker_client.start()
timeout = None if self.debug_all else self.timeout
self.worker_client.join(timeout)
def validate_ext_test_results(self):
server_result = self.worker_server.result
+ self.logger.debug(self.vapi.cli(f"show session verbose 2"))
client_result = self.worker_client.result
- self.logger.info("Server worker result is `%s'" % server_result)
- self.logger.info("Client worker result is `%s'" % client_result)
+ self.logger.info(f"Server worker result is `{server_result}'")
+ self.logger.info(f"Client worker result is `{client_result}'")
server_kill_error = False
if self.worker_server.result is None:
server_kill_error = self.worker_server.teardown(self.logger, self.timeout)
if self.worker_client.result is None:
self.worker_client.teardown(self.logger, self.timeout)
- err_msg = "Wrong server worker return code (%s)" % server_result
+ err_msg = f"Wrong server worker return code ({server_result})"
self.assertEqual(server_result, 0, err_msg)
self.assertIsNotNone(
- client_result, "Timeout! Client worker did not finish in %ss" % self.timeout
+ client_result, f"Timeout! Client worker did not finish in {self.timeout}s"
)
- err_msg = "Wrong client worker return code (%s)" % client_result
+ err_msg = f"Wrong client worker return code ({client_result})"
self.assertEqual(client_result, 0, err_msg)
self.assertFalse(server_kill_error, "Server kill errored")