From: Aloys Augustin Date: Tue, 9 Apr 2019 09:36:40 +0000 (+0200) Subject: QUIC: Initial multi stream support X-Git-Tag: v20.01-rc0~808 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;ds=sidebyside;h=39d04099467414175803273433c95a96c0276252;p=vpp.git QUIC: Initial multi stream support To connect a stream, apps should call connect while passing the id of the QUIC connection in the new transport_opts field in session_endpoint_cfg_t. Apps are notified of new streams with their accept callback, which is called each time a peer opens a stream. Change-Id: I0f82ec344db58008d54641553eddec2973768435 Signed-off-by: Aloys Augustin --- diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c index 7807ae49f12..a2aa17a3ca8 100644 --- a/src/plugins/quic/quic.c +++ b/src/plugins/quic/quic.c @@ -36,67 +36,12 @@ static void quic_update_timer (quic_ctx_t * ctx); static int64_t quic_get_time (quicly_now_cb * self); static void quic_connection_closed (u32 conn_index); static void quic_disconnect (u32 ctx_index, u32 thread_index); +static int quic_connect_new_stream (session_endpoint_cfg_t * sep); +static int quic_connect_new_connection (session_endpoint_cfg_t * sep); #define QUIC_INT_MAX 0x3FFFFFFFFFFFFFFF -u32 -quic_ctx_half_open_alloc (void) -{ - quic_main_t *qm = &quic_main; - u8 will_expand = 0; - quic_ctx_t *ctx; - u32 ctx_index; - - pool_get_aligned_will_expand (qm->half_open_ctx_pool, will_expand, 0); - if (PREDICT_FALSE (will_expand && vlib_num_workers ())) - { - clib_rwlock_writer_lock (&qm->half_open_rwlock); - pool_get (qm->half_open_ctx_pool, ctx); - ctx_index = ctx - qm->half_open_ctx_pool; - clib_rwlock_writer_unlock (&qm->half_open_rwlock); - } - else - { - /* reader lock assumption: only main thread will call pool_get */ - clib_rwlock_reader_lock (&qm->half_open_rwlock); - pool_get (qm->half_open_ctx_pool, ctx); - ctx_index = ctx - qm->half_open_ctx_pool; - clib_rwlock_reader_unlock (&qm->half_open_rwlock); - } - memset (ctx, 0, sizeof (*ctx)); - return ctx_index; -} - -void -quic_ctx_half_open_free (u32 ho_index) -{ - quic_main_t *qm = &quic_main; - clib_rwlock_writer_lock (&qm->half_open_rwlock); - pool_put_index (qm->half_open_ctx_pool, ho_index); - clib_rwlock_writer_unlock (&qm->half_open_rwlock); -} - -quic_ctx_t * -quic_ctx_half_open_get (u32 ctx_index) -{ - quic_main_t *qm = &quic_main; - clib_rwlock_reader_lock (&qm->half_open_rwlock); - return pool_elt_at_index (qm->half_open_ctx_pool, ctx_index); -} - -void -quic_ctx_half_open_reader_unlock () -{ - clib_rwlock_reader_unlock (&quic_main.half_open_rwlock); -} - -u32 -quic_ctx_half_open_index (quic_ctx_t * ctx) -{ - return (ctx - quic_main.half_open_ctx_pool); -} - -u32 +static u32 quic_ctx_alloc () { u8 thread_index = vlib_get_thread_index (); @@ -138,7 +83,7 @@ quic_disconnect_transport (quic_ctx_t * ctx) { QUIC_DBG (2, "Called quic_disconnect_transport"); vnet_disconnect_args_t a = { - .handle = ctx->c_quic_ctx_id.quic_session, + .handle = ctx->c_quic_ctx_id.udp_session_handle, .app_index = quic_main.app_index, }; @@ -147,9 +92,9 @@ quic_disconnect_transport (quic_ctx_t * ctx) } static int -quic_send_datagram (session_t * session, quicly_datagram_t * packet) +quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet) { - QUIC_DBG (2, "Called quic_send_datagram at %ld", quic_get_time (NULL)); + // QUIC_DBG (2, "Called quic_send_datagram at %ld", quic_get_time (NULL)); u32 max_enqueue; session_dgram_hdr_t hdr; int rv; @@ -158,8 +103,8 @@ quic_send_datagram (session_t * session, quicly_datagram_t * packet) transport_connection_t *tc; len = packet->data.len; - f = session->tx_fifo; - tc = session_get_transport (session); + f = udp_session->tx_fifo; + tc = session_get_transport (udp_session); max_enqueue = svm_fifo_max_enqueue (f); if (max_enqueue <= sizeof (session_dgram_hdr_t)) @@ -205,12 +150,21 @@ quic_send_packets (quic_ctx_t * ctx) { //QUIC_DBG (2, "Called quic_send_packets"); quicly_datagram_t *packets[16]; - session_t *quic_session; + session_t *udp_session; quicly_conn_t *conn; size_t num_packets, i; int ret; - quic_session = session_get_from_handle (ctx->c_quic_ctx_id.quic_session); + if (ctx->c_quic_ctx_id.is_stream) + { + // We have sctx, get qctx + ctx = quic_ctx_get (ctx->c_quic_ctx_id.quic_connection_ctx_id); + } + + ASSERT (!ctx->c_quic_ctx_id.is_stream); + + udp_session = + session_get_from_handle (ctx->c_quic_ctx_id.udp_session_handle); conn = ctx->c_quic_ctx_id.conn; if (!conn) @@ -223,7 +177,7 @@ quic_send_packets (quic_ctx_t * ctx) { for (i = 0; i != num_packets; ++i) { - if (quic_send_datagram (quic_session, packets[i])) + if (quic_send_datagram (udp_session, packets[i])) { QUIC_DBG (2, "quic_send_datagram failed"); goto stop_sending; @@ -242,8 +196,8 @@ quic_send_packets (quic_ctx_t * ctx) while (ret == 0 && num_packets == sizeof (packets) / sizeof (packets[0])); stop_sending: - if (svm_fifo_set_event (quic_session->tx_fifo)) - session_send_io_evt_to_thread (quic_session->tx_fifo, FIFO_EVENT_APP_TX); + if (svm_fifo_set_event (udp_session->tx_fifo)) + session_send_io_evt_to_thread (udp_session->tx_fifo, FIFO_EVENT_APP_TX); quic_update_timer (ctx); return 0; @@ -274,15 +228,15 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, { QUIC_DBG (2, "received data: %lu bytes", len); u32 to_enqueue, ctx_id; - quic_ctx_t *ctx; - session_t *app_session; + quic_ctx_t *sctx; + session_t *stream_session; svm_fifo_t *rx_fifo; app_worker_t *app_wrk; - ctx_id = (u64) * quicly_get_data (stream->conn); - ctx = quic_ctx_get (ctx_id); - app_session = session_get_from_handle (ctx->c_quic_ctx_id.app_session); - rx_fifo = app_session->rx_fifo; + ctx_id = ((quic_stream_data_t *) stream->data)->ctx_id; + sctx = quic_ctx_get (ctx_id); + stream_session = session_get (sctx->c_s_index, vlib_get_thread_index ()); + rx_fifo = stream_session->rx_fifo; to_enqueue = svm_fifo_max_enqueue (rx_fifo); if (to_enqueue > len) to_enqueue = len; @@ -291,9 +245,10 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src, svm_fifo_enqueue_nowait (rx_fifo, to_enqueue, src); // Notify app - app_wrk = app_worker_get_if_valid (app_session->app_wrk_index); + app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index); if (PREDICT_TRUE (app_wrk != 0)) - app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX); + app_worker_lock_and_send_event (app_wrk, stream_session, + SESSION_IO_EVT_RX); return 0; } @@ -306,17 +261,91 @@ static const quicly_stream_callbacks_t quic_stream_callbacks = { .on_receive_reset = quic_on_receive_reset }; +static void +quic_accept_stream (void *s) +{ + quicly_stream_t *stream = (quicly_stream_t *) s; + session_t *stream_session, *quic_session; + quic_stream_data_t *stream_data; + app_worker_t *app_wrk; + quic_ctx_t *qctx, *sctx; + u32 qctx_id, sctx_id; + int rv; + + sctx_id = quic_ctx_alloc (); + + qctx_id = (u64) * quicly_get_data (stream->conn); + qctx = quic_ctx_get (qctx_id); + + stream_session = session_alloc (qctx->c_thread_index); + QUIC_DBG (1, "Created stream_session, id %u ctx %u", + stream_session->session_index, sctx_id); + + sctx = quic_ctx_get (sctx_id); + sctx->c_quic_ctx_id.parent_app_wrk_id = + qctx->c_quic_ctx_id.parent_app_wrk_id; + sctx->c_quic_ctx_id.parent_app_id = qctx->c_quic_ctx_id.parent_app_id; + sctx->c_quic_ctx_id.quic_connection_ctx_id = qctx->c_c_index; + sctx->c_c_index = sctx_id; + sctx->c_quic_ctx_id.is_stream = 1; + sctx->c_s_index = stream_session->session_index; + sctx->c_quic_ctx_id.stream = stream; + sctx->c_quic_ctx_id.stream_session_handle = session_handle (stream_session); + + quic_session = + session_get_from_handle (qctx->c_quic_ctx_id.quic_session_handle); + stream_data = (quic_stream_data_t *) stream->data; + stream_data->ctx_id = sctx_id; + + sctx->c_s_index = stream_session->session_index; + stream_session->session_state = SESSION_STATE_CREATED; + stream_session->app_wrk_index = sctx->c_quic_ctx_id.parent_app_wrk_id; + stream_session->connection_index = sctx->c_c_index; + stream_session->session_type = + session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, + qctx->c_quic_ctx_id.udp_is_ip4); + stream_session->opaque = QUIC_SESSION_TYPE_STREAM; + stream_session->listener_index = quic_session->session_index; + stream_session->app_index = sctx->c_quic_ctx_id.parent_app_id; + + app_wrk = app_worker_get (stream_session->app_wrk_index); + if ((rv = app_worker_init_connected (app_wrk, stream_session))) + { + QUIC_DBG (1, "failed to allocate fifos"); + session_free (stream_session); + quicly_reset_stream (stream, 0x30001); + return; + } + + rv = app_worker_accept_notify (app_wrk, stream_session); + if (rv) + { + QUIC_DBG (1, "failed to notify accept worker app"); + session_free_w_fifos (stream_session); + quicly_reset_stream (stream, 0x30002); + return; + } + session_lookup_add_connection (&sctx->connection, + session_handle (stream_session)); +} + static int quic_on_stream_open (quicly_stream_open_cb * self, quicly_stream_t * stream) { QUIC_DBG (2, "on_stream_open called"); int ret; if ((ret = - quicly_streambuf_create (stream, sizeof (quicly_streambuf_t))) != 0) + quicly_streambuf_create (stream, sizeof (quic_stream_data_t))) != 0) { return ret; } stream->callbacks = &quic_stream_callbacks; + // Notify accept on parent qsession, but only if this is not a locally + // initiated stream + if (!quicly_stream_is_self_initiated (stream)) + { + quic_accept_stream (stream); + } return 0; } @@ -329,7 +358,8 @@ quic_on_conn_close (quicly_closed_by_peer_cb * self, quicly_conn_t * conn, { QUIC_DBG (2, "connection closed, reason: %s", reason); u32 ctx_index = (u64) * quicly_get_data (conn); - quic_connection_closed (ctx_index); + quic_ctx_t *ctx = quic_ctx_get (ctx_index); + session_transport_closing_notify (&ctx->connection); } static quicly_closed_by_peer_cb on_closed_by_peer = { &quic_on_conn_close }; @@ -392,13 +422,13 @@ encrypt_ticket_cb (ptls_encrypt_ticket_t * _self, ptls_t * tls, return 0; } +/* *INDENT-OFF* */ static struct st_util_session_cache_t sc = { .super = { - .cb = encrypt_ticket_cb, - }, + .cb = encrypt_ticket_cb, + }, }; -/* *INDENT-OFF* */ static ptls_context_t quic_tlsctx = { .random_bytes = ptls_openssl_random_bytes, .get_time = &ptls_get_time, @@ -499,7 +529,7 @@ ptls_get_bio_pem_object (BIO * bio, const char *label, ptls_buffer_t * buf) return ret; } -int +static int ptls_load_bio_pem_objects (BIO * bio, const char *label, ptls_iovec_t * list, size_t list_max, size_t * nb_objects) { @@ -551,7 +581,7 @@ ptls_load_bio_pem_objects (BIO * bio, const char *label, ptls_iovec_t * list, #define PTLS_MAX_CERTS_IN_CONTEXT 16 -int +static int ptls_load_bio_certificates (ptls_context_t * ctx, BIO * bio) { int ret = 0; @@ -619,13 +649,17 @@ quic_connection_closed (u32 ctx_index) quic_ctx_t *ctx; ctx = quic_ctx_get (ctx_index); - // TODO if connection is not established, just delete the session + ASSERT (!ctx->c_quic_ctx_id.is_stream); + // TODO if connection is not established, just delete the session? + + // TODO: close all streams? or is the streams closed cb called by quicly? + + session_transport_delete_notify (&ctx->connection); // Do not try to send anything anymore - ctx->stream = NULL; quicly_free (ctx->c_quic_ctx_id.conn); ctx->c_quic_ctx_id.conn = NULL; - session_transport_closing_notify (&ctx->connection); + quic_ctx_free (ctx); } static int64_t @@ -663,7 +697,7 @@ allocate_quicly_ctx (application_t * app, u8 is_client) quicly_amend_ptls_context (quicly_ctx->tls); - quicly_ctx->event_log.mask = INT64_MAX; + quicly_ctx->event_log.mask = 0; quicly_ctx->event_log.cb = quicly_new_default_event_log_cb (stderr); quicly_ctx->transport_params.max_data = QUIC_INT_MAX; @@ -714,7 +748,7 @@ quic_timer_expired (u32 conn_index) QUIC_DBG (2, "Timer expired for conn %u at %ld", conn_index, quic_get_time (NULL)); ctx = quic_ctx_get (conn_index); - ctx->c_quic_ctx_id.timer_handle = QUIC_TIMER_HANDLE_INVALID; + ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID; if (quic_send_packets (ctx)) { quic_connection_closed (conn_index); @@ -732,13 +766,13 @@ quic_update_timer (quic_ctx_t * ctx) tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel; f64 next_timeout_f = ((f64) next_timeout) / 1000.f; - clib_warning ("Timer set to %ld (%lf)", next_timeout, next_timeout_f); + // clib_warning ("Timer set to %ld (%lf)", next_timeout, next_timeout_f); - if (ctx->c_quic_ctx_id.timer_handle == QUIC_TIMER_HANDLE_INVALID) + if (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID) { if (next_timeout == INT64_MAX) return; - ctx->c_quic_ctx_id.timer_handle = + ctx->timer_handle = tw_timer_start_1t_3w_1024sl_ov (tw, ctx->c_c_index, 0, next_timeout_f); } @@ -746,11 +780,11 @@ quic_update_timer (quic_ctx_t * ctx) { if (next_timeout == INT64_MAX) { - tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->c_quic_ctx_id.timer_handle); - ctx->c_quic_ctx_id.timer_handle = QUIC_TIMER_HANDLE_INVALID; + tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->timer_handle); + ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID; } else - tw_timer_update_1t_3w_1024sl_ov (tw, ctx->c_quic_ctx_id.timer_handle, + tw_timer_update_1t_3w_1024sl_ov (tw, ctx->timer_handle, next_timeout_f); } } @@ -773,12 +807,130 @@ quic_expired_timers_dispatch (u32 * expired_timers) * BEGIN TRANSPORT PROTO FUNCTIONS *****************************************************************************/ -int +static int quic_connect (transport_endpoint_cfg_t * tep) { QUIC_DBG (2, "Called quic_connect"); - vnet_connect_args_t _cargs = { {}, }, *cargs = &_cargs; session_endpoint_cfg_t *sep; + int connect_stream = 0; + + sep = (session_endpoint_cfg_t *) tep; + + if (sep->port == 0) + { + // TODO: better logic to detect if this is a stream or a connection request + connect_stream = 1; + } + + if (connect_stream) + { + return quic_connect_new_stream (sep); + } + else + { + return quic_connect_new_connection (sep); + } +} + +static int +quic_connect_new_stream (session_endpoint_cfg_t * sep) +{ + uint64_t quic_session_handle; + session_t *quic_session, *stream_session; + quic_stream_data_t *stream_data; + quicly_stream_t *stream; + quicly_conn_t *conn; + app_worker_t *app_wrk; + quic_ctx_t *qctx, *sctx; + u32 sctx_index; + int rv; + + // Find base session to which the user want to attach a stream + quic_session_handle = sep->transport_opts; + QUIC_DBG (2, "Opening new stream (qsession %u)", sep->transport_opts); + quic_session = session_get_from_handle (quic_session_handle); + + if (quic_session->session_type != + session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, sep->is_ip4)) + { + QUIC_DBG (1, "received incompatible session"); + return -1; + } + + sctx_index = quic_ctx_alloc (); // Allocate before we get pointers + sctx = quic_ctx_get (sctx_index); + qctx = quic_ctx_get (quic_session->connection_index); + if (qctx->c_quic_ctx_id.is_stream) + { + QUIC_DBG (1, "session is a stream"); + quic_ctx_free (sctx); + return -1; + } + + sctx->c_quic_ctx_id.parent_app_wrk_id = + qctx->c_quic_ctx_id.parent_app_wrk_id; + sctx->c_quic_ctx_id.parent_app_id = qctx->c_quic_ctx_id.parent_app_id; + sctx->c_quic_ctx_id.quic_connection_ctx_id = qctx->c_c_index; + sctx->c_c_index = sctx_index; + sctx->c_quic_ctx_id.is_stream = 1; + + conn = qctx->c_quic_ctx_id.conn; + + if (!conn || !quicly_connection_is_ready (conn)) + return -1; + + if ((rv = quicly_open_stream (conn, &stream, 0))) + { + QUIC_DBG (2, "Stream open failed with %d", rv); + return -1; + } + sctx->c_quic_ctx_id.stream = stream; + + QUIC_DBG (2, "Opened stream %d, creating session", stream->stream_id); + + app_wrk = app_worker_get_if_valid (quic_session->app_wrk_index); + + stream_session = session_alloc (qctx->c_thread_index); + QUIC_DBG (1, "Created stream_session, id %u ctx %u", + stream_session->session_index, sctx_index); + stream_session->app_wrk_index = quic_session->app_wrk_index; + stream_session->connection_index = sctx_index; + stream_session->session_type = + session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, + qctx->c_quic_ctx_id.udp_is_ip4); + stream_session->opaque = QUIC_SESSION_TYPE_STREAM; + sctx->c_s_index = stream_session->session_index; + sctx->c_quic_ctx_id.stream_session_handle = session_handle (stream_session); + + if (app_worker_init_connected (app_wrk, stream_session)) + { + QUIC_DBG (1, "failed to app_worker_init_connected"); + quicly_reset_stream (stream, 0x30003); + session_free_w_fifos (stream_session); + quic_ctx_free (sctx); + return app_worker_connect_notify (app_wrk, NULL, sep->opaque); + } + + stream_session->session_state = SESSION_STATE_READY; + if (app_worker_connect_notify (app_wrk, stream_session, sep->opaque)) + { + QUIC_DBG (1, "failed to notify app"); + quicly_reset_stream (stream, 0x30004); + session_free_w_fifos (stream_session); + quic_ctx_free (sctx); + return -1; + } + session_lookup_add_connection (&sctx->connection, + session_handle (stream_session)); + stream_data = (quic_stream_data_t *) stream->data; + stream_data->ctx_id = sctx->c_c_index; + return 0; +} + +static int +quic_connect_new_connection (session_endpoint_cfg_t * sep) +{ + vnet_connect_args_t _cargs = { {}, }, *cargs = &_cargs; quic_main_t *qm = &quic_main; quic_ctx_t *ctx; app_worker_t *app_wrk; @@ -786,14 +938,13 @@ quic_connect (transport_endpoint_cfg_t * tep) u32 ctx_index; int error; - sep = (session_endpoint_cfg_t *) tep; - ctx_index = quic_ctx_half_open_alloc (); - ctx = quic_ctx_half_open_get (ctx_index); - ctx->c_quic_ctx_id.parent_app_wrk_idx = sep->app_wrk_index; + ctx_index = quic_ctx_alloc (); + ctx = quic_ctx_get (ctx_index); + ctx->c_quic_ctx_id.parent_app_wrk_id = sep->app_wrk_index; ctx->c_s_index = 0xFAFAFAFA; ctx->c_quic_ctx_id.udp_is_ip4 = sep->is_ip4; - ctx->c_quic_ctx_id.timer_handle = QUIC_TIMER_HANDLE_INVALID; - ctx->c_quic_ctx_id.conn_state = QUIC_CONN_STATE_HANDSHAKE; + ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID; + ctx->conn_state = QUIC_CONN_STATE_HANDSHAKE; ctx->client_opaque = sep->opaque; if (sep->hostname) { @@ -807,8 +958,6 @@ quic_connect (transport_endpoint_cfg_t * tep) format (0, "%U", format_ip46_address, &sep->ip, sep->is_ip4); } - quic_ctx_half_open_reader_unlock (); - clib_memcpy (&cargs->sep, sep, sizeof (session_endpoint_cfg_t)); cargs->sep.transport_proto = TRANSPORT_PROTO_UDP; cargs->app_index = qm->app_index; @@ -832,32 +981,37 @@ static void quic_disconnect (u32 ctx_index, u32 thread_index) { QUIC_DBG (2, "Called quic_disconnect"); - tw_timer_wheel_1t_3w_1024sl_ov_t *tw; + //tw_timer_wheel_1t_3w_1024sl_ov_t *tw; quic_ctx_t *ctx; - QUIC_DBG (1, "Disconnecting %x", ctx_index); + QUIC_DBG (1, "Closing connection %x", ctx_index); ctx = quic_ctx_get (ctx_index); - if (ctx->c_quic_ctx_id.timer_handle != QUIC_TIMER_HANDLE_INVALID) + if (ctx->c_quic_ctx_id.is_stream) { - tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel; - tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->c_quic_ctx_id.timer_handle); + quicly_stream_t *stream = ctx->c_quic_ctx_id.stream; + quicly_reset_stream (stream, 0x30000); + session_transport_delete_notify (&ctx->connection); + quic_ctx_free (ctx); + } + else + { + quicly_conn_t *conn = ctx->c_quic_ctx_id.conn; + // Start connection closing. Keep sending packets until quicly_send + // returns QUICLY_ERROR_FREE_CONNECTION + quicly_close (conn, 0, ""); + quic_send_packets (ctx); } - quic_disconnect_transport (ctx); - // This removes the session from the lookup table and frees it. - session_transport_delete_notify (&ctx->connection); - quic_ctx_free (ctx); } -u32 -quic_start_listen (u32 app_listen_session_index, transport_endpoint_t * tep) +static u32 +quic_start_listen (u32 quic_listen_session_index, transport_endpoint_t * tep) { - QUIC_DBG (2, "Called quic_start_listen"); vnet_listen_args_t _bargs, *args = &_bargs; quic_main_t *qm = &quic_main; session_handle_t udp_handle; session_endpoint_cfg_t *sep; - session_t *quic_listen_session, *app_listen_session; + session_t *udp_listen_session, *quic_listen_session; app_worker_t *app_wrk; application_t *app; quic_ctx_t *lctx; @@ -866,7 +1020,11 @@ quic_start_listen (u32 app_listen_session_index, transport_endpoint_t * tep) sep = (session_endpoint_cfg_t *) tep; app_wrk = app_worker_get (sep->app_wrk_index); + // We need to call this because we call app_worker_init_connected in + // quic_accept_stream, which assumes the connect segment manager exists + app_worker_alloc_connects_segment_manager (app_wrk); app = application_get (app_wrk->app_index); + QUIC_DBG (2, "Called quic_start_listen for app %d", app_wrk->app_index); allocate_quicly_ctx (app, 0 /* is_client */ ); @@ -881,25 +1039,26 @@ quic_start_listen (u32 app_listen_session_index, transport_endpoint_t * tep) lctx_index = quic_ctx_alloc (); // listener udp_handle = args->handle; app_listener = app_listener_get_w_handle (udp_handle); - quic_listen_session = app_listener_get_session (app_listener); - quic_listen_session->opaque = lctx_index; + udp_listen_session = app_listener_get_session (app_listener); + udp_listen_session->opaque = lctx_index; - app_listen_session = listen_session_get (app_listen_session_index); + quic_listen_session = listen_session_get (quic_listen_session_index); + quic_listen_session->opaque = QUIC_SESSION_TYPE_LISTEN; lctx = quic_ctx_get (lctx_index); // listener lctx->is_listener = 1; - lctx->c_quic_ctx_id.parent_app_wrk_idx = sep->app_wrk_index; + lctx->c_quic_ctx_id.parent_app_wrk_id = sep->app_wrk_index; lctx->c_quic_ctx_id.parent_app_id = app_wrk->app_index; - lctx->c_quic_ctx_id.quic_session = udp_handle; - lctx->c_quic_ctx_id.app_session = - listen_session_get_handle (app_listen_session); + lctx->c_quic_ctx_id.udp_session_handle = udp_handle; + lctx->c_quic_ctx_id.quic_session_handle = + listen_session_get_handle (quic_listen_session); lctx->c_quic_ctx_id.udp_is_ip4 = sep->is_ip4; QUIC_DBG (1, "Started listening %d", lctx_index); return lctx_index; } -u32 +static u32 quic_stop_listen (u32 lctx_index) { QUIC_DBG (2, "Called quic_stop_listen"); @@ -907,7 +1066,7 @@ quic_stop_listen (u32 lctx_index) lctx = quic_ctx_get (lctx_index); // listener vnet_unlisten_args_t a = { - .handle = lctx->c_quic_ctx_id.quic_session, + .handle = lctx->c_quic_ctx_id.udp_session_handle, .app_index = quic_main.app_index, .wrk_map_index = 0 /* default wrk */ }; @@ -920,7 +1079,7 @@ quic_stop_listen (u32 lctx_index) return 0; } -transport_connection_t * +static transport_connection_t * quic_connection_get (u32 ctx_index, u32 thread_index) { QUIC_DBG (2, "Called quic_connection_get"); @@ -929,7 +1088,7 @@ quic_connection_get (u32 ctx_index, u32 thread_index) return &ctx->connection; } -transport_connection_t * +static transport_connection_t * quic_listener_get (u32 listener_index) { QUIC_DBG (2, "Called quic_listener_get"); @@ -951,7 +1110,7 @@ quic_update_time (f64 now, u8 thread_index) static u8 * format_quic_connection (u8 * s, va_list * args) { - s = format (s, "[QUIC] connection"); + s = format (s, "[QUIC] connection"); //TODO return s; } @@ -959,9 +1118,8 @@ static u8 * format_quic_half_open (u8 * s, va_list * args) { u32 qc_index = va_arg (*args, u32); - quic_ctx_t *ctx = quic_ctx_half_open_get (qc_index); + quic_ctx_t *ctx = quic_ctx_get (qc_index); s = format (s, "[QUIC] half-open app %u", ctx->c_quic_ctx_id.parent_app_id); - quic_ctx_half_open_reader_unlock (); return s; } @@ -969,7 +1127,7 @@ format_quic_half_open (u8 * s, va_list * args) static u8 * format_quic_listener (u8 * s, va_list * args) { - s = format (s, "[QUIC] listener"); + s = format (s, "[QUIC] listener"); // TODO return s; } @@ -1003,89 +1161,90 @@ quic_build_sockaddr (struct sockaddr *sa, socklen_t * salen, } static int -quic_delayed_notify_app_connected (void *ctx_index) +quic_notify_app_connected (quic_ctx_t * ctx) { QUIC_DBG (1, "quic_notify_app_connected"); - session_t *app_session; + session_t *quic_session; app_worker_t *app_wrk; - quic_ctx_t *ctx; - ctx = quic_ctx_get ((u32) (u64) ctx_index); + u32 ctx_id = ctx->c_c_index; - app_wrk = app_worker_get_if_valid (ctx->c_quic_ctx_id.parent_app_wrk_idx); + app_wrk = app_worker_get_if_valid (ctx->c_quic_ctx_id.parent_app_wrk_id); if (!app_wrk) { quic_disconnect_transport (ctx); return -1; } - app_session = session_alloc (ctx->c_thread_index); - QUIC_DBG (1, "Created app_session, id %u", app_session->session_index); - ctx->c_s_index = app_session->session_index; - app_session->app_wrk_index = ctx->c_quic_ctx_id.parent_app_wrk_idx; - app_session->connection_index = ctx->c_c_index; - app_session->session_type = + quic_session = session_alloc (ctx->c_thread_index); + + QUIC_DBG (1, "Created quic_session, id %u", quic_session->session_index); + ctx->c_s_index = quic_session->session_index; + quic_session->app_wrk_index = ctx->c_quic_ctx_id.parent_app_wrk_id; + quic_session->connection_index = ctx->c_c_index; + quic_session->opaque = QUIC_SESSION_TYPE_QUIC; + quic_session->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, ctx->c_quic_ctx_id.udp_is_ip4); - if (app_worker_init_connected (app_wrk, app_session)) // TODO dont allocate fifos + if (app_worker_init_connected (app_wrk, quic_session)) { + QUIC_DBG (1, "failed to app_worker_init_connected"); quic_disconnect (ctx->c_c_index, vlib_get_thread_index ()); return app_worker_connect_notify (app_wrk, NULL, ctx->client_opaque); } - app_session->session_state = SESSION_STATE_CONNECTING; - if (app_worker_connect_notify (app_wrk, app_session, ctx->client_opaque)) + quic_session->session_state = SESSION_STATE_CONNECTING; + if (app_worker_connect_notify (app_wrk, quic_session, ctx->client_opaque)) { QUIC_DBG (1, "failed to notify app"); quic_disconnect (ctx->c_c_index, vlib_get_thread_index ()); return -1; } - ctx->c_quic_ctx_id.app_session = session_handle (app_session); - app_session->session_state = SESSION_STATE_LISTENING; + // If the app opens a stream in its callback it may invalidate ctx + ctx = quic_ctx_get (ctx_id); + ctx->c_quic_ctx_id.quic_session_handle = session_handle (quic_session); + quic_session->session_state = SESSION_STATE_LISTENING; session_lookup_add_connection (&ctx->connection, - session_handle (app_session)); + session_handle (quic_session)); return 0; } -int -quic_session_connected_callback (u32 quic_app_index, u32 ho_ctx_idx, - session_t * s, u8 is_fail) +static int +quic_session_connected_callback (u32 quic_app_index, u32 ctx_index, + session_t * udp_session, u8 is_fail) { - QUIC_DBG (2, "Called quic_session_connected_callback"); + QUIC_DBG (2, "QSession is now connected (id %u)", + udp_session->session_index); // This should always be called before quic_connect returns since UDP always // connects instantly. struct sockaddr_in6 sa6; struct sockaddr *sa = (struct sockaddr *) &sa6; socklen_t salen; transport_connection_t *tc; - quic_ctx_t *ho_ctx, *ctx; - u32 ctx_index; + quic_ctx_t *ctx; int ret; application_t *app; app_worker_t *app_wrk; - ho_ctx = quic_ctx_half_open_get (ho_ctx_idx); + ctx = quic_ctx_get (ctx_index); if (is_fail) { u32 api_context; int rv = 0; app_wrk = - app_worker_get_if_valid (ho_ctx->c_quic_ctx_id.parent_app_wrk_idx); + app_worker_get_if_valid (ctx->c_quic_ctx_id.parent_app_wrk_id); if (app_wrk) { - api_context = ho_ctx->c_s_index; + api_context = ctx->c_s_index; app_worker_connect_notify (app_wrk, 0, api_context); } - quic_ctx_half_open_reader_unlock (); - quic_ctx_half_open_free (ho_ctx_idx); return rv; } - app_wrk = - app_worker_get_if_valid (ho_ctx->c_quic_ctx_id.parent_app_wrk_idx); + app_wrk = app_worker_get_if_valid (ctx->c_quic_ctx_id.parent_app_wrk_id); if (!app_wrk) { QUIC_DBG (1, "Appwrk not found"); @@ -1093,25 +1252,19 @@ quic_session_connected_callback (u32 quic_app_index, u32 ho_ctx_idx, } app = application_get (app_wrk->app_index); - ctx_index = quic_ctx_alloc (); - ctx = quic_ctx_get (ctx_index); - clib_memcpy (ctx, ho_ctx, sizeof (*ctx)); - quic_ctx_half_open_reader_unlock (); // TODO: this is a race - quic_ctx_half_open_free (ho_ctx_idx); - ctx->c_thread_index = vlib_get_thread_index (); ctx->c_c_index = ctx_index; - QUIC_DBG (1, "Quic connect for returned %u. New connection [%u]%x", + QUIC_DBG (1, "Quic connect returned %u. New ctx [%u]%x", is_fail, vlib_get_thread_index (), (ctx) ? ctx_index : ~0); - ctx->c_quic_ctx_id.quic_session = session_handle (s); - s->opaque = ctx_index; - s->session_state = SESSION_STATE_READY; + ctx->c_quic_ctx_id.udp_session_handle = session_handle (udp_session); + udp_session->opaque = ctx_index; + udp_session->session_state = SESSION_STATE_READY; // Init QUIC lib connection // Generate required sockaddr & salen - tc = session_get_transport (s); + tc = session_get_transport (udp_session); quic_build_sockaddr (sa, &salen, &tc->rmt_ip, tc->rmt_port, tc->is_ip4); ret = @@ -1124,20 +1277,22 @@ quic_session_connected_callback (u32 quic_app_index, u32 ho_ctx_idx, *quicly_get_data (ctx->c_quic_ctx_id.conn) = (void *) (u64) ctx_index; assert (ret == 0); - if (quic_send_packets (ctx)) + int rv = quic_send_packets (ctx); + if (rv) { + QUIC_DBG (1, "Error sending packets %d, closing connection", rv); quic_connection_closed (ctx_index); } return ret; } -void +static void quic_session_disconnect_callback (session_t * s) { clib_warning ("UDP session disconnected???"); } -void +static void quic_session_reset_callback (session_t * s) { clib_warning ("UDP session reset???"); @@ -1168,75 +1323,52 @@ quic_del_segment_callback (u32 client_index, u64 seg_handle) return 0; } -int -quic_add_vpp_q_builtin_tx_evt (session_t * s) -{ - if (svm_fifo_set_event (s->tx_fifo)) - session_send_io_evt_to_thread_custom (s, s->thread_index, - FIFO_EVENT_BUILTIN_TX); - return 0; -} - -void -quic_open_stream_if_ready (quic_ctx_t * ctx) -{ - quicly_conn_t *conn = ctx->c_quic_ctx_id.conn; - if (ctx->stream) - { - QUIC_DBG (2, "----------- > FOUND Stream id %d", - ctx->stream->stream_id); - QUIC_DBG (2, "----------- > FOUND Stream is_open %d", - ctx->stream->sendstate.is_open); - return; - } - if (quicly_connection_is_ready (conn)) - assert (!quicly_open_stream (conn, &ctx->stream, 0)); - QUIC_DBG (2, "Stream id %d", ctx->stream->stream_id); - QUIC_DBG (2, "Stream is_open %d", ctx->stream->sendstate.is_open); -} - -int -quic_custom_tx_callback (void *session) +static int +quic_custom_tx_callback (void *s) { QUIC_DBG (2, "Called quic_custom_tx_callback"); - session_t *app_session = (session_t *) session; + session_t *stream_session = (session_t *) s; quic_ctx_t *ctx; svm_fifo_t *f; + quicly_stream_t *stream; u32 deq_max; u8 *data; if (PREDICT_FALSE - (app_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING)) + (stream_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING)) return 0; - ctx = quic_ctx_get (app_session->connection_index); - quic_open_stream_if_ready (ctx); - if (!ctx->stream) + ctx = quic_ctx_get (stream_session->connection_index); + if (PREDICT_FALSE (!ctx->c_quic_ctx_id.is_stream)) { - quic_add_vpp_q_builtin_tx_evt (app_session); - return 0; + QUIC_DBG (1, "Error: trying to send on quic session not stream"); + return -1; } - f = app_session->tx_fifo; + stream = ctx->c_quic_ctx_id.stream; + + f = stream_session->tx_fifo; deq_max = svm_fifo_max_dequeue (f); if (!deq_max) return 0; data = svm_fifo_head (f); - if (quicly_streambuf_egress_write (ctx->stream, data, deq_max)) + if (quicly_streambuf_egress_write (stream, data, deq_max)) { assert (0); return 0; } QUIC_DBG (2, "Sent %u bytes", deq_max); svm_fifo_dequeue_drop (f, deq_max); - if (quic_send_packets (ctx)) + int rv = quic_send_packets (ctx); + if (rv) { - quic_connection_closed (ctx->c_c_index); + QUIC_DBG (1, "TX error sending packets %d, closing connection", rv); + quic_connection_closed (ctx->c_quic_ctx_id.quic_connection_ctx_id); } return 0; } -int +static inline int quic_find_packet_ctx (quic_ctx_t ** ctx, quicly_conn_t ** conn, struct sockaddr *sa, socklen_t salen, quicly_decoded_packet_t packet) @@ -1247,13 +1379,13 @@ quic_find_packet_ctx (quic_ctx_t ** ctx, quicly_conn_t ** conn, pool_foreach (ctx_, quic_main.ctx_pool[vlib_get_thread_index()], ({ conn_ = ctx_->c_quic_ctx_id.conn; - if (conn_ && !ctx_->is_listener) + if (!ctx_->c_quic_ctx_id.is_stream && conn_ && !ctx_->is_listener) { if (quicly_is_destination(conn_, sa, salen, &packet)) { *conn = conn_; *ctx = ctx_; - QUIC_DBG (2, "connection_found"); + // QUIC_DBG (2, "connection_found"); return 0; } } @@ -1266,19 +1398,21 @@ static int quic_receive (quic_ctx_t * ctx, quicly_conn_t * conn, quicly_decoded_packet_t packet) { + u32 ctx_id = ctx->c_c_index; quicly_receive (conn, &packet); + // ctx pointer may change if a new stream is opened + ctx = quic_ctx_get (ctx_id); // Conn may be set to null if the connection is terminated - if (ctx->c_quic_ctx_id.conn - && ctx->c_quic_ctx_id.conn_state == QUIC_CONN_STATE_HANDSHAKE) + if (ctx->c_quic_ctx_id.conn && ctx->conn_state == QUIC_CONN_STATE_HANDSHAKE) { if (quicly_connection_is_ready (conn)) { - ctx->c_quic_ctx_id.conn_state = QUIC_CONN_STATE_READY; + ctx->conn_state = QUIC_CONN_STATE_READY; if (quicly_is_client (conn)) - session_send_rpc_evt_to_thread_force (vlib_get_thread_index (), - &quic_delayed_notify_app_connected, - (void *) (u64) - ctx->c_c_index); + { + quic_notify_app_connected (ctx); + ctx = quic_ctx_get (ctx_id); + } } } if (quic_send_packets (ctx)) @@ -1289,43 +1423,48 @@ quic_receive (quic_ctx_t * ctx, quicly_conn_t * conn, } static int -quic_delayed_create_app_session (void *ctx_index) +quic_create_quic_session (quic_ctx_t * ctx) { - quic_ctx_t *lctx, *ctx; - session_t *app_session, *app_listen_session; + session_t *quic_session, *quic_listen_session; app_worker_t *app_wrk; + quic_ctx_t *lctx; int rv; - ctx = quic_ctx_get ((u32) (u64) ctx_index); - app_session = session_alloc (ctx->c_thread_index); - app_session->session_state = SESSION_STATE_LISTENING; - ctx->c_s_index = app_session->session_index; + quic_session = session_alloc (ctx->c_thread_index); + QUIC_DBG (1, "Created quic session, id %u ctx %u", + quic_session->session_index, ctx->c_c_index); + quic_session->session_state = SESSION_STATE_LISTENING; + ctx->c_s_index = quic_session->session_index; lctx = quic_ctx_get (ctx->c_quic_ctx_id.listener_ctx_id); - app_listen_session = - listen_session_get_from_handle (lctx->c_quic_ctx_id.app_session); - app_session->app_wrk_index = lctx->c_quic_ctx_id.parent_app_wrk_idx; - app_session->connection_index = ctx->c_c_index; - app_session->session_type = app_listen_session->session_type; - app_session->listener_index = app_listen_session->session_index; - app_session->app_index = quic_main.app_index; + quic_listen_session = + listen_session_get_from_handle (lctx->c_quic_ctx_id.quic_session_handle); + quic_session->app_wrk_index = lctx->c_quic_ctx_id.parent_app_wrk_id; + quic_session->connection_index = ctx->c_c_index; + quic_session->session_type = + session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, + ctx->c_quic_ctx_id.udp_is_ip4); + quic_session->listener_index = quic_listen_session->session_index; + quic_session->app_index = quic_main.app_index; + quic_session->opaque = QUIC_SESSION_TYPE_QUIC; // TODO: don't alloc fifos when we don't transfer data on this session - if ((rv = app_worker_init_accepted (app_session))) + // but we still need fifos for the events? + if ((rv = app_worker_init_accepted (quic_session))) { QUIC_DBG (1, "failed to allocate fifos"); - session_free (app_session); + session_free (quic_session); return rv; } - ctx->c_quic_ctx_id.app_session = session_handle (app_session); + ctx->c_quic_ctx_id.quic_session_handle = session_handle (quic_session); ctx->c_quic_ctx_id.parent_app_id = lctx->c_quic_ctx_id.parent_app_id; ctx->c_quic_ctx_id.udp_is_ip4 = lctx->c_quic_ctx_id.udp_is_ip4; - ctx->c_quic_ctx_id.parent_app_wrk_idx = app_session->app_wrk_index; + ctx->c_quic_ctx_id.parent_app_wrk_id = quic_session->app_wrk_index; session_lookup_add_connection (&ctx->connection, - session_handle (app_session)); - app_wrk = app_worker_get (app_session->app_wrk_index); - rv = app_worker_accept_notify (app_wrk, app_session); + session_handle (quic_session)); + app_wrk = app_worker_get (quic_session->app_wrk_index); + rv = app_worker_accept_notify (app_wrk, quic_session); if (rv) { QUIC_DBG (1, "failed to notify accept worker app"); @@ -1336,23 +1475,24 @@ quic_delayed_create_app_session (void *ctx_index) static int quic_create_connection (quicly_context_t * quicly_ctx, - u64 quic_session_handle, u32 lctx_index, - quicly_conn_t * conn, struct sockaddr *sa, + u64 udp_session_handle, u32 lctx_index, + struct sockaddr *sa, socklen_t salen, quicly_decoded_packet_t packet) { quic_ctx_t *ctx; u32 ctx_index; + quicly_conn_t *conn; + int rv; /* new connection, accept and create context if packet is valid */ // TODO: check if socket is actually listening? - QUIC_DBG (2, "New connection created"); - if (quicly_accept (&conn, quicly_ctx, sa, salen, - &packet, ptls_iovec_init (NULL, 0), - &quic_main.next_cid, NULL) != 0) + if ((rv = quicly_accept (&conn, quicly_ctx, sa, salen, + &packet, ptls_iovec_init (NULL, 0), + &quic_main.next_cid, NULL))) { // Invalid packet, pass assert (conn == NULL); - QUIC_DBG (2, "Accept failed"); + QUIC_DBG (2, "Accept failed with %d", rv); return 0; } assert (conn != NULL); @@ -1366,14 +1506,13 @@ quic_create_connection (quicly_context_t * quicly_ctx, ctx->c_thread_index = vlib_get_thread_index (); ctx->c_c_index = ctx_index; - ctx->c_quic_ctx_id.quic_session = quic_session_handle; + ctx->c_quic_ctx_id.udp_session_handle = udp_session_handle; ctx->c_quic_ctx_id.listener_ctx_id = lctx_index; - ctx->c_quic_ctx_id.timer_handle = QUIC_TIMER_HANDLE_INVALID; + ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID; ctx->c_quic_ctx_id.conn = conn; - session_send_rpc_evt_to_thread_force (vlib_get_thread_index (), - &quic_delayed_create_app_session, - (void *) (u64) ctx_index); + quic_create_quic_session (ctx); + if (quic_send_packets (ctx)) { quic_connection_closed (ctx_index); @@ -1382,34 +1521,34 @@ quic_create_connection (quicly_context_t * quicly_ctx, } static int -quic_reset_connection (quicly_context_t * quicly_ctx, u64 quic_session_handle, +quic_reset_connection (quicly_context_t * quicly_ctx, u64 udp_session_handle, struct sockaddr *sa, socklen_t salen, quicly_decoded_packet_t packet) { - /* short header packet; potentially a dead connection. No need to check the length of the incoming packet, - * because loop is prevented by authenticating the CID (by checking node_id and thread_id). If the peer is also - * sending a reset, then the next CID is highly likely to contain a non-authenticating CID, ... */ + /* short header packet; potentially a dead connection. No need to check the + * length of the incoming packet, because loop is prevented by authenticating + * the CID (by checking node_id and thread_id). If the peer is also sending a + * reset, then the next CID is highly likely to contain a non-authenticating + * CID, ... */ QUIC_DBG (2, "Sending stateless reset"); quicly_datagram_t *dgram; - session_t *quic_session; + session_t *udp_session; if (packet.cid.dest.plaintext.node_id == 0 && packet.cid.dest.plaintext.thread_id == 0) { dgram = quicly_send_stateless_reset (quicly_ctx, sa, salen, &packet.cid.dest.plaintext); - quic_session = session_get_from_handle (quic_session_handle); - if (quic_send_datagram (quic_session, dgram)) // TODO : missing event on fifo + udp_session = session_get_from_handle (udp_session_handle); + if (quic_send_datagram (udp_session, dgram)) // TODO : set event on fifo QUIC_DBG (2, "Send reset failed"); } return 0; } -int -quic_app_rx_callback (session_t * quic_session) +static int +quic_app_rx_callback (session_t * udp_session) { // Read data from UDP rx_fifo and pass it to the quicly conn. - QUIC_DBG (2, "Called quic_app_rx_callback"); - quicly_decoded_packet_t packet; session_dgram_hdr_t ph; application_t *app; @@ -1422,10 +1561,15 @@ quic_app_rx_callback (session_t * quic_session) socklen_t salen; u32 max_deq, len; u8 *data; - u32 lctx_index = quic_session->opaque; - u64 quic_session_handle = session_handle (quic_session); + u32 lctx_index = udp_session->opaque; + u64 udp_session_handle = session_handle (udp_session); - f = quic_session->rx_fifo; + // DEBUG + // lctx = quic_ctx_get (lctx_index); + // QUIC_DBG (2, "Got RX data on session %d", + // lctx->c_quic_ctx_id.udp_session_handle); + + f = udp_session->rx_fifo; do { @@ -1436,7 +1580,7 @@ quic_app_rx_callback (session_t * quic_session) svm_fifo_unset_event (f); return 0; } - QUIC_DBG (2, "Processing one packet at %ld", quic_get_time (NULL)); + // QUIC_DBG (2, "Processing one packet at %ld", quic_get_time (NULL)); svm_fifo_unset_event (f); svm_fifo_peek (f, 0, sizeof (ph), (u8 *) & ph); @@ -1462,11 +1606,11 @@ quic_app_rx_callback (session_t * quic_session) quic_receive (ctx, conn, packet); else if (QUICLY_PACKET_IS_LONG_HEADER (packet.octets.base[0])) quic_create_connection ((quicly_context_t *) app->quicly_ctx, - quic_session_handle, lctx_index, conn, + udp_session_handle, lctx_index, sa, salen, packet); else if (((quicly_context_t *) app->quicly_ctx)->encrypt_cid) quic_reset_connection ((quicly_context_t *) app->quicly_ctx, - quic_session_handle, sa, salen, packet); + udp_session_handle, sa, salen, packet); } svm_fifo_dequeue_drop (f, ph.data_length + ph.data_offset + @@ -1476,6 +1620,7 @@ quic_app_rx_callback (session_t * quic_session) return 0; } + /***************************************************************************** * END TRANSPORT PROTO FUNCTIONS *****************************************************************************/ @@ -1560,7 +1705,6 @@ quic_init (vlib_main_t * vm) qm->ca_cert_path = QUIC_DEFAULT_CA_CERT_PATH; qm->app_index = a->app_index; - clib_rwlock_init (&qm->half_open_rwlock); qm->tstamp_ticks_per_clock = vm->clib_time.seconds_per_clock / QUIC_TSTAMP_RESOLUTION; @@ -1573,12 +1717,6 @@ quic_init (vlib_main_t * vm) return 0; } -quic_main_t * -vnet_quic_get_main (void) -{ - return &quic_main; -} - VLIB_INIT_FUNCTION (quic_init); /* *INDENT-OFF* */ diff --git a/src/plugins/quic/quic.h b/src/plugins/quic/quic.h index 0c0147c6053..a4615946d94 100644 --- a/src/plugins/quic/quic.h +++ b/src/plugins/quic/quic.h @@ -22,10 +22,12 @@ #include #include +#include -#define QUIC_DEBUG 1 -#define QUIC_DEBUG_LEVEL_CLIENT 0 -#define QUIC_DEBUG_LEVEL_SERVER 0 + +#define QUIC_DEBUG 0 +#define QUIC_DEBUG_LEVEL_CLIENT 0 +#define QUIC_DEBUG_LEVEL_SERVER 0 #define QUIC_DEFAULT_CA_CERT_PATH "/etc/ssl/certs/ca-certificates.crt" @@ -45,24 +47,41 @@ #define QUIC_CONN_STATE_HANDSHAKE 0 #define QUIC_CONN_STATE_READY 1 +enum quic_session_type_t +{ + QUIC_SESSION_TYPE_QUIC = 0, + QUIC_SESSION_TYPE_STREAM = 1, + QUIC_SESSION_TYPE_LISTEN = INT32_MAX, +}; + /* *INDENT-OFF* */ typedef CLIB_PACKED (struct quic_ctx_id_ { - session_handle_t app_session; - session_handle_t quic_session; - u32 parent_app_wrk_idx; + u32 parent_app_wrk_id; u32 parent_app_id; - u32 listener_ctx_id; - u32 timer_handle; - quicly_conn_t *conn; - u8 udp_is_ip4; - u8 conn_state; + union { + CLIB_PACKED (struct { + session_handle_t quic_session_handle; // TODO: remove + session_handle_t udp_session_handle; + quicly_conn_t *conn; + u32 listener_ctx_id; + u8 udp_is_ip4; + }); + CLIB_PACKED (struct { + session_handle_t stream_session_handle; // TODO: remove + quicly_stream_t *stream; + u32 quic_connection_ctx_id; + }); + }; + u8 is_stream; }) quic_ctx_id_t; /* *INDENT-ON* */ STATIC_ASSERT (sizeof (quic_ctx_id_t) <= 42, "ctx id must be less than 42"); +// This structure is used to implement the concept of VPP connection for QUIC. +// We create one per connection and one per stream. typedef struct quic_ctx_ { union @@ -70,31 +89,33 @@ typedef struct quic_ctx_ transport_connection_t connection; quic_ctx_id_t c_quic_ctx_id; }; - - quicly_stream_t *stream; u8 *srv_hostname; u32 client_opaque; + u32 timer_handle; + u8 conn_state; u8 is_listener; } quic_ctx_t; +typedef struct quic_stream_data_ +{ + quicly_streambuf_t streambuf; + u32 ctx_id; +} quic_stream_data_t; + typedef struct quic_worker_ctx_ { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); u32 time_now; /**< worker time */ tw_timer_wheel_1t_3w_1024sl_ov_t timer_wheel; /**< worker timer wheel */ - u32 *tx_buffers; /**< tx buffer free list */ } quic_worker_ctx_t; typedef struct quic_main_ { u32 app_index; - quic_ctx_t *half_open_ctx_pool; quic_ctx_t **ctx_pool; - clib_rwlock_t half_open_rwlock; quic_worker_ctx_t *wrk_ctx; f64 tstamp_ticks_per_clock; - /* * Config */ @@ -105,8 +126,6 @@ typedef struct quic_main_ char *ca_cert_path; } quic_main_t; -quic_main_t *vnet_quic_get_main (void); - #endif /* __included_quic_h__ */ /* diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h index 02dc40ab0cf..32a13cf94f9 100644 --- a/src/vnet/session/session_types.h +++ b/src/vnet/session/session_types.h @@ -42,6 +42,7 @@ typedef struct _session_endpoint_cfg u32 ns_index; u8 original_tp; u8 *hostname; + u64 transport_opts; } session_endpoint_cfg_t; #define SESSION_IP46_ZERO \