From: Matus Fabian Date: Fri, 18 Jul 2025 17:04:07 +0000 (-0400) Subject: http: h2 client multiplexing X-Git-Tag: v26.02-rc0~153 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F58%2F43458%2F15;p=vpp.git http: h2 client multiplexing Type: improvement Change-Id: I768df864cbda26b0901528789b52a33e788c2258 Signed-off-by: Matus Fabian --- diff --git a/extras/hs-test/http2_test.go b/extras/hs-test/http2_test.go index 238d335be84..26605f2140a 100644 --- a/extras/hs-test/http2_test.go +++ b/extras/hs-test/http2_test.go @@ -6,13 +6,15 @@ import ( "strings" "time" + "github.com/edwarnicke/exechelper" + . "fd.io/hs-test/infra" ) func init() { RegisterH2Tests(Http2TcpGetTest, Http2TcpPostTest, Http2MultiplexingTest, Http2TlsTest, Http2ContinuationTxTest, Http2ServerMemLeakTest, - Http2ClientGetTest, Http2ClientPostTest, Http2ClientPostPtrTest, Http2ClientGetRepeatTest) - RegisterH2MWTests(Http2MultiplexingMWTest) + Http2ClientGetTest, Http2ClientPostTest, Http2ClientPostPtrTest, Http2ClientGetRepeatTest, Http2ClientMultiplexingTest) + RegisterH2MWTests(Http2MultiplexingMWTest, Http2ClientMultiplexingMWTest) RegisterVethTests(Http2CliTlsTest, Http2ClientContinuationTest) } @@ -233,6 +235,66 @@ func Http2ClientGetRepeatTest(s *Http2Suite) { s.Log(o) } +func Http2ClientMultiplexingTest(s *Http2Suite) { + vpp := s.Containers.Vpp.VppInstance + serverAddress := s.HostAddr() + ":" + s.Ports.Port2 + + s.CreateNginxServer() + s.AssertNil(s.Containers.NginxServer.Start()) + + uri := "https://" + serverAddress + "/httpTestFile" + cmd := fmt.Sprintf("http client http2 streams %d repeat %d uri %s", 10, 20, uri) + o := vpp.Vppctl(cmd) + s.Log(o) + s.AssertContains(o, "20 request(s)") + logPath := s.Containers.NginxServer.GetHostWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log" + logContents, err := exechelper.Output("cat " + logPath) + s.Log(string(logContents)) + s.AssertNil(err) + s.AssertContains(string(logContents), "conn_reqs=20") + + /* test session cleanup */ + httpStreamCleanupDone := false + tcpSessionCleanupDone := false + for nTries := 0; nTries < 30; nTries++ { + o := vpp.Vppctl("show session verbose") + if !strings.Contains(o, "[T]") { + tcpSessionCleanupDone = true + } + if !strings.Contains(o, "[H2]") { + httpStreamCleanupDone = true + } + if httpStreamCleanupDone && tcpSessionCleanupDone { + break + } + time.Sleep(1 * time.Second) + } + s.AssertEqual(true, tcpSessionCleanupDone, "TCP session not cleaned up") + s.AssertEqual(true, httpStreamCleanupDone, "HTTP/2 stream not cleaned up") +} + +func Http2ClientMultiplexingMWTest(s *Http2Suite) { + s.CpusPerVppContainer = 3 + s.SetupTest() + + vpp := s.Containers.Vpp.VppInstance + serverAddress := s.HostAddr() + ":" + s.Ports.Port2 + + s.CreateNginxServer() + s.AssertNil(s.Containers.NginxServer.Start()) + + uri := "https://" + serverAddress + "/httpTestFile" + cmd := fmt.Sprintf("http client http2 sessions 2 streams %d repeat %d uri %s", 5, 20, uri) + o := vpp.Vppctl(cmd) + s.Log(o) + s.AssertContains(o, "20 request(s)") + logPath := s.Containers.NginxServer.GetHostWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log" + logContents, err := exechelper.Output("cat " + logPath) + s.Log(string(logContents)) + s.AssertNil(err) + s.AssertEqual(2, strings.Count(string(logContents), "conn_reqs=10")) +} + func Http2ClientContinuationTest(s *VethsSuite) { serverAddress := s.Interfaces.Server.Ip4AddressString() + ":" + s.Ports.Port1 diff --git a/extras/hs-test/infra/suite_http2.go b/extras/hs-test/infra/suite_http2.go index b26ffd9e978..69739bc8a6b 100644 --- a/extras/hs-test/infra/suite_http2.go +++ b/extras/hs-test/infra/suite_http2.go @@ -98,7 +98,12 @@ func (s *Http2Suite) SetupTest() { } func (s *Http2Suite) TeardownTest() { - s.HstSuite.TeardownTest() + defer s.HstSuite.TeardownTest() + vpp := s.Containers.Vpp.VppInstance + if CurrentSpecReport().Failed() { + s.Log(vpp.Vppctl("show session verbose 2")) + s.Log(vpp.Vppctl("show error")) + } } func (s *Http2Suite) VppAddr() string { @@ -514,9 +519,11 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() { }) testCases := []struct { - desc string - portOffset int + desc string + portOffset int + clientExtraArgs string }{ + // some tests are testing error conditions after request is completed so in this run http client with repeat {desc: "client/1/1", portOffset: 0}, {desc: "client/4.1/1", portOffset: 1}, {desc: "client/4.1/2", portOffset: 2}, @@ -535,8 +542,8 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() { //{desc: "client/5.1/6", portOffset: 13}, //{desc: "client/5.1/7", portOffset: 14}, {desc: "client/5.1/8", portOffset: 15}, - {desc: "client/5.1/9", portOffset: 16}, - {desc: "client/5.1/10", portOffset: 17}, + {desc: "client/5.1/9", portOffset: 16, clientExtraArgs: "repeat 2 "}, + {desc: "client/5.1/10", portOffset: 17, clientExtraArgs: "repeat 2 "}, {desc: "client/5.1.1/1", portOffset: 18}, {desc: "client/5.4.1/1", portOffset: 19}, {desc: "client/5.4.1/2", portOffset: 20}, @@ -552,7 +559,7 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() { //{desc: "client/6.3/2", portOffset: 29}, {desc: "client/6.4/1", portOffset: 30}, {desc: "client/6.4/2", portOffset: 31}, - {desc: "client/6.4/3", portOffset: 32}, + {desc: "client/6.4/3", portOffset: 32, clientExtraArgs: "repeat 2 "}, {desc: "client/6.5/1", portOffset: 33}, {desc: "client/6.5/2", portOffset: 34}, {desc: "client/6.5/3", portOffset: 35}, @@ -573,11 +580,11 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() { {desc: "client/6.9.1/1", portOffset: 49}, // TODO: message framing without content length using END_STREAM flag //{desc: "client/6.9.1/2", portOffset: 50}, - {desc: "client/6.10/1", portOffset: 51}, + {desc: "client/6.10/1", portOffset: 51, clientExtraArgs: "repeat 2 "}, {desc: "client/6.10/2", portOffset: 52}, {desc: "client/6.10/3", portOffset: 53}, - {desc: "client/6.10/4", portOffset: 54}, - {desc: "client/6.10/5", portOffset: 55}, + {desc: "client/6.10/4", portOffset: 54, clientExtraArgs: "repeat 2 "}, + {desc: "client/6.10/5", portOffset: 55, clientExtraArgs: "repeat 2 "}, {desc: "client/6.10/6", portOffset: 56}, } @@ -611,10 +618,8 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() { go h2spec.RunClientSpec(conf) - cmd := fmt.Sprintf("http client timeout 5 verbose uri https://%s:%d/", serverAddress, h2specdFromPort+test.portOffset) - res := s.Containers.Vpp.VppInstance.Vppctl(cmd) - s.Log(res) - s.AssertNotContains(res, "error: timeout") + cmd := fmt.Sprintf("http client timeout 5 %s uri https://%s:%d/", test.clientExtraArgs, serverAddress, h2specdFromPort+test.portOffset) + s.Log(s.Containers.Vpp.VppInstance.Vppctl(cmd)) oChan := make(chan string) go func() { diff --git a/extras/hs-test/resources/nginx/nginx_server.conf b/extras/hs-test/resources/nginx/nginx_server.conf index a40ed7c309a..d161e3c4164 100644 --- a/extras/hs-test/resources/nginx/nginx_server.conf +++ b/extras/hs-test/resources/nginx/nginx_server.conf @@ -13,6 +13,9 @@ events { } http { + log_format access_log_fmt '$remote_addr - $remote_user [$time_local] ' + '"$request" $status $body_bytes_sent ' + '"$http_referer" "$http_user_agent" conn=$connection conn_reqs=$connection_requests'; keepalive_timeout 300s; keepalive_requests 1000000; client_body_timeout {{.Timeout}}s; @@ -20,7 +23,7 @@ http { send_timeout {{.Timeout}}s; sendfile on; server { - access_log /tmp/nginx/{{.LogPrefix}}-access.log; + access_log /tmp/nginx/{{.LogPrefix}}-access.log access_log_fmt; listen {{.Port}}; listen {{.PortSsl}} ssl; server_name {{.Address}}; diff --git a/src/plugins/hs_apps/http_client.c b/src/plugins/hs_apps/http_client.c index 97dbca788ee..40eb1d8c514 100644 --- a/src/plugins/hs_apps/http_client.c +++ b/src/plugins/hs_apps/http_client.c @@ -15,7 +15,8 @@ #define foreach_hc_s_flag \ _ (1, IS_CLOSED) \ _ (2, PRINTABLE_BODY) \ - _ (4, CHUNKED_BODY) + _ (4, CHUNKED_BODY) \ + _ (8, IS_PARENT) typedef enum hc_s_flag_ { @@ -26,7 +27,7 @@ typedef enum hc_s_flag_ typedef struct { - u64 req_per_wrk; + u64 max_req; u64 request_count; f64 start, end; f64 elapsed_time; @@ -46,6 +47,12 @@ typedef struct u8 *http_response; u8 *response_status; FILE *file_ptr; + union + { + u32 child_count; + u32 parent_index; + }; + u32 http_session_index; } hc_session_t; typedef struct @@ -83,7 +90,8 @@ typedef struct bool verbose; f64 timeout; http_req_method_t req_method; - u64 repeat_count; + u64 reqs_per_session; + u64 reqs_remainder; f64 duration; bool repeat; bool multi_session; @@ -91,6 +99,7 @@ typedef struct u32 connected_counter; u32 worker_index; u32 max_sessions; + u32 max_streams; u32 private_segment_size; u32 prealloc_fifos; u32 fifo_size; @@ -112,6 +121,7 @@ typedef enum HC_GENERIC_ERR, HC_FOPEN_FAILED, HC_REPEAT_DONE, + HC_MAX_STREAMS_HIT, } hc_cli_signal_t; #define mime_printable_max_len 35 @@ -151,6 +161,7 @@ hc_session_alloc (hc_worker_t *wrk) pool_get_zero (wrk->sessions, s); s->session_index = s - wrk->sessions; s->thread_index = wrk->thread_index; + HTTP_DBG (1, "[%u]%u", s->thread_index, s->session_index); return s; } @@ -221,32 +232,110 @@ done: return 0; } +typedef struct +{ + u64 parent_handle; + u32 parent_index; +} hc_connect_streams_args_t; + +static void +hc_connect_streams_rpc (void *rpc_args) +{ + hc_connect_streams_args_t *args = rpc_args; + hc_main_t *hcm = &hc_main; + vnet_connect_args_t _a, *a = &_a; + hc_worker_t *wrk; + hc_session_t *ho_hs; + u32 i; + int rv; + + clib_memset (a, 0, sizeof (*a)); + clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep)); + a->sep_ext.parent_handle = args->parent_handle; + a->app_index = hcm->app_index; + + for (i = 0; i < (hcm->max_streams - 1); i++) + { + /* allocate half-open session */ + wrk = hc_worker_get (transport_cl_thread ()); + ho_hs = hc_session_alloc (wrk); + ho_hs->parent_index = args->parent_index; + a->api_context = ho_hs->session_index; + + rv = vnet_connect (a); + if (rv) + clib_warning (0, "connect returned: %U", format_session_error, rv); + } + vec_free (args); +} + +static void +hc_connect_streams (u64 parent_handle, u32 parent_index) +{ + hc_connect_streams_args_t *args = 0; + + vec_validate (args, 0); + args->parent_handle = parent_handle; + args->parent_index = parent_index; + + session_send_rpc_evt_to_thread_force (transport_cl_thread (), + hc_connect_streams_rpc, args); +} + static int -hc_session_connected_callback (u32 app_index, u32 hc_session_index, - session_t *s, session_error_t err) +hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s, + session_error_t err) { hc_main_t *hcm = &hc_main; hc_worker_t *wrk; - hc_session_t *hc_session; + hc_session_t *hc_session, *ho_session, *parent_session; hc_http_header_t *header; + http_version_t http_version; u8 *f = 0; + u32 s_index; if (err) { - clib_warning ("hc_session_index[%d] connected error: %U", - hc_session_index, format_session_error, err); - vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index, - HC_CONNECT_FAILED, 0); + clib_warning ("connected error: %U", format_session_error, err); + if (err == SESSION_E_MAX_STREAMS_HIT) + vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index, + HC_MAX_STREAMS_HIT, 0); + else + vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index, + HC_CONNECT_FAILED, 0); return -1; } + ho_session = hc_session_get (ho_index, transport_cl_thread ()); wrk = hc_worker_get (s->thread_index); - hc_session = hc_session_alloc (wrk); + s_index = hc_session->session_index; + clib_memcpy_fast (hc_session, ho_session, sizeof (*hc_session)); + hc_session->session_index = s_index; + hc_session->thread_index = s->thread_index; + hc_session->http_session_index = s->session_index; + clib_spinlock_lock_if_init (&hcm->lock); hcm->connected_counter++; clib_spinlock_unlock_if_init (&hcm->lock); + if (hc_session->session_flags & HC_S_FLAG_IS_PARENT) + { + http_version = http_session_get_version (s); + if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1) + { + HTTP_DBG (1, "parent connected, going to open %u streams", + hcm->max_streams - 1); + hc_connect_streams (session_handle (s), hc_session->session_index); + } + } + else + { + parent_session = + hc_session_get (hc_session->parent_index, hc_session->thread_index); + parent_session->child_count++; + } + hc_session->thread_index = s->thread_index; hc_session->body_recv = 0; s->opaque = hc_session->session_index; @@ -254,19 +343,18 @@ hc_session_connected_callback (u32 app_index, u32 hc_session_index, if (hcm->multi_session) { - hc_session->stats.req_per_wrk = hcm->repeat_count / hcm->max_sessions; + hc_session->stats.max_req = hcm->reqs_per_session; clib_spinlock_lock_if_init (&hcm->lock); /* add remaining requests to the first connected session */ if (hcm->connected_counter == 1) { - hc_session->stats.req_per_wrk += - hcm->repeat_count % hcm->max_sessions; + hc_session->stats.max_req += hcm->reqs_remainder; } clib_spinlock_unlock_if_init (&hcm->lock); } else { - hc_session->stats.req_per_wrk = hcm->repeat_count; + hc_session->stats.max_req = hcm->reqs_per_session; hcm->worker_index = s->thread_index; } if (hcm->filename) @@ -369,7 +457,7 @@ hc_session_transport_closed_callback (session_t *s) } /* send an event when all sessions are closed */ - if (hcm->done_count >= hcm->max_sessions) + if (hcm->done_count >= (hcm->max_sessions * hcm->max_streams)) { if (hcm->was_transport_closed) { @@ -428,7 +516,10 @@ hc_rx_callback (session_t *s) max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo); if (PREDICT_FALSE (max_deq == 0)) - goto done; + { + HTTP_DBG (1, "no data to deq"); + return 0; + } if (hc_session->to_recv == 0) { @@ -444,6 +535,10 @@ hc_rx_callback (session_t *s) return -1; } + HTTP_DBG (1, "hc_session_index[%u]%u %U content-length: %lu", + s->thread_index, s->opaque, format_http_status_code, msg.code, + msg.data.body_len); + if (msg.data.headers_len) { http_init_header_table_buf (&hc_session->resp_headers, msg); @@ -519,6 +614,7 @@ hc_rx_callback (session_t *s) svm_fifo_max_dequeue (s->rx_fifo)); if (!max_deq) { + HTTP_DBG (1, "body not yet received"); goto done; } u32 n_deq = clib_min (hc_session->to_recv, max_deq); @@ -543,6 +639,7 @@ hc_rx_callback (session_t *s) ASSERT (hc_session->to_recv >= rv); hc_session->to_recv -= rv; hc_session->body_recv += rv; + HTTP_DBG (1, "read %u, left to recv %u", n_deq, hc_session->to_recv); if (hcm->filename) { if (hc_session->file_ptr == NULL) @@ -566,10 +663,28 @@ done: hc_session->stats.request_count++; if (hc_session->stats.elapsed_time >= hcm->duration && - hc_session->stats.request_count >= hc_session->stats.req_per_wrk) + hc_session->stats.request_count >= hc_session->stats.max_req) { HTTP_DBG (1, "repeat done"); - hc_session_disconnect_callback (s); + if (hc_session->session_flags & HC_S_FLAG_IS_PARENT) + { + /* parent must be closed last */ + if (hc_session->child_count != 0) + hc_session->session_flags |= HC_S_FLAG_IS_CLOSED; + else + hc_session_disconnect_callback (s); + } + else + { + hc_session_disconnect_callback (s); + hc_session_t *parent = hc_session_get ( + hc_session->parent_index, hc_session->thread_index); + parent->child_count--; + if (parent->child_count == 0 && + parent->session_flags & HC_S_FLAG_IS_CLOSED) + hc_session_disconnect_callback (session_get ( + parent->http_session_index, parent->thread_index)); + } } else { @@ -619,6 +734,14 @@ hc_tx_callback (session_t *s) return 0; } +static void +hc_ho_cleanup_callback (session_t *s) +{ + HTTP_DBG (1, "ho index %u", s->opaque); + hc_worker_t *wrk = hc_worker_get (transport_cl_thread ()); + pool_put_index (wrk->sessions, s->opaque); +} + static session_cb_vft_t hc_session_cb_vft = { .session_connected_callback = hc_session_connected_callback, .session_disconnect_callback = hc_session_disconnect_callback, @@ -626,6 +749,7 @@ static session_cb_vft_t hc_session_cb_vft = { .session_reset_callback = hc_session_reset_callback, .builtin_app_rx_callback = hc_rx_callback, .builtin_app_tx_callback = hc_tx_callback, + .half_open_cleanup_callback = hc_ho_cleanup_callback, }; static clib_error_t * @@ -683,24 +807,30 @@ hc_attach () return 0; } -static int +static void hc_connect_rpc (void *rpc_args) { vnet_connect_args_t *a = rpc_args; int rv = ~0; hc_main_t *hcm = &hc_main; + hc_worker_t *wrk; + hc_session_t *ho_hs; for (u32 i = 0; i < hcm->max_sessions; i++) { + /* allocate half-open session */ + wrk = hc_worker_get (transport_cl_thread ()); + ho_hs = hc_session_alloc (wrk); + ho_hs->session_flags |= HC_S_FLAG_IS_PARENT; + a->api_context = ho_hs->session_index; + rv = vnet_connect (a); - if (rv > 0) + if (rv) clib_warning (0, "connect returned: %U", format_session_error, rv); } session_endpoint_free_ext_cfgs (&a->sep_ext); vec_free (a); - - return rv; } static void @@ -710,6 +840,7 @@ hc_connect () vnet_connect_args_t *a = 0; transport_endpt_ext_cfg_t *ext_cfg; transport_endpt_cfg_http_t http_cfg = { (u32) hcm->timeout, 0 }; + vec_validate (a, 0); clib_memset (a, 0, sizeof (a[0])); clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep)); @@ -731,7 +862,7 @@ hc_connect () ext_cfg->crypto.alpn_protos[0] = TLS_ALPN_PROTO_HTTP_1_1; break; case HTTP_VERSION_2: - ext_cfg->crypto.alpn_protos[1] = TLS_ALPN_PROTO_HTTP_2; + ext_cfg->crypto.alpn_protos[0] = TLS_ALPN_PROTO_HTTP_2; break; default: break; @@ -753,7 +884,7 @@ hc_get_req_stats (vlib_main_t *vm) hc_session_t *hc_session; vec_foreach (wrk, hcm->wrk) { - vec_foreach (hc_session, wrk->sessions) + pool_foreach (hc_session, wrk->sessions) { hc_stats.request_count += hc_session->stats.request_count; hc_session->stats.request_count = 0; @@ -809,6 +940,9 @@ hc_get_event (vlib_main_t *vm) case HC_CONNECT_FAILED: err = clib_error_return (0, "error: failed to connect"); break; + case HC_MAX_STREAMS_HIT: + err = clib_error_return (0, "error: max streams hit"); + break; case HC_TRANSPORT_CLOSED: err = clib_error_return (0, "error: transport closed"); break; @@ -922,7 +1056,7 @@ hc_worker_cleanup (hc_worker_t *wrk) HTTP_DBG (1, "worker and worker sessions cleanup"); vec_free (wrk->headers_buf); - vec_foreach (hc_session, wrk->sessions) + pool_foreach (hc_session, wrk->sessions) { http_free_header_table (&hc_session->resp_headers); vec_free (hc_session->http_response); @@ -963,7 +1097,7 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, hc_main_t *hcm = &hc_main; clib_error_t *err = 0; unformat_input_t _line_input, *line_input = &_line_input; - u64 mem_size; + u64 mem_size, repeat_count = 0; u8 *appns_id = 0; u8 *path = 0; u8 *file_data; @@ -972,13 +1106,13 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, u8 *value; int rv; hcm->timeout = 10; - hcm->repeat_count = 0; hcm->duration = 0; hcm->repeat = false; hcm->multi_session = false; hcm->done_count = 0; hcm->connected_counter = 0; hcm->max_sessions = 1; + hcm->max_streams = 1; hcm->prealloc_fifos = 0; hcm->private_segment_size = 0; hcm->fifo_size = 0; @@ -1034,7 +1168,7 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, hcm->verbose = true; else if (unformat (line_input, "timeout %f", &hcm->timeout)) ; - else if (unformat (line_input, "repeat %d", &hcm->repeat_count)) + else if (unformat (line_input, "repeat %d", &repeat_count)) { hcm->repeat = true; } @@ -1049,6 +1183,14 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, goto done; } } + else if (unformat (line_input, "streams %d", &hcm->max_streams)) + { + if (hcm->max_streams <= 1) + { + err = clib_error_return (0, "streams must be > 1"); + goto done; + } + } else if (unformat (line_input, "prealloc-fifos %d", &hcm->prealloc_fifos)) ; @@ -1099,7 +1241,7 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, } } - if (hcm->duration && hcm->repeat_count) + if (hcm->duration && repeat_count) { err = clib_error_return ( 0, "combining duration and repeat is not supported"); @@ -1113,6 +1255,21 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input, goto done; } + if (hcm->max_streams > 1 && !hcm->repeat) + { + err = clib_error_return ( + 0, "multiple streams are only supported with request repeating"); + goto done; + } + + if (repeat_count) + { + hcm->reqs_per_session = + repeat_count / (hcm->max_sessions * hcm->max_streams); + hcm->reqs_remainder = + repeat_count % (hcm->max_sessions * hcm->max_streams); + } + if ((rv = parse_target ((char **) &hcm->uri, (char **) &hcm->target))) { err = clib_error_return (0, "target parse error: %U", diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index ccf987a6ad0..fc1fc81ff22 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -788,6 +788,7 @@ http_transport_enable (vlib_main_t *vm, u8 is_en) http_main_t *hm = &http_main; u32 num_threads, i; http_engine_vft_t *http_version; + http_worker_t *wrk; if (!is_en) { @@ -828,6 +829,10 @@ http_transport_enable (vlib_main_t *vm, u8 is_en) } vec_validate (hm->wrk, num_threads - 1); + vec_foreach (wrk, hm->wrk) + { + clib_spinlock_init (&wrk->pending_stream_connects_lock); + } vec_validate (hm->rx_bufs, num_threads - 1); vec_validate (hm->tx_bufs, num_threads - 1); vec_validate (hm->app_header_lists, num_threads - 1); @@ -858,11 +863,10 @@ http_transport_enable (vlib_main_t *vm, u8 is_en) } static int -http_transport_connect (transport_endpoint_cfg_t *tep) +http_connect_connection (session_endpoint_cfg_t *sep) { vnet_connect_args_t _cargs, *cargs = &_cargs; http_main_t *hm = &http_main; - session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep; application_t *app; http_conn_t *hc; int error; @@ -942,6 +946,127 @@ http_transport_connect (transport_endpoint_cfg_t *tep) return 0; } +static int +http_connect_stream (u64 parent_handle, u32 opaque) +{ + session_t *hs; + http_req_handle_t rh; + u32 hc_index; + http_conn_t *hc; + + hs = session_get_from_handle (parent_handle); + if (session_type_transport_proto (hs->session_type) != TRANSPORT_PROTO_HTTP) + { + HTTP_DBG (1, "received incompatible session"); + return -1; + } + + rh.as_u32 = hs->connection_index; + if (rh.version != HTTP_VERSION_2) + { + HTTP_DBG (1, "%U multiplexing not supported", format_http_version, + rh.version); + return -1; + } + + hc_index = http_vfts[rh.version].hc_index_get_by_req_index ( + rh.req_index, hs->thread_index); + HTTP_DBG (1, "hc [%u]%x", hs->thread_index, hc_index); + + hc = http_conn_get_w_thread (hc_index, hs->thread_index); + + if (hc->state == HTTP_CONN_STATE_CLOSED) + { + HTTP_DBG (1, "conn closed"); + return -1; + } + + return http_vfts[rh.version].conn_connect_stream_callback (hc, opaque); +} + +static void +http_handle_stream_connects_rpc (void *args) +{ + clib_thread_index_t thread_index = pointer_to_uword (args); + http_worker_t *wrk; + u32 n_pending, max_connects, n_connects = 0; + http_pending_connect_stream_t *pc; + + wrk = http_worker_get (thread_index); + + clib_spinlock_lock (&wrk->pending_stream_connects_lock); + + n_pending = clib_fifo_elts (wrk->pending_connect_streams); + max_connects = clib_min (32, n_pending); + vec_validate (wrk->burst_connect_streams, max_connects); + + while (n_connects < max_connects) + clib_fifo_sub1 (wrk->pending_connect_streams, + wrk->burst_connect_streams[n_connects++]); + + clib_spinlock_unlock (&wrk->pending_stream_connects_lock); + + n_connects = 0; + while (n_connects < max_connects) + { + pc = &wrk->burst_connect_streams[n_connects++]; + http_connect_stream (pc->parent_handle, pc->opaque); + } + + /* more work to do? */ + if (max_connects < n_pending) + session_send_rpc_evt_to_thread_force ( + thread_index, http_handle_stream_connects_rpc, + uword_to_pointer ((uword) thread_index, void *)); +} + +static int +http_program_connect_stream (session_endpoint_cfg_t *sep) +{ + clib_thread_index_t parent_thread_index = + session_thread_from_handle (sep->parent_handle); + http_worker_t *wrk; + u32 n_pending; + + ASSERT (session_vlib_thread_is_cl_thread ()); + + /* if we are already on same worker as parent, handle connect */ + if (parent_thread_index == transport_cl_thread ()) + return http_connect_stream (sep->parent_handle, sep->opaque); + + /* if not on same worker as parent, queue request */ + wrk = http_worker_get (parent_thread_index); + + clib_spinlock_lock (&wrk->pending_stream_connects_lock); + + http_pending_connect_stream_t p = { .parent_handle = sep->parent_handle, + .opaque = sep->opaque }; + clib_fifo_add1 (wrk->pending_connect_streams, p); + n_pending = clib_fifo_elts (wrk->pending_connect_streams); + + clib_spinlock_unlock (&wrk->pending_stream_connects_lock); + + if (n_pending == 1) + session_send_rpc_evt_to_thread_force ( + parent_thread_index, http_handle_stream_connects_rpc, + uword_to_pointer ((uword) parent_thread_index, void *)); + + return 0; +} + +static int +http_transport_connect (transport_endpoint_cfg_t *tep) +{ + session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep; + session_t *hs; + + hs = session_get_from_handle_if_valid (sep->parent_handle); + if (hs) + return http_program_connect_stream (sep); + else + return http_connect_connection (sep); +} + static u32 http_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep) { diff --git a/src/plugins/http/http1.c b/src/plugins/http/http1.c index e7ddaf350b1..d8e313bed6a 100644 --- a/src/plugins/http/http1.c +++ b/src/plugins/http/http1.c @@ -1954,7 +1954,7 @@ http1_transport_connected_callback (http_conn_t *hc) req = http1_conn_alloc_req (hc); http_req_state_change (req, HTTP_REQ_STATE_WAIT_APP_METHOD); - return http_conn_established (hc, req); + return http_conn_established (hc, req, hc->hc_pa_app_api_ctx); } static void diff --git a/src/plugins/http/http2/http2.c b/src/plugins/http/http2/http2.c index 52147b8a1e9..4a3ccf37bd2 100644 --- a/src/plugins/http/http2/http2.c +++ b/src/plugins/http/http2/http2.c @@ -104,7 +104,7 @@ typedef struct http2_conn_ctx_ u8 *unparsed_headers; /* temporary storing rx fragmented headers */ u8 *unsent_headers; /* temporary storing tx fragmented headers */ u32 unsent_headers_offset; - u32 client_req_index; + u32 req_num; } http2_conn_ctx_t; typedef struct http2_worker_ctx_ @@ -197,6 +197,7 @@ http2_conn_ctx_free (http_conn_t *hc) h2c = http2_conn_ctx_get_w_thread (hc); HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index, h2c - wrk->conn_pool); + ASSERT (h2c->req_num == 0); hash_free (h2c->req_by_stream_id); if (hc->flags & HTTP_CONN_F_HAS_REQUEST) hpack_dynamic_table_free (&h2c->decoder_dynamic_table); @@ -238,16 +239,19 @@ http2_conn_alloc_req (http_conn_t *hc) h2c - wrk->conn_pool, req_index); req->peer_window = h2c->peer_settings.initial_window_size; req->our_window = h2c->settings.initial_window_size; + h2c->req_num++; return req; } static_always_inline void http2_req_set_stream_id (http2_req_t *req, http2_conn_ctx_t *h2c, - u32 stream_id) + u32 stream_id, u8 unset_old) { HTTP_DBG (1, "req_index [%u]%x stream_id %u", req->base.c_thread_index, ((http_req_handle_t) req->base.hr_req_handle).req_index, stream_id); + if (unset_old && req->stream_id) + hash_unset (h2c->req_by_stream_id, req->stream_id); req->stream_id = stream_id; hash_set (h2c->req_by_stream_id, stream_id, ((http_req_handle_t) req->base.hr_req_handle).req_index); @@ -273,6 +277,7 @@ http2_conn_free_req (http2_conn_ctx_t *h2c, http2_req_t *req, if (CLIB_DEBUG) memset (req, 0xba, sizeof (*req)); pool_put (wrk->req_pool, req); + h2c->req_num--; } static inline void @@ -284,9 +289,7 @@ http2_conn_reset_req (http2_conn_ctx_t *h2c, http2_req_t *req, if (clib_llist_elt_is_linked (req, sched_list)) clib_llist_remove (wrk->req_pool, sched_list, req); http_buffer_free (&req->base.tx_buf); - if (req->stream_id) - hash_unset (h2c->req_by_stream_id, req->stream_id); - req->flags = 0; + req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE; req->stream_state = HTTP2_STREAM_STATE_IDLE; req->peer_window = h2c->peer_settings.initial_window_size; req->our_window = h2c->settings.initial_window_size; @@ -429,11 +432,11 @@ http2_connection_error (http_conn_t *hc, http2_error_t error, } else { - if (hc->flags & HTTP_CONN_F_HAS_REQUEST) - { - req = http2_req_get (h2c->client_req_index, hc->c_thread_index); - session_transport_reset_notify (&req->base.connection); - } + hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({ + req = http2_req_get (req_index, hc->c_thread_index); + session_transport_reset_notify ( + &req->base.connection); + })); } } if (clib_llist_elt_is_linked (h2c, sched_list)) @@ -1088,7 +1091,7 @@ http2_sched_dispatch_req_headers (http2_req_t *req, http_conn_t *hc, max_write = clib_min (max_write, h2c->peer_settings.max_frame_size); stream_id = http2_conn_get_next_stream_id (h2c); - http2_req_set_stream_id (req, h2c, stream_id); + http2_req_set_stream_id (req, h2c, stream_id, 1); http_io_as_dequeue_notify (&req->base, n_deq); @@ -1590,6 +1593,7 @@ http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req, transport_connection_reschedule (&req->base.connection); h2c = http2_conn_ctx_get_w_thread (hc); http2_conn_reset_req (h2c, req, hc->c_thread_index); + http_io_as_del_want_read_ntf (&req->base); } } http_io_as_write (&req->base, req->payload, req->payload_len); @@ -1639,8 +1643,8 @@ http2_req_state_udp_tunnel_rx (http_conn_t *hc, http2_req_t *req, http2_error_t *error) { int rv; - u8 payload_offset; - u64 payload_len; + u8 payload_offset = 0; + u64 payload_len = 0; session_dgram_hdr_t hdr; HTTP_DBG (1, "udp tunnel received data from client"); @@ -1898,8 +1902,7 @@ http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh) return HTTP2_ERROR_STREAM_CLOSED; } h2c->last_opened_stream_id = fh->stream_id; - if (hash_elts (h2c->req_by_stream_id) == - h2c->settings.max_concurrent_streams) + if (h2c->req_num == h2c->settings.max_concurrent_streams) { HTTP_DBG (1, "SETTINGS_MAX_CONCURRENT_STREAMS exceeded"); http_io_ts_drain (hc, fh->length); @@ -1908,7 +1911,7 @@ http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh) return HTTP2_ERROR_NO_ERROR; } req = http2_conn_alloc_req (hc); - http2_req_set_stream_id (req, h2c, fh->stream_id); + http2_req_set_stream_id (req, h2c, fh->stream_id, 0); req->dispatch_headers_cb = http2_sched_dispatch_resp_headers; http_conn_accept_request (hc, &req->base); http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_TRANSPORT_METHOD); @@ -2150,7 +2153,9 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh) req->our_window -= fh->length; h2c->our_window -= fh->length; - HTTP_DBG (1, "run state machine"); + HTTP_DBG (1, "run state machine '%U' req_index %x", format_http_req_state, + req->base.state, + ((http_req_handle_t) req->base.hr_req_handle).req_index); return http2_req_run_state_machine (hc, req, 0, 0); } @@ -2272,14 +2277,13 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh) h2c->flags &= ~HTTP2_CONN_F_EXPECT_SERVER_SETTINGS; HTTP_DBG (1, "client connection established"); req = http2_conn_alloc_req (hc); - h2c->client_req_index = - ((http_req_handle_t) req->base.hr_req_handle).req_index; + req->flags |= HTTP2_REQ_F_IS_PARENT; hc->flags |= HTTP_CONN_F_HAS_REQUEST; hpack_dynamic_table_init ( &h2c->decoder_dynamic_table, http2_default_conn_settings.header_table_size); http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD); - if (http_conn_established (hc, &req->base)) + if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx)) return HTTP2_ERROR_INTERNAL_ERROR; } @@ -2388,10 +2392,11 @@ http2_handle_goaway_frame (http_conn_t *hc, http2_frame_header_t *fh) if (!(hc->flags & HTTP_CONN_F_IS_SERVER)) { ASSERT (hc->flags & HTTP_CONN_F_HAS_REQUEST); - req = http2_req_get (h2c->client_req_index, hc->c_thread_index); - if (!req) - return HTTP2_ERROR_NO_ERROR; - session_transport_closed_notify (&req->base.connection); + hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({ + req = http2_req_get (req_index, hc->c_thread_index); + session_transport_closed_notify ( + &req->base.connection); + })); } } else @@ -2635,6 +2640,12 @@ http2_app_close_callback (http_conn_t *hc, u32 req_index, { HTTP_DBG (1, "nothing more to send, confirm close"); session_transport_closed_notify (&req->base.connection); + if (req->flags & HTTP2_REQ_F_IS_PARENT) + { + HTTP_DBG (1, "client app closed parent, closing connection"); + ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER)); + http_shutdown_transport (hc); + } } else if (req->base.is_tunnel) { @@ -2950,6 +2961,28 @@ http2_conn_accept_callback (http_conn_t *hc) h2c->flags |= HTTP2_CONN_F_PREFACE_VERIFIED; } +static int +http2_conn_connect_stream_callback (http_conn_t *hc, u32 parent_app_api_ctx) +{ + http2_conn_ctx_t *h2c; + http2_req_t *req; + app_worker_t *app_wrk; + + HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index); + h2c = http2_conn_ctx_get_w_thread (hc); + ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER)); + ASSERT (!(h2c->flags & HTTP2_CONN_F_EXPECT_SERVER_SETTINGS)); + app_wrk = app_worker_get_if_valid (hc->hc_pa_wrk_index); + if (!app_wrk) + return -1; + if (h2c->req_num == h2c->settings.max_concurrent_streams) + return app_worker_connect_notify (app_wrk, 0, SESSION_E_MAX_STREAMS_HIT, + hc->hc_pa_app_api_ctx); + req = http2_conn_alloc_req (hc); + http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD); + return http_conn_established (hc, &req->base, parent_app_api_ctx); +} + static void http2_conn_cleanup_callback (http_conn_t *hc) { @@ -2959,14 +2992,8 @@ http2_conn_cleanup_callback (http_conn_t *hc) HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index); h2c = http2_conn_ctx_get_w_thread (hc); - if (hc->flags & HTTP_CONN_F_IS_SERVER) - hash_foreach (stream_id, req_index, h2c->req_by_stream_id, - ({ vec_add1 (req_indices, req_index); })); - else - { - if (hc->flags & HTTP_CONN_F_HAS_REQUEST) - vec_add1 (req_indices, h2c->client_req_index); - } + hash_foreach (stream_id, req_index, h2c->req_by_stream_id, + ({ vec_add1 (req_indices, req_index); })); vec_foreach (req_index_p, req_indices) { @@ -3074,6 +3101,7 @@ const static http_engine_vft_t http2_engine = { .transport_conn_reschedule_callback = http2_transport_conn_reschedule_callback, .conn_accept_callback = http2_conn_accept_callback, + .conn_connect_stream_callback = http2_conn_connect_stream_callback, .conn_cleanup_callback = http2_conn_cleanup_callback, .enable_callback = http2_enable_callback, .unformat_cfg_callback = http2_unformat_config_callback, diff --git a/src/plugins/http/http_private.h b/src/plugins/http/http_private.h index b6a63b9711f..f6666af81e1 100644 --- a/src/plugins/http/http_private.h +++ b/src/plugins/http/http_private.h @@ -209,9 +209,18 @@ typedef struct http_tc_ void *opaque; /* version specific data */ } http_conn_t; +typedef struct http_pending_connect_stream_ +{ + u64 parent_handle; + u32 opaque; +} http_pending_connect_stream_t; + typedef struct http_worker_ { http_conn_t *conn_pool; + clib_spinlock_t pending_stream_connects_lock; + http_pending_connect_stream_t *pending_connect_streams; + http_pending_connect_stream_t *burst_connect_streams; } http_worker_t; typedef struct http_main_ @@ -265,6 +274,8 @@ typedef struct http_engine_vft_ void (*transport_reset_callback) (http_conn_t *hc); void (*transport_conn_reschedule_callback) (http_conn_t *hc); void (*conn_accept_callback) (http_conn_t *hc); /* optional */ + int (*conn_connect_stream_callback) (http_conn_t *hc, + u32 parent_app_api_ctx); /* optional */ void (*conn_cleanup_callback) (http_conn_t *hc); void (*enable_callback) (void); /* optional */ uword (*unformat_cfg_callback) (unformat_input_t *input); /* optional */ @@ -556,6 +567,14 @@ http_io_as_add_want_read_ntf (http_req_t *req) SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY); } +always_inline void +http_io_as_del_want_read_ntf (http_req_t *req) +{ + session_t *as = session_get_from_handle (req->hr_pa_session_handle); + svm_fifo_del_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL | + SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY); +} + always_inline void http_io_as_reset_has_read_ntf (http_req_t *req) { @@ -859,7 +878,8 @@ http_conn_accept_request (http_conn_t *hc, http_req_t *req) } always_inline int -http_conn_established (http_conn_t *hc, http_req_t *req) +http_conn_established (http_conn_t *hc, http_req_t *req, + u32 parent_app_api_ctx) { session_t *as; app_worker_t *app_wrk; @@ -873,7 +893,7 @@ http_conn_established (http_conn_t *hc, http_req_t *req) as->app_wrk_index = hc->hc_pa_wrk_index; as->connection_index = req->hr_req_handle; as->session_state = SESSION_STATE_READY; - as->opaque = hc->hc_pa_app_api_ctx; + as->opaque = parent_app_api_ctx; ts = session_get_from_handle (hc->hc_tc_session_handle); as->session_type = session_type_from_proto_and_ip ( TRANSPORT_PROTO_HTTP, session_type_is_ip4 (ts->session_type)); @@ -895,7 +915,7 @@ http_conn_established (http_conn_t *hc, http_req_t *req) return rv; } - app_worker_connect_notify (app_wrk, as, 0, hc->hc_pa_app_api_ctx); + app_worker_connect_notify (app_wrk, as, 0, parent_app_api_ctx); req->hr_pa_session_handle = session_handle (as); req->hr_pa_wrk_index = as->app_wrk_index;