vcl: use events for epoll/select/read/write 28/13528/36
author: Florin Coras <fcoras@cisco.com>
Tue, 17 Jul 2018 17:46:29 +0000 (10:46 -0700)
committer: Florin Coras <florin.coras@gmail.com>
Fri, 27 Jul 2018 17:40:29 +0000 (17:40 +0000)
Have vcl poll and wait on the event message queues as opposed to
constantly polling the session fifos. This also adds event signaling to
cut through sessions.

On the downside, because we can't wait on multiple condvars, i.e., when
we have multiple message queues because of cut-through registrations, we
do timed waits.

Change-Id: I29ade95dba449659fe46008bb1af502276a7c5fd
Signed-off-by: Florin Coras <fcoras@cisco.com>
23 files changed:
src/svm/message_queue.c
src/svm/message_queue.h
src/svm/svm_fifo.c
src/vcl/ldp.c
src/vcl/sock_test_client.c
src/vcl/vcl_bapi.c
src/vcl/vcl_cfg.c
src/vcl/vcl_private.h
src/vcl/vcl_test.h
src/vcl/vcl_test_client.c
src/vcl/vcl_test_server.c
src/vcl/vppcom.c
src/vcl/vppcom.h
src/vnet/session/application.c
src/vnet/session/application_interface.c
src/vnet/session/application_interface.h
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_cli.c
src/vnet/session/session_lookup.c
src/vnet/session/session_node.c
test/test_vcl.py

index 0f9be9c..1b2d2e1 100644 (file)
@@ -73,6 +73,7 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
       ring = &mq->rings[i];
       ring->elsize = cfg->ring_cfgs[i].elsize;
       ring->nitems = cfg->ring_cfgs[i].nitems;
+      ring->cursize = ring->head = ring->tail = 0;
       if (cfg->ring_cfgs[i].data)
        ring->data = cfg->ring_cfgs[i].data;
       else
@@ -97,10 +98,10 @@ svm_msg_q_free (svm_msg_q_t * mq)
 svm_msg_q_msg_t
 svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
 {
-  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
+  svm_msg_q_msg_t msg;
   svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
 
-  ASSERT (ring->cursize != ring->nitems);
+  ASSERT (ring->cursize < ring->nitems);
   msg.ring_index = ring - mq->rings;
   msg.elt_index = ring->tail;
   ring->tail = (ring->tail + 1) % ring->nitems;
@@ -131,12 +132,9 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
   else
     {
       svm_msg_q_lock (mq);
+      while (svm_msg_q_ring_is_full (mq, ring_index))
+       svm_msg_q_wait (mq);
       *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
-      while (svm_msg_q_msg_is_invalid (msg))
-       {
-         svm_msg_q_wait (mq);
-         *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
-       }
     }
   return 0;
 }
@@ -190,18 +188,20 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 static int
 svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
+  u32 dist1, dist2, tail, head;
   svm_msg_q_ring_t *ring;
-  u32 dist1, dist2;
 
   if (vec_len (mq->rings) <= msg->ring_index)
     return 0;
   ring = &mq->rings[msg->ring_index];
+  tail = ring->tail;
+  head = ring->head;
 
-  dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems;
-  if (ring->tail == ring->head)
+  dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
+  if (tail == head)
     dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
   else
-    dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems;
+    dist2 = ((ring->nitems + tail) - head) % ring->nitems;
   return (dist1 < dist2);
 }
 
index e4a5f07..4c16c97 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <vppinfra/clib.h>
 #include <vppinfra/error.h>
+#include <vppinfra/time.h>
 #include <svm/queue.h>
 
 typedef struct svm_msg_q_ring_
@@ -274,12 +275,6 @@ svm_msg_q_lock (svm_msg_q_t * mq)
   return pthread_mutex_lock (&mq->q->mutex);
 }
 
-static inline void
-svm_msg_q_wait (svm_msg_q_t * mq)
-{
-  pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
-}
-
 /**
  * Unlock message queue
  */
@@ -292,6 +287,37 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
   pthread_mutex_unlock (&mq->q->mutex);
 }
 
+/**
+ * Wait for message queue event
+ *
+ * Blocks on the queue's condition variable with no timeout. Must be called
+ * with the queue mutex held; pthread_cond_wait () releases that mutex while
+ * the caller is blocked and re-acquires it before returning. The return
+ * value of pthread_cond_wait () is ignored.
+ *
+ * @param mq           message queue
+ */
+static inline void
+svm_msg_q_wait (svm_msg_q_t * mq)
+{
+  pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
+}
+
+/**
+ * Timed wait for message queue event
+ *
+ * Must be called with mutex held.
+ *
+ * pthread_cond_timedwait () takes an absolute deadline, so the relative
+ * timeout is added to the current time first and only then split into
+ * seconds and nanoseconds. Splitting the sum (instead of truncating "now"
+ * and adding the timeout's fraction separately) keeps the sub-second part
+ * of the current time, so short timeouts cannot yield a deadline that is
+ * already in the past.
+ *
+ * @param mq           message queue
+ * @param timeout      time in seconds
+ * @return             0 if the wait returned successfully, -1 if
+ *                     pthread_cond_timedwait () reported an error
+ *                     (timeout expiry included)
+ */
+static inline int
+svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
+{
+  struct timespec ts;
+  double deadline;
+
+  /* assumes unix_time_now () returns seconds (with fraction) on the clock
+   * the condvar is timed against - TODO confirm */
+  deadline = unix_time_now () + timeout;
+  ts.tv_sec = (time_t) deadline;
+  ts.tv_nsec = (long) ((deadline - (double) ts.tv_sec) * 1e9);
+  if (pthread_cond_timedwait (&mq->q->condvar, &mq->q->mutex, &ts))
+    return -1;
+  return 0;
+}
+
 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
 
 /*
index 47df225..018827e 100644 (file)
@@ -179,7 +179,7 @@ format_svm_fifo (u8 * s, va_list * args)
 
   if (verbose > 1)
     s = format
-      (s, " server session %d thread %d client session %d thread %d\n",
+      (s, " vpp session %d thread %d app session %d thread %d\n",
        f->master_session_index, f->master_thread_index,
        f->client_session_index, f->client_thread_index);
 
index ce243df..c26c460 100644 (file)
@@ -3171,7 +3171,7 @@ ldp_epoll_pwait (int epfd, struct epoll_event *events,
       return -1;
     }
 
-  time_to_wait = ((timeout >= 0) ? (double) timeout / (double) 1000 : 0);
+  time_to_wait = ((timeout >= 0) ? (double) timeout : 0);
   time_out = clib_time_now (&ldp->clib_time) + time_to_wait;
 
   func_str = "vppcom_session_attr[GET_LIBC_EPFD]";
index e88b2b9..1561676 100644 (file)
@@ -455,10 +455,9 @@ stream_test_client (sock_test_t test)
          if (FD_ISSET (tsock->fd, wfdset) &&
              (tsock->stats.tx_bytes < ctrl->cfg.total_bytes))
            {
-             tx_bytes =
-               sock_test_write (tsock->fd, (uint8_t *) tsock->txbuf,
-                                ctrl->cfg.txbuf_size, &tsock->stats,
-                                ctrl->cfg.verbose);
+             tx_bytes = sock_test_write (tsock->fd, (uint8_t *) tsock->txbuf,
+                                         ctrl->cfg.txbuf_size, &tsock->stats,
+                                         ctrl->cfg.verbose);
              if (tx_bytes < 0)
                {
                  fprintf (stderr, "\nCLIENT: ERROR: sock_test_write(%d) "
index ca65782..0201cd8 100644 (file)
@@ -33,7 +33,7 @@
 #include <vpp/api/vpe_all_api_h.h>
 #undef vl_printfun
 
-static u8 *
+u8 *
 format_api_error (u8 * s, va_list * args)
 {
   i32 error = va_arg (*args, u32);
@@ -350,8 +350,8 @@ done:
 
   session->vpp_handle = mp->handle;
   session->transport.is_ip4 = mp->lcl_is_ip4;
-  session->transport.lcl_ip = to_ip46 (mp->lcl_is_ip4 ? IP46_TYPE_IP4 :
-                                      IP46_TYPE_IP6, mp->lcl_ip);
+  clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
+              sizeof (ip46_address_t));
   session->transport.lcl_port = mp->lcl_port;
   vppcom_session_table_add_listener (mp->handle, session_index);
   session->session_state = STATE_LISTEN;
@@ -435,8 +435,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   session->session_state = STATE_ACCEPT;
   session->transport.rmt_port = mp->port;
   session->transport.is_ip4 = mp->is_ip4;
-  session->transport.rmt_ip = to_ip46 (mp->is_ip4 ? IP46_TYPE_IP4 :
-                                      IP46_TYPE_IP6, mp->ip);
+  clib_memcpy (&session->transport.rmt_ip, mp->ip, sizeof (ip46_address_t));
 
   /* Add it to lookup table */
   hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
@@ -533,7 +532,8 @@ vppcom_app_send_attach (void)
     APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT |
     (vcm->cfg.app_scope_local ? APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE : 0) |
     (vcm->cfg.app_scope_global ? APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE : 0) |
-    (app_is_proxy ? APP_OPTIONS_FLAGS_IS_PROXY : 0);
+    (app_is_proxy ? APP_OPTIONS_FLAGS_IS_PROXY : 0) |
+    APP_OPTIONS_FLAGS_USE_MQ_FOR_CTRL_MSGS;
   bmp->options[APP_OPTIONS_PROXY_TRANSPORT] =
     (u64) ((vcm->cfg.app_proxy_transport_tcp ? 1 << TRANSPORT_PROTO_TCP : 0) |
           (vcm->cfg.app_proxy_transport_udp ? 1 << TRANSPORT_PROTO_UDP : 0));
index f25c8fc..279a975 100644 (file)
@@ -29,8 +29,6 @@ vppcom_main_t *vcm = &_vppcom_main;
 void
 vppcom_cfg_init (vppcom_cfg_t * vcl_cfg)
 {
-  char *env_var_str;
-
   ASSERT (vcl_cfg);
 
   vcl_cfg->heapsize = (256ULL << 20);
@@ -48,24 +46,11 @@ vppcom_cfg_init (vppcom_cfg_t * vcl_cfg)
   vcl_cfg->accept_timeout = 60.0;
   vcl_cfg->event_ring_size = (128 << 10);
   vcl_cfg->event_log_path = "/dev/shm";
-
-  env_var_str = getenv (VPPCOM_ENV_DEBUG);
-  if (env_var_str)
-    {
-      u32 tmp;
-      if (sscanf (env_var_str, "%u", &tmp) != 1)
-       clib_warning ("VCL<%d>: WARNING: Invalid debug level specified in the"
-                     " environment variable " VPPCOM_ENV_DEBUG " (%s)!\n",
-                     getpid (), env_var_str);
-      else
-       {
-         vcm->debug = tmp;
-         VDBG (0, "VCL<%d>: configured VCL debug level (%u) from "
-               VPPCOM_ENV_DEBUG "!", getpid (), vcm->debug);
-       }
-    }
 }
 
+/* Print a configuration-time debug message to stderr when vcm->debug is
+ * greater than _lvl. Wrapped in do/while (0) so that the macro expands to
+ * exactly one statement and cannot capture a following "else". */
+#define VCL_CFG_DBG(_lvl, _fmt, _args...)              \
+  do {                                                 \
+    if (vcm->debug > (_lvl))                           \
+      fprintf (stderr, _fmt, ##_args);                 \
+  } while (0)
 void
 vppcom_cfg_heapsize (char *conf_fname)
 {
@@ -85,16 +70,17 @@ vppcom_cfg_heapsize (char *conf_fname)
   fp = fopen (conf_fname, "r");
   if (fp == NULL)
     {
-      VDBG (0, "VCL<%d>: using default heapsize %lld (0x%llx)", getpid (),
-           vcl_cfg->heapsize, vcl_cfg->heapsize);
+      VCL_CFG_DBG (0, "VCL<%d>: using default heapsize %lu (0x%lx)",
+                  getpid (), vcl_cfg->heapsize, vcl_cfg->heapsize);
       goto defaulted;
     }
 
   argv = calloc (1, sizeof (char *));
   if (argv == NULL)
     {
-      VDBG (0, "VCL<%d>: calloc failed, using default heapsize %lld (0x%llx)",
-           getpid (), vcl_cfg->heapsize, vcl_cfg->heapsize);
+      VCL_CFG_DBG (0, "VCL<%d>: calloc failed, using default heapsize %lu"
+                  " (0x%lx)", getpid (), vcl_cfg->heapsize,
+                  vcl_cfg->heapsize);
       goto defaulted;
     }
 
@@ -111,18 +97,18 @@ vppcom_cfg_heapsize (char *conf_fname)
          char **tmp = realloc (argv, argc * sizeof (char *));
          if (tmp == NULL)
            {
-             VDBG (0, "VCL<%d>: realloc failed, using default heapsize %lld "
-                   "(0x%llx)", getpid (), vcl_cfg->heapsize,
-                   vcl_cfg->heapsize);
+             VCL_CFG_DBG (0, "VCL<%d>: realloc failed, using default "
+                          "heapsize %lu (0x%lx)", getpid (),
+                          vcl_cfg->heapsize, vcl_cfg->heapsize);
              goto defaulted;
            }
          argv = tmp;
          arg = strndup (p, 1024);
          if (arg == NULL)
            {
-             VDBG (0, "VCL<%d>: strndup failed, using default heapsize %lld "
-                   "(0x%llx)", getpid (), vcl_cfg->heapsize,
-                   vcl_cfg->heapsize);
+             VCL_CFG_DBG (0, "VCL<%d>: strndup failed, using default "
+                          "heapsize %ld (0x%lx)", getpid (),
+                          vcl_cfg->heapsize, vcl_cfg->heapsize);
              goto defaulted;
            }
          argv[argc - 1] = arg;
@@ -136,8 +122,9 @@ vppcom_cfg_heapsize (char *conf_fname)
   char **tmp = realloc (argv, (argc + 1) * sizeof (char *));
   if (tmp == NULL)
     {
-      VDBG (0, "VCL<%d>: realloc failed, using default heapsize %lld "
-           "(0x%llx)", getpid (), vcl_cfg->heapsize, vcl_cfg->heapsize);
+      VCL_CFG_DBG (0, "VCL<%d>: realloc failed, using default heapsize %ld "
+                  "(0x%lx)", getpid (), vcl_cfg->heapsize,
+                  vcl_cfg->heapsize);
       goto defaulted;
     }
   argv = tmp;
@@ -163,9 +150,9 @@ vppcom_cfg_heapsize (char *conf_fname)
            }
          if (size == 0)
            {
-             VDBG (0, "VCL<%d>: parse error '%s %s', using default "
-                   "heapsize %lld (0x%llx)", getpid (), argv[i], argv[i + 1],
-                   vcl_cfg->heapsize, vcl_cfg->heapsize);
+             VCL_CFG_DBG (0, "VCL<%d>: parse error '%s %s', using default "
+                          "heapsize %ld (0x%lx)", getpid (), argv[i],
+                          argv[i + 1], vcl_cfg->heapsize, vcl_cfg->heapsize);
              goto defaulted;
            }
 
@@ -175,9 +162,9 @@ vppcom_cfg_heapsize (char *conf_fname)
            vcl_cfg->heapsize = size << 20;
          else
            {
-             VDBG (0, "VCL<%d>: parse error '%s %s', using default "
-                   "heapsize %lld (0x%llx)", getpid (), argv[i], argv[i + 1],
-                   vcl_cfg->heapsize, vcl_cfg->heapsize);
+             VCL_CFG_DBG (0, "VCL<%d>: parse error '%s %s', using default "
+                          "heapsize %ld (0x%lx)", getpid (), argv[i],
+                          argv[i + 1], vcl_cfg->heapsize, vcl_cfg->heapsize);
              goto defaulted;
            }
        }
