vcl: improve handling of close on rw
[vpp.git] / src / vcl / vppcom.c
index c91d0f4..3c3e15d 100644 (file)
@@ -30,7 +30,7 @@ vcl_wait_for_segment (u64 segment_handle)
   f64 timeout;
 
   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
-    return 1;
+    return 0;
 
   timeout = clib_time_now (&wrk->clib_time) + wait_for_seconds;
   while (clib_time_now (&wrk->clib_time) < timeout)
@@ -94,6 +94,14 @@ vppcom_session_state_str (session_state_t state)
       st = "STATE_FAILED";
       break;
 
+    case STATE_UPDATED:
+      st = "STATE_UPDATED";
+      break;
+
+    case STATE_LISTEN_NO_MQ:
+      st = "STATE_LISTEN_NO_MQ";
+      break;
+
     default:
       st = "UNKNOWN_STATE";
       break;
@@ -505,6 +513,10 @@ vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
   vcl_session_table_add_listener (wrk, mp->handle, sid);
   session->session_state = STATE_LISTEN;
 
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
+  vec_validate (wrk->vpp_event_queues, 0);
+  wrk->vpp_event_queues[0] = session->vpp_evt_q;
+
   if (session->is_dgram)
     {
       svm_fifo_t *rx_fifo, *tx_fifo;
@@ -607,13 +619,16 @@ vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
                    s->session_index);
       return;
     }
-  s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
-  s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
 
-  s->rx_fifo->client_session_index = s->session_index;
-  s->tx_fifo->client_session_index = s->session_index;
-  s->rx_fifo->client_thread_index = wrk->wrk_index;
-  s->tx_fifo->client_thread_index = wrk->wrk_index;
+  if (s->rx_fifo)
+    {
+      s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
+      s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
+      s->rx_fifo->client_session_index = s->session_index;
+      s->tx_fifo->client_session_index = s->session_index;
+      s->rx_fifo->client_thread_index = wrk->wrk_index;
+      s->tx_fifo->client_thread_index = wrk->wrk_index;
+    }
   s->session_state = STATE_UPDATED;
 
   VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
@@ -646,7 +661,6 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
       if (!session)
        break;
-      session->session_state = STATE_DISCONNECT;
       VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
            session->vpp_handle);
       break;
@@ -811,11 +825,11 @@ vppcom_session_unbind (u32 session_handle)
   session->vpp_handle = ~0;
   session->session_state = STATE_DISCONNECT;
 
-  VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
-       " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
+  VDBG (1, "vpp handle 0x%llx, sid %u: sending unbind msg! new state"
+       " 0x%x (%s)", vpp_handle, session_handle, STATE_DISCONNECT,
        vppcom_session_state_str (STATE_DISCONNECT));
   vcl_evt (VCL_EVT_UNBIND, session);
-  vppcom_send_unbind_sock (vpp_handle);
+  vppcom_send_unbind_sock (wrk, vpp_handle);
 
   return VPPCOM_OK;
 }
@@ -1053,7 +1067,7 @@ vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session,
 
       if (!do_disconnect)
        {
-         VDBG (0, "session %u [0x%llx] disconnect skipped",
+         VDBG (1, "session %u [0x%llx] disconnect skipped",
                session->session_index, vpp_handle);
          goto cleanup;
        }
@@ -1099,11 +1113,11 @@ vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session,
       vcl_ct_registration_unlock (wrk);
     }
 
+  VDBG (0, "session %u [0x%llx] removed", session->session_index, vpp_handle);
+
 cleanup:
   vcl_session_table_del_vpp_handle (wrk, vpp_handle);
   vcl_session_free (wrk, session);
-
-  VDBG (0, "session %u [0x%llx] removed", session->session_index, vpp_handle);
   vcl_evt (VCL_EVT_CLOSE, session, rv);
 
   return rv;
@@ -1499,20 +1513,12 @@ vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
     return (e->event_type == SESSION_IO_EVT_CT_TX);
 }
 
-static inline u8
-vcl_session_is_readable (vcl_session_t * s)
-{
-  return ((s->session_state & STATE_OPEN)
-         || (s->session_state == STATE_LISTEN
-             && s->session_type == VPPCOM_PROTO_UDP));
-}
-
 static inline int
 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
                              u8 peek)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  int n_read = 0, rv, is_nonblocking;
+  int n_read = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
   svm_msg_q_msg_t msg;
