vcl: move forking logic to vls
[vpp.git] / src / vcl / vcl_locked.c
index 6254bad..c6c79cd 100644 (file)
 
 typedef struct vcl_locked_session_
 {
+  clib_spinlock_t lock;
   u32 session_index;
   u32 worker_index;
   u32 vls_index;
   u32 flags;
-  clib_spinlock_t lock;
+  u32 *workers_subscribed;
 } vcl_locked_session_t;
 
 typedef struct vcl_main_
@@ -62,7 +63,7 @@ vls_table_wunlock (void)
 static inline vcl_session_handle_t
 vls_to_sh (vcl_locked_session_t * vls)
 {
-  return vppcom_session_handle (vls->session_index);
+  return vcl_session_handle_from_index (vls->session_index);
 }
 
 static inline vcl_session_handle_t
@@ -164,6 +165,86 @@ vls_get_and_free (vls_handle_t vlsh)
   vls_table_wunlock ();
 }
 
+u8
+vls_is_shared (vcl_locked_session_t * vls)
+{
+  return vec_len (vls->workers_subscribed);
+}
+
+int
+vls_unshare_session (vcl_locked_session_t * vls)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  vcl_session_t *s;
+  int i;
+
+  for (i = 0; i < vec_len (vls->workers_subscribed); i++)
+    {
+      if (vls->workers_subscribed[i] != wrk->wrk_index)
+       continue;
+
+      s = vcl_session_get (wrk, vls->session_index);
+      if (s->rx_fifo)
+       {
+         svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
+         svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
+       }
+      vec_del1 (vls->workers_subscribed, i);
+      vcl_session_cleanup (wrk, s, vcl_session_handle (s),
+                          0 /* do_disconnect */ );
+      return 0;
+    }
+
+  /* Assumption is that unshare is only called if session is shared.
+   * So shared_workers must be non-empty if the worker is the owner */
+  if (vls->worker_index == wrk->wrk_index)
+    {
+      s = vcl_session_get (wrk, vls->session_index);
+      vls->worker_index = vls->workers_subscribed[0];
+      vec_del1 (vls->workers_subscribed, 0);
+      vcl_send_session_worker_update (wrk, s, vls->worker_index);
+      if (vec_len (vls->workers_subscribed))
+       clib_warning ("more workers need to be updated");
+    }
+
+  return 0;
+}
+
+void
+vls_share_vcl_session (vcl_worker_t * wrk, vcl_session_t * s)
+{
+  vcl_locked_session_t *vls;
+
+  vls = vls_get_w_dlock (vls_session_index_to_vlsh (s->session_index));
+  if (!vls)
+    return;
+  vec_add1 (vls->workers_subscribed, wrk->wrk_index);
+  if (s->rx_fifo)
+    {
+      svm_fifo_add_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
+      svm_fifo_add_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
+    }
+  vls_dunlock (vls);
+}
+
+void
+vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  vcl_session_t *s;
+
+  wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
+  wrk->sessions = pool_dup (parent_wrk->sessions);
+  wrk->session_index_by_vpp_handles =
+    hash_dup (parent_wrk->session_index_by_vpp_handles);
+
+  /* *INDENT-OFF* */
+  pool_foreach (s, wrk->sessions, ({
+    vls_share_vcl_session (wrk, s);
+  }));
+  /* *INDENT-ON* */
+}
+
 int
 vls_write (vls_handle_t vlsh, void *buf, size_t nbytes)
 {
@@ -325,13 +406,19 @@ vls_close (vls_handle_t vlsh)
 {
   vcl_locked_session_t *vls;
   vcl_session_handle_t sh;
-  int rv, refcnt;
+  int rv;
 
   if (!(vls = vls_get_w_dlock (vlsh)))
     return VPPCOM_EBADFD;
 
+  if (vls_is_shared (vls))
+    {
+      vls_unshare_session (vls);
+      vls_dunlock (vls);
+      return VPPCOM_OK;
+    }
+
   sh = vls_to_sh (vls);
-  refcnt = vppcom_session_attr (sh, VPPCOM_ATTR_GET_REFCNT, 0, 0);
   if ((rv = vppcom_session_close (sh)))
     {
       vls_dunlock (vls);
@@ -339,8 +426,7 @@ vls_close (vls_handle_t vlsh)
     }
 
   vls_dunlock (vls);
-  if (refcnt <= 1)
-    vls_get_and_free (vlsh);
+  vls_get_and_free (vlsh);
   return rv;
 }
 
@@ -438,6 +524,148 @@ vls_session_index_to_vlsh (uint32_t session_index)
   return vlsh;
 }
 
