http: http/2 stream state machine 92/42592/8
authorMatus Fabian <[email protected]>
Mon, 31 Mar 2025 17:29:48 +0000 (13:29 -0400)
committerFlorin Coras <[email protected]>
Mon, 7 Apr 2025 19:39:37 +0000 (19:39 +0000)
server side stream state machine

Type: feature

Change-Id: I03ce57f6c1146f7e1dfd7d54db7d3e3ec7615991
Signed-off-by: Matus Fabian <[email protected]>
src/plugins/http/http1.c
src/plugins/http/http2/frame.c
src/plugins/http/http2/hpack.c
src/plugins/http/http2/hpack.h
src/plugins/http/http2/http2.c
src/plugins/http/http_private.h

index e71a458..d99e4bd 100644 (file)
@@ -157,26 +157,6 @@ http1_read_message (http_conn_t *hc, u8 *rx_buf)
   return 0;
 }
 
-static void
-http1_identify_optional_query (http_req_t *req, u8 *rx_buf)
-{
-  int i;
-  for (i = req->target_path_offset;
-       i < (req->target_path_offset + req->target_path_len); i++)
-    {
-      if (rx_buf[i] == '?')
-       {
-         req->target_query_offset = i + 1;
-         req->target_query_len = req->target_path_offset +
-                                 req->target_path_len -
-                                 req->target_query_offset;
-         req->target_path_len =
-           req->target_path_len - req->target_query_len - 1;
-         break;
-       }
-    }
-}
-
 static int
 http1_parse_target (http_req_t *req, u8 *rx_buf)
 {
@@ -198,7 +178,7 @@ http1_parse_target (http_req_t *req, u8 *rx_buf)
       req->target_path_len--;
       req->target_path_offset++;
       req->target_form = HTTP_TARGET_ORIGIN_FORM;
-      http1_identify_optional_query (req, rx_buf);
+      http_identify_optional_query (req, rx_buf);
       /* can't be CONNECT method */
       return req->method == HTTP_REQ_CONNECT ? -1 : 0;
     }
@@ -240,7 +220,7 @@ http1_parse_target (http_req_t *req, u8 *rx_buf)
              clib_warning ("zero length host");
              return -1;
            }
-         http1_identify_optional_query (req, rx_buf);
+         http_identify_optional_query (req, rx_buf);
          /* can't be CONNECT method */
          return req->method == HTTP_REQ_CONNECT ? -1 : 0;
        }
@@ -687,10 +667,7 @@ static int
 http1_identify_message_body (http_req_t *req, u8 *rx_buf,
                             http_status_code_t *ec)
 {
-  int i;
-  u8 *p;
-  u64 body_len = 0, digit;
-  http_field_line_t *field_line;
+  int rv;
 
   req->body_len = 0;
 
@@ -712,33 +689,14 @@ http1_identify_message_body (http_req_t *req, u8 *rx_buf,
       HTTP_DBG (2, "Content-Length header not present, no message-body");
       return 0;
     }
-  field_line = vec_elt_at_index (req->headers, req->content_len_header_index);
 
-  p = rx_buf + req->headers_offset + field_line->value_offset;
-  for (i = 0; i < field_line->value_len; i++)
+  rv = http_parse_content_length (req, rx_buf);
+  if (rv)
     {
-      /* check for digit */
-      if (!isdigit (*p))
-       {
-         clib_warning ("expected digit");
-         *ec = HTTP_STATUS_BAD_REQUEST;
-         return -1;
-       }
-      digit = *p - '0';
-      u64 new_body_len = body_len * 10 + digit;
-      /* check for overflow */
-      if (new_body_len < body_len)
-       {
-         clib_warning ("too big number, overflow");
-         *ec = HTTP_STATUS_BAD_REQUEST;
-         return -1;
-       }
-      body_len = new_body_len;
-      p++;
+      *ec = HTTP_STATUS_BAD_REQUEST;
+      return rv;
     }
 
-  req->body_len = body_len;
-
   req->body_offset = req->headers_offset + req->headers_len + 2;
   HTTP_DBG (2, "body length: %llu", req->body_len);
   HTTP_DBG (2, "body offset: %u", req->body_offset);
index 577bb6c..a2028b8 100644 (file)
@@ -233,7 +233,7 @@ http2_frame_write_goaway (http2_error_t error_code, u32 last_stream_id,
   u8 *p;
   u32 value;
 
-  ASSERT (last_stream_id > 0 && last_stream_id <= 0x7FFFFFFF);
+  ASSERT (last_stream_id <= 0x7FFFFFFF);
 
   http2_frame_header_t fh = { .type = HTTP2_FRAME_TYPE_GOAWAY,
                              .length = GOAWAY_MIN_SIZE };
index 24fc320..f722f67 100644 (file)
@@ -849,6 +849,7 @@ hpack_parse_request (u8 *src, u32 src_len, u8 *dst, u32 dst_len,
   b_left = dst_len;
   control_data->parsed_bitmap = 0;
   control_data->headers_len = 0;
+  control_data->content_len_header_index = ~0;
 
   while (p != end)
     {
@@ -902,6 +903,15 @@ hpack_parse_request (u8 *src, u32 src_len, u8 *dst, u32 dst_len,
       header->value_len = value_len;
       control_data->headers_len += name_len;
       control_data->headers_len += value_len;
+      /* find headers that will be used later in preprocessing */
+      if (regular_header_parsed)
+       {
+         if (control_data->content_len_header_index == ~0 &&
+             http_token_is ((const char *) name, name_len,
+                            hpack_headers[HTTP_HEADER_CONTENT_LENGTH].base,
+                            hpack_headers[HTTP_HEADER_CONTENT_LENGTH].len))
+           control_data->content_len_header_index = header - *headers;
+       }
     }
 
   HTTP_DBG (2, "%U", format_hpack_dynamic_table, dynamic_table);
index 9f3e62e..4e254be 100644 (file)
@@ -54,6 +54,7 @@ typedef struct
   u8 *path;
   u32 path_len;
   u8 *headers;
+  uword content_len_header_index;
   u32 headers_len;
   u16 parsed_bitmap;
 } hpack_request_control_data_t;
index 035620c..e31b3ee 100644 (file)
@@ -3,23 +3,61 @@
  */
 
 #include <http/http2/hpack.h>
+#include <http/http2/frame.h>
 #include <http/http_private.h>
+#include <http/http_timer.h>
 
 #ifndef HTTP_2_ENABLE
 #define HTTP_2_ENABLE 0
 #endif
 
+#define foreach_http2_stream_state                                            \
+  _ (IDLE, "IDLE")                                                            \
+  _ (OPEN, "OPEN")                                                            \
+  _ (HALF_CLOSED, "HALF-CLOSED")                                              \
+  _ (CLOSED, "CLOSED")
+
+typedef enum http2_stream_state_
+{
+#define _(s, str) HTTP2_STREAM_STATE_##s,
+  foreach_http2_stream_state
+#undef _
+} http2_stream_state_t;
+
+#define foreach_http2_req_flags _ (APP_CLOSED, "app-closed")
+
+typedef enum http2_req_flags_bit_
+{
+#define _(sym, str) HTTP2_REQ_F_BIT_##sym,
+  foreach_http2_req_flags
+#undef _
+} http2_req_flags_bit_t;
+
+typedef enum http2_req_flags_
+{
+#define _(sym, str) HTTP2_REQ_F_##sym = 1 << HTTP2_REQ_F_BIT_##sym,
+  foreach_http2_req_flags
+#undef _
+} __clib_packed http2_req_flags_t;
+
 typedef struct http2_req_
 {
   http_req_t base;
+  http2_stream_state_t stream_state;
+  u8 flags;
   u32 stream_id;
   u64 peer_window;
+  u8 *payload;
+  u32 payload_len;
 } http2_req_t;
 
 typedef struct http2_conn_ctx_
 {
   http2_conn_settings_t peer_settings;
   hpack_dynamic_table_t decoder_dynamic_table;
+  u8 flags;
+  u32 last_opened_stream_id;
+  u32 last_processed_stream_id;
   u64 peer_window;
   uword *req_by_stream_id;
 } http2_conn_ctx_t;
@@ -47,6 +85,8 @@ http2_conn_ctx_alloc_w_thread (http_conn_t *hc)
   h2c->req_by_stream_id = hash_create (0, sizeof (uword));
   hc->opaque =
     uword_to_pointer (h2c - h2m->conn_pool[hc->c_thread_index], void *);
+  HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index,
+           h2c - h2m->conn_pool[hc->c_thread_index]);
   return h2c;
 }
 
@@ -65,6 +105,8 @@ http2_conn_ctx_free (http_conn_t *hc)
   http2_conn_ctx_t *h2c;
 
   h2c = http2_conn_ctx_get_w_thread (hc);
+  HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index,
+           h2c - h2m->conn_pool[hc->c_thread_index]);
   hpack_dynamic_table_free (&h2c->decoder_dynamic_table);
   hash_free (h2c->req_by_stream_id);
   if (CLIB_DEBUG)
