vcl: improve vls handling of shared listeners
[vpp.git] / src / vcl / vcl_locked.c
index b006d89..8b0c66d 100644 (file)
@@ -118,6 +118,7 @@ typedef struct vls_local_
 {
   int vls_wrk_index;                 /**< vls wrk index, 1 per process */
   volatile int vls_mt_n_threads;      /**< number of threads detected */
+  clib_rwlock_t vls_pool_lock;       /**< per process/wrk vls pool locks */
   pthread_mutex_t vls_mt_mq_mlock;    /**< vcl mq lock */
   pthread_mutex_t vls_mt_spool_mlock; /**< vcl select or pool lock */
   volatile u8 select_mp_check;       /**< flag set if select checks done */
@@ -129,7 +130,6 @@ static vls_process_local_t *vlsl = &vls_local;
 typedef struct vls_main_
 {
   vls_worker_t *workers;              /**< pool of vls workers */
-  clib_rwlock_t vls_pool_lock;        /**< per process/wrk vls pool locks */
   vls_shared_data_t *shared_data_pool; /**< inter proc pool of shared data */
   clib_rwlock_t shared_data_lock;      /**< shared data pool lock */
   clib_spinlock_t worker_rpc_lock;     /**< lock for inter-worker rpcs */
@@ -247,28 +247,28 @@ static inline void
 vls_mt_pool_rlock (void)
 {
   if (vlsl->vls_mt_n_threads > 1)
-    clib_rwlock_reader_lock (&vlsm->vls_pool_lock);
+    clib_rwlock_reader_lock (&vlsl->vls_pool_lock);
 }
 
 static inline void
 vls_mt_pool_runlock (void)
 {
   if (vlsl->vls_mt_n_threads > 1)
-    clib_rwlock_reader_unlock (&vlsm->vls_pool_lock);
+    clib_rwlock_reader_unlock (&vlsl->vls_pool_lock);
 }
 
 static inline void
 vls_mt_pool_wlock (void)
 {
   if (vlsl->vls_mt_n_threads > 1)
-    clib_rwlock_writer_lock (&vlsm->vls_pool_lock);
+    clib_rwlock_writer_lock (&vlsl->vls_pool_lock);
 }
 
 static inline void
 vls_mt_pool_wunlock (void)
 {
   if (vlsl->vls_mt_n_threads > 1)
-    clib_rwlock_writer_unlock (&vlsm->vls_pool_lock);
+    clib_rwlock_writer_unlock (&vlsl->vls_pool_lock);
 }
 
 typedef enum
@@ -619,7 +619,8 @@ vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
   vls_shd = vls_shared_data_get (vls->shared_data_index);
 
   clib_spinlock_lock (&vls_shd->lock);
-  clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
+  vls_shd->listeners =
+    clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
   clib_spinlock_unlock (&vls_shd->lock);
 
   vls_shared_data_pool_runlock ();
@@ -669,8 +670,19 @@ vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
 static void
 vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
 {
-  vppcom_session_listen (vls_to_sh (vls), ~0);
-  vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
+  vcl_worker_t *wrk;
+  vcl_session_t *ls;
+
+  wrk = vcl_worker_get (wrk_index);
+  ls = vcl_session_get (wrk, vls->session_index);
+
+  /* Listen request already sent */
+  if (ls->flags & VCL_SESSION_F_PENDING_LISTEN)
+    return;
+
+  vcl_send_session_listen (wrk, ls);
+
+  vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */);
 }
 
 static void
@@ -1480,17 +1492,33 @@ vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
 
   vls_mt_detect ();
   vls_mt_pool_rlock ();
+
   ep_vls = vls_get_and_lock (ep_vlsh);
+  if (PREDICT_FALSE (!ep_vls))
+    {
+      vls_mt_pool_runlock ();
+      return VPPCOM_EBADFD;
+    }
 
   if (vls_mt_session_should_migrate (ep_vls))
     {
       ep_vls = vls_mt_session_migrate (ep_vls);
       if (PREDICT_FALSE (!ep_vls))
-       return VPPCOM_EBADFD;
+       {
+         vls_mt_pool_runlock ();
+         return VPPCOM_EBADFD;
+       }
     }
 
-  ep_sh = vls_to_sh (ep_vls);
   vls = vls_get_and_lock (vlsh);
+  if (PREDICT_FALSE (!vls))
+    {
+      vls_unlock (ep_vls);
+      vls_mt_pool_runlock ();
+      return VPPCOM_EBADFD;
+    }
+
+  ep_sh = vls_to_sh (ep_vls);
   sh = vls_to_sh (vls);
 
   vls_epoll_ctl_mp_checks (vls, op);
@@ -1769,6 +1797,7 @@ vls_app_fork_child_handler (void)
   vlsl->vls_mt_n_threads = 0;
   vlsl->vls_wrk_index = vcl_get_worker_index ();
   vlsl->select_mp_check = 0;
+  clib_rwlock_init (&vlsl->vls_pool_lock);
   vls_mt_locks_init ();
 
   parent_wrk = vcl_worker_get (parent_wrk_index);
@@ -1940,7 +1969,6 @@ vls_app_create (char *app_name)
 
   vlsm = clib_mem_alloc (sizeof (vls_main_t));
   clib_memset (vlsm, 0, sizeof (*vlsm));
-  clib_rwlock_init (&vlsm->vls_pool_lock);
   clib_rwlock_init (&vlsm->shared_data_lock);
   clib_spinlock_init (&vlsm->worker_rpc_lock);
   pool_alloc (vlsm->workers, vcm->cfg.max_workers);
@@ -1950,6 +1978,7 @@ vls_app_create (char *app_name)
   atexit (vls_app_exit);
   vls_worker_alloc ();
   vlsl->vls_wrk_index = vcl_get_worker_index ();
+  clib_rwlock_init (&vlsl->vls_pool_lock);
   vls_mt_locks_init ();
   vcm->wrk_rpc_fn = vls_rpc_handler;
   return VPPCOM_OK;