vcl_wait_for_memory (void *mem)
{
u8 __clib_unused test;
+ if (vcm->mounting_segment)
+ {
+ while (vcm->mounting_segment)
+ ;
+ return;
+ }
if (1 || vcm->debug)
{
- sleep (1);
+ usleep (1e5);
return;
}
if (signal (SIGSEGV, sigsegv_signal))
return VPPCOM_ETIMEDOUT;
}
-static u32
-vcl_ct_registration_add (svm_msg_q_t * mq, u32 sid)
+static svm_msg_q_t *
+vcl_session_vpp_evt_q (vcl_session_t * s)
{
- vcl_cut_through_registration_t *cr;
- pool_get (vcm->cut_through_registrations, cr);
- cr->mq = mq;
- cr->sid = sid;
- return (cr - vcm->cut_through_registrations);
+ if (vcl_session_is_ct (s))
+ return vcm->vpp_event_queues[0];
+ else
+ return vcm->vpp_event_queues[s->tx_fifo->master_thread_index];
}
static void
-vcl_ct_registration_del (u32 ct_index)
+vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
+ session_handle_t handle, int retval)
{
- pool_put_index (vcm->cut_through_registrations, ct_index);
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_accepted_reply_msg_t *rmp;
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
+ rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+ rmp->handle = handle;
+ rmp->context = context;
+ rmp->retval = retval;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
}
-static vcl_session_t *
-vcl_ct_session_get_from_fifo (svm_fifo_t * f, u8 type)
+static void
+vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
+ session_handle_t handle, int retval)
{
- vcl_session_t *s;
- s = vcl_session_get (f->client_session_index);
- if (s)
- {
- /* rx fifo */
- if (type == 0 && s->rx_fifo == f)
- return s;
- /* tx fifo */
- if (type == 1 && s->tx_fifo == f)
- return s;
- }
- s = vcl_session_get (f->master_session_index);
- if (s)
- {
- if (type == 0 && s->rx_fifo == f)
- return s;
- if (type == 1 && s->tx_fifo == f)
- return s;
- }
- return 0;
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_disconnected_reply_msg_t *rmp;
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt,
+ SESSION_CTRL_EVT_DISCONNECTED_REPLY);
+ rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
+ rmp->handle = handle;
+ rmp->context = context;
+ rmp->retval = retval;
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
}
static void
-vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
- session_handle_t handle, int retval)
+vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
+ session_handle_t handle, int retval)
{
app_session_evt_t _app_evt, *app_evt = &_app_evt;
- session_accepted_reply_msg_t *rmp;
- app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
- rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
+ session_reset_reply_msg_t *rmp;
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
+ rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
rmp->handle = handle;
rmp->context = context;
rmp->retval = retval;
{
vcl_session_t *session, *listen_session;
svm_fifo_t *rx_fifo, *tx_fifo;
- u32 session_index;
+ u32 session_index, vpp_wrk_index;
+ svm_msg_q_t *evt_q;
VCL_SESSION_LOCK ();
+ session = vcl_session_alloc ();
+ session_index = vcl_session_index (session);
+
listen_session = vppcom_session_table_lookup_listener (mp->listener_handle);
if (!listen_session)
{
getpid (), mp->listener_handle);
vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
VNET_API_ERROR_INVALID_ARGUMENT);
+ vcl_session_free (session);
+ VCL_SESSION_UNLOCK ();
return VCL_INVALID_SESSION_INDEX;
}
- pool_get (vcm->sessions, session);
- memset (session, 0, sizeof (*session));
- session_index = (u32) (session - vcm->sessions);
-
rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
session->our_evt_q = uword_to_pointer (mp->server_event_queue_address,
svm_msg_q_t *);
vcl_wait_for_memory (session->vpp_evt_q);
- session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
- session_index);
rx_fifo->master_session_index = session_index;
tx_fifo->master_session_index = session_index;
+ vec_validate (vcm->vpp_event_queues, 0);
+ evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+ vcm->vpp_event_queues[0] = evt_q;
}
else
{
svm_msg_q_t *);
rx_fifo->client_session_index = session_index;
tx_fifo->client_session_index = session_index;
+
+ vpp_wrk_index = tx_fifo->master_thread_index;
+ vec_validate (vcm->vpp_event_queues, vpp_wrk_index);
+ vcm->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
}
session->vpp_handle = mp->handle;
hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
session->transport.lcl_port = listen_session->transport.lcl_port;
session->transport.lcl_ip = listen_session->transport.lcl_ip;
+ session->session_type = listen_session->session_type;
+ session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
" address %U port %d queue %p!", getpid (), mp->handle, session_index,
static u32
vcl_session_connected_handler (session_connected_msg_t * mp)
{
- vcl_session_t *session = 0;
- u32 session_index;
+ u32 session_index, vpp_wrk_index;
svm_fifo_t *rx_fifo, *tx_fifo;
+ vcl_session_t *session = 0;
+ svm_msg_q_t *evt_q;
int rv = VPPCOM_OK;
session_index = mp->context;
if (rv)
goto done_unlock;
+ rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+ tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+ vcl_wait_for_memory (rx_fifo);
+ rx_fifo->client_session_index = session_index;
+ tx_fifo->client_session_index = session_index;
+
if (mp->client_event_queue_address)
{
session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
svm_msg_q_t *);
session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
svm_msg_q_t *);
- vcl_wait_for_memory (session->vpp_evt_q);
- session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
- session_index);
+
+ vec_validate (vcm->vpp_event_queues, 0);
+ evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+ vcm->vpp_event_queues[0] = evt_q;
}
else
- session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
- svm_msg_q_t *);
-
- rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
- rx_fifo->client_session_index = session_index;
- tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
- tx_fifo->client_session_index = session_index;
+ {
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+ vpp_wrk_index = tx_fifo->master_thread_index;
+ vec_validate (vcm->vpp_event_queues, vpp_wrk_index);
+ vcm->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
+ }
session->rx_fifo = rx_fifo;
session->tx_fifo = tx_fifo;
return session_index;
}
+static u32
+vcl_reset_handler (session_reset_msg_t * reset_msg)
+{
+ vcl_session_t *session;
+ u32 sid;
+
+ sid = vcl_session_get_index_from_handle (reset_msg->handle);
+ session = vcl_session_get (sid);
+ if (!session)
+ {
+ VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
+ return VCL_INVALID_SESSION_INDEX;
+ }
+ session->session_state = STATE_CLOSE_ON_EMPTY;
+ VDBG (0, "reset handle 0x%llx, sid %u ", reset_msg->handle, sid);
+ vcl_send_session_reset_reply (vcl_session_vpp_evt_q (session),
+ vcm->my_client_index, reset_msg->handle, 0);
+ return sid;
+}
+
int
vcl_handle_mq_ctrl_event (session_event_t * e)
{
disconnected_msg = (session_disconnected_msg_t *) e->data;
sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
session = vcl_session_get (sid);
+ if (!session)
+ {
+ VDBG (0, "request to disconnect unknown handle 0x%llx",
+ disconnected_msg->handle);
+ break;
+ }
session->session_state = STATE_DISCONNECT;
- VDBG (0, "disconnected %u", sid);
+ VDBG (0, "disconnected handle 0xllx, sid %u", disconnected_msg->handle,
+ sid);
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ vcl_reset_handler ((session_reset_msg_t *) e->data);
break;
default:
clib_warning ("unhandled %u", e->event_type);
static int
vppcom_session_disconnect (u32 session_index)
{
- int rv;
+ svm_msg_q_t *vpp_evt_q;
vcl_session_t *session;
- u64 vpp_handle;
session_state_t state;
+ u64 vpp_handle;
+ int rv;
VCL_SESSION_LOCK_AND_GET (session_index, &session);
if (state & STATE_CLOSE_ON_EMPTY)
{
- vppcom_send_disconnect_session_reply (vpp_handle, session_index,
- 0 /* rv */ );
+ vpp_evt_q = vcl_session_vpp_evt_q (session);
+ vcl_send_session_disconnected_reply (vpp_evt_q, vcm->my_client_index,
+ vpp_handle, 0);
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
"REPLY...", getpid (), vpp_handle, session_index);
}
{
vcm->init = 1;
vppcom_cfg (&vcm->cfg);
+ vcl_cfg = &vcm->cfg;
+
+ vcm->mqs_epfd = -1;
+ if (vcl_cfg->use_mq_eventfd)
+ vcm->mqs_epfd = epoll_create (1);
clib_spinlock_init (&vcm->session_fifo_lockp);
clib_fifo_validate (vcm->client_session_index_fifo,
vcm->main_cpu = os_get_thread_index ();
vcm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
+ vcm->ct_registration_by_mq = hash_create (0, sizeof (uword));
+ clib_spinlock_init (&vcm->ct_registration_lock);
clib_time_init (&vcm->clib_time);
vppcom_init_error_string_table ();
svm_fifo_segment_main_init (vcl_cfg->segment_baseva,
20 /* timeout in secs */ );
+ vec_validate (vcm->mq_events, 64);
+ vec_validate (vcm->mq_msg_vector, 128);
+ vec_reset_length (vcm->mq_msg_vector);
}
if (vcm->my_client_index == ~0)
session->session_type = proto;
session->session_state = STATE_START;
session->vpp_handle = ~0;
+ session->is_dgram = proto == VPPCOM_PROTO_UDP;
if (is_nonblocking)
VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
- else
- VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
is_nonblocking, session_index);
}
VCL_SESSION_LOCK_AND_GET (session_index, &session);
- if (session->our_evt_q)
- vcl_ct_registration_del (session->ct_registration);
+ if (vcl_session_is_ct (session))
+ {
+ vcl_cut_through_registration_t *ctr;
+ uword mq_addr;
+
+ mq_addr = pointer_to_uword (session->our_evt_q);
+ ctr = vcl_ct_registration_lock_and_lookup (mq_addr);
+ ASSERT (ctr);
+ if (ctr->epoll_evt_conn_index != ~0)
+ vcl_mq_epoll_del_evfd (ctr->epoll_evt_conn_index);
+ VDBG (0, "Removing ct registration %u",
+ vcl_ct_registration_index (ctr));
+ vcl_ct_registration_del (ctr);
+ vcl_ct_registration_unlock ();
+ }
vpp_handle = session->vpp_handle;
if (vpp_handle != ~0)
session->session_type ? "UDP" : "TCP");
vcl_evt (VCL_EVT_BIND, session);
VCL_SESSION_UNLOCK ();
+
+ if (session->session_type == VPPCOM_PROTO_UDP)
+ vppcom_session_listen (session_index, 10);
+
done:
return rv;
}
handle:
client_session_index = vcl_session_accepted_handler (&accepted_msg);
+ listen_session = vcl_session_get (listen_session_index);
VCL_SESSION_LOCK_AND_GET (client_session_index, &client_session);
rv = client_session_index;
svm_msg_q_t *);
else
vpp_evt_q = client_session->vpp_evt_q;
+
vcl_send_session_accepted_reply (vpp_evt_q, client_session->client_context,
client_session->vpp_handle, 0);
return (e->event_type == SESSION_IO_EVT_CT_TX);
}
+static inline u8
+vcl_session_is_readable (vcl_session_t * s)
+{
+ return ((s->session_state & STATE_OPEN)
+ || (s->session_state == STATE_LISTEN
+ && s->session_type == VPPCOM_PROTO_UDP));
+}
+
static inline int
vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
u8 peek)
{
int n_read = 0, rv, is_nonblocking;
- vcl_session_t *session = 0;
+ vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
svm_msg_q_msg_t msg;
session_event_t *e;
ASSERT (buf);
- VCL_SESSION_LOCK_AND_GET (session_index, &session);
+ VCL_SESSION_LOCK_AND_GET (session_index, &s);
- if (PREDICT_FALSE (session->is_vep))
+ if (PREDICT_FALSE (s->is_vep))
{
VCL_SESSION_UNLOCK ();
clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
goto done;
}
- is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
- rx_fifo = session->rx_fifo;
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+ rx_fifo = s->rx_fifo;
- if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
+ if (PREDICT_FALSE (!vcl_session_is_readable (s)))
{
- session_state_t state = session->session_state;
+ session_state_t state = s->session_state;
VCL_SESSION_UNLOCK ();
rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
"state 0x%x (%s), returning %d (%s)",
- getpid (), session->vpp_handle, session_index, state,
+ getpid (), s->vpp_handle, session_index, state,
vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
goto done;
}
VCL_SESSION_UNLOCK ();
- mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+ mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
+ svm_fifo_unset_event (rx_fifo);
is_full = svm_fifo_is_full (rx_fifo);
if (svm_fifo_is_empty (rx_fifo))
{
- svm_fifo_unset_event (rx_fifo);
if (is_nonblocking)
{
rv = VPPCOM_OK;
goto done;
}
- svm_msg_q_lock (mq);
while (1)
{
+ svm_msg_q_lock (mq);
if (svm_msg_q_is_empty (mq))
svm_msg_q_wait (mq);
+
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
+ svm_msg_q_unlock (mq);
if (!vcl_is_rx_evt_for_session (e, session_index,
- session->our_evt_q != 0))
+ s->our_evt_q != 0))
{
vcl_handle_mq_ctrl_event (e);
svm_msg_q_free_msg (mq, &msg);
continue;
}
- if (svm_fifo_is_empty (rx_fifo))
- {
- svm_msg_q_free_msg (mq, &msg);
- continue;
- }
+ svm_fifo_unset_event (rx_fifo);
svm_msg_q_free_msg (mq, &msg);
- svm_msg_q_unlock (mq);
+ if (PREDICT_FALSE (s->session_state == STATE_CLOSE_ON_EMPTY))
+ return 0;
+ if (svm_fifo_is_empty (rx_fifo))
+ continue;
break;
}
}
- if (peek)
- n_read = svm_fifo_peek (rx_fifo, 0, n, buf);
+ if (s->is_dgram)
+ n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
else
- n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf);
- ASSERT (n_read > 0);
- svm_fifo_unset_event (rx_fifo);
-
- if (session->our_evt_q && is_full)
- app_send_io_evt_to_vpp (session->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
- SVM_Q_WAIT);
+ n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
+ if (vcl_session_is_ct (s) && is_full)
+ {
+ /* If the peer is not polling send notification */
+ if (!svm_fifo_has_event (s->rx_fifo))
+ app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo,
+ SESSION_IO_EVT_CT_RX, SVM_Q_WAIT);
+ }
if (VPPCOM_DEBUG > 2)
{
if (n_read > 0)
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
- "from (%p)", getpid (), session->vpp_handle,
+ "from (%p)", getpid (), s->vpp_handle,
session_index, n_read, rx_fifo);
else
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
- "returning %d (%s)", getpid (), session->vpp_handle,
+ "returning %d (%s)", getpid (), s->vpp_handle,
session_index, rv, vppcom_retval_str (rv));
}
return n_read;
vppcom_session_write (uint32_t session_index, void *buf, size_t n)
{
int rv, n_write, is_nonblocking;
- vcl_session_t *session = 0;
+ vcl_session_t *s = 0;
svm_fifo_t *tx_fifo = 0;
+ session_evt_type_t et;
svm_msg_q_msg_t msg;
session_event_t *e;
svm_msg_q_t *mq;
ASSERT (buf);
- VCL_SESSION_LOCK_AND_GET (session_index, &session);
+ VCL_SESSION_LOCK_AND_GET (session_index, &s);
- tx_fifo = session->tx_fifo;
- is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
+ tx_fifo = s->tx_fifo;
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
- if (PREDICT_FALSE (session->is_vep))
+ if (PREDICT_FALSE (s->is_vep))
{
VCL_SESSION_UNLOCK ();
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
"cannot write to an epoll session!",
- getpid (), session->vpp_handle, session_index);
+ getpid (), s->vpp_handle, session_index);
rv = VPPCOM_EBADFD;
goto done;
}
- if (!(session->session_state & STATE_OPEN))
+ if (!(s->session_state & STATE_OPEN))
{
- session_state_t state = session->session_state;
+ session_state_t state = s->session_state;
rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
VCL_SESSION_UNLOCK ();
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
"state 0x%x (%s)",
- getpid (), session->vpp_handle, session_index,
+ getpid (), s->vpp_handle, session_index,
state, vppcom_session_state_str (state));
goto done;
}
VCL_SESSION_UNLOCK ();
- mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+ mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
if (svm_fifo_is_full (tx_fifo))
{
if (is_nonblocking)
rv = VPPCOM_EWOULDBLOCK;
goto done;
}
- svm_msg_q_lock (mq);
- while (1)
+ while (svm_fifo_is_full (tx_fifo))
{
- if (!svm_fifo_is_full (tx_fifo))
- {
- svm_msg_q_unlock (mq);
- break;
- }
- if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
- continue;
+ svm_msg_q_lock (mq);
+ while (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 10e-6))
+ ;
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
+ svm_msg_q_unlock (mq);
+
if (!vcl_is_tx_evt_for_session (e, session_index,
- session->our_evt_q != 0))
- {
- vcl_handle_mq_ctrl_event (e);
- svm_msg_q_free_msg (mq, &msg);
- continue;
- }
- if (svm_fifo_is_full (tx_fifo))
- {
- svm_msg_q_free_msg (mq, &msg);
- continue;
- }
+ s->our_evt_q != 0))
+ vcl_handle_mq_ctrl_event (e);
svm_msg_q_free_msg (mq, &msg);
- svm_msg_q_unlock (mq);
- break;
}
}
- n_write = svm_fifo_enqueue_nowait (tx_fifo, n, (void *) buf);
- ASSERT (n_write > 0);
+ ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
+ et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
+ if (s->is_dgram)
+ n_write = app_send_dgram_raw (tx_fifo, &s->transport,
+ s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
+ else
+ n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
+ SVM_Q_WAIT);
- if (svm_fifo_set_event (tx_fifo))
- {
- session_evt_type_t et;
- VCL_SESSION_LOCK_AND_GET (session_index, &session);
- et = session->our_evt_q ? SESSION_IO_EVT_CT_TX : FIFO_EVENT_APP_TX;
- app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo, et, SVM_Q_WAIT);
- VCL_SESSION_UNLOCK ();
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX "
- "to vpp_event_q %p, n_write %d", getpid (),
- session->vpp_handle, session_index, session->vpp_evt_q, n_write);
- }
+ ASSERT (n_write > 0);
if (VPPCOM_DEBUG > 2)
{
if (n_write <= 0)
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
- "FIFO-FULL (%p)", getpid (), session->vpp_handle,
+ "FIFO-FULL (%p)", getpid (), s->vpp_handle,
session_index, tx_fifo);
else
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
"wrote %d bytes tx-fifo: (%p)", getpid (),
- session->vpp_handle, session_index, n_write, tx_fifo);
+ s->vpp_handle, session_index, n_write, tx_fifo);
}
return n_write;
return rv;
}
+static vcl_session_t *
+vcl_ct_session_get_from_fifo (svm_fifo_t * f, u8 type)
+{
+ vcl_session_t *s;
+ s = vcl_session_get (f->client_session_index);
+ if (s)
+ {
+ /* rx fifo */
+ if (type == 0 && s->rx_fifo == f)
+ return s;
+ /* tx fifo */
+ if (type == 1 && s->tx_fifo == f)
+ return s;
+ }
+ s = vcl_session_get (f->master_session_index);
+ if (s)
+ {
+ if (type == 0 && s->rx_fifo == f)
+ return s;
+ if (type == 1 && s->tx_fifo == f)
+ return s;
+ }
+ return 0;
+}
+
static inline int
vppcom_session_write_ready (vcl_session_t * session, u32 session_index)
{
return svm_fifo_max_enqueue (session->tx_fifo);
}
+static inline int
+vcl_mq_dequeue_batch (svm_msg_q_t * mq)
+{
+ svm_msg_q_msg_t *msg;
+ u32 n_msgs;
+ int i;
+
+ n_msgs = svm_msg_q_size (mq);
+ for (i = 0; i < n_msgs; i++)
+ {
+ vec_add2 (vcm->mq_msg_vector, msg, 1);
+ svm_msg_q_sub_w_lock (mq, msg);
+ }
+ return n_msgs;
+}
+
static int
vcl_select_handle_mq (svm_msg_q_t * mq, unsigned long n_bits,
unsigned long *read_map, unsigned long *write_map,
u32 * bits_set)
{
session_disconnected_msg_t *disconnected_msg;
+ session_connected_msg_t *connected_msg;
session_accepted_msg_t *accepted_msg;
vcl_session_msg_t *vcl_msg;
vcl_session_t *session;
- svm_msg_q_msg_t msg;
+ svm_msg_q_msg_t *msg;
session_event_t *e;
- u32 n_msgs, i, sid;
+ u32 i, sid;
u64 handle;
svm_msg_q_lock (mq);
}
}
}
+ vcl_mq_dequeue_batch (mq);
svm_msg_q_unlock (mq);
- n_msgs = svm_msg_q_size (mq);
- for (i = 0; i < n_msgs; i++)
+ for (i = 0; i < vec_len (vcm->mq_msg_vector); i++)
{
- if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
- {
- clib_warning ("message queue returned");
- continue;
- }
- e = svm_msg_q_msg_data (mq, &msg);
+ msg = vec_elt_at_index (vcm->mq_msg_vector, i);
+ e = svm_msg_q_msg_data (mq, msg);
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
sid = e->fifo->client_session_index;
session = vcl_session_get (sid);
- if (!session || svm_fifo_is_empty (session->rx_fifo))
- break;
if (sid < n_bits && read_map)
{
clib_bitmap_set_no_check (read_map, sid, 1);
case FIFO_EVENT_APP_TX:
sid = e->fifo->client_session_index;
session = vcl_session_get (sid);
- if (!session || svm_fifo_is_full (session->tx_fifo))
+ if (!session)
break;
if (sid < n_bits && write_map)
{
case SESSION_IO_EVT_CT_TX:
session = vcl_ct_session_get_from_fifo (e->fifo, 0);
sid = vcl_session_index (session);
- if (!session || svm_fifo_is_empty (session->rx_fifo))
- break;
if (sid < n_bits && read_map)
{
clib_bitmap_set_no_check (read_map, sid, 1);
case SESSION_IO_EVT_CT_RX:
session = vcl_ct_session_get_from_fifo (e->fifo, 1);
sid = vcl_session_index (session);
- if (!session || svm_fifo_is_full (session->tx_fifo))
+ if (!session)
break;
if (sid < n_bits && write_map)
{
*bits_set += 1;
}
break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ connected_msg = (session_connected_msg_t *) e->data;
+ vcl_session_connected_handler (connected_msg);
+ break;
case SESSION_CTRL_EVT_DISCONNECTED:
disconnected_msg = (session_disconnected_msg_t *) e->data;
sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
*bits_set += 1;
}
break;
+ case SESSION_CTRL_EVT_RESET:
+ sid = vcl_reset_handler ((session_reset_msg_t *) e->data);
+ if (sid < n_bits && except_map)
+ {
+ clib_bitmap_set_no_check (except_map, sid, 1);
+ *bits_set += 1;
+ }
+ break;
default:
clib_warning ("unhandled: %u", e->event_type);
break;
}
- svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_free_msg (mq, msg);
}
+ vec_reset_length (vcm->mq_msg_vector);
return *bits_set;
}
+static int
+vppcom_select_condvar (unsigned long n_bits, unsigned long *read_map,
+ unsigned long *write_map, unsigned long *except_map,
+ double time_to_wait, u32 * bits_set)
+{
+ double total_wait = 0, wait_slice;
+ vcl_cut_through_registration_t *cr;
+
+ time_to_wait = (time_to_wait == -1) ? 10e9 : time_to_wait;
+ wait_slice = vcm->cut_through_registrations ? 10e-6 : time_to_wait;
+ do
+ {
+ /* *INDENT-OFF* */
+ pool_foreach (cr, vcm->cut_through_registrations, ({
+ vcl_select_handle_mq (cr->mq, n_bits, read_map, write_map, except_map,
+ 0, bits_set);
+ }));
+ /* *INDENT-ON* */
+
+ vcl_select_handle_mq (vcm->app_event_queue, n_bits, read_map, write_map,
+ except_map, time_to_wait, bits_set);
+ total_wait += wait_slice;
+ if (*bits_set)
+ return *bits_set;
+ }
+ while (total_wait < time_to_wait);
+
+ return 0;
+}
+
+static int
+vppcom_select_eventfd (unsigned long n_bits, unsigned long *read_map,
+ unsigned long *write_map, unsigned long *except_map,
+ double time_to_wait, u32 * bits_set)
+{
+ vcl_mq_evt_conn_t *mqc;
+ int __clib_unused n_read;
+ int n_mq_evts, i;
+ u64 buf;
+
+ vec_validate (vcm->mq_events, pool_elts (vcm->mq_evt_conns));
+ n_mq_evts = epoll_wait (vcm->mqs_epfd, vcm->mq_events,
+ vec_len (vcm->mq_events), time_to_wait);
+ for (i = 0; i < n_mq_evts; i++)
+ {
+ mqc = vcl_mq_evt_conn_get (vcm->mq_events[i].data.u32);
+ n_read = read (mqc->mq_fd, &buf, sizeof (buf));
+ vcl_select_handle_mq (mqc->mq, n_bits, read_map, write_map,
+ except_map, 0, bits_set);
+ }
+
+ return (n_mq_evts > 0 ? (int) *bits_set : 0);
+}
+
int
vppcom_select (unsigned long n_bits, unsigned long *read_map,
unsigned long *write_map, unsigned long *except_map,
double time_to_wait)
{
u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
- vcl_cut_through_registration_t *cr;
- double total_wait = 0, wait_slice;
vcl_session_t *session = 0;
int rv;
/* *INDENT-OFF* */
clib_bitmap_foreach (sid, vcm->wr_bitmap, ({
- VCL_SESSION_LOCK();
if (!(session = vcl_session_get (sid)))
{
- VCL_SESSION_UNLOCK();
VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
getpid (), sid);
return VPPCOM_EBADFD;
}
rv = svm_fifo_is_full (session->tx_fifo);
- VCL_SESSION_UNLOCK();
if (!rv)
{
clib_bitmap_set_no_check (write_map, sid, 1);
check_rd:
if (!read_map)
goto check_mq;
+
clib_bitmap_foreach (sid, vcm->rd_bitmap, ({
- VCL_SESSION_LOCK();
if (!(session = vcl_session_get (sid)))
{
- VCL_SESSION_UNLOCK();
VDBG (0, "VCL<%d>: session %d specified in write_map is closed.",
getpid (), sid);
return VPPCOM_EBADFD;
}
rv = vppcom_session_read_ready (session);
- VCL_SESSION_UNLOCK();
if (rv)
{
clib_bitmap_set_no_check (read_map, sid, 1);
/* *INDENT-ON* */
check_mq:
- wait_slice = vcm->cut_through_registrations ? 10e-6 : time_to_wait;
- do
- {
- /* *INDENT-OFF* */
- pool_foreach (cr, vcm->cut_through_registrations, ({
- vcl_select_handle_mq (cr->mq, n_bits, read_map, write_map, except_map,
- 0, &bits_set);
- }));
- /* *INDENT-ON* */
- vcl_select_handle_mq (vcm->app_event_queue, n_bits, read_map, write_map,
- except_map, time_to_wait, &bits_set);
- total_wait += wait_slice;
- if (bits_set)
- return bits_set;
- }
- while (total_wait < time_to_wait);
+ if (vcm->cfg.use_mq_eventfd)
+ vppcom_select_eventfd (n_bits, read_map, write_map, except_map,
+ time_to_wait, &bits_set);
+ else
+ vppcom_select_condvar (n_bits, read_map, write_map, except_map,
+ time_to_wait, &bits_set);
return (bits_set);
}
session_disconnected_msg_t *disconnected_msg;
session_connected_msg_t *connected_msg;
session_accepted_msg_t *accepted_msg;
- u32 sid = ~0, session_events, n_msgs;
u64 session_evt_data = ~0, handle;
+ u32 sid = ~0, session_events;
vcl_session_msg_t *vcl_msg;
vcl_session_t *session;
- svm_msg_q_msg_t msg;
+ svm_msg_q_msg_t *msg;
session_event_t *e;
u8 add_event;
int i;
}
}
}
+ vcl_mq_dequeue_batch (mq);
svm_msg_q_unlock (mq);
- n_msgs = svm_msg_q_size (mq);
- for (i = 0; i < n_msgs; i++)
+ for (i = 0; i < vec_len (vcm->mq_msg_vector); i++)
{
- if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
- {
- clib_warning ("message queue returned");
- continue;
- }
- e = svm_msg_q_msg_data (mq, &msg);
+ msg = vec_elt_at_index (vcm->mq_msg_vector, i);
+ e = svm_msg_q_msg_data (mq, msg);
add_event = 0;
switch (e->event_type)
{
case FIFO_EVENT_APP_RX:
sid = e->fifo->client_session_index;
- clib_spinlock_lock (&vcm->sessions_lockp);
session = vcl_session_get (sid);
session_events = session->vep.ev.events;
- if ((EPOLLIN & session->vep.ev.events)
- && !svm_fifo_is_empty (session->rx_fifo))
- {
- add_event = 1;
- events[*num_ev].events |= EPOLLIN;
- session_evt_data = session->vep.ev.data.u64;
- }
- clib_spinlock_unlock (&vcm->sessions_lockp);
+ if (!(EPOLLIN & session->vep.ev.events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
break;
case FIFO_EVENT_APP_TX:
sid = e->fifo->client_session_index;
- clib_spinlock_lock (&vcm->sessions_lockp);
session = vcl_session_get (sid);
session_events = session->vep.ev.events;
- if ((EPOLLOUT & session_events)
- && !svm_fifo_is_full (session->tx_fifo))
- {
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- }
- clib_spinlock_unlock (&vcm->sessions_lockp);
+ if (!(EPOLLOUT & session_events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
break;
case SESSION_IO_EVT_CT_TX:
session = vcl_ct_session_get_from_fifo (e->fifo, 0);
sid = vcl_session_index (session);
session_events = session->vep.ev.events;
- if ((EPOLLIN & session->vep.ev.events)
- && !svm_fifo_is_empty (session->rx_fifo))
- {
- add_event = 1;
- events[*num_ev].events |= EPOLLIN;
- session_evt_data = session->vep.ev.data.u64;
- }
+ if (!(EPOLLIN & session->vep.ev.events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLIN;
+ session_evt_data = session->vep.ev.data.u64;
break;
case SESSION_IO_EVT_CT_RX:
session = vcl_ct_session_get_from_fifo (e->fifo, 1);
sid = vcl_session_index (session);
session_events = session->vep.ev.events;
- if ((EPOLLOUT & session_events)
- && !svm_fifo_is_full (session->tx_fifo))
- {
- add_event = 1;
- events[*num_ev].events |= EPOLLOUT;
- session_evt_data = session->vep.ev.data.u64;
- }
+ if (!(EPOLLOUT & session_events))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLOUT;
+ session_evt_data = session->vep.ev.data.u64;
break;
case SESSION_CTRL_EVT_ACCEPTED:
accepted_msg = (session_accepted_msg_t *) e->data;
disconnected_msg = (session_disconnected_msg_t *) e->data;
sid = vcl_session_get_index_from_handle (disconnected_msg->handle);
clib_spinlock_lock (&vcm->sessions_lockp);
- session = vcl_session_get (sid);
+ if (!(session = vcl_session_get (sid)))
+ break;
add_event = 1;
events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
session_evt_data = session->vep.ev.data.u64;
session_events = session->vep.ev.events;
clib_spinlock_unlock (&vcm->sessions_lockp);
break;
+ case SESSION_CTRL_EVT_RESET:
+ sid = vcl_reset_handler ((session_reset_msg_t *) e->data);
+ if (!(session = vcl_session_get (sid)))
+ break;
+ add_event = 1;
+ events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
+ session_evt_data = session->vep.ev.data.u64;
+ session_events = session->vep.ev.events;
+ break;
default:
clib_warning ("unhandled: %u", e->event_type);
- svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_free_msg (mq, msg);
continue;
}
-
- svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_free_msg (mq, msg);
if (add_event)
{
break;
}
}
+
+ vec_reset_length (vcm->mq_msg_vector);
return *num_ev;
}
+static int
+vppcom_epoll_wait_condvar (struct epoll_event *events, int maxevents,
+ double wait_for_time)
+{
+ vcl_cut_through_registration_t *cr;
+ double total_wait = 0, wait_slice;
+ u32 num_ev = 0;
+ int rv;
+
+ wait_for_time = (wait_for_time == -1) ? (double) 10e9 : wait_for_time;
+ wait_slice = vcm->cut_through_registrations ? 10e-6 : wait_for_time;
+
+ do
+ {
+ /* *INDENT-OFF* */
+ pool_foreach (cr, vcm->cut_through_registrations, ({
+ vcl_epoll_wait_handle_mq (cr->mq, events, maxevents, 0, &num_ev);
+ }));
+ /* *INDENT-ON* */
+
+ rv = vcl_epoll_wait_handle_mq (vcm->app_event_queue, events, maxevents,
+ num_ev ? 0 : wait_slice, &num_ev);
+ if (rv)
+ total_wait += wait_slice;
+ if (num_ev)
+ return num_ev;
+ }
+ while (total_wait < wait_for_time);
+ return (int) num_ev;
+}
+
+static int
+vppcom_epoll_wait_eventfd (struct epoll_event *events, int maxevents,
+ double wait_for_time)
+{
+ vcl_mq_evt_conn_t *mqc;
+ int __clib_unused n_read;
+ int n_mq_evts, i;
+ u32 n_evts = 0;
+ u64 buf;
+
+ vec_validate (vcm->mq_events, pool_elts (vcm->mq_evt_conns));
+ n_mq_evts = epoll_wait (vcm->mqs_epfd, vcm->mq_events,
+ vec_len (vcm->mq_events), wait_for_time);
+ for (i = 0; i < n_mq_evts; i++)
+ {
+ mqc = vcl_mq_evt_conn_get (vcm->mq_events[i].data.u32);
+ n_read = read (mqc->mq_fd, &buf, sizeof (buf));
+ vcl_epoll_wait_handle_mq (mqc->mq, events, maxevents, 0, &n_evts);
+ }
+
+ return (int) n_evts;
+}
+
int
vppcom_epoll_wait (uint32_t vep_idx, struct epoll_event *events,
int maxevents, double wait_for_time)
{
- vcl_cut_through_registration_t *cr;
vcl_session_t *vep_session;
- double total_wait = 0, wait_slice;
- u32 num_ev = 0;
if (PREDICT_FALSE (maxevents <= 0))
{
clib_spinlock_unlock (&vcm->sessions_lockp);
memset (events, 0, sizeof (*events) * maxevents);
- wait_slice = vcm->cut_through_registrations ? 10e-6 : wait_for_time;
- do
- {
- /* *INDENT-OFF* */
- pool_foreach (cr, vcm->cut_through_registrations, ({
- vcl_epoll_wait_handle_mq (cr->mq, events, maxevents, 0, &num_ev);
- }));
- /* *INDENT-ON* */
-
- vcl_epoll_wait_handle_mq (vcm->app_event_queue, events, maxevents,
- num_ev ? 0 : wait_slice, &num_ev);
- total_wait += wait_slice;
- if (num_ev)
- return num_ev;
- }
- while (total_wait < wait_for_time);
+ if (vcm->cfg.use_mq_eventfd)
+ return vppcom_epoll_wait_eventfd (events, maxevents, wait_for_time);
- return num_ev;
+ return vppcom_epoll_wait_condvar (events, maxevents, wait_for_time);
}
int
VCL_SESSION_UNLOCK ();
VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
getpid (), session_index);
- rv = VPPCOM_EBADFD;
VCL_SESSION_UNLOCK ();
- goto done;
+ return VPPCOM_EBADFD;
}
ep->is_ip4 = session->transport.is_ip4;
ep->port = session->transport.rmt_port;
- if (session->transport.is_ip4)
- clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
- sizeof (ip4_address_t));
- else
- clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
- sizeof (ip6_address_t));
VCL_SESSION_UNLOCK ();
}
{
clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
getpid (), flags);
- rv = VPPCOM_EAFNOSUPPORT;
+ return VPPCOM_EAFNOSUPPORT;
+ }
+
+ if (ep)
+ {
+ if (session->transport.is_ip4)
+ clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
+ sizeof (ip4_address_t));
+ else
+ clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
+ sizeof (ip6_address_t));
}
-done:
return rv;
}
return num_ev;
}
+int
+vppcom_mq_epoll_fd (void)
+{
+ return vcm->mqs_epfd;
+}
+
/*
* fd.io coding-style-patch-verification: ON
*