http: large POST handling 30/41530/6
authorMatus Fabian <[email protected]>
Wed, 4 Sep 2024 16:04:54 +0000 (18:04 +0200)
committerFlorin Coras <[email protected]>
Sun, 8 Sep 2024 22:41:27 +0000 (22:41 +0000)
Type: improvement
Change-Id: I28b8e8ccbff6f97e669b0048011b187decbfc892
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/http_test.go
src/plugins/hs_apps/http_tps.c
src/plugins/http/http.c
src/plugins/http/http_plugin.rst

index 733ca46..bfd2d34 100644 (file)
@@ -6,6 +6,7 @@ import (
        "github.com/onsi/gomega/ghttp"
        "github.com/onsi/gomega/gmeasure"
        "io"
+       "math/rand"
        "net"
        "net/http"
        "net/http/httptrace"
@@ -30,8 +31,8 @@ func init() {
                HttpInvalidContentLengthTest, HttpInvalidTargetSyntaxTest, HttpStaticPathTraversalTest, HttpUriDecodeTest,
                HttpHeadersTest, HttpStaticFileHandlerTest, HttpStaticFileHandlerDefaultMaxAgeTest, HttpClientTest, HttpClientErrRespTest, HttpClientPostFormTest,
                HttpClientPostFileTest, HttpClientPostFilePtrTest, AuthorityFormTargetTest, HttpRequestLineTest)
-       RegisterNoTopoSoloTests(HttpStaticPromTest, HttpTpsTest, HttpTpsInterruptModeTest, PromConcurrentConnectionsTest,
-               PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest)
+       RegisterNoTopoSoloTests(HttpStaticPromTest, HttpGetTpsTest, HttpGetTpsInterruptModeTest, PromConcurrentConnectionsTest,
+               PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest, HttpPostTpsTest, HttpPostTpsInterruptModeTest)
 }
 
 const wwwRootPath = "/tmp/www_root"
@@ -53,18 +54,51 @@ func httpDownloadBenchmark(s *HstSuite, experiment *gmeasure.Experiment, data in
        experiment.RecordValue("Download Speed", (float64(resp.ContentLength)/1024/1024)/duration.Seconds(), gmeasure.Units("MB/s"), gmeasure.Precision(2))
 }
 
