http: http/2 multiplexing 25/42925/9
authorMatus Fabian <[email protected]>
Mon, 5 May 2025 17:28:04 +0000 (13:28 -0400)
committerFlorin Coras <[email protected]>
Tue, 13 May 2025 16:36:55 +0000 (16:36 +0000)
schedule tx of streams, within one http connection, using
round-robin like algorithm with two queues, first is used for
new responses to send headers frame and second for data frames,
stream can send one frame each round

Type: feature

Change-Id: Id89ed08e845418151498c9afed0f88824ce6dacf
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/http2_test.go
src/plugins/http/http.c
src/plugins/http/http2/http2.c
src/plugins/http/http_buffer.c
src/plugins/http/http_buffer.h
src/plugins/http/http_private.h

index f62d5e3..747f517 100644 (file)
@@ -9,7 +9,7 @@ import (
 )
 
 func init() {
-       RegisterH2Tests(Http2TcpGetTest, Http2TcpPostTest, Http2MultiplexingTest)
+       RegisterH2Tests(Http2TcpGetTest, Http2TcpPostTest, Http2MultiplexingTest, Http2MultiplexingMTTest)
 }
 
 func Http2TcpGetTest(s *H2Suite) {
@@ -64,7 +64,7 @@ func Http2MultiplexingTest(s *H2Suite) {
        serverAddress := s.VppAddr()
        vpp.Vppctl("http tps uri tcp://0.0.0.0/80 no-zc")
 
-       args := fmt.Sprintf("--log-file=%s -T10 -n20 -c1 -m100 http://%s:80/test_file_20M", s.H2loadLogFileName(s.Containers.H2load), serverAddress)
+       args := fmt.Sprintf("--log-file=%s -T10 -n21 -c1 -m100 http://%s:80/test_file_20M", s.H2loadLogFileName(s.Containers.H2load), serverAddress)
        s.Containers.H2load.ExtraRunningArgs = args
        s.Containers.H2load.Run()
 
@@ -76,3 +76,19 @@ func Http2MultiplexingTest(s *H2Suite) {
        s.AssertContains(o, " 0 errored")
        s.AssertContains(o, " 0 timeout")
 }
+
+func Http2MultiplexingMTTest(s *H2Suite) {
+       vpp := s.Containers.Vpp.VppInstance
+       serverAddress := s.VppAddr()
+       vpp.Vppctl("http tps uri tcp://0.0.0.0/80 no-zc")
+
+       args := fmt.Sprintf("-T10 -n100 -c4 -r1 -m10 http://%s:80/test_file_20M", serverAddress)
+       s.Containers.H2load.ExtraRunningArgs = args
+       s.Containers.H2load.Run()
+
+       o, _ := s.Containers.H2load.GetOutput()
+       s.Log(o)
+       s.AssertContains(o, " 0 failed")
+       s.AssertContains(o, " 0 errored")
+       s.AssertContains(o, " 0 timeout")
+}
index 94914aa..bf43ab0 100644 (file)
@@ -128,7 +128,7 @@ http_conn_alloc_w_thread (clib_thread_index_t thread_index)
   return (hc - wrk->conn_pool);
 }
 
-static inline http_conn_t *
+http_conn_t *
 http_conn_get_w_thread (u32 hc_index, clib_thread_index_t thread_index)
 {
   http_worker_t *wrk = http_worker_get (thread_index);
index 6c420c5..0097421 100644 (file)
@@ -59,12 +59,13 @@ typedef struct http2_req_
   u32 our_window;
   u8 *payload;
   u32 payload_len;
-  clib_llist_anchor_t resume_list;
+  clib_llist_anchor_t sched_list;
 } http2_req_t;
 
 #define foreach_http2_conn_flags                                              \
   _ (EXPECT_PREFACE, "expect-preface")                                        \
-  _ (PREFACE_VERIFIED, "preface-verified")
+  _ (PREFACE_VERIFIED, "preface-verified")                                    \
+  _ (TS_DESCHED, "ts-descheduled")
 
 typedef enum http2_conn_flags_bit_
 {
@@ -82,6 +83,7 @@ typedef enum http2_conn_flags_
 
 typedef struct http2_conn_ctx_
 {
+  u32 hc_index;
   http2_conn_settings_t peer_settings;
   hpack_dynamic_table_t decoder_dynamic_table;
   u8 flags;
@@ -90,28 +92,57 @@ typedef struct http2_conn_ctx_
   u32 peer_window;
   u32 our_window;
   uword *req_by_stream_id;
-  clib_llist_index_t streams_to_resume;
+  clib_llist_index_t new_tx_streams; /* headers */
+  clib_llist_index_t old_tx_streams; /* data */
   http2_conn_settings_t settings;
+  clib_llist_anchor_t sched_list;
 } http2_conn_ctx_t;
 
+typedef struct http2_worker_ctx_
+{
+  http2_conn_ctx_t *conn_pool;
+  http2_req_t *req_pool;
+  clib_llist_index_t sched_head;
+} http2_worker_ctx_t;
+
 typedef struct http2_main_
 {
-  http2_conn_ctx_t **conn_pool;
-  http2_req_t **req_pool;
+  http2_worker_ctx_t *wrk_ctx;
   http2_conn_settings_t settings;
+  u32 n_sessions;
 } http2_main_t;
 
+typedef enum
+{
+  HTTP2_SCHED_WEIGHT_DATA_PTR = 1,
+  HTTP2_SCHED_WEIGHT_DATA_INLINE = 2,
+  HTTP2_SCHED_WEIGHT_HEADERS_PTR = 3,
+  HTTP2_SCHED_WEIGHT_HEADERS_INLINE = 4,
+} http2_sched_weight_t;
+
+#define HTTP2_SCHED_MAX_EMISSIONS 32
+
 static http2_main_t http2_main;
 
-http2_conn_ctx_t *
+static_always_inline http2_worker_ctx_t *
+http2_get_worker (clib_thread_index_t thread_index)
+{
+  return &http2_main.wrk_ctx[thread_index];
+}
+
+static void http2_update_time_callback (f64 now, u8 thread_index);
+
+static inline http2_conn_ctx_t *
 http2_conn_ctx_alloc_w_thread (http_conn_t *hc)
 {
   http2_main_t *h2m = &http2_main;
   http2_conn_ctx_t *h2c;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
+  u32 cnt;
 
-  pool_get_aligned_safe (h2m->conn_pool[hc->c_thread_index], h2c,
-                        CLIB_CACHE_LINE_BYTES);
+  pool_get_aligned_safe (wrk->conn_pool, h2c, CLIB_CACHE_LINE_BYTES);
   clib_memset (h2c, 0, sizeof (*h2c));
+  h2c->hc_index = hc->hc_hc_index;
   h2c->peer_settings = http2_default_conn_settings;
   h2c->peer_window = HTTP2_INITIAL_WIN_SIZE;
   h2c->our_window = HTTP2_CONNECTION_WINDOW_SIZE;
@@ -120,54 +151,65 @@ http2_conn_ctx_alloc_w_thread (http_conn_t *hc)
   h2c->settings.initial_window_size =
     clib_min (h2c->settings.initial_window_size, hc->app_rx_fifo_size);
   h2c->req_by_stream_id = hash_create (0, sizeof (uword));
-  h2c->streams_to_resume =
-    clib_llist_make_head (h2m->req_pool[hc->c_thread_index], resume_list);
-  hc->opaque =
-    uword_to_pointer (h2c - h2m->conn_pool[hc->c_thread_index], void *);
-  HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index,
-           h2c - h2m->conn_pool[hc->c_thread_index]);
+  h2c->new_tx_streams = clib_llist_make_head (wrk->req_pool, sched_list);
+  h2c->old_tx_streams = clib_llist_make_head (wrk->req_pool, sched_list);
+  h2c->sched_list.next = CLIB_LLIST_INVALID_INDEX;
+  h2c->sched_list.prev = CLIB_LLIST_INVALID_INDEX;
+  hc->opaque = uword_to_pointer (h2c - wrk->conn_pool, void *);
+  cnt = clib_atomic_fetch_add_relax (&h2m->n_sessions, 1);
+  /* (re)start stream tx scheduler if this is first connection */
+  /* TODO: update session infra to do this on per thread basis */
+  if (cnt == 0)
+    session_register_update_time_fn (http2_update_time_callback, 1);
+  HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index, h2c - wrk->conn_pool);
   return h2c;
 }
 
 static inline http2_conn_ctx_t *
 http2_conn_ctx_get_w_thread (http_conn_t *hc)
 {
-  http2_main_t *h2m = &http2_main;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   u32 h2c_index = pointer_to_uword (hc->opaque);
-  return pool_elt_at_index (h2m->conn_pool[hc->c_thread_index], h2c_index);
+  return pool_elt_at_index (wrk->conn_pool, h2c_index);
 }
 
 static inline void
 http2_conn_ctx_free (http_conn_t *hc)
 {
   http2_main_t *h2m = &http2_main;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   http2_conn_ctx_t *h2c;
+  u32 cnt;
 
   h2c = http2_conn_ctx_get_w_thread (hc);
-  HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index,
-           h2c - h2m->conn_pool[hc->c_thread_index]);
+  HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index, h2c - wrk->conn_pool);
   hash_free (h2c->req_by_stream_id);
   if (hc->flags & HTTP_CONN_F_HAS_REQUEST)
     hpack_dynamic_table_free (&h2c->decoder_dynamic_table);
   if (CLIB_DEBUG)
     memset (h2c, 0xba, sizeof (*h2c));
