vcl/ldp: fix poll
[vpp.git] / src / vcl / vppcom.c
index 16b467f..46b1c10 100644 (file)
@@ -17,7 +17,6 @@
 #include <stdlib.h>
 #include <svm/svm_fifo_segment.h>
 #include <vcl/vppcom.h>
-#include <vcl/vcl_event.h>
 #include <vcl/vcl_debug.h>
 #include <vcl/vcl_private.h>
 
@@ -1294,13 +1293,14 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   is_ct = vcl_session_is_ct (s);
   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
   rx_fifo = s->rx_fifo;
+  s->has_rx_evt = 0;
 
   if (svm_fifo_is_empty (rx_fifo))
     {
       if (is_nonblocking)
        {
          svm_fifo_unset_event (rx_fifo);
-         return VPPCOM_OK;
+         return VPPCOM_EWOULDBLOCK;
        }
       while (svm_fifo_is_empty (rx_fifo))
        {
@@ -1333,12 +1333,11 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   if (svm_fifo_is_empty (rx_fifo))
     svm_fifo_unset_event (rx_fifo);
 
-  if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
+  if (is_ct && svm_fifo_want_tx_evt (rx_fifo))
     {
-      /* If the peer is not polling send notification */
-      if (!svm_fifo_has_event (s->rx_fifo))
-       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
-                               SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
+      svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
+      app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
+                             SVM_Q_WAIT);
     }
 
   VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
@@ -1387,13 +1386,14 @@ vppcom_session_read_segments (uint32_t session_handle,
   is_ct = vcl_session_is_ct (s);
   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
   rx_fifo = s->rx_fifo;
+  s->has_rx_evt = 0;
 
   if (svm_fifo_is_empty (rx_fifo))
     {
       if (is_nonblocking)
        {
          svm_fifo_unset_event (rx_fifo);
-         return VPPCOM_OK;
+         return VPPCOM_EWOULDBLOCK;
        }
       while (svm_fifo_is_empty (rx_fifo))
        {
@@ -1553,7 +1553,8 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
        {
          svm_fifo_set_want_tx_evt (tx_fifo, 1);
          svm_msg_q_lock (mq);
-         svm_msg_q_wait (mq);
+         if (svm_msg_q_is_empty (mq))
+           svm_msg_q_wait (mq);
 
          svm_msg_q_sub_w_lock (mq, &msg);
          e = svm_msg_q_msg_data (mq, &msg);
@@ -2305,11 +2306,12 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       sid = e->fifo->client_session_index;
       session = vcl_session_get (wrk, sid);
       session_events = session->vep.ev.events;
-      if (!(EPOLLIN & session->vep.ev.events))
+      if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
        break;
       add_event = 1;
       events[*num_ev].events |= EPOLLIN;
       session_evt_data = session->vep.ev.data.u64;
+      session->has_rx_evt = 1;
       break;
     case FIFO_EVENT_APP_TX:
       sid = e->fifo->client_session_index;
@@ -2326,11 +2328,12 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
       sid = session->session_index;
       session_events = session->vep.ev.events;
-      if (!(EPOLLIN & session->vep.ev.events))
+      if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
        break;
       add_event = 1;
       events[*num_ev].events |= EPOLLIN;
       session_evt_data = session->vep.ev.data.u64;
+      session->has_rx_evt = 1;
       break;
     case SESSION_IO_EVT_CT_RX:
       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
@@ -2422,6 +2425,9 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
   session_event_t *e;
   int i;
 
+  if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
+    goto handle_dequeued;
+
   svm_msg_q_lock (mq);
   if (svm_msg_q_is_empty (mq))
     {
@@ -2446,20 +2452,18 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
   vcl_mq_dequeue_batch (wrk, mq);
   svm_msg_q_unlock (mq);
 
+handle_dequeued:
   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
     {
       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
       e = svm_msg_q_msg_data (mq, msg);
-      vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
+      if (*num_ev < maxevents)
+       vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
+      else
+       vec_add1 (wrk->unhandled_evts_vector, *e);
       svm_msg_q_free_msg (mq, msg);
-      if (*num_ev == maxevents)
-       {
-         i += 1;
-         break;
-       }
     }
-
-  vec_delete (wrk->mq_msg_vector, i, 0);
+  vec_reset_length (wrk->mq_msg_vector);
 
   return *num_ev;
 }
@@ -2507,6 +2511,7 @@ vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
   u64 buf;
 
   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
+again:
   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
                          vec_len (wrk->mq_events), wait_for_time);
   for (i = 0; i < n_mq_evts; i++)
@@ -2515,6 +2520,8 @@ vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
     }
+  if (!n_evts && n_mq_evts > 0)
+    goto again;
 
   return (int) n_evts;
 }
@@ -3178,6 +3185,8 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
   vcl_worker_t *wrk = vcl_worker_get_current ();
   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
   u32 i, keep_trying = 1;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
   int rv, num_ev = 0;
 
   VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
@@ -3190,23 +3199,33 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
     {
       vcl_session_t *session;
 
-      for (i = 0; i < n_sids; i++)
+      /* Dequeue all events and drop all unhandled io events */
+      while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
        {
-         ASSERT (vp[i].revents);
+         e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
+         vcl_handle_mq_event (wrk, e);
+         svm_msg_q_free_msg (wrk->app_event_queue, &msg);
+       }
+      vec_reset_length (wrk->unhandled_evts_vector);
 
+      for (i = 0; i < n_sids; i++)
+       {
          session = vcl_session_get (wrk, vp[i].sid);
          if (!session)
-           continue;
+           {
+             vp[i].revents = POLLHUP;
+             num_ev++;
+             continue;
+           }
 
-         if (*vp[i].revents)
-           *vp[i].revents = 0;
+         vp[i].revents = 0;
 
          if (POLLIN & vp[i].events)
            {
              rv = vppcom_session_read_ready (session);
              if (rv > 0)
                {
-                 *vp[i].revents |= POLLIN;
+                 vp[i].revents |= POLLIN;
                  num_ev++;
                }
              else if (rv < 0)
@@ -3214,11 +3233,11 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
                  switch (rv)
                    {
                    case VPPCOM_ECONNRESET:
-                     *vp[i].revents = POLLHUP;
+                     vp[i].revents = POLLHUP;
                      break;
 
                    default:
-                     *vp[i].revents = POLLERR;
+                     vp[i].revents = POLLERR;
                      break;
                    }
                  num_ev++;
@@ -3230,7 +3249,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
              rv = vppcom_session_write_ready (session);
              if (rv > 0)
                {
-                 *vp[i].revents |= POLLOUT;
+                 vp[i].revents |= POLLOUT;
                  num_ev++;
                }
              else if (rv < 0)
@@ -3238,11 +3257,11 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
                  switch (rv)
                    {
                    case VPPCOM_ECONNRESET:
-                     *vp[i].revents = POLLHUP;
+                     vp[i].revents = POLLHUP;
                      break;
 
                    default:
-                     *vp[i].revents = POLLERR;
+                     vp[i].revents = POLLERR;
                      break;
                    }
                  num_ev++;
@@ -3251,7 +3270,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
 
          if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
            {
-             *vp[i].revents = POLLNVAL;
+             vp[i].revents = POLLNVAL;
              num_ev++;
            }
        }
@@ -3267,7 +3286,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
        {
          clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
                        ".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
-                       vp[i].events, *vp[i].revents);
+                       vp[i].events, vp[i].revents);
        }
     }
   return num_ev;