http: add transport connect_stream callback 09/43909/9
authorMatus Fabian <[email protected]>
Sat, 18 Oct 2025 18:42:18 +0000 (14:42 -0400)
committerFlorin Coras <[email protected]>
Thu, 23 Oct 2025 07:59:17 +0000 (07:59 +0000)
app now open HTTP/2 streams via vnet_connect_stream instead of
vnet_connect

Type: improvement

Change-Id: Icc9a948d7c2a7d50c9d83fefb10f267fb56e4367
Signed-off-by: Matus Fabian <[email protected]>
src/plugins/hs_apps/http_client.c
src/plugins/hs_apps/http_connect_proxy_client.c
src/plugins/http/http.c
src/plugins/http/http1.c
src/plugins/http/http2/http2.c
src/plugins/http/http_private.h

index 36ee5fb..1ef25a2 100644 (file)
@@ -166,8 +166,7 @@ hc_session_alloc (hc_worker_t *wrk)
 }
 
 static int
-hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session,
-           session_error_t err)
+hc_request (session_t *s, hc_worker_t *wrk, hc_session_t *hc_session)
 {
   hc_main_t *hcm = &hc_main;
   u64 to_send;
@@ -231,54 +230,51 @@ done:
   return 0;
 }
 
-typedef struct
-{
-  u64 parent_handle;
-  u32 parent_index;
-} hc_connect_streams_args_t;
-
-static void
-hc_connect_streams_rpc (void *rpc_args)
+static int
+hc_connect_streams (u64 parent_handle, u32 parent_index)
 {
-  hc_connect_streams_args_t *args = rpc_args;
   hc_main_t *hcm = &hc_main;
   vnet_connect_args_t _a, *a = &_a;
   hc_worker_t *wrk;
-  hc_session_t *ho_hs;
+  hc_session_t *hs;
   u32 i;
   int rv;
+  session_t *s;
 
   clib_memset (a, 0, sizeof (*a));
   clib_memcpy (&a->sep_ext, &hcm->connect_sep, sizeof (hcm->connect_sep));
-  a->sep_ext.parent_handle = args->parent_handle;
+  a->sep_ext.parent_handle = parent_handle;
   a->app_index = hcm->app_index;
 
+  wrk = hc_worker_get (session_thread_from_handle (parent_handle));
+
   for (i = 0; i < (hcm->max_streams - 1); i++)
     {
-      /* allocate half-open session */
-      wrk = hc_worker_get (transport_cl_thread ());
-      ho_hs = hc_session_alloc (wrk);
-      ho_hs->parent_index = args->parent_index;
-      a->api_context = ho_hs->session_index;
+      hs = hc_session_alloc (wrk);
+      hs->parent_index = parent_index;
+      a->api_context = hs->session_index;
 
-      rv = vnet_connect (a);
+      rv = vnet_connect_stream (a);
       if (rv)
-       clib_warning (0, "connect returned: %U", format_session_error, rv);
+       {
+         clib_warning (0, "connect returned: %U", format_session_error, rv);
+         if (rv == SESSION_E_MAX_STREAMS_HIT)
+           vlib_process_signal_event_mt (
+             vlib_get_main (), hcm->cli_node_index, HC_MAX_STREAMS_HIT, 0);
+         else
+           vlib_process_signal_event_mt (
+             vlib_get_main (), hcm->cli_node_index, HC_CONNECT_FAILED, 0);
+         return -1;
+       }
+      s = session_get_from_handle (a->sh);
+      hs->http_session_index = s->session_index;
+      hs->stats.max_req = hcm->reqs_per_session;
+      hs->stats.start = vlib_time_now (wrk->vlib_main);
+      if (hc_request (s, wrk, hs))
+       return -1;
     }
-  vec_free (args);
-}
-
-static void
-hc_connect_streams (u64 parent_handle, u32 parent_index)
-{
-  hc_connect_streams_args_t *args = 0;
 
-  vec_validate (args, 0);
-  args->parent_handle = parent_handle;
-  args->parent_index = parent_index;
-
-  session_send_rpc_evt_to_thread_force (transport_cl_thread (),
-                                       hc_connect_streams_rpc, args);
+  return 0;
 }
 
 static int
