hsa: http client parallel sessions 83/42183/14
authorAdrian Villin <[email protected]>
Fri, 24 Jan 2025 12:56:22 +0000 (13:56 +0100)
committerFlorin Coras <[email protected]>
Tue, 11 Feb 2025 20:14:07 +0000 (20:14 +0000)
- client is now able to use multiple workers to send requests
  (sometimes it uses multiple sessions on a single worker)

Type: feature

Change-Id: I2d83d47a9768011b3d8d05ed320852606841e4b8
Signed-off-by: Adrian Villin <[email protected]>
extras/hs-test/http_test.go
extras/hs-test/infra/suite_no_topo.go
extras/hs-test/topo-containers/single.yaml
src/plugins/hs_apps/http_client.c

index 6893455..b143e55 100644 (file)
@@ -7,7 +7,6 @@ import (
        "math/rand"
        "net"
        "net/http"
-       "net/http/httptest"
        "net/http/httptrace"
        "os"
        "strconv"
@@ -36,10 +35,10 @@ func init() {
                HttpClientErrRespTest, HttpClientPostFormTest, HttpClientGet128kbResponseTest, HttpClientGetResponseBodyTest,
                HttpClientGetNoResponseBodyTest, HttpClientPostFileTest, HttpClientPostFilePtrTest, HttpUnitTest,
                HttpRequestLineTest, HttpClientGetTimeout, HttpStaticFileHandlerWrkTest, HttpStaticUrlHandlerWrkTest, HttpConnTimeoutTest,
-               HttpClientGetRepeat, HttpClientPostRepeat, HttpIgnoreH2UpgradeTest, HttpInvalidAuthorityFormUriTest, HttpHeaderErrorConnectionDropTest)
+               HttpClientGetRepeatTest, HttpClientPostRepeatTest, HttpIgnoreH2UpgradeTest, HttpInvalidAuthorityFormUriTest, HttpHeaderErrorConnectionDropTest)
        RegisterNoTopoSoloTests(HttpStaticPromTest, HttpGetTpsTest, HttpGetTpsInterruptModeTest, PromConcurrentConnectionsTest,
                PromMemLeakTest, HttpClientPostMemLeakTest, HttpInvalidClientRequestMemLeakTest, HttpPostTpsTest, HttpPostTpsInterruptModeTest,
-               PromConsecutiveConnectionsTest, HttpGetTpsTlsTest, HttpPostTpsTlsTest)
+               PromConsecutiveConnectionsTest, HttpGetTpsTlsTest, HttpPostTpsTlsTest, HttpClientGetRepeatMTTest, HttpClientPtrGetRepeatMTTest)
 }
 
 const wwwRootPath = "/tmp/www_root"
@@ -382,35 +381,37 @@ func httpClientGet(s *NoTopoSuite, response string, size int) {
        s.AssertContains(file_contents, response)
 }
 
-func startSimpleServer(s *NoTopoSuite, replyCount *int, serverAddress string) (server *httptest.Server) {
-       var err error
-       server = httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               fmt.Fprintf(w, "Hello")
-               *replyCount++
-       }))
-       server.Listener, err = net.Listen("tcp", serverAddress+":80")
-       s.AssertNil(err, "Error while creating listener.")
-
-       server.Start()
+func HttpClientGetRepeatMTTest(s *NoTopoSuite) {
+       httpClientRepeat(s, "", "sessions 2")
+}
 
-       return server
+func HttpClientPtrGetRepeatMTTest(s *NoTopoSuite) {
+       httpClientRepeat(s, "", "use-ptr sessions 2")
 }
 
