http: http/2 flow control 33/42833/10
authorMatus Fabian <[email protected]>
Thu, 17 Apr 2025 13:47:08 +0000 (09:47 -0400)
committerFlorin Coras <[email protected]>
Tue, 6 May 2025 06:37:07 +0000 (06:37 +0000)
Type: feature

Change-Id: Id8e2c51d19a9aef9a74c000dad490ede5eebf2b6
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/go.mod
extras/hs-test/h2spec_extras/h2spec_extras.go [new file with mode: 0644]
extras/hs-test/http2_test.go
extras/hs-test/infra/suite_h2.go
src/plugins/hs_apps/test_builtins.c
src/plugins/http/http.c
src/plugins/http/http2/frame.c
src/plugins/http/http2/http2.c
src/plugins/http/http_plugin.rst
src/plugins/http/http_private.h

index 8da936b..0f11b14 100644 (file)
@@ -13,6 +13,7 @@ require (
        github.com/sirupsen/logrus v1.9.3
        github.com/summerwind/h2spec v2.2.1+incompatible
        go.fd.io/govpp v0.10.0
+       golang.org/x/net v0.28.0
        gopkg.in/yaml.v3 v3.0.1
        k8s.io/api v0.30.2
        k8s.io/apimachinery v0.30.2
@@ -81,7 +82,6 @@ require (
        golang.org/x/crypto v0.26.0 // indirect
        golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
        golang.org/x/mod v0.18.0 // indirect
-       golang.org/x/net v0.28.0 // indirect
        golang.org/x/oauth2 v0.18.0 // indirect
        golang.org/x/sync v0.8.0 // indirect
        golang.org/x/sys v0.23.0 // indirect
diff --git a/extras/hs-test/h2spec_extras/h2spec_extras.go b/extras/hs-test/h2spec_extras/h2spec_extras.go
new file mode 100644 (file)
index 0000000..3c2b9dd
--- /dev/null
@@ -0,0 +1,170 @@
+package h2spec_extras
+
+import (
+       "fmt"
+
+       "github.com/summerwind/h2spec/config"
+       "github.com/summerwind/h2spec/spec"
+       "golang.org/x/net/http2"
+)
+
+var key = "extras"
+
+func NewTestGroup(section string, name string) *spec.TestGroup {
+       return &spec.TestGroup{
+               Key:     key,
+               Section: section,
+               Name:    name,
+       }
+}
+
+func Spec() *spec.TestGroup {
+       tg := &spec.TestGroup{
+               Key:  key,
+               Name: "extras for HTTP/2 server",
+       }
+
+       tg.AddTestGroup(FlowControl())
+
+       return tg
+}
+
+func VerifyWindowUpdate(conn *spec.Conn, streamID, expectedIncrement uint32) error {
+       actual, passed := conn.WaitEventByType(spec.EventWindowUpdateFrame)
+       actualStr := actual.String()
+       switch event := actual.(type) {
+       case spec.WindowUpdateFrameEvent:
+               actualStr = fmt.Sprintf("WINDOW_UPDATE Frame (stream_id:%d, increment:%d)", event.StreamID, event.Increment)
+               passed = (event.StreamID == streamID) && (event.Increment == expectedIncrement)
+       default:
+               passed = false
+       }
+
+       if !passed {
+               expected := []string{
+                       fmt.Sprintf("WINDOW_UPDATE Frame (stream_id:%d, increment:%d)", streamID, expectedIncrement),
+               }
+
+               return &spec.TestError{
+                       Expected: expected,
+                       Actual:   actualStr,
+               }
+       }
+       return nil
+}
+
+func FlowControl() *spec.TestGroup {
+       tg := NewTestGroup("1", "Flow control")
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Sends a WINDOW_UPDATE frame on connection",
+               Requirement: "The endpoint MUST NOT send a flow-controlled frame with a length that exceeds the space available.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       // turn off automatic connection window update
+                       conn.WindowUpdate = false
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       headers := spec.CommonHeaders(c)
+                       headers[2].Value = "/4kB"
+
+                       // consume most of the connection window
+                       for i := 0; i <= 14; i++ {
+                               hp := http2.HeadersFrameParam{
+                                       StreamID:      streamID,
+                                       EndStream:     true,
+                                       EndHeaders:    true,
+                                       BlockFragment: conn.EncodeHeaders(headers),
+                               }
+                               conn.WriteHeaders(hp)
+                               streamID += 2
+                               err := spec.VerifyEventType(conn, spec.EventDataFrame)
+                               if err != nil {
+                                       return err
+                               }
+                       }
+
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      streamID,
+                               EndStream:     true,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       // verify reception of DATA frame
+                       err = spec.VerifyEventType(conn, spec.EventDataFrame)
+                       if err != nil {
+                               return err
+                       }
+
+                       // increment connection window
+                       conn.WriteWindowUpdate(0, 65535)
+
+                       // wait for DATA frame with rest of the content
+                       actual, passed := conn.WaitEventByType(spec.EventDataFrame)
+                       switch event := actual.(type) {
+                       case spec.DataFrameEvent:
+                               passed = event.Header().Length == 1
+                       default:
+                               passed = false
+                       }
+
+                       if !passed {
+                               expected := []string{
+                                       fmt.Sprintf("DATA Frame (length:1, flags:0x00, stream_id:%d)", streamID),
+                               }
+
+                               return &spec.TestError{
+                                       Expected: expected,
+                                       Actual:   actual.String(),
+                               }
+                       }
+
+                       return nil
+               },
+       })
+
+       tg.AddTestCase(&spec.TestCase{
+               Desc:        "Receive a WINDOW_UPDATE frame on stream",
+               Requirement: "The receiver of a frame sends a WINDOW_UPDATE frame as it consumes data and frees up space in flow-control windows.",
+               Run: func(c *config.Config, conn *spec.Conn) error {
+                       var streamID uint32 = 1
+
+                       err := conn.Handshake()
+                       if err != nil {
+                               return err
+                       }
+
+                       headers := spec.CommonHeaders(c)
+                       headers[0].Value = "POST"
+                       headers = append(headers, spec.HeaderField("content-length", "12"))
+                       hp := http2.HeadersFrameParam{
+                               StreamID:      streamID,
+                               EndStream:     false,
+                               EndHeaders:    true,
+                               BlockFragment: conn.EncodeHeaders(headers),
+                       }
+                       conn.WriteHeaders(hp)
+                       // we send window update on stream when app read data from rx fifo, so send DATA frame and wait for WINDOW_UPDATE frame
+                       conn.WriteData(streamID, false, []byte("AAAA"))
+                       err = VerifyWindowUpdate(conn, streamID, 4)
+                       if err != nil {
+                               return err
+                       }
+                       // test it again
+                       conn.WriteData(streamID, false, []byte("BBBBB"))
+                       err = VerifyWindowUpdate(conn, streamID, 5)
+                       if err != nil {
+                               return err
+                       }
+                       // we don't send stream window update if stream is half-closed, so HEADERS frame should be received
+                       conn.WriteData(streamID, true, []byte("CCC"))
+                       return spec.VerifyHeadersFrame(conn, streamID)
+               },
+       })
+       return tg
+}
index 5fa3219..1be1455 100644 (file)
@@ -9,7 +9,7 @@ import (
 )
 
 func init() {
-       RegisterH2Tests(Http2TcpGetTest)
+       RegisterH2Tests(Http2TcpGetTest, Http2TcpPostTest)
 }
 
 func Http2TcpGetTest(s *H2Suite) {
@@ -48,3 +48,13 @@ func Http2TcpGetTest(s *H2Suite) {
        o := vpp.Vppctl("show session verbose proto http")
        s.AssertNotContains(o, "LISTEN")
 }
+
+func Http2TcpPostTest(s *H2Suite) {
+       vpp := s.Containers.Vpp.VppInstance
+       serverAddress := s.VppAddr()
+       s.Log(vpp.Vppctl("http static server uri tcp://" + serverAddress + "/80 url-handlers max-body-size 20m rx-buff-thresh 20m fifo-size 65k debug 2"))
+       s.Log(vpp.Vppctl("test-url-handler enable"))
+       args := fmt.Sprintf("--max-time 10 --noproxy '*' --data-binary @%s --http2-prior-knowledge http://%s:80/test3", CurlContainerTestFile, serverAddress)
+       _, log := s.RunCurlContainer(s.Containers.Curl, args)
+       s.AssertContains(log, "HTTP/2 200")
+}
index 6e910b9..8f7426a 100644 (file)
@@ -11,6 +11,7 @@ import (
 
        "github.com/summerwind/h2spec/spec"
 
+       "fd.io/hs-test/h2spec_extras"
        . "github.com/onsi/ginkgo/v2"
        "github.com/summerwind/h2spec/config"
        "github.com/summerwind/h2spec/generic"
@@ -124,9 +125,8 @@ var genericTests = []h2specTest{
        {desc: "generic/3.5/1"},
        {desc: "generic/3.7/1"},
        {desc: "generic/3.8/1"},
-       // TODO: flow control
-       //{desc: "generic/3.9/1"},
-       //{desc: "generic/3.9/2"},
+       {desc: "generic/3.9/1"},
+       {desc: "generic/3.9/2"},
        // TODO: CONTINUATION
        //{desc: "generic/3.10/1"},
        //{desc: "generic/3.10/2"},
@@ -180,12 +180,10 @@ var http2Tests = []h2specTest{
        {desc: "http2/4.3/3"},
        {desc: "http2/5.1.1/1"},
        {desc: "http2/5.1.1/2"},
-       // TODO: flow control
-       // {desc: "http2/5.1.2/1"},
+       {desc: "http2/5.1.2/1"},
        {desc: "http2/5.1/1"},
        {desc: "http2/5.1/2"},
-       // TODO: flow control
-       // {desc: "http2/5.1/3"},
+       {desc: "http2/5.1/3"},
        // TODO: CONTINUATION
        // {desc: "http2/5.1/4"},
        {desc: "http2/5.1/5"},
@@ -222,8 +220,7 @@ var http2Tests = []h2specTest{
        {desc: "http2/6.5.2/3"},
        {desc: "http2/6.5.2/4"},
        {desc: "http2/6.5.2/5"},
-       // TODO: flow control
-       // {desc: "http2/6.5.3/1"},
+       {desc: "http2/6.5.3/1"},
        {desc: "http2/6.5.3/2"},
        {desc: "http2/6.5/1"},
        {desc: "http2/6.5/2"},
@@ -233,16 +230,17 @@ var http2Tests = []h2specTest{
        {desc: "http2/6.7/3"},
        {desc: "http2/6.7/4"},
        {desc: "http2/6.8/1"},
-       // TODO: flow control
-       // {desc: "http2/6.9.1/1"},
-       // {desc: "http2/6.9.1/2"},
+       {desc: "http2/6.9.1/1"},
+       {desc: "http2/6.9.1/2"},
+       // TODO: message framing without content length using END_STREAM flag
        // {desc: "http2/6.9.1/3"},
-       // {desc: "http2/6.9.2/1"},
-       // {desc: "http2/6.9.2/2"},
-       // {desc: "http2/6.9.2/3"},
-       // {desc: "http2/6.9/1"},
+       {desc: "http2/6.9.2/1"},
+       {desc: "http2/6.9.2/2"},
+       {desc: "http2/6.9.2/3"},
+       {desc: "http2/6.9/1"},
+       // TODO: message framing without content length using END_STREAM flag
        // {desc: "http2/6.9/2"},
-       // {desc: "http2/6.9/3"},
+       {desc: "http2/6.9/3"},
        // TODO: CONTINUATION
        // {desc: "http2/6.10/1"},
        // {desc: "http2/6.10/2"},
@@ -273,10 +271,16 @@ var http2Tests = []h2specTest{
        {desc: "http2/8.2/1"},
 }
 
+var extrasTests = []h2specTest{
+       {desc: "extras/1/1"},
+       {desc: "extras/1/2"},
+}
+
 const (
        GenericTestGroup int = 1
        HpackTestGroup   int = 2
        Http2TestGroup   int = 3
+       ExtrasTestGroup  int = 4
 )
 
 var specs = []struct {
@@ -286,6 +290,7 @@ var specs = []struct {
        {GenericTestGroup, genericTests},
        {HpackTestGroup, hpackTests},
        {Http2TestGroup, http2Tests},
+       {ExtrasTestGroup, extrasTests},
 }
 
 // Marked as pending since http plugin is not build with http/2 enabled by default
@@ -341,6 +346,9 @@ var _ = Describe("H2SpecSuite", Pending, Ordered, ContinueOnFailure, func() {
                                case Http2TestGroup:
                                        tg = http2.Spec()
                                        break
+                               case ExtrasTestGroup:
+                                       tg = h2spec_extras.Spec()
+                                       break
                                }
                                tg.Test(conf)
 
index 4c324d5..5403be7 100644 (file)
@@ -52,14 +52,15 @@ VLIB_REGISTER_NODE (test_builtins_timer_process_node) = {
 };
 
 static void
-send_data_to_hss (hss_session_handle_t sh, u8 *data, u8 free_vec_data)
+send_data_to_hss (hss_session_handle_t sh, u8 *data, uword data_len,
+                 u8 free_vec_data)
 {
   tb_main_t *tbm = &tb_main;
   hss_url_handler_args_t args = {};
 
   args.sh = sh;
   args.data = data;
-  args.data_len = vec_len (data);
+  args.data_len = data_len;
   args.ct = HTTP_CONTENT_TEXT_PLAIN;
   args.sc = HTTP_STATUS_OK;
   args.free_vec_data = free_vec_data;
@@ -74,7 +75,7 @@ handle_get_test1 (hss_url_handler_args_t *args)
 
   clib_warning ("get request on test1");
   data = format (0, "hello");
-  send_data_to_hss (args->sh, data, 1);
+  send_data_to_hss (args->sh, data, vec_len (data), 1);
 
   return HSS_URL_HANDLER_ASYNC;
 }
@@ -86,7 +87,7 @@ handle_get_test2 (hss_url_handler_args_t *args)
 
   clib_warning ("get request on test2");
   data = format (0, "some data");
-  send_data_to_hss (args->sh, data, 1);
+  send_data_to_hss (args->sh, data, vec_len (data), 1);
 
   return HSS_URL_HANDLER_ASYNC;
 }
@@ -106,7 +107,7 @@ delayed_resp_cb (u32 *expired_timers)
       e = pool_elt_at_index (tbm->delayed_resps, pool_index);
       clib_warning ("sending delayed data");
       data = format (0, "delayed data");
-      send_data_to_hss (e->sh, data, 1);
+      send_data_to_hss (e->sh, data, vec_len (data), 1);
       pool_put (tbm->delayed_resps, e);
     }
 }
@@ -129,7 +130,7 @@ handle_get_test_delayed (hss_url_handler_args_t *args)
 static hss_url_handler_rc_t
 handle_post_test3 (hss_url_handler_args_t *args)
 {
-  send_data_to_hss (args->sh, 0, 0);
+  send_data_to_hss (args->sh, 0, 0, 0);
   return HSS_URL_HANDLER_ASYNC;
 }
 
@@ -137,7 +138,15 @@ static hss_url_handler_rc_t
 handle_get_64bytes (hss_url_handler_args_t *args)
 {
   tb_main_t *tbm = &tb_main;
-  send_data_to_hss (args->sh, tbm->test_data, 0);
+  send_data_to_hss (args->sh, tbm->test_data, 64, 0);
+  return HSS_URL_HANDLER_ASYNC;
+}
+
+static hss_url_handler_rc_t
+handle_get_4kbytes (hss_url_handler_args_t *args)
+{
+  tb_main_t *tbm = &tb_main;
+  send_data_to_hss (args->sh, tbm->test_data, 4 << 10, 0);
   return HSS_URL_HANDLER_ASYNC;
 }
 
@@ -157,8 +166,8 @@ test_builtins_init (vlib_main_t *vm)
       return;
     }
 
-  tbm->test_data = format (
-    0, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
+  /* init test data, big buffer */
+  vec_validate_init_empty (tbm->test_data, (4 << 10) - 1, 'x');
 
   (*fp) (handle_get_test1, "test1", HTTP_REQ_GET);
   (*fp) (handle_get_test1, "test1", HTTP_REQ_POST);
@@ -166,6 +175,7 @@ test_builtins_init (vlib_main_t *vm)
   (*fp) (handle_get_test_delayed, "test_delayed", HTTP_REQ_GET);
   (*fp) (handle_post_test3, "test3", HTTP_REQ_POST);
   (*fp) (handle_get_64bytes, "64B", HTTP_REQ_GET);
+  (*fp) (handle_get_4kbytes, "4kB", HTTP_REQ_GET);
 
   tbm->send_data =
     vlib_get_plugin_symbol ("http_static_plugin.so", "hss_session_send_data");
index c7eefcd..94914aa 100644 (file)
@@ -904,6 +904,7 @@ http_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
   http_conn_t *lhc;
   u32 lhc_index;
   transport_endpt_ext_cfg_t *ext_cfg;
+  segment_manager_props_t *props;
 
   sep = (session_endpoint_cfg_t *) tep;
 
@@ -953,6 +954,9 @@ http_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
 
   lhc->flags |= HTTP_CONN_F_IS_SERVER;
 
+  props = application_segment_manager_properties (app);
+  lhc->app_rx_fifo_size = props->rx_fifo_size;
+
   if (vec_len (app->name))
     lhc->app_name = vec_dup (app->name);
   else
index c9c1931..580ffff 100644 (file)
@@ -142,7 +142,7 @@ http2_frame_read_window_update (u32 *increment, u8 *payload, u32 payload_len)
 
   value = (u32 *) payload;
 
-  if (value == 0)
+  if (*value == 0)
     return HTTP2_ERROR_PROTOCOL_ERROR;
 
   *increment = clib_net_to_host_u32 (*value) & 0x7FFFFFFF;
index 67db185..3c6949c 100644 (file)
@@ -2,6 +2,7 @@
  * Copyright(c) 2025 Cisco Systems, Inc.
  */
 
+#include <vppinfra/llist.h>
 #include <http/http2/hpack.h>
 #include <http/http2/frame.h>
 #include <http/http_private.h>
 #define HTTP_2_ENABLE 0
 #endif
 
+#define HTTP2_WIN_SIZE_MAX     0x7FFFFFFF
+#define HTTP2_INITIAL_WIN_SIZE 65535
+/* connection-level flow control window kind of mirrors TCP flow control */
+/* TODO: configurable? */
+#define HTTP2_CONNECTION_WINDOW_SIZE (10 << 20)
+
 #define foreach_http2_stream_state                                            \
   _ (IDLE, "IDLE")                                                            \
   _ (OPEN, "OPEN")                                                            \
@@ -24,7 +31,9 @@ typedef enum http2_stream_state_
 #undef _
 } http2_stream_state_t;
 
-#define foreach_http2_req_flags _ (APP_CLOSED, "app-closed")
+#define foreach_http2_req_flags                                               \
+  _ (APP_CLOSED, "app-closed")                                                \
+  _ (NEED_WINDOW_UPDATE, "need-window-update")
 
 typedef enum http2_req_flags_bit_
 {
@@ -46,9 +55,11 @@ typedef struct http2_req_
   http2_stream_state_t stream_state;
   u8 flags;
   u32 stream_id;
-  u64 peer_window;
+  i32 peer_window; /* can become negative after settings change */
+  u32 our_window;
   u8 *payload;
   u32 payload_len;
+  clib_llist_anchor_t resume_list;
 } http2_req_t;
 
 #define foreach_http2_conn_flags                                              \
@@ -76,8 +87,11 @@ typedef struct http2_conn_ctx_
   u8 flags;
   u32 last_opened_stream_id;
   u32 last_processed_stream_id;
-  u64 peer_window;
+  u32 peer_window;
+  u32 our_window;
   uword *req_by_stream_id;
+  clib_llist_index_t streams_to_resume;
+  http2_conn_settings_t settings;
 } http2_conn_ctx_t;
 
 typedef struct http2_main_
@@ -99,8 +113,15 @@ http2_conn_ctx_alloc_w_thread (http_conn_t *hc)
                         CLIB_CACHE_LINE_BYTES);
   clib_memset (h2c, 0, sizeof (*h2c));
   h2c->peer_settings = http2_default_conn_settings;
-  h2c->peer_window = h2c->peer_settings.initial_window_size;
+  h2c->peer_window = HTTP2_INITIAL_WIN_SIZE;
+  h2c->our_window = HTTP2_CONNECTION_WINDOW_SIZE;
+  h2c->settings = h2m->settings;
+  /* adjust settings according to app rx_fifo size */
+  h2c->settings.initial_window_size =
+    clib_min (h2c->settings.initial_window_size, hc->app_rx_fifo_size);
   h2c->req_by_stream_id = hash_create (0, sizeof (uword));
+  h2c->streams_to_resume =
+    clib_llist_make_head (h2m->req_pool[hc->c_thread_index], resume_list);
   hc->opaque =
     uword_to_pointer (h2c - h2m->conn_pool[hc->c_thread_index], void *);
   HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index,
@@ -154,10 +175,13 @@ http2_conn_alloc_req (http_conn_t *hc, u32 stream_id)
   req->base.c_thread_index = hc->c_thread_index;
   req->stream_id = stream_id;
   req->stream_state = HTTP2_STREAM_STATE_IDLE;
+  req->resume_list.next = CLIB_LLIST_INVALID_INDEX;
+  req->resume_list.prev = CLIB_LLIST_INVALID_INDEX;
   h2c = http2_conn_ctx_get_w_thread (hc);
   HTTP_DBG (1, "h2c [%u]%x req_index %x stream_id %u", hc->c_thread_index,
            h2c - h2m->conn_pool[hc->c_thread_index], req_index, stream_id);
   req->peer_window = h2c->peer_settings.initial_window_size;
+  req->our_window = h2c->settings.initial_window_size;
   hash_set (h2c->req_by_stream_id, stream_id, req_index);
   return req;
 }
@@ -172,6 +196,8 @@ http2_conn_free_req (http2_conn_ctx_t *h2c, http2_req_t *req,
            h2c - h2m->conn_pool[thread_index],
            ((http_req_handle_t) req->base.hr_req_handle).req_index,
            req->stream_id);
+  if (clib_llist_elt_is_linked (req, resume_list))
+    clib_llist_remove (h2m->req_pool[thread_index], resume_list, req);
   vec_free (req->base.headers);
   vec_free (req->base.target);
   http_buffer_free (&req->base.tx_buf);
@@ -210,6 +236,56 @@ http2_req_get (u32 req_index, clib_thread_index_t thread_index)
   return pool_elt_at_index (h2m->req_pool[thread_index], req_index);
 }
 