@@ -287,7 +283,7 @@ hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s,
 {
   hc_main_t *hcm = &hc_main;
   hc_worker_t *wrk;
-  hc_session_t *hc_session, *ho_session, *parent_session;
+  hc_session_t *hc_session, *ho_session;
   hc_http_header_t *header;
   http_version_t http_version;
   u8 *f = 0;
@@ -296,12 +292,8 @@ hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s,
   if (err)
     {
       clib_warning ("connected error: %U", format_session_error, err);
-      if (err == SESSION_E_MAX_STREAMS_HIT)
-       vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
-                                     HC_MAX_STREAMS_HIT, 0);
-      else
-       vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
-                                     HC_CONNECT_FAILED, 0);
+      vlib_process_signal_event_mt (vlib_get_main (), hcm->cli_node_index,
+                                   HC_CONNECT_FAILED, 0);
       return -1;
     }
 
@@ -318,24 +310,6 @@ hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s,
   hcm->connected_counter++;
   clib_spinlock_unlock_if_init (&hcm->lock);
 
-  if (hc_session->session_flags & HC_S_FLAG_IS_PARENT)
-    {
-      http_version = http_session_get_version (s);
-      if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1)
-       {
-         HTTP_DBG (1, "parent connected, going to open %u streams",
-                   hcm->max_streams - 1);
-         hc_connect_streams (session_handle (s), hc_session->session_index);
-       }
-    }
-  else
-    {
-      parent_session =
-       hc_session_get (hc_session->parent_index, hc_session->thread_index);
-      parent_session->child_count++;
-    }
-
-  hc_session->thread_index = s->thread_index;
   hc_session->body_recv = 0;
   s->opaque = hc_session->session_index;
   wrk->session_index = hc_session->session_index;
@@ -422,7 +396,21 @@ hc_session_connected_callback (u32 app_index, u32 ho_index, session_t *s,
 
   hc_session->stats.start = vlib_time_now (wrk->vlib_main);
 
-  return hc_request (s, wrk, hc_session, err);
+  if (hc_request (s, wrk, hc_session))
+    return -1;
+
+  http_version = http_session_get_version (s);
+  if (http_version == HTTP_VERSION_2 && hcm->max_streams > 1)
+    {
+      ASSERT (hc_session->session_flags & HC_S_FLAG_IS_PARENT);
+      HTTP_DBG (1, "parent connected, going to open %u streams",
+               hcm->max_streams - 1);
+      hc_session->child_count = hcm->max_streams - 1;
+      if (hc_connect_streams (session_handle (s), hc_session->session_index))
+       return -1;
+    }
+
+  return 0;
 }
 
 static void
@@ -503,7 +491,6 @@ hc_rx_callback (session_t *s)
   http_msg_t msg;
   int rv;
   u32 max_deq;
-  session_error_t session_err = 0;
   int send_err = 0;
   http_version_t http_version;
 
@@ -688,7 +675,7 @@ done:
          else
            {
              HTTP_DBG (1, "doing another repeat");
-             send_err = hc_request (s, wrk, hc_session, session_err);
+             send_err = hc_request (s, wrk, hc_session);
              if (send_err)
                clib_warning ("failed to send request, error %d", send_err);
            }
index ab96626..cb9bdd1 100644 (file)
@@ -823,6 +823,8 @@ hcpc_open_http_stream (u32 session_index)
   hcpc_main_t *hcpcm = &hcpc_main;
   vnet_connect_args_t _a, *a = &_a;
   session_error_t rv;
