if (svm_fifo_is_empty (rx_fifo))
svm_fifo_unset_event (rx_fifo);
- if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
+ if (is_ct && svm_fifo_want_tx_evt (rx_fifo))
{
- /* If the peer is not polling send notification */
- if (!svm_fifo_has_event (s->rx_fifo))
- app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
- SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
+ svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
+ app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
+ SVM_Q_WAIT);
}
VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
session_event_t *e;
int i;
+ if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
+ goto handle_dequeued;
+
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
{
vcl_mq_dequeue_batch (wrk, mq);
svm_msg_q_unlock (mq);
+handle_dequeued:
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
break;
}
}
-
vec_delete (wrk->mq_msg_vector, i, 0);
return *num_ev;
u64 buf;
vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
+again:
n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
vec_len (wrk->mq_events), wait_for_time);
for (i = 0; i < n_mq_evts; i++)
n_read = read (mqc->mq_fd, &buf, sizeof (buf));
vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
}
+ if (!n_evts && n_mq_evts > 0)
+ goto again;
return (int) n_evts;
}