#include <stdio.h>
#include <stdlib.h>
-#include <svm/svm_fifo_segment.h>
#include <vcl/vppcom.h>
#include <vcl/vcl_debug.h>
#include <vcl/vcl_private.h>
+#include <svm/fifo_segment.h>
__thread uword __vcl_worker_index = ~0;
}
static inline int
-vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
+vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
{
svm_msg_q_msg_t *msg;
u32 n_msgs;
int i;
- n_msgs = svm_msg_q_size (mq);
+ n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
for (i = 0; i < n_msgs; i++)
{
vec_add2 (wrk->mq_msg_vector, msg, 1);
}
static u32
-vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
+vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
+ u32 ls_index)
{
vcl_session_t *session, *listen_session;
svm_fifo_t *rx_fifo, *tx_fifo;
session = vcl_session_alloc (wrk);
- listen_session = vcl_session_table_lookup_listener (wrk,
- mp->listener_handle);
- if (!listen_session)
+ listen_session = vcl_session_get (wrk, ls_index);
+ if (listen_session->vpp_handle != mp->listener_handle)
{
- evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
- VDBG (0, "ERROR: couldn't find listen session: unknown vpp listener "
- "handle %llx", mp->listener_handle);
- vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
- VNET_API_ERROR_INVALID_ARGUMENT);
- vcl_session_free (wrk, session);
- return VCL_INVALID_SESSION_INDEX;
+ VDBG (0, "ERROR: listener handle %lu does not match session %u",
+ mp->listener_handle, ls_index);
+ goto error;
}
- rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
- tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
-
if (vcl_wait_for_segment (mp->segment_handle))
{
- VDBG (0, "segment for session %u couldn't be mounted!",
+ VDBG (0, "ERROR: segment for session %u couldn't be mounted!",
session->session_index);
- return VCL_INVALID_SESSION_INDEX;
+ goto error;
}
+ rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+ tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
svm_msg_q_t *);
rx_fifo->client_session_index = session->session_index;
session->vpp_handle = mp->handle;
session->vpp_thread_index = rx_fifo->master_thread_index;
- session->client_context = mp->context;
session->rx_fifo = rx_fifo;
session->tx_fifo = tx_fifo;
session->session_state = STATE_ACCEPT;
- session->transport.rmt_port = mp->port;
- session->transport.is_ip4 = mp->is_ip4;
- clib_memcpy_fast (&session->transport.rmt_ip, mp->ip,
+ session->transport.rmt_port = mp->rmt.port;
+ session->transport.is_ip4 = mp->rmt.is_ip4;
+ clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip,
sizeof (ip46_address_t));
vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
VDBG (1, "session %u [0x%llx]: client accept request from %s address %U"
" port %d queue %p!", session->session_index, mp->handle,
- mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
- mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
- clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
+ mp->rmt.is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->rmt.ip,
+ mp->rmt.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
+ clib_net_to_host_u16 (mp->rmt.port), session->vpp_evt_q);
vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
+ vcl_send_session_accepted_reply (session->vpp_evt_q, mp->context,
+ session->vpp_handle, 0);
+
return session->session_index;
+
+error:
+ evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+ vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
+ VNET_API_ERROR_INVALID_ARGUMENT);
+ vcl_session_free (wrk, session);
+ return VCL_INVALID_SESSION_INDEX;
}
static u32
session->tx_fifo = tx_fifo;
session->vpp_handle = mp->handle;
session->vpp_thread_index = rx_fifo->master_thread_index;
- session->transport.is_ip4 = mp->is_ip4;
- clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
+ session->transport.is_ip4 = mp->lcl.is_ip4;
+ clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
sizeof (session->transport.lcl_ip));
- session->transport.lcl_port = mp->lcl_port;
+ session->transport.lcl_port = mp->lcl.port;
session->session_state = STATE_CONNECT;
/* Add it to lookup table */
mq = wrk->app_event_queue;
svm_msg_q_lock (mq);
- vcl_mq_dequeue_batch (wrk, mq);
+ vcl_mq_dequeue_batch (wrk, mq, ~0);
svm_msg_q_unlock (mq);
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
vppcom_session_unbind (u32 session_handle)
{
vcl_worker_t *wrk = vcl_worker_get_current ();
+ session_accepted_msg_t *accepted_msg;
vcl_session_t *session = 0;
+ vcl_session_msg_t *evt;
u64 vpp_handle;
session = vcl_session_get_w_handle (wrk, session_handle);
if (!session)
return VPPCOM_EBADFD;
+ /* Flush pending accept events, if any */
+ while (clib_fifo_elts (session->accept_evts_fifo))
+ {
+ clib_fifo_sub2 (session->accept_evts_fifo, evt);
+ accepted_msg = &evt->accepted_msg;
+ vcl_session_table_del_vpp_handle (wrk, accepted_msg->handle);
+ vcl_send_session_accepted_reply (session->vpp_evt_q,
+ accepted_msg->context,
+ session->vpp_handle, -1);
+ }
+ clib_fifo_free (session->accept_evts_fifo);
+
vpp_handle = session->vpp_handle;
session->vpp_handle = ~0;
session->session_state = STATE_DISCONNECT;
vcm->main_pid = getpid ();
vcm->app_name = format (0, "%s", app_name);
vppcom_init_error_string_table ();
- svm_fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
- 20 /* timeout in secs */ );
+ fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
+ 20 /* timeout in secs */ );
pool_alloc (vcm->workers, vcl_cfg->max_workers);
clib_spinlock_init (&vcm->workers_lock);
clib_rwlock_init (&vcm->segment_table_lock);
e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
{
- clib_warning ("discarded event: %u", e->event_type);
+ VDBG (0, "discarded event: %u", e->event_type);
svm_msg_q_free_msg (wrk->app_event_queue, &msg);
continue;
}
handle:
- client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg);
+ client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg,
+ listen_session_index);
+ if (client_session_index == VCL_INVALID_SESSION_INDEX)
+ return VPPCOM_ECONNABORTED;
+
listen_session = vcl_session_get (wrk, listen_session_index);
client_session = vcl_session_get (wrk, client_session_index);
sizeof (ip6_address_t));
}
- vcl_send_session_accepted_reply (client_session->vpp_evt_q,
- client_session->client_context,
- client_session->vpp_handle, 0);
-
VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
"local: %U:%u", listen_session_handle, listen_session->vpp_handle,
client_session_index, client_session->vpp_handle,
if (svm_fifo_is_empty_cons (rx_fifo))
svm_fifo_unset_event (s->rx_fifo);
+ /* Cut-through sessions might request tx notifications on rx fifos */
+ if (PREDICT_FALSE (rx_fifo->want_deq_ntf))
+ {
+ app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index,
+ SESSION_IO_EVT_RX, SVM_Q_WAIT);
+ svm_fifo_reset_has_deq_ntf (s->rx_fifo);
+ }
+
VDBG (2, "session %u[0x%llx]: read %d bytes from (%p)", s->session_index,
s->vpp_handle, n_read, rx_fifo);
}
}
- n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
+ n_read = svm_fifo_segments (rx_fifo, (svm_fifo_seg_t *) ds);
svm_fifo_unset_event (rx_fifo);
return n_read;
if (PREDICT_FALSE (!s || s->is_vep))
return;
- svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
+ svm_fifo_segments_free (s->rx_fifo, (svm_fifo_seg_t *) ds);
}
int
}
while (svm_fifo_is_full_prod (tx_fifo))
{
- svm_fifo_add_want_tx_ntf (tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
+ svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
if (vcl_session_is_closing (s))
return vcl_session_closing_error (s);
svm_msg_q_lock (mq);
}
}
}
- vcl_mq_dequeue_batch (wrk, mq);
+ vcl_mq_dequeue_batch (wrk, mq, ~0);
svm_msg_q_unlock (mq);
for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
bits_set++;
}
else
- svm_fifo_add_want_tx_ntf (session->tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
+ svm_fifo_add_want_deq_ntf (session->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
}));
check_rd:
vep_session->vep.next_sh = session_handle;
if (session->tx_fifo)
- svm_fifo_add_want_tx_ntf (session->tx_fifo,
- SVM_FIFO_WANT_TX_NOTIF_IF_FULL);
+ svm_fifo_add_want_deq_ntf (session->tx_fifo,
+ SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL);
VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
vep_handle, session_handle, event->events, event->data.u64);
session->is_vep_session = 0;
if (session->tx_fifo)
- svm_fifo_del_want_tx_ntf (session->tx_fifo, SVM_FIFO_NO_TX_NOTIF);
+ svm_fifo_del_want_deq_ntf (session->tx_fifo, SVM_FIFO_NO_DEQ_NOTIF);
VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sh %u!", vep_handle,
session_handle);
add_event = 1;
events[*num_ev].events |= EPOLLOUT;
session_evt_data = session->vep.ev.data.u64;
- svm_fifo_reset_tx_ntf (session->tx_fifo);
+ svm_fifo_reset_has_deq_ntf (session->tx_fifo);
break;
case SESSION_CTRL_EVT_ACCEPTED:
session = vcl_session_accepted (wrk,
}
}
}
- vcl_mq_dequeue_batch (wrk, mq);
+ ASSERT (maxevents > *num_ev);
+ vcl_mq_dequeue_batch (wrk, mq, maxevents - *num_ev);
svm_msg_q_unlock (mq);
handle_dequeued:
{
msg = vec_elt_at_index (wrk->mq_msg_vector, i);
e = svm_msg_q_msg_data (mq, msg);
- if (*num_ev < maxevents)
- vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
- else
- vec_add1 (wrk->unhandled_evts_vector, *e);
+ vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
svm_msg_q_free_msg (mq, msg);
}
vec_reset_length (wrk->mq_msg_vector);
vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
int maxevents, u32 n_evts, double wait_for_time)
{
- double wait = 0, start = 0;
+ double wait = 0, start = 0, now;
if (!n_evts)
{
if (wait == -1)
continue;
- wait = wait - (clib_time_now (&wrk->clib_time) - start);
+ now = clib_time_now (&wrk->clib_time);
+ wait -= now - start;
+ start = now;
}
while (wait > 0);
events, &n_evts);
if (n_evts == maxevents)
{
- i += 1;
- break;
+ vec_delete (wrk->unhandled_evts_vector, i + 1, 0);
+ return n_evts;
}
}
- vec_delete (wrk->unhandled_evts_vector, i, 0);
+ vec_reset_length (wrk->unhandled_evts_vector);
}
if (vcm->cfg.use_mq_eventfd)
return VPPCOM_OK;
}
+void
+vppcom_worker_unregister (void)
+{
+ vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
+ vcl_set_worker_index (~0);
+}
+
int
vppcom_worker_index (void)
{