session: refactoring application_local.c 38/43138/20
authorSteven Luong <[email protected]>
Tue, 10 Jun 2025 21:18:19 +0000 (14:18 -0700)
committerFlorin Coras <[email protected]>
Fri, 3 Oct 2025 22:14:23 +0000 (22:14 +0000)
Moved segment management code to segment_manager.c and rename functions to sm_custom.
Specifically,

ct_session_dealloc_fifos -> sm_custom_segment_dealloc_fifos
ct_lookup_free_segment   -> sm_lookup_free_custom_segment
ct_alloc_segment         -> sm_custom_alloc_segment
ct_init_accepted_session -> sm_custom_segment_alloc_fifos

Unified segment_manager_alloc_fifos and segment_manager_dealloc_fifos with
custom segment. Only exposed segment_manager_alloc/dealloc_fifos as public APIs.

Make use of session_transport_delete_notify instead of custom cleanup.

Type: improvement

Change-Id: Ibd0eaef92e3ebb8da536c5190bcd004571a35fc1
Signed-off-by: Steven Luong <[email protected]>
src/svm/fifo_segment.h
src/svm/fifo_types.h
src/svm/svm_fifo.h
src/vnet/session/application.h
src/vnet/session/application_local.c
src/vnet/session/application_local.h
src/vnet/session/application_worker.c
src/vnet/session/segment_manager.c
src/vnet/session/segment_manager.h

index ec18420..086f145 100644 (file)
@@ -57,16 +57,6 @@ typedef enum
     MEMORY_N_PRESSURE,
 } fifo_segment_mem_status_t;
 
-#if 0
-typedef enum fifo_segment_mem_status_
-{
-  MEMORY_PRESSURE_NO_PRESSURE,
-  MEMORY_PRESSURE_LOW_PRESSURE,
-  MEMORY_PRESSURE_HIGH_PRESSURE,
-  MEMORY_PRESSURE_NO_MEMORY,
-} fifo_segment_mem_status_t;
-#endif
-
 typedef struct
 {
   ssvm_private_t ssvm;         /**< ssvm segment data */
index de642df..788a7c9 100644 (file)
@@ -87,6 +87,8 @@ typedef struct svm_fifo_shr_
   u8 subscribers[SVM_FIFO_MAX_EVT_SUBSCRIBERS];
 } svm_fifo_shared_t;
 
+struct _svm_fifo;
+
 typedef struct _svm_fifo
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline);
@@ -118,6 +120,15 @@ typedef struct _svm_fifo
 
   struct _svm_fifo *next; /**< prev in active chain */
   struct _svm_fifo *prev; /**< prev in active chain */
+  union
+  {
+    struct _svm_fifo *ct_fifo; /**< ct client's ptr to server fifo */
+    struct
+    {
+      u32 seg_ctx_index; /**< info to locate custom_seg_ctx */
+      u32 ct_seg_index;         /**< info to locate ct_seg within seg_ctx */
+    };
+  };
 
 #if SVM_FIFO_TRACE
   svm_fifo_trace_elem_t *trace;
index 7ea114f..37c4a5b 100644 (file)
@@ -40,6 +40,8 @@ typedef enum svm_fifo_deq_ntf_
 typedef enum svm_fifo_flag_
 {
   SVM_FIFO_F_LL_TRACKED = 1 << 0,
+  SVM_FIFO_F_SERVER_CT = 1 << 1,
+  SVM_FIFO_F_CLIENT_CT = 1 << 2,
 } svm_fifo_flag_t;
 
 typedef enum
index 2876e53..0e0daae 100644 (file)
@@ -351,6 +351,7 @@ session_error_t app_worker_start_listen (app_worker_t *app_wrk,
                                         app_listener_t *lstnr);
 int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al);
 int app_worker_init_accepted (session_t * s);
+int app_worker_init_accepted_ct (session_t *s);
 int app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
                                u32 opaque, session_error_t err);
 int app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
index 1392e5d..499e52c 100644 (file)
 #include <vnet/session/application_local.h>
 #include <vnet/session/session.h>
 
-typedef enum ct_segment_flags_
-{
-  CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
-  CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
-} ct_segment_flags_t;
-
-typedef struct ct_segment_
-{
-  u32 client_n_sessions;
-  u32 server_n_sessions;
-  u32 seg_ctx_index;
-  u32 ct_seg_index;
-  u32 segment_index;
-  ct_segment_flags_t flags;
-} ct_segment_t;
-
-typedef struct ct_segments_
-{
-  u32 sm_index;
-  u32 server_wrk;
-  u32 client_wrk;
-  u32 fifo_pair_bytes;
-  ct_segment_t *segments;
-} ct_segments_ctx_t;
-
 typedef struct ct_cleanup_req_
 {
   u32 ct_index;
@@ -64,9 +39,6 @@ typedef struct ct_main_
   u32 n_sessions;                      /**< Cumulative sessions counter */
   u32 *ho_reusable;                    /**< Vector of reusable ho indices */
   clib_spinlock_t ho_reuseable_lock;   /**< Lock for reusable ho indices */
-  clib_rwlock_t app_segs_lock;         /**< RW lock for seg contexts */
-  uword *app_segs_ctxs_table;          /**< App handle to segment pool map */
-  ct_segments_ctx_t *app_seg_ctxs;     /**< Pool of ct segment contexts */
   u32 **fwrk_pending_connects;         /**< First wrk pending half-opens */
   u32 fwrk_thread;                     /**< First worker thread */
   u8 fwrk_have_flush;                  /**< Flag for connect flush rpc */
@@ -99,7 +71,7 @@ ct_connection_alloc (clib_thread_index_t thread_index)
   return ct;
 }
 
-static ct_connection_t *
+ct_connection_t *
 ct_connection_get (u32 ct_index, clib_thread_index_t thread_index)
 {
   ct_worker_t *wrk = ct_worker_get (thread_index);
@@ -177,170 +149,6 @@ ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
 }
 
