session: add vnet_connect_stream 99/43899/10
authorMatus Fabian <[email protected]>
Thu, 16 Oct 2025 14:04:34 +0000 (10:04 -0400)
committerFlorin Coras <[email protected]>
Thu, 23 Oct 2025 07:59:17 +0000 (07:59 +0000)
Ask transport to open stream on existing connection, stream must be
opened on same thread as parent connection. For internall
applications stream is opened instantaneously.
Transport protocl must implement connect_stream callback.
This is useful for protocols like HTTP/2, HTTP/3 or QUIC which use
streams to provide lightweight, ordered byte-stream abstraction to an
application, they are created by sending data.

Type: improvement

Change-Id: I4b6b5a003a6e1c56135cb26e067c42956ba5ae06
Signed-off-by: Matus Fabian <[email protected]>
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_interface.h
src/vnet/session/application_worker.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_types.h
src/vnet/session/transport.c
src/vnet/session/transport.h

index 52a89d5..a6e0cae 100644 (file)
@@ -1465,6 +1465,25 @@ vnet_connect (vnet_connect_args_t *a)
   return app_worker_connect_session (client_wrk, &a->sep_ext, &a->sh);
 }
 
+session_error_t
+vnet_connect_stream (vnet_connect_args_t *a)
+{
+  app_worker_t *client_wrk;
+  application_t *client;
+
+  /* stream must be opened on same thread as parent connection */
+  ASSERT (a->sep_ext.parent_handle != SESSION_INVALID_HANDLE);
+  ASSERT (vlib_get_thread_index () ==
+         session_thread_from_handle (a->sep_ext.parent_handle));
+
+  a->sep_ext.opaque = a->api_context;
+
+  client = application_get (a->app_index);
+  client_wrk = application_get_worker (client, a->wrk_map_index);
+
+  return app_worker_connect_stream (client_wrk, &a->sep_ext, &a->sh);
+}
+
 session_error_t
 vnet_unlisten (vnet_unlisten_args_t *a)
 {
index 0b22d21..ea05934 100644 (file)
@@ -350,6 +350,8 @@ int app_worker_own_session (app_worker_t * app_wrk, session_t * s);
 void app_worker_free (app_worker_t * app_wrk);
 int app_worker_connect_session (app_worker_t *app, session_endpoint_cfg_t *sep,
                                session_handle_t *rsh);
+int app_worker_connect_stream (app_worker_t *app, session_endpoint_cfg_t *sep,
+                              session_handle_t *rsh);
 session_error_t app_worker_start_listen (app_worker_t *app_wrk,
                                         app_listener_t *lstnr);
 int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al);
index 40c6b15..e229a87 100644 (file)
@@ -264,6 +264,7 @@ session_error_t vnet_application_attach (vnet_app_attach_args_t *a);
 session_error_t vnet_application_detach (vnet_app_detach_args_t *a);
 session_error_t vnet_listen (vnet_listen_args_t *a);
 session_error_t vnet_connect (vnet_connect_args_t *a);
+session_error_t vnet_connect_stream (vnet_connect_args_t *a);
 session_error_t vnet_unlisten (vnet_unlisten_args_t *a);
 session_error_t vnet_shutdown_session (vnet_shutdown_args_t *a);
 session_error_t vnet_disconnect_session (vnet_disconnect_args_t *a);
index 6d58767..42017bf 100644 (file)
@@ -666,6 +666,18 @@ app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
   return session_open (sep, rsh);
 }
 
+int
+app_worker_connect_stream (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
+                          session_handle_t *rsh)
+{
+  if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
+    return SESSION_E_REFUSED;
+
+  sep->app_wrk_index = app_wrk->wrk_index;
+
+  return session_open_stream (sep, rsh);
+}
+
 int
 app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
                                svm_fifo_t * f,
index 71230f2..bce9a7d 100644 (file)
@@ -505,6 +505,24 @@ session_alloc_for_connection (transport_connection_t * tc)
   return s;
 }
 
+static session_t *
+session_alloc_for_stream (session_handle_t parent_handle)
+{
+  session_t *s, *ps;
+  clib_thread_index_t thread_index =
+    session_thread_from_handle (parent_handle);
+
+  ASSERT (thread_index == vlib_get_thread_index ());
+
+  ps = session_get_from_handle (parent_handle);
+  s = session_alloc (thread_index);
+  s->listener_handle = SESSION_INVALID_HANDLE;
+  s->session_type = ps->session_type;
+  session_set_state (s, SESSION_STATE_CLOSED);
+
+  return s;
+}
+
 session_t *
 session_alloc_for_half_open (transport_connection_t *tc)
 {
@@ -1351,6 +1369,67 @@ session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh)
   return session_open_srv_fns[tst](rmt, rsh);
 }
 