-  pool_put (h2m->conn_pool[hc->c_thread_index], h2c);
+  pool_put (wrk->conn_pool, h2c);
+  cnt = clib_atomic_fetch_sub_relax (&h2m->n_sessions, 1);
+  ASSERT (cnt > 0);
+  /* stop stream tx scheduler if this was last active connection so we are not
+   * running empty */
+  if (cnt == 1)
+    session_register_update_time_fn (http2_update_time_callback, 0);
 }
 
 static inline http2_req_t *
 http2_conn_alloc_req (http_conn_t *hc, u32 stream_id)
 {
-  http2_main_t *h2m = &http2_main;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   http2_conn_ctx_t *h2c;
   http2_req_t *req;
   u32 req_index;
   http_req_handle_t hr_handle;
 
-  pool_get_aligned_safe (h2m->req_pool[hc->c_thread_index], req,
-                        CLIB_CACHE_LINE_BYTES);
+  pool_get_aligned_safe (wrk->req_pool, req, CLIB_CACHE_LINE_BYTES);
   clib_memset (req, 0, sizeof (*req));
   req->base.hr_pa_session_handle = SESSION_INVALID_HANDLE;
-  req_index = req - h2m->req_pool[hc->c_thread_index];
+  req_index = req - wrk->req_pool;
   hr_handle.version = HTTP_VERSION_2;
   hr_handle.req_index = req_index;
   req->base.hr_req_handle = hr_handle.as_u32;
@@ -175,11 +217,11 @@ http2_conn_alloc_req (http_conn_t *hc, u32 stream_id)
   req->base.c_thread_index = hc->c_thread_index;
   req->stream_id = stream_id;
   req->stream_state = HTTP2_STREAM_STATE_IDLE;
-  req->resume_list.next = CLIB_LLIST_INVALID_INDEX;
-  req->resume_list.prev = CLIB_LLIST_INVALID_INDEX;
+  req->sched_list.next = CLIB_LLIST_INVALID_INDEX;
+  req->sched_list.prev = CLIB_LLIST_INVALID_INDEX;
   h2c = http2_conn_ctx_get_w_thread (hc);
   HTTP_DBG (1, "h2c [%u]%x req_index %x stream_id %u", hc->c_thread_index,
-           h2c - h2m->conn_pool[hc->c_thread_index], req_index, stream_id);
+           h2c - wrk->conn_pool, req_index, stream_id);
   req->peer_window = h2c->peer_settings.initial_window_size;
   req->our_window = h2c->settings.initial_window_size;
   hash_set (h2c->req_by_stream_id, stream_id, req_index);
