vcl: improve handling of close on rw
[vpp.git] / src / vcl / vppcom.c
index 2c2cb2c..3c3e15d 100644 (file)
@@ -30,7 +30,7 @@ vcl_wait_for_segment (u64 segment_handle)
   f64 timeout;
 
   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
-    return 1;
+    return 0;
 
   timeout = clib_time_now (&wrk->clib_time) + wait_for_seconds;
   while (clib_time_now (&wrk->clib_time) < timeout)
@@ -94,6 +94,14 @@ vppcom_session_state_str (session_state_t state)
       st = "STATE_FAILED";
       break;
 
+    case STATE_UPDATED:
+      st = "STATE_UPDATED";
+      break;
+
+    case STATE_LISTEN_NO_MQ:
+      st = "STATE_LISTEN_NO_MQ";
+      break;
+
     default:
       st = "UNKNOWN_STATE";
       break;
@@ -505,6 +513,10 @@ vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
   vcl_session_table_add_listener (wrk, mp->handle, sid);
   session->session_state = STATE_LISTEN;
 
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
+  vec_validate (wrk->vpp_event_queues, 0);
+  wrk->vpp_event_queues[0] = session->vpp_evt_q;
+
   if (session->is_dgram)
     {
       svm_fifo_t *rx_fifo, *tx_fifo;
@@ -607,22 +619,18 @@ vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
                    s->session_index);
       return;
     }
-  s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
-  s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
 
-  s->rx_fifo->client_session_index = s->session_index;
-  s->tx_fifo->client_session_index = s->session_index;
-  s->rx_fifo->client_thread_index = wrk->wrk_index;
-  s->tx_fifo->client_thread_index = wrk->wrk_index;
-  s->session_state = STATE_UPDATED;
-
-  if (s->shared_index != VCL_INVALID_SESSION_INDEX)
+  if (s->rx_fifo)
     {
-      vcl_shared_session_t *ss;
-      ss = vcl_shared_session_get (s->shared_index);
-      if (vec_len (ss->workers) > 1)
-       VDBG (0, "workers need to be updated");
+      s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
+      s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
+      s->rx_fifo->client_session_index = s->session_index;
+      s->tx_fifo->client_session_index = s->session_index;
+      s->rx_fifo->client_thread_index = wrk->wrk_index;
+      s->tx_fifo->client_thread_index = wrk->wrk_index;
     }
+  s->session_state = STATE_UPDATED;
+
   VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
        s->vpp_handle, wrk->wrk_index);
 }
@@ -653,7 +661,6 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
       if (!session)
        break;
-      session->session_state = STATE_DISCONNECT;
       VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
            session->vpp_handle);
       break;
@@ -741,7 +748,7 @@ vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
   vec_reset_length (wrk->pending_session_wrk_updates);
 }
 
-static void
+void
 vcl_flush_mq_events (void)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
@@ -818,11 +825,11 @@ vppcom_session_unbind (u32 session_handle)
   session->vpp_handle = ~0;
   session->session_state = STATE_DISCONNECT;
 
-  VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
-       " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
+  VDBG (1, "vpp handle 0x%llx, sid %u: sending unbind msg! new state"
+       " 0x%x (%s)", vpp_handle, session_handle, STATE_DISCONNECT,
        vppcom_session_state_str (STATE_DISCONNECT));
   vcl_evt (VCL_EVT_UNBIND, session);
-  vppcom_send_unbind_sock (vpp_handle);
+  vppcom_send_unbind_sock (wrk, vpp_handle);
 
   return VPPCOM_OK;
 }
@@ -872,164 +879,6 @@ vppcom_session_disconnect (u32 session_handle)
   return VPPCOM_OK;
 }
 