@@ -1527,15 +1533,12 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   if (PREDICT_FALSE (!s || s->is_vep))
     return VPPCOM_EBADFD;
 
-  if (PREDICT_FALSE (!vcl_session_is_readable (s)))
+  if (PREDICT_FALSE (!vcl_session_is_open (s)))
     {
-      session_state_t state = s->session_state;
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
-      VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s),"
-           " returning %d (%s)", session_handle, s->vpp_handle, state,
-           vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
-      return rv;
+      VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s)",
+           s->session_index, s->vpp_handle, s->session_state,
+           vppcom_session_state_str (s->session_state));
+      return vcl_session_closed_error (s);
     }
 
   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
@@ -1553,6 +1556,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
        }
       while (svm_fifo_is_empty (rx_fifo))
        {
+         if (vcl_session_is_closing (s))
+           return vcl_session_closing_error (s);
+
          svm_fifo_unset_event (rx_fifo);
          svm_msg_q_lock (mq);
          if (svm_msg_q_is_empty (mq))
@@ -1564,9 +1570,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
          if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
            vcl_handle_mq_event (wrk, e);
          svm_msg_q_free_msg (mq, &msg);
-
-         if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
-           return VPPCOM_ECONNRESET;
        }
     }
 
@@ -1608,7 +1611,7 @@ vppcom_session_read_segments (uint32_t session_handle,
                              vppcom_data_segments_t ds)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  int n_read = 0, rv, is_nonblocking;
+  int n_read = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
   svm_msg_q_msg_t msg;
@@ -1620,12 +1623,8 @@ vppcom_session_read_segments (uint32_t session_handle,
   if (PREDICT_FALSE (!s || s->is_vep))
     return VPPCOM_EBADFD;
 
-  if (PREDICT_FALSE (!vcl_session_is_readable (s)))
-    {
-      session_state_t state = s->session_state;
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-      return rv;
-    }
+  if (PREDICT_FALSE (!vcl_session_is_open (s)))
+    return vcl_session_closed_error (s);
 
   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
   is_ct = vcl_session_is_ct (s);
@@ -1642,6 +1641,9 @@ vppcom_session_read_segments (uint32_t session_handle,
        }
       while (svm_fifo_is_empty (rx_fifo))
        {
+         if (vcl_session_is_closing (s))
+           return vcl_session_closing_error (s);
+
          svm_fifo_unset_event (rx_fifo);
          svm_msg_q_lock (mq);
          if (svm_msg_q_is_empty (mq))
@@ -1653,9 +1655,6 @@ vppcom_session_read_segments (uint32_t session_handle,
          if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
            vcl_handle_mq_event (wrk, e);
          svm_msg_q_free_msg (mq, &msg);
-
-         if (PREDICT_FALSE (s->session_state == STATE_DISCONNECT))
-           return VPPCOM_ECONNRESET;
        }
     }
 
@@ -1687,37 +1686,6 @@ vppcom_session_free_segments (uint32_t session_handle,
   svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
 }
 
-static inline int
-vppcom_session_read_ready (vcl_session_t * session)
-{
-  /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
-  if (PREDICT_FALSE (session->is_vep))
-    {
-      clib_warning ("VCL<%d>: ERROR: sid %u: cannot read from an "
-                   "epoll session!", getpid (), session->session_index);
-      return VPPCOM_EBADFD;
-    }
-
-  if (PREDICT_FALSE (!(session->session_state & (STATE_OPEN | STATE_LISTEN))))
-    {
-      session_state_t state = session->session_state;
-      int rv;
-
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-
-      VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open!"
-           " state 0x%x (%s), returning %d (%s)", getpid (),
-           session->vpp_handle, session->session_index, state,
-           vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
-      return rv;
-    }
-
-  if (session->session_state & STATE_LISTEN)
-    return clib_fifo_elts (session->accept_evts_fifo);
-
-  return svm_fifo_max_dequeue (session->rx_fifo);
-}
-
 int
 vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
 {
@@ -1746,7 +1714,7 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
                             u8 is_flush)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
-  int rv, n_write, is_nonblocking;
+  int n_write, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *tx_fifo = 0;
   session_evt_type_t et;
@@ -1764,21 +1732,17 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
 
   if (PREDICT_FALSE (s->is_vep))
     {
-      clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
-                   "cannot write to an epoll session!",
-                   getpid (), s->vpp_handle, session_handle);
-
+      VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
+           " session!", s->session_index, s->vpp_handle);
       return VPPCOM_EBADFD;
     }
 