@@ -193,17 +180,17 @@ defaulted:
                  MAP_SHARED | MAP_ANONYMOUS, -1, 0);
   if (vcl_mem == MAP_FAILED)
     {
-      clib_unix_error ("VCL<%d>: ERROR: mmap(0, %lld == 0x%llx, "
-                      "PROT_READ | PROT_WRITE,MAP_SHARED | MAP_ANONYMOUS, "
-                      "-1, 0) failed!",
-                      getpid (), vcl_cfg->heapsize, vcl_cfg->heapsize);
+      VCL_CFG_DBG (0, "VCL<%d>: ERROR: mmap(0, %ld == 0x%lx, "
+                  "PROT_READ | PROT_WRITE,MAP_SHARED | MAP_ANONYMOUS, "
+                  "-1, 0) failed!", getpid (), vcl_cfg->heapsize,
+                  vcl_cfg->heapsize);
       ASSERT (vcl_mem != MAP_FAILED);
       return;
     }
   heap = clib_mem_init_thread_safe (vcl_mem, vcl_cfg->heapsize);
   if (!heap)
     {
-      clib_warning ("VCL<%d>: ERROR: clib_mem_init() failed!", getpid ());
+      fprintf (stderr, "VCL<%d>: ERROR: clib_mem_init() failed!", getpid ());
       ASSERT (heap);
       return;
     }
index aba4839..327a7fc 100644 (file)
@@ -58,6 +58,7 @@ typedef enum
 
 #define SERVER_STATE_OPEN  (STATE_ACCEPT|STATE_CLOSE_ON_EMPTY)
 #define CLIENT_STATE_OPEN  (STATE_CONNECT|STATE_CLOSE_ON_EMPTY)
+#define STATE_OPEN (SERVER_STATE_OPEN | CLIENT_STATE_OPEN)
 
 typedef struct epoll_event vppcom_epoll_event_t;
 
@@ -78,6 +79,15 @@ typedef struct
   ip46_address_t ip46;
 } vppcom_ip46_t;
 
