Code Review
/
vpp.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
vcl/ldp: fix poll
[vpp.git]
/
src
/
vcl
/
vppcom.c
diff --git
a/src/vcl/vppcom.c
b/src/vcl/vppcom.c
index
7274749
..
46b1c10
100644
(file)
--- a/
src/vcl/vppcom.c
+++ b/
src/vcl/vppcom.c
@@
-1293,13
+1293,14
@@
vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
is_ct = vcl_session_is_ct (s);
mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
rx_fifo = s->rx_fifo;
is_ct = vcl_session_is_ct (s);
mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
rx_fifo = s->rx_fifo;
+ s->has_rx_evt = 0;
if (svm_fifo_is_empty (rx_fifo))
{
if (is_nonblocking)
{
svm_fifo_unset_event (rx_fifo);
if (svm_fifo_is_empty (rx_fifo))
{
if (is_nonblocking)
{
svm_fifo_unset_event (rx_fifo);
- return VPPCOM_
O
K;
+ return VPPCOM_
EWOULDBLOC
K;
}
while (svm_fifo_is_empty (rx_fifo))
{
}
while (svm_fifo_is_empty (rx_fifo))
{
@@
-1332,12
+1333,11
@@
vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
if (svm_fifo_is_empty (rx_fifo))
svm_fifo_unset_event (rx_fifo);
if (svm_fifo_is_empty (rx_fifo))
svm_fifo_unset_event (rx_fifo);
- if (is_ct &&
n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems
)
+ if (is_ct &&
svm_fifo_want_tx_evt (rx_fifo)
)
{
{
- /* If the peer is not polling send notification */
- if (!svm_fifo_has_event (s->rx_fifo))
- app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
- SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
+ svm_fifo_set_want_tx_evt (s->rx_fifo, 0);
+ app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo, SESSION_IO_EVT_CT_RX,
+ SVM_Q_WAIT);
}
VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
}
VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
@@
-1386,13
+1386,14
@@
vppcom_session_read_segments (uint32_t session_handle,
is_ct = vcl_session_is_ct (s);
mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
rx_fifo = s->rx_fifo;
is_ct = vcl_session_is_ct (s);
mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
rx_fifo = s->rx_fifo;
+ s->has_rx_evt = 0;
if (svm_fifo_is_empty (rx_fifo))
{
if (is_nonblocking)
{
svm_fifo_unset_event (rx_fifo);
if (svm_fifo_is_empty (rx_fifo))
{
if (is_nonblocking)
{
svm_fifo_unset_event (rx_fifo);
- return VPPCOM_
O
K;
+ return VPPCOM_
EWOULDBLOC
K;
}
while (svm_fifo_is_empty (rx_fifo))
{
}
while (svm_fifo_is_empty (rx_fifo))
{
@@
-1552,7
+1553,8
@@
vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
{
svm_fifo_set_want_tx_evt (tx_fifo, 1);
svm_msg_q_lock (mq);
{
svm_fifo_set_want_tx_evt (tx_fifo, 1);
svm_msg_q_lock (mq);
- svm_msg_q_wait (mq);
+ if (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
@@
-2304,11
+2306,12
@@
vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
sid = e->fifo->client_session_index;
session = vcl_session_get (wrk, sid);
session_events = session->vep.ev.events;
sid = e->fifo->client_session_index;
session = vcl_session_get (wrk, sid);
session_events = session->vep.ev.events;
- if (!(EPOLLIN & session->vep.ev.events))
+ if (!(EPOLLIN & session->vep.ev.events)
|| session->has_rx_evt
)
break;
add_event = 1;
events[*num_ev].events |= EPOLLIN;
session_evt_data = session->vep.ev.data.u64;
break;
add_event = 1;
events[*num_ev].events |= EPOLLIN;
session_evt_data = session->vep.ev.data.u64;
+ session->has_rx_evt = 1;
break;
case FIFO_EVENT_APP_TX:
sid = e->fifo->client_session_index;
break;
case FIFO_EVENT_APP_TX:
sid = e->fifo->client_session_index;
@@
-2325,11
+2328,12
@@
vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
sid = session->session_index;
session_events = session->vep.ev.events;
session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
sid = session->session_index;
session_events = session->vep.ev.events;
- if (!(EPOLLIN & session->vep.ev.events))
+ if (!(EPOLLIN & session->vep.ev.events)
|| session->has_rx_evt
)
break;
add_event = 1;
events[*num_ev].events |= EPOLLIN;
session_evt_data = session->vep.ev.data.u64;
break;
add_event = 1;
events[*num_ev].events |= EPOLLIN;
session_evt_data = session->vep.ev.data.u64;
+ session->has_rx_evt = 1;
break;
case SESSION_IO_EVT_CT_RX:
session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
break;
case SESSION_IO_EVT_CT_RX:
session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
@@
-2421,6
+2425,9
@@
vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
session_event_t *e;
int i;
session_event_t *e;
int i;
+ if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
+ goto handle_dequeued;
+
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
{
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
{
@@
-2445,20
+2452,18
@@
vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
vcl_mq_dequeue_batch (wrk, mq);
svm_msg_q_unlock (mq);
vcl_mq_dequeue_batch (wrk, mq);
svm_msg_q_unlock (mq);
+handle_dequeued:
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
- vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
+ if (*num_ev < maxevents)
+ vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
+ else
+ vec_add1 (wrk->unhandled_evts_vector, *e);
svm_msg_q_free_msg (mq, msg);
svm_msg_q_free_msg (mq, msg);
- if (*num_ev == maxevents)
- {
- i += 1;
- break;
- }
}
}
-
- vec_delete (wrk->mq_msg_vector, i, 0);
+ vec_reset_length (wrk->mq_msg_vector);
return *num_ev;
}
return *num_ev;
}
@@
-2506,6
+2511,7
@@
vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
u64 buf;
vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
u64 buf;
vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
+again:
n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
vec_len (wrk->mq_events), wait_for_time);
for (i = 0; i < n_mq_evts; i++)
n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
vec_len (wrk->mq_events), wait_for_time);
for (i = 0; i < n_mq_evts; i++)
@@
-2514,6
+2520,8
@@
vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
n_read = read (mqc->mq_fd, &buf, sizeof (buf));
vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
}
n_read = read (mqc->mq_fd, &buf, sizeof (buf));
vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
}
+ if (!n_evts && n_mq_evts > 0)
+ goto again;
return (int) n_evts;
}
return (int) n_evts;
}
@@
-3177,6
+3185,8
@@
vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
vcl_worker_t *wrk = vcl_worker_get_current ();
f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
u32 i, keep_trying = 1;
vcl_worker_t *wrk = vcl_worker_get_current ();
f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
u32 i, keep_trying = 1;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
int rv, num_ev = 0;
VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
int rv, num_ev = 0;
VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
@@
-3189,23
+3199,33
@@
vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
{
vcl_session_t *session;
{
vcl_session_t *session;
- for (i = 0; i < n_sids; i++)
+ /* Dequeue all events and drop all unhandled io events */
+ while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
{
{
- ASSERT (vp[i].revents);
+ e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
+ vcl_handle_mq_event (wrk, e);
+ svm_msg_q_free_msg (wrk->app_event_queue, &msg);
+ }
+ vec_reset_length (wrk->unhandled_evts_vector);
+ for (i = 0; i < n_sids; i++)
+ {
session = vcl_session_get (wrk, vp[i].sid);
if (!session)
session = vcl_session_get (wrk, vp[i].sid);
if (!session)
- continue;
+ {
+ vp[i].revents = POLLHUP;
+ num_ev++;
+ continue;
+ }
- if (*vp[i].revents)
- *vp[i].revents = 0;
+ vp[i].revents = 0;
if (POLLIN & vp[i].events)
{
rv = vppcom_session_read_ready (session);
if (rv > 0)
{
if (POLLIN & vp[i].events)
{
rv = vppcom_session_read_ready (session);
if (rv > 0)
{
-
*
vp[i].revents |= POLLIN;
+ vp[i].revents |= POLLIN;
num_ev++;
}
else if (rv < 0)
num_ev++;
}
else if (rv < 0)
@@
-3213,11
+3233,11
@@
vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
switch (rv)
{
case VPPCOM_ECONNRESET:
switch (rv)
{
case VPPCOM_ECONNRESET:
-
*
vp[i].revents = POLLHUP;
+ vp[i].revents = POLLHUP;
break;
default:
break;
default:
-
*
vp[i].revents = POLLERR;
+ vp[i].revents = POLLERR;
break;
}
num_ev++;
break;
}
num_ev++;
@@
-3229,7
+3249,7
@@
vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
rv = vppcom_session_write_ready (session);
if (rv > 0)
{
rv = vppcom_session_write_ready (session);
if (rv > 0)
{
-
*
vp[i].revents |= POLLOUT;
+ vp[i].revents |= POLLOUT;
num_ev++;
}
else if (rv < 0)
num_ev++;
}
else if (rv < 0)
@@
-3237,11
+3257,11
@@
vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
switch (rv)
{
case VPPCOM_ECONNRESET:
switch (rv)
{
case VPPCOM_ECONNRESET:
-
*
vp[i].revents = POLLHUP;
+ vp[i].revents = POLLHUP;
break;
default:
break;
default:
-
*
vp[i].revents = POLLERR;
+ vp[i].revents = POLLERR;
break;
}
num_ev++;
break;
}
num_ev++;
@@
-3250,7
+3270,7
@@
vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
if (0) // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
{
if (0) // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
{
-
*
vp[i].revents = POLLNVAL;
+ vp[i].revents = POLLNVAL;
num_ev++;
}
}
num_ev++;
}
}
@@
-3266,7
+3286,7
@@
vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
{
clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
{
clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
".revents 0x%x", getpid (), i, vp[i].sid, vp[i].sid,
- vp[i].events,
*
vp[i].revents);
+ vp[i].events, vp[i].revents);
}
}
return num_ev;
}
}
return num_ev;