@@ -190,27 +232,27 @@ static inline void
 http2_conn_free_req (http2_conn_ctx_t *h2c, http2_req_t *req,
                     clib_thread_index_t thread_index)
 {
-  http2_main_t *h2m = &http2_main;
+  http2_worker_ctx_t *wrk = http2_get_worker (thread_index);
 
   HTTP_DBG (1, "h2c [%u]%x req_index %x stream_id %u", thread_index,
-           h2c - h2m->conn_pool[thread_index],
+           h2c - wrk->conn_pool,
            ((http_req_handle_t) req->base.hr_req_handle).req_index,
            req->stream_id);
-  if (clib_llist_elt_is_linked (req, resume_list))
-    clib_llist_remove (h2m->req_pool[thread_index], resume_list, req);
+  if (clib_llist_elt_is_linked (req, sched_list))
+    clib_llist_remove (wrk->req_pool, sched_list, req);
   vec_free (req->base.headers);
   vec_free (req->base.target);
   http_buffer_free (&req->base.tx_buf);
   hash_unset (h2c->req_by_stream_id, req->stream_id);
   if (CLIB_DEBUG)
     memset (req, 0xba, sizeof (*req));
-  pool_put (h2m->req_pool[thread_index], req);
+  pool_put (wrk->req_pool, req);
 }
 
 http2_req_t *
 http2_conn_get_req (http_conn_t *hc, u32 stream_id)
 {
-  http2_main_t *h2m = &http2_main;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   http2_conn_ctx_t *h2c;
   uword *p;
 
@@ -218,7 +260,7 @@ http2_conn_get_req (http_conn_t *hc, u32 stream_id)
   p = hash_get (h2c->req_by_stream_id, stream_id);
   if (p)
     {
-      return pool_elt_at_index (h2m->req_pool[hc->c_thread_index], p[0]);
+      return pool_elt_at_index (wrk->req_pool, p[0]);
     }
   else
     {
@@ -231,59 +273,67 @@ http2_conn_get_req (http_conn_t *hc, u32 stream_id)
 always_inline http2_req_t *
 http2_req_get (u32 req_index, clib_thread_index_t thread_index)
 {
-  http2_main_t *h2m = &http2_main;
+  http2_worker_ctx_t *wrk = http2_get_worker (thread_index);
 
-  return pool_elt_at_index (h2m->req_pool[thread_index], req_index);
+  return pool_elt_at_index (wrk->req_pool, req_index);
 }
 
-always_inline int
-http2_req_update_peer_window (http2_req_t *req, i64 delta)
+always_inline void
+http2_conn_schedule (http2_conn_ctx_t *h2c, clib_thread_index_t thread_index)
 {
-  i64 new_value;
+  http2_worker_ctx_t *wrk = http2_get_worker (thread_index);
+  http2_conn_ctx_t *he;
 
-  new_value = (i64) req->peer_window + delta;
-  if (new_value > HTTP2_WIN_SIZE_MAX)
-    return -1;
-  req->peer_window = (i32) new_value;
-  HTTP_DBG (1, "new window size %d", req->peer_window);
-  return 0;
+  if (!clib_llist_elt_is_linked (h2c, sched_list) &&
+      !(h2c->flags & HTTP2_CONN_F_TS_DESCHED))
+    {
+      he = clib_llist_elt (wrk->conn_pool, wrk->sched_head);
+      clib_llist_add_tail (wrk->conn_pool, sched_list, h2c, he);
+    }
 }
 
 always_inline void
-http2_req_add_to_resume_list (http2_conn_ctx_t *h2c, http2_req_t *req)
+http2_req_schedule_data_tx (http_conn_t *hc, http2_req_t *req)
 {
-  http2_main_t *h2m = &http2_main;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
+  http2_conn_ctx_t *h2c;
   http2_req_t *he;
 
-  req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE;
-  he = clib_llist_elt (h2m->req_pool[req->base.c_thread_index],
-                      h2c->streams_to_resume);
-  clib_llist_add_tail (h2m->req_pool[req->base.c_thread_index], resume_list,
-                      req, he);
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  he = clib_llist_elt (wrk->req_pool, h2c->old_tx_streams);
+  clib_llist_add_tail (wrk->req_pool, sched_list, req, he);
 }
 
-always_inline void
-http2_resume_list_process (http_conn_t *hc)
+always_inline int
+http2_req_update_peer_window (http_conn_t *hc, http2_req_t *req, i64 delta)
 {
-  http2_main_t *h2m = &http2_main;
-  http2_req_t *he, *req;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   http2_conn_ctx_t *h2c;
+  i64 new_value;
 
-  h2c = http2_conn_ctx_get_w_thread (hc);
-  he =
-    clib_llist_elt (h2m->req_pool[hc->c_thread_index], h2c->streams_to_resume);
-
-  /* check if something in list and reschedule first app session from list if
-   * we have some space in connection window */
-  if (h2c->peer_window > 0 &&
-      !clib_llist_is_empty (h2m->req_pool[hc->c_thread_index], resume_list,
-                           he))
-    {
-      req =
-       clib_llist_next (h2m->req_pool[hc->c_thread_index], resume_list, he);
-      clib_llist_remove (h2m->req_pool[hc->c_thread_index], resume_list, req);
-      transport_connection_reschedule (&req->base.connection);
+  new_value = (i64) req->peer_window + delta;
+  if (new_value > HTTP2_WIN_SIZE_MAX)
+    return -1;
+  req->peer_window = (i32) new_value;
+  HTTP_DBG (1, "new window size %d", req->peer_window);
+  /* settings change can make stream window negative */
+  if (req->peer_window <= 0)
+    {
+      HTTP_DBG (1, "descheduling need stream window update");
+      req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+      if (clib_llist_elt_is_linked (req, sched_list))
+       clib_llist_remove (wrk->req_pool, sched_list, req);
+      return 0;
     }
+  if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE)
+    {
+      req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+      http2_req_schedule_data_tx (hc, req);
+      h2c = http2_conn_ctx_get_w_thread (hc);
+      if (h2c->peer_window > 0)
+       http2_conn_schedule (h2c, hc->c_thread_index);
+    }
+  return 0;
 }
 
 /* send GOAWAY frame and close TCP connection */
@@ -291,6 +341,7 @@ always_inline void
 http2_connection_error (http_conn_t *hc, http2_error_t error,
                        transport_send_params_t *sp)
 {
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   u8 *response;
   u32 req_index, stream_id;
   http2_conn_ctx_t *h2c;
@@ -308,6 +359,8 @@ http2_connection_error (http_conn_t *hc, http2_error_t error,
                  if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
                    session_transport_reset_notify (&req->base.connection);
                }));
+  if (clib_llist_elt_is_linked (h2c, sched_list))
+    clib_llist_remove (wrk->conn_pool, sched_list, h2c);
   http_shutdown_transport (hc);
 }
 
