vcl/session: add api for changing session app worker 76/16676/11
authorFlorin Coras <fcoras@cisco.com>
Thu, 3 Jan 2019 03:31:22 +0000 (19:31 -0800)
committerDave Barach <openvpp@barachs.net>
Sat, 5 Jan 2019 21:53:16 +0000 (21:53 +0000)
In case of multi process apps, after forking, the parent may decide to
close part or all of the sessions it shares with the child. Because the
sessions have fifos allocated in the parent's segment manager, they must
be moved to the child's segment manager.

Change-Id: I85b4c8c8545005724023ee14043647719cef61dd
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vcl/ldp.c
src/vcl/vcl_bapi.c
src/vcl/vcl_private.c
src/vcl/vcl_private.h
src/vcl/vppcom.c
src/vcl/vppcom.h
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_interface.h
src/vnet/session/session.h
src/vnet/session/session_node.c

index ee35396..a61acb9 100644 (file)
 typedef struct ldp_fd_entry_
 {
   u32 session_index;
+  u32 worker_index;
   u32 fd;
   u32 fd_index;
   u32 flags;
+  clib_spinlock_t lock;
 } ldp_fd_entry_t;
 
 typedef struct ldp_worker_ctx_
@@ -101,7 +103,7 @@ typedef struct
   ldp_worker_ctx_t *workers;
   int init;
   char app_name[LDP_APP_NAME_MAX];
-  u32 sid_bit_val;
+  u32 sh_bit_val;
   u32 sid_bit_mask;
   u32 debug;
   ldp_fd_entry_t *fd_pool;
@@ -119,7 +121,7 @@ typedef struct
     clib_warning ("ldp<%d>: " _fmt, getpid(), ##_args)
 
 static ldp_main_t ldp_main = {
-  .sid_bit_val = (1 << LDP_SID_BIT_MIN),
+  .sh_bit_val = (1 << LDP_SID_BIT_MIN),
   .sid_bit_mask = (1 << LDP_SID_BIT_MIN) - 1,
   .debug = LDP_DEBUG_INIT,
 };
@@ -154,44 +156,73 @@ ldp_get_app_name ()
   return ldp->app_name;
 }
 
+static inline vcl_session_handle_t
+ldp_fd_entry_sh (ldp_fd_entry_t * fde)
+{
+  return vppcom_session_handle (fde->session_index);
+}
+
 static int
-ldp_fd_alloc (u32 sid)
+ldp_fd_alloc (vcl_session_handle_t sh)
 {
   ldp_fd_entry_t *fde;
 
   clib_rwlock_writer_lock (&ldp->fd_table_lock);
-  if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sid_bit_val)
+  if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sh_bit_val)
     {
       clib_rwlock_writer_unlock (&ldp->fd_table_lock);
       return -1;
     }
   pool_get (ldp->fd_pool, fde);
-  fde->session_index = vppcom_session_index (sid);
+  fde->session_index = vppcom_session_index (sh);
+  fde->worker_index = vppcom_session_worker (sh);
   fde->fd_index = fde - ldp->fd_pool;
-  fde->fd = fde->fd_index + ldp->sid_bit_val;
+  fde->fd = fde->fd_index + ldp->sh_bit_val;
   hash_set (ldp->session_index_to_fd_table, fde->session_index, fde->fd);
+  clib_spinlock_init (&fde->lock);
   clib_rwlock_writer_unlock (&ldp->fd_table_lock);
   return fde->fd;
 }
 
 static ldp_fd_entry_t *
-ldp_fd_entry_get_w_lock (u32 fd_index)
+ldp_fd_entry_get (u32 fd_index)
 {
-  clib_rwlock_reader_lock (&ldp->fd_table_lock);
   if (pool_is_free_index (ldp->fd_pool, fd_index))
     return 0;
-
   return pool_elt_at_index (ldp->fd_pool, fd_index);
 }
 
+static ldp_fd_entry_t *
+ldp_fd_entry_lock (u32 fd_index)
+{
+  ldp_fd_entry_t *fe;
+  clib_rwlock_reader_lock (&ldp->fd_table_lock);
+  if (pool_is_free_index (ldp->fd_pool, fd_index))
+    {
+      clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+      return 0;
+    }
+
+  fe = pool_elt_at_index (ldp->fd_pool, fd_index);
+  clib_spinlock_lock (&fe->lock);
+  return fe;
+}
+
+static void
+ldp_fd_entry_unlock (ldp_fd_entry_t * fde)
+{
+  clib_spinlock_unlock (&fde->lock);
+  clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+}
+
 static inline int
-ldp_fd_from_sid (u32 sid)
+ldp_fd_from_sh (vcl_session_handle_t sh)
 {
   uword *fdp;
   int fd;
 
   clib_rwlock_reader_lock (&ldp->fd_table_lock);
-  fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sid));
+  fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sh));
   fd = fdp ? *fdp : -EMFILE;
   clib_rwlock_reader_unlock (&ldp->fd_table_lock);
 
@@ -199,52 +230,63 @@ ldp_fd_from_sid (u32 sid)
 }
 
 static inline int
-ldp_fd_is_sid (int fd)
+ldp_fd_is_sh (int fd)
 {
-  return fd >= ldp->sid_bit_val;
+  return fd >= ldp->sh_bit_val;
 }
 
 static inline u32
-ldp_sid_from_fd (int fd)
+ldp_sh_from_fd (int fd)
 {
   u32 fd_index, session_index;
   ldp_fd_entry_t *fde;
 
-  if (!ldp_fd_is_sid (fd))
+  if (!ldp_fd_is_sh (fd))
     return INVALID_SESSION_ID;
 
-  fd_index = fd - ldp->sid_bit_val;
-  fde = ldp_fd_entry_get_w_lock (fd_index);
+  fd_index = fd - ldp->sh_bit_val;
+  fde = ldp_fd_entry_lock (fd_index);
   if (!fde)
     {
       LDBG (0, "unknown fd %d", fd);
-      clib_rwlock_reader_unlock (&ldp->fd_table_lock);
       return INVALID_SESSION_ID;
     }
   session_index = fde->session_index;
-  clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+  ldp_fd_entry_unlock (fde);
 
   return vppcom_session_handle (session_index);
 }
 