-static void
-ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
-{
-  ct_connection_t *peer_ct;
-
-  peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
-
-  if (is_client)
-    {
-      ct->client_wrk = APP_INVALID_INDEX;
-      if (peer_ct)
-       ct->client_wrk = APP_INVALID_INDEX;
-    }
-  else
-    {
-      ct->server_wrk = APP_INVALID_INDEX;
-      if (peer_ct)
-       ct->server_wrk = APP_INVALID_INDEX;
-    }
-}
-
-static inline u64
-ct_client_seg_handle (u64 server_sh, u32 client_wrk_index)
-{
-  return (((u64) client_wrk_index << 56) | server_sh);
-}
-
-static void
-ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
-                         svm_fifo_t *tx_fifo)
-{
-  ct_segments_ctx_t *seg_ctx;
-  ct_main_t *cm = &ct_main;
-  segment_manager_t *sm;
-  app_worker_t *app_wrk;
-  ct_segment_t *ct_seg;
-  fifo_segment_t *fs;
-  u32 seg_index;
-  session_t *s;
-  int cnt;
-
-  /*
-   * Cleanup fifos
-   */
-
-  sm = segment_manager_get (rx_fifo->segment_manager);
-  seg_index = rx_fifo->segment_index;
-
-  fs = segment_manager_get_segment_w_lock (sm, seg_index);
-  fifo_segment_free_fifo (fs, rx_fifo);
-  fifo_segment_free_fifo (fs, tx_fifo);
-  segment_manager_segment_reader_unlock (sm);
-
-  /*
-   * Atomically update segment context with readers lock
-   */
-
-  clib_rwlock_reader_lock (&cm->app_segs_lock);
-
-  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
-  ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
-
-  if (ct->flags & CT_CONN_F_CLIENT)
-    {
-      cnt =
-       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
-    }
-  else
-    {
-      cnt =
-       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
-    }
-
-  clib_rwlock_reader_unlock (&cm->app_segs_lock);
-
-  /*
-   * No need to do any app updates, return
-   */
-  ASSERT (cnt >= 0);
-  if (cnt)
-    return;
-
-  /*
-   * Grab exclusive lock and update flags unless some other thread
-   * added more sessions
-   */
-  clib_rwlock_writer_lock (&cm->app_segs_lock);
-
-  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
-  ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
-  if (ct->flags & CT_CONN_F_CLIENT)
-    {
-      cnt = ct_seg->client_n_sessions;
-      if (cnt)
-       goto done;
-      ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
-      s = session_get (ct->c_s_index, ct->c_thread_index);
-      if (s->app_wrk_index == APP_INVALID_INDEX)
-       ct_set_invalid_app_wrk (ct, 1 /* is_client */);
-    }
-  else
-    {
-      cnt = ct_seg->server_n_sessions;
-      if (cnt)
-       goto done;
-      ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
-      s = session_get (ct->c_s_index, ct->c_thread_index);
-      if (s->app_wrk_index == APP_INVALID_INDEX)
-       ct_set_invalid_app_wrk (ct, 0 /* is_client */);
-    }
-
-  if (!(ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
-      !(ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
-    goto done;
-
-  /*
-   * Remove segment context because both client and server detached
-   */
-
-  pool_put_index (seg_ctx->segments, ct->ct_seg_index);
-
-  /*
-   * No more segment indices left, remove the segments context
-   */
-  if (!pool_elts (seg_ctx->segments))
-    {
-      u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
-      table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
-      hash_unset (cm->app_segs_ctxs_table, table_handle);
-      pool_free (seg_ctx->segments);
-      pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
-    }
-
-  /*
-   * Segment to be removed so notify both apps
-   */
-
-  app_wrk = app_worker_get_if_valid (ct->client_wrk);
-  /* Determine if client app still needs notification, i.e., if it is
-   * still attached. If client detached and this is the last ct session
-   * on this segment, then its connects segment manager should also be
-   * detached, so do not send notification */
-  if (app_wrk)
-    {
-      segment_manager_t *csm;
-      csm = app_worker_get_connect_segment_manager (app_wrk);
-      if (!segment_manager_app_detached (csm))
-       app_worker_del_segment_notify (
-         app_wrk, ct_client_seg_handle (ct->segment_handle, ct->client_wrk));
-    }
-
-  /* Notify server app and free segment */
-  segment_manager_lock_and_del_segment (sm, seg_index);
-
-  /* Cleanup segment manager if needed. If server detaches there's a chance
-   * the client's sessions will hold up segment removal */
-  if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
-    segment_manager_free_safe (sm);
-
-done:
-
-  clib_rwlock_writer_unlock (&cm->app_segs_lock);
-}
-
 static void
 ct_session_force_disconnect_server (ct_connection_t *sct)
 {
@@ -404,6 +212,11 @@ ct_session_connect_notify (session_t *ss, session_error_t err)
       goto connect_error;
     }
 
+  cs->rx_fifo->ct_fifo = cct->client_tx_fifo;
+  cs->tx_fifo->ct_fifo = cct->client_rx_fifo;
+  cs->tx_fifo->flags |= SVM_FIFO_F_CLIENT_CT;
+  cs->rx_fifo->flags |= SVM_FIFO_F_CLIENT_CT;
+
   session_set_state (cs, SESSION_STATE_CONNECTING);
 
   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
@@ -426,237 +239,10 @@ connect_error:
 cleanup_client:
 
   if (cct->client_rx_fifo)
-    ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
+    segment_manager_dealloc_fifos (cct->client_rx_fifo, cct->client_tx_fifo);
   ct_connection_free (cct);
-  return -1;
-}
-
-static inline ct_segment_t *
-ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
-                       u32 seg_ctx_index)
-{
-  uword free_bytes, max_free_bytes;
-  ct_segment_t *ct_seg, *res = 0;
-  ct_segments_ctx_t *seg_ctx;
-  fifo_segment_t *fs;
-  u32 max_fifos;
-
-  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
-  max_free_bytes = seg_ctx->fifo_pair_bytes;
-
-  pool_foreach (ct_seg, seg_ctx->segments)
-    {
-      /* Client or server has detached so segment cannot be used */
-      fs = segment_manager_get_segment (sm, ct_seg->segment_index);
-      free_bytes = fifo_segment_available_bytes (fs);
-      max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
-      if (free_bytes > max_free_bytes &&
-         fifo_segment_num_fifos (fs) / 2 < max_fifos)
-       {
-         max_free_bytes = free_bytes;
-         res = ct_seg;
-       }
-    }
 
-  return res;
-}
-
-static ct_segment_t *
-ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
-                 segment_manager_t *sm, u32 client_wrk_index)
-{
-  u32 seg_ctx_index = ~0, sm_index, pair_bytes;
-  u64 seg_size, seg_handle, client_seg_handle;
-  segment_manager_props_t *props;
-  const u32 margin = 16 << 10;
-  ct_segments_ctx_t *seg_ctx;
-  app_worker_t *client_wrk;
-  application_t *server;
-  ct_segment_t *ct_seg;
-  uword *spp;
-  int fs_index;
-
-  server = application_get (server_wrk->app_index);
-  props = application_segment_manager_properties (server);
-  sm_index = segment_manager_index (sm);
-  pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
-
-  /*
-   * Make sure another thread did not alloc a segment while acquiring the lock
-   */
-
-  spp = hash_get (cm->app_segs_ctxs_table, table_handle);
-  if (spp)
-    {
-      seg_ctx_index = *spp;
-      ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
-      if (ct_seg)
-       return ct_seg;
-    }
-
-  /*
-   * No segment, try to alloc one and notify the server and the client.
-   * Make sure the segment is not used for other fifos
-   */
-  seg_size = clib_max (props->segment_size, 128 << 20);
-  fs_index =
-    segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
-  if (fs_index < 0)
-    return 0;
-
-  if (seg_ctx_index == ~0)
-    {
-      pool_get_zero (cm->app_seg_ctxs, seg_ctx);
-      seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
-      hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
-      seg_ctx->server_wrk = server_wrk->wrk_index;
-      seg_ctx->client_wrk = client_wrk_index;
-      seg_ctx->sm_index = sm_index;
-      seg_ctx->fifo_pair_bytes = pair_bytes;
-    }
-  else
-    {
-      seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
-    }
-
-  pool_get_zero (seg_ctx->segments, ct_seg);
-  ct_seg->segment_index = fs_index;
-  ct_seg->server_n_sessions = 0;
-  ct_seg->client_n_sessions = 0;
-  ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
-  ct_seg->seg_ctx_index = seg_ctx_index;
-
-  /* New segment, notify the server and client */
-  seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
-  if (app_worker_add_segment_notify (server_wrk, seg_handle))
-    goto error;
-
-  client_wrk = app_worker_get (client_wrk_index);
-  /* Make sure client workers do not have overlapping segment handles.
-   * Ideally, we should attach fs to client worker segment manager and
-   * create a new handle but that's not currently possible. */
-  client_seg_handle = ct_client_seg_handle (seg_handle, client_wrk_index);
-  if (app_worker_add_segment_notify (client_wrk, client_seg_handle))
-    {
-      app_worker_del_segment_notify (server_wrk, seg_handle);
-      goto error;
-    }
-
-  return ct_seg;
-
-error:
-
-  segment_manager_lock_and_del_segment (sm, fs_index);
-  pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
-  return 0;
-}
-
-static int
-ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
-                         session_t *ls, session_t *ll)
-{
-  segment_manager_props_t *props;
-  u64 seg_handle, table_handle;
-  u32 sm_index, fs_index = ~0;
-  ct_segments_ctx_t *seg_ctx;
-  ct_main_t *cm = &ct_main;
-  application_t *server;
-  segment_manager_t *sm;
-  ct_segment_t *ct_seg;
-  fifo_segment_t *fs;
-  uword *spp;
-  int rv;
-
-  sm = app_worker_get_listen_segment_manager (server_wrk, ll);
-  sm_index = segment_manager_index (sm);
-  server = application_get (server_wrk->app_index);
-  props = application_segment_manager_properties (server);
-
-  table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
-  table_handle = (u64) sm_index << 32 | table_handle;
-
-  /*
-   * Check if we already have a segment that can hold the fifos
-   */
-
-  clib_rwlock_reader_lock (&cm->app_segs_lock);
-
-  spp = hash_get (cm->app_segs_ctxs_table, table_handle);
-  if (spp)
-    {
-      ct_seg = ct_lookup_free_segment (cm, sm, *spp);
-      if (ct_seg)
-       {
-         ct->seg_ctx_index = ct_seg->seg_ctx_index;
-         ct->ct_seg_index = ct_seg->ct_seg_index;
-         fs_index = ct_seg->segment_index;
-         ct_seg->flags &=
-           ~(CT_SEGMENT_F_SERVER_DETACHED | CT_SEGMENT_F_CLIENT_DETACHED);
-         __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
-         __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
-       }
-    }
-
-  clib_rwlock_reader_unlock (&cm->app_segs_lock);
-
-  /*
-   * If not, grab exclusive lock and allocate segment
-   */
-  if (fs_index == ~0)
-    {
-      clib_rwlock_writer_lock (&cm->app_segs_lock);
-
-      ct_seg =
-       ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
-      if (!ct_seg)
-       {
-         clib_rwlock_writer_unlock (&cm->app_segs_lock);
-         return -1;
-       }
-
-      ct->seg_ctx_index = ct_seg->seg_ctx_index;
-      ct->ct_seg_index = ct_seg->ct_seg_index;
-      ct_seg->server_n_sessions += 1;
-      ct_seg->client_n_sessions += 1;
-      fs_index = ct_seg->segment_index;
-
-      clib_rwlock_writer_unlock (&cm->app_segs_lock);
-    }
-
-  /*
-   * Allocate and initialize the fifos
-   */
-  fs = segment_manager_get_segment_w_lock (sm, fs_index);
-  rv = segment_manager_try_alloc_fifos (
-    fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
-    &ls->rx_fifo, &ls->tx_fifo);
-  if (rv)
-    {
-      segment_manager_segment_reader_unlock (sm);
-
-      clib_rwlock_reader_lock (&cm->app_segs_lock);
-
-      seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
-      ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
-      __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
-      __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
-
-      clib_rwlock_reader_unlock (&cm->app_segs_lock);
-
-      return rv;
-    }
-
-  ls->rx_fifo->shr->master_session_index = ls->session_index;
-  ls->tx_fifo->shr->master_session_index = ls->session_index;
-  ls->rx_fifo->vpp_sh = ls->handle;
-  ls->tx_fifo->vpp_sh = ls->handle;
-
-  seg_handle = segment_manager_segment_handle (sm, fs);
-  segment_manager_segment_reader_unlock (sm);
-
-  ct->segment_handle = seg_handle;
-
-  return 0;
+  return -1;
 }
 
 static void
@@ -731,14 +317,9 @@ ct_accept_one (clib_thread_index_t thread_index, u32 ho_index)
   ss->connection_index = sct->c_c_index;
   ss->listener_handle = listen_session_get_handle (ll);
   session_set_state (ss, SESSION_STATE_CREATED);
-
-  server_wrk = application_listener_select_worker (ll);
-  ss->app_wrk_index = server_wrk->wrk_index;
-
   sct->c_s_index = ss->session_index;
-  sct->server_wrk = ss->app_wrk_index;
 
-  if (ct_init_accepted_session (server_wrk, sct, ss, ll))
+  if (app_worker_init_accepted_ct (ss))
     {
       ct_session_connect_notify (ss, SESSION_E_ALLOC);
       ct_connection_free (sct);
@@ -746,21 +327,13 @@ ct_accept_one (clib_thread_index_t thread_index, u32 ho_index)
       return;
     }
 
-  cct->server_wrk = sct->server_wrk;
-  cct->seg_ctx_index = sct->seg_ctx_index;
-  cct->ct_seg_index = sct->ct_seg_index;
-  cct->client_rx_fifo = ss->tx_fifo;
-  cct->client_tx_fifo = ss->rx_fifo;
-  cct->client_rx_fifo->refcnt++;
-  cct->client_tx_fifo->refcnt++;
-  cct->segment_handle =
-    ct_client_seg_handle (sct->segment_handle, cct->client_wrk);
-
   session_set_state (ss, SESSION_STATE_ACCEPTING);
+  server_wrk = app_worker_get (ss->app_wrk_index);
+
   if (app_worker_accept_notify (server_wrk, ss))
     {
       ct_session_connect_notify (ss, SESSION_E_REFUSED);
-      ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
+      segment_manager_dealloc_fifos (ss->rx_fifo, ss->tx_fifo);
       ct_connection_free (sct);
       session_free (ss);
     }
@@ -1050,26 +623,13 @@ ct_close_is_reset (ct_connection_t *ct, session_t *s)
     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
 }
 
-static void
-ct_session_cleanup_server_session (session_t *s)
-{
-  ct_connection_t *ct;
-
-  ct = (ct_connection_t *) session_get_transport (s);
-  ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
-  session_free (s);
-  ct_connection_free (ct);
-}
-
 static void
 ct_session_postponed_cleanup (ct_connection_t *ct)
 {
   ct_connection_t *peer_ct;
-  app_worker_t *app_wrk;
   session_t *s;
 
   s = session_get (ct->c_s_index, ct->c_thread_index);
-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
 
   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
   if (peer_ct)
@@ -1079,40 +639,13 @@ ct_session_postponed_cleanup (ct_connection_t *ct)
       else
        session_transport_closing_notify (&peer_ct->connection);
     }
-  session_transport_closed_notify (&ct->connection);
-
-  /* It would be cleaner to call session_transport_delete_notify
-   * but then we can't control session cleanup lower */
-  session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
-  if (app_wrk)
-    app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
-
-  if (ct->flags & CT_CONN_F_CLIENT)
-    {
-      /* Normal free for client session as the fifos are allocated through
-       * the connects segment manager in a segment that's not shared with
-       * the server */
-      ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
-      session_program_cleanup (s);
-      ct_connection_free (ct);
-    }
-  else
-    {
-      /* Manual session and fifo segment cleanup to avoid implicit
-       * segment manager cleanups and notifications */
-      if (app_wrk)
-       {
-         /* Remove custom cleanup notify infra when/if switching to normal
-          * session cleanup. Note that ct is freed in the cb function */
-         app_worker_cleanup_notify_custom (app_wrk, s,
-                                           SESSION_CLEANUP_SESSION,
-                                           ct_session_cleanup_server_session);
-       }
-      else
-       {
-         ct_session_cleanup_server_session (s);
-       }
-    }
+  /*
+   * Comment out session_transport_closed_notify. It caused an error spit out
+   * in session delete due to session_lookup_del_session returns an error.
+   * session_transport_closed_notify (&ct->connection);
+   */
+  session_transport_delete_notify (&ct->connection);
+  ct_connection_free (ct);
 }
 
 static void
@@ -1367,7 +900,6 @@ ct_enable_disable (vlib_main_t * vm, u8 is_en)
   vec_foreach (wrk, cm->wrk)
     clib_spinlock_init (&wrk->pending_connects_lock);
   clib_spinlock_init (&cm->ho_reuseable_lock);
-  clib_rwlock_init (&cm->app_segs_lock);
   cm->is_init = 1;
 
   return 0;
index fd2804c..ff7a258 100644 (file)
@@ -59,6 +59,8 @@ session_t *ct_session_get_peer (session_t * s);
 void ct_session_endpoint (session_t * ll, session_endpoint_t * sep);
 int ct_session_connect_notify (session_t *ls, session_error_t err);
 int ct_session_tx (session_t * s);
+ct_connection_t *ct_connection_get (u32 ct_index,
+                                   clib_thread_index_t thread_index);
 
 #endif /* SRC_VNET_SESSION_APPLICATION_LOCAL_H_ */
 
index 8b389be..21850fe 100644 (file)
@@ -415,6 +415,55 @@ app_worker_init_accepted (session_t * s)
   return 0;
 }
 
+static int
+app_worker_alloc_session_fifos_ct (segment_manager_t *sm, session_t *s)
+{
+  svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
+  int rv;
+
+  if ((rv = segment_manager_alloc_session_fifos_ct (s, sm, s->thread_index,
+                                                   &rx_fifo, &tx_fifo)))
+    return rv;
+
+  rx_fifo->shr->master_session_index = s->session_index;
+  rx_fifo->vpp_sh = s->handle;
+
+  tx_fifo->shr->master_session_index = s->session_index;
+  tx_fifo->vpp_sh = s->handle;
+
+  s->rx_fifo = rx_fifo;
+  s->tx_fifo = tx_fifo;
+  return 0;
+}
+
+int
+app_worker_init_accepted_ct (session_t *s)
+{
+  app_worker_t *app_wrk;
+  segment_manager_t *sm;
+  session_t *listener;
+  application_t *app;
+
+  listener = listen_session_get_from_handle (s->listener_handle);
+  app_wrk = application_listener_select_worker (listener);
+  if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
+    return -1;
+
+  s->app_wrk_index = app_wrk->wrk_index;
+  app = application_get (app_wrk->app_index);
+  if (app->cb_fns.fifo_tuning_callback)
+    s->flags |= SESSION_F_CUSTOM_FIFO_TUNING;
+
+  sm = app_worker_get_listen_segment_manager (app_wrk, listener);
+  if (app_worker_alloc_session_fifos_ct (sm, s))
+    return -1;
+
+  if (application_is_builtin_proxy (app))
+    return app->cb_fns.proxy_write_early_data (s);
+
+  return 0;
+}
+
 int
 app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
                            u32 opaque, session_error_t err)
index b18bfce..fd32043 100644 (file)
 #include <vnet/session/session.h>
 #include <vnet/session/application.h>
 #include <sys/mman.h>
+#include <vnet/session/application_local.h>
+
+VLIB_REGISTER_LOG_CLASS (segment_manager_log,
+                        static) = { .class_name = "segment-manager",
+                                    .subclass_name = "error" };
+
+#define log_debug(fmt, ...)                                                   \
+  vlib_log_debug (segment_manager_log.class, "%s: " fmt, __func__, __VA_ARGS__)
+#define log_warn(fmt, ...)                                                    \
+  vlib_log_warn (segment_manager_log.class, fmt, __VA_ARGS__)
+#define log_err(fmt, ...)                                                     \
+  vlib_log_err (segment_manager_log.class, fmt, __VA_ARGS__)
+
+typedef enum custom_segment_flags_
+{
+  CUSTOM_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
+  CUSTOM_SEGMENT_F_SERVER_DETACHED = 1 << 1,
+} custom_segment_flags_t;
+
+typedef struct custom_segment_
+{
+  u32 client_n_sessions;
+  u32 server_n_sessions;
+  u32 seg_ctx_index;
+  u32 custom_seg_index;
+  u32 segment_index;
+  custom_segment_flags_t flags;
+} custom_segment_t;
+
+typedef struct custom_segments_
+{
+  u32 sm_index;
+  u32 server_wrk;
+  u32 client_wrk;
+  u32 fifo_pair_bytes;
+  custom_segment_t *segments;
+} custom_segments_ctx_t;
 
 typedef struct segment_manager_main_
 {
@@ -33,9 +70,15 @@ typedef struct segment_manager_main_
   u8 default_high_watermark;   /**< default high watermark % */
   u8 default_low_watermark;    /**< default low watermark % */
   u8 no_dump_segments;         /**< don't dump segs in core files */
+
+  /* custom segment stuff */
+  clib_rwlock_t custom_segs_lock; /**< RW lock for seg contexts */
+  uword *custom_segs_ctxs_table;  /**< Handle to segment pool map */
+  custom_segments_ctx_t
+    *custom_seg_ctxs; /**< Pool of custom segment contexts */
 } segment_manager_main_t;
 
