vcl/ldp: fix poll
[vpp.git] / src / vcl / vppcom.c
index df4ebde..46b1c10 100644 (file)
@@ -1293,13 +1293,14 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   is_ct = vcl_session_is_ct (s);
   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
   rx_fifo = s->rx_fifo;
+  s->has_rx_evt = 0;
 
   if (svm_fifo_is_empty (rx_fifo))
     {
       if (is_nonblocking)
        {
          svm_fifo_unset_event (rx_fifo);
-         return VPPCOM_OK;
+         return VPPCOM_EWOULDBLOCK;
        }
       while (svm_fifo_is_empty (rx_fifo))
        {
@@ -1385,13 +1386,14 @@ vppcom_session_read_segments (uint32_t session_handle,
   is_ct = vcl_session_is_ct (s);
   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
   rx_fifo = s->rx_fifo;
+  s->has_rx_evt = 0;
 
   if (svm_fifo_is_empty (rx_fifo))
     {
       if (is_nonblocking)
        {
          svm_fifo_unset_event (rx_fifo);
-         return VPPCOM_OK;
+         return VPPCOM_EWOULDBLOCK;
        }
       while (svm_fifo_is_empty (rx_fifo))
        {
@@ -1551,7 +1553,8 @@ vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
        {
          svm_fifo_set_want_tx_evt (tx_fifo, 1);
          svm_msg_q_lock (mq);
-         svm_msg_q_wait (mq);
+         if (svm_msg_q_is_empty (mq))
+           svm_msg_q_wait (mq);
 
          svm_msg_q_sub_w_lock (mq, &msg);
          e = svm_msg_q_msg_data (mq, &msg);
@@ -2303,11 +2306,12 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       sid = e->fifo->client_session_index;
       session = vcl_session_get (wrk, sid);
       session_events = session->vep.ev.events;
-      if (!(EPOLLIN & session->vep.ev.events))
+      if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
        break;
       add_event = 1;
       events[*num_ev].events |= EPOLLIN;
       session_evt_data = session->vep.ev.data.u64;
+      session->has_rx_evt = 1;
       break;
     case FIFO_EVENT_APP_TX:
       sid = e->fifo->client_session_index;
@@ -2324,11 +2328,12 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
       sid = session->session_index;
       session_events = session->vep.ev.events;
-      if (!(EPOLLIN & session->vep.ev.events))
+      if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
        break;
       add_event = 1;
       events[*num_ev].events |= EPOLLIN;
       session_evt_data = session->vep.ev.data.u64;
+      session->has_rx_evt = 1;
       break;
     case SESSION_IO_EVT_CT_RX:
       session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
@@ -2452,15 +2457,13 @@ handle_dequeued:
     {
       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
       e = svm_msg_q_msg_data (mq, msg);
-      vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
+      if (*num_ev < maxevents)
+       vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
+      else
+       vec_add1 (wrk->unhandled_evts_vector, *e);
       svm_msg_q_free_msg (mq, msg);
-      if (*num_ev == maxevents)
-       {
-         i += 1;
-         break;
-       }
     }
-  vec_delete (wrk->mq_msg_vector, i, 0);
+  vec_reset_length (wrk->mq_msg_vector);
 
   return *num_ev;
 }
@@ -3182,6 +3185,8 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
   vcl_worker_t *wrk = vcl_worker_get_current ();
   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
   u32 i, keep_trying = 1;
+  svm_msg_q_msg_t msg;
+  session_event_t *e;
   int rv, num_ev = 0;
 
   VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
@@ -3194,23 +3199,33 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
     {
       vcl_session_t *session;
 
-      for (i = 0; i < n_sids; i++)
+      /* Dequeue all events and drop all unhandled io events */
+      while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
        {
-         ASSERT (vp[i].revents);
+         e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
+         vcl_handle_mq_event (wrk, e);
+         svm_msg_q_free_msg (wrk->app_event_queue, &msg);
+       }
+      vec_reset_length (wrk->unhandled_evts_vector);
 
+      for (i = 0; i < n_sids; i++)
+       {
          session = vcl_session_get (wrk, vp[i].sid);
          if (!session)
-           continue;
+           {
+             vp[i].revents = POLLHUP;
+             num_ev++;
+             continue;
+           }
 
-         if (*vp[i].revents)
-           *vp[i].revents = 0;
+         vp[i].revents = 0;
 
          if (POLLIN & vp[i].events)
            {
              rv = vppcom_session_read_ready (session);
              if (rv > 0)
                {
-                 *vp[i].revents |= POLLIN;
+                 vp[i].revents |= POLLIN;
                  num_ev++;
                }
              else if (rv < 0)
@@ -3218,11 +3233,11 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
                  switch (rv)
                    {
                    case VPPCOM_ECONNRESET:
-                     *vp[i].revents = POLLHUP;
+                     vp[i].revents = POLLHUP;
                      break;
 
                    default:
-                     *vp[i].revents = POLLERR;
+                     vp[i].revents = POLLERR;
                      break;
                    }
                  num_ev++;
@@ -3234,7 +3249,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
              rv = vppcom_session_write_ready (session);
              if (rv > 0)
                {
-                 *vp[i].revents |= POLLOUT;
+                 vp[i].revents |= POLLOUT;
                  num_ev++;
                }
              else if (rv < 0)
@@ -3242,11 +3257,11 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
                  switch (rv)
                    {
                    case VPPCOM_ECONNRESET:
-                     *vp[i].revents = POLLHUP;
+                     vp[i].revents = POLLHUP;
                      break;
 
                    default:
-                     *vp[i].revents = POLLERR;
+                     vp[i].revents = POLLERR;
                      break;
                    }
                  num_ev++;
@@ -3255,7 +3270,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
 
          if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
            {
-             *vp[i].revents = POLLNVAL;
+             vp[i].revents = POLLNVAL;
              num_ev++;
            }
        }
@@ -3271,7 +3286,7 @@ vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
        {
          clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
                        ".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
-                       vp[i].events, *vp[i].revents);
+                       vp[i].events, vp[i].revents);
        }
     }
   return num_ev;