quic: add transport connect_stream callback 17/43917/5
authorMatus Fabian <[email protected]>
Tue, 21 Oct 2025 17:37:03 +0000 (13:37 -0400)
committerFlorin Coras <[email protected]>
Thu, 23 Oct 2025 07:59:17 +0000 (07:59 +0000)
app now open quic streams via vnet_connect_strea

Type: improvement

Change-Id: I36157b7971983a6d4cbb68d86c84754d6e1faa62
Signed-off-by: Matus Fabian <[email protected]>
src/plugins/hs_apps/echo_client.c
src/plugins/quic/quic.c
src/plugins/quic_quicly/quic_quicly.c
src/plugins/quic_quicly/quic_quicly_crypto.c
test/asf/test_quic.py

index c0c25c4..da2a0f5 100644 (file)
@@ -649,41 +649,6 @@ ec_cleanup (ec_main_t *ecm)
   clib_spinlock_free (&ecm->rtt_stats.w_lock);
 }
 
-static int
-quic_ec_qsession_connected_callback (u32 app_index, u32 api_context,
-                                    session_t *s, session_error_t err)
-{
-  session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
-  ec_main_t *ecm = &ec_main;
-  vnet_connect_args_t _a, *a = &_a;
-  u32 stream_n;
-  int rv;
-
-  ec_dbg ("QUIC Connection handle %d", session_handle (s));
-
-  a->uri = (char *) ecm->connect_uri;
-  if (parse_uri (a->uri, &sep))
-    return -1;
-  sep.parent_handle = session_handle (s);
-
-  for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
-    {
-      clib_memset (a, 0, sizeof (*a));
-      a->app_index = ecm->app_index;
-      a->api_context = -2 - api_context;
-      clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
-
-      ec_dbg ("QUIC opening stream %d", stream_n);
-      if ((rv = vnet_connect (a)))
-       {
-         clib_error ("Stream session %d opening failed: %d", stream_n, rv);
-         return -1;
-       }
-      ec_dbg ("QUIC stream %d connected", stream_n);
-    }
-  return 0;
-}
-
 static int
 ec_ctrl_send (hs_test_cmd_t cmd)
 {
@@ -735,7 +700,11 @@ quic_ec_session_connected_callback (u32 app_index, u32 api_context,
   ec_main_t *ecm = &ec_main;
   ec_session_t *es;
   ec_worker_t *wrk;
-  clib_thread_index_t thread_index;
+  session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
+  vnet_connect_args_t _a, *a = &_a;
+  session_t *stream_session;
+  u32 stream_n;
+  int rv;
 
   if (PREDICT_FALSE (api_context == HS_CTRL_HANDLE))
     return ec_ctrl_session_connected_callback (s);
@@ -751,31 +720,42 @@ quic_ec_session_connected_callback (u32 app_index, u32 api_context,
       return 0;
     }
 
-  if (s->listener_handle == SESSION_INVALID_HANDLE)
-    return quic_ec_qsession_connected_callback (app_index, api_context, s,
-                                               err);
-  ec_dbg ("STREAM Connection callback %d", api_context);
-
-  thread_index = s->thread_index;
-  ASSERT (thread_index == vlib_get_thread_index ()
-         || session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
+  ASSERT (s->listener_handle == SESSION_INVALID_HANDLE);
+  ASSERT (!(s->flags & SESSION_F_STREAM));
 
-  wrk = ec_worker_get (thread_index);
+  ec_dbg ("QUIC Connection handle %d", session_handle (s));
 
-  /*
-   * Setup session
-   */
-  es = ec_session_alloc (wrk);
-  hs_test_app_session_init (es, s);
+  clib_memset (a, 0, sizeof (*a));
+  a->app_index = ecm->app_index;
+  sep.parent_handle = session_handle (s);
+  sep.transport_proto = TRANSPORT_PROTO_QUIC;
+  clib_memcpy (&a->sep_ext, &sep, sizeof (sep));
+  wrk = ec_worker_get (s->thread_index);
 
-  es->bytes_to_send = ecm->bytes_to_send;
-  es->bytes_to_receive = ecm->echo_bytes ? ecm->bytes_to_send : 0ULL;
-  es->vpp_session_handle = session_handle (s);
-  es->vpp_session_index = s->session_index;
-  s->opaque = es->session_index;
+  for (stream_n = 0; stream_n < ecm->quic_streams; stream_n++)
+    {
+      ec_dbg ("QUIC opening stream %d", stream_n);
+      es = ec_session_alloc (wrk);
+      a->api_context = es->session_index;
+      if ((rv = vnet_connect_stream (a)))
+       {
+         clib_error ("Stream session %d opening failed: %U", stream_n,
+                     format_session_error, rv);
+         ecm->run_test = EC_EXITING;
+         signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
+         return -1;
+       }
+      ec_dbg ("QUIC stream %d connected", stream_n);
+      stream_session = session_get_from_handle (a->sh);
+      hs_test_app_session_init (es, stream_session);
+      es->bytes_to_send = ecm->bytes_to_send;
+      es->bytes_to_receive = ecm->echo_bytes ? ecm->bytes_to_send : 0ULL;
+      es->vpp_session_handle = a->sh;
+      es->vpp_session_index = stream_session->session_index;
+      vec_add1 (wrk->conn_indices, es->session_index);
+    }
 
-  vec_add1 (wrk->conn_indices, es->session_index);
-  clib_atomic_fetch_add (&ecm->ready_connections, 1);
+  clib_atomic_fetch_add (&ecm->ready_connections, ecm->quic_streams);
   if (ecm->ready_connections == ecm->expected_connections)
     {
       ecm->run_test = EC_RUNNING;
index e52b0b0..d37de7f 100644 (file)
@@ -361,11 +361,101 @@ quic_connect (transport_endpoint_cfg_t * tep)
 
   quic_session = session_get_from_handle_if_valid (sep->parent_handle);
   if (quic_session)
-    return quic_connect_stream (quic_session, sep);
+    {
+#if CLIB_DEBUG > 0
+      clib_warning ("deprecated, use vnet_connect_stream to open stream");
+#endif
+      return quic_connect_stream (quic_session, sep);
+    }
   else
     return quic_connect_connection (sep);
 }
 
+static int
+quic_proto_connect_stream (transport_endpoint_cfg_t *tep,
+                          session_t *stream_session, u32 *conn_index)
+{
+  quic_main_t *qm = &quic_main;
+  session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
+  session_t *quic_session;
+  sep = (session_endpoint_cfg_t *) tep;
+  u32 sctx_index;
+  quic_ctx_t *qctx, *sctx;
+  quic_stream_data_t *stream_data;
+  void *conn;
+  void *stream;
+  u8 is_unidir;
+  int rv;
+
+  quic_session = session_get_from_handle (sep->parent_handle);
+
+  /*  Find base session to which the user want to attach a stream */
+  QUIC_DBG (2, "Connect stream: session 0x%lx", sep->parent_handle);
+  if (session_type_transport_proto (quic_session->session_type) !=
+      TRANSPORT_PROTO_QUIC)
+    {
+      QUIC_ERR ("received incompatible session");
+      return SESSION_E_UNKNOWN;
+    }
+
+  sctx_index = quic_ctx_alloc (
+    qm, quic_session->thread_index); /*  Allocate before we get pointers */
+  sctx = quic_ctx_get (sctx_index, quic_session->thread_index);
+  qctx =
+    quic_ctx_get (quic_session->connection_index, quic_session->thread_index);
+  if (quic_ctx_is_stream (qctx))
+    {
+      QUIC_ERR ("session is a stream");
+      quic_ctx_free (qm, sctx);
+      return SESSION_E_UNKNOWN;
+    }
+
+  sctx->parent_app_wrk_id = qctx->parent_app_wrk_id;
+  sctx->parent_app_id = qctx->parent_app_id;
+  sctx->quic_connection_ctx_id = qctx->c_c_index;
+  sctx->c_c_index = sctx_index;
+  sctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
+  sctx->flags |= QUIC_F_IS_STREAM;
+
+  if (!(conn = qctx->conn))
+    return SESSION_E_UNKNOWN;
+
+  is_unidir = sep->transport_flags & TRANSPORT_CFG_F_UNIDIRECTIONAL;
+  rv = quic_eng_connect_stream (conn, &stream, &stream_data, is_unidir);
+  if (rv)
+    {
+      QUIC_DBG (1,
+               "Connect stream: failed %d, conn %p, stream %p, stream_data "
+               "%p, unidir %d",
+               rv, conn, &stream, &stream_data, is_unidir);
+      return rv;
+    }
+  quic_increment_counter (qm, QUIC_ERROR_OPENED_STREAM, 1);
+
+  sctx->stream = stream;
+  sctx->crypto_context_index = qctx->crypto_context_index;
+  sctx->c_s_index = stream_session->session_index;
+  stream_data->ctx_id = sctx->c_c_index;
+  stream_data->thread_index = sctx->c_thread_index;
+  stream_data->app_rx_data_len = 0;
+  stream_data->app_tx_data_len = 0;
+
+  *conn_index = sctx_index;
+
+  QUIC_DBG (
+    2, "Connect stream: stream_session handle 0x%lx, sctx_index %u, thread %u",
+    session_handle (stream_session), sctx_index, qctx->c_thread_index);
+  if (is_unidir)
+    stream_session->flags |= SESSION_F_UNIDIRECTIONAL;
+  svm_fifo_init_ooo_lookup (stream_session->rx_fifo, 0 /* ooo enq */);
+  svm_fifo_init_ooo_lookup (stream_session->tx_fifo, 1 /* ooo deq */);
+  svm_fifo_add_want_deq_ntf (stream_session->rx_fifo,
+                            SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
+                              SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
+
+  return SESSION_E_NONE;
+}
+
 static void
 quic_proto_on_close (u32 ctx_index, clib_thread_index_t thread_index)
 {
@@ -884,6 +974,7 @@ static clib_error_t *quic_enable (vlib_main_t *vm, u8 is_en);
 static transport_proto_vft_t quic_proto = {
   .enable = quic_enable,
   .connect = quic_connect,
+  .connect_stream = quic_proto_connect_stream,
   .close = quic_proto_on_close,
   .start_listen = quic_start_listen,
   .stop_listen = quic_stop_listen,
index f6fc3b3..0202faa 100644 (file)
@@ -1230,6 +1230,8 @@ quic_quicly_connect_stream (void *quic_conn, void **quic_stream,
       QUIC_DBG (2, "quicly_open_stream() failed with %d", rv);
       /* TODO: Define appropriate QUIC return values for QUIC VFT's!
        */
+      if (rv == QUICLY_TRANSPORT_ERROR_STREAM_LIMIT)
+       return SESSION_E_MAX_STREAMS_HIT;
       return -1;
     }
 
index 6d57c85..7b26661 100644 (file)
@@ -140,6 +140,7 @@ quic_quicly_on_stream_open (quicly_stream_open_t *self,
       return 0;
     }
   stream_session = session_alloc (qctx->c_thread_index);
+  stream_session->flags |= SESSION_F_STREAM;
   QUIC_DBG (2, "ACCEPTED stream_session 0x%lx ctx %u",
            session_handle (stream_session), sctx_id);
   sctx = quic_quicly_get_quic_ctx (sctx_id, qctx->c_thread_index);
index c769f97..5fdcc68 100644 (file)
@@ -151,6 +151,7 @@ class QUICEchoIntTestCase(QUICTestCase):
 
     test_bytes = " test-bytes"
     extra_vpp_config = ["session", "{", "enable", "poll-main", "}"]
+    vpp_worker_count = 2
 
     def setUp(self):
         super(QUICEchoIntTestCase, self).setUp()
@@ -177,7 +178,6 @@ class QUICEchoIntTestCase(QUICTestCase):
             self.assertNotIn("failed", error)
 
 
-@tag_fixme_vpp_workers
 class QUICEchoIntTransferTestCase(QUICEchoIntTestCase):
     """QUIC Echo Internal Transfer Test Case"""
 
@@ -187,7 +187,6 @@ class QUICEchoIntTransferTestCase(QUICEchoIntTestCase):
         self.client("bytes", "2m")
 
 
-@tag_fixme_vpp_workers
 class QUICEchoIntSerialTestCase(QUICEchoIntTestCase):
     """QUIC Echo Internal Serial Transfer Test Case"""
 
@@ -201,7 +200,6 @@ class QUICEchoIntSerialTestCase(QUICEchoIntTestCase):
         self.client("bytes", "2m")
 
 
-@tag_fixme_vpp_workers
 class QUICEchoIntMStreamTestCase(QUICEchoIntTestCase):
     """QUIC Echo Internal MultiStream Test Case"""