vcl: refactor vls to minimize lock usage 63/25663/13
authorFlorin Coras <fcoras@cisco.com>
Wed, 4 Mar 2020 22:20:12 +0000 (22:20 +0000)
committerDave Barach <openvpp@barachs.net>
Fri, 6 Mar 2020 15:08:24 +0000 (15:08 +0000)
Type: refactor

- per vls worker private pool of sessions
- deep copy of vls worker data structures on fork
- maintain a global, i.e., heap allocated, and lock protected  pool of
  elements that track sessions that are shared between workers (due to
  forking).

Credit for uncovering the issue goes to Intel team contributing code to
VSAP (Ping, Yuwei, Shujun, Guoao).

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Id7d8bb06ecd7b03e4134f1cae23e740cf4634649

src/vcl/vcl_locked.c
src/vcl/vcl_private.h

index d69d391..fcbc0ea 100644 (file)
 #include <vcl/vcl_locked.h>
 #include <vcl/vcl_private.h>
 
+typedef struct vls_shared_data_
+{
+  clib_spinlock_t lock;
+  u32 owner_wrk_index;
+  u32 *workers_subscribed;
+  clib_bitmap_t *listeners;
+} vls_shared_data_t;
+
 typedef struct vcl_locked_session_
 {
   clib_spinlock_t lock;
   u32 session_index;
   u32 worker_index;
   u32 vls_index;
-  u32 *workers_subscribed;
-  clib_bitmap_t *listeners;
+  u32 shared_data_index;
 } vcl_locked_session_t;
 
+typedef struct vls_worker_
+{
+  vcl_locked_session_t *vls_pool;
+  uword *session_index_to_vlsh_table;
+  u32 wrk_index;
+} vls_worker_t;
+
 typedef struct vls_local_
 {
   int vls_wrk_index;
@@ -41,35 +55,102 @@ static vls_process_local_t *vlsl = &vls_local;
 
 typedef struct vls_main_
 {
-  vcl_locked_session_t *vls_pool;
+  vls_worker_t *workers;
   clib_rwlock_t vls_table_lock;
-  uword *session_index_to_vlsh_table;
+  /** Pool of data shared by sessions owned by different workers */
+  vls_shared_data_t *shared_data_pool;
+  clib_rwlock_t shared_data_lock;
 } vls_main_t;
 
 vls_main_t *vlsm;
 
+static inline u32
+vls_get_worker_index (void)
+{
+  return vcl_get_worker_index ();
+}
+
+static u32
+vls_shared_data_alloc (void)
+{
+  vls_shared_data_t *vls_shd;
+  u32 shd_index;
+
+  clib_rwlock_writer_lock (&vlsm->shared_data_lock);
+  pool_get_zero (vlsm->shared_data_pool, vls_shd);
+  clib_spinlock_init (&vls_shd->lock);
+  shd_index = vls_shd - vlsm->shared_data_pool;
+  clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
+
+  return shd_index;
+}
+
+static u32
+vls_shared_data_index (vls_shared_data_t * vls_shd)
+{
+  return vls_shd - vlsm->shared_data_pool;
+}
+
+vls_shared_data_t *
+vls_shared_data_get (u32 shd_index)
+{
+  if (pool_is_free_index (vlsm->shared_data_pool, shd_index))
+    return 0;
+  return pool_elt_at_index (vlsm->shared_data_pool, shd_index);
+}
+
+static void
+vls_shared_data_free (u32 shd_index)
+{
+  vls_shared_data_t *vls_shd;
+
+  clib_rwlock_writer_lock (&vlsm->shared_data_lock);
+  vls_shd = vls_shared_data_get (shd_index);
+  clib_spinlock_free (&vls_shd->lock);
+  clib_bitmap_free (vls_shd->listeners);
+  vec_free (vls_shd->workers_subscribed);
+  pool_put (vlsm->shared_data_pool, vls_shd);
+  clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
+}
+
+static inline void
+vls_shared_data_pool_rlock (void)
+{
+  clib_rwlock_reader_lock (&vlsm->shared_data_lock);
+}
+
+static inline void
+vls_shared_data_pool_runlock (void)
+{
+  clib_rwlock_reader_unlock (&vlsm->shared_data_lock);
+}
+
 static inline void
 vls_table_rlock (void)
 {
-  clib_rwlock_reader_lock (&vlsm->vls_table_lock);
+  if (vlsl->vls_mt_n_threads > 1)
+    clib_rwlock_reader_lock (&vlsm->vls_table_lock);
 }
 
 static inline void
 vls_table_runlock (void)
 {
-  clib_rwlock_reader_unlock (&vlsm->vls_table_lock);
+  if (vlsl->vls_mt_n_threads > 1)
+    clib_rwlock_reader_unlock (&vlsm->vls_table_lock);
 }
 
 static inline void
 vls_table_wlock (void)
 {
-  clib_rwlock_writer_lock (&vlsm->vls_table_lock);
+  if (vlsl->vls_mt_n_threads > 1)
+    clib_rwlock_writer_lock (&vlsm->vls_table_lock);
 }
 
 static inline void
 vls_table_wunlock (void)
 {
-  clib_rwlock_writer_unlock (&vlsm->vls_table_lock);
+  if (vlsl->vls_mt_n_threads > 1)
+    clib_rwlock_writer_unlock (&vlsm->vls_table_lock);
 }
 
 typedef enum
@@ -124,6 +205,26 @@ vls_mt_locks_init (void)
   pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
 }
 