@@ -72,7 +114,7 @@ http2_conn_ctx_free (http_conn_t *hc)
   pool_put (h2m->conn_pool[hc->c_thread_index], h2c);
 }
 
-http2_req_t *
+static inline http2_req_t *
 http2_conn_alloc_req (http_conn_t *hc, u32 stream_id)
 {
   http2_main_t *h2m = &http2_main;
@@ -92,7 +134,10 @@ http2_conn_alloc_req (http_conn_t *hc, u32 stream_id)
   req->base.hr_hc_index = hc->hc_hc_index;
   req->base.c_thread_index = hc->c_thread_index;
   req->stream_id = stream_id;
+  req->stream_state = HTTP2_STREAM_STATE_IDLE;
   h2c = http2_conn_ctx_get_w_thread (hc);
+  HTTP_DBG (1, "h2c [%u]%x req_index %x stream_id %u", hc->c_thread_index,
+           h2c - h2m->conn_pool[hc->c_thread_index], req_index, stream_id);
   req->peer_window = h2c->peer_settings.initial_window_size;
   hash_set (h2c->req_by_stream_id, stream_id, req_index);
   return req;
@@ -103,6 +148,10 @@ http2_conn_free_req (http2_conn_ctx_t *h2c, http2_req_t *req, u32 thread_index)
 {
   http2_main_t *h2m = &http2_main;
 
+  HTTP_DBG (1, "h2c [%u]%x req_index %x stream_id %u", thread_index,
+           h2c - h2m->conn_pool[thread_index],
+           ((http_req_handle_t) req->base.hr_req_handle).req_index,
+           req->stream_id);
   vec_free (req->base.headers);
   vec_free (req->base.target);
   http_buffer_free (&req->base.tx_buf);
@@ -141,6 +190,660 @@ http2_req_get (u32 req_index, u32 thread_index)
   return pool_elt_at_index (h2m->req_pool[thread_index], req_index);
 }
 
+/* send GOAWAY frame and close TCP connection */
+always_inline void
+http2_connection_error (http_conn_t *hc, http2_error_t error,
+                       transport_send_params_t *sp)
+{
+  u8 *response;
+  u32 req_index, stream_id;
+  http2_conn_ctx_t *h2c;
+  http2_req_t *req;
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+
+  response = http_get_tx_buf (hc);
+  http2_frame_write_goaway (error, h2c->last_processed_stream_id, &response);
+  http_io_ts_after_write (hc, sp, 0, 1);
+
+  hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({
+                 req = http2_req_get (req_index, hc->c_thread_index);
+                 if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
+                   session_transport_closing_notify (&req->base.connection);
+               }));
+  http_disconnect_transport (hc);
+}
+
+/* send RST_STREAM frame and notify app */
+always_inline void
+http2_stream_error (http_conn_t *hc, http2_req_t *req, http2_error_t error,
+                   transport_send_params_t *sp)
+{
+  u8 *response;
+
+  ASSERT (req->stream_state > HTTP2_STREAM_STATE_IDLE);
+
+  response = http_get_tx_buf (hc);
+  http2_frame_write_rst_stream (error, req->stream_id, &response);
+  http_io_ts_write (hc, response, vec_len (response), sp);
+  http_io_ts_after_write (hc, sp, 0, 1);
+
+  req->stream_state = HTTP2_STREAM_STATE_CLOSED;
+  if (req->flags & HTTP2_REQ_F_APP_CLOSED)
+    session_transport_closed_notify (&req->base.connection);
+  else
+    session_transport_closing_notify (&req->base.connection);
+}
+
+always_inline void
+http2_stream_close (http2_req_t *req)
+{
+  req->stream_state = HTTP2_STREAM_STATE_CLOSED;
+  if (req->flags & HTTP2_REQ_F_APP_CLOSED)
+    {
+      HTTP_DBG (1, "req [%u]%x app already closed, confirm",
+               req->base.c_thread_index,
+               ((http_req_handle_t) req->base.hr_req_handle).req_index);
+      session_transport_closed_notify (&req->base.connection);
+    }
+  else
+    {
+      HTTP_DBG (1, "req [%u]%x all done closing, notify app",
+               req->base.c_thread_index,
+               ((http_req_handle_t) req->base.hr_req_handle).req_index);
+      session_transport_closing_notify (&req->base.connection);
+    }
+}
+
+/*************************************/
+/* request state machine handlers RX */
+/*************************************/
+
+static http_sm_result_t
+http2_req_state_wait_transport_method (http_conn_t *hc, http2_req_t *req,
+                                      transport_send_params_t *sp,
+                                      http2_error_t *error)
+{
+  http2_conn_ctx_t *h2c;
+  hpack_request_control_data_t control_data;
+  u8 *buf = 0;
+  http_msg_t msg;
+  int rv;
+  http_req_state_t new_state = HTTP_REQ_STATE_WAIT_APP_REPLY;
+
+  h2c = http2_conn_ctx_get_w_thread (hc);
+
+  vec_validate_init_empty (buf, 1023, 0);
+  *error = hpack_parse_request (req->payload, req->payload_len, buf, 1023,
+                               &control_data, &req->base.headers,
+                               &h2c->decoder_dynamic_table);
+  if (*error != HTTP2_ERROR_NO_ERROR)
+    {
+      HTTP_DBG (1, "hpack_parse_request failed");
+      return HTTP_SM_ERROR;
+    }
+
+  if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_METHOD_PARSED))
+    {
+      HTTP_DBG (1, ":method pseudo-header missing in request");
+      http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+  if (control_data.method == HTTP_REQ_UNKNOWN ||
+      control_data.method == HTTP_REQ_CONNECT)
+    {
+      HTTP_DBG (1, "unsupported method");
+      http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+  if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_SCHEME_PARSED) &&
+      control_data.method != HTTP_REQ_CONNECT)
+    {
+      HTTP_DBG (1, ":scheme pseudo-header missing in request");
+      http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+  if (control_data.scheme == HTTP_URL_SCHEME_UNKNOWN)
+    {
+      HTTP_DBG (1, "unsupported scheme");
+      http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+  if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_PATH_PARSED) &&
+      control_data.method != HTTP_REQ_CONNECT)
+    {
+      HTTP_DBG (1, ":path pseudo-header missing in request");
+      http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+  if (!(control_data.parsed_bitmap & HPACK_PSEUDO_HEADER_AUTHORITY_PARSED) &&
+      control_data.method != HTTP_REQ_CONNECT)
+    {
+      HTTP_DBG (1, ":path pseudo-header missing in request");
+      http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+      return HTTP_SM_STOP;
+    }
+
+  req->base.headers_offset = control_data.headers - buf;
+  req->base.headers_len = control_data.headers_len;
+  if (control_data.content_len_header_index != ~0)
+    {
+      req->base.content_len_header_index =
+       control_data.content_len_header_index;
+      rv = http_parse_content_length (&req->base, buf);
+      if (rv)
+       {
+         http2_stream_error (hc, req, HTTP2_ERROR_PROTOCOL_ERROR, sp);
+         return HTTP_SM_STOP;
+       }
+      new_state = HTTP_REQ_STATE_TRANSPORT_IO_MORE_DATA;
+    }
+  req->base.to_recv = req->base.body_len;
+
+  req->base.target_path_len = control_data.path_len;
+  req->base.target_path_offset = control_data.path - buf;
+  /* drop leading slash */
+  req->base.target_path_offset++;
+  req->base.target_path_len--;
+  req->base.target_query_offset = 0;
+  req->base.target_query_len = 0;
+  http_identify_optional_query (&req->base, buf);
+
+  req->base.control_data_len =
+    req->base.headers_offset + control_data.headers_len;
+
+  msg.type = HTTP_MSG_REQUEST;
+  msg.method_type = control_data.method;
+  msg.data.type = HTTP_MSG_DATA_INLINE;
+  msg.data.len = req->base.connection_header_index;
+  msg.data.scheme = control_data.scheme;
+  msg.data.target_authority_offset = control_data.authority - buf;
+  msg.data.target_authority_len = control_data.authority_len;
+  msg.data.target_path_offset = req->base.target_path_offset;
+  msg.data.target_path_len = req->base.target_path_len;
+  msg.data.target_query_offset = req->base.target_query_offset;
+  msg.data.target_query_len = req->base.target_query_len;
+  msg.data.headers_offset = req->base.headers_offset;
+  msg.data.headers_len = req->base.headers_len;
+  msg.data.headers_ctx = pointer_to_uword (req->base.headers);
+  msg.data.upgrade_proto = HTTP_UPGRADE_PROTO_NA;
+  msg.data.body_len = req->base.body_len;
+
+  svm_fifo_seg_t segs[2] = { { (u8 *) &msg, sizeof (msg) },
+                            { buf, req->base.control_data_len } };
+  HTTP_DBG (3, "%U", format_http_bytes, buf, req->base.control_data_len);
+  http_io_as_write_segs (&req->base, segs, 2);
+  http_req_state_change (&req->base, new_state);
+  http_app_worker_rx_notify (&req->base);
+
+  if (req->stream_id > h2c->last_processed_stream_id)
+    h2c->last_processed_stream_id = req->stream_id;
+
+  return HTTP_SM_STOP;
+}
+
+static http_sm_result_t
+http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req,
+                                       transport_send_params_t *sp,
+                                       http2_error_t *error)
+{
+  http_io_as_write (&req->base, req->payload, req->payload_len);
+  req->base.to_recv -= req->payload_len;
+  if (req->base.to_recv)
+    http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_REPLY);
+  http_app_worker_rx_notify (&req->base);
+
+  return HTTP_SM_STOP;
+}
+
+/*************************************/
+/* request state machine handlers TX */
+/*************************************/
+
+static http_sm_result_t
+http2_req_state_wait_app_reply (http_conn_t *hc, http2_req_t *req,
+                               transport_send_params_t *sp,
+                               http2_error_t *error)
+{
+  http_msg_t msg;
+  u8 *response, *date, *app_headers = 0;
+  u8 fh[HTTP2_FRAME_HEADER_SIZE];
+  hpack_response_control_data_t control_data;
+  u8 flags = HTTP2_FRAME_FLAG_END_HEADERS;
+  http_sm_result_t sm_result = HTTP_SM_ERROR;
+  u32 n_written;
+
+  http_get_app_msg (&req->base, &msg);
+  ASSERT (msg.type == HTTP_MSG_REPLY);
+
+  response = http_get_tx_buf (hc);
+  date = format (0, "%U", format_http_time_now, hc);
+
+  control_data.sc = msg.code;
+  control_data.content_len = msg.data.body_len;
+  control_data.server_name = hc->app_name;
+  control_data.server_name_len = vec_len (hc->app_name);
+  control_data.date = date;
+  control_data.date_len = vec_len (date);
+
+  if (msg.data.headers_len)
+    app_headers = http_get_app_header_list (&req->base, &msg);
+
+  hpack_serialize_response (app_headers, msg.data.headers_len, &control_data,
+                           &response);
+  vec_free (date);
+
+  if (msg.data.body_len)
+    {
+      /* start sending the actual data */
+      http_req_tx_buffer_init (&req->base, &msg);
+      http_req_state_change (&req->base, HTTP_REQ_STATE_APP_IO_MORE_DATA);
+      sm_result = HTTP_SM_CONTINUE;
+    }
+  else
+    {
+      /* no response body, we are done */
+      flags |= HTTP2_FRAME_FLAG_END_STREAM;
+      sm_result = HTTP_SM_STOP;
+      http2_stream_close (req);
+    }
+
+  http2_frame_write_headers_header (vec_len (response), req->stream_id, flags,
+                                   fh);
+  svm_fifo_seg_t segs[2] = { { fh, HTTP2_FRAME_HEADER_SIZE },
+                            { response, vec_len (response) } };
+  n_written = http_io_ts_write_segs (hc, segs, 2, sp);
+  ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + vec_len (response)));
+  http_io_ts_after_write (hc, sp, 0, 1);
+
+  return sm_result;
+}
+
+static http_sm_result_t
+http2_req_state_app_io_more_data (http_conn_t *hc, http2_req_t *req,
+                                 transport_send_params_t *sp,
+                                 http2_error_t *error)
+{
+  u32 max_write, max_read, n_segs, n_read, n_written = 0;
+  svm_fifo_seg_t *app_segs, *segs = 0;
+  http_buffer_t *hb = &req->base.tx_buf;
+  u8 fh[HTTP2_FRAME_HEADER_SIZE];
+  u8 finished = 0, flags = 0;
+
+  ASSERT (http_buffer_bytes_left (hb) > 0);
+  max_write = http_io_ts_max_write (hc, sp);
+  if (max_write <= HTTP2_FRAME_HEADER_SIZE)
+    {
+      HTTP_DBG (1, "ts tx fifo full");
+      goto check_fifo;
+    }
+  max_read = http_buffer_bytes_left (hb);
+
+  n_read = http_buffer_get_segs (hb, max_write - HTTP2_FRAME_HEADER_SIZE,
+                                &app_segs, &n_segs);
+  if (n_read == 0)
+    {
+      HTTP_DBG (1, "no data to deq");
+      goto check_fifo;
+    }
+
+  finished = (max_read - n_read) == 0;
+  flags = finished ? HTTP2_FRAME_FLAG_END_STREAM : 0;
+  http2_frame_write_data_header (n_read, req->stream_id, flags, fh);
+  vec_validate (segs, 0);
+  segs[0].len = HTTP2_FRAME_HEADER_SIZE;
+  segs[0].data = fh;
+  vec_append (segs, app_segs);
+
+  n_written = http_io_ts_write_segs (hc, segs, n_segs + 1, sp);
+  ASSERT (n_written == (HTTP2_FRAME_HEADER_SIZE + n_read));
+  vec_free (segs);
+  http_buffer_drain (hb, n_read);
+
+  if (finished)
+    {
+      http_buffer_free (hb);
+      if (hc->flags & HTTP_CONN_F_IS_SERVER)
+       http2_stream_close (req);
+      else
+       req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+    }
+
+check_fifo:
+  http_io_ts_after_write (hc, sp, finished, !!n_written);
+  return HTTP_SM_STOP;
+}
+
+/*************************/
+/* request state machine */
+/*************************/
+
+typedef http_sm_result_t (*http2_sm_handler) (http_conn_t *hc,
+                                             http2_req_t *req,
+                                             transport_send_params_t *sp,
+                                             http2_error_t *error);
+
+static http2_sm_handler tx_state_funcs[HTTP_REQ_N_STATES] = {
+  0, /* idle */
+  0, /* wait app method */
+  0, /* wait transport reply */
+  0, /* transport io more data */
+  0, /* wait transport method */
+  http2_req_state_wait_app_reply,
+  http2_req_state_app_io_more_data,
+  0, /* tunnel */
+  0, /* udp tunnel */
+};
+
+static http2_sm_handler rx_state_funcs[HTTP_REQ_N_STATES] = {
+  0, /* idle */
+  0, /* wait app method */
+  0, /* wait transport reply */
+  http2_req_state_transport_io_more_data,
+  http2_req_state_wait_transport_method,
+  0, /* wait app reply */
+  0, /* app io more data */
+  0, /* tunnel */
+  0, /* udp tunnel */
+};
+
+static_always_inline int
+http2_req_state_is_tx_valid (http2_req_t *req)
+{
+  return tx_state_funcs[req->base.state] ? 1 : 0;
+}
+
+static_always_inline http2_error_t
+http2_req_run_state_machine (http_conn_t *hc, http2_req_t *req,
+                            transport_send_params_t *sp, u8 is_tx)
+{
+  http_sm_result_t res;
+  http2_error_t error;
+  http2_conn_ctx_t *h2c;
+
+  do
+    {
+      if (is_tx)
+       res = tx_state_funcs[req->base.state](hc, req, sp, &error);
+      else
+       res = rx_state_funcs[req->base.state](hc, req, 0, &error);
+
+      if (res == HTTP_SM_ERROR)
+       {
+         HTTP_DBG (1, "protocol error %U", format_http2_error, error);
+         return error;
+       }
+    }
+  while (res == HTTP_SM_CONTINUE);
+
+  if (req->stream_state == HTTP2_STREAM_STATE_CLOSED)
+    {
+      h2c = http2_conn_ctx_get_w_thread (hc);
+      session_transport_delete_notify (&req->base.connection);
+      http2_conn_free_req (h2c, req, hc->c_thread_index);
+    }
+
+  return HTTP2_ERROR_NO_ERROR;
+}
+
+/******************/
+/* frame handlers */
+/******************/
+
+static http2_error_t
+http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh)
+{
+  http2_req_t *req;
+  u8 *rx_buf;
+  http2_error_t rv;
+  http2_conn_ctx_t *h2c;
+
+  if (!(fh->flags & HTTP2_FRAME_FLAG_END_HEADERS))
+    {
+      /* TODO: fragmented headers */
+      return HTTP2_ERROR_INTERNAL_ERROR;
+    }
+
+  if (hc->flags & HTTP_CONN_F_IS_SERVER)
+    {
+      h2c = http2_conn_ctx_get_w_thread (hc);
+      /* streams initiated by client must use odd-numbered stream id */
+      if ((fh->stream_id & 1) == 0)
+       {
+         HTTP_DBG (1, "invalid stream id %u", fh->stream_id);
+         return HTTP2_ERROR_PROTOCOL_ERROR;
+       }
+      /* stream id must be greater than all streams that client has opened */
+      if (fh->stream_id <= h2c->last_opened_stream_id)
+       {
+         HTTP_DBG (1, "closed stream id %u", fh->stream_id);
+         return HTTP2_ERROR_STREAM_CLOSED;
+       }
+      h2c->last_opened_stream_id = fh->stream_id;
+      req = http2_conn_alloc_req (hc, fh->stream_id);
+      http_conn_accept_request (hc, &req->base);
+      http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_TRANSPORT_METHOD);
+      req->stream_state = HTTP2_STREAM_STATE_OPEN;
+      hc->flags &= ~HTTP_CONN_F_NO_APP_SESSION;
+      if (!(hc->flags & HTTP_CONN_F_HAS_REQUEST))
+       {
+         hc->flags |= HTTP_CONN_F_HAS_REQUEST;
+         hpack_dynamic_table_init (
+           &h2c->decoder_dynamic_table,
+           http2_default_conn_settings.header_table_size);
+       }
+      if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM)
+       req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+    }
+  else
+    {
+      /* TODO: client */
+      return HTTP2_ERROR_INTERNAL_ERROR;
+    }
+
+  rx_buf = http_get_rx_buf (hc);
+  vec_validate (rx_buf, fh->length - 1);
+  http_io_ts_read (hc, rx_buf, fh->length, 0);
+
+  rv = http2_frame_read_headers (&req->payload, &req->payload_len, rx_buf,
+                                fh->length, fh->flags);
+  if (rv != HTTP2_ERROR_NO_ERROR)
+    return rv;
+
+  HTTP_DBG (1, "run state machine");
+  return http2_req_run_state_machine (hc, req, 0, 0);
+}
+
+static http2_error_t
+http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
+{
+  http2_req_t *req;
+  u8 *rx_buf;
+  http2_error_t rv;
+  http2_conn_ctx_t *h2c;
+
+  req = http2_conn_get_req (hc, fh->stream_id);
+  if (!req)
+    {
+      h2c = http2_conn_ctx_get_w_thread (hc);
+      if (fh->stream_id <= h2c->last_opened_stream_id)
+       {
+         /* we reset stream, but peer might send something meanwhile */
+         HTTP_DBG (1, "stream closed, ignoring frame");
+         http2_stream_error (hc, req, HTTP2_ERROR_STREAM_CLOSED, 0);
+         return HTTP2_ERROR_NO_ERROR;
+       }
+      else
+       return HTTP2_ERROR_PROTOCOL_ERROR;
+    }
+
+  /* bogus state, connection error */
+  if (req->stream_state == HTTP2_STREAM_STATE_HALF_CLOSED)
+    {
+      HTTP_DBG (1, "error: stream already half-closed");
+      http2_stream_error (hc, req, HTTP2_ERROR_STREAM_CLOSED, 0);
+      return HTTP2_ERROR_NO_ERROR;
+    }
+
+  if (fh->flags & HTTP2_FRAME_FLAG_END_STREAM)
+    req->stream_state = HTTP2_STREAM_STATE_HALF_CLOSED;
+
+  rx_buf = http_get_rx_buf (hc);
+  vec_validate (rx_buf, fh->length - 1);
+  http_io_ts_read (hc, rx_buf, fh->length, 0);
+
+  rv = http2_frame_read_data (&req->payload, &req->payload_len, rx_buf,
+                             fh->length, fh->flags);
+  if (rv != HTTP2_ERROR_NO_ERROR)
+    return rv;
+
+  HTTP_DBG (1, "run state machine");
+  return http2_req_run_state_machine (hc, req, 0, 0);
+}
+
+static http2_error_t
+http2_handle_window_update_frame (http_conn_t *hc, http2_frame_header_t *fh)
+{
+  u8 *rx_buf;
+  u32 win_increment;
+  http2_error_t rv;
+
+  rx_buf = http_get_rx_buf (hc);
+  vec_validate (rx_buf, fh->length - 1);
+  http_io_ts_read (hc, rx_buf, fh->length, 0);
+
+  rv = http2_frame_read_window_update (&win_increment, rx_buf, fh->length);
+  if (rv != HTTP2_ERROR_NO_ERROR)
+    return rv;
+
+  /* TODO: flow control */
+  return HTTP2_ERROR_NO_ERROR;
+}
+
+static http2_error_t
+http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh)
+{
+  u8 *rx_buf, *resp = 0;
+  http2_error_t rv;
+  http2_conn_settings_t new_settings;
+  http2_conn_ctx_t *h2c;
+
+  if (fh->stream_id != 0)
+    return HTTP2_ERROR_PROTOCOL_ERROR;
+
+  if (fh->flags == HTTP2_FRAME_FLAG_ACK)
+    {
+      if (fh->length != 0)
+       return HTTP2_ERROR_FRAME_SIZE_ERROR;
+      /* TODO: we can start using non-default settings */
+    }
+  else
+    {
+      if (fh->length < sizeof (http2_settings_entry_t))
+       return HTTP2_ERROR_FRAME_SIZE_ERROR;
+
+      rx_buf = http_get_rx_buf (hc);
+      vec_validate (rx_buf, fh->length - 1);
+      http_io_ts_read (hc, rx_buf, fh->length, 0);
+
+      h2c = http2_conn_ctx_get_w_thread (hc);
+      new_settings = h2c->peer_settings;
+      rv = http2_frame_read_settings (&new_settings, rx_buf, fh->length);
+      if (rv != HTTP2_ERROR_NO_ERROR)
+       return rv;
+      h2c->peer_settings = new_settings;
+
+      /* ACK peer settings */
+      http2_frame_write_settings_ack (&resp);
+      http_io_ts_write (hc, resp, vec_len (resp), 0);
+      vec_free (resp);
+      http_io_ts_after_write (hc, 0, 0, 1);
+    }
+
+  return HTTP2_ERROR_NO_ERROR;
+}
+
+static http2_error_t
+http2_handle_rst_stream_frame (http_conn_t *hc, http2_frame_header_t *fh)
+{
+  u8 *rx_buf;
+  http2_error_t rv;
+  http2_req_t *req;
+  u32 error_code;
+  http2_conn_ctx_t *h2c;
+
+  if (fh->stream_id == 0)
+    return HTTP2_ERROR_PROTOCOL_ERROR;
+
+  rx_buf = http_get_rx_buf (hc);
+  vec_validate (rx_buf, fh->length - 1);
+  http_io_ts_read (hc, rx_buf, fh->length, 0);
+
+  rv = http2_frame_read_rst_stream (&error_code, rx_buf, fh->length);
+  if (rv != HTTP2_ERROR_NO_ERROR)
+    return rv;
+
+  req = http2_conn_get_req (hc, fh->stream_id);
+  if (!req)
+    {
+      h2c = http2_conn_ctx_get_w_thread (hc);
+      if (fh->stream_id <= h2c->last_opened_stream_id)
+       {
+         /* we reset stream, but peer might send something meanwhile */
+         HTTP_DBG (1, "stream closed, ignoring frame");
+         return HTTP2_ERROR_NO_ERROR;
+       }
+      else
+       return HTTP2_ERROR_PROTOCOL_ERROR;
+    }
+
+  req->stream_state = HTTP2_STREAM_STATE_CLOSED;
+  session_transport_reset_notify (&req->base.connection);
+
+  return HTTP2_ERROR_NO_ERROR;
+}
+
+static http2_error_t
+http2_handle_goaway_frame (http_conn_t *hc, http2_frame_header_t *fh)
+{
+  u8 *rx_buf;
+  http2_error_t rv;
+  u32 error_code, last_stream_id, req_index, stream_id;
+  http2_conn_ctx_t *h2c;
+  http2_req_t *req;
+
+  if (fh->stream_id != 0)
+    return HTTP2_ERROR_PROTOCOL_ERROR;
+
+  rx_buf = http_get_rx_buf (hc);
+  vec_validate (rx_buf, fh->length - 1);
+  http_io_ts_read (hc, rx_buf, fh->length, 0);
+
+  rv =
+    http2_frame_read_goaway (&error_code, &last_stream_id, rx_buf, fh->length);
+  if (rv != HTTP2_ERROR_NO_ERROR)
+    return rv;
+
+  if (error_code == HTTP2_ERROR_NO_ERROR)
+    {
+      /* TODO: graceful shutdown (no new streams) */
+    }
+  else
+    {
+      /* connection error */
+      hc->state = HTTP_CONN_STATE_CLOSED;
+      if (!(hc->flags & HTTP_CONN_F_HAS_REQUEST))
+       return HTTP2_ERROR_NO_ERROR;
+      h2c = http2_conn_ctx_get_w_thread (hc);
+      hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({
+                     req = http2_req_get (req_index, hc->c_thread_index);
+                     session_transport_reset_notify (&req->base.connection);
+                   }));
+    }
+
+  return HTTP2_ERROR_NO_ERROR;
+}
+
 /*****************/
 /* http core VFT */
 /*****************/