+typedef struct vcl_session_msg /* control message stored by vcl for later handling */
+{
+  u32 next;                    /* NOTE(review): presumably links to another queued msg; not used in the visible code - confirm */
+  union
+  {
+    session_accepted_msg_t accepted_msg;       /* copy of a SESSION_CTRL_EVT_ACCEPTED payload */
+  };
+} vcl_session_msg_t;
+
 enum
 {
   VCL_SESS_ATTR_SERVER,
@@ -130,9 +140,11 @@ typedef struct
   u32 wait_cont_idx;
   vppcom_epoll_t vep;
   int libc_epfd;
-  u64 client_queue_address;
+  svm_msg_q_t *our_evt_q;
+  u32 ct_registration;
   u64 options[16];
   vce_event_handler_reg_t *poll_reg;
+  vcl_session_msg_t *accept_evts_fifo;
 #if VCL_ELOG
   elog_track_t elog_track;
 #endif
@@ -166,6 +178,12 @@ typedef struct vppcom_cfg_t_
 
 void vppcom_cfg (vppcom_cfg_t * vcl_cfg);
 
+typedef struct vcl_cut_through_registration_   /* pairs a cut-through message queue with a session */
+{
+  svm_msg_q_t *mq;             /* message queue given at registration time */
+  u32 sid;                     /* session index given at registration time */
+} vcl_cut_through_registration_t;
+
 typedef struct vppcom_main_t_
 {
   u8 init;
@@ -213,6 +231,12 @@ typedef struct vppcom_main_t_
   /* IO thread */
   vppcom_session_io_thread_t session_io_thread;
 
+  /* pool of ctrl msgs */
+  vcl_session_msg_t *ctrl_evt_pool;
+
+  /** Pool of cut through registrations */
+  vcl_cut_through_registration_t *cut_through_registrations;
+
 #ifdef VCL_ELOG
   /* VPP Event-logger */
   elog_main_t elog_main;
@@ -255,6 +279,40 @@ do {                                                            \
 #define VCL_EVENTS_UNLOCK() \
   clib_spinlock_unlock (&(vcm->event_thread.events_lockp))
 
+#define VCL_INVALID_SESSION_INDEX ((u32)~0)
+
+/* Return the session stored at session_index in the vcm->sessions pool,
+ * or 0 when that pool index is free. */
+static inline vcl_session_t *
+vcl_session_get (u32 session_index)
+{
+  return pool_is_free_index (vcm->sessions, session_index) ?
+    0 : pool_elt_at_index (vcm->sessions, session_index);
+}
+
+/* Return the index of session s within the vcm->sessions pool. */
+static inline u32
+vcl_session_index (vcl_session_t * s)
+{
+  u32 index = (u32) (s - vcm->sessions);
+
+  return index;
+}
+
+/* Return the session registered under the given vpp handle in
+ * vcm->session_index_by_vpp_handles, or 0 when the handle is unknown or the
+ * recorded index no longer refers to an allocated session. */
+static inline vcl_session_t *
+vcl_session_get_w_handle (u64 handle)
+{
+  uword *slot = hash_get (vcm->session_index_by_vpp_handles, handle);
+
+  if (!slot)
+    return 0;
+  return vcl_session_get ((u32) slot[0]);
+}
+
+/* Return the session index registered under the given vpp handle in
+ * vcm->session_index_by_vpp_handles, or VCL_INVALID_SESSION_INDEX when the
+ * handle is not in the table. */
+static inline u32
+vcl_session_get_index_from_handle (u64 handle)
+{
+  uword *slot = hash_get (vcm->session_index_by_vpp_handles, handle);
+
+  if (!slot)
+    return VCL_INVALID_SESSION_INDEX;
+  return slot[0];
+}
+
 static inline int
 vppcom_session_at_index (u32 session_index, vcl_session_t * volatile *sess)
 {
@@ -326,6 +384,8 @@ void vppcom_send_accept_session_reply (u64 handle, u32 context, int retval);
 
 u32 vcl_max_nsid_len (void);
 
+u8 *format_api_error (u8 * s, va_list * args);
+
 #endif /* SRC_VCL_VCL_PRIVATE_H_ */
 
 /*
index 0f3bd2d..bdfb89c 100644 (file)
@@ -102,11 +102,6 @@ vcl_test_write (int fd, uint8_t *buf, uint32_t nbytes,
           nbytes_left = nbytes_left - rv;
           if (stats)
             stats->tx_incomp++;
-          if (verbose)
-            {
-              printf ("SOCK_TEST: WARNING: bytes written (%d) "
-                      "!= bytes to write (%d)!\n", tx_bytes, nbytes);
-            }
         }
      
     } while (tx_bytes != nbytes);
index af3e01d..a34648b 100644 (file)
@@ -166,13 +166,13 @@ echo_test_client ()
                (tsock->stats.stop.tv_nsec == 0)))
            continue;
 
-         if (FD_ISSET (tsock->fd, wfdset) &&
-             (tsock->stats.tx_bytes < ctrl->cfg.total_bytes))
+         if (FD_ISSET (tsock->fd, wfdset)
+             && (tsock->stats.tx_bytes < ctrl->cfg.total_bytes))
 
            {
-             tx_bytes =
-               vcl_test_write (tsock->fd, (uint8_t *) tsock->txbuf, nbytes,
-                               &tsock->stats, ctrl->cfg.verbose);
+             tx_bytes = vcl_test_write (tsock->fd, (uint8_t *) tsock->txbuf,
+                                        nbytes, &tsock->stats,
+                                        ctrl->cfg.verbose);
              if (tx_bytes < 0)
                {
                  fprintf (stderr, "\nCLIENT: ERROR: vcl_test_write(%d) "
@@ -180,16 +180,13 @@ echo_test_client ()
                  return;
                }
 
-             printf ("CLIENT (fd %d): TX (%d bytes) - '%s'\n",
-                     tsock->fd, tx_bytes, tsock->txbuf);
            }
 
          if ((FD_ISSET (tsock->fd, rfdset)) &&
              (tsock->stats.rx_bytes < ctrl->cfg.total_bytes))
            {
-             rx_bytes =
-               vcl_test_read (tsock->fd, (uint8_t *) tsock->rxbuf,
-                              nbytes, &tsock->stats);
+             rx_bytes = vcl_test_read (tsock->fd, (uint8_t *) tsock->rxbuf,
+                                       nbytes, &tsock->stats);
              if (rx_bytes > 0)
                {
                  printf ("CLIENT (fd %d): RX (%d bytes) - '%s'\n",
@@ -338,13 +335,12 @@ stream_test_client (sock_test_t test)
                                    tsock->rxbuf_size, &tsock->stats);
            }
 
-         if (FD_ISSET (tsock->fd, wfdset) &&
-             (tsock->stats.tx_bytes < ctrl->cfg.total_bytes))
+         if (FD_ISSET (tsock->fd, wfdset)
+             && (tsock->stats.tx_bytes < ctrl->cfg.total_bytes))
            {
-             tx_bytes =
-               vcl_test_write (tsock->fd, (uint8_t *) tsock->txbuf,
-                               ctrl->cfg.txbuf_size, &tsock->stats,
-                               ctrl->cfg.verbose);
+             tx_bytes = vcl_test_write (tsock->fd, (uint8_t *) tsock->txbuf,
+                                        ctrl->cfg.txbuf_size, &tsock->stats,
+                                        ctrl->cfg.verbose);
              if (tx_bytes < 0)
                {
                  fprintf (stderr, "\nCLIENT: ERROR: vcl_test_write(%d) "
index 98c36a4..6a2fda0 100644 (file)
@@ -463,7 +463,7 @@ main (int argc, char **argv)
     {
       int num_ev;
       num_ev = vppcom_epoll_wait (ssm->epfd, ssm->wait_events,
-                                 SOCK_SERVER_MAX_EPOLL_EVENTS, 60.0);
+                                 SOCK_SERVER_MAX_EPOLL_EVENTS, 60000.0);
       if (num_ev < 0)
        {
          errno = -num_ev;
index 60c649d..d1c4413 100644 (file)
 #include <vcl/vcl_debug.h>
 #include <vcl/vcl_private.h>
 
+/* Set to 1 by sigsegv_signal (). Written from a signal handler, hence
+ * volatile sig_atomic_t. */
+static volatile sig_atomic_t not_ready;
+
+/* SIGSEGV handler installed by vcl_wait_for_memory (): records the fault. */
+void
+sigsegv_signal (int signum)
+{
+  not_ready = 1;
+}
+
+/*
+ * Wait until the memory at mem can be read.
+ *
+ * The "1 ||" in the first condition currently disables the probing path,
+ * so the function always sleeps for one second and returns.
+ *
+ * The disabled path installs a SIGSEGV handler, reads one byte from mem and
+ * retries after usleep (1) for as long as the handler flagged a fault, then
+ * restores the default SIGSEGV disposition.
+ * NOTE(review): returning from a SIGSEGV handler normally re-executes the
+ * faulting access, so the retry loop may never reach its flag check -
+ * confirm before enabling this path.
+ */
+static void
+vcl_wait_for_memory (void *mem)
+{
+  u8 __clib_unused test;
+  if (1 || vcm->debug)
+    {
+      sleep (1);
+      return;
+    }
+  /* signal () reports failure with SIG_ERR; any other return value is the
+   * previously installed handler and is not an error */
+  if (signal (SIGSEGV, sigsegv_signal) == SIG_ERR)
+    {
+      perror ("signal()");
+      return;
+    }
+  not_ready = 0;
+
+again:
+  /* volatile read so the probe cannot be optimized away */
+  test = *(volatile u8 *) mem;
+  if (not_ready)
+    {
+      not_ready = 0;
+      usleep (1);
+      goto again;
+    }
+
+  signal (SIGSEGV, SIG_DFL);
+}
+
 static const char *
 vppcom_app_state_str (app_state_t state)
 {
@@ -206,6 +242,256 @@ vppcom_wait_for_app_state_change (app_state_t app_state)
   return VPPCOM_ETIMEDOUT;
 }
 
+/* Allocate a cut-through registration from vcm->cut_through_registrations,
+ * store the message queue and session index in it and return its pool
+ * index. */
+static u32
+vcl_ct_registration_add (svm_msg_q_t * mq, u32 sid)
+{
+  vcl_cut_through_registration_t *reg;
+
+  pool_get (vcm->cut_through_registrations, reg);
+  reg->sid = sid;
+  reg->mq = mq;
+  return (reg - vcm->cut_through_registrations);
+}
+
+static void
+vcl_ct_registration_del (u32 ct_index)
+{
+  pool_put_index (vcm->cut_through_registrations, ct_index);   /* return registration ct_index to the pool */
+}
+
+/*
+ * Find the session that owns fifo f.
+ *
+ * The fifo's client_session_index is tried first, then its
+ * master_session_index. A candidate matches when f is its rx fifo
+ * (type == 0) or its tx fifo (type == 1). Returns 0 when neither candidate
+ * matches.
+ */
+static vcl_session_t *
+vcl_ct_session_get_from_fifo (svm_fifo_t * f, u8 type)
+{
+  vcl_session_t *candidate;
+  u32 sid;
+  int pass;
+
+  for (pass = 0; pass < 2; pass++)
+    {
+      /* first pass: client index, second pass: master index */
+      sid = (pass == 0) ? f->client_session_index : f->master_session_index;
+      candidate = vcl_session_get (sid);
+      if (!candidate)
+       continue;
+      if ((type == 0 && candidate->rx_fifo == f)
+         || (type == 1 && candidate->tx_fifo == f))
+       return candidate;
+    }
+  return 0;
+}
+
+/* Allocate a SESSION_CTRL_EVT_ACCEPTED_REPLY control event on mq, fill in
+ * handle, context and retval, and send it. */
+static void
+vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
+                                session_handle_t handle, int retval)
+{
+  app_session_evt_t evt_storage, *ev = &evt_storage;
+  session_accepted_reply_msg_t *reply;
+
+  app_alloc_ctrl_evt_to_vpp (mq, ev, SESSION_CTRL_EVT_ACCEPTED_REPLY);
+  reply = (session_accepted_reply_msg_t *) ev->evt->data;
+  reply->retval = retval;
+  reply->context = context;
+  reply->handle = handle;
+  app_send_ctrl_evt_to_vpp (mq, ev);
+}
+
+/*
+ * Create a vcl session for an accepted-session message.
+ *
+ * Looks up the listener by mp->listener_handle. If it is unknown, a reply
+ * carrying VNET_API_ERROR_INVALID_ARGUMENT is sent on the vpp event queue
+ * named in the message and VCL_INVALID_SESSION_INDEX is returned. Otherwise
+ * a session is allocated, wired to the fifos and event queue(s) from the
+ * message, registered under mp->handle and its index is returned.
+ *
+ * The session lock is taken on entry and released on every return path.
+ */
+static u32
+vcl_session_accepted_handler (session_accepted_msg_t * mp)
+{
+  vcl_session_t *session, *listen_session;
+  svm_fifo_t *rx_fifo, *tx_fifo;
+  u32 session_index;
+
+  VCL_SESSION_LOCK ();
+
+  listen_session = vppcom_session_table_lookup_listener (mp->listener_handle);
+  if (!listen_session)
+    {
+      svm_msg_q_t *evt_q;
+      evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+      clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
+                   "unknown vpp listener handle %llx",
+                   getpid (), mp->listener_handle);
+      vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
+                                      VNET_API_ERROR_INVALID_ARGUMENT);
+      /* do not leave with the session lock held */
+      VCL_SESSION_UNLOCK ();
+      return VCL_INVALID_SESSION_INDEX;
+    }
+
+  pool_get (vcm->sessions, session);
+  memset (session, 0, sizeof (*session));
+  session_index = (u32) (session - vcm->sessions);
+
+  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+
+  /* A non-zero server event queue address selects the branch that uses
+   * per-session queues and a cut-through registration */
+  if (mp->server_event_queue_address)
+    {
+      session->vpp_evt_q = uword_to_pointer (mp->client_event_queue_address,
+                                            svm_msg_q_t *);
+      session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
+                                            svm_msg_q_t *);
+      vcl_wait_for_memory (session->vpp_evt_q);
+      session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
+                                                         session_index);
+      rx_fifo->master_session_index = session_index;
+      tx_fifo->master_session_index = session_index;
+    }
+  else
+    {
+      session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+                                            svm_msg_q_t *);
+      rx_fifo->client_session_index = session_index;
+      tx_fifo->client_session_index = session_index;
+    }
+
+  session->vpp_handle = mp->handle;
+  session->client_context = mp->context;
+  session->rx_fifo = rx_fifo;
+  session->tx_fifo = tx_fifo;
+
+  session->session_state = STATE_ACCEPT;
+  session->transport.rmt_port = mp->port;
+  session->transport.is_ip4 = mp->is_ip4;
+  clib_memcpy (&session->transport.rmt_ip, mp->ip, sizeof (ip46_address_t));
+
+  hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
+  /* The local endpoint is copied from the listener */
+  session->transport.lcl_port = listen_session->transport.lcl_port;
+  session->transport.lcl_ip = listen_session->transport.lcl_ip;
+
+  VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
+       " address %U port %d queue %p!", getpid (), mp->handle, session_index,
+       mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
+       mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
+       clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
+  vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
+
+  VCL_SESSION_UNLOCK ();
+  return session_index;
+}
+
+/*
+ * Handle a session-connected message.
+ *
+ * The session is looked up by mp->context. On a non-zero mp->retval the
+ * failure is logged and, if the session exists, it is marked STATE_FAILED.
+ * On success the session is wired to the fifos and event queue(s) from the
+ * message, moved to STATE_CONNECT and registered under mp->handle.
+ *
+ * Returns mp->context (the session index) in every case.
+ *
+ * NOTE(review): the "done" label is presumably the jump target of
+ * VCL_SESSION_LOCK_AND_GET on lookup failure; if that macro releases the
+ * lock before jumping, the done_unlock path unlocks a second time - confirm
+ * against the macro definition.
+ */
+static u32
+vcl_session_connected_handler (session_connected_msg_t * mp)
+{
+  vcl_session_t *session = 0;
+  u32 session_index;
+  svm_fifo_t *rx_fifo, *tx_fifo;
+  int rv = VPPCOM_OK;
+
+  session_index = mp->context;
+  VCL_SESSION_LOCK_AND_GET (session_index, &session);
+done:
+  if (mp->retval)
+    {
+      clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
+                   "connect failed! %U",
+                   getpid (), mp->handle, session_index,
+                   format_api_error, ntohl (mp->retval));
+      if (session)
+       {
+         session->session_state = STATE_FAILED;
+         session->vpp_handle = mp->handle;
+       }
+      else
+       {
+         /* pid is an integer (was "[%s]") and every conversion now has a
+          * matching argument */
+         clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
+                       "Invalid session index (%u)!",
+                       getpid (), mp->handle, session_index, session_index);
+       }
+      goto done_unlock;
+    }
+
+  if (rv)
+    goto done_unlock;
+
+  /* A non-zero client event queue address selects the branch that uses
+   * per-session queues and a cut-through registration */
+  if (mp->client_event_queue_address)
+    {
+      session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
+                                            svm_msg_q_t *);
+      session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
+                                            svm_msg_q_t *);
+      vcl_wait_for_memory (session->vpp_evt_q);
+      session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
+                                                         session_index);
+    }
+  else
+    session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+                                          svm_msg_q_t *);
+
+  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+  rx_fifo->client_session_index = session_index;
+  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+  tx_fifo->client_session_index = session_index;
+
+  session->rx_fifo = rx_fifo;
+  session->tx_fifo = tx_fifo;
+  session->vpp_handle = mp->handle;
+  session->transport.is_ip4 = mp->is_ip4;
+  clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
+              sizeof (session->transport.lcl_ip));
+  session->transport.lcl_port = mp->lcl_port;
+  session->session_state = STATE_CONNECT;
+
+  /* Add it to lookup table */
+  hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
+
+  VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: connect succeeded! "
+       "session_rx_fifo %p, refcnt %d, session_tx_fifo %p, refcnt %d",
+       getpid (), mp->handle, session_index, session->rx_fifo,
+       session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
+done_unlock:
+  VCL_SESSION_UNLOCK ();
+  return session_index;
+}
+
+/*
+ * Dispatch one event taken from a message queue.
+ *
+ *  - FIFO_EVENT_APP_RX: only logged as unhandled.
+ *  - SESSION_CTRL_EVT_ACCEPTED: the message is copied onto the listener's
+ *    accept_evts_fifo; dropped with a warning if the listener is unknown.
+ *  - SESSION_CTRL_EVT_CONNECTED: passed to vcl_session_connected_handler ().
+ *  - SESSION_CTRL_EVT_DISCONNECTED: the session found via the vpp handle is
+ *    moved to STATE_DISCONNECT; an unknown handle is logged and ignored.
+ *  - anything else: logged as unhandled.
+ *
+ * @param e    event to handle
+ * @return     always VPPCOM_OK
+ */
+int
+vcl_handle_mq_ctrl_event (session_event_t * e)
+{
+  session_accepted_msg_t *accepted_msg;
+  session_disconnected_msg_t *disconnected_msg;
+  vcl_session_msg_t *vcl_msg;
+  vcl_session_t *session;
+  u64 handle;
+  u32 sid;
+
+  switch (e->event_type)
+    {
+    case FIFO_EVENT_APP_RX:
+      clib_warning ("unhandled rx: sid %u (0x%x)",
+                   e->fifo->client_session_index,
+                   e->fifo->client_session_index);
+      break;
+    case SESSION_CTRL_EVT_ACCEPTED:
+      accepted_msg = (session_accepted_msg_t *) e->data;
+      handle = accepted_msg->listener_handle;
+      session = vppcom_session_table_lookup_listener (handle);
+      if (!session)
+       {
+         clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+                       "listener handle %llx", getpid (), handle);
+         break;
+       }
+
+      clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+      vcl_msg->accepted_msg = *accepted_msg;
+      break;
+    case SESSION_CTRL_EVT_CONNECTED:
+      vcl_session_connected_handler ((session_connected_msg_t *) e->data);
+      break;
+    case SESSION_CTRL_EVT_DISCONNECTED:
+      disconnected_msg = (session_disconnected_msg_t *) e->data;
+      sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
+      session = vcl_session_get (sid);
+      /* The handle may not map to a live session: the lookup then yields
+       * no session and there is nothing to update */
+      if (!session)
+       {
+         clib_warning ("VCL<%d>: ERROR: disconnect for unknown vpp handle "
+                       "0x%llx", getpid (), disconnected_msg->handle);
+         break;
+       }
+      session->session_state = STATE_DISCONNECT;
+      VDBG (0, "disconnected %u", sid);
+      break;
+    default:
+      clib_warning ("unhandled %u", e->event_type);
+      break;
+    }
+  return VPPCOM_OK;
+}
+
 static inline int
 vppcom_wait_for_session_state_change (u32 session_index,
                                      session_state_t state,
@@ -213,6 +499,8 @@ vppcom_wait_for_session_state_change (u32 session_index,
 {
   f64 timeout = clib_time_now (&vcm->clib_time) + wait_for_time;
   vcl_session_t *volatile session;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
   int rv;
 
   do
@@ -234,8 +522,13 @@ vppcom_wait_for_session_state_change (u32 session_index,
          VCL_SESSION_UNLOCK ();
          return VPPCOM_ECONNREFUSED;
        }
-
       VCL_SESSION_UNLOCK ();
+
+      if (svm_msg_q_sub (vcm->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
+       continue;
+      e = svm_msg_q_msg_data (vcm->app_event_queue, &msg);
+      vcl_handle_mq_ctrl_event (e);
+      svm_msg_q_free_msg (vcm->app_event_queue, &msg);
     }
   while (clib_time_now (&vcm->clib_time) < timeout);
 
@@ -334,25 +627,17 @@ vppcom_session_disconnect (u32 session_index)
       goto done;
     }
 
-  /* The peer has already initiated the close,
-   * so send the disconnect session reply.
-   */
   if (state & STATE_CLOSE_ON_EMPTY)
     {
-      //XXX alagalah - Check and drain here?
-      vppcom_send_disconnect_session_reply (vpp_handle,
-                                           session_index, 0 /* rv */ );
+      vppcom_send_disconnect_session_reply (vpp_handle, session_index,
+                                           0 /* rv */ );
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
            "REPLY...", getpid (), vpp_handle, session_index);
     }
-
-  /* Otherwise, send a disconnect session msg...
-   */
   else
     {
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
            getpid (), vpp_handle, session_index);
-
       vppcom_send_disconnect_session (vpp_handle, session_index);
     }
 
@@ -564,7 +849,6 @@ vppcom_session_close (uint32_t session_index)
                  getpid (), vpp_handle, session_index,
                  rv, vppcom_retval_str (rv));
        }
-
       else if (state & (CLIENT_STATE_OPEN | SERVER_STATE_OPEN))
        {
          rv = vppcom_session_disconnect (session_index);
@@ -577,6 +861,9 @@ vppcom_session_close (uint32_t session_index)
     }
 
   VCL_SESSION_LOCK_AND_GET (session_index, &session);
