hsa: mt support for cl udp test 78/43178/6
authorFlorin Coras <[email protected]>
Fri, 13 Jun 2025 21:26:54 +0000 (17:26 -0400)
committerDave Barach <[email protected]>
Sat, 14 Jun 2025 22:23:13 +0000 (22:23 +0000)
Also add tests for
- multi worker cl connects/binds
- 2 multi worker servers binding the same port

Type: improvement

Change-Id: I222756b7664ffdba83cb69bb0c730526dad3065c
Signed-off-by: Florin Coras <[email protected]>
src/plugins/hs_apps/vcl/vcl_test_cl_udp.c
test/asf/test_vcl.py

index 066635e..a0df4c2 100644 (file)
@@ -2,9 +2,28 @@
  * Copyright(c) 2025 Cisco Systems, Inc.
  */
 
+/*
+ * VCL CL UDP Test Client/Server with Multi-threading Support
+ *
+ * Usage:
+ *   Server: vcl_test_cl_udp -s <server_ip> [-w <num_workers>]
+ *   Client: vcl_test_cl_udp -c <server_ip> [-w <num_workers>]
+ *
+ * Options:
+ *   -s <ip>    Start as server bound to specified IP address
+ *   -c <ip>    Start as client connecting to specified IP address
+ *   -w <num>   Number of worker threads (default: 1)
+ */
+
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <stdlib.h>
 #include <vcl/vppcom.h>
 #include <hs_apps/vcl/vcl_test.h>
 
@@ -25,10 +44,20 @@ typedef struct vtclu_main_
     struct sockaddr_storage clnt_addr;
   };
   uint16_t port;
+  int num_workers;
+  pthread_t *worker_threads;
+  int thread_id_counter;
+  volatile int msgs_received;
 } vt_clu_main_t;
 
 static vt_clu_main_t vt_clu_main;
 
+typedef struct vtclu_worker_args_
+{
+  vt_clu_main_t *vclum;
+  int worker_id;
+} vtclu_worker_args_t;
+
 static void
 vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv)
 {
@@ -36,9 +65,10 @@ vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv)
 
   memset (vclum, 0, sizeof (*vclum));
   vclum->port = VCL_TEST_SERVER_PORT;
+  vclum->num_workers = 1;
 
   opterr = 0;
-  while ((c = getopt (argc, argv, "s:c:")) != -1)
+  while ((c = getopt (argc, argv, "s:c:w:")) != -1)
     switch (c)
       {
       case 's':
@@ -53,7 +83,16 @@ vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv)
        if (inet_pton (
              AF_INET, optarg,
              &((struct sockaddr_in *) &vclum->clnt_addr)->sin_addr) != 1)
-         break;
+         vtwrn ("couldn't parse ipv4 addr %s", optarg);
+       break;
+      case 'w':
+       vclum->num_workers = atoi (optarg);
+       if (vclum->num_workers <= 0)
+         {
+           vtwrn ("invalid number of workers %s", optarg);
+           vclum->num_workers = 1;
+         }
+       break;
       }
 
   if (vclum->app_type == VT_CLU_TYPE_NONE)
@@ -65,81 +104,163 @@ vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv)
   vclum->endpt.is_ip4 = 1;
   vclum->endpt.ip =
     (uint8_t *) &((struct sockaddr_in *) &vclum->srvr_addr)->sin_addr;
-  vclum->endpt.port = htons (vclum->endpt.port);
+  vclum->endpt.port = htons (vclum->port);
 }
 
