session: use half-open sessions for vc establishment 58/32258/14
authorFlorin Coras <fcoras@cisco.com>
Sat, 8 May 2021 02:39:43 +0000 (19:39 -0700)
committerFlorin Coras <florin.coras@gmail.com>
Mon, 10 May 2021 20:53:32 +0000 (20:53 +0000)
Use half-open sessions to track virtual circuit connection
establishment. These sesssions can only be allocated and freed by the
thread that allocates half-open connections (main). Consequently, they
can only be freed on half-open cleanup notifications from transports.

Goal is to simplify state tracking within the session layer but it's
also a first step towards allowing builtin apps to track and cleanup
outstanding connects.

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I8a535906d13eb7f8966deb82333839de80f8049f

src/vnet/session/application.h
src/vnet/session/application_worker.c
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_cli.c
src/vnet/session/session_types.h
src/vnet/session/transport.c
src/vnet/session/transport_types.h
src/vnet/tcp/tcp.c

index a8ddfec..5beb813 100644 (file)
@@ -61,8 +61,8 @@ typedef struct app_worker_
 
   u8 app_is_builtin;
 
-  /** Per transport proto hash tables of half-open connection handles */
-  uword **half_open_table;
+  /** Pool of half-open session handles. Tracked in case worker detaches */
+  session_handle_t *half_open_table;
 
   /** Protects detached seg managers */
   clib_spinlock_t detached_seg_managers_lock;
@@ -313,13 +313,8 @@ int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s);
 int app_worker_init_connected (app_worker_t * app_wrk, session_t * s);
 int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
                               session_error_t err, u32 opaque);
-int app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp,
-                             session_handle_t ho_handle,
-                             session_handle_t wrk_handle);
-int app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp,
-                             session_handle_t ho_handle);
-u64 app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp,
-                                session_handle_t ho_handle);
+int app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh);
+int app_worker_del_half_open (app_worker_t *app_wrk, u32 ho_index);
 int app_worker_close_notify (app_worker_t * app_wrk, session_t * s);
 int app_worker_transport_closed_notify (app_worker_t * app_wrk,
                                        session_t * s);
index 7e03171..13a3ede 100644 (file)
@@ -58,9 +58,10 @@ app_worker_free (app_worker_t * app_wrk)
   vnet_unlisten_args_t _a, *a = &_a;
   u64 handle, *handles = 0, *sm_indices = 0;
   segment_manager_t *sm;
+  session_handle_t *sh;
   session_t *ls;
   u32 sm_index;
-  int i, j;
+  int i;
 
   /*
    *  Listener cleanup
@@ -110,26 +111,10 @@ app_worker_free (app_worker_t * app_wrk)
    * Half-open cleanup
    */
 
-  for (i = 0; i < vec_len (app_wrk->half_open_table); i++)
-    {
-      if (!app_wrk->half_open_table[i])
-       continue;
-
-      /* *INDENT-OFF* */
-      hash_foreach (handle, sm_index, app_wrk->half_open_table[i], ({
-       vec_add1 (handles, handle);
-      }));
-      /* *INDENT-ON* */
+  pool_foreach (sh, app_wrk->half_open_table)
+    session_cleanup_half_open (*sh);
 
-      for (j = 0; j < vec_len (handles); j++)
-       session_cleanup_half_open (i, handles[j]);
-
-      hash_free (app_wrk->half_open_table[i]);
-      vec_reset_length (handles);
-    }
-
-  vec_free (app_wrk->half_open_table);
-  vec_free (handles);
+  pool_free (app_wrk->half_open_table);
 
   /*
    * Detached listener segment managers cleanup
@@ -396,39 +381,25 @@ app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
 }
 
 int
-app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp,
-                         session_handle_t ho_handle,
-                         session_handle_t wrk_handle)
+app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
 {
+  session_handle_t *shp;
+
   ASSERT (vlib_get_thread_index () == 0);
-  vec_validate (app_wrk->half_open_table, tp);
-  hash_set (app_wrk->half_open_table[tp], ho_handle, wrk_handle);
-  return 0;
+  pool_get (app_wrk->half_open_table, shp);
+  *shp = sh;
+
+  return (shp - app_wrk->half_open_table);
 }
 
 int
-app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp,
-                         session_handle_t ho_handle)
+app_worker_del_half_open (app_worker_t *app_wrk, u32 ho_index)
 {
   ASSERT (vlib_get_thread_index () == 0);
-  hash_unset (app_wrk->half_open_table[tp], ho_handle);
+  pool_put_index (app_wrk->half_open_table, ho_index);
   return 0;
 }
 
-u64
-app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp,
-                            session_handle_t ho_handle)
-{
-  u64 *ho_wrk_handlep;
-
-  /* No locking because all updates are done from main thread */
-  ho_wrk_handlep = hash_get (app_wrk->half_open_table[tp], ho_handle);
-  if (!ho_wrk_handlep)
-    return SESSION_INVALID_HANDLE;
-
-  return *ho_wrk_handlep;
-}
-
 int
 app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
 {
index 7fe35c2..1fac5ed 100644 (file)
@@ -251,6 +251,10 @@ session_is_valid (u32 si, u8 thread_index)
       || s->session_state <= SESSION_STATE_LISTENING)
     return 1;
 