@@ -328,6 +381,8 @@ always_inline void
 http2_stream_error (http_conn_t *hc, http2_req_t *req, http2_error_t error,
                    transport_send_params_t *sp)
 {
+  http2_conn_ctx_t *h2c;
+
   ASSERT (req->stream_state > HTTP2_STREAM_STATE_IDLE);
 
   http2_send_stream_error (hc, req->stream_id, error, sp);
@@ -336,11 +391,17 @@ http2_stream_error (http_conn_t *hc, http2_req_t *req, http2_error_t error,
     session_transport_closed_notify (&req->base.connection);
   else
     session_transport_closing_notify (&req->base.connection);
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  session_transport_delete_notify (&req->base.connection);
+  http2_conn_free_req (h2c, req, hc->c_thread_index);
 }
 
 always_inline void
-http2_stream_close (http2_req_t *req)
+http2_stream_close (http2_req_t *req, http_conn_t *hc)
 {
+  http2_conn_ctx_t *h2c;
+
   req->stream_state = HTTP2_STREAM_STATE_CLOSED;
   if (req->flags & HTTP2_REQ_F_APP_CLOSED)
     {
@@ -356,6 +417,10 @@ http2_stream_close (http2_req_t *req)
                ((http_req_handle_t) req->base.hr_req_handle).req_index);
       session_transport_closing_notify (&req->base.connection);
     }
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  session_transport_delete_notify (&req->base.connection);
+  http2_conn_free_req (h2c, req, hc->c_thread_index);
 }
 
 always_inline void
@@ -384,6 +449,249 @@ http2_send_server_preface (http_conn_t *hc)
   http_io_ts_after_write (hc, 1);
 }
 