@@ -205,7 +908,51 @@ static void
 http2_app_tx_callback (http_conn_t *hc, u32 req_index,
                       transport_send_params_t *sp)
 {
-  /* TODO: run state machine */
+  http2_req_t *req;
+  http2_error_t rv;
+
+  HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+           req_index);
+  req = http2_req_get (req_index, hc->c_thread_index);
+
+  if (!http2_req_state_is_tx_valid (req))
+    {
+      if (req->base.state == HTTP_REQ_STATE_TRANSPORT_IO_MORE_DATA &&
+         (hc->flags & HTTP_CONN_F_IS_SERVER))
+       {
+         /* server app might send error earlier */
+         http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_REPLY);
+       }
+      else
+       {
+         clib_warning ("hc [%u]%x invalid tx state: http req state "
+                       "'%U', session state '%U'",
+                       hc->c_thread_index, hc->hc_hc_index,
+                       format_http_req_state, req->base.state,
+                       format_http_conn_state, hc);
+         http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, sp);
+         return;
+       }
+    }
+
+  /* peer reset stream, but app might send something meanwhile */
+  if (req->stream_state == HTTP2_STREAM_STATE_CLOSED)
+    {
+      HTTP_DBG (1, "stream closed, ignoring app data");
+      http_io_as_drain_all (&req->base);
+      return;
+    }
+
+  HTTP_DBG (1, "run state machine");
+  rv = http2_req_run_state_machine (hc, req, sp, 1);
+  if (rv != HTTP2_ERROR_NO_ERROR)
+    {
+      http2_connection_error (hc, rv, sp);
+      return;
+    }
+
+  /* reset http connection expiration timer */
+  http_conn_timer_update (hc);
 }
 
 static void