-static segment_manager_main_t sm_main;
+static segment_manager_main_t sm_main, *smm = &sm_main;
 
 #define segment_manager_foreach_segment_w_lock(VAR, SM, BODY)          \
 do {                                                                   \
@@ -44,6 +87,10 @@ do {                                                                 \
     clib_rwlock_reader_unlock (&(SM)->segments_rwlock);                        \
 } while (0)
 
+static void segment_manager_dealloc_fifos_ct (svm_fifo_t *rx_fifo,
+                                             svm_fifo_t *tx_fifo,
+                                             u32 is_client);
+
 static segment_manager_props_t *
 segment_manager_properties_get (segment_manager_t * sm)
 {
@@ -93,7 +140,6 @@ static inline int
 segment_manager_add_segment_inline (segment_manager_t *sm, uword segment_size,
                                    u8 notify_app, u8 flags, u8 need_lock)
 {
-  segment_manager_main_t *smm = &sm_main;
   segment_manager_props_t *props;
   app_worker_t *app_wrk;
   fifo_segment_t *fs;
@@ -229,8 +275,8 @@ segment_manager_add_segment2 (segment_manager_t *sm, uword segment_size,
 /**
  * Remove segment without lock
  */
-void
-segment_manager_del_segment (segment_manager_t * sm, fifo_segment_t * fs)
+static void
+segment_manager_del_segment (segment_manager_t *sm, fifo_segment_t *fs)
 {
   if (ssvm_type (&fs->ssvm) != SSVM_SEGMENT_PRIVATE)
     {
@@ -290,8 +336,8 @@ done:
   clib_rwlock_writer_unlock (&sm->segments_rwlock);
 }
 
-void
-segment_manager_lock_and_del_segment (segment_manager_t * sm, u32 fs_index)
+static void
+segment_manager_lock_and_del_segment (segment_manager_t *sm, u32 fs_index)
 {
   sm_lock_and_del_segment_inline (sm, fs_index, 0 /* check_if_empty */);
 }
@@ -363,7 +409,6 @@ segment_manager_segment_reader_unlock (segment_manager_t * sm)
 segment_manager_t *
 segment_manager_alloc (void)
 {
-  segment_manager_main_t *smm = &sm_main;
   segment_manager_t *sm;
 
   pool_get_zero (smm->segment_managers, sm);
@@ -498,7 +543,6 @@ segment_manager_cleanup_detached_listener (segment_manager_t * sm)
 void
 segment_manager_free (segment_manager_t * sm)
 {
-  segment_manager_main_t *smm = &sm_main;
   fifo_segment_t *fifo_segment;
 
   ASSERT (vlib_get_thread_index () == 0
@@ -716,7 +760,7 @@ segment_manager_del_sessions_filter (segment_manager_t *sm,
   vec_free (handles);
 }
 
-int
+static int
 segment_manager_try_alloc_fifos (fifo_segment_t *fs,
                                 clib_thread_index_t thread_index,
                                 u32 rx_fifo_size, u32 tx_fifo_size,
@@ -865,7 +909,7 @@ segment_manager_alloc_session_fifos (segment_manager_t *sm,
 }
 
 void
-segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
+segment_manager_dealloc_fifos (svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo)
 {
   segment_manager_t *sm;
   fifo_segment_t *fs;
@@ -879,6 +923,15 @@ segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
   ASSERT (rx_fifo->master_thread_index == vlib_get_thread_index () ||
          rx_fifo->refcnt > 1 || vlib_thread_is_main_w_barrier ());
 
+  if (rx_fifo->flags & (SVM_FIFO_F_SERVER_CT | SVM_FIFO_F_CLIENT_CT))
+    {
+      if (rx_fifo->flags & SVM_FIFO_F_SERVER_CT)
+       return segment_manager_dealloc_fifos_ct (rx_fifo, tx_fifo, 0);
+      else
+       segment_manager_dealloc_fifos_ct (rx_fifo->ct_fifo, tx_fifo->ct_fifo,
+                                         1);
+    }
+
   /* It's possible to have no segment manager if the session was removed
    * as result of a detach. */
   if (!(sm = segment_manager_get_if_valid (rx_fifo->segment_manager)))
@@ -886,6 +939,7 @@ segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
 
   segment_index = rx_fifo->segment_index;
   fs = segment_manager_get_segment_w_lock (sm, segment_index);
+
   fifo_segment_free_fifo (fs, rx_fifo);
   fifo_segment_free_fifo (fs, tx_fifo);
 
@@ -1033,6 +1087,7 @@ segment_manager_main_init (u8 no_dump_segments)
   sm->default_high_watermark = 80;
   sm->default_low_watermark = 50;
   sm->no_dump_segments = no_dump_segments;
+  clib_rwlock_init (&sm->custom_segs_lock);
 }
 
 static u8 *
@@ -1111,7 +1166,6 @@ segment_manager_show_fn (vlib_main_t * vm, unformat_input_t * input,
                         vlib_cli_command_t * cmd)
 {
   unformat_input_t _line_input, *line_input = &_line_input;
-  segment_manager_main_t *smm = &sm_main;
   u8 show_segments = 0, verbose = 0;
   segment_manager_t *sm;
   u32 sm_index = ~0;
@@ -1204,37 +1258,39 @@ segment_manager_format_sessions (segment_manager_t * sm, int verbose)
 
   clib_rwlock_reader_lock (&sm->segments_rwlock);
 
-  pool_foreach (fs, sm->segments)  {
-    for (slice_index = 0; slice_index < fs->n_slices; slice_index++)
-      {
-        f = fifo_segment_get_slice_fifo_list (fs, slice_index);
-        while (f)
-          {
-            u32 session_index, thread_index;
-            session_t *session;
+  pool_foreach (fs, sm->segments)
+    {
+      for (slice_index = 0; slice_index < fs->n_slices; slice_index++)
+       {
+         f = fifo_segment_get_slice_fifo_list (fs, slice_index);
+         while (f)
+           {
+             u32 session_index, thread_index;
+             session_t *session;
 
-           session_index = f->vpp_session_index;
-           thread_index = f->master_thread_index;
+             session_index = f->vpp_session_index;
+             thread_index = f->master_thread_index;
 
-           session = session_get (session_index, thread_index);
-           str = format (0, "%U", format_session, session, verbose);
+             session = session_get (session_index, thread_index);
+             str = format (0, "%U", format_session, session, verbose);
 
-           if (verbose)
-             s = format (s, "%-" SESSION_CLI_ID_LEN "v%-20v%-15u%-10u", str,
-                         app_name, app_wrk->api_client_index,
-                         app_wrk->connects_seg_manager);
-           else
-             s = format (s, "%-" SESSION_CLI_ID_LEN "v%-20v", str, app_name);
+             if (verbose)
+               s = format (s, "%-" SESSION_CLI_ID_LEN "v%-20v%-15u%-10u", str,
+                           app_name, app_wrk->api_client_index,
+                           app_wrk->connects_seg_manager);
+             else
+               s =
+                 format (s, "%-" SESSION_CLI_ID_LEN "v%-20v", str, app_name);
 
-           vlib_cli_output (vm, "%v", s);
-           vec_reset_length (s);
-           vec_free (str);
+             vlib_cli_output (vm, "%v", s);
+             vec_reset_length (s);
+             vec_free (str);
 
-           f = f->next;
-         }
-       vec_free (s);
-      }
-  }
+             f = f->next;
+           }
+         vec_free (s);
+       }
+    }
 
   clib_rwlock_reader_unlock (&sm->segments_rwlock);
 }
@@ -1250,6 +1306,403 @@ segment_manager_set_watermarks (segment_manager_t * sm,
   sm->low_watermark = low_watermark;
 }
 
+/* custom segment stuff */
+
+static inline u64
+ct_client_seg_handle (u64 server_sh, u32 client_wrk_index)
+{
+  return (((u64) client_wrk_index << 56) | server_sh);
+}
+
+static void
+segment_manager_dealloc_fifos_ct (svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo,
+                                 u32 is_client)
+{
+  custom_segments_ctx_t *seg_ctx;
+  segment_manager_t *sm;
+  app_worker_t *app_wrk;
+  custom_segment_t *ct_seg;
+  fifo_segment_t *fs;
+  u32 seg_index;
+  int cnt;
+  u32 seg_ctx_index = rx_fifo->seg_ctx_index;
+  u32 ct_seg_index = rx_fifo->ct_seg_index;
+
+  /*
+   * Cleanup fifos
+   */
+
+  if (!(sm = segment_manager_get_if_valid (rx_fifo->segment_manager)))
+    return;
+  seg_index = rx_fifo->segment_index;
+
+  fs = segment_manager_get_segment_w_lock (sm, seg_index);
+  ASSERT ((fifo_segment_flags (fs) & FIFO_SEGMENT_F_CUSTOM_USE) ==
+         FIFO_SEGMENT_F_CUSTOM_USE);
+  fifo_segment_free_fifo (fs, rx_fifo);
+  fifo_segment_free_fifo (fs, tx_fifo);
+  segment_manager_segment_reader_unlock (sm);
+
+  /*
+   * Atomically update segment context with readers lock
+   */
+
+  clib_rwlock_reader_lock (&smm->custom_segs_lock);
+
+  seg_ctx = pool_elt_at_index (smm->custom_seg_ctxs, seg_ctx_index);
+  ct_seg = pool_elt_at_index (seg_ctx->segments, ct_seg_index);
+
+  if (is_client)
+    {
+      cnt =
+       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
+    }
+  else
+    {
+      cnt =
+       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+    }
+
+  clib_rwlock_reader_unlock (&smm->custom_segs_lock);
+
+  /*
+   * No need to do any app updates, return
+   */
+  ASSERT (cnt >= 0);
+  if (cnt)
+    return;
+
+  /*
+   * Grab exclusive lock and update flags unless some other thread
+   * added more sessions
+   */
+  clib_rwlock_writer_lock (&smm->custom_segs_lock);
+
+  if (is_client)
+    {
+      cnt = ct_seg->client_n_sessions;
+      if (cnt)
+       goto done;
+      ct_seg->flags |= CUSTOM_SEGMENT_F_CLIENT_DETACHED;
+    }
+  else
+    {
+      cnt = ct_seg->server_n_sessions;
+      if (cnt)
+       goto done;
+      ct_seg->flags |= CUSTOM_SEGMENT_F_SERVER_DETACHED;
+    }
+
+  if (!(ct_seg->flags & CUSTOM_SEGMENT_F_CLIENT_DETACHED) ||
+      !(ct_seg->flags & CUSTOM_SEGMENT_F_SERVER_DETACHED))
+    goto done;
+
+  /*
+   * Remove segment context because both client and server detached
+   */
+
+  pool_put_index (seg_ctx->segments, ct_seg_index);
+
+  /*
+   * No more segment indices left, remove the segments context
+   */
+  if (!pool_elts (seg_ctx->segments))
+    {
+      u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
+      table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
+      hash_unset (smm->custom_segs_ctxs_table, table_handle);
+      pool_free (seg_ctx->segments);
+      pool_put_index (smm->custom_seg_ctxs, seg_ctx_index);
+    }
+
+  /*
+   * Segment to be removed so notify both apps
+   */
+
+  app_wrk = app_worker_get_if_valid (seg_ctx->client_wrk);
+  /* Determine if client app still needs notification, i.e., if it is
+   * still attached. If client detached and this is the last ct session
+   * on this segment, then its connects segment manager should also be
+   * detached, so do not send notification */
+  if (app_wrk)
+    {
+      segment_manager_t *csm;
+      u64 segment_handle = segment_manager_segment_handle (sm, fs);
+      csm = app_worker_get_connect_segment_manager (app_wrk);
+      if (!segment_manager_app_detached (csm))
+       app_worker_del_segment_notify (
+         app_wrk, ct_client_seg_handle (segment_handle, seg_ctx->client_wrk));
+    }
+
+  /* Notify server app and free segment */
+  segment_manager_lock_and_del_segment (sm, seg_index);
+
+  /* Cleanup segment manager if needed. If server detaches there's a chance
+   * the client's sessions will hold up segment removal */
+  if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
+    segment_manager_free_safe (sm);
+
+done:
+
+  clib_rwlock_writer_unlock (&smm->custom_segs_lock);
+}
+
+static inline custom_segment_t *
+sm_lookup_free_custom_segment (segment_manager_t *sm, u32 seg_ctx_index)
+{
+  uword free_bytes, max_free_bytes;
+  custom_segment_t *ct_seg, *res = 0;
+  custom_segments_ctx_t *seg_ctx;
+  fifo_segment_t *fs;
+  u32 max_fifos;
+
+  seg_ctx = pool_elt_at_index (smm->custom_seg_ctxs, seg_ctx_index);
+  max_free_bytes = seg_ctx->fifo_pair_bytes;
+
+  pool_foreach (ct_seg, seg_ctx->segments)
+    {
+      /* Client or server has detached so segment cannot be used */
+      fs = segment_manager_get_segment (sm, ct_seg->segment_index);
+      free_bytes = fifo_segment_available_bytes (fs);
+      max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
+      if (free_bytes > max_free_bytes &&
+         fifo_segment_num_fifos (fs) / 2 < max_fifos)
+       {
+         max_free_bytes = free_bytes;
+         res = ct_seg;
+       }
+    }
+
+  return res;
+}
+
+static custom_segment_t *
+sm_custom_alloc_segment (app_worker_t *server_wrk, u64 table_handle,
+                        segment_manager_t *sm, u32 client_wrk_index)
+{
+  u32 seg_ctx_index = ~0, sm_index, pair_bytes;
+  u64 seg_size, seg_handle, client_seg_handle;
+  segment_manager_props_t *props;
+  const u32 margin = 16 << 10;
+  custom_segments_ctx_t *seg_ctx;
+  application_t *server;
+  app_worker_t *client_wrk;
+  custom_segment_t *ct_seg;
+  uword *spp;
+  int fs_index;
+
+  server = application_get (server_wrk->app_index);
+  props = application_segment_manager_properties (server);
+  sm_index = segment_manager_index (sm);
+  pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
+
+  /*
+   * Make sure another thread did not alloc a segment while acquiring the lock
+   */
+
+  spp = hash_get (smm->custom_segs_ctxs_table, table_handle);
+  if (spp)
+    {
+      seg_ctx_index = *spp;
+      ct_seg = sm_lookup_free_custom_segment (sm, seg_ctx_index);
+      if (ct_seg)
+       return ct_seg;
+    }
+
+  /*
+   * No segment, try to alloc one and notify the server and the client.
+   * Make sure the segment is not used for other fifos
+   */
+  seg_size = clib_max (props->segment_size, 128 << 20);
+  fs_index =
+    segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
+  if (fs_index < 0)
+    return 0;
+
+  if (seg_ctx_index == ~0)
+    {
+      pool_get_zero (smm->custom_seg_ctxs, seg_ctx);
+      seg_ctx_index = seg_ctx - smm->custom_seg_ctxs;
+      hash_set (smm->custom_segs_ctxs_table, table_handle, seg_ctx_index);
+      seg_ctx->server_wrk = server_wrk->wrk_index;
+      seg_ctx->client_wrk = client_wrk_index;
+      seg_ctx->sm_index = sm_index;
+      seg_ctx->fifo_pair_bytes = pair_bytes;
+    }
+  else
+    {
+      seg_ctx = pool_elt_at_index (smm->custom_seg_ctxs, seg_ctx_index);
+    }
+
+  pool_get_zero (seg_ctx->segments, ct_seg);
+  ct_seg->segment_index = fs_index;
+  ct_seg->server_n_sessions = 0;
+  ct_seg->client_n_sessions = 0;
+  ct_seg->custom_seg_index = ct_seg - seg_ctx->segments;
+  ct_seg->seg_ctx_index = seg_ctx_index;
+
+  /* New segment, notify the server and client */
+  seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
+  if (app_worker_add_segment_notify (server_wrk, seg_handle))
+    goto error;
+
+  client_wrk = app_worker_get (client_wrk_index);
+  /* Make sure client workers do not have overlapping segment handles.
+   * Ideally, we should attach fs to client worker segment manager and
+   * create a new handle but that's not currently possible. */
+  client_seg_handle = ct_client_seg_handle (seg_handle, client_wrk_index);
+  if (app_worker_add_segment_notify (client_wrk, client_seg_handle))
+    {
+      app_worker_del_segment_notify (server_wrk, seg_handle);
+      goto error;
+    }
+
+  return ct_seg;
+
+error:
+
+  segment_manager_lock_and_del_segment (sm, fs_index);
+  pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
+  return 0;
+}
+
+int
+segment_manager_alloc_session_fifos_ct (session_t *s, segment_manager_t *sm,
+                                       clib_thread_index_t thread_index,
+                                       svm_fifo_t **rx_fifo,
+                                       svm_fifo_t **tx_fifo)
+{
+  segment_manager_props_t *props;
+  u64 table_handle, seg_handle;
+  u32 sm_index, fs_index = ~0;
+  custom_segments_ctx_t *seg_ctx;
+  application_t *server;
+  custom_segment_t *ct_seg;
+  fifo_segment_t *fs;
+  uword *spp;
+  int rv;
+  app_worker_t *server_wrk = app_worker_get (s->app_wrk_index);
+  ct_connection_t *sct, *cct;
+
+  sct = ct_connection_get (s->connection_index, thread_index);
+  ASSERT (sct != 0);
+  if (sct == 0)
+    {
+      log_err ("Cannot find server cut-through connection: connection index "
+              "%d, thread index %d",
+              s->connection_index, thread_index);
+      return -1;
+    }
+  cct = ct_connection_get (sct->peer_index, thread_index);
+  ASSERT (cct != 0);
+  if (cct == 0)
+    {
+      log_err ("Cannot find client cut-through connection: peer index %d, "
+              "thread index %d",
+              sct->peer_index, thread_index);
+      return -1;
+    }
+
+  sm_index = segment_manager_index (sm);
+  server = application_get (server_wrk->app_index);
+  props = application_segment_manager_properties (server);
+
+  table_handle = sct->client_wrk << 16 | server_wrk->wrk_index;
+  table_handle = (u64) sm_index << 32 | table_handle;
+
+  /*
+   * Check if we already have a segment that can hold the fifos
+   */
+
+  clib_rwlock_reader_lock (&smm->custom_segs_lock);
+
+  spp = hash_get (smm->custom_segs_ctxs_table, table_handle);
+  if (spp)
+    {
+      ct_seg = sm_lookup_free_custom_segment (sm, *spp);
+      if (ct_seg)
+       {
+         sct->seg_ctx_index = ct_seg->seg_ctx_index;
+         sct->ct_seg_index = ct_seg->custom_seg_index;
+         fs_index = ct_seg->segment_index;
+         ct_seg->flags &= ~(CUSTOM_SEGMENT_F_SERVER_DETACHED |
+                            CUSTOM_SEGMENT_F_CLIENT_DETACHED);
+         __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+         __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
+       }
+    }
+
+  clib_rwlock_reader_unlock (&smm->custom_segs_lock);
+
+  /*
+   * If not, grab exclusive lock and allocate segment
+   */
+  if (fs_index == ~0)
+    {
+      clib_rwlock_writer_lock (&smm->custom_segs_lock);
+
+      ct_seg = sm_custom_alloc_segment (server_wrk, table_handle, sm,
+                                       sct->client_wrk);
+      if (!ct_seg)
+       {
+         clib_rwlock_writer_unlock (&smm->custom_segs_lock);
+         return -1;
+       }
+
+      sct->seg_ctx_index = ct_seg->seg_ctx_index;
+      sct->ct_seg_index = ct_seg->custom_seg_index;
+      ct_seg->server_n_sessions += 1;
+      ct_seg->client_n_sessions += 1;
+      fs_index = ct_seg->segment_index;
+
+      clib_rwlock_writer_unlock (&smm->custom_segs_lock);
+    }
+
+  /*
+   * Allocate and initialize the fifos
+   */
+  fs = segment_manager_get_segment_w_lock (sm, fs_index);
+  rv = segment_manager_try_alloc_fifos (fs, thread_index, props->rx_fifo_size,
+                                       props->tx_fifo_size, rx_fifo, tx_fifo);
+  if (rv)
+    {
+      segment_manager_segment_reader_unlock (sm);
+
+      clib_rwlock_reader_lock (&smm->custom_segs_lock);
+
+      seg_ctx = pool_elt_at_index (smm->custom_seg_ctxs, sct->seg_ctx_index);
+      ct_seg = pool_elt_at_index (seg_ctx->segments, sct->ct_seg_index);
+      __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+      __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
+
+      clib_rwlock_reader_unlock (&smm->custom_segs_lock);
+
+      return rv;
+    }
+
+  seg_handle = segment_manager_segment_handle (sm, fs);
+  segment_manager_segment_reader_unlock (sm);
+
+  sct->segment_handle = seg_handle;
+  sct->server_wrk = s->app_wrk_index;
+
+  cct->server_wrk = sct->server_wrk;
+  cct->seg_ctx_index = sct->seg_ctx_index;
+  cct->ct_seg_index = sct->ct_seg_index;
+  (*rx_fifo)->seg_ctx_index = (*tx_fifo)->seg_ctx_index = cct->seg_ctx_index;
+  (*rx_fifo)->ct_seg_index = (*tx_fifo)->ct_seg_index = cct->ct_seg_index;
+  (*tx_fifo)->flags |= SVM_FIFO_F_SERVER_CT;
+  (*rx_fifo)->flags |= SVM_FIFO_F_SERVER_CT;
+  cct->client_rx_fifo = *tx_fifo;
+  cct->client_tx_fifo = *rx_fifo;
+  cct->client_rx_fifo->refcnt++;
+  cct->client_tx_fifo->refcnt++;
+  cct->segment_handle =
+    ct_client_seg_handle (sct->segment_handle, cct->client_wrk);
+
+  return 0;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index 38dabab..8c9af27 100644 (file)
@@ -129,12 +129,6 @@ int segment_manager_add_segment (segment_manager_t *sm, uword segment_size,
  * @param segment_size Size of segment to be added
  * @param flags                Flags to be set on segment
  */
-int segment_manager_add_segment2 (segment_manager_t *sm, uword segment_size,
-                                 u8 flags);
-void segment_manager_del_segment (segment_manager_t * sm,
-                                 fifo_segment_t * fs);
-void segment_manager_lock_and_del_segment (segment_manager_t * sm,
-                                          u32 fs_index);
 fifo_segment_t *segment_manager_get_segment (segment_manager_t * sm,
                                             u32 segment_index);
 fifo_segment_t *segment_manager_get_segment_w_handle (u64 sh);
@@ -152,13 +146,12 @@ int segment_manager_alloc_session_fifos (segment_manager_t *sm,
                                         clib_thread_index_t thread_index,
                                         svm_fifo_t **rx_fifo,
                                         svm_fifo_t **tx_fifo);
-int segment_manager_try_alloc_fifos (fifo_segment_t *fs,
-                                    clib_thread_index_t thread_index,
-                                    u32 rx_fifo_size, u32 tx_fifo_size,
-                                    svm_fifo_t **rx_fifo,
-                                    svm_fifo_t **tx_fifo);
-void segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo,
-                                   svm_fifo_t * tx_fifo);
+int segment_manager_alloc_session_fifos_ct (session_t *s,
+                                           segment_manager_t *sm,
+                                           clib_thread_index_t thread_index,
+                                           svm_fifo_t **rx_fifo,
+                                           svm_fifo_t **tx_fifo);
+void segment_manager_dealloc_fifos (svm_fifo_t *rx_fifo, svm_fifo_t *tx_fifo);
 void segment_manager_detach_fifo (segment_manager_t *sm, svm_fifo_t **f);
 void segment_manager_attach_fifo (segment_manager_t *sm, svm_fifo_t **f,
                                  session_t *s);