From: Florin Coras Date: Fri, 13 Jun 2025 21:26:54 +0000 (-0400) Subject: hsa: mt support for cl udp test X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=40e1ff601501649d64789d80b2a1c7652b325249;p=vpp.git hsa: mt support for cl udp test Also add tests for - multi worker cl connects/binds - 2 multi worker servers binding the same port Type: improvement Change-Id: I222756b7664ffdba83cb69bb0c730526dad3065c Signed-off-by: Florin Coras --- diff --git a/src/plugins/hs_apps/vcl/vcl_test_cl_udp.c b/src/plugins/hs_apps/vcl/vcl_test_cl_udp.c index 066635e3d9b..a0df4c228c0 100644 --- a/src/plugins/hs_apps/vcl/vcl_test_cl_udp.c +++ b/src/plugins/hs_apps/vcl/vcl_test_cl_udp.c @@ -2,9 +2,28 @@ * Copyright(c) 2025 Cisco Systems, Inc. */ +/* + * VCL CL UDP Test Client/Server with Multi-threading Support + * + * Usage: + * Server: vcl_test_cl_udp -s [-w ] + * Client: vcl_test_cl_udp -c [-w ] + * + * Options: + * -s Start as server bound to specified IP address + * -c Start as client connecting to specified IP address + * -w Number of worker threads (default: 1) + */ + #include #include #include +#include +#include +#include +#include +#include +#include #include #include @@ -25,10 +44,20 @@ typedef struct vtclu_main_ struct sockaddr_storage clnt_addr; }; uint16_t port; + int num_workers; + pthread_t *worker_threads; + int thread_id_counter; + volatile int msgs_received; } vt_clu_main_t; static vt_clu_main_t vt_clu_main; +typedef struct vtclu_worker_args_ +{ + vt_clu_main_t *vclum; + int worker_id; +} vtclu_worker_args_t; + static void vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv) { @@ -36,9 +65,10 @@ vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv) memset (vclum, 0, sizeof (*vclum)); vclum->port = VCL_TEST_SERVER_PORT; + vclum->num_workers = 1; opterr = 0; - while ((c = getopt (argc, argv, "s:c:")) != -1) + while ((c = getopt (argc, argv, "s:c:w:")) != -1) switch (c) { case 's': @@ -53,7 +83,16 @@ vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv) if (inet_pton ( AF_INET, optarg, &((struct sockaddr_in *) &vclum->clnt_addr)->sin_addr) != 1) - break; + vtwrn ("couldn't parse ipv4 addr %s", optarg); + break; + case 'w': + vclum->num_workers = atoi (optarg); + if (vclum->num_workers <= 0) + { + vtwrn ("invalid number of workers %s", optarg); + vclum->num_workers = 1; + } + break; } if (vclum->app_type == VT_CLU_TYPE_NONE) @@ -65,81 +104,163 @@ vt_clu_parse_args (vt_clu_main_t *vclum, int argc, char **argv) vclum->endpt.is_ip4 = 1; vclum->endpt.ip = (uint8_t *) &((struct sockaddr_in *) &vclum->srvr_addr)->sin_addr; - vclum->endpt.port = htons (vclum->endpt.port); + vclum->endpt.port = htons (vclum->port); } -int -main (int argc, char **argv) +static int +vt_clu_test_done (vt_clu_main_t *vclum) { - vt_clu_main_t *vclum = &vt_clu_main; + return vclum->msgs_received >= vclum->num_workers; +} + +__thread char ep_ip_str[INET_ADDRSTRLEN + 16]; + +static char * +vt_clu_ep_to_str (vppcom_endpt_t *ep) +{ + inet_ntop (AF_INET, ep->ip, ep_ip_str, INET_ADDRSTRLEN); + snprintf (ep_ip_str + strlen (ep_ip_str), + INET_ADDRSTRLEN - strlen (ep_ip_str), ":%d", ntohs (ep->port)); + return ep_ip_str; +} + +__thread jmp_buf sig_jmp_buf; + +static void +vt_clu_sig_handler (int sig) +{ + longjmp (sig_jmp_buf, 1); +} + +void +vt_clu_catch_sig (void (*handler) (int)) +{ + signal (SIGUSR1, handler); +} + +void +vt_clu_handle_sig (vt_clu_main_t *vclum, int worker_id) +{ + vtinf ("Worker %d interrupted", worker_id); + vclum->msgs_received = vclum->num_workers; +} + +static void * +vt_clu_server_worker (void *arg) +{ + vtclu_worker_args_t *args = (vtclu_worker_args_t *) arg; + vt_clu_main_t *vclum = args->vclum; + int worker_id = args->worker_id; int rv, vcl_sh; const int buflen = 64; char buf[buflen]; - struct sockaddr_in _addr; vppcom_endpt_t rmt_ep = { .ip = (void *) &_addr }; - vt_clu_parse_args (vclum, argc, argv); + if (worker_id) + vppcom_worker_register (); - rv = vppcom_app_create ("vcl_test_cl_udp"); - if (rv) - vtfail ("vppcom_app_create()", rv); + vtinf ("Server worker %d starting", worker_id); vcl_sh = vppcom_session_create (VPPCOM_PROTO_UDP, 0 /* is_nonblocking */); if (vcl_sh < 0) { vterr ("vppcom_session_create()", vcl_sh); - return vcl_sh; + return NULL; } - if (vclum->app_type == VT_CLU_TYPE_SERVER) + /* Bind to the same endpoint as main thread */ + rv = vppcom_session_bind (vcl_sh, &vclum->endpt); + if (rv < 0) { - /* Listen is implicit */ - rv = vppcom_session_bind (vcl_sh, &vclum->endpt); - if (rv < 0) - { - vterr ("vppcom_session_bind()", rv); - return rv; - } + vterr ("vppcom_session_bind()", rv); + return NULL; + } + + vt_clu_catch_sig (vt_clu_sig_handler); + if (setjmp (sig_jmp_buf)) + vt_clu_handle_sig (vclum, worker_id); + /* Server worker loop */ + while (!vt_clu_test_done (vclum)) + { rv = vppcom_session_recvfrom (vcl_sh, buf, buflen, 0, &rmt_ep); if (rv < 0) { - vterr ("vppcom_session_recvfrom()", rv); - return rv; + vtwrn ("worker %d: recvfrom returned %d", worker_id, rv); + break; } buf[rv] = 0; - vtinf ("Received message from client: %s", buf); - char *msg = "hello cl udp client"; - int msg_len = strnlen (msg, buflen); - memcpy (buf, msg, msg_len); + vtinf ("Worker %d received message from client %s: %s", worker_id, + vt_clu_ep_to_str (&rmt_ep), buf); + vt_atomic_add (&vclum->msgs_received, 1); + + char response[buflen]; + int msg_len = + snprintf (response, buflen, "hello from worker %d", worker_id); + /* send 2 times to be sure */ for (int i = 0; i < 2; i++) { - rv = vppcom_session_sendto (vcl_sh, buf, msg_len, 0, &rmt_ep); + rv = vppcom_session_sendto (vcl_sh, response, msg_len, 0, &rmt_ep); if (rv < 0) { - vterr ("vppcom_session_sendto()", rv); - return rv; + vtwrn ("worker %d: sendto returned %d", worker_id, rv); + break; } usleep (500); } } - else if (vclum->app_type == VT_CLU_TYPE_CLIENT) + + vppcom_session_close (vcl_sh); + vtinf ("Server worker %d exiting", worker_id); + return NULL; +} + +static void * +vt_clu_client_worker (void *arg) +{ + vtclu_worker_args_t *args = (vtclu_worker_args_t *) arg; + vt_clu_main_t *vclum = args->vclum; + int worker_id = args->worker_id; + int rv, vcl_sh; + const int buflen = 64; + char buf[buflen]; + struct sockaddr_in _addr; + vppcom_endpt_t rmt_ep = { .ip = (void *) &_addr }; + + if (worker_id) + vppcom_worker_register (); + + vtinf ("Client worker %d starting", worker_id); + + vcl_sh = vppcom_session_create (VPPCOM_PROTO_UDP, 0 /* is_nonblocking */); + if (vcl_sh < 0) { - char *msg = "hello cl udp server"; - int msg_len = strnlen (msg, buflen); - memcpy (buf, msg, msg_len); + vterr ("vppcom_session_create()", vcl_sh); + return NULL; + } + + char message[buflen]; + int msg_len = + snprintf (message, buflen, "hello from client worker %d", worker_id); + + vt_clu_catch_sig (vt_clu_sig_handler); + if (setjmp (sig_jmp_buf)) + vt_clu_handle_sig (vclum, worker_id); + while (!vt_clu_test_done (vclum)) + { /* send 3 times to be sure */ for (int i = 0; i < 3; i++) { - rv = vppcom_session_sendto (vcl_sh, buf, msg_len, 0, &vclum->endpt); + rv = + vppcom_session_sendto (vcl_sh, message, msg_len, 0, &vclum->endpt); if (rv < 0) { - vterr ("vppcom_session_sendto()", rv); - return rv; + vtwrn ("worker %d: sendto returned %d", worker_id, rv); + goto cleanup; } usleep (500); } @@ -147,10 +268,90 @@ main (int argc, char **argv) rv = vppcom_session_recvfrom (vcl_sh, buf, buflen, 0, &rmt_ep); if (rv < 0) { - vterr ("vppcom_session_recvfrom()", rv); - return rv; + vtwrn ("worker %d: recvfrom returned %d", worker_id, rv); + goto cleanup; } buf[rv] = 0; - vtinf ("Received message from server: %s", buf); + + vtinf ("Worker %d received message from server %s: %s", worker_id, + vt_clu_ep_to_str (&rmt_ep), buf); + + vt_atomic_add (&vclum->msgs_received, 1); + } + +cleanup: + vppcom_session_close (vcl_sh); + vtinf ("Client worker %d exiting", worker_id); + return NULL; +} + +int +main (int argc, char **argv) +{ + vt_clu_main_t *vclum = &vt_clu_main; + int rv; + + vt_clu_parse_args (vclum, argc, argv); + + rv = vppcom_app_create ("vcl_test_cl_udp"); + if (rv) + vtfail ("vppcom_app_create()", rv); + + vtinf ("Starting %s with %d worker(s)", + vclum->app_type == VT_CLU_TYPE_SERVER ? "server" : "client", + vclum->num_workers); + + vclum->worker_threads = calloc (vclum->num_workers, sizeof (pthread_t)); + vtclu_worker_args_t *worker_args = + calloc (vclum->num_workers, sizeof (vtclu_worker_args_t)); + + if (!vclum->worker_threads || !worker_args) + { + vterr ("Failed to allocate memory for worker threads", -1); + return -1; } + + void *(*worker_func) (void *) = (vclum->app_type == VT_CLU_TYPE_SERVER) ? + vt_clu_server_worker : + vt_clu_client_worker; + + /* Create worker threads */ + for (int i = 1; i < vclum->num_workers; i++) + { + worker_args[i].vclum = vclum; + worker_args[i].worker_id = i; + + rv = pthread_create (&vclum->worker_threads[i], NULL, worker_func, + &worker_args[i]); + if (rv != 0) + { + vterr ("Failed to create worker thread", rv); + /* Clean up any threads that were created */ + for (int j = 0; j < i; j++) + pthread_cancel (vclum->worker_threads[j]); + return rv; + } + } + + /* First worker */ + worker_args[0].vclum = vclum; + worker_args[0].worker_id = 0; + worker_func (worker_args); + + /* Wait for all worker threads to complete */ + while (!vt_clu_test_done (vclum)) + ; + + for (int i = 1; i < vclum->num_workers; i++) + { + pthread_kill (vclum->worker_threads[i], SIGUSR1); + pthread_join (vclum->worker_threads[i], NULL); + } + + free (vclum->worker_threads); + free (worker_args); + vtinf ("All worker threads completed"); + + vppcom_app_destroy (); + return 0; } \ No newline at end of file diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py index 6044f76f831..76a368f28d2 100644 --- a/test/asf/test_vcl.py +++ b/test/asf/test_vcl.py @@ -569,6 +569,7 @@ class VCLThruHostStackCLUDPEcho(VCLTestCase): @classmethod def setUpClass(cls): + cls.session_startup = ["poll-main", "use-app-socket-api"] super(VCLThruHostStackCLUDPEcho, cls).setUpClass() @classmethod @@ -578,6 +579,8 @@ class VCLThruHostStackCLUDPEcho(VCLTestCase): def setUp(self): super(VCLThruHostStackCLUDPEcho, self).setUp() + self.sapi_server_sock = "1" + self.sapi_client_sock = "2" self.thru_host_stack_setup() self.pre_test_sleep = 2 self.timeout = 5 @@ -597,6 +600,99 @@ class VCLThruHostStackCLUDPEcho(VCLTestCase): client_args, ) + def test_vcl_thru_host_stack_cl_udp_mt_echo(self): + """run VCL IPv4 thru host stack CL UDP MT echo test""" + server_args = ["-s", self.loop0.local_ip4, "-w", "2"] + client_args = ["-c", self.loop0.local_ip4, "-w", "2"] + self.thru_host_stack_test( + "vcl_test_cl_udp", + server_args, + "vcl_test_cl_udp", + client_args, + ) + + def show_commands_at_teardown(self): + self.logger.debug(self.vapi.cli("show app server")) + self.logger.debug(self.vapi.cli("show session verbose")) + self.logger.debug(self.vapi.cli("show app mq")) + + +@unittest.skipIf( + "hs_apps" in config.excluded_plugins, "Exclude tests requiring hs_apps plugin" +) +class VCLThruHostStackCLUDPBinds(VCLTestCase): + """VCL Thru Host Stack CL UDP Binds""" + + @classmethod + def setUpClass(cls): + cls.session_startup = ["poll-main", "use-app-socket-api"] + super(VCLThruHostStackCLUDPBinds, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(VCLThruHostStackCLUDPBinds, cls).tearDownClass() + + def setUp(self): + super(VCLThruHostStackCLUDPBinds, self).setUp() + + self.sapi_server_sock = "default" + self.timeout = 5 + + self.vapi.session_enable_disable(is_enable=1) + self.create_loopback_interfaces(2) + for i in self.lo_interfaces: + i.admin_up() + i.config_ip4() + + def tearDown(self): + for i in self.lo_interfaces: + i.unconfig_ip4() + i.admin_down() + i.remove_vpp_config() + super(VCLThruHostStackCLUDPBinds, self).tearDown() + + def test_vcl_thru_host_stack_cl_udp_multiple_binds(self): + """run VCL IPv4 thru host stack CL UDP multiple binds test""" + + # 2 CL UDP servers bound to the same port but different IPs + server1_args = ["-s", self.loop0.local_ip4, "-w", "2"] + server2_args = ["-s", self.loop1.local_ip4, "-w", "2"] + + sapi_sock = "%s/app_ns_sockets/%s" % (self.tempdir, self.sapi_server_sock) + self.vcl_app_env = { + "VCL_APP_SCOPE_GLOBAL": "true", + "VCL_VPP_SAPI_SOCKET": sapi_sock, + } + + worker_server1 = VCLAppWorker( + "vcl_test_cl_udp", server1_args, self.logger, self.vcl_app_env, "server1" + ) + worker_server1.start() + self.sleep(0.5) + + worker_server2 = VCLAppWorker( + "vcl_test_cl_udp", server2_args, self.logger, self.vcl_app_env, "server2" + ) + worker_server2.start() + self.sleep(0.5) + + session_output = self.vapi.cli("show session verbose") + self.logger.debug(session_output) + self.assertIn(self.loop0.local_ip4, session_output) + self.assertIn(self.loop1.local_ip4, session_output) + self.assertIn("[U]", session_output) + self.assertIn("LISTEN", session_output) + + try: + worker_server1.process.send_signal(signal.SIGUSR1) + worker_server2.process.send_signal(signal.SIGUSR1) + except (AttributeError, OSError) as e: + self.logger.warning(f"Failed to send SIGUSR1: {e}") + + self.sleep(0.5) + + worker_server2.join(self.timeout) + def show_commands_at_teardown(self): self.logger.debug(self.vapi.cli("show app server")) self.logger.debug(self.vapi.cli("show session verbose"))