+always_inline int
+http2_req_update_peer_window (http2_req_t *req, i64 delta)
+{
+  i64 new_value;
+
+  new_value = (i64) req->peer_window + delta;
+  if (new_value > HTTP2_WIN_SIZE_MAX)
+    return -1;
+  req->peer_window = (i32) new_value;
+  HTTP_DBG (1, "new window size %d", req->peer_window);
+  return 0;
+}
+
+always_inline void
+http2_req_add_to_resume_list (http2_conn_ctx_t *h2c, http2_req_t *req)
+{
+  http2_main_t *h2m = &http2_main;
+  http2_req_t *he;
+
+  req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+  he = clib_llist_elt (h2m->req_pool[req->base.c_thread_index],
+                      h2c->streams_to_resume);
+  clib_llist_add_tail (h2m->req_pool[req->base.c_thread_index], resume_list,
+                      req, he);
+}
+
+always_inline void
+http2_resume_list_process (http_conn_t *hc)
+{
+  http2_main_t *h2m = &http2_main;
+  http2_req_t *he, *req;
+  http2_conn_ctx_t *h2c;
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  he =
+    clib_llist_elt (h2m->req_pool[hc->c_thread_index], h2c->streams_to_resume);
+
+  /* check if something in list and reschedule first app session from list if
+   * we have some space in connection window */
+  if (h2c->peer_window > 0 &&
+      !clib_llist_is_empty (h2m->req_pool[hc->c_thread_index], resume_list,
+                           he))
+    {
+      req =
+       clib_llist_next (h2m->req_pool[hc->c_thread_index], resume_list, he);
+      clib_llist_remove (h2m->req_pool[hc->c_thread_index], resume_list, req);
+      transport_connection_reschedule (&req->base.connection);
+    }
+}
+
 /* send GOAWAY frame and close TCP connection */
 always_inline void
 http2_connection_error (http_conn_t *hc, http2_error_t error,
@@ -286,23 +362,26 @@ always_inline void
 http2_send_server_preface (http_conn_t *hc)
 {
   u8 *response;
-  http2_main_t *h2m = &http2_main;
   http2_settings_entry_t *setting, *settings_list = 0;
+  http2_conn_ctx_t *h2c = http2_conn_ctx_get_w_thread (hc);
 
 #define _(v, label, member, min, max, default_value, err_code)                \
-  if (h2m->settings.member != default_value)                                  \
+  if (h2c->settings.member != default_value)                                  \
     {                                                                         \
       vec_add2 (settings_list, setting, 1);                                   \
       setting->identifier = HTTP2_SETTINGS_##label;                           \
-      setting->value = h2m->settings.member;                                  \
+      setting->value = h2c->settings.member;                                  \
     }
   foreach_http2_settings
 #undef _
 
     response = http_get_tx_buf (hc);
   http2_frame_write_settings (settings_list, &response);
+  /* send also connection window update */
+  http2_frame_write_window_update (h2c->our_window - HTTP2_INITIAL_WIN_SIZE, 0,
+                                  &response);
   http_io_ts_write (hc, response, vec_len (response), 0);
-  http_io_ts_after_write (hc, 0);
+  http_io_ts_after_write (hc, 1);
 }
 
 /*************************************/
@@ -389,6 +468,7 @@ http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
          return HTTP_SM_STOP;
        }
       new_state = HTTP_REQ_STATE_TRANSPORT_IO_MORE_DATA;