+/***********************/
+/* stream TX scheduler */
+/***********************/
+
+static void
+http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc,
+                             u8 *n_emissions)
+{
+  http_msg_t msg;
+  u8 *response, *date, *app_headers = 0;
+  u8 fh[HTTP2_FRAME_HEADER_SIZE];
+  hpack_response_control_data_t control_data;
+  u8 flags = HTTP2_FRAME_FLAG_END_HEADERS;
+  u32 n_written, stream_id, n_deq;
+  http2_conn_ctx_t *h2c;
+
+  http_get_app_msg (&req->base, &msg);
+  ASSERT (msg.type == HTTP_MSG_REPLY);
+  n_deq = sizeof (msg);
+  *n_emissions += msg.data.type == HTTP_MSG_DATA_PTR ?
+                   HTTP2_SCHED_WEIGHT_HEADERS_PTR :
+                   HTTP2_SCHED_WEIGHT_HEADERS_INLINE;
+
+  response = http_get_tx_buf (hc);
+  date = format (0, "%U", format_http_time_now, hc);
+
+  control_data.sc = msg.code;
+  control_data.content_len = msg.data.body_len;
+  control_data.server_name = hc->app_name;
+  control_data.server_name_len = vec_len (hc->app_name);
+  control_data.date = date;
+  control_data.date_len = vec_len (date);
+
+  if (msg.data.headers_len)
+    {
+      n_deq += msg.data.type == HTTP_MSG_DATA_PTR ? sizeof (uword) :
+                                                   msg.data.headers_len;
+      app_headers = http_get_app_header_list (&req->base, &msg);
+    }
+
+  hpack_serialize_response (app_headers, msg.data.headers_len, &control_data,
+                           &response);
+  vec_free (date);
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  if (vec_len (response) > h2c->peer_settings.max_frame_size)
+    {
+      /* TODO: CONTINUATION (headers fragmentation) */
+      clib_warning ("resp headers greater than SETTINGS_MAX_FRAME_SIZE");
+      http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, 0);
+      return;
+    }
+
+  stream_id = req->stream_id;
+  if (msg.data.body_len)
+    {
+      /* start sending the actual data */
+      http_req_tx_buffer_init (&req->base, &msg);
+      HTTP_DBG (1, "adding to data queue req_index %x",
+               ((http_req_handle_t) req->base.hr_req_handle).req_index);
+      http2_req_schedule_data_tx (hc, req);
+      http_io_as_dequeue_notify (&req->base, n_deq);
+    }
+  else
+    {
+      /* no response body, we are done */
+      flags |= HTTP2_FRAME_FLAG_END_STREAM;
+      http2_stream_close (req, hc);
+    }
+
+  http2_frame_write_headers_header (vec_len (response), stream_id, flags, fh);
+  svm_fifo_seg_t segs[2] = { { fh, HTTP2_FRAME_HEADER_SIZE },
+                            { response, vec_len (response) } };
+  n_written = http_io_ts_write_segs (hc, segs, 2, 0);
+  ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + vec_len (response)));
+  http_io_ts_after_write (hc, 0);
+}
+
+static void
+http2_sched_dispatch_data (http2_req_t *req, http_conn_t *hc, u8 *n_emissions)
+{
+  u32 max_write, max_read, n_segs, n_read, n_written = 0;
+  svm_fifo_seg_t *app_segs, *segs = 0;
+  http_buffer_t *hb = &req->base.tx_buf;
+  u8 fh[HTTP2_FRAME_HEADER_SIZE];
+  u8 finished = 0, flags = 0;
+  http2_conn_ctx_t *h2c;
+
+  ASSERT (http_buffer_bytes_left (hb) > 0);
+
+  *n_emissions += hb->type == HTTP_BUFFER_PTR ? HTTP2_SCHED_WEIGHT_DATA_PTR :
+                                               HTTP2_SCHED_WEIGHT_DATA_INLINE;
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+
+  max_write = http_io_ts_max_write (hc, 0);
+  max_write -= HTTP2_FRAME_HEADER_SIZE;
+  max_write = clib_min (max_write, (u32) req->peer_window);
+  max_write = clib_min (max_write, h2c->peer_window);
+  max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
+
+  max_read = http_buffer_bytes_left (hb);
+
+  n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
+  if (n_read == 0)
+    {
+      HTTP_DBG (1, "no data to deq");
+      transport_connection_reschedule (&req->base.connection);
+      return;
+    }
+
+  finished = (max_read - n_read) == 0;
+  flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
+  http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
+  vec_validate (segs, 0);
+  segs[0].len = HTTP2_FRAME_HEADER_SIZE;
+  segs[0].data = fh;
+  vec_append (segs, app_segs);
+
+  n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0);
+  ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read));
+  vec_free (segs);
+  http_buffer_drain (hb, n_read);
+  req->peer_window -= n_read;
+  h2c->peer_window -= n_read;
+
+  if (finished)
+    {
+      /* all done, close stream */
+      http_buffer_free (hb);
+      if (hc->flags & HTTP_CONN_F_IS_SERVER)
+       http2_stream_close (req, hc);
+      else
+       req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+    }
+  else
+    {
+      if (req->peer_window == 0)
+       {
+         /* mark that we need window update on stream */
+         HTTP_DBG (1, "stream window is full");
+         req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+       }
+      else
+       {
+         /* schedule for next round */
+         HTTP_DBG (1, "adding to data queue req_index %x",
+                   ((http_req_handle_t) req->base.hr_req_handle).req_index);
+         http2_req_schedule_data_tx (hc, req);
+         http_io_as_dequeue_notify (&req->base, n_read);
+       }
+    }
+
+  http_io_ts_after_write (hc, finished);
+}
+
+static void
+http2_update_time_callback (f64 now, u8 thread_index)
+{
+  http2_worker_ctx_t *wrk = http2_get_worker (thread_index);
+  http2_conn_ctx_t *h2c;
+  http_conn_t *hc;
+  http2_req_t *req, *new_he, *old_he;
+  clib_llist_index_t ri, old_ti, next_ri, ci;
+  u8 n_emissions = 0;
+
+  /*
+   * Run stream tx scheduler, we want to run for short time each heart-beat, so
+   * only one stream is processed with cap on frames emission. Since not all
+   * frames are equal, from CPU cycles or memory copy perspective, different
+   * weights are assigned when incrementing emissions counter. In most of cases
+   * connection is schedule only if it will be able to send data, same applies
+   * to streams within connection.
+   */
+  ci = clib_llist_next_index (clib_llist_elt (wrk->conn_pool, wrk->sched_head),
+                             sched_list);
+  if (ci != wrk->sched_head)
+    {
+      h2c = clib_llist_elt (wrk->conn_pool, ci);
+      ASSERT (!(h2c->flags & HTTP2_CONN_F_TS_DESCHED));
+      clib_llist_remove (wrk->conn_pool, sched_list, h2c);
+      hc = http_conn_get_w_thread (h2c->hc_index, thread_index);
+      ASSERT (hc->flags & HTTP_CONN_F_HAS_REQUEST);
+
+      /* first handle new responses (headers frame) */
+      new_he = clib_llist_elt (wrk->req_pool, h2c->new_tx_streams);
+      ri = clib_llist_next_index (new_he, sched_list);
+      /* save tail of old list so we will do only one round of already queued
+       * streams */
+      old_ti = clib_llist_prev_index (
+       clib_llist_elt (wrk->req_pool, h2c->old_tx_streams), sched_list);
+      while (ri != h2c->new_tx_streams &&
+            !http_io_ts_check_write_thresh (hc) &&
+            n_emissions < HTTP2_SCHED_MAX_EMISSIONS)
+       {
+         req = clib_llist_elt (wrk->req_pool, ri);
+         ri = clib_llist_next_index (req, sched_list);
+         HTTP_DBG (1, "sending headers req_index %x",
+                   ((http_req_handle_t) req->base.hr_req_handle).req_index);
+         clib_llist_remove (wrk->req_pool, sched_list, req);
+         http2_sched_dispatch_headers (req, hc, &n_emissions);
+       }
+
+      /* handle old responses (data frames), if we had any prior to processing
+       * new ones, each stream tx one frame for now */
+      /* TODO RFC9218 Prioritization (urgency will be weight) */
+      old_he = clib_llist_elt (wrk->req_pool, h2c->old_tx_streams);
+      if (old_ti != h2c->old_tx_streams)
+       {
+         ri = clib_llist_next_index (old_he, sched_list);
+         while (!http_io_ts_check_write_thresh (hc) && h2c->peer_window > 0 &&
+                n_emissions < HTTP2_SCHED_MAX_EMISSIONS)
+           {
+             req = clib_llist_elt (wrk->req_pool, ri);
+             next_ri = clib_llist_next_index (req, sched_list);
+             HTTP_DBG (
+               1, "sending data req_index %x",
+               ((http_req_handle_t) req->base.hr_req_handle).req_index);
+             clib_llist_remove (wrk->req_pool, sched_list, req);
+             http2_sched_dispatch_data (req, hc, &n_emissions);
+             if (ri == old_ti)
+               break;
+
+             ri = next_ri;
+           }
+       }
+      /* deschedule http connection and wait for deq notification if underlying
+       * transport session tx fifo is almost full */
+      if (http_io_ts_check_write_thresh (hc))
+       {
+         h2c->flags |= HTTP2_CONN_F_TS_DESCHED;
+         http_io_ts_add_want_deq_ntf (hc);
+         if (clib_llist_elt_is_linked (h2c, sched_list))
+           clib_llist_remove (wrk->conn_pool, sched_list, h2c);
+         return;
+       }
+      /* reschedule connection if something is waiting in queue */
+      if (!clib_llist_is_empty (wrk->req_pool, sched_list, new_he) ||
+         !clib_llist_is_empty (wrk->req_pool, sched_list, old_he))
+       http2_conn_schedule (h2c, hc->c_thread_index);
+    }
+}
+
 /*************************************/
 /* request state machine handlers RX */
 /*************************************/