+u8
+vls_is_shared (vcl_locked_session_t * vls)
+{
+  return (vls->shared_data_index != ~0);
+}
+
+static inline void
+vls_lock (vcl_locked_session_t * vls)
+{
+  if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
+    clib_spinlock_lock (&vls->lock);
+}
+
+static inline void
+vls_unlock (vcl_locked_session_t * vls)
+{
+  if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
+    clib_spinlock_unlock (&vls->lock);
+}
+
 static inline vcl_session_handle_t
 vls_to_sh (vcl_locked_session_t * vls)
 {
@@ -139,19 +240,54 @@ vls_to_sh_tu (vcl_locked_session_t * vls)
   return sh;
 }
 
+static vls_worker_t *
+vls_worker_get_current (void)
+{
+  return pool_elt_at_index (vlsm->workers, vls_get_worker_index ());
+}
+
+static void
+vls_worker_alloc (void)
+{
+  vls_worker_t *wrk;
+
+  pool_get_zero (vlsm->workers, wrk);
+  wrk->wrk_index = vcl_get_worker_index ();
+}
+
+static void
+vls_worker_free (vls_worker_t * wrk)
+{
+  hash_free (wrk->session_index_to_vlsh_table);
+  pool_free (wrk->vls_pool);
+  pool_put (vlsm->workers, wrk);
+}
+
+static vls_worker_t *
+vls_worker_get (u32 wrk_index)
+{
+  if (pool_is_free_index (vlsm->workers, wrk_index))
+    return 0;
+  return pool_elt_at_index (vlsm->workers, wrk_index);
+}
+
 static vls_handle_t
 vls_alloc (vcl_session_handle_t sh)
 {
+  vls_worker_t *wrk = vls_worker_get_current ();
   vcl_locked_session_t *vls;
 
   vls_table_wlock ();
-  pool_get (vlsm->vls_pool, vls);
+
+  pool_get_zero (wrk->vls_pool, vls);
   vls->session_index = vppcom_session_index (sh);
   vls->worker_index = vppcom_session_worker (sh);
-  vls->vls_index = vls - vlsm->vls_pool;
-  hash_set (vlsm->session_index_to_vlsh_table, vls->session_index,
+  vls->vls_index = vls - wrk->vls_pool;
+  vls->shared_data_index = ~0;
+  hash_set (wrk->session_index_to_vlsh_table, vls->session_index,
            vls->vls_index);
   clib_spinlock_init (&vls->lock);
+
   vls_table_wunlock ();
   return vls->vls_index;
 }
@@ -159,28 +295,32 @@ vls_alloc (vcl_session_handle_t sh)
 static vcl_locked_session_t *
 vls_get (vls_handle_t vlsh)
 {
-  if (pool_is_free_index (vlsm->vls_pool, vlsh))
+  vls_worker_t *wrk = vls_worker_get_current ();
+  if (pool_is_free_index (wrk->vls_pool, vlsh))
     return 0;
-  return pool_elt_at_index (vlsm->vls_pool, vlsh);
+  return pool_elt_at_index (wrk->vls_pool, vlsh);
 }
 
 static void
 vls_free (vcl_locked_session_t * vls)
 {
+  vls_worker_t *wrk = vls_worker_get_current ();
+
   ASSERT (vls != 0);
-  hash_unset (vlsm->session_index_to_vlsh_table, vls->session_index);
+  hash_unset (wrk->session_index_to_vlsh_table, vls->session_index);
   clib_spinlock_free (&vls->lock);
-  pool_put (vlsm->vls_pool, vls);
+  pool_put (wrk->vls_pool, vls);
 }
 
 static vcl_locked_session_t *
 vls_get_and_lock (vls_handle_t vlsh)
 {
+  vls_worker_t *wrk = vls_worker_get_current ();
   vcl_locked_session_t *vls;
-  if (pool_is_free_index (vlsm->vls_pool, vlsh))
+  if (pool_is_free_index (wrk->vls_pool, vlsh))
     return 0;
-  vls = pool_elt_at_index (vlsm->vls_pool, vlsh);
-  clib_spinlock_lock (&vls->lock);
+  vls = pool_elt_at_index (wrk->vls_pool, vlsh);
+  vls_lock (vls);
   return vls;
 }
 
@@ -195,18 +335,6 @@ vls_get_w_dlock (vls_handle_t vlsh)
   return vls;
 }
 