-static void
-vcl_cleanup_bapi (void)
-{
-  socket_client_main_t *scm = &socket_client_main;
-  api_main_t *am = &api_main;
-
-  am->my_client_index = ~0;
-  am->my_registration = 0;
-  am->vl_input_queue = 0;
-  am->msg_index_by_name_and_crc = 0;
-  scm->socket_fd = 0;
-
-  vl_client_api_unmap ();
-}
-
-static void
-vcl_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
-{
-  vcl_worker_t *sub_child;
-  int tries = 0;
-
-  if (child_wrk->forked_child != ~0)
-    {
-      sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
-      if (sub_child)
-       {
-         /* Wait a bit, maybe the process is going away */
-         while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
-           usleep (1e3);
-         if (kill (sub_child->current_pid, 0) < 0)
-           vcl_cleanup_forked_child (child_wrk, sub_child);
-       }
-    }
-  vcl_worker_cleanup (child_wrk, 1 /* notify vpp */ );
-  VDBG (0, "Cleaned up wrk %u", child_wrk->wrk_index);
-  wrk->forked_child = ~0;
-}
-
-static struct sigaction old_sa;
-
-static void
-vcl_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
-{
-  vcl_worker_t *wrk, *child_wrk;
-
-  if (vcl_get_worker_index () == ~0)
-    return;
-
-  if (sigaction (SIGCHLD, &old_sa, 0))
-    {
-      VERR ("couldn't restore sigchld");
-      exit (-1);
-    }
-
-  wrk = vcl_worker_get_current ();
-  if (wrk->forked_child == ~0)
-    return;
-
-  child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
-  if (!child_wrk)
-    goto done;
-
-  if (si && si->si_pid != child_wrk->current_pid)
-    {
-      VDBG (0, "unexpected child pid %u", si->si_pid);
-      goto done;
-    }
-  vcl_cleanup_forked_child (wrk, child_wrk);
-
-done:
-  if (old_sa.sa_flags & SA_SIGINFO)
-    {
-      void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
-      fn (signum, si, uc);
-    }
-  else
-    {
-      void (*fn) (int) = old_sa.sa_handler;
-      if (fn)
-       fn (signum);
-    }
-}
-
-static void
-vcl_incercept_sigchld ()
-{
-  struct sigaction sa;
-  clib_memset (&sa, 0, sizeof (sa));
-  sa.sa_sigaction = vcl_intercept_sigchld_handler;
-  sa.sa_flags = SA_SIGINFO;
-  if (sigaction (SIGCHLD, &sa, &old_sa))
-    {
-      VERR ("couldn't intercept sigchld");
-      exit (-1);
-    }
-}
-
-static void
-vcl_app_pre_fork (void)
-{
-  vcl_incercept_sigchld ();
-  vcl_flush_mq_events ();
-}
-
-static void
-vcl_app_fork_child_handler (void)
-{
-  vcl_worker_t *parent_wrk, *wrk;
-  int rv, parent_wrk_index;
-  u8 *child_name;
-
-  parent_wrk_index = vcl_get_worker_index ();
-  VDBG (0, "initializing forked child with parent wrk %u", parent_wrk_index);
-
-  /*
-   * Allocate worker
-   */
-  vcl_set_worker_index (~0);
-  if (!vcl_worker_alloc_and_init ())
-    VERR ("couldn't allocate new worker");
-
-  /*
-   * Attach to binary api
-   */
-  child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
-  vcl_cleanup_bapi ();
-  vppcom_api_hookup ();
-  vcm->app_state = STATE_APP_START;
-  rv = vppcom_connect_to_vpp ((char *) child_name);
-  vec_free (child_name);
-  if (rv)
-    {
-      VERR ("couldn't connect to VPP!");
-      return;
-    }
-
-  /*
-   * Register worker with vpp and share sessions
-   */
-  vcl_worker_register_with_vpp ();
-  parent_wrk = vcl_worker_get (parent_wrk_index);
-  wrk = vcl_worker_get_current ();
-  wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
-  vcl_worker_share_sessions (parent_wrk);
-  parent_wrk->forked_child = vcl_get_worker_index ();
-
-  VDBG (0, "forked child main worker initialized");
-  vcm->forking = 0;
-}
-
-static void
-vcl_app_fork_parent_handler (void)
-{
-  vcm->forking = 1;
-  while (vcm->forking)
-    ;
-}
-
 /**
  * Handle app exit
  *
@@ -1079,8 +928,6 @@ vppcom_app_create (char *app_name)
   pool_alloc (vcm->workers, vcl_cfg->max_workers);
   clib_spinlock_init (&vcm->workers_lock);
   clib_rwlock_init (&vcm->segment_table_lock);
-  pthread_atfork (vcl_app_pre_fork, vcl_app_fork_parent_handler,
-                 vcl_app_fork_child_handler);
   atexit (vppcom_app_exit);
 
   /* Allocate default worker */