+/**
+ * Ask transport to open stream on existing connection.
+ */
+int
+session_open_stream (session_endpoint_cfg_t *sep, session_handle_t *rsh)
+{
+  transport_connection_t *tc;
+  transport_endpoint_cfg_t *tep;
+  app_worker_t *app_wrk;
+  session_t *s;
+  u32 conn_index;
+  int rv;
+
+  app_wrk = app_worker_get (sep->app_wrk_index);
+  tep = session_endpoint_to_transport_cfg (sep);
+
+  /* allocate session and fifos now */
+  s = session_alloc_for_stream (sep->parent_handle);
+  s->app_wrk_index = app_wrk->wrk_index;
+  s->opaque = sep->opaque;
+  s->flags |= SESSION_F_STREAM;
+  if ((rv = app_worker_init_connected (app_wrk, s)))
+    {
+      session_free (s);
+      if (app_worker_application_is_builtin (app_wrk))
+       return rv;
+      return app_worker_connect_notify (app_wrk, 0, rv, sep->opaque);
+    }
+
+  rv = transport_connect_stream (sep->transport_proto, tep, s, &conn_index);
+  if (rv < 0)
+    {
+      SESSION_DBG ("Transport failed to open stream.");
+      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+      session_free (s);
+      if (app_worker_application_is_builtin (app_wrk))
+       return rv;
+      return app_worker_connect_notify (app_wrk, 0, rv, sep->opaque);
+    }
+
+  session_set_state (s, SESSION_STATE_READY);
+
+  tc =
+    transport_get_connection (sep->transport_proto, conn_index,
+                             session_thread_from_handle (sep->parent_handle));
+
+  /* Attach transport to session and vice versa */
+  s->connection_index = tc->c_index;
+  tc->s_index = s->session_index;
+  *rsh = session_handle (s);
+
+  /* builtin apps are synchronous */
+  if (app_worker_application_is_builtin (app_wrk))
+    {
+      s->flags |= SESSION_F_RX_READY;
+      return SESSION_E_NONE;
+    }
+
+  return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, sep->opaque);
+}
+
 /**
  * Ask transport to listen on session endpoint.
  *
index 62f79bc..896d31d 100644 (file)
@@ -538,6 +538,7 @@ session_clone_safe (u32 session_index, clib_thread_index_t thread_index)
 }
 
 int session_open (session_endpoint_cfg_t *sep, session_handle_t *rsh);
+int session_open_stream (session_endpoint_cfg_t *sep, session_handle_t *rsh);
 int session_listen (session_t * s, session_endpoint_cfg_t * sep);
 int session_stop_listen (session_t * s);
 void session_half_close (session_t *s);
index 147c00a..5489c67 100644 (file)
@@ -190,7 +190,8 @@ typedef enum
   _ (APP_CLOSED, "app-closed")                                                \
   _ (IS_CLESS, "connectionless")                                              \
   _ (RX_READY, "rx-ready")                                                    \
-  _ (TPT_INIT_CLOSE, "transport-init-close")
+  _ (TPT_INIT_CLOSE, "transport-init-close")                                  \
+  _ (STREAM, "stream")
 
 typedef enum session_flags_bits_
 {
index 981903c..8b8c862 100644 (file)
@@ -369,6 +369,15 @@ transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep)
   return tp_vfts[tp].connect (tep);
 }
 
+int
+transport_connect_stream (transport_proto_t tp, transport_endpoint_cfg_t *tep,
+                         session_t *stream_session, u32 *conn_index)
+{
+  if (PREDICT_FALSE (!tp_vfts[tp].connect_stream))
+    return SESSION_E_TRANSPORT_NO_REG;
+  return tp_vfts[tp].connect_stream (tep, stream_session, conn_index);
+}
+
 void
 transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index)
 {
index 10477b9..adb43b4 100644 (file)
@@ -17,6 +17,7 @@
 #define SRC_VNET_SESSION_TRANSPORT_H_
 
 #include <vnet/vnet.h>
+#include <vnet/session/session_types.h>
 #include <vnet/session/transport_types.h>
 
 #define TRANSPORT_PACER_MIN_MSS        1460
@@ -74,6 +75,8 @@ typedef struct _transport_proto_vft
   u32 (*start_listen) (u32 session_index, transport_endpoint_cfg_t *lcl);
   u32 (*stop_listen) (u32 conn_index);
   int (*connect) (transport_endpoint_cfg_t * rmt);
+  int (*connect_stream) (transport_endpoint_cfg_t *rmt,
+                        session_t *session_index, u32 *conn_index);
   void (*half_close) (u32 conn_index, clib_thread_index_t thread_index);
   void (*close) (u32 conn_index, clib_thread_index_t thread_index);
   void (*reset) (u32 conn_index, clib_thread_index_t thread_index);
@@ -137,6 +140,9 @@ extern transport_proto_vft_t *tp_vfts;
       if (VAR_ALLOW_BM & (1 << VAR))
 
 int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep);
+int transport_connect_stream (transport_proto_t tp,
+                             transport_endpoint_cfg_t *tep,
+                             session_t *stream_session, u32 *conn_index);
 void transport_half_close (transport_proto_t tp, u32 conn_index,
                           u8 thread_index);
 void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index);