-static inline void
-vls_lock (vcl_locked_session_t * vls)
-{
-  clib_spinlock_lock (&vls->lock);
-}
-
-static inline void
-vls_unlock (vcl_locked_session_t * vls)
-{
-  clib_spinlock_unlock (&vls->lock);
-}
-
 static inline void
 vls_get_and_unlock (vls_handle_t vlsh)
 {
@@ -224,6 +352,14 @@ vls_dunlock (vcl_locked_session_t * vls)
   vls_table_runlock ();
 }
 
+static vcl_locked_session_t *
+vls_session_get (vls_worker_t * wrk, u32 vls_index)
+{
+  if (pool_is_free_index (wrk->vls_pool, vls_index))
+    return 0;
+  return pool_elt_at_index (wrk->vls_pool, vls_index);
+}
+
 vcl_session_handle_t
 vlsh_to_sh (vls_handle_t vlsh)
 {
@@ -249,8 +385,9 @@ vlsh_to_session_index (vls_handle_t vlsh)
 vls_handle_t
 vls_si_to_vlsh (u32 session_index)
 {
+  vls_worker_t *wrk = vls_worker_get_current ();
   uword *vlshp;
-  vlshp = hash_get (vlsm->session_index_to_vlsh_table, session_index);
+  vlshp = hash_get (wrk->session_index_to_vlsh_table, session_index);
   return vlshp ? *vlshp : VLS_INVALID_HANDLE;
 }
 
@@ -266,32 +403,94 @@ vls_session_index_to_vlsh (uint32_t session_index)
   return vlsh;
 }
 
