u32 our_window;
u8 *payload;
u32 payload_len;
- clib_llist_anchor_t resume_list;
+ clib_llist_anchor_t sched_list;
} http2_req_t;
#define foreach_http2_conn_flags \
_ (EXPECT_PREFACE, "expect-preface") \
- _ (PREFACE_VERIFIED, "preface-verified")
+ _ (PREFACE_VERIFIED, "preface-verified") \
+ _ (TS_DESCHED, "ts-descheduled")
typedef enum http2_conn_flags_bit_
{
typedef struct http2_conn_ctx_
{
+ u32 hc_index; /* index of owning http connection (hc->hc_hc_index) */
http2_conn_settings_t peer_settings;
hpack_dynamic_table_t decoder_dynamic_table;
u8 flags;
u32 peer_window;
u32 our_window;
uword *req_by_stream_id;
- clib_llist_index_t streams_to_resume;
+ clib_llist_index_t new_tx_streams; /* headers */
+ clib_llist_index_t old_tx_streams; /* data */
http2_conn_settings_t settings;
+ clib_llist_anchor_t sched_list; /* anchor in per-worker conn scheduler list */
} http2_conn_ctx_t;
+/* per-thread state, accessed without locking from owning worker only */
+typedef struct http2_worker_ctx_
+{
+ http2_conn_ctx_t *conn_pool; /* pool of connection contexts */
+ http2_req_t *req_pool; /* pool of stream (request) contexts */
+ clib_llist_index_t sched_head; /* head of connection tx scheduler list */
+} http2_worker_ctx_t;
+
typedef struct http2_main_
{
- http2_conn_ctx_t **conn_pool;
- http2_req_t **req_pool;
+ http2_worker_ctx_t *wrk_ctx; /* vec of per-thread worker contexts */
http2_conn_settings_t settings;
+ u32 n_sessions; /* active conns across all threads, updated atomically */
} http2_main_t;
+/* relative cost of emitting a frame (memory copy / hpack work), used to cap
+ * the amount of work done in one scheduler dispatch round */
+typedef enum
+{
+ HTTP2_SCHED_WEIGHT_DATA_PTR = 1,
+ HTTP2_SCHED_WEIGHT_DATA_INLINE = 2,
+ HTTP2_SCHED_WEIGHT_HEADERS_PTR = 3,
+ HTTP2_SCHED_WEIGHT_HEADERS_INLINE = 4,
+} http2_sched_weight_t;
+
+/* max weighted emissions per connection per scheduler heart-beat */
+#define HTTP2_SCHED_MAX_EMISSIONS 32
+
static http2_main_t http2_main;
-http2_conn_ctx_t *
+/* get per-thread worker context; callers run on the owning thread */
+static_always_inline http2_worker_ctx_t *
+http2_get_worker (clib_thread_index_t thread_index)
+{
+ return &http2_main.wrk_ctx[thread_index];
+}
+
+static void http2_update_time_callback (f64 now, u8 thread_index);
+
+static inline http2_conn_ctx_t *
http2_conn_ctx_alloc_w_thread (http_conn_t *hc)
{
http2_main_t *h2m = &http2_main;
http2_conn_ctx_t *h2c;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
+ u32 cnt;
- pool_get_aligned_safe (h2m->conn_pool[hc->c_thread_index], h2c,
- CLIB_CACHE_LINE_BYTES);
+ pool_get_aligned_safe (wrk->conn_pool, h2c, CLIB_CACHE_LINE_BYTES);
clib_memset (h2c, 0, sizeof (*h2c));
+ h2c->hc_index = hc->hc_hc_index;
h2c->peer_settings = http2_default_conn_settings;
h2c->peer_window = HTTP2_INITIAL_WIN_SIZE;
h2c->our_window = HTTP2_CONNECTION_WINDOW_SIZE;
h2c->settings.initial_window_size =
clib_min (h2c->settings.initial_window_size, hc->app_rx_fifo_size);
h2c->req_by_stream_id = hash_create (0, sizeof (uword));
- h2c->streams_to_resume =
- clib_llist_make_head (h2m->req_pool[hc->c_thread_index], resume_list);
- hc->opaque =
- uword_to_pointer (h2c - h2m->conn_pool[hc->c_thread_index], void *);
- HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index,
- h2c - h2m->conn_pool[hc->c_thread_index]);
+ /* two tx queues per conn: new streams (headers) and old streams (data) */
+ h2c->new_tx_streams = clib_llist_make_head (wrk->req_pool, sched_list);
+ h2c->old_tx_streams = clib_llist_make_head (wrk->req_pool, sched_list);
+ /* not on worker's conn scheduler list yet */
+ h2c->sched_list.next = CLIB_LLIST_INVALID_INDEX;
+ h2c->sched_list.prev = CLIB_LLIST_INVALID_INDEX;
+ hc->opaque = uword_to_pointer (h2c - wrk->conn_pool, void *);
+ /* fetch_add returns previous value, so 0 means first active connection */
+ cnt = clib_atomic_fetch_add_relax (&h2m->n_sessions, 1);
+ /* (re)start stream tx scheduler if this is first connection */
+ /* TODO: update session infra to do this on per thread basis */
+ if (cnt == 0)
+ session_register_update_time_fn (http2_update_time_callback, 1);
+ HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index, h2c - wrk->conn_pool);
return h2c;
}
static inline http2_conn_ctx_t *
http2_conn_ctx_get_w_thread (http_conn_t *hc)
{
- http2_main_t *h2m = &http2_main;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
u32 h2c_index = pointer_to_uword (hc->opaque);
- return pool_elt_at_index (h2m->conn_pool[hc->c_thread_index], h2c_index);
+ /* conn pool index was stashed in hc->opaque at alloc time */
+ return pool_elt_at_index (wrk->conn_pool, h2c_index);
}
static inline void
http2_conn_ctx_free (http_conn_t *hc)
{
http2_main_t *h2m = &http2_main;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
http2_conn_ctx_t *h2c;
+ u32 cnt;
h2c = http2_conn_ctx_get_w_thread (hc);
- HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index,
- h2c - h2m->conn_pool[hc->c_thread_index]);
+ HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index, h2c - wrk->conn_pool);
hash_free (h2c->req_by_stream_id);
if (hc->flags & HTTP_CONN_F_HAS_REQUEST)
hpack_dynamic_table_free (&h2c->decoder_dynamic_table);
if (CLIB_DEBUG)
memset (h2c, 0xba, sizeof (*h2c));
- pool_put (h2m->conn_pool[hc->c_thread_index], h2c);
+ pool_put (wrk->conn_pool, h2c);
+ /* fetch_sub returns previous value, so 1 means this was last connection */
+ cnt = clib_atomic_fetch_sub_relax (&h2m->n_sessions, 1);
+ ASSERT (cnt > 0);
+ /* stop stream tx scheduler if this was last active connection so we are not
+ * running empty */
+ if (cnt == 1)
+ session_register_update_time_fn (http2_update_time_callback, 0);
}
static inline http2_req_t *
http2_conn_alloc_req (http_conn_t *hc, u32 stream_id)
{
- http2_main_t *h2m = &http2_main;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
http2_conn_ctx_t *h2c;
http2_req_t *req;
u32 req_index;
http_req_handle_t hr_handle;
- pool_get_aligned_safe (h2m->req_pool[hc->c_thread_index], req,
- CLIB_CACHE_LINE_BYTES);
+ pool_get_aligned_safe (wrk->req_pool, req, CLIB_CACHE_LINE_BYTES);
clib_memset (req, 0, sizeof (*req));
req->base.hr_pa_session_handle = SESSION_INVALID_HANDLE;
- req_index = req - h2m->req_pool[hc->c_thread_index];
+ req_index = req - wrk->req_pool;
hr_handle.version = HTTP_VERSION_2;
hr_handle.req_index = req_index;
req->base.hr_req_handle = hr_handle.as_u32;
req->base.c_thread_index = hc->c_thread_index;
req->stream_id = stream_id;
req->stream_state = HTTP2_STREAM_STATE_IDLE;
- req->resume_list.next = CLIB_LLIST_INVALID_INDEX;
- req->resume_list.prev = CLIB_LLIST_INVALID_INDEX;
+ req->sched_list.next = CLIB_LLIST_INVALID_INDEX;
+ req->sched_list.prev = CLIB_LLIST_INVALID_INDEX;
h2c = http2_conn_ctx_get_w_thread (hc);
HTTP_DBG (1, "h2c [%u]%x req_index %x stream_id %u", hc->c_thread_index,
- h2c - h2m->conn_pool[hc->c_thread_index], req_index, stream_id);
+ h2c - wrk->conn_pool, req_index, stream_id);
req->peer_window = h2c->peer_settings.initial_window_size;
req->our_window = h2c->settings.initial_window_size;
hash_set (h2c->req_by_stream_id, stream_id, req_index);
http2_conn_free_req (http2_conn_ctx_t *h2c, http2_req_t *req,
clib_thread_index_t thread_index)
{
- http2_main_t *h2m = &http2_main;
+ http2_worker_ctx_t *wrk = http2_get_worker (thread_index);
HTTP_DBG (1, "h2c [%u]%x req_index %x stream_id %u", thread_index,
- h2c - h2m->conn_pool[thread_index],
+ h2c - wrk->conn_pool,
((http_req_handle_t) req->base.hr_req_handle).req_index,
req->stream_id);
- if (clib_llist_elt_is_linked (req, resume_list))
- clib_llist_remove (h2m->req_pool[thread_index], resume_list, req);
+ if (clib_llist_elt_is_linked (req, sched_list))
+ clib_llist_remove (wrk->req_pool, sched_list, req);
vec_free (req->base.headers);
vec_free (req->base.target);
http_buffer_free (&req->base.tx_buf);
hash_unset (h2c->req_by_stream_id, req->stream_id);
if (CLIB_DEBUG)
memset (req, 0xba, sizeof (*req));
- pool_put (h2m->req_pool[thread_index], req);
+ pool_put (wrk->req_pool, req);
}
http2_req_t *
http2_conn_get_req (http_conn_t *hc, u32 stream_id)
{
- http2_main_t *h2m = &http2_main;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
http2_conn_ctx_t *h2c;
uword *p;
p = hash_get (h2c->req_by_stream_id, stream_id);
if (p)
{
- return pool_elt_at_index (h2m->req_pool[hc->c_thread_index], p[0]);
+ return pool_elt_at_index (wrk->req_pool, p[0]);
}
else
{
always_inline http2_req_t *
http2_req_get (u32 req_index, clib_thread_index_t thread_index)
{
- http2_main_t *h2m = &http2_main;
+ http2_worker_ctx_t *wrk = http2_get_worker (thread_index);
- return pool_elt_at_index (h2m->req_pool[thread_index], req_index);
+ /* stream contexts live in per-worker request pools */
+ return pool_elt_at_index (wrk->req_pool, req_index);
}
-always_inline int
-http2_req_update_peer_window (http2_req_t *req, i64 delta)
+/* enqueue connection at tail of worker's tx scheduler round-robin list;
+ * no-op if already queued or descheduled waiting for ts tx fifo space */
+always_inline void
+http2_conn_schedule (http2_conn_ctx_t *h2c, clib_thread_index_t thread_index)
{
- i64 new_value;
+ http2_worker_ctx_t *wrk = http2_get_worker (thread_index);
+ http2_conn_ctx_t *he;
- new_value = (i64) req->peer_window + delta;
- if (new_value > HTTP2_WIN_SIZE_MAX)
- return -1;
- req->peer_window = (i32) new_value;
- HTTP_DBG (1, "new window size %d", req->peer_window);
- return 0;
+ if (!clib_llist_elt_is_linked (h2c, sched_list) &&
+ !(h2c->flags & HTTP2_CONN_F_TS_DESCHED))
+ {
+ he = clib_llist_elt (wrk->conn_pool, wrk->sched_head);
+ clib_llist_add_tail (wrk->conn_pool, sched_list, h2c, he);
+ }
}
always_inline void
-http2_req_add_to_resume_list (http2_conn_ctx_t *h2c, http2_req_t *req)
+/* append stream to its connection's old (data frames) tx queue */
+http2_req_schedule_data_tx (http_conn_t *hc, http2_req_t *req)
{
- http2_main_t *h2m = &http2_main;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
+ http2_conn_ctx_t *h2c;
http2_req_t *he;
- req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE;
- he = clib_llist_elt (h2m->req_pool[req->base.c_thread_index],
- h2c->streams_to_resume);
- clib_llist_add_tail (h2m->req_pool[req->base.c_thread_index], resume_list,
- req, he);
+ h2c = http2_conn_ctx_get_w_thread (hc);
+ he = clib_llist_elt (wrk->req_pool, h2c->old_tx_streams);
+ clib_llist_add_tail (wrk->req_pool, sched_list, req, he);
}
-always_inline void
-http2_resume_list_process (http_conn_t *hc)
+/* apply window delta to stream's peer window and (de)schedule the stream
+ * accordingly; returns -1 on flow control error (window overflow) */
+always_inline int
+http2_req_update_peer_window (http_conn_t *hc, http2_req_t *req, i64 delta)
{
- http2_main_t *h2m = &http2_main;
- http2_req_t *he, *req;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
http2_conn_ctx_t *h2c;
+ i64 new_value;
- h2c = http2_conn_ctx_get_w_thread (hc);
- he =
- clib_llist_elt (h2m->req_pool[hc->c_thread_index], h2c->streams_to_resume);
-
- /* check if something in list and reschedule first app session from list if
- * we have some space in connection window */
- if (h2c->peer_window > 0 &&
- !clib_llist_is_empty (h2m->req_pool[hc->c_thread_index], resume_list,
- he))
- {
- req =
- clib_llist_next (h2m->req_pool[hc->c_thread_index], resume_list, he);
- clib_llist_remove (h2m->req_pool[hc->c_thread_index], resume_list, req);
- transport_connection_reschedule (&req->base.connection);
+ new_value = (i64) req->peer_window + delta;
+ if (new_value > HTTP2_WIN_SIZE_MAX)
+ return -1;
+ req->peer_window = (i32) new_value;
+ HTTP_DBG (1, "new window size %d", req->peer_window);
+ /* settings change can make stream window negative */
+ if (req->peer_window <= 0)
+ {
+ HTTP_DBG (1, "descheduling need stream window update");
+ req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+ if (clib_llist_elt_is_linked (req, sched_list))
+ clib_llist_remove (wrk->req_pool, sched_list, req);
+ return 0;
}
+ /* stream was stalled on flow control, window opened again: requeue it and
+ * kick the connection if connection-level window also has space */
+ if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE)
+ {
+ req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+ http2_req_schedule_data_tx (hc, req);
+ h2c = http2_conn_ctx_get_w_thread (hc);
+ if (h2c->peer_window > 0)
+ http2_conn_schedule (h2c, hc->c_thread_index);
+ }
+ return 0;
}
/* send GOAWAY frame and close TCP connection */
http2_connection_error (http_conn_t *hc, http2_error_t error,
transport_send_params_t *sp)
{
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
u8 *response;
u32 req_index, stream_id;
http2_conn_ctx_t *h2c;
if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
session_transport_reset_notify (&req->base.connection);
}));
+ if (clib_llist_elt_is_linked (h2c, sched_list))
+ clib_llist_remove (wrk->conn_pool, sched_list, h2c);
http_shutdown_transport (hc);
}
http2_stream_error (http_conn_t *hc, http2_req_t *req, http2_error_t error,
transport_send_params_t *sp)
{
+ http2_conn_ctx_t *h2c;
+
ASSERT (req->stream_state > HTTP2_STREAM_STATE_IDLE);
http2_send_stream_error (hc, req->stream_id, error, sp);
session_transport_closed_notify (&req->base.connection);
else
session_transport_closing_notify (&req->base.connection);
+
+ h2c = http2_conn_ctx_get_w_thread (hc);
+ session_transport_delete_notify (&req->base.connection);
+ http2_conn_free_req (h2c, req, hc->c_thread_index);
}
always_inline void
-http2_stream_close (http2_req_t *req)
+http2_stream_close (http2_req_t *req, http_conn_t *hc)
{
+ http2_conn_ctx_t *h2c;
+
req->stream_state = HTTP2_STREAM_STATE_CLOSED;
if (req->flags & HTTP2_REQ_F_APP_CLOSED)
{
((http_req_handle_t) req->base.hr_req_handle).req_index);
session_transport_closing_notify (&req->base.connection);
}
+
+ h2c = http2_conn_ctx_get_w_thread (hc);
+ session_transport_delete_notify (&req->base.connection);
+ http2_conn_free_req (h2c, req, hc->c_thread_index);
}
always_inline void
http_io_ts_after_write (hc, 1);
}
+/***********************/
+/* stream TX scheduler */
+/***********************/
+
+/* serialize and send HEADERS frame for one stream; on response with body,
+ * moves stream to the data tx queue, otherwise closes the stream */
+static void
+http2_sched_dispatch_headers (http2_req_t *req, http_conn_t *hc,
+ u8 *n_emissions)
+{
+ http_msg_t msg;
+ u8 *response, *date, *app_headers = 0;
+ u8 fh[HTTP2_FRAME_HEADER_SIZE];
+ hpack_response_control_data_t control_data;
+ u8 flags = HTTP2_FRAME_FLAG_END_HEADERS;
+ u32 n_written, stream_id, n_deq;
+ http2_conn_ctx_t *h2c;
+
+ http_get_app_msg (&req->base, &msg);
+ ASSERT (msg.type == HTTP_MSG_REPLY);
+ /* track bytes consumed from app tx fifo for dequeue notification */
+ n_deq = sizeof (msg);
+ *n_emissions += msg.data.type == HTTP_MSG_DATA_PTR ?
+ HTTP2_SCHED_WEIGHT_HEADERS_PTR :
+ HTTP2_SCHED_WEIGHT_HEADERS_INLINE;
+
+ response = http_get_tx_buf (hc);
+ date = format (0, "%U", format_http_time_now, hc);
+
+ control_data.sc = msg.code;
+ control_data.content_len = msg.data.body_len;
+ control_data.server_name = hc->app_name;
+ control_data.server_name_len = vec_len (hc->app_name);
+ control_data.date = date;
+ control_data.date_len = vec_len (date);
+
+ if (msg.data.headers_len)
+ {
+ n_deq += msg.data.type == HTTP_MSG_DATA_PTR ? sizeof (uword) :
+ msg.data.headers_len;
+ app_headers = http_get_app_header_list (&req->base, &msg);
+ }
+
+ hpack_serialize_response (app_headers, msg.data.headers_len, &control_data,
+ &response);
+ vec_free (date);
+
+ h2c = http2_conn_ctx_get_w_thread (hc);
+ if (vec_len (response) > h2c->peer_settings.max_frame_size)
+ {
+ /* TODO: CONTINUATION (headers fragmentation) */
+ clib_warning ("resp headers greater than SETTINGS_MAX_FRAME_SIZE");
+ http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, 0);
+ return;
+ }
+
+ /* save stream id now, stream close below frees req */
+ stream_id = req->stream_id;
+ if (msg.data.body_len)
+ {
+ /* start sending the actual data */
+ http_req_tx_buffer_init (&req->base, &msg);
+ HTTP_DBG (1, "adding to data queue req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ http2_req_schedule_data_tx (hc, req);
+ http_io_as_dequeue_notify (&req->base, n_deq);
+ }
+ else
+ {
+ /* no response body, we are done */
+ flags |= HTTP2_FRAME_FLAG_END_STREAM;
+ http2_stream_close (req, hc);
+ }
+
+ http2_frame_write_headers_header (vec_len (response), stream_id, flags, fh);
+ svm_fifo_seg_t segs[2] = { { fh, HTTP2_FRAME_HEADER_SIZE },
+ { response, vec_len (response) } };
+ n_written = http_io_ts_write_segs (hc, segs, 2, 0);
+ ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + vec_len (response)));
+ http_io_ts_after_write (hc, 0);
+}
+
+/* send one DATA frame for a stream, bounded by stream window, connection
+ * window, peer's max frame size and ts tx fifo space; requeues the stream or
+ * marks it stalled on flow control as appropriate */
+static void
+http2_sched_dispatch_data (http2_req_t *req, http_conn_t *hc, u8 *n_emissions)
+{
+ u32 max_write, max_read, n_segs, n_read, n_written = 0;
+ svm_fifo_seg_t *app_segs, *segs = 0;
+ http_buffer_t *hb = &req->base.tx_buf;
+ u8 fh[HTTP2_FRAME_HEADER_SIZE];
+ u8 finished = 0, flags = 0;
+ http2_conn_ctx_t *h2c;
+
+ ASSERT (http_buffer_bytes_left (hb) > 0);
+
+ *n_emissions += hb->type == HTTP_BUFFER_PTR ? HTTP2_SCHED_WEIGHT_DATA_PTR :
+ HTTP2_SCHED_WEIGHT_DATA_INLINE;
+
+ h2c = http2_conn_ctx_get_w_thread (hc);
+
+ max_write = http_io_ts_max_write (hc, 0);
+ if (max_write <= HTTP2_FRAME_HEADER_SIZE)
+ {
+ /* ts tx fifo (almost) full: requeue stream and wait for deq
+ * notification, also avoids u32 underflow of max_write below */
+ HTTP_DBG (1, "ts tx fifo full");
+ http2_req_schedule_data_tx (hc, req);
+ http_io_ts_add_want_deq_ntf (hc);
+ return;
+ }
+ max_write -= HTTP2_FRAME_HEADER_SIZE;
+ max_write = clib_min (max_write, (u32) req->peer_window);
+ max_write = clib_min (max_write, h2c->peer_window);
+ max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
+
+ max_read = http_buffer_bytes_left (hb);
+
+ n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
+ if (n_read == 0)
+ {
+ /* app hasn't produced more data yet, let it wake us when it does */
+ HTTP_DBG (1, "no data to deq");
+ transport_connection_reschedule (&req->base.connection);
+ return;
+ }
+
+ finished = (max_read - n_read) == 0;
+ flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
+ http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
+ vec_validate (segs, 0);
+ segs[0].len = HTTP2_FRAME_HEADER_SIZE;
+ segs[0].data = fh;
+ vec_append (segs, app_segs);
+
+ n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, 0);
+ ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read));
+ vec_free (segs);
+ http_buffer_drain (hb, n_read);
+ /* charge both stream-level and connection-level flow control windows */
+ req->peer_window -= n_read;
+ h2c->peer_window -= n_read;
+
+ if (finished)
+ {
+ /* all done, close stream */
+ http_buffer_free (hb);
+ if (hc->flags & HTTP_CONN_F_IS_SERVER)
+ http2_stream_close (req, hc);
+ else
+ req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+ }
+ else
+ {
+ if (req->peer_window == 0)
+ {
+ /* mark that we need window update on stream */
+ HTTP_DBG (1, "stream window is full");
+ req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
+ }
+ else
+ {
+ /* schedule for next round */
+ HTTP_DBG (1, "adding to data queue req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ http2_req_schedule_data_tx (hc, req);
+ http_io_as_dequeue_notify (&req->base, n_read);
+ }
+ }
+
+ http_io_ts_after_write (hc, finished);
+}
+
+static void
+http2_update_time_callback (f64 now, u8 thread_index)
+{
+ http2_worker_ctx_t *wrk = http2_get_worker (thread_index);
+ http2_conn_ctx_t *h2c;
+ http_conn_t *hc;
+ http2_req_t *req, *new_he, *old_he;
+ clib_llist_index_t ri, old_ti, next_ri, ci;
+ u8 n_emissions = 0;
+
+ /*
+ * Run stream tx scheduler, we want to run for short time each heart-beat, so
+ * only one stream is processed with cap on frames emission. Since not all
+ * frames are equal, from CPU cycles or memory copy perspective, different
+ * weights are assigned when incrementing emissions counter. In most of cases
+ * connection is schedule only if it will be able to send data, same applies
+ * to streams within connection.
+ */
+ /* pop first connection from the worker's round-robin list, if any */
+ ci = clib_llist_next_index (clib_llist_elt (wrk->conn_pool, wrk->sched_head),
+ sched_list);
+ if (ci != wrk->sched_head)
+ {
+ h2c = clib_llist_elt (wrk->conn_pool, ci);
+ ASSERT (!(h2c->flags & HTTP2_CONN_F_TS_DESCHED));
+ clib_llist_remove (wrk->conn_pool, sched_list, h2c);
+ hc = http_conn_get_w_thread (h2c->hc_index, thread_index);
+ ASSERT (hc->flags & HTTP_CONN_F_HAS_REQUEST);
+
+ /* first handle new responses (headers frame) */
+ new_he = clib_llist_elt (wrk->req_pool, h2c->new_tx_streams);
+ ri = clib_llist_next_index (new_he, sched_list);
+ /* save tail of old list so we will do only one round of already queued
+ * streams */
+ old_ti = clib_llist_prev_index (
+ clib_llist_elt (wrk->req_pool, h2c->old_tx_streams), sched_list);
+ while (ri != h2c->new_tx_streams &&
+ !http_io_ts_check_write_thresh (hc) &&
+ n_emissions < HTTP2_SCHED_MAX_EMISSIONS)
+ {
+ /* advance before dispatch, which may unlink or free req */
+ req = clib_llist_elt (wrk->req_pool, ri);
+ ri = clib_llist_next_index (req, sched_list);
+ HTTP_DBG (1, "sending headers req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ clib_llist_remove (wrk->req_pool, sched_list, req);
+ http2_sched_dispatch_headers (req, hc, &n_emissions);
+ }
+
+ /* handle old responses (data frames), if we had any prior to processing
+ * new ones, each stream tx one frame for now */
+ /* TODO RFC9218 Prioritization (urgency will be weight) */
+ old_he = clib_llist_elt (wrk->req_pool, h2c->old_tx_streams);
+ if (old_ti != h2c->old_tx_streams)
+ {
+ ri = clib_llist_next_index (old_he, sched_list);
+ while (!http_io_ts_check_write_thresh (hc) && h2c->peer_window > 0 &&
+ n_emissions < HTTP2_SCHED_MAX_EMISSIONS)
+ {
+ req = clib_llist_elt (wrk->req_pool, ri);
+ next_ri = clib_llist_next_index (req, sched_list);
+ HTTP_DBG (
+ 1, "sending data req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ clib_llist_remove (wrk->req_pool, sched_list, req);
+ http2_sched_dispatch_data (req, hc, &n_emissions);
+ /* streams requeued by dispatch land after saved tail, so they
+ * are not dispatched again this round */
+ if (ri == old_ti)
+ break;
+
+ ri = next_ri;
+ }
+ }
+ /* deschedule http connection and wait for deq notification if underlying
+ * transport session tx fifo is almost full */
+ if (http_io_ts_check_write_thresh (hc))
+ {
+ h2c->flags |= HTTP2_CONN_F_TS_DESCHED;
+ http_io_ts_add_want_deq_ntf (hc);
+ if (clib_llist_elt_is_linked (h2c, sched_list))
+ clib_llist_remove (wrk->conn_pool, sched_list, h2c);
+ return;
+ }
+ /* reschedule connection if something is waiting in queue */
+ if (!clib_llist_is_empty (wrk->req_pool, sched_list, new_he) ||
+ !clib_llist_is_empty (wrk->req_pool, sched_list, old_he))
+ http2_conn_schedule (h2c, hc->c_thread_index);
+ }
+}
+
/*************************************/
/* request state machine handlers RX */
/*************************************/
transport_send_params_t *sp,
http2_error_t *error)
{
- http_msg_t msg;
- u8 *response, *date, *app_headers = 0;
- u8 fh[HTTP2_FRAME_HEADER_SIZE];
- hpack_response_control_data_t control_data;
- u8 flags = HTTP2_FRAME_FLAG_END_HEADERS;
- http_sm_result_t sm_result = HTTP_SM_ERROR;
- u32 n_written;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
+ http2_req_t *he;
http2_conn_ctx_t *h2c;
- http_get_app_msg (&req->base, &msg);
- ASSERT (msg.type == HTTP_MSG_REPLY);
-
- response = http_get_tx_buf (hc);
- date = format (0, "%U", format_http_time_now, hc);
-
- control_data.sc = msg.code;
- control_data.content_len = msg.data.body_len;
- control_data.server_name = hc->app_name;
- control_data.server_name_len = vec_len (hc->app_name);
- control_data.date = date;
- control_data.date_len = vec_len (date);
-
- if (msg.data.headers_len)
- app_headers = http_get_app_header_list (&req->base, &msg);
-
- hpack_serialize_response (app_headers, msg.data.headers_len, &control_data,
- &response);
- vec_free (date);
+ ASSERT (!clib_llist_elt_is_linked (req, sched_list));
+ /* add response to stream scheduler */
+ HTTP_DBG (1, "adding to headers queue req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
h2c = http2_conn_ctx_get_w_thread (hc);
- if (vec_len (response) > h2c->peer_settings.max_frame_size)
- {
- /* TODO: CONTINUATION (headers fragmentation) */
- clib_warning ("resp headers greater than SETTINGS_MAX_FRAME_SIZE");
- *error = HTTP2_ERROR_INTERNAL_ERROR;
- return HTTP_SM_ERROR;
- }
-
- if (msg.data.body_len)
- {
- /* start sending the actual data */
- http_req_tx_buffer_init (&req->base, &msg);
- http_req_state_change (&req->base, HTTP_REQ_STATE_APP_IO_MORE_DATA);
- sm_result = HTTP_SM_CONTINUE;
- }
- else
- {
- /* no response body, we are done */
- flags |= HTTP2_FRAME_FLAG_END_STREAM;
- sm_result = HTTP_SM_STOP;
- http2_stream_close (req);
- }
+ he = clib_llist_elt (wrk->req_pool, h2c->new_tx_streams);
+ clib_llist_add_tail (wrk->req_pool, sched_list, req, he);
+ http2_conn_schedule (h2c, hc->c_thread_index);
- http2_frame_write_headers_header (vec_len (response), req->stream_id, flags,
- fh);
- svm_fifo_seg_t segs[2] = { { fh, HTTP2_FRAME_HEADER_SIZE },
- { response, vec_len (response) } };
- n_written = http_io_ts_write_segs (hc, segs, 2, sp);
- ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + vec_len (response)));
- http_io_ts_after_write (hc, 0);
+ http_req_state_change (&req->base, HTTP_REQ_STATE_APP_IO_MORE_DATA);
+ http_req_deschedule (&req->base, sp);
- return sm_result;
+ return HTTP_SM_STOP;
}
static http_sm_result_t
transport_send_params_t *sp,
http2_error_t *error)
{
- u32 max_write, max_read, n_segs, n_read, n_written = 0;
- svm_fifo_seg_t *app_segs, *segs = 0;
- http_buffer_t *hb = &req->base.tx_buf;
- u8 fh[HTTP2_FRAME_HEADER_SIZE];
- u8 finished = 0, flags = 0;
http2_conn_ctx_t *h2c;
- ASSERT (http_buffer_bytes_left (hb) > 0);
+ ASSERT (!clib_llist_elt_is_linked (req, sched_list));
- if (req->peer_window <= 0)
- {
- HTTP_DBG (1, "stream window is full");
- /* mark that we need window update on stream */
- req->flags |= HTTP2_REQ_F_NEED_WINDOW_UPDATE;
- http_req_deschedule (&req->base, sp);
- return HTTP_SM_STOP;
- }
+ /* add data back to stream scheduler */
+ HTTP_DBG (1, "adding to data queue req_index %x",
+ ((http_req_handle_t) req->base.hr_req_handle).req_index);
+ http2_req_schedule_data_tx (hc, req);
h2c = http2_conn_ctx_get_w_thread (hc);
- if (h2c->peer_window == 0)
- {
- HTTP_DBG (1, "connection window is full");
- /* add to waiting queue */
- http2_req_add_to_resume_list (h2c, req);
- http_req_deschedule (&req->base, sp);
- return HTTP_SM_STOP;
- }
-
- max_write = http_io_ts_max_write (hc, sp);
- if (max_write <= HTTP2_FRAME_HEADER_SIZE)
- {
- HTTP_DBG (1, "ts tx fifo full");
- goto check_fifo;
- }
- max_write -= HTTP2_FRAME_HEADER_SIZE;
- max_write = clib_min (max_write, (u32) req->peer_window);
- max_write = clib_min (max_write, h2c->peer_window);
- max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
-
- max_read = http_buffer_bytes_left (hb);
-
- n_read = http_buffer_get_segs (hb, max_write, &app_segs, &n_segs);
- if (n_read == 0)
- {
- HTTP_DBG (1, "no data to deq");
- goto check_fifo;
- }
-
- finished = (max_read - n_read) == 0;
- flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
- http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
- vec_validate (segs, 0);
- segs[0].len = HTTP2_FRAME_HEADER_SIZE;
- segs[0].data = fh;
- vec_append (segs, app_segs);
-
- n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, sp);
- ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read));
- vec_free (segs);
- http_buffer_drain (hb, n_read);
- req->peer_window -= n_read;
- h2c->peer_window -= n_read;
+ if (h2c->peer_window > 0)
+ http2_conn_schedule (h2c, hc->c_thread_index);
- if (finished)
- {
- http_buffer_free (hb);
- if (hc->flags & HTTP_CONN_F_IS_SERVER)
- http2_stream_close (req);
- else
- req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
- }
- http_io_ts_after_write (hc, finished);
+ http_req_deschedule (&req->base, sp);
-check_fifo:
- if (http_io_ts_check_write_thresh (hc))
- {
- http_io_ts_add_want_deq_ntf (hc);
- http_req_deschedule (&req->base, sp);
- }
return HTTP_SM_STOP;
}
{
http_sm_result_t res;
http2_error_t error;
- http2_conn_ctx_t *h2c;
do
{
}
while (res == HTTP_SM_CONTINUE);
- if (req->stream_state == HTTP2_STREAM_STATE_CLOSED)
- {
- h2c = http2_conn_ctx_get_w_thread (hc);
- session_transport_delete_notify (&req->base.connection);
- http2_conn_free_req (h2c, req, hc->c_thread_index);
- }
-
return HTTP2_ERROR_NO_ERROR;
}
static http2_error_t
http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh)
{
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
u8 *rx_buf;
u32 win_increment;
http2_error_t rv;
if (win_increment > (HTTP2_WIN_SIZE_MAX - h2c->peer_window))
return HTTP2_ERROR_FLOW_CONTROL_ERROR;
h2c->peer_window += win_increment;
+ /* reschedule connection if we have pending data */
+ if (!clib_llist_is_empty (
+ wrk->req_pool, sched_list,
+ clib_llist_elt (wrk->req_pool, h2c->old_tx_streams)))
+ http2_conn_schedule (h2c, hc->c_thread_index);
}
else
{
}
if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
{
- if (http2_req_update_peer_window (req, win_increment))
+ if (http2_req_update_peer_window (hc, req, win_increment))
{
http2_stream_error (hc, req, HTTP2_ERROR_FLOW_CONTROL_ERROR, 0);
return HTTP2_ERROR_NO_ERROR;
}
- if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE)
- http2_req_add_to_resume_list (h2c, req);
}
}
req = http2_req_get (req_index, hc->c_thread_index);
if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
{
- if (http2_req_update_peer_window (req, win_size_delta))
+ if (http2_req_update_peer_window (hc, req, win_size_delta))
http2_stream_error (hc, req,
HTTP2_ERROR_FLOW_CONTROL_ERROR, 0);
- if (req->flags & HTTP2_REQ_F_NEED_WINDOW_UPDATE)
- http2_req_add_to_resume_list (h2c, req);
}
}));
}
static http2_error_t
http2_handle_rst_stream_frame (http_conn_t *hc, http2_frame_header_t *fh)
{
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
u8 *rx_buf;
http2_error_t rv;
http2_req_t *req;
req->stream_state = HTTP2_STREAM_STATE_CLOSED;
session_transport_reset_notify (&req->base.connection);
+ if (clib_llist_elt_is_linked (req, sched_list))
+ clib_llist_remove (wrk->req_pool, sched_list, req);
return HTTP2_ERROR_NO_ERROR;
}
static http2_error_t
http2_handle_goaway_frame (http_conn_t *hc, http2_frame_header_t *fh)
{
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
u8 *rx_buf;
http2_error_t rv;
u32 error_code, last_stream_id, req_index, stream_id;
req = http2_req_get (req_index, hc->c_thread_index);
session_transport_reset_notify (&req->base.connection);
}));
+ if (clib_llist_elt_is_linked (h2c, sched_list))
+ clib_llist_remove (wrk->conn_pool, sched_list, h2c);
http_shutdown_transport (hc);
}
http2_req_t *req;
http2_error_t rv;
- HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+ HTTP_DBG (1, "hc [%u]%x req_index %x", hc->c_thread_index, hc->hc_hc_index,
req_index);
req = http2_req_get (req_index, hc->c_thread_index);
return;
}
- /* maybe we can continue sending data on some stream */
- http2_resume_list_process (hc);
-
/* reset http connection expiration timer */
http_conn_timer_update (hc);
}
{
http2_req_t *req;
- HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+ HTTP_DBG (1, "hc [%u]%x req_index %x", hc->c_thread_index, hc->hc_hc_index,
req_index);
req = http2_req_get (req_index, thread_index);
if (!req)
{
http2_req_t *req;
- HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+ HTTP_DBG (1, "hc [%u]%x req_index %x", hc->c_thread_index, hc->hc_hc_index,
req_index);
req = http2_req_get (req_index, thread_index);
req->flags |= HTTP2_REQ_F_APP_CLOSED;
http_io_ts_after_write (hc, 0);
h2c->our_window = HTTP2_CONNECTION_WINDOW_SIZE;
}
- /* maybe we can continue sending data on some stream */
- http2_resume_list_process (hc);
/* reset http connection expiration timer */
http_conn_timer_update (hc);
static void
http2_transport_close_callback (http_conn_t *hc)
{
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
u32 req_index, stream_id, n_open_streams = 0;
http2_req_t *req;
http2_conn_ctx_t *h2c;
req = http2_req_get (req_index, hc->c_thread_index);
if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
{
- HTTP_DBG (1, "req_index %u", req_index);
+ HTTP_DBG (1, "req_index %x", req_index);
session_transport_closing_notify (&req->base.connection);
n_open_streams++;
}
if (n_open_streams == 0)
{
HTTP_DBG (1, "no open stream disconnecting");
+ if (clib_llist_elt_is_linked (h2c, sched_list))
+ clib_llist_remove (wrk->conn_pool, sched_list, h2c);
http_disconnect_transport (hc);
}
}
static void
http2_transport_reset_callback (http_conn_t *hc)
{
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
u32 req_index, stream_id;
http2_req_t *req;
http2_conn_ctx_t *h2c;
req = http2_req_get (req_index, hc->c_thread_index);
if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
{
- HTTP_DBG (1, "req_index %u", req_index);
+ HTTP_DBG (1, "req_index %x", req_index);
session_transport_reset_notify (&req->base.connection);
}
}));
+ if (clib_llist_elt_is_linked (h2c, sched_list))
+ clib_llist_remove (wrk->conn_pool, sched_list, h2c);
}
static void
http2_transport_conn_reschedule_callback (http_conn_t *hc)
{
- u32 req_index, stream_id;
- http2_req_t *req;
+ http2_worker_ctx_t *wrk = http2_get_worker (hc->c_thread_index);
http2_conn_ctx_t *h2c;
HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
ASSERT (hc->flags & HTTP_CONN_F_HAS_REQUEST);
- if (!(hc->flags & HTTP_CONN_F_HAS_REQUEST))
- return;
-
h2c = http2_conn_ctx_get_w_thread (hc);
- hash_foreach (
- stream_id, req_index, h2c->req_by_stream_id, ({
- req = http2_req_get (req_index, hc->c_thread_index);
- if (req->stream_state != HTTP2_STREAM_STATE_CLOSED &&
- transport_connection_is_descheduled (&req->base.connection))
- {
- HTTP_DBG (1, "req_index %u", req_index);
- transport_connection_reschedule (&req->base.connection);
- }
- }));
+ /* ts tx fifo has space again, clear desched flag set by the scheduler */
+ h2c->flags &= ~HTTP2_CONN_F_TS_DESCHED;
+ /* reschedule connection if something is waiting in queue */
+ if (!clib_llist_is_empty (
+ wrk->req_pool, sched_list,
+ clib_llist_elt (wrk->req_pool, h2c->new_tx_streams)) ||
+ !clib_llist_is_empty (
+ wrk->req_pool, sched_list,
+ clib_llist_elt (wrk->req_pool, h2c->old_tx_streams)))
+ http2_conn_schedule (h2c, hc->c_thread_index);
}
static void
http2_main_t *h2m = &http2_main;
vlib_thread_main_t *vtm = vlib_get_thread_main ();
u32 num_threads;
+ http2_worker_ctx_t *wrk;
+ int i;
num_threads = 1 /* main thread */ + vtm->n_threads;
- vec_validate (h2m->conn_pool, num_threads - 1);
- vec_validate (h2m->req_pool, num_threads - 1);
+ vec_validate (h2m->wrk_ctx, num_threads - 1);
+ for (i = 0; i < num_threads; i++)
+ {
+ wrk = &h2m->wrk_ctx[i];
+ wrk->sched_head = clib_llist_make_head (wrk->conn_pool, sched_list);
+ }
}
static int