vcl: connect stream with vnet_connect_stream 36/43936/3
authorMatus Fabian <[email protected]>
Thu, 23 Oct 2025 18:01:35 +0000 (14:01 -0400)
committerFlorin Coras <[email protected]>
Fri, 24 Oct 2025 20:13:50 +0000 (20:13 +0000)
vppcom_session_stream_connect now creates
SESSION_CTRL_EVT_CONNECT_STREAM event which is handled with
vnet_connect_stream

Type: improvement

Change-Id: I19fda1f36fa710bae27cfd399dcf3aa8b63012d6
Signed-off-by: Matus Fabian <[email protected]>
src/vcl/vppcom.c
src/vnet/session/session_node.c
src/vnet/session/session_types.h
test-c/hs-test/vcl_test.go
test/asf/test_vcl.py

index 75af90c..2fdc69c 100644 (file)
@@ -119,6 +119,33 @@ vcl_send_session_connect (vcl_worker_t * wrk, vcl_session_t * s)
     }
 }
 
+static void
+vcl_send_session_connect_stream (vcl_worker_t *wrk, vcl_session_t *s)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_connect_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  mq = vcl_worker_ctrl_mq (wrk);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_CONNECT_STREAM);
+  mp = (session_connect_msg_t *) app_evt->evt->data;
+  memset (mp, 0, sizeof (*mp));
+  mp->client_index = wrk->api_client_handle;
+  mp->context = s->session_index;
+  mp->wrk_index = wrk->vpp_wrk_index;
+  mp->parent_handle = s->parent_handle;
+  mp->proto = s->session_type;
+  if (s->ext_config)
+    vcl_msg_add_ext_config (s, &mp->ext_config);
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+
+  if (s->ext_config)
+    {
+      clib_mem_free (s->ext_config);
+      s->ext_config = 0;
+    }
+}
+
 void
 vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s)
 {
@@ -2051,19 +2078,15 @@ vppcom_session_stream_connect (uint32_t session_handle,
       return VPPCOM_OK;
     }
 
-  /* Connect to quic session specifics */
-  session->transport.is_ip4 = parent_session->transport.is_ip4;
-  session->transport.rmt_ip.ip4.as_u32 = (uint32_t) 1;
-  session->transport.rmt_port = 0;
   session->parent_handle = parent_session->vpp_handle;
 
   VDBG (0, "session handle %u: connecting to session %u [0x%llx]",
        session_handle, parent_session_handle, parent_session->vpp_handle);
 
   /*
-   * Send connect request and wait for reply from vpp
+   * Send connect stream request and wait for reply from vpp
    */
-  vcl_send_session_connect (wrk, session);
+  vcl_send_session_connect_stream (wrk, session);
   rv = vppcom_wait_for_session_state_change (session_index, VCL_STATE_READY,
                                             vcm->cfg.session_timeout);
 
index aca494e..81ed68b 100644 (file)
@@ -321,6 +321,61 @@ session_mq_connect_handler (session_worker_t *wrk, session_evt_elt_t *elt)
     }
 }
 
+static void
+session_mq_connect_stream_handler (session_worker_t *wrk,
+                                  session_evt_elt_t *elt)
+{
+  session_connect_msg_t *mp;
+  vnet_connect_args_t _a, *a = &_a;
+  app_worker_t *app_wrk;
+  application_t *app;
+  int rv;
+  clib_thread_index_t thread_index = wrk - session_main.wrk;
+
+  mp = session_evt_ctrl_data (wrk, elt);
+
+  if (PREDICT_FALSE (thread_index !=
+                    session_thread_from_handle (mp->parent_handle)))
+    {
+      clib_warning ("Connect on wrong thread. Dropping");
+      return;
+    }
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    {
+      return;
+    }
+
+  clib_memset (a, 0, sizeof (*a));
+  a->sep.transport_proto = mp->proto;
+  a->sep_ext.parent_handle = mp->parent_handle;
+  a->sep_ext.transport_flags = mp->flags;
+  a->api_context = mp->context;
+  a->app_index = app->app_index;
+  a->wrk_map_index = mp->wrk_index;
+
+  if (mp->ext_config)
+    {
+      transport_endpt_ext_cfg_t *ext_cfg =
+       session_mq_get_ext_config (app, mp->ext_config);
+      a->sep_ext.ext_cfgs.data = (u8 *) ext_cfg;
+      a->sep_ext.ext_cfgs.len =
+       ext_cfg->len + TRANSPORT_ENDPT_EXT_CFG_HEADER_SIZE;
+      a->sep_ext.ext_cfgs.tail_offset = a->sep_ext.ext_cfgs.len;
+    }
+
+  if ((rv = vnet_connect_stream (a)))
+    {
+      session_worker_stat_error_inc (wrk, rv, 1);
+      app_wrk = application_get_worker (app, mp->wrk_index);
+      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
+    }
+
+  if (mp->ext_config)
+    session_mq_free_ext_config (app, mp->ext_config);
+}
+
 static void
 session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
 {
@@ -1732,6 +1787,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
     case SESSION_CTRL_EVT_CONNECT:
       session_mq_connect_handler (wrk, elt);
       break;
+    case SESSION_CTRL_EVT_CONNECT_STREAM:
+      session_mq_connect_stream_handler (wrk, elt);
+      break;
     case SESSION_CTRL_EVT_CONNECT_URI:
       session_mq_connect_uri_handler (wrk, elt);
       break;
index 5489c67..ae4d246 100644 (file)
@@ -392,6 +392,7 @@ typedef enum
   SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY,
   SESSION_CTRL_EVT_TRANSPORT_CLOSED,
   SESSION_CTRL_EVT_HALF_CLEANUP,
+  SESSION_CTRL_EVT_CONNECT_STREAM,
 } session_evt_type_t;
 
 #define foreach_session_ctrl_evt                                              \