@@ -564,68 +872,24 @@ http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req,
                                transport_send_params_t *sp,
                                http2_error_t *error)
 {
-  http_msg_t msg;
-  u8 *response, *date, *app_headers = 0;
-  u8 fh[HTTP2_FRAME_HEADER_SIZE];
-  hpack_response_control_data_t control_data;
-  u8 flags = HTTP2_FRAME_FLAG_END_HEADERS;
-  http_sm_result_t sm_result = HTTP_SM_ERROR;
-  u32 n_written;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
+  http2_req_t *he;
   http2_conn_ctx_t *h2c;
 
-  http_get_app_msg (&req->base, &msg);
-  ASSERT (msg.type == HTTP_MSG_REPLY);
-
-  response = http_get_tx_buf (hc);
-  date = format (0, "%U", format_http_time_now, hc);
-
-  control_data.sc = msg.code;
-  control_data.content_len = msg.data.body_len;
-  control_data.server_name = hc->app_name;
-  control_data.server_name_len = vec_len (hc->app_name);
-  control_data.date = date;
-  control_data.date_len = vec_len (date);
-
-  if (msg.data.headers_len)
-    app_headers = http_get_app_header_list (&req->base, &msg);
-
-  hpack_serialize_response (app_headers, msg.data.headers_len, &control_data,
-                           &response);
-  vec_free (date);
+  ASSERT (!clib_llist_elt_is_linked (req, sched_list));
 
+  /* add response to stream scheduler */
+  HTTP_DBG (1, "adding to headers queue req_index %x",
+           ((http_req_handle_t) req->base.hr_req_handle).req_index);
   h2c = http2_conn_ctx_get_w_thread (hc);
-  if (vec_len (response) > h2c->peer_settings.max_frame_size)
-    {
-      /* TODO: CONTINUATION (headers fragmentation) */
-      clib_warning ("resp headers greater than SETTINGS_MAX_FRAME_SIZE");
-      *error = HTTP2_ERROR_INTERNAL_ERROR;
-      return HTTP_SM_ERROR;
-    }
-
-  if (msg.data.body_len)
-    {
-      /* start sending the actual data */
-      http_req_tx_buffer_init (&req->base, &msg);
-      http_req_state_change (&req->base, HTTP_REQ_STATE_APP_IO_MORE_DATA);
-      sm_result = HTTP_SM_CONTINUE;
-    }
-  else
-    {
-      /* no response body, we are done */
-      flags |= HTTP2_FRAME_FLAG_END_STREAM;
-      sm_result = HTTP_SM_STOP;
-      http2_stream_close (req);
-    }
+  he = clib_llist_elt (wrk->req_pool, h2c->new_tx_streams);
+  clib_llist_add_tail (wrk->req_pool, sched_list, req, he);
+  http2_conn_schedule (h2c, hc->c_thread_index);
 
-  http2_frame_write_headers_header (vec_len (response), req->stream_id, flags,
-                                   fh);
-  svm_fifo_seg_t segs[2] = { { fh, HTTP2_FRAME_HEADER_SIZE },
-                            { response, vec_len (response) } };
-  n_written = http_io_ts_write_segs (hc, segs, 2, sp);
-  ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + vec_len (response)));
-  http_io_ts_after_write (hc, 0);
+  http_req_state_change (&req->base, HTTP_REQ_STATE_APP_IO_MORE_DATA);
+  http_req_deschedule (&req->base, sp);
 
-  return sm_result;
+  return HTTP_SM_STOP;
 }
 
 static http_sm_result_t
@@ -633,84 +897,20 @@ http2_req_state_app_io_more_data (http_conn_t *hc, http2_req_t *req,
                                  transport_send_params_t *sp,
                                  http2_error_t *error)
 {
-  u32 max_write, max_read, n_segs, n_read, n_written = 0;
-  svm_fifo_seg_t *app_segs, *segs = 0;
-  http_buffer_t *hb = &req->base.tx_buf;
-  u8 fh[HTTP2_FRAME_HEADER_SIZE];
-  u8 finished = 0, flags = 0;
   http2_conn_ctx_t *h2c;
 
-  ASSERT (http_buffer_bytes_left (hb) > 0);
+  ASSERT (!clib_llist_elt_is_linked (req, sched_list));
 
-  if (req->peer_window <= 0)
-    {
-      HTTP_DBG (1, "stream window is full");
-      /* mark that we need window update on stream */
-      req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
-      http_req_deschedule (&req->base, sp);
-      return HTTP_SM_STOP;
-    }
+  /* add data back to stream scheduler */
+  HTTP_DBG (1, "adding to data queue req_index %x",
+           ((http_req_handle_t) req->base.hr_req_handle).req_index);
+  http2_req_schedule_data_tx (hc, req);
   h2c = http2_conn_ctx_get_w_thread (hc);
-  if (h2c->peer_window == 0)
-    {
-      HTTP_DBG (1, "connection window is full");
-      /* add to waiting queue */
-      http2_req_add_to_resume_list (h2c, req);
-      http_req_deschedule (&req->base, sp);
-      return HTTP_SM_STOP;
-    }
-
-  max_write = http_io_ts_max_write (hc, sp);
-  if (max_write <= HTTP2_FRAME_HEADER_SIZE)
-    {
-      HTTP_DBG (1, "ts tx fifo full");
-      goto check_fifo;
-    }
-  max_write -= HTTP2_FRAME_HEADER_SIZE;
-  max_write = clib_min (max_write, (u32) req->peer_window);
-  max_write = clib_min (max_write, h2c->peer_window);
-  max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
-
-  max_read = http_buffer_bytes_left (hb);
-
-  n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
-  if (n_read == 0)
-    {
-      HTTP_DBG (1, "no data to deq");
-      goto check_fifo;
-    }
-
-  finished = (max_read - n_read) == 0;
-  flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
-  http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
-  vec_validate (segs, 0);
-  segs[0].len = HTTP2_FRAME_HEADER_SIZE;
-  segs[0].data = fh;
-  vec_append (segs, app_segs);
-
-  n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, sp);
-  ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read));
-  vec_free (segs);
-  http_buffer_drain (hb, n_read);
-  req->peer_window -= n_read;
-  h2c->peer_window -= n_read;
+  if (h2c->peer_window > 0)
+    http2_conn_schedule (h2c, hc->c_thread_index);
 
-  if (finished)
-    {
-      http_buffer_free (hb);
-      if (hc->flags & HTTP_CONN_F_IS_SERVER)
-       http2_stream_close (req);
-      else
-       req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
-    }
-  http_io_ts_after_write (hc, finished);
+  http_req_deschedule (&req->base, sp);
 
-check_fifo:
-  if (http_io_ts_check_write_thresh (hc))
-    {
-      http_io_ts_add_want_deq_ntf (hc);
-      http_req_deschedule (&req->base, sp);
-    }
   return HTTP_SM_STOP;
 }
 
