session tcp: track half open in app wrk 35/26535/9
authorFlorin Coras <fcoras@cisco.com>
Thu, 16 Apr 2020 04:30:22 +0000 (04:30 +0000)
committerDave Barach <openvpp@barachs.net>
Fri, 17 Apr 2020 14:58:23 +0000 (14:58 +0000)
Type: improvement

Do extra checks when establishing an active connect and cleanup pending
connects if application detaches.

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ibe9349db57b313ba2aa5ea3960ef5cf755f5098a

src/vnet/session/application.h
src/vnet/session/application_worker.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_types.h
src/vnet/session/transport.c
src/vnet/session/transport.h
src/vnet/session/transport_types.h
src/vnet/tcp/tcp.c
src/vnet/tcp/tcp_input.c

index 00b60c3..377ea1e 100644 (file)
@@ -63,6 +63,9 @@ typedef struct app_worker_
   u32 api_client_index;
 
   u8 app_is_builtin;
+
+  /** Per transport proto hash tables of half-open connection handles */
+  uword **half_open_table;
 } app_worker_t;
 
 typedef struct app_worker_map_
@@ -251,6 +254,13 @@ int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s);
 int app_worker_init_connected (app_worker_t * app_wrk, session_t * s);
 int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
                               session_error_t err, u32 opaque);
+int app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                             session_handle_t ho_handle,
+                             session_handle_t wrk_handle);
+int app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                             session_handle_t ho_handle);
+u64 app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                                session_handle_t ho_handle);
 int app_worker_close_notify (app_worker_t * app_wrk, session_t * s);
 int app_worker_transport_closed_notify (app_worker_t * app_wrk,
                                        session_t * s);
index 47b1567..c67aa88 100644 (file)
@@ -75,6 +75,8 @@ app_worker_free (app_worker_t * app_wrk)
   }));
   /* *INDENT-ON* */
 
+  hash_free (app_wrk->listeners_table);
+
   for (i = 0; i < vec_len (handles); i++)
     {
       a->app_index = app->app_index;
@@ -83,6 +85,7 @@ app_worker_free (app_worker_t * app_wrk)
       /* seg manager is removed when unbind completes */
       (void) vnet_unlisten (a);
     }
+  vec_reset_length (handles);
 
   /*
    * Connects segment manager cleanup
@@ -96,6 +99,31 @@ app_worker_free (app_worker_t * app_wrk)
       segment_manager_init_free (sm);
     }
 
+  /*
+   * Half-open cleanup
+   */
+
+  for (i = 0; i < vec_len (app_wrk->half_open_table); i++)
+    {
+      if (!app_wrk->half_open_table[i])
+       continue;
+
+      /* *INDENT-OFF* */
+      hash_foreach (handle, sm_index, app_wrk->half_open_table[i], ({
+       vec_add1 (handles, handle);
+      }));
+      /* *INDENT-ON* */
+
+      for (i = 0; i < vec_len (handles); i++)
+       session_cleanup_half_open (i, handles[i]);
+
+      hash_free (app_wrk->half_open_table[i]);
+      vec_reset_length (handles);
+    }
+
+  vec_free (app_wrk->half_open_table);
+  vec_free (handles);
+
   /* If first segment manager is used by a listener */
   if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
       && app_wrk->first_segment_manager != app_wrk->connects_seg_manager)
@@ -334,6 +362,40 @@ app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
                                                 s, err);
 }
 