+      http_io_as_add_want_read_ntf (&req->base);
     }
   /* TODO: message framing without content length using END_STREAM flag */
   if (req->base.body_len == 0 && req->stream_state == HTTP2_STREAM_STATE_OPEN)
@@ -444,6 +524,8 @@ http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req,
                                        transport_send_params_t *sp,
                                        http2_error_t *error)
 {
+  u32 max_enq;
+
   if (req->payload_len > req->base.to_recv)
     {
       HTTP_DBG (1, "received more data than expected");
@@ -458,6 +540,13 @@ http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req,
       http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
       return HTTP_SM_STOP;
     }
+  max_enq = http_io_as_max_write (&req->base);
+  if (max_enq < req->payload_len)
+    {
+      clib_warning ("app's rx fifo full");
+      http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
   if (req->base.to_recv == 0)
     http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_REPLY);
   http_io_as_write (&req->base, req->payload, req->payload_len);
@@ -482,6 +571,7 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req,
   u8 flags = HTTP2_FRAME_FLAG_END_HEADERS;
   http_sm_result_t sm_result = HTTP_SM_ERROR;
   u32 n_written;
+  http2_conn_ctx_t *h2c;
 
   http_get_app_msg (&req->base, &msg);
   ASSERT (msg.type == HTTP_MSG_REPLY);
@@ -503,6 +593,15 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req,
                            &response);
   vec_free (date);
 
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  if (vec_len (response) > h2c->peer_settings.max_frame_size)
+    {
+      /* TODO: CONTINUATION (headers fragmentation) */
+      clib_warning ("resp headers greater than SETTINGS_MAX_FRAME_SIZE");
+      *error = HTTP2_ERROR_INTERNAL_ERROR;
+      return HTTP_SM_ERROR;
+    }
+
   if (msg.data.body_len)
     {
       /* start sending the actual data */
@@ -539,18 +638,42 @@ http2_req_state_app_io_more_data (http_conn_t *hc, http2_req_t *req,
   http_buffer_t *hb = &req->base.tx_buf;
   u8 fh[HTTP2_FRAME_HEADER_SIZE];
   u8 finished = 0, flags = 0;
+  http2_conn_ctx_t *h2c;
 
   ASSERT (http_buffer_bytes_left (hb) > 0);
+
+  if (req->peer_window <= 0)
+    {
+      HTTP_DBG (1, "stream window is full");
+      /* mark that we need window update on stream */
+      req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+      http_req_deschedule (&req->base, sp);
+      return HTTP_SM_STOP;
+    }
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  if (h2c->peer_window == 0)
+    {
+      HTTP_DBG (1, "connection window is full");
+      /* add to waiting queue */
+      http2_req_add_to_resume_list (h2c, req);
+      http_req_deschedule (&req->base, sp);
+      return HTTP_SM_STOP;
+    }
+
   max_write = http_io_ts_max_write (hc, sp);
   if (max_write <= HTTP2_FRAME_HEADER_SIZE)
     {
       HTTP_DBG (1, "ts tx fifo full");
       goto check_fifo;
     }
+  max_write -= HTTP2_FRAME_HEADER_SIZE;
+  max_write = clib_min (max_write, (u32) req->peer_window);
+  max_write = clib_min (max_write, h2c->peer_window);
+  max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
+
   max_read = http_buffer_bytes_left (hb);
 
-  n_read = http_buffer_get_segs (hb, max_write - HTTP2_FRAME_HEADER_SIZE,
-                                &app_segs, &n_segs);
+  n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
   if (n_read == 0)
     {
       HTTP_DBG (1, "no data to deq");
@@ -569,6 +692,8 @@ http2_req_state_app_io_more_data (http_conn_t *hc, http2_req_t *req,
   ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read));
   vec_free (segs);
   http_buffer_drain (hb, n_read);
+  req->peer_window -= n_read;
+  h2c->peer_window -= n_read;
 
   if (finished)
     {
@@ -668,7 +793,6 @@ http2_req_run_state_machine (http_conn_t *hc, http2_req_t *req,
 static http2_error_t
 http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh)
 {
-  http2_main_t *h2m = &http2_main;
   http2_req_t *req;
   u8 *rx_buf;
   http2_error_t rv;
@@ -697,7 +821,7 @@ http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh)
        }
       h2c->last_opened_stream_id = fh->stream_id;
       if (hash_elts (h2c->req_by_stream_id) ==
-         h2m->settings.max_concurrent_streams)
+         h2c->settings.max_concurrent_streams)
        {
          HTTP_DBG (1, "SETTINGS_MAX_CONCURRENT_STREAMS exceeded");
          http_io_ts_drain (hc, fh->length);
@@ -748,6 +872,8 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
   http2_conn_ctx_t *h2c;
 
   req = http2_conn_get_req (hc, fh->stream_id);
+  h2c = http2_conn_ctx_get_w_thread (hc);
+
   if (!req)
     {
       if (fh->stream_id == 0)
@@ -755,7 +881,6 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
          HTTP_DBG (1, "DATA frame with stream id 0");
          return HTTP2_ERROR_PROTOCOL_ERROR;
        }
-      h2c = http2_conn_ctx_get_w_thread (hc);
       if (fh->stream_id <= h2c->last_opened_stream_id)
        {
          HTTP_DBG (1, "stream closed, ignoring frame");
@@ -776,6 +901,18 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
       return HTTP2_ERROR_NO_ERROR;
     }
 
+  if (fh->length > req->our_window)
+    {
+      HTTP_DBG (1, "error: peer violated stream flow control");
+      http2_stream_error (hc, req, HTTP2_ERROR_FLOW_CONTROL_ERROR, 0);
+      return HTTP2_ERROR_NO_ERROR;
+    }
+  if (fh->length > h2c->our_window)
+    {
+      HTTP_DBG (1, "error: peer violated connection flow control");
+      return HTTP2_ERROR_FLOW_CONTROL_ERROR;
+    }
+
   if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM)
     req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
 
@@ -788,6 +925,9 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
   if (rv != HTTP2_ERROR_NO_ERROR)
     return rv;
 
+  req->our_window -= fh->length;
+  h2c->our_window -= fh->length;
+
   HTTP_DBG (1, "run state machine");
   return http2_req_run_state_machine (hc, req, 0, 0);
 }