@@ -759,7 +959,6 @@ http2_req_run_state_machine (http_conn_t *hc, http2_req_t *req,
 {
   http_sm_result_t res;
   http2_error_t error;
-  http2_conn_ctx_t *h2c;
 
   do
     {
@@ -776,13 +975,6 @@ http2_req_run_state_machine (http_conn_t *hc, http2_req_t *req,
     }
   while (res == HTTP_SM_CONTINUE);
 
-  if (req->stream_state == HTTP2_STREAM_STATE_CLOSED)
-    {
-      h2c = http2_conn_ctx_get_w_thread (hc);
-      session_transport_delete_notify (&req->base.connection);
-      http2_conn_free_req (h2c, req, hc->c_thread_index);
-    }
-
   return HTTP2_ERROR_NO_ERROR;
 }
 
@@ -935,6 +1127,7 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
 static http2_error_t
 http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh)
 {
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   u8 *rx_buf;
   u32 win_increment;
   http2_error_t rv;
@@ -971,6 +1164,11 @@ http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh)
       if (win_increment > (HTTP2_WIN_SIZE_MAX - h2c->peer_window))
        return HTTP2_ERROR_FLOW_CONTROL_ERROR;
       h2c->peer_window += win_increment;
+      /* reschedule connection if we have pending data */
+      if (!clib_llist_is_empty (
+           wrk->req_pool, sched_list,
+           clib_llist_elt (wrk->req_pool, h2c->old_tx_streams)))
+       http2_conn_schedule (h2c, hc->c_thread_index);
     }
   else
     {
@@ -990,13 +1188,11 @@ http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh)
        }
       if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
        {
-         if (http2_req_update_peer_window (req, win_increment))
+         if (http2_req_update_peer_window (hc, req, win_increment))
            {
              http2_stream_error (hc, req, HTTP2_ERROR_FLOW_CONTROL_ERROR, 0);
              return HTTP2_ERROR_NO_ERROR;
            }
-         if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE)
-           http2_req_add_to_resume_list (h2c, req);
        }
     }
 
@@ -1056,11 +1252,9 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh)
              req = http2_req_get (req_index, hc->c_thread_index);
              if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
                {
-                 if (http2_req_update_peer_window (req, win_size_delta))
+                 if (http2_req_update_peer_window (hc, req, win_size_delta))
                    http2_stream_error (hc, req,
                                        HTTP2_ERROR_FLOW_CONTROL_ERROR, 0);
-                 if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE)
-                   http2_req_add_to_resume_list (h2c, req);
                }
            }));
        }
@@ -1073,6 +1267,7 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh)
 static http2_error_t
 http2_handle_rst_stream_frame (http_conn_t *hc, http2_frame_header_t *fh)
 {
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   u8 *rx_buf;
   http2_error_t rv;
   http2_req_t *req;
@@ -1106,6 +1301,8 @@ http2_handle_rst_stream_frame (http_conn_t *hc, http2_frame_header_t *fh)
 
   req->stream_state = HTTP2_STREAM_STATE_CLOSED;
   session_transport_reset_notify (&req->base.connection);
+  if (clib_llist_elt_is_linked (req, sched_list))
+    clib_llist_remove (wrk->req_pool, sched_list, req);
 
   return HTTP2_ERROR_NO_ERROR;
 }
@@ -1113,6 +1310,7 @@ http2_handle_rst_stream_frame (http_conn_t *hc, http2_frame_header_t *fh)
 static http2_error_t
 http2_handle_goaway_frame (http_conn_t *hc, http2_frame_header_t *fh)
 {
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   u8 *rx_buf;
   http2_error_t rv;
   u32 error_code, last_stream_id, req_index, stream_id;
@@ -1145,6 +1343,8 @@ http2_handle_goaway_frame (http_conn_t *hc, http2_frame_header_t *fh)
                      req = http2_req_get (req_index, hc->c_thread_index);
                      session_transport_reset_notify (&req->base.connection);
                    }));
+      if (clib_llist_elt_is_linked (h2c, sched_list))
+       clib_llist_remove (wrk->conn_pool, sched_list, h2c);
       http_shutdown_transport (hc);
     }
 
@@ -1272,7 +1472,7 @@ http2_app_tx_callback (http_conn_t *hc, u32 req_index,
   http2_req_t *req;
   http2_error_t rv;
 
-  HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+  HTTP_DBG (1, "hc [%u]%x req_index %x", hc->c_thread_index, hc->hc_hc_index,
            req_index);
   req = http2_req_get (req_index, hc->c_thread_index);
 
@@ -1312,9 +1512,6 @@ http2_app_tx_callback (http_conn_t *hc, u32 req_index,
       return;
     }
 
-  /* maybe we can continue sending data on some stream */
-  http2_resume_list_process (hc);
-
   /* reset http connection expiration timer */
   http_conn_timer_update (hc);
 }