@@ -217,13 +964,40 @@ http2_app_rx_evt_callback (http_conn_t *hc, u32 req_index, u32 thread_index)
 static void
 http2_app_close_callback (http_conn_t *hc, u32 req_index, u32 thread_index)
 {
-  /* TODO: confirm close or wait until all app data drained */
+  http2_req_t *req;
+
+  HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+           req_index);
+  req = http2_req_get (req_index, thread_index);
+  if (!req)
+    {
+      HTTP_DBG (1, "req already deleted");
+      return;
+    }
+
+  if (req->stream_state == HTTP2_STREAM_STATE_CLOSED ||
+      hc->state == HTTP_CONN_STATE_CLOSED)
+    {
+      HTTP_DBG (1, "nothing more to send, confirm close");
+      session_transport_closed_notify (&req->base.connection);
+    }
+  else
+    {
+      HTTP_DBG (1, "wait for all data to be written to ts");
+      req->flags |= HTTP2_REQ_F_APP_CLOSED;
+    }
 }
 
 static void
 http2_app_reset_callback (http_conn_t *hc, u32 req_index, u32 thread_index)
 {
-  /* TODO: send RST_STREAM frame */
+  http2_req_t *req;
+
+  HTTP_DBG (1, "hc [%u]%x req_index %u", hc->c_thread_index, hc->hc_hc_index,
+           req_index);
+  req = http2_req_get (req_index, thread_index);
+  req->flags |= HTTP2_REQ_F_APP_CLOSED;
+  http2_stream_error (hc, req, HTTP2_ERROR_INTERNAL_ERROR, 0);
 }
 
 static int
