http: h2 client multiplexing 58/43458/15
authorMatus Fabian <[email protected]>
Fri, 18 Jul 2025 17:04:07 +0000 (13:04 -0400)
committerFlorin Coras <[email protected]>
Sat, 26 Jul 2025 03:13:59 +0000 (03:13 +0000)
Type: improvement

Change-Id: I768df864cbda26b0901528789b52a33e788c2258
Signed-off-by: Matus Fabian <[email protected]>
extras/hs-test/http2_test.go
extras/hs-test/infra/suite_http2.go
extras/hs-test/resources/nginx/nginx_server.conf
src/plugins/hs_apps/http_client.c
src/plugins/http/http.c
src/plugins/http/http1.c
src/plugins/http/http2/http2.c
src/plugins/http/http_private.h

index 238d335..26605f2 100644 (file)
@@ -6,13 +6,15 @@ import (
        "strings"
        "time"
 
+       "github.com/edwarnicke/exechelper"
+
        . "fd.io/hs-test/infra"
 )
 
 func init() {
        RegisterH2Tests(Http2TcpGetTest, Http2TcpPostTest, Http2MultiplexingTest, Http2TlsTest, Http2ContinuationTxTest, Http2ServerMemLeakTest,
-               Http2ClientGetTest, Http2ClientPostTest, Http2ClientPostPtrTest, Http2ClientGetRepeatTest)
-       RegisterH2MWTests(Http2MultiplexingMWTest)
+               Http2ClientGetTest, Http2ClientPostTest, Http2ClientPostPtrTest, Http2ClientGetRepeatTest, Http2ClientMultiplexingTest)
+       RegisterH2MWTests(Http2MultiplexingMWTest, Http2ClientMultiplexingMWTest)
        RegisterVethTests(Http2CliTlsTest, Http2ClientContinuationTest)
 }
 
@@ -233,6 +235,66 @@ func Http2ClientGetRepeatTest(s *Http2Suite) {
        s.Log(o)
 }
 