-u8
-vls_is_shared (vcl_locked_session_t * vls)
-{
-  return vec_len (vls->workers_subscribed);
-}
-
 u8
 vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
 {
+  vls_shared_data_t *vls_shd;
   int i;
-  for (i = 0; i < vec_len (vls->workers_subscribed); i++)
-    if (vls->workers_subscribed[i] == wrk_index)
-      return 1;
+
+  if (vls->shared_data_index == ~0)
+    return 0;
+
+  vls_shared_data_pool_rlock ();
+
+  vls_shd = vls_shared_data_get (vls->shared_data_index);
+  clib_spinlock_lock (&vls_shd->lock);
+
+  for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
+    if (vls_shd->workers_subscribed[i] == wrk_index)
+      {
+       clib_spinlock_unlock (&vls_shd->lock);
+       vls_shared_data_pool_runlock ();
+       return 1;
+      }
+  clib_spinlock_unlock (&vls_shd->lock);
+
+  vls_shared_data_pool_runlock ();
   return 0;
 }
 
 static void
 vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
 {
-  clib_bitmap_set (vls->listeners, wrk_index, is_active);
+  vls_shared_data_t *vls_shd;
+
+  if (vls->shared_data_index == ~0)
+    {
+      clib_warning ("not a shared session");
+      return;
+    }
+
+  vls_shared_data_pool_rlock ();
+
+  vls_shd = vls_shared_data_get (vls->shared_data_index);
+
+  clib_spinlock_lock (&vls_shd->lock);
+  clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
+  clib_spinlock_unlock (&vls_shd->lock);
+
+  vls_shared_data_pool_runlock ();
+}
+
+static u32
+vls_shared_get_owner (vcl_locked_session_t * vls)
+{
+  vls_shared_data_t *vls_shd;
+  u32 owner_wrk;
+
+  vls_shared_data_pool_rlock ();
+
+  vls_shd = vls_shared_data_get (vls->shared_data_index);
+  owner_wrk = vls_shd->owner_wrk_index;
+
+  vls_shared_data_pool_runlock ();
+
+  return owner_wrk;
 }
 
 static u8
 vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
 {
-  return (clib_bitmap_get (vls->listeners, wrk_index) == 1);
+  vls_shared_data_t *vls_shd;
+  u8 is_set;
+
+  if (vls->shared_data_index == ~0)
+    {
+      clib_warning ("not a shared session");
+      return 0;
+    }
+
+  vls_shared_data_pool_rlock ();
+
+  vls_shd = vls_shared_data_get (vls->shared_data_index);
+
+  clib_spinlock_lock (&vls_shd->lock);
+  is_set = clib_bitmap_get (vls_shd->listeners, wrk_index);
+  clib_spinlock_unlock (&vls_shd->lock);
+
+  vls_shared_data_pool_runlock ();
+
+  return (is_set == 1);
 }
 
 static void
@@ -316,96 +515,199 @@ vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
   vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
 }
 
