From: Matus Fabian Date: Thu, 17 Apr 2025 13:47:08 +0000 (-0400) Subject: http: http/2 flow control X-Git-Tag: v25.10-rc0~38 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=ad5159837e298eb5d182c04f73e8a00613cc0833;p=vpp.git http: http/2 flow control Type: feature Change-Id: Id8e2c51d19a9aef9a74c000dad490ede5eebf2b6 Signed-off-by: Matus Fabian --- diff --git a/extras/hs-test/go.mod b/extras/hs-test/go.mod index 8da936be5b0..0f11b1431b0 100644 --- a/extras/hs-test/go.mod +++ b/extras/hs-test/go.mod @@ -13,6 +13,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/summerwind/h2spec v2.2.1+incompatible go.fd.io/govpp v0.10.0 + golang.org/x/net v0.28.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.30.2 k8s.io/apimachinery v0.30.2 @@ -81,7 +82,6 @@ require ( golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect golang.org/x/mod v0.18.0 // indirect - golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.18.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.23.0 // indirect diff --git a/extras/hs-test/h2spec_extras/h2spec_extras.go b/extras/hs-test/h2spec_extras/h2spec_extras.go new file mode 100644 index 00000000000..3c2b9dd76a9 --- /dev/null +++ b/extras/hs-test/h2spec_extras/h2spec_extras.go @@ -0,0 +1,170 @@ +package h2spec_extras + +import ( + "fmt" + + "github.com/summerwind/h2spec/config" + "github.com/summerwind/h2spec/spec" + "golang.org/x/net/http2" +) + +var key = "extras" + +func NewTestGroup(section string, name string) *spec.TestGroup { + return &spec.TestGroup{ + Key: key, + Section: section, + Name: name, + } +} + +func Spec() *spec.TestGroup { + tg := &spec.TestGroup{ + Key: key, + Name: "extras for HTTP/2 server", + } + + tg.AddTestGroup(FlowControl()) + + return tg +} + +func VerifyWindowUpdate(conn *spec.Conn, streamID, expectedIncrement uint32) error { + actual, passed := conn.WaitEventByType(spec.EventWindowUpdateFrame) + actualStr := actual.String() + switch event := actual.(type) { + case spec.WindowUpdateFrameEvent: + actualStr = fmt.Sprintf("WINDOW_UPDATE Frame (stream_id:%d, increment:%d)", event.StreamID, event.Increment) + passed = (event.StreamID == streamID) && (event.Increment == expectedIncrement) + default: + passed = false + } + + if !passed { + expected := []string{ + fmt.Sprintf("WINDOW_UPDATE Frame (stream_id:%d, increment:%d)", streamID, expectedIncrement), + } + + return &spec.TestError{ + Expected: expected, + Actual: actualStr, + } + } + return nil +} + +func FlowControl() *spec.TestGroup { + tg := NewTestGroup("1", "Flow control") + tg.AddTestCase(&spec.TestCase{ + Desc: "Sends a WINDOW_UPDATE frame on connection", + Requirement: "The endpoint MUST NOT send a flow-controlled frame with a length that exceeds the space available.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + // turn off automatic connection window update + conn.WindowUpdate = false + + err := conn.Handshake() + if err != nil { + return err + } + + headers := spec.CommonHeaders(c) + headers[2].Value = "/4kB" + + // consume most of the connection window + for i := 0; i <= 14; i++ { + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: true, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + streamID += 2 + err := spec.VerifyEventType(conn, spec.EventDataFrame) + if err != nil { + return err + } + } + + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: true, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + // verify reception of DATA frame + err = spec.VerifyEventType(conn, spec.EventDataFrame) + if err != nil { + return err + } + + // increment connection window + conn.WriteWindowUpdate(0, 65535) + + // wait for DATA frame with rest of the content + actual, passed := conn.WaitEventByType(spec.EventDataFrame) + switch event := actual.(type) { + case spec.DataFrameEvent: + passed = event.Header().Length == 1 + default: + passed = false + } + + if !passed { + expected := []string{ + fmt.Sprintf("DATA Frame (length:1, flags:0x00, stream_id:%d)", streamID), + } + + return &spec.TestError{ + Expected: expected, + Actual: actual.String(), + } + } + + return nil + }, + }) + + tg.AddTestCase(&spec.TestCase{ + Desc: "Receive a WINDOW_UPDATE frame on stream", + Requirement: "The receiver of a frame sends a WINDOW_UPDATE frame as it consumes data and frees up space in flow-control windows.", + Run: func(c *config.Config, conn *spec.Conn) error { + var streamID uint32 = 1 + + err := conn.Handshake() + if err != nil { + return err + } + + headers := spec.CommonHeaders(c) + headers[0].Value = "POST" + headers = append(headers, spec.HeaderField("content-length", "12")) + hp := http2.HeadersFrameParam{ + StreamID: streamID, + EndStream: false, + EndHeaders: true, + BlockFragment: conn.EncodeHeaders(headers), + } + conn.WriteHeaders(hp) + // we send window update on stream when app read data from rx fifo, so send DATA frame and wait for WINDOW_UPDATE frame + conn.WriteData(streamID, false, []byte("AAAA")) + err = VerifyWindowUpdate(conn, streamID, 4) + if err != nil { + return err + } + // test it again + conn.WriteData(streamID, false, []byte("BBBBB")) + err = VerifyWindowUpdate(conn, streamID, 5) + if err != nil { + return err + } + // we don't send stream window update if stream is half-closed, so HEADERS frame should be received + conn.WriteData(streamID, true, []byte("CCC")) + return spec.VerifyHeadersFrame(conn, streamID) + }, + }) + return tg +} diff --git a/extras/hs-test/http2_test.go b/extras/hs-test/http2_test.go index 5fa32192652..1be14554441 100644 --- a/extras/hs-test/http2_test.go +++ b/extras/hs-test/http2_test.go @@ -9,7 +9,7 @@ import ( ) func init() { - RegisterH2Tests(Http2TcpGetTest) + RegisterH2Tests(Http2TcpGetTest, Http2TcpPostTest) } func Http2TcpGetTest(s *H2Suite) { @@ -48,3 +48,13 @@ func Http2TcpGetTest(s *H2Suite) { o := vpp.Vppctl("show session verbose proto http") s.AssertNotContains(o, "LISTEN") } + +func Http2TcpPostTest(s *H2Suite) { + vpp := s.Containers.Vpp.VppInstance + serverAddress := s.VppAddr() + s.Log(vpp.Vppctl("http static server uri tcp://" + serverAddress + "/80 url-handlers max-body-size 20m rx-buff-thresh 20m fifo-size 65k debug 2")) + s.Log(vpp.Vppctl("test-url-handler enable")) + args := fmt.Sprintf("--max-time 10 --noproxy '*' --data-binary @%s --http2-prior-knowledge http://%s:80/test3", CurlContainerTestFile, serverAddress) + _, log := s.RunCurlContainer(s.Containers.Curl, args) + s.AssertContains(log, "HTTP/2 200") +} diff --git a/extras/hs-test/infra/suite_h2.go b/extras/hs-test/infra/suite_h2.go index 6e910b9faf4..8f7426a9b19 100644 --- a/extras/hs-test/infra/suite_h2.go +++ b/extras/hs-test/infra/suite_h2.go @@ -11,6 +11,7 @@ import ( "github.com/summerwind/h2spec/spec" + "fd.io/hs-test/h2spec_extras" . "github.com/onsi/ginkgo/v2" "github.com/summerwind/h2spec/config" "github.com/summerwind/h2spec/generic" @@ -124,9 +125,8 @@ var genericTests = []h2specTest{ {desc: "generic/3.5/1"}, {desc: "generic/3.7/1"}, {desc: "generic/3.8/1"}, - // TODO: flow control - //{desc: "generic/3.9/1"}, - //{desc: "generic/3.9/2"}, + {desc: "generic/3.9/1"}, + {desc: "generic/3.9/2"}, // TODO: CONTINUATION //{desc: "generic/3.10/1"}, //{desc: "generic/3.10/2"}, @@ -180,12 +180,10 @@ var http2Tests = []h2specTest{ {desc: "http2/4.3/3"}, {desc: "http2/5.1.1/1"}, {desc: "http2/5.1.1/2"}, - // TODO: flow control - // {desc: "http2/5.1.2/1"}, + {desc: "http2/5.1.2/1"}, {desc: "http2/5.1/1"}, {desc: "http2/5.1/2"}, - // TODO: flow control - // {desc: "http2/5.1/3"}, + {desc: "http2/5.1/3"}, // TODO: CONTINUATION // {desc: "http2/5.1/4"}, {desc: "http2/5.1/5"}, @@ -222,8 +220,7 @@ var http2Tests = []h2specTest{ {desc: "http2/6.5.2/3"}, {desc: "http2/6.5.2/4"}, {desc: "http2/6.5.2/5"}, - // TODO: flow control - // {desc: "http2/6.5.3/1"}, + {desc: "http2/6.5.3/1"}, {desc: "http2/6.5.3/2"}, {desc: "http2/6.5/1"}, {desc: "http2/6.5/2"}, @@ -233,16 +230,17 @@ var http2Tests = []h2specTest{ {desc: "http2/6.7/3"}, {desc: "http2/6.7/4"}, {desc: "http2/6.8/1"}, - // TODO: flow control - // {desc: "http2/6.9.1/1"}, - // {desc: "http2/6.9.1/2"}, + {desc: "http2/6.9.1/1"}, + {desc: "http2/6.9.1/2"}, + // TODO: message framing without content length using END_STREAM flag // {desc: "http2/6.9.1/3"}, - // {desc: "http2/6.9.2/1"}, - // {desc: "http2/6.9.2/2"}, - // {desc: "http2/6.9.2/3"}, - // {desc: "http2/6.9/1"}, + {desc: "http2/6.9.2/1"}, + {desc: "http2/6.9.2/2"}, + {desc: "http2/6.9.2/3"}, + {desc: "http2/6.9/1"}, + // TODO: message framing without content length using END_STREAM flag // {desc: "http2/6.9/2"}, - // {desc: "http2/6.9/3"}, + {desc: "http2/6.9/3"}, // TODO: CONTINUATION // {desc: "http2/6.10/1"}, // {desc: "http2/6.10/2"}, @@ -273,10 +271,16 @@ var http2Tests = []h2specTest{ {desc: "http2/8.2/1"}, } +var extrasTests = []h2specTest{ + {desc: "extras/1/1"}, + {desc: "extras/1/2"}, +} + const ( GenericTestGroup int = 1 HpackTestGroup int = 2 Http2TestGroup int = 3 + ExtrasTestGroup int = 4 ) var specs = []struct { @@ -286,6 +290,7 @@ var specs = []struct { {GenericTestGroup, genericTests}, {HpackTestGroup, hpackTests}, {Http2TestGroup, http2Tests}, + {ExtrasTestGroup, extrasTests}, } // Marked as pending since http plugin is not build with http/2 enabled by default @@ -341,6 +346,9 @@ var _ = Describe("H2SpecSuite", Pending, Ordered, ContinueOnFailure, func() { case Http2TestGroup: tg = http2.Spec() break + case ExtrasTestGroup: + tg = h2spec_extras.Spec() + break } tg.Test(conf) diff --git a/src/plugins/hs_apps/test_builtins.c b/src/plugins/hs_apps/test_builtins.c index 4c324d5b953..5403be739ca 100644 --- a/src/plugins/hs_apps/test_builtins.c +++ b/src/plugins/hs_apps/test_builtins.c @@ -52,14 +52,15 @@ VLIB_REGISTER_NODE (test_builtins_timer_process_node) = { }; static void -send_data_to_hss (hss_session_handle_t sh, u8 *data, u8 free_vec_data) +send_data_to_hss (hss_session_handle_t sh, u8 *data, uword data_len, + u8 free_vec_data) { tb_main_t *tbm = &tb_main; hss_url_handler_args_t args = {}; args.sh = sh; args.data = data; - args.data_len = vec_len (data); + args.data_len = data_len; args.ct = HTTP_CONTENT_TEXT_PLAIN; args.sc = HTTP_STATUS_OK; args.free_vec_data = free_vec_data; @@ -74,7 +75,7 @@ handle_get_test1 (hss_url_handler_args_t *args) clib_warning ("get request on test1"); data = format (0, "hello"); - send_data_to_hss (args->sh, data, 1); + send_data_to_hss (args->sh, data, vec_len (data), 1); return HSS_URL_HANDLER_ASYNC; } @@ -86,7 +87,7 @@ handle_get_test2 (hss_url_handler_args_t *args) clib_warning ("get request on test2"); data = format (0, "some data"); - send_data_to_hss (args->sh, data, 1); + send_data_to_hss (args->sh, data, vec_len (data), 1); return HSS_URL_HANDLER_ASYNC; } @@ -106,7 +107,7 @@ delayed_resp_cb (u32 *expired_timers) e = pool_elt_at_index (tbm->delayed_resps, pool_index); clib_warning ("sending delayed data"); data = format (0, "delayed data"); - send_data_to_hss (e->sh, data, 1); + send_data_to_hss (e->sh, data, vec_len (data), 1); pool_put (tbm->delayed_resps, e); } } @@ -129,7 +130,7 @@ handle_get_test_delayed (hss_url_handler_args_t *args) static hss_url_handler_rc_t handle_post_test3 (hss_url_handler_args_t *args) { - send_data_to_hss (args->sh, 0, 0); + send_data_to_hss (args->sh, 0, 0, 0); return HSS_URL_HANDLER_ASYNC; } @@ -137,7 +138,15 @@ static hss_url_handler_rc_t handle_get_64bytes (hss_url_handler_args_t *args) { tb_main_t *tbm = &tb_main; - send_data_to_hss (args->sh, tbm->test_data, 0); + send_data_to_hss (args->sh, tbm->test_data, 64, 0); + return HSS_URL_HANDLER_ASYNC; +} + +static hss_url_handler_rc_t +handle_get_4kbytes (hss_url_handler_args_t *args) +{ + tb_main_t *tbm = &tb_main; + send_data_to_hss (args->sh, tbm->test_data, 4 << 10, 0); return HSS_URL_HANDLER_ASYNC; } @@ -157,8 +166,8 @@ test_builtins_init (vlib_main_t *vm) return; } - tbm->test_data = format ( - 0, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); + /* init test data, big buffer */ + vec_validate_init_empty (tbm->test_data, (4 << 10) - 1, 'x'); (*fp) (handle_get_test1, "test1", HTTP_REQ_GET); (*fp) (handle_get_test1, "test1", HTTP_REQ_POST); @@ -166,6 +175,7 @@ test_builtins_init (vlib_main_t *vm) (*fp) (handle_get_test_delayed, "test_delayed", HTTP_REQ_GET); (*fp) (handle_post_test3, "test3", HTTP_REQ_POST); (*fp) (handle_get_64bytes, "64B", HTTP_REQ_GET); + (*fp) (handle_get_4kbytes, "4kB", HTTP_REQ_GET); tbm->send_data = vlib_get_plugin_symbol ("http_static_plugin.so", "hss_session_send_data"); diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c index c7eefcdae48..94914aaccc3 100644 --- a/src/plugins/http/http.c +++ b/src/plugins/http/http.c @@ -904,6 +904,7 @@ http_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep) http_conn_t *lhc; u32 lhc_index; transport_endpt_ext_cfg_t *ext_cfg; + segment_manager_props_t *props; sep = (session_endpoint_cfg_t *) tep; @@ -953,6 +954,9 @@ http_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep) lhc->flags |= HTTP_CONN_F_IS_SERVER; + props = application_segment_manager_properties (app); + lhc->app_rx_fifo_size = props->rx_fifo_size; + if (vec_len (app->name)) lhc->app_name = vec_dup (app->name); else diff --git a/src/plugins/http/http2/frame.c b/src/plugins/http/http2/frame.c index c9c1931f02a..580ffff22c5 100644 --- a/src/plugins/http/http2/frame.c +++ b/src/plugins/http/http2/frame.c @@ -142,7 +142,7 @@ http2_frame_read_window_update (u32 *increment, u8 *payload, u32 payload_len) value = (u32 *) payload; - if (value == 0) + if (*value == 0) return HTTP2_ERROR_PROTOCOL_ERROR; *increment = clib_net_to_host_u32 (*value) & 0x7FFFFFFF; diff --git a/src/plugins/http/http2/http2.c b/src/plugins/http/http2/http2.c index 67db185823c..3c6949c3bc2 100644 --- a/src/plugins/http/http2/http2.c +++ b/src/plugins/http/http2/http2.c @@ -2,6 +2,7 @@ * Copyright(c) 2025 Cisco Systems, Inc. */ +#include #include #include #include @@ -11,6 +12,12 @@ #define HTTP_2_ENABLE 0 #endif +#define HTTP2_WIN_SIZE_MAX 0x7FFFFFFF +#define HTTP2_INITIAL_WIN_SIZE 65535 +/* connection-level flow control window kind of mirrors TCP flow control */ +/* TODO: configurable? */ +#define HTTP2_CONNECTION_WINDOW_SIZE (10 << 20) + #define foreach_http2_stream_state \ _ (IDLE, "IDLE") \ _ (OPEN, "OPEN") \ @@ -24,7 +31,9 @@ typedef enum http2_stream_state_ #undef _ } http2_stream_state_t; -#define foreach_http2_req_flags _ (APP_CLOSED, "app-closed") +#define foreach_http2_req_flags \ + _ (APP_CLOSED, "app-closed") \ + _ (NEED_WINDOW_UPDATE, "need-window-update") typedef enum http2_req_flags_bit_ { @@ -46,9 +55,11 @@ typedef struct http2_req_ http2_stream_state_t stream_state; u8 flags; u32 stream_id; - u64 peer_window; + i32 peer_window; /* can become negative after settings change */ + u32 our_window; u8 *payload; u32 payload_len; + clib_llist_anchor_t resume_list; } http2_req_t; #define foreach_http2_conn_flags \ @@ -76,8 +87,11 @@ typedef struct http2_conn_ctx_ u8 flags; u32 last_opened_stream_id; u32 last_processed_stream_id; - u64 peer_window; + u32 peer_window; + u32 our_window; uword *req_by_stream_id; + clib_llist_index_t streams_to_resume; + http2_conn_settings_t settings; } http2_conn_ctx_t; typedef struct http2_main_ @@ -99,8 +113,15 @@ http2_conn_ctx_alloc_w_thread (http_conn_t *hc) CLIB_CACHE_LINE_BYTES); clib_memset (h2c, 0, sizeof (*h2c)); h2c->peer_settings = http2_default_conn_settings; - h2c->peer_window = h2c->peer_settings.initial_window_size; + h2c->peer_window = HTTP2_INITIAL_WIN_SIZE; + h2c->our_window = HTTP2_CONNECTION_WINDOW_SIZE; + h2c->settings = h2m->settings; + /* adjust settings according to app rx_fifo size */ + h2c->settings.initial_window_size = + clib_min (h2c->settings.initial_window_size, hc->app_rx_fifo_size); h2c->req_by_stream_id = hash_create (0, sizeof (uword)); + h2c->streams_to_resume = + clib_llist_make_head (h2m->req_pool[hc->c_thread_index], resume_list); hc->opaque = uword_to_pointer (h2c - h2m->conn_pool[hc->c_thread_index], void *); HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index, @@ -154,10 +175,13 @@ http2_conn_alloc_req (http_conn_t *hc, u32 stream_id) req->base.c_thread_index = hc->c_thread_index; req->stream_id = stream_id; req->stream_state = HTTP2_STREAM_STATE_IDLE; + req->resume_list.next = CLIB_LLIST_INVALID_INDEX; + req->resume_list.prev = CLIB_LLIST_INVALID_INDEX; h2c = http2_conn_ctx_get_w_thread (hc); HTTP_DBG (1, "h2c [%u]%x req_index %x stream_id %u", hc->c_thread_index, h2c - h2m->conn_pool[hc->c_thread_index], req_index, stream_id); req->peer_window = h2c->peer_settings.initial_window_size; + req->our_window = h2c->settings.initial_window_size; hash_set (h2c->req_by_stream_id, stream_id, req_index); return req; } @@ -172,6 +196,8 @@ http2_conn_free_req (http2_conn_ctx_t *h2c, http2_req_t *req, h2c - h2m->conn_pool[thread_index], ((http_req_handle_t) req->base.hr_req_handle).req_index, req->stream_id); + if (clib_llist_elt_is_linked (req, resume_list)) + clib_llist_remove (h2m->req_pool[thread_index], resume_list, req); vec_free (req->base.headers); vec_free (req->base.target); http_buffer_free (&req->base.tx_buf); @@ -210,6 +236,56 @@ http2_req_get (u32 req_index, clib_thread_index_t thread_index) return pool_elt_at_index (h2m->req_pool[thread_index], req_index); } +always_inline int +http2_req_update_peer_window (http2_req_t *req, i64 delta) +{ + i64 new_value; + + new_value = (i64) req->peer_window + delta; + if (new_value > HTTP2_WIN_SIZE_MAX) + return -1; + req->peer_window = (i32) new_value; + HTTP_DBG (1, "new window size %d", req->peer_window); + return 0; +} + +always_inline void +http2_req_add_to_resume_list (http2_conn_ctx_t *h2c, http2_req_t *req) +{ + http2_main_t *h2m = &http2_main; + http2_req_t *he; + + req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE; + he = clib_llist_elt (h2m->req_pool[req->base.c_thread_index], + h2c->streams_to_resume); + clib_llist_add_tail (h2m->req_pool[req->base.c_thread_index], resume_list, + req, he); +} + +always_inline void +http2_resume_list_process (http_conn_t *hc) +{ + http2_main_t *h2m = &http2_main; + http2_req_t *he, *req; + http2_conn_ctx_t *h2c; + + h2c = http2_conn_ctx_get_w_thread (hc); + he = + clib_llist_elt (h2m->req_pool[hc->c_thread_index], h2c->streams_to_resume); + + /* check if something in list and reschedule first app session from list if + * we have some space in connection window */ + if (h2c->peer_window > 0 && + !clib_llist_is_empty (h2m->req_pool[hc->c_thread_index], resume_list, + he)) + { + req = + clib_llist_next (h2m->req_pool[hc->c_thread_index], resume_list, he); + clib_llist_remove (h2m->req_pool[hc->c_thread_index], resume_list, req); + transport_connection_reschedule (&req->base.connection); + } +} + /* send GOAWAY frame and close TCP connection */ always_inline void http2_connection_error (http_conn_t *hc, http2_error_t error, @@ -286,23 +362,26 @@ always_inline void http2_send_server_preface (http_conn_t *hc) { u8 *response; - http2_main_t *h2m = &http2_main; http2_settings_entry_t *setting, *settings_list = 0; + http2_conn_ctx_t *h2c = http2_conn_ctx_get_w_thread (hc); #define _(v, label, member, min, max, default_value, err_code) \ - if (h2m->settings.member != default_value) \ + if (h2c->settings.member != default_value) \ { \ vec_add2 (settings_list, setting, 1); \ setting->identifier = HTTP2_SETTINGS_##label; \ - setting->value = h2m->settings.member; \ + setting->value = h2c->settings.member; \ } foreach_http2_settings #undef _ response = http_get_tx_buf (hc); http2_frame_write_settings (settings_list, &response); + /* send also connection window update */ + http2_frame_write_window_update (h2c->our_window - HTTP2_INITIAL_WIN_SIZE, 0, + &response); http_io_ts_write (hc, response, vec_len (response), 0); - http_io_ts_after_write (hc, 0); + http_io_ts_after_write (hc, 1); } /*************************************/ @@ -389,6 +468,7 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req, return HTTP_SM_STOP; } new_state = HTTP_REQ_STATE_TRANSPORT_IO_MORE_DATA; + http_io_as_add_want_read_ntf (&req->base); } /* TODO: message framing without content length using END_STREAM flag */ if (req->base.body_len == 0 && req->stream_state == HTTP2_STREAM_STATE_OPEN) @@ -444,6 +524,8 @@ http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req, transport_send_params_t *sp, http2_error_t *error) { + u32 max_enq; + if (req->payload_len > req->base.to_recv) { HTTP_DBG (1, "received more data than expected"); @@ -458,6 +540,13 @@ http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req, http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp); return HTTP_SM_STOP; } + max_enq = http_io_as_max_write (&req->base); + if (max_enq < req->payload_len) + { + clib_warning ("app's rx fifo full"); + http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp); + return HTTP_SM_STOP; + } if (req->base.to_recv == 0) http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_REPLY); http_io_as_write (&req->base, req->payload, req->payload_len); @@ -482,6 +571,7 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req, u8 flags = HTTP2_FRAME_FLAG_END_HEADERS; http_sm_result_t sm_result = HTTP_SM_ERROR; u32 n_written; + http2_conn_ctx_t *h2c; http_get_app_msg (&req->base, &msg); ASSERT (msg.type == HTTP_MSG_REPLY); @@ -503,6 +593,15 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req, &response); vec_free (date); + h2c = http2_conn_ctx_get_w_thread (hc); + if (vec_len (response) > h2c->peer_settings.max_frame_size) + { + /* TODO: CONTINUATION (headers fragmentation) */ + clib_warning ("resp headers greater than SETTINGS_MAX_FRAME_SIZE"); + *error = HTTP2_ERROR_INTERNAL_ERROR; + return HTTP_SM_ERROR; + } + if (msg.data.body_len) { /* start sending the actual data */ @@ -539,18 +638,42 @@ http2_req_state_app_io_more_data (http_conn_t *hc, http2_req_t *req, http_buffer_t *hb = &req->base.tx_buf; u8 fh[HTTP2_FRAME_HEADER_SIZE]; u8 finished = 0, flags = 0; + http2_conn_ctx_t *h2c; ASSERT (http_buffer_bytes_left (hb) > 0); + + if (req->peer_window <= 0) + { + HTTP_DBG (1, "stream window is full"); + /* mark that we need window update on stream */ + req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE; + http_req_deschedule (&req->base, sp); + return HTTP_SM_STOP; + } + h2c = http2_conn_ctx_get_w_thread (hc); + if (h2c->peer_window == 0) + { + HTTP_DBG (1, "connection window is full"); + /* add to waiting queue */ + http2_req_add_to_resume_list (h2c, req); + http_req_deschedule (&req->base, sp); + return HTTP_SM_STOP; + } + max_write = http_io_ts_max_write (hc, sp); if (max_write <= HTTP2_FRAME_HEADER_SIZE) { HTTP_DBG (1, "ts tx fifo full"); goto check_fifo; } + max_write -= HTTP2_FRAME_HEADER_SIZE; + max_write = clib_min (max_write, (u32) req->peer_window); + max_write = clib_min (max_write, h2c->peer_window); + max_write = clib_min (max_write, h2c->peer_settings.max_frame_size); + max_read = http_buffer_bytes_left (hb); - n_read = http_buffer_get_segs (hb, max_write - HTTP2_FRAME_HEADER_SIZE, - &app_segs, &n_segs); + n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs); if (n_read == 0) { HTTP_DBG (1, "no data to deq"); @@ -569,6 +692,8 @@ http2_req_state_app_io_more_data (http_conn_t *hc, http2_req_t *req, ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read)); vec_free (segs); http_buffer_drain (hb, n_read); + req->peer_window -= n_read; + h2c->peer_window -= n_read; if (finished) { @@ -668,7 +793,6 @@ http2_req_run_state_machine (http_conn_t *hc, http2_req_t *req, static http2_error_t http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh) { - http2_main_t *h2m = &http2_main; http2_req_t *req; u8 *rx_buf; http2_error_t rv; @@ -697,7 +821,7 @@ http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh) } h2c->last_opened_stream_id = fh->stream_id; if (hash_elts (h2c->req_by_stream_id) == - h2m->settings.max_concurrent_streams) + h2c->settings.max_concurrent_streams) { HTTP_DBG (1, "SETTINGS_MAX_CONCURRENT_STREAMS exceeded"); http_io_ts_drain (hc, fh->length); @@ -748,6 +872,8 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh) http2_conn_ctx_t *h2c; req = http2_conn_get_req (hc, fh->stream_id); + h2c = http2_conn_ctx_get_w_thread (hc); + if (!req) { if (fh->stream_id == 0) @@ -755,7 +881,6 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh) HTTP_DBG (1, "DATA frame with stream id 0"); return HTTP2_ERROR_PROTOCOL_ERROR; } - h2c = http2_conn_ctx_get_w_thread (hc); if (fh->stream_id <= h2c->last_opened_stream_id) { HTTP_DBG (1, "stream closed, ignoring frame"); @@ -776,6 +901,18 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh) return HTTP2_ERROR_NO_ERROR; } + if (fh->length > req->our_window) + { + HTTP_DBG (1, "error: peer violated stream flow control"); + http2_stream_error (hc, req, HTTP2_ERROR_FLOW_CONTROL_ERROR, 0); + return HTTP2_ERROR_NO_ERROR; + } + if (fh->length > h2c->our_window) + { + HTTP_DBG (1, "error: peer violated connection flow control"); + return HTTP2_ERROR_FLOW_CONTROL_ERROR; + } + if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM) req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED; @@ -788,6 +925,9 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh) if (rv != HTTP2_ERROR_NO_ERROR) return rv; + req->our_window -= fh->length; + h2c->our_window -= fh->length; + HTTP_DBG (1, "run state machine"); return http2_req_run_state_machine (hc, req, 0, 0); } @@ -798,6 +938,10 @@ http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh) u8 *rx_buf; u32 win_increment; http2_error_t rv; + http2_req_t *req; + http2_conn_ctx_t *h2c; + + h2c = http2_conn_ctx_get_w_thread (hc); rx_buf = http_get_rx_buf (hc); vec_validate (rx_buf, fh->length - 1); @@ -805,9 +949,57 @@ http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh) rv = http2_frame_read_window_update (&win_increment, rx_buf, fh->length); if (rv != HTTP2_ERROR_NO_ERROR) - return rv; + { + HTTP_DBG (1, "invalid WINDOW_UPDATE frame (stream id %u)", + fh->stream_id); + /* error on the connection flow-control window is connection error */ + if (fh->stream_id == 0) + return rv; + /* otherwise it is stream error */ + req = http2_conn_get_req (hc, fh->stream_id); + if (!req) + http2_send_stream_error (hc, fh->stream_id, rv, 0); + else + http2_stream_error (hc, req, rv, 0); + return HTTP2_ERROR_NO_ERROR; + } + + HTTP_DBG (1, "WINDOW_UPDATE %u (stream id %u)", win_increment, + fh->stream_id); + if (fh->stream_id == 0) + { + if (win_increment > (HTTP2_WIN_SIZE_MAX - h2c->peer_window)) + return HTTP2_ERROR_FLOW_CONTROL_ERROR; + h2c->peer_window += win_increment; + } + else + { + req = http2_conn_get_req (hc, fh->stream_id); + if (!req) + { + if (fh->stream_id > h2c->last_opened_stream_id) + { + HTTP_DBG ( + 1, + "received WINDOW_UPDATE frame on idle stream (stream id %u)", + fh->stream_id); + return HTTP2_ERROR_PROTOCOL_ERROR; + } + /* ignore window update on closed stream */ + return HTTP2_ERROR_NO_ERROR; + } + if (req->stream_state != HTTP2_STREAM_STATE_CLOSED) + { + if (http2_req_update_peer_window (req, win_increment)) + { + http2_stream_error (hc, req, HTTP2_ERROR_FLOW_CONTROL_ERROR, 0); + return HTTP2_ERROR_NO_ERROR; + } + if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE) + http2_req_add_to_resume_list (h2c, req); + } + } - /* TODO: flow control */ return HTTP2_ERROR_NO_ERROR; } @@ -818,6 +1010,9 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh) http2_error_t rv; http2_conn_settings_t new_settings; http2_conn_ctx_t *h2c; + http2_req_t *req; + u32 stream_id, req_index; + i32 win_size_delta; if (fh->stream_id != 0) return HTTP2_ERROR_PROTOCOL_ERROR; @@ -842,13 +1037,34 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh) rv = http2_frame_read_settings (&new_settings, rx_buf, fh->length); if (rv != HTTP2_ERROR_NO_ERROR) return rv; - h2c->peer_settings = new_settings; /* ACK peer settings */ http2_frame_write_settings_ack (&resp); http_io_ts_write (hc, resp, vec_len (resp), 0); vec_free (resp); http_io_ts_after_write (hc, 0); + + /* change of SETTINGS_INITIAL_WINDOW_SIZE, we must adjust the size of all + * stream flow-control windows */ + if (h2c->peer_settings.initial_window_size != + new_settings.initial_window_size) + { + win_size_delta = (i32) new_settings.initial_window_size - + (i32) h2c->peer_settings.initial_window_size; + hash_foreach ( + stream_id, req_index, h2c->req_by_stream_id, ({ + req = http2_req_get (req_index, hc->c_thread_index); + if (req->stream_state != HTTP2_STREAM_STATE_CLOSED) + { + if (http2_req_update_peer_window (req, win_size_delta)) + http2_stream_error (hc, req, + HTTP2_ERROR_FLOW_CONTROL_ERROR, 0); + if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE) + http2_req_add_to_resume_list (h2c, req); + } + })); + } + h2c->peer_settings = new_settings; } return HTTP2_ERROR_NO_ERROR; @@ -915,6 +1131,8 @@ http2_handle_goaway_frame (http_conn_t *hc, http2_frame_header_t *fh) if (rv != HTTP2_ERROR_NO_ERROR) return rv; + HTTP_DBG (1, "received GOAWAY %U", format_http2_error, error_code); + if (error_code == HTTP2_ERROR_NO_ERROR) { /* TODO: graceful shutdown (no new streams) */ @@ -1094,6 +1312,9 @@ http2_app_tx_callback (http_conn_t *hc, u32 req_index, return; } + /* maybe we can continue sending data on some stream */ + http2_resume_list_process (hc); + /* reset http connection expiration timer */ http_conn_timer_update (hc); } @@ -1103,6 +1324,30 @@ http2_app_rx_evt_callback (http_conn_t *hc, u32 req_index, clib_thread_index_t thread_index) { /* TODO: continue tunnel RX */ + http2_req_t *req; + u8 *response; + u32 increment; + + req = http2_req_get (req_index, thread_index); + if (!req) + { + HTTP_DBG (1, "req already deleted"); + return; + } + HTTP_DBG (1, "received app read notification stream id %u", req->stream_id); + /* send stream window update if app read data in rx fifo and we expect more + * data (stream is still open) */ + if (req->stream_state == HTTP2_STREAM_STATE_OPEN) + { + http_io_as_reset_has_read_ntf (&req->base); + response = http_get_tx_buf (hc); + increment = http_io_as_max_write (&req->base) - req->our_window; + HTTP_DBG (1, "stream window increment %u", increment); + req->our_window += increment; + http2_frame_write_window_update (increment, req->stream_id, &response); + http_io_ts_write (hc, response, vec_len (response), 0); + http_io_ts_after_write (hc, 0); + } } static void @@ -1156,7 +1401,6 @@ http2_transport_connected_callback (http_conn_t *hc) static void http2_transport_rx_callback (http_conn_t *hc) { - http2_main_t *h2m = &http2_main; http2_frame_header_t fh; u32 to_deq; u8 *rx_buf; @@ -1208,10 +1452,10 @@ http2_transport_rx_callback (http_conn_t *hc) http_io_ts_read (hc, rx_buf, HTTP2_FRAME_HEADER_SIZE, 1); to_deq -= HTTP2_FRAME_HEADER_SIZE; http2_frame_header_read (rx_buf, &fh); - if (fh.length > h2m->settings.max_frame_size) + if (fh.length > h2c->settings.max_frame_size) { HTTP_DBG (1, "frame length %lu exceeded SETTINGS_MAX_FRAME_SIZE %lu", - fh.length, h2m->settings.max_frame_size); + fh.length, h2c->settings.max_frame_size); http2_connection_error (hc, HTTP2_ERROR_FRAME_SIZE_ERROR, 0); return; } @@ -1278,6 +1522,21 @@ http2_transport_rx_callback (http_conn_t *hc) } } + /* send connection window update if more than half consumed */ + if (h2c->our_window < HTTP2_CONNECTION_WINDOW_SIZE / 2) + { + HTTP_DBG (1, "connection window increment %u", + HTTP2_CONNECTION_WINDOW_SIZE - h2c->our_window); + u8 *response = http_get_tx_buf (hc); + http2_frame_write_window_update ( + HTTP2_CONNECTION_WINDOW_SIZE - h2c->our_window, 0, &response); + http_io_ts_write (hc, response, vec_len (response), 0); + http_io_ts_after_write (hc, 0); + h2c->our_window = HTTP2_CONNECTION_WINDOW_SIZE; + } + /* maybe we can continue sending data on some stream */ + http2_resume_list_process (hc); + /* reset http connection expiration timer */ http_conn_timer_update (hc); } diff --git a/src/plugins/http/http_plugin.rst b/src/plugins/http/http_plugin.rst index 4e799a57668..4154a413726 100644 --- a/src/plugins/http/http_plugin.rst +++ b/src/plugins/http/http_plugin.rst @@ -231,6 +231,12 @@ Now we can start reading body content, following block of code could be executed u64 curr = vec_len (ctx->resp_body); rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, ctx->resp_body + curr); ASSERT (rv == n_deq); + /* notify http transport that we read data if requested */ + if (svm_fifo_needs_deq_ntf (ts->rx_fifo, n_deq)) + { + svm_fifo_clear_deq_ntf (ts->rx_fifo); + session_program_transport_io_evt (ts->handle, SESSION_IO_EVT_RX); + } /* update length of the vector */ vec_set_len (ctx->resp_body, curr + n_deq); /* update number of remaining bytes to receive */ @@ -242,6 +248,9 @@ Now we can start reading body content, following block of code could be executed /* send 200 OK response */ } +.. note:: + When body content is read from the ``rx_fifo`` app need to send notification to HTTP layer if requested, it is required for HTTP/2 flow control. + Sending data """""""""""""" diff --git a/src/plugins/http/http_private.h b/src/plugins/http/http_private.h index 1f9812de7fa..662be060341 100644 --- a/src/plugins/http/http_private.h +++ b/src/plugins/http/http_private.h @@ -219,6 +219,7 @@ typedef struct http_tc_ http_conn_state_t state; u32 timer_handle; u32 timeout; + u32 app_rx_fifo_size; u8 *app_name; u8 *host; http_conn_flags_t flags; @@ -563,6 +564,21 @@ http_io_as_add_want_deq_ntf (http_req_t *req) svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); } +always_inline void +http_io_as_add_want_read_ntf (http_req_t *req) +{ + session_t *as = session_get_from_handle (req->hr_pa_session_handle); + svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL | + SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY); +} + +always_inline void +http_io_as_reset_has_read_ntf (http_req_t *req) +{ + session_t *as = session_get_from_handle (req->hr_pa_session_handle); + svm_fifo_reset_has_deq_ntf (as->rx_fifo); +} + always_inline u32 http_io_as_max_write (http_req_t *req) {