+func Http2ClientMultiplexingTest(s *Http2Suite) {
+       vpp := s.Containers.Vpp.VppInstance
+       serverAddress := s.HostAddr() + ":" + s.Ports.Port2
+
+       s.CreateNginxServer()
+       s.AssertNil(s.Containers.NginxServer.Start())
+
+       uri := "https://" + serverAddress + "/httpTestFile"
+       cmd := fmt.Sprintf("http client http2 streams %d repeat %d uri %s", 10, 20, uri)
+       o := vpp.Vppctl(cmd)
+       s.Log(o)
+       s.AssertContains(o, "20 request(s)")
+       logPath := s.Containers.NginxServer.GetHostWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log"
+       logContents, err := exechelper.Output("cat " + logPath)
+       s.Log(string(logContents))
+       s.AssertNil(err)
+       s.AssertContains(string(logContents), "conn_reqs=20")
+
+       /* test session cleanup */
+       httpStreamCleanupDone := false
+       tcpSessionCleanupDone := false
+       for nTries := 0; nTries < 30; nTries++ {
+               o := vpp.Vppctl("show session verbose")
+               if !strings.Contains(o, "[T]") {
+                       tcpSessionCleanupDone = true
+               }
+               if !strings.Contains(o, "[H2]") {
+                       httpStreamCleanupDone = true
+               }
+               if httpStreamCleanupDone && tcpSessionCleanupDone {
+                       break
+               }
+               time.Sleep(1 * time.Second)
+       }
+       s.AssertEqual(true, tcpSessionCleanupDone, "TCP session not cleaned up")
+       s.AssertEqual(true, httpStreamCleanupDone, "HTTP/2 stream not cleaned up")
+}
+
+func Http2ClientMultiplexingMWTest(s *Http2Suite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
+
+       vpp := s.Containers.Vpp.VppInstance
+       serverAddress := s.HostAddr() + ":" + s.Ports.Port2
+
+       s.CreateNginxServer()
+       s.AssertNil(s.Containers.NginxServer.Start())
+
+       uri := "https://" + serverAddress + "/httpTestFile"
+       cmd := fmt.Sprintf("http client http2 sessions 2 streams %d repeat %d uri %s", 5, 20, uri)
+       o := vpp.Vppctl(cmd)
+       s.Log(o)
+       s.AssertContains(o, "20 request(s)")
+       logPath := s.Containers.NginxServer.GetHostWorkDir() + "/" + s.Containers.NginxServer.Name + "-access.log"
+       logContents, err := exechelper.Output("cat " + logPath)
+       s.Log(string(logContents))
+       s.AssertNil(err)
+       s.AssertEqual(2, strings.Count(string(logContents), "conn_reqs=10"))
+}
+
 func Http2ClientContinuationTest(s *VethsSuite) {
        serverAddress := s.Interfaces.Server.Ip4AddressString() + ":" + s.Ports.Port1
 
index b26ffd9..69739bc 100644 (file)
@@ -98,7 +98,12 @@ func (s *Http2Suite) SetupTest() {
 }
 
 func (s *Http2Suite) TeardownTest() {
-       s.HstSuite.TeardownTest()
+       defer s.HstSuite.TeardownTest()
+       vpp := s.Containers.Vpp.VppInstance
+       if CurrentSpecReport().Failed() {
+               s.Log(vpp.Vppctl("show session verbose 2"))
+               s.Log(vpp.Vppctl("show error"))
+       }
 }
 
 func (s *Http2Suite) VppAddr() string {
@@ -514,9 +519,11 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() {
        })
 
        testCases := []struct {
-               desc       string
-               portOffset int
+               desc            string
+               portOffset      int
+               clientExtraArgs string
        }{
+               // some tests are testing error conditions after request is completed so in this run http client with repeat
                {desc: "client/1/1", portOffset: 0},
                {desc: "client/4.1/1", portOffset: 1},
                {desc: "client/4.1/2", portOffset: 2},
@@ -535,8 +542,8 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() {
                //{desc: "client/5.1/6", portOffset: 13},
                //{desc: "client/5.1/7", portOffset: 14},
                {desc: "client/5.1/8", portOffset: 15},
-               {desc: "client/5.1/9", portOffset: 16},
-               {desc: "client/5.1/10", portOffset: 17},
+               {desc: "client/5.1/9", portOffset: 16, clientExtraArgs: "repeat 2 "},
+               {desc: "client/5.1/10", portOffset: 17, clientExtraArgs: "repeat 2 "},
                {desc: "client/5.1.1/1", portOffset: 18},
                {desc: "client/5.4.1/1", portOffset: 19},
                {desc: "client/5.4.1/2", portOffset: 20},
@@ -552,7 +559,7 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() {
                //{desc: "client/6.3/2", portOffset: 29},
                {desc: "client/6.4/1", portOffset: 30},
                {desc: "client/6.4/2", portOffset: 31},
-               {desc: "client/6.4/3", portOffset: 32},
+               {desc: "client/6.4/3", portOffset: 32, clientExtraArgs: "repeat 2 "},
                {desc: "client/6.5/1", portOffset: 33},
                {desc: "client/6.5/2", portOffset: 34},
                {desc: "client/6.5/3", portOffset: 35},
@@ -573,11 +580,11 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() {
                {desc: "client/6.9.1/1", portOffset: 49},
                // TODO: message framing without content length using END_STREAM flag
                //{desc: "client/6.9.1/2", portOffset: 50},
-               {desc: "client/6.10/1", portOffset: 51},
+               {desc: "client/6.10/1", portOffset: 51, clientExtraArgs: "repeat 2 "},
                {desc: "client/6.10/2", portOffset: 52},
                {desc: "client/6.10/3", portOffset: 53},
-               {desc: "client/6.10/4", portOffset: 54},
-               {desc: "client/6.10/5", portOffset: 55},
+               {desc: "client/6.10/4", portOffset: 54, clientExtraArgs: "repeat 2 "},
+               {desc: "client/6.10/5", portOffset: 55, clientExtraArgs: "repeat 2 "},
                {desc: "client/6.10/6", portOffset: 56},
        }
 
@@ -611,10 +618,8 @@ var _ = Describe("H2SpecClientSuite", Ordered, Serial, func() {
 
                        go h2spec.RunClientSpec(conf)
 
-                       cmd := fmt.Sprintf("http client timeout 5 verbose uri https://%s:%d/", serverAddress, h2specdFromPort+test.portOffset)
-                       res := s.Containers.Vpp.VppInstance.Vppctl(cmd)
-                       s.Log(res)
-                       s.AssertNotContains(res, "error: timeout")
+                       cmd := fmt.Sprintf("http client timeout 5 %s uri https://%s:%d/", test.clientExtraArgs, serverAddress, h2specdFromPort+test.portOffset)
+                       s.Log(s.Containers.Vpp.VppInstance.Vppctl(cmd))
 
                        oChan := make(chan string)
                        go func() {
index a40ed7c..d161e3c 100644 (file)
@@ -13,6 +13,9 @@ events {
 }
 
 http {
+  log_format access_log_fmt '$remote_addr - $remote_user [$time_local] '
+                            '"$request" $status $body_bytes_sent '
+                            '"$http_referer" "$http_user_agent" conn=$connection conn_reqs=$connection_requests';
   keepalive_timeout 300s;
   keepalive_requests 1000000;
   client_body_timeout {{.Timeout}}s;
@@ -20,7 +23,7 @@ http {
   send_timeout {{.Timeout}}s;
   sendfile on;
   server {
-    access_log /tmp/nginx/{{.LogPrefix}}-access.log;
+    access_log /tmp/nginx/{{.LogPrefix}}-access.log access_log_fmt;
     listen {{.Port}};
     listen {{.PortSsl}} ssl;
     server_name {{.Address}};
index 97dbca7..40eb1d8 100644 (file)
@@ -15,7 +15,8 @@
 #define foreach_hc_s_flag                                                     \
   _ (1, IS_CLOSED)                                                            \
   _ (2, PRINTABLE_BODY)                                                       \
-  _ (4, CHUNKED_BODY)
+  _ (4, CHUNKED_BODY)                                                         \
+  _ (8, IS_PARENT)
 
 typedef enum hc_s_flag_
 {
@@ -26,7 +27,7 @@ typedef enum hc_s_flag_
 
 typedef struct
 {
-  u64 req_per_wrk;
+  u64 max_req;
   u64 request_count;
   f64 start, end;
   f64 elapsed_time;
@@ -46,6 +47,12 @@ typedef struct
   u8 *http_response;
   u8 *response_status;
   FILE *file_ptr;
+  union
+  {
+    u32 child_count;
+    u32 parent_index;
+  };
+  u32 http_session_index;
 } hc_session_t;
 
 typedef struct
@@ -83,7 +90,8 @@ typedef struct
   bool verbose;
   f64 timeout;
   http_req_method_t req_method;
-  u64 repeat_count;
+  u64 reqs_per_session;
+  u64 reqs_remainder;
   f64 duration;
   bool repeat;
   bool multi_session;
@@ -91,6 +99,7 @@ typedef struct
   u32 connected_counter;
   u32 worker_index;
   u32 max_sessions;
+  u32 max_streams;
   u32 private_segment_size;
   u32 prealloc_fifos;
   u32 fifo_size;
@@ -112,6 +121,7 @@ typedef enum
   HC_GENERIC_ERR,
   HC_FOPEN_FAILED,
   HC_REPEAT_DONE,
+  HC_MAX_STREAMS_HIT,
 } hc_cli_signal_t;
 
 #define mime_printable_max_len 35
@@ -151,6 +161,7 @@ hc_session_alloc (hc_worker_t *wrk)
   pool_get_zero (wrk->sessions, s);
   s->session_index = s - wrk->sessions;
   s->thread_index = wrk->thread_index;
+  HTTP_DBG (1, "[%u]%u", s->thread_index, s->session_index);
 
   return s;
 }
@@ -221,32 +232,110 @@ done:
   return 0;
 }
 
+typedef struct
+{
+  u64 parent_handle;
+  u32 parent_index;
+} hc_connect_streams_args_t;
+
+static void
+hc_connect_streams_rpc (void *rpc_args)
+{
+  hc_connect_streams_args_t *args = rpc_args;
+  hc_main_t *hcm = &hc_main;
+  vnet_connect_args_t _a, *a = &_a;
+  hc_worker_t *wrk;
+  hc_session_t *ho_hs;
+  u32 i;
+  int rv;
+
+  clib_memset (a, 0, sizeof (*a));
+  clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
+  a->sep_ext.parent_handle = args->parent_handle;
+  a->app_index = hcm->app_index;
+
+  for (i = 0; i < (hcm->max_streams - 1); i++)
+    {
+      /* allocate half-open session */
+      wrk = hc_worker_get (transport_cl_thread ());
+      ho_hs = hc_session_alloc (wrk);
+      ho_hs->parent_index = args->parent_index;
+      a->api_context = ho_hs->session_index;
+
+      rv = vnet_connect (a);
+      if (rv)
+       clib_warning (0, "connect returned: %U", format_session_error, rv);
+    }
+  vec_free (args);
+}
+
+static void
+hc_connect_streams (u64 parent_handle, u32 parent_index)
+{
+  hc_connect_streams_args_t *args = 0;
+
+  vec_validate (args, 0);
+  args->parent_handle = parent_handle;
+  args->parent_index = parent_index;
+
+  session_send_rpc_evt_to_thread_force (transport_cl_thread (),
+                                       hc_connect_streams_rpc, args);
+}
+
 static int
-hc_session_connected_callback (u32 app_index, u32 hc_session_index,
-                              session_t *s, session_error_t err)
+hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s,
+                              session_error_t err)
 {
   hc_main_t *hcm = &hc_main;
   hc_worker_t *wrk;
-  hc_session_t *hc_session;
+  hc_session_t *hc_session, *ho_session, *parent_session;
   hc_http_header_t *header;
+  http_version_t http_version;
   u8 *f = 0;
+  u32 s_index;
 
   if (err)
     {
-      clib_warning ("hc_session_index[%d] connected error: %U",
-                   hc_session_index, format_session_error, err);
-      vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
-                                   HC_CONNECT_FAILED, 0);
+      clib_warning ("connected error: %U", format_session_error, err);
+      if (err == SESSION_E_MAX_STREAMS_HIT)
+       vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
+                                     HC_MAX_STREAMS_HIT, 0);
+      else
+       vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
+                                     HC_CONNECT_FAILED, 0);
       return -1;
     }
 
+  ho_session = hc_session_get (ho_index, transport_cl_thread ());
   wrk = hc_worker_get (s->thread_index);
-
   hc_session = hc_session_alloc (wrk);
+  s_index = hc_session->session_index;
+  clib_memcpy_fast (hc_session, ho_session, sizeof (*hc_session));
+  hc_session->session_index = s_index;
+  hc_session->thread_index = s->thread_index;
+  hc_session->http_session_index = s->session_index;
+
   clib_spinlock_lock_if_init (&hcm->lock);
   hcm->connected_counter++;
   clib_spinlock_unlock_if_init (&hcm->lock);
 
+  if (hc_session->session_flags & HC_S_FLAG_IS_PARENT)
+    {
+      http_version = http_session_get_version (s);
+      if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1)
+       {
+         HTTP_DBG (1, "parent connected, going to open %u streams",
+                   hcm->max_streams - 1);
+         hc_connect_streams (session_handle (s), hc_session->session_index);
+       }
+    }
+  else
+    {
+      parent_session =
+       hc_session_get (hc_session->parent_index, hc_session->thread_index);
+      parent_session->child_count++;
+    }
+
   hc_session->thread_index = s->thread_index;
   hc_session->body_recv = 0;
   s->opaque = hc_session->session_index;
@@ -254,19 +343,18 @@ hc_session_connected_callback (u32 app_index, u32 hc_session_index,
 
   if (hcm->multi_session)
     {
-      hc_session->stats.req_per_wrk = hcm->repeat_count / hcm->max_sessions;
+      hc_session->stats.max_req = hcm->reqs_per_session;
       clib_spinlock_lock_if_init (&hcm->lock);
       /* add remaining requests to the first connected session */
       if (hcm->connected_counter == 1)
        {
-         hc_session->stats.req_per_wrk +=
-           hcm->repeat_count % hcm->max_sessions;
+         hc_session->stats.max_req += hcm->reqs_remainder;
        }
       clib_spinlock_unlock_if_init (&hcm->lock);
     }
   else
     {
-      hc_session->stats.req_per_wrk = hcm->repeat_count;
+      hc_session->stats.max_req = hcm->reqs_per_session;
       hcm->worker_index = s->thread_index;
     }
   if (hcm->filename)
@@ -369,7 +457,7 @@ hc_session_transport_closed_callback (session_t *s)
     }
 
   /* send an event when all sessions are closed */
-  if (hcm->done_count >= hcm->max_sessions)
+  if (hcm->done_count >= (hcm->max_sessions * hcm->max_streams))
     {
       if (hcm->was_transport_closed)
        {
@@ -428,7 +516,10 @@ hc_rx_callback (session_t *s)
 
   max_deq = svm_fifo_max_dequeue_cons (s->rx_fifo);
   if (PREDICT_FALSE (max_deq == 0))
-    goto done;
+    {
+      HTTP_DBG (1, "no data to deq");
+      return 0;
+    }
 
   if (hc_session->to_recv == 0)
     {
@@ -444,6 +535,10 @@ hc_rx_callback (session_t *s)
          return -1;
        }
 
+      HTTP_DBG (1, "hc_session_index[%u]%u %U content-length: %lu",
+               s->thread_index, s->opaque, format_http_status_code, msg.code,
+               msg.data.body_len);
+
       if (msg.data.headers_len)
        {
          http_init_header_table_buf (&hc_session->resp_headers, msg);
@@ -519,6 +614,7 @@ hc_rx_callback (session_t *s)
               svm_fifo_max_dequeue (s->rx_fifo));
   if (!max_deq)
     {
+      HTTP_DBG (1, "body not yet received");
       goto done;
     }
   u32 n_deq = clib_min (hc_session->to_recv, max_deq);
@@ -543,6 +639,7 @@ hc_rx_callback (session_t *s)
   ASSERT (hc_session->to_recv >= rv);
   hc_session->to_recv -= rv;
   hc_session->body_recv += rv;
+  HTTP_DBG (1, "read %u, left to recv %u", n_deq, hc_session->to_recv);
   if (hcm->filename)
     {
       if (hc_session->file_ptr == NULL)
@@ -566,10 +663,28 @@ done:
          hc_session->stats.request_count++;
 
          if (hc_session->stats.elapsed_time >= hcm->duration &&
-             hc_session->stats.request_count >= hc_session->stats.req_per_wrk)
+             hc_session->stats.request_count >= hc_session->stats.max_req)
            {
              HTTP_DBG (1, "repeat done");
-             hc_session_disconnect_callback (s);
+             if (hc_session->session_flags & HC_S_FLAG_IS_PARENT)
+               {
+                 /* parent must be closed last */
+                 if (hc_session->child_count != 0)
+                   hc_session->session_flags |= HC_S_FLAG_IS_CLOSED;
+                 else
+                   hc_session_disconnect_callback (s);
+               }
+             else
+               {
+                 hc_session_disconnect_callback (s);
+                 hc_session_t *parent = hc_session_get (
+                   hc_session->parent_index, hc_session->thread_index);
+                 parent->child_count--;
+                 if (parent->child_count == 0 &&
+                     parent->session_flags & HC_S_FLAG_IS_CLOSED)
+                   hc_session_disconnect_callback (session_get (
+                     parent->http_session_index, parent->thread_index));
+               }
            }
          else
            {
@@ -619,6 +734,14 @@ hc_tx_callback (session_t *s)
   return 0;
 }
 
+static void
+hc_ho_cleanup_callback (session_t *s)
+{
+  HTTP_DBG (1, "ho index %u", s->opaque);
+  hc_worker_t *wrk = hc_worker_get (transport_cl_thread ());
+  pool_put_index (wrk->sessions, s->opaque);
+}
+
 static session_cb_vft_t hc_session_cb_vft = {
   .session_connected_callback = hc_session_connected_callback,
   .session_disconnect_callback = hc_session_disconnect_callback,
@@ -626,6 +749,7 @@ static session_cb_vft_t hc_session_cb_vft = {
   .session_reset_callback = hc_session_reset_callback,
   .builtin_app_rx_callback = hc_rx_callback,
   .builtin_app_tx_callback = hc_tx_callback,
+  .half_open_cleanup_callback = hc_ho_cleanup_callback,
 };
 
 static clib_error_t *
@@ -683,24 +807,30 @@ hc_attach ()
   return 0;
 }
 
-static int
+static void
 hc_connect_rpc (void *rpc_args)
 {
   vnet_connect_args_t *a = rpc_args;
   int rv = ~0;
   hc_main_t *hcm = &hc_main;
+  hc_worker_t *wrk;
+  hc_session_t *ho_hs;
 
   for (u32 i = 0; i < hcm->max_sessions; i++)
     {
+      /* allocate half-open session */
+      wrk = hc_worker_get (transport_cl_thread ());
+      ho_hs = hc_session_alloc (wrk);
+      ho_hs->session_flags |= HC_S_FLAG_IS_PARENT;
+      a->api_context = ho_hs->session_index;
+
       rv = vnet_connect (a);
-      if (rv > 0)
+      if (rv)
        clib_warning (0, "connect returned: %U", format_session_error, rv);
     }
 
   session_endpoint_free_ext_cfgs (&a->sep_ext);
   vec_free (a);
-
-  return rv;
 }
 
 static void
@@ -710,6 +840,7 @@ hc_connect ()
   vnet_connect_args_t *a = 0;
   transport_endpt_ext_cfg_t *ext_cfg;
   transport_endpt_cfg_http_t http_cfg = { (u32) hcm->timeout, 0 };
+
   vec_validate (a, 0);
   clib_memset (a, 0, sizeof (a[0]));
   clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
@@ -731,7 +862,7 @@ hc_connect ()
          ext_cfg->crypto.alpn_protos[0] = TLS_ALPN_PROTO_HTTP_1_1;
          break;
        case HTTP_VERSION_2:
-         ext_cfg->crypto.alpn_protos[1] = TLS_ALPN_PROTO_HTTP_2;
+         ext_cfg->crypto.alpn_protos[0] = TLS_ALPN_PROTO_HTTP_2;
          break;
        default:
          break;
@@ -753,7 +884,7 @@ hc_get_req_stats (vlib_main_t *vm)
       hc_session_t *hc_session;
       vec_foreach (wrk, hcm->wrk)
        {
-         vec_foreach (hc_session, wrk->sessions)
+         pool_foreach (hc_session, wrk->sessions)
            {
              hc_stats.request_count += hc_session->stats.request_count;
              hc_session->stats.request_count = 0;
@@ -809,6 +940,9 @@ hc_get_event (vlib_main_t *vm)
     case HC_CONNECT_FAILED:
       err = clib_error_return (0, "error: failed to connect");
       break;
+    case HC_MAX_STREAMS_HIT:
+      err = clib_error_return (0, "error: max streams hit");
+      break;
     case HC_TRANSPORT_CLOSED:
       err = clib_error_return (0, "error: transport closed");
       break;
@@ -922,7 +1056,7 @@ hc_worker_cleanup (hc_worker_t *wrk)
   HTTP_DBG (1, "worker and worker sessions cleanup");
 
   vec_free (wrk->headers_buf);
-  vec_foreach (hc_session, wrk->sessions)
+  pool_foreach (hc_session, wrk->sessions)
     {
       http_free_header_table (&hc_session->resp_headers);
       vec_free (hc_session->http_response);
@@ -963,7 +1097,7 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
   hc_main_t *hcm = &hc_main;
   clib_error_t *err = 0;
   unformat_input_t _line_input, *line_input = &_line_input;
-  u64 mem_size;
+  u64 mem_size, repeat_count = 0;
   u8 *appns_id = 0;
   u8 *path = 0;
   u8 *file_data;
@@ -972,13 +1106,13 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
   u8 *value;
   int rv;
   hcm->timeout = 10;
-  hcm->repeat_count = 0;
   hcm->duration = 0;
   hcm->repeat = false;
   hcm->multi_session = false;
   hcm->done_count = 0;
   hcm->connected_counter = 0;
   hcm->max_sessions = 1;
+  hcm->max_streams = 1;
   hcm->prealloc_fifos = 0;
   hcm->private_segment_size = 0;
   hcm->fifo_size = 0;
@@ -1034,7 +1168,7 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
        hcm->verbose = true;
       else if (unformat (line_input, "timeout %f", &hcm->timeout))
        ;
-      else if (unformat (line_input, "repeat %d", &hcm->repeat_count))
+      else if (unformat (line_input, "repeat %d", &repeat_count))
        {
          hcm->repeat = true;
        }
@@ -1049,6 +1183,14 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
              goto done;
            }
        }
+      else if (unformat (line_input, "streams %d", &hcm->max_streams))
+       {
+         if (hcm->max_streams <= 1)
+           {
+             err = clib_error_return (0, "streams must be > 1");
+             goto done;
+           }
+       }
       else if (unformat (line_input, "prealloc-fifos %d",
                         &hcm->prealloc_fifos))
        ;
@@ -1099,7 +1241,7 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
        }
     }
 
-  if (hcm->duration && hcm->repeat_count)
+  if (hcm->duration && repeat_count)
     {
       err = clib_error_return (
        0, "combining duration and repeat is not supported");
@@ -1113,6 +1255,21 @@ hc_command_fn (vlib_main_t *vm, unformat_input_t *input,
       goto done;
     }
 
+  if (hcm->max_streams > 1 && !hcm->repeat)
+    {
+      err = clib_error_return (
+       0, "multiple streams are only supported with request repeating");
+      goto done;
+    }
+
+  if (repeat_count)
+    {
+      hcm->reqs_per_session =
+       repeat_count / (hcm->max_sessions * hcm->max_streams);
+      hcm->reqs_remainder =
+       repeat_count % (hcm->max_sessions * hcm->max_streams);
+    }
+
   if ((rv = parse_target ((char **) &hcm->uri, (char **) &hcm->target)))
     {
       err = clib_error_return (0, "target parse error: %U",
index ccf987a..fc1fc81 100644 (file)
@@ -788,6 +788,7 @@ http_transport_enable (vlib_main_t *vm, u8 is_en)
   http_main_t *hm = &http_main;
   u32 num_threads, i;
   http_engine_vft_t *http_version;
+  http_worker_t *wrk;
 
   if (!is_en)
     {
@@ -828,6 +829,10 @@ http_transport_enable (vlib_main_t *vm, u8 is_en)
     }
 
   vec_validate (hm->wrk, num_threads - 1);
+  vec_foreach (wrk, hm->wrk)
+    {
+      clib_spinlock_init (&wrk->pending_stream_connects_lock);
+    }
   vec_validate (hm->rx_bufs, num_threads - 1);
   vec_validate (hm->tx_bufs, num_threads - 1);
   vec_validate (hm->app_header_lists, num_threads - 1);
@@ -858,11 +863,10 @@ http_transport_enable (vlib_main_t *vm, u8 is_en)
 }
 
 static int
-http_transport_connect (transport_endpoint_cfg_t *tep)
+http_connect_connection (session_endpoint_cfg_t *sep)
 {
   vnet_connect_args_t _cargs, *cargs = &_cargs;
   http_main_t *hm = &http_main;
-  session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
   application_t *app;
   http_conn_t *hc;
   int error;
@@ -942,6 +946,127 @@ http_transport_connect (transport_endpoint_cfg_t *tep)
   return 0;
 }
 
+static int
+http_connect_stream (u64 parent_handle, u32 opaque)
+{
+  session_t *hs;
+  http_req_handle_t rh;
+  u32 hc_index;
+  http_conn_t *hc;
+
+  hs = session_get_from_handle (parent_handle);
+  if (session_type_transport_proto (hs->session_type) != TRANSPORT_PROTO_HTTP)
+    {
+      HTTP_DBG (1, "received incompatible session");
+      return -1;
+    }
+
+  rh.as_u32 = hs->connection_index;
+  if (rh.version != HTTP_VERSION_2)
+    {
+      HTTP_DBG (1, "%U multiplexing not supported", format_http_version,
+               rh.version);
+      return -1;
+    }
+
+  hc_index = http_vfts[rh.version].hc_index_get_by_req_index (
+    rh.req_index, hs->thread_index);
+  HTTP_DBG (1, "hc [%u]%x", hs->thread_index, hc_index);
+
+  hc = http_conn_get_w_thread (hc_index, hs->thread_index);
+
+  if (hc->state == HTTP_CONN_STATE_CLOSED)
+    {
+      HTTP_DBG (1, "conn closed");
+      return -1;
+    }
+
+  return http_vfts[rh.version].conn_connect_stream_callback (hc, opaque);
+}
+
+static void
+http_handle_stream_connects_rpc (void *args)
+{
+  clib_thread_index_t thread_index = pointer_to_uword (args);
+  http_worker_t *wrk;
+  u32 n_pending, max_connects, n_connects = 0;
+  http_pending_connect_stream_t *pc;
+
+  wrk = http_worker_get (thread_index);
+
+  clib_spinlock_lock (&wrk->pending_stream_connects_lock);
+
+  n_pending = clib_fifo_elts (wrk->pending_connect_streams);
+  max_connects = clib_min (32, n_pending);
+  vec_validate (wrk->burst_connect_streams, max_connects);
+
+  while (n_connects < max_connects)
+    clib_fifo_sub1 (wrk->pending_connect_streams,
+                   wrk->burst_connect_streams[n_connects++]);
+
+  clib_spinlock_unlock (&wrk->pending_stream_connects_lock);
+
+  n_connects = 0;
+  while (n_connects < max_connects)
+    {
+      pc = &wrk->burst_connect_streams[n_connects++];
+      http_connect_stream (pc->parent_handle, pc->opaque);
+    }
+
+  /* more work to do? */
+  if (max_connects < n_pending)
+    session_send_rpc_evt_to_thread_force (
+      thread_index, http_handle_stream_connects_rpc,
+      uword_to_pointer ((uword) thread_index, void *));
+}
+
+static int
+http_program_connect_stream (session_endpoint_cfg_t *sep)
+{
+  clib_thread_index_t parent_thread_index =
+    session_thread_from_handle (sep->parent_handle);
+  http_worker_t *wrk;
+  u32 n_pending;
+
+  ASSERT (session_vlib_thread_is_cl_thread ());
+
+  /* if we are already on same worker as parent, handle connect */
+  if (parent_thread_index == transport_cl_thread ())
+    return http_connect_stream (sep->parent_handle, sep->opaque);
+
+  /* if not on same worker as parent, queue request */
+  wrk = http_worker_get (parent_thread_index);
+
+  clib_spinlock_lock (&wrk->pending_stream_connects_lock);
+
+  http_pending_connect_stream_t p = { .parent_handle = sep->parent_handle,
+                                     .opaque = sep->opaque };
+  clib_fifo_add1 (wrk->pending_connect_streams, p);
+  n_pending = clib_fifo_elts (wrk->pending_connect_streams);
+
+  clib_spinlock_unlock (&wrk->pending_stream_connects_lock);
+
+  if (n_pending == 1)
+    session_send_rpc_evt_to_thread_force (
+      parent_thread_index, http_handle_stream_connects_rpc,
+      uword_to_pointer ((uword) parent_thread_index, void *));
+
+  return 0;
+}
+
+static int
+http_transport_connect (transport_endpoint_cfg_t *tep)
+{
+  session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
+  session_t *hs;
+
+  hs = session_get_from_handle_if_valid (sep->parent_handle);
+  if (hs)
+    return http_program_connect_stream (sep);
+  else
+    return http_connect_connection (sep);
+}
+
 static u32
 http_start_listen (u32 app_listener_index, transport_endpoint_cfg_t *tep)
 {
index e7ddaf3..d8e313b 100644 (file)
@@ -1954,7 +1954,7 @@ http1_transport_connected_callback (http_conn_t *hc)
 
   req = http1_conn_alloc_req (hc);
   http_req_state_change (req, HTTP_REQ_STATE_WAIT_APP_METHOD);
-  return http_conn_established (hc, req);
+  return http_conn_established (hc, req, hc->hc_pa_app_api_ctx);
 }
 
 static void
index 52147b8..4a3ccf3 100644 (file)
@@ -104,7 +104,7 @@ typedef struct http2_conn_ctx_
   u8 *unparsed_headers; /* temporary storing rx fragmented headers */
   u8 *unsent_headers;  /* temporary storing tx fragmented headers */
   u32 unsent_headers_offset;
-  u32 client_req_index;
+  u32 req_num;
 } http2_conn_ctx_t;
 
 typedef struct http2_worker_ctx_
@@ -197,6 +197,7 @@ http2_conn_ctx_free (http_conn_t *hc)
 
   h2c = http2_conn_ctx_get_w_thread (hc);
   HTTP_DBG (1, "h2c [%u]%x", hc->c_thread_index, h2c - wrk->conn_pool);
+  ASSERT (h2c->req_num == 0);
   hash_free (h2c->req_by_stream_id);
   if (hc->flags & HTTP_CONN_F_HAS_REQUEST)
     hpack_dynamic_table_free (&h2c->decoder_dynamic_table);
@@ -238,16 +239,19 @@ http2_conn_alloc_req (http_conn_t *hc)
            h2c - wrk->conn_pool, req_index);
   req->peer_window = h2c->peer_settings.initial_window_size;
   req->our_window = h2c->settings.initial_window_size;
+  h2c->req_num++;
   return req;
 }
 
 static_always_inline void
 http2_req_set_stream_id (http2_req_t *req, http2_conn_ctx_t *h2c,
-                        u32 stream_id)
+                        u32 stream_id, u8 unset_old)
 {
   HTTP_DBG (1, "req_index [%u]%x stream_id %u", req->base.c_thread_index,
            ((http_req_handle_t) req->base.hr_req_handle).req_index,
            stream_id);
+  if (unset_old && req->stream_id)
+    hash_unset (h2c->req_by_stream_id, req->stream_id);
   req->stream_id = stream_id;
   hash_set (h2c->req_by_stream_id, stream_id,
            ((http_req_handle_t) req->base.hr_req_handle).req_index);
@@ -273,6 +277,7 @@ http2_conn_free_req (http2_conn_ctx_t *h2c, http2_req_t *req,
   if (CLIB_DEBUG)
     memset (req, 0xba, sizeof (*req));
   pool_put (wrk->req_pool, req);
+  h2c->req_num--;
 }
 
 static inline void
@@ -284,9 +289,7 @@ http2_conn_reset_req (http2_conn_ctx_t *h2c, http2_req_t *req,
   if (clib_llist_elt_is_linked (req, sched_list))
     clib_llist_remove (wrk->req_pool, sched_list, req);
   http_buffer_free (&req->base.tx_buf);
-  if (req->stream_id)
-    hash_unset (h2c->req_by_stream_id, req->stream_id);
-  req->flags = 0;
+  req->flags &= ~HTTP2_REQ_F_NEED_WINDOW_UPDATE;
   req->stream_state = HTTP2_STREAM_STATE_IDLE;
   req->peer_window = h2c->peer_settings.initial_window_size;
   req->our_window = h2c->settings.initial_window_size;
@@ -429,11 +432,11 @@ http2_connection_error (http_conn_t *hc, http2_error_t error,
        }
       else
        {
-         if (hc->flags & HTTP_CONN_F_HAS_REQUEST)
-           {
-             req = http2_req_get (h2c->client_req_index, hc->c_thread_index);
-             session_transport_reset_notify (&req->base.connection);
-           }
+         hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({
+                         req = http2_req_get (req_index, hc->c_thread_index);
+                         session_transport_reset_notify (
+                           &req->base.connection);
+                       }));
        }
     }
   if (clib_llist_elt_is_linked (h2c, sched_list))
@@ -1088,7 +1091,7 @@ http2_sched_dispatch_req_headers (http2_req_t *req, http_conn_t *hc,
   max_write = clib_min (max_write, h2c->peer_settings.max_frame_size);
 
   stream_id = http2_conn_get_next_stream_id (h2c);
-  http2_req_set_stream_id (req, h2c, stream_id);
+  http2_req_set_stream_id (req, h2c, stream_id, 1);
 
   http_io_as_dequeue_notify (&req->base, n_deq);
 
@@ -1590,6 +1593,7 @@ http2_req_state_transport_io_more_data (http_conn_t *hc, http2_req_t *req,
          transport_connection_reschedule (&req->base.connection);
          h2c = http2_conn_ctx_get_w_thread (hc);
          http2_conn_reset_req (h2c, req, hc->c_thread_index);
+         http_io_as_del_want_read_ntf (&req->base);
        }
     }
   http_io_as_write (&req->base, req->payload, req->payload_len);
@@ -1639,8 +1643,8 @@ http2_req_state_udp_tunnel_rx (http_conn_t *hc, http2_req_t *req,
                               http2_error_t *error)
 {
   int rv;
-  u8 payload_offset;
-  u64 payload_len;
+  u8 payload_offset = 0;
+  u64 payload_len = 0;
   session_dgram_hdr_t hdr;
 
   HTTP_DBG (1, "udp tunnel received data from client");
@@ -1898,8 +1902,7 @@ http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh)
          return HTTP2_ERROR_STREAM_CLOSED;
        }
       h2c->last_opened_stream_id = fh->stream_id;
-      if (hash_elts (h2c->req_by_stream_id) ==
-         h2c->settings.max_concurrent_streams)
+      if (h2c->req_num == h2c->settings.max_concurrent_streams)
        {
          HTTP_DBG (1, "SETTINGS_MAX_CONCURRENT_STREAMS exceeded");
          http_io_ts_drain (hc, fh->length);
@@ -1908,7 +1911,7 @@ http2_handle_headers_frame (http_conn_t *hc, http2_frame_header_t *fh)
          return HTTP2_ERROR_NO_ERROR;
        }
       req = http2_conn_alloc_req (hc);
-      http2_req_set_stream_id (req, h2c, fh->stream_id);
+      http2_req_set_stream_id (req, h2c, fh->stream_id, 0);
       req->dispatch_headers_cb = http2_sched_dispatch_resp_headers;
       http_conn_accept_request (hc, &req->base);
       http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_TRANSPORT_METHOD);
@@ -2150,7 +2153,9 @@ http2_handle_data_frame (http_conn_t *hc, http2_frame_header_t *fh)
   req->our_window -= fh->length;
   h2c->our_window -= fh->length;
 
-  HTTP_DBG (1, "run state machine");
+  HTTP_DBG (1, "run state machine '%U' req_index %x", format_http_req_state,
+           req->base.state,
+           ((http_req_handle_t) req->base.hr_req_handle).req_index);
   return http2_req_run_state_machine (hc, req, 0, 0);
 }
 