@@ -1177,22 +1024,14 @@ vppcom_session_create (u8 proto, u8 is_nonblocking)
 }
 
 int
-vppcom_session_close (uint32_t session_handle)
+vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session,
+                    vcl_session_handle_t sh, u8 do_disconnect)
 {
-  vcl_worker_t *wrk = vcl_worker_get_current ();
-  u8 is_vep, do_disconnect = 1;
-  vcl_session_t *session = 0;
   session_state_t state;
   u32 next_sh, vep_sh;
   int rv = VPPCOM_OK;
   u64 vpp_handle;
-
-  session = vcl_session_get_w_handle (wrk, session_handle);
-  if (!session)
-    return VPPCOM_EBADFD;
-
-  if (session->shared_index != ~0)
-    do_disconnect = vcl_worker_unshare_session (wrk, session);
+  u8 is_vep;
 
   is_vep = session->is_vep;
   next_sh = session->vep.next_sh;
@@ -1200,14 +1039,13 @@ vppcom_session_close (uint32_t session_handle)
   state = session->session_state;
   vpp_handle = session->vpp_handle;
 
-  VDBG (1, "closing session handle %u vpp handle %u", session_handle,
-       vpp_handle);
+  VDBG (1, "session %u [0x%llx] closing", session->session_index, vpp_handle);
 
   if (is_vep)
     {
       while (next_sh != ~0)
        {
-         rv = vppcom_epoll_ctl (session_handle, EPOLL_CTL_DEL, next_sh, 0);
+         rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
          if (PREDICT_FALSE (rv < 0))
            VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u"
                  " failed! rv %d (%s)", vpp_handle, next_sh, vep_sh, rv,
@@ -1220,36 +1058,35 @@ vppcom_session_close (uint32_t session_handle)
     {
       if (session->is_vep_session)
        {
-         rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, session_handle, 0);
+         rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, sh, 0);
          if (rv < 0)
-           VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u "
-                 "failed! rv %d (%s)", vpp_handle, session_handle, vep_sh,
-                 rv, vppcom_retval_str (rv));
+           VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
+                 "failed! rv %d (%s)", session->session_index, vpp_handle,
+                 vep_sh, rv, vppcom_retval_str (rv));
        }
 
       if (!do_disconnect)
        {
-         VDBG (0, "session handle %u [0x%llx] disconnect skipped",
-               session_handle, vpp_handle);
+         VDBG (1, "session %u [0x%llx] disconnect skipped",
+               session->session_index, vpp_handle);
          goto cleanup;
        }
 
       if (state & STATE_LISTEN)
        {
-         rv = vppcom_session_unbind (session_handle);
+         rv = vppcom_session_unbind (sh);
          if (PREDICT_FALSE (rv < 0))
-           VDBG (0, "vpp handle 0x%llx, sid %u: listener unbind failed! "
-                 "rv %d (%s)", vpp_handle, session_handle, rv,
+           VDBG (0, "session %u [0x%llx]: listener unbind failed! "
+                 "rv %d (%s)", session->session_index, vpp_handle, rv,
                  vppcom_retval_str (rv));
        }
       else if (state & STATE_OPEN)
        {
-         rv = vppcom_session_disconnect (session_handle);
+         rv = vppcom_session_disconnect (sh);
          if (PREDICT_FALSE (rv < 0))
-           clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
-                         "session disconnect failed! rv %d (%s)",
-                         getpid (), vpp_handle, session_handle,
-                         rv, vppcom_retval_str (rv));
+           VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
+                 " rv %d (%s)", session->session_index, vpp_handle,
+                 rv, vppcom_retval_str (rv));
        }
       else if (state == STATE_DISCONNECT)
        {
@@ -1259,8 +1096,6 @@ vppcom_session_close (uint32_t session_handle)
        }
     }
 