+static ldp_fd_entry_t *
+ldp_fd_entry_lock_w_fd (int fd)
+{
+  u32 fd_index;
+
+  if (!ldp_fd_is_sh (fd))
+    return 0;
+
+  fd_index = fd - ldp->sh_bit_val;
+  return ldp_fd_entry_lock (fd_index);
+}
+
 static void
-ldp_fd_free_w_sid (u32 sid)
+ldp_fd_free_w_sh (vcl_session_handle_t sh)
 {
   ldp_fd_entry_t *fde;
   u32 fd_index;
   int fd;
 
-  fd = ldp_fd_from_sid (sid);
+  fd = ldp_fd_from_sh (sh);
   if (!fd)
     return;
 
-  fd_index = fd - ldp->sid_bit_val;
-  fde = ldp_fd_entry_get_w_lock (fd_index);
-  if (fde)
-    {
-      hash_unset (ldp->session_index_to_fd_table, fde->session_index);
-      pool_put (ldp->fd_pool, fde);
-    }
+  fd_index = fd - ldp->sh_bit_val;
+  clib_rwlock_writer_lock (&ldp->fd_table_lock);
+  fde = ldp_fd_entry_get (fd_index);
+  ASSERT (fde != 0);
+  hash_unset (ldp->session_index_to_fd_table, fde->session_index);
+  clib_spinlock_free (&fde->lock);
+  pool_put (ldp->fd_pool, fde);
   clib_rwlock_writer_unlock (&ldp->fd_table_lock);
 }
 
@@ -307,38 +349,38 @@ ldp_init (void)
          clib_warning ("LDP<%d>: WARNING: Invalid LDP sid bit specified in"
                        " the env var " LDP_ENV_SID_BIT " (%s)! sid bit "
                        "value %d (0x%x)", getpid (), env_var_str,
-                       ldp->sid_bit_val, ldp->sid_bit_val);
+                       ldp->sh_bit_val, ldp->sh_bit_val);
        }
       else if (sb < LDP_SID_BIT_MIN)
        {
-         ldp->sid_bit_val = (1 << LDP_SID_BIT_MIN);
-         ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+         ldp->sh_bit_val = (1 << LDP_SID_BIT_MIN);
+         ldp->sid_bit_mask = ldp->sh_bit_val - 1;
 
          clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the"
                        " env var " LDP_ENV_SID_BIT " (%s) is too small. "
                        "Using LDP_SID_BIT_MIN (%d)! sid bit value %d (0x%x)",
                        getpid (), sb, env_var_str, LDP_SID_BIT_MIN,
-                       ldp->sid_bit_val, ldp->sid_bit_val);
+                       ldp->sh_bit_val, ldp->sh_bit_val);
        }
       else if (sb > LDP_SID_BIT_MAX)
        {
-         ldp->sid_bit_val = (1 << LDP_SID_BIT_MAX);
-         ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+         ldp->sh_bit_val = (1 << LDP_SID_BIT_MAX);
+         ldp->sid_bit_mask = ldp->sh_bit_val - 1;
 
          clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the"
                        " env var " LDP_ENV_SID_BIT " (%s) is too big. Using"
                        " LDP_SID_BIT_MAX (%d)! sid bit value %d (0x%x)",
                        getpid (), sb, env_var_str, LDP_SID_BIT_MAX,
-                       ldp->sid_bit_val, ldp->sid_bit_val);
+                       ldp->sh_bit_val, ldp->sh_bit_val);
        }
       else
        {
-         ldp->sid_bit_val = (1 << sb);
-         ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+         ldp->sh_bit_val = (1 << sb);
+         ldp->sid_bit_mask = ldp->sh_bit_val - 1;
 
          LDBG (0, "configured LDP sid bit (%u) from "
                LDP_ENV_SID_BIT "!  sid bit value %d (0x%x)", sb,
-               ldp->sid_bit_val, ldp->sid_bit_val);
+               ldp->sh_bit_val, ldp->sh_bit_val);
        }
     }
 