@@ -798,6 +938,10 @@ http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh)
   u8 *rx_buf;
   u32 win_increment;
   http2_error_t rv;
+  http2_req_t *req;
+  http2_conn_ctx_t *h2c;
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
 
   rx_buf = http_get_rx_buf (hc);
   vec_validate (rx_buf, fh->length - 1);
@@ -805,9 +949,57 @@ http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh)
 
   rv = http2_frame_read_window_update (&win_increment, rx_buf, fh->length);
   if (rv != HTTP2_ERROR_NO_ERROR)
-    return rv;
+    {
+      HTTP_DBG (1, "invalid WINDOW_UPDATE frame (stream id %u)",
+               fh->stream_id);
+      /* error on the connection flow-control window is connection error */
+      if (fh->stream_id == 0)
+       return rv;
+      /* otherwise it is stream error */
+      req = http2_conn_get_req (hc, fh->stream_id);
+      if (!req)
+       http2_send_stream_error (hc, fh->stream_id, rv, 0);
+      else
+       http2_stream_error (hc, req, rv, 0);
+      return HTTP2_ERROR_NO_ERROR;
+    }
+
+  HTTP_DBG (1, "WINDOW_UPDATE %u (stream id %u)", win_increment,
+           fh->stream_id);
+  if (fh->stream_id == 0)
+    {
+      if (win_increment > (HTTP2_WIN_SIZE_MAX - h2c->peer_window))
+       return HTTP2_ERROR_FLOW_CONTROL_ERROR;
+      h2c->peer_window += win_increment;
+    }
+  else
+    {
+      req = http2_conn_get_req (hc, fh->stream_id);
+      if (!req)
+       {
+         if (fh->stream_id > h2c->last_opened_stream_id)
+           {
+             HTTP_DBG (
+               1,
+               "received WINDOW_UPDATE frame on idle stream (stream id %u)",
+               fh->stream_id);
+             return HTTP2_ERROR_PROTOCOL_ERROR;
+           }
+         /* ignore window update on closed stream */
+         return HTTP2_ERROR_NO_ERROR;
+       }
+      if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
+       {
+         if (http2_req_update_peer_window (req, win_increment))
+           {
+             http2_stream_error (hc, req, HTTP2_ERROR_FLOW_CONTROL_ERROR, 0);
+             return HTTP2_ERROR_NO_ERROR;
+           }
+         if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE)
+           http2_req_add_to_resume_list (h2c, req);
+       }
+    }
 