-cleanup:
-
   if (vcl_session_is_ct (session))
     {
       vcl_cut_through_registration_t *ctr;
@@ -1278,16 +1113,29 @@ cleanup:
       vcl_ct_registration_unlock (wrk);
     }
 
+  VDBG (0, "session %u [0x%llx] removed", session->session_index, vpp_handle);
+
+cleanup:
   vcl_session_table_del_vpp_handle (wrk, vpp_handle);
   vcl_session_free (wrk, session);
-
-  VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle);
-
   vcl_evt (VCL_EVT_CLOSE, session, rv);
 
   return rv;
 }
 
+int
+vppcom_session_close (uint32_t session_handle)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  vcl_session_t *session;
+
+  session = vcl_session_get_w_handle (wrk, session_handle);
+  if (!session)
+    return VPPCOM_EBADFD;
+  return vcl_session_cleanup (wrk, session, session_handle,
+                             1 /* do_disconnect */ );
+}
+
 int
 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
 {
@@ -1665,20 +1513,12 @@ vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
     return (e->event_type == SESSION_IO_EVT_CT_TX);
 }
 
-static inline u8
-vcl_session_is_readable (vcl_session_t * s)
-{
-  return ((s->session_state & STATE_OPEN)
-         || (s->session_state == STATE_LISTEN
-             && s->session_type == VPPCOM_PROTO_UDP));
-}
-
 static inline int
 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
                              u8 peek)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  int n_read = 0, rv, is_nonblocking;
+  int n_read = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
   svm_msg_q_msg_t msg;
@@ -1693,15 +1533,12 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   if (PREDICT_FALSE (!s || s->is_vep))
     return VPPCOM_EBADFD;
 
-  if (PREDICT_FALSE (!vcl_session_is_readable (s)))
+  if (PREDICT_FALSE (!vcl_session_is_open (s)))
     {
-      session_state_t state = s->session_state;
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
-      VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s),"
-           " returning %d (%s)", session_handle, s->vpp_handle, state,
-           vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
-      return rv;
+      VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s)",
+           s->session_index, s->vpp_handle, s->session_state,
+           vppcom_session_state_str (s->session_state));
+      return vcl_session_closed_error (s);
     }
 
   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
@@ -1719,6 +1556,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
        }
       while (svm_fifo_is_empty (rx_fifo))
        {
+         if (vcl_session_is_closing (s))
+           return vcl_session_closing_error (s);
+
          svm_fifo_unset_event (rx_fifo);
          svm_msg_q_lock (mq);
          if (svm_msg_q_is_empty (mq))
@@ -1730,9 +1570,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
          if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
            vcl_handle_mq_event (wrk, e);
          svm_msg_q_free_msg (mq, &msg);
-
-         if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
-           return VPPCOM_ECONNRESET;
        }
     }
 
@@ -1744,9 +1581,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   if (svm_fifo_is_empty (rx_fifo))
     svm_fifo_unset_event (rx_fifo);
 
-  if (is_ct && svm_fifo_want_tx_evt (rx_fifo))
+  if (is_ct && svm_fifo_needs_tx_ntf (rx_fifo, n_read))
     {
-      svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
+      svm_fifo_clear_tx_ntf (s->rx_fifo);
       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
                              SVM_Q_WAIT);
     }