+  session_t *s;
+  hcpc_session_t *ps;
 
   clib_memset (a, 0, sizeof (*a));
   clib_memcpy (&a->sep_ext, &hcpcm->proxy_server_sep,
@@ -831,9 +833,74 @@ hcpc_open_http_stream (u32 session_index)
   a->app_index = hcpcm->http_app_index;
   a->api_context = session_index;
 
-  rv = vnet_connect (a);
+  rv = vnet_connect_stream (a);
   if (rv)
-    clib_warning ("connect returned: %U", format_session_error, rv);
+    {
+      clib_warning ("session %u connect error: %U", session_index,
+                   format_session_error, rv);
+      clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+      ps = hcpc_session_get (session_index);
+      ASSERT (ps);
+      ps->state = HCPC_SESSION_CLOSED;
+      ps->http_disconnected = 1;
+      ps->http_establishing = 0;
+      if (!ps->intercept_diconnected)
+       {
+         ps->intercept.rx_fifo->master_thread_index =
+           ps->intercept.tx_fifo->master_thread_index;
+         if (ps->intercept.session_handle != SESSION_INVALID_HANDLE)
+           {
+             session_reset (
+               session_get_from_handle (ps->intercept.session_handle));
+             ps->intercept_diconnected = 1;
+           }
+         else
+           {
+             if (session_thread_from_handle (hcpcm->http_connection_handle) !=
+                 ps->intercept.tx_fifo->master_thread_index)
+               {
+                 session_send_rpc_evt_to_thread (
+                   ps->intercept.tx_fifo->master_thread_index,
+                   hcpc_session_postponed_free_rpc,
+                   uword_to_pointer (ps->session_index, void *));
+               }
+             else
+               hcpc_session_free (ps);
+           }
+         if (rv == SESSION_E_MAX_STREAMS_HIT)
+           hcpc_worker_stats_inc (vlib_get_thread_index (), max_streams_hit,
+                                  1);
+       }
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+      return;
+    }
+
+  HCPC_DBG ("stream for session [%u] opened", session_index);
+  s = session_get_from_handle (a->sh);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+  ps = hcpc_session_get (session_index);
+  ASSERT (ps);
+  ps->http.session_handle = session_handle (s);
+  ps->http.rx_fifo = s->rx_fifo;
+  ps->http.tx_fifo = s->tx_fifo;
+
+  /* listener session was already closed */
+  if (ps->intercept_diconnected)
+    {
+      hcpc_worker_stats_inc (s->thread_index,
+                            client_closed_before_stream_opened, 1);
+      session_reset (s);
+      ps->http_establishing = 0;
+      ps->http_disconnected = 1;
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+      return;
+    }
+
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+
+  if (svm_fifo_max_dequeue (s->tx_fifo))
+    if (svm_fifo_set_event (s->tx_fifo))
+      session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
 }
 
 static void
@@ -847,8 +914,10 @@ hcpc_connect_http_stream_rpc (void *rpc_args)
 static void
 hcpc_connect_http_stream (u32 session_index)
 {
-  u32 connects_thread = transport_cl_thread (), thread_index;
+  hcpc_main_t *hcpcm = &hcpc_main;
+  u32 connects_thread, thread_index;
 
+  connects_thread = session_thread_from_handle (hcpcm->http_connection_handle);
   thread_index = vlib_get_thread_index ();
 
   if (thread_index == connects_thread)
@@ -858,7 +927,7 @@ hcpc_connect_http_stream (u32 session_index)
     }
 
   session_send_rpc_evt_to_thread_force (
-    transport_cl_thread (), hcpc_connect_http_stream_rpc,
+    connects_thread, hcpc_connect_http_stream_rpc,
     uword_to_pointer (session_index, void *));
 }
 
@@ -1039,82 +1108,27 @@ hcpc_http_session_connected_callback (u32 app_index, u32 session_index,
   hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
 
+  ASSERT (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE);
+
   if (err)
     {
-      clib_warning ("session %u connect error: %U", session_index,
+      clib_warning ("connect to http proxy server failed: %U",
                    format_session_error, err);
-
-      /* connect to http proxy server failed */
-      if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE)
-       return 0;
-
-      clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
-      ps = hcpc_session_get (session_index);
-      ASSERT (ps);
-      ps->state = HCPC_SESSION_CLOSED;
-      ps->http_disconnected = 1;
-      ps->http_establishing = 0;
-      if (!ps->intercept_diconnected)
-       {
-         if (ps->intercept.session_handle != SESSION_INVALID_HANDLE)
-           {
-             session_reset (
-               session_get_from_handle (ps->intercept.session_handle));
-             ps->intercept_diconnected = 1;
-           }
-         else
-           hcpc_delete_session (s, 1);
-       }
-      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
-      if (err == SESSION_E_MAX_STREAMS_HIT)
-       hcpc_worker_stats_inc (vlib_get_thread_index (), max_streams_hit, 1);
-      return 0;
-    }
-
-  if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE)
-    {
-      HCPC_DBG ("parent session connected");
-      clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
-      ps = hcpc_session_alloc ();
-      ps->http.session_handle = session_handle (s);
-      ps->http.rx_fifo = s->rx_fifo;
-      ps->http.tx_fifo = s->tx_fifo;
-      ps->flags |= HCPC_SESSION_F_IS_PARENT;
-      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
-      s->opaque = ps->session_index;
-      hcpcm->http_connection_handle = session_handle (s);
-      vlib_process_signal_event_mt (vlib_get_main (),
-                                   hcpcm->process_node_index,
-                                   HCPC_EVENT_PROXY_CONNECTED, 0);
       return 0;
     }
 