-int
-main (int argc, char **argv)
+static int
+vt_clu_test_done (vt_clu_main_t *vclum)
 {
-  vt_clu_main_t *vclum = &vt_clu_main;
+  return vclum->msgs_received >= vclum->num_workers;
+}
+
+__thread char ep_ip_str[INET_ADDRSTRLEN + 16];
+
+static char *
+vt_clu_ep_to_str (vppcom_endpt_t *ep)
+{
+  inet_ntop (AF_INET, ep->ip, ep_ip_str, INET_ADDRSTRLEN);
+  snprintf (ep_ip_str + strlen (ep_ip_str),
+           INET_ADDRSTRLEN - strlen (ep_ip_str), ":%d", ntohs (ep->port));
+  return ep_ip_str;
+}
+
+__thread jmp_buf sig_jmp_buf;
+
+static void
+vt_clu_sig_handler (int sig)
+{
+  longjmp (sig_jmp_buf, 1);
+}
+
+void
+vt_clu_catch_sig (void (*handler) (int))
+{
+  signal (SIGUSR1, handler);
+}
+
+void
+vt_clu_handle_sig (vt_clu_main_t *vclum, int worker_id)
+{
+  vtinf ("Worker %d interrupted", worker_id);
+  vclum->msgs_received = vclum->num_workers;
+}
+
+static void *
+vt_clu_server_worker (void *arg)
+{
+  vtclu_worker_args_t *args = (vtclu_worker_args_t *) arg;
+  vt_clu_main_t *vclum = args->vclum;
+  int worker_id = args->worker_id;
   int rv, vcl_sh;
   const int buflen = 64;
   char buf[buflen];
-
   struct sockaddr_in _addr;
   vppcom_endpt_t rmt_ep = { .ip = (void *) &_addr };
 
-  vt_clu_parse_args (vclum, argc, argv);
+  if (worker_id)
+    vppcom_worker_register ();
 
-  rv = vppcom_app_create ("vcl_test_cl_udp");
-  if (rv)
-    vtfail ("vppcom_app_create()", rv);
+  vtinf ("Server worker %d starting", worker_id);
 
   vcl_sh = vppcom_session_create (VPPCOM_PROTO_UDP, 0 /* is_nonblocking */);
   if (vcl_sh < 0)
     {
       vterr ("vppcom_session_create()", vcl_sh);
-      return vcl_sh;
+      return NULL;
     }
 
-  if (vclum->app_type == VT_CLU_TYPE_SERVER)
+  /* Bind to the same endpoint as main thread */
+  rv = vppcom_session_bind (vcl_sh, &vclum->endpt);
+  if (rv < 0)
     {
-      /* Listen is implicit */
-      rv = vppcom_session_bind (vcl_sh, &vclum->endpt);
-      if (rv < 0)
-       {
-         vterr ("vppcom_session_bind()", rv);
-         return rv;
-       }
+      vterr ("vppcom_session_bind()", rv);
+      return NULL;
+    }
+
+  vt_clu_catch_sig (vt_clu_sig_handler);
+  if (setjmp (sig_jmp_buf))
+    vt_clu_handle_sig (vclum, worker_id);
 
+  /* Server worker loop */
+  while (!vt_clu_test_done (vclum))
+    {
       rv = vppcom_session_recvfrom (vcl_sh, buf, buflen, 0, &rmt_ep);
       if (rv < 0)
        {
-         vterr ("vppcom_session_recvfrom()", rv);
-         return rv;
+         vtwrn ("worker %d: recvfrom returned %d", worker_id, rv);
+         break;
        }
       buf[rv] = 0;
-      vtinf ("Received message from client: %s", buf);
 
-      char *msg = "hello cl udp client";
-      int msg_len = strnlen (msg, buflen);
-      memcpy (buf, msg, msg_len);
+      vtinf ("Worker %d received message from client %s: %s", worker_id,
+            vt_clu_ep_to_str (&rmt_ep), buf);
+      vt_atomic_add (&vclum->msgs_received, 1);
+
+      char response[buflen];
+      int msg_len =
+       snprintf (response, buflen, "hello from worker %d", worker_id);
+
       /* send 2 times to be sure */
       for (int i = 0; i < 2; i++)
        {
-         rv = vppcom_session_sendto (vcl_sh, buf, msg_len, 0, &rmt_ep);
+         rv = vppcom_session_sendto (vcl_sh, response, msg_len, 0, &rmt_ep);
          if (rv < 0)
            {
-             vterr ("vppcom_session_sendto()", rv);
-             return rv;
+             vtwrn ("worker %d: sendto returned %d", worker_id, rv);
+             break;
            }
          usleep (500);
        }
     }
-  else if (vclum->app_type == VT_CLU_TYPE_CLIENT)
+
+  vppcom_session_close (vcl_sh);
+  vtinf ("Server worker %d exiting", worker_id);
+  return NULL;
+}
+
+static void *
+vt_clu_client_worker (void *arg)
+{
+  vtclu_worker_args_t *args = (vtclu_worker_args_t *) arg;
+  vt_clu_main_t *vclum = args->vclum;
+  int worker_id = args->worker_id;
+  int rv, vcl_sh;
+  const int buflen = 64;
+  char buf[buflen];
+  struct sockaddr_in _addr;
+  vppcom_endpt_t rmt_ep = { .ip = (void *) &_addr };
+
+  if (worker_id)
+    vppcom_worker_register ();
+
+  vtinf ("Client worker %d starting", worker_id);
+
+  vcl_sh = vppcom_session_create (VPPCOM_PROTO_UDP, 0 /* is_nonblocking */);
+  if (vcl_sh < 0)
     {
-      char *msg = "hello cl udp server";
-      int msg_len = strnlen (msg, buflen);
-      memcpy (buf, msg, msg_len);
+      vterr ("vppcom_session_create()", vcl_sh);
+      return NULL;
+    }
+
+  char message[buflen];
+  int msg_len =
+    snprintf (message, buflen, "hello from client worker %d", worker_id);
+
+  vt_clu_catch_sig (vt_clu_sig_handler);
+  if (setjmp (sig_jmp_buf))
+    vt_clu_handle_sig (vclum, worker_id);
 
+  while (!vt_clu_test_done (vclum))
+    {
       /* send 3 times to be sure */
       for (int i = 0; i < 3; i++)
        {
-         rv = vppcom_session_sendto (vcl_sh, buf, msg_len, 0, &vclum->endpt);
+         rv =
+           vppcom_session_sendto (vcl_sh, message, msg_len, 0, &vclum->endpt);
          if (rv < 0)
            {
-             vterr ("vppcom_session_sendto()", rv);
-             return rv;
+             vtwrn ("worker %d: sendto returned %d", worker_id, rv);
+             goto cleanup;
            }
          usleep (500);
        }
@@ -147,10 +268,90 @@ main (int argc, char **argv)
       rv = vppcom_session_recvfrom (vcl_sh, buf, buflen, 0, &rmt_ep);
       if (rv < 0)
        {
-         vterr ("vppcom_session_recvfrom()", rv);
-         return rv;
+         vtwrn ("worker %d: recvfrom returned %d", worker_id, rv);
+         goto cleanup;
        }
       buf[rv] = 0;
-      vtinf ("Received message from server: %s", buf);
+
+      vtinf ("Worker %d received message from server %s: %s", worker_id,
+            vt_clu_ep_to_str (&rmt_ep), buf);
+
+      vt_atomic_add (&vclum->msgs_received, 1);
+    }
+
+cleanup:
+  vppcom_session_close (vcl_sh);
+  vtinf ("Client worker %d exiting", worker_id);
+  return NULL;
+}
+
+int
+main (int argc, char **argv)
+{
+  vt_clu_main_t *vclum = &vt_clu_main;
+  int rv;
+
+  vt_clu_parse_args (vclum, argc, argv);
+
+  rv = vppcom_app_create ("vcl_test_cl_udp");
+  if (rv)
+    vtfail ("vppcom_app_create()", rv);
+
+  vtinf ("Starting %s with %d worker(s)",
+        vclum->app_type == VT_CLU_TYPE_SERVER ? "server" : "client",
+        vclum->num_workers);
+
+  vclum->worker_threads = calloc (vclum->num_workers, sizeof (pthread_t));
+  vtclu_worker_args_t *worker_args =
+    calloc (vclum->num_workers, sizeof (vtclu_worker_args_t));
+
+  if (!vclum->worker_threads || !worker_args)
+    {
+      vterr ("Failed to allocate memory for worker threads", -1);
+      return -1;
     }
+
+  void *(*worker_func) (void *) = (vclum->app_type == VT_CLU_TYPE_SERVER) ?
+                                   vt_clu_server_worker :
+                                   vt_clu_client_worker;
+
+  /* Create worker threads */
+  for (int i = 1; i < vclum->num_workers; i++)
+    {
+      worker_args[i].vclum = vclum;
+      worker_args[i].worker_id = i;
+
+      rv = pthread_create (&vclum->worker_threads[i], NULL, worker_func,
+                          &worker_args[i]);
+      if (rv != 0)
+       {
+         vterr ("Failed to create worker thread", rv);
+         /* Clean up any threads that were created */
+         for (int j = 0; j < i; j++)
+           pthread_cancel (vclum->worker_threads[j]);
+         return rv;
+       }
+    }
+
+  /* First worker */
+  worker_args[0].vclum = vclum;
+  worker_args[0].worker_id = 0;
+  worker_func (worker_args);
+
+  /* Wait for all worker threads to complete */
+  while (!vt_clu_test_done (vclum))
+    ;
+
+  for (int i = 1; i < vclum->num_workers; i++)
+    {
+      pthread_kill (vclum->worker_threads[i], SIGUSR1);
+      pthread_join (vclum->worker_threads[i], NULL);
+    }
+
+  free (vclum->worker_threads);
+  free (worker_args);
+  vtinf ("All worker threads completed");
+
+  vppcom_app_destroy ();
+  return 0;
 }
