is_ct = vcl_session_is_ct (s);
mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
rx_fifo = s->rx_fifo;
+ s->has_rx_evt = 0;
if (svm_fifo_is_empty (rx_fifo))
{
if (is_nonblocking)
{
svm_fifo_unset_event (rx_fifo);
- return VPPCOM_OK;
+ return VPPCOM_EWOULDBLOCK;
}
while (svm_fifo_is_empty (rx_fifo))
{
is_ct = vcl_session_is_ct (s);
mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
rx_fifo = s->rx_fifo;
+ s->has_rx_evt = 0;
if (svm_fifo_is_empty (rx_fifo))
{
if (is_nonblocking)
{
svm_fifo_unset_event (rx_fifo);
- return VPPCOM_OK;
+ return VPPCOM_EWOULDBLOCK;
}
while (svm_fifo_is_empty (rx_fifo))
{
{
svm_fifo_set_want_tx_evt (tx_fifo, 1);
svm_msg_q_lock (mq);
- svm_msg_q_wait (mq);
+ if (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
sid = e->fifo->client_session_index;
session = vcl_session_get (wrk, sid);
session_events = session->vep.ev.events;
- if (!(EPOLLIN & session->vep.ev.events))
+ if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
break;
add_event = 1;
events[*num_ev].events |= EPOLLIN;
session_evt_data = session->vep.ev.data.u64;
+ session->has_rx_evt = 1;
break;
case FIFO_EVENT_APP_TX:
sid = e->fifo->client_session_index;
session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
sid = session->session_index;
session_events = session->vep.ev.events;
- if (!(EPOLLIN & session->vep.ev.events))
+ if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
break;
add_event = 1;
events[*num_ev].events |= EPOLLIN;
session_evt_data = session->vep.ev.data.u64;
+ session->has_rx_evt = 1;
break;
case SESSION_IO_EVT_CT_RX:
session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
- vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
+ if (*num_ev < maxevents)
+ vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
+ else
+ vec_add1 (wrk->unhandled_evts_vector, *e);
svm_msg_q_free_msg (mq, msg);
- if (*num_ev == maxevents)
- {
- i += 1;
- break;
- }
}
- vec_delete (wrk->mq_msg_vector, i, 0);
+ vec_reset_length (wrk->mq_msg_vector);
return *num_ev;
}
return 0;
}
-void
-mq_send_local_session_disconnected_cb (u32 app_wrk_index,
- local_session_t * ls)
-{
- app_worker_t *app_wrk = app_worker_get (app_wrk_index);
- svm_msg_q_msg_t _msg, *msg = &_msg;
- session_disconnected_msg_t *mp;
- svm_msg_q_t *app_mq;
- session_event_t *evt;
- application_t *app;
-
- app = application_get (app_wrk->app_index);
- app_mq = app_wrk->event_queue;
- svm_msg_q_lock_and_alloc_msg_w_ring (app_mq, SESSION_MQ_CTRL_EVT_RING,
- SVM_Q_WAIT, msg);
- evt = svm_msg_q_msg_data (app_mq, msg);
- memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
- mp = (session_disconnected_msg_t *) evt->data;
- mp->handle = application_local_session_handle (ls);
- mp->context = app->api_client_index;
- svm_msg_q_add_and_unlock (app_mq, msg);
-}
-
static void
send_session_disconnect_callback (stream_session_t * s)
{
svm_msg_q_add_and_unlock (app_mq, msg);
}
+void
+mq_send_local_session_disconnected_cb (u32 app_wrk_index,
+ local_session_t * ls)
+{
+ app_worker_t *app_wrk = app_worker_get (app_wrk_index);
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ session_disconnected_msg_t *mp;
+ svm_msg_q_t *app_mq;
+ session_event_t *evt;
+ application_t *app;
+
+ app = application_get (app_wrk->app_index);
+ app_mq = app_wrk->event_queue;
+ if (mq_try_lock_and_alloc_msg (app_mq, msg))
+ return;
+ evt = svm_msg_q_msg_data (app_mq, msg);
+ memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+ mp = (session_disconnected_msg_t *) evt->data;
+ mp->handle = application_local_session_handle (ls);
+ mp->context = app->api_client_index;
+ svm_msg_q_add_and_unlock (app_mq, msg);
+}
+
static void
mq_send_session_reset_cb (stream_session_t * s)
{