+  if (s->session_state == SESSION_STATE_CONNECTING &&
+      (s->flags & SESSION_F_HALF_OPEN))
+    return 1;
+
   tc = session_get_transport (s);
   if (s->connection_index != tc->c_index
       || s->thread_index != tc->thread_index || tc->s_index != si)
@@ -296,17 +300,23 @@ session_delete (session_t * s)
 }
 
 void
-session_cleanup_half_open (transport_proto_t tp, session_handle_t ho_handle)
+session_cleanup_half_open (session_handle_t ho_handle)
 {
-  transport_cleanup_half_open (tp, session_handle_index (ho_handle));
+  session_t *s = session_get_from_handle (ho_handle);
+  transport_cleanup_half_open (session_get_transport_proto (s),
+                              s->connection_index);
 }
 
 void
-session_half_open_delete_notify (transport_proto_t tp,
-                                session_handle_t ho_handle)
+session_half_open_delete_notify (transport_connection_t *tc)
 {
-  app_worker_t *app_wrk = app_worker_get (session_handle_data (ho_handle));
-  app_worker_del_half_open (app_wrk, tp, ho_handle);
+  app_worker_t *app_wrk;
+  session_t *s;
+
+  s = session_get (tc->s_index, tc->thread_index);
+  app_wrk = app_worker_get (s->app_wrk_index);
+  app_worker_del_half_open (app_wrk, s->ho_index);
+  session_free (s);
 }
 
 session_t *
@@ -328,6 +338,18 @@ session_alloc_for_connection (transport_connection_t * tc)
   return s;
 }
 
+static session_t *
+session_alloc_for_half_open (transport_connection_t *tc)
+{
+  session_t *s;
+
+  s = ho_session_alloc ();
+  s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
+  s->connection_index = tc->c_index;
+  tc->s_index = s->session_index;
+  return s;
+}
+
 /**
  * Discards bytes from buffer chain
  *
@@ -797,39 +819,21 @@ int
 session_stream_connect_notify (transport_connection_t * tc,
                               session_error_t err)
 {
-  session_handle_t ho_handle, wrk_handle;
   u32 opaque = 0, new_ti, new_si;
   app_worker_t *app_wrk;
-  session_t *s = 0;
+  session_t *s = 0, *ho;
 
   /*
-   * Find connection handle and cleanup half-open table
+   * Cleanup half-open table
    */
-  ho_handle = session_lookup_half_open_handle (tc);
-  if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
-    {
-      SESSION_DBG ("half-open was removed!");
-      return -1;
-    }
   session_lookup_del_half_open (tc);
 
-  /* Get the app's index from the handle we stored when opening connection
-   * and the opaque (api_context for external apps) from transport session
-   * index */
-  app_wrk = app_worker_get_if_valid (session_handle_data (ho_handle));
+  ho = ho_session_get (tc->s_index);
+  opaque = ho->opaque;
+  app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
   if (!app_wrk)
     return -1;
 
-  wrk_handle = app_worker_lookup_half_open (app_wrk, tc->proto, ho_handle);
-  if (wrk_handle == SESSION_INVALID_HANDLE)
-    return -1;
-
-  /* Make sure this is the same half-open index */
-  if (session_handle_index (wrk_handle) != session_handle_index (ho_handle))
-    return -1;
-
-  opaque = session_handle_data (wrk_handle);
-
   if (err)
     return app_worker_connect_notify (app_wrk, s, err, opaque);
 