@@ -236,24 +1010,124 @@ http2_transport_connected_callback (http_conn_t *hc)
 static void
 http2_transport_rx_callback (http_conn_t *hc)
 {
-  /* TODO: run state machine or handle control frames on stream 0 */
+  http2_frame_header_t fh;
+  u32 to_deq;
+  u8 *rx_buf;
+  http2_error_t rv;
+
+  HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
+
+  to_deq = http_io_ts_max_read (hc);
+
+  if (PREDICT_FALSE (to_deq == 0))
+    {
+      HTTP_DBG (1, "no data to deq");
+      return;
+    }
+
+  if (PREDICT_FALSE (to_deq < HTTP2_FRAME_HEADER_SIZE))
+    {
+      HTTP_DBG (1, "to_deq %u is less than frame header size", to_deq);
+      http2_connection_error (hc, HTTP2_ERROR_PROTOCOL_ERROR, 0);
+      return;
+    }
+
+  while (to_deq >= HTTP2_FRAME_HEADER_SIZE)
+    {
+      rx_buf = http_get_rx_buf (hc);
+      http_io_ts_read (hc, rx_buf, HTTP2_FRAME_HEADER_SIZE, 0);
+      to_deq -= HTTP2_FRAME_HEADER_SIZE;
+      http2_frame_header_read (rx_buf, &fh);
+      if (fh.length > to_deq)
+       {
+         HTTP_DBG (1, "incomplete frame, to deq %lu, frame length %lu",
+                   to_deq, fh.length);
+         http2_connection_error (hc, HTTP2_ERROR_PROTOCOL_ERROR, 0);
+         return;
+       }
+      to_deq -= fh.length;
+
+      HTTP_DBG (1, "frame type 0x%02x", fh.type);
+      switch (fh.type)
+       {
+       case HTTP2_FRAME_TYPE_HEADERS:
+         rv = http2_handle_headers_frame (hc, &fh);
+         break;
+       case HTTP2_FRAME_TYPE_DATA:
+         rv = http2_handle_data_frame (hc, &fh);
+         break;
+       case HTTP2_FRAME_TYPE_WINDOW_UPDATE:
+         rv = http2_handle_window_update_frame (hc, &fh);
+         break;
+       case HTTP2_FRAME_TYPE_SETTINGS:
+         rv = http2_handle_settings_frame (hc, &fh);
+         break;
+       case HTTP2_FRAME_TYPE_RST_STREAM:
+         rv = http2_handle_rst_stream_frame (hc, &fh);
+         break;
+       case HTTP2_FRAME_TYPE_GOAWAY:
+         rv = http2_handle_goaway_frame (hc, &fh);
+         break;
+       case HTTP2_FRAME_TYPE_PING:
+         /* TODO */
+         rv = HTTP2_ERROR_INTERNAL_ERROR;
+         break;
+       case HTTP2_FRAME_TYPE_CONTINUATION:
+         /* TODO */
+         rv = HTTP2_ERROR_INTERNAL_ERROR;
+         break;
+       case HTTP2_FRAME_TYPE_PUSH_PROMISE:
+         /* TODO */
+         rv = HTTP2_ERROR_PROTOCOL_ERROR;
+         break;
+       case HTTP2_FRAME_TYPE_PRIORITY: /* deprecated */
+       default:
+         /* ignore unknown frame type */
+         http_io_ts_drain (hc, fh.length);
+         break;
+       }
+
+      if (rv != HTTP2_ERROR_NO_ERROR)
+       {
+         http2_connection_error (hc, rv, 0);
+         return;
+       }
+    }
+
+  /* reset http connection expiration timer */
+  http_conn_timer_update (hc);
 }
 
 static void
 http2_transport_close_callback (http_conn_t *hc)
 {
-  u32 req_index, stream_id;
+  u32 req_index, stream_id, n_open_streams = 0;
   http2_req_t *req;
   http2_conn_ctx_t *h2c;
 
+  HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
+
   if (!(hc->flags & HTTP_CONN_F_HAS_REQUEST))
-    return;
+    {
+      HTTP_DBG (1, "no request");
+      return;
+    }
 
   h2c = http2_conn_ctx_get_w_thread (hc);
   hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({
                  req = http2_req_get (req_index, hc->c_thread_index);
-                 session_transport_closing_notify (&req->base.connection);
+                 if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
+                   {
+                     HTTP_DBG (1, "req_index %u", req_index);
+                     session_transport_closing_notify (&req->base.connection);
+                     n_open_streams++;
+                   }
                }));
