vcl: improve read and fifo event handling
[vpp.git] / src / vcl / vppcom.c
index 87f29e3..6d6e7d0 100644 (file)
@@ -285,6 +285,8 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
       vcl_wait_for_memory (session->vpp_evt_q);
       rx_fifo->master_session_index = session->session_index;
       tx_fifo->master_session_index = session->session_index;
+      rx_fifo->master_thread_index = vcl_get_worker_index ();
+      tx_fifo->master_thread_index = vcl_get_worker_index ();
       vec_validate (wrk->vpp_event_queues, 0);
       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
       wrk->vpp_event_queues[0] = evt_q;
@@ -295,7 +297,8 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
                                             svm_msg_q_t *);
       rx_fifo->client_session_index = session->session_index;
       tx_fifo->client_session_index = session->session_index;
-
+      rx_fifo->client_thread_index = vcl_get_worker_index ();
+      tx_fifo->client_thread_index = vcl_get_worker_index ();
       vpp_wrk_index = tx_fifo->master_thread_index;
       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
@@ -349,8 +352,7 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   if (mp->retval)
     {
       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
-                   mp->handle, session_index, format_api_error,
-                   ntohl (mp->retval));
+                   session_index, format_api_error, ntohl (mp->retval));
       session->session_state = STATE_FAILED;
       session->vpp_handle = mp->handle;
       return session_index;
@@ -361,6 +363,8 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   vcl_wait_for_memory (rx_fifo);
   rx_fifo->client_session_index = session_index;
   tx_fifo->client_session_index = session_index;
+  rx_fifo->client_thread_index = vcl_get_worker_index ();
+  tx_fifo->client_thread_index = vcl_get_worker_index ();
 
   if (mp->client_event_queue_address)
     {
@@ -614,13 +618,13 @@ vppcom_app_attach (void)
 }
 
 static int
-vppcom_session_unbind (u32 session_index)
+vppcom_session_unbind (u32 session_handle)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
   vcl_session_t *session = 0;
   u64 vpp_handle;
 
-  session = vcl_session_get (wrk, session_index);
+  session = vcl_session_get_w_handle (wrk, session_handle);
   if (!session)
     return VPPCOM_EBADFD;
 
@@ -630,7 +634,7 @@ vppcom_session_unbind (u32 session_index)
   session->session_state = STATE_DISCONNECT;
 
   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
-       " 0x%x (%s)", getpid (), vpp_handle, session_index, STATE_DISCONNECT,
+       " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
        vppcom_session_state_str (STATE_DISCONNECT));
   vcl_evt (VCL_EVT_UNBIND, session);
   vppcom_send_unbind_sock (vpp_handle);
@@ -639,7 +643,7 @@ vppcom_session_unbind (u32 session_index)
 }
 
 static int
-vppcom_session_disconnect (u32 session_index)
+vppcom_session_disconnect (u32 session_handle)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
   svm_msg_q_t *vpp_evt_q;
@@ -647,18 +651,21 @@ vppcom_session_disconnect (u32 session_index)
   session_state_t state;
   u64 vpp_handle;
 
-  session = vcl_session_get (wrk, session_index);
+  session = vcl_session_get_w_handle (wrk, session_handle);
+  if (!session)
+    return VPPCOM_EBADFD;
+
   vpp_handle = session->vpp_handle;
   state = session->session_state;
 
   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (),
-       vpp_handle, session_index, state, vppcom_session_state_str (state));
+       vpp_handle, session_handle, state, vppcom_session_state_str (state));
 
   if (PREDICT_FALSE (state & STATE_LISTEN))
     {
       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
                    "Cannot disconnect a listen socket!",
-                   getpid (), vpp_handle, session_index);
+                   getpid (), vpp_handle, session_handle);
       return VPPCOM_EBADFD;
     }
 
@@ -668,13 +675,13 @@ vppcom_session_disconnect (u32 session_index)
       vcl_send_session_disconnected_reply (vpp_evt_q, vcm->my_client_index,
                                           vpp_handle, 0);
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
-           "REPLY...", getpid (), vpp_handle, session_index);
+           "REPLY...", getpid (), vpp_handle, session_handle);
     }
   else
     {
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
-           getpid (), vpp_handle, session_index);
-      vppcom_send_disconnect_session (vpp_handle, session_index);
+           getpid (), vpp_handle, session_handle);
+      vppcom_send_disconnect_session (vpp_handle);
     }
 
   return VPPCOM_OK;
@@ -700,6 +707,7 @@ vppcom_app_create (char *app_name)
       svm_fifo_segment_main_init (vcl_cfg->segment_baseva,
                                  20 /* timeout in secs */ );
       pool_init_fixed (vcm->workers, vcl_cfg->max_workers);
+      clib_spinlock_init (&vcm->workers_lock);
       vcl_worker_alloc_and_init ();
     }
 
@@ -1142,7 +1150,7 @@ handle:
   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
           client_session_index);
 
-  return client_session_index;
+  return vcl_session_handle (client_session);
 }
 
 int
@@ -1260,7 +1268,7 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   svm_msg_q_msg_t msg;
   session_event_t *e;
   svm_msg_q_t *mq;
-  u8 is_full;
+  u8 is_ct;
 
   if (PREDICT_FALSE (!buf))
     return VPPCOM_EINVAL;
@@ -1291,18 +1299,19 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
       return rv;
     }
 
