-#!/usr/bin/env python
+#!/usr/bin/env python3
""" Vpp QUIC tests """
import unittest
""" QUIC Test Application Worker """
process = None
- def __init__(self, build_dir, appname, args, logger, env={}):
+ def __init__(self, build_dir, appname, args, logger, role, testcase,
+ env={}):
app = "%s/vpp/bin/%s" % (build_dir, appname)
self.args = [app] + args
+ self.role = role
+ self.wait_for_gdb = 'wait-for-gdb'
+ self.testcase = testcase
super(QUICAppWorker, self).__init__(self.args, logger, env)
def run(self):
class QUICEchoIntTestCase(QUICTestCase):
"""QUIC Echo Internal Test Case"""
+ test_bytes = ' test-bytes'
def setUp(self):
super(QUICEchoIntTestCase, self).setUp()
- self.client_args = "uri %s fifo-size 64 test-bytes appns client" \
- % self.uri
+ self.client_args = 'uri {uri} fifo-size 64{testbytes} appns client' \
+ .format(uri=self.uri, testbytes=self.test_bytes)
self.server_args = "uri %s fifo-size 64 appns server" % self.uri
def server(self, *args):
self.client("no-output", "mbytes", "2")
+class QUICEchoIntTransferBigTestCase(QUICEchoIntTestCase):
+ """QUIC Echo Internal Transfer Big Test Case"""
+ test_bytes = ''
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_quic_int_transfer_big(self):
+ self.server()
+ self.client("no-output", "gbytes", "10")
+
+
class QUICEchoIntSerialTestCase(QUICEchoIntTestCase):
"""QUIC Echo Internal Serial Transfer Test Case"""
def test_quic_serial_int_transfer(self):
self.client("no-output", "mbytes", "2")
+class QUICEchoIntSerialBigTestCase(QUICEchoIntTestCase):
+ """QUIC Echo Internal Serial Transfer Big Test Case"""
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_quic_serial_int_transfer_big(self):
+ self.server()
+ self.client("no-output", "gbytes", "5")
+ self.client("no-output", "gbytes", "5")
+ self.client("no-output", "gbytes", "5")
+ self.client("no-output", "gbytes", "5")
+ self.client("no-output", "gbytes", "5")
+
+
class QUICEchoIntMStreamTestCase(QUICEchoIntTestCase):
"""QUIC Echo Internal MultiStream Test Case"""
def test_quic_int_multistream_transfer(self):
self.client("nclients", "10", "mbytes", "1", "no-output")
+class QUICEchoIntMStreamBigTestCase(QUICEchoIntTestCase):
+ """QUIC Echo Internal MultiStream Big Test Case"""
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_quic_int_multistream_transfer(self):
+ self.server()
+ self.client("nclients", "10", "gbytes", "5", "no-output")
+
+
class QUICEchoExtTestCase(QUICTestCase):
extra_vpp_punt_config = ["session", "{", "evt_qs_memfd_seg", "}"]
quic_setup = "default"
+ test_bytes = "test-bytes:assert"
+ app = "vpp_echo"
def setUp(self):
super(QUICEchoExtTestCase, self).setUp()
"uri",
self.uri,
"json",
- "fifo-size",
- "64",
- "test-bytes:assert",
- "socket-name",
- self.api_sock]
+ self.test_bytes,
+ "socket-name", self.api_sock,
+ "quic-setup", self.quic_setup]
self.server_echo_test_args = common_args + \
- ["server", "appns", "server", "quic-setup", self.quic_setup]
+ ["server", "appns", "server"] # use default fifo-size
self.client_echo_test_args = common_args + \
- ["client", "appns", "client", "quic-setup", self.quic_setup]
+ ["client", "appns", "client", "fifo-size", "4M"]
+ error = self.vapi.cli("quic set fifo-size 2M")
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
def server(self, *args):
_args = self.server_echo_test_args + list(args)
self.worker_server = QUICAppWorker(
self.build_dir,
- "vpp_echo",
+ self.app,
_args,
- self.logger)
+ self.logger,
+ 'server',
+ self)
self.worker_server.start()
self.sleep(self.pre_test_sleep)
def client(self, *args):
_args = self.client_echo_test_args + list(args)
- # self.client_echo_test_args += "use-svm-api"
self.worker_client = QUICAppWorker(
self.build_dir,
- "vpp_echo",
+ self.app,
_args,
- self.logger)
+ self.logger,
+ 'client',
+ self)
self.worker_client.start()
- self.worker_client.join(self.timeout)
- self.worker_server.join(self.timeout)
+ timeout = None if self.debug_all else self.timeout
+ self.worker_client.join(timeout)
+ self.worker_server.join(timeout)
self.sleep(self.post_test_sleep)
def validate_ext_test_results(self):
self.validate_ext_test_results()
+class QUICEchoExtTransferBigTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Big Test Case"""
+ test_bytes = ''
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_quic_ext_transfer_big(self):
+ self.server("TX=0", "RX=10Gb")
+ self.client("TX=10Gb", "RX=0")
+ self.validate_ext_test_results()
+
+
class QUICEchoExtQcloseRxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Qclose Rx Test Case"""
self.validate_ext_test_results()
+class QUICEchoExtBigServerStreamTestCase(QUICEchoExtTestCase):
+ """QUIC Echo External Transfer Big Server Stream Test Case"""
+ quic_setup = "serverstream"
+ test_bytes = ''
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_quic_ext_transfer_big_server_stream(self):
+ self.server("TX=10Gb", "RX=0")
+ self.client("TX=0", "RX=10Gb")
+ self.validate_ext_test_results()
+
+
class QUICEchoExtServerStreamQcloseRxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Server Stream Qclose Rx Test Case"""
quic_setup = "serverstream"
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_quic_ext_transfer_server_stream_multi_workers(self):
- self.server("nclients", "4/4", "TX=10Mb", "RX=0")
- self.client("nclients", "4/4", "TX=0", "RX=10Mb")
+ self.server("nclients", "4", "quic-streams", "4", "TX=10Mb", "RX=0")
+ self.client("nclients", "4", "quic-streams", "4", "TX=0", "RX=10Mb")
self.validate_ext_test_results()