From: Matus Fabian Date: Wed, 20 Aug 2025 16:18:46 +0000 (-0400) Subject: hsa: http connect proxy client multiworker support X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F16%2F43616%2F24;p=vpp.git hsa: http connect proxy client multiworker support Type: improvement Change-Id: I24e80476136712ee8dd4c8c4b3cd7e5c017acc80 Signed-off-by: Matus Fabian --- diff --git a/src/plugins/hs_apps/http_connect_proxy_client.c b/src/plugins/hs_apps/http_connect_proxy_client.c index 32cc729b2fe..c06324313b4 100644 --- a/src/plugins/hs_apps/http_connect_proxy_client.c +++ b/src/plugins/hs_apps/http_connect_proxy_client.c @@ -9,6 +9,7 @@ #include #include #include +#include #define HCPC_DEBUG 0 @@ -20,9 +21,15 @@ #define TCP_MSS 1460 +#define HCPC_TIMER_HANDLE_INVALID ((u32) ~0) + +#define HCPC_EVENT_PROXY_CONNECTED 1 + #define foreach_hcpc_session_state \ + _ (CREATED, "CREATED") \ _ (CONNECTING, "CONNECTING") \ _ (ESTABLISHED, "ESTABLISHED") \ + _ (CLOSING, "CLOSING") \ _ (CLOSED, "CLOSED") typedef enum @@ -33,32 +40,45 @@ typedef enum } hcpc_session_state_t; #define foreach_hcpc_session_flags \ - _ (IS_PARENT) \ - _ (IS_UDP) \ - _ (HTTP_DISCONNECTED) \ - _ (LISTENER_DISCONNECTED) + _ (IS_PARENT, "is-parent") \ + _ (IS_UDP, "is-udp") \ + _ (HTTP_SHUTDOWN, "http-shutdown") \ + _ (INTERCEPT_SHUTDOWN, "intercept-shutdown") typedef enum { -#define _(sym) HCPC_SESSION_F_BIT_##sym, +#define _(sym, str) HCPC_SESSION_F_BIT_##sym, foreach_hcpc_session_flags #undef _ + HCPC_SESSION_N_F_BITS } hcpc_session_flags_bit_t; typedef enum { -#define _(sym) HCPC_SESSION_F_##sym = 1 << HCPC_SESSION_F_BIT_##sym, +#define _(sym, str) HCPC_SESSION_F_##sym = 1 << HCPC_SESSION_F_BIT_##sym, foreach_hcpc_session_flags #undef _ } hcpc_session_flags_t; +typedef struct +{ + session_handle_t session_handle; + svm_fifo_t *rx_fifo; + svm_fifo_t *tx_fifo; +} hcpc_session_side_t; + typedef struct { u32 session_index; hcpc_session_state_t state; - hcpc_session_flags_t flags; - session_handle_t listener_session_handle; - session_handle_t http_session_handle; + u8 flags; + hcpc_session_side_t intercept; + hcpc_session_side_t http; + u32 timer_handle; + u32 opaque; + volatile int http_establishing; + volatile int intercept_diconnected; + volatile int http_disconnected; } hcpc_session_t; typedef struct @@ -68,6 +88,32 @@ typedef struct session_handle_t session_handle; } hcpc_listener_t; +#define foreach_hcpc_wrk_stat \ + _ (tunnels_opened, "tunnels opened") \ + _ (udp_idle_timeouts, "udp idle timeouts") \ + _ (tunnels_reset_by_client, "tunnels reset by client") \ + _ (tunnels_reset_by_target, "tunnels reset by target") \ + _ (max_streams_hit, "max streams hit") \ + _ (target_unreachable, "target unreachable") \ + _ (client_closed_before_stream_opened, "client closed before stream " \ + "opened") \ + _ (client_closed_before_tunnel_connected, \ + "client closed before tunnel connected") + +typedef struct +{ +#define _(name, str) u64 name; + foreach_hcpc_wrk_stat +#undef _ +} hcpc_wrk_stats_t; + +typedef struct +{ + hcpc_wrk_stats_t stats; +} hcpc_worker_t; + +typedef void (*hsi_intercept_proto_fn) (transport_proto_t proto, u8 is_ip4); + typedef struct { u32 http_app_index; @@ -86,10 +132,20 @@ typedef struct u32 fifo_size; u32 prealloc_fifos; u64 private_segment_size; + u32 process_node_index; + u32 udp_idle_timeout; + clib_spinlock_t sessions_lock; + clib_spinlock_t tw_lock; + tw_timer_wheel_2t_1w_2048sl_t tw; + hsi_intercept_proto_fn intercept_proto_fn; + hcpc_worker_t *workers; } hcpc_main_t; hcpc_main_t hcpc_main; +#define hcpc_worker_stats_inc(_ti, _stat, _val) \ + hcpcm->workers[_ti].stats._stat += _val + static u8 * format_hcpc_session_state (u8 *s, va_list *va) { @@ -109,6 +165,52 @@ format_hcpc_session_state (u8 *s, va_list *va) return format (s, "%s", t); } +const char *hcpc_session_flags_str[] = { +#define _(sym, str) str, + foreach_hcpc_session_flags +#undef _ + +}; + +static u8 * +format_hcpc_session_flags (u8 *s, va_list *va) +{ + hcpc_session_t *ps = va_arg (*va, hcpc_session_t *); + int i, last = -1; + + for (i = 0; i < HCPC_SESSION_N_F_BITS; i++) + { + if (ps->flags & (1 << i)) + last = i; + } + + for (i = 0; i < last; i++) + { + if (ps->flags & (1 << i)) + s = format (s, "%s | ", hcpc_session_flags_str[i]); + } + if (last >= 0) + s = format (s, "%s", hcpc_session_flags_str[i]); + + return s; +} + +static u8 * +format_hcpc_session_vars (u8 *s, va_list *va) +{ + hcpc_session_t *ps = va_arg (*va, hcpc_session_t *); + + s = format (s, + " http_establishing %u, http_stream_opened %u, " + "http_disconnected %u, intercept_disconnected %u\n", + ps->http_establishing, + ps->http.session_handle != SESSION_INVALID_HANDLE, + ps->http_disconnected, ps->intercept_diconnected); + s = format (s, " flags: %U\n", format_hcpc_session_flags, ps); + + return s; +} + static void hcpc_session_close_http (hcpc_session_t *ps) { @@ -116,27 +218,77 @@ hcpc_session_close_http (hcpc_session_t *ps) vnet_disconnect_args_t _a = { 0 }, *a = &_a; session_error_t rv; - a->handle = ps->http_session_handle; + HCPC_DBG ("session [%u]", ps->session_index); + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + + a->handle = ps->http.session_handle; a->app_index = hcpcm->http_app_index; rv = vnet_disconnect_session (a); if (rv) - clib_warning ("disconnect returned: %U", format_session_error, rv); - ps->flags |= HCPC_SESSION_F_HTTP_DISCONNECTED; + clib_warning ("session %u disconnect returned: %U", ps->session_index, + format_session_error, rv); + ps->http_disconnected = 1; +} + +static void +hcpc_session_shutdown_http (hcpc_session_t *ps) +{ + hcpc_main_t *hcpcm = &hcpc_main; + vnet_shutdown_args_t _a = { 0 }, *a = &_a; + session_error_t rv; + + HCPC_DBG ("session [%u]", ps->session_index); + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + + ps->flags |= HCPC_SESSION_F_HTTP_SHUTDOWN; + a->handle = ps->http.session_handle; + a->app_index = hcpcm->http_app_index; + rv = vnet_shutdown_session (a); + if (rv) + clib_warning ("session %u shutdown returned: %U", ps->session_index, + format_session_error, rv); } static void -hcpc_session_close_listener (hcpc_session_t *ps) +hcpc_session_close_intercept (hcpc_session_t *ps) { hcpc_main_t *hcpcm = &hcpc_main; vnet_disconnect_args_t _a = { 0 }, *a = &_a; session_error_t rv; - a->handle = ps->listener_session_handle; + HCPC_DBG ("session [%u]", ps->session_index); + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + + a->handle = ps->intercept.session_handle; a->app_index = hcpcm->listener_app_index; rv = vnet_disconnect_session (a); if (rv) - clib_warning ("disconnect returned: %U", format_session_error, rv); - ps->flags |= HCPC_SESSION_F_LISTENER_DISCONNECTED; + clib_warning ("session %u disconnect returned: %U", ps->session_index, + format_session_error, rv); + ps->intercept_diconnected = 1; +} + +static void +hcpc_session_shutdown_intercept (hcpc_session_t *ps) +{ + hcpc_main_t *hcpcm = &hcpc_main; + vnet_shutdown_args_t _a = { 0 }, *a = &_a; + session_error_t rv; + + HCPC_DBG ("session [%u]", ps->session_index); + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + + ps->flags |= HCPC_SESSION_F_INTERCEPT_SHUTDOWN; + a->handle = ps->intercept.session_handle; + a->app_index = hcpcm->listener_app_index; + rv = vnet_shutdown_session (a); + if (rv) + clib_warning ("session %u shutdown returned: %U", ps->session_index, + format_session_error, rv); } static hcpc_session_t * @@ -145,10 +297,14 @@ hcpc_session_alloc () hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + pool_get_zero (hcpcm->sessions, ps); ps->session_index = ps - hcpcm->sessions; - ps->http_session_handle = SESSION_INVALID_HANDLE; - ps->listener_session_handle = SESSION_INVALID_HANDLE; + ps->http.session_handle = SESSION_INVALID_HANDLE; + ps->intercept.session_handle = SESSION_INVALID_HANDLE; + ps->timer_handle = HCPC_TIMER_HANDLE_INVALID; return ps; } @@ -158,6 +314,10 @@ hcpc_session_free (hcpc_session_t *ps) { hcpc_main_t *hcpcm = &hcpc_main; + HCPC_DBG ("session [%u]", ps->session_index); + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + if (CLIB_DEBUG) memset (ps, 0xB0, sizeof (*ps)); pool_put (hcpcm->sessions, ps); @@ -173,43 +333,169 @@ hcpc_session_get (u32 s_index) return pool_elt_at_index (hcpcm->sessions, s_index); } +static void +hcpc_timer_expired_cb (u32 *expired_timers) +{ + hcpc_main_t *hcpcm = &hcpc_main; + int i; + u32 ps_index; + hcpc_session_t *ps; + + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + for (i = 0; i < vec_len (expired_timers); i++) + { + ps_index = expired_timers[i] & 0x7FFFFFFF; + ps = hcpc_session_get (ps_index); + if (!ps) + continue; + HCPC_DBG ("session [%u]", ps_index); + ASSERT (ps->flags & HCPC_SESSION_F_IS_UDP); + ASSERT (ps->http_establishing == 0); + ps->state = HCPC_SESSION_CLOSED; + ps->timer_handle = HCPC_TIMER_HANDLE_INVALID; + if (!ps->intercept_diconnected) + hcpc_session_close_intercept (ps); + if (!ps->http_disconnected) + hcpc_session_close_http (ps); + } + clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock); + hcpc_worker_stats_inc (vlib_get_thread_index (), udp_idle_timeouts, + vec_len (expired_timers)); +} + +static inline void +hcpc_timer_start (hcpc_session_t *ps) +{ + hcpc_main_t *hcpcm = &hcpc_main; + + ASSERT (ps->timer_handle == HCPC_TIMER_HANDLE_INVALID); + ASSERT (ps->flags & HCPC_SESSION_F_IS_UDP); + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + + clib_spinlock_lock_if_init (&hcpcm->tw_lock); + ps->timer_handle = tw_timer_start_2t_1w_2048sl ( + &hcpcm->tw, ps->session_index, 0, hcpcm->udp_idle_timeout); + clib_spinlock_unlock_if_init (&hcpcm->tw_lock); +} + +static inline void +hcpc_timer_stop (hcpc_session_t *ps) +{ + hcpc_main_t *hcpcm = &hcpc_main; + + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + + if (ps->timer_handle == HCPC_TIMER_HANDLE_INVALID) + return; + + ASSERT (ps->flags & HCPC_SESSION_F_IS_UDP); + + clib_spinlock_lock_if_init (&hcpcm->tw_lock); + tw_timer_stop_2t_1w_2048sl (&hcpcm->tw, ps->timer_handle); + ps->timer_handle = HCPC_TIMER_HANDLE_INVALID; + clib_spinlock_unlock_if_init (&hcpcm->tw_lock); +} + +static inline void +hcpc_timer_update (hcpc_session_t *ps) +{ + hcpc_main_t *hcpcm = &hcpc_main; + + ASSERT (!vlib_num_workers () || + CLIB_SPINLOCK_IS_LOCKED (&hcpcm->sessions_lock)); + + if (ps->timer_handle == HCPC_TIMER_HANDLE_INVALID) + return; + + ASSERT (ps->flags & HCPC_SESSION_F_IS_UDP); + + clib_spinlock_lock_if_init (&hcpcm->tw_lock); + tw_timer_update_2t_1w_2048sl (&hcpcm->tw, ps->timer_handle, + hcpcm->udp_idle_timeout); + clib_spinlock_unlock_if_init (&hcpcm->tw_lock); +} + +static void +hcpc_session_postponed_free_rpc (void *arg) +{ + uword session_index = pointer_to_uword (arg); + hcpc_main_t *hcpcm = &hcpc_main; + hcpc_session_t *ps; + + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + + HCPC_DBG ("session [%u]", session_index); + ps = hcpc_session_get (session_index); + ASSERT (ps); + segment_manager_dealloc_fifos (ps->intercept.rx_fifo, ps->intercept.tx_fifo); + hcpc_session_free (ps); + + clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock); +} + static void hcpc_delete_session (session_t *s, u8 is_http) { + hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; - session_t *ls; - HCPC_DBG ("session %u (is http %u)", s->opaque, is_http); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + + HCPC_DBG ("session [%u] (is http %u)", s->opaque, is_http); ps = hcpc_session_get (s->opaque); ASSERT (ps); + hcpc_timer_stop (ps); + if (is_http) { - ps->http_session_handle = SESSION_INVALID_HANDLE; + ps->http.session_handle = SESSION_INVALID_HANDLE; /* http connection session doesn't have listener */ if (ps->flags & HCPC_SESSION_F_IS_PARENT) { - ASSERT (ps->listener_session_handle == SESSION_INVALID_HANDLE); + ASSERT (ps->intercept.session_handle == SESSION_INVALID_HANDLE); hcpc_session_free (ps); + clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock); return; } + + /* revert master thread index change on connect notification */ + ps->intercept.rx_fifo->master_thread_index = + ps->intercept.tx_fifo->master_thread_index; + /* listener already cleaned up */ - if (ps->listener_session_handle == SESSION_INVALID_HANDLE) + if (ps->intercept.session_handle == SESSION_INVALID_HANDLE) { - ASSERT (s->rx_fifo->refcnt == 1); - hcpc_session_free (ps); - return; + if (s->thread_index != ps->intercept.tx_fifo->master_thread_index) + { + s->rx_fifo = 0; + s->tx_fifo = 0; + session_send_rpc_evt_to_thread ( + ps->intercept.tx_fifo->master_thread_index, + hcpc_session_postponed_free_rpc, + uword_to_pointer (ps->session_index, void *)); + } + else + { + ASSERT (s->rx_fifo->refcnt == 1); + hcpc_session_free (ps); + } } - ls = session_get_from_handle (ps->listener_session_handle); - ls->rx_fifo->master_thread_index = ls->tx_fifo->master_thread_index; } else { - ps->listener_session_handle = SESSION_INVALID_HANDLE; + ps->intercept.session_handle = SESSION_INVALID_HANDLE; /* http already cleaned up */ - if (ps->http_session_handle == SESSION_INVALID_HANDLE) - hcpc_session_free (ps); + if (ps->http.session_handle == SESSION_INVALID_HANDLE) + { + if (!ps->http_establishing) + hcpc_session_free (ps); + } } + + clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock); } static void @@ -219,6 +505,13 @@ hcpc_http_connection_closed () hcpc_listener_t *l; hcpc_session_t *ps; + pool_foreach (ps, hcpcm->sessions) + { + ps->state = HCPC_SESSION_CLOSED; + ps->intercept_diconnected = 1; + ps->http_disconnected = 1; + } + pool_foreach (l, hcpcm->listeners) { if (l->session_handle != SESSION_INVALID_HANDLE) @@ -228,11 +521,6 @@ hcpc_http_connection_closed () vnet_unlisten (&a); } } - - pool_foreach (ps, hcpcm->sessions) - { - ps->state = HCPC_SESSION_CLOSED; - } } static void @@ -241,37 +529,124 @@ hcpc_close_session (session_t *s, u8 is_http) hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; - HCPC_DBG ("session %u (is http %u)", s->opaque, is_http); + HCPC_DBG ("session [%u] (is http %u)", s->opaque, is_http); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); ps = hcpc_session_get (s->opaque); ASSERT (ps); - ps->state = HCPC_SESSION_CLOSED; if (is_http) { /* http connection went down */ if (ps->flags & HCPC_SESSION_F_IS_PARENT) { + ps->state = HCPC_SESSION_CLOSED; hcpcm->http_connection_handle = SESSION_INVALID_HANDLE; hcpc_http_connection_closed (); + clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock); return; } - hcpc_session_close_http (ps); - if (!(ps->flags & HCPC_SESSION_F_LISTENER_DISCONNECTED)) + if (ps->flags & HCPC_SESSION_F_IS_UDP) { - ASSERT (ps->http_session_handle != SESSION_INVALID_HANDLE); - hcpc_session_close_listener (ps); + /* udp don't have half-close */ + ps->state = HCPC_SESSION_CLOSED; + hcpc_session_close_http (ps); + if (!(ps->intercept_diconnected)) + { + ASSERT (ps->http.session_handle != SESSION_INVALID_HANDLE); + hcpc_session_close_intercept (ps); + } + } + else + { + if (ps->intercept_diconnected) + { + /* intercept already disconnect, we can close http immediately */ + hcpc_session_close_http (ps); + } + else + { + ASSERT (ps->http.session_handle != SESSION_INVALID_HANDLE); + if (ps->state == HCPC_SESSION_CLOSING) + { + /* both sides start closing on same time from different + * threads, so http don't receive shutdown evt */ + if (ps->flags & HCPC_SESSION_F_HTTP_SHUTDOWN) + { + ps->flags &= ~HCPC_SESSION_F_HTTP_SHUTDOWN; + hcpc_session_close_http (ps); + } + /* intercept start closing first, confirm close */ + ps->state = HCPC_SESSION_CLOSED; + hcpc_session_close_intercept (ps); + } + else + { + /* shutdown intercept first, http will be closed on intercept + * close event */ + ASSERT (ps->state == HCPC_SESSION_ESTABLISHED); + ps->state = HCPC_SESSION_CLOSING; + hcpc_session_shutdown_intercept (ps); + } + } } } else { - hcpc_session_close_listener (ps); - if (!(ps->flags & HCPC_SESSION_F_HTTP_DISCONNECTED)) + if (ps->flags & HCPC_SESSION_F_IS_UDP) + { + /* udp don't have half-close */ + ps->state = HCPC_SESSION_CLOSED; + hcpc_session_close_intercept (ps); + if (!(ps->http_disconnected) && !(ps->http_establishing)) + { + if (ps->http.session_handle != SESSION_INVALID_HANDLE) + hcpc_session_close_http (ps); + ps->http_disconnected = 1; + } + } + else { - if (ps->http_session_handle != SESSION_INVALID_HANDLE) - hcpc_session_close_http (ps); - ps->flags |= HCPC_SESSION_F_HTTP_DISCONNECTED; + if (ps->http_disconnected) + { + /* http already disconnect, we can close intercept immediately */ + hcpc_session_close_intercept (ps); + } + else + { + if (ps->state == HCPC_SESSION_CLOSING) + { + /* both sides start closing on same time from different + * threads, so intercept don't receive shutdown evt */ + if (ps->flags & HCPC_SESSION_F_INTERCEPT_SHUTDOWN) + { + ps->flags &= ~HCPC_SESSION_F_INTERCEPT_SHUTDOWN; + hcpc_session_close_intercept (ps); + } + /* http start closing first, confirm close */ + ps->state = HCPC_SESSION_CLOSED; + hcpc_session_close_http (ps); + } + else + { + if (ps->http_establishing) + { + /* http is still connecting */ + hcpc_session_close_intercept (ps); + } + else + { + /* shutdown http first, intercept will be closed on http + * close event, unless it is already closed */ + ps->state = HCPC_SESSION_CLOSING; + if (ps->http.session_handle != SESSION_INVALID_HANDLE) + hcpc_session_shutdown_http (ps); + } + } + } } } + + clib_spinlock_unlock_if_init (&hcpc_main.sessions_lock); } static void @@ -294,9 +669,26 @@ hcpc_listen (hcpc_listener_t *l) l->session_handle = a->handle; HCPC_DBG ("listener started %U:%u", format_ip46_address, &l->sep.ip, l->sep.is_ip4, clib_net_to_host_u16 (l->sep.port)); + + /* if listen all (wildcard ip and port) enable it in hsi */ + if (l->sep.port == 0) + { + if (l->sep.is_ip4) + { + if (l->sep.ip.ip4.as_u32 != 0) + return; + hcpcm->intercept_proto_fn (l->sep.transport_proto, 1); + } + else + { + if (!(l->sep.ip.ip6.as_u64[0] == 0 || l->sep.ip.ip6.as_u64[1] == 0)) + return; + hcpcm->intercept_proto_fn (l->sep.transport_proto, 0); + } + } } -static void +void hcpc_start_listen () { hcpc_main_t *hcpcm = &hcpc_main; @@ -308,11 +700,66 @@ hcpc_start_listen () } } -static void +#define HCPC_ARC_IP4 "ip4-unicast" +#define HCPC_ARC_IP6 "ip6-unicast" +#define HCPC_NODE_IP4 "hsi4-in" +#define HCPC_NODE_IP6 "hsi6-in" + +static clib_error_t * +hcpc_enable_hsi (u8 is_ip4) +{ + hcpc_main_t *hcpcm = &hcpc_main; + vnet_feature_registration_t *reg; + clib_error_t *err = 0; + int rv; + + if (is_ip4) + { + if (hcpcm->hsi4_enabled) + return 0; + reg = vnet_get_feature_reg (HCPC_ARC_IP4, HCPC_NODE_IP4); + } + else + { + if (hcpcm->hsi6_enabled) + return 0; + reg = vnet_get_feature_reg (HCPC_ARC_IP6, HCPC_NODE_IP6); + } + if (reg == 0) + return clib_error_return (0, "hsi plugin not loaded"); + + if (reg->enable_disable_cb) + { + if ((err = reg->enable_disable_cb (hcpcm->sw_if_index, 1))) + return err; + } + + if (is_ip4) + rv = vnet_feature_enable_disable (HCPC_ARC_IP4, HCPC_NODE_IP4, + hcpcm->sw_if_index, 1, 0, 0); + else + rv = vnet_feature_enable_disable (HCPC_ARC_IP6, HCPC_NODE_IP6, + hcpcm->sw_if_index, 1, 0, 0); + if (rv) + return clib_error_return (0, "vnet feature enable failed (rv=%d)", rv); + + if (is_ip4) + hcpcm->hsi4_enabled = 1; + else + hcpcm->hsi6_enabled = 1; + return 0; +} + +static clib_error_t * hcpc_listener_add (hcpc_listener_t *l_cfg) { hcpc_main_t *hcpcm = &hcpc_main; hcpc_listener_t *l; + clib_error_t *err = 0; + + err = hcpc_enable_hsi (l_cfg->sep.is_ip4); + if (err) + return err; pool_get (hcpcm->listeners, l); *l = *l_cfg; @@ -321,6 +768,8 @@ hcpc_listener_add (hcpc_listener_t *l_cfg) if (hcpcm->http_connection_handle != SESSION_INVALID_HANDLE) hcpc_listen (l); + + return 0; } static int @@ -356,11 +805,10 @@ hcpc_listener_del (hcpc_listener_t *l_cfg) } static void -hcpc_connect_http_stream_rpc (void *rpc_args) +hcpc_open_http_stream (u32 session_index) { hcpc_main_t *hcpcm = &hcpc_main; vnet_connect_args_t _a, *a = &_a; - u32 session_index = pointer_to_uword (rpc_args); session_error_t rv; clib_memset (a, 0, sizeof (*a)); @@ -375,9 +823,27 @@ hcpc_connect_http_stream_rpc (void *rpc_args) clib_warning ("connect returned: %U", format_session_error, rv); } +static void +hcpc_connect_http_stream_rpc (void *rpc_args) +{ + u32 session_index = pointer_to_uword (rpc_args); + + hcpc_open_http_stream (session_index); +} + static void hcpc_connect_http_stream (u32 session_index) { + u32 connects_thread = transport_cl_thread (), thread_index; + + thread_index = vlib_get_thread_index (); + + if (thread_index == connects_thread) + { + hcpc_open_http_stream (session_index); + return; + } + session_send_rpc_evt_to_thread_force ( transport_cl_thread (), hcpc_connect_http_stream_rpc, uword_to_pointer (session_index, void *)); @@ -445,8 +911,10 @@ hcpc_write_http_connect_udp_req (svm_fifo_t *f, transport_connection_t *tc) target = format (0, "/.well-known/masque/udp/[%U]/%u/", format_ip6_address, &tc->lcl_ip.ip6, clib_net_to_host_u16 (tc->lcl_port)); - HCPC_DBG ("opening UDP tunnel to: %U:%u", format_ip46_address, &tc->lcl_ip, - tc->is_ip4, clib_net_to_host_u16 (tc->lcl_port)); + HCPC_DBG ("opening UDP tunnel %U:%u->%U:%u", format_ip46_address, + &tc->rmt_ip, tc->is_ip4, clib_net_to_host_u16 (tc->rmt_port), + format_ip46_address, &tc->lcl_ip, tc->is_ip4, + clib_net_to_host_u16 (tc->lcl_port)); msg.type = HTTP_MSG_REQUEST; msg.method_type = HTTP_REQ_CONNECT; @@ -470,7 +938,6 @@ hcpc_write_http_connect_udp_req (svm_fifo_t *f, transport_connection_t *tc) clib_warning ("enqueue failed: %d", rv); return -1; } - clib_warning ("%d", rv); return 0; } @@ -489,7 +956,10 @@ hcpc_write_http_connect_req (svm_fifo_t *f, transport_connection_t *tc) target = format (0, "[%U]:%u", format_ip6_address, &tc->lcl_ip.ip6, clib_net_to_host_u16 (tc->lcl_port)); - HCPC_DBG ("opening TCP tunnel to: %v", target); + HCPC_DBG ("opening TCP tunnel %U:%u->%U:%u", format_ip46_address, + &tc->rmt_ip, tc->is_ip4, clib_net_to_host_u16 (tc->rmt_port), + format_ip46_address, &tc->lcl_ip, tc->is_ip4, + clib_net_to_host_u16 (tc->lcl_port)); msg.type = HTTP_MSG_REQUEST; msg.method_type = HTTP_REQ_CONNECT; @@ -515,8 +985,9 @@ hcpc_write_http_connect_req (svm_fifo_t *f, transport_connection_t *tc) } static int -hcpc_read_http_connect_resp (session_t *s, hcpc_session_t *ps) +hcpc_read_http_connect_resp (session_t *s) { + hcpc_main_t *hcpcm = &hcpc_main; http_msg_t msg; int rv; @@ -528,10 +999,12 @@ hcpc_read_http_connect_resp (session_t *s, hcpc_session_t *ps) HCPC_DBG ("response: %U %U", format_http_version, http_session_get_version (s), format_http_status_code, msg.code); if (http_status_code_str[msg.code][0] != '2') - return -1; - - ps->state = HCPC_SESSION_ESTABLISHED; + { + hcpc_worker_stats_inc (vlib_get_thread_index (), target_unreachable, 1); + return -1; + } + hcpc_worker_stats_inc (vlib_get_thread_index (), tunnels_opened, 1); return 0; } @@ -540,90 +1013,265 @@ hcpc_read_http_connect_resp (session_t *s, hcpc_session_t *ps) /***************************/ static int -http_session_connected_callback (u32 app_index, u32 session_index, - session_t *s, session_error_t err) +hcpc_http_session_connected_callback (u32 app_index, u32 session_index, + session_t *s, session_error_t err) { hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; if (err) { - clib_warning ("connect error: %U", format_session_error, err); - return -1; + clib_warning ("session %u connect error: %U", session_index, + format_session_error, err); + + /* connect to http proxy server failed */ + if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE) + return 0; + + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_get (session_index); + ASSERT (ps); + ps->state = HCPC_SESSION_CLOSED; + ps->http_disconnected = 1; + ps->http_establishing = 0; + if (!ps->intercept_diconnected) + { + if (ps->intercept.session_handle != SESSION_INVALID_HANDLE) + { + session_reset ( + session_get_from_handle (ps->intercept.session_handle)); + ps->intercept_diconnected = 1; + } + else + hcpc_delete_session (s, 1); + } + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + if (err == SESSION_E_MAX_STREAMS_HIT) + hcpc_worker_stats_inc (vlib_get_thread_index (), max_streams_hit, 1); + return 0; } if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE) { HCPC_DBG ("parent session connected"); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); ps = hcpc_session_alloc (); - ps->http_session_handle = session_handle (s); + ps->http.session_handle = session_handle (s); + ps->http.rx_fifo = s->rx_fifo; + ps->http.tx_fifo = s->tx_fifo; ps->flags |= HCPC_SESSION_F_IS_PARENT; + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); s->opaque = ps->session_index; hcpcm->http_connection_handle = session_handle (s); - hcpc_start_listen (); + vlib_process_signal_event_mt (vlib_get_main (), + hcpcm->process_node_index, + HCPC_EVENT_PROXY_CONNECTED, 0); return 0; } - HCPC_DBG ("stream for session %u opened", session_index); + HCPC_DBG ("stream for session [%u] opened", session_index); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); ps = hcpc_session_get (session_index); - if (!ps) - return -1; + ASSERT (ps); + ps->http.session_handle = session_handle (s); + ps->http.rx_fifo = s->rx_fifo; + ps->http.tx_fifo = s->tx_fifo; + + /* listener session was already closed */ + if (ps->intercept_diconnected) + { + hcpc_worker_stats_inc (s->thread_index, + client_closed_before_stream_opened, 1); + session_reset (s); + ps->http_establishing = 0; + ps->http_disconnected = 1; + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return -1; + } - ps->http_session_handle = session_handle (s); + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - if (svm_fifo_set_event (s->tx_fifo)) - session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); + if (svm_fifo_max_dequeue (s->tx_fifo)) + if (svm_fifo_set_event (s->tx_fifo)) + session_program_tx_io_evt (s->handle, SESSION_IO_EVT_TX); return 0; } static void -http_session_disconnect_callback (session_t *s) +hcpc_http_session_disconnect_callback (session_t *s) { hcpc_close_session (s, 1); } static void -http_session_transport_closed_callback (session_t *s) +hcpc_http_session_transport_closed_callback (session_t *s) { - clib_warning ("transport closed"); + HCPC_DBG ("session [%u]", s->opaque); } static void -http_session_reset_callback (session_t *s) +hcpc_http_session_reset_callback (session_t *s) { - hcpc_close_session (s, 1); + hcpc_main_t *hcpcm = &hcpc_main; + hcpc_session_t *ps; + + HCPC_DBG ("session [%u]", s->opaque); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_get (s->opaque); + ASSERT (ps); + ps->state = HCPC_SESSION_CLOSED; + hcpc_session_close_http (ps); + if (!ps->intercept_diconnected && + ps->intercept.session_handle != SESSION_INVALID_HANDLE) + { + session_reset (session_get_from_handle (ps->intercept.session_handle)); + ps->intercept_diconnected = 1; + } + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + hcpc_worker_stats_inc (s->thread_index, tunnels_reset_by_target, 1); +} + +static void +hcpc_intercept_rollback_cwnd (session_handle_t intercept_handle, u32 cwnd) +{ + session_t *s; + transport_connection_t *tc; + tcp_connection_t *tcp_conn; + + s = session_get_from_handle_if_valid (intercept_handle); + if (!s) + return; + ASSERT (TRANSPORT_PROTO_TCP == session_get_transport_proto (s)); + tc = session_get_transport (s); + tcp_conn = (tcp_connection_t *) tc; + tcp_conn->cwnd = cwnd; + transport_connection_reschedule (tc); +} + +static void +hcpc_intercept_rollback_cwnd_rpc (void *arg) +{ + hcpc_main_t *hcpcm = &hcpc_main; + hcpc_session_t *ps; + + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_get (pointer_to_uword (arg)); + ASSERT (ps); + hcpc_intercept_rollback_cwnd (ps->intercept.session_handle, ps->opaque); + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); } static int -http_rx_callback (session_t *s) +hcpc_http_rx_callback (session_t *s) { + hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; svm_fifo_t *listener_tx_fifo; + session_handle_t sh; - HCPC_DBG ("session %u", s->opaque); + HCPC_DBG ("session [%u]", s->opaque); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); ps = hcpc_session_get (s->opaque); ASSERT (ps); - if (ps->state == HCPC_SESSION_CLOSED) - return -1; - else if (ps->state == HCPC_SESSION_CONNECTING) - return hcpc_read_http_connect_resp (s, ps); + if (ps->http_establishing) + { + ps->http_establishing = 0; + if (ps->intercept_diconnected || hcpc_read_http_connect_resp (s)) + { + ps->state = HCPC_SESSION_CLOSED; + session_reset (s); + ps->http_disconnected = 1; + if (!ps->intercept_diconnected) + { + if (ps->intercept.session_handle != SESSION_INVALID_HANDLE) + session_reset ( + session_get_from_handle (ps->intercept.session_handle)); + ps->intercept_diconnected = 1; + } + else + hcpc_worker_stats_inc (s->thread_index, + client_closed_before_tunnel_connected, 1); + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return 0; + } + ps->state = HCPC_SESSION_ESTABLISHED; + if (ps->flags & HCPC_SESSION_F_IS_UDP) + hcpc_timer_start (ps); + else + { + /* rollback cwnd and reschedule tcp intercept transport */ + if (s->thread_index != + session_thread_from_handle (ps->intercept.session_handle)) + session_send_rpc_evt_to_thread ( + session_thread_from_handle (ps->intercept.session_handle), + hcpc_intercept_rollback_cwnd_rpc, + uword_to_pointer (ps->session_index, void *)); + else + hcpc_intercept_rollback_cwnd (ps->intercept.session_handle, + ps->opaque); + } + + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + + return 0; + } + if (ps->intercept_diconnected) + { + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return -1; + } + hcpc_timer_update (ps); + sh = ps->intercept.session_handle; + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - /* send event for listener tx fifo */ + /* send event for intercept tx fifo */ listener_tx_fifo = s->rx_fifo; + ASSERT (svm_fifo_max_dequeue (listener_tx_fifo)); if (svm_fifo_set_event (listener_tx_fifo)) - session_program_tx_io_evt (listener_tx_fifo->vpp_sh, SESSION_IO_EVT_TX); + session_program_tx_io_evt (sh, SESSION_IO_EVT_TX); return 0; } +static void +hcpc_intercept_force_ack (session_handle_t intercept_handle) +{ + transport_connection_t *tc; + session_t *s; + + s = session_get_from_handle_if_valid (intercept_handle); + if (!s || session_get_transport_proto (s) != TRANSPORT_PROTO_TCP) + return; + + tc = session_get_transport (s); + tcp_send_ack ((tcp_connection_t *) tc); +} + +static void +hcpc_intercept_force_ack_rpc (void *arg) +{ + hcpc_main_t *hcpcm = &hcpc_main; + hcpc_session_t *ps; + + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_get (pointer_to_uword (arg)); + ASSERT (ps); + hcpc_intercept_force_ack (ps->intercept.session_handle); + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); +} + static int -http_tx_callback (session_t *s) +hcpc_http_tx_callback (session_t *s) { + hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; u32 min_free; + hcpc_session_state_t state; + u32 ps_index; + session_handle_t intercept_sh; - HCPC_DBG ("session %u", s->opaque); + HCPC_DBG ("session [%u]", s->opaque); min_free = clib_min (svm_fifo_size (s->tx_fifo) >> 3, 128 << 10); if (svm_fifo_max_enqueue (s->tx_fifo) < min_free) { @@ -631,26 +1279,35 @@ http_tx_callback (session_t *s) return 0; } + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); ps = hcpc_session_get (s->opaque); ASSERT (ps); + state = ps->state; + if (ps->intercept_diconnected) + { + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return -1; + } + ps_index = ps->session_index; + intercept_sh = ps->intercept.session_handle; + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - if (ps->state < HCPC_SESSION_ESTABLISHED) + if (state < HCPC_SESSION_ESTABLISHED) return 0; - if (ps->state == HCPC_SESSION_CLOSED) - return -1; - /* force ack on listener side to update rcv wnd */ - if (ps->flags & HCPC_SESSION_F_IS_UDP) - return 0; - tcp_send_ack ((tcp_connection_t *) session_get_transport ( - session_get_from_handle (ps->listener_session_handle))); + if (s->thread_index != session_thread_from_handle (intercept_sh)) + session_send_rpc_evt_to_thread (session_thread_from_handle (intercept_sh), + hcpc_intercept_force_ack_rpc, + uword_to_pointer (ps_index, void *)); + else + hcpc_intercept_force_ack (intercept_sh); return 0; } static void -http_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf) +hcpc_http_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf) { if (ntf == SESSION_CLEANUP_TRANSPORT) return; @@ -659,7 +1316,7 @@ http_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf) } static int -http_alloc_session_fifos (session_t *s) +hcpc_http_alloc_session_fifos (session_t *s) { hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; @@ -667,17 +1324,24 @@ http_alloc_session_fifos (session_t *s) svm_fifo_t *rx_fifo = 0, *tx_fifo = 0; int rv; - HCPC_DBG ("session %u alloc fifos", s->opaque); + HCPC_DBG ("session [%u] alloc fifos", s->opaque); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_get (s->opaque); + /* http connection session doesn't have listener */ if (!ps) { HCPC_DBG ("http connection session"); + /* http connection is not mapped to any listener, alloc session fifos */ app_worker_t *app_wrk = app_worker_get (hcpcm->http_app_index); segment_manager_t *sm = app_worker_get_connect_segment_manager (app_wrk); if ((rv = segment_manager_alloc_session_fifos (sm, s->thread_index, &rx_fifo, &tx_fifo))) - return rv; + { + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return rv; + } rx_fifo->shr->master_session_index = s->session_index; rx_fifo->vpp_sh = s->handle; s->flags &= ~SESSION_F_PROXY; @@ -685,29 +1349,40 @@ http_alloc_session_fifos (session_t *s) else { HCPC_DBG ("http stream session"); - ls = session_get_from_handle (ps->listener_session_handle); + if (ps->intercept_diconnected) + { + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return SESSION_E_ALLOC; + } + ls = session_get_from_handle (ps->intercept.session_handle); tx_fifo = ls->rx_fifo; rx_fifo = ls->tx_fifo; + ASSERT (rx_fifo->refcnt == 1); + ASSERT (tx_fifo->refcnt == 1); rx_fifo->refcnt++; tx_fifo->refcnt++; } tx_fifo->shr->master_session_index = s->session_index; tx_fifo->vpp_sh = s->handle; + + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + s->rx_fifo = rx_fifo; s->tx_fifo = tx_fifo; return 0; } static session_cb_vft_t http_session_cb_vft = { - .session_connected_callback = http_session_connected_callback, - .session_disconnect_callback = http_session_disconnect_callback, - .session_transport_closed_callback = http_session_transport_closed_callback, - .session_reset_callback = http_session_reset_callback, - .builtin_app_rx_callback = http_rx_callback, - .builtin_app_tx_callback = http_tx_callback, - .session_cleanup_callback = http_session_cleanup_callback, - .proxy_alloc_session_fifos = http_alloc_session_fifos, + .session_connected_callback = hcpc_http_session_connected_callback, + .session_disconnect_callback = hcpc_http_session_disconnect_callback, + .session_transport_closed_callback = + hcpc_http_session_transport_closed_callback, + .session_reset_callback = hcpc_http_session_reset_callback, + .builtin_app_rx_callback = hcpc_http_rx_callback, + .builtin_app_tx_callback = hcpc_http_tx_callback, + .session_cleanup_callback = hcpc_http_session_cleanup_callback, + .proxy_alloc_session_fifos = hcpc_http_alloc_session_fifos, }; /*******************************/ @@ -715,61 +1390,113 @@ static session_cb_vft_t http_session_cb_vft = { /*******************************/ static int -listener_accept_callback (session_t *s) +hcpc_intercept_accept_callback (session_t *s) { hcpc_session_t *ps; hcpc_main_t *hcpcm = &hcpc_main; + tcp_connection_t *tcp_conn; if (hcpcm->http_connection_handle == SESSION_INVALID_HANDLE) return -1; + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_alloc (); ps->state = HCPC_SESSION_CONNECTING; - ps->listener_session_handle = session_handle (s); + ps->intercept.session_handle = session_handle (s); + ps->intercept.rx_fifo = s->rx_fifo; + ps->intercept.tx_fifo = s->tx_fifo; + ps->http_establishing = 1; if (session_get_transport_proto (s) == TRANSPORT_PROTO_UDP) ps->flags |= HCPC_SESSION_F_IS_UDP; + else + { + /* set cwnd to zero temporarily, otherwise tcp on intercept side can + * dequeue http response msg when doing ack */ + tcp_conn = (tcp_connection_t *) session_get_transport (s); + ps->opaque = tcp_conn->cwnd; + tcp_conn->cwnd = 0; + } + + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + s->opaque = ps->session_index; s->session_state = SESSION_STATE_READY; - HCPC_DBG ("going to open stream for new session %u", ps->session_index); + HCPC_DBG ("going to open stream for new session [%u]", ps->session_index); hcpc_connect_http_stream (ps->session_index); return 0; } static void -listener_session_disconnect_callback (session_t *s) +hcpc_intercept_session_disconnect_callback (session_t *s) { hcpc_close_session (s, 0); } static void -listener_session_reset_callback (session_t *s) +hcpc_intercept_session_transport_closed_callback (session_t *s) { - hcpc_close_session (s, 0); + HCPC_DBG ("session [%u]", s->opaque); +} + +static void +hcpc_intercept_session_reset_callback (session_t *s) +{ + hcpc_main_t *hcpcm = &hcpc_main; + hcpc_session_t *ps; + + HCPC_DBG ("session [%u]", s->opaque); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); + ps = hcpc_session_get (s->opaque); + ASSERT (ps); + ps->state = HCPC_SESSION_CLOSED; + hcpc_session_close_intercept (ps); + /* we want stream reset here */ + if (!ps->http_disconnected && + ps->http.session_handle != SESSION_INVALID_HANDLE) + { + session_reset (session_get_from_handle (ps->http.session_handle)); + ps->http_disconnected = 1; + } + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + hcpc_worker_stats_inc (s->thread_index, tunnels_reset_by_client, 1); } static int -listener_rx_callback (session_t *s) +hcpc_intercept_rx_callback (session_t *s) { + hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; svm_fifo_t *http_tx_fifo; + hcpc_session_state_t state; + session_handle_t sh; - HCPC_DBG ("session %u", s->opaque); + HCPC_DBG ("session [%u]", s->opaque); + if (s->flags & SESSION_F_APP_CLOSED) + return 0; + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); ps = hcpc_session_get (s->opaque); - if (!ps) - return -1; + ASSERT (ps); + state = ps->state; + sh = ps->http.session_handle; + if (ps->http_disconnected) + { + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return -1; + } + hcpc_timer_update (ps); + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - if (ps->state < HCPC_SESSION_ESTABLISHED) + if (state < HCPC_SESSION_ESTABLISHED) return 0; - if (ps->state == HCPC_SESSION_CLOSED) - return -1; - /* send event for http tx fifo */ http_tx_fifo = s->rx_fifo; - if (svm_fifo_set_event (http_tx_fifo)) - session_program_tx_io_evt (ps->http_session_handle, SESSION_IO_EVT_TX); + if (svm_fifo_max_dequeue (http_tx_fifo)) + if (svm_fifo_set_event (http_tx_fifo)) + session_program_tx_io_evt (sh, SESSION_IO_EVT_TX); if (svm_fifo_max_enqueue (http_tx_fifo) <= TCP_MSS) svm_fifo_add_want_deq_ntf (http_tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); @@ -778,28 +1505,37 @@ listener_rx_callback (session_t *s) } static int -listener_tx_callback (session_t *s) +hcpc_intercept_tx_callback (session_t *s) { + hcpc_main_t *hcpcm = &hcpc_main; hcpc_session_t *ps; + hcpc_session_state_t state; + session_handle_t sh; - HCPC_DBG ("session %u", s->opaque); + HCPC_DBG ("session [%u]", s->opaque); + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); ps = hcpc_session_get (s->opaque); ASSERT (ps); + state = ps->state; + sh = ps->http.session_handle; + if (ps->http_disconnected) + { + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + return -1; + } + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); - if (ps->state < HCPC_SESSION_ESTABLISHED) + if (state < HCPC_SESSION_ESTABLISHED) return 0; - if (ps->state == HCPC_SESSION_CLOSED) - return -1; - /* pass notification to http transport */ - session_program_transport_io_evt (ps->http_session_handle, - SESSION_IO_EVT_RX); + session_program_transport_io_evt (sh, SESSION_IO_EVT_RX); return 0; } static void -listener_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf) +hcpc_intercept_session_cleanup_callback (session_t *s, + session_cleanup_ntf_t ntf) { if (ntf == SESSION_CLEANUP_TRANSPORT) return; @@ -808,13 +1544,13 @@ listener_session_cleanup_callback (session_t *s, session_cleanup_ntf_t ntf) } static int -listener_add_segment_callback (u32 client_index, u64 segment_handle) +hcpc_intercept_add_segment_callback (u32 client_index, u64 segment_handle) { return 0; } static int -listener_write_early_data (session_t *s) +hcpc_intercept_write_early_data (session_t *s) { transport_proto_t tp; transport_connection_t *tc; @@ -843,14 +1579,16 @@ listener_write_early_data (session_t *s) } static session_cb_vft_t listener_session_cb_vft = { - .session_accept_callback = listener_accept_callback, - .session_disconnect_callback = listener_session_disconnect_callback, - .session_reset_callback = listener_session_reset_callback, - .builtin_app_rx_callback = listener_rx_callback, - .builtin_app_tx_callback = listener_tx_callback, - .session_cleanup_callback = listener_session_cleanup_callback, - .add_segment_callback = listener_add_segment_callback, - .proxy_write_early_data = listener_write_early_data, + .session_accept_callback = hcpc_intercept_accept_callback, + .session_disconnect_callback = hcpc_intercept_session_disconnect_callback, + .session_transport_closed_callback = + hcpc_intercept_session_transport_closed_callback, + .session_reset_callback = hcpc_intercept_session_reset_callback, + .builtin_app_rx_callback = hcpc_intercept_rx_callback, + .builtin_app_tx_callback = hcpc_intercept_tx_callback, + .session_cleanup_callback = hcpc_intercept_session_cleanup_callback, + .add_segment_callback = hcpc_intercept_add_segment_callback, + .proxy_write_early_data = hcpc_intercept_write_early_data, }; static clib_error_t * @@ -897,7 +1635,7 @@ hcpc_attach_http_client () } static clib_error_t * -hcpc_attach_listener () +hcpc_attach_intercept_listener () { hcpc_main_t *hcpcm = &hcpc_main; vnet_app_attach_args_t _a, *a = &_a; @@ -929,53 +1667,93 @@ hcpc_attach_listener () return 0; } -#define HCPC_ARC_IP4 "ip4-unicast" -#define HCPC_ARC_IP6 "ip6-unicast" -#define HCPC_NODE_IP4 "hsi4-in" -#define HCPC_NODE_IP6 "hsi6-in" - -static clib_error_t * -hcpc_enable_hsi (u8 is_ip4) +static uword +hcpc_event_process (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f) { hcpc_main_t *hcpcm = &hcpc_main; - vnet_feature_registration_t *reg; - clib_error_t *err = 0; - int rv; + f64 now, timeout = 1.0; + uword *event_data = 0; + uword event_type; - if (is_ip4) + while (1) { - if (hcpcm->hsi4_enabled) - return 0; - reg = vnet_get_feature_reg (HCPC_ARC_IP4, HCPC_NODE_IP4); + vlib_process_wait_for_event_or_clock (vm, timeout); + event_type = vlib_process_get_events (vm, (uword **) &event_data); + switch (event_type) + { + case HCPC_EVENT_PROXY_CONNECTED: + vlib_worker_thread_barrier_sync (vm); + hcpc_start_listen (); + vlib_worker_thread_barrier_release (vm); + break; + case ~0: + /* TODO: proxy connection keep-alive */ + /* expire timers */ + now = vlib_time_now (vm); + clib_spinlock_lock_if_init (&hcpcm->tw_lock); + tw_timer_expire_timers_2t_1w_2048sl (&hcpcm->tw, now); + clib_spinlock_unlock_if_init (&hcpcm->tw_lock); + break; + /* TODO: auto proxy reconnect */ + default: + HCPC_DBG ("unknown event %u", event_type); + break; + } + vec_reset_length (event_data); } - else + + return 0; +} + +clib_error_t * +hcpc_enable (vlib_main_t *vm) +{ + vlib_thread_main_t *vtm = vlib_get_thread_main (); + hcpc_main_t *hcpcm = &hcpc_main; + hcpc_worker_t *wrk; + clib_error_t *err = 0; + u32 num_threads; + + hcpcm->intercept_proto_fn = + vlib_get_plugin_symbol ("hsi_plugin.so", "hsi_intercept_proto"); + if (hcpcm->intercept_proto_fn == 0) + return clib_error_return (0, "hsi_plugin.so not loaded"); + + session_enable_disable_args_t args = { .is_en = 1, + .rt_engine_type = + RT_BACKEND_ENGINE_RULE_TABLE }; + vlib_worker_thread_barrier_sync (vm); + vnet_session_enable_disable (vm, &args); + vlib_worker_thread_barrier_release (vm); + + if (vlib_num_workers ()) { - if (hcpcm->hsi6_enabled) - return 0; - reg = vnet_get_feature_reg (HCPC_ARC_IP6, HCPC_NODE_IP6); + clib_spinlock_init (&hcpcm->sessions_lock); + clib_spinlock_init (&hcpcm->tw_lock); } - if (reg == 0) - return clib_error_return (0, "hsi plugin not loaded"); - if (reg->enable_disable_cb) + num_threads = 1 + vtm->n_threads; + vec_validate (hcpcm->workers, num_threads - 1); + vec_foreach (wrk, hcpcm->workers) { - if ((err = reg->enable_disable_cb (hcpcm->sw_if_index, 1))) - return err; + clib_memset (&wrk->stats, 0, sizeof (wrk->stats)); } - if (is_ip4) - rv = vnet_feature_enable_disable (HCPC_ARC_IP4, HCPC_NODE_IP4, - hcpcm->sw_if_index, 1, 0, 0); - else - rv = vnet_feature_enable_disable (HCPC_ARC_IP6, HCPC_NODE_IP6, - hcpcm->sw_if_index, 1, 0, 0); - if (rv) - return clib_error_return (0, "vnet feature enable failed (rv=%d)", rv); + err = hcpc_attach_http_client (); + if (err) + return err; + + err = hcpc_attach_intercept_listener (); + if (err) + return err; + + if (hcpcm->process_node_index == 0) + hcpcm->process_node_index = + vlib_process_create (vm, "hcpc-event-process", hcpc_event_process, 16); + + tw_timer_wheel_init_2t_1w_2048sl (&hcpcm->tw, hcpc_timer_expired_cb, 1.0, + ~0); - if (is_ip4) - hcpcm->hsi4_enabled = 1; - else - hcpcm->hsi6_enabled = 1; return 0; } @@ -996,6 +1774,9 @@ hcpc_create_command_fn (vlib_main_t *vm, unformat_input_t *input, u64 mem_size; vnet_main_t *vnm = vnet_get_main (); + if (hcpcm->http_app_index != APP_INVALID_INDEX) + return clib_error_return (0, "http connect proxy client already enabled"); + if (!unformat_user (input, unformat_line_input, line_input)) return clib_error_return (0, "expected arguments"); @@ -1017,6 +1798,9 @@ hcpc_create_command_fn (vlib_main_t *vm, unformat_input_t *input, else if (unformat (line_input, "interface %U", unformat_vnet_sw_interface, vnm, &hcpcm->sw_if_index)) ; + else if (unformat (line_input, "udp-idle-timeout %u", + &hcpcm->udp_idle_timeout)) + ; else { err = clib_error_return (0, "unknown input `%U'", @@ -1055,26 +1839,14 @@ hcpc_create_command_fn (vlib_main_t *vm, unformat_input_t *input, goto done; } - err = hcpc_enable_hsi (l->sep.is_ip4); + err = hcpc_enable (vm); if (err) goto done; - session_enable_disable_args_t args = { .is_en = 1, - .rt_engine_type = - RT_BACKEND_ENGINE_RULE_TABLE }; - vlib_worker_thread_barrier_sync (vm); - vnet_session_enable_disable (vm, &args); - vlib_worker_thread_barrier_release (vm); - - err = hcpc_attach_http_client (); + err = hcpc_listener_add (l); if (err) goto done; - err = hcpc_attach_listener (); - if (err) - goto done; - - hcpc_listener_add (l); hcpc_connect_http_connection (); hcpcm->is_init = 1; @@ -1089,7 +1861,7 @@ VLIB_CLI_COMMAND (hcpc_create_command, static) = { .path = "http connect proxy client enable", .short_help = "http connect proxy client enable server-uri \n" - "interface listener \n" + "interface listener [udp-idle-timeout ]\n" "[fifo-size ] [private-segment-size ] [prealloc-fifos ]", .function = hcpc_create_command_fn, }; @@ -1142,10 +1914,9 @@ hcpc_add_del_listener_command_fn (vlib_main_t *vm, unformat_input_t *input, if (is_add) { - err = hcpc_enable_hsi (l->sep.is_ip4); + err = hcpc_listener_add (l); if (err) goto done; - hcpc_listener_add (l); } else { @@ -1180,7 +1951,9 @@ hcpc_show_command_fn (vlib_main_t *vm, unformat_input_t *input, vlib_cli_command_t *cmd) { hcpc_main_t *hcpcm = &hcpc_main; - u8 show_listeners = 0, show_sessions = 0; + u8 show_listeners = 0, show_sessions = 0, show_stats = 0; + clib_thread_index_t ti; + hcpc_worker_t *wrk; if (!hcpcm->is_init) return clib_error_return (0, "http connect proxy client disabled"); @@ -1191,6 +1964,8 @@ hcpc_show_command_fn (vlib_main_t *vm, unformat_input_t *input, show_listeners = 1; else if (unformat (input, "sessions")) show_sessions = 1; + else if (unformat (input, "stats")) + show_stats = 1; else { return clib_error_return (0, "unknown input `%U'", @@ -1221,20 +1996,55 @@ hcpc_show_command_fn (vlib_main_t *vm, unformat_input_t *input, if (show_sessions) { hcpc_session_t *ps; + session_t *s; transport_connection_t *tc; + clib_spinlock_lock_if_init (&hcpcm->sessions_lock); pool_foreach (ps, hcpcm->sessions) { if (ps->flags & HCPC_SESSION_F_IS_PARENT) continue; - tc = session_get_transport ( - session_get_from_handle (ps->listener_session_handle)); - vlib_cli_output (vm, "session [%lu] %U %U:%u->%U:%u %U", - ps->session_index, format_transport_proto, - tc->proto, format_ip46_address, &tc->rmt_ip, - tc->is_ip4, clib_net_to_host_u16 (tc->rmt_port), - format_ip46_address, &tc->lcl_ip, tc->is_ip4, - clib_net_to_host_u16 (tc->lcl_port), - format_hcpc_session_state, ps->state); + if ((ps->state > HCPC_SESSION_ESTABLISHED) || + (ps->intercept.session_handle == SESSION_INVALID_HANDLE)) + { + vlib_cli_output (vm, "session [%lu] %U\n%U", ps->session_index, + format_hcpc_session_state, ps->state, + format_hcpc_session_vars, ps); + continue; + } + + s = session_get_from_handle (ps->intercept.session_handle); + tc = session_get_transport (s); + /* transport closed, we don't know yet */ + if (!tc) + { + vlib_cli_output ( + vm, "session [%lu] INTERCEPT-TRANSPORT-CLOSED\n%U", + ps->session_index, format_hcpc_session_vars, ps); + continue; + } + + vlib_cli_output ( + vm, "session [%lu] %U %U:%u->%U:%u %U\n%U", ps->session_index, + format_transport_proto, tc->proto, format_ip46_address, + &tc->rmt_ip, tc->is_ip4, clib_net_to_host_u16 (tc->rmt_port), + format_ip46_address, &tc->lcl_ip, tc->is_ip4, + clib_net_to_host_u16 (tc->lcl_port), format_hcpc_session_state, + ps->state, format_hcpc_session_vars, ps); + } + clib_spinlock_unlock_if_init (&hcpcm->sessions_lock); + } + + if (show_stats) + { + for (ti = 0; ti < vec_len (hcpcm->workers); ti++) + { + wrk = &hcpcm->workers[ti]; + vlib_cli_output (vm, "Thread %u:\n", ti); +#define _(name, str) \ + if (wrk->stats.name) \ + vlib_cli_output (vm, " %lu %s", wrk->stats.name, str); + foreach_hcpc_wrk_stat +#undef _ } } @@ -1243,10 +2053,40 @@ hcpc_show_command_fn (vlib_main_t *vm, unformat_input_t *input, VLIB_CLI_COMMAND (hcpc_show_command, static) = { .path = "show http connect proxy client", - .short_help = "show http connect proxy [listeners] [sessions]", + .short_help = "show http connect proxy [listeners] [sessions] [stats]", .function = hcpc_show_command_fn, }; +static clib_error_t * +hcpc_clear_stats_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + hcpc_main_t *hcpcm = &hcpc_main; + hcpc_worker_t *wrk; + clib_thread_index_t ti; + + if (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + return clib_error_return (0, "unknown input `%U'", format_unformat_error, + input); + + if (!hcpcm->is_init) + return clib_error_return (0, "http connect proxy client disabled"); + + for (ti = 0; ti < vec_len (hcpcm->workers); ti++) + { + wrk = &hcpcm->workers[ti]; + clib_memset (&wrk->stats, 0, sizeof (wrk->stats)); + } + + return 0; +} + +VLIB_CLI_COMMAND (hcpc_clear_stats_command, static) = { + .path = "clear http connect proxy client stats", + .short_help = "clear http connect proxy client stats", + .function = hcpc_clear_stats_fn, +}; + clib_error_t * hcpc_main_init (vlib_main_t *vm) { @@ -1261,6 +2101,7 @@ hcpc_main_init (vlib_main_t *vm) hcpcm->private_segment_size = 128 << 20; hcpcm->prealloc_fifos = 0; hcpcm->sw_if_index = ~0; + hcpcm->udp_idle_timeout = 600; vec_validate (hcpcm->capsule_proto_header_buf, 10); http_init_headers_ctx (&hcpcm->capsule_proto_header, diff --git a/test-c/hs-test/infra/netconfig.go b/test-c/hs-test/infra/netconfig.go index 082a0742611..7de9040a126 100644 --- a/test-c/hs-test/infra/netconfig.go +++ b/test-c/hs-test/infra/netconfig.go @@ -424,7 +424,7 @@ func linkSetMultiQueue(ifName string) error { return nil } -func newCommand(s []string, ns string) *exec.Cmd { +func CommandInNetns(s []string, ns string) *exec.Cmd { return appendNetns(s, ns) } diff --git a/test-c/hs-test/infra/suite_masque.go b/test-c/hs-test/infra/suite_masque.go index 3bac3bfe29a..18d6305e167 100644 --- a/test-c/hs-test/infra/suite_masque.go +++ b/test-c/hs-test/infra/suite_masque.go @@ -30,6 +30,7 @@ type MasqueSuite struct { Nginx string NginxSsl string Proxy string + Unused string } NetNamespaces struct { Client string @@ -59,6 +60,7 @@ func (s *MasqueSuite) SetupSuite() { s.Ports.Nginx = s.GeneratePort() s.Ports.NginxSsl = s.GeneratePort() s.Ports.Proxy = s.GeneratePort() + s.Ports.Unused = s.GeneratePort() s.NetNamespaces.Client = s.GetNetNamespaceByName("client-ns") s.Interfaces.Client = s.GetInterfaceByName("cln") s.Interfaces.TunnelClient = s.GetInterfaceByName("cln-tun") @@ -73,8 +75,11 @@ func (s *MasqueSuite) SetupSuite() { func (s *MasqueSuite) SetupTest() { s.HstSuite.SetupTest() + var memoryConfig Stanza + memoryConfig.NewStanza("memory").Append("main-heap-size 2G") + // vpp masque proxy client - clientVpp, err := s.Containers.VppClient.newVppInstance(s.Containers.VppClient.AllocatedCpus) + clientVpp, err := s.Containers.VppClient.newVppInstance(s.Containers.VppClient.AllocatedCpus, memoryConfig) s.AssertNotNil(clientVpp, fmt.Sprint(err)) s.AssertNil(clientVpp.Start()) idx, err := clientVpp.createAfPacket(s.Interfaces.Client, false) @@ -85,7 +90,7 @@ func (s *MasqueSuite) SetupTest() { s.AssertNotEqual(0, idx) // vpp masque proxy server - serverVpp, err := s.Containers.VppServer.newVppInstance(s.Containers.VppServer.AllocatedCpus) + serverVpp, err := s.Containers.VppServer.newVppInstance(s.Containers.VppServer.AllocatedCpus, memoryConfig) s.AssertNotNil(serverVpp, fmt.Sprint(err)) s.AssertNil(serverVpp.Start()) idx, err = serverVpp.createAfPacket(s.Interfaces.TunnelServer, false) @@ -140,18 +145,27 @@ func (s *MasqueSuite) TeardownTest() { serverVpp := s.Containers.VppServer.VppInstance if CurrentSpecReport().Failed() { s.CollectNginxLogs(s.Containers.NginxServer) + s.CollectIperfLogs(s.Containers.IperfServer) s.Log(clientVpp.Vppctl("show session verbose 2")) s.Log(clientVpp.Vppctl("show error")) - s.Log(clientVpp.Vppctl("show http connect proxy client listeners sessions")) + s.Log(clientVpp.Vppctl("show http connect proxy client listeners sessions stats")) + s.Log(clientVpp.Vppctl("show http stats")) + s.Log(clientVpp.Vppctl("show tcp stats")) s.Log(serverVpp.Vppctl("show session verbose 2")) s.Log(serverVpp.Vppctl("show error")) + s.Log(serverVpp.Vppctl("show http stats")) + s.Log(serverVpp.Vppctl("show tcp stats")) } } -func (s *MasqueSuite) ProxyClientConnect(proto, port string) { +func (s *MasqueSuite) ProxyClientConnect(proto, port string, extraArgs ...string) { + extras := "" + if len(extraArgs) > 0 { + extras = strings.Join(extraArgs, " ") + } vpp := s.Containers.VppClient.VppInstance - cmd := fmt.Sprintf("http connect proxy client enable server-uri https://%s:%s listener %s://0.0.0.0:%s interface host-%s", - s.ProxyAddr(), s.Ports.Proxy, proto, port, s.Interfaces.Client.Name()) + cmd := fmt.Sprintf("http connect proxy client enable server-uri https://%s:%s listener %s://0.0.0.0:%s interface host-%s %s", + s.ProxyAddr(), s.Ports.Proxy, proto, port, s.Interfaces.Client.Name(), extras) s.Log(vpp.Vppctl(cmd)) connected := false diff --git a/test-c/hs-test/infra/utils.go b/test-c/hs-test/infra/utils.go index 666afd538b4..3be32202673 100644 --- a/test-c/hs-test/infra/utils.go +++ b/test-c/hs-test/infra/utils.go @@ -243,7 +243,7 @@ func (s *HstSuite) CollectH2loadLogs(h2loadContainer *Container) { } func (s *HstSuite) StartHttpServer(running chan struct{}, done chan struct{}, addressPort, netNs string) { - cmd := newCommand([]string{"./http_server", addressPort, s.Ppid, s.ProcessIndex}, netNs) + cmd := CommandInNetns([]string{"./http_server", addressPort, s.Ppid, s.ProcessIndex}, netNs) err := cmd.Start() s.Log(cmd) if err != nil { @@ -260,7 +260,7 @@ func (s *HstSuite) StartWget(finished chan error, server_ip, port, query, netNs finished <- errors.New("wget error") }() - cmd := newCommand([]string{"wget", "--timeout=10", "--no-proxy", "--tries=5", "-O", "/dev/null", server_ip + ":" + port + "/" + query}, + cmd := CommandInNetns([]string{"wget", "--timeout=10", "--no-proxy", "--tries=5", "-O", "/dev/null", server_ip + ":" + port + "/" + query}, netNs) s.Log(cmd) o, err := cmd.CombinedOutput() @@ -283,7 +283,7 @@ func (s *HstSuite) StartCurl(finished chan error, uri, netNs, expectedRespCode s c := []string{"curl", "-v", "-s", "-k", "--max-time", strconv.Itoa(timeout), "-o", "/dev/null", "--noproxy", "*"} c = append(c, args...) c = append(c, uri) - cmd := newCommand(c, netNs) + cmd := CommandInNetns(c, netNs) s.Log(cmd) o, err := cmd.CombinedOutput() s.Log(string(o)) @@ -304,7 +304,7 @@ func (s *HstSuite) StartIperfClient(finished chan error, clientAddress, serverAd c := []string{"iperf3", "-c", serverAddress, "-B", clientAddress, "-J", "-l", "1460", "-b", "10g", "-p", serverPort} c = append(c, args...) - cmd := newCommand(c, netNs) + cmd := CommandInNetns(c, netNs) s.Log(cmd) o, err := cmd.CombinedOutput() if err != nil { diff --git a/test-c/hs-test/proxy_test.go b/test-c/hs-test/proxy_test.go index 5adce76a9c6..5d9fc389392 100644 --- a/test-c/hs-test/proxy_test.go +++ b/test-c/hs-test/proxy_test.go @@ -34,9 +34,12 @@ func init() { RegisterVppUdpProxyMWTests(VppProxyUdpMigrationMWTest, VppConnectUdpStressMWTest) RegisterEnvoyProxyTests(EnvoyHttpGetTcpTest, EnvoyHttpPutTcpTest) RegisterNginxProxySoloTests(NginxMirroringTest, MirrorMultiThreadTest) - RegisterMasqueTests(VppConnectProxyClientDownloadTcpTest, VppConnectProxyClientDownloadUdpTest, - VppConnectProxyClientUploadTcpTest, VppConnectProxyClientUploadUdpTest) + RegisterMasqueTests(VppConnectProxyClientDownloadUdpTest, + VppConnectProxyClientUploadUdpTest, VppConnectProxyMemLeakTest) RegisterMasqueSoloTests(VppConnectProxyIperfTcpTest, VppConnectProxyIperfUdpTest) + RegisterMasqueMWTests(VppConnectProxyIperfTcpMWTest, VppConnectProxyIperfUdpMWTest, VppConnectProxyClientUploadTcpMWTest, + VppConnectProxyClientTargetUnreachableMWTest, VppConnectProxyClientDownloadTcpMWTest, + VppConnectProxyClientStressMWTest, VppConnectProxyClientUdpIdleMWTest, VppConnectProxyClientServerClosedTcpMWTest) } func VppProxyHttpGetTcpMWTest(s *VppProxySuite) { @@ -664,7 +667,59 @@ func VppConnectUdpStressMWTest(s *VppUdpProxySuite) { vppConnectUdpStressLoad(s) } -func VppConnectProxyClientDownloadTcpTest(s *MasqueSuite) { +func vppConnectProxyClientCheckCleanup(s *MasqueSuite) { + clientVpp := s.Containers.VppClient.VppInstance + closed := false + for nTries := 0; nTries < 35; nTries++ { + o := clientVpp.Vppctl("show http connect proxy client sessions") + if !strings.Contains(o, "session [") { + closed = true + break + } + time.Sleep(1 * time.Second) + } + s.AssertEqual(closed, true) + h2Stats := clientVpp.Vppctl("show http stats") + streamsOpened := 0 + streamsClosed := 0 + lines := strings.Split(h2Stats, "\n") + for _, line := range lines { + if strings.Contains(line, "application streams opened") { + tmp := strings.Split(line, " ") + streamsOpened, _ = strconv.Atoi(tmp[1]) + } + if strings.Contains(line, "application streams closed") { + tmp := strings.Split(line, " ") + streamsClosed, _ = strconv.Atoi(tmp[1]) + } + } + // one stream for http/2 connection (parent stays open) + s.AssertEqual(streamsOpened-streamsClosed, 1) +} + +func vppConnectProxyServerCheckCleanup(s *MasqueSuite) { + o := s.Containers.VppServer.VppInstance.Vppctl("show session verbose") + s.AssertNotContains(o, "[H2]") + h2Stats := s.Containers.VppServer.VppInstance.Vppctl("show http stats") + streamsOpened := 0 + streamsClosed := 0 + lines := strings.Split(h2Stats, "\n") + for _, line := range lines { + if strings.Contains(line, "application streams opened") { + tmp := strings.Split(line, " ") + streamsOpened, _ = strconv.Atoi(tmp[1]) + } + if strings.Contains(line, "application streams closed") { + tmp := strings.Split(line, " ") + streamsClosed, _ = strconv.Atoi(tmp[1]) + } + } + s.AssertEqual(streamsOpened-streamsClosed, 0) +} + +func VppConnectProxyClientDownloadTcpMWTest(s *MasqueSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() s.StartNginxServer() clientVpp := s.Containers.VppClient.VppInstance s.ProxyClientConnect("tcp", s.Ports.NginxSsl) @@ -683,6 +738,9 @@ func VppConnectProxyClientDownloadTcpTest(s *MasqueSuite) { }() s.Log(clientVpp.Vppctl("show http connect proxy client sessions")) s.AssertNil(<-finished) + // test client initiated stream close + vppConnectProxyClientCheckCleanup(s) + vppConnectProxyServerCheckCleanup(s) } func VppConnectProxyClientDownloadUdpTest(s *MasqueSuite) { @@ -700,7 +758,9 @@ func VppConnectProxyClientDownloadUdpTest(s *MasqueSuite) { s.AssertNil(<-finished) } -func VppConnectProxyClientUploadTcpTest(s *MasqueSuite) { +func VppConnectProxyClientUploadTcpMWTest(s *MasqueSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() s.StartNginxServer() s.ProxyClientConnect("tcp", s.Ports.NginxSsl) @@ -771,9 +831,10 @@ func VppConnectProxyIperfTcpTest(s *MasqueSuite) { func VppConnectProxyIperfUdpTest(s *MasqueSuite) { s.Containers.IperfServer.Run() - s.ProxyClientConnect("udp", s.Ports.Nginx) + // test listen all, we are running solo anyway + s.ProxyClientConnect("udp", "0") clientVpp := s.Containers.VppClient.VppInstance - cmd := fmt.Sprintf("http connect proxy client listener add listener tcp://0.0.0.0:%s", s.Ports.Nginx) + cmd := fmt.Sprintf("http connect proxy client listener add listener tcp://0.0.0.0:0") s.Log(clientVpp.Vppctl(cmd)) stopServerCh := make(chan struct{}) @@ -801,3 +862,150 @@ func VppConnectProxyIperfUdpTest(s *MasqueSuite) { s.Log(clientVpp.Vppctl("show http connect proxy client sessions")) s.AssertNil(<-finished) } + +func VppConnectProxyIperfTcpMWTest(s *MasqueSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() + VppConnectProxyIperfTcpTest(s) + // test server send rst_stream (iperf data flows) + vppConnectProxyClientCheckCleanup(s) + vppConnectProxyServerCheckCleanup(s) +} + +func VppConnectProxyIperfUdpMWTest(s *MasqueSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() + VppConnectProxyIperfUdpTest(s) + clientVpp := s.Containers.VppClient.VppInstance + closed := false + for nTries := 0; nTries < 60; nTries++ { + o := clientVpp.Vppctl("show http connect proxy client sessions") + if !strings.Contains(o, "] tcp ") { + closed = true + break + } + time.Sleep(1 * time.Second) + } + s.AssertEqual(closed, true) +} + +func VppConnectProxyClientTargetUnreachableMWTest(s *MasqueSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() + s.StartNginxServer() + s.ProxyClientConnect("tcp", s.Ports.Unused) + + uri := fmt.Sprintf("https://%s:%s/httpTestFile", s.NginxAddr(), s.Ports.Unused) + finished := make(chan error, 1) + go func() { + defer GinkgoRecover() + s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http1.1"}) + }() + s.AssertNotNil(<-finished) + + vppConnectProxyClientCheckCleanup(s) +} + +func VppConnectProxyClientServerClosedTcpMWTest(s *MasqueSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() + s.StartNginxServer() + clientVpp := s.Containers.VppClient.VppInstance + s.ProxyClientConnect("tcp", s.Ports.Nginx) + + uri := fmt.Sprintf("http://%s:%s/64B", s.NginxAddr(), s.Ports.Nginx) + finished := make(chan error, 1) + go func() { + defer GinkgoRecover() + // run http/1.0 so server start closing + s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http1.0"}) + }() + s.Log(clientVpp.Vppctl("show http connect proxy client sessions")) + s.AssertNil(<-finished) + // test server initiated stream close + vppConnectProxyClientCheckCleanup(s) + vppConnectProxyServerCheckCleanup(s) +} + +func VppConnectProxyClientUdpIdleMWTest(s *MasqueSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() + s.StartNginxServer() + s.ProxyClientConnect("udp", s.Ports.NginxSsl, "udp-idle-timeout 5") + + uri := fmt.Sprintf("https://%s:%s/64B", s.NginxAddr(), s.Ports.NginxSsl) + finished := make(chan error, 1) + go func() { + defer GinkgoRecover() + s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http3-only"}) + }() + s.AssertNil(<-finished) + + vppConnectProxyClientCheckCleanup(s) + vppConnectProxyServerCheckCleanup(s) +} + +func VppConnectProxyMemLeakTest(s *MasqueSuite) { + s.SkipUnlessLeakCheck() + + s.StartNginxServer() + s.ProxyClientConnect("tcp", s.Ports.Nginx) + + clientVpp := s.Containers.VppClient.VppInstance + serverVpp := s.Containers.VppServer.VppInstance + /* no goVPP less noise */ + clientVpp.Disconnect() + serverVpp.Disconnect() + + uri := fmt.Sprintf("http://%s:%s/64B", s.NginxAddr(), s.Ports.Nginx) + + /* warmup requests (FIB, pool allocations) */ + finished := make(chan error, 1) + go func() { + defer GinkgoRecover() + // run http/1.0 so server start closing + s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http1.1"}) + }() + s.AssertNil(<-finished) + + /* let's give it some time to clean up sessions, so pool elements can be reused and we have less noise */ + vppConnectProxyClientCheckCleanup(s) + vppConnectProxyServerCheckCleanup(s) + + clientVpp.EnableMemoryTrace() + clientTraces1, err := clientVpp.GetMemoryTrace() + s.AssertNil(err, fmt.Sprint(err)) + + finished = make(chan error, 1) + go func() { + defer GinkgoRecover() + // run http/1.0 so server start closing + s.StartCurl(finished, uri, s.NetNamespaces.Client, "200", 30, []string{"--http1.1"}) + }() + s.AssertNil(<-finished) + + /* let's give it some time to clean up sessions */ + vppConnectProxyClientCheckCleanup(s) + vppConnectProxyServerCheckCleanup(s) + + clientTraces2, err := clientVpp.GetMemoryTrace() + s.AssertNil(err, fmt.Sprint(err)) + clientVpp.MemLeakCheck(clientTraces1, clientTraces2) +} + +func VppConnectProxyClientStressMWTest(s *MasqueSuite) { + s.CpusPerVppContainer = 3 + s.SetupTest() + s.StartNginxServer() + s.ProxyClientConnect("tcp", s.Ports.Nginx) + + // try to open more tunnels than SETTINGS_MAX_CONCURRENT_STREAMS, (100 - 1 for parent), to test failed http connects + uri := fmt.Sprintf("http://%s:%s/64B", s.NginxAddr(), s.Ports.Nginx) + cmd := CommandInNetns([]string{"ab", "-q", "-l", "-n", "10000", "-c", "102", "-s", "5", "-r", uri}, s.NetNamespaces.Client) + s.Log(cmd) + res, _ := cmd.CombinedOutput() + s.Log(string(res)) + + vppConnectProxyClientCheckCleanup(s) + vppConnectProxyServerCheckCleanup(s) +} diff --git a/test-c/hs-test/resources/nginx/nginx_masque.conf b/test-c/hs-test/resources/nginx/nginx_masque.conf index 23773292e76..eeba46c51aa 100644 --- a/test-c/hs-test/resources/nginx/nginx_masque.conf +++ b/test-c/hs-test/resources/nginx/nginx_masque.conf @@ -33,5 +33,8 @@ http { create_full_put_path off; dav_access all:rw; } + location /64B { + return 200 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'; + } } }