-  HCPC_DBG ("stream for session [%u] opened", session_index);
+  HCPC_DBG ("parent session connected");
   clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
-  ps = hcpc_session_get (session_index);
-  ASSERT (ps);
+  ps = hcpc_session_alloc ();
   ps->http.session_handle = session_handle (s);
   ps->http.rx_fifo = s->rx_fifo;
   ps->http.tx_fifo = s->tx_fifo;
-
-  /* listener session was already closed */
-  if (ps->intercept_diconnected)
-    {
-      hcpc_worker_stats_inc (s->thread_index,
-                            client_closed_before_stream_opened, 1);
-      session_reset (s);
-      ps->http_establishing = 0;
-      ps->http_disconnected = 1;
-      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
-      return -1;
-    }
-
+  ps->flags |= HCPC_SESSION_F_IS_PARENT;
   clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
-
-  if (svm_fifo_max_dequeue (s->tx_fifo))
-    if (svm_fifo_set_event (s->tx_fifo))
-      session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
-
+  s->opaque = ps->session_index;
+  hcpcm->http_connection_handle = session_handle (s);
+  vlib_process_signal_event_mt (vlib_get_main (), hcpcm->process_node_index,
+                               HCPC_EVENT_PROXY_CONNECTED, 0);
   return 0;
 }
 
index 1ec48ef..b4697d5 100644 (file)
@@ -300,7 +300,7 @@ http_get_app_header_list (http_req_t *req, http_msg_t *msg)
   u8 *app_headers;
   int rv;
 
-  as = session_get_from_handle (req->hr_pa_session_handle);
+  as = session_get (req->c_s_index, req->c_thread_index);
 
   if (msg->data.type == HTTP_MSG_DATA_PTR)
     {
@@ -328,7 +328,7 @@ http_get_app_target (http_req_t *req, http_msg_t *msg)
   u8 *target;
   int rv;
 
-  as = session_get_from_handle (req->hr_pa_session_handle);
+  as = session_get (req->c_s_index, req->c_thread_index);
 
   if (msg->data.type == HTTP_MSG_DATA_PTR)
     {
@@ -371,7 +371,7 @@ http_get_rx_buf (http_conn_t *hc)
 void
 http_req_tx_buffer_init (http_req_t *req, http_msg_t *msg)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   http_buffer_init (&req->tx_buf, msg_to_buf_type[msg->data.type], as->tx_fifo,
                    msg->data.body_len);
 }
@@ -828,7 +828,6 @@ http_transport_enable (vlib_main_t *vm, u8 is_en)
   vec_validate (hm->wrk, num_threads - 1);
   vec_foreach (wrk, hm->wrk)
     {
-      clib_spinlock_init (&wrk->pending_stream_connects_lock);
       clib_memset (&wrk->stats, 0, sizeof (wrk->stats));
     }
   vec_validate (hm->rx_bufs, num_threads - 1);
@@ -951,7 +950,7 @@ http_connect_connection (session_endpoint_cfg_t *sep)
 }
 
 static int
-http_connect_stream (u64 parent_handle, u32 opaque)
+http_connect_stream (u64 parent_handle, u32 *req_index)
 {
   session_t *hs;
   http_req_handle_t rh;
@@ -985,90 +984,26 @@ http_connect_stream (u64 parent_handle, u32 opaque)
       return -1;
     }
 