@@ -352,17 +394,18 @@ ldp_init (void)
 int
 close (int fd)
 {
-  int rv, refcnt;
-  u32 sid = ldp_sid_from_fd (fd);
+  int rv, refcnt, epfd;
+  ldp_fd_entry_t *fde;
+  u32 sh;
 
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (sid != INVALID_SESSION_ID)
+  fde = ldp_fd_entry_lock_w_fd (fd);
+  if (fde)
     {
-      int epfd;
-
-      epfd = vppcom_session_attr (sid, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0);
+      sh = ldp_fd_entry_sh (fde);
+      epfd = vppcom_session_attr (sh, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0);
       if (epfd > 0)
        {
          LDBG (0, "fd %d (0x%x): calling libc_close: epfd %u (0x%x)",
@@ -374,7 +417,7 @@ close (int fd)
              u32 size = sizeof (epfd);
              epfd = 0;
 
-             (void) vppcom_session_attr (sid, VPPCOM_ATTR_SET_LIBC_EPFD,
+             (void) vppcom_session_attr (sh, VPPCOM_ATTR_SET_LIBC_EPFD,
                                          &epfd, &size);
            }
        }
@@ -382,21 +425,24 @@ close (int fd)
        {
          errno = -epfd;
          rv = -1;
+         ldp_fd_entry_unlock (fde);
          goto done;
        }
 
       LDBG (0, "fd %d (0x%x): calling vppcom_session_close: sid %u (0x%x)",
-           fd, fd, sid, sid);
+           fd, fd, sh, sh);
 
-      refcnt = vppcom_session_attr (sid, VPPCOM_ATTR_GET_REFCNT, 0, 0);
-      rv = vppcom_session_close (sid);
+      refcnt = vppcom_session_attr (sh, VPPCOM_ATTR_GET_REFCNT, 0, 0);
+      rv = vppcom_session_close (sh);
       if (rv != VPPCOM_OK)
        {
          errno = -rv;
          rv = -1;
        }
+
+      ldp_fd_entry_unlock (fde);
       if (refcnt <= 1)
-       ldp_fd_free_w_sid (sid);
+       ldp_fd_free_w_sh (sh);
     }
   else
     {
@@ -413,23 +459,27 @@ done:
 ssize_t
 read (int fd, void *buf, size_t nbytes)
 {
+  vcl_session_handle_t sh;
+  ldp_fd_entry_t *fde;
   ssize_t size;
-  u32 sid = ldp_sid_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (sid != INVALID_SESSION_ID)
+  fde = ldp_fd_entry_lock_w_fd (fd);
+  if (fde)
     {
+      sh = ldp_fd_entry_sh (fde);
       LDBG (2, "fd %d (0x%x): calling vppcom_session_read(): sid %u (0x%x),"
-           " buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes);
+           " buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes);
 
-      size = vppcom_session_read (sid, buf, nbytes);
+      size = vppcom_session_read (sh, buf, nbytes);
       if (size < 0)
        {
          errno = -size;
          size = -1;
        }
+      ldp_fd_entry_unlock (fde);
     }
   else
     {
@@ -447,7 +497,7 @@ ssize_t
 readv (int fd, const struct iovec * iov, int iovcnt)
 {
   ssize_t size = 0;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
   int rv = 0, i, total = 0;
 
   if ((errno = -ldp_init ()))
@@ -504,23 +554,27 @@ readv (int fd, const struct iovec * iov, int iovcnt)
 ssize_t
 write (int fd, const void *buf, size_t nbytes)
 {
+  vcl_session_handle_t sh;
+  ldp_fd_entry_t *fde;
   ssize_t size = 0;
-  u32 sid = ldp_sid_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (sid != INVALID_SESSION_ID)
+  fde = ldp_fd_entry_lock_w_fd (fd);
+  if (fde)
     {
+      sh = ldp_fd_entry_sh (fde);
       LDBG (2, "fd %d (0x%x): calling vppcom_session_write(): sid %u (0x%x), "
-           "buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes);
+           "buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes);
 
-      size = vppcom_session_write_msg (sid, (void *) buf, nbytes);
+      size = vppcom_session_write_msg (sh, (void *) buf, nbytes);
       if (size < 0)
        {
          errno = -size;
          size = -1;
        }
+      ldp_fd_entry_unlock (fde);
     }
   else
     {
@@ -538,7 +592,7 @@ ssize_t
 writev (int fd, const struct iovec * iov, int iovcnt)
 {
   ssize_t size = 0, total = 0;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
   int i, rv = 0;
 
   /*
@@ -590,7 +644,7 @@ fcntl (int fd, int cmd, ...)
   const char *func_str = __func__;
   int rv = 0;
   va_list ap;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -681,7 +735,7 @@ ioctl (int fd, unsigned long int cmd, ...)
   const char *func_str;
   int rv;
   va_list ap;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -808,7 +862,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds,
     time_out = -1;
 
 
-  if (nfds <= ldp->sid_bit_val)
+  if (nfds <= ldp->sh_bit_val)
     {
       func_str = "libc_pselect";
 
@@ -821,11 +875,11 @@ ldp_pselect (int nfds, fd_set * __restrict readfds,
       goto done;
     }
 
-  if (PREDICT_FALSE (ldp->sid_bit_val > FD_SETSIZE / 2))
+  if (PREDICT_FALSE (ldp->sh_bit_val > FD_SETSIZE / 2))
     {
       clib_warning ("LDP<%d>: ERROR: LDP sid bit value %d (0x%x) > "
                    "FD_SETSIZE/2 %d (0x%x)!", getpid (),
-                   ldp->sid_bit_val, ldp->sid_bit_val,
+                   ldp->sh_bit_val, ldp->sh_bit_val,
                    FD_SETSIZE / 2, FD_SETSIZE / 2);
       errno = EOVERFLOW;
       return -1;
@@ -845,7 +899,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds,
       clib_bitmap_foreach (fd, ldpw->rd_bitmap, ({
        if (fd > nfds)
          break;
-        sid = ldp_sid_from_fd (fd);
+        sid = ldp_sh_from_fd (fd);
         LDBG (3, "readfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
         if (sid == INVALID_SESSION_ID)
           clib_bitmap_set_no_check (ldpw->libc_rd_bitmap, fd, 1);
@@ -877,7 +931,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds,
       clib_bitmap_foreach (fd, ldpw->wr_bitmap, ({
        if (fd > nfds)
          break;
-        sid = ldp_sid_from_fd (fd);
+        sid = ldp_sh_from_fd (fd);
         LDBG (3, "writefds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
         if (sid == INVALID_SESSION_ID)
           clib_bitmap_set_no_check (ldpw->libc_wr_bitmap, fd, 1);
@@ -909,7 +963,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds,
       clib_bitmap_foreach (fd, ldpw->ex_bitmap, ({
        if (fd > nfds)
          break;
-        sid = ldp_sid_from_fd (fd);
+        sid = ldp_sh_from_fd (fd);
         LDBG (3, "exceptfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
         if (sid == INVALID_SESSION_ID)
           clib_bitmap_set_no_check (ldpw->libc_ex_bitmap, fd, 1);
@@ -977,7 +1031,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds,
                       /* *INDENT-OFF* */
                       clib_bitmap_foreach (sid, ldpw->rd_bitmap,
                         ({
-                          fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+                          fd = ldp_fd_from_sh (vppcom_session_handle (sid));
                           if (PREDICT_FALSE (fd < 0))
                             {
                               errno = EBADFD;
@@ -993,7 +1047,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds,
                       /* *INDENT-OFF* */
                       clib_bitmap_foreach (sid, ldpw->wr_bitmap,
                         ({
-                          fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+                          fd = ldp_fd_from_sh (vppcom_session_handle (sid));
                           if (PREDICT_FALSE (fd < 0))
                             {
                               errno = EBADFD;
@@ -1009,7 +1063,7 @@ ldp_pselect (int nfds, fd_set * __restrict readfds,
                       /* *INDENT-OFF* */
                       clib_bitmap_foreach (sid, ldpw->ex_bitmap,
                         ({
-                          fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+                          fd = ldp_fd_from_sh (vppcom_session_handle (sid));
                           if (PREDICT_FALSE (fd < 0))
                             {
                               errno = EBADFD;
@@ -1237,7 +1291,7 @@ int
 bind (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len)
 {
   int rv;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1361,7 +1415,7 @@ getsockname (int fd, __SOCKADDR_ARG addr, socklen_t * __restrict len)
 {
   int rv;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1430,7 +1484,7 @@ int
 connect (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len)
 {
   int rv;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1516,7 +1570,7 @@ getpeername (int fd, __SOCKADDR_ARG addr, socklen_t * __restrict len)
 {
   int rv;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1586,7 +1640,7 @@ send (int fd, const void *buf, size_t n, int flags)
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1644,7 +1698,7 @@ sendfile (int out_fd, int in_fd, off_t * offset, size_t len)
   ldp_worker_ctx_t *ldpw = ldp_worker_get_current ();
   ssize_t size = 0;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (out_fd);
+  u32 sid = ldp_sh_from_fd (out_fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1888,7 +1942,7 @@ recv (int fd, void *buf, size_t n, int flags)
   if ((errno = -ldp_init ()))
     return -1;
 
-  sid = ldp_sid_from_fd (fd);
+  sid = ldp_sh_from_fd (fd);
   if (sid != INVALID_SESSION_ID)
     {
       LDBG (2, "fd %d (0x%x): calling vcl recvfrom: sid %u (0x%x), buf %p,"
@@ -1915,7 +1969,7 @@ sendto (int fd, const void *buf, size_t n, int flags,
 {
   ssize_t size;
   const char *func_str = __func__;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2006,7 +2060,7 @@ recvfrom (int fd, void *__restrict buf, size_t n, int flags,
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2076,7 +2130,7 @@ sendmsg (int fd, const struct msghdr * message, int flags)
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2125,7 +2179,7 @@ sendmmsg (int fd, struct mmsghdr *vmessages, unsigned int vlen, int flags)
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2172,7 +2226,7 @@ recvmsg (int fd, struct msghdr * message, int flags)
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2222,7 +2276,7 @@ recvmmsg (int fd, struct mmsghdr *vmessages,
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2271,7 +2325,7 @@ getsockopt (int fd, int level, int optname,
 {
   int rv;
   const char *func_str = __func__;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
   u32 buflen = optlen ? (u32) * optlen : 0;
 
   if ((errno = -ldp_init ()))
@@ -2499,7 +2553,7 @@ setsockopt (int fd, int level, int optname,
 {
   int rv;
   const char *func_str = __func__;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2663,7 +2717,7 @@ int
 listen (int fd, int n)
 {
   int rv;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2696,13 +2750,14 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr,
             socklen_t * __restrict addr_len, int flags)
 {
   int rv;
-  u32 listen_sid = ldp_sid_from_fd (listen_fd);
-  int accept_sid;
+  u32 listen_sh;
+  int accept_sh;
 
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (listen_sid != INVALID_SESSION_ID)
+  listen_sh = ldp_sh_from_fd (listen_fd);
+  if (listen_sh != INVALID_SESSION_ID)
     {
       vppcom_endpt_t ep;
       u8 src_addr[sizeof (struct sockaddr_in6)];
@@ -2711,12 +2766,12 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr,
 
       LDBG (0, "listen fd %d (0x%x): calling vppcom_session_accept:"
            " listen sid %u (0x%x), ep %p, flags 0x%x", listen_fd,
-           listen_fd, listen_sid, listen_sid, ep, flags);
+           listen_fd, listen_sh, listen_sh, ep, flags);
 
-      accept_sid = vppcom_session_accept (listen_sid, &ep, flags);
-      if (accept_sid < 0)
+      accept_sh = vppcom_session_accept (listen_sh, &ep, flags);
+      if (accept_sh < 0)
        {
-         errno = -accept_sid;
+         errno = -accept_sh;
          rv = -1;
        }
       else
@@ -2724,16 +2779,16 @@ ldp_accept4 (int listen_fd, __SOCKADDR_ARG addr,
          rv = ldp_copy_ep_to_sockaddr (addr, addr_len, &ep);
          if (rv != VPPCOM_OK)
            {
-             (void) vppcom_session_close ((u32) accept_sid);
+             (void) vppcom_session_close ((u32) accept_sh);
              errno = -rv;
              rv = -1;
            }
          else
            {
-             rv = ldp_fd_alloc ((u32) accept_sid);
+             rv = ldp_fd_alloc ((u32) accept_sh);
              if (rv < 0)
                {
-                 (void) vppcom_session_close ((u32) accept_sid);
+                 (void) vppcom_session_close ((u32) accept_sh);
                  errno = -rv;
                  rv = -1;
                }
@@ -2776,15 +2831,14 @@ shutdown (int fd, int how)
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (ldp_fd_is_sid (fd))
+  if (ldp_fd_is_sh (fd))
     {
-      u32 fd_index = fd - ldp->sid_bit_val;
+      u32 fd_index = fd - ldp->sh_bit_val;
       ldp_fd_entry_t *fde;
 
-      fde = ldp_fd_entry_get_w_lock (fd_index);
+      fde = ldp_fd_entry_lock (fd_index);
       if (!fde)
        {
-         clib_rwlock_reader_unlock (&ldp->fd_table_lock);
          errno = ENOTCONN;
          return -1;
        }
@@ -2799,7 +2853,7 @@ shutdown (int fd, int how)
       if ((fde->flags & LDP_F_SHUT_RD) && (fde->flags & LDP_F_SHUT_WR))
        rv = close (fd);
 
-      clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+      ldp_fd_entry_unlock (fde);
       LDBG (0, "fd %d (0x%x): calling vcl shutdown: how %d", fd, fd, how);
     }
   else
@@ -2869,7 +2923,7 @@ epoll_create (int size)
 int
 epoll_ctl (int epfd, int op, int fd, struct epoll_event *event)
 {
-  u32 vep_idx = ldp_sid_from_fd (epfd), sid;
+  u32 vep_idx = ldp_sh_from_fd (epfd), sid;
   const char *func_str;
   int rv;
 
@@ -2892,7 +2946,7 @@ epoll_ctl (int epfd, int op, int fd, struct epoll_event *event)
       goto done;
     }
 
-  sid = ldp_sid_from_fd (fd);
+  sid = ldp_sh_from_fd (fd);
 
   LDBG (0, "epfd %d (0x%x), vep_idx %d (0x%x), sid %d (0x%x)",
        epfd, epfd, vep_idx, vep_idx, sid, sid);
@@ -2995,7 +3049,7 @@ ldp_epoll_pwait (int epfd, struct epoll_event *events, int maxevents,
 {
   ldp_worker_ctx_t *ldpw = ldp_worker_get_current ();
   double time_to_wait = (double) 0, time_out, now = 0;
-  u32 vep_idx = ldp_sid_from_fd (epfd);
+  u32 vep_idx = ldp_sh_from_fd (epfd);
   int libc_epfd, rv = 0;
 
   if ((errno = -ldp_init ()))
@@ -3115,7 +3169,7 @@ poll (struct pollfd *fds, nfds_t nfds, int timeout)
       LDBG (3, "fds[%d] fd %d (0x%0x) events = 0x%x revents = 0x%x",
            i, fds[i].fd, fds[i].fd, fds[i].events, fds[i].revents);
 
-      sid = ldp_sid_from_fd (fds[i].fd);
+      sid = ldp_sh_from_fd (fds[i].fd);
       if (sid != INVALID_SESSION_ID)
        {
          fds[i].fd = -fds[i].fd;
index de5e80a..5b9a9d5 100644 (file)
@@ -609,21 +609,6 @@ vppcom_send_unbind_sock (u64 vpp_handle)
   vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & ump);
 }
 
-void
-vppcom_send_accept_session_reply (u64 handle, u32 context, int retval)
-{
-  vcl_worker_t *wrk = vcl_worker_get_current ();
-  vl_api_accept_session_reply_t *rmp;
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
-  rmp->retval = htonl (retval);
-  rmp->context = context;
-  rmp->handle = handle;
-  vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & rmp);
-}
-
 void
 vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert,
                                      u32 cert_len)
index d82a7ff..3266431 100644 (file)
@@ -375,21 +375,23 @@ vcl_worker_share_session (vcl_worker_t * parent, vcl_worker_t * wrk,
                          vcl_session_t * new_s)
 {
   vcl_shared_session_t *ss;
-  vcl_session_t *s;
+  vcl_session_t *old_s;
 
-  s = vcl_session_get (parent, new_s->session_index);
-  if (s->shared_index == ~0)
+  if (new_s->shared_index == ~0)
     {
       ss = vcl_shared_session_alloc ();
+      ss->session_index = new_s->session_index;
       vec_add1 (ss->workers, parent->wrk_index);
-      s->shared_index = ss->ss_index;
+      vec_add1 (ss->workers, wrk->wrk_index);
+      new_s->shared_index = ss->ss_index;
+      old_s = vcl_session_get (parent, new_s->session_index);
+      old_s->shared_index = ss->ss_index;
     }
   else
     {
-      ss = vcl_shared_session_get (s->shared_index);
+      ss = vcl_shared_session_get (new_s->shared_index);
+      vec_add1 (ss->workers, wrk->wrk_index);
     }
-  new_s->shared_index = ss->ss_index;
-  vec_add1 (ss->workers, wrk->wrk_index);
 }
 
 int
@@ -414,6 +416,12 @@ vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s)
       return 1;
     }
 
+  /* If the first removed and not last, start session worker change.
+   * First request goes to vpp and vpp reflects it back to the right
+   * worker */
+  if (i == 0)
+    vcl_send_session_worker_update (wrk, s, ss->workers[0]);
+
   return 0;
 }
 
index 9dce518..2ae4b72 100644 (file)
@@ -69,7 +69,8 @@ typedef enum
   STATE_ACCEPT = 0x08,
   STATE_VPP_CLOSING = 0x10,
   STATE_DISCONNECT = 0x20,
-  STATE_FAILED = 0x40
+  STATE_FAILED = 0x40,
+  STATE_UPDATED = 0x80,
 } session_state_t;
 
 #define SERVER_STATE_OPEN  (STATE_ACCEPT|STATE_VPP_CLOSING)
@@ -144,6 +145,7 @@ typedef struct vcl_shared_session_
 {
   u32 ss_index;
   u32 *workers;
+  u32 session_index;
 } vcl_shared_session_t;
 
 typedef struct
@@ -287,6 +289,8 @@ typedef struct vcl_worker_
   /** Vector of unhandled events */
   session_event_t *unhandled_evts_vector;
 
+  u32 *pending_session_wrk_updates;
+
   /** Used also as a thread stop key buffer */
   pthread_t thread_id;
 
@@ -517,6 +521,7 @@ int vcl_worker_register_with_vpp (void);
 int vcl_worker_set_bapi (void);
 void vcl_worker_share_sessions (vcl_worker_t * parent_wrk);
 int vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s);
+vcl_shared_session_t *vcl_shared_session_get (u32 ss_index);
 int vcl_session_get_refcnt (vcl_session_t * s);
 
 void vcl_segment_table_add (u64 segment_handle, u32 svm_segment_index);
@@ -543,6 +548,17 @@ vcl_worker_get_current (void)
   return vcl_worker_get (vcl_get_worker_index ());
 }
 
+static inline svm_msg_q_t *
+vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
+{
+  if (vcl_session_is_ct (s))
+    return wrk->vpp_event_queues[0];
+  else
+    return wrk->vpp_event_queues[s->vpp_thread_index];
+}
+
+void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
+                                    u32 wrk_index);
 /*
  * VCL Binary API
  */
@@ -556,12 +572,10 @@ void vppcom_send_disconnect_session (u64 vpp_handle);
 void vppcom_send_bind_sock (vcl_session_t * session);
 void vppcom_send_unbind_sock (u64 vpp_handle);
 void vppcom_api_hookup (void);
-void vppcom_send_accept_session_reply (u64 vpp_handle, u32 context, int rv);
 void vppcom_send_application_tls_cert_add (vcl_session_t * session,
                                           char *cert, u32 cert_len);
-void
-vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
-                                    u32 key_len);
+void vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
+                                         u32 key_len);
 void vcl_send_app_worker_add_del (u8 is_add);
 void vcl_send_child_worker_del (vcl_worker_t * wrk);
 
index 70afdce..6a1bf1c 100644 (file)
@@ -44,6 +44,22 @@ vcl_wait_for_segment (u64 segment_handle)
   return 1;
 }
 
+static inline int
+vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
+{
+  svm_msg_q_msg_t *msg;
+  u32 n_msgs;
+  int i;
+
+  n_msgs = svm_msg_q_size (mq);
+  for (i = 0; i < n_msgs; i++)
+    {
+      vec_add2 (wrk->mq_msg_vector, msg, 1);
+      svm_msg_q_sub_w_lock (mq, msg);
+    }
+  return n_msgs;
+}
+
 const char *
 vppcom_session_state_str (session_state_t state)
 {
@@ -175,15 +191,6 @@ format_ip46_address (u8 * s, va_list * args)
  */
 
 
-static svm_msg_q_t *
-vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
-{
-  if (vcl_session_is_ct (s))
-    return wrk->vpp_event_queues[0];
-  else
-    return wrk->vpp_event_queues[s->vpp_thread_index];
-}
-
 static void
 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
                                 session_handle_t handle, int retval)
@@ -227,6 +234,24 @@ vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
   app_send_ctrl_evt_to_vpp (mq, app_evt);
 }
 
+void
+vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
+                               u32 wrk_index)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_worker_update_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  mq = vcl_session_vpp_evt_q (wrk, s);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_WORKER_UPDATE);
+  mp = (session_worker_update_msg_t *) app_evt->evt->data;
+  mp->client_index = wrk->my_client_index;
+  mp->handle = s->vpp_handle;
+  mp->req_wrk_index = wrk->vpp_wrk_index;
+  mp->wrk_index = wrk_index;
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
 static u32
 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
 {
@@ -540,7 +565,6 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk,
   /* Caught a disconnect before actually accepting the session */
   if (session->session_state == STATE_LISTEN)
     {
-
       if (!vcl_flag_accepted_session (session, msg->handle,
                                      VCL_ACCEPTED_F_CLOSED))
        VDBG (0, "session was not accepted!");
@@ -551,6 +575,59 @@ vcl_session_disconnected_handler (vcl_worker_t * wrk,
   return session;
 }
 
+static void
+vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
+{
+  session_req_worker_update_msg_t *msg;
+  vcl_session_t *s;
+
+  msg = (session_req_worker_update_msg_t *) data;
+  s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
+  if (!s)
+    return;
+
+  vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
+}
+
+static void
+vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
+{
+  session_worker_update_reply_msg_t *msg;
+  vcl_session_t *s;
+
+  msg = (session_worker_update_reply_msg_t *) data;
+  s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
+  if (!s)
+    {
+      VDBG (0, "unknown handle 0x%llx", msg->handle);
+      return;
+    }
+  if (vcl_wait_for_segment (msg->segment_handle))
+    {
+      clib_warning ("segment for session %u couldn't be mounted!",
+                   s->session_index);
+      return;
+    }
+  s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
+  s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
+
+  s->rx_fifo->client_session_index = s->session_index;
+  s->tx_fifo->client_session_index = s->session_index;
+  s->rx_fifo->client_thread_index = wrk->wrk_index;
+  s->tx_fifo->client_thread_index = wrk->wrk_index;
+  s->session_state = STATE_UPDATED;
+
+  if (s->shared_index != VCL_INVALID_SESSION_INDEX)
+    {
+      vcl_shared_session_t *ss;
+      ss = vcl_shared_session_get (s->shared_index);
+      if (vec_len (ss->workers) > 1)
+       VDBG (0, "workers need to be updated");
+    }
+  VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
+       s->vpp_handle, wrk->wrk_index);
+}
+
 static int
 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
 {
@@ -587,13 +664,19 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
     case SESSION_CTRL_EVT_BOUND:
       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
       break;
+    case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+      vcl_session_req_worker_update_handler (wrk, e->data);
+      break;
+    case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+      vcl_session_worker_update_reply_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled %u", e->event_type);
     }
   return VPPCOM_OK;
 }
 
-static inline int
+static int
 vppcom_wait_for_session_state_change (u32 session_index,
                                      session_state_t state,
                                      f64 wait_for_time)
@@ -638,6 +721,52 @@ vppcom_wait_for_session_state_change (u32 session_index,
   return VPPCOM_ETIMEDOUT;
 }
 
+static void
+vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
+{
+  session_state_t state;
+  vcl_session_t *s;
+  u32 *sip;
+
+  if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
+    return;
+
+  vec_foreach (sip, wrk->pending_session_wrk_updates)
+  {
+    s = vcl_session_get (wrk, *sip);
+    vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
+    state = s->session_state;
+    vppcom_wait_for_session_state_change (s->session_index, STATE_UPDATED, 5);
+    s->session_state = state;
+  }
+  vec_reset_length (wrk->pending_session_wrk_updates);
+}
+
+static void
+vcl_flush_mq_events (void)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  svm_msg_q_msg_t *msg;
+  session_event_t *e;
+  svm_msg_q_t *mq;
+  int i;
+
+  mq = wrk->app_event_queue;
+  svm_msg_q_lock (mq);
+  vcl_mq_dequeue_batch (wrk, mq);
+  svm_msg_q_unlock (mq);
+
+  for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
+    {
+      msg = vec_elt_at_index (wrk->mq_msg_vector, i);
+      e = svm_msg_q_msg_data (mq, msg);
+      vcl_handle_mq_event (wrk, e);
+      svm_msg_q_free_msg (mq, msg);
+    }
+  vec_reset_length (wrk->mq_msg_vector);
+  vcl_handle_pending_wrk_updates (wrk);
+}
+
 static int
 vppcom_app_session_enable (void)
 {
@@ -845,13 +974,14 @@ static void
 vcl_app_pre_fork (void)
 {
   vcl_incercept_sigchld ();
+  vcl_flush_mq_events ();
 }
 
 static void
 vcl_app_fork_child_handler (void)
 {
+  vcl_worker_t *parent_wrk, *wrk;
   int rv, parent_wrk_index;
-  vcl_worker_t *parent_wrk;
   u8 *child_name;
 
   parent_wrk_index = vcl_get_worker_index ();
@@ -884,6 +1014,8 @@ vcl_app_fork_child_handler (void)
    */
   vcl_worker_register_with_vpp ();
   parent_wrk = vcl_worker_get (parent_wrk_index);
+  wrk = vcl_worker_get_current ();
+  wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
   vcl_worker_share_sessions (parent_wrk);
   parent_wrk->forked_child = vcl_get_worker_index ();
 
@@ -1097,7 +1229,11 @@ vppcom_session_close (uint32_t session_handle)
        }
 
       if (!do_disconnect)
-       goto cleanup;
+       {
+         VDBG (0, "session handle %u [0x%llx] disconnect skipped",
+               session_handle, vpp_handle);
+         goto cleanup;
+       }
 
       if (state & STATE_LISTEN)
        {
@@ -1143,10 +1279,7 @@ cleanup:
       vcl_ct_registration_unlock (wrk);
     }
 
-  if (vpp_handle != ~0)
-    {
-      vcl_session_table_del_vpp_handle (wrk, vpp_handle);
-    }
+  vcl_session_table_del_vpp_handle (wrk, vpp_handle);
   vcl_session_free (wrk, session);
 
   VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle);
@@ -1948,22 +2081,6 @@ vppcom_session_write_ready (vcl_session_t * session)
   return svm_fifo_max_enqueue (session->tx_fifo);
 }
 
-static inline int
-vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
-{
-  svm_msg_q_msg_t *msg;
-  u32 n_msgs;
-  int i;
-
-  n_msgs = svm_msg_q_size (mq);
-  for (i = 0; i < n_msgs; i++)
-    {
-      vec_add2 (wrk->mq_msg_vector, msg, 1);
-      svm_msg_q_sub_w_lock (mq, msg);
-    }
-  return n_msgs;
-}
-
 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                  \
 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                 \
   {                                                            \
@@ -2067,6 +2184,12 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
          *bits_set += 1;
        }
       break;
+    case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+      vcl_session_worker_update_reply_handler (wrk, e->data);
+      break;
+    case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+      vcl_session_req_worker_update_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled: %u", e->event_type);
       break;
@@ -2122,6 +2245,7 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
       svm_msg_q_free_msg (mq, msg);
     }
   vec_reset_length (wrk->mq_msg_vector);
+  vcl_handle_pending_wrk_updates (wrk);
   return *bits_set;
 }
 
@@ -2676,6 +2800,12 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       session_evt_data = session->vep.ev.data.u64;
       session_events = session->vep.ev.events;
       break;
+    case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+      vcl_session_req_worker_update_handler (wrk, e->data);
+      break;
+    case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+      vcl_session_worker_update_reply_handler (wrk, e->data);
+      break;
     default:
       VDBG (0, "unhandled: %u", e->event_type);
       break;
@@ -2741,7 +2871,7 @@ handle_dequeued:
       svm_msg_q_free_msg (mq, msg);
     }
   vec_reset_length (wrk->mq_msg_vector);
-
+  vcl_handle_pending_wrk_updates (wrk);
   return *num_ev;
 }
 
@@ -3580,11 +3710,17 @@ vppcom_mq_epoll_fd (void)
 }
 
 int
-vppcom_session_index (uint32_t session_handle)
+vppcom_session_index (vcl_session_handle_t session_handle)
 {
   return session_handle & 0xFFFFFF;
 }
 
+int
+vppcom_session_worker (vcl_session_handle_t session_handle)
+{
+  return session_handle >> 24;
+}
+
 int
 vppcom_session_handle (uint32_t session_index)
 {
index 641946b..f2fca09 100644 (file)
@@ -97,6 +97,8 @@ typedef struct vppcom_endpt_t_
   uint16_t port;
 } vppcom_endpt_t;
 
+typedef uint32_t vcl_session_handle_t;
+
 typedef enum
 {
   VPPCOM_OK = 0,
@@ -277,7 +279,8 @@ extern int vppcom_session_sendto (uint32_t session_handle, void *buffer,
 extern int vppcom_poll (vcl_poll_t * vp, uint32_t n_sids,
                        double wait_for_time);
 extern int vppcom_mq_epoll_fd (void);
-extern int vppcom_session_index (uint32_t session_handle);
+extern int vppcom_session_index (vcl_session_handle_t session_handle);
+extern int vppcom_session_worker (vcl_session_handle_t session_handle);
 extern int vppcom_session_handle (uint32_t session_index);
 
 extern int vppcom_session_read_segments (uint32_t session_handle,
index 19c8fa2..85b5f93 100644 (file)
@@ -724,6 +724,48 @@ app_worker_stop_listen (app_worker_t * app_wrk, session_handle_t handle)
   return 0;
 }
 
+int
+app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s)
+{
+  segment_manager_t *sm;
+  svm_fifo_t *rxf, *txf;
+
+  s->app_wrk_index = app_wrk->wrk_index;
+
+  rxf = s->server_rx_fifo;
+  txf = s->server_tx_fifo;
+
+  if (!rxf || !txf)
+    return 0;
+
+  s->server_rx_fifo = 0;
+  s->server_tx_fifo = 0;
+
+  sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
+  if (session_alloc_fifos (sm, s))
+    return -1;
+
+  if (!svm_fifo_is_empty (rxf))
+    {
+      clib_memcpy_fast (s->server_rx_fifo->data, rxf->data, rxf->nitems);
+      s->server_rx_fifo->head = rxf->head;
+      s->server_rx_fifo->tail = rxf->tail;
+      s->server_rx_fifo->cursize = rxf->cursize;
+    }
+
+  if (!svm_fifo_is_empty (txf))
+    {
+      clib_memcpy_fast (s->server_tx_fifo->data, txf->data, txf->nitems);
+      s->server_tx_fifo->head = txf->head;
+      s->server_tx_fifo->tail = txf->tail;
+      s->server_tx_fifo->cursize = txf->cursize;
+    }
+
+  segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf);
+
+  return 0;
+}
+
 /**
  * Start listening local transport endpoint for requested transport.
  *
@@ -889,6 +931,14 @@ app_worker_get_connect_segment_manager (app_worker_t * app)
   return segment_manager_get (app->connects_seg_manager);
 }
 
+segment_manager_t *
+app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk)
+{
+  if (app_wrk->connects_seg_manager == (u32) ~ 0)
+    app_worker_alloc_connects_segment_manager (app_wrk);
+  return segment_manager_get (app_wrk->connects_seg_manager);
+}
+
 segment_manager_t *
 app_worker_get_listen_segment_manager (app_worker_t * app,
                                       stream_session_t * listener)
index e33f2ff..1d2064d 100644 (file)
@@ -225,12 +225,15 @@ int app_worker_alloc_and_init (application_t * app, app_worker_t ** wrk);
 app_worker_t *app_worker_get (u32 wrk_index);
 app_worker_t *app_worker_get_if_valid (u32 wrk_index);
 application_t *app_worker_get_app (u32 wrk_index);
+int app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s);
 void app_worker_free (app_worker_t * app_wrk);
 int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep,
                             u32 api_context);
 segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
                                                          stream_session_t *);
 segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
+segment_manager_t
+  * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *);
 int app_worker_alloc_connects_segment_manager (app_worker_t * app);
 int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle);
 u32 app_worker_n_listeners (app_worker_t * app);
index a156c82..9c48faa 100644 (file)
@@ -220,9 +220,9 @@ typedef struct session_bound_msg_
   u8 lcl_is_ip4;
   u8 lcl_ip[16];
   u16 lcl_port;
-  u64 rx_fifo;
-  u64 tx_fifo;
-  u64 vpp_evt_q;
+  uword rx_fifo;
+  uword tx_fifo;
+  uword vpp_evt_q;
   u32 segment_size;
   u8 segment_name_length;
   u8 segment_name[128];
@@ -233,12 +233,12 @@ typedef struct session_accepted_msg_
   u32 context;
   u64 listener_handle;
   u64 handle;
-  u64 server_rx_fifo;
-  u64 server_tx_fifo;
+  uword server_rx_fifo;
+  uword server_tx_fifo;
   u64 segment_handle;
-  u64 vpp_event_queue_address;
-  u64 server_event_queue_address;
-  u64 client_event_queue_address;
+  uword vpp_event_queue_address;
+  uword server_event_queue_address;
+  uword client_event_queue_address;
   u16 port;
   u8 is_ip4;
   u8 ip[16];
@@ -260,12 +260,12 @@ typedef struct session_connected_msg_
   u32 context;
   i32 retval;
   u64 handle;
-  u64 server_rx_fifo;
-  u64 server_tx_fifo;
+  uword server_rx_fifo;
+  uword server_tx_fifo;
   u64 segment_handle;
-  u64 vpp_event_queue_address;
-  u64 client_event_queue_address;
-  u64 server_event_queue_address;
+  uword vpp_event_queue_address;
+  uword client_event_queue_address;
+  uword server_event_queue_address;
   u32 segment_size;
   u8 segment_name_length;
   u8 segment_name[64];
@@ -302,6 +302,28 @@ typedef struct session_reset_reply_msg_
   u64 handle;
 } __clib_packed session_reset_reply_msg_t;
 
+typedef struct session_req_worker_update_msg_
+{
+  u64 session_handle;
+} __clib_packed session_req_worker_update_msg_t;
+
+/* NOTE: using u16 for wrk indices because message needs to fit in 18B */
+typedef struct session_worker_update_msg_
+{
+  u32 client_index;
+  u16 wrk_index;
+  u16 req_wrk_index;
+  u64 handle;
+} __clib_packed session_worker_update_msg_t;
+
+typedef struct session_worker_update_reply_msg_
+{
+  u64 handle;
+  uword rx_fifo;
+  uword tx_fifo;
+  u64 segment_handle;
+} __clib_packed session_worker_update_reply_msg_t;
+
 typedef struct app_session_event_
 {
   svm_msg_q_msg_t msg;
index e3c7300..cf1b3e9 100644 (file)
@@ -49,7 +49,10 @@ typedef enum
   SESSION_CTRL_EVT_DISCONNECTED,
   SESSION_CTRL_EVT_DISCONNECTED_REPLY,
   SESSION_CTRL_EVT_RESET,
-  SESSION_CTRL_EVT_RESET_REPLY
+  SESSION_CTRL_EVT_RESET_REPLY,
+  SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
+  SESSION_CTRL_EVT_WORKER_UPDATE,
+  SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
 } session_evt_type_t;
 
 static inline const char *
index 98965f3..880f163 100644 (file)
@@ -173,7 +173,7 @@ session_mq_disconnected_handler (void *data)
   svm_msg_q_unlock (app_wrk->event_queue);
   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
   clib_memset (evt, 0, sizeof (*evt));
-  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
   rmp = (session_disconnected_reply_msg_t *) evt->data;
   rmp->handle = mp->handle;
   rmp->context = mp->context;
@@ -207,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data)
     }
 }
 
+static void
+session_mq_worker_update_handler (void *data)
+{
+  session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
+  session_worker_update_reply_msg_t *rmp;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  app_worker_t *app_wrk;
+  u32 owner_app_wrk_map;
+  session_event_t *evt;
+  stream_session_t *s;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+  if (!(s = session_get_from_handle_if_valid (mp->handle)))
+    {
+      clib_warning ("invalid handle %llu", mp->handle);
+      return;
+    }
+  app_wrk = app_worker_get (s->app_wrk_index);
+  if (app_wrk->app_index != app->app_index)
+    {
+      clib_warning ("app %u does not own session %llu", app->app_index,
+                   mp->handle);
+      return;
+    }
+  owner_app_wrk_map = app_wrk->wrk_map_index;
+  app_wrk = application_get_worker (app, mp->wrk_index);
+
+  /* This needs to come from the new owner */
+  if (mp->req_wrk_index == owner_app_wrk_map)
+    {
+      session_req_worker_update_msg_t *wump;
+
+      svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+                                          SESSION_MQ_CTRL_EVT_RING,
+                                          SVM_Q_WAIT, msg);
+      svm_msg_q_unlock (app_wrk->event_queue);
+      evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+      clib_memset (evt, 0, sizeof (*evt));
+      evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
+      wump = (session_req_worker_update_msg_t *) evt->data;
+      wump->session_handle = mp->handle;
+      svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+      return;
+    }
+
+  app_worker_own_session (app_wrk, s);
+
+  /*
+   * Send reply
+   */
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+                                      SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_wrk->event_queue);
+  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
+  rmp = (session_worker_update_reply_msg_t *) evt->data;
+  rmp->handle = mp->handle;
+  rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
+  rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
+  rmp->segment_handle = session_segment_handle (s);
+  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+
+  /*
+   * Retransmit messages that may have been lost
+   */
+  if (!svm_fifo_is_empty (s->server_tx_fifo))
+    session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
+
+  if (!svm_fifo_is_empty (s->server_rx_fifo))
+    app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+
+  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    app->cb_fns.session_disconnect_callback (s);
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -936,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        case SESSION_CTRL_EVT_RESET_REPLY:
          session_mq_reset_reply_handler (e->data);
          break;
+       case SESSION_CTRL_EVT_WORKER_UPDATE:
+         session_mq_worker_update_handler (e->data);
+         break;
        default:
          clib_warning ("unhandled event type %d", e->event_type);
        }