+int
+app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                         session_handle_t ho_handle,
+                         session_handle_t wrk_handle)
+{
+  ASSERT (vlib_get_thread_index () == 0);
+  vec_validate (app_wrk->half_open_table, tp);
+  hash_set (app_wrk->half_open_table[tp], ho_handle, wrk_handle);
+  return 0;
+}
+
+int
+app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                         session_handle_t ho_handle)
+{
+  ASSERT (vlib_get_thread_index () == 0);
+  hash_unset (app_wrk->half_open_table[tp], ho_handle);
+  return 0;
+}
+
+u64
+app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                            session_handle_t ho_handle)
+{
+  u64 *ho_wrk_handlep;
+
+  /* No locking because all updates are done from main thread */
+  ho_wrk_handlep = hash_get (app_wrk->half_open_table[tp], ho_handle);
+  if (!ho_wrk_handlep)
+    return SESSION_INVALID_HANDLE;
+
+  return *ho_wrk_handlep;
+}
+
 int
 app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
 {
index ed940d5..a709302 100644 (file)
@@ -282,6 +282,20 @@ session_delete (session_t * s)
   session_free_w_fifos (s);
 }
 
+void
+session_cleanup_half_open (transport_proto_t tp, session_handle_t ho_handle)
+{
+  transport_cleanup_half_open (tp, session_handle_index (ho_handle));
+}
+
+void
+session_half_open_delete_notify (transport_proto_t tp,
+                                session_handle_t ho_handle)
+{
+  app_worker_t *app_wrk = app_worker_get (session_handle_data (ho_handle));
+  app_worker_del_half_open (app_wrk, tp, ho_handle);
+}
+
 session_t *
 session_alloc_for_connection (transport_connection_t * tc)
 {
@@ -738,10 +752,10 @@ int
 session_stream_connect_notify (transport_connection_t * tc,
                               session_error_t err)
 {
+  session_handle_t ho_handle, wrk_handle;
   u32 opaque = 0, new_ti, new_si;
   app_worker_t *app_wrk;
   session_t *s = 0;
-  u64 ho_handle;
 
   /*
    * Find connection handle and cleanup half-open table
@@ -757,11 +771,19 @@ session_stream_connect_notify (transport_connection_t * tc,
   /* Get the app's index from the handle we stored when opening connection
    * and the opaque (api_context for external apps) from transport session
    * index */
-  app_wrk = app_worker_get_if_valid (ho_handle >> 32);
+  app_wrk = app_worker_get_if_valid (session_handle_data (ho_handle));
   if (!app_wrk)
     return -1;
 
-  opaque = tc->s_index;
+  wrk_handle = app_worker_lookup_half_open (app_wrk, tc->proto, ho_handle);
+  if (wrk_handle == SESSION_INVALID_HANDLE)
+    return -1;
+
+  /* Make sure this is the same half-open index */
+  if (session_handle_index (wrk_handle) != session_handle_index (ho_handle))
+    return -1;
+
+  opaque = session_handle_data (wrk_handle);
 
   if (err)
     return app_worker_connect_notify (app_wrk, s, err, opaque);
@@ -1183,7 +1205,7 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
 {
   transport_connection_t *tc;
   transport_endpoint_cfg_t *tep;
-  u64 handle;
+  u64 handle, wrk_handle;
   int rv;
 
   tep = session_endpoint_to_transport_cfg (rmt);
@@ -1203,13 +1225,20 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
    * is needed when the connect notify comes and we have to notify the
    * external app
    */
-  handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
+  handle = session_make_handle (tc->c_index, app_wrk_index);
   session_lookup_add_half_open (tc, handle);
 
-  /* Store api_context (opaque) for when the reply comes. Not the nicest
-   * thing but better than allocating a separate half-open pool.
+  /* Store the half-open handle in the connection. Transport will use it
+   * when cleaning up @ref session_half_open_delete_notify
+   */
+  tc->s_ho_handle = handle;
+
+  /* Track the half-open connections in case we want to forcefully
+   * clean them up @ref session_cleanup_half_open
    */
-  tc->s_index = opaque;
+  wrk_handle = session_make_handle (tc->c_index, opaque);
+  app_worker_add_half_open (app_worker_get (app_wrk_index),
+                           rmt->transport_proto, handle, wrk_handle);
 
   return 0;
 }
index 956bff0..5e0a832 100644 (file)
@@ -293,6 +293,8 @@ session_evt_alloc_old (session_worker_t * wrk)
 session_t *session_alloc (u32 thread_index);
 void session_free (session_t * s);
 void session_free_w_fifos (session_t * s);
+void session_cleanup_half_open (transport_proto_t tp,
+                               session_handle_t ho_handle);
 u8 session_is_valid (u32 si, u8 thread_index);
 
 always_inline session_t *
@@ -462,6 +464,8 @@ int session_dgram_connect_notify (transport_connection_t * tc,
 int session_stream_accept_notify (transport_connection_t * tc);
 void session_transport_closing_notify (transport_connection_t * tc);
 void session_transport_delete_notify (transport_connection_t * tc);
+void session_half_open_delete_notify (transport_proto_t tp,
+                                     session_handle_t ho_handle);
 void session_transport_closed_notify (transport_connection_t * tc);
 void session_transport_reset_notify (transport_connection_t * tc);
 int session_stream_accept (transport_connection_t * tc, u32 listener_index,
index cdaaad0..784312d 100644 (file)
@@ -309,9 +309,21 @@ session_parse_handle (session_handle_t handle, u32 * index,
 }
 
 static inline session_handle_t
-session_make_handle (u32 session_index, u32 thread_index)
+session_make_handle (u32 session_index, u32 data)
 {
-  return (((u64) thread_index << 32) | (u64) session_index);
+  return (((u64) data << 32) | (u64) session_index);
+}
+
+always_inline u32
+session_handle_index (session_handle_t ho_handle)
+{
+  return (ho_handle & 0xffffffff);
+}
+
+always_inline u32
+session_handle_data (session_handle_t ho_handle)
+{
+  return (ho_handle >> 32);
 }
 
 typedef enum
index d2c6594..9dd495c 100644 (file)
@@ -303,6 +303,13 @@ transport_cleanup (transport_proto_t tp, u32 conn_index, u8 thread_index)
   tp_vfts[tp].cleanup (conn_index, thread_index);
 }
 
+void
+transport_cleanup_half_open (transport_proto_t tp, u32 conn_index)
+{
+  if (tp_vfts[tp].cleanup)
+    tp_vfts[tp].cleanup_ho (conn_index);
+}
+
 int
 transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep)
 {
index 954db49..9c873b1 100644 (file)
@@ -76,6 +76,7 @@ typedef struct _transport_proto_vft
   void (*close) (u32 conn_index, u32 thread_index);
   void (*reset) (u32 conn_index, u32 thread_index);
   void (*cleanup) (u32 conn_index, u32 thread_index);
+  void (*cleanup_ho) (u32 conn_index);
   clib_error_t *(*enable) (vlib_main_t * vm, u8 is_en);
 
   /*
@@ -137,6 +138,7 @@ u32 transport_start_listen (transport_proto_t tp, u32 session_index,
 u32 transport_stop_listen (transport_proto_t tp, u32 conn_index);
 void transport_cleanup (transport_proto_t tp, u32 conn_index,
                        u8 thread_index);
+void transport_cleanup_half_open (transport_proto_t tp, u32 conn_index);
 void transport_get_endpoint (transport_proto_t tp, u32 conn_index,
                             u32 thread_index, transport_endpoint_t * tep,
                             u8 is_lcl);
index 28043b5..d76970f 100644 (file)
@@ -144,6 +144,8 @@ typedef struct _transport_connection
 #define c_stats connection.stats
 #define c_pacer connection.pacer
 #define c_flags connection.flags
+#define s_ho_handle pacer.bucket
+#define c_s_ho_handle connection.pacer.bucket
 } transport_connection_t;
 
 STATIC_ASSERT (STRUCT_OFFSET_OF (transport_connection_t, s_index)
index e4335e0..7798009 100644 (file)
@@ -186,7 +186,7 @@ tcp_session_get_listener (u32 listener_index)
  *
  */
 static void
-tcp_half_open_connection_del (tcp_connection_t * tc)
+tcp_half_open_connection_free (tcp_connection_t * tc)
 {
   tcp_main_t *tm = vnet_get_tcp_main ();
   clib_spinlock_lock_if_init (&tm->half_open_lock);
@@ -214,9 +214,10 @@ tcp_half_open_connection_cleanup (tcp_connection_t * tc)
   if (tc->c_thread_index != vlib_get_thread_index ())
     return 1;
 
+  session_half_open_delete_notify (TRANSPORT_PROTO_TCP, tc->c_s_ho_handle);
   wrk = tcp_get_worker (tc->c_thread_index);
   tcp_timer_reset (&wrk->timer_wheel, tc, TCP_TIMER_RETRANSMIT_SYN);
-  tcp_half_open_connection_del (tc);
+  tcp_half_open_connection_free (tc);
   return 0;
 }
 
@@ -443,6 +444,18 @@ tcp_session_cleanup (u32 conn_index, u32 thread_index)
   tcp_connection_cleanup (tc);
 }
 
+static void
+tcp_session_cleanup_ho (u32 conn_index)
+{
+  tcp_worker_ctx_t *wrk;
+  tcp_connection_t *tc;
+
+  tc = tcp_half_open_connection_get (conn_index);
+  wrk = tcp_get_worker (tc->c_thread_index);
+  tcp_timer_reset (&wrk->timer_wheel, tc, TCP_TIMER_RETRANSMIT_SYN);
+  tcp_half_open_connection_free (tc);
+}
+
 static void
 tcp_session_reset (u32 conn_index, u32 thread_index)
 {
@@ -1148,6 +1161,7 @@ const static transport_proto_vft_t tcp_proto = {
   .connect = tcp_session_open,
   .close = tcp_session_close,
   .cleanup = tcp_session_cleanup,
+  .cleanup_ho = tcp_session_cleanup_ho,
   .reset = tcp_session_reset,
   .send_params = tcp_session_send_params,
   .update_time = tcp_update_time,
index b82eab0..964afe3 100755 (executable)
@@ -1948,11 +1948,6 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
       new_tc0->timers[TCP_TIMER_RETRANSMIT_SYN] = TCP_TIMER_HANDLE_INVALID;
       new_tc0->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
 
-      /* If this is not the owning thread, wait for syn retransmit to
-       * expire and cleanup then */
-      if (tcp_half_open_connection_cleanup (tc0))
-       tc0->flags |= TCP_CONN_HALF_OPEN_DONE;
-
       if (tcp_opts_tstamp (&new_tc0->rcv_opts))
        {
          new_tc0->tsval_recent = new_tc0->rcv_opts.tsval;
@@ -1991,7 +1986,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              tcp_send_reset_w_pkt (new_tc0, b0, my_thread_index, is_ip4);
              tcp_connection_cleanup (new_tc0);
              error0 = TCP_ERROR_CREATE_SESSION_FAIL;
-             goto drop;
+             goto cleanup_ho;
            }
 
          new_tc0->tx_fifo_size =
@@ -2014,7 +2009,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              tcp_send_reset_w_pkt (tc0, b0, my_thread_index, is_ip4);
              TCP_EVT (TCP_EVT_RST_SENT, tc0);
              error0 = TCP_ERROR_CREATE_SESSION_FAIL;
-             goto drop;
+             goto cleanup_ho;
            }
 
          new_tc0->tx_fifo_size =
@@ -2023,7 +2018,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
          tcp_init_snd_vars (new_tc0);
          tcp_send_synack (new_tc0);
          error0 = TCP_ERROR_SYNS_RCVD;
-         goto drop;
+         goto cleanup_ho;
        }
 
       if (!(new_tc0->cfg_flags & TCP_CFG_F_NO_TSO))
@@ -2044,6 +2039,13 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
          tcp_send_ack (new_tc0);
        }
 
+    cleanup_ho:
+
+      /* If this is not the owning thread, wait for syn retransmit to
+       * expire and cleanup then */
+      if (tcp_half_open_connection_cleanup (tc0))
+       tc0->flags |= TCP_CONN_HALF_OPEN_DONE;
+
     drop:
 
       tcp_inc_counter (syn_sent, error0, 1);