+  if (session->our_evt_q)
+    vcl_ct_registration_del (session->ct_registration);
+
   vpp_handle = session->vpp_handle;
   if (vpp_handle != ~0)
     {
@@ -625,8 +912,12 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
     }
 
   session->transport.is_ip4 = ep->is_ip4;
-  session->transport.lcl_ip = to_ip46 (ep->is_ip4 ? IP46_TYPE_IP4 :
-                                      IP46_TYPE_IP6, ep->ip);
+  if (ep->is_ip4)
+    clib_memcpy (&session->transport.lcl_ip.ip4, ep->ip,
+                sizeof (ip4_address_t));
+  else
+    clib_memcpy (&session->transport.lcl_ip.ip6, ep->ip,
+                sizeof (ip6_address_t));
   session->transport.lcl_port = ep->port;
 
   VDBG (0, "VCL<%d>: sid %u: binding to local %s address %U port %u, "
@@ -678,9 +969,9 @@ vppcom_session_listen (uint32_t listen_session_index, uint32_t q_len)
 
   vppcom_send_bind_sock (listen_session, listen_session_index);
   VCL_SESSION_UNLOCK ();
-  retval =
-    vppcom_wait_for_session_state_change (listen_session_index, STATE_LISTEN,
-                                         vcm->cfg.session_timeout);
+  retval = vppcom_wait_for_session_state_change (listen_session_index,
+                                                STATE_LISTEN,
+                                                vcm->cfg.session_timeout);
 
   VCL_SESSION_LOCK_AND_GET (listen_session_index, &listen_session);
   if (PREDICT_FALSE (retval))
@@ -693,10 +984,6 @@ vppcom_session_listen (uint32_t listen_session_index, uint32_t q_len)
       goto done;
     }
 
-  VCL_ACCEPT_FIFO_LOCK ();
-  clib_fifo_validate (vcm->client_session_index_fifo, q_len);
-  VCL_ACCEPT_FIFO_UNLOCK ();
-
   VCL_SESSION_UNLOCK ();
 
 done:
@@ -732,101 +1019,67 @@ int
 vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
                       uint32_t flags)
 {
+  session_accepted_msg_t accepted_msg;
   vcl_session_t *listen_session = 0;
   vcl_session_t *client_session = 0;
   u32 client_session_index = ~0;
-  int rv;
+  svm_msg_q_t *vpp_evt_q;
+  vcl_session_msg_t *evt;
   u64 listen_vpp_handle;
-  vce_event_handler_reg_t *reg;
-  vce_event_t *ev;
-  vce_event_connect_request_t *result;
-  struct timespec ts;
-  struct timeval tv;
-  int millisecond_timeout = 1;
-  int hours_timeout = 20 * 60 * 60;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
+  u8 is_nonblocking;
+  int rv;
 
   VCL_SESSION_LOCK_AND_GET (listen_session_index, &listen_session);
-  listen_vpp_handle = listen_session->vpp_handle;      // For debugging
 
-  rv = validate_args_session_accept_ (listen_session);
-  if (rv)
+  if ((rv = validate_args_session_accept_ (listen_session)))
     {
       VCL_SESSION_UNLOCK ();
       goto done;
     }
 
-  /* Using an aggressive timer of 1ms and a generous timer of
-   * 20 hours, we can implement a blocking and non-blocking listener
-   * as both event and time driven */
-  gettimeofday (&tv, NULL);
-  ts.tv_nsec = (tv.tv_usec * 1000) + (1000 * millisecond_timeout);
-  ts.tv_sec = tv.tv_sec;
-
-  /* Predict that the Listener is blocking more often than not */
-  if (PREDICT_TRUE (!VCL_SESS_ATTR_TEST (listen_session->attr,
-                                        VCL_SESS_ATTR_NONBLOCK)))
-    ts.tv_sec += hours_timeout;
-
   VCL_SESSION_UNLOCK ();
 
-  /* Register handler for connect_request event on listen_session_index */
-  vce_event_key_t evk;
-  evk.session_index = listen_session_index;
-  evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
-  reg = vce_register_handler (&vcm->event_thread, &evk,
-                             vce_connect_request_handler_fn, 0);
-  VCL_EVENTS_LOCK ();
-  ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
-  pthread_mutex_lock (&reg->handler_lock);
-  while (!ev)
-    {
-      VCL_EVENTS_UNLOCK ();
-      rv = pthread_cond_timedwait (&reg->handler_cond,
-                                  &reg->handler_lock, &ts);
-      if (rv == ETIMEDOUT)
-       {
-         rv = VPPCOM_EAGAIN;
-         goto cleanup;
-       }
-      VCL_EVENTS_LOCK ();
-      ev = vce_get_event_from_index (&vcm->event_thread, reg->ev_idx);
-    }
-  result = vce_get_event_data (ev, sizeof (*result));
-  client_session_index = result->accepted_session_index;
-  VCL_EVENTS_UNLOCK ();
-
-  /* Remove from the FIFO used to service epoll */
-  VCL_ACCEPT_FIFO_LOCK ();
-  if (clib_fifo_elts (vcm->client_session_index_fifo))
-    {
-      u32 tmp_client_session_index;
-      clib_fifo_sub1 (vcm->client_session_index_fifo,
-                     tmp_client_session_index);
-      /* It wasn't ours... put it back ... */
-      if (tmp_client_session_index != client_session_index)
-       clib_fifo_add1 (vcm->client_session_index_fifo,
-                       tmp_client_session_index);
-    }
-  VCL_ACCEPT_FIFO_UNLOCK ();
+  if (clib_fifo_elts (listen_session->accept_evts_fifo))
+    {
+      clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
+      accepted_msg = evt->accepted_msg;
+      goto handle;
+    }
 
-  VCL_SESSION_LOCK ();
+  is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
+                                      VCL_SESS_ATTR_NONBLOCK);
+  if (svm_msg_q_is_empty (vcm->app_event_queue) && is_nonblocking)
+    return VPPCOM_EAGAIN;
 
-  rv = vppcom_session_at_index (client_session_index, &client_session);
-  if (PREDICT_FALSE (rv))
+  while (1)
     {
-      rv = VPPCOM_ECONNABORTED;
-      clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: client sid %u "
-                   "lookup failed! returning %d (%s)", getpid (),
-                   listen_vpp_handle, listen_session_index,
-                   client_session_index, rv, vppcom_retval_str (rv));
-      goto cleanup;
+      if (svm_msg_q_sub (vcm->app_event_queue, &msg, SVM_Q_WAIT, 0))
+       return VPPCOM_EAGAIN;
+
+      e = svm_msg_q_msg_data (vcm->app_event_queue, &msg);
+      if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
+       {
+         clib_warning ("discarded event: %u", e->event_type);
+         svm_msg_q_free_msg (vcm->app_event_queue, &msg);
+         continue;
+       }
+      clib_memcpy (&accepted_msg, e->data, sizeof (accepted_msg));
+      svm_msg_q_free_msg (vcm->app_event_queue, &msg);
+      break;
     }
 
+handle:
+
+  client_session_index = vcl_session_accepted_handler (&accepted_msg);
+  VCL_SESSION_LOCK_AND_GET (client_session_index, &client_session);
+  rv = client_session_index;
+
   if (flags & O_NONBLOCK)
     VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
-  else
-    VCL_SESS_ATTR_CLR (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
 
+  listen_vpp_handle = listen_session->vpp_handle;
   VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: Got a client request! "
        "vpp handle 0x%llx, sid %u, flags %d, is_nonblocking %u",
        getpid (), listen_vpp_handle, listen_session_index,
@@ -846,46 +1099,31 @@ vppcom_session_accept (uint32_t listen_session_index, vppcom_endpt_t * ep,
                     sizeof (ip6_address_t));
     }
 
-  vppcom_send_accept_session_reply (client_session->vpp_handle,
-                                   client_session->client_context,
-                                   0 /* retval OK */ );
+  if (accepted_msg.server_event_queue_address)
+    vpp_evt_q = uword_to_pointer (accepted_msg.vpp_event_queue_address,
+                                 svm_msg_q_t *);
+  else
+    vpp_evt_q = client_session->vpp_evt_q;
+  vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
+                                  client_session->vpp_handle, 0);
 
-  VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: accepted vpp handle 0x%llx,"
-       " sid %u connection from peer %s address %U port %u to local %s address"
-       " %U port %u",
-       getpid (), listen_vpp_handle,
+  VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: accepted vpp handle 0x%llx, "
+       "sid %u connection from peer %s address %U port %u to local %s "
+       "address %U port %u", getpid (), listen_vpp_handle,
        listen_session_index, client_session->vpp_handle,
        client_session_index,
        client_session->transport.is_ip4 ? "IPv4" : "IPv6",
        format_ip46_address, &client_session->transport.rmt_ip,
-       client_session->transport.is_ip4 ?
-       IP46_TYPE_IP4 : IP46_TYPE_IP6,
+       client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
        clib_net_to_host_u16 (client_session->transport.rmt_port),
        client_session->transport.is_ip4 ? "IPv4" : "IPv6",
        format_ip46_address, &client_session->transport.lcl_ip,
-       client_session->transport.is_ip4 ?
-       IP46_TYPE_IP4 : IP46_TYPE_IP6,
+       client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
        clib_net_to_host_u16 (client_session->transport.lcl_port));
   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
           client_session_index);
   VCL_SESSION_UNLOCK ();
 
-  rv = (int) client_session_index;
-  vce_clear_event (&vcm->event_thread, reg->ev_idx);
-  if (vcm->session_io_thread.io_sessions_lockp)
-    {
-      /* Throw this new accepted session index into the rx poll thread pool */
-      VCL_IO_SESSIONS_LOCK ();
-      u32 *active_session_index;
-      pool_get (vcm->session_io_thread.active_session_indexes,
-               active_session_index);
-      *active_session_index = client_session_index;
-      VCL_IO_SESSIONS_UNLOCK ();
-    }
-cleanup:
-  vce_unregister_handler (&vcm->event_thread, reg);
-  pthread_mutex_unlock (&reg->handler_lock);
-
 done:
   return rv;
 }
@@ -947,9 +1185,8 @@ vppcom_session_connect (uint32_t session_index, vppcom_endpt_t * server_ep)
   vppcom_send_connect_sock (session, session_index);
   VCL_SESSION_UNLOCK ();
 
-  retval =
-    vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
-                                         vcm->cfg.session_timeout);
+  retval = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
+                                                vcm->cfg.session_timeout);
 
   VCL_SESSION_LOCK_AND_GET (session_index, &session);
   vpp_handle = session->vpp_handle;
@@ -978,29 +1215,32 @@ done:
   return rv;
 }
 
+static u8
+vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
+{
+  if (!is_ct)
+    return (e->event_type == FIFO_EVENT_APP_RX
+           && e->fifo->client_session_index == sid);
+  else
+    return (e->event_type == SESSION_IO_EVT_CT_TX);
+}
+
 static inline int
 vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
                              u8 peek)
 {
+  int n_read = 0, rv, is_nonblocking;
   vcl_session_t *session = 0;
   svm_fifo_t *rx_fifo;
-  int n_read = 0;
-  int rv;
-  int is_nonblocking;
-
-  u64 vpp_handle;
-  u32 poll_et;
-  session_state_t state;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
+  svm_msg_q_t *mq;
+  u8 is_full;
 
   ASSERT (buf);
 
   VCL_SESSION_LOCK_AND_GET (session_index, &session);
 
-  is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
-  rx_fifo = session->rx_fifo;
-  state = session->session_state;
-  vpp_handle = session->vpp_handle;
-
   if (PREDICT_FALSE (session->is_vep))
     {
       VCL_SESSION_UNLOCK ();
@@ -1010,72 +1250,84 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
       goto done;
     }
 
-  if (PREDICT_FALSE (!(state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN))))
+  is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
+  rx_fifo = session->rx_fifo;
+
+  if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
     {
+      session_state_t state = session->session_state;
       VCL_SESSION_UNLOCK ();
       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
 
       VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
            "state 0x%x (%s), returning %d (%s)",
-           getpid (), vpp_handle, session_index, state,
+           getpid (), session->vpp_handle, session_index, state,
            vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
       goto done;
     }
 
   VCL_SESSION_UNLOCK ();
+  mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+  is_full = svm_fifo_is_full (rx_fifo);
 
-  do
+  if (svm_fifo_is_empty (rx_fifo))
     {
-      if (peek)
-       n_read = svm_fifo_peek (rx_fifo, 0, n, buf);
-      else
-       n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf);
-    }
-  while (!is_nonblocking && (n_read <= 0));
-
-  if (n_read <= 0)
-    {
-      VCL_SESSION_LOCK_AND_GET (session_index, &session);
-
-      poll_et = (((EPOLLET | EPOLLIN) & session->vep.ev.events) ==
-                (EPOLLET | EPOLLIN));
-      if (poll_et)
-       session->vep.et_mask |= EPOLLIN;
-
-      if (state & STATE_CLOSE_ON_EMPTY)
+      svm_fifo_unset_event (rx_fifo);
+      if (is_nonblocking)
        {
-         rv = VPPCOM_ECONNRESET;
-
-         VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: Empty fifo with "
-               "session state 0x%x (%s)! Setting state to 0x%x (%s), "
-               "returning %d (%s)",
-               getpid (), session->vpp_handle, session_index,
-               state, vppcom_session_state_str (state),
-               STATE_DISCONNECT,
-               vppcom_session_state_str (STATE_DISCONNECT), rv,
-               vppcom_retval_str (rv));
-
-         session->session_state = STATE_DISCONNECT;
+         rv = VPPCOM_OK;
+         goto done;
+       }
+      svm_msg_q_lock (mq);
+      while (1)
+       {
+         if (svm_msg_q_is_empty (mq))
+           svm_msg_q_wait (mq);
+         svm_msg_q_sub_w_lock (mq, &msg);
+         e = svm_msg_q_msg_data (mq, &msg);
+         if (!vcl_is_rx_evt_for_session (e, session_index,
+                                         session->our_evt_q != 0))
+           {
+             vcl_handle_mq_ctrl_event (e);
+             svm_msg_q_free_msg (mq, &msg);
+             continue;
+           }
+         if (svm_fifo_is_empty (rx_fifo))
+           {
+             svm_msg_q_free_msg (mq, &msg);
+             continue;
+           }
+         svm_msg_q_free_msg (mq, &msg);
+         svm_msg_q_unlock (mq);
+         break;
        }