-func HttpTpsInterruptModeTest(s *NoTopoSuite) {
-       HttpTpsTest(s)
+func HttpGetTpsInterruptModeTest(s *NoTopoSuite) {
+       HttpGetTpsTest(s)
 }
 
-func HttpTpsTest(s *NoTopoSuite) {
+func HttpGetTpsTest(s *NoTopoSuite) {
        vpp := s.GetContainerByName("vpp").VppInstance
        serverAddress := s.VppAddr()
        url := "http://" + serverAddress + ":8080/test_file_10M"
 
        vpp.Vppctl("http tps uri tcp://0.0.0.0/8080")
 
-       s.RunBenchmark("HTTP tps 10M", 10, 0, httpDownloadBenchmark, url)
+       s.RunBenchmark("HTTP tps download 10M", 10, 0, httpDownloadBenchmark, url)
+}
+
+func httpUploadBenchmark(s *HstSuite, experiment *gmeasure.Experiment, data interface{}) {
+       url, isValid := data.(string)
+       s.AssertEqual(true, isValid)
+       body := make([]byte, 10485760)
+       _, err := rand.Read(body)
+       client := NewHttpClient()
+       req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
+       s.AssertNil(err, fmt.Sprint(err))
+       t := time.Now()
+       resp, err := client.Do(req)
+       s.AssertNil(err, fmt.Sprint(err))
+       defer resp.Body.Close()
+       s.AssertHttpStatus(resp, 200)
+       _, err = io.ReadAll(resp.Body)
+       s.AssertNil(err, fmt.Sprint(err))
+       duration := time.Since(t)
+       experiment.RecordValue("Upload Speed", (float64(req.ContentLength)/1024/1024)/duration.Seconds(), gmeasure.Units("MB/s"), gmeasure.Precision(2))
+}
+
+func HttpPostTpsInterruptModeTest(s *NoTopoSuite) {
+       HttpPostTpsTest(s)
+}
+
+func HttpPostTpsTest(s *NoTopoSuite) {
+       vpp := s.GetContainerByName("vpp").VppInstance
+       serverAddress := s.VppAddr()
+       url := "http://" + serverAddress + ":8080/test_file_10M"
+
+       vpp.Vppctl("http tps uri tcp://0.0.0.0/8080")
+
+       s.RunBenchmark("HTTP tps upload 10M", 10, 0, httpUploadBenchmark, url)
 }
 
 func HttpPersistentConnectionTest(s *NoTopoSuite) {
index 35a5802..39046b3 100644 (file)
@@ -20,6 +20,8 @@
 #include <http/http_header_names.h>
 #include <http/http_content_types.h>
 
+#define HTS_RX_BUF_SIZE (64 << 10)
+
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -28,6 +30,7 @@ typedef struct
   u64 data_len;
   u64 data_offset;
   u32 vpp_session_index;
+  u32 to_recv;
   union
   {
     /** threshold after which connection is closed */
@@ -36,6 +39,7 @@ typedef struct
     u32 close_rate;
   };
   u8 *uri;
+  u8 *rx_buf;
   http_header_t *resp_headers;
 } hts_session_t;
 
@@ -105,6 +109,8 @@ hts_session_free (hts_session_t *hs)
   if (htm->debug_level > 0)
     clib_warning ("Freeing session %u", hs->session_index);
 
+  vec_free (hs->rx_buf);
+
   if (CLIB_DEBUG)
     clib_memset (hs, 0xfa, sizeof (*hs));
 
@@ -227,6 +233,8 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
   http_msg_t msg;
   session_t *ts;
   u8 *headers_buf = 0;
+  u32 n_segs = 1;
+  svm_fifo_seg_t seg[2];
   int rv;
 
   if (vec_len (hs->resp_headers))
@@ -235,6 +243,9 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
       vec_free (hs->resp_headers);
       msg.data.headers_offset = 0;
       msg.data.headers_len = vec_len (headers_buf);
+      seg[1].data = headers_buf;
+      seg[1].len = msg.data.headers_len;
+      n_segs = 2;
     }
   else
     {
@@ -248,17 +259,14 @@ hts_start_send_data (hts_session_t *hs, http_status_code_t status)
   msg.data.body_len = hs->data_len;
   msg.data.body_offset = msg.data.headers_len;
   msg.data.len = msg.data.body_len + msg.data.headers_len;
+  seg[0].data = (u8 *) &msg;
+  seg[0].len = sizeof (msg);
 
   ts = session_get (hs->vpp_session_index, hs->thread_index);
-  rv = svm_fifo_enqueue (ts->tx_fifo, sizeof (msg), (u8 *) &msg);
-  ASSERT (rv == sizeof (msg));
-
-  if (msg.data.headers_len)
-    {
-      rv = svm_fifo_enqueue (ts->tx_fifo, vec_len (headers_buf), headers_buf);
-      ASSERT (rv == msg.data.headers_len);
-      vec_free (headers_buf);
-    }
+  rv = svm_fifo_enqueue_segments (ts->tx_fifo, seg, n_segs,
+                                 0 /* allow partial */);
+  vec_free (headers_buf);
+  ASSERT (rv == (sizeof (msg) + msg.data.headers_len));
 
   if (!msg.data.body_len)
     {
@@ -323,6 +331,40 @@ done:
   return rc;
 }
 
+static inline void
+hts_session_rx_body (hts_session_t *hs, session_t *ts)
+{
+  hts_main_t *htm = &hts_main;
+  u32 n_deq;
+  int rv;
+
+  n_deq = svm_fifo_max_dequeue (ts->rx_fifo);
+  if (!htm->no_zc)
+    {
+      svm_fifo_dequeue_drop_all (ts->rx_fifo);
+    }
+  else
+    {
+      n_deq = clib_min (n_deq, HTS_RX_BUF_SIZE);
+      rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, hs->rx_buf);
+      ASSERT (rv == n_deq);
+    }
+  hs->to_recv -= n_deq;
+
+  if (hs->close_threshold > 0)
+    {
+      if ((f64) (hs->data_len - hs->to_recv) / hs->data_len >
+         hs->close_threshold)
+       hts_disconnect_transport (hs);
+    }
+
+  if (hs->to_recv == 0)
+    {
+      hts_start_send_data (hs, HTTP_STATUS_OK);
+      vec_free (hs->rx_buf);
+    }
+}
+
 static int
 hts_ts_rx_callback (session_t *ts)
 {
@@ -333,44 +375,77 @@ hts_ts_rx_callback (session_t *ts)
   int rv;
 
   hs = hts_session_get (ts->thread_index, ts->opaque);
-  hs->data_len = 0;
-  hs->resp_headers = 0;
 
-  /* Read the http message header */
-  rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
-  ASSERT (rv == sizeof (msg));
-
-  if (msg.type != HTTP_MSG_REQUEST || msg.method_type != HTTP_REQ_GET)
+  if (hs->to_recv == 0)
     {
-      http_add_header (&hs->resp_headers,
-                      http_header_name_token (HTTP_HEADER_ALLOW),
-                      http_token_lit ("GET"));
-      hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
-      goto done;
-    }
+      hs->data_len = 0;
+      hs->resp_headers = 0;
+      hs->rx_buf = 0;
 
-  if (msg.data.target_path_len == 0 ||
-      msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
-    {
-      hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
-      goto done;
-    }
+      /* Read the http message header */
+      rv = svm_fifo_dequeue (ts->rx_fifo, sizeof (msg), (u8 *) &msg);
+      ASSERT (rv == sizeof (msg));
 
-  vec_validate (target, msg.data.target_path_len - 1);
-  rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset,
-                     msg.data.target_path_len, target);
-  ASSERT (rv == msg.data.target_path_len);
+      if (msg.type != HTTP_MSG_REQUEST)
+       {
+         hts_start_send_data (hs, HTTP_STATUS_INTERNAL_ERROR);
+         goto done;
+       }
+      if (msg.method_type != HTTP_REQ_GET && msg.method_type != HTTP_REQ_POST)
+       {
+         http_add_header (&hs->resp_headers,
+                          http_header_name_token (HTTP_HEADER_ALLOW),
+                          http_token_lit ("GET, POST"));
+         hts_start_send_data (hs, HTTP_STATUS_METHOD_NOT_ALLOWED);
+         goto done;
+       }
 
-  if (htm->debug_level)
-    clib_warning ("Request target: %v", target);
+      if (msg.data.target_path_len == 0 ||
+         msg.data.target_form != HTTP_TARGET_ORIGIN_FORM)
+       {
+         hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
+         goto done;
+       }
 
-  if (try_test_file (hs, target))
-    hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND);
+      vec_validate (target, msg.data.target_path_len - 1);
+      rv = svm_fifo_peek (ts->rx_fifo, msg.data.target_path_offset,
+                         msg.data.target_path_len, target);
+      ASSERT (rv == msg.data.target_path_len);
 
-  vec_free (target);
+      if (htm->debug_level)
+       clib_warning ("%s request target: %v",
+                     msg.method_type == HTTP_REQ_GET ? "GET" : "POST",
+                     target);
+
+      if (msg.method_type == HTTP_REQ_GET)
+       {
+         if (try_test_file (hs, target))
+           hts_start_send_data (hs, HTTP_STATUS_NOT_FOUND);
+         vec_free (target);
+       }
+      else
+       {
+         vec_free (target);
+         if (!msg.data.body_len)
+           {
+             hts_start_send_data (hs, HTTP_STATUS_BAD_REQUEST);
+             goto done;
+           }
+         /* drop everything up to body */
+         svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset);
+         hs->to_recv = msg.data.body_len;
+         if (htm->no_zc)
+           vec_validate (hs->rx_buf, HTS_RX_BUF_SIZE - 1);
+         hts_session_rx_body (hs, ts);
+         return 0;
+       }
+
+    done:
+      svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len);
+    }
+  else
+    hts_session_rx_body (hs, ts);
 