@@ -1258,7 +1262,8 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
 {
   transport_connection_t *tc;
   transport_endpoint_cfg_t *tep;
-  u64 handle, wrk_handle;
+  app_worker_t *app_wrk;
+  session_t *s;
   int rv;
 
   tep = session_endpoint_to_transport_cfg (rmt);
@@ -1271,27 +1276,22 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
 
   tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
 
-  /* If transport offers a stream service, only allocate session once the
-   * connection has been established.
-   * Add connection to half-open table and save app and tc index. The
-   * latter is needed to help establish the connection while the former
-   * is needed when the connect notify comes and we have to notify the
-   * external app
-   */
-  handle = session_make_handle (tc->c_index, app_wrk_index);
-  session_lookup_add_half_open (tc, handle);
+  app_wrk = app_worker_get (app_wrk_index);
 
-  /* Store the half-open handle in the connection. Transport will use it
-   * when cleaning up @ref session_half_open_delete_notify
+  /* If transport offers a vc service, only allocate established
+   * session once the connection has been established.
+   * In the meantime allocate half-open session for tracking purposes
+   * associate half-open connection to it and add session to app-worker
+   * half-open table. These are needed to allocate the established
+   * session on transport notification, and to cleanup the half-open
+   * session if the app detaches before connection establishment.
    */
-  tc->s_ho_handle = handle;
+  s = session_alloc_for_half_open (tc);
+  s->app_wrk_index = app_wrk->wrk_index;
+  s->ho_index = app_worker_add_half_open (app_wrk, session_handle (s));
+  s->opaque = opaque;
 
-  /* Track the half-open connections in case we want to forcefully
-   * clean them up @ref session_cleanup_half_open
-   */
-  wrk_handle = session_make_handle (tc->c_index, opaque);
-  app_worker_add_half_open (app_worker_get (app_wrk_index),
-                           rmt->transport_proto, handle, wrk_handle);
+  session_lookup_add_half_open (tc, tc->c_index);
 
   return 0;
 }
index 8b59133..1a59d7d 100644 (file)
@@ -331,8 +331,7 @@ int session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq);
 session_t *session_alloc (u32 thread_index);
 void session_free (session_t * s);
 void session_free_w_fifos (session_t * s);
-void session_cleanup_half_open (transport_proto_t tp,
-                               session_handle_t ho_handle);
+void session_cleanup_half_open (session_handle_t ho_handle);
 u8 session_is_valid (u32 si, u8 thread_index);
 
 always_inline session_t *
@@ -504,8 +503,7 @@ int session_dgram_connect_notify (transport_connection_t * tc,
 int session_stream_accept_notify (transport_connection_t * tc);
 void session_transport_closing_notify (transport_connection_t * tc);
 void session_transport_delete_notify (transport_connection_t * tc);
-void session_half_open_delete_notify (transport_proto_t tp,
-                                     session_handle_t ho_handle);
+void session_half_open_delete_notify (transport_connection_t *tc);
 void session_transport_closed_notify (transport_connection_t * tc);
 void session_transport_reset_notify (transport_connection_t * tc);
 int session_stream_accept (transport_connection_t * tc, u32 listener_index,
@@ -656,6 +654,30 @@ listen_session_free (session_t * s)
   session_free (s);
 }
 
+always_inline session_t *
+ho_session_alloc (void)
+{
+  session_t *s;
+  ASSERT (vlib_get_thread_index () == 0);
+  s = session_alloc (0);
+  s->session_state = SESSION_STATE_CONNECTING;
+  s->flags |= SESSION_F_HALF_OPEN;
+  return s;
+}
+
+always_inline session_t *
+ho_session_get (u32 ho_index)
+{
+  return session_get (ho_index, 0 /* half-open thread */);
+}
+
+always_inline void
+ho_session_free (session_t *s)
+{
+  ASSERT (!s->rx_fifo && s->thread_index == 0);
+  session_free (s);
+}
+
 transport_connection_t *listen_session_get_transport (session_t * s);
 
 /*
index 2ca9630..1505a95 100644 (file)
@@ -144,8 +144,12 @@ format_session (u8 * s, va_list * args)
     }
   else if (ss->session_state == SESSION_STATE_CONNECTING)
     {
-      s = format (s, "%-40U%v", format_transport_half_open_connection,
-                 tp, ss->connection_index, ss->thread_index, str);
+      if (ss->flags & SESSION_F_HALF_OPEN)
+       s = format (s, "%U%v", format_transport_half_open_connection, tp,
+                   ss->connection_index, ss->thread_index, verbose, str);
+      else
+       s = format (s, "%U", format_transport_connection, tp,
+                   ss->connection_index, ss->thread_index, verbose);
     }
   else
     {
index c8b1d2e..2c3db5f 100644 (file)
@@ -148,13 +148,14 @@ typedef enum
     SESSION_N_STATES,
 } session_state_t;
 
-#define foreach_session_flag                           \
-  _(RX_EVT, "rx-event")                                        \
-  _(PROXY, "proxy")                                    \
-  _(CUSTOM_TX, "custom-tx")                            \
-  _(IS_MIGRATING, "migrating")                         \
-  _(UNIDIRECTIONAL, "unidirectional")                  \
-  _(CUSTOM_FIFO_TUNING, "custom-fifo-tuning")          \
+#define foreach_session_flag                                                  \
+  _ (RX_EVT, "rx-event")                                                      \
+  _ (PROXY, "proxy")                                                          \
+  _ (CUSTOM_TX, "custom-tx")                                                  \
+  _ (IS_MIGRATING, "migrating")                                               \
+  _ (UNIDIRECTIONAL, "unidirectional")                                        \
+  _ (CUSTOM_FIFO_TUNING, "custom-fifo-tuning")                                \
+  _ (HALF_OPEN, "half-open")
 
 typedef enum session_flags_bits_
 {
@@ -208,6 +209,9 @@ typedef struct session_
 
     /** App listener index in app's listener pool if a listener */
     u32 al_index;
+
+    /** Index in app worker's half-open table if a half-open */
+    u32 ho_index;
   };
 
   /** Opaque, for general use */
index a420dcd..18ae3ca 100644 (file)
@@ -124,14 +124,14 @@ u8 *
 format_transport_half_open_connection (u8 * s, va_list * args)
 {
   u32 transport_proto = va_arg (*args, u32);
-  u32 listen_index = va_arg (*args, u32);
+  u32 ho_index = va_arg (*args, u32);
   transport_proto_vft_t *tp_vft;
 
   tp_vft = transport_protocol_get_vft (transport_proto);
   if (!tp_vft)
     return s;
 
-  s = format (s, "%U", tp_vft->format_half_open, listen_index);
+  s = format (s, "%U", tp_vft->format_half_open, ho_index);
   return s;
 }
 
index 7ea8d5f..75fd1b8 100644 (file)
@@ -147,7 +147,6 @@ typedef struct _transport_connection
 #define c_pacer connection.pacer
 #define c_flags connection.flags
 #define s_ho_handle pacer.bytes_per_sec
-#define c_s_ho_handle connection.pacer.bytes_per_sec
 } transport_connection_t;
 
 STATIC_ASSERT (STRUCT_OFFSET_OF (transport_connection_t, s_index)
index a6c1e21..90b832c 100644 (file)
@@ -214,7 +214,7 @@ tcp_half_open_connection_cleanup (tcp_connection_t * tc)
   if (tc->c_thread_index != vlib_get_thread_index ())
     return 1;
 
-  session_half_open_delete_notify (TRANSPORT_PROTO_TCP, tc->c_s_ho_handle);
+  session_half_open_delete_notify (&tc->connection);
   wrk = tcp_get_worker (tc->c_thread_index);
   tcp_timer_reset (&wrk->timer_wheel, tc, TCP_TIMER_RETRANSMIT_SYN);
   tcp_half_open_connection_free (tc);
@@ -853,8 +853,20 @@ format_tcp_half_open_session (u8 * s, va_list * args)
 {
   u32 tci = va_arg (*args, u32);
   u32 __clib_unused thread_index = va_arg (*args, u32);
-  tcp_connection_t *tc = tcp_half_open_connection_get (tci);
-  return format (s, "%U", format_tcp_connection_id, tc);
+  u32 verbose = va_arg (*args, u32);
+  tcp_connection_t *tc;
+  u8 *state = 0;
+
+  tc = tcp_half_open_connection_get (tci);
+  if (tc->flags & TCP_CONN_HALF_OPEN_DONE)
+    state = format (state, "%s", "CLOSED");
+  else
+    state = format (state, "%U", format_tcp_state, tc->state);
+  s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_tcp_connection_id, tc);
+  if (verbose)
+    s = format (s, "%-" SESSION_CLI_STATE_LEN "v", state);
+  vec_free (state);
+  return s;
 }
 
 static transport_connection_t *