-  /* TODO: flow control */
   return HTTP2_ERROR_NO_ERROR;
 }
 
@@ -818,6 +1010,9 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh)
   http2_error_t rv;
   http2_conn_settings_t new_settings;
   http2_conn_ctx_t *h2c;
+  http2_req_t *req;
+  u32 stream_id, req_index;
+  i32 win_size_delta;
 
   if (fh->stream_id != 0)
     return HTTP2_ERROR_PROTOCOL_ERROR;
@@ -842,13 +1037,34 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh)
       rv = http2_frame_read_settings (&new_settings, rx_buf, fh->length);
       if (rv != HTTP2_ERROR_NO_ERROR)
        return rv;
-      h2c->peer_settings = new_settings;
 
       /* ACK peer settings */
       http2_frame_write_settings_ack (&resp);
       http_io_ts_write (hc, resp, vec_len (resp), 0);
       vec_free (resp);
       http_io_ts_after_write (hc, 0);
+
+      /* change of SETTINGS_INITIAL_WINDOW_SIZE, we must adjust the size of all
+       * stream flow-control windows */
+      if (h2c->peer_settings.initial_window_size !=
+         new_settings.initial_window_size)
+       {
+         win_size_delta = (i32) new_settings.initial_window_size -
+                          (i32) h2c->peer_settings.initial_window_size;
+         hash_foreach (
+           stream_id, req_index, h2c->req_by_stream_id, ({
+             req = http2_req_get (req_index, hc->c_thread_index);
+             if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
+               {
+                 if (http2_req_update_peer_window (req, win_size_delta))
+                   http2_stream_error (hc, req,
+                                       HTTP2_ERROR_FLOW_CONTROL_ERROR, 0);
+                 if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE)
+                   http2_req_add_to_resume_list (h2c, req);
+               }
+           }));
+       }
+      h2c->peer_settings = new_settings;
     }
 
   return HTTP2_ERROR_NO_ERROR;