-  return http_vfts[rh.version].conn_connect_stream_callback (hc, opaque);
-}
-
-static void
-http_handle_stream_connects_rpc (void *args)
-{
-  clib_thread_index_t thread_index = pointer_to_uword (args);
-  http_worker_t *wrk;
-  u32 n_pending, max_connects, n_connects = 0;
-  http_pending_connect_stream_t *pc;
-
-  wrk = http_worker_get (thread_index);
-
-  clib_spinlock_lock (&wrk->pending_stream_connects_lock);
-
-  n_pending = clib_fifo_elts (wrk->pending_connect_streams);
-  max_connects = clib_min (32, n_pending);
-  vec_validate (wrk->burst_connect_streams, max_connects);
-
-  while (n_connects < max_connects)
-    clib_fifo_sub1 (wrk->pending_connect_streams,
-                   wrk->burst_connect_streams[n_connects++]);
-
-  clib_spinlock_unlock (&wrk->pending_stream_connects_lock);
-
-  n_connects = 0;
-  while (n_connects < max_connects)
-    {
-      pc = &wrk->burst_connect_streams[n_connects++];
-      http_connect_stream (pc->parent_handle, pc->opaque);
-    }
-
-  /* more work to do? */
-  if (max_connects < n_pending)
-    session_send_rpc_evt_to_thread_force (
-      thread_index, http_handle_stream_connects_rpc,
-      uword_to_pointer ((uword) thread_index, void *));
+  return http_vfts[rh.version].conn_connect_stream_callback (hc, req_index);
 }
 
 static int
-http_program_connect_stream (session_endpoint_cfg_t *sep)
+http_transport_connect (transport_endpoint_cfg_t *tep)
 {
-  clib_thread_index_t parent_thread_index =
-    session_thread_from_handle (sep->parent_handle);
-  http_worker_t *wrk;
-  u32 n_pending;
-
-  ASSERT (session_vlib_thread_is_cl_thread ());
-
-  /* if we are already on same worker as parent, handle connect */
-  if (parent_thread_index == transport_cl_thread ())
-    return http_connect_stream (sep->parent_handle, sep->opaque);
-
-  /* if not on same worker as parent, queue request */
-  wrk = http_worker_get (parent_thread_index);
-
-  clib_spinlock_lock (&wrk->pending_stream_connects_lock);
-
-  http_pending_connect_stream_t p = { .parent_handle = sep->parent_handle,
-                                     .opaque = sep->opaque };
-  clib_fifo_add1 (wrk->pending_connect_streams, p);
-  n_pending = clib_fifo_elts (wrk->pending_connect_streams);
-
-  clib_spinlock_unlock (&wrk->pending_stream_connects_lock);
-
-  if (n_pending == 1)
-    session_send_rpc_evt_to_thread_force (
-      parent_thread_index, http_handle_stream_connects_rpc,
-      uword_to_pointer ((uword) parent_thread_index, void *));
+  session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
 
-  return 0;
+  ASSERT (sep->parent_handle == SESSION_INVALID_HANDLE);
+  return http_connect_connection (sep);
 }
 
 static int
-http_transport_connect (transport_endpoint_cfg_t *tep)
+http_transport_connect_stream (transport_endpoint_cfg_t *tep,
+                              CLIB_UNUSED (session_t *stream_session),
+                              u32 *conn_index)
 {
   session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
-  session_t *hs;
 
-  hs = session_get_from_handle_if_valid (sep->parent_handle);
-  if (hs)
-    return http_program_connect_stream (sep);
-  else
-    return http_connect_connection (sep);
+  return http_connect_stream (sep->parent_handle, conn_index);
 }
 
 static u32
