vcl: cap epoll dequeue batch size to max events
[vpp.git] / src / vcl / vppcom.c
index 6668d46..8b81e26 100644 (file)
 
 #include <stdio.h>
 #include <stdlib.h>
-#include <svm/svm_fifo_segment.h>
 #include <vcl/vppcom.h>
 #include <vcl/vcl_debug.h>
 #include <vcl/vcl_private.h>
+#include <svm/fifo_segment.h>
 
 __thread uword __vcl_worker_index = ~0;
 
@@ -44,13 +44,13 @@ vcl_wait_for_segment (u64 segment_handle)
 }
 
 static inline int
-vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
+vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
 {
   svm_msg_q_msg_t *msg;
   u32 n_msgs;
   int i;
 
-  n_msgs = svm_msg_q_size (mq);
+  n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
   for (i = 0; i < n_msgs; i++)
     {
       vec_add2 (wrk->mq_msg_vector, msg, 1);
@@ -764,7 +764,7 @@ vcl_flush_mq_events (void)
 
   mq = wrk->app_event_queue;
   svm_msg_q_lock (mq);
-  vcl_mq_dequeue_batch (wrk, mq);
+  vcl_mq_dequeue_batch (wrk, mq, ~0);
   svm_msg_q_unlock (mq);
 
   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
@@ -924,8 +924,8 @@ vppcom_app_create (char *app_name)
   vcm->main_pid = getpid ();
   vcm->app_name = format (0, "%s", app_name);
   vppcom_init_error_string_table ();
-  svm_fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
-                             20 /* timeout in secs */ );
+  fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
+                         20 /* timeout in secs */ );
   pool_alloc (vcm->workers, vcl_cfg->max_workers);
   clib_spinlock_init (&vcm->workers_lock);
   clib_rwlock_init (&vcm->segment_table_lock);
@@ -1611,7 +1611,7 @@ vppcom_session_read_segments (uint32_t session_handle,
        }
     }
 
-  n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
+  n_read = svm_fifo_segments (rx_fifo, (svm_fifo_seg_t *) ds);
   svm_fifo_unset_event (rx_fifo);
 
   return n_read;
@@ -1628,7 +1628,7 @@ vppcom_session_free_segments (uint32_t session_handle,
   if (PREDICT_FALSE (!s || s->is_vep))
     return;
 
-  svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
+  svm_fifo_segments_free (s->rx_fifo, (svm_fifo_seg_t *) ds);
 }
 
 int
@@ -1895,7 +1895,7 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
            }
        }
     }
-  vcl_mq_dequeue_batch (wrk, mq);
+  vcl_mq_dequeue_batch (wrk, mq, ~0);
   svm_msg_q_unlock (mq);
 
   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
@@ -2479,7 +2479,8 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
            }
        }
     }
-  vcl_mq_dequeue_batch (wrk, mq);
+  ASSERT (maxevents > *num_ev);
+  vcl_mq_dequeue_batch (wrk, mq, maxevents - *num_ev);
   svm_msg_q_unlock (mq);
 
 handle_dequeued:
@@ -2487,10 +2488,7 @@ handle_dequeued:
     {
       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
       e = svm_msg_q_msg_data (mq, msg);
-      if (*num_ev < maxevents)
-       vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
-      else
-       vec_add1 (wrk->unhandled_evts_vector, *e);
+      vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
       svm_msg_q_free_msg (mq, msg);
     }
   vec_reset_length (wrk->mq_msg_vector);
@@ -2502,7 +2500,7 @@ static int
 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
                           int maxevents, u32 n_evts, double wait_for_time)
 {
-  double wait = 0, start = 0;
+  double wait = 0, start = 0, now;
 
   if (!n_evts)
     {
@@ -2519,7 +2517,9 @@ vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
       if (wait == -1)
        continue;
 
-      wait = wait - (clib_time_now (&wrk->clib_time) - start);
+      now = clib_time_now (&wrk->clib_time);
+      wait -= now - start;
+      start = now;
     }
   while (wait > 0);
 
@@ -2586,11 +2586,11 @@ vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
                                          events, &n_evts);
          if (n_evts == maxevents)
            {
-             i += 1;
-             break;
+             vec_delete (wrk->unhandled_evts_vector, i + 1, 0);
+             return n_evts;
            }
        }
-      vec_delete (wrk->unhandled_evts_vector, i, 0);
+      vec_reset_length (wrk->unhandled_evts_vector);
     }
 
   if (vcm->cfg.use_mq_eventfd)