@@ -1774,7 +1611,7 @@ vppcom_session_read_segments (uint32_t session_handle,
                              vppcom_data_segments_t ds)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  int n_read = 0, rv, is_nonblocking;
+  int n_read = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
   svm_msg_q_msg_t msg;
@@ -1786,12 +1623,8 @@ vppcom_session_read_segments (uint32_t session_handle,
   if (PREDICT_FALSE (!s || s->is_vep))
     return VPPCOM_EBADFD;
 
-  if (PREDICT_FALSE (!vcl_session_is_readable (s)))
-    {
-      session_state_t state = s->session_state;
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-      return rv;
-    }
+  if (PREDICT_FALSE (!vcl_session_is_open (s)))
+    return vcl_session_closed_error (s);
 
   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
   is_ct = vcl_session_is_ct (s);
@@ -1808,6 +1641,9 @@ vppcom_session_read_segments (uint32_t session_handle,
        }
       while (svm_fifo_is_empty (rx_fifo))
        {
+         if (vcl_session_is_closing (s))
+           return vcl_session_closing_error (s);
+
          svm_fifo_unset_event (rx_fifo);
          svm_msg_q_lock (mq);
          if (svm_msg_q_is_empty (mq))
@@ -1819,9 +1655,6 @@ vppcom_session_read_segments (uint32_t session_handle,
          if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
            vcl_handle_mq_event (wrk, e);
          svm_msg_q_free_msg (mq, &msg);
-
-         if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
-           return VPPCOM_ECONNRESET;
        }
     }
 
@@ -1853,37 +1686,6 @@ vppcom_session_free_segments (uint32_t session_handle,
   svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
 }
 
-static inline int
-vppcom_session_read_ready (vcl_session_t * session)
-{
-  /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
-  if (PREDICT_FALSE (session->is_vep))
-    {
-      clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
-                   "epoll session!", getpid (), session->session_index);
-      return VPPCOM_EBADFD;
-    }
-
-  if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
-    {
-      session_state_t state = session->session_state;
-      int rv;
-
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
-      VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
-           " state 0x%x (%s), returning %d (%s)", getpid (),
-           session->vpp_handle, session->session_index, state,
-           vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
-      return rv;
-    }
-
-  if (session->session_state & STATE_LISTEN)
-    return clib_fifo_elts (session->accept_evts_fifo);
-
-  return svm_fifo_max_dequeue (session->rx_fifo);
-}
-
 int
 vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
 {
@@ -1912,7 +1714,7 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
                             u8 is_flush)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  int rv, n_write, is_nonblocking;
+  int n_write, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *tx_fifo = 0;
   session_evt_type_t et;
@@ -1930,21 +1732,17 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
 
   if (PREDICT_FALSE (s->is_vep))
     {
-      clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
-                   "cannot write to an epoll session!",
-                   getpid (), s->vpp_handle, session_handle);
-
+      VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
+           " session!", s->session_index, s->vpp_handle);
       return VPPCOM_EBADFD;
     }
 
-  if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
+  if (PREDICT_FALSE (!vcl_session_is_open (s)))
     {
-      session_state_t state = s->session_state;
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-      VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
-           "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
-           state, vppcom_session_state_str (state));
-      return rv;
+      VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
+           s->session_index, s->vpp_handle, s->session_state,
+           vppcom_session_state_str (s->session_state));
+      return vcl_session_closed_error (s);;
     }
 
   tx_fifo = s->tx_fifo;
@@ -1959,7 +1757,9 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
        }
       while (svm_fifo_is_full (tx_fifo))
        {
-         svm_fifo_set_want_tx_evt (tx_fifo, 1);
+         svm_fifo_add_want_tx_ntf (tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
+         if (vcl_session_is_closing (s))
+           return vcl_session_closing_error (s);
          svm_msg_q_lock (mq);
          if (svm_msg_q_is_empty (mq))
            svm_msg_q_wait (mq);
@@ -1971,9 +1771,6 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
          if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
            vcl_handle_mq_event (wrk, e);
          svm_msg_q_free_msg (mq, &msg);
-
-         if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
-           return VPPCOM_ECONNRESET;
        }
     }
 