@@ -1443,6 +1378,7 @@ http_transport_cleanup_ho (u32 ho_hc_index)
 static const transport_proto_vft_t http_proto = {
   .enable = http_transport_enable,
   .connect = http_transport_connect,
+  .connect_stream = http_transport_connect_stream,
   .start_listen = http_start_listen,
   .stop_listen = http_stop_listen,
   .half_close = http_transport_shutdown,
index 19cc187..af9eec4 100644 (file)
@@ -77,7 +77,7 @@ http1_conn_alloc_req (http_conn_t *hc)
   pool_get_aligned_safe (h1m->req_pool[hc->c_thread_index], req,
                         CLIB_CACHE_LINE_BYTES);
   clib_memset (req, 0, sizeof (*req));
-  req->hr_pa_session_handle = SESSION_INVALID_HANDLE;
+  req->c_s_index = SESSION_INVALID_INDEX;
   req_index = req - h1m->req_pool[hc->c_thread_index];
   hr_handle.version = HTTP_VERSION_1;
   hr_handle.req_index = req_index;
@@ -1967,7 +1967,7 @@ http1_transport_connected_callback (http_conn_t *hc)
   req = http1_conn_alloc_req (hc);
   http_req_state_change (req, HTTP_REQ_STATE_WAIT_APP_METHOD);
   http_stats_connections_established_inc (hc->c_thread_index);
-  return http_conn_established (hc, req, hc->hc_pa_app_api_ctx, 0);
+  return http_conn_established (hc, req, hc->hc_pa_app_api_ctx);
 }
 
 static void
index 45d35cb..286c1fc 100644 (file)
@@ -251,7 +251,7 @@ http2_conn_alloc_req (http_conn_t *hc, u8 is_parent)
 
   pool_get_aligned_safe (wrk->req_pool, req, CLIB_CACHE_LINE_BYTES);
   clib_memset (req, 0, sizeof (*req));
-  req->base.hr_pa_session_handle = SESSION_INVALID_HANDLE;
+  req->base.c_s_index = SESSION_INVALID_INDEX;
   req_index = req - wrk->req_pool;
   hr_handle.version = HTTP_VERSION_2;
   hr_handle.req_index = req_index;
@@ -2450,7 +2450,7 @@ http2_handle_settings_frame (http_conn_t *hc, http2_frame_header_t *fh)
            http2_default_conn_settings.header_table_size);
          http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD);
          http_stats_connections_established_inc (hc->c_thread_index);
-         if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx, 0))
+         if (http_conn_established (hc, &req->base, hc->hc_pa_app_api_ctx))
            return HTTP2_ERROR_INTERNAL_ERROR;
        }
 
@@ -3278,29 +3278,22 @@ http2_conn_accept_callback (http_conn_t *hc)
 }
 
 static int
-http2_conn_connect_stream_callback (http_conn_t *hc, u32 parent_app_api_ctx)
+http2_conn_connect_stream_callback (http_conn_t *hc, u32 *req_index)
 {
   http2_conn_ctx_t *h2c;
   http2_req_t *req;
-  app_worker_t *app_wrk;
-  int rv;
 
   HTTP_DBG (1, "hc [%u]%x", hc->c_thread_index, hc->hc_hc_index);
   h2c = http2_conn_ctx_get_w_thread (hc);
   ASSERT (!(hc->flags & HTTP_CONN_F_IS_SERVER));
   ASSERT (!(h2c->flags & HTTP2_CONN_F_EXPECT_SERVER_SETTINGS));
-  app_wrk = app_worker_get_if_valid (hc->hc_pa_wrk_index);
-  if (!app_wrk)
-    return -1;
   if (h2c->req_num == h2c->settings.max_concurrent_streams)
-    return app_worker_connect_notify (app_wrk, 0, SESSION_E_MAX_STREAMS_HIT,
-                                     parent_app_api_ctx);
+    return SESSION_E_MAX_STREAMS_HIT;
   req = http2_conn_alloc_req (hc, 0);
+  req->base.hr_pa_wrk_index = hc->hc_pa_wrk_index;
   http_req_state_change (&req->base, HTTP_REQ_STATE_WAIT_APP_METHOD);
-  rv = http_conn_established (hc, &req->base, parent_app_api_ctx, 1);
-  if (rv != 0)
-    http2_conn_free_req (h2c, req, hc->c_thread_index);
-  return rv;
+  *req_index = req->base.hr_req_handle;
+  return SESSION_E_NONE;
 }
 
 static void
index f5d600c..14b24b1 100644 (file)
@@ -113,7 +113,6 @@ typedef struct http_req_
     http_req_id_t c_http_req_id;
   };
 #define hr_pa_wrk_index             c_http_req_id.parent_app_wrk_index
-#define hr_pa_session_handle c_http_req_id.app_session_handle
 #define hr_hc_index         c_http_req_id.hc_index
 #define hr_req_handle       connection.c_index
 