+  if (n_open_streams == 0)
+    {
+      HTTP_DBG (1, "no open stream disconnecting");
+      http_disconnect_transport (hc);
+    }
 }
 
 static void
@@ -263,13 +1137,19 @@ http2_transport_reset_callback (http_conn_t *hc)
   http2_req_t *req;
   http2_conn_ctx_t *h2c;
 
+  HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
+
   if (!(hc->flags & HTTP_CONN_F_HAS_REQUEST))
     return;
 
   h2c = http2_conn_ctx_get_w_thread (hc);
   hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({
                  req = http2_req_get (req_index, hc->c_thread_index);
-                 session_transport_reset_notify (&req->base.connection);
+                 if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
+                   {
+                     HTTP_DBG (1, "req_index %u", req_index);
+                     session_transport_reset_notify (&req->base.connection);
+                   }
                }));
 }
 
@@ -286,6 +1166,7 @@ http2_conn_cleanup_callback (http_conn_t *hc)
   http2_req_t *req;
   http2_conn_ctx_t *h2c;
 
+  HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
   h2c = http2_conn_ctx_get_w_thread (hc);
   hash_foreach (stream_id, req_index, h2c->req_by_stream_id,
                ({ vec_add1 (req_indices, req_index); }));
@@ -293,7 +1174,8 @@ http2_conn_cleanup_callback (http_conn_t *hc)
   vec_foreach (req_index_p, req_indices)
     {
       req = http2_req_get (*req_index_p, hc->c_thread_index);
-      session_transport_delete_notify (&req->base.connection);
+      if (req->stream_state != HTTP2_STREAM_STATE_CLOSED)
+       session_transport_delete_notify (&req->base.connection);
       http2_conn_free_req (h2c, req, hc->c_thread_index);
     }
 