-  if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
+  if (PREDICT_FALSE (!vcl_session_is_open (s)))
     {
-      session_state_t state = s->session_state;
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-      VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
-           "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
-           state, vppcom_session_state_str (state));
-      return rv;
+      VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
+           s->session_index, s->vpp_handle, s->session_state,
+           vppcom_session_state_str (s->session_state));
+      return vcl_session_closed_error (s);;
     }
 
   tx_fifo = s->tx_fifo;
@@ -1794,6 +1758,8 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
       while (svm_fifo_is_full (tx_fifo))
        {
          svm_fifo_add_want_tx_ntf (tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
+         if (vcl_session_is_closing (s))
+           return vcl_session_closing_error (s);
          svm_msg_q_lock (mq);
          if (svm_msg_q_is_empty (mq))
            svm_msg_q_wait (mq);
@@ -1805,9 +1771,6 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
          if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
            vcl_handle_mq_event (wrk, e);
          svm_msg_q_free_msg (mq, &msg);
-
-         if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
-           return VPPCOM_ECONNRESET;
        }
     }
 
@@ -1825,8 +1788,8 @@ vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
 
   ASSERT (n_write > 0);
 
-  VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
-       s->vpp_handle, session_handle, n_write);
+  VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
+       s->vpp_handle, n_write);
 
   return n_write;
 }
@@ -1871,41 +1834,6 @@ vcl_ct_session_get_from_fifo (vcl_worker_t * wrk, svm_fifo_t * f, u8 type)
   return 0;
 }
 
-static inline int
-vppcom_session_write_ready (vcl_session_t * session)
-{
-  /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
-  if (PREDICT_FALSE (session->is_vep))
-    {
-      VDBG (0, "session %u [0x%llx]: cannot write to an epoll session!",
-           session->session_index, session->vpp_handle);
-      return VPPCOM_EBADFD;
-    }
-
-  if (PREDICT_FALSE (session->session_state & STATE_LISTEN))
-    {
-      if (session->tx_fifo)
-       return svm_fifo_max_enqueue (session->tx_fifo);
-      else
-       return VPPCOM_EBADFD;
-    }
-
-  if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
-    {
-      session_state_t state = session->session_state;
-      int rv;
-
-      rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
-      VDBG (0, "session %u [0x%llx]: session is not open! state 0x%x (%s), "
-           "returning %d (%s)", session->session_index, session->vpp_handle,
-           state, vppcom_session_state_str (state), rv,
-           vppcom_retval_str (rv));
-      return rv;
-    }
-
-  return svm_fifo_max_enqueue (session->tx_fifo);
-}
-
 #define vcl_fifo_rx_evt_valid_or_break(_fifo)                  \
 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                 \
   {                                                            \
@@ -2200,7 +2128,7 @@ check_rd:
         continue;
       }
 
-    rv = vppcom_session_read_ready (session);
+    rv = vcl_session_read_ready (session);
     if (rv)
       {
         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
@@ -2833,12 +2761,12 @@ vppcom_session_attr (uint32_t session_handle, uint32_t op,
   switch (op)
     {
     case VPPCOM_ATTR_GET_NREAD:
-      rv = vppcom_session_read_ready (session);
+      rv = vcl_session_read_ready (session);
       VDBG (2, "VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d", rv);
       break;
 
     case VPPCOM_ATTR_GET_NWRITE:
-      rv = vppcom_session_write_ready (session);
+      rv = vcl_session_write_ready (session);
       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
            getpid (), session_handle, rv);
       break;
@@ -3477,7 +3405,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
 
          if (POLLIN & vp[i].events)
            {
-             rv = vppcom_session_read_ready (session);
+             rv = vcl_session_read_ready (session);
              if (rv > 0)
                {
                  vp[i].revents |= POLLIN;
@@ -3501,7 +3429,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
 
          if (POLLOUT & vp[i].events)
            {
-             rv = vppcom_session_write_ready (session);
+             rv = vcl_session_write_ready (session);
              if (rv > 0)
                {
                  vp[i].revents |= POLLOUT;
@@ -3587,6 +3515,15 @@ vppcom_worker_index (void)
   return vcl_get_worker_index ();
 }
 
+int
+vppcom_worker_mqs_epfd (void)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  if (!vcm->cfg.use_mq_eventfd)
+    return -1;
+  return wrk->mqs_epfd;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *