session: improve ct locking with multiple workers 74/32774/11
authorFlorin Coras <fcoras@cisco.com>
Thu, 17 Jun 2021 22:53:38 +0000 (15:53 -0700)
committerFlorin Coras <fcoras@cisco.com>
Fri, 18 Jun 2021 23:51:42 +0000 (16:51 -0700)
Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Id91c3be57d49745cb3db6c768a8d5d14133f899e

src/vnet/session/application_local.c
src/vnet/session/application_local.h
src/vnet/session/segment_manager.c
src/vnet/session/segment_manager.h
src/vnet/session/session_node.c

index 9a8fe00..3c62dad 100644 (file)
@@ -24,9 +24,11 @@ typedef enum ct_segment_flags_
 
 typedef struct ct_segment_
 {
-  u32 segment_index;
   u32 client_n_sessions;
   u32 server_n_sessions;
+  u32 seg_ctx_index;
+  u32 ct_seg_index;
+  u32 segment_index;
   ct_segment_flags_t flags;
 } ct_segment_t;
 
@@ -35,6 +37,7 @@ typedef struct ct_segments_
   u32 sm_index;
   u32 server_wrk;
   u32 client_wrk;
+  u32 fifo_pair_bytes;
   ct_segment_t *segments;
 } ct_segments_ctx_t;
 