@@ -240,9 +239,6 @@ typedef struct http_pending_connect_stream_
 typedef struct http_worker_
 {
   http_conn_t *conn_pool;
-  clib_spinlock_t pending_stream_connects_lock;
-  http_pending_connect_stream_t *pending_connect_streams;
-  http_pending_connect_stream_t *burst_connect_streams;
   http_wrk_stats_t stats;
 } http_worker_t;
 
@@ -299,7 +295,7 @@ typedef struct http_engine_vft_
   void (*transport_conn_reschedule_callback) (http_conn_t *hc);
   void (*conn_accept_callback) (http_conn_t *hc); /* optional */
   int (*conn_connect_stream_callback) (http_conn_t *hc,
-                                      u32 parent_app_api_ctx); /* optional */
+                                      u32 *req_index); /* optional */
   void (*conn_cleanup_callback) (http_conn_t *hc);
   void (*enable_callback) (void);                          /* optional */
   uword (*unformat_cfg_callback) (unformat_input_t *input); /* optional */
@@ -471,7 +467,7 @@ http_app_worker_rx_notify (http_req_t *req)
   session_t *as;
   app_worker_t *app_wrk;
 
-  as = session_get_from_handle (req->hr_pa_session_handle);
+  as = session_get (req->c_s_index, req->c_thread_index);
   if (!(as->flags & SESSION_F_RX_EVT))
     {
       app_wrk = app_worker_get_if_valid (as->app_wrk_index);
@@ -509,7 +505,7 @@ http_get_app_msg (http_req_t *req, http_msg_t *msg)
   session_t *as;
   int rv;
 
-  as = session_get_from_handle (req->hr_pa_session_handle);
+  as = session_get (req->c_s_index, req->c_thread_index);
   rv = svm_fifo_dequeue (as->tx_fifo, sizeof (*msg), (u8 *) msg);
   ASSERT (rv == sizeof (*msg));
 }
@@ -581,14 +577,14 @@ http_req_deschedule (http_req_t *req, transport_send_params_t *sp)
 always_inline void
 http_io_as_add_want_deq_ntf (http_req_t *req)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
 }
 
 always_inline void
 http_io_as_add_want_read_ntf (http_req_t *req)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   svm_fifo_add_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
                                            SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
 }
@@ -596,7 +592,7 @@ http_io_as_add_want_read_ntf (http_req_t *req)
 always_inline void
 http_io_as_del_want_read_ntf (http_req_t *req)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   svm_fifo_del_want_deq_ntf (as->rx_fifo, SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
                                            SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);
 }
@@ -604,14 +600,14 @@ http_io_as_del_want_read_ntf (http_req_t *req)
 always_inline void
 http_io_as_reset_has_read_ntf (http_req_t *req)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   svm_fifo_reset_has_deq_ntf (as->rx_fifo);
 }
 
 always_inline void
 http_io_as_dequeue_notify (http_req_t *req, u32 n_last_deq)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   if (svm_fifo_needs_deq_ntf (as->tx_fifo, n_last_deq))
     session_dequeue_notify (as);
 }
@@ -619,14 +615,14 @@ http_io_as_dequeue_notify (http_req_t *req, u32 n_last_deq)
 always_inline u32
 http_io_as_max_write (http_req_t *req)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   return svm_fifo_max_enqueue_prod (as->rx_fifo);
 }
 
 always_inline u32
 http_io_as_max_read (http_req_t *req)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   return svm_fifo_max_dequeue_cons (as->tx_fifo);
 }
 