@@ -2272,14 +2277,13 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh)
          h2c->flags &= ~HTTP2_CONN_F_EXPECT_SERVER_SETTINGS;
          HTTP_DBG (1, "client connection established");
          req = http2_conn_alloc_req (hc);
-         h2c->client_req_index =
-           ((http_req_handle_t) req->base.hr_req_handle).req_index;
+         req->flags |= HTTP2_REQ_F_IS_PARENT;
          hc->flags |= HTTP_CONN_F_HAS_REQUEST;
          hpack_dynamic_table_init (
            &h2c->decoder_dynamic_table,
            http2_default_conn_settings.header_table_size);
          http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD);
-         if (http_conn_established (hc, &req->base))
+         if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx))
            return HTTP2_ERROR_INTERNAL_ERROR;
        }
 
@@ -2388,10 +2392,11 @@ http2_handle_goaway_frame (http_conn_t *hc, http2_frame_header_t *fh)
       if (!(hc->flags & HTTP_CONN_F_IS_SERVER))
        {
          ASSERT (hc->flags & HTTP_CONN_F_HAS_REQUEST);
-         req = http2_req_get (h2c->client_req_index, hc->c_thread_index);
-         if (!req)
-           return HTTP2_ERROR_NO_ERROR;
-         session_transport_closed_notify (&req->base.connection);
+         hash_foreach (stream_id, req_index, h2c->req_by_stream_id, ({
+                         req = http2_req_get (req_index, hc->c_thread_index);
+                         session_transport_closed_notify (
+                           &req->base.connection);
+                       }));
        }
     }
   else
