}
static inline int
-vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
+vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
{
svm_msg_q_msg_t *msg;
u32 n_msgs;
int i;
- n_msgs = svm_msg_q_size (mq);
+ n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
for (i = 0; i < n_msgs; i++)
{
vec_add2 (wrk->mq_msg_vector, msg, 1);
mq = wrk->app_event_queue;
svm_msg_q_lock (mq);
- vcl_mq_dequeue_batch (wrk, mq);
+ vcl_mq_dequeue_batch (wrk, mq, ~0);
svm_msg_q_unlock (mq);
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
}
}
}
- vcl_mq_dequeue_batch (wrk, mq);
+ vcl_mq_dequeue_batch (wrk, mq, ~0);
svm_msg_q_unlock (mq);
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
}
}
}
- vcl_mq_dequeue_batch (wrk, mq);
+ ASSERT (maxevents > *num_ev);
+ vcl_mq_dequeue_batch (wrk, mq, maxevents - *num_ev);
svm_msg_q_unlock (mq);
handle_dequeued:
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
- if (*num_ev < maxevents)
- vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
- else
- vec_add1 (wrk->unhandled_evts_vector, *e);
+ vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
svm_msg_q_free_msg (mq, msg);
}
vec_reset_length (wrk->mq_msg_vector);
vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
int maxevents, u32 n_evts, double wait_for_time)
{
- double wait = 0, start = 0;
+ double wait = 0, start = 0, now;
if (!n_evts)
{
if (wait == -1)
continue;
- wait = wait - (clib_time_now (&wrk->clib_time) - start);
+ now = clib_time_now (&wrk->clib_time);
+ wait -= now - start;
+ start = now;
}
while (wait > 0);
events, &n_evts);
if (n_evts == maxevents)
{
- i += 1;
- break;
+ vec_delete (wrk->unhandled_evts_vector, i + 1, 0);
+ return n_evts;
}
}
- vec_delete (wrk->unhandled_evts_vector, i, 0);
+ vec_reset_length (wrk->unhandled_evts_vector);
}
if (vcm->cfg.use_mq_eventfd)