hsa: http connect proxy client multiworker support 16/43616/24
authorMatus Fabian <[email protected]>
Wed, 20 Aug 2025 16:18:46 +0000 (12:18 -0400)
committerFlorin Coras <[email protected]>
Thu, 25 Sep 2025 15:34:08 +0000 (15:34 +0000)
Type: improvement
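
Protect the shared proxy session pool with a spinlock, drive UDP tunnel
idle timeouts from a timer wheel owned by a new hcpc-event-process node,
hand cross-thread work (stream connects, postponed frees, forced acks)
over via session RPCs, and keep per-worker stats.

Example usage (illustrative placeholders; option and command names are
taken from the CLI added below):

  http connect proxy client enable server-uri http://<ip>:<port>
    interface <intfc> listener tcp://<ip>:<port> udp-idle-timeout 60
  show http connect proxy client stats
  clear http connect proxy client stats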

Change-Id: I24e80476136712ee8dd4c8c4b3cd7e5c017acc80
Signed-off-by: Matus Fabian <[email protected]>
src/plugins/hs_apps/http_connect_proxy_client.c
test-c/hs-test/infra/netconfig.go
test-c/hs-test/infra/suite_masque.go
test-c/hs-test/infra/utils.go
test-c/hs-test/proxy_test.go
test-c/hs-test/resources/nginx/nginx_masque.conf

src/plugins/hs_apps/http_connect_proxy_client.c
index 32cc729..c063243 100644
@@ -9,6 +9,7 @@
 #include <http/http_status_codes.h>
 #include <vnet/tls/tls_types.h>
 #include <vnet/tcp/tcp.h>
+#include <vppinfra/tw_timer_2t_1w_2048sl.h>
 
 #define HCPC_DEBUG 0
 
 
 #define TCP_MSS 1460
 
+#define HCPC_TIMER_HANDLE_INVALID ((u32) ~0)
+
+#define HCPC_EVENT_PROXY_CONNECTED 1
+
 #define foreach_hcpc_session_state                                            \
+  _ (CREATED, "CREATED")                                                      \
   _ (CONNECTING, "CONNECTING")                                                \
   _ (ESTABLISHED, "ESTABLISHED")                                              \
+  _ (CLOSING, "CLOSING")                                                      \
   _ (CLOSED, "CLOSED")
 
 typedef enum
@@ -33,32 +40,45 @@ typedef enum
 } hcpc_session_state_t;
 
 #define foreach_hcpc_session_flags                                            \
-  _ (IS_PARENT)                                                               \
-  _ (IS_UDP)                                                                  \
-  _ (HTTP_DISCONNECTED)                                                       \
-  _ (LISTENER_DISCONNECTED)
+  _ (IS_PARENT, "is-parent")                                                  \
+  _ (IS_UDP, "is-udp")                                                        \
+  _ (HTTP_SHUTDOWN, "http-shutdown")                                          \
+  _ (INTERCEPT_SHUTDOWN, "intercept-shutdown")
 
 typedef enum
 {
-#define _(sym) HCPC_SESSION_F_BIT_##sym,
+#define _(sym, str) HCPC_SESSION_F_BIT_##sym,
   foreach_hcpc_session_flags
 #undef _
+    HCPC_SESSION_N_F_BITS
 } hcpc_session_flags_bit_t;
 
 typedef enum
 {
-#define _(sym) HCPC_SESSION_F_##sym = 1 << HCPC_SESSION_F_BIT_##sym,
+#define _(sym, str) HCPC_SESSION_F_##sym = 1 << HCPC_SESSION_F_BIT_##sym,
   foreach_hcpc_session_flags
 #undef _
 } hcpc_session_flags_t;
 
+typedef struct
+{
+  session_handle_t session_handle;
+  svm_fifo_t *rx_fifo;
+  svm_fifo_t *tx_fifo;
+} hcpc_session_side_t;
+
 typedef struct
 {
   u32 session_index;
   hcpc_session_state_t state;
-  hcpc_session_flags_t flags;
-  session_handle_t listener_session_handle;
-  session_handle_t http_session_handle;
+  u8 flags;
+  hcpc_session_side_t intercept;
+  hcpc_session_side_t http;
+  u32 timer_handle;
+  u32 opaque;
+  volatile int http_establishing;
+  volatile int intercept_diconnected;
+  volatile int http_disconnected;
 } hcpc_session_t;
 
 typedef struct
@@ -68,6 +88,32 @@ typedef struct
   session_handle_t session_handle;
 } hcpc_listener_t;
 
+#define foreach_hcpc_wrk_stat                                                 \
+  _ (tunnels_opened, "tunnels opened")                                        \
+  _ (udp_idle_timeouts, "udp idle timeouts")                                  \
+  _ (tunnels_reset_by_client, "tunnels reset by client")                      \
+  _ (tunnels_reset_by_target, "tunnels reset by target")                      \
+  _ (max_streams_hit, "max streams hit")                                      \
+  _ (target_unreachable, "target unreachable")                                \
+  _ (client_closed_before_stream_opened, "client closed before stream "       \
+                                        "opened")                            \
+  _ (client_closed_before_tunnel_connected,                                   \
+     "client closed before tunnel connected")
+
+typedef struct
+{
+#define _(name, str) u64 name;
+  foreach_hcpc_wrk_stat
+#undef _
+} hcpc_wrk_stats_t;
+
+typedef struct
+{
+  hcpc_wrk_stats_t stats;
+} hcpc_worker_t;
+
+typedef void (*hsi_intercept_proto_fn) (transport_proto_t proto, u8 is_ip4);
+
 typedef struct
 {
   u32 http_app_index;
@@ -86,10 +132,20 @@ typedef struct
   u32 fifo_size;
   u32 prealloc_fifos;
   u64 private_segment_size;
+  u32 process_node_index;
+  u32 udp_idle_timeout;
+  clib_spinlock_t sessions_lock;
+  clib_spinlock_t tw_lock;
+  tw_timer_wheel_2t_1w_2048sl_t tw;
+  hsi_intercept_proto_fn intercept_proto_fn;
+  hcpc_worker_t *workers;
 } hcpc_main_t;
 
 hcpc_main_t hcpc_main;
 
+#define hcpc_worker_stats_inc(_ti, _stat, _val)                               \
+  hcpcm->workers[_ti].stats._stat += _val
+
 static u8 *
 format_hcpc_session_state (u8 *s, va_list *va)
 {
@@ -109,6 +165,52 @@ format_hcpc_session_state (u8 *s, va_list *va)
   return format (s, "%s", t);
 }
 
+const char *hcpc_session_flags_str[] = {
+#define _(sym, str) str,
+  foreach_hcpc_session_flags
+#undef _
+
+};
+
+static u8 *
+format_hcpc_session_flags (u8 *s, va_list *va)
+{
+  hcpc_session_t *ps = va_arg (*va, hcpc_session_t *);
+  int i, last = -1;
+
+  for (i = 0; i < HCPC_SESSION_N_F_BITS; i++)
+    {
+      if (ps->flags & (1 << i))
+       last = i;
+    }
+
+  for (i = 0; i < last; i++)
+    {
+      if (ps->flags & (1 << i))
+       s = format (s, "%s | ", hcpc_session_flags_str[i]);
+    }
+  if (last >= 0)
+    s = format (s, "%s", hcpc_session_flags_str[i]);
+
+  return s;
+}
+
+static u8 *
+format_hcpc_session_vars (u8 *s, va_list *va)
+{
+  hcpc_session_t *ps = va_arg (*va, hcpc_session_t *);
+
+  s = format (s,
+             " http_establishing %u, http_stream_opened %u, "
+             "http_disconnected %u, intercept_disconnected %u\n",
+             ps->http_establishing,
+             ps->http.session_handle != SESSION_INVALID_HANDLE,
+             ps->http_disconnected, ps->intercept_diconnected);
+  s = format (s, " flags: %U\n", format_hcpc_session_flags, ps);
+
+  return s;
+}
+
 static void
 hcpc_session_close_http (hcpc_session_t *ps)
 {
@@ -116,27 +218,77 @@ hcpc_session_close_http (hcpc_session_t *ps)
   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
   session_error_t rv;
 
-  a->handle = ps->http_session_handle;
+  HCPC_DBG ("session [%u]", ps->session_index);
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
+  a->handle = ps->http.session_handle;
   a->app_index = hcpcm->http_app_index;
   rv = vnet_disconnect_session (a);
   if (rv)
-    clib_warning ("disconnect returned: %U", format_session_error, rv);
-  ps->flags |= HCPC_SESSION_F_HTTP_DISCONNECTED;
+    clib_warning ("session %u disconnect returned: %U", ps->session_index,
+                 format_session_error, rv);
+  ps->http_disconnected = 1;
+}
+
+static void
+hcpc_session_shutdown_http (hcpc_session_t *ps)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+  vnet_shutdown_args_t _a = { 0 }, *a = &_a;
+  session_error_t rv;
+
+  HCPC_DBG ("session [%u]", ps->session_index);
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
+  ps->flags |= HCPC_SESSION_F_HTTP_SHUTDOWN;
+  a->handle = ps->http.session_handle;
+  a->app_index = hcpcm->http_app_index;
+  rv = vnet_shutdown_session (a);
+  if (rv)
+    clib_warning ("session %u shutdown returned: %U", ps->session_index,
+                 format_session_error, rv);
 }
 
 static void
-hcpc_session_close_listener (hcpc_session_t *ps)
+hcpc_session_close_intercept (hcpc_session_t *ps)
 {
   hcpc_main_t *hcpcm = &hcpc_main;
   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
   session_error_t rv;
 
-  a->handle = ps->listener_session_handle;
+  HCPC_DBG ("session [%u]", ps->session_index);
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
+  a->handle = ps->intercept.session_handle;
   a->app_index = hcpcm->listener_app_index;
   rv = vnet_disconnect_session (a);
   if (rv)
-    clib_warning ("disconnect returned: %U", format_session_error, rv);
-  ps->flags |= HCPC_SESSION_F_LISTENER_DISCONNECTED;
+    clib_warning ("session %u disconnect returned: %U", ps->session_index,
+                 format_session_error, rv);
+  ps->intercept_diconnected = 1;
+}
+
+static void
+hcpc_session_shutdown_intercept (hcpc_session_t *ps)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+  vnet_shutdown_args_t _a = { 0 }, *a = &_a;
+  session_error_t rv;
+
+  HCPC_DBG ("session [%u]", ps->session_index);
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
+  ps->flags |= HCPC_SESSION_F_INTERCEPT_SHUTDOWN;
+  a->handle = ps->intercept.session_handle;
+  a->app_index = hcpcm->listener_app_index;
+  rv = vnet_shutdown_session (a);
+  if (rv)
+    clib_warning ("session %u shutdown returned: %U", ps->session_index,
+                 format_session_error, rv);
 }
 
 static hcpc_session_t *
@@ -145,10 +297,14 @@ hcpc_session_alloc ()
   hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
 
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
   pool_get_zero (hcpcm->sessions, ps);
   ps->session_index = ps - hcpcm->sessions;
-  ps->http_session_handle = SESSION_INVALID_HANDLE;
-  ps->listener_session_handle = SESSION_INVALID_HANDLE;
+  ps->http.session_handle = SESSION_INVALID_HANDLE;
+  ps->intercept.session_handle = SESSION_INVALID_HANDLE;
+  ps->timer_handle = HCPC_TIMER_HANDLE_INVALID;
 
   return ps;
 }
@@ -158,6 +314,10 @@ hcpc_session_free (hcpc_session_t *ps)
 {
   hcpc_main_t *hcpcm = &hcpc_main;
 
+  HCPC_DBG ("session [%u]", ps->session_index);
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
   if (CLIB_DEBUG)
     memset (ps, 0xB0, sizeof (*ps));
   pool_put (hcpcm->sessions, ps);
@@ -173,43 +333,169 @@ hcpc_session_get (u32 s_index)
   return pool_elt_at_index (hcpcm->sessions, s_index);
 }
 
+static void
+hcpc_timer_expired_cb (u32 *expired_timers)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+  int i;
+  u32 ps_index;
+  hcpc_session_t *ps;
+
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+  for (i = 0; i < vec_len (expired_timers); i++)
+    {
+      ps_index = expired_timers[i] & 0x7FFFFFFF;
+      ps = hcpc_session_get (ps_index);
+      if (!ps)
+       continue;
+      HCPC_DBG ("session [%u]", ps_index);
+      ASSERT (ps->flags & HCPC_SESSION_F_IS_UDP);
+      ASSERT (ps->http_establishing == 0);
+      ps->state = HCPC_SESSION_CLOSED;
+      ps->timer_handle = HCPC_TIMER_HANDLE_INVALID;
+      if (!ps->intercept_diconnected)
+       hcpc_session_close_intercept (ps);
+      if (!ps->http_disconnected)
+       hcpc_session_close_http (ps);
+    }
+  clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock);
+  hcpc_worker_stats_inc (vlib_get_thread_index (), udp_idle_timeouts,
+                        vec_len (expired_timers));
+}
+
+static inline void
+hcpc_timer_start (hcpc_session_t *ps)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+
+  ASSERT (ps->timer_handle == HCPC_TIMER_HANDLE_INVALID);
+  ASSERT (ps->flags & HCPC_SESSION_F_IS_UDP);
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
+  clib_spinlock_lock_if_init (&hcpcm->tw_lock);
+  ps->timer_handle = tw_timer_start_2t_1w_2048sl (
+    &hcpcm->tw, ps->session_index, 0, hcpcm->udp_idle_timeout);
+  clib_spinlock_unlock_if_init (&hcpcm->tw_lock);
+}
+
+static inline void
+hcpc_timer_stop (hcpc_session_t *ps)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
+  if (ps->timer_handle == HCPC_TIMER_HANDLE_INVALID)
+    return;
+
+  ASSERT (ps->flags & HCPC_SESSION_F_IS_UDP);
+
+  clib_spinlock_lock_if_init (&hcpcm->tw_lock);
+  tw_timer_stop_2t_1w_2048sl (&hcpcm->tw, ps->timer_handle);
+  ps->timer_handle = HCPC_TIMER_HANDLE_INVALID;
+  clib_spinlock_unlock_if_init (&hcpcm->tw_lock);
+}
+
+static inline void
+hcpc_timer_update (hcpc_session_t *ps)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+
+  ASSERT (!vlib_num_workers () ||
+         CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock));
+
+  if (ps->timer_handle == HCPC_TIMER_HANDLE_INVALID)
+    return;
+
+  ASSERT (ps->flags & HCPC_SESSION_F_IS_UDP);
+
+  clib_spinlock_lock_if_init (&hcpcm->tw_lock);
+  tw_timer_update_2t_1w_2048sl (&hcpcm->tw, ps->timer_handle,
+                               hcpcm->udp_idle_timeout);
+  clib_spinlock_unlock_if_init (&hcpcm->tw_lock);
+}
+
+static void
+hcpc_session_postponed_free_rpc (void *arg)
+{
+  uword session_index = pointer_to_uword (arg);
+  hcpc_main_t *hcpcm = &hcpc_main;
+  hcpc_session_t *ps;
+
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+
+  HCPC_DBG ("session [%u]", session_index);
+  ps = hcpc_session_get (session_index);
+  ASSERT (ps);
+  segment_manager_dealloc_fifos (ps->intercept.rx_fifo, ps->intercept.tx_fifo);
+  hcpc_session_free (ps);
+
+  clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock);
+}
+
 static void
 hcpc_delete_session (session_t *s, u8 is_http)
 {
+  hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
-  session_t *ls;
 
-  HCPC_DBG ("session %u (is http %u)", s->opaque, is_http);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+
+  HCPC_DBG ("session [%u] (is http %u)", s->opaque, is_http);
   ps = hcpc_session_get (s->opaque);
   ASSERT (ps);
 
+  hcpc_timer_stop (ps);
+
   if (is_http)
     {
-      ps->http_session_handle = SESSION_INVALID_HANDLE;
+      ps->http.session_handle = SESSION_INVALID_HANDLE;
       /* http connection session doesn't have listener */
       if (ps->flags & HCPC_SESSION_F_IS_PARENT)
        {
-         ASSERT (ps->listener_session_handle == SESSION_INVALID_HANDLE);
+         ASSERT (ps->intercept.session_handle == SESSION_INVALID_HANDLE);
          hcpc_session_free (ps);
+         clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock);
          return;
        }
+
+      /* revert master thread index change on connect notification */
+      ps->intercept.rx_fifo->master_thread_index =
+       ps->intercept.tx_fifo->master_thread_index;
+
       /* listener already cleaned up */
-      if (ps->listener_session_handle == SESSION_INVALID_HANDLE)
+      if (ps->intercept.session_handle == SESSION_INVALID_HANDLE)
        {
-         ASSERT (s->rx_fifo->refcnt == 1);
-         hcpc_session_free (ps);
-         return;
+         if (s->thread_index != ps->intercept.tx_fifo->master_thread_index)
+           {
+             s->rx_fifo = 0;
+             s->tx_fifo = 0;
+             session_send_rpc_evt_to_thread (
+               ps->intercept.tx_fifo->master_thread_index,
+               hcpc_session_postponed_free_rpc,
+               uword_to_pointer (ps->session_index, void *));
+           }
+         else
+           {
+             ASSERT (s->rx_fifo->refcnt == 1);
+             hcpc_session_free (ps);
+           }
        }
-      ls = session_get_from_handle (ps->listener_session_handle);
-      ls->rx_fifo->master_thread_index = ls->tx_fifo->master_thread_index;
     }
   else
     {
-      ps->listener_session_handle = SESSION_INVALID_HANDLE;
+      ps->intercept.session_handle = SESSION_INVALID_HANDLE;
       /* http already cleaned up */
-      if (ps->http_session_handle == SESSION_INVALID_HANDLE)
-       hcpc_session_free (ps);
+      if (ps->http.session_handle == SESSION_INVALID_HANDLE)
+       {
+         if (!ps->http_establishing)
+           hcpc_session_free (ps);
+       }
     }
+
+  clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock);
 }
 
 static void
@@ -219,6 +505,13 @@ hcpc_http_connection_closed ()
   hcpc_listener_t *l;
   hcpc_session_t *ps;
 
+  pool_foreach (ps, hcpcm->sessions)
+    {
+      ps->state = HCPC_SESSION_CLOSED;
+      ps->intercept_diconnected = 1;
+      ps->http_disconnected = 1;
+    }
+
   pool_foreach (l, hcpcm->listeners)
     {
       if (l->session_handle != SESSION_INVALID_HANDLE)
@@ -228,11 +521,6 @@ hcpc_http_connection_closed ()
          vnet_unlisten (&a);
        }
     }
-
-  pool_foreach (ps, hcpcm->sessions)
-    {
-      ps->state = HCPC_SESSION_CLOSED;
-    }
 }
 
 static void
@@ -241,37 +529,124 @@ hcpc_close_session (session_t *s, u8 is_http)
   hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
 
-  HCPC_DBG ("session %u (is http %u)", s->opaque, is_http);
+  HCPC_DBG ("session [%u] (is http %u)", s->opaque, is_http);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
   ps = hcpc_session_get (s->opaque);
   ASSERT (ps);
-  ps->state = HCPC_SESSION_CLOSED;
 
   if (is_http)
     {
       /* http connection went down */
       if (ps->flags & HCPC_SESSION_F_IS_PARENT)
        {
+         ps->state = HCPC_SESSION_CLOSED;
          hcpcm->http_connection_handle = SESSION_INVALID_HANDLE;
          hcpc_http_connection_closed ();
+         clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock);
          return;
        }
-      hcpc_session_close_http (ps);
-      if (!(ps->flags & HCPC_SESSION_F_LISTENER_DISCONNECTED))
+      if (ps->flags & HCPC_SESSION_F_IS_UDP)
        {
-         ASSERT (ps->http_session_handle != SESSION_INVALID_HANDLE);
-         hcpc_session_close_listener (ps);
+         /* udp doesn't have half-close */
+         ps->state = HCPC_SESSION_CLOSED;
+         hcpc_session_close_http (ps);
+         if (!(ps->intercept_diconnected))
+           {
+             ASSERT (ps->http.session_handle != SESSION_INVALID_HANDLE);
+             hcpc_session_close_intercept (ps);
+           }
+       }
+      else
+       {
+         if (ps->intercept_diconnected)
+           {
+             /* intercept already disconnected, we can close http immediately */
+             hcpc_session_close_http (ps);
+           }
+         else
+           {
+             ASSERT (ps->http.session_handle != SESSION_INVALID_HANDLE);
+             if (ps->state == HCPC_SESSION_CLOSING)
+               {
+                 /* both sides started closing at the same time from
+                  * different threads, so http doesn't receive shutdown evt */
+                 if (ps->flags & HCPC_SESSION_F_HTTP_SHUTDOWN)
+                   {
+                     ps->flags &= ~HCPC_SESSION_F_HTTP_SHUTDOWN;
+                     hcpc_session_close_http (ps);
+                   }
+                 /* intercept started closing first, confirm close */
+                 ps->state = HCPC_SESSION_CLOSED;
+                 hcpc_session_close_intercept (ps);
+               }
+             else
+               {
+                 /* shutdown intercept first, http will be closed on intercept
+                  * close event */
+                 ASSERT (ps->state == HCPC_SESSION_ESTABLISHED);
+                 ps->state = HCPC_SESSION_CLOSING;
+                 hcpc_session_shutdown_intercept (ps);
+               }
+           }
        }
     }
   else
     {
-      hcpc_session_close_listener (ps);
-      if (!(ps->flags & HCPC_SESSION_F_HTTP_DISCONNECTED))
+      if (ps->flags & HCPC_SESSION_F_IS_UDP)
+       {
+         /* udp doesn't have half-close */
+         ps->state = HCPC_SESSION_CLOSED;
+         hcpc_session_close_intercept (ps);
+         if (!(ps->http_disconnected) && !(ps->http_establishing))
+           {
+             if (ps->http.session_handle != SESSION_INVALID_HANDLE)
+               hcpc_session_close_http (ps);
+             ps->http_disconnected = 1;
+           }
+       }
+      else
        {
-         if (ps->http_session_handle != SESSION_INVALID_HANDLE)
-           hcpc_session_close_http (ps);
-         ps->flags |= HCPC_SESSION_F_HTTP_DISCONNECTED;
+         if (ps->http_disconnected)
+           {
+             /* http already disconnected, we can close intercept immediately */
+             hcpc_session_close_intercept (ps);
+           }
+         else
+           {
+             if (ps->state == HCPC_SESSION_CLOSING)
+               {
+                 /* both sides started closing at the same time from different
+                  * threads, so intercept doesn't receive shutdown evt */
+                 if (ps->flags & HCPC_SESSION_F_INTERCEPT_SHUTDOWN)
+                   {
+                     ps->flags &= ~HCPC_SESSION_F_INTERCEPT_SHUTDOWN;
+                     hcpc_session_close_intercept (ps);
+                   }
+                 /* http started closing first, confirm close */
+                 ps->state = HCPC_SESSION_CLOSED;
+                 hcpc_session_close_http (ps);
+               }
+             else
+               {
+                 if (ps->http_establishing)
+                   {
+                     /* http is still connecting */
+                     hcpc_session_close_intercept (ps);
+                   }
+                 else
+                   {
+                     /* shutdown http first, intercept will be closed on http
+                      * close event, unless it is already closed */
+                     ps->state = HCPC_SESSION_CLOSING;
+                     if (ps->http.session_handle != SESSION_INVALID_HANDLE)
+                       hcpc_session_shutdown_http (ps);
+                   }
+               }
+           }
        }
     }
+
+  clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock);
 }
 
 static void
@@ -294,9 +669,26 @@ hcpc_listen (hcpc_listener_t *l)
   l->session_handle = a->handle;
   HCPC_DBG ("listener started %U:%u", format_ip46_address, &l->sep.ip,
            l->sep.is_ip4, clib_net_to_host_u16 (l->sep.port));
+
+  /* if listening on all (wildcard ip and port), enable proto intercept in hsi */
+  if (l->sep.port == 0)
+    {
+      if (l->sep.is_ip4)
+       {
+         if (l->sep.ip.ip4.as_u32 != 0)
+           return;
+         hcpcm->intercept_proto_fn (l->sep.transport_proto, 1);
+       }
+      else
+       {
+         if (!(l->sep.ip.ip6.as_u64[0] == 0 && l->sep.ip.ip6.as_u64[1] == 0))
+           return;
+         hcpcm->intercept_proto_fn (l->sep.transport_proto, 0);
+       }
+    }
 }
 
-static void
+void
 hcpc_start_listen ()
 {
   hcpc_main_t *hcpcm = &hcpc_main;
@@ -308,11 +700,66 @@ hcpc_start_listen ()
     }
 }
 
-static void
+#define HCPC_ARC_IP4  "ip4-unicast"
+#define HCPC_ARC_IP6  "ip6-unicast"
+#define HCPC_NODE_IP4 "hsi4-in"
+#define HCPC_NODE_IP6 "hsi6-in"
+
+static clib_error_t *
+hcpc_enable_hsi (u8 is_ip4)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+  vnet_feature_registration_t *reg;
+  clib_error_t *err = 0;
+  int rv;
+
+  if (is_ip4)
+    {
+      if (hcpcm->hsi4_enabled)
+       return 0;
+      reg = vnet_get_feature_reg (HCPC_ARC_IP4, HCPC_NODE_IP4);
+    }
+  else
+    {
+      if (hcpcm->hsi6_enabled)
+       return 0;
+      reg = vnet_get_feature_reg (HCPC_ARC_IP6, HCPC_NODE_IP6);
+    }
+  if (reg == 0)
+    return clib_error_return (0, "hsi plugin not loaded");
+
+  if (reg->enable_disable_cb)
+    {
+      if ((err = reg->enable_disable_cb (hcpcm->sw_if_index, 1)))
+       return err;
+    }
+
+  if (is_ip4)
+    rv = vnet_feature_enable_disable (HCPC_ARC_IP4, HCPC_NODE_IP4,
+                                     hcpcm->sw_if_index, 1, 0, 0);
+  else
+    rv = vnet_feature_enable_disable (HCPC_ARC_IP6, HCPC_NODE_IP6,
+                                     hcpcm->sw_if_index, 1, 0, 0);
+  if (rv)
+    return clib_error_return (0, "vnet feature enable failed (rv=%d)", rv);
+
+  if (is_ip4)
+    hcpcm->hsi4_enabled = 1;
+  else
+    hcpcm->hsi6_enabled = 1;
+  return 0;
+}
+
+static clib_error_t *
 hcpc_listener_add (hcpc_listener_t *l_cfg)
 {
   hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_listener_t *l;
+  clib_error_t *err = 0;
+
+  err = hcpc_enable_hsi (l_cfg->sep.is_ip4);
+  if (err)
+    return err;
 
   pool_get (hcpcm->listeners, l);
   *l = *l_cfg;
@@ -321,6 +768,8 @@ hcpc_listener_add (hcpc_listener_t *l_cfg)
 
   if (hcpcm->http_connection_handle != SESSION_INVALID_HANDLE)
     hcpc_listen (l);
+
+  return 0;
 }
 
 static int
@@ -356,11 +805,10 @@ hcpc_listener_del (hcpc_listener_t *l_cfg)
 }
 
 static void
-hcpc_connect_http_stream_rpc (void *rpc_args)
+hcpc_open_http_stream (u32 session_index)
 {
   hcpc_main_t *hcpcm = &hcpc_main;
   vnet_connect_args_t _a, *a = &_a;
-  u32 session_index = pointer_to_uword (rpc_args);
   session_error_t rv;
 
   clib_memset (a, 0, sizeof (*a));
@@ -375,9 +823,27 @@ hcpc_connect_http_stream_rpc (void *rpc_args)
     clib_warning ("connect returned: %U", format_session_error, rv);
 }
 
+static void
+hcpc_connect_http_stream_rpc (void *rpc_args)
+{
+  u32 session_index = pointer_to_uword (rpc_args);
+
+  hcpc_open_http_stream (session_index);
+}
+
 static void
 hcpc_connect_http_stream (u32 session_index)
 {
+  u32 connects_thread = transport_cl_thread (), thread_index;
+
+  thread_index = vlib_get_thread_index ();
+
+  if (thread_index == connects_thread)
+    {
+      hcpc_open_http_stream (session_index);
+      return;
+    }
+
   session_send_rpc_evt_to_thread_force (
     transport_cl_thread (), hcpc_connect_http_stream_rpc,
     uword_to_pointer (session_index, void *));
@@ -445,8 +911,10 @@ hcpc_write_http_connect_udp_req (svm_fifo_t *f, transport_connection_t *tc)
     target = format (0, "/.well-known/masque/udp/[%U]/%u/", format_ip6_address,
                     &tc->lcl_ip.ip6, clib_net_to_host_u16 (tc->lcl_port));
 
-  HCPC_DBG ("opening UDP tunnel to: %U:%u", format_ip46_address, &tc->lcl_ip,
-           tc->is_ip4, clib_net_to_host_u16 (tc->lcl_port));
+  HCPC_DBG ("opening UDP tunnel %U:%u->%U:%u", format_ip46_address,
+           &tc->rmt_ip, tc->is_ip4, clib_net_to_host_u16 (tc->rmt_port),
+           format_ip46_address, &tc->lcl_ip, tc->is_ip4,
+           clib_net_to_host_u16 (tc->lcl_port));
 
   msg.type = HTTP_MSG_REQUEST;
   msg.method_type = HTTP_REQ_CONNECT;
@@ -470,7 +938,6 @@ hcpc_write_http_connect_udp_req (svm_fifo_t *f, transport_connection_t *tc)
       clib_warning ("enqueue failed: %d", rv);
       return -1;
     }
-  clib_warning ("%d", rv);
 
   return 0;
 }
@@ -489,7 +956,10 @@ hcpc_write_http_connect_req (svm_fifo_t *f, transport_connection_t *tc)
     target = format (0, "[%U]:%u", format_ip6_address, &tc->lcl_ip.ip6,
                     clib_net_to_host_u16 (tc->lcl_port));
 
-  HCPC_DBG ("opening TCP tunnel to: %v", target);
+  HCPC_DBG ("opening TCP tunnel %U:%u->%U:%u", format_ip46_address,
+           &tc->rmt_ip, tc->is_ip4, clib_net_to_host_u16 (tc->rmt_port),
+           format_ip46_address, &tc->lcl_ip, tc->is_ip4,
+           clib_net_to_host_u16 (tc->lcl_port));
 
   msg.type = HTTP_MSG_REQUEST;
   msg.method_type = HTTP_REQ_CONNECT;
@@ -515,8 +985,9 @@ hcpc_write_http_connect_req (svm_fifo_t *f, transport_connection_t *tc)
 }
 
 static int
-hcpc_read_http_connect_resp (session_t *s, hcpc_session_t *ps)
+hcpc_read_http_connect_resp (session_t *s)
 {
+  hcpc_main_t *hcpcm = &hcpc_main;
   http_msg_t msg;
   int rv;
 
@@ -528,10 +999,12 @@ hcpc_read_http_connect_resp (session_t *s, hcpc_session_t *ps)
   HCPC_DBG ("response: %U %U", format_http_version,
            http_session_get_version (s), format_http_status_code, msg.code);
   if (http_status_code_str[msg.code][0] != '2')
-    return -1;
-
-  ps->state = HCPC_SESSION_ESTABLISHED;
+    {
+      hcpc_worker_stats_inc (vlib_get_thread_index (), target_unreachable, 1);
+      return -1;
+    }
 
+  hcpc_worker_stats_inc (vlib_get_thread_index (), tunnels_opened, 1);
   return 0;
 }
 
@@ -540,90 +1013,265 @@ hcpc_read_http_connect_resp (session_t *s, hcpc_session_t *ps)
 /***************************/
 
 static int
-http_session_connected_callback (u32 app_index, u32 session_index,
-                                session_t *s, session_error_t err)
+hcpc_http_session_connected_callback (u32 app_index, u32 session_index,
+                                     session_t *s, session_error_t err)
 {
   hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
 
   if (err)
     {
-      clib_warning ("connect error: %U", format_session_error, err);
-      return -1;
+      clib_warning ("session %u connect error: %U", session_index,
+                   format_session_error, err);
+
+      /* connect to http proxy server failed */
+      if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE)
+       return 0;
+
+      clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+      ps = hcpc_session_get (session_index);
+      ASSERT (ps);
+      ps->state = HCPC_SESSION_CLOSED;
+      ps->http_disconnected = 1;
+      ps->http_establishing = 0;
+      if (!ps->intercept_diconnected)
+       {
+         if (ps->intercept.session_handle != SESSION_INVALID_HANDLE)
+           {
+             session_reset (
+               session_get_from_handle (ps->intercept.session_handle));
+             ps->intercept_diconnected = 1;
+           }
+         else
+           hcpc_delete_session (s, 1);
+       }
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+      if (err == SESSION_E_MAX_STREAMS_HIT)
+       hcpc_worker_stats_inc (vlib_get_thread_index (), max_streams_hit, 1);
+      return 0;
     }
 
   if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE)
     {
       HCPC_DBG ("parent session connected");
+      clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
       ps = hcpc_session_alloc ();
-      ps->http_session_handle = session_handle (s);
+      ps->http.session_handle = session_handle (s);
+      ps->http.rx_fifo = s->rx_fifo;
+      ps->http.tx_fifo = s->tx_fifo;
       ps->flags |= HCPC_SESSION_F_IS_PARENT;
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
       s->opaque = ps->session_index;
       hcpcm->http_connection_handle = session_handle (s);
-      hcpc_start_listen ();
+      vlib_process_signal_event_mt (vlib_get_main (),
+                                   hcpcm->process_node_index,
+                                   HCPC_EVENT_PROXY_CONNECTED, 0);
       return 0;
     }
 
-  HCPC_DBG ("stream for session %u opened", session_index);
+  HCPC_DBG ("stream for session [%u] opened", session_index);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
   ps = hcpc_session_get (session_index);
-  if (!ps)
-    return -1;
+  ASSERT (ps);
+  ps->http.session_handle = session_handle (s);
+  ps->http.rx_fifo = s->rx_fifo;
+  ps->http.tx_fifo = s->tx_fifo;
+
+  /* intercept session was already closed */
+  if (ps->intercept_diconnected)
+    {
+      hcpc_worker_stats_inc (s->thread_index,
+                            client_closed_before_stream_opened, 1);
+      session_reset (s);
+      ps->http_establishing = 0;
+      ps->http_disconnected = 1;
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+      return -1;
+    }
 
-  ps->http_session_handle = session_handle (s);
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
 
-  if (svm_fifo_set_event (s->tx_fifo))
-    session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
+  if (svm_fifo_max_dequeue (s->tx_fifo))
+    if (svm_fifo_set_event (s->tx_fifo))
+      session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX);
 
   return 0;
 }
 
 static void
-http_session_disconnect_callback (session_t *s)
+hcpc_http_session_disconnect_callback (session_t *s)
 {
   hcpc_close_session (s, 1);
 }
 
 static void
-http_session_transport_closed_callback (session_t *s)
+hcpc_http_session_transport_closed_callback (session_t *s)
 {
-  clib_warning ("transport closed");
+  HCPC_DBG ("session [%u]", s->opaque);
 }
 
 static void
-http_session_reset_callback (session_t *s)
+hcpc_http_session_reset_callback (session_t *s)
 {
-  hcpc_close_session (s, 1);
+  hcpc_main_t *hcpcm = &hcpc_main;
+  hcpc_session_t *ps;
+
+  HCPC_DBG ("session [%u]", s->opaque);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+  ps = hcpc_session_get (s->opaque);
+  ASSERT (ps);
+  ps->state = HCPC_SESSION_CLOSED;
+  hcpc_session_close_http (ps);
+  if (!ps->intercept_diconnected &&
+      ps->intercept.session_handle != SESSION_INVALID_HANDLE)
+    {
+      session_reset (session_get_from_handle (ps->intercept.session_handle));
+      ps->intercept_diconnected = 1;
+    }
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+  hcpc_worker_stats_inc (s->thread_index, tunnels_reset_by_target, 1);
+}
+
+static void
+hcpc_intercept_rollback_cwnd (session_handle_t intercept_handle, u32 cwnd)
+{
+  session_t *s;
+  transport_connection_t *tc;
+  tcp_connection_t *tcp_conn;
+
+  s = session_get_from_handle_if_valid (intercept_handle);
+  if (!s)
+    return;
+  ASSERT (TRANSPORT_PROTO_TCP == session_get_transport_proto (s));
+  tc = session_get_transport (s);
+  tcp_conn = (tcp_connection_t *) tc;
+  tcp_conn->cwnd = cwnd;
+  transport_connection_reschedule (tc);
+}
+
+static void
+hcpc_intercept_rollback_cwnd_rpc (void *arg)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+  hcpc_session_t *ps;
+
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+  ps = hcpc_session_get (pointer_to_uword (arg));
+  ASSERT (ps);
+  hcpc_intercept_rollback_cwnd (ps->intercept.session_handle, ps->opaque);
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
 }
 
 static int
-http_rx_callback (session_t *s)
+hcpc_http_rx_callback (session_t *s)
 {
+  hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
   svm_fifo_t *listener_tx_fifo;
+  session_handle_t sh;
 
-  HCPC_DBG ("session %u", s->opaque);
+  HCPC_DBG ("session [%u]", s->opaque);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
   ps = hcpc_session_get (s->opaque);
   ASSERT (ps);
-  if (ps->state == HCPC_SESSION_CLOSED)
-    return -1;
-  else if (ps->state == HCPC_SESSION_CONNECTING)
-    return hcpc_read_http_connect_resp (s, ps);
+  if (ps->http_establishing)
+    {
+      ps->http_establishing = 0;
+      if (ps->intercept_diconnected || hcpc_read_http_connect_resp (s))
+       {
+         ps->state = HCPC_SESSION_CLOSED;
+         session_reset (s);
+         ps->http_disconnected = 1;
+         if (!ps->intercept_diconnected)
+           {
+             if (ps->intercept.session_handle != SESSION_INVALID_HANDLE)
+               session_reset (
+                 session_get_from_handle (ps->intercept.session_handle));
+             ps->intercept_diconnected = 1;
+           }
+         else
+           hcpc_worker_stats_inc (s->thread_index,
+                                  client_closed_before_tunnel_connected, 1);
+         clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+         return 0;
+       }
+      ps->state = HCPC_SESSION_ESTABLISHED;
+      if (ps->flags & HCPC_SESSION_F_IS_UDP)
+       hcpc_timer_start (ps);
+      else
+       {
+         /* rollback cwnd and reschedule tcp intercept transport */
+         if (s->thread_index !=
+             session_thread_from_handle (ps->intercept.session_handle))
+           session_send_rpc_evt_to_thread (
+             session_thread_from_handle (ps->intercept.session_handle),
+             hcpc_intercept_rollback_cwnd_rpc,
+             uword_to_pointer (ps->session_index, void *));
+         else
+           hcpc_intercept_rollback_cwnd (ps->intercept.session_handle,
+                                         ps->opaque);
+       }
+
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+
+      return 0;
+    }
+  if (ps->intercept_diconnected)
+    {
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+      return -1;
+    }
+  hcpc_timer_update (ps);
+  sh = ps->intercept.session_handle;
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
 
-  /* send event for listener tx fifo */
+  /* send event for intercept tx fifo */
   listener_tx_fifo = s->rx_fifo;
+  ASSERT (svm_fifo_max_dequeue (listener_tx_fifo));
   if (svm_fifo_set_event (listener_tx_fifo))
-    session_program_tx_io_evt (listener_tx_fifo->vpp_sh, SESSION_IO_EVT_TX);
+    session_program_tx_io_evt (sh, SESSION_IO_EVT_TX);
 
   return 0;
 }
 
+static void
+hcpc_intercept_force_ack (session_handle_t intercept_handle)
+{
+  transport_connection_t *tc;
+  session_t *s;
+
+  s = session_get_from_handle_if_valid (intercept_handle);
+  if (!s || session_get_transport_proto (s) != TRANSPORT_PROTO_TCP)
+    return;
+
+  tc = session_get_transport (s);
+  tcp_send_ack ((tcp_connection_t *) tc);
+}
+
+static void
+hcpc_intercept_force_ack_rpc (void *arg)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+  hcpc_session_t *ps;
+
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+  ps = hcpc_session_get (pointer_to_uword (arg));
+  ASSERT (ps);
+  hcpc_intercept_force_ack (ps->intercept.session_handle);
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+}
+
 static int
-http_tx_callback (session_t *s)
+hcpc_http_tx_callback (session_t *s)
 {
+  hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
   u32 min_free;
+  hcpc_session_state_t state;
+  u32 ps_index;
+  session_handle_t intercept_sh;
 
-  HCPC_DBG ("session %u", s->opaque);
+  HCPC_DBG ("session [%u]", s->opaque);
   min_free = clib_min (svm_fifo_size (s->tx_fifo) >> 3, 128 << 10);
   if (svm_fifo_max_enqueue (s->tx_fifo) < min_free)
     {
@@ -631,26 +1279,35 @@ http_tx_callback (session_t *s)
       return 0;
     }
 
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
   ps = hcpc_session_get (s->opaque);
   ASSERT (ps);
+  state = ps->state;
+  if (ps->intercept_diconnected)
+    {
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+      return -1;
+    }
+  ps_index = ps->session_index;
+  intercept_sh = ps->intercept.session_handle;
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
 
-  if (ps->state < HCPC_SESSION_ESTABLISHED)
+  if (state < HCPC_SESSION_ESTABLISHED)
     return 0;
 
-  if (ps->state == HCPC_SESSION_CLOSED)
-    return -1;
-
   /* force ack on listener side to update rcv wnd */
-  if (ps->flags & HCPC_SESSION_F_IS_UDP)
-    return 0;
-  tcp_send_ack ((tcp_connection_t *) session_get_transport (
-    session_get_from_handle (ps->listener_session_handle)));
+  if (s->thread_index != session_thread_from_handle (intercept_sh))
+    session_send_rpc_evt_to_thread (session_thread_from_handle (intercept_sh),
+                                   hcpc_intercept_force_ack_rpc,
+                                   uword_to_pointer (ps_index, void *));
+  else
+    hcpc_intercept_force_ack (intercept_sh);
 
   return 0;
 }
 
 static void
-http_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf)
+hcpc_http_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf)
 {
   if (ntf == SESSION_CLEANUP_TRANSPORT)
     return;
@@ -659,7 +1316,7 @@ http_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf)
 }
 
 static int
-http_alloc_session_fifos (session_t *s)
+hcpc_http_alloc_session_fifos (session_t *s)
 {
   hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
@@ -667,17 +1324,24 @@ http_alloc_session_fifos (session_t *s)
   svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
   int rv;
 
-  HCPC_DBG ("session %u alloc fifos", s->opaque);
+  HCPC_DBG ("session [%u] alloc fifos", s->opaque);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+
   ps = hcpc_session_get (s->opaque);
+
   /* http connection session doesn't have listener */
   if (!ps)
     {
       HCPC_DBG ("http connection session");
+      /* http connection is not mapped to any listener, alloc session fifos */
       app_worker_t *app_wrk = app_worker_get (hcpcm->http_app_index);
       segment_manager_t *sm = app_worker_get_connect_segment_manager (app_wrk);
       if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index,
                                                     &rx_fifo, &tx_fifo)))
-       return rv;
+       {
+         clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+         return rv;
+       }
       rx_fifo->shr->master_session_index = s->session_index;
       rx_fifo->vpp_sh = s->handle;
       s->flags &= ~SESSION_F_PROXY;
@@ -685,29 +1349,40 @@ http_alloc_session_fifos (session_t *s)
   else
     {
       HCPC_DBG ("http stream session");
-      ls = session_get_from_handle (ps->listener_session_handle);
+      if (ps->intercept_diconnected)
+       {
+         clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+         return SESSION_E_ALLOC;
+       }
+      ls = session_get_from_handle (ps->intercept.session_handle);
       tx_fifo = ls->rx_fifo;
       rx_fifo = ls->tx_fifo;
+      ASSERT (rx_fifo->refcnt == 1);
+      ASSERT (tx_fifo->refcnt == 1);
       rx_fifo->refcnt++;
       tx_fifo->refcnt++;
     }
 
   tx_fifo->shr->master_session_index = s->session_index;
   tx_fifo->vpp_sh = s->handle;
+
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+
   s->rx_fifo = rx_fifo;
   s->tx_fifo = tx_fifo;
   return 0;
 }
 
 static session_cb_vft_t http_session_cb_vft = {
-  .session_connected_callback = http_session_connected_callback,
-  .session_disconnect_callback = http_session_disconnect_callback,
-  .session_transport_closed_callback = http_session_transport_closed_callback,
-  .session_reset_callback = http_session_reset_callback,
-  .builtin_app_rx_callback = http_rx_callback,
-  .builtin_app_tx_callback = http_tx_callback,
-  .session_cleanup_callback = http_session_cleanup_callback,
-  .proxy_alloc_session_fifos = http_alloc_session_fifos,
+  .session_connected_callback = hcpc_http_session_connected_callback,
+  .session_disconnect_callback = hcpc_http_session_disconnect_callback,
+  .session_transport_closed_callback =
+    hcpc_http_session_transport_closed_callback,
+  .session_reset_callback = hcpc_http_session_reset_callback,
+  .builtin_app_rx_callback = hcpc_http_rx_callback,
+  .builtin_app_tx_callback = hcpc_http_tx_callback,
+  .session_cleanup_callback = hcpc_http_session_cleanup_callback,
+  .proxy_alloc_session_fifos = hcpc_http_alloc_session_fifos,
 };
 
 /*******************************/
@@ -715,61 +1390,113 @@ static session_cb_vft_t http_session_cb_vft = {
 /*******************************/
 
 static int
-listener_accept_callback (session_t *s)
+hcpc_intercept_accept_callback (session_t *s)
 {
   hcpc_session_t *ps;
   hcpc_main_t *hcpcm = &hcpc_main;
+  tcp_connection_t *tcp_conn;
 
   if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE)
     return -1;
 
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+
   ps = hcpc_session_alloc ();
   ps->state = HCPC_SESSION_CONNECTING;
-  ps->listener_session_handle = session_handle (s);
+  ps->intercept.session_handle = session_handle (s);
+  ps->intercept.rx_fifo = s->rx_fifo;
+  ps->intercept.tx_fifo = s->tx_fifo;
+  ps->http_establishing = 1;
   if (session_get_transport_proto (s) == TRANSPORT_PROTO_UDP)
     ps->flags |= HCPC_SESSION_F_IS_UDP;
+  else
+    {
+      /* set cwnd to zero temporarily, otherwise tcp on intercept side can
+       * dequeue http response msg when doing ack */
+      tcp_conn = (tcp_connection_t *) session_get_transport (s);
+      ps->opaque = tcp_conn->cwnd;
+      tcp_conn->cwnd = 0;
+    }
+
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+
   s->opaque = ps->session_index;
   s->session_state = SESSION_STATE_READY;
 
-  HCPC_DBG ("going to open stream for new session %u", ps->session_index);
+  HCPC_DBG ("going to open stream for new session [%u]", ps->session_index);
   hcpc_connect_http_stream (ps->session_index);
 
   return 0;
 }
 
 static void
-listener_session_disconnect_callback (session_t *s)
+hcpc_intercept_session_disconnect_callback (session_t *s)
 {
   hcpc_close_session (s, 0);
 }
 
 static void
-listener_session_reset_callback (session_t *s)
+hcpc_intercept_session_transport_closed_callback (session_t *s)
 {
-  hcpc_close_session (s, 0);
+  HCPC_DBG ("session [%u]", s->opaque);
+}
+
+static void
+hcpc_intercept_session_reset_callback (session_t *s)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+  hcpc_session_t *ps;
+
+  HCPC_DBG ("session [%u]", s->opaque);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
+  ps = hcpc_session_get (s->opaque);
+  ASSERT (ps);
+  ps->state = HCPC_SESSION_CLOSED;
+  hcpc_session_close_intercept (ps);
+  /* we want stream reset here */
+  if (!ps->http_disconnected &&
+      ps->http.session_handle != SESSION_INVALID_HANDLE)
+    {
+      session_reset (session_get_from_handle (ps->http.session_handle));
+      ps->http_disconnected = 1;
+    }
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+  hcpc_worker_stats_inc (s->thread_index, tunnels_reset_by_client, 1);
 }
 
 static int
-listener_rx_callback (session_t *s)
+hcpc_intercept_rx_callback (session_t *s)
 {
+  hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
   svm_fifo_t *http_tx_fifo;
+  hcpc_session_state_t state;
+  session_handle_t sh;
 
-  HCPC_DBG ("session %u", s->opaque);
+  HCPC_DBG ("session [%u]", s->opaque);
+  if (s->flags & SESSION_F_APP_CLOSED)
+    return 0;
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
   ps = hcpc_session_get (s->opaque);
-  if (!ps)
-    return -1;
+  ASSERT (ps);
+  state = ps->state;
+  sh = ps->http.session_handle;
+  if (ps->http_disconnected)
+    {
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+      return -1;
+    }
+  hcpc_timer_update (ps);
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
 
-  if (ps->state < HCPC_SESSION_ESTABLISHED)
+  if (state < HCPC_SESSION_ESTABLISHED)
     return 0;
 
-  if (ps->state == HCPC_SESSION_CLOSED)
-    return -1;
-
   /* send event for http tx fifo */
   http_tx_fifo = s->rx_fifo;
-  if (svm_fifo_set_event (http_tx_fifo))
-    session_program_tx_io_evt (ps->http_session_handle, SESSION_IO_EVT_TX);
+  if (svm_fifo_max_dequeue (http_tx_fifo))
+    if (svm_fifo_set_event (http_tx_fifo))
+      session_program_tx_io_evt (sh, SESSION_IO_EVT_TX);
 
   if (svm_fifo_max_enqueue (http_tx_fifo) <= TCP_MSS)
     svm_fifo_add_want_deq_ntf (http_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
@@ -778,28 +1505,37 @@ listener_rx_callback (session_t *s)
 }
 
 static int
-listener_tx_callback (session_t *s)
+hcpc_intercept_tx_callback (session_t *s)
 {
+  hcpc_main_t *hcpcm = &hcpc_main;
   hcpc_session_t *ps;
+  hcpc_session_state_t state;
+  session_handle_t sh;
 
-  HCPC_DBG ("session %u", s->opaque);
+  HCPC_DBG ("session [%u]", s->opaque);
+  clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
   ps = hcpc_session_get (s->opaque);
   ASSERT (ps);
+  state = ps->state;
+  sh = ps->http.session_handle;
+  if (ps->http_disconnected)
+    {
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+      return -1;
+    }
+  clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
 
-  if (ps->state < HCPC_SESSION_ESTABLISHED)
+  if (state < HCPC_SESSION_ESTABLISHED)
     return 0;
 
-  if (ps->state == HCPC_SESSION_CLOSED)
-    return -1;
-
   /* pass notification to http transport */
-  session_program_transport_io_evt (ps->http_session_handle,
-                                   SESSION_IO_EVT_RX);
+  session_program_transport_io_evt (sh, SESSION_IO_EVT_RX);
   return 0;
 }
 
 static void
-listener_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf)
+hcpc_intercept_session_cleanup_callback (session_t *s,
+                                        session_cleanup_ntf_t ntf)
 {
   if (ntf == SESSION_CLEANUP_TRANSPORT)
     return;
@@ -808,13 +1544,13 @@ listener_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf)
 }
 
 static int
-listener_add_segment_callback (u32 client_index, u64 segment_handle)
+hcpc_intercept_add_segment_callback (u32 client_index, u64 segment_handle)
 {
   return 0;
 }
 
 static int
-listener_write_early_data (session_t *s)
+hcpc_intercept_write_early_data (session_t *s)
 {
   transport_proto_t tp;
   transport_connection_t *tc;
@@ -843,14 +1579,16 @@ listener_write_early_data (session_t *s)
 }
 
 static session_cb_vft_t listener_session_cb_vft = {
-  .session_accept_callback = listener_accept_callback,
-  .session_disconnect_callback = listener_session_disconnect_callback,
-  .session_reset_callback = listener_session_reset_callback,
-  .builtin_app_rx_callback = listener_rx_callback,
-  .builtin_app_tx_callback = listener_tx_callback,
-  .session_cleanup_callback = listener_session_cleanup_callback,
-  .add_segment_callback = listener_add_segment_callback,
-  .proxy_write_early_data = listener_write_early_data,
+  .session_accept_callback = hcpc_intercept_accept_callback,
+  .session_disconnect_callback = hcpc_intercept_session_disconnect_callback,
+  .session_transport_closed_callback =
+    hcpc_intercept_session_transport_closed_callback,
+  .session_reset_callback = hcpc_intercept_session_reset_callback,
+  .builtin_app_rx_callback = hcpc_intercept_rx_callback,
+  .builtin_app_tx_callback = hcpc_intercept_tx_callback,
+  .session_cleanup_callback = hcpc_intercept_session_cleanup_callback,
+  .add_segment_callback = hcpc_intercept_add_segment_callback,
+  .proxy_write_early_data = hcpc_intercept_write_early_data,
 };
 
 static clib_error_t *
@@ -897,7 +1635,7 @@ hcpc_attach_http_client ()
 }
 
 static clib_error_t *
-hcpc_attach_listener ()
+hcpc_attach_intercept_listener ()
 {
   hcpc_main_t *hcpcm = &hcpc_main;
   vnet_app_attach_args_t _a, *a = &_a;
@@ -929,53 +1667,93 @@ hcpc_attach_listener ()
   return 0;
 }
 
-#define HCPC_ARC_IP4  "ip4-unicast"
-#define HCPC_ARC_IP6  "ip6-unicast"
-#define HCPC_NODE_IP4 "hsi4-in"
-#define HCPC_NODE_IP6 "hsi6-in"
-
-static clib_error_t *
-hcpc_enable_hsi (u8 is_ip4)
+static uword
+hcpc_event_process (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
 {
   hcpc_main_t *hcpcm = &hcpc_main;
-  vnet_feature_registration_t *reg;
-  clib_error_t *err = 0;
-  int rv;
+  f64 now, timeout = 1.0;
+  uword *event_data = 0;
+  uword event_type;
 
-  if (is_ip4)
+  while (1)
     {
-      if (hcpcm->hsi4_enabled)
-       return 0;
-      reg = vnet_get_feature_reg (HCPC_ARC_IP4, HCPC_NODE_IP4);
+      vlib_process_wait_for_event_or_clock (vm, timeout);
+      event_type = vlib_process_get_events (vm, (uword **) &event_data);
+      switch (event_type)
+       {
+       case HCPC_EVENT_PROXY_CONNECTED:
+         vlib_worker_thread_barrier_sync (vm);
+         hcpc_start_listen ();
+         vlib_worker_thread_barrier_release (vm);
+         break;
+       case ~0:
+         /* TODO: proxy connection keep-alive */
+         /* expire timers */
+         now = vlib_time_now (vm);
+         clib_spinlock_lock_if_init (&hcpcm->tw_lock);
+         tw_timer_expire_timers_2t_1w_2048sl (&hcpcm->tw, now);
+         clib_spinlock_unlock_if_init (&hcpcm->tw_lock);
+         break;
+       /* TODO: auto proxy reconnect */
+       default:
+         HCPC_DBG ("unknown event %u", event_type);
+         break;
+       }
+      vec_reset_length (event_data);
     }
-  else
+
+  return 0;
+}
+
+clib_error_t *
+hcpc_enable (vlib_main_t *vm)
+{
+  vlib_thread_main_t *vtm = vlib_get_thread_main ();
+  hcpc_main_t *hcpcm = &hcpc_main;
+  hcpc_worker_t *wrk;
+  clib_error_t *err = 0;
+  u32 num_threads;
+
+  hcpcm->intercept_proto_fn =
+    vlib_get_plugin_symbol ("hsi_plugin.so", "hsi_intercept_proto");
+  if (hcpcm->intercept_proto_fn == 0)
+    return clib_error_return (0, "hsi_plugin.so not loaded");
+
+  session_enable_disable_args_t args = { .is_en = 1,
+                                        .rt_engine_type =
+                                          RT_BACKEND_ENGINE_RULE_TABLE };
+  vlib_worker_thread_barrier_sync (vm);
+  vnet_session_enable_disable (vm, &args);
+  vlib_worker_thread_barrier_release (vm);
+
+  if (vlib_num_workers ())
     {
-      if (hcpcm->hsi6_enabled)
-       return 0;
-      reg = vnet_get_feature_reg (HCPC_ARC_IP6, HCPC_NODE_IP6);
+      clib_spinlock_init (&hcpcm->sessions_lock);
+      clib_spinlock_init (&hcpcm->tw_lock);
     }
-  if (reg == 0)
-    return clib_error_return (0, "hsi plugin not loaded");
 
-  if (reg->enable_disable_cb)
+  num_threads = 1 + vtm->n_threads;
+  vec_validate (hcpcm->workers, num_threads - 1);
+  vec_foreach (wrk, hcpcm->workers)
     {
-      if ((err = reg->enable_disable_cb (hcpcm->sw_if_index, 1)))
-       return err;
+      clib_memset (&wrk->stats, 0, sizeof (wrk->stats));
     }
 
-  if (is_ip4)
-    rv = vnet_feature_enable_disable (HCPC_ARC_IP4, HCPC_NODE_IP4,
-                                     hcpcm->sw_if_index, 1, 0, 0);
-  else
-    rv = vnet_feature_enable_disable (HCPC_ARC_IP6, HCPC_NODE_IP6,
-                                     hcpcm->sw_if_index, 1, 0, 0);
-  if (rv)
-    return clib_error_return (0, "vnet feature enable failed (rv=%d)", rv);
+  err = hcpc_attach_http_client ();
+  if (err)
+    return err;
+
+  err = hcpc_attach_intercept_listener ();
+  if (err)
+    return err;
+
+  if (hcpcm->process_node_index == 0)
+    hcpcm->process_node_index =
+      vlib_process_create (vm, "hcpc-event-process", hcpc_event_process, 16);
+
+  tw_timer_wheel_init_2t_1w_2048sl (&hcpcm->tw, hcpc_timer_expired_cb, 1.0,
+                                   ~0);
 
-  if (is_ip4)
-    hcpcm->hsi4_enabled = 1;
-  else
-    hcpcm->hsi6_enabled = 1;
   return 0;
 }
 
@@ -996,6 +1774,9 @@ hcpc_create_command_fn (vlib_main_t *vm, unformat_input_t *input,
   u64 mem_size;
   vnet_main_t *vnm = vnet_get_main ();
 
+  if (hcpcm->http_app_index != APP_INVALID_INDEX)
+    return clib_error_return (0, "http connect proxy client already enabled");
+
   if (!unformat_user (input, unformat_line_input, line_input))
     return clib_error_return (0, "expected arguments");
 
@@ -1017,6 +1798,9 @@ hcpc_create_command_fn (vlib_main_t *vm, unformat_input_t *input,
       else if (unformat (line_input, "interface %U",
                         unformat_vnet_sw_interface, vnm, &hcpcm->sw_if_index))
        ;
+      else if (unformat (line_input, "udp-idle-timeout %u",
+                        &hcpcm->udp_idle_timeout))
+       ;
       else
        {
          err = clib_error_return (0, "unknown input `%U'",
@@ -1055,26 +1839,14 @@ hcpc_create_command_fn (vlib_main_t *vm, unformat_input_t *input,
       goto done;
     }
 
-  err = hcpc_enable_hsi (l->sep.is_ip4);
+  err = hcpc_enable (vm);
   if (err)
     goto done;
 
-  session_enable_disable_args_t args = { .is_en = 1,
-                                        .rt_engine_type =
-                                          RT_BACKEND_ENGINE_RULE_TABLE };
-  vlib_worker_thread_barrier_sync (vm);
-  vnet_session_enable_disable (vm, &args);
-  vlib_worker_thread_barrier_release (vm);
-
-  err = hcpc_attach_http_client ();
+  err = hcpc_listener_add (l);
   if (err)
     goto done;
 
-  err = hcpc_attach_listener ();
-  if (err)
-    goto done;
-
-  hcpc_listener_add (l);
   hcpc_connect_http_connection ();
 
   hcpcm->is_init = 1;
@@ -1089,7 +1861,7 @@ VLIB_CLI_COMMAND (hcpc_create_command, static) = {
   .path = "http connect proxy client enable",
   .short_help =
     "http connect proxy client enable server-uri <http[s]://ip:port>\n"
-    "interface <intfc> listener <tcp|udp://ip:port>\n"
+    "interface <intfc> listener <tcp|udp://ip:port> [udp-idle-timeout <n>]\n"
     "[fifo-size <nM|G>] [private-segment-size <nM|G>] [prealloc-fifos <n>]",
   .function = hcpc_create_command_fn,
 };
@@ -1142,10 +1914,9 @@ hcpc_add_del_listener_command_fn (vlib_main_t *vm, unformat_input_t *input,
 
   if (is_add)
     {
-      err = hcpc_enable_hsi (l->sep.is_ip4);
+      err = hcpc_listener_add (l);
       if (err)
        goto done;
-      hcpc_listener_add (l);
     }
   else
     {
@@ -1180,7 +1951,9 @@ hcpc_show_command_fn (vlib_main_t *vm, unformat_input_t *input,
                      vlib_cli_command_t *cmd)
 {
   hcpc_main_t *hcpcm = &hcpc_main;
-  u8 show_listeners = 0, show_sessions = 0;
+  u8 show_listeners = 0, show_sessions = 0, show_stats = 0;
+  clib_thread_index_t ti;
+  hcpc_worker_t *wrk;
 
   if (!hcpcm->is_init)
     return clib_error_return (0, "http connect proxy client disabled");
@@ -1191,6 +1964,8 @@ hcpc_show_command_fn (vlib_main_t *vm, unformat_input_t *input,
        show_listeners = 1;
       else if (unformat (input, "sessions"))
        show_sessions = 1;
+      else if (unformat (input, "stats"))
+       show_stats = 1;
       else
        {
          return clib_error_return (0, "unknown input `%U'",
@@ -1221,20 +1996,55 @@ hcpc_show_command_fn (vlib_main_t *vm, unformat_input_t *input,
   if (show_sessions)
     {
       hcpc_session_t *ps;
+      session_t *s;
       transport_connection_t *tc;
+      clib_spinlock_lock_if_init (&hcpcm->sessions_lock);
       pool_foreach (ps, hcpcm->sessions)
        {
          if (ps->flags & HCPC_SESSION_F_IS_PARENT)
            continue;
-         tc = session_get_transport (
-           session_get_from_handle (ps->listener_session_handle));
-         vlib_cli_output (vm, "session [%lu] %U %U:%u->%U:%u %U",
-                          ps->session_index, format_transport_proto,
-                          tc->proto, format_ip46_address, &tc->rmt_ip,
-                          tc->is_ip4, clib_net_to_host_u16 (tc->rmt_port),
-                          format_ip46_address, &tc->lcl_ip, tc->is_ip4,
-                          clib_net_to_host_u16 (tc->lcl_port),
-                          format_hcpc_session_state, ps->state);
+         if ((ps->state > HCPC_SESSION_ESTABLISHED) ||
+             (ps->intercept.session_handle == SESSION_INVALID_HANDLE))
+           {
+             vlib_cli_output (vm, "session [%lu] %U\n%U", ps->session_index,
+                              format_hcpc_session_state, ps->state,
+                              format_hcpc_session_vars, ps);
+             continue;
+           }
+
+         s = session_get_from_handle (ps->intercept.session_handle);
+         tc = session_get_transport (s);
+         /* transport is already gone, cannot format the connection tuple */
+         if (!tc)
+           {
+             vlib_cli_output (
+               vm, "session [%lu] INTERCEPT-TRANSPORT-CLOSED\n%U",
+               ps->session_index, format_hcpc_session_vars, ps);
+             continue;
+           }
+
+         vlib_cli_output (
+           vm, "session [%lu] %U %U:%u->%U:%u %U\n%U", ps->session_index,
+           format_transport_proto, tc->proto, format_ip46_address,
+           &tc->rmt_ip, tc->is_ip4, clib_net_to_host_u16 (tc->rmt_port),
+           format_ip46_address, &tc->lcl_ip, tc->is_ip4,
+           clib_net_to_host_u16 (tc->lcl_port), format_hcpc_session_state,
+           ps->state, format_hcpc_session_vars, ps);
+       }
+      clib_spinlock_unlock_if_init (&hcpcm->sessions_lock);
+    }
+
+  if (show_stats)
+    {
+      for (ti = 0; ti < vec_len (hcpcm->workers); ti++)
+       {
+         wrk = &hcpcm->workers[ti];
+         vlib_cli_output (vm, "Thread %u:\n", ti);
+#define _(name, str)                                                          \
+  if (wrk->stats.name)                                                        \
+    vlib_cli_output (vm, " %lu %s", wrk->stats.name, str);
+         foreach_hcpc_wrk_stat
+#undef _
        }
     }
 
@@ -1243,10 +2053,40 @@ hcpc_show_command_fn (vlib_main_t *vm, unformat_input_t *input,
 
 VLIB_CLI_COMMAND (hcpc_show_command, static) = {
   .path = "show http connect proxy client",
-  .short_help = "show http connect proxy [listeners] [sessions]",
+  .short_help = "show http connect proxy [listeners] [sessions] [stats]",
   .function = hcpc_show_command_fn,
 };
 
+static clib_error_t *
+hcpc_clear_stats_fn (vlib_main_t *vm, unformat_input_t *input,
+                    vlib_cli_command_t *cmd)
+{
+  hcpc_main_t *hcpcm = &hcpc_main;
+  hcpc_worker_t *wrk;
+  clib_thread_index_t ti;
+
+  if (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    return clib_error_return (0, "unknown input `%U'", format_unformat_error,
+                             input);
+
+  if (!hcpcm->is_init)
+    return clib_error_return (0, "http connect proxy client disabled");
+
+  for (ti = 0; ti < vec_len (hcpcm->workers); ti++)
+    {
+      wrk = &hcpcm->workers[ti];
+      clib_memset (&wrk->stats, 0, sizeof (wrk->stats));
+    }
+
+  return 0;
+}
+
+VLIB_CLI_COMMAND (hcpc_clear_stats_command, static) = {
+  .path = "clear http connect proxy client stats",
+  .short_help = "clear http connect proxy client stats",
+  .function = hcpc_clear_stats_fn,
+};
+
 clib_error_t *
 hcpc_main_init (vlib_main_t *vm)
 {
@@ -1261,6 +2101,7 @@ hcpc_main_init (vlib_main_t *vm)
   hcpcm->private_segment_size = 128 << 20;
   hcpcm->prealloc_fifos = 0;
   hcpcm->sw_if_index = ~0;
+  hcpcm->udp_idle_timeout = 600;
 
   vec_validate (hcpcm->capsule_proto_header_buf, 10);
   http_init_headers_ctx (&hcpcm->capsule_proto_header,
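
For reference, the new per-worker counters can be dumped and reset from the hs-test Go suites; a minimal sketch, assuming clientVpp is the MasqueSuite client VppInstance as set up in SetupTest():

	// Dump listeners, sessions and the new per-worker stats, then reset them.
	s.Log(clientVpp.Vppctl("show http connect proxy client listeners sessions stats"))
	s.Log(clientVpp.Vppctl("clear http connect proxy client stats"))
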
index 082a074..7de9040 100644 (file)
@@ -424,7 +424,7 @@ func linkSetMultiQueue(ifName string) error {
        return nil
 }
 
-func newCommand(s []string, ns string) *exec.Cmd {
+func CommandInNetns(s []string, ns string) *exec.Cmd {
        return appendNetns(s, ns)
 }
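
For reference, a minimal usage sketch of the now-exported helper, mirroring the wget/curl/ab call sites further down (s, uri and netNs are assumed to be in scope):

	// Build a command that runs inside the given network namespace and
	// capture its combined output, as StartCurl and the stress test do.
	cmd := CommandInNetns([]string{"curl", "-s", "-o", "/dev/null", uri}, netNs)
	s.Log(cmd)
	out, err := cmd.CombinedOutput()
	s.Log(string(out))
	s.AssertNil(err)
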
 
index 3bac3bf..18d6305 100644 (file)
@@ -30,6 +30,7 @@ type MasqueSuite struct {
                Nginx    string
                NginxSsl string
                Proxy    string
+               Unused   string
        }
        NetNamespaces struct {
                Client string
@@ -59,6 +60,7 @@ func (s *MasqueSuite) SetupSuite() {
        s.Ports.Nginx = s.GeneratePort()
        s.Ports.NginxSsl = s.GeneratePort()
        s.Ports.Proxy = s.GeneratePort()
+       s.Ports.Unused = s.GeneratePort()
        s.NetNamespaces.Client = s.GetNetNamespaceByName("client-ns")
        s.Interfaces.Client = s.GetInterfaceByName("cln")
        s.Interfaces.TunnelClient = s.GetInterfaceByName("cln-tun")
@@ -73,8 +75,11 @@ func (s *MasqueSuite) SetupSuite() {
 func (s *MasqueSuite) SetupTest() {
        s.HstSuite.SetupTest()
 
+       var memoryConfig Stanza
+       memoryConfig.NewStanza("memory").Append("main-heap-size 2G")
+
        // vpp masque proxy client
-       clientVpp, err := s.Containers.VppClient.newVppInstance(s.Containers.VppClient.AllocatedCpus)
+       clientVpp, err := s.Containers.VppClient.newVppInstance(s.Containers.VppClient.AllocatedCpus, memoryConfig)
        s.AssertNotNil(clientVpp, fmt.Sprint(err))
        s.AssertNil(clientVpp.Start())
        idx, err := clientVpp.createAfPacket(s.Interfaces.Client, false)
@@ -85,7 +90,7 @@ func (s *MasqueSuite) SetupTest() {
        s.AssertNotEqual(0, idx)
 
        // vpp masque proxy server
-       serverVpp, err := s.Containers.VppServer.newVppInstance(s.Containers.VppServer.AllocatedCpus)
+       serverVpp, err := s.Containers.VppServer.newVppInstance(s.Containers.VppServer.AllocatedCpus, memoryConfig)
        s.AssertNotNil(serverVpp, fmt.Sprint(err))
        s.AssertNil(serverVpp.Start())
        idx, err = serverVpp.createAfPacket(s.Interfaces.TunnelServer, false)
@@ -140,18 +145,27 @@ func (s *MasqueSuite) TeardownTest() {
        serverVpp := s.Containers.VppServer.VppInstance
        if CurrentSpecReport().Failed() {
                s.CollectNginxLogs(s.Containers.NginxServer)
+               s.CollectIperfLogs(s.Containers.IperfServer)
                s.Log(clientVpp.Vppctl("show session verbose 2"))
                s.Log(clientVpp.Vppctl("show error"))
-               s.Log(clientVpp.Vppctl("show http connect proxy client listeners sessions"))
+               s.Log(clientVpp.Vppctl("show http connect proxy client listeners sessions stats"))
+               s.Log(clientVpp.Vppctl("show http stats"))
+               s.Log(clientVpp.Vppctl("show tcp stats"))
                s.Log(serverVpp.Vppctl("show session verbose 2"))
                s.Log(serverVpp.Vppctl("show error"))
+               s.Log(serverVpp.Vppctl("show http stats"))
+               s.Log(serverVpp.Vppctl("show tcp stats"))
        }
 }
 
-func (s *MasqueSuite) ProxyClientConnect(proto, port string) {
+func (s *MasqueSuite) ProxyClientConnect(proto, port string, extraArgs ...string) {
+       extras := ""
+       if len(extraArgs) > 0 {
+               extras = strings.Join(extraArgs, " ")
+       }
        vpp := s.Containers.VppClient.VppInstance
-       cmd := fmt.Sprintf("http connect proxy client enable server-uri https://%s:%s listener %s://0.0.0.0:%s interface host-%s",
-               s.ProxyAddr(), s.Ports.Proxy, proto, port, s.Interfaces.Client.Name())
+       cmd := fmt.Sprintf("http connect proxy client enable server-uri https://%s:%s listener %s://0.0.0.0:%s interface host-%s %s",
+               s.ProxyAddr(), s.Ports.Proxy, proto, port, s.Interfaces.Client.Name(), extras)
        s.Log(vpp.Vppctl(cmd))
 
        connected := false
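
For reference, a minimal sketch of the new variadic extra arguments, matching the udp-idle-timeout test further down; the extras are appended verbatim to the enable command:

	// Resulting vppctl line ends with "... interface host-<if> udp-idle-timeout 5"
	// (timeout in seconds; 5 is the value used by the idle-timeout test below).
	s.ProxyClientConnect("udp", s.Ports.NginxSsl, "udp-idle-timeout 5")
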
index 666afd5..3be3220 100644 (file)
@@ -243,7 +243,7 @@ func (s *HstSuite) CollectH2loadLogs(h2loadContainer *Container) {
 }
 
 func (s *HstSuite) StartHttpServer(running chan struct{}, done chan struct{}, addressPort, netNs string) {
-       cmd := newCommand([]string{"./http_server", addressPort, s.Ppid, s.ProcessIndex}, netNs)
+       cmd := CommandInNetns([]string{"./http_server", addressPort, s.Ppid, s.ProcessIndex}, netNs)
        err := cmd.Start()
        s.Log(cmd)
        if err != nil {
@@ -260,7 +260,7 @@ func (s *HstSuite) StartWget(finished chan error, server_ip, port, query, netNs
                finished <- errors.New("wget error")
        }()
 
-       cmd := newCommand([]string{"wget", "--timeout=10", "--no-proxy", "--tries=5", "-O", "/dev/null", server_ip + ":" + port + "/" + query},
+       cmd := CommandInNetns([]string{"wget", "--timeout=10", "--no-proxy", "--tries=5", "-O", "/dev/null", server_ip + ":" + port + "/" + query},
                netNs)
        s.Log(cmd)
        o, err := cmd.CombinedOutput()
@@ -283,7 +283,7 @@ func (s *HstSuite) StartCurl(finished chan error, uri, netNs, expectedRespCode s
        c := []string{"curl", "-v", "-s", "-k", "--max-time", strconv.Itoa(timeout), "-o", "/dev/null", "--noproxy", "*"}
        c = append(c, args...)
        c = append(c, uri)
-       cmd := newCommand(c, netNs)
+       cmd := CommandInNetns(c, netNs)
        s.Log(cmd)
        o, err := cmd.CombinedOutput()
        s.Log(string(o))
@@ -304,7 +304,7 @@ func (s *HstSuite) StartIperfClient(finished chan error, clientAddress, serverAd
 
        c := []string{"iperf3", "-c", serverAddress, "-B", clientAddress, "-J", "-l", "1460", "-b", "10g", "-p", serverPort}
        c = append(c, args...)
-       cmd := newCommand(c, netNs)
+       cmd := CommandInNetns(c, netNs)
        s.Log(cmd)
        o, err := cmd.CombinedOutput()
        if err != nil {
index 5adce76..5d9fc38 100644 (file)
@@ -34,9 +34,12 @@ func init() {
        RegisterVppUdpProxyMWTests(VppProxyUdpMigrationMWTest, VppConnectUdpStressMWTest)
        RegisterEnvoyProxyTests(EnvoyHttpGetTcpTest, EnvoyHttpPutTcpTest)
        RegisterNginxProxySoloTests(NginxMirroringTest, MirrorMultiThreadTest)
-       RegisterMasqueTests(VppConnectProxyClientDownloadTcpTest, VppConnectProxyClientDownloadUdpTest,
-               VppConnectProxyClientUploadTcpTest, VppConnectProxyClientUploadUdpTest)
+       RegisterMasqueTests(VppConnectProxyClientDownloadUdpTest,
+               VppConnectProxyClientUploadUdpTest, VppConnectProxyMemLeakTest)
        RegisterMasqueSoloTests(VppConnectProxyIperfTcpTest, VppConnectProxyIperfUdpTest)
+       RegisterMasqueMWTests(VppConnectProxyIperfTcpMWTest, VppConnectProxyIperfUdpMWTest, VppConnectProxyClientUploadTcpMWTest,
+               VppConnectProxyClientTargetUnreachableMWTest, VppConnectProxyClientDownloadTcpMWTest,
+               VppConnectProxyClientStressMWTest, VppConnectProxyClientUdpIdleMWTest, VppConnectProxyClientServerClosedTcpMWTest)
 }
 
 func VppProxyHttpGetTcpMWTest(s *VppProxySuite) {
@@ -664,7 +667,59 @@ func VppConnectUdpStressMWTest(s *VppUdpProxySuite) {
        vppConnectUdpStressLoad(s)
 }
 
-func VppConnectProxyClientDownloadTcpTest(s *MasqueSuite) {
+func vppConnectProxyClientCheckCleanup(s *MasqueSuite) {
+       clientVpp := s.Containers.VppClient.VppInstance
+       closed := false
+       for nTries := 0; nTries < 35; nTries++ {
+               o := clientVpp.Vppctl("show http connect proxy client sessions")
+               if !strings.Contains(o, "session [") {
+                       closed = true
+                       break
+               }
+               time.Sleep(1 * time.Second)
+       }
+       s.AssertEqual(closed, true)
+       h2Stats := clientVpp.Vppctl("show http stats")
+       streamsOpened := 0
+       streamsClosed := 0
+       lines := strings.Split(h2Stats, "\n")
+       for _, line := range lines {
+               if strings.Contains(line, "application streams opened") {
+                       tmp := strings.Split(line, " ")
+                       streamsOpened, _ = strconv.Atoi(tmp[1])
+               }
+               if strings.Contains(line, "application streams closed") {
+                       tmp := strings.Split(line, " ")
+                       streamsClosed, _ = strconv.Atoi(tmp[1])
+               }
+       }
+       // exactly one stream remains open: the http/2 connection (parent) stays up
+       s.AssertEqual(streamsOpened-streamsClosed, 1)
+}
+
+func vppConnectProxyServerCheckCleanup(s *MasqueSuite) {
+       o := s.Containers.VppServer.VppInstance.Vppctl("show session verbose")
+       s.AssertNotContains(o, "[H2]")
+       h2Stats := s.Containers.VppServer.VppInstance.Vppctl("show http stats")
+       streamsOpened := 0
+       streamsClosed := 0
+       lines := strings.Split(h2Stats, "\n")
+       for _, line := range lines {
+               if strings.Contains(line, "application streams opened") {
+                       tmp := strings.Split(line, " ")
+                       streamsOpened, _ = strconv.Atoi(tmp[1])
+               }
+               if strings.Contains(line, "application streams closed") {
+                       tmp := strings.Split(line, " ")
+                       streamsClosed, _ = strconv.Atoi(tmp[1])
+               }
+       }
+       s.AssertEqual(streamsOpened-streamsClosed, 0)
+}
+
+func VppConnectProxyClientDownloadTcpMWTest(s *MasqueSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
        s.StartNginxServer()
        clientVpp := s.Containers.VppClient.VppInstance
        s.ProxyClientConnect("tcp", s.Ports.NginxSsl)
@@ -683,6 +738,9 @@ func VppConnectProxyClientDownloadTcpTest(s *MasqueSuite) {
        }()
        s.Log(clientVpp.Vppctl("show http connect proxy client sessions"))
        s.AssertNil(<-finished)
+       // test client-initiated stream close
+       vppConnectProxyClientCheckCleanup(s)
+       vppConnectProxyServerCheckCleanup(s)
 }
 
 func VppConnectProxyClientDownloadUdpTest(s *MasqueSuite) {
@@ -700,7 +758,9 @@ func VppConnectProxyClientDownloadUdpTest(s *MasqueSuite) {
        s.AssertNil(<-finished)
 }
 
-func VppConnectProxyClientUploadTcpTest(s *MasqueSuite) {
+func VppConnectProxyClientUploadTcpMWTest(s *MasqueSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
        s.StartNginxServer()
        s.ProxyClientConnect("tcp", s.Ports.NginxSsl)
 
@@ -771,9 +831,10 @@ func VppConnectProxyIperfTcpTest(s *MasqueSuite) {
 
 func VppConnectProxyIperfUdpTest(s *MasqueSuite) {
        s.Containers.IperfServer.Run()
-       s.ProxyClientConnect("udp", s.Ports.Nginx)
+       // test listening on all ports; we are running solo anyway
+       s.ProxyClientConnect("udp", "0")
        clientVpp := s.Containers.VppClient.VppInstance
-       cmd := fmt.Sprintf("http connect proxy client listener add listener tcp://0.0.0.0:%s", s.Ports.Nginx)
+       cmd := "http connect proxy client listener add listener tcp://0.0.0.0:0"
        s.Log(clientVpp.Vppctl(cmd))
 
        stopServerCh := make(chan struct{})
@@ -801,3 +862,150 @@ func VppConnectProxyIperfUdpTest(s *MasqueSuite) {
        s.Log(clientVpp.Vppctl("show http connect proxy client sessions"))
        s.AssertNil(<-finished)
 }
+
+func VppConnectProxyIperfTcpMWTest(s *MasqueSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
+       VppConnectProxyIperfTcpTest(s)
+       // exercises the server sending RST_STREAM while iperf data flows
+       vppConnectProxyClientCheckCleanup(s)
+       vppConnectProxyServerCheckCleanup(s)
+}
+
+func VppConnectProxyIperfUdpMWTest(s *MasqueSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
+       VppConnectProxyIperfUdpTest(s)
+       clientVpp := s.Containers.VppClient.VppInstance
+       closed := false
+       for nTries := 0; nTries < 60; nTries++ {
+               o := clientVpp.Vppctl("show http connect proxy client sessions")
+               if !strings.Contains(o, "] tcp ") {
+                       closed = true
+                       break
+               }
+               time.Sleep(1 * time.Second)
+       }
+       s.AssertEqual(closed, true)
+}
+
+func VppConnectProxyClientTargetUnreachableMWTest(s *MasqueSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
+       s.StartNginxServer()
+       s.ProxyClientConnect("tcp", s.Ports.Unused)
+
+       uri := fmt.Sprintf("https://%s:%s/httpTestFile", s.NginxAddr(), s.Ports.Unused)
+       finished := make(chan error, 1)
+       go func() {
+               defer GinkgoRecover()
+               s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http1.1"})
+       }()
+       s.AssertNotNil(<-finished)
+
+       vppConnectProxyClientCheckCleanup(s)
+}
+
+func VppConnectProxyClientServerClosedTcpMWTest(s *MasqueSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
+       s.StartNginxServer()
+       clientVpp := s.Containers.VppClient.VppInstance
+       s.ProxyClientConnect("tcp", s.Ports.Nginx)
+
+       uri := fmt.Sprintf("http://%s:%s/64B", s.NginxAddr(), s.Ports.Nginx)
+       finished := make(chan error, 1)
+       go func() {
+               defer GinkgoRecover()
+               // run http/1.0 so the server starts closing the connection
+               s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http1.0"})
+       }()
+       s.Log(clientVpp.Vppctl("show http connect proxy client sessions"))
+       s.AssertNil(<-finished)
+       // test server-initiated stream close
+       vppConnectProxyClientCheckCleanup(s)
+       vppConnectProxyServerCheckCleanup(s)
+}
+
+func VppConnectProxyClientUdpIdleMWTest(s *MasqueSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
+       s.StartNginxServer()
+       s.ProxyClientConnect("udp", s.Ports.NginxSsl, "udp-idle-timeout 5")
+
+       uri := fmt.Sprintf("https://%s:%s/64B", s.NginxAddr(), s.Ports.NginxSsl)
+       finished := make(chan error, 1)
+       go func() {
+               defer GinkgoRecover()
+               s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http3-only"})
+       }()
+       s.AssertNil(<-finished)
+
+       vppConnectProxyClientCheckCleanup(s)
+       vppConnectProxyServerCheckCleanup(s)
+}
+
+func VppConnectProxyMemLeakTest(s *MasqueSuite) {
+       s.SkipUnlessLeakCheck()
+
+       s.StartNginxServer()
+       s.ProxyClientConnect("tcp", s.Ports.Nginx)
+
+       clientVpp := s.Containers.VppClient.VppInstance
+       serverVpp := s.Containers.VppServer.VppInstance
+       /* disconnect goVPP to reduce memory-trace noise */
+       clientVpp.Disconnect()
+       serverVpp.Disconnect()
+
+       uri := fmt.Sprintf("http://%s:%s/64B", s.NginxAddr(), s.Ports.Nginx)
+
+       /* warmup requests (FIB, pool allocations) */
+       finished := make(chan error, 1)
+       go func() {
+               defer GinkgoRecover()
+               // plain http/1.1 request through the tunnel
+               s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http1.1"})
+       }()
+       s.AssertNil(<-finished)
+
+       /* give it some time to clean up sessions so pool elements can be reused and we get less noise */
+       vppConnectProxyClientCheckCleanup(s)
+       vppConnectProxyServerCheckCleanup(s)
+
+       clientVpp.EnableMemoryTrace()
+       clientTraces1, err := clientVpp.GetMemoryTrace()
+       s.AssertNil(err, fmt.Sprint(err))
+
+       finished = make(chan error, 1)
+       go func() {
+               defer GinkgoRecover()
+               // plain http/1.1 request through the tunnel
+               s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http1.1"})
+       }()
+       s.AssertNil(<-finished)
+
+       /* give it some time to clean up sessions */
+       vppConnectProxyClientCheckCleanup(s)
+       vppConnectProxyServerCheckCleanup(s)
+
+       clientTraces2, err := clientVpp.GetMemoryTrace()
+       s.AssertNil(err, fmt.Sprint(err))
+       clientVpp.MemLeakCheck(clientTraces1, clientTraces2)
+}
+
+func VppConnectProxyClientStressMWTest(s *MasqueSuite) {
+       s.CpusPerVppContainer = 3
+       s.SetupTest()
+       s.StartNginxServer()
+       s.ProxyClientConnect("tcp", s.Ports.Nginx)
+
+       // try to open more tunnels than SETTINGS_MAX_CONCURRENT_STREAMS (100, minus 1 for the parent) to test failed http connects
+       uri := fmt.Sprintf("http://%s:%s/64B", s.NginxAddr(), s.Ports.Nginx)
+       cmd := CommandInNetns([]string{"ab", "-q", "-l", "-n", "10000", "-c", "102", "-s", "5", "-r", uri}, s.NetNamespaces.Client)
+       s.Log(cmd)
+       res, _ := cmd.CombinedOutput()
+       s.Log(string(res))
+
+       vppConnectProxyClientCheckCleanup(s)
+       vppConnectProxyServerCheckCleanup(s)
+}
index 2377329..eeba46c 100644 (file)
@@ -33,5 +33,8 @@ http {
       create_full_put_path off;
       dav_access all:rw;
     }
+    location /64B {
+      return 200 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';
+    }
   }
 }