-done:
-  svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.len);
   return 0;
 }
 
@@ -397,6 +472,7 @@ hts_ts_accept_callback (session_t *ts)
 
   hs = hts_session_alloc (ts->thread_index);
   hs->vpp_session_index = ts->session_index;
+  hs->to_recv = 0;
 
   ts->opaque = hs->session_index;
   ts->session_state = SESSION_STATE_READY;
index a32f6f6..d01ee7f 100644 (file)
@@ -527,13 +527,18 @@ v_find_index (u8 *vec, u32 offset, u32 num, char *str)
 static void
 http_identify_optional_query (http_conn_t *hc)
 {
-  u32 pos = vec_search (hc->rx_buf, '?');
-  if (~0 != pos)
+  int i;
+  for (i = hc->target_path_offset;
+       i < (hc->target_path_offset + hc->target_path_len); i++)
     {
-      hc->target_query_offset = pos + 1;
-      hc->target_query_len =
-       hc->target_path_offset + hc->target_path_len - hc->target_query_offset;
-      hc->target_path_len = hc->target_path_len - hc->target_query_len - 1;
+      if (hc->rx_buf[i] == '?')
+       {
+         hc->target_query_offset = i + 1;
+         hc->target_query_len = hc->target_path_offset + hc->target_path_len -
+                                hc->target_query_offset;
+         hc->target_path_len = hc->target_path_len - hc->target_query_len - 1;
+         break;
+       }
     }
 }
 