@@ -1991,8 +1788,8 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
 
   ASSERT (n_write > 0);
 
-  VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
-       s->vpp_handle, session_handle, n_write);
+  VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
+       s->vpp_handle, n_write);
 
   return n_write;
 }
@@ -2037,48 +1834,6 @@ vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
   return 0;
 }
 
-static inline int
-vppcom_session_write_ready (vcl_session_t * session)
-{
-  /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
-  if (PREDICT_FALSE (session->is_vep))
-    {
-      clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
-                   "cannot write to an epoll session!",
-                   getpid (), session->vpp_handle, session->session_index);
-      return VPPCOM_EBADFD;
-    }
-
-  if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
-    {
-      clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
-                   "cannot write to a listen session!",
-                   getpid (), session->vpp_handle, session->session_index);
-      return VPPCOM_EBADFD;
-    }
-
-  if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
-    {
-      session_state_t state = session->session_state;
-      int rv;
-
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-      clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
-                   "session is not open! state 0x%x (%s), "
-                   "returning %d (%s)", getpid (), session->vpp_handle,
-                   session->session_index,
-                   state, vppcom_session_state_str (state),
-                   rv, vppcom_retval_str (rv));
-      return rv;
-    }
-
-  VDBG (3, "VCL<%d>: vpp handle 0x%llx, sid %u: peek %s (%p), ready = %d",
-       getpid (), session->vpp_handle, session->session_index,
-       session->tx_fifo, svm_fifo_max_enqueue (session->tx_fifo));
-
-  return svm_fifo_max_enqueue (session->tx_fifo);
-}
-
 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                  \
 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                 \
   {                                                            \
@@ -2248,15 +2003,15 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
 }
 
 static int
-vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
-                      unsigned long *read_map, unsigned long *write_map,
-                      unsigned long *except_map, double time_to_wait,
+vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
+                      vcl_si_set * read_map, vcl_si_set * write_map,
+                      vcl_si_set * except_map, double time_to_wait,
                       u32 * bits_set)
 {
   double total_wait = 0, wait_slice;
   vcl_cut_through_registration_t *cr;
 
-  time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
+  time_to_wait = (time_to_wait == -1) ? 1e6 : time_to_wait;
   wait_slice = wrk->cut_through_registrations ? 10e-6 : time_to_wait;
   do
     {
@@ -2270,7 +2025,7 @@ vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
       vcl_ct_registration_unlock (wrk);
 
       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
-                           write_map, except_map, time_to_wait, bits_set);
+                           write_map, except_map, wait_slice, bits_set);
       total_wait += wait_slice;
       if (*bits_set)
        return *bits_set;
@@ -2281,9 +2036,9 @@ vppcom_select_condvar (vcl_worker_t * wrk, unsigned long n_bits,
 }
 
 static int
