#include <stdlib.h>
#include <svm/svm_fifo_segment.h>
#include <vcl/vppcom.h>
-#include <vcl/vcl_event.h>
#include <vcl/vcl_debug.h>
#include <vcl/vcl_private.h>
vcl_wait_for_memory (session->vpp_evt_q);
rx_fifo->master_session_index = session->session_index;
tx_fifo->master_session_index = session->session_index;
+ rx_fifo->master_thread_index = vcl_get_worker_index ();
+ tx_fifo->master_thread_index = vcl_get_worker_index ();
vec_validate (wrk->vpp_event_queues, 0);
evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
wrk->vpp_event_queues[0] = evt_q;
svm_msg_q_t *);
rx_fifo->client_session_index = session->session_index;
tx_fifo->client_session_index = session->session_index;
-
+ rx_fifo->client_thread_index = vcl_get_worker_index ();
+ tx_fifo->client_thread_index = vcl_get_worker_index ();
vpp_wrk_index = tx_fifo->master_thread_index;
vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
if (mp->retval)
{
clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
- mp->handle, session_index, format_api_error,
- ntohl (mp->retval));
+ session_index, format_api_error, ntohl (mp->retval));
session->session_state = STATE_FAILED;
session->vpp_handle = mp->handle;
return session_index;
vcl_wait_for_memory (rx_fifo);
rx_fifo->client_session_index = session_index;
tx_fifo->client_session_index = session_index;
+ rx_fifo->client_thread_index = vcl_get_worker_index ();
+ tx_fifo->client_thread_index = vcl_get_worker_index ();
if (mp->client_event_queue_address)
{
return sid;
}
-int
-vcl_handle_mq_ctrl_event (vcl_worker_t * wrk, session_event_t * e)
+static int
+vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
{
session_accepted_msg_t *accepted_msg;
session_disconnected_msg_t *disconnected_msg;
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
- clib_warning ("unhandled rx: sid %u (0x%x)",
- e->fifo->client_session_index,
- e->fifo->client_session_index);
+ case FIFO_EVENT_APP_TX:
+ case SESSION_IO_EVT_CT_RX:
+ case SESSION_IO_EVT_CT_TX:
+ vec_add1 (wrk->unhandled_evts_vector, *e);
break;
case SESSION_CTRL_EVT_ACCEPTED:
accepted_msg = (session_accepted_msg_t *) e->data;
break;
}
session->session_state = STATE_DISCONNECT;
- VDBG (0, "disconnected handle 0xllx, sid %u", disconnected_msg->handle,
+ VDBG (0, "disconnected handle 0x%llx, sid %u", disconnected_msg->handle,
sid);
break;
case SESSION_CTRL_EVT_RESET:
if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
continue;
e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
- vcl_handle_mq_ctrl_event (wrk, e);
+ vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (wrk->app_event_queue, &msg);
}
while (clib_time_now (&wrk->clib_time) < timeout);
u64 vpp_handle;
session = vcl_session_get_w_handle (wrk, session_handle);
+ if (!session)
+ return VPPCOM_EBADFD;
+
vpp_handle = session->vpp_handle;
state = session->session_state;
svm_fifo_segment_main_init (vcl_cfg->segment_baseva,
20 /* timeout in secs */ );
pool_init_fixed (vcm->workers, vcl_cfg->max_workers);
+ clib_spinlock_init (&vcm->workers_lock);
vcl_worker_alloc_and_init ();
}
svm_msg_q_msg_t msg;
session_event_t *e;
svm_msg_q_t *mq;
- u8 is_full;
+ u8 is_ct;
if (PREDICT_FALSE (!buf))
return VPPCOM_EINVAL;
s = vcl_session_get_w_handle (wrk, session_handle);
- if (PREDICT_FALSE (!s))
+ if (PREDICT_FALSE (!s || s->is_vep))
return VPPCOM_EBADFD;
- if (PREDICT_FALSE (s->is_vep))
- {
- clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
- "read from an epoll session!", getpid (), session_handle);
- return VPPCOM_EBADFD;
- }
-
- is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
- rx_fifo = s->rx_fifo;
-
if (PREDICT_FALSE (!vcl_session_is_readable (s)))
{
session_state_t state = s->session_state;
return rv;
}
- mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue;
- svm_fifo_unset_event (rx_fifo);
- is_full = svm_fifo_is_full (rx_fifo);
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+ is_ct = vcl_session_is_ct (s);
+ mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
+ rx_fifo = s->rx_fifo;
if (svm_fifo_is_empty (rx_fifo))
{
if (is_nonblocking)
{
+ svm_fifo_unset_event (rx_fifo);
return VPPCOM_OK;
}
- while (1)
+ while (svm_fifo_is_empty (rx_fifo))
{
+ svm_fifo_unset_event (rx_fifo);
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
svm_msg_q_wait (mq);
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
svm_msg_q_unlock (mq);
- if (!vcl_is_rx_evt_for_session (e, s->session_index,
- s->our_evt_q != 0))
+ if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
{
- vcl_handle_mq_ctrl_event (wrk, e);
+ vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
continue;
}
- svm_fifo_unset_event (rx_fifo);
svm_msg_q_free_msg (mq, &msg);
+
if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
return 0;
- if (svm_fifo_is_empty (rx_fifo))
- continue;
- break;
}
}
else
n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
- if (vcl_session_is_ct (s) && is_full)
+ if (svm_fifo_is_empty (rx_fifo))
+ svm_fifo_unset_event (rx_fifo);
+
+ if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
{
/* If the peer is not polling send notification */
if (!svm_fifo_has_event (s->rx_fifo))
SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
}
- if (VPPCOM_DEBUG > 2)
- {
- if (n_read > 0)
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
- "from (%p)", getpid (), s->vpp_handle,
- session_handle, n_read, rx_fifo);
- else
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
- "returning %d (%s)", getpid (), s->vpp_handle,
- session_handle, n_read, vppcom_retval_str (n_read));
- }
+ VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
+ getpid (), s->vpp_handle, session_handle, n_read, rx_fifo);
+
return n_read;
}
return (vppcom_session_read_internal (session_handle, buf, n, 1));
}
+int
+vppcom_session_read_segments (uint32_t session_handle,
+ vppcom_data_segments_t ds)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ int n_read = 0, rv, is_nonblocking;
+ vcl_session_t *s = 0;
+ svm_fifo_t *rx_fifo;
+ svm_msg_q_msg_t msg;
+ session_event_t *e;
+ svm_msg_q_t *mq;
+ u8 is_ct;
+
+ s = vcl_session_get_w_handle (wrk, session_handle);
+ if (PREDICT_FALSE (!s || s->is_vep))
+ return VPPCOM_EBADFD;
+
+ if (PREDICT_FALSE (!vcl_session_is_readable (s)))
+ {
+ session_state_t state = s->session_state;
+ rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
+ return rv;
+ }
+
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+ is_ct = vcl_session_is_ct (s);
+ mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
+ rx_fifo = s->rx_fifo;
+
+ if (svm_fifo_is_empty (rx_fifo))
+ {
+ if (is_nonblocking)
+ {
+ svm_fifo_unset_event (rx_fifo);
+ return VPPCOM_OK;
+ }
+ while (svm_fifo_is_empty (rx_fifo))
+ {
+ svm_fifo_unset_event (rx_fifo);
+ svm_msg_q_lock (mq);
+ if (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
+
+ svm_msg_q_sub_w_lock (mq, &msg);
+ e = svm_msg_q_msg_data (mq, &msg);
+ svm_msg_q_unlock (mq);
+ if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
+ {
+ vcl_handle_mq_event (wrk, e);
+ svm_msg_q_free_msg (mq, &msg);
+ continue;
+ }
+ svm_msg_q_free_msg (mq, &msg);
+
+ if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
+ return 0;
+ }
+ }
+
+ n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
+ svm_fifo_unset_event (rx_fifo);
+
+ if (is_ct && n_read + svm_fifo_max_dequeue (rx_fifo) == rx_fifo->nitems)
+ {
+ /* If the peer is not polling send notification */
+ if (!svm_fifo_has_event (s->rx_fifo))
+ app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
+ SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
+ }
+
+ return n_read;
+}
+
+void
+vppcom_session_free_segments (uint32_t session_handle,
+ vppcom_data_segments_t ds)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *s;
+
+ s = vcl_session_get_w_handle (wrk, session_handle);
+ if (PREDICT_FALSE (!s || s->is_vep))
+ return;
+
+ svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
+}
+
static inline int
vppcom_session_read_ready (vcl_session_t * session)
{
return svm_fifo_max_dequeue (session->rx_fifo);
}
+int
+vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
+{
+ u32 first_copy = clib_min (ds[0].len, max_bytes);
+ clib_memcpy (buf, ds[0].data, first_copy);
+ if (first_copy < max_bytes)
+ {
+ clib_memcpy (buf + first_copy, ds[1].data,
+ clib_min (ds[1].len, max_bytes - first_copy));
+ }
+ return 0;
+}
+
static u8
vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
{
svm_msg_q_msg_t msg;
session_event_t *e;
svm_msg_q_t *mq;
+ u8 is_ct;
if (PREDICT_FALSE (!buf))
return VPPCOM_EINVAL;
if (PREDICT_FALSE (!s))
return VPPCOM_EBADFD;
- tx_fifo = s->tx_fifo;
- is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
-
if (PREDICT_FALSE (s->is_vep))
{
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
return VPPCOM_EBADFD;
}
- if (!(s->session_state & STATE_OPEN))
+ if (PREDICT_FALSE (!(s->session_state & STATE_OPEN)))
{
session_state_t state = s->session_state;
rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
- "state 0x%x (%s)",
- getpid (), s->vpp_handle, session_handle,
+ "state 0x%x (%s)", getpid (), s->vpp_handle, session_handle,
state, vppcom_session_state_str (state));
return rv;
}
- mq = vcl_session_is_ct (s) ? s->our_evt_q : wrk->app_event_queue;
+ tx_fifo = s->tx_fifo;
+ is_ct = vcl_session_is_ct (s);
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+ mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
if (svm_fifo_is_full (tx_fifo))
{
if (is_nonblocking)
}
while (svm_fifo_is_full (tx_fifo))
{
+ svm_fifo_set_want_tx_evt (tx_fifo, 1);
svm_msg_q_lock (mq);
- while (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
- ;
+ svm_msg_q_wait (mq);
+
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
svm_msg_q_unlock (mq);
- if (!vcl_is_tx_evt_for_session (e, s->session_index,
- s->our_evt_q != 0))
- vcl_handle_mq_ctrl_event (wrk, e);
+ if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
+ vcl_handle_mq_event (wrk, e);
svm_msg_q_free_msg (mq, &msg);
}
}
ASSERT (n_write > 0);
- if (VPPCOM_DEBUG > 2)
- {
- if (n_write <= 0)
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
- "FIFO-FULL (%p)", getpid (), s->vpp_handle,
- session_handle, tx_fifo);
- else
- clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
- "wrote %d bytes tx-fifo: (%p)", getpid (),
- s->vpp_handle, session_handle, n_write, tx_fifo);
- }
+ VDBG (2, "VCL<%d>: vpp handle 0x%llx, sid %u: wrote %d bytes", getpid (),
+ s->vpp_handle, session_handle, n_write);
+
return n_write;
}
return n_msgs;
}
-static int
-vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
- unsigned long n_bits, unsigned long *read_map,
- unsigned long *write_map, unsigned long *except_map,
- double time_to_wait, u32 * bits_set)
+#define vcl_fifo_rx_evt_valid_or_break(_fifo) \
+if (PREDICT_FALSE (svm_fifo_is_empty (_fifo))) \
+ { \
+ svm_fifo_unset_event (_fifo); \
+ if (svm_fifo_is_empty (_fifo)) \
+ break; \
+ } \
+
+static void
+vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
+ unsigned long n_bits, unsigned long *read_map,
+ unsigned long *write_map,
+ unsigned long *except_map, u32 * bits_set)
{
session_disconnected_msg_t *disconnected_msg;
session_connected_msg_t *connected_msg;
session_accepted_msg_t *accepted_msg;
vcl_session_msg_t *vcl_msg;
vcl_session_t *session;
+ u64 handle;
+ u32 sid;
+
+ switch (e->event_type)
+ {
+ case FIFO_EVENT_APP_RX:
+ vcl_fifo_rx_evt_valid_or_break (e->fifo);
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (wrk, sid);
+ if (!session)
+ break;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case FIFO_EVENT_APP_TX:
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (wrk, sid);
+ if (!session)
+ break;
+ if (sid < n_bits && write_map)
+ {
+ clib_bitmap_set_no_check (write_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_IO_EVT_CT_TX:
+ vcl_fifo_rx_evt_valid_or_break (e->fifo);
+ session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
+ if (!session)
+ break;
+ sid = session->session_index;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_IO_EVT_CT_RX:
+ session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
+ if (!session)
+ break;
+ sid = session->session_index;
+ if (sid < n_bits && write_map)
+ {
+ clib_bitmap_set_no_check (write_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ accepted_msg = (session_accepted_msg_t *) e->data;
+ handle = accepted_msg->listener_handle;
+ session = vcl_session_table_lookup_listener (wrk, handle);
+ if (!session)
+ {
+ clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+ "listener handle %llx", getpid (), handle);
+ break;
+ }
+
+ clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+ vcl_msg->accepted_msg = *accepted_msg;
+ sid = session->session_index;
+ if (sid < n_bits && read_map)
+ {
+ clib_bitmap_set_no_check (read_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ connected_msg = (session_connected_msg_t *) e->data;
+ vcl_session_connected_handler (wrk, connected_msg);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ disconnected_msg = (session_disconnected_msg_t *) e->data;
+ sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
+ if (sid < n_bits && except_map)
+ {
+ clib_bitmap_set_no_check (except_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
+ if (sid < n_bits && except_map)
+ {
+ clib_bitmap_set_no_check (except_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
+ default:
+ clib_warning ("unhandled: %u", e->event_type);
+ break;
+ }
+}
+
+static int
+vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
+ unsigned long n_bits, unsigned long *read_map,
+ unsigned long *write_map, unsigned long *except_map,
+ double time_to_wait, u32 * bits_set)
+{
svm_msg_q_msg_t *msg;
session_event_t *e;
- u32 i, sid;
- u64 handle;
+ u32 i;
svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
- switch (e->event_type)
- {
- case FIFO_EVENT_APP_RX:
- sid = e->fifo->client_session_index;
- session = vcl_session_get (wrk, sid);
- if (!session)
- break;
- if (sid < n_bits && read_map)
- {
- clib_bitmap_set_no_check (read_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case FIFO_EVENT_APP_TX:
- sid = e->fifo->client_session_index;
- session = vcl_session_get (wrk, sid);
- if (!session)
- break;
- if (sid < n_bits && write_map)
- {
- clib_bitmap_set_no_check (write_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_IO_EVT_CT_TX:
- if (svm_fifo_is_empty (e->fifo))
- {
- svm_fifo_unset_event (e->fifo);
- if (svm_fifo_is_empty (e->fifo))
- break;
- }
- session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
- if (!session)
- break;
- sid = session->session_index;
- if (sid < n_bits && read_map)
- {
- clib_bitmap_set_no_check (read_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_IO_EVT_CT_RX:
- session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
- if (!session)
- break;
- sid = session->session_index;
- if (sid < n_bits && write_map)
- {
- clib_bitmap_set_no_check (write_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_CTRL_EVT_ACCEPTED:
- accepted_msg = (session_accepted_msg_t *) e->data;
- handle = accepted_msg->listener_handle;
- session = vcl_session_table_lookup_listener (wrk, handle);
- if (!session)
- {
- clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
- "listener handle %llx", getpid (), handle);
- break;
- }
-
- clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
- vcl_msg->accepted_msg = *accepted_msg;
- sid = session->session_index;
- if (sid < n_bits && read_map)
- {
- clib_bitmap_set_no_check (read_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_CTRL_EVT_CONNECTED:
- connected_msg = (session_connected_msg_t *) e->data;
- vcl_session_connected_handler (wrk, connected_msg);
- break;
- case SESSION_CTRL_EVT_DISCONNECTED:
- disconnected_msg = (session_disconnected_msg_t *) e->data;
- sid = vcl_session_index_from_vpp_handle (wrk,
- disconnected_msg->handle);
- if (sid < n_bits && except_map)
- {
- clib_bitmap_set_no_check (except_map, sid, 1);
- *bits_set += 1;
- }
- break;
- case SESSION_CTRL_EVT_RESET:
- sid = vcl_session_reset_handler (wrk,
- (session_reset_msg_t *) e->data);
- if (sid < n_bits && except_map)
- {
- clib_bitmap_set_no_check (except_map, sid, 1);
- *bits_set += 1;
- }
- break;
- default:
- clib_warning ("unhandled: %u", e->event_type);
- break;
- }
+ vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
+ except_map, bits_set);
svm_msg_q_free_msg (mq, msg);
}
-
vec_reset_length (wrk->mq_msg_vector);
return *bits_set;
}
u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
vcl_worker_t *wrk = vcl_worker_get_current ();
vcl_session_t *session = 0;
- int rv;
+ int rv, i;
ASSERT (sizeof (clib_bitmap_t) == sizeof (long int));
check_mq:
+ for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
+ {
+ vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
+ read_map, write_map, except_map, &bits_set);
+ }
+ vec_reset_length (wrk->unhandled_evts_vector);
+
if (vcm->cfg.use_mq_eventfd)
vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
time_to_wait, &bits_set);
return rv;
}
-static int
-vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
- struct epoll_event *events, u32 maxevents,
- double wait_for_time, u32 * num_ev)
+static inline void
+vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
+ struct epoll_event *events, u32 * num_ev)
{
session_disconnected_msg_t *disconnected_msg;
session_connected_msg_t *connected_msg;
u32 sid = ~0, session_events;
vcl_session_msg_t *vcl_msg;
vcl_session_t *session;
+ u8 add_event = 0;
+
+ switch (e->event_type)
+ {
+ case FIFO_EVENT_APP_RX:
+ ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
+ vcl_fifo_rx_evt_valid_or_break (e->fifo);
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (wrk, sid);
+ session_events = session->vep.ev.events;
+ if (!(EPOLLIN & session->vep.ev.events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case FIFO_EVENT_APP_TX:
+ sid = e->fifo->client_session_index;
+ session = vcl_session_get (wrk, sid);
+ session_events = session->vep.ev.events;
+ if (!(EPOLLOUT & session_events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_IO_EVT_CT_TX:
+ vcl_fifo_rx_evt_valid_or_break (e->fifo);
+ session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
+ sid = session->session_index;
+ session_events = session->vep.ev.events;
+ if (!(EPOLLIN & session->vep.ev.events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_IO_EVT_CT_RX:
+ session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
+ sid = session->session_index;
+ session_events = session->vep.ev.events;
+ if (!(EPOLLOUT & session_events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ accepted_msg = (session_accepted_msg_t *) e->data;
+ handle = accepted_msg->listener_handle;
+ session = vcl_session_table_lookup_listener (wrk, handle);
+ if (!session)
+ {
+ clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
+ "listener handle %llx", getpid (), handle);
+ break;
+ }
+
+ clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
+ vcl_msg->accepted_msg = *accepted_msg;
+ session_events = session->vep.ev.events;
+ if (!(EPOLLIN & session_events))
+ break;
+
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ connected_msg = (session_connected_msg_t *) e->data;
+ vcl_session_connected_handler (wrk, connected_msg);
+ /* Generate EPOLLOUT because there's no connected event */
+ sid = vcl_session_index_from_vpp_handle (wrk, connected_msg->handle);
+ session = vcl_session_get (wrk, sid);
+ session_events = session->vep.ev.events;
+ if (EPOLLOUT & session_events)
+ {
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
+ }
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ disconnected_msg = (session_disconnected_msg_t *) e->data;
+ sid = vcl_session_index_from_vpp_handle (wrk, disconnected_msg->handle);
+ if (!(session = vcl_session_get (wrk, sid)))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
+ session_evt_data = session->vep.ev.data.u64;
+ session_events = session->vep.ev.events;
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
+ if (!(session = vcl_session_get (wrk, sid)))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
+ session_evt_data = session->vep.ev.data.u64;
+ session_events = session->vep.ev.events;
+ break;
+ default:
+ VDBG (0, "unhandled: %u", e->event_type);
+ break;
+ }
+
+ if (add_event)
+ {
+ events[*num_ev].data.u64 = session_evt_data;
+ if (EPOLLONESHOT & session_events)
+ {
+ session = vcl_session_get (wrk, sid);
+ session->vep.ev.events = 0;
+ }
+ *num_ev += 1;
+ }
+}
+
+static int
+vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
+ struct epoll_event *events, u32 maxevents,
+ double wait_for_time, u32 * num_ev)
+{
svm_msg_q_msg_t *msg;
session_event_t *e;
- u8 add_event;
int i;
svm_msg_q_lock (mq);
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
- add_event = 0;
- switch (e->event_type)
- {
- case FIFO_EVENT_APP_RX:
- sid = e->fifo->client_session_index;
- session = vcl_session_get (wrk, sid);
- session_events = session->vep.ev.events;
- if (!(EPOLLIN & session->vep.ev.events))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLIN;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case FIFO_EVENT_APP_TX:
- sid = e->fifo->client_session_index;
- session = vcl_session_get (wrk, sid);
- session_events = session->vep.ev.events;
- if (!(EPOLLOUT & session_events))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case SESSION_IO_EVT_CT_TX:
- session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 0);
- sid = session->session_index;
- session_events = session->vep.ev.events;
- if (!(EPOLLIN & session->vep.ev.events))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLIN;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case SESSION_IO_EVT_CT_RX:
- session = vcl_ct_session_get_from_fifo (wrk, e->fifo, 1);
- sid = session->session_index;
- session_events = session->vep.ev.events;
- if (!(EPOLLOUT & session_events))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case SESSION_CTRL_EVT_ACCEPTED:
- accepted_msg = (session_accepted_msg_t *) e->data;
- handle = accepted_msg->listener_handle;
- session = vcl_session_table_lookup_listener (wrk, handle);
- if (!session)
- {
- clib_warning ("VCL<%d>: ERROR: couldn't find listen session:"
- "listener handle %llx", getpid (), handle);
- break;
- }
-
- clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
- vcl_msg->accepted_msg = *accepted_msg;
- session_events = session->vep.ev.events;
- if (!(EPOLLIN & session_events))
- break;
-
- add_event = 1;
- events[*num_ev].events |= EPOLLIN;
- session_evt_data = session->vep.ev.data.u64;
- break;
- case SESSION_CTRL_EVT_CONNECTED:
- connected_msg = (session_connected_msg_t *) e->data;
- vcl_session_connected_handler (wrk, connected_msg);
- /* Generate EPOLLOUT because there's no connected event */
- sid = vcl_session_index_from_vpp_handle (wrk,
- connected_msg->handle);
- session = vcl_session_get (wrk, sid);
- session_events = session->vep.ev.events;
- if (EPOLLOUT & session_events)
- {
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- }
- break;
- case SESSION_CTRL_EVT_DISCONNECTED:
- disconnected_msg = (session_disconnected_msg_t *) e->data;
- sid = vcl_session_index_from_vpp_handle (wrk,
- disconnected_msg->handle);
- if (!(session = vcl_session_get (wrk, sid)))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
- session_evt_data = session->vep.ev.data.u64;
- session_events = session->vep.ev.events;
- break;
- case SESSION_CTRL_EVT_RESET:
- sid = vcl_session_reset_handler (wrk,
- (session_reset_msg_t *) e->data);
- if (!(session = vcl_session_get (wrk, sid)))
- break;
- add_event = 1;
- events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
- session_evt_data = session->vep.ev.data.u64;
- session_events = session->vep.ev.events;
- break;
- default:
- VDBG (0, "unhandled: %u", e->event_type);
- svm_msg_q_free_msg (mq, msg);
- continue;
- }
+ vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
svm_msg_q_free_msg (mq, msg);
-
- if (add_event)
+ if (*num_ev == maxevents)
{
- events[*num_ev].data.u64 = session_evt_data;
- if (EPOLLONESHOT & session_events)
- {
- session = vcl_session_get (wrk, sid);
- session->vep.ev.events = 0;
- }
- *num_ev += 1;
- if (*num_ev == maxevents)
- break;
+ i += 1;
+ break;
}
}
- vec_reset_length (wrk->mq_msg_vector);
+ vec_delete (wrk->mq_msg_vector, i, 0);
+
return *num_ev;
}
static int
vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
- int maxevents, double wait_for_time)
+ int maxevents, u32 n_evts, double wait_for_time)
{
vcl_cut_through_registration_t *cr;
double total_wait = 0, wait_slice;
- u32 num_ev = 0;
int rv;
wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
vcl_ct_registration_lock (wrk);
/* *INDENT-OFF* */
pool_foreach (cr, wrk->cut_through_registrations, ({
- vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &num_ev);
+ vcl_epoll_wait_handle_mq (wrk, cr->mq, events, maxevents, 0, &n_evts);
}));
/* *INDENT-ON* */
vcl_ct_registration_unlock (wrk);
rv = vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events,
- maxevents, num_ev ? 0 : wait_slice,
- &num_ev);
+ maxevents, n_evts ? 0 : wait_slice,
+ &n_evts);
if (rv)
total_wait += wait_slice;
- if (num_ev)
- return num_ev;
+ if (n_evts)
+ return n_evts;
}
while (total_wait < wait_for_time);
- return (int) num_ev;
+ return n_evts;
}
static int
vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
- int maxevents, double wait_for_time)
+ int maxevents, u32 n_evts, double wait_for_time)
{
vcl_mq_evt_conn_t *mqc;
int __clib_unused n_read;
int n_mq_evts, i;
- u32 n_evts = 0;
u64 buf;
vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
{
vcl_worker_t *wrk = vcl_worker_get_current ();
vcl_session_t *vep_session;
+ u32 n_evts = 0;
+ int i;
if (PREDICT_FALSE (maxevents <= 0))
{
memset (events, 0, sizeof (*events) * maxevents);
+ if (vec_len (wrk->unhandled_evts_vector))
+ {
+ for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
+ {
+ vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
+ events, &n_evts);
+ if (n_evts == maxevents)
+ {
+ i += 1;
+ break;
+ }
+ }
+
+ vec_delete (wrk->unhandled_evts_vector, i, 0);
+ }
+
if (vcm->cfg.use_mq_eventfd)
- return vppcom_epoll_wait_eventfd (wrk, events, maxevents, wait_for_time);
+ return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
+ wait_for_time);
- return vppcom_epoll_wait_condvar (wrk, events, maxevents, wait_for_time);
+ return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
+ wait_for_time);
}
int
int
vppcom_worker_register (void)
{
- if (!vcl_worker_alloc_and_init ())
+ if (vcl_worker_alloc_and_init ())
return VPPCOM_OK;
return VPPCOM_EEXIST;
}