From: Matus Fabian Date: Sat, 18 Oct 2025 18:42:18 +0000 (-0400) Subject: http: add transport connect_stream callback X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F09%2F43909%2F9;p=vpp.git http: add transport connect_stream callback app now open HTTP/2 streams via vnet_connect_stream instead of vnet_connect Type: improvement Change-Id: Icc9a948d7c2a7d50c9d83fefb10f267fb56e4367 Signed-off-by: Matus Fabian --- diff --git a/src/plugins/hs_apps/http_client.c b/src/plugins/hs_apps/http_client.c index 36ee5fb0bec..1ef25a273b3 100644 --- a/src/plugins/hs_apps/http_client.c +++ b/src/plugins/hs_apps/http_client.c @@ -166,8 +166,7 @@ hc_session_alloc (hc_worker_t *wrk) } static int -hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session, - session_error_t err) +hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session) { hc_main_t *hcm = &hc_main; u64 to_send; @@ -231,54 +230,51 @@ done: return 0; } -typedef struct -{ - u64 parent_handle; - u32 parent_index; -} hc_connect_streams_args_t; - -static void -hc_connect_streams_rpc (void *rpc_args) +static int +hc_connect_streams (u64 parent_handle, u32 parent_index) { - hc_connect_streams_args_t *args = rpc_args; hc_main_t *hcm = &hc_main; vnet_connect_args_t _a, *a = &_a; hc_worker_t *wrk; - hc_session_t *ho_hs; + hc_session_t *hs; u32 i; int rv; + session_t *s; clib_memset (a, 0, sizeof (*a)); clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep)); - a->sep_ext.parent_handle = args->parent_handle; + a->sep_ext.parent_handle = parent_handle; a->app_index = hcm->app_index; + wrk = hc_worker_get (session_thread_from_handle (parent_handle)); + for (i = 0; i < (hcm->max_streams - 1); i++) { - /* allocate half-open session */ - wrk = hc_worker_get (transport_cl_thread ()); - ho_hs = hc_session_alloc (wrk); - ho_hs->parent_index = args->parent_index; - a->api_context = ho_hs->session_index; + hs = hc_session_alloc (wrk); + hs->parent_index = parent_index; + a->api_context = hs->session_index; - rv = vnet_connect (a); + rv = vnet_connect_stream (a); if (rv) - clib_warning (0, "connect returned: %U", format_session_error, rv); + { + clib_warning (0, "connect returned: %U", format_session_error, rv); + if (rv == SESSION_E_MAX_STREAMS_HIT) + vlib_process_signal_event_mt ( + vlib_get_main (), hcm->cli_node_index, HC_MAX_STREAMS_HIT, 0); + else + vlib_process_signal_event_mt ( + vlib_get_main (), hcm->cli_node_index, HC_CONNECT_FAILED, 0); + return -1; + } + s = session_get_from_handle (a->sh); + hs->http_session_index = s->session_index; + hs->stats.max_req = hcm->reqs_per_session; + hs->stats.start = vlib_time_now (wrk->vlib_main); + if (hc_request (s, wrk, hs)) + return -1; } - vec_free (args); -} - -static void -hc_connect_streams (u64 parent_handle, u32 parent_index) -{ - hc_connect_streams_args_t *args = 0; - vec_validate (args, 0); - args->parent_handle = parent_handle; - args->parent_index = parent_index; - - session_send_rpc_evt_to_thread_force (transport_cl_thread (), - hc_connect_streams_rpc, args); + return 0; } static int @@ -287,7 +283,7 @@ hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s, { hc_main_t *hcm = &hc_main; hc_worker_t *wrk; - hc_session_t *hc_session, *ho_session, *parent_session; + hc_session_t *hc_session, *ho_session; hc_http_header_t *header; http_version_t http_version; u8 *f = 0; @@ -296,12 +292,8 @@ hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s, if (err) { clib_warning ("connected error: %U", format_session_error, err); - if (err == SESSION_E_MAX_STREAMS_HIT) - vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index, - HC_MAX_STREAMS_HIT, 0); - else - vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index, - HC_CONNECT_FAILED, 0); + vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index, + HC_CONNECT_FAILED, 0); return -1; } @@ -318,24 +310,6 @@ hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s, hcm->connected_counter++; clib_spinlock_unlock_if_init (&hcm->lock); - if (hc_session->session_flags & HC_S_FLAG_IS_PARENT) - { - http_version = http_session_get_version (s); - if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1) - { - HTTP_DBG (1, "parent connected, going to open %u streams", - hcm->max_streams - 1); - hc_connect_streams (session_handle (s), hc_session->session_index); - } - } - else - { - parent_session = - hc_session_get (hc_session->parent_index, hc_session->thread_index); - parent_session->child_count++; - } - - hc_session->thread_index = s->thread_index; hc_session->body_recv = 0; s->opaque = hc_session->session_index; wrk->session_index = hc_session->session_index; @@ -422,7 +396,21 @@ hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s, hc_session->stats.start = vlib_time_now (wrk->vlib_main); - return hc_request (s, wrk, hc_session, err); + if (hc_request (s, wrk, hc_session)) + return -1; + + http_version = http_session_get_version (s); + if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1) + { + ASSERT (hc_session->session_flags & HC_S_FLAG_IS_PARENT); + HTTP_DBG (1, "parent connected, going to open %u streams", + hcm->max_streams - 1); + hc_session->child_count = hcm->max_streams - 1; + if (hc_connect_streams (session_handle (s), hc_session->session_index)) + return -1; + } + + return 0; } static void @@ -503,7 +491,6 @@ hc_rx_callback (session_t *s) http_msg_t msg; int rv; u32 max_deq; - session_error_t session_err = 0; int send_err = 0; http_version_t http_version; @@ -688,7 +675,7 @@ done: else { HTTP_DBG (1, "doing another repeat"); - send_err = hc_request (s, wrk, hc_session, session_err); + send_err = hc_request (s, wrk, hc_session); if (send_err) clib_warning ("failed to send request, error %d", send_err); } diff --git a/src/plugins/hs_apps/http_connect_proxy_client.c b/src/plugins/hs_apps/http_connect_proxy_client.c index ab966268991..cb9bdd13158 100644 --- a/src/plugins/hs_apps/http_connect_proxy_client.c +++ b/src/plugins/hs_apps/http_connect_proxy_client.c @@ -823,6 +823,8 @@ hcpc_open_http_stream (u32 session_index) hcpc_main_t *hcpcm = &hcpc_main; vnet_connect_args_t _a, *a = &_a; session_error_t rv; + session_t *s; + hcpc_session_t *ps; clib_memset (a, 0, sizeof (*a)); clib_memcpy (&a->sep_ext, &hcpcm->proxy_server_sep, @@ -831,9 +833,74 @@ hcpc_open_http_stream (u32 session_index) a->app_index = hcpcm->http_app_index; a->api_context = session_index; - rv = vnet_connect (a); + rv = vnet_connect_stream (a); if (rv) - clib_warning ("connect returned: %U", format_session_error, rv); + { + clib_warning ("session %u connect error: %U", session_index, + format_session_error, rv); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_get (session_index); + ASSERT (ps); + ps->state = HCPC_SESSION_CLOSED; + ps->http_disconnected = 1; + ps->http_establishing = 0; + if (!ps->intercept_diconnected) + { + ps->intercept.rx_fifo->master_thread_index = + ps->intercept.tx_fifo->master_thread_index; + if (ps->intercept.session_handle != SESSION_INVALID_HANDLE) + { + session_reset ( + session_get_from_handle (ps->intercept.session_handle)); + ps->intercept_diconnected = 1; + } + else + { + if (session_thread_from_handle (hcpcm->http_connection_handle) != + ps->intercept.tx_fifo->master_thread_index) + { + session_send_rpc_evt_to_thread ( + ps->intercept.tx_fifo->master_thread_index, + hcpc_session_postponed_free_rpc, + uword_to_pointer (ps->session_index, void *)); + } + else + hcpc_session_free (ps); + } + if (rv == SESSION_E_MAX_STREAMS_HIT) + hcpc_worker_stats_inc (vlib_get_thread_index (), max_streams_hit, + 1); + } + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return; + } + + HCPC_DBG ("stream for session [%u] opened", session_index); + s = session_get_from_handle (a->sh); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_get (session_index); + ASSERT (ps); + ps->http.session_handle = session_handle (s); + ps->http.rx_fifo = s->rx_fifo; + ps->http.tx_fifo = s->tx_fifo; + + /* listener session was already closed */ + if (ps->intercept_diconnected) + { + hcpc_worker_stats_inc (s->thread_index, + client_closed_before_stream_opened, 1); + session_reset (s); + ps->http_establishing = 0; + ps->http_disconnected = 1; + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return; + } + + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + + if (svm_fifo_max_dequeue (s->tx_fifo)) + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); } static void @@ -847,8 +914,10 @@ hcpc_connect_http_stream_rpc (void *rpc_args) static void hcpc_connect_http_stream (u32 session_index) { - u32 connects_thread = transport_cl_thread (), thread_index; + hcpc_main_t *hcpcm = &hcpc_main; + u32 connects_thread, thread_index; + connects_thread = session_thread_from_handle (hcpcm->http_connection_handle); thread_index = vlib_get_thread_index (); if (thread_index == connects_thread) @@ -858,7 +927,7 @@ hcpc_connect_http_stream (u32 session_index) } session_send_rpc_evt_to_thread_force ( - transport_cl_thread (), hcpc_connect_http_stream_rpc, + connects_thread, hcpc_connect_http_stream_rpc, uword_to_pointer (session_index, void *)); } @@ -1039,82 +1108,27 @@ hcpc_http_session_connected_callback (u32 app_index, u32 session_index, hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; + ASSERT (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE); + if (err) { - clib_warning ("session %u connect error: %U", session_index, + clib_warning ("connect to http proxy server failed: %U", format_session_error, err); - - /* connect to http proxy server failed */ - if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE) - return 0; - - clib_spinlock_lock_if_init (&hcpcm->sessions_lock); - ps = hcpc_session_get (session_index); - ASSERT (ps); - ps->state = HCPC_SESSION_CLOSED; - ps->http_disconnected = 1; - ps->http_establishing = 0; - if (!ps->intercept_diconnected) - { - if (ps->intercept.session_handle != SESSION_INVALID_HANDLE) - { - session_reset ( - session_get_from_handle (ps->intercept.session_handle)); - ps->intercept_diconnected = 1; - } - else - hcpc_delete_session (s, 1); - } - clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - if (err == SESSION_E_MAX_STREAMS_HIT) - hcpc_worker_stats_inc (vlib_get_thread_index (), max_streams_hit, 1); - return 0; - } - - if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE) - { - HCPC_DBG ("parent session connected"); - clib_spinlock_lock_if_init (&hcpcm->sessions_lock); - ps = hcpc_session_alloc (); - ps->http.session_handle = session_handle (s); - ps->http.rx_fifo = s->rx_fifo; - ps->http.tx_fifo = s->tx_fifo; - ps->flags |= HCPC_SESSION_F_IS_PARENT; - clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - s->opaque = ps->session_index; - hcpcm->http_connection_handle = session_handle (s); - vlib_process_signal_event_mt (vlib_get_main (), - hcpcm->process_node_index, - HCPC_EVENT_PROXY_CONNECTED, 0); return 0; } - HCPC_DBG ("stream for session [%u] opened", session_index); + HCPC_DBG ("parent session connected"); clib_spinlock_lock_if_init (&hcpcm->sessions_lock); - ps = hcpc_session_get (session_index); - ASSERT (ps); + ps = hcpc_session_alloc (); ps->http.session_handle = session_handle (s); ps->http.rx_fifo = s->rx_fifo; ps->http.tx_fifo = s->tx_fifo; - - /* listener session was already closed */ - if (ps->intercept_diconnected) - { - hcpc_worker_stats_inc (s->thread_index, - client_closed_before_stream_opened, 1); - session_reset (s); - ps->http_establishing = 0; - ps->http_disconnected = 1; - clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - return -1; - } - + ps->flags |= HCPC_SESSION_F_IS_PARENT; clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - - if (svm_fifo_max_dequeue (s->tx_fifo)) - if (svm_fifo_set_event (s->tx_fifo)) - session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); - + s->opaque = ps->session_index; + hcpcm->http_connection_handle = session_handle (s); + vlib_process_signal_event_mt (vlib_get_main (), hcpcm->process_node_index, + HCPC_EVENT_PROXY_CONNECTED, 0); return 0; } diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index 1ec48ef7213..b4697d5a961 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -300,7 +300,7 @@ http_get_app_header_list (http_req_t *req, http_msg_t *msg) u8 *app_headers; int rv; - as = session_get_from_handle (req->hr_pa_session_handle); + as = session_get (req->c_s_index, req->c_thread_index); if (msg->data.type == HTTP_MSG_DATA_PTR) { @@ -328,7 +328,7 @@ http_get_app_target (http_req_t *req, http_msg_t *msg) u8 *target; int rv; - as = session_get_from_handle (req->hr_pa_session_handle); + as = session_get (req->c_s_index, req->c_thread_index); if (msg->data.type == HTTP_MSG_DATA_PTR) { @@ -371,7 +371,7 @@ http_get_rx_buf (http_conn_t *hc) void http_req_tx_buffer_init (http_req_t *req, http_msg_t *msg) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); http_buffer_init (&req->tx_buf, msg_to_buf_type[msg->data.type], as->tx_fifo, msg->data.body_len); } @@ -828,7 +828,6 @@ http_transport_enable (vlib_main_t *vm, u8 is_en) vec_validate (hm->wrk, num_threads - 1); vec_foreach (wrk, hm->wrk) { - clib_spinlock_init (&wrk->pending_stream_connects_lock); clib_memset (&wrk->stats, 0, sizeof (wrk->stats)); } vec_validate (hm->rx_bufs, num_threads - 1); @@ -951,7 +950,7 @@ http_connect_connection (session_endpoint_cfg_t *sep) } static int -http_connect_stream (u64 parent_handle, u32 opaque) +http_connect_stream (u64 parent_handle, u32 *req_index) { session_t *hs; http_req_handle_t rh; @@ -985,90 +984,26 @@ http_connect_stream (u64 parent_handle, u32 opaque) return -1; } - return http_vfts[rh.version].conn_connect_stream_callback (hc, opaque); -} - -static void -http_handle_stream_connects_rpc (void *args) -{ - clib_thread_index_t thread_index = pointer_to_uword (args); - http_worker_t *wrk; - u32 n_pending, max_connects, n_connects = 0; - http_pending_connect_stream_t *pc; - - wrk = http_worker_get (thread_index); - - clib_spinlock_lock (&wrk->pending_stream_connects_lock); - - n_pending = clib_fifo_elts (wrk->pending_connect_streams); - max_connects = clib_min (32, n_pending); - vec_validate (wrk->burst_connect_streams, max_connects); - - while (n_connects < max_connects) - clib_fifo_sub1 (wrk->pending_connect_streams, - wrk->burst_connect_streams[n_connects++]); - - clib_spinlock_unlock (&wrk->pending_stream_connects_lock); - - n_connects = 0; - while (n_connects < max_connects) - { - pc = &wrk->burst_connect_streams[n_connects++]; - http_connect_stream (pc->parent_handle, pc->opaque); - } - - /* more work to do? */ - if (max_connects < n_pending) - session_send_rpc_evt_to_thread_force ( - thread_index, http_handle_stream_connects_rpc, - uword_to_pointer ((uword) thread_index, void *)); + return http_vfts[rh.version].conn_connect_stream_callback (hc, req_index); } static int -http_program_connect_stream (session_endpoint_cfg_t *sep) +http_transport_connect (transport_endpoint_cfg_t *tep) { - clib_thread_index_t parent_thread_index = - session_thread_from_handle (sep->parent_handle); - http_worker_t *wrk; - u32 n_pending; - - ASSERT (session_vlib_thread_is_cl_thread ()); - - /* if we are already on same worker as parent, handle connect */ - if (parent_thread_index == transport_cl_thread ()) - return http_connect_stream (sep->parent_handle, sep->opaque); - - /* if not on same worker as parent, queue request */ - wrk = http_worker_get (parent_thread_index); - - clib_spinlock_lock (&wrk->pending_stream_connects_lock); - - http_pending_connect_stream_t p = { .parent_handle = sep->parent_handle, - .opaque = sep->opaque }; - clib_fifo_add1 (wrk->pending_connect_streams, p); - n_pending = clib_fifo_elts (wrk->pending_connect_streams); - - clib_spinlock_unlock (&wrk->pending_stream_connects_lock); - - if (n_pending == 1) - session_send_rpc_evt_to_thread_force ( - parent_thread_index, http_handle_stream_connects_rpc, - uword_to_pointer ((uword) parent_thread_index, void *)); + session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep; - return 0; + ASSERT (sep->parent_handle == SESSION_INVALID_HANDLE); + return http_connect_connection (sep); } static int -http_transport_connect (transport_endpoint_cfg_t *tep) +http_transport_connect_stream (transport_endpoint_cfg_t *tep, + CLIB_UNUSED (session_t *stream_session), + u32 *conn_index) { session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep; - session_t *hs; - hs = session_get_from_handle_if_valid (sep->parent_handle); - if (hs) - return http_program_connect_stream (sep); - else - return http_connect_connection (sep); + return http_connect_stream (sep->parent_handle, conn_index); } static u32 @@ -1443,6 +1378,7 @@ http_transport_cleanup_ho (u32 ho_hc_index) static const transport_proto_vft_t http_proto = { .enable = http_transport_enable, .connect = http_transport_connect, + .connect_stream = http_transport_connect_stream, .start_listen = http_start_listen, .stop_listen = http_stop_listen, .half_close = http_transport_shutdown, diff --git a/src/plugins/http/http1.c b/src/plugins/http/http1.c index 19cc1879b8e..af9eec4b6e4 100644 --- a/src/plugins/http/http1.c +++ b/src/plugins/http/http1.c @@ -77,7 +77,7 @@ http1_conn_alloc_req (http_conn_t *hc) pool_get_aligned_safe (h1m->req_pool[hc->c_thread_index], req, CLIB_CACHE_LINE_BYTES); clib_memset (req, 0, sizeof (*req)); - req->hr_pa_session_handle = SESSION_INVALID_HANDLE; + req->c_s_index = SESSION_INVALID_INDEX; req_index = req - h1m->req_pool[hc->c_thread_index]; hr_handle.version = HTTP_VERSION_1; hr_handle.req_index = req_index; @@ -1967,7 +1967,7 @@ http1_transport_connected_callback (http_conn_t *hc) req = http1_conn_alloc_req (hc); http_req_state_change (req, HTTP_REQ_STATE_WAIT_APP_METHOD); http_stats_connections_established_inc (hc->c_thread_index); - return http_conn_established (hc, req, hc->hc_pa_app_api_ctx, 0); + return http_conn_established (hc, req, hc->hc_pa_app_api_ctx); } static void diff --git a/src/plugins/http/http2/http2.c b/src/plugins/http/http2/http2.c index 45d35cba09c..286c1fc9a55 100644 --- a/src/plugins/http/http2/http2.c +++ b/src/plugins/http/http2/http2.c @@ -251,7 +251,7 @@ http2_conn_alloc_req (http_conn_t *hc, u8 is_parent) pool_get_aligned_safe (wrk->req_pool, req, CLIB_CACHE_LINE_BYTES); clib_memset (req, 0, sizeof (*req)); - req->base.hr_pa_session_handle = SESSION_INVALID_HANDLE; + req->base.c_s_index = SESSION_INVALID_INDEX; req_index = req - wrk->req_pool; hr_handle.version = HTTP_VERSION_2; hr_handle.req_index = req_index; @@ -2450,7 +2450,7 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh) http2_default_conn_settings.header_table_size); http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD); http_stats_connections_established_inc (hc->c_thread_index); - if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx, 0)) + if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx)) return HTTP2_ERROR_INTERNAL_ERROR; } @@ -3278,29 +3278,22 @@ http2_conn_accept_callback (http_conn_t *hc) } static int -http2_conn_connect_stream_callback (http_conn_t *hc, u32 parent_app_api_ctx) +http2_conn_connect_stream_callback (http_conn_t *hc, u32 *req_index) { http2_conn_ctx_t *h2c; http2_req_t *req; - app_worker_t *app_wrk; - int rv; HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index); h2c = http2_conn_ctx_get_w_thread (hc); ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER)); ASSERT (!(h2c->flags & HTTP2_CONN_F_EXPECT_SERVER_SETTINGS)); - app_wrk = app_worker_get_if_valid (hc->hc_pa_wrk_index); - if (!app_wrk) - return -1; if (h2c->req_num == h2c->settings.max_concurrent_streams) - return app_worker_connect_notify (app_wrk, 0, SESSION_E_MAX_STREAMS_HIT, - parent_app_api_ctx); + return SESSION_E_MAX_STREAMS_HIT; req = http2_conn_alloc_req (hc, 0); + req->base.hr_pa_wrk_index = hc->hc_pa_wrk_index; http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD); - rv = http_conn_established (hc, &req->base, parent_app_api_ctx, 1); - if (rv != 0) - http2_conn_free_req (h2c, req, hc->c_thread_index); - return rv; + *req_index = req->base.hr_req_handle; + return SESSION_E_NONE; } static void diff --git a/src/plugins/http/http_private.h b/src/plugins/http/http_private.h index f5d600c5164..14b24b17b91 100644 --- a/src/plugins/http/http_private.h +++ b/src/plugins/http/http_private.h @@ -113,7 +113,6 @@ typedef struct http_req_ http_req_id_t c_http_req_id; }; #define hr_pa_wrk_index c_http_req_id.parent_app_wrk_index -#define hr_pa_session_handle c_http_req_id.app_session_handle #define hr_hc_index c_http_req_id.hc_index #define hr_req_handle connection.c_index @@ -240,9 +239,6 @@ typedef struct http_pending_connect_stream_ typedef struct http_worker_ { http_conn_t *conn_pool; - clib_spinlock_t pending_stream_connects_lock; - http_pending_connect_stream_t *pending_connect_streams; - http_pending_connect_stream_t *burst_connect_streams; http_wrk_stats_t stats; } http_worker_t; @@ -299,7 +295,7 @@ typedef struct http_engine_vft_ void (*transport_conn_reschedule_callback) (http_conn_t *hc); void (*conn_accept_callback) (http_conn_t *hc); /* optional */ int (*conn_connect_stream_callback) (http_conn_t *hc, - u32 parent_app_api_ctx); /* optional */ + u32 *req_index); /* optional */ void (*conn_cleanup_callback) (http_conn_t *hc); void (*enable_callback) (void); /* optional */ uword (*unformat_cfg_callback) (unformat_input_t *input); /* optional */ @@ -471,7 +467,7 @@ http_app_worker_rx_notify (http_req_t *req) session_t *as; app_worker_t *app_wrk; - as = session_get_from_handle (req->hr_pa_session_handle); + as = session_get (req->c_s_index, req->c_thread_index); if (!(as->flags & SESSION_F_RX_EVT)) { app_wrk = app_worker_get_if_valid (as->app_wrk_index); @@ -509,7 +505,7 @@ http_get_app_msg (http_req_t *req, http_msg_t *msg) session_t *as; int rv; - as = session_get_from_handle (req->hr_pa_session_handle); + as = session_get (req->c_s_index, req->c_thread_index); rv = svm_fifo_dequeue (as->tx_fifo, sizeof (*msg), (u8 *) msg); ASSERT (rv == sizeof (*msg)); } @@ -581,14 +577,14 @@ http_req_deschedule (http_req_t *req, transport_send_params_t *sp) always_inline void http_io_as_add_want_deq_ntf (http_req_t *req) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); } always_inline void http_io_as_add_want_read_ntf (http_req_t *req) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL | SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY); } @@ -596,7 +592,7 @@ http_io_as_add_want_read_ntf (http_req_t *req) always_inline void http_io_as_del_want_read_ntf (http_req_t *req) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); svm_fifo_del_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL | SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY); } @@ -604,14 +600,14 @@ http_io_as_del_want_read_ntf (http_req_t *req) always_inline void http_io_as_reset_has_read_ntf (http_req_t *req) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); svm_fifo_reset_has_deq_ntf (as->rx_fifo); } always_inline void http_io_as_dequeue_notify (http_req_t *req, u32 n_last_deq) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); if (svm_fifo_needs_deq_ntf (as->tx_fifo, n_last_deq)) session_dequeue_notify (as); } @@ -619,14 +615,14 @@ http_io_as_dequeue_notify (http_req_t *req, u32 n_last_deq) always_inline u32 http_io_as_max_write (http_req_t *req) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); return svm_fifo_max_enqueue_prod (as->rx_fifo); } always_inline u32 http_io_as_max_read (http_req_t *req) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); return svm_fifo_max_dequeue_cons (as->tx_fifo); } @@ -634,7 +630,7 @@ always_inline void http_io_as_write (http_req_t *req, u8 *data, u32 len) { int n_written; - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); n_written = svm_fifo_enqueue (as->rx_fifo, len, data); ASSERT (n_written == len); @@ -645,7 +641,7 @@ http_io_as_write_segs (http_req_t *req, const svm_fifo_seg_t segs[], u32 n_segs) { int n_written; - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); n_written = svm_fifo_enqueue_segments (as->rx_fifo, segs, n_segs, 0); ASSERT (n_written > 0); return (u32) n_written; @@ -655,7 +651,7 @@ always_inline u32 http_io_as_peek (http_req_t *req, u8 *buf, u32 len, u32 offset) { int n_read; - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); n_read = svm_fifo_peek (as->tx_fifo, offset, len, buf); ASSERT (n_read > 0); @@ -667,7 +663,7 @@ http_io_as_read_segs (http_req_t *req, svm_fifo_seg_t *segs, u32 *n_segs, u32 max_bytes) { int n_read; - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); n_read = svm_fifo_segments (as->tx_fifo, 0, segs, n_segs, max_bytes); ASSERT (n_read > 0); return (u32) n_read; @@ -676,21 +672,21 @@ http_io_as_read_segs (http_req_t *req, svm_fifo_seg_t *segs, u32 *n_segs, always_inline void http_io_as_drain (http_req_t *req, u32 len) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); svm_fifo_dequeue_drop (as->tx_fifo, len); } always_inline void http_io_as_drain_all (http_req_t *req) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); svm_fifo_dequeue_drop_all (as->tx_fifo); } always_inline void http_io_as_drain_unread (http_req_t *req) { - session_t *as = session_get_from_handle (req->hr_pa_session_handle); + session_t *as = session_get (req->c_s_index, req->c_thread_index); svm_fifo_dequeue_drop_all (as->rx_fifo); } @@ -871,13 +867,12 @@ http_conn_accept_request (http_conn_t *hc, http_req_t *req) if ((rv = app_worker_init_accepted (as))) { HTTP_DBG (1, "failed to allocate fifos"); - req->hr_pa_session_handle = SESSION_INVALID_HANDLE; + req->c_s_index = SESSION_INVALID_INDEX; session_free (as); hc->flags |= HTTP_CONN_F_NO_APP_SESSION; return rv; } - req->hr_pa_session_handle = session_handle (as); req->hr_pa_wrk_index = as->app_wrk_index; app_wrk = app_worker_get (as->app_wrk_index); @@ -885,7 +880,7 @@ http_conn_accept_request (http_conn_t *hc, http_req_t *req) if ((rv = app_worker_accept_notify (app_wrk, as))) { HTTP_DBG (1, "app accept returned"); - req->hr_pa_session_handle = SESSION_INVALID_HANDLE; + req->c_s_index = SESSION_INVALID_INDEX; session_free (as); hc->flags |= HTTP_CONN_F_NO_APP_SESSION; return rv; @@ -896,7 +891,7 @@ http_conn_accept_request (http_conn_t *hc, http_req_t *req) always_inline int http_conn_established (http_conn_t *hc, http_req_t *req, - u32 parent_app_api_ctx, u8 is_stream) + u32 parent_app_api_ctx) { session_t *as; app_worker_t *app_wrk; @@ -904,12 +899,9 @@ http_conn_established (http_conn_t *hc, http_req_t *req, http_conn_t *ho_hc; int rv; - if (!is_stream) - { - ho_hc = http_ho_conn_get (hc->ho_index); - /* in chain with TLS there is race on half-open cleanup */ - __atomic_fetch_or (&ho_hc->flags, HTTP_CONN_F_HO_DONE, __ATOMIC_RELEASE); - } + ho_hc = http_ho_conn_get (hc->ho_index); + /* in chain with TLS there is race on half-open cleanup */ + __atomic_fetch_or (&ho_hc->flags, HTTP_CONN_F_HO_DONE, __ATOMIC_RELEASE); /* allocate app session and initialize */ as = session_alloc (hc->c_thread_index); @@ -943,7 +935,6 @@ http_conn_established (http_conn_t *hc, http_req_t *req, app_worker_connect_notify (app_wrk, as, 0, parent_app_api_ctx); - req->hr_pa_session_handle = session_handle (as); req->hr_pa_wrk_index = as->app_wrk_index; return 0;