-vppcom_select_eventfd (vcl_worker_t * wrk, unsigned long n_bits,
-                      unsigned long *read_map, unsigned long *write_map,
-                      unsigned long *except_map, double time_to_wait,
+vppcom_select_eventfd (vcl_worker_t * wrk, int n_bits,
+                      vcl_si_set * read_map, vcl_si_set * write_map,
+                      vcl_si_set * except_map, double time_to_wait,
                       u32 * bits_set)
 {
   vcl_mq_evt_conn_t *mqc;
@@ -2306,44 +2061,34 @@ vppcom_select_eventfd (vcl_worker_t * wrk, unsigned long n_bits,
 }
 
 int
-vppcom_select (unsigned long n_bits, unsigned long *read_map,
-              unsigned long *write_map, unsigned long *except_map,
-              double time_to_wait)
+vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
+              vcl_si_set * except_map, double time_to_wait)
 {
   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
   vcl_worker_t *wrk = vcl_worker_get_current ();
   vcl_session_t *session = 0;
   int rv, i;
 
-  STATIC_ASSERT (sizeof (clib_bitmap_t) == sizeof (unsigned long),
-                "vppcom bitmap size mismatch");
-  STATIC_ASSERT (sizeof (clib_bitmap_t) == sizeof (fd_mask),
-                "vppcom bitmap size mismatch");
-  STATIC_ASSERT (sizeof (clib_bitmap_t) == sizeof (uword),
-                "vppcom bitmap size mismatch");
-
   if (n_bits && read_map)
     {
       clib_bitmap_validate (wrk->rd_bitmap, minbits);
       clib_memcpy_fast (wrk->rd_bitmap, read_map,
-                       vec_len (wrk->rd_bitmap) * sizeof (unsigned long));
-      memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (unsigned long));
+                       vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
+      memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
     }
   if (n_bits && write_map)
     {
       clib_bitmap_validate (wrk->wr_bitmap, minbits);
       clib_memcpy_fast (wrk->wr_bitmap, write_map,
-                       vec_len (wrk->wr_bitmap) * sizeof (unsigned long));
-      memset (write_map, 0,
-             vec_len (wrk->wr_bitmap) * sizeof (unsigned long));
+                       vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
+      memset (write_map, 0, vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
     }
   if (n_bits && except_map)
     {
       clib_bitmap_validate (wrk->ex_bitmap, minbits);
       clib_memcpy_fast (wrk->ex_bitmap, except_map,
-                       vec_len (wrk->ex_bitmap) * sizeof (unsigned long));
-      memset (except_map, 0,
-             vec_len (wrk->ex_bitmap) * sizeof (unsigned long));
+                       vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
+      memset (except_map, 0, vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
     }
 
   if (!n_bits)
@@ -2367,6 +2112,8 @@ vppcom_select (unsigned long n_bits, unsigned long *read_map,
         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
         bits_set++;
       }
+    else
+      svm_fifo_add_want_tx_ntf (session->tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
   }));
 
 check_rd:
@@ -2381,7 +2128,7 @@ check_rd:
         continue;
       }
 
-    rv = vppcom_session_read_ready (session);
+    rv = vcl_session_read_ready (session);
     if (rv)
       {
         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
@@ -2585,6 +2332,10 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
       session->is_vep_session = 1;
       vep_session->vep.next_sh = session_handle;
 
+      if (session->tx_fifo)
+       svm_fifo_add_want_tx_ntf (session->tx_fifo,
+                                 SVM_FIFO_WANT_TX_NOTIF_IF_FULL);
+
       VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
            vep_handle, session_handle, event->events, event->data.u64);
       vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
@@ -2670,6 +2421,10 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
       session->vep.prev_sh = ~0;
       session->vep.vep_sh = ~0;
       session->is_vep_session = 0;
+
+      if (session->tx_fifo)
+       svm_fifo_del_want_tx_ntf (session->tx_fifo, SVM_FIFO_NO_TX_NOTIF);
+
       VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sid %u!", vep_handle,
            session_handle);
       vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
@@ -2723,6 +2478,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       add_event = 1;
       events[*num_ev].events |= EPOLLOUT;
       session_evt_data = session->vep.ev.data.u64;
+      svm_fifo_reset_tx_ntf (session->tx_fifo);
       break;
     case SESSION_IO_EVT_CT_TX:
       vcl_fifo_rx_evt_valid_or_break (e->fifo);
@@ -2749,6 +2505,7 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       add_event = 1;
       events[*num_ev].events |= EPOLLOUT;
       session_evt_data = session->vep.ev.data.u64;