@@ -915,6 +1131,8 @@ http2_handle_goaway_frame (http_conn_t *hc, http2_frame_header_t *fh)
   if (rv != HTTP2_ERROR_NO_ERROR)
     return rv;
 
+  HTTP_DBG (1, "received GOAWAY %U", format_http2_error, error_code);
+
   if (error_code == HTTP2_ERROR_NO_ERROR)
     {
       /* TODO: graceful shutdown (no new streams) */
@@ -1094,6 +1312,9 @@ http2_app_tx_callback (http_conn_t *hc, u32 req_index,
       return;
     }
 
+  /* maybe we can continue sending data on some stream */
+  http2_resume_list_process (hc);
+
   /* reset http connection expiration timer */
   http_conn_timer_update (hc);
 }
@@ -1103,6 +1324,30 @@ http2_app_rx_evt_callback (http_conn_t *hc, u32 req_index,
                           clib_thread_index_t thread_index)
 {
   /* TODO: continue tunnel RX */
+  http2_req_t *req;
+  u8 *response;
+  u32 increment;
+
+  req = http2_req_get (req_index, thread_index);
+  if (!req)
+    {
+      HTTP_DBG (1, "req already deleted");
+      return;
+    }
+  HTTP_DBG (1, "received app read notification stream id %u", req->stream_id);
+  /* send stream window update if app read data in rx fifo and we expect more
+   * data (stream is still open) */
+  if (req->stream_state == HTTP2_STREAM_STATE_OPEN)
+    {
+      http_io_as_reset_has_read_ntf (&req->base);
+      response = http_get_tx_buf (hc);
+      increment = http_io_as_max_write (&req->base) - req->our_window;
+      HTTP_DBG (1, "stream window increment %u", increment);
+      req->our_window += increment;
+      http2_frame_write_window_update (increment, req->stream_id, &response);
+      http_io_ts_write (hc, response, vec_len (response), 0);
+      http_io_ts_after_write (hc, 0);
+    }
 }
 
 static void