+static int
+vls_shared_data_subscriber_position (vls_shared_data_t * vls_shd,
+                                    u32 wrk_index)
+{
+  int i;
+
+  for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
+    {
+      if (vls_shd->workers_subscribed[i] == wrk_index)
+       return i;
+    }
+  return -1;
+}
+
 int
 vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
 {
-  int i, do_disconnect;
+  vls_shared_data_t *vls_shd;
+  int do_disconnect;
+  u32 n_subscribers, pos;
   vcl_session_t *s;
 
+  ASSERT (vls->shared_data_index != ~0);
+
   s = vcl_session_get (wrk, vls->session_index);
   if (s->session_state == STATE_LISTEN)
     vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
 
-  for (i = 0; i < vec_len (vls->workers_subscribed); i++)
+  vls_shared_data_pool_rlock ();
+
+  vls_shd = vls_shared_data_get (vls->shared_data_index);
+  clib_spinlock_lock (&vls_shd->lock);
+
+  pos = vls_shared_data_subscriber_position (vls_shd, wrk->wrk_index);
+  if (pos < 0)
     {
-      if (vls->workers_subscribed[i] != wrk->wrk_index)
-       continue;
+      clib_warning ("worker %u not subscribed for vls %u", wrk->wrk_index,
+                   vls->worker_index);
+      goto done;
+    }
 
-      if (s->rx_fifo)
-       {
-         svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
-         svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
-       }
-      vec_del1 (vls->workers_subscribed, i);
-      do_disconnect = s->session_state == STATE_LISTEN;
-      vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
+  /*
+   * Unsubscribe from share data and fifos
+   */
+  if (s->rx_fifo)
+    {
+      svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
+      svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
+    }
+  vec_del1 (vls_shd->workers_subscribed, pos);
+
+  /*
+   * Cleanup vcl state
+   */
+  n_subscribers = vec_len (vls_shd->workers_subscribed);
+  do_disconnect = s->session_state == STATE_LISTEN || !n_subscribers;
+  vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
+
+  /*
+   * No subscriber left, cleanup shared data
+   */
+  if (!n_subscribers)
+    {
+      u32 shd_index = vls_shared_data_index (vls_shd);
+
+      clib_spinlock_unlock (&vls_shd->lock);
+      vls_shared_data_pool_runlock ();
+
+      vls_shared_data_free (shd_index);
+
+      /* All locks have been dropped */
       return 0;
     }
 
   /* Return, if this is not the owning worker */
-  if (vls->worker_index != wrk->wrk_index)
-    return 0;
+  if (vls_shd->owner_wrk_index != wrk->wrk_index)
+    goto done;
+
+  ASSERT (vec_len (vls_shd->workers_subscribed));
+
+  /*
+   *  Check if we can change owner or close
+   */
+  vls_shd->owner_wrk_index = vls_shd->workers_subscribed[0];
+  vcl_send_session_worker_update (wrk, s, vls_shd->owner_wrk_index);
+
+  /* XXX is this still needed? */
+  if (vec_len (vls_shd->workers_subscribed) > 1)
+    clib_warning ("more workers need to be updated");
+
+done:
+
+  clib_spinlock_unlock (&vls_shd->lock);
+  vls_shared_data_pool_runlock ();
+
+  return 0;
+}
+
+void
+vls_share_session (vcl_locked_session_t * vls, vls_worker_t * vls_wrk,
+                  vls_worker_t * vls_parent_wrk, vcl_worker_t * vcl_wrk)
+{
+  vcl_locked_session_t *parent_vls;
+  vls_shared_data_t *vls_shd;
+  vcl_session_t *s;
 
-  /* Check if we can change owner or close */
-  if (vec_len (vls->workers_subscribed))
+  s = vcl_session_get (vcl_wrk, vls->session_index);
+  if (!s)
     {
-      vls->worker_index = vls->workers_subscribed[0];
-      vec_del1 (vls->workers_subscribed, 0);
-      vcl_send_session_worker_update (wrk, s, vls->worker_index);
-      if (vec_len (vls->workers_subscribed))
-       clib_warning ("more workers need to be updated");
+      clib_warning ("wrk %u parent %u session %u vls %u NOT AVAILABLE",
+                   vcl_wrk->wrk_index, vls_parent_wrk->wrk_index,
+                   vls->session_index, vls->vls_index);
+      return;
+    }
+
+  /* Reinit session lock */
+  clib_spinlock_init (&vls->lock);
+
+  if (vls->shared_data_index != ~0)
+    {
+      vls_shared_data_pool_rlock ();
+      vls_shd = vls_shared_data_get (vls->shared_data_index);
     }
   else
     {
-      vcl_session_cleanup (wrk, s, vcl_session_handle (s),
-                          1 /* do_disconnect */ );
+      u32 vls_shd_index = vls_shared_data_alloc ();
+
+      vls_shared_data_pool_rlock ();
+
+      vls_shd = vls_shared_data_get (vls_shd_index);
+      vls_shd->owner_wrk_index = vls_parent_wrk->wrk_index;
+      vls->shared_data_index = vls_shd_index;
+
+      /* Update parent shared data */
+      parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
+      parent_vls->shared_data_index = vls_shd_index;
+      vec_add1 (vls_shd->workers_subscribed, vls_parent_wrk->wrk_index);
     }
 
-  return 0;
-}
+  clib_spinlock_lock (&vls_shd->lock);
 