+      svm_fifo_reset_tx_ntf (session->tx_fifo);
       break;
     case SESSION_CTRL_EVT_ACCEPTED:
       session = vcl_session_accepted (wrk,
@@ -2772,31 +2529,34 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       if (!(session = vcl_session_get (wrk, sid)))
        break;
       session_events = session->vep.ev.events;
-      if (EPOLLOUT & session_events)
-       {
-         add_event = 1;
-         events[*num_ev].events |= EPOLLOUT;
-         session_evt_data = session->vep.ev.data.u64;
-       }
+      if (!(EPOLLOUT & session_events))
+       break;
+      add_event = 1;
+      events[*num_ev].events |= EPOLLOUT;
+      session_evt_data = session->vep.ev.data.u64;
       break;
     case SESSION_CTRL_EVT_DISCONNECTED:
       disconnected_msg = (session_disconnected_msg_t *) e->data;
       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
       if (!session)
        break;
+      session_events = session->vep.ev.events;
+      if (!((EPOLLHUP | EPOLLRDHUP) & session_events))
+       break;
       add_event = 1;
       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
       session_evt_data = session->vep.ev.data.u64;
-      session_events = session->vep.ev.events;
       break;
     case SESSION_CTRL_EVT_RESET:
       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
       if (!(session = vcl_session_get (wrk, sid)))
        break;
+      session_events = session->vep.ev.events;
+      if (!((EPOLLHUP | EPOLLRDHUP) & session_events))
+       break;
       add_event = 1;
       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
       session_evt_data = session->vep.ev.data.u64;
-      session_events = session->vep.ev.events;
       break;
     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
       vcl_session_req_worker_update_handler (wrk, e->data);
@@ -2881,7 +2641,7 @@ vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
   double total_wait = 0, wait_slice;
   int rv;
 
-  wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
+  wait_for_time = (wait_for_time == -1) ? (double) 1e6 : wait_for_time;
   wait_slice = wrk->cut_through_registrations ? 10e-6 : wait_for_time;
 
   do
@@ -3001,12 +2761,12 @@ vppcom_session_attr (uint32_t session_handle, uint32_t op,
   switch (op)
     {
     case VPPCOM_ATTR_GET_NREAD:
-      rv = vppcom_session_read_ready (session);
+      rv = vcl_session_read_ready (session);
       VDBG (2, "VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d", rv);
       break;
 
     case VPPCOM_ATTR_GET_NWRITE:
-      rv = vppcom_session_write_ready (session);
+      rv = vcl_session_write_ready (session);
       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
            getpid (), session_handle, rv);
       break;
@@ -3507,10 +3267,6 @@ vppcom_session_attr (uint32_t session_handle, uint32_t op,
        rv = VPPCOM_EINVAL;
       break;
 
-    case VPPCOM_ATTR_GET_REFCNT:
-      rv = vcl_session_get_refcnt (session);
-      break;
-
     case VPPCOM_ATTR_SET_SHUT:
       if (*flags == SHUT_RD || *flags == SHUT_RDWR)
        VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_SHUT_RD);
@@ -3649,7 +3405,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
 
          if (POLLIN & vp[i].events)
            {
-             rv = vppcom_session_read_ready (session);
+             rv = vcl_session_read_ready (session);
              if (rv > 0)
                {
                  vp[i].revents |= POLLIN;
@@ -3673,7 +3429,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
 
          if (POLLOUT & vp[i].events)
            {
-             rv = vppcom_session_write_ready (session);
+             rv = vcl_session_write_ready (session);
              if (rv > 0)
                {
                  vp[i].revents |= POLLOUT;
@@ -3738,12 +3494,6 @@ vppcom_session_worker (vcl_session_handle_t session_handle)
   return session_handle >> 24;
 }
 
-int
-vppcom_session_handle (uint32_t session_index)
-{
-  return (vcl_get_worker_index () << 24) | session_index;
-}
-
 int
 vppcom_worker_register (void)
 {
@@ -3765,6 +3515,15 @@ vppcom_worker_index (void)
   return vcl_get_worker_index ();
 }
 
+int
+vppcom_worker_mqs_epfd (void)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  if (!vcm->cfg.use_mq_eventfd)
+    return -1;
+  return wrk->mqs_epfd;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *