session: optimize ct fifo segment allocations
[vpp.git] / src / vnet / session / application_local.c
index b5c062f..9a8fe00 100644 (file)
 #include <vnet/session/application_local.h>
 #include <vnet/session/session.h>
 
+typedef enum ct_segment_flags_
+{
+  CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
+  CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
+} ct_segment_flags_t;
+
+typedef struct ct_segment_
+{
+  u32 segment_index;
+  u32 client_n_sessions;
+  u32 server_n_sessions;
+  ct_segment_flags_t flags;
+} ct_segment_t;
+
+typedef struct ct_segments_
+{
+  u32 sm_index;
+  u32 server_wrk;
+  u32 client_wrk;
+  ct_segment_t *segments;
+} ct_segments_ctx_t;
+
 typedef struct ct_main_
 {
   ct_connection_t **connections;       /**< Per-worker connection pools */
@@ -23,6 +45,9 @@ typedef struct ct_main_
   u32 n_sessions;                      /**< Cumulative sessions counter */
   u32 *ho_reusable;                    /**< Vector of reusable ho indices */
   clib_spinlock_t ho_reuseable_lock;   /**< Lock for reusable ho indices */
+  clib_rwlock_t app_segs_lock;         /**< RW lock for seg contexts */
+  uword *app_segs_ctxs_table;          /**< App handle to segment pool map */
+  ct_segments_ctx_t *app_seg_ctxs;     /**< Pool of ct segment contexts */
 } ct_main_t;
 
 static ct_main_t ct_main;
@@ -37,6 +62,8 @@ ct_connection_alloc (u32 thread_index)
   ct->c_thread_index = thread_index;
   ct->client_wrk = ~0;
   ct->server_wrk = ~0;
+  ct->seg_ctx_index = ~0;
+  ct->ct_seg_index = ~0;
   return ct;
 }
 
@@ -106,44 +133,140 @@ ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
 }
 
-int
-ct_session_connect_notify (session_t * ss)
+static void
+ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
+                         svm_fifo_t *tx_fifo)
 {
-  u32 ss_index, opaque, thread_index;
-  ct_connection_t *sct, *cct;
-  app_worker_t *client_wrk;
+  ct_segments_ctx_t *seg_ctx;
+  ct_main_t *cm = &ct_main;
+  ct_segment_flags_t flags;
   segment_manager_t *sm;
-  fifo_segment_t *seg;
-  u64 segment_handle;
-  int err = 0;
-  session_t *cs;
+  app_worker_t *app_wrk;
+  ct_segment_t *ct_seg;
+  fifo_segment_t *fs;
+  u32 seg_index;
+  u8 cnt;
 
-  ss_index = ss->session_index;
-  thread_index = ss->thread_index;
-  sct = (ct_connection_t *) session_get_transport (ss);
-  client_wrk = app_worker_get (sct->client_wrk);
-  opaque = sct->client_opaque;
+  /*
+   * Cleanup fifos
+   */
+
+  sm = segment_manager_get (rx_fifo->segment_manager);
+  seg_index = rx_fifo->segment_index;
+
+  fs = segment_manager_get_segment_w_lock (sm, seg_index);
+  fifo_segment_free_fifo (fs, rx_fifo);
+  fifo_segment_free_fifo (fs, tx_fifo);
+  segment_manager_segment_reader_unlock (sm);
+
+  /*
+   * Update segment context
+   */
+
+  clib_rwlock_reader_lock (&cm->app_segs_lock);
 
-  sm = segment_manager_get (ss->rx_fifo->segment_manager);
-  seg = segment_manager_get_segment_w_lock (sm, ss->rx_fifo->segment_index);
-  segment_handle = segment_manager_segment_handle (sm, seg);
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+  ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
 
-  if ((err = app_worker_add_segment_notify (client_wrk, segment_handle)))
+  if (ct->flags & CT_CONN_F_CLIENT)
     {
-      clib_warning ("failed to notify client %u of new segment",
-                   sct->client_wrk);
-      segment_manager_segment_reader_unlock (sm);
-      session_close (ss);
-      goto error;
+      cnt =
+       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
+      if (!cnt)
+       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
     }
   else
     {
-      segment_manager_segment_reader_unlock (sm);
+      cnt =
+       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+      if (!cnt)
+       ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
     }
 
+  flags = ct_seg->flags;
+
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
   /*
-   * Alloc client session
+   * No need to do any app updates, return
+   */
+  if (cnt)
+    return;
+
+  if (ct->flags & CT_CONN_F_CLIENT)
+    {
+      app_wrk = app_worker_get_if_valid (ct->client_wrk);
+      /* Determine if client app still needs notification, i.e., if it is
+       * still attached. If client detached and this is the last ct session
+       * on this segment, then its connects segment manager should also be
+       * detached, so do not send notification */
+      if (app_wrk)
+       {
+         segment_manager_t *csm;
+         csm = app_worker_get_connect_segment_manager (app_wrk);
+         if (!segment_manager_app_detached (csm))
+           app_worker_del_segment_notify (app_wrk, ct->segment_handle);
+       }
+    }
+  else if (!segment_manager_app_detached (sm))
+    {
+      app_wrk = app_worker_get (ct->server_wrk);
+      app_worker_del_segment_notify (app_wrk, ct->segment_handle);
+    }
+
+  if (!(flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
+      !(flags & CT_SEGMENT_F_SERVER_DETACHED))
+    return;
+
+  /*
+   * Remove segment context because both client and server detached
+   */
+
+  clib_rwlock_writer_lock (&cm->app_segs_lock);
+
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+  pool_put_index (seg_ctx->segments, ct->ct_seg_index);
+
+  /*
+   * No more segment indices left, remove the segments context
    */
+  if (!pool_elts (seg_ctx->segments))
+    {
+      u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
+      table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
+      hash_unset (cm->app_segs_ctxs_table, table_handle);
+      pool_free (seg_ctx->segments);
+      pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+    }
+
+  clib_rwlock_writer_unlock (&cm->app_segs_lock);
+
+  segment_manager_lock_and_del_segment (sm, seg_index);
+
+  /* Cleanup segment manager if needed. If server detaches there's a chance
+   * the client's sessions will hold up segment removal */
+  if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
+    segment_manager_free_safe (sm);
+}
+
+int
+ct_session_connect_notify (session_t *ss)
+{
+  u32 ss_index, opaque, thread_index, cnt;
+  ct_connection_t *sct, *cct;
+  ct_segments_ctx_t *seg_ctx;
+  app_worker_t *client_wrk;
+  ct_main_t *cm = &ct_main;
+  ct_segment_t *ct_seg;
+  session_t *cs;
+  int err = 0;
+
+  ss_index = ss->session_index;
+  thread_index = ss->thread_index;
+  sct = (ct_connection_t *) session_get_transport (ss);
+  client_wrk = app_worker_get (sct->client_wrk);
+  opaque = sct->client_opaque;
+
   cct = ct_connection_get (sct->peer_index, thread_index);
 
   /* Client closed while waiting for reply from server */
@@ -158,6 +281,33 @@ ct_session_connect_notify (session_t * ss)
   session_half_open_delete_notify (&cct->connection);
   cct->flags &= ~CT_CONN_F_HALF_OPEN;
 
+  /*
+   * Update ct segment context
+   */
+
+  clib_rwlock_reader_lock (&cm->app_segs_lock);
+
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, sct->seg_ctx_index);
+  ct_seg = pool_elt_at_index (seg_ctx->segments, sct->ct_seg_index);
+
+  cnt = __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
+  if (cnt == 1)
+    {
+      err = app_worker_add_segment_notify (client_wrk, cct->segment_handle);
+      if (err)
+       {
+         clib_rwlock_reader_unlock (&cm->app_segs_lock);
+         session_close (ss);
+         goto error;
+       }
+    }
+
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
+  /*
+   * Alloc client session
+   */
+
   cs = session_alloc (thread_index);
   ss = session_get (ss_index, thread_index);
   cs->session_type = ss->session_type;
@@ -165,6 +315,8 @@ ct_session_connect_notify (session_t * ss)
   cs->session_state = SESSION_STATE_CONNECTING;
   cs->app_wrk_index = client_wrk->wrk_index;
   cs->connection_index = cct->c_c_index;
+  cct->seg_ctx_index = sct->seg_ctx_index;
+  cct->ct_seg_index = sct->ct_seg_index;
 
   cct->c_s_index = cs->session_index;
   cct->client_rx_fifo = ss->tx_fifo;
@@ -188,7 +340,7 @@ ct_session_connect_notify (session_t * ss)
   if (app_worker_connect_notify (client_wrk, cs, err, opaque))
     {
       session_close (ss);
-      segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
+      ct_session_dealloc_fifos (cct, cs->rx_fifo, cs->tx_fifo);
       session_free (cs);
       return -1;
     }
@@ -203,75 +355,181 @@ error:
   return -1;
 }
 
+static ct_segment_t *
+ct_lookup_free_segment (segment_manager_t *sm, ct_segments_ctx_t *seg_ctx,
+                       u32 pair_bytes)
+{
+  uword free_bytes, max_free_bytes;
+  ct_segment_t *ct_seg, *res = 0;
+  fifo_segment_t *fs;
+  u32 max_fifos;
+
+  max_free_bytes = pair_bytes;
+  pool_foreach (ct_seg, seg_ctx->segments)
+    {
+      /* Client or server has detached so segment cannot be used */
+      if ((ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED) ||
+         (ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED))
+       continue;
+      fs = segment_manager_get_segment (sm, ct_seg->segment_index);
+      free_bytes = fifo_segment_available_bytes (fs);
+      max_fifos = fifo_segment_size (fs) / pair_bytes;
+      if (free_bytes > max_free_bytes &&
+         fifo_segment_num_fifos (fs) / 2 < max_fifos)
+       {
+         max_free_bytes = free_bytes;
+         res = ct_seg;
+       }
+    }
+
+  return res;
+}
+
 static int
-ct_init_accepted_session (app_worker_t * server_wrk,
-                         ct_connection_t * ct, session_t * ls,
-                         session_t * ll)
+ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
+                         session_t *ls, session_t *ll)
 {
-  u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index, seg_size;
+  u32 sm_index, pair_bytes, seg_ctx_index = ~0, ct_seg_index = ~0;
+  u64 seg_handle, table_handle, seg_size;
   segment_manager_props_t *props;
+  const u32 margin = 16 << 10;
+  ct_segments_ctx_t *seg_ctx;
+  ct_main_t *cm = &ct_main;
   application_t *server;
   segment_manager_t *sm;
-  u32 margin = 16 << 10;
-  fifo_segment_t *seg;
-  u64 segment_handle;
-  int seg_index, rv;
+  ct_segment_t *ct_seg;
+  fifo_segment_t *fs;
+  int rv, fs_index;
+  uword *spp;
 
+  sm = app_worker_get_listen_segment_manager (server_wrk, ll);
+  sm_index = segment_manager_index (sm);
   server = application_get (server_wrk->app_index);
-
   props = application_segment_manager_properties (server);
-  round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
-  round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
-  /* Increase size because of inefficient chunk allocations. Depending on
-   * how data is consumed, it may happen that more chunks than needed are
-   * allocated.
-   * TODO should remove once allocations are done more efficiently */
-  seg_size = 4 * (round_rx_fifo_sz + round_tx_fifo_sz + margin);
 
-  sm = app_worker_get_listen_segment_manager (server_wrk, ll);
-  seg_index = segment_manager_add_segment (sm, seg_size, 0);
-  if (seg_index < 0)
+  table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
+  table_handle = (u64) segment_manager_index (sm) << 32 | table_handle;
+
+  /*
+   * Check if we already have a segment that can hold the fifos
+   */
+
+  clib_rwlock_reader_lock (&cm->app_segs_lock);
+
+  spp = hash_get (cm->app_segs_ctxs_table, table_handle);
+  if (spp)
     {
-      clib_warning ("failed to add new cut-through segment");
-      return seg_index;
+      seg_ctx_index = *spp;
+      seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
+      pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
+      ct_seg = ct_lookup_free_segment (sm, seg_ctx, pair_bytes);
+      if (ct_seg)
+       {
+         ct_seg_index = ct_seg - seg_ctx->segments;
+         fs_index = ct_seg->segment_index;
+         __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+       }
     }
-  seg = segment_manager_get_segment_w_lock (sm, seg_index);
 
-  rv = segment_manager_try_alloc_fifos (seg, ls->thread_index,
-                                       props->rx_fifo_size,
-                                       props->tx_fifo_size, &ls->rx_fifo,
-                                       &ls->tx_fifo);
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
+  /*
+   * No segment, try to alloc one and notify the server
+   */
+
+  if (ct_seg_index == ~0)
+    {
+      seg_size = clib_max (props->segment_size, 128 << 20);
+      fs_index = segment_manager_add_segment (sm, seg_size, 0);
+      if (fs_index < 0)
+       {
+         rv = -1;
+         goto failed;
+       }
+
+      /* Make sure the segment is not used for other fifos */
+      fs = segment_manager_get_segment_w_lock (sm, fs_index);
+      fifo_segment_flags (fs) |= FIFO_SEGMENT_F_CUSTOM_USE;
+      segment_manager_segment_reader_unlock (sm);
+
+      clib_rwlock_writer_lock (&cm->app_segs_lock);
+
+      if (seg_ctx_index == ~0)
+       {
+         pool_get_zero (cm->app_seg_ctxs, seg_ctx);
+         seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
+         hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
+         seg_ctx->server_wrk = server_wrk->wrk_index;
+         seg_ctx->client_wrk = ct->client_wrk;
+         seg_ctx->sm_index = sm_index;
+       }
+      else
+       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
+
+      pool_get_zero (seg_ctx->segments, ct_seg);
+      ct_seg->segment_index = fs_index;
+      ct_seg->server_n_sessions += 1;
+      ct_seg_index = ct_seg - seg_ctx->segments;
+
+      clib_rwlock_writer_unlock (&cm->app_segs_lock);
+
+      /* New segment, notify the server. Client notification sent after
+       * server accepts the connection */
+      seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
+      if ((rv = app_worker_add_segment_notify (server_wrk, seg_handle)))
+       {
+         segment_manager_lock_and_del_segment (sm, fs_index);
+
+         clib_rwlock_writer_lock (&cm->app_segs_lock);
+         pool_put_index (seg_ctx->segments, ct_seg_index);
+         clib_rwlock_writer_unlock (&cm->app_segs_lock);
+
+         goto failed_fix_count;
+       }
+    }
+
+  /*
+   * Allocate and initialize the fifos
+   */
+  fs = segment_manager_get_segment_w_lock (sm, fs_index);
+  rv = segment_manager_try_alloc_fifos (
+    fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
+    &ls->rx_fifo, &ls->tx_fifo);
   if (rv)
     {
-      clib_warning ("failed to add fifos in cut-through segment");
       segment_manager_segment_reader_unlock (sm);
-      goto failed;
+      goto failed_fix_count;
     }
 
-  sm_index = segment_manager_index (sm);
   ls->rx_fifo->shr->master_session_index = ls->session_index;
   ls->tx_fifo->shr->master_session_index = ls->session_index;
   ls->rx_fifo->master_thread_index = ls->thread_index;
   ls->tx_fifo->master_thread_index = ls->thread_index;
   ls->rx_fifo->segment_manager = sm_index;
   ls->tx_fifo->segment_manager = sm_index;
-  ls->rx_fifo->segment_index = seg_index;
-  ls->tx_fifo->segment_index = seg_index;
+  ls->rx_fifo->segment_index = fs_index;
+  ls->tx_fifo->segment_index = fs_index;
 
-  segment_handle = segment_manager_segment_handle (sm, seg);
-  if ((rv = app_worker_add_segment_notify (server_wrk, segment_handle)))
-    {
-      clib_warning ("failed to notify server of new segment");
-      segment_manager_segment_reader_unlock (sm);
-      goto failed;
-    }
+  seg_handle = segment_manager_segment_handle (sm, fs);
   segment_manager_segment_reader_unlock (sm);
-  ct->segment_handle = segment_handle;
+
+  ct->segment_handle = seg_handle;
+  ct->seg_ctx_index = seg_ctx_index;
+  ct->ct_seg_index = ct_seg_index;
 
   return 0;
 
+failed_fix_count:
+
+  clib_rwlock_reader_lock (&cm->app_segs_lock);
+
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
+  ct_seg = pool_elt_at_index (seg_ctx->segments, ct_seg_index);
+  __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
 failed:
-  segment_manager_lock_and_del_segment (sm, seg_index);
   return rv;
 }
 