@@ -634,7 +630,7 @@ always_inline void
 http_io_as_write (http_req_t *req, u8 *data, u32 len)
 {
   int n_written;
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
 
   n_written = svm_fifo_enqueue (as->rx_fifo, len, data);
   ASSERT (n_written == len);
@@ -645,7 +641,7 @@ http_io_as_write_segs (http_req_t *req, const svm_fifo_seg_t segs[],
                       u32 n_segs)
 {
   int n_written;
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   n_written = svm_fifo_enqueue_segments (as->rx_fifo, segs, n_segs, 0);
   ASSERT (n_written > 0);
   return (u32) n_written;
@@ -655,7 +651,7 @@ always_inline u32
 http_io_as_peek (http_req_t *req, u8 *buf, u32 len, u32 offset)
 {
   int n_read;
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
 
   n_read = svm_fifo_peek (as->tx_fifo, offset, len, buf);
   ASSERT (n_read > 0);
@@ -667,7 +663,7 @@ http_io_as_read_segs (http_req_t *req, svm_fifo_seg_t *segs, u32 *n_segs,
                      u32 max_bytes)
 {
   int n_read;
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   n_read = svm_fifo_segments (as->tx_fifo, 0, segs, n_segs, max_bytes);
   ASSERT (n_read > 0);
   return (u32) n_read;
@@ -676,21 +672,21 @@ http_io_as_read_segs (http_req_t *req, svm_fifo_seg_t *segs, u32 *n_segs,
 always_inline void
 http_io_as_drain (http_req_t *req, u32 len)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   svm_fifo_dequeue_drop (as->tx_fifo, len);
 }
 
 always_inline void
 http_io_as_drain_all (http_req_t *req)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   svm_fifo_dequeue_drop_all (as->tx_fifo);
 }
 
 always_inline void
 http_io_as_drain_unread (http_req_t *req)
 {
-  session_t *as = session_get_from_handle (req->hr_pa_session_handle);
+  session_t *as = session_get (req->c_s_index, req->c_thread_index);
   svm_fifo_dequeue_drop_all (as->rx_fifo);
 }
 
@@ -871,13 +867,12 @@ http_conn_accept_request (http_conn_t *hc, http_req_t *req)
   if ((rv = app_worker_init_accepted (as)))
     {
       HTTP_DBG (1, "failed to allocate fifos");
-      req->hr_pa_session_handle = SESSION_INVALID_HANDLE;
+      req->c_s_index = SESSION_INVALID_INDEX;
       session_free (as);
       hc->flags |= HTTP_CONN_F_NO_APP_SESSION;
       return rv;
     }
 
-  req->hr_pa_session_handle = session_handle (as);
   req->hr_pa_wrk_index = as->app_wrk_index;
 
   app_wrk = app_worker_get (as->app_wrk_index);
@@ -885,7 +880,7 @@ http_conn_accept_request (http_conn_t *hc, http_req_t *req)
   if ((rv = app_worker_accept_notify (app_wrk, as)))
     {
       HTTP_DBG (1, "app accept returned");
-      req->hr_pa_session_handle = SESSION_INVALID_HANDLE;
+      req->c_s_index = SESSION_INVALID_INDEX;
       session_free (as);
       hc->flags |= HTTP_CONN_F_NO_APP_SESSION;
       return rv;
@@ -896,7 +891,7 @@ http_conn_accept_request (http_conn_t *hc, http_req_t *req)
 
 always_inline int
 http_conn_established (http_conn_t *hc, http_req_t *req,
-                      u32 parent_app_api_ctx, u8 is_stream)
+                      u32 parent_app_api_ctx)
 {
   session_t *as;
   app_worker_t *app_wrk;
@@ -904,12 +899,9 @@ http_conn_established (http_conn_t *hc, http_req_t *req,
   http_conn_t *ho_hc;
   int rv;
 
-  if (!is_stream)
-    {
-      ho_hc = http_ho_conn_get (hc->ho_index);
-      /* in chain with TLS there is race on half-open cleanup */
-      __atomic_fetch_or (&ho_hc->flags, HTTP_CONN_F_HO_DONE, __ATOMIC_RELEASE);
-    }
+  ho_hc = http_ho_conn_get (hc->ho_index);
+  /* in chain with TLS there is race on half-open cleanup */
+  __atomic_fetch_or (&ho_hc->flags, HTTP_CONN_F_HO_DONE, __ATOMIC_RELEASE);
 
   /* allocate app session and initialize */
   as = session_alloc (hc->c_thread_index);
@@ -943,7 +935,6 @@ http_conn_established (http_conn_t *hc, http_req_t *req,
 
   app_worker_connect_notify (app_wrk, as, 0, parent_app_api_ctx);
 
-  req->hr_pa_session_handle = session_handle (as);
   req->hr_pa_wrk_index = as->app_wrk_index;
 
   return 0;