\ No newline at end of file
index 6044f76..76a368f 100644 (file)
@@ -569,6 +569,7 @@ class VCLThruHostStackCLUDPEcho(VCLTestCase):
 
     @classmethod
     def setUpClass(cls):
+        cls.session_startup = ["poll-main", "use-app-socket-api"]
         super(VCLThruHostStackCLUDPEcho, cls).setUpClass()
 
     @classmethod
@@ -578,6 +579,8 @@ class VCLThruHostStackCLUDPEcho(VCLTestCase):
     def setUp(self):
         super(VCLThruHostStackCLUDPEcho, self).setUp()
 
+        self.sapi_server_sock = "1"
+        self.sapi_client_sock = "2"
         self.thru_host_stack_setup()
         self.pre_test_sleep = 2
         self.timeout = 5
@@ -597,6 +600,99 @@ class VCLThruHostStackCLUDPEcho(VCLTestCase):
             client_args,
         )
 
+    def test_vcl_thru_host_stack_cl_udp_mt_echo(self):
+        """run VCL IPv4 thru host stack CL UDP MT echo test"""
+        server_args = ["-s", self.loop0.local_ip4, "-w", "2"]
+        client_args = ["-c", self.loop0.local_ip4, "-w", "2"]
+        self.thru_host_stack_test(
+            "vcl_test_cl_udp",
+            server_args,
+            "vcl_test_cl_udp",
+            client_args,
+        )
+
+    def show_commands_at_teardown(self):
+        self.logger.debug(self.vapi.cli("show app server"))
+        self.logger.debug(self.vapi.cli("show session verbose"))
+        self.logger.debug(self.vapi.cli("show app mq"))
+
+
+    "hs_apps" in config.excluded_plugins, "Exclude tests requiring hs_apps plugin"
+)
+class VCLThruHostStackCLUDPBinds(VCLTestCase):
+    """VCL Thru Host Stack CL UDP Binds"""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.session_startup = ["poll-main", "use-app-socket-api"]
+        super(VCLThruHostStackCLUDPBinds, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(VCLThruHostStackCLUDPBinds, cls).tearDownClass()
+
+    def setUp(self):
+        super(VCLThruHostStackCLUDPBinds, self).setUp()
+
+        self.sapi_server_sock = "default"
+        self.timeout = 5
+
+        self.vapi.session_enable_disable(is_enable=1)
+        self.create_loopback_interfaces(2)
+        for i in self.lo_interfaces:
+            i.admin_up()
+            i.config_ip4()
+
+    def tearDown(self):
+        for i in self.lo_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+            i.remove_vpp_config()
+        super(VCLThruHostStackCLUDPBinds, self).tearDown()
+
+    def test_vcl_thru_host_stack_cl_udp_multiple_binds(self):
+        """run VCL IPv4 thru host stack CL UDP multiple binds test"""
+
+        # 2 CL UDP servers bound to the same port but different IPs
+        server1_args = ["-s", self.loop0.local_ip4, "-w", "2"]
+        server2_args = ["-s", self.loop1.local_ip4, "-w", "2"]
+
+        sapi_sock = "%s/app_ns_sockets/%s" % (self.tempdir, self.sapi_server_sock)
+        self.vcl_app_env = {
+            "VCL_APP_SCOPE_GLOBAL": "true",
+            "VCL_VPP_SAPI_SOCKET": sapi_sock,
+        }
+
+        worker_server1 = VCLAppWorker(
+            "vcl_test_cl_udp", server1_args, self.logger, self.vcl_app_env, "server1"
+        )
+        worker_server1.start()
+        self.sleep(0.5)
+
+        worker_server2 = VCLAppWorker(
+            "vcl_test_cl_udp", server2_args, self.logger, self.vcl_app_env, "server2"
+        )
+        worker_server2.start()
+        self.sleep(0.5)
+
+        session_output = self.vapi.cli("show session verbose")
+        self.logger.debug(session_output)
+        self.assertIn(self.loop0.local_ip4, session_output)
+        self.assertIn(self.loop1.local_ip4, session_output)
+        self.assertIn("[U]", session_output)
+        self.assertIn("LISTEN", session_output)
+
+        try:
+            worker_server1.process.send_signal(signal.SIGUSR1)
+            worker_server2.process.send_signal(signal.SIGUSR1)
+        except (AttributeError, OSError) as e:
+            self.logger.warning(f"Failed to send SIGUSR1: {e}")
+
+        self.sleep(0.5)
+
+        worker_server2.join(self.timeout)
+
     def show_commands_at_teardown(self):
         self.logger.debug(self.vapi.cli("show app server"))
         self.logger.debug(self.vapi.cli("show session verbose"))