@@ -674,7 +679,9 @@ http_parse_request_line (http_conn_t *hc, http_status_code_t *ec)
     }
 
   /* parse request-target */
+  HTTP_DBG (0, "http at %d", i);
   target_len = i - hc->target_path_offset;
+  HTTP_DBG (0, "target_len %d", target_len);
   if (target_len < 1)
     {
       clib_warning ("request-target not present");
@@ -911,7 +918,7 @@ http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp)
   http_msg_t msg = {};
   app_worker_t *app_wrk;
   session_t *as;
-  u32 len, max_enq;
+  u32 len, max_enq, body_sent;
   http_status_code_t ec;
   http_main_t *hm = &http_main;
 
@@ -972,16 +979,16 @@ http_state_wait_server_reply (http_conn_t *hc, transport_send_params_t *sp)
 
   http_read_message_drop (hc, len);
 
-  if (hc->body_len == 0)
+  body_sent = len - hc->control_data_len;
+  hc->to_recv = hc->body_len - body_sent;
+  if (hc->to_recv == 0)
     {
-      /* no response body, we are done */
-      hc->to_recv = 0;
+      /* all sent, we are done */
       http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD);
     }
   else
     {
-      /* stream response body */
-      hc->to_recv = hc->body_len;
+      /* stream rest of the response body */
       http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA);
     }
 
@@ -1006,7 +1013,7 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
   http_msg_t msg;
   session_t *as;
   int rv;
-  u32 len, max_enq;
+  u32 len, max_enq, max_deq, body_sent;
 
   rv = http_read_message (hc);
 
@@ -1034,16 +1041,20 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
   if (rv)
     goto error;
 
-  /* send "control data" and request body */
+  /* send at least "control data" which is necessary minimum,
+   * if there is some space send also portion of body */
   as = session_get_from_handle (hc->h_pa_session_handle);
-  len = hc->control_data_len + hc->body_len;
   max_enq = svm_fifo_max_enqueue (as->rx_fifo);
-  if (max_enq < len)
+  if (max_enq < hc->control_data_len)
     {
-      /* TODO stream body of large POST */
-      clib_warning ("not enough room for data in app's rx fifo");
+      clib_warning ("not enough room for control data in app's rx fifo");
+      ec = HTTP_STATUS_INTERNAL_ERROR;
       goto error;
     }
+  /* do not dequeue more than one HTTP request, we do not support pipelining */
+  max_deq =
+    clib_min (hc->control_data_len + hc->body_len, vec_len (hc->rx_buf));
+  len = clib_min (max_enq, max_deq);
 
   msg.type = HTTP_MSG_REQUEST;
   msg.method_type = hc->method;