@@ -366,8 +624,8 @@ ct_accept_rpc_wrk_handler (void *accept_args)
   ss->session_state = SESSION_STATE_ACCEPTING;
   if (app_worker_accept_notify (server_wrk, ss))
     {
+      ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
       ct_connection_free (sct);
-      segment_manager_dealloc_fifos (ss->rx_fifo, ss->tx_fifo);
       session_free (ss);
       return;
     }
@@ -571,12 +829,26 @@ ct_session_close (u32 ct_index, u32 thread_index)
     }
 
   s = session_get (ct->c_s_index, ct->c_thread_index);
-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-  if (app_wrk)
-    app_worker_del_segment_notify (app_wrk, ct->segment_handle);
-  session_free_w_fifos (s);
+
   if (ct->flags & CT_CONN_F_CLIENT)
-    segment_manager_dealloc_fifos (ct->client_rx_fifo, ct->client_tx_fifo);
+    {
+      /* Normal free for client session as the fifos are allocated through
+       * the connects segment manager in a segment that's not shared with
+       * the server */
+      session_free_w_fifos (s);
+      ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
+    }
+  else
+    {
+      /* Manual session and fifo segment cleanup to avoid implicit
+       * segment manager cleanups and notifications */
+      app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+      if (app_wrk)
+       app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
+
+      ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
+      session_free (s);
+    }
 
   ct_connection_free (ct);
 }
@@ -718,7 +990,7 @@ ct_enable_disable (vlib_main_t * vm, u8 is_en)
   cm->n_workers = vlib_num_workers ();
   vec_validate (cm->connections, cm->n_workers);
   clib_spinlock_init (&cm->ho_reuseable_lock);
-
+  clib_rwlock_init (&cm->app_segs_lock);
   return 0;
 }