@@ -1156,7 +1401,6 @@ http2_transport_connected_callback (http_conn_t *hc)
 static void
 http2_transport_rx_callback (http_conn_t *hc)
 {
-  http2_main_t *h2m = &http2_main;
   http2_frame_header_t fh;
   u32 to_deq;
   u8 *rx_buf;
@@ -1208,10 +1452,10 @@ http2_transport_rx_callback (http_conn_t *hc)
       http_io_ts_read (hc, rx_buf, HTTP2_FRAME_HEADER_SIZE, 1);
       to_deq -= HTTP2_FRAME_HEADER_SIZE;
       http2_frame_header_read (rx_buf, &fh);
-      if (fh.length > h2m->settings.max_frame_size)
+      if (fh.length > h2c->settings.max_frame_size)
        {
          HTTP_DBG (1, "frame length %lu exceeded SETTINGS_MAX_FRAME_SIZE %lu",
-                   fh.length, h2m->settings.max_frame_size);
+                   fh.length, h2c->settings.max_frame_size);
          http2_connection_error (hc, HTTP2_ERROR_FRAME_SIZE_ERROR, 0);
          return;
        }
@@ -1278,6 +1522,21 @@ http2_transport_rx_callback (http_conn_t *hc)
        }
     }
 
+  /* send connection window update if more than half consumed */
+  if (h2c->our_window < HTTP2_CONNECTION_WINDOW_SIZE / 2)
+    {
+      HTTP_DBG (1, "connection window increment %u",
+               HTTP2_CONNECTION_WINDOW_SIZE - h2c->our_window);
+      u8 *response = http_get_tx_buf (hc);
+      http2_frame_write_window_update (
+       HTTP2_CONNECTION_WINDOW_SIZE - h2c->our_window, 0, &response);
+      http_io_ts_write (hc, response, vec_len (response), 0);
+      http_io_ts_after_write (hc, 0);
+      h2c->our_window = HTTP2_CONNECTION_WINDOW_SIZE;
+    }
+  /* maybe we can continue sending data on some stream */
+  http2_resume_list_process (hc);
+
   /* reset http connection expiration timer */
   http_conn_timer_update (hc);
 }