@@ -1356,7 +1553,7 @@ http2_app_close_callback (http_conn_t *hc, u32 req_index,
 {
   http2_req_t *req;
 
-  HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+  HTTP_DBG (1, "hc [%u]%x req_index %x", hc->c_thread_index, hc->hc_hc_index,
            req_index);
   req = http2_req_get (req_index, thread_index);
   if (!req)
@@ -1384,7 +1581,7 @@ http2_app_reset_callback (http_conn_t *hc, u32 req_index,
 {
   http2_req_t *req;
 
-  HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+  HTTP_DBG (1, "hc [%u]%x req_index %x", hc->c_thread_index, hc->hc_hc_index,
            req_index);
   req = http2_req_get (req_index, thread_index);
   req->flags |= HTTP2_REQ_F_APP_CLOSED;
@@ -1534,8 +1731,6 @@ http2_transport_rx_callback (http_conn_t *hc)
       http_io_ts_after_write (hc, 0);
       h2c->our_window = HTTP2_CONNECTION_WINDOW_SIZE;
     }
-  /* maybe we can continue sending data on some stream */
-  http2_resume_list_process (hc);
 
   /* reset http connection expiration timer */
   http_conn_timer_update (hc);
@@ -1544,6 +1739,7 @@ http2_transport_rx_callback (http_conn_t *hc)
 static void
 http2_transport_close_callback (http_conn_t *hc)
 {
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   u32 req_index, stream_id, n_open_streams = 0;
   http2_req_t *req;
   http2_conn_ctx_t *h2c;
@@ -1561,7 +1757,7 @@ http2_transport_close_callback (http_conn_t *hc)
                  req = http2_req_get (req_index, hc->c_thread_index);
                  if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
                    {
-                     HTTP_DBG (1, "req_index %u", req_index);
+                     HTTP_DBG (1, "req_index %x", req_index);
                      session_transport_closing_notify (&req->base.connection);
                      n_open_streams++;
                    }
@@ -1569,6 +1765,8 @@ http2_transport_close_callback (http_conn_t *hc)
   if (n_open_streams == 0)
     {
       HTTP_DBG (1, "no open stream disconnecting");
+      if (clib_llist_elt_is_linked (h2c, sched_list))
+       clib_llist_remove (wrk->conn_pool, sched_list, h2c);
       http_disconnect_transport (hc);
     }
 }
@@ -1576,6 +1774,7 @@ http2_transport_close_callback (http_conn_t *hc)
 static void
 http2_transport_reset_callback (http_conn_t *hc)
 {
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   u32 req_index, stream_id;
   http2_req_t *req;
   http2_conn_ctx_t *h2c;
@@ -1590,36 +1789,33 @@ http2_transport_reset_callback (http_conn_t *hc)
                  req = http2_req_get (req_index, hc->c_thread_index);
                  if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
                    {
-                     HTTP_DBG (1, "req_index %u", req_index);
+                     HTTP_DBG (1, "req_index %x", req_index);
                      session_transport_reset_notify (&req->base.connection);
                    }
                }));
+  if (clib_llist_elt_is_linked (h2c, sched_list))
+    clib_llist_remove (wrk->conn_pool, sched_list, h2c);
 }
 
 static void
 http2_transport_conn_reschedule_callback (http_conn_t *hc)
 {
-  u32 req_index, stream_id;
-  http2_req_t *req;
+  http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
   http2_conn_ctx_t *h2c;
 
   HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
   ASSERT (hc->flags & HTTP_CONN_F_HAS_REQUEST);
 
-  if (!(hc->flags & HTTP_CONN_F_HAS_REQUEST))
-    return;
-
   h2c = http2_conn_ctx_get_w_thread (hc);
-  hash_foreach (
-    stream_id, req_index, h2c->req_by_stream_id, ({
-      req = http2_req_get (req_index, hc->c_thread_index);
-      if (req->stream_state != HTTP2_STREAM_STATE_CLOSED &&
-         transport_connection_is_descheduled (&req->base.connection))
-       {
-         HTTP_DBG (1, "req_index %u", req_index);
-         transport_connection_reschedule (&req->base.connection);
-       }
-    }));
+  h2c->flags &= ~HTTP2_CONN_F_TS_DESCHED;
+  /* reschedule connection if something is waiting in queue */
+  if (!clib_llist_is_empty (
+       wrk->req_pool, sched_list,
+       clib_llist_elt (wrk->req_pool, h2c->new_tx_streams)) ||
+      !clib_llist_is_empty (
+       wrk->req_pool, sched_list,
+       clib_llist_elt (wrk->req_pool, h2c->old_tx_streams)))
+    http2_conn_schedule (h2c, hc->c_thread_index);
 }
 
 static void
@@ -1665,11 +1861,17 @@ http2_enable_callback (void)
   http2_main_t *h2m = &http2_main;
   vlib_thread_main_t *vtm = vlib_get_thread_main ();
   u32 num_threads;
+  http2_worker_ctx_t *wrk;
+  int i;
 
   num_threads = 1 /* main thread */ + vtm->n_threads;
 
-  vec_validate (h2m->conn_pool, num_threads - 1);
-  vec_validate (h2m->req_pool, num_threads - 1);
+  vec_validate (h2m->wrk_ctx, num_threads - 1);
+  for (i = 0; i < num_threads; i++)
+    {
+      wrk = &h2m->wrk_ctx[i];
+      wrk->sched_head = clib_llist_make_head (wrk->conn_pool, sched_list);
+    }
 }
 
 static int
index fd90fbf..493abb3 100644 (file)
@@ -211,6 +211,7 @@ http_buffer_init (http_buffer_t *hb, http_buffer_type_t type, svm_fifo_t *f,
                  u64 data_len)
 {
   hb->vft = &buf_vfts[type];
+  hb->type = type;
   hb->vft->init (hb, f, data_len);
 }
 
index 01b37d4..6176f10 100644 (file)
@@ -31,6 +31,7 @@ typedef struct http_buffer_vft_ http_buffer_vft_t;
 typedef struct http_buffer_
 {
   http_buffer_vft_t *vft;
+  http_buffer_type_t type;
   u8 data[HTTP_BUFFER_DATA_SZ];
 } http_buffer_t;
 
index 662be06..a7898cf 100644 (file)
@@ -330,6 +330,9 @@ u8 *format_http_req_state (u8 *s, va_list *va);
 u8 *format_http_conn_state (u8 *s, va_list *args);
 u8 *format_http_time_now (u8 *s, va_list *args);
 
+http_conn_t *http_conn_get_w_thread (u32 hc_index,
+                                    clib_thread_index_t thread_index);
+
 /**
  * @brief Find the first occurrence of the string in the vector.
  *
@@ -579,6 +582,14 @@ http_io_as_reset_has_read_ntf (http_req_t *req)
   svm_fifo_reset_has_deq_ntf (as->rx_fifo);
 }
 
+always_inline void
+http_io_as_dequeue_notify (http_req_t *req, u32 n_last_deq)
+{
+  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  if (svm_fifo_needs_deq_ntf (as->tx_fifo, n_last_deq))
+    session_dequeue_notify (as);
+}
+
 always_inline u32
 http_io_as_max_write (http_req_t *req)
 {
@@ -682,8 +693,11 @@ always_inline u32
 http_io_ts_max_write (http_conn_t *hc, transport_send_params_t *sp)
 {
   session_t *ts = session_get_from_handle (hc->hc_tc_session_handle);
-  return clib_min (svm_fifo_max_enqueue_prod (ts->tx_fifo),
-                  sp->max_burst_size);
+  if (sp)
+    return clib_min (svm_fifo_max_enqueue_prod (ts->tx_fifo),
+                    sp->max_burst_size);
+  else
+    return svm_fifo_max_enqueue_prod (ts->tx_fifo);
 }
 
 always_inline int
@@ -783,8 +797,11 @@ http_io_ts_write_segs (http_conn_t *hc, const svm_fifo_seg_t segs[],
   session_t *ts = session_get_from_handle (hc->hc_tc_session_handle);
   n_written = svm_fifo_enqueue_segments (ts->tx_fifo, segs, n_segs, 0);
   ASSERT (n_written > 0);
-  sp->bytes_dequeued += n_written;
-  sp->max_burst_size -= n_written;
+  if (sp)
+    {
+      sp->bytes_dequeued += n_written;
+      sp->max_burst_size -= n_written;
+    }
   return (u32) n_written;
 }