From: Matus Fabian Date: Tue, 21 Oct 2025 17:37:03 +0000 (-0400) Subject: quic: add transport connect_stream callback X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F17%2F43917%2F5;p=vpp.git quic: add transport connect_stream callback app now open quic streams via vnet_connect_strea Type: improvement Change-Id: I36157b7971983a6d4cbb68d86c84754d6e1faa62 Signed-off-by: Matus Fabian --- diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c index c0c25c4ba0b..da2a0f533ea 100644 --- a/src/plugins/hs_apps/echo_client.c +++ b/src/plugins/hs_apps/echo_client.c @@ -649,41 +649,6 @@ ec_cleanup (ec_main_t *ecm) clib_spinlock_free (&ecm->rtt_stats.w_lock); } -static int -quic_ec_qsession_connected_callback (u32 app_index, u32 api_context, - session_t *s, session_error_t err) -{ - session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; - ec_main_t *ecm = &ec_main; - vnet_connect_args_t _a, *a = &_a; - u32 stream_n; - int rv; - - ec_dbg ("QUIC Connection handle %d", session_handle (s)); - - a->uri = (char *) ecm->connect_uri; - if (parse_uri (a->uri, &sep)) - return -1; - sep.parent_handle = session_handle (s); - - for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++) - { - clib_memset (a, 0, sizeof (*a)); - a->app_index = ecm->app_index; - a->api_context = -2 - api_context; - clib_memcpy (&a->sep_ext, &sep, sizeof (sep)); - - ec_dbg ("QUIC opening stream %d", stream_n); - if ((rv = vnet_connect (a))) - { - clib_error ("Stream session %d opening failed: %d", stream_n, rv); - return -1; - } - ec_dbg ("QUIC stream %d connected", stream_n); - } - return 0; -} - static int ec_ctrl_send (hs_test_cmd_t cmd) { @@ -735,7 +700,11 @@ quic_ec_session_connected_callback (u32 app_index, u32 api_context, ec_main_t *ecm = &ec_main; ec_session_t *es; ec_worker_t *wrk; - clib_thread_index_t thread_index; + session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL; + vnet_connect_args_t _a, *a = &_a; + session_t *stream_session; + u32 stream_n; + int rv; if (PREDICT_FALSE (api_context == HS_CTRL_HANDLE)) return ec_ctrl_session_connected_callback (s); @@ -751,31 +720,42 @@ quic_ec_session_connected_callback (u32 app_index, u32 api_context, return 0; } - if (s->listener_handle == SESSION_INVALID_HANDLE) - return quic_ec_qsession_connected_callback (app_index, api_context, s, - err); - ec_dbg ("STREAM Connection callback %d", api_context); - - thread_index = s->thread_index; - ASSERT (thread_index == vlib_get_thread_index () - || session_transport_service_type (s) == TRANSPORT_SERVICE_CL); + ASSERT (s->listener_handle == SESSION_INVALID_HANDLE); + ASSERT (!(s->flags & SESSION_F_STREAM)); - wrk = ec_worker_get (thread_index); + ec_dbg ("QUIC Connection handle %d", session_handle (s)); - /* - * Setup session - */ - es = ec_session_alloc (wrk); - hs_test_app_session_init (es, s); + clib_memset (a, 0, sizeof (*a)); + a->app_index = ecm->app_index; + sep.parent_handle = session_handle (s); + sep.transport_proto = TRANSPORT_PROTO_QUIC; + clib_memcpy (&a->sep_ext, &sep, sizeof (sep)); + wrk = ec_worker_get (s->thread_index); - es->bytes_to_send = ecm->bytes_to_send; - es->bytes_to_receive = ecm->echo_bytes ? ecm->bytes_to_send : 0ULL; - es->vpp_session_handle = session_handle (s); - es->vpp_session_index = s->session_index; - s->opaque = es->session_index; + for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++) + { + ec_dbg ("QUIC opening stream %d", stream_n); + es = ec_session_alloc (wrk); + a->api_context = es->session_index; + if ((rv = vnet_connect_stream (a))) + { + clib_error ("Stream session %d opening failed: %U", stream_n, + format_session_error, rv); + ecm->run_test = EC_EXITING; + signal_evt_to_cli (EC_CLI_CONNECTS_FAILED); + return -1; + } + ec_dbg ("QUIC stream %d connected", stream_n); + stream_session = session_get_from_handle (a->sh); + hs_test_app_session_init (es, stream_session); + es->bytes_to_send = ecm->bytes_to_send; + es->bytes_to_receive = ecm->echo_bytes ? ecm->bytes_to_send : 0ULL; + es->vpp_session_handle = a->sh; + es->vpp_session_index = stream_session->session_index; + vec_add1 (wrk->conn_indices, es->session_index); + } - vec_add1 (wrk->conn_indices, es->session_index); - clib_atomic_fetch_add (&ecm->ready_connections, 1); + clib_atomic_fetch_add (&ecm->ready_connections, ecm->quic_streams); if (ecm->ready_connections == ecm->expected_connections) { ecm->run_test = EC_RUNNING; diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index e52b0b09e8d..d37de7fe00c 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -361,11 +361,101 @@ quic_connect (transport_endpoint_cfg_t * tep) quic_session = session_get_from_handle_if_valid (sep->parent_handle); if (quic_session) - return quic_connect_stream (quic_session, sep); + { +#if CLIB_DEBUG > 0 + clib_warning ("deprecated, use vnet_connect_stream to open stream"); +#endif + return quic_connect_stream (quic_session, sep); + } else return quic_connect_connection (sep); } +static int +quic_proto_connect_stream (transport_endpoint_cfg_t *tep, + session_t *stream_session, u32 *conn_index) +{ + quic_main_t *qm = &quic_main; + session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep; + session_t *quic_session; + sep = (session_endpoint_cfg_t *) tep; + u32 sctx_index; + quic_ctx_t *qctx, *sctx; + quic_stream_data_t *stream_data; + void *conn; + void *stream; + u8 is_unidir; + int rv; + + quic_session = session_get_from_handle (sep->parent_handle); + + /* Find base session to which the user want to attach a stream */ + QUIC_DBG (2, "Connect stream: session 0x%lx", sep->parent_handle); + if (session_type_transport_proto (quic_session->session_type) != + TRANSPORT_PROTO_QUIC) + { + QUIC_ERR ("received incompatible session"); + return SESSION_E_UNKNOWN; + } + + sctx_index = quic_ctx_alloc ( + qm, quic_session->thread_index); /* Allocate before we get pointers */ + sctx = quic_ctx_get (sctx_index, quic_session->thread_index); + qctx = + quic_ctx_get (quic_session->connection_index, quic_session->thread_index); + if (quic_ctx_is_stream (qctx)) + { + QUIC_ERR ("session is a stream"); + quic_ctx_free (qm, sctx); + return SESSION_E_UNKNOWN; + } + + sctx->parent_app_wrk_id = qctx->parent_app_wrk_id; + sctx->parent_app_id = qctx->parent_app_id; + sctx->quic_connection_ctx_id = qctx->c_c_index; + sctx->c_c_index = sctx_index; + sctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP; + sctx->flags |= QUIC_F_IS_STREAM; + + if (!(conn = qctx->conn)) + return SESSION_E_UNKNOWN; + + is_unidir = sep->transport_flags & TRANSPORT_CFG_F_UNIDIRECTIONAL; + rv = quic_eng_connect_stream (conn, &stream, &stream_data, is_unidir); + if (rv) + { + QUIC_DBG (1, + "Connect stream: failed %d, conn %p, stream %p, stream_data " + "%p, unidir %d", + rv, conn, &stream, &stream_data, is_unidir); + return rv; + } + quic_increment_counter (qm, QUIC_ERROR_OPENED_STREAM, 1); + + sctx->stream = stream; + sctx->crypto_context_index = qctx->crypto_context_index; + sctx->c_s_index = stream_session->session_index; + stream_data->ctx_id = sctx->c_c_index; + stream_data->thread_index = sctx->c_thread_index; + stream_data->app_rx_data_len = 0; + stream_data->app_tx_data_len = 0; + + *conn_index = sctx_index; + + QUIC_DBG ( + 2, "Connect stream: stream_session handle 0x%lx, sctx_index %u, thread %u", + session_handle (stream_session), sctx_index, qctx->c_thread_index); + if (is_unidir) + stream_session->flags |= SESSION_F_UNIDIRECTIONAL; + svm_fifo_init_ooo_lookup (stream_session->rx_fifo, 0 /* ooo enq */); + svm_fifo_init_ooo_lookup (stream_session->tx_fifo, 1 /* ooo deq */); + svm_fifo_add_want_deq_ntf (stream_session->rx_fifo, + SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL | + SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY); + + return SESSION_E_NONE; +} + static void quic_proto_on_close (u32 ctx_index, clib_thread_index_t thread_index) { @@ -884,6 +974,7 @@ static clib_error_t *quic_enable (vlib_main_t *vm, u8 is_en); static transport_proto_vft_t quic_proto = { .enable = quic_enable, .connect = quic_connect, + .connect_stream = quic_proto_connect_stream, .close = quic_proto_on_close, .start_listen = quic_start_listen, .stop_listen = quic_stop_listen, diff --git a/src/plugins/quic_quicly/quic_quicly.c b/src/plugins/quic_quicly/quic_quicly.c index f6fc3b306dc..0202faa95ec 100644 --- a/src/plugins/quic_quicly/quic_quicly.c +++ b/src/plugins/quic_quicly/quic_quicly.c @@ -1230,6 +1230,8 @@ quic_quicly_connect_stream (void *quic_conn, void **quic_stream, QUIC_DBG (2, "quicly_open_stream() failed with %d", rv); /* TODO: Define appropriate QUIC return values for QUIC VFT's! */ + if (rv == QUICLY_TRANSPORT_ERROR_STREAM_LIMIT) + return SESSION_E_MAX_STREAMS_HIT; return -1; } diff --git a/src/plugins/quic_quicly/quic_quicly_crypto.c b/src/plugins/quic_quicly/quic_quicly_crypto.c index 6d57c85b557..7b26661c642 100644 --- a/src/plugins/quic_quicly/quic_quicly_crypto.c +++ b/src/plugins/quic_quicly/quic_quicly_crypto.c @@ -140,6 +140,7 @@ quic_quicly_on_stream_open (quicly_stream_open_t *self, return 0; } stream_session = session_alloc (qctx->c_thread_index); + stream_session->flags |= SESSION_F_STREAM; QUIC_DBG (2, "ACCEPTED stream_session 0x%lx ctx %u", session_handle (stream_session), sctx_id); sctx = quic_quicly_get_quic_ctx (sctx_id, qctx->c_thread_index); diff --git a/test/asf/test_quic.py b/test/asf/test_quic.py index c769f972675..5fdcc683265 100644 --- a/test/asf/test_quic.py +++ b/test/asf/test_quic.py @@ -151,6 +151,7 @@ class QUICEchoIntTestCase(QUICTestCase): test_bytes = " test-bytes" extra_vpp_config = ["session", "{", "enable", "poll-main", "}"] + vpp_worker_count = 2 def setUp(self): super(QUICEchoIntTestCase, self).setUp() @@ -177,7 +178,6 @@ class QUICEchoIntTestCase(QUICTestCase): self.assertNotIn("failed", error) -@tag_fixme_vpp_workers class QUICEchoIntTransferTestCase(QUICEchoIntTestCase): """QUIC Echo Internal Transfer Test Case""" @@ -187,7 +187,6 @@ class QUICEchoIntTransferTestCase(QUICEchoIntTestCase): self.client("bytes", "2m") -@tag_fixme_vpp_workers class QUICEchoIntSerialTestCase(QUICEchoIntTestCase): """QUIC Echo Internal Serial Transfer Test Case""" @@ -201,7 +200,6 @@ class QUICEchoIntSerialTestCase(QUICEchoIntTestCase): self.client("bytes", "2m") -@tag_fixme_vpp_workers class QUICEchoIntMStreamTestCase(QUICEchoIntTestCase): """QUIC Echo Internal MultiStream Test Case"""