-      else
-       rv = VPPCOM_EAGAIN;
-
-      VCL_SESSION_UNLOCK ();
     }
+
+  if (peek)
+    n_read = svm_fifo_peek (rx_fifo, 0, n, buf);
   else
-    rv = n_read;
+    n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf);
+  ASSERT (n_read > 0);
+  svm_fifo_unset_event (rx_fifo);
+
+  if (session->our_evt_q && is_full)
+    app_send_io_evt_to_vpp (session->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
+                           SVM_Q_WAIT);
+
 
   if (VPPCOM_DEBUG > 2)
     {
-      if (rv > 0)
+      if (n_read > 0)
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
-                     "from (%p)", getpid (), vpp_handle,
+                     "from (%p)", getpid (), session->vpp_handle,
                      session_index, n_read, rx_fifo);
       else
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
-                     "returning %d (%s)", getpid (), vpp_handle,
+                     "returning %d (%s)", getpid (), session->vpp_handle,
                      session_index, rv, vppcom_retval_str (rv));
     }
+  return n_read;
+
 done:
   return rv;
 }
@@ -1093,100 +1345,55 @@ vppcom_session_peek (uint32_t session_index, void *buf, int n)
 }
 
 static inline int
-vppcom_session_read_ready (vcl_session_t * session, u32 session_index)
+vppcom_session_read_ready (vcl_session_t * session)
 {
-  int ready = 0;
-  u32 poll_et;
-  int rv;
-  session_state_t state = session->session_state;
-  u64 vpp_handle = session->vpp_handle;
-
   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
   if (PREDICT_FALSE (session->is_vep))
     {
       clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
-                   "epoll session!", getpid (), session_index);
-      rv = VPPCOM_EBADFD;
-      goto done;
-    }
-
-  if (session->session_state & STATE_LISTEN)
-    {
-      VCL_ACCEPT_FIFO_LOCK ();
-      ready = clib_fifo_elts (vcm->client_session_index_fifo);
-      VCL_ACCEPT_FIFO_UNLOCK ();
-    }
-  else
-    {
-      if (!(state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN | STATE_LISTEN)))
-       {
-         rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET :
-               VPPCOM_ENOTCONN);
-
-         VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
-               " state 0x%x (%s), returning %d (%s)",
-               getpid (), vpp_handle, session_index,
-               state, vppcom_session_state_str (state),
-               rv, vppcom_retval_str (rv));
-         goto done;
-       }
-
-      ready = svm_fifo_max_dequeue (session->rx_fifo);
+                   "epoll session!", getpid (), vcl_session_index (session));
+      return VPPCOM_EBADFD;
     }
 
-  if (ready == 0)
+  if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
     {
-      poll_et =
-       ((EPOLLET | EPOLLIN) & session->vep.ev.events) == (EPOLLET | EPOLLIN);
-      if (poll_et)
-       session->vep.et_mask |= EPOLLIN;
+      session_state_t state = session->session_state;
+      int rv;
 
-      if (state & STATE_CLOSE_ON_EMPTY)
-       {
-         rv = VPPCOM_ECONNRESET;
+      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
 
-         VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: Empty fifo with "
-               "session state 0x%x (%s)! Setting state to 0x%x (%s), "
-               "returning %d (%s)",
-               getpid (), session_index, vpp_handle,
-               state, vppcom_session_state_str (state),
-               STATE_DISCONNECT,
-               vppcom_session_state_str (STATE_DISCONNECT), rv,
-               vppcom_retval_str (rv));
-         session->session_state = STATE_DISCONNECT;
-         goto done;
-       }
+      VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
+           " state 0x%x (%s), returning %d (%s)", getpid (),
+           session->vpp_handle, vcl_session_index (session), state,
+           vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
+      return rv;
     }
-  rv = ready;
 
-  if (!svm_msg_q_is_empty (vcm->app_event_queue) &&
-      !pthread_mutex_trylock (&vcm->app_event_queue->q->mutex))
-    {
-      u32 i, n_to_dequeue = vcm->app_event_queue->q->cursize;
-      svm_msg_q_msg_t msg;
+  if (session->session_state & STATE_LISTEN)
+    return clib_fifo_elts (session->accept_evts_fifo);
 
-      for (i = 0; i < n_to_dequeue; i++)
-       {
-         svm_queue_sub_raw (vcm->app_event_queue->q, (u8 *) & msg);
-         svm_msg_q_free_msg (vcm->app_event_queue, &msg);
-       }
+  return svm_fifo_max_dequeue (session->rx_fifo);
+}
 
-      pthread_mutex_unlock (&vcm->app_event_queue->q->mutex);
-    }
-done:
-  return rv;
+static u8
+vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
+{
+  if (!is_ct)
+    return (e->event_type == FIFO_EVENT_APP_TX
+           && e->fifo->client_session_index == sid);
+  else
+    return (e->event_type == SESSION_IO_EVT_CT_RX);
 }
 
 int
 vppcom_session_write (uint32_t session_index, void *buf, size_t n)
 {
+  int rv, n_write, is_nonblocking;
   vcl_session_t *session = 0;
   svm_fifo_t *tx_fifo = 0;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
   svm_msg_q_t *mq;
-  session_state_t state;
-  int rv, n_write, is_nonblocking;
-  u32 poll_et;
-  u64 vpp_handle;
 
   ASSERT (buf);
 
@@ -1194,105 +1401,98 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
 
   tx_fifo = session->tx_fifo;
   is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
-  vpp_handle = session->vpp_handle;
-  state = session->session_state;
 
   if (PREDICT_FALSE (session->is_vep))
     {
       VCL_SESSION_UNLOCK ();
       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
                    "cannot write to an epoll session!",
-                   getpid (), vpp_handle, session_index);
+                   getpid (), session->vpp_handle, session_index);
 
       rv = VPPCOM_EBADFD;
       goto done;
     }
 
-  if (!(session->session_state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN)))
+  if (!(session->session_state & STATE_OPEN))
     {
-      rv =
-       ((session->session_state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET :
-        VPPCOM_ENOTCONN);
-
+      session_state_t state = session->session_state;
+      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
       VCL_SESSION_UNLOCK ();
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
            "state 0x%x (%s)",
-           getpid (), vpp_handle, session_index,
+           getpid (), session->vpp_handle, session_index,
            state, vppcom_session_state_str (state));
       goto done;
     }
 
   VCL_SESSION_UNLOCK ();
 
-  do
+  mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+  if (svm_fifo_is_full (tx_fifo))
     {
-      n_write = svm_fifo_enqueue_nowait (tx_fifo, n, (void *) buf);
+      if (is_nonblocking)
+       {
+         rv = VPPCOM_EWOULDBLOCK;
+         goto done;
+       }
+      svm_msg_q_lock (mq);
+      while (1)
+       {
+         if (!svm_fifo_is_full (tx_fifo))
+           {
+             svm_msg_q_unlock (mq);
+             break;
+           }
+         if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
+           continue;
+         svm_msg_q_sub_w_lock (mq, &msg);
+         e = svm_msg_q_msg_data (mq, &msg);
+         if (!vcl_is_tx_evt_for_session (e, session_index,
+                                         session->our_evt_q != 0))
+           {
+             vcl_handle_mq_ctrl_event (e);
+             svm_msg_q_free_msg (mq, &msg);
+             continue;
+           }
+         if (svm_fifo_is_full (tx_fifo))
+           {
+             svm_msg_q_free_msg (mq, &msg);
+             continue;
+           }
+         svm_msg_q_free_msg (mq, &msg);
+         svm_msg_q_unlock (mq);
+         break;
+       }
     }
-  while (!is_nonblocking && (n_write <= 0));
 
-  /* If event wasn't set, add one
-   *
-   * To reduce context switching, can check if an
-   * event is already there for this event_key, but for now
-   * this will suffice. */
+  n_write = svm_fifo_enqueue_nowait (tx_fifo, n, (void *) buf);
+  ASSERT (n_write > 0);
 
-  if ((n_write > 0) && svm_fifo_set_event (tx_fifo))
+  if (svm_fifo_set_event (tx_fifo))
     {
-      /* Send TX event to vpp */
+      session_evt_type_t et;
       VCL_SESSION_LOCK_AND_GET (session_index, &session);
-      mq = session->vpp_evt_q;
-      ASSERT (mq);
-      app_send_io_evt_to_vpp (mq, tx_fifo, FIFO_EVENT_APP_TX, SVM_Q_WAIT);
+      et = session->our_evt_q ? SESSION_IO_EVT_CT_TX : FIFO_EVENT_APP_TX;
+      app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo, et, SVM_Q_WAIT);
       VCL_SESSION_UNLOCK ();
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX "
            "to vpp_event_q %p, n_write %d", getpid (),
-           vpp_handle, session_index, mq, n_write);
-    }
-
-  if (n_write <= 0)
-    {
-      VCL_SESSION_LOCK_AND_GET (session_index, &session);
-
-      poll_et = (((EPOLLET | EPOLLOUT) & session->vep.ev.events) ==
-                (EPOLLET | EPOLLOUT));
-      if (poll_et)
-       session->vep.et_mask |= EPOLLOUT;
-
-      if (session->session_state & STATE_CLOSE_ON_EMPTY)
-       {
-         rv = VPPCOM_ECONNRESET;
-
-         VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: Empty fifo with "
-               "session state 0x%x (%s)! Setting state to 0x%x (%s), "
-               "returning %d (%s)",
-               getpid (), session->vpp_handle, session_index,
-               session->session_state,
-               vppcom_session_state_str (session->session_state),
-               STATE_DISCONNECT,
-               vppcom_session_state_str (STATE_DISCONNECT), rv,
-               vppcom_retval_str (rv));
-
-         session->session_state = STATE_DISCONNECT;
-       }
-      else
-       rv = VPPCOM_EAGAIN;
-
-      VCL_SESSION_UNLOCK ();
+           session->vpp_handle, session_index, session->vpp_evt_q, n_write);
     }
-  else
-    rv = n_write;
 
   if (VPPCOM_DEBUG > 2)
     {
       if (n_write <= 0)
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
-                     "FIFO-FULL (%p)", getpid (), vpp_handle,
+                     "FIFO-FULL (%p)", getpid (), session->vpp_handle,
                      session_index, tx_fifo);
       else
        clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
                      "wrote %d bytes tx-fifo: (%p)", getpid (),
-                     vpp_handle, session_index, n_write, tx_fifo);
+                     session->vpp_handle, session_index, n_write, tx_fifo);
     }
+  return n_write;
+
 done:
   return rv;
 }
@@ -1300,20 +1500,13 @@ done:
 static inline int
 vppcom_session_write_ready (vcl_session_t * session, u32 session_index)
 {
-  int ready;
-  u32 poll_et;
-  int rv;
-
-  ASSERT (session);
-
   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
   if (PREDICT_FALSE (session->is_vep))
     {
       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
                    "cannot write to an epoll session!",
                    getpid (), session->vpp_handle, session_index);
-      rv = VPPCOM_EBADFD;
-      goto done;
+      return VPPCOM_EBADFD;
     }
 
   if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
@@ -1321,58 +1514,168 @@ vppcom_session_write_ready (vcl_session_t * session, u32 session_index)
       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
                    "cannot write to a listen session!",
                    getpid (), session->vpp_handle, session_index);
-      rv = VPPCOM_EBADFD;
-      goto done;
+      return VPPCOM_EBADFD;
     }
 
-  if (!(session->session_state & (SERVER_STATE_OPEN | CLIENT_STATE_OPEN)))
+  if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
     {
       session_state_t state = session->session_state;
+      int rv;
 
       rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
                    "session is not open! state 0x%x (%s), "
                    "returning %d (%s)", getpid (), session->vpp_handle,
                    session_index,
                    state, vppcom_session_state_str (state),
                    rv, vppcom_retval_str (rv));
-      goto done;
+      return rv;
     }
 
-  ready = svm_fifo_max_enqueue (session->tx_fifo);
-
   VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
        getpid (), session->vpp_handle, session_index, session->tx_fifo,
-       ready);
+       svm_fifo_max_enqueue (session->tx_fifo));
 