@@ -1065,9 +1076,21 @@ http_state_wait_client_method (http_conn_t *hc, transport_send_params_t *sp)
   rv = svm_fifo_enqueue_segments (as->rx_fifo, segs, 2, 0 /* allow partial */);
   ASSERT (rv == (sizeof (msg) + len));
 
-  /* drop everything, we do not support pipelining */
-  http_read_message_drop_all (hc);
-  http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY);
+  body_sent = len - hc->control_data_len;
+  hc->to_recv = hc->body_len - body_sent;
+  if (hc->to_recv == 0)
+    {
+      /* drop everything, we do not support pipelining */
+      http_read_message_drop_all (hc);
+      /* all sent, we are done */
+      http_state_change (hc, HTTP_STATE_WAIT_APP_REPLY);
+    }
+  else
+    {
+      http_read_message_drop (hc, len);
+      /* stream rest of the response body */
+      http_state_change (hc, HTTP_STATE_CLIENT_IO_MORE_DATA);
+    }
 
   app_wrk = app_worker_get_if_valid (as->app_wrk_index);
   if (app_wrk)
@@ -1408,8 +1431,12 @@ http_state_client_io_more_data (http_conn_t *hc, transport_send_params_t *sp)
   hc->to_recv -= rv;
   HTTP_DBG (1, "drained %d from ts; remains %d", rv, hc->to_recv);
 
+  /* Finished transaction:
+   * server back to HTTP_STATE_WAIT_APP_REPLY
+   * client to HTTP_STATE_WAIT_APP_METHOD */
   if (hc->to_recv == 0)
-    http_state_change (hc, HTTP_STATE_WAIT_APP_METHOD);
+    http_state_change (hc, hc->is_server ? HTTP_STATE_WAIT_APP_REPLY :
+                                          HTTP_STATE_WAIT_APP_METHOD);
 
   app_wrk = app_worker_get_if_valid (as->app_wrk_index);
   if (app_wrk)
index 4daef79..feb2c7f 100644 (file)
@@ -144,16 +144,48 @@ Following example shows how to parse headers:
       vec_free (headers);
     }
 
-Finally application reads body:
+Finally application reads body  (if any), which might be received in multiple pieces (depends on size), so we might need some state machine in ``builtin_app_rx_callback``.
+We will add following members to our session context structure:
+
+.. code-block:: C
+
+  typedef struct
+  {
+    /* ... */
+    u32 to_recv;
+    u8 *resp_body;
+  } session_ctx_t;
+
+First we prepare vector for response body, do it only once when you are reading metadata:
+
+.. code-block:: C
+
+  /* drop everything up to body */
+  svm_fifo_dequeue_drop (ts->rx_fifo, msg.data.body_offset);
+  ctx->to_recv = msg.data.body_len;
+  /* prepare vector for response body */
+  vec_validate (ctx->resp_body, msg.data.body_len - 1);
+  vec_reset_length (ctx->resp_body);
+
+Now we can start reading body content, following block of code could be executed multiple times:
 
 .. code-block:: C
 
-  u8 *body = 0;
-  if (msg.data.body_len)
+  /* dequeue */
+  u32 n_deq = svm_fifo_max_dequeue (ts->rx_fifo);
+  /* current offset */
+  u32 curr = vec_len (ctx->resp_body);
+  rv = svm_fifo_dequeue (ts->rx_fifo, n_deq, ctx->resp_body + curr);
+  ASSERT (rv == n_deq);
+  /* update length of the vector */
+  vec_set_len (ctx->resp_body, curr + n_deq);
+  /* update number of remaining bytes to receive */
+  ctx->to_recv -= rv;
+  /* check if all data received */
+  if (ctx->to_recv == 0)
     {
-      vec_validate (body, msg.data.body_len - 1);
-      rv = svm_fifo_peek (ts->rx_fifo, msg.data.body_offset, msg.data.body_len, body);
-      ASSERT (rv == msg.data.body_len);
+      /* we are done */
+      /* send 200 OK response */
     }
 
 Sending data