@@ -421,6 +422,7 @@ typedef enum
   _ (APP_WRK_RPC, app_wrk_rpc)                                                \
   _ (TRANSPORT_ATTR, transport_attr)                                          \
   _ (TRANSPORT_ATTR_REPLY, transport_attr_reply)                              \
+  _ (CONNECT_STREAM, connect)                                                 \
 /* Deprecated and will be removed. Use types above */
 #define FIFO_EVENT_APP_RX SESSION_IO_EVT_RX
 #define FIFO_EVENT_APP_TX SESSION_IO_EVT_TX
index f184d78..67f3033 100644 (file)
@@ -2,15 +2,17 @@ package main
 
 import (
        "fmt"
+       "strings"
        "time"
 
        . "fd.io/hs-test/infra"
 )
 
 func init() {
-       RegisterVethTests(XEchoVclClientUdpTest, XEchoVclClientTcpTest, XEchoVclServerUdpTest,
+       RegisterVethTests(XEchoVclClientUdpTest, XEchoVclClientTcpTest, XEchoVclServerUdpTest, VclEchoQuicTest,
                XEchoVclServerTcpTest, VclEchoTcpTest, VclEchoUdpTest, VclHttpPostTest, VclClUdpDscpTest)
        RegisterSoloVethTests(VclRetryAttachTest)
+       RegisterVethMWTests(VclEchoQuicMWTest)
 }
 
 func getVclConfig(c *Container, ns_id_optional ...string) string {
@@ -80,7 +82,12 @@ func testXEchoVclServer(s *VethsSuite, proto string) {
        s.AssertContains(o, "Test finished at")
 }
 
-func testVclEcho(s *VethsSuite, proto string) {
+func testVclEcho(s *VethsSuite, proto string, extraArgs ...string) {
+       extras := ""
+       if len(extraArgs) > 0 {
+               extras = strings.Join(extraArgs, " ")
+               extras += " "
+       }
        srvVppCont := s.Containers.ServerVpp
        srvAppCont := s.Containers.ServerApp
        serverVethAddress := s.Interfaces.Server.Ip4AddressString()
@@ -93,7 +100,7 @@ func testVclEcho(s *VethsSuite, proto string) {
        echoClnContainer := s.GetTransientContainerByName("client-app")
        echoClnContainer.CreateFile("/vcl.conf", getVclConfig(echoClnContainer))
 
-       testClientCommand := "vcl_test_client -p " + proto + " " + serverVethAddress + " " + s.Ports.Port1
+       testClientCommand := "vcl_test_client " + extras + "-p " + proto + " " + serverVethAddress + " " + s.Ports.Port1
        echoClnContainer.AddEnvVar("VCL_CONFIG", "/vcl.conf")
        o, err := echoClnContainer.Exec(true, testClientCommand)
        s.AssertNil(err)
@@ -108,6 +115,16 @@ func VclEchoUdpTest(s *VethsSuite) {
        testVclEcho(s, "udp")
 }
 
+func VclEchoQuicTest(s *VethsSuite) {
+       testVclEcho(s, "quic", "-N 1000")
+}
+
+func VclEchoQuicMWTest(s *VethsSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
+       testVclEcho(s, "quic", "-s 20 -q 10 -N 1000")
+}
+
 func VclHttpPostTest(s *VethsSuite) {
        testVclEcho(s, "http")
 }
index 4cf6fb6..d1425ed 100644 (file)
@@ -872,6 +872,20 @@ class VCLThruHostStackQUIC(VCLTestCase):
             self.loop0.local_ip4,
             self.server_port,
         ]
+        self.client_bi_dir_multi_stream_quic_test_args = [
+            "-N",
+            "1000",
+            "-B",
+            "-X",
+            "-p",
+            "quic",
+            "-s",
+            "10",
+            "-q",
+            "10",
+            self.loop0.local_ip4,
+            self.server_port,
+        ]
 
     @unittest.skipUnless(config.extended, "part of extended tests")
     def test_vcl_thru_host_stack_quic_uni_dir(self):
@@ -885,6 +899,18 @@ class VCLThruHostStackQUIC(VCLTestCase):
             self.client_uni_dir_quic_test_args,
         )
 
+    @unittest.skipUnless(config.extended, "part of extended tests")
+    def test_vcl_thru_host_stack_quic_bi_dir_multi_stream(self):
+        """run VCL thru host stack bi-directional multi stream QUIC test"""
+
+        self.timeout = self.client_uni_dir_quic_timeout
+        self.thru_host_stack_test(
+            "vcl_test_server",
+            self.server_quic_args,
+            "vcl_test_client",
+            self.client_bi_dir_multi_stream_quic_test_args,
+        )
+
     def tearDown(self):
         self.thru_host_stack_tear_down()
         super(VCLThruHostStackQUIC, self).tearDown()