-  if (ready == 0)
+  return svm_fifo_max_enqueue (session->tx_fifo);
+}
+
+static int
+vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
+                     unsigned long *read_map, unsigned long *write_map,
+                     unsigned long *except_map, double time_to_wait,
+                     u32 * bits_set)
+{
+  session_disconnected_msg_t *disconnected_msg;
+  session_accepted_msg_t *accepted_msg;
+  vcl_session_msg_t *vcl_msg;
+  vcl_session_t *session;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
+  u32 n_msgs, i, sid;
+  u64 handle;
+
+  svm_msg_q_lock (mq);
+  if (svm_msg_q_is_empty (mq))
     {
-      poll_et = (((EPOLLET | EPOLLOUT) & session->vep.ev.events) ==
-                (EPOLLET | EPOLLOUT));
-      if (poll_et)
-       session->vep.et_mask |= EPOLLOUT;
+      if (*bits_set)
+       {
+         svm_msg_q_unlock (mq);
+         return 0;
+       }
 
-      if (session->session_state & STATE_CLOSE_ON_EMPTY)
+      if (!time_to_wait)
+       {
+         svm_msg_q_unlock (mq);
+         return 0;
+       }
+      else if (time_to_wait < 0)
        {
-         rv = VPPCOM_ECONNRESET;
+         svm_msg_q_wait (mq);
+       }
+      else
+       {
+         if (svm_msg_q_timedwait (mq, time_to_wait))
+           {
+             svm_msg_q_unlock (mq);
+             return 0;
+           }
+       }
+    }
+  svm_msg_q_unlock (mq);
 
-         VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: Empty fifo with "
-               "session state 0x%x (%s)! Setting state to 0x%x (%s), "
-               "returning %d (%s)", getpid (),
-               session->vpp_handle, session_index,
-               session->session_state,
-               vppcom_session_state_str (session->session_state),
-               STATE_DISCONNECT,
-               vppcom_session_state_str (STATE_DISCONNECT), rv,
-               vppcom_retval_str (rv));
-         session->session_state = STATE_DISCONNECT;
-         goto done;
+  n_msgs = svm_msg_q_size (mq);
+  for (i = 0; i < n_msgs; i++)
+    {
+      if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
+       {
+         clib_warning ("message queue returned");
+         continue;
+       }
+      e = svm_msg_q_msg_data (mq, &msg);
+      switch (e->event_type)
+       {
+       case FIFO_EVENT_APP_RX:
+         sid = e->fifo->client_session_index;
+         session = vcl_session_get (sid);
+         if (!session || svm_fifo_is_empty (session->rx_fifo))
+           break;
+         if (sid < n_bits && read_map)
+           {
+             clib_bitmap_set_no_check (read_map, sid, 1);
+             *bits_set += 1;
+           }
+         break;
+       case FIFO_EVENT_APP_TX:
+         sid = e->fifo->client_session_index;
+         session = vcl_session_get (sid);
+         if (!session || svm_fifo_is_full (session->tx_fifo))
+           break;
+         if (sid < n_bits && write_map)
+           {
+             clib_bitmap_set_no_check (write_map, sid, 1);
+             *bits_set += 1;
+           }
+         break;
+       case SESSION_IO_EVT_CT_TX:
+         session = vcl_ct_session_get_from_fifo (e->fifo, 0);
+         sid = vcl_session_index (session);
+         if (!session || svm_fifo_is_empty (session->rx_fifo))
+           break;
+         if (sid < n_bits && read_map)
+           {
+             clib_bitmap_set_no_check (read_map, sid, 1);
+             *bits_set += 1;
+           }
+         break;
+         break;
+       case SESSION_IO_EVT_CT_RX:
+         session = vcl_ct_session_get_from_fifo (e->fifo, 1);
+         sid = vcl_session_index (session);
+         if (!session || svm_fifo_is_full (session->tx_fifo))
+           break;
+         if (sid < n_bits && write_map)
+           {
+             clib_bitmap_set_no_check (write_map, sid, 1);
+             *bits_set += 1;
+           }
+         break;
+       case SESSION_CTRL_EVT_ACCEPTED:
+         accepted_msg = (session_accepted_msg_t *) e->data;
+         handle = accepted_msg->listener_handle;
+         session = vppcom_session_table_lookup_listener (handle);
+         if (!session)
+           {
+             clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+                           "listener handle %llx", getpid (), handle);
+             break;
+           }
+
+         clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+         vcl_msg->accepted_msg = *accepted_msg;
+         sid = session - vcm->sessions;
+         if (sid < n_bits && read_map)
+           {
+             clib_bitmap_set_no_check (read_map, sid, 1);
+             *bits_set += 1;
+           }
+         break;
+       case SESSION_CTRL_EVT_DISCONNECTED:
+         disconnected_msg = (session_disconnected_msg_t *) e->data;
+         sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
+         if (sid < n_bits && except_map)
+           {
+             clib_bitmap_set_no_check (except_map, sid, 1);
+             *bits_set += 1;
+           }
+         break;
+       default:
+         clib_warning ("unhandled: %u", e->event_type);
+         break;
        }
+      svm_msg_q_free_msg (mq, &msg);
     }
-  rv = ready;
-done:
-  return rv;
+
+  return *bits_set;
 }
 
 int
@@ -1380,11 +1683,11 @@ vppcom_select (unsigned long n_bits, unsigned long *read_map,
               unsigned long *write_map, unsigned long *except_map,
               double time_to_wait)
 {
-  u32 session_index;
+  u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
+  vcl_cut_through_registration_t *cr;
+  double total_wait = 0, wait_slice;
   vcl_session_t *session = 0;
-  int rv, bits_set = 0;
-  f64 timeout = clib_time_now (&vcm->clib_time) + time_to_wait;
-  u32 minbits = clib_max (n_bits, BITS (uword));
+  int rv;
 
   ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
 
@@ -1412,124 +1715,74 @@ vppcom_select (unsigned long n_bits, unsigned long *read_map,
              vec_len (vcm->ex_bitmap) * sizeof (clib_bitmap_t));
     }
 
+  if (!n_bits)
+    return 0;
+
+  if (!write_map)
+    goto check_rd;
+
+  /* *INDENT-OFF* */
+  clib_bitmap_foreach (sid, vcm->wr_bitmap, ({
+    VCL_SESSION_LOCK();
+    if (!(session = vcl_session_get (sid)))
+      {
+        VCL_SESSION_UNLOCK();
+        VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
+              getpid (), sid);
+        return VPPCOM_EBADFD;
+      }
+
+    rv = svm_fifo_is_full (session->tx_fifo);
+    VCL_SESSION_UNLOCK();
+    if (!rv)
+      {
+        clib_bitmap_set_no_check (write_map, sid, 1);
+        bits_set++;
+      }
+  }));
+
+check_rd:
+  if (!read_map)
+    goto check_mq;
+  clib_bitmap_foreach (sid, vcm->rd_bitmap, ({
+    VCL_SESSION_LOCK();
+    if (!(session = vcl_session_get (sid)))
+      {
+        VCL_SESSION_UNLOCK();
+        VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
+              getpid (), sid);
+        return VPPCOM_EBADFD;
+      }
+
+    rv = vppcom_session_read_ready (session);
+    VCL_SESSION_UNLOCK();
+    if (rv)
+      {
+        clib_bitmap_set_no_check (read_map, sid, 1);
+        bits_set++;
+      }
+  }));
+  /* *INDENT-ON* */
+
+check_mq:
+  wait_slice = vcm->cut_through_registrations ? 10e-6 : time_to_wait;
   do
     {
       /* *INDENT-OFF* */
-      if (n_bits)
-        {
-          if (read_map)
-            {
-              clib_bitmap_foreach (session_index, vcm->rd_bitmap,
-                ({
-                  VCL_SESSION_LOCK();
-                  rv = vppcom_session_at_index (session_index, &session);
-                  if (rv < 0)
-                    {
-                      VCL_SESSION_UNLOCK();
-                      VDBG (1, "VCL<%d>: session %d specified in read_map is"
-                         " closed.", getpid (),
-                                      session_index);
-                      bits_set = VPPCOM_EBADFD;
-                      goto select_done;
-                    }
-                  if (session->session_state & STATE_LISTEN)
-                    {
-                      vce_event_handler_reg_t *reg = 0;
-                      vce_event_key_t evk;
-
-                      /* Check if handler already registered for this
-                       * event.
-                       * If not, register handler for connect_request event
-                       * on listen_session_index
-                       */
-                      evk.session_index = session_index;
-                      evk.eid = VCL_EVENT_CONNECT_REQ_ACCEPTED;
-                      reg = vce_get_event_handler (&vcm->event_thread, &evk);
-                      if (!reg)
-                        reg = vce_register_handler (&vcm->event_thread, &evk,
-                                    vce_poll_wait_connect_request_handler_fn,
-                                                   0 /* No callback args */);
-                      rv = vppcom_session_read_ready (session, session_index);
-                      if (rv > 0)
-                        {
-                          vce_unregister_handler (&vcm->event_thread, reg);
-                        }
-                    }
-                  else
-                    rv = vppcom_session_read_ready (session, session_index);
-                  VCL_SESSION_UNLOCK();
-                  if (except_map && vcm->ex_bitmap &&
-                      clib_bitmap_get (vcm->ex_bitmap, session_index) &&
-                      (rv < 0))
-                    {
-                      clib_bitmap_set_no_check (except_map, session_index, 1);
-                      bits_set++;
-                    }
-                  else if (rv > 0)
-                    {
-                      clib_bitmap_set_no_check (read_map, session_index, 1);
-                      bits_set++;
-                    }
-                }));
-            }
-
-          if (write_map)
-            {
-              clib_bitmap_foreach (session_index, vcm->wr_bitmap,
-                ({
-                  VCL_SESSION_LOCK();
-                  rv = vppcom_session_at_index (session_index, &session);
-                  if (rv < 0)
-                    {
-                      VCL_SESSION_UNLOCK();
-                      VDBG (0, "VCL<%d>: session %d specified in "
-                                      "write_map is closed.", getpid (),
-                                      session_index);
-                      bits_set = VPPCOM_EBADFD;
-                      goto select_done;
-                    }
-
-                  rv = vppcom_session_write_ready (session, session_index);
-                  VCL_SESSION_UNLOCK();
-                  if (write_map && (rv > 0))
-                    {
-                      clib_bitmap_set_no_check (write_map, session_index, 1);
-                      bits_set++;
-                    }
-                }));
-            }
-
-          if (except_map)
-            {
-              clib_bitmap_foreach (session_index, vcm->ex_bitmap,
-                ({
-                  VCL_SESSION_LOCK();
-                  rv = vppcom_session_at_index (session_index, &session);
-                  if (rv < 0)
-                    {
-                      VCL_SESSION_UNLOCK();
-                      VDBG (1, "VCL<%d>: session %d specified in except_map "
-                         "is closed.", getpid (),
-                                      session_index);
-                      bits_set = VPPCOM_EBADFD;
-                      goto select_done;
-                    }
-
-                  rv = vppcom_session_read_ready (session, session_index);
-                  VCL_SESSION_UNLOCK();
-                  if (rv < 0)
-                    {
-                      clib_bitmap_set_no_check (except_map, session_index, 1);
-                      bits_set++;
-                    }
-                }));
-            }
-        }
+      pool_foreach (cr, vcm->cut_through_registrations, ({
+       vcl_select_handle_mq (cr->mq, n_bits, read_map, write_map, except_map,
+                             0, &bits_set);
+      }));
       /* *INDENT-ON* */
+
+      vcl_select_handle_mq (vcm->app_event_queue, n_bits, read_map, write_map,
+                           except_map, time_to_wait, &bits_set);
+      total_wait += wait_slice;
+      if (bits_set)
+       return bits_set;
     }
-  while ((time_to_wait == -1) || (clib_time_now (&vcm->clib_time) < timeout));
+  while (total_wait < time_to_wait);
 
-select_done:
   return (bits_set);
 }
 
@@ -1857,193 +2110,230 @@ done:
   return rv;
 }
 
-int
-vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events,
-                  int maxevents, double wait_for_time)
+/**
+ * Drain one message queue and translate its messages into epoll events.
+ *
+ * Events are appended to @c events starting at index @c *num_ev, and
+ * @c *num_ev is advanced for each event added. Processing stops once
+ * @c *num_ev reaches @c maxevents.
+ *
+ * @param mq            message queue to read session events from
+ * @param events        caller-provided array of @c maxevents entries
+ * @param maxevents     capacity of @c events
+ * @param wait_for_time applies only if the queue is empty: 0 returns
+ *                      immediately, a negative value blocks until the queue
+ *                      is signaled, a positive value does a timed wait.
+ *                      NOTE(review): the timed wait passes
+ *                      wait_for_time / 1e3, which looks like a ms to s
+ *                      conversion - confirm the unit callers use.
+ * @param num_ev        in/out count of events already stored in @c events
+ * @return 0 if the queue was empty and the wait was skipped or timed out,
+ *         otherwise the updated @c *num_ev
+ */
+static int
+vcl_epoll_wait_handle_mq (svm_msg_q_t * mq, struct epoll_event *events,
+                         u32 maxevents, double wait_for_time, u32 * num_ev)
 {
-  vcl_session_t *vep_session;
-  int rv;
-  f64 timeout = clib_time_now (&vcm->clib_time) + wait_for_time;
-  u32 keep_trying = 1;
-  int num_ev = 0;
-  u32 vep_next_sid, wait_cont_idx;
-  u8 is_vep;
-
-  if (PREDICT_FALSE (maxevents <= 0))
-    {
-      clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
-                   getpid (), maxevents);
-      return VPPCOM_EINVAL;
-    }
-  memset (events, 0, sizeof (*events) * maxevents);
-
-  VCL_SESSION_LOCK_AND_GET (vep_idx, &vep_session);
-  vep_next_sid = vep_session->vep.next_sid;
-  is_vep = vep_session->is_vep;
-  wait_cont_idx = vep_session->wait_cont_idx;
-  VCL_SESSION_UNLOCK ();
+  session_disconnected_msg_t *disconnected_msg;
+  session_connected_msg_t *connected_msg;
+  session_accepted_msg_t *accepted_msg;
+  u32 sid = ~0, session_events, n_msgs;
+  u64 session_evt_data = ~0, handle;
+  vcl_session_msg_t *vcl_msg;
+  vcl_session_t *session;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
+  u8 add_event;
+  u32 i;
 