-void
-vls_share_vcl_session (vcl_worker_t * wrk, vcl_session_t * s)
-{
-  vcl_locked_session_t *vls;
+  vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
+
+  clib_spinlock_unlock (&vls_shd->lock);
+  vls_shared_data_pool_runlock ();
 
-  vls = vls_get (vls_si_to_vlsh (s->session_index));
-  if (!vls)
-    return;
-  vls_lock (vls);
-  vec_add1 (vls->workers_subscribed, wrk->wrk_index);
   if (s->rx_fifo)
     {
-      svm_fifo_add_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
-      svm_fifo_add_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
+      svm_fifo_add_subscriber (s->rx_fifo, vcl_wrk->vpp_wrk_index);
+      svm_fifo_add_subscriber (s->tx_fifo, vcl_wrk->vpp_wrk_index);
     }
   else if (s->session_state == STATE_LISTEN)
     {
       s->session_state = STATE_LISTEN_NO_MQ;
     }
+}
 
-  vls_unlock (vls);
+static void
+vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
+{
+  vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
+  vcl_locked_session_t *vls;
+
+  /* *INDENT-OFF* */
+  pool_foreach (vls, vls_wrk->vls_pool, ({
+    vls_share_session (vls, vls_wrk, vls_parent_wrk, vcl_wrk);
+  }));
+  /* *INDENT-ON* */
 }
 
 void
 vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
 {
+  vls_worker_t *vls_wrk = vls_worker_get_current (), *vls_parent_wrk;
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  vcl_session_t *s;
 
+  /*
+   * init vcl worker
+   */
   wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
   wrk->sessions = pool_dup (parent_wrk->sessions);
   wrk->session_index_by_vpp_handles =
     hash_dup (parent_wrk->session_index_by_vpp_handles);
-  vls_table_rlock ();
 
-  /* *INDENT-OFF* */
-  pool_foreach (s, wrk->sessions, ({
-    vls_share_vcl_session (wrk, s);
-  }));
-  /* *INDENT-ON* */
+  /*
+   * init vls worker
+   */
+  vls_parent_wrk = vls_worker_get (parent_wrk->wrk_index);
+  vls_wrk->session_index_to_vlsh_table =
+    hash_dup (vls_parent_wrk->session_index_to_vlsh_table);
+  vls_wrk->vls_pool = pool_dup (vls_parent_wrk->vls_pool);
 
-  vls_table_runlock ();
+  vls_share_sessions (vls_parent_wrk, vls_wrk);
 }
 
 static void
@@ -620,6 +922,7 @@ vls_mp_checks (vcl_locked_session_t * vls, int is_add)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
   vcl_session_t *s;
+  u32 owner_wrk;
 
   s = vcl_session_get (wrk, vls->session_index);
   switch (s->session_state)
@@ -627,8 +930,7 @@ vls_mp_checks (vcl_locked_session_t * vls, int is_add)
     case STATE_LISTEN:
       if (is_add)
        {
-         if (vls->worker_index == wrk->wrk_index)
-           vls_listener_wrk_set (vls, wrk->wrk_index, 1 /* is_active */ );
+         vls_listener_wrk_set (vls, vls->worker_index, 1 /* is_active */ );
          break;
        }
       vls_listener_wrk_stop_listen (vls, vls->worker_index);
@@ -647,8 +949,9 @@ vls_mp_checks (vcl_locked_session_t * vls, int is_add)
        * that it is fine to listen and not accept new sessions for a
        * given listener. Without it, we would accumulate unhandled
        * accepts on the passive worker message queue. */
-      if (!vls_listener_wrk_is_active (vls, vls->worker_index))
-       vls_listener_wrk_stop_listen (vls, vls->worker_index);
+      owner_wrk = vls_shared_get_owner (vls);
+      if (!vls_listener_wrk_is_active (vls, owner_wrk))
+       vls_listener_wrk_stop_listen (vls, owner_wrk);
       break;
     default:
       break;
@@ -713,17 +1016,12 @@ vls_close (vls_handle_t vlsh)
     }
 
   vls_mt_guard (0, VLS_MT_OP_SPOOL);