index 8354bf2..10e5bfd 100644 (file)
@@ -472,6 +472,61 @@ http_get_app_msg (http_req_t *req, http_msg_t *msg)
   ASSERT (rv == sizeof (*msg));
 }
 
+always_inline void
+http_identify_optional_query (http_req_t *req, u8 *rx_buf)
+{
+  int i;
+  for (i = req->target_path_offset;
+       i < (req->target_path_offset + req->target_path_len); i++)
+    {
+      if (rx_buf[i] == '?')
+       {
+         req->target_query_offset = i + 1;
+         req->target_query_len = req->target_path_offset +
+                                 req->target_path_len -
+                                 req->target_query_offset;
+         req->target_path_len =
+           req->target_path_len - req->target_query_len - 1;
+         break;
+       }
+    }
+}
+
+always_inline int
+http_parse_content_length (http_req_t *req, u8 *rx_buf)
+{
+  int i;
+  http_field_line_t *field_line;
+  u8 *p;
+  u64 body_len = 0, digit;
+
+  field_line = vec_elt_at_index (req->headers, req->content_len_header_index);
+  p = rx_buf + req->headers_offset + field_line->value_offset;
+  for (i = 0; i < field_line->value_len; i++)
+    {
+      /* check for digit */
+      if (!isdigit (*p))
+       {
+         HTTP_DBG (1, "expected digit");
+         return -1;
+       }
+      digit = *p - '0';
+      u64 new_body_len = body_len * 10 + digit;
+      /* check for overflow */
+      if (new_body_len < body_len)
+       {
+         HTTP_DBG (1, "content-length value too big number, overflow");
+         return -1;
+       }
+      body_len = new_body_len;
+      p++;
+    }
+
+  req->body_len = body_len;
+
+  return 0;
+}
+
 /* Abstraction of app session fifo operations */
 
 always_inline void
@@ -495,6 +550,16 @@ http_io_as_max_read (http_req_t *req)
   return svm_fifo_max_dequeue_cons (as->tx_fifo);
 }
 
+always_inline void
+http_io_as_write (http_req_t *req, u8 *data, u32 len)
+{
+  int n_written;
+  session_t *ts = session_get_from_handle (req->hr_pa_session_handle);
+
+  n_written = svm_fifo_enqueue (ts->tx_fifo, len, data);
+  ASSERT (n_written == len);
+}
+
 always_inline u32
 http_io_as_write_segs (http_req_t *req, const svm_fifo_seg_t segs[],
                       u32 n_segs)