-  if (PREDICT_FALSE (!is_vep))
-    {
-      clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
-                   getpid (), vep_idx);
-      rv = VPPCOM_EINVAL;
-      goto done;
-    }
-  if (PREDICT_FALSE (vep_next_sid == ~0))
+  /* The caller invokes this once per queue with a shared events[] array.
+   * If an earlier queue already filled it, leave the messages queued
+   * instead of writing past the end of events[]. */
+  if (*num_ev >= maxevents)
+    return *num_ev;
+
+  svm_msg_q_lock (mq);
+  if (svm_msg_q_is_empty (mq))
     {
-      VDBG (1, "VCL<%d>: WARNING: vep_idx (%u) is empty!",
-           getpid (), vep_idx);
-      goto done;
+      if (!wait_for_time)
+       {
+         svm_msg_q_unlock (mq);
+         return 0;
+       }
+      else if (wait_for_time < 0)
+       {
+         svm_msg_q_wait (mq);
+       }
+      else
+       {
+         if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
+           {
+             svm_msg_q_unlock (mq);
+             return 0;
+           }
+       }
     }
+  svm_msg_q_unlock (mq);
 
-  do
+  n_msgs = svm_msg_q_size (mq);
+  for (i = 0; i < n_msgs; i++)
     {
-      u32 sid;
-      u32 next_sid = ~0;
-      vcl_session_t *session;
-
-      for (sid = (wait_cont_idx == ~0) ? vep_next_sid : wait_cont_idx;
-          sid != ~0; sid = next_sid)
+      if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
        {
-         u32 session_events, et_mask, clear_et_mask, session_vep_idx;
-         u8 add_event, is_vep_session;
-         int ready;
-         u64 session_ev_data;
-
-         VCL_SESSION_LOCK_AND_GET (sid, &session);
-         next_sid = session->vep.next_sid;
+         clib_warning ("message queue returned");
+         continue;
+       }
+      e = svm_msg_q_msg_data (mq, &msg);
+      add_event = 0;
+      switch (e->event_type)
+       {
+       case FIFO_EVENT_APP_RX:
+         sid = e->fifo->client_session_index;
+         clib_spinlock_lock (&vcm->sessions_lockp);
+         session = vcl_session_get (sid);
+         /* Session may be gone by the time its event is dequeued */
+         if (!session)
+           {
+             clib_spinlock_unlock (&vcm->sessions_lockp);
+             break;
+           }
          session_events = session->vep.ev.events;
-         et_mask = session->vep.et_mask;
-         is_vep = session->is_vep;
-         is_vep_session = session->is_vep_session;
-         session_vep_idx = session->vep.vep_idx;
-         session_ev_data = session->vep.ev.data.u64;
-
-         VCL_SESSION_UNLOCK ();
-
-         if (PREDICT_FALSE (is_vep))
+         if ((EPOLLIN & session->vep.ev.events)
+             && !svm_fifo_is_empty (session->rx_fifo))
            {
-             VDBG (0, "VCL<%d>: ERROR: sid (%u) is a vep!",
-                   getpid (), vep_idx);
-             rv = VPPCOM_EINVAL;
-             goto done;
+             add_event = 1;
+             events[*num_ev].events |= EPOLLIN;
+             session_evt_data = session->vep.ev.data.u64;
            }
-         if (PREDICT_FALSE (!is_vep_session))
+         clib_spinlock_unlock (&vcm->sessions_lockp);
+         break;
+       case FIFO_EVENT_APP_TX:
+         sid = e->fifo->client_session_index;
+         clib_spinlock_lock (&vcm->sessions_lockp);
+         session = vcl_session_get (sid);
+         if (!session)
+           {
+             clib_spinlock_unlock (&vcm->sessions_lockp);
+             break;
+           }
+         session_events = session->vep.ev.events;
+         if ((EPOLLOUT & session_events)
+             && !svm_fifo_is_full (session->tx_fifo))
            {
-             VDBG (0, "VCL<%d>: ERROR: session (%u) is not "
-                   "a vep session!", getpid (), sid);
-             rv = VPPCOM_EINVAL;
-             goto done;
+             add_event = 1;
+             events[*num_ev].events |= EPOLLOUT;
+             session_evt_data = session->vep.ev.data.u64;
            }
-         if (PREDICT_FALSE (session_vep_idx != vep_idx))
+         clib_spinlock_unlock (&vcm->sessions_lockp);
+         break;
+       case SESSION_IO_EVT_CT_TX:
+         session = vcl_ct_session_get_from_fifo (e->fifo, 0);
+         if (!session)
+           break;
+         sid = vcl_session_index (session);
+         session_events = session->vep.ev.events;
+         if ((EPOLLIN & session->vep.ev.events)
+             && !svm_fifo_is_empty (session->rx_fifo))
            {
-             clib_warning ("VCL<%d>: ERROR: session (%u) "
-                           "vep_idx (%u) != vep_idx (%u)!",
-                           getpid (), sid, session_vep_idx, vep_idx);
-             rv = VPPCOM_EINVAL;
-             goto done;
+             add_event = 1;
+             events[*num_ev].events |= EPOLLIN;
+             session_evt_data = session->vep.ev.data.u64;
            }
-
-         add_event = clear_et_mask = 0;
-
-         if (EPOLLIN & session_events)
+         break;
+       case SESSION_IO_EVT_CT_RX:
+         session = vcl_ct_session_get_from_fifo (e->fifo, 1);
+         if (!session)
+           break;
+         sid = vcl_session_index (session);
+         session_events = session->vep.ev.events;
+         if ((EPOLLOUT & session_events)
+             && !svm_fifo_is_full (session->tx_fifo))
            {
-             VCL_SESSION_LOCK_AND_GET (sid, &session);
-             ready = vppcom_session_read_ready (session, sid);
-             VCL_SESSION_UNLOCK ();
-             if ((ready > 0) && (EPOLLIN & et_mask))
-               {
-                 add_event = 1;
-                 events[num_ev].events |= EPOLLIN;
-                 if (((EPOLLET | EPOLLIN) & session_events) ==
-                     (EPOLLET | EPOLLIN))
-                   clear_et_mask |= EPOLLIN;
-               }
-             else if (ready < 0)
-               {
-                 add_event = 1;
-                 switch (ready)
-                   {
-                   case VPPCOM_ECONNRESET:
-                     events[num_ev].events |= EPOLLHUP | EPOLLRDHUP;
-                     break;
-
-                   default:
-                     events[num_ev].events |= EPOLLERR;
-                     break;
-                   }
-               }
+             add_event = 1;
+             events[*num_ev].events |= EPOLLOUT;
+             session_evt_data = session->vep.ev.data.u64;
            }
-
-         if (EPOLLOUT & session_events)
+         break;
+       case SESSION_CTRL_EVT_ACCEPTED:
+         accepted_msg = (session_accepted_msg_t *) e->data;
+         handle = accepted_msg->listener_handle;
+         session = vppcom_session_table_lookup_listener (handle);
+         if (!session)
            {
-             VCL_SESSION_LOCK_AND_GET (sid, &session);
-             ready = vppcom_session_write_ready (session, sid);
-             VCL_SESSION_UNLOCK ();
-             if ((ready > 0) && (EPOLLOUT & et_mask))
-               {
-                 add_event = 1;
-                 events[num_ev].events |= EPOLLOUT;
-                 if (((EPOLLET | EPOLLOUT) & session_events) ==
-                     (EPOLLET | EPOLLOUT))
-                   clear_et_mask |= EPOLLOUT;
-               }
-             else if (ready < 0)
-               {
-                 add_event = 1;
-                 switch (ready)
-                   {
-                   case VPPCOM_ECONNRESET:
-                     events[num_ev].events |= EPOLLHUP;
-                     break;
-
-                   default:
-                     events[num_ev].events |= EPOLLERR;
-                     break;
-                   }
-               }
+             clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+                           "listener handle %llx", getpid (), handle);
+             break;
            }
 
-         if (add_event)
+         clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+         vcl_msg->accepted_msg = *accepted_msg;
+         /* Record the listener's index: the EPOLLONESHOT handling below
+          * looks the session up by sid */
+         sid = vcl_session_index (session);
+         session_events = session->vep.ev.events;
+         if (!(EPOLLIN & session_events))
+           break;
+
+         add_event = 1;
+         events[*num_ev].events |= EPOLLIN;
+         session_evt_data = session->vep.ev.data.u64;
+         break;
+       case SESSION_CTRL_EVT_CONNECTED:
+         connected_msg = (session_connected_msg_t *) e->data;
+         vcl_session_connected_handler (connected_msg);
+         /* Generate EPOLLOUT because there's no connected event */
+         sid = vcl_session_get_index_from_handle (connected_msg->handle);
+         clib_spinlock_lock (&vcm->sessions_lockp);
+         session = vcl_session_get (sid);
+         if (!session)
+           {
+             clib_spinlock_unlock (&vcm->sessions_lockp);
+             break;
+           }
+         session_events = session->vep.ev.events;
+         if (EPOLLOUT & session_events)
            {
-             events[num_ev].data.u64 = session_ev_data;
-             if (EPOLLONESHOT & session_events)
-               {
-                 VCL_SESSION_LOCK_AND_GET (sid, &session);
-                 session->vep.ev.events = 0;
-                 VCL_SESSION_UNLOCK ();
-               }
-             num_ev++;
-             if (num_ev == maxevents)
-               {
-                 VCL_SESSION_LOCK_AND_GET (vep_idx, &vep_session);
-                 vep_session->wait_cont_idx = next_sid;
-                 VCL_SESSION_UNLOCK ();
-                 goto done;
-               }
+             add_event = 1;
+             events[*num_ev].events |= EPOLLOUT;
+             session_evt_data = session->vep.ev.data.u64;
            }
-         if (wait_cont_idx != ~0)
+         clib_spinlock_unlock (&vcm->sessions_lockp);
+         break;
+       case SESSION_CTRL_EVT_DISCONNECTED:
+         disconnected_msg = (session_disconnected_msg_t *) e->data;
+         sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
+         clib_spinlock_lock (&vcm->sessions_lockp);
+         session = vcl_session_get (sid);
+         if (!session)
+           {
+             clib_spinlock_unlock (&vcm->sessions_lockp);
+             break;
+           }
+         add_event = 1;
+         events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
+         session_evt_data = session->vep.ev.data.u64;
+         session_events = session->vep.ev.events;
+         clib_spinlock_unlock (&vcm->sessions_lockp);
+         break;
+       default:
+         clib_warning ("unhandled: %u", e->event_type);
+         svm_msg_q_free_msg (mq, &msg);
+         continue;
+       }
+
+      svm_msg_q_free_msg (mq, &msg);
+
+      if (add_event)
+       {
+         events[*num_ev].data.u64 = session_evt_data;
+         if (EPOLLONESHOT & session_events)
            {
-             if (next_sid == ~0)
-               next_sid = vep_next_sid;
-             else if (next_sid == wait_cont_idx)
-               next_sid = ~0;
+             /* One-shot: disarm the session until it is re-armed */
+             clib_spinlock_lock (&vcm->sessions_lockp);
+             session = vcl_session_get (sid);
+             if (session)
+               session->vep.ev.events = 0;
+             clib_spinlock_unlock (&vcm->sessions_lockp);
            }
+         *num_ev += 1;
+         if (*num_ev == maxevents)
+           break;
        }
-      if (wait_for_time != -1)
-       keep_trying = (clib_time_now (&vcm->clib_time) <= timeout) ? 1 : 0;
     }
-  while ((num_ev == 0) && keep_trying);
+  return *num_ev;
+}
 
-  if (wait_cont_idx != ~0)
+/**
+ * Wait for events on the sessions attached to an epoll session (vep).
+ *
+ * Polls every cut-through registration queue without blocking, then the
+ * application event queue. When cut-through registrations exist the wait
+ * is done in short slices, since it is not possible to block on several
+ * message queues at once.
+ *
+ * @param vep_idx       index of the epoll session
+ * @param events        array of at least @c maxevents entries, zeroed here
+ * @param maxevents     capacity of @c events, must be > 0
+ * @param wait_for_time how long to wait for events; 0 polls once, a
+ *                      negative value waits until at least one event is
+ *                      available. NOTE(review): unit of positive values is
+ *                      not established by this function - confirm.
+ * @return number of events stored in @c events, VPPCOM_EINVAL if
+ *         @c maxevents is invalid or @c vep_idx is not a vep, or
+ *         VPPCOM_EBADFD if @c vep_idx does not map to a session
+ */
+int
+vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events,
+                  int maxevents, double wait_for_time)
+{
+  vcl_cut_through_registration_t *cr;
+  vcl_session_t *vep_session;
+  double total_wait = 0, wait_slice;
+  u32 num_ev = 0;
+
+  if (PREDICT_FALSE (maxevents <= 0))
     {
-      VCL_SESSION_LOCK_AND_GET (vep_idx, &vep_session);
-      vep_session->wait_cont_idx = ~0;
-      VCL_SESSION_UNLOCK ();
+      clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
+                   getpid (), maxevents);
+      return VPPCOM_EINVAL;
     }