@@ -2635,6 +2640,12 @@ http2_app_close_callback (http_conn_t *hc, u32 req_index,
     {
       HTTP_DBG (1, "nothing more to send, confirm close");
       session_transport_closed_notify (&req->base.connection);
+      if (req->flags & HTTP2_REQ_F_IS_PARENT)
+       {
+         HTTP_DBG (1, "client app closed parent, closing connection");
+         ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER));
+         http_shutdown_transport (hc);
+       }
     }
   else if (req->base.is_tunnel)
     {
@@ -2950,6 +2961,28 @@ http2_conn_accept_callback (http_conn_t *hc)
     h2c->flags |= HTTP2_CONN_F_PREFACE_VERIFIED;
 }
 
+static int
+http2_conn_connect_stream_callback (http_conn_t *hc, u32 parent_app_api_ctx)
+{
+  http2_conn_ctx_t *h2c;
+  http2_req_t *req;
+  app_worker_t *app_wrk;
+
+  HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
+  h2c = http2_conn_ctx_get_w_thread (hc);
+  ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER));
+  ASSERT (!(h2c->flags & HTTP2_CONN_F_EXPECT_SERVER_SETTINGS));
+  app_wrk = app_worker_get_if_valid (hc->hc_pa_wrk_index);
+  if (!app_wrk)
+    return -1;
+  if (h2c->req_num == h2c->settings.max_concurrent_streams)
+    return app_worker_connect_notify (app_wrk, 0, SESSION_E_MAX_STREAMS_HIT,
+                                     hc->hc_pa_app_api_ctx);
+  req = http2_conn_alloc_req (hc);
+  http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD);
+  return http_conn_established (hc, &req->base, parent_app_api_ctx);
+}
+
 static void
 http2_conn_cleanup_callback (http_conn_t *hc)
 {
@@ -2959,14 +2992,8 @@ http2_conn_cleanup_callback (http_conn_t *hc)
 
   HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
   h2c = http2_conn_ctx_get_w_thread (hc);
-  if (hc->flags & HTTP_CONN_F_IS_SERVER)
-    hash_foreach (stream_id, req_index, h2c->req_by_stream_id,
-                 ({ vec_add1 (req_indices, req_index); }));
-  else
-    {
-      if (hc->flags & HTTP_CONN_F_HAS_REQUEST)
-       vec_add1 (req_indices, h2c->client_req_index);
-    }
+  hash_foreach (stream_id, req_index, h2c->req_by_stream_id,
+               ({ vec_add1 (req_indices, req_index); }));
 
   vec_foreach (req_index_p, req_indices)
     {
@@ -3074,6 +3101,7 @@ const static http_engine_vft_t http2_engine = {
   .transport_conn_reschedule_callback =
     http2_transport_conn_reschedule_callback,
   .conn_accept_callback = http2_conn_accept_callback,
+  .conn_connect_stream_callback = http2_conn_connect_stream_callback,
   .conn_cleanup_callback = http2_conn_cleanup_callback,
   .enable_callback = http2_enable_callback,
   .unformat_cfg_callback = http2_unformat_config_callback,
index b6a63b9..f6666af 100644 (file)
@@ -209,9 +209,18 @@ typedef struct http_tc_
   void *opaque; /* version specific data */
 } http_conn_t;
 
+typedef struct http_pending_connect_stream_
+{
+  u64 parent_handle;
+  u32 opaque;
+} http_pending_connect_stream_t;
+
 typedef struct http_worker_
 {
   http_conn_t *conn_pool;
+  clib_spinlock_t pending_stream_connects_lock;
+  http_pending_connect_stream_t *pending_connect_streams;
+  http_pending_connect_stream_t *burst_connect_streams;
 } http_worker_t;
 
 typedef struct http_main_
@@ -265,6 +274,8 @@ typedef struct http_engine_vft_
   void (*transport_reset_callback) (http_conn_t *hc);
   void (*transport_conn_reschedule_callback) (http_conn_t *hc);
   void (*conn_accept_callback) (http_conn_t *hc); /* optional */
+  int (*conn_connect_stream_callback) (http_conn_t *hc,
+                                      u32 parent_app_api_ctx); /* optional */
   void (*conn_cleanup_callback) (http_conn_t *hc);
   void (*enable_callback) (void);                          /* optional */
   uword (*unformat_cfg_callback) (unformat_input_t *input); /* optional */
@@ -556,6 +567,14 @@ http_io_as_add_want_read_ntf (http_req_t *req)
                                            SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
 }
 
+always_inline void
+http_io_as_del_want_read_ntf (http_req_t *req)
+{
+  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  svm_fifo_del_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
+                                           SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
+}
+
 always_inline void
 http_io_as_reset_has_read_ntf (http_req_t *req)
 {
@@ -859,7 +878,8 @@ http_conn_accept_request (http_conn_t *hc, http_req_t *req)
 }
 
 always_inline int
-http_conn_established (http_conn_t *hc, http_req_t *req)
+http_conn_established (http_conn_t *hc, http_req_t *req,
+                      u32 parent_app_api_ctx)
 {
   session_t *as;
   app_worker_t *app_wrk;
@@ -873,7 +893,7 @@ http_conn_established (http_conn_t *hc, http_req_t *req)
   as->app_wrk_index = hc->hc_pa_wrk_index;
   as->connection_index = req->hr_req_handle;
   as->session_state = SESSION_STATE_READY;
-  as->opaque = hc->hc_pa_app_api_ctx;
+  as->opaque = parent_app_api_ctx;
   ts = session_get_from_handle (hc->hc_tc_session_handle);
   as->session_type = session_type_from_proto_and_ip (
     TRANSPORT_PROTO_HTTP, session_type_is_ip4 (ts->session_type));
@@ -895,7 +915,7 @@ http_conn_established (http_conn_t *hc, http_req_t *req)
       return rv;
     }
 
-  app_worker_connect_notify (app_wrk, as, 0, hc->hc_pa_app_api_ctx);
+  app_worker_connect_notify (app_wrk, as, 0, parent_app_api_ctx);
 
   req->hr_pa_session_handle = session_handle (as);
   req->hr_pa_wrk_index = as->app_wrk_index;