From 6024c1d4c33cb20c1220ec9e29d54e132b44c7bf Mon Sep 17 00:00:00 2001 From: Matus Fabian Date: Thu, 23 Oct 2025 14:01:35 -0400 Subject: [PATCH] vcl: connect stream with vnet_connect_stream vppcom_session_stream_connect now creates SESSION_CTRL_EVT_CONNECT_STREAM event which is handled with vnet_connect_stream Type: improvement Change-Id: I19fda1f36fa710bae27cfd399dcf3aa8b63012d6 Signed-off-by: Matus Fabian --- src/vcl/vppcom.c | 35 +++++++++++++++++++----- src/vnet/session/session_node.c | 58 ++++++++++++++++++++++++++++++++++++++++ src/vnet/session/session_types.h | 2 ++ test-c/hs-test/vcl_test.go | 23 +++++++++++++--- test/asf/test_vcl.py | 26 ++++++++++++++++++ 5 files changed, 135 insertions(+), 9 deletions(-) diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c index 75af90c43df..2fdc69cb7dd 100644 --- a/src/vcl/vppcom.c +++ b/src/vcl/vppcom.c @@ -119,6 +119,33 @@ vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s) } } +static void +vcl_send_session_connect_stream (vcl_worker_t *wrk, vcl_session_t *s) +{ + app_session_evt_t _app_evt, *app_evt = &_app_evt; + session_connect_msg_t *mp; + svm_msg_q_t *mq; + + mq = vcl_worker_ctrl_mq (wrk); + app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT_STREAM); + mp = (session_connect_msg_t *) app_evt->evt->data; + memset (mp, 0, sizeof (*mp)); + mp->client_index = wrk->api_client_handle; + mp->context = s->session_index; + mp->wrk_index = wrk->vpp_wrk_index; + mp->parent_handle = s->parent_handle; + mp->proto = s->session_type; + if (s->ext_config) + vcl_msg_add_ext_config (s, &mp->ext_config); + app_send_ctrl_evt_to_vpp (mq, app_evt); + + if (s->ext_config) + { + clib_mem_free (s->ext_config); + s->ext_config = 0; + } +} + void vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s) { @@ -2051,19 +2078,15 @@ vppcom_session_stream_connect (uint32_t session_handle, return VPPCOM_OK; } - /* Connect to quic session specifics */ - session->transport.is_ip4 = parent_session->transport.is_ip4; - session->transport.rmt_ip.ip4.as_u32 = (uint32_t) 1; - session->transport.rmt_port = 0; session->parent_handle = parent_session->vpp_handle; VDBG (0, "session handle %u: connecting to session %u [0x%llx]", session_handle, parent_session_handle, parent_session->vpp_handle); /* - * Send connect request and wait for reply from vpp + * Send connect stream request and wait for reply from vpp */ - vcl_send_session_connect (wrk, session); + vcl_send_session_connect_stream (wrk, session); rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY, vcm->cfg.session_timeout); diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index aca494e7830..81ed68b4084 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -321,6 +321,61 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt) } } +static void +session_mq_connect_stream_handler (session_worker_t *wrk, + session_evt_elt_t *elt) +{ + session_connect_msg_t *mp; + vnet_connect_args_t _a, *a = &_a; + app_worker_t *app_wrk; + application_t *app; + int rv; + clib_thread_index_t thread_index = wrk - session_main.wrk; + + mp = session_evt_ctrl_data (wrk, elt); + + if (PREDICT_FALSE (thread_index != + session_thread_from_handle (mp->parent_handle))) + { + clib_warning ("Connect on wrong thread. Dropping"); + return; + } + + app = application_lookup (mp->client_index); + if (!app) + { + return; + } + + clib_memset (a, 0, sizeof (*a)); + a->sep.transport_proto = mp->proto; + a->sep_ext.parent_handle = mp->parent_handle; + a->sep_ext.transport_flags = mp->flags; + a->api_context = mp->context; + a->app_index = app->app_index; + a->wrk_map_index = mp->wrk_index; + + if (mp->ext_config) + { + transport_endpt_ext_cfg_t *ext_cfg = + session_mq_get_ext_config (app, mp->ext_config); + a->sep_ext.ext_cfgs.data = (u8 *) ext_cfg; + a->sep_ext.ext_cfgs.len = + ext_cfg->len + TRANSPORT_ENDPT_EXT_CFG_HEADER_SIZE; + a->sep_ext.ext_cfgs.tail_offset = a->sep_ext.ext_cfgs.len; + } + + if ((rv = vnet_connect_stream (a))) + { + session_worker_stat_error_inc (wrk, rv, 1); + app_wrk = application_get_worker (app, mp->wrk_index); + app_worker_connect_notify (app_wrk, 0, rv, mp->context); + } + + if (mp->ext_config) + session_mq_free_ext_config (app, mp->ext_config); +} + static void session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt) { @@ -1732,6 +1787,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt) case SESSION_CTRL_EVT_CONNECT: session_mq_connect_handler (wrk, elt); break; + case SESSION_CTRL_EVT_CONNECT_STREAM: + session_mq_connect_stream_handler (wrk, elt); + break; case SESSION_CTRL_EVT_CONNECT_URI: session_mq_connect_uri_handler (wrk, elt); break; diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 5489c673071..ae4d246ae3f 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -392,6 +392,7 @@ typedef enum SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY, SESSION_CTRL_EVT_TRANSPORT_CLOSED, SESSION_CTRL_EVT_HALF_CLEANUP, + SESSION_CTRL_EVT_CONNECT_STREAM, } session_evt_type_t; #define foreach_session_ctrl_evt \ @@ -421,6 +422,7 @@ typedef enum _ (APP_WRK_RPC, app_wrk_rpc) \ _ (TRANSPORT_ATTR, transport_attr) \ _ (TRANSPORT_ATTR_REPLY, transport_attr_reply) \ + _ (CONNECT_STREAM, connect) \ /* Deprecated and will be removed. Use types above */ #define FIFO_EVENT_APP_RX SESSION_IO_EVT_RX #define FIFO_EVENT_APP_TX SESSION_IO_EVT_TX diff --git a/test-c/hs-test/vcl_test.go b/test-c/hs-test/vcl_test.go index f184d78257e..67f30331f32 100644 --- a/test-c/hs-test/vcl_test.go +++ b/test-c/hs-test/vcl_test.go @@ -2,15 +2,17 @@ package main import ( "fmt" + "strings" "time" . "fd.io/hs-test/infra" ) func init() { - RegisterVethTests(XEchoVclClientUdpTest, XEchoVclClientTcpTest, XEchoVclServerUdpTest, + RegisterVethTests(XEchoVclClientUdpTest, XEchoVclClientTcpTest, XEchoVclServerUdpTest, VclEchoQuicTest, XEchoVclServerTcpTest, VclEchoTcpTest, VclEchoUdpTest, VclHttpPostTest, VclClUdpDscpTest) RegisterSoloVethTests(VclRetryAttachTest) + RegisterVethMWTests(VclEchoQuicMWTest) } func getVclConfig(c *Container, ns_id_optional ...string) string { @@ -80,7 +82,12 @@ func testXEchoVclServer(s *VethsSuite, proto string) { s.AssertContains(o, "Test finished at") } -func testVclEcho(s *VethsSuite, proto string) { +func testVclEcho(s *VethsSuite, proto string, extraArgs ...string) { + extras := "" + if len(extraArgs) > 0 { + extras = strings.Join(extraArgs, " ") + extras += " " + } srvVppCont := s.Containers.ServerVpp srvAppCont := s.Containers.ServerApp serverVethAddress := s.Interfaces.Server.Ip4AddressString() @@ -93,7 +100,7 @@ func testVclEcho(s *VethsSuite, proto string) { echoClnContainer := s.GetTransientContainerByName("client-app") echoClnContainer.CreateFile("/vcl.conf", getVclConfig(echoClnContainer)) - testClientCommand := "vcl_test_client -p " + proto + " " + serverVethAddress + " " + s.Ports.Port1 + testClientCommand := "vcl_test_client " + extras + "-p " + proto + " " + serverVethAddress + " " + s.Ports.Port1 echoClnContainer.AddEnvVar("VCL_CONFIG", "/vcl.conf") o, err := echoClnContainer.Exec(true, testClientCommand) s.AssertNil(err) @@ -108,6 +115,16 @@ func VclEchoUdpTest(s *VethsSuite) { testVclEcho(s, "udp") } +func VclEchoQuicTest(s *VethsSuite) { + testVclEcho(s, "quic", "-N 1000") +} + +func VclEchoQuicMWTest(s *VethsSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() + testVclEcho(s, "quic", "-s 20 -q 10 -N 1000") +} + func VclHttpPostTest(s *VethsSuite) { testVclEcho(s, "http") } diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py index 4cf6fb635ba..d1425edc08c 100644 --- a/test/asf/test_vcl.py +++ b/test/asf/test_vcl.py @@ -872,6 +872,20 @@ class VCLThruHostStackQUIC(VCLTestCase): self.loop0.local_ip4, self.server_port, ] + self.client_bi_dir_multi_stream_quic_test_args = [ + "-N", + "1000", + "-B", + "-X", + "-p", + "quic", + "-s", + "10", + "-q", + "10", + self.loop0.local_ip4, + self.server_port, + ] @unittest.skipUnless(config.extended, "part of extended tests") def test_vcl_thru_host_stack_quic_uni_dir(self): @@ -885,6 +899,18 @@ class VCLThruHostStackQUIC(VCLTestCase): self.client_uni_dir_quic_test_args, ) + @unittest.skipUnless(config.extended, "part of extended tests") + def test_vcl_thru_host_stack_quic_bi_dir_multi_stream(self): + """run VCL thru host stack bi-directional multi stream QUIC test""" + + self.timeout = self.client_uni_dir_quic_timeout + self.thru_host_stack_test( + "vcl_test_server", + self.server_quic_args, + "vcl_test_client", + self.client_bi_dir_multi_stream_quic_test_args, + ) + def tearDown(self): self.thru_host_stack_tear_down() super(VCLThruHostStackQUIC, self).tearDown() -- 2.16.6