""" VCL Test Application Worker """
def __init__(self, build_dir, appname, executable_args, logger, env=None,
- *args, **kwargs):
-
+ role=None, *args, **kwargs):
+ self.role = role
if env is None:
env = {}
vcl_lib_dir = "%s/vpp/lib" % build_dir
self.env = {'VCL_VPP_API_SOCKET': self.get_api_sock_path(),
'VCL_APP_SCOPE_LOCAL': "true"}
worker_server = VCLAppWorker(self.build_dir, server_app, server_args,
- self.logger, self.env)
+ self.logger, self.env, "server")
worker_server.start()
self.sleep(self.pre_test_sleep)
worker_client = VCLAppWorker(self.build_dir, client_app, client_args,
- self.logger, self.env)
+ self.logger, self.env, "client")
worker_client.start()
worker_client.join(self.timeout)
try:
'VCL_APP_NAMESPACE_SECRET': "1234"}
worker_server = VCLAppWorker(self.build_dir, server_app, server_args,
- self.logger, self.env)
+ self.logger, self.env, "server")
worker_server.start()
self.sleep(self.pre_test_sleep)
self.env.update({'VCL_APP_NAMESPACE_ID': "2",
'VCL_APP_NAMESPACE_SECRET': "5678"})
worker_client = VCLAppWorker(self.build_dir, client_app, client_args,
- self.logger, self.env)
+ self.logger, self.env, "client")
worker_client.start()
worker_client.join(self.timeout)
self.logger.debug(self.vapi.cli("show app mq"))
+class VCLThruHostStackDTLS(VCLTestCase):
+ """ VCL Thru Host Stack DTLS """
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackDTLS, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackDTLS, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackDTLS, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_uni_dir_dtls_timeout = 20
+ self.server_dtls_args = ["-p", "dtls", self.server_port]
+ self.client_uni_dir_dtls_test_args = ["-N", "1000", "-U", "-X",
+ "-p", "dtls", "-T 1400",
+ self.loop0.local_ip4,
+ self.server_port]
+
+ def test_vcl_thru_host_stack_dtls_uni_dir(self):
+ """ run VCL thru host stack uni-directional DTLS test """
+
+ self.timeout = self.client_uni_dir_dtls_timeout
+ self.thru_host_stack_test("vcl_test_server", self.server_dtls_args,
+ "vcl_test_client",
+ self.client_uni_dir_dtls_test_args)
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackDTLS, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show app server"))
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+
+class VCLThruHostStackQUIC(VCLTestCase):
+ """ VCL Thru Host Stack QUIC """
+
+ @classmethod
+ def setUpClass(cls):
+ cls.extra_vpp_plugin_config.append("plugin quic_plugin.so { enable }")
+ super(VCLThruHostStackQUIC, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackQUIC, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackQUIC, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.client_uni_dir_quic_timeout = 20
+ self.server_quic_args = ["-p", "quic", self.server_port]
+ self.client_uni_dir_quic_test_args = ["-N", "1000", "-U", "-X",
+ "-p", "quic",
+ self.loop0.local_ip4,
+ self.server_port]
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_vcl_thru_host_stack_quic_uni_dir(self):
+ """ run VCL thru host stack uni-directional QUIC test """
+
+ self.timeout = self.client_uni_dir_quic_timeout
+ self.thru_host_stack_test("vcl_test_server", self.server_quic_args,
+ "vcl_test_client",
+ self.client_uni_dir_quic_test_args)
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackQUIC, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show app server"))
+ self.logger.debug(self.vapi.cli("show session verbose 2"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+
class VCLThruHostStackBidirNsock(VCLTestCase):
""" VCL Thru Host Stack Bidir Nsock """