-done:
-  return (rv != VPPCOM_OK) ? rv : num_ev;
+
+  clib_spinlock_lock (&vcm->sessions_lockp);
+  vep_session = vcl_session_get (vep_idx);
+  if (PREDICT_FALSE (!vep_session))
+    {
+      clib_spinlock_unlock (&vcm->sessions_lockp);
+      return VPPCOM_EBADFD;
+    }
+  if (PREDICT_FALSE (!vep_session->is_vep))
+    {
+      clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
+                   getpid (), vep_idx);
+      clib_spinlock_unlock (&vcm->sessions_lockp);
+      return VPPCOM_EINVAL;
+    }
+  clib_spinlock_unlock (&vcm->sessions_lockp);
+
+  memset (events, 0, sizeof (*events) * maxevents);
+  wait_slice = vcm->cut_through_registrations ? 10e-6 : wait_for_time;
+
+  do
+    {
+      /* *INDENT-OFF* */
+      pool_foreach (cr, vcm->cut_through_registrations, ({
+        /* Stop draining once events[] is full to avoid overrunning it */
+        if (num_ev < (u32) maxevents)
+          vcl_epoll_wait_handle_mq (cr->mq, events, maxevents, 0, &num_ev);
+      }));
+      /* *INDENT-ON* */
+
+      if (num_ev < (u32) maxevents)
+       vcl_epoll_wait_handle_mq (vcm->app_event_queue, events, maxevents,
+                                 num_ev ? 0 : wait_slice, &num_ev);
+      total_wait += wait_slice;
+      if (num_ev)
+       return num_ev;
+    }
+  /* A negative wait_for_time means wait until an event shows up */
+  while (wait_for_time < 0 || total_wait < wait_for_time);
+
+  return num_ev;
 }
 
 int
@@ -2062,7 +2352,7 @@ vppcom_session_attr (uint32_t session_index, uint32_t op,
   switch (op)
     {
     case VPPCOM_ATTR_GET_NREAD:
-      rv = vppcom_session_read_ready (session, session_index);
+      rv = vppcom_session_read_ready (session);
       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d",
            getpid (), rv);
       break;
@@ -2678,7 +2968,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
          if (POLLIN & vp[i].events)
            {
              VCL_SESSION_LOCK_AND_GET (vp[i].sid, &session);
-             rv = vppcom_session_read_ready (session, vp[i].sid);
+             rv = vppcom_session_read_ready (session);
              VCL_SESSION_UNLOCK ();
              if (rv > 0)
                {
index 4a78b93..8d414f6 100644 (file)
@@ -87,6 +87,7 @@ typedef enum
 {
   VPPCOM_OK = 0,
   VPPCOM_EAGAIN = -EAGAIN,
+  VPPCOM_EWOULDBLOCK = -EWOULDBLOCK,
   VPPCOM_EFAULT = -EFAULT,
   VPPCOM_ENOMEM = -ENOMEM,
   VPPCOM_EINVAL = -EINVAL,
index 5f18bd2..806e390 100644 (file)
@@ -849,12 +849,11 @@ app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
       return 0;
     }
 
-  /* Built-in app? Hand event to the callback... */
   if (app->cb_fns.builtin_app_rx_callback)
     return app->cb_fns.builtin_app_rx_callback (s);
 
-  /* If no need for event, return */
-  if (!svm_fifo_set_event (s->server_rx_fifo))
+  if (svm_fifo_has_event (s->server_rx_fifo)
+      || svm_fifo_is_empty (s->server_rx_fifo))
     return 0;
 
   mq = app->event_queue;
@@ -876,7 +875,10 @@ app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
   evt->fifo = s->server_rx_fifo;
   evt->event_type = FIFO_EVENT_APP_RX;
 
-  return app_enqueue_evt (mq, &msg, lock);
+  if (app_enqueue_evt (mq, &msg, lock))
+    return -1;
+  svm_fifo_set_event (s->server_rx_fifo);
+  return 0;
 }
 
 static inline int
@@ -1081,6 +1083,7 @@ application_local_session_connect (u32 table_index, application_t * client,
 {
   u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
   segment_manager_properties_t *props, *cprops;
+  u32 round_rx_fifo_sz, round_tx_fifo_sz;
   int rv, has_transport, seg_index;
   svm_fifo_segment_private_t *seg;
   segment_manager_t *sm;
@@ -1093,7 +1096,9 @@ application_local_session_connect (u32 table_index, application_t * client,
   cprops = application_segment_manager_properties (client);
   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
   evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
-  seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
+  round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
+  round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
+  seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
 
   has_transport = session_has_transport ((stream_session_t *) ll);
   if (!has_transport)
index 9d82a18..c8fa37f 100644 (file)
@@ -598,7 +598,7 @@ vnet_bind (vnet_bind_args_t * a)
 {
   int rv;
   if ((rv = vnet_bind_i (a->app_index, &a->sep, &a->handle)))
-    return clib_error_return_code (0, rv, 0, "bind failed");
+    return clib_error_return_code (0, rv, 0, "bind failed: %d", rv);
   return 0;
 }
 
@@ -607,7 +607,7 @@ vnet_unbind (vnet_unbind_args_t * a)
 {
   int rv;
   if ((rv = vnet_unbind_i (a->app_index, a->handle)))
-    return clib_error_return_code (0, rv, 0, "unbind failed");
+    return clib_error_return_code (0, rv, 0, "unbind failed: %d", rv);
   return 0;
 }
 
@@ -618,7 +618,7 @@ vnet_connect (vnet_connect_args_t * a)
   int rv;
 
   if ((rv = application_connect (a->app_index, a->api_context, sep)))
-    return clib_error_return_code (0, rv, 0, "connect failed");
+    return clib_error_return_code (0, rv, 0, "connect failed: %d", rv);
   return 0;
 }
 
index 0aabd38..ffe2a64 100644 (file)
@@ -207,17 +207,18 @@ typedef struct session_accepted_msg_
   u64 server_tx_fifo;
   u64 vpp_event_queue_address;
   u64 server_event_queue_address;
+  u64 client_event_queue_address;
   u16 port;
   u8 is_ip4;
   u8 ip[16];
-} session_accepted_msg_t;
+} __clib_packed session_accepted_msg_t;
 
 typedef struct session_accepted_reply_msg_
 {
   u32 context;
   i32 retval;
   u64 handle;
-} session_accepted_reply_msg_t;
+} __clib_packed session_accepted_reply_msg_t;
 
 /* Make sure this is not too large, otherwise it won't fit when dequeued in
  * the session queue node */
@@ -232,34 +233,35 @@ typedef struct session_connected_msg_
   u64 server_tx_fifo;
   u64 vpp_event_queue_address;
   u64 client_event_queue_address;
+  u64 server_event_queue_address;
   u32 segment_size;
   u8 segment_name_length;
   u8 segment_name[64];
   u8 lcl_ip[16];
   u8 is_ip4;
   u16 lcl_port;
-} session_connected_msg_t;
+} __clib_packed session_connected_msg_t;
 
 typedef struct session_disconnected_msg_
 {
   u32 client_index;
   u32 context;
   u64 handle;
-} session_disconnected_msg_t;
+} __clib_packed session_disconnected_msg_t;
 
 typedef struct session_disconnected_reply_msg_
 {
   u32 context;
   i32 retval;
   u64 handle;
-} session_disconnected_reply_msg_t;
+} __clib_packed session_disconnected_reply_msg_t;
 
 typedef struct session_reset_msg_
 {
   u32 client_index;
   u32 context;
   u64 handle;
-} session_reset_msg_t;
+} __clib_packed session_reset_msg_t;
 
 typedef struct session_reset_reply_msg_
 {
@@ -267,13 +269,13 @@ typedef struct session_reset_reply_msg_
   u32 context;
   i32 retval;
   u64 handle;
-} session_reset_reply_msg_t;
+} __clib_packed session_reset_reply_msg_t;
 
 typedef struct app_session_event_
 {
   svm_msg_q_msg_t msg;
   session_event_t *evt;
-} app_session_evt_t;
+} __clib_packed app_session_evt_t;
 
 static inline void
 app_alloc_ctrl_evt_to_vpp (svm_msg_q_t * mq, app_session_evt_t * app_evt,
@@ -337,12 +339,9 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
   else
     {
       svm_msg_q_lock (mq);
+      while (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))
+       svm_msg_q_wait (mq);
       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
-      while (svm_msg_q_msg_is_invalid (&msg))
-       {
-         svm_msg_q_wait (mq);
-         msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
-       }
       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->fifo = f;
       evt->event_type = evt_type;
index 897cb1a..56f885b 100644 (file)
@@ -753,9 +753,10 @@ stream_session_disconnect_notify (transport_connection_t * tc)
   stream_session_t *s;
 
   s = session_get (tc->s_index, tc->thread_index);
-  server = application_get (s->app_index);
-  server->cb_fns.session_disconnect_callback (s);
   s->session_state = SESSION_STATE_CLOSING;
+  server = application_get_if_valid (s->app_index);
+  if (server)
+    server->cb_fns.session_disconnect_callback (s);
 }
 
 /**
index 1917616..99546cb 100644 (file)
@@ -37,6 +37,8 @@ typedef enum
   FIFO_EVENT_DISCONNECT,
   FIFO_EVENT_BUILTIN_RX,
   FIFO_EVENT_RPC,
+  SESSION_IO_EVT_CT_TX,
+  SESSION_IO_EVT_CT_RX,
   SESSION_CTRL_EVT_ACCEPTED,
   SESSION_CTRL_EVT_ACCEPTED_REPLY,
   SESSION_CTRL_EVT_CONNECTED,
index 724aff1..8585b57 100755 (executable)
@@ -435,7 +435,9 @@ mq_send_session_accepted_cb (stream_session_t * s)
        }
       mp->handle = application_local_session_handle (ls);
       mp->port = ls->port;
-      mp->vpp_event_queue_address = ls->client_evt_q;
+      vpp_queue = session_manager_get_vpp_event_queue (0);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+      mp->client_event_queue_address = ls->client_evt_q;
       mp->server_event_queue_address = ls->server_evt_q;
     }
   svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
@@ -541,8 +543,10 @@ mq_send_session_connected_cb (u32 app_index, u32 api_context,
       local_session_t *ls = (local_session_t *) s;
       mp->handle = application_local_session_handle (ls);
       mp->lcl_port = ls->port;
-      mp->vpp_event_queue_address = ls->server_evt_q;
+      vpp_mq = session_manager_get_vpp_event_queue (0);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
       mp->client_event_queue_address = ls->client_evt_q;
+      mp->server_event_queue_address = ls->server_evt_q;
       mp->server_rx_fifo = pointer_to_uword (s->server_tx_fifo);
       mp->server_tx_fifo = pointer_to_uword (s->server_rx_fifo);
     }
index 06d98ae..3588bbc 100755 (executable)
@@ -26,14 +26,16 @@ format_stream_session_fifos (u8 * s, va_list * args)
   if (!ss->server_rx_fifo || !ss->server_tx_fifo)
     return s;
 
-  s = format (s, " Rx fifo: %U", format_svm_fifo, ss->server_rx_fifo, 1);
+  s = format (s, " Rx fifo: %U", format_svm_fifo, ss->server_rx_fifo,
+             verbose);
   if (verbose > 2 && ss->server_rx_fifo->has_event)
     {
       found = session_node_lookup_fifo_event (ss->server_rx_fifo, e);
       s = format (s, " session node event: %s\n",
                  found ? "found" : "not found");
     }
-  s = format (s, " Tx fifo: %U", format_svm_fifo, ss->server_tx_fifo, 1);
+  s = format (s, " Tx fifo: %U", format_svm_fifo, ss->server_tx_fifo,
+             verbose);
   if (verbose > 2 && ss->server_tx_fifo->has_event)
     {
       found = session_node_lookup_fifo_event (ss->server_tx_fifo, e);
index 3a31352..37fccd9 100644 (file)
@@ -1299,7 +1299,7 @@ u8 *
 format_ip4_session_lookup_kvp (u8 * s, va_list * args)
 {
   clib_bihash_kv_16_8_t *kvp = va_arg (*args, clib_bihash_kv_16_8_t *);
-  u32 is_local = va_arg (*args, u32);
+  u32 is_local = va_arg (*args, u32), app_index, session_index;
   u8 *app_name, *str = 0;
   stream_session_t *session;
   v4_connection_key_t *key = (v4_connection_key_t *) kvp->key;
@@ -1316,7 +1316,8 @@ format_ip4_session_lookup_kvp (u8 * s, va_list * args)
     }
   else
     {
-      app_name = application_name_from_index (kvp->value);
+      local_session_parse_handle (kvp->value, &app_index, &session_index);
+      app_name = application_name_from_index (app_index);
       str = format (0, "[%U] %U:%d", format_transport_proto_short, key->proto,
                    format_ip4_address, &key->src,
                    clib_net_to_host_u16 (key->src_port));
index baabb05..30cd5ae 100644 (file)
@@ -795,7 +795,7 @@ skip_dequeue:
     {
       stream_session_t *s;     /* $$$ prefetch 1 ahead maybe */
       session_event_t *e;
-      u32 to_dequeue;
+      u8 is_full;
 
       e = &fifo_events[i];
       switch (e->event_type)
@@ -814,7 +814,7 @@ skip_dequeue:
              clib_warning ("It's dead, Jim!");
              continue;
            }
-         to_dequeue = svm_fifo_max_dequeue (s->server_tx_fifo);
+         is_full = svm_fifo_is_full (s->server_tx_fifo);
 
          /* Spray packets in per session type frames, since they go to
           * different nodes */
@@ -823,7 +823,7 @@ skip_dequeue:
          if (PREDICT_TRUE (rv == SESSION_TX_OK))
            {
              /* Notify app there's tx space if not polling */
-             if (PREDICT_FALSE (to_dequeue == s->server_tx_fifo->nitems
+             if (PREDICT_FALSE (is_full
                                 && !svm_fifo_has_event (s->server_tx_fifo)))
                session_dequeue_notify (s);
            }
index 79eb75f..a34bc74 100644 (file)
@@ -43,7 +43,7 @@ class VCLTestCase(VppTestCase):
         self.server_args = [self.server_port]
         self.server_ipv6_addr = "::1"
         self.server_ipv6_args = ["-6", self.server_port]
-        self.timeout = 3
+        self.timeout = 10
         self.echo_phrase = "Hello, world! Jenny is a friend of mine."
 
         super(VCLTestCase, self).__init__(methodName)