-  mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue;
-  svm_fifo_unset_event (rx_fifo);
-  is_full = svm_fifo_is_full (rx_fifo);
+  is_ct = vcl_session_is_ct (s);
+  mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
 
   if (svm_fifo_is_empty (rx_fifo))
     {
       if (is_nonblocking)
        {
+         svm_fifo_unset_event (rx_fifo);
          return VPPCOM_OK;
        }
-      while (1)
+      while (svm_fifo_is_empty (rx_fifo))
        {
+         svm_fifo_unset_event (rx_fifo);
          svm_msg_q_lock (mq);
          if (svm_msg_q_is_empty (mq))
            svm_msg_q_wait (mq);
@@ -1310,20 +1319,16 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
          svm_msg_q_sub_w_lock (mq, &msg);
          e = svm_msg_q_msg_data (mq, &msg);
          svm_msg_q_unlock (mq);
-         if (!vcl_is_rx_evt_for_session (e, s->session_index,
-                                         s->our_evt_q != 0))
+         if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
            {
              vcl_handle_mq_ctrl_event (wrk, e);
              svm_msg_q_free_msg (mq, &msg);
              continue;
            }
-         svm_fifo_unset_event (rx_fifo);
          svm_msg_q_free_msg (mq, &msg);
+
          if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
            return 0;
-         if (svm_fifo_is_empty (rx_fifo))
-           continue;
-         break;
        }
     }
 
@@ -1332,7 +1337,10 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   else
     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
 
-  if (vcl_session_is_ct (s) && is_full)
+  if (svm_fifo_is_empty (rx_fifo))
+    svm_fifo_unset_event (rx_fifo);
+
+  if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
     {
       /* If the peer is not polling send notification */
       if (!svm_fifo_has_event (s->rx_fifo))
@@ -1580,6 +1588,14 @@ vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
   return n_msgs;
 }
 
+#define vcl_fifo_rx_evt_valid_or_break(_fifo)                  \
+if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))                 \
+  {                                                            \
+    svm_fifo_unset_event (_fifo);                              \
+    if (svm_fifo_is_empty (_fifo))                             \
+      break;                                                   \
+  }                                                            \
+
 static int
 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
                      unsigned long n_bits, unsigned long *read_map,
@@ -1633,8 +1649,11 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
+         vcl_fifo_rx_evt_valid_or_break (e->fifo);
          sid = e->fifo->client_session_index;
          session = vcl_session_get (wrk, sid);
+         if (!session)
+           break;
          if (sid < n_bits && read_map)
            {
              clib_bitmap_set_no_check (read_map, sid, 1);
@@ -1653,7 +1672,10 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
            }
          break;
        case SESSION_IO_EVT_CT_TX:
+         vcl_fifo_rx_evt_valid_or_break (e->fifo);
          session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
+         if (!session)
+           break;
          sid = session->session_index;
          if (sid < n_bits && read_map)
            {
@@ -1663,9 +1685,9 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
          break;
        case SESSION_IO_EVT_CT_RX:
          session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
-         sid = session->session_index;
          if (!session)
            break;
+         sid = session->session_index;
          if (sid < n_bits && write_map)
            {
              clib_bitmap_set_no_check (write_map, sid, 1);
@@ -1977,7 +1999,7 @@ vppcom_epoll_create (void)
   VDBG (0, "VCL<%d>: Created vep_idx %u / sid %u!",
        getpid (), vep_session->session_index, vep_session->session_index);
 
-  return (vep_session->session_index);
+  return vcl_session_handle (vep_session);
 }
 
 int
@@ -2037,7 +2059,8 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
       if (vep_session->vep.next_sh != ~0)
        {
          vcl_session_t *next_session;
-         next_session = vcl_session_get (wrk, vep_session->vep.next_sh);
+         next_session = vcl_session_get_w_handle (wrk,
+                                                  vep_session->vep.next_sh);
          if (PREDICT_FALSE (!next_session))
            {
              clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_ADD: Invalid "
@@ -2121,7 +2144,7 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
       else
        {
          vcl_session_t *prev_session;
-         prev_session = vcl_session_get (wrk, session->vep.prev_sh);
+         prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
          if (PREDICT_FALSE (!prev_session))
            {
              clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
@@ -2135,7 +2158,7 @@ vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
       if (session->vep.next_sh != ~0)
        {
          vcl_session_t *next_session;
-         next_session = vcl_session_get (wrk, session->vep.next_sh);
+         next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
          if (PREDICT_FALSE (!next_session))
            {
              clib_warning ("VCL<%d>: ERROR: EPOLL_CTL_DEL: Invalid "
@@ -2217,6 +2240,8 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
+         ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
+         vcl_fifo_rx_evt_valid_or_break (e->fifo);
          sid = e->fifo->client_session_index;
          session = vcl_session_get (wrk, sid);
          session_events = session->vep.ev.events;
@@ -2237,6 +2262,7 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
          session_evt_data = session->vep.ev.data.u64;
          break;
        case SESSION_IO_EVT_CT_TX:
+         vcl_fifo_rx_evt_valid_or_break (e->fifo);
          session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
          sid = session->session_index;
          session_events = session->vep.ev.events;
@@ -2314,7 +2340,7 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
          session_events = session->vep.ev.events;
          break;
        default:
-         clib_warning ("unhandled: %u", e->event_type);
+         VDBG (0, "unhandled: %u", e->event_type);
          svm_msg_q_free_msg (mq, msg);
          continue;
        }
@@ -2410,6 +2436,9 @@ vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
     }
 
   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
+  if (!vep_session)
+    return VPPCOM_EBADFD;
+
   if (PREDICT_FALSE (!vep_session->is_vep))
     {
       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
@@ -3142,7 +3171,7 @@ vppcom_session_index (uint32_t session_handle)
 int
 vppcom_worker_register (void)
 {
-  if (!vcl_worker_alloc_and_init ())
+  if (vcl_worker_alloc_and_init ())
     return VPPCOM_OK;
   return VPPCOM_EEXIST;
 }