From: Matus Fabian Date: Thu, 16 Oct 2025 14:04:34 +0000 (-0400) Subject: session: add vnet_connect_stream X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F99%2F43899%2F10;p=vpp.git session: add vnet_connect_stream Ask transport to open stream on existing connection, stream must be opened on same thread as parent connection. For internall applications stream is opened instantaneously. Transport protocl must implement connect_stream callback. This is useful for protocols like HTTP/2, HTTP/3 or QUIC which use streams to provide lightweight, ordered byte-stream abstraction to an application, they are created by sending data. Type: improvement Change-Id: I4b6b5a003a6e1c56135cb26e067c42956ba5ae06 Signed-off-by: Matus Fabian --- diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 52a89d5a02d..a6e0caed67f 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -1465,6 +1465,25 @@ vnet_connect (vnet_connect_args_t *a) return app_worker_connect_session (client_wrk, &a->sep_ext, &a->sh); } +session_error_t +vnet_connect_stream (vnet_connect_args_t *a) +{ + app_worker_t *client_wrk; + application_t *client; + + /* stream must be opened on same thread as parent connection */ + ASSERT (a->sep_ext.parent_handle != SESSION_INVALID_HANDLE); + ASSERT (vlib_get_thread_index () == + session_thread_from_handle (a->sep_ext.parent_handle)); + + a->sep_ext.opaque = a->api_context; + + client = application_get (a->app_index); + client_wrk = application_get_worker (client, a->wrk_map_index); + + return app_worker_connect_stream (client_wrk, &a->sep_ext, &a->sh); +} + session_error_t vnet_unlisten (vnet_unlisten_args_t *a) { diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index 0b22d210f52..ea05934f772 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -350,6 +350,8 @@ int app_worker_own_session (app_worker_t * app_wrk, session_t * s); void app_worker_free (app_worker_t * app_wrk); int app_worker_connect_session (app_worker_t *app, session_endpoint_cfg_t *sep, session_handle_t *rsh); +int app_worker_connect_stream (app_worker_t *app, session_endpoint_cfg_t *sep, + session_handle_t *rsh); session_error_t app_worker_start_listen (app_worker_t *app_wrk, app_listener_t *lstnr); int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al); diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 40c6b15f8da..e229a87e4e8 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -264,6 +264,7 @@ session_error_t vnet_application_attach (vnet_app_attach_args_t *a); session_error_t vnet_application_detach (vnet_app_detach_args_t *a); session_error_t vnet_listen (vnet_listen_args_t *a); session_error_t vnet_connect (vnet_connect_args_t *a); +session_error_t vnet_connect_stream (vnet_connect_args_t *a); session_error_t vnet_unlisten (vnet_unlisten_args_t *a); session_error_t vnet_shutdown_session (vnet_shutdown_args_t *a); session_error_t vnet_disconnect_session (vnet_disconnect_args_t *a); diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c index 6d5876724ae..42017bf3f0d 100644 --- a/src/vnet/session/application_worker.c +++ b/src/vnet/session/application_worker.c @@ -666,6 +666,18 @@ app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep, return session_open (sep, rsh); } +int +app_worker_connect_stream (app_worker_t *app_wrk, session_endpoint_cfg_t *sep, + session_handle_t *rsh) +{ + if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk))) + return SESSION_E_REFUSED; + + sep->app_wrk_index = app_wrk->wrk_index; + + return session_open_stream (sep, rsh); +} + int app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s, svm_fifo_t * f, diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 71230f2eac7..bce9a7da0c9 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -505,6 +505,24 @@ session_alloc_for_connection (transport_connection_t * tc) return s; } +static session_t * +session_alloc_for_stream (session_handle_t parent_handle) +{ + session_t *s, *ps; + clib_thread_index_t thread_index = + session_thread_from_handle (parent_handle); + + ASSERT (thread_index == vlib_get_thread_index ()); + + ps = session_get_from_handle (parent_handle); + s = session_alloc (thread_index); + s->listener_handle = SESSION_INVALID_HANDLE; + s->session_type = ps->session_type; + session_set_state (s, SESSION_STATE_CLOSED); + + return s; +} + session_t * session_alloc_for_half_open (transport_connection_t *tc) { @@ -1351,6 +1369,67 @@ session_open (session_endpoint_cfg_t *rmt, session_handle_t *rsh) return session_open_srv_fns[tst](rmt, rsh); } +/** + * Ask transport to open stream on existing connection. + */ +int +session_open_stream (session_endpoint_cfg_t *sep, session_handle_t *rsh) +{ + transport_connection_t *tc; + transport_endpoint_cfg_t *tep; + app_worker_t *app_wrk; + session_t *s; + u32 conn_index; + int rv; + + app_wrk = app_worker_get (sep->app_wrk_index); + tep = session_endpoint_to_transport_cfg (sep); + + /* allocate session and fifos now */ + s = session_alloc_for_stream (sep->parent_handle); + s->app_wrk_index = app_wrk->wrk_index; + s->opaque = sep->opaque; + s->flags |= SESSION_F_STREAM; + if ((rv = app_worker_init_connected (app_wrk, s))) + { + session_free (s); + if (app_worker_application_is_builtin (app_wrk)) + return rv; + return app_worker_connect_notify (app_wrk, 0, rv, sep->opaque); + } + + rv = transport_connect_stream (sep->transport_proto, tep, s, &conn_index); + if (rv < 0) + { + SESSION_DBG ("Transport failed to open stream."); + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); + if (app_worker_application_is_builtin (app_wrk)) + return rv; + return app_worker_connect_notify (app_wrk, 0, rv, sep->opaque); + } + + session_set_state (s, SESSION_STATE_READY); + + tc = + transport_get_connection (sep->transport_proto, conn_index, + session_thread_from_handle (sep->parent_handle)); + + /* Attach transport to session and vice versa */ + s->connection_index = tc->c_index; + tc->s_index = s->session_index; + *rsh = session_handle (s); + + /* builtin apps are synchronous */ + if (app_worker_application_is_builtin (app_wrk)) + { + s->flags |= SESSION_F_RX_READY; + return SESSION_E_NONE; + } + + return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, sep->opaque); +} + /** * Ask transport to listen on session endpoint. * diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 62f79bc3270..896d31da9da 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -538,6 +538,7 @@ session_clone_safe (u32 session_index, clib_thread_index_t thread_index) } int session_open (session_endpoint_cfg_t *sep, session_handle_t *rsh); +int session_open_stream (session_endpoint_cfg_t *sep, session_handle_t *rsh); int session_listen (session_t * s, session_endpoint_cfg_t * sep); int session_stop_listen (session_t * s); void session_half_close (session_t *s); diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 147c00ab542..5489c673071 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -190,7 +190,8 @@ typedef enum _ (APP_CLOSED, "app-closed") \ _ (IS_CLESS, "connectionless") \ _ (RX_READY, "rx-ready") \ - _ (TPT_INIT_CLOSE, "transport-init-close") + _ (TPT_INIT_CLOSE, "transport-init-close") \ + _ (STREAM, "stream") typedef enum session_flags_bits_ { diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c index 981903c3591..8b8c8623cdc 100644 --- a/src/vnet/session/transport.c +++ b/src/vnet/session/transport.c @@ -369,6 +369,15 @@ transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep) return tp_vfts[tp].connect (tep); } +int +transport_connect_stream (transport_proto_t tp, transport_endpoint_cfg_t *tep, + session_t *stream_session, u32 *conn_index) +{ + if (PREDICT_FALSE (!tp_vfts[tp].connect_stream)) + return SESSION_E_TRANSPORT_NO_REG; + return tp_vfts[tp].connect_stream (tep, stream_session, conn_index); +} + void transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index) { diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index 10477b9da67..adb43b4f8a6 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -17,6 +17,7 @@ #define SRC_VNET_SESSION_TRANSPORT_H_ #include +#include #include #define TRANSPORT_PACER_MIN_MSS 1460 @@ -74,6 +75,8 @@ typedef struct _transport_proto_vft u32 (*start_listen) (u32 session_index, transport_endpoint_cfg_t *lcl); u32 (*stop_listen) (u32 conn_index); int (*connect) (transport_endpoint_cfg_t * rmt); + int (*connect_stream) (transport_endpoint_cfg_t *rmt, + session_t *session_index, u32 *conn_index); void (*half_close) (u32 conn_index, clib_thread_index_t thread_index); void (*close) (u32 conn_index, clib_thread_index_t thread_index); void (*reset) (u32 conn_index, clib_thread_index_t thread_index); @@ -137,6 +140,9 @@ extern transport_proto_vft_t *tp_vfts; if (VAR_ALLOW_BM & (1 << VAR)) int transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep); +int transport_connect_stream (transport_proto_t tp, + transport_endpoint_cfg_t *tep, + session_t *stream_session, u32 *conn_index); void transport_half_close (transport_proto_t tp, u32 conn_index, u8 thread_index); void transport_close (transport_proto_t tp, u32 conn_index, u8 thread_index);