+
   if (vls_is_shared (vls))
-    {
-      /* At least two workers share the session so vls won't be freed */
-      vls_unshare_session (vls, vcl_worker_get_current ());
-      vls_unlock (vls);
-      vls_mt_unguard ();
-      vls_table_wunlock ();
-      return VPPCOM_OK;
-    }
+    rv = vls_unshare_session (vls, vcl_worker_get_current ());
+  else
+    rv = vppcom_session_close (vls_to_sh (vls));
 
-  rv = vppcom_session_close (vls_to_sh (vls));
   vls_free (vls);
   vls_mt_unguard ();
 
@@ -873,7 +1171,6 @@ vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
 
   current_wrk = vcl_get_worker_index ();
   is_current = current_wrk == wrk->wrk_index;
-  vls_table_wlock ();
 
   /* *INDENT-OFF* */
   pool_foreach (s, wrk->sessions, ({
@@ -882,17 +1179,19 @@ vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
       vls_unshare_session (vls, wrk);
   }));
   /* *INDENT-ON* */
-
-  vls_table_wunlock ();
 }
 
 static void
 vls_cleanup_vcl_worker (vcl_worker_t * wrk)
 {
+  vls_worker_t *vls_wrk = vls_worker_get (wrk->wrk_index);
+
   /* Unshare sessions and also cleanup worker since child may have
    * called _exit () and therefore vcl may not catch the event */
   vls_unshare_vcl_worker_sessions (wrk);
   vcl_worker_cleanup (wrk, 1 /* notify vpp */ );
+
+  vls_worker_free (vls_wrk);
 }
 
 static void
@@ -996,7 +1295,7 @@ vls_app_fork_child_handler (void)
        parent_wrk_index);
 
   /*
-   * Allocate worker
+   * Allocate worker vcl
    */
   vcl_set_worker_index (~0);
   if (!vcl_worker_alloc_and_init ())
@@ -1017,6 +1316,11 @@ vls_app_fork_child_handler (void)
       return;
     }
 
+  /*
+   * Allocate/initialize vls worker
+   */
+  vls_worker_alloc ();
+
   /*
    * Register worker with vpp and share sessions
    */
@@ -1047,8 +1351,11 @@ vls_app_fork_parent_handler (void)
 void
 vls_app_exit (void)
 {
+  vls_worker_t *wrk = vls_worker_get_current ();
+
   /* Unshare the sessions. VCL will clean up the worker */
   vls_unshare_vcl_worker_sessions (vcl_worker_get_current ());
+  vls_worker_free (wrk);
 }
 
 int
@@ -1058,12 +1365,17 @@ vls_app_create (char *app_name)
 
   if ((rv = vppcom_app_create (app_name)))
     return rv;
+
   vlsm = clib_mem_alloc (sizeof (vls_main_t));
   clib_memset (vlsm, 0, sizeof (*vlsm));
   clib_rwlock_init (&vlsm->vls_table_lock);
+  clib_rwlock_init (&vlsm->shared_data_lock);
+  pool_alloc (vlsm->workers, vcm->cfg.max_workers);
+
   pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
                  vls_app_fork_child_handler);
   atexit (vls_app_exit);
+  vls_worker_alloc ();
   vlsl->vls_wrk_index = vcl_get_worker_index ();
   vls_mt_locks_init ();
   return VPPCOM_OK;
index b50bad2..3866563 100644 (file)
@@ -374,7 +374,8 @@ vcl_session_alloc (vcl_worker_t * wrk)
 static inline void
 vcl_session_free (vcl_worker_t * wrk, vcl_session_t * s)
 {
-  VDBG (0, "session %u [0x%llx] removed", s->session_index, s->vpp_handle);
+  /* Debug level set to 1 to avoid debug messages while ldp is cleaning up */
+  VDBG (1, "session %u [0x%llx] removed", s->session_index, s->vpp_handle);
   pool_put (wrk->sessions, s);
 }