+static void
+vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
+{
+  vcl_worker_t *sub_child;
+  int tries = 0;
+
+  if (child_wrk->forked_child != ~0)
+    {
+      sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
+      if (sub_child)
+       {
+         /* Wait a bit, maybe the process is going away */
+         while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
+           usleep (1e3);
+         if (kill (sub_child->current_pid, 0) < 0)
+           vls_cleanup_forked_child (child_wrk, sub_child);
+       }
+    }
+  vcl_worker_cleanup (child_wrk, 1 /* notify vpp */ );
+  VDBG (0, "Cleaned up wrk %u", child_wrk->wrk_index);
+  wrk->forked_child = ~0;
+}
+
+static struct sigaction old_sa;
+
+static void
+vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
+{
+  vcl_worker_t *wrk, *child_wrk;
+
+  if (vcl_get_worker_index () == ~0)
+    return;
+
+  if (sigaction (SIGCHLD, &old_sa, 0))
+    {
+      VERR ("couldn't restore sigchld");
+      exit (-1);
+    }
+
+  wrk = vcl_worker_get_current ();
+  if (wrk->forked_child == ~0)
+    return;
+
+  child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
+  if (!child_wrk)
+    goto done;
+
+  if (si && si->si_pid != child_wrk->current_pid)
+    {
+      VDBG (0, "unexpected child pid %u", si->si_pid);
+      goto done;
+    }
+  vls_cleanup_forked_child (wrk, child_wrk);
+
+done:
+  if (old_sa.sa_flags & SA_SIGINFO)
+    {
+      void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
+      fn (signum, si, uc);
+    }
+  else
+    {
+      void (*fn) (int) = old_sa.sa_handler;
+      if (fn)
+       fn (signum);
+    }
+}
+
+static void
+vls_incercept_sigchld ()
+{
+  struct sigaction sa;
+  clib_memset (&sa, 0, sizeof (sa));
+  sa.sa_sigaction = vls_intercept_sigchld_handler;
+  sa.sa_flags = SA_SIGINFO;
+  if (sigaction (SIGCHLD, &sa, &old_sa))
+    {
+      VERR ("couldn't intercept sigchld");
+      exit (-1);
+    }
+}
+
+static void
+vls_app_pre_fork (void)
+{
+  vls_incercept_sigchld ();
+  vcl_flush_mq_events ();
+}
+
+static void
+vls_app_fork_child_handler (void)
+{
+  vcl_worker_t *parent_wrk;
+  int rv, parent_wrk_index;
+  u8 *child_name;
+
+  parent_wrk_index = vcl_get_worker_index ();
+  VDBG (0, "initializing forked child %u with parent wrk %u", getpid (),
+       parent_wrk_index);
+
+  /*
+   * Allocate worker
+   */
+  vcl_set_worker_index (~0);
+  if (!vcl_worker_alloc_and_init ())
+    VERR ("couldn't allocate new worker");
+
+  /*
+   * Attach to binary api
+   */
+  child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
+  vcl_cleanup_bapi ();
+  vppcom_api_hookup ();
+  vcm->app_state = STATE_APP_START;
+  rv = vppcom_connect_to_vpp ((char *) child_name);
+  vec_free (child_name);
+  if (rv)
+    {
+      VERR ("couldn't connect to VPP!");
+      return;
+    }
+
+  /*
+   * Register worker with vpp and share sessions
+   */
+  vcl_worker_register_with_vpp ();
+  parent_wrk = vcl_worker_get (parent_wrk_index);
+  vls_worker_copy_on_fork (parent_wrk);
+  parent_wrk->forked_child = vcl_get_worker_index ();
+
+  VDBG (0, "forked child main worker initialized");
+  vcm->forking = 0;
+}
+
+static void
+vls_app_fork_parent_handler (void)
+{
+  vcm->forking = 1;
+  while (vcm->forking)
+    ;
+}
+
 int
 vls_app_create (char *app_name)
 {
@@ -445,6 +673,8 @@ vls_app_create (char *app_name)
   if ((rv = vppcom_app_create (app_name)))
     return rv;
   clib_rwlock_init (&vlsm->vls_table_lock);
+  pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
+                 vls_app_fork_child_handler);
   return VPPCOM_OK;
 }