-func HttpClientGetRepeat(s *NoTopoSuite) {
-       httpClientRepeat(s, "")
+func HttpClientGetRepeatTest(s *NoTopoSuite) {
+       httpClientRepeat(s, "", "")
 }
 
-func HttpClientPostRepeat(s *NoTopoSuite) {
-       httpClientRepeat(s, "post")
+func HttpClientPostRepeatTest(s *NoTopoSuite) {
+       httpClientRepeat(s, "post", "")
 }
 
-func httpClientRepeat(s *NoTopoSuite, requestMethod string) {
-       replyCount := 0
+func httpClientRepeat(s *NoTopoSuite, requestMethod string, clientArgs string) {
        vpp := s.Containers.Vpp.VppInstance
-       serverAddress := s.HostAddr()
+       logPath := s.Containers.NginxServer.GetContainerWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log"
+       serverAddress := s.Interfaces.Tap.Ip4AddressString()
+       replyCountInt := 0
        repeatAmount := 10000
-       server := startSimpleServer(s, &replyCount, serverAddress)
-       defer server.Close()
+       durationInSec := 10
+       var err error
+
+       // recreate interfaces with RX-queues
+       s.AssertNil(vpp.DeleteTap(s.Interfaces.Tap))
+       s.AssertNil(vpp.CreateTap(s.Interfaces.Tap, 2, 2))
+
+       s.CreateNginxServer()
+       s.AssertNil(s.Containers.NginxServer.Start())
 
        if requestMethod == "post" {
                fileName := "/tmp/test_file.txt"
@@ -419,41 +420,45 @@ func httpClientRepeat(s *NoTopoSuite, requestMethod string) {
                requestMethod += " file /tmp/test_file.txt"
        }
 
-       uri := "http://" + serverAddress + "/80"
-       cmd := fmt.Sprintf("http client %s use-ptr duration 10 header Hello:World uri %s target /index.html",
-               requestMethod, uri)
+       uri := "http://" + serverAddress + "/" + s.GetPortFromPpid()
+       cmd := fmt.Sprintf("http client %s %s duration %d header Hello:World uri %s target /index.html",
+               requestMethod, clientArgs, durationInSec, uri)
 
-       s.Log("Duration 10s")
+       s.Log("Duration %ds", durationInSec)
        o := vpp.Vppctl(cmd)
-       outputLen := len(o)
-       if outputLen > 500 {
-               s.Log(o[:500])
-               s.Log("* HST Framework: output limited to 500 chars to avoid flooding the console. Output length: " + fmt.Sprint(outputLen))
-       } else {
-               s.Log(o)
+       s.Log(o)
+
+       replyCount := s.Containers.NginxServer.Exec(false, "awk 'END { print NR }' "+logPath)
+       if replyCount != "" {
+               replyCountInt, err = strconv.Atoi(replyCount[:len(replyCount)-1])
+               s.AssertNil(err)
        }
-       s.Log("Server response count: %d", replyCount)
+       // empty the log file
+       s.Containers.NginxServer.Exec(false, "truncate -s 0 "+logPath)
+
+       s.Log("Server response count: %d", replyCountInt)
        s.AssertNotNil(o)
        s.AssertNotContains(o, "error")
-       s.AssertGreaterThan(replyCount, 15000)
+       s.AssertGreaterThan(replyCountInt, 15000)
 
-       cmd = fmt.Sprintf("http client %s use-ptr repeat %d header Hello:World uri %s target /index.html",
-               requestMethod, repeatAmount, uri)
+       replyCount = ""
+       cmd = fmt.Sprintf("http client %s %s repeat %d header Hello:World uri %s target /index.html",
+               requestMethod, clientArgs, repeatAmount, uri)
 
-       replyCount = 0
+       s.AssertNil(err, fmt.Sprint(err))
        s.Log("Repeat %d", repeatAmount)
        o = vpp.Vppctl(cmd)
-       outputLen = len(o)
-       if outputLen > 500 {
-               s.Log(o[:500])
-               s.Log("* HST Framework: output limited to 500 chars to avoid flooding the console. Output length: " + fmt.Sprint(outputLen))
-       } else {
-               s.Log(o)
+       s.Log(o)
+
+       replyCount = s.Containers.NginxServer.Exec(false, "awk 'END { print NR }' "+logPath)
+       if replyCount != "" {
+               replyCountInt, err = strconv.Atoi(replyCount[:len(replyCount)-1])
+               s.AssertNil(err)
        }
-       s.Log("Server response count: %d", replyCount)
+       s.Log("Server response count: %d", replyCountInt)
        s.AssertNotNil(o)
        s.AssertNotContains(o, "error")
-       s.AssertEqual(repeatAmount, replyCount)
+       s.AssertEqual(repeatAmount, replyCountInt)
 }
 
 func HttpClientGetTimeout(s *NoTopoSuite) {
index 1c7b6fe..d084413 100644 (file)
@@ -18,13 +18,15 @@ type NoTopoSuite struct {
                Tap *NetInterface
        }
        Containers struct {
-               Vpp        *Container
-               Nginx      *Container
-               NginxHttp3 *Container
-               Wrk        *Container
-               Curl       *Container
-               Ab         *Container
+               Vpp         *Container
+               Nginx       *Container
+               NginxHttp3  *Container
+               NginxServer *Container
+               Wrk         *Container
+               Curl        *Container
+               Ab          *Container
        }
+       NginxServerPort string
 }
 
 func RegisterNoTopoTests(tests ...func(s *NoTopoSuite)) {
@@ -42,6 +44,7 @@ func (s *NoTopoSuite) SetupSuite() {
        s.Containers.Vpp = s.GetContainerByName("vpp")
        s.Containers.Nginx = s.GetContainerByName("nginx")
        s.Containers.NginxHttp3 = s.GetContainerByName("nginx-http3")
+       s.Containers.NginxServer = s.GetTransientContainerByName("nginx-server")
        s.Containers.Wrk = s.GetContainerByName("wrk")
        s.Containers.Curl = s.GetContainerByName("curl")
        s.Containers.Ab = s.GetContainerByName("ab")
@@ -101,6 +104,28 @@ func (s *NoTopoSuite) CreateNginxConfig(container *Container, multiThreadWorkers
        )
 }
 
+// Creates container and config.
+func (s *NoTopoSuite) CreateNginxServer() {
+       s.AssertNil(s.Containers.NginxServer.Create())
+       s.NginxServerPort = s.GetPortFromPpid()
+       nginxSettings := struct {
+               LogPrefix string
+               Address   string
+               Port      string
+               Timeout   int
+       }{
+               LogPrefix: s.Containers.NginxServer.Name,
+               Address:   s.Interfaces.Tap.Ip4AddressString(),
+               Port:      s.NginxServerPort,
+               Timeout:   600,
+       }
+       s.Containers.NginxServer.CreateConfigFromTemplate(
+               "/nginx.conf",
+               "./resources/nginx/nginx_server.conf",
+               nginxSettings,
+       )
+}
+
 func (s *NoTopoSuite) AddNginxVclConfig(multiThreadWorkers bool) {
        vclFileName := s.Containers.Nginx.GetHostWorkDir() + "/vcl.conf"
        appSocketApi := fmt.Sprintf("app-socket-api %s/var/run/app_ns_sockets/default",
index b4449dc..2f5f31b 100644 (file)
@@ -28,6 +28,14 @@ containers:
     image: "hs-test/nginx-http3"
     is-optional: true
 
+  - name: "nginx-server"
+    volumes:
+      - <<: *shared-vol
+        container-dir: "/tmp/nginx"
+        is-default-work-dir: true
+    image: "hs-test/nginx-server"
+    is-optional: true
+
   - name: "ab"
     image: "hs-test/ab"
     is-optional: true
index 20271fc..e475931 100644 (file)
@@ -1,5 +1,5 @@
 /* SPDX-License-Identifier: Apache-2.0
- * Copyright(c) 2024 Cisco Systems, Inc.
+ * Copyright(c) 2025 Cisco Systems, Inc.
  */
 
 #include <vnet/session/application.h>
 #include <http/http_status_codes.h>
 #include <vppinfra/unix.h>
 
+typedef struct
+{
+  u64 req_per_wrk;
+  u64 request_count;
+  f64 start, end;
+  f64 elapsed_time;
+} hc_stats_t;
+
 typedef struct
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
   u32 session_index;
   u32 thread_index;
-  u32 vpp_session_index;
   u64 to_recv;
   u8 is_closed;
+  hc_stats_t stats;
+  u64 data_offset;
+  u8 *resp_headers;
+  u8 *http_response;
+  u8 *response_status;
 } hc_session_t;
 
-typedef struct
-{
-  u64 request_count;
-  f64 start, end;
-  f64 elapsed_time;
-} hc_stats_t;
-
 typedef struct
 {
   hc_session_t *sessions;
@@ -35,6 +40,8 @@ typedef struct
   u8 *headers_buf;
   http_headers_ctx_t req_headers;
   http_msg_t msg;
+  u32 session_index;
+  bool has_common_headers;
 } hc_worker_t;
 
 typedef struct
@@ -52,11 +59,7 @@ typedef struct
   session_endpoint_cfg_t connect_sep;
   u8 *target;
   u8 *data;
-  u64 data_offset;
   hc_worker_t *wrk;
-  u8 *resp_headers;
-  u8 *http_response;
-  u8 *response_status;
   hc_http_header_t *custom_header;
   u8 is_file;
   u8 use_ptr;
@@ -67,6 +70,18 @@ typedef struct
   u64 repeat_count;
   f64 duration;
   bool repeat;
+  bool multi_session;
+  u32 done_count;
+  u32 connected_counter;
+  u32 worker_index;
+  u32 max_sessions;
+  u32 private_segment_size;
+  u32 prealloc_fifos;
+  u32 fifo_size;
+  u8 *appns_id;
+  u64 appns_secret;
+  clib_spinlock_t lock;
+  bool was_transport_closed;
 } hc_main_t;
 
 typedef enum
@@ -95,13 +110,6 @@ hc_session_get (u32 session_index, u32 thread_index)
   return pool_elt_at_index (wrk->sessions, session_index);
 }
 
-static void
-hc_ho_session_free (u32 hs_index)
-{
-  hc_worker_t *wrk = hc_worker_get (0);
-  pool_put_index (wrk->sessions, hs_index);
-}
-
 static hc_session_t *
 hc_session_alloc (hc_worker_t *wrk)
 {
@@ -115,14 +123,14 @@ hc_session_alloc (hc_worker_t *wrk)
 }
 
 static int
-hc_request (session_t *s, session_error_t err)
+hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session,
+           session_error_t err)
 {
   hc_main_t *hcm = &hc_main;
   u64 to_send;
   u32 n_enq;
   u8 n_segs;
   int rv;
-  hc_worker_t *wrk = hc_worker_get (s->thread_index);
 
   if (hcm->use_ptr)
     {
@@ -166,7 +174,7 @@ hc_request (session_t *s, session_error_t err)
       rv = svm_fifo_enqueue (s->tx_fifo, n_enq, hcm->data);
       if (rv < to_send)
        {
-         hcm->data_offset = (rv > 0) ? rv : 0;
+         hc_session->data_offset = (rv > 0) ? rv : 0;
          svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
        }
     }
@@ -185,9 +193,8 @@ hc_session_connected_callback (u32 app_index, u32 hc_session_index,
 {
   hc_main_t *hcm = &hc_main;
   hc_worker_t *wrk;
-  u32 new_hc_index;
+  hc_session_t *hc_session;
   hc_http_header_t *header;
-  HTTP_DBG (1, "ho hc_index: %d", hc_session_index);
 
   if (err)
     {
@@ -199,68 +206,89 @@ hc_session_connected_callback (u32 app_index, u32 hc_session_index,
     }
 
   wrk = hc_worker_get (s->thread_index);
-  hc_session_t *hc_session, *new_hc_session = hc_session_alloc (wrk);
-  hc_session = hc_session_get (hc_session_index, 0);
-  new_hc_index = new_hc_session->session_index;
-  clib_memcpy_fast (new_hc_session, hc_session, sizeof (*hc_session));
-  new_hc_session->session_index = new_hc_index;
-  new_hc_session->thread_index = s->thread_index;
-  new_hc_session->vpp_session_index = s->session_index;
-  HTTP_DBG (1, "new hc_index: %d", new_hc_session->session_index);
-  s->opaque = new_hc_index;
+  hc_session = hc_session_alloc (wrk);
+  clib_spinlock_lock_if_init (&hcm->lock);
+  hcm->connected_counter++;
+  clib_spinlock_unlock_if_init (&hcm->lock);
 
-  if (hcm->req_method == HTTP_REQ_POST)
+  hc_session->thread_index = s->thread_index;
+  s->opaque = hc_session->session_index;
+  wrk->session_index = hc_session->session_index;
+
+  if (hcm->multi_session)
     {
-      if (hcm->is_file)
-       http_add_header (
-         &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE,
-         http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM));
-      else
-       http_add_header (
-         &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE,
-         http_content_type_token (HTTP_CONTENT_APP_X_WWW_FORM_URLENCODED));
+      hc_session->stats.req_per_wrk = hcm->repeat_count / hcm->max_sessions;
+      clib_spinlock_lock_if_init (&hcm->lock);
+      /* add remaining requests to the first connected session */
+      if (hcm->connected_counter == 1)
+       {
+         hc_session->stats.req_per_wrk +=
+           hcm->repeat_count % hcm->max_sessions;
+       }
+      clib_spinlock_unlock_if_init (&hcm->lock);
     }
-  http_add_header (&wrk->req_headers, HTTP_HEADER_ACCEPT, "*", 1);
-
-  vec_foreach (header, hcm->custom_header)
-    http_add_custom_header (
-      &wrk->req_headers, (const char *) header->name, vec_len (header->name),
-      (const char *) header->value, vec_len (header->value));
-
-  clib_warning ("%U", format_http_bytes, wrk->headers_buf,
-               wrk->req_headers.tail_offset);
-  wrk->msg.method_type = hcm->req_method;
-  if (hcm->req_method == HTTP_REQ_POST)
-    wrk->msg.data.body_len = vec_len (hcm->data);
   else
-    wrk->msg.data.body_len = 0;
-
-  wrk->msg.type = HTTP_MSG_REQUEST;
-  /* request target */
-  wrk->msg.data.target_path_len = vec_len (hcm->target);
-  /* custom headers */
-  wrk->msg.data.headers_len = wrk->req_headers.tail_offset;
-  /* total length */
-  wrk->msg.data.len = wrk->msg.data.target_path_len +
-                     wrk->msg.data.headers_len + wrk->msg.data.body_len;
-
-  if (hcm->use_ptr)
     {
-      wrk->msg.data.type = HTTP_MSG_DATA_PTR;
+      hc_session->stats.req_per_wrk = hcm->repeat_count;
+      hcm->worker_index = s->thread_index;
     }
-  else
+
+  if (!wrk->has_common_headers)
     {
-      wrk->msg.data.type = HTTP_MSG_DATA_INLINE;
-      wrk->msg.data.target_path_offset = 0;
-      wrk->msg.data.headers_offset = wrk->msg.data.target_path_len;
-      wrk->msg.data.body_offset =
-       wrk->msg.data.headers_offset + wrk->msg.data.headers_len;
+      wrk->has_common_headers = true;
+      if (hcm->req_method == HTTP_REQ_POST)
+       {
+         if (hcm->is_file)
+           http_add_header (
+             &wrk->req_headers, HTTP_HEADER_CONTENT_TYPE,
+             http_content_type_token (HTTP_CONTENT_APP_OCTET_STREAM));
+         else
+           http_add_header (&wrk->req_headers, HTTP_HEADER_CONTENT_TYPE,
+                            http_content_type_token (
+                              HTTP_CONTENT_APP_X_WWW_FORM_URLENCODED));
+       }
+      http_add_header (&wrk->req_headers, HTTP_HEADER_ACCEPT, "*", 1);
+
+      vec_foreach (header, hcm->custom_header)
+       http_add_custom_header (&wrk->req_headers, (const char *) header->name,
+                               vec_len (header->name),
+                               (const char *) header->value,
+                               vec_len (header->value));
+
+      wrk->msg.method_type = hcm->req_method;
+      if (hcm->req_method == HTTP_REQ_POST)
+       wrk->msg.data.body_len = vec_len (hcm->data);
+      else
+       wrk->msg.data.body_len = 0;
+
+      wrk->msg.type = HTTP_MSG_REQUEST;
+      /* request target */
+      wrk->msg.data.target_path_len = vec_len (hcm->target);
+      /* custom headers */
+      wrk->msg.data.headers_len = wrk->req_headers.tail_offset;
+      /* total length */
+      wrk->msg.data.len = wrk->msg.data.target_path_len +
+                         wrk->msg.data.headers_len + wrk->msg.data.body_len;
+
+      if (hcm->use_ptr)
+       {
+         wrk->msg.data.type = HTTP_MSG_DATA_PTR;
+       }
+      else
+       {
+         wrk->msg.data.type = HTTP_MSG_DATA_INLINE;
+         wrk->msg.data.target_path_offset = 0;
+         wrk->msg.data.headers_offset = wrk->msg.data.target_path_len;
+         wrk->msg.data.body_offset =
+           wrk->msg.data.headers_offset + wrk->msg.data.headers_len;
+       }
     }
 
   if (hcm->repeat)
-    hc_stats.start = vlib_time_now (vlib_get_main_by_index (s->thread_index));
+    hc_session->stats.start =
+      vlib_time_now (vlib_get_main_by_index (s->thread_index));
 
-  return hc_request (s, err);
+  return hc_request (s, wrk, hc_session, err);
 }
 
 static void
@@ -275,21 +303,38 @@ hc_session_disconnect_callback (session_t *s)
   if ((rv = vnet_disconnect_session (a)))
     clib_warning ("warning: disconnect returned: %U", format_session_error,
                  rv);
+  clib_spinlock_lock_if_init (&hcm->lock);
+  hcm->done_count++;
+  clib_spinlock_unlock_if_init (&hcm->lock);
 }
 
 static void
 hc_session_transport_closed_callback (session_t *s)
 {
   hc_main_t *hcm = &hc_main;
-  vlib_process_signal_event_mt (hcm->wrk->vlib_main, hcm->cli_node_index,
-                               HC_TRANSPORT_CLOSED, 0);
-}
+  hc_worker_t *wrk = hc_worker_get (s->thread_index);
 
-static void
-hc_ho_cleanup_callback (session_t *s)
-{
-  HTTP_DBG (1, "ho hc_index: %d:", s->opaque);
-  hc_ho_session_free (s->opaque);
+  clib_spinlock_lock_if_init (&hcm->lock);
+  if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
+    {
+      hcm->was_transport_closed = true;
+    }
+
+  /* send an event when all sessions are closed */
+  if (hcm->done_count >= hcm->max_sessions)
+    {
+      if (hcm->was_transport_closed)
+       {
+         vlib_process_signal_event_mt (wrk->vlib_main, hcm->cli_node_index,
+                                       HC_TRANSPORT_CLOSED, 0);
+       }
+      else
+       {
+         vlib_process_signal_event_mt (wrk->vlib_main, hcm->cli_node_index,
+                                       HC_REPEAT_DONE, 0);
+       }
+    }
+  clib_spinlock_unlock_if_init (&hcm->lock);
 }
 
 static void
@@ -315,20 +360,23 @@ hc_rx_callback (session_t *s)
 {
   hc_main_t *hcm = &hc_main;
   hc_worker_t *wrk = hc_worker_get (s->thread_index);
-  hc_session_t *hc_session;
+  hc_session_t *hc_session = hc_session_get (s->opaque, s->thread_index);
   http_msg_t msg;
   int rv;
+  u32 max_deq;
   session_error_t session_err = 0;
   int send_err = 0;
 
-  hc_session = hc_session_get (s->opaque, s->thread_index);
-
   if (hc_session->is_closed)
     {
       clib_warning ("hc_session_index[%d] is closed", s->opaque);
       return -1;
     }
 
+  max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo);
+  if (PREDICT_FALSE (max_deq == 0))
+    goto done;
+
   if (hc_session->to_recv == 0)
     {
       rv = svm_fifo_dequeue (s->rx_fifo, sizeof (msg), (u8 *) &msg);
@@ -344,17 +392,20 @@ hc_rx_callback (session_t *s)
 
       if (msg.data.headers_len)
        {
-         hcm->response_status =
-           format (0, "%U", format_http_status_code, msg.code);
+
+         if (!hcm->repeat)
+           hc_session->response_status =
+             format (0, "%U", format_http_status_code, msg.code);
+
          svm_fifo_dequeue_drop (s->rx_fifo, msg.data.headers_offset);
 
-         vec_validate (hcm->resp_headers, msg.data.headers_len - 1);
-         vec_set_len (hcm->resp_headers, msg.data.headers_len);
+         vec_validate (hc_session->resp_headers, msg.data.headers_len - 1);
+         vec_set_len (hc_session->resp_headers, msg.data.headers_len);
          rv = svm_fifo_dequeue (s->rx_fifo, msg.data.headers_len,
-                                hcm->resp_headers);
+                                hc_session->resp_headers);
 
          ASSERT (rv == msg.data.headers_len);
-         HTTP_DBG (1, (char *) format (0, "%v", hcm->resp_headers));
+         HTTP_DBG (1, (char *) format (0, "%v", hc_session->resp_headers));
          msg.data.body_offset -=
            msg.data.headers_len + msg.data.headers_offset;
        }
@@ -372,18 +423,18 @@ hc_rx_callback (session_t *s)
        {
          goto done;
        }
-      vec_validate (hcm->http_response, msg.data.body_len - 1);
-      vec_reset_length (hcm->http_response);
+      vec_validate (hc_session->http_response, msg.data.body_len - 1);
+      vec_reset_length (hc_session->http_response);
     }
 
-  u32 max_deq = svm_fifo_max_dequeue (s->rx_fifo);
+  max_deq = svm_fifo_max_dequeue (s->rx_fifo);
   if (!max_deq)
     {
       goto done;
     }
   u32 n_deq = clib_min (hc_session->to_recv, max_deq);
-  u32 curr = vec_len (hcm->http_response);
-  rv = svm_fifo_dequeue (s->rx_fifo, n_deq, hcm->http_response + curr);
+  u32 curr = vec_len (hc_session->http_response);
+  rv = svm_fifo_dequeue (s->rx_fifo, n_deq, hc_session->http_response + curr);
   if (rv < 0)
     {
       clib_warning ("app dequeue(n=%d) failed; rv = %d", n_deq, rv);
@@ -393,7 +444,7 @@ hc_rx_callback (session_t *s)
     }
 
   ASSERT (rv == n_deq);
-  vec_set_len (hcm->http_response, curr + n_deq);
+  vec_set_len (hc_session->http_response, curr + n_deq);
   ASSERT (hc_session->to_recv >= rv);
   hc_session->to_recv -= rv;
 
@@ -402,20 +453,19 @@ done:
     {
       if (hcm->repeat)
        {
-         hc_stats.request_count++;
-         hc_stats.end = vlib_time_now (wrk->vlib_main);
-         hc_stats.elapsed_time = hc_stats.end - hc_stats.start;
+         hc_session->stats.request_count++;
+         hc_session->stats.end = vlib_time_now (wrk->vlib_main);
+         hc_session->stats.elapsed_time =
+           hc_session->stats.end - hc_session->stats.start;
 
-         if (hc_stats.elapsed_time >= hcm->duration &&
-             hc_stats.request_count >= hcm->repeat_count)
+         if (hc_session->stats.elapsed_time >= hcm->duration &&
+             hc_session->stats.request_count >= hc_session->stats.req_per_wrk)
            {
-             vlib_process_signal_event_mt (
-               wrk->vlib_main, hcm->cli_node_index, HC_REPEAT_DONE, 0);
              hc_session_disconnect_callback (s);
            }
          else
            {
-             send_err = hc_request (s, session_err);
+             send_err = hc_request (s, wrk, hc_session, session_err);
              if (send_err)
                clib_warning ("failed to send request, error %d", send_err);
            }
@@ -434,11 +484,13 @@ static int
 hc_tx_callback (session_t *s)
 {
   hc_main_t *hcm = &hc_main;
+  hc_session_t *hc_session = hc_session_get (s->opaque, s->thread_index);
   u64 to_send;
   int rv;
 
-  to_send = vec_len (hcm->data) - hcm->data_offset;
-  rv = svm_fifo_enqueue (s->tx_fifo, to_send, hcm->data + hcm->data_offset);
+  to_send = vec_len (hcm->data) - hc_session->data_offset;
+  rv = svm_fifo_enqueue (s->tx_fifo, to_send,
+                        hcm->data + hc_session->data_offset);
 
   if (rv <= 0)
     {
@@ -448,7 +500,7 @@ hc_tx_callback (session_t *s)
 
   if (rv < to_send)
     {
-      hcm->data_offset += rv;
+      hc_session->data_offset += rv;
       svm_fifo_add_want_deq_ntf (s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
     }
 
@@ -465,7 +517,6 @@ static session_cb_vft_t hc_session_cb_vft = {
   .session_reset_callback = hc_session_reset_callback,
   .builtin_app_rx_callback = hc_rx_callback,
   .builtin_app_tx_callback = hc_tx_callback,
-  .half_open_cleanup_callback = hc_ho_cleanup_callback,
 };
 
 static clib_error_t *
@@ -474,8 +525,12 @@ hc_attach ()
   hc_main_t *hcm = &hc_main;
   vnet_app_attach_args_t _a, *a = &_a;
   u64 options[18];
+  u32 segment_size = 128 << 20;
   int rv;
 
+  if (hcm->private_segment_size)
+    segment_size = hcm->private_segment_size;
+
   clib_memset (a, 0, sizeof (*a));
   clib_memset (options, 0, sizeof (options));
 
@@ -483,7 +538,19 @@ hc_attach ()
   a->name = format (0, "http_client");
   a->session_cb_vft = &hc_session_cb_vft;
   a->options = options;
+  a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
+  a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = segment_size;
+  a->options[APP_OPTIONS_RX_FIFO_SIZE] =
+    hcm->fifo_size ? hcm->fifo_size : 8 << 10;
+  a->options[APP_OPTIONS_TX_FIFO_SIZE] =
+    hcm->fifo_size ? hcm->fifo_size : 32 << 10;
   a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
+  a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = hcm->prealloc_fifos;
+  if (hcm->appns_id)
+    {
+      a->namespace_id = hcm->appns_id;
+      a->options[APP_OPTIONS_NAMESPACE_SECRET] = hcm->appns_secret;
+    }
 
   if ((rv = vnet_application_attach (a)))
     return clib_error_return (0, "attach returned: %U", format_session_error,
@@ -500,14 +567,19 @@ static int
 hc_connect_rpc (void *rpc_args)
 {
   vnet_connect_args_t *a = rpc_args;
-  int rv;
+  int rv = ~0;
+  hc_main_t *hcm = &hc_main;
 
-  rv = vnet_connect (a);
-  if (rv > 0)
-    clib_warning (0, "connect returned: %U", format_session_error, rv);
+  for (u32 i = 0; i < hcm->max_sessions; i++)
+    {
+      rv = vnet_connect (a);
+      if (rv > 0)
+       clib_warning (0, "connect returned: %U", format_session_error, rv);
+    }
 
   session_endpoint_free_ext_cfgs (&a->sep_ext);
   vec_free (a);
+
   return rv;
 }
 
@@ -516,14 +588,10 @@ hc_connect ()
 {
   hc_main_t *hcm = &hc_main;
   vnet_connect_args_t *a = 0;
-  hc_worker_t *wrk;
-  hc_session_t *hc_session;
   transport_endpt_ext_cfg_t *ext_cfg;
   transport_endpt_cfg_http_t http_cfg = { (u32) hcm->timeout, 0 };
-
   vec_validate (a, 0);
   clib_memset (a, 0, sizeof (a[0]));
-
   clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
   a->app_index = hcm->app_index;
 
@@ -531,15 +599,41 @@ hc_connect ()
     &a->sep_ext, TRANSPORT_ENDPT_EXT_CFG_HTTP, sizeof (http_cfg));
   clib_memcpy (ext_cfg->data, &http_cfg, sizeof (http_cfg));
 
-  /* allocate http session on main thread */
-  wrk = hc_worker_get (0);
-  hc_session = hc_session_alloc (wrk);
-  a->api_context = hc_session->session_index;
-
   session_send_rpc_evt_to_thread_force (transport_cl_thread (), hc_connect_rpc,
                                        a);
 }
 
+static void
+hc_get_repeat_stats (vlib_main_t *vm)
+{
+  hc_main_t *hcm = &hc_main;
+  hc_worker_t *wrk;
+  hc_session_t *hc_session;
+
+  if (hcm->repeat)
+    {
+      vec_foreach (wrk, hcm->wrk)
+       {
+         vec_foreach (hc_session, wrk->sessions)
+           {
+             hc_stats.request_count += hc_session->stats.request_count;
+             hc_session->stats.request_count = 0;
+             if (hc_stats.elapsed_time < hc_session->stats.elapsed_time)
+               {
+                 hc_stats.elapsed_time = hc_session->stats.elapsed_time;
+                 hc_session->stats.elapsed_time = 0;
+               }
+           }
+       }
+      vlib_cli_output (vm,
+                      "< %d request(s) in %.6fs\n< avg latency "
+                      "%.4fms\n< %.2f req/sec",
+                      hc_stats.request_count, hc_stats.elapsed_time,
+                      (hc_stats.elapsed_time / hc_stats.request_count) * 1000,
+                      hc_stats.request_count / hc_stats.elapsed_time);
+    }
+}
+
 static clib_error_t *
 hc_get_event (vlib_main_t *vm)
 {
@@ -548,6 +642,8 @@ hc_get_event (vlib_main_t *vm)
   clib_error_t *err = NULL;
   FILE *file_ptr;
   u64 event_timeout;
+  hc_worker_t *wrk;
+  hc_session_t *hc_session;
 
   event_timeout = hcm->timeout ? hcm->timeout : 10;
   if (event_timeout == hcm->duration)
@@ -558,20 +654,26 @@ hc_get_event (vlib_main_t *vm)
   switch (event_type)
     {
     case ~0:
+      hc_get_repeat_stats (vm);
       err = clib_error_return (0, "error: timeout");
       break;
     case HC_CONNECT_FAILED:
+      hc_get_repeat_stats (vm);
       err = clib_error_return (0, "error: failed to connect");
       break;
     case HC_TRANSPORT_CLOSED:
+      hc_get_repeat_stats (vm);
       err = clib_error_return (0, "error: transport closed");
       break;
     case HC_GENERIC_ERR:
+      hc_get_repeat_stats (vm);
       err = clib_error_return (0, "error: unknown");
       break;
     case HC_REPLY_RECEIVED:
       if (hcm->filename)
        {
+         wrk = hc_worker_get (hcm->worker_index);
+         hc_session = hc_session_get (wrk->session_index, wrk->thread_index);
          file_ptr =
            fopen ((char *) format (0, "/tmp/%v", hcm->filename), "a");
          if (file_ptr == NULL)
@@ -580,26 +682,27 @@ hc_get_event (vlib_main_t *vm)
            }
          else
            {
-             fprintf (file_ptr, "< %s\n< %s\n< %s", hcm->response_status,
-                      hcm->resp_headers, hcm->http_response);
+             fprintf (file_ptr, "< %s\n< %s\n< %s",
+                      hc_session->response_status, hc_session->resp_headers,
+                      hc_session->http_response);
              fclose (file_ptr);
              vlib_cli_output (vm, "file saved (/tmp/%v)", hcm->filename);
            }
        }
       if (hcm->verbose)
-       vlib_cli_output (vm, "< %v< %v", hcm->response_status,
-                        hcm->resp_headers);
-      vlib_cli_output (vm, "\n%v\n", hcm->http_response);
+       {
+         wrk = hc_worker_get (hcm->worker_index);
+         hc_session = hc_session_get (wrk->session_index, wrk->thread_index);
+         vlib_cli_output (vm, "< %v< %v", hc_session->response_status,
+                          hc_session->resp_headers);
+         vlib_cli_output (vm, "\n%v\n", hc_session->http_response);
+       }
       break;
     case HC_REPEAT_DONE:
-      vlib_cli_output (vm,
-                      "< %d request(s) in %.6fs\n< avg latency "
-                      "%.4fms\n< %.2f req/sec",
-                      hc_stats.request_count, hc_stats.elapsed_time,
-                      (hc_stats.elapsed_time / hc_stats.request_count) * 1000,
-                      hc_stats.request_count / hc_stats.elapsed_time);
+      hc_get_repeat_stats (vm);
       break;
     default:
+      hc_get_repeat_stats (vm);
       err = clib_error_return (0, "error: unexpected event %d", event_type);
       break;
     }
@@ -612,15 +715,17 @@ static clib_error_t *
 hc_run (vlib_main_t *vm)
 {
   hc_main_t *hcm = &hc_main;
-  vlib_thread_main_t *vtm = vlib_get_thread_main ();
   u32 num_threads;
   hc_worker_t *wrk;
   clib_error_t *err;
 
-  num_threads = 1 /* main thread */ + vtm->n_threads;
+  num_threads = 1 /* main thread */ + vlib_num_workers ();
+  if (vlib_num_workers ())
+    clib_spinlock_init (&hcm->lock);
   vec_validate (hcm->wrk, num_threads - 1);
   vec_foreach (wrk, hcm->wrk)
     {
+      wrk->has_common_headers = false;
       wrk->thread_index = wrk - hcm->wrk;
       /* 4k for headers should be enough */
       vec_validate (wrk->headers_buf, 4095);
@@ -657,10 +762,18 @@ hc_detach ()
 }
 
 static void
-hcc_worker_cleanup (hc_worker_t *wrk)
+hc_worker_cleanup (hc_worker_t *wrk)
 {
-  HTTP_DBG (1, "worker cleanup");
+  hc_session_t *hc_session;
+  HTTP_DBG (1, "worker and worker sessions cleanup");
+
   vec_free (wrk->headers_buf);
+  vec_foreach (hc_session, wrk->sessions)
+    {
+      vec_free (hc_session->resp_headers);
+      vec_free (hc_session->http_response);
+      vec_free (hc_session->response_status);
+    }
   pool_free (wrk->sessions);
 }
 
@@ -673,16 +786,14 @@ hc_cleanup ()
   hc_http_header_t *header;
 
   vec_foreach (wrk, hcm->wrk)
-    hcc_worker_cleanup (wrk);
+    hc_worker_cleanup (wrk);
 
   vec_free (hcm->uri);
   vec_free (hcm->target);
   vec_free (hcm->data);
-  vec_free (hcm->resp_headers);
-  vec_free (hcm->http_response);
-  vec_free (hcm->response_status);
   vec_free (hcm->wrk);
   vec_free (hcm->filename);
+  vec_free (hcm->appns_id);
   vec_foreach (header, hcm->custom_header)
     {
       vec_free (header->name);
@@ -698,6 +809,8 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
   hc_main_t *hcm = &hc_main;
   clib_error_t *err = 0;
   unformat_input_t _line_input, *line_input = &_line_input;
+  u64 mem_size;
+  u8 *appns_id = 0;
   u8 *path = 0;
   u8 *file_data;
   hc_http_header_t new_header;
@@ -708,7 +821,16 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
   hcm->repeat_count = 0;
   hcm->duration = 0;
   hcm->repeat = false;
+  hcm->multi_session = false;
+  hcm->done_count = 0;
+  hcm->connected_counter = 0;
+  hcm->max_sessions = 1;
+  hcm->prealloc_fifos = 0;
+  hcm->private_segment_size = 0;
+  hcm->fifo_size = 0;
+  hcm->was_transport_closed = false;
   hc_stats.request_count = 0;
+  hc_stats.elapsed_time = 0;
 
   if (hcm->attached)
     return clib_error_return (0, "failed: already running!");
@@ -761,6 +883,29 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
        }
       else if (unformat (line_input, "duration %f", &hcm->duration))
        hcm->repeat = true;
+      else if (unformat (line_input, "sessions %d", &hcm->max_sessions))
+       {
+         hcm->multi_session = true;
+         if (hcm->max_sessions <= 1)
+           {
+             err = clib_error_return (0, "sessions must be > 1");
+             goto done;
+           }
+       }
+      else if (unformat (line_input, "prealloc-fifos %d",
+                        &hcm->prealloc_fifos))
+       ;
+      else if (unformat (line_input, "private-segment-size %U",
+                        unformat_memory_size, &mem_size))
+       hcm->private_segment_size = mem_size;
+      else if (unformat (line_input, "fifo-size %U", unformat_memory_size,
+                        &mem_size))
+       hcm->fifo_size = mem_size;
+      else if (unformat (line_input, "appns %_%v%_", &appns_id))
+       ;
+      else if (unformat (line_input, "secret %lu", &hcm->appns_secret))
+       ;
+
       else
        {
          err = clib_error_return (0, "unknown input `%U'",
@@ -801,6 +946,13 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
       goto done;
     }
 
+  if (hcm->multi_session && !hcm->repeat)
+    {
+      err = clib_error_return (
+       0, "multiple sessions are only supported with request repeating");
+      goto done;
+    }
+
   if ((rv = parse_uri ((char *) hcm->uri, &hcm->connect_sep)))
     {
       err =
@@ -808,6 +960,12 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
       goto done;
     }
 
+  if (hcm->duration >= hcm->timeout)
+    {
+      hcm->timeout = hcm->duration + 10;
+    }
+  hcm->appns_id = appns_id;
+
   if (hcm->repeat)
     vlib_cli_output (vm, "Running, please wait...");
 
@@ -845,7 +1003,9 @@ VLIB_CLI_COMMAND (hc_command, static) = {
     "[post] uri http://<ip-addr> target <origin-form> "
     "[data <form-urlencoded> | file <file-path>] [use-ptr] "
     "[save-to <filename>] [header <Key:Value>] [verbose] "
-    "[timeout <seconds> (default = 10)] [repeat <count> | duration <seconds>]",
+    "[timeout <seconds> (default = 10)] [repeat <count> | duration <seconds>] "
+    "[sessions <# of sessions>] [appns <app-ns> secret <appns-secret>] "
+    "[fifo-size <nM|G>] [private-segment-size <nM|G>] [prealloc-fifos <n>]",
   .function = hc_command_fn,
   .is_mp_safe = 1,
 };