* Copyright(c) 2025 Cisco Systems, Inc.
*/
+/*
+ * VCL CL UDP Test Client/Server with Multi-threading Support
+ *
+ * Usage:
+ * Server: vcl_test_cl_udp -s <server_ip> [-w <num_workers>]
+ * Client: vcl_test_cl_udp -c <server_ip> [-w <num_workers>]
+ *
+ * Options:
+ * -s <ip> Start as server bound to specified IP address
+ * -c <ip> Start as client connecting to specified IP address
+ * -w <num> Number of worker threads (default: 1)
+ */
+
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <stdlib.h>
#include <vcl/vppcom.h>
#include <hs_apps/vcl/vcl_test.h>
struct sockaddr_storage clnt_addr;
};
uint16_t port;
+ int num_workers;
+ pthread_t *worker_threads;
+ int thread_id_counter;
+ volatile int msgs_received;
} vt_clu_main_t;
static vt_clu_main_t vt_clu_main;
+typedef struct vtclu_worker_args_
+{
+ vt_clu_main_t *vclum;
+ int worker_id;
+} vtclu_worker_args_t;
+
static void
vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv)
{
memset (vclum, 0, sizeof (*vclum));
vclum->port = VCL_TEST_SERVER_PORT;
+ vclum->num_workers = 1;
opterr = 0;
- while ((c = getopt (argc, argv, "s:c:")) != -1)
+ while ((c = getopt (argc, argv, "s:c:w:")) != -1)
switch (c)
{
case 's':
if (inet_pton (
AF_INET, optarg,
&((struct sockaddr_in *) &vclum->clnt_addr)->sin_addr) != 1)
- break;
+ vtwrn ("couldn't parse ipv4 addr %s", optarg);
+ break;
+ case 'w':
+ vclum->num_workers = atoi (optarg);
+ if (vclum->num_workers <= 0)
+ {
+ vtwrn ("invalid number of workers %s", optarg);
+ vclum->num_workers = 1;
+ }
+ break;
}
if (vclum->app_type == VT_CLU_TYPE_NONE)
vclum->endpt.is_ip4 = 1;
vclum->endpt.ip =
(uint8_t *) &((struct sockaddr_in *) &vclum->srvr_addr)->sin_addr;
- vclum->endpt.port = htons (vclum->endpt.port);
+ vclum->endpt.port = htons (vclum->port);
}
-int
-main (int argc, char **argv)
+static int
+vt_clu_test_done (vt_clu_main_t *vclum)
{
- vt_clu_main_t *vclum = &vt_clu_main;
+ return vclum->msgs_received >= vclum->num_workers;
+}
+
+__thread char ep_ip_str[INET_ADDRSTRLEN + 16];
+
+static char *
+vt_clu_ep_to_str (vppcom_endpt_t *ep)
+{
+ inet_ntop (AF_INET, ep->ip, ep_ip_str, INET_ADDRSTRLEN);
+ snprintf (ep_ip_str + strlen (ep_ip_str),
+ INET_ADDRSTRLEN - strlen (ep_ip_str), ":%d", ntohs (ep->port));
+ return ep_ip_str;
+}
+
+__thread jmp_buf sig_jmp_buf;
+
+static void
+vt_clu_sig_handler (int sig)
+{
+ longjmp (sig_jmp_buf, 1);
+}
+
+void
+vt_clu_catch_sig (void (*handler) (int))
+{
+ signal (SIGUSR1, handler);
+}
+
+void
+vt_clu_handle_sig (vt_clu_main_t *vclum, int worker_id)
+{
+ vtinf ("Worker %d interrupted", worker_id);
+ vclum->msgs_received = vclum->num_workers;
+}
+
+static void *
+vt_clu_server_worker (void *arg)
+{
+ vtclu_worker_args_t *args = (vtclu_worker_args_t *) arg;
+ vt_clu_main_t *vclum = args->vclum;
+ int worker_id = args->worker_id;
int rv, vcl_sh;
const int buflen = 64;
char buf[buflen];
-
struct sockaddr_in _addr;
vppcom_endpt_t rmt_ep = { .ip = (void *) &_addr };
- vt_clu_parse_args (vclum, argc, argv);
+ if (worker_id)
+ vppcom_worker_register ();
- rv = vppcom_app_create ("vcl_test_cl_udp");
- if (rv)
- vtfail ("vppcom_app_create()", rv);
+ vtinf ("Server worker %d starting", worker_id);
vcl_sh = vppcom_session_create (VPPCOM_PROTO_UDP, 0 /* is_nonblocking */);
if (vcl_sh < 0)
{
vterr ("vppcom_session_create()", vcl_sh);
- return vcl_sh;
+ return NULL;
}
- if (vclum->app_type == VT_CLU_TYPE_SERVER)
+ /* Bind to the same endpoint as main thread */
+ rv = vppcom_session_bind (vcl_sh, &vclum->endpt);
+ if (rv < 0)
{
- /* Listen is implicit */
- rv = vppcom_session_bind (vcl_sh, &vclum->endpt);
- if (rv < 0)
- {
- vterr ("vppcom_session_bind()", rv);
- return rv;
- }
+ vterr ("vppcom_session_bind()", rv);
+ return NULL;
+ }
+
+ vt_clu_catch_sig (vt_clu_sig_handler);
+ if (setjmp (sig_jmp_buf))
+ vt_clu_handle_sig (vclum, worker_id);
+ /* Server worker loop */
+ while (!vt_clu_test_done (vclum))
+ {
rv = vppcom_session_recvfrom (vcl_sh, buf, buflen, 0, &rmt_ep);
if (rv < 0)
{
- vterr ("vppcom_session_recvfrom()", rv);
- return rv;
+ vtwrn ("worker %d: recvfrom returned %d", worker_id, rv);
+ break;
}
buf[rv] = 0;
- vtinf ("Received message from client: %s", buf);
- char *msg = "hello cl udp client";
- int msg_len = strnlen (msg, buflen);
- memcpy (buf, msg, msg_len);
+ vtinf ("Worker %d received message from client %s: %s", worker_id,
+ vt_clu_ep_to_str (&rmt_ep), buf);
+ vt_atomic_add (&vclum->msgs_received, 1);
+
+ char response[buflen];
+ int msg_len =
+ snprintf (response, buflen, "hello from worker %d", worker_id);
+
/* send 2 times to be sure */
for (int i = 0; i < 2; i++)
{
- rv = vppcom_session_sendto (vcl_sh, buf, msg_len, 0, &rmt_ep);
+ rv = vppcom_session_sendto (vcl_sh, response, msg_len, 0, &rmt_ep);
if (rv < 0)
{
- vterr ("vppcom_session_sendto()", rv);
- return rv;
+ vtwrn ("worker %d: sendto returned %d", worker_id, rv);
+ break;
}
usleep (500);
}
}
- else if (vclum->app_type == VT_CLU_TYPE_CLIENT)
+
+ vppcom_session_close (vcl_sh);
+ vtinf ("Server worker %d exiting", worker_id);
+ return NULL;
+}
+
+static void *
+vt_clu_client_worker (void *arg)
+{
+ vtclu_worker_args_t *args = (vtclu_worker_args_t *) arg;
+ vt_clu_main_t *vclum = args->vclum;
+ int worker_id = args->worker_id;
+ int rv, vcl_sh;
+ const int buflen = 64;
+ char buf[buflen];
+ struct sockaddr_in _addr;
+ vppcom_endpt_t rmt_ep = { .ip = (void *) &_addr };
+
+ if (worker_id)
+ vppcom_worker_register ();
+
+ vtinf ("Client worker %d starting", worker_id);
+
+ vcl_sh = vppcom_session_create (VPPCOM_PROTO_UDP, 0 /* is_nonblocking */);
+ if (vcl_sh < 0)
{
- char *msg = "hello cl udp server";
- int msg_len = strnlen (msg, buflen);
- memcpy (buf, msg, msg_len);
+ vterr ("vppcom_session_create()", vcl_sh);
+ return NULL;
+ }
+
+ char message[buflen];
+ int msg_len =
+ snprintf (message, buflen, "hello from client worker %d", worker_id);
+
+ vt_clu_catch_sig (vt_clu_sig_handler);
+ if (setjmp (sig_jmp_buf))
+ vt_clu_handle_sig (vclum, worker_id);
+ while (!vt_clu_test_done (vclum))
+ {
/* send 3 times to be sure */
for (int i = 0; i < 3; i++)
{
- rv = vppcom_session_sendto (vcl_sh, buf, msg_len, 0, &vclum->endpt);
+ rv =
+ vppcom_session_sendto (vcl_sh, message, msg_len, 0, &vclum->endpt);
if (rv < 0)
{
- vterr ("vppcom_session_sendto()", rv);
- return rv;
+ vtwrn ("worker %d: sendto returned %d", worker_id, rv);
+ goto cleanup;
}
usleep (500);
}
rv = vppcom_session_recvfrom (vcl_sh, buf, buflen, 0, &rmt_ep);
if (rv < 0)
{
- vterr ("vppcom_session_recvfrom()", rv);
- return rv;
+ vtwrn ("worker %d: recvfrom returned %d", worker_id, rv);
+ goto cleanup;
}
buf[rv] = 0;
- vtinf ("Received message from server: %s", buf);
+
+ vtinf ("Worker %d received message from server %s: %s", worker_id,
+ vt_clu_ep_to_str (&rmt_ep), buf);
+
+ vt_atomic_add (&vclum->msgs_received, 1);
+ }
+
+cleanup:
+ vppcom_session_close (vcl_sh);
+ vtinf ("Client worker %d exiting", worker_id);
+ return NULL;
+}
+
+int
+main (int argc, char **argv)
+{
+ vt_clu_main_t *vclum = &vt_clu_main;
+ int rv;
+
+ vt_clu_parse_args (vclum, argc, argv);
+
+ rv = vppcom_app_create ("vcl_test_cl_udp");
+ if (rv)
+ vtfail ("vppcom_app_create()", rv);
+
+ vtinf ("Starting %s with %d worker(s)",
+ vclum->app_type == VT_CLU_TYPE_SERVER ? "server" : "client",
+ vclum->num_workers);
+
+ vclum->worker_threads = calloc (vclum->num_workers, sizeof (pthread_t));
+ vtclu_worker_args_t *worker_args =
+ calloc (vclum->num_workers, sizeof (vtclu_worker_args_t));
+
+ if (!vclum->worker_threads || !worker_args)
+ {
+ vterr ("Failed to allocate memory for worker threads", -1);
+ return -1;
}
+
+ void *(*worker_func) (void *) = (vclum->app_type == VT_CLU_TYPE_SERVER) ?
+ vt_clu_server_worker :
+ vt_clu_client_worker;
+
+ /* Create worker threads */
+ for (int i = 1; i < vclum->num_workers; i++)
+ {
+ worker_args[i].vclum = vclum;
+ worker_args[i].worker_id = i;
+
+ rv = pthread_create (&vclum->worker_threads[i], NULL, worker_func,
+ &worker_args[i]);
+ if (rv != 0)
+ {
+ vterr ("Failed to create worker thread", rv);
+ /* Clean up any threads that were created */
+ for (int j = 0; j < i; j++)
+ pthread_cancel (vclum->worker_threads[j]);
+ return rv;
+ }
+ }
+
+ /* First worker */
+ worker_args[0].vclum = vclum;
+ worker_args[0].worker_id = 0;
+ worker_func (worker_args);
+
+ /* Wait for all worker threads to complete */
+ while (!vt_clu_test_done (vclum))
+ ;
+
+ for (int i = 1; i < vclum->num_workers; i++)
+ {
+ pthread_kill (vclum->worker_threads[i], SIGUSR1);
+ pthread_join (vclum->worker_threads[i], NULL);
+ }
+
+ free (vclum->worker_threads);
+ free (worker_args);
+ vtinf ("All worker threads completed");
+
+ vppcom_app_destroy ();
+ return 0;
}
\ No newline at end of file
@classmethod
def setUpClass(cls):
+ cls.session_startup = ["poll-main", "use-app-socket-api"]
super(VCLThruHostStackCLUDPEcho, cls).setUpClass()
@classmethod
def setUp(self):
super(VCLThruHostStackCLUDPEcho, self).setUp()
+ self.sapi_server_sock = "1"
+ self.sapi_client_sock = "2"
self.thru_host_stack_setup()
self.pre_test_sleep = 2
self.timeout = 5
client_args,
)
+ def test_vcl_thru_host_stack_cl_udp_mt_echo(self):
+ """run VCL IPv4 thru host stack CL UDP MT echo test"""
+ server_args = ["-s", self.loop0.local_ip4, "-w", "2"]
+ client_args = ["-c", self.loop0.local_ip4, "-w", "2"]
+ self.thru_host_stack_test(
+ "vcl_test_cl_udp",
+ server_args,
+ "vcl_test_cl_udp",
+ client_args,
+ )
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show app server"))
+ self.logger.debug(self.vapi.cli("show session verbose"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+
+ "hs_apps" in config.excluded_plugins, "Exclude tests requiring hs_apps plugin"
+)
+class VCLThruHostStackCLUDPBinds(VCLTestCase):
+ """VCL Thru Host Stack CL UDP Binds"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.session_startup = ["poll-main", "use-app-socket-api"]
+ super(VCLThruHostStackCLUDPBinds, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackCLUDPBinds, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackCLUDPBinds, self).setUp()
+
+ self.sapi_server_sock = "default"
+ self.timeout = 5
+
+ self.vapi.session_enable_disable(is_enable=1)
+ self.create_loopback_interfaces(2)
+ for i in self.lo_interfaces:
+ i.admin_up()
+ i.config_ip4()
+
+ def tearDown(self):
+ for i in self.lo_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+ i.remove_vpp_config()
+ super(VCLThruHostStackCLUDPBinds, self).tearDown()
+
+ def test_vcl_thru_host_stack_cl_udp_multiple_binds(self):
+ """run VCL IPv4 thru host stack CL UDP multiple binds test"""
+
+ # 2 CL UDP servers bound to the same port but different IPs
+ server1_args = ["-s", self.loop0.local_ip4, "-w", "2"]
+ server2_args = ["-s", self.loop1.local_ip4, "-w", "2"]
+
+ sapi_sock = "%s/app_ns_sockets/%s" % (self.tempdir, self.sapi_server_sock)
+ self.vcl_app_env = {
+ "VCL_APP_SCOPE_GLOBAL": "true",
+ "VCL_VPP_SAPI_SOCKET": sapi_sock,
+ }
+
+ worker_server1 = VCLAppWorker(
+ "vcl_test_cl_udp", server1_args, self.logger, self.vcl_app_env, "server1"
+ )
+ worker_server1.start()
+ self.sleep(0.5)
+
+ worker_server2 = VCLAppWorker(
+ "vcl_test_cl_udp", server2_args, self.logger, self.vcl_app_env, "server2"
+ )
+ worker_server2.start()
+ self.sleep(0.5)
+
+ session_output = self.vapi.cli("show session verbose")
+ self.logger.debug(session_output)
+ self.assertIn(self.loop0.local_ip4, session_output)
+ self.assertIn(self.loop1.local_ip4, session_output)
+ self.assertIn("[U]", session_output)
+ self.assertIn("LISTEN", session_output)
+
+ try:
+ worker_server1.process.send_signal(signal.SIGUSR1)
+ worker_server2.process.send_signal(signal.SIGUSR1)
+ except (AttributeError, OSError) as e:
+ self.logger.warning(f"Failed to send SIGUSR1: {e}")
+
+ self.sleep(0.5)
+
+ worker_server2.join(self.timeout)
+
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show app server"))
self.logger.debug(self.vapi.cli("show session verbose"))