@@ -139,13 +142,13 @@ ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
 {
   ct_segments_ctx_t *seg_ctx;
   ct_main_t *cm = &ct_main;
-  ct_segment_flags_t flags;
   segment_manager_t *sm;
   app_worker_t *app_wrk;
   ct_segment_t *ct_seg;
   fifo_segment_t *fs;
+  u8 del_segment = 0;
   u32 seg_index;
-  u8 cnt;
+  int cnt;
 
   /*
    * Cleanup fifos
@@ -160,7 +163,7 @@ ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
   segment_manager_segment_reader_unlock (sm);
 
   /*
-   * Update segment context
+   * Atomically update segment context with readers lock
    */
 
   clib_rwlock_reader_lock (&cm->app_segs_lock);
@@ -172,23 +175,70 @@ ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
     {
       cnt =
        __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
-      if (!cnt)
-       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
     }
   else
     {
       cnt =
        __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+    }
+
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
+  /*
+   * No need to do any app updates, return
+   */
+  ASSERT (cnt >= 0);
+  if (cnt)
+    return;
+
+  /*
+   * Grab exclusive lock and update flags unless some other thread
+   * added more sessions
+   */
+  clib_rwlock_writer_lock (&cm->app_segs_lock);
+
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+  ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
+  if (ct->flags & CT_CONN_F_CLIENT)
+    {
+      cnt = ct_seg->client_n_sessions;
+      if (!cnt)
+       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
+    }
+  else
+    {
+      cnt = ct_seg->server_n_sessions;
       if (!cnt)
        ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
     }
 
-  flags = ct_seg->flags;
+  /*
+   * Remove segment context because both client and server detached
+   */
 
-  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+  if (!cnt && (ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) &&
+      (ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
+    {
+      pool_put_index (seg_ctx->segments, ct->ct_seg_index);
+
+      /*
+       * No more segment indices left, remove the segments context
+       */
+      if (!pool_elts (seg_ctx->segments))
+       {
+         u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
+         table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
+         hash_unset (cm->app_segs_ctxs_table, table_handle);
+         pool_free (seg_ctx->segments);
+         pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+       }
+      del_segment = 1;
+    }
+
+  clib_rwlock_writer_unlock (&cm->app_segs_lock);
 
   /*
-   * No need to do any app updates, return
+   * Session counter went to zero, notify the app that detached
    */
   if (cnt)
     return;
@@ -214,33 +264,9 @@ ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
       app_worker_del_segment_notify (app_wrk, ct->segment_handle);
     }
 
-  if (!(flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
-      !(flags & CT_SEGMENT_F_SERVER_DETACHED))
+  if (!del_segment)
     return;
 
-  /*
-   * Remove segment context because both client and server detached
-   */
-
-  clib_rwlock_writer_lock (&cm->app_segs_lock);
-
-  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
-  pool_put_index (seg_ctx->segments, ct->ct_seg_index);
-
-  /*
-   * No more segment indices left, remove the segments context
-   */
-  if (!pool_elts (seg_ctx->segments))
-    {
-      u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
-      table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
-      hash_unset (cm->app_segs_ctxs_table, table_handle);
-      pool_free (seg_ctx->segments);
-      pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
-    }
-
-  clib_rwlock_writer_unlock (&cm->app_segs_lock);
-
   segment_manager_lock_and_del_segment (sm, seg_index);
 
   /* Cleanup segment manager if needed. If server detaches there's a chance
@@ -250,16 +276,12 @@ ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
 }
 
 int
-ct_session_connect_notify (session_t *ss)
+ct_session_connect_notify (session_t *ss, session_error_t err)
 {
-  u32 ss_index, opaque, thread_index, cnt;
+  u32 ss_index, opaque, thread_index;
   ct_connection_t *sct, *cct;
-  ct_segments_ctx_t *seg_ctx;
   app_worker_t *client_wrk;
-  ct_main_t *cm = &ct_main;
-  ct_segment_t *ct_seg;
   session_t *cs;
-  int err = 0;
 
   ss_index = ss->session_index;
   thread_index = ss->thread_index;
@@ -270,7 +292,7 @@ ct_session_connect_notify (session_t *ss)
   cct = ct_connection_get (sct->peer_index, thread_index);
 
   /* Client closed while waiting for reply from server */
-  if (!cct)
+  if (PREDICT_FALSE (!cct))
     {
       session_transport_closing_notify (&sct->connection);
       session_transport_delete_notify (&sct->connection);
@@ -281,28 +303,8 @@ ct_session_connect_notify (session_t *ss)
   session_half_open_delete_notify (&cct->connection);
   cct->flags &= ~CT_CONN_F_HALF_OPEN;
 
-  /*
-   * Update ct segment context
-   */
-
-  clib_rwlock_reader_lock (&cm->app_segs_lock);
-
-  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, sct->seg_ctx_index);
-  ct_seg = pool_elt_at_index (seg_ctx->segments, sct->ct_seg_index);
-
-  cnt = __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
-  if (cnt == 1)
-    {
-      err = app_worker_add_segment_notify (client_wrk, cct->segment_handle);
-      if (err)
-       {
-         clib_rwlock_reader_unlock (&cm->app_segs_lock);
-         session_close (ss);
-         goto error;
-       }
-    }
-
-  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+  if (PREDICT_FALSE (err))
+    goto connect_error;
 
   /*
    * Alloc client session
@@ -315,34 +317,27 @@ ct_session_connect_notify (session_t *ss)
   cs->session_state = SESSION_STATE_CONNECTING;
   cs->app_wrk_index = client_wrk->wrk_index;
   cs->connection_index = cct->c_c_index;
-  cct->seg_ctx_index = sct->seg_ctx_index;
-  cct->ct_seg_index = sct->ct_seg_index;
-
   cct->c_s_index = cs->session_index;
-  cct->client_rx_fifo = ss->tx_fifo;
-  cct->client_tx_fifo = ss->rx_fifo;
-
-  cct->client_rx_fifo->refcnt++;
-  cct->client_tx_fifo->refcnt++;
 
   /* This will allocate fifos for the session. They won't be used for
    * exchanging data but they will be used to close the connection if
    * the segment manager/worker is freed */
   if ((err = app_worker_init_connected (client_wrk, cs)))
     {
-      session_close (ss);
       session_free (cs);
-      goto error;
+      session_close (ss);
+      err = SESSION_E_ALLOC;
+      goto connect_error;
     }
 
   cs->session_state = SESSION_STATE_CONNECTING;
 
-  if (app_worker_connect_notify (client_wrk, cs, err, opaque))
+  if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
     {
-      session_close (ss);
-      ct_session_dealloc_fifos (cct, cs->rx_fifo, cs->tx_fifo);
+      segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
       session_free (cs);
-      return -1;
+      session_close (ss);
+      goto cleanup_client;
     }
 
   cs = session_get (cct->c_s_index, cct->c_thread_index);
@@ -350,21 +345,31 @@ ct_session_connect_notify (session_t *ss)
 
   return 0;
 
-error:
-  app_worker_connect_notify (client_wrk, 0, err, opaque);
+connect_error:
+
+  app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
+
+cleanup_client:
+
+  if (cct->client_rx_fifo)
+    ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
+  ct_connection_free (cct);
   return -1;
 }
 
-static ct_segment_t *
-ct_lookup_free_segment (segment_manager_t *sm, ct_segments_ctx_t *seg_ctx,
-                       u32 pair_bytes)
+static inline ct_segment_t *
+ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
+                       u32 seg_ctx_index)
 {
   uword free_bytes, max_free_bytes;
   ct_segment_t *ct_seg, *res = 0;
+  ct_segments_ctx_t *seg_ctx;
   fifo_segment_t *fs;
   u32 max_fifos;
 
-  max_free_bytes = pair_bytes;
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
+  max_free_bytes = seg_ctx->fifo_pair_bytes;
+
   pool_foreach (ct_seg, seg_ctx->segments)
     {
       /* Client or server has detached so segment cannot be used */
@@ -373,7 +378,7 @@ ct_lookup_free_segment (segment_manager_t *sm, ct_segments_ctx_t *seg_ctx,
        continue;
       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
       free_bytes = fifo_segment_available_bytes (fs);
-      max_fifos = fifo_segment_size (fs) / pair_bytes;
+      max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
       if (free_bytes > max_free_bytes &&
          fifo_segment_num_fifos (fs) / 2 < max_fifos)
        {
@@ -385,22 +390,107 @@ ct_lookup_free_segment (segment_manager_t *sm, ct_segments_ctx_t *seg_ctx,
   return res;
 }
 
+static ct_segment_t *
+ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
+                 segment_manager_t *sm, u32 client_wrk_index)
+{
+  u32 seg_ctx_index = ~0, sm_index, pair_bytes;
+  segment_manager_props_t *props;
+  const u32 margin = 16 << 10;
+  ct_segments_ctx_t *seg_ctx;
+  app_worker_t *client_wrk;
+  u64 seg_size, seg_handle;
+  application_t *server;
+  ct_segment_t *ct_seg;
+  uword *spp;
+  int fs_index;
+
+  server = application_get (server_wrk->app_index);
+  props = application_segment_manager_properties (server);
+  sm_index = segment_manager_index (sm);
+  pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
+
+  /*
+   * Make sure another thread did not alloc a segment while acquiring the lock
+   */
+
+  spp = hash_get (cm->app_segs_ctxs_table, table_handle);
+  if (spp)
+    {
+      seg_ctx_index = *spp;
+      ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
+      if (ct_seg)
+       return ct_seg;
+    }
+
+  /*
+   * No segment, try to alloc one and notify the server and the client.
+   * Make sure the segment is not used for other fifos
+   */
+  seg_size = clib_max (props->segment_size, 128 << 20);
+  fs_index =
+    segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
+  if (fs_index < 0)
+    return 0;
+
+  if (seg_ctx_index == ~0)
+    {
+      pool_get_zero (cm->app_seg_ctxs, seg_ctx);
+      seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
+      hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
+      seg_ctx->server_wrk = server_wrk->wrk_index;
+      seg_ctx->client_wrk = client_wrk_index;
+      seg_ctx->sm_index = sm_index;
+      seg_ctx->fifo_pair_bytes = pair_bytes;
+    }
+  else
+    {
+      seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
+    }
+
+  pool_get_zero (seg_ctx->segments, ct_seg);
+  ct_seg->segment_index = fs_index;
+  ct_seg->server_n_sessions = 0;
+  ct_seg->client_n_sessions = 0;
+  ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
+  ct_seg->seg_ctx_index = seg_ctx_index;
+
+  /* New segment, notify the server and client */
+  seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
+  if (app_worker_add_segment_notify (server_wrk, seg_handle))
+    goto error;
+
+  client_wrk = app_worker_get (client_wrk_index);
+  if (app_worker_add_segment_notify (client_wrk, seg_handle))
+    {
+      app_worker_del_segment_notify (server_wrk, seg_handle);
+      goto error;
+    }
+
+  return ct_seg;
+
+error:
+
+  segment_manager_lock_and_del_segment (sm, fs_index);
+  pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
+  return 0;
+}
+
 static int
 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
                          session_t *ls, session_t *ll)
 {
-  u32 sm_index, pair_bytes, seg_ctx_index = ~0, ct_seg_index = ~0;
-  u64 seg_handle, table_handle, seg_size;
   segment_manager_props_t *props;
-  const u32 margin = 16 << 10;
+  u64 seg_handle, table_handle;
+  u32 sm_index, fs_index = ~0;
   ct_segments_ctx_t *seg_ctx;
   ct_main_t *cm = &ct_main;
   application_t *server;
   segment_manager_t *sm;
   ct_segment_t *ct_seg;
   fifo_segment_t *fs;
-  int rv, fs_index;
   uword *spp;
+  int rv;
 
   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
   sm_index = segment_manager_index (sm);
@@ -408,7 +498,7 @@ ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
   props = application_segment_manager_properties (server);
 
   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
-  table_handle = (u64) segment_manager_index (sm) << 32 | table_handle;
+  table_handle = (u64) sm_index << 32 | table_handle;
 
   /*
    * Check if we already have a segment that can hold the fifos
@@ -419,73 +509,41 @@ ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
   if (spp)
     {
-      seg_ctx_index = *spp;
-      seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
-      pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
-      ct_seg = ct_lookup_free_segment (sm, seg_ctx, pair_bytes);
+      ct_seg = ct_lookup_free_segment (cm, sm, *spp);
       if (ct_seg)
        {
-         ct_seg_index = ct_seg - seg_ctx->segments;
+         ct->seg_ctx_index = ct_seg->seg_ctx_index;
+         ct->ct_seg_index = ct_seg->ct_seg_index;
          fs_index = ct_seg->segment_index;
          __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+         __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
        }
     }
 
   clib_rwlock_reader_unlock (&cm->app_segs_lock);
 
   /*
-   * No segment, try to alloc one and notify the server
+   * If not, grab exclusive lock and allocate segment
    */
-
-  if (ct_seg_index == ~0)
+  if (fs_index == ~0)
     {
-      seg_size = clib_max (props->segment_size, 128 << 20);
-      fs_index = segment_manager_add_segment (sm, seg_size, 0);
-      if (fs_index < 0)
-       {
-         rv = -1;
-         goto failed;
-       }
-
-      /* Make sure the segment is not used for other fifos */
-      fs = segment_manager_get_segment_w_lock (sm, fs_index);
-      fifo_segment_flags (fs) |= FIFO_SEGMENT_F_CUSTOM_USE;
-      segment_manager_segment_reader_unlock (sm);
-
       clib_rwlock_writer_lock (&cm->app_segs_lock);
 
-      if (seg_ctx_index == ~0)
+      ct_seg =
+       ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
+      if (!ct_seg)
        {
-         pool_get_zero (cm->app_seg_ctxs, seg_ctx);
-         seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
-         hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
-         seg_ctx->server_wrk = server_wrk->wrk_index;
-         seg_ctx->client_wrk = ct->client_wrk;
-         seg_ctx->sm_index = sm_index;
+         clib_rwlock_writer_unlock (&cm->app_segs_lock);
+         return -1;
        }
-      else
-       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
 
-      pool_get_zero (seg_ctx->segments, ct_seg);
-      ct_seg->segment_index = fs_index;
+      ct->seg_ctx_index = ct_seg->seg_ctx_index;
+      ct->ct_seg_index = ct_seg->ct_seg_index;
       ct_seg->server_n_sessions += 1;
-      ct_seg_index = ct_seg - seg_ctx->segments;
+      ct_seg->client_n_sessions += 1;
+      fs_index = ct_seg->segment_index;
 
       clib_rwlock_writer_unlock (&cm->app_segs_lock);
-
-      /* New segment, notify the server. Client notification sent after
-       * server accepts the connection */
-      seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
-      if ((rv = app_worker_add_segment_notify (server_wrk, seg_handle)))
-       {
-         segment_manager_lock_and_del_segment (sm, fs_index);
-
-         clib_rwlock_writer_lock (&cm->app_segs_lock);
-         pool_put_index (seg_ctx->segments, ct_seg_index);
-         clib_rwlock_writer_unlock (&cm->app_segs_lock);
-
-         goto failed_fix_count;
-       }
     }
 
   /*
@@ -498,7 +556,17 @@ ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
   if (rv)
     {
       segment_manager_segment_reader_unlock (sm);
-      goto failed_fix_count;
+
+      clib_rwlock_reader_lock (&cm->app_segs_lock);
+
+      seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+      ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
+      __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+      __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
+
+      clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
+      return rv;
     }
 
   ls->rx_fifo->shr->master_session_index = ls->session_index;
@@ -514,23 +582,8 @@ ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
   segment_manager_segment_reader_unlock (sm);
 
   ct->segment_handle = seg_handle;
-  ct->seg_ctx_index = seg_ctx_index;
-  ct->ct_seg_index = ct_seg_index;
 
   return 0;
-
-failed_fix_count:
-
-  clib_rwlock_reader_lock (&cm->app_segs_lock);
-
-  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
-  ct_seg = pool_elt_at_index (seg_ctx->segments, ct_seg_index);
-  __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
-
-  clib_rwlock_reader_unlock (&cm->app_segs_lock);
-
-failed:
-  return rv;
 }
 
 static void
@@ -616,21 +669,28 @@ ct_accept_rpc_wrk_handler (void *accept_args)
 
   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
     {
+      ct_session_connect_notify (ss, SESSION_E_ALLOC);
       ct_connection_free (sct);
       session_free (ss);
       return;
     }
 
+  cct->seg_ctx_index = sct->seg_ctx_index;
+  cct->ct_seg_index = sct->ct_seg_index;
+  cct->client_rx_fifo = ss->tx_fifo;
+  cct->client_tx_fifo = ss->rx_fifo;
+  cct->client_rx_fifo->refcnt++;
+  cct->client_tx_fifo->refcnt++;
+  cct->segment_handle = sct->segment_handle;
+
   ss->session_state = SESSION_STATE_ACCEPTING;
   if (app_worker_accept_notify (server_wrk, ss))
     {
+      ct_session_connect_notify (ss, SESSION_E_REFUSED);
       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
       ct_connection_free (sct);
       session_free (ss);
-      return;
     }
-
-  cct->segment_handle = sct->segment_handle;
 }
 
 static int
@@ -811,6 +871,7 @@ ct_session_close (u32 ct_index, u32 thread_index)
   session_t *s;
 
   ct = ct_connection_get (ct_index, thread_index);
+  s = session_get (ct->c_s_index, ct->c_thread_index);
   peer_ct = ct_connection_get (ct->peer_index, thread_index);
   if (peer_ct)
     {
@@ -818,9 +879,7 @@ ct_session_close (u32 ct_index, u32 thread_index)
       /* Make sure session was allocated */
       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
        {
-         app_wrk = app_worker_get (peer_ct->client_wrk);
-         app_worker_connect_notify (app_wrk, 0, SESSION_E_REFUSED,
-                                    peer_ct->client_opaque);
+         ct_session_connect_notify (s, SESSION_E_REFUSED);
        }
       else if (peer_ct->c_s_index != ~0)
        session_transport_closing_notify (&peer_ct->connection);
@@ -828,8 +887,6 @@ ct_session_close (u32 ct_index, u32 thread_index)
        ct_connection_free (peer_ct);
     }
 
-  s = session_get (ct->c_s_index, ct->c_thread_index);
-
   if (ct->flags & CT_CONN_F_CLIENT)
     {
       /* Normal free for client session as the fifos are allocated through
index f98f469..86edf24 100644 (file)
@@ -56,7 +56,7 @@ typedef struct ct_connection_
 
 session_t *ct_session_get_peer (session_t * s);
 void ct_session_endpoint (session_t * ll, session_endpoint_t * sep);
-int ct_session_connect_notify (session_t * ls);
+int ct_session_connect_notify (session_t *ls, session_error_t err);
 int ct_session_tx (session_t * s);
 
 #endif /* SRC_VNET_SESSION_APPLICATION_LOCAL_H_ */
index a7ce989..f711270 100644 (file)
@@ -87,9 +87,9 @@ segment_manager_segment_index (segment_manager_t * sm, fifo_segment_t * seg)
  * If needed a writer's lock is acquired before allocating a new segment
  * to avoid affecting any of the segments pool readers.
  */
-int
-segment_manager_add_segment (segment_manager_t *sm, uword segment_size,
-                            u8 notify_app)
+static inline int
+segment_manager_add_segment_inline (segment_manager_t *sm, uword segment_size,
+                                   u8 notify_app, u8 flags)
 {
   segment_manager_main_t *smm = &sm_main;
   segment_manager_props_t *props;
@@ -161,6 +161,7 @@ segment_manager_add_segment (segment_manager_t *sm, uword segment_size,
   fs->h->high_watermark = sm->high_watermark;
   fs->h->low_watermark = sm->low_watermark;
   fs->h->pct_first_alloc = props->pct_first_alloc;
+  fs->h->flags = flags;
   fs->h->flags &= ~FIFO_SEGMENT_F_MEM_LIMIT;
 
   if (notify_app)
@@ -181,6 +182,20 @@ done:
   return fs_index;
 }
 
+int
+segment_manager_add_segment (segment_manager_t *sm, uword segment_size,
+                            u8 notify_app)
+{
+  return segment_manager_add_segment_inline (sm, segment_size, notify_app, 0);
+}
+
+int
+segment_manager_add_segment2 (segment_manager_t *sm, uword segment_size,
+                             u8 flags)
+{
+  return segment_manager_add_segment_inline (sm, segment_size, 0, flags);
+}
+
 /**
  * Remove segment without lock
  */
index ef8b970..5a3d772 100644 (file)
@@ -104,6 +104,8 @@ u32 segment_manager_index (segment_manager_t * sm);
 
 int segment_manager_add_segment (segment_manager_t *sm, uword segment_size,
                                 u8 notify_app);
+int segment_manager_add_segment2 (segment_manager_t *sm, uword segment_size,
+                                 u8 flags);
 void segment_manager_del_segment (segment_manager_t * sm,
                                  fifo_segment_t * fs);
 void segment_manager_lock_and_del_segment (segment_manager_t * sm,
index 5a3ee18..76080a0 100644 (file)
@@ -457,7 +457,7 @@ session_mq_accepted_reply_handler (void *data)
   if (!session_has_transport (s))
     {
       s->session_state = SESSION_STATE_READY;
-      if (ct_session_connect_notify (s))
+      if (ct_session_connect_notify (s, SESSION_E_NONE))
        return;
     }
   else