import os
import subprocess
import signal
+from config import config
from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner, running_extended_tests, \
- Worker
+from framework import VppTestCase, VppTestRunner, Worker
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
""" QUIC Test Application Worker """
process = None
- def __init__(self, build_dir, appname, executable_args, logger, role,
+ def __init__(self, appname, executable_args, logger, role,
testcase, env=None, *args, **kwargs):
if env is None:
env = {}
- app = "%s/bin/%s" % (build_dir, appname)
+ app = f"{config.vpp_build_dir}/vpp/bin/{appname}"
self.args = [app] + executable_args
self.role = role
self.wait_for_gdb = 'wait-for-gdb'
def setUp(self):
super(QUICTestCase, self).setUp()
- var = "VPP_BUILD_DIR"
- self.build_dir = os.getenv(var, None)
- if self.build_dir is None:
- raise Exception("Environment variable `%s' not set" % var)
- self.vppDebug = 'vpp_debug' in self.build_dir
+ self.vppDebug = 'vpp_debug' in config.vpp_build_dir
self.create_loopback_interfaces(2)
self.uri = "quic://%s/1234" % self.loop0.local_ip4
def server(self, *args):
_args = self.server_echo_test_args + list(args)
self.worker_server = QUICAppWorker(
- self.build_dir,
self.app,
_args,
self.logger,
def client(self, *args):
_args = self.client_echo_test_args + list(args)
self.worker_client = QUICAppWorker(
- self.build_dir,
self.app,
_args,
self.logger,
test_bytes = ''
timeout = 60
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_quic_ext_transfer_big(self):
"""QUIC external transfer, big stream"""
self.server("TX=0", "RX=2G")
class QUICEchoExtQcloseRxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Qclose Rx Test Case"""
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_qclose_rx(self):
"""QUIC external transfer, rx close"""
class QUICEchoExtQcloseTxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Qclose Tx Test Case"""
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_qclose_tx(self):
"""QUIC external transfer, tx close"""
class QUICEchoExtEarlyQcloseRxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Early Qclose Rx Test Case"""
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_early_qclose_rx(self):
"""QUIC external transfer, early rx close"""
class QUICEchoExtEarlyQcloseTxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Early Qclose Tx Test Case"""
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_early_qclose_tx(self):
"""QUIC external transfer, early tx close"""
class QUICEchoExtScloseRxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Sclose Rx Test Case"""
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_sclose_rx(self):
"""QUIC external transfer, rx stream close"""
class QUICEchoExtScloseTxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Sclose Tx Test Case"""
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_sclose_tx(self):
"""QUIC external transfer, tx stream close"""
class QUICEchoExtEarlyScloseRxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Early Sclose Rx Test Case"""
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_early_sclose_rx(self):
"""QUIC external transfer, early rx stream close"""
class QUICEchoExtEarlyScloseTxTestCase(QUICEchoExtTestCase):
"""QUIC Echo External Transfer Early Sclose Tx Test Case"""
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_early_sclose_tx(self):
"""QUIC external transfer, early tx stream close"""
test_bytes = ''
timeout = 60
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_quic_ext_transfer_server_stream_big(self):
"""QUIC external server transfer, big stream"""
self.server("TX=2G", "RX=0")
"""QUIC Echo External Transfer Server Stream Qclose Rx Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_server_stream_qclose_rx(self):
"""QUIC external server transfer, rx close"""
"""QUIC Echo External Transfer Server Stream Qclose Tx Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_server_stream_qclose_tx(self):
"""QUIC external server transfer, tx close"""
"""QUIC Echo External Transfer Server Stream Early Qclose Rx Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_server_stream_early_qclose_rx(self):
"""QUIC external server transfer, early rx close"""
"""QUIC Echo External Transfer Server Stream Early Qclose Tx Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_server_stream_early_qclose_tx(self):
"""QUIC external server transfer, early tx close"""
"""QUIC Echo External Transfer Server Stream Sclose Rx Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_server_stream_sclose_rx(self):
"""QUIC external server transfer, rx stream close"""
"""QUIC Echo External Transfer Server Stream Sclose Tx Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_server_stream_sclose_tx(self):
"""QUIC external server transfer, tx stream close"""
"""QUIC Echo External Transfer Server Stream Early Sclose Rx Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_server_stream_early_sclose_rx(self):
"""QUIC external server transfer, early rx stream close"""
"""QUIC Echo Ext Transfer Server Stream Early Sclose Tx Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_server_stream_early_sclose_tx(self):
"""QUIC external server transfer, early tx stream close"""
"""QUIC Echo External Transfer Server Stream MultiWorker Test Case"""
quic_setup = "serverstream"
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
@unittest.skip("testcase under development")
def test_quic_ext_transfer_server_stream_multi_workers(self):
"""QUIC external server transfer, multi-worker"""