index 4e799a5..4154a41 100644 (file)
@@ -231,6 +231,12 @@ Now we can start reading body content, following block of code could be executed
   u64 curr = vec_len (ctx->resp_body);
   rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, ctx->resp_body + curr);
   ASSERT (rv == n_deq);
+  /* notify http transport that we read data if requested */
+  if (svm_fifo_needs_deq_ntf (ts->rx_fifo, n_deq))
+    {
+      svm_fifo_clear_deq_ntf (ts->rx_fifo);
+      session_program_transport_io_evt (ts->handle, SESSION_IO_EVT_RX);
+    }
   /* update length of the vector */
   vec_set_len (ctx->resp_body, curr + n_deq);
   /* update number of remaining bytes to receive */
@@ -242,6 +248,9 @@ Now we can start reading body content, following block of code could be executed
       /* send 200 OK response */
     }
 
+.. note::
+    When body content is read from the ``rx_fifo`` app need to send notification to HTTP layer if requested, it is required for HTTP/2 flow control.
+
 Sending data
 """"""""""""""
 
index 1f9812d..662be06 100644 (file)
@@ -219,6 +219,7 @@ typedef struct http_tc_
   http_conn_state_t state;
   u32 timer_handle;
   u32 timeout;
+  u32 app_rx_fifo_size;
   u8 *app_name;
   u8 *host;
   http_conn_flags_t flags;
@@ -563,6 +564,21 @@ http_io_as_add_want_deq_ntf (http_req_t *req)
   svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
 }
 
+always_inline void
+http_io_as_add_want_read_ntf (http_req_t *req)
+{
+  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
+                                           SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
+}
+
+always_inline void
+http_io_as_reset_has_read_ntf (http_req_t *req)
+{
+  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  svm_fifo_reset_has_deq_ntf (as->rx_fifo);
+}
+
 always_inline u32
 http_io_as_max_write (http_req_t *req)
 {