session_manager_main_t session_manager_main;
extern transport_proto_vft_t *tp_vfts;
-static void
-session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
- u32 thread_index, void *fp, void *rpc_args)
+static inline int
+session_send_evt_to_thread (void *data, void *args, u32 thread_index,
+ session_evt_type_t evt_type)
{
- u32 tries = 0;
- session_fifo_event_t evt = { {0}, };
- svm_queue_t *q;
+ session_event_t *evt;
+ svm_msg_q_msg_t msg;
+ svm_msg_q_t *mq;
+ u32 tries = 0, max_tries;
- evt.event_type = evt_type;
- if (evt_type == FIFO_EVENT_RPC)
+ mq = session_manager_get_vpp_event_queue (thread_index);
+ while (svm_msg_q_try_lock (mq))
{
- evt.rpc_args.fp = fp;
- evt.rpc_args.arg = rpc_args;
- }
- else
- evt.session_handle = session_handle;
-
- q = session_manager_get_vpp_event_queue (thread_index);
- while (svm_queue_add (q, (u8 *) & evt, 1))
- {
- if (tries++ == 3)
+ max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
+ if (tries++ == max_tries)
{
SESSION_DBG ("failed to enqueue evt");
- break;
+ return -1;
}
}
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->event_type = evt_type;
+ switch (evt_type)
+ {
+ case FIFO_EVENT_RPC:
+ evt->rpc_args.fp = data;
+ evt->rpc_args.arg = args;
+ break;
+ case FIFO_EVENT_APP_TX:
+ case FIFO_EVENT_BUILTIN_RX:
+ evt->fifo = data;
+ break;
+ case FIFO_EVENT_DISCONNECT:
+ evt->session_handle = session_handle ((stream_session_t *) data);
+ break;
+ default:
+ clib_warning ("evt unhandled!");
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ svm_msg_q_add_and_unlock (mq, &msg);
+ return 0;
}
-void
-session_send_session_evt_to_thread (u64 session_handle,
- fifo_event_type_t evt_type,
- u32 thread_index)
+int
+session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
+{
+ return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+}
+
+int
+session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+ session_evt_type_t evt_type)
{
- session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
+ return session_send_evt_to_thread (f, 0, thread_index, evt_type);
+}
+
+int
+session_send_ctrl_evt_to_thread (stream_session_t * s,
+ session_evt_type_t evt_type)
+{
+ /* only event supported for now is disconnect */
+ ASSERT (evt_type == FIFO_EVENT_DISCONNECT);
+ return session_send_evt_to_thread (s, 0, s->thread_index,
+ FIFO_EVENT_DISCONNECT);
}
void
session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
{
if (thread_index != vlib_get_thread_index ())
- session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
- rpc_args);
+ session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC);
else
{
void (*fnp) (void *) = fp;
stream_session_t *s;
u32 thread_index = tc->thread_index;
- ASSERT (thread_index == vlib_get_thread_index ());
+ ASSERT (thread_index == vlib_get_thread_index ()
+ || transport_protocol_is_cl (tc->proto));
s = session_alloc (thread_index);
s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
}
u32
-stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
+session_tx_fifo_max_dequeue (transport_connection_t * tc)
{
stream_session_t *s = session_get (tc->s_index, tc->thread_index);
if (!s->server_tx_fifo)
/**
* Notify session peer that new data has been enqueued.
*
- * @param s Stream session for which the event is to be generated.
- * @param block Flag to indicate if call should block if event queue is full.
+ * @param s Stream session for which the event is to be generated.
+ * @param lock Flag to indicate if call should lock message queue.
*
- * @return 0 on succes or negative number if failed to send notification.
+ * @return 0 on success or negative number if failed to send notification.
*/
-static int
-session_enqueue_notify (stream_session_t * s, u8 block)
+static inline int
+session_enqueue_notify (stream_session_t * s, u8 lock)
{
- application_t *app;
- session_fifo_event_t evt;
- svm_queue_t *q;
+ app_worker_t *app;
- if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+ app = app_worker_get_if_valid (s->app_wrk_index);
+ if (PREDICT_FALSE (!app))
{
- /* Session is closed so app will never clean up. Flush rx fifo */
- u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
- if (to_dequeue)
- svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
+ SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
return 0;
}
- /* Get session's server */
- app = application_get_if_valid (s->app_index);
-
- if (PREDICT_FALSE (app == 0))
- {
- clib_warning ("invalid s->app_index = %d", s->app_index);
- return 0;
- }
-
- /* Built-in app? Hand event to the callback... */
- if (app->cb_fns.builtin_app_rx_callback)
- return app->cb_fns.builtin_app_rx_callback (s);
-
- /* If no event, send one */
- if (svm_fifo_set_event (s->server_rx_fifo))
- {
- /* Fabricate event */
- evt.fifo = s->server_rx_fifo;
- evt.event_type = FIFO_EVENT_APP_RX;
-
- /* Add event to server's event queue */
- q = app->event_queue;
-
- /* Based on request block (or not) for lack of space */
- if (block || PREDICT_TRUE (q->cursize < q->maxsize))
- svm_queue_add (app->event_queue, (u8 *) & evt,
- 0 /* do wait for mutex */ );
- else
- {
- clib_warning ("fifo full");
- return -1;
- }
- }
-
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
- ed->data[0] = evt.event_type;
+ ed->data[0] = FIFO_EVENT_APP_RX;
ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
}));
/* *INDENT-ON* */
- return 0;
+ if (lock)
+ return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+
+ return application_send_event (app, s, FIFO_EVENT_APP_RX);
+}
+
+int
+session_dequeue_notify (stream_session_t * s)
+{
+ app_worker_t *app;
+
+ app = app_worker_get_if_valid (s->app_wrk_index);
+ if (PREDICT_FALSE (!app))
+ return -1;
+
+ if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
+ return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+
+ return application_send_event (app, s, FIFO_EVENT_APP_TX);
}
/**
session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
{
session_manager_main_t *smm = &session_manager_main;
- u32 *indices;
+ transport_service_type_t tp_service;
+ int i, errors = 0, lock;
stream_session_t *s;
- int i, errors = 0;
+ u32 *indices;
indices = smm->session_to_enqueue[transport_proto][thread_index];
+ tp_service = transport_protocol_service_type (transport_proto);
+ lock = tp_service == TRANSPORT_SERVICE_CL;
for (i = 0; i < vec_len (indices); i++)
{
s = session_get_if_valid (indices[i], thread_index);
- if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
+ if (PREDICT_FALSE (!s))
+ {
+ errors++;
+ continue;
+ }
+ if (PREDICT_FALSE (session_enqueue_notify (s, lock)))
errors++;
}
u32 opaque = 0, new_ti, new_si;
stream_session_t *new_s = 0;
segment_manager_t *sm;
+ app_worker_t *app_wrk;
application_t *app;
u8 alloc_fifos;
int error = 0;
/* Get the app's index from the handle we stored when opening connection
* and the opaque (api_context for external apps) from transport session
* index */
- app = application_get_if_valid (handle >> 32);
- if (!app)
+ app_wrk = app_worker_get_if_valid (handle >> 32);
+ if (!app_wrk)
return -1;
opaque = tc->s_index;
+ app = application_get (app_wrk->app_index);
/*
* Allocate new session with fifos (svm segments are allocated if needed)
*/
if (!is_fail)
{
- sm = application_get_connect_segment_manager (app);
+ sm = app_worker_get_connect_segment_manager (app_wrk);
alloc_fifos = !application_is_builtin_proxy (app);
if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
{
}
else
{
- new_s->app_index = app->index;
+ new_s->app_wrk_index = app_wrk->wrk_index;
new_si = new_s->session_index;
new_ti = new_s->thread_index;
}
/*
* Notify client application
*/
- if (app->cb_fns.session_connected_callback (app->index, opaque, new_s,
- is_fail))
+ if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
+ new_s, is_fail))
{
SESSION_DBG ("failed to notify app");
if (!is_fail)
void
stream_session_accept_notify (transport_connection_t * tc)
{
- application_t *server;
+ app_worker_t *app_wrk;
+ application_t *app;
stream_session_t *s;
s = session_get (tc->s_index, tc->thread_index);
- server = application_get (s->app_index);
- server->cb_fns.session_accept_callback (s);
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return;
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_accept_callback (s);
}
/**
void
stream_session_disconnect_notify (transport_connection_t * tc)
{
- application_t *server;
+ app_worker_t *app_wrk;
+ application_t *app;
stream_session_t *s;
s = session_get (tc->s_index, tc->thread_index);
- server = application_get (s->app_index);
- server->cb_fns.session_disconnect_callback (s);
+ s->session_state = SESSION_STATE_CLOSING;
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return;
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_disconnect_callback (s);
}
/**
* Cleans up session and lookup table.
+ *
+ * Transport connection must still be valid.
*/
void
stream_session_delete (stream_session_t * s)
stream_session_reset_notify (transport_connection_t * tc)
{
stream_session_t *s;
+ app_worker_t *app_wrk;
application_t *app;
s = session_get (tc->s_index, tc->thread_index);
-
- app = application_get (s->app_index);
+ s->session_state = SESSION_STATE_CLOSED;
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app = application_get (app_wrk->app_index);
app->cb_fns.session_reset_callback (s);
}
stream_session_accept (transport_connection_t * tc, u32 listener_index,
u8 notify)
{
- application_t *server;
stream_session_t *s, *listener;
+ app_worker_t *app_wrk;
segment_manager_t *sm;
int rv;
/* Find the server */
listener = listen_session_get (listener_index);
- server = application_get (listener->app_index);
+ app_wrk = app_worker_get (listener->app_wrk_index);
- sm = application_get_listen_segment_manager (server, listener);
+ sm = app_worker_get_listen_segment_manager (app_wrk, listener);
if ((rv = session_alloc_and_init (sm, tc, 1, &s)))
return rv;
- s->app_index = server->index;
+ s->app_wrk_index = app_wrk->wrk_index;
s->listener_index = listener_index;
s->session_state = SESSION_STATE_ACCEPTING;
/* Shoulder-tap the server */
if (notify)
{
- server->cb_fns.session_accept_callback (s);
+ application_t *app = application_get (app_wrk->app_index);
+ app->cb_fns.session_accept_callback (s);
}
return 0;
}
int
-session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_connection_t *tc;
transport_endpoint_t *tep;
segment_manager_t *sm;
+ app_worker_t *app_wrk;
stream_session_t *s;
application_t *app;
int rv;
/* For dgram type of service, allocate session and fifos now.
*/
- app = application_get (app_index);
- sm = application_get_connect_segment_manager (app);
+ app_wrk = app_worker_get (app_wrk_index);
+ sm = app_worker_get_connect_segment_manager (app_wrk);
if (session_alloc_and_init (sm, tc, 1, &s))
return -1;
- s->app_index = app->index;
+ s->app_wrk_index = app_wrk->wrk_index;
s->session_state = SESSION_STATE_OPENED;
/* Tell the app about the new event fifo for this session */
- app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque, s, 0);
return 0;
}
int
-session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_connection_t *tc;
transport_endpoint_t *tep;
* is needed when the connect notify comes and we have to notify the
* external app
*/
- handle = (((u64) app_index) << 32) | (u64) tc->c_index;
+ handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
session_lookup_add_half_open (tc, handle);
/* Store api_context (opaque) for when the reply comes. Not the nicest
}
int
-session_open_app (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
session_endpoint_extended_t *sep = (session_endpoint_extended_t *) rmt;
- sep->app_index = app_index;
+ sep->app_wrk_index = app_wrk_index;
sep->opaque = opaque;
return tp_vfts[rmt->transport_proto].open ((transport_endpoint_t *) sep);
* on open completion.
*/
int
-session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
transport_service_type_t tst = tp_vfts[rmt->transport_proto].service_type;
- return session_open_srv_fns[tst] (app_index, rmt, opaque);
+ return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
}
int
session_listen_cl (stream_session_t * s, session_endpoint_t * sep)
{
transport_connection_t *tc;
- application_t *server;
+ app_worker_t *server;
segment_manager_t *sm;
u32 tci;
if (tc == 0)
return -1;
- server = application_get (s->app_index);
- sm = application_get_listen_segment_manager (server, s);
+ server = app_worker_get (s->app_wrk_index);
+ sm = app_worker_get_listen_segment_manager (server, s);
if (session_alloc_fifos (sm, s))
return -1;
{
session_endpoint_extended_t esep;
clib_memcpy (&esep, sep, sizeof (*sep));
- esep.app_index = s->app_index;
+ esep.app_wrk_index = s->app_wrk_index;
return tp_vfts[sep->transport_proto].bind (s->session_index,
(transport_endpoint_t *) & esep);
void
stream_session_disconnect (stream_session_t * s)
{
- if (!s || s->session_state == SESSION_STATE_CLOSED)
+ u32 thread_index = vlib_get_thread_index ();
+ session_manager_main_t *smm = &session_manager_main;
+ session_event_t *evt;
+
+ if (!s)
return;
- s->session_state = SESSION_STATE_CLOSED;
- session_send_session_evt_to_thread (session_handle (s),
- FIFO_EVENT_DISCONNECT, s->thread_index);
+
+ if (s->session_state >= SESSION_STATE_CLOSING)
+ {
+ /* Session already closed. Clear the tx fifo */
+ if (s->session_state == SESSION_STATE_CLOSED)
+ svm_fifo_dequeue_drop_all (s->server_tx_fifo);
+ return;
+ }
+
+ s->session_state = SESSION_STATE_CLOSING;
+
+ /* If we are in the handler thread, or being called with the worker barrier
+ * held (api/cli), just append a new event to pending disconnects vector. */
+ if ((thread_index == 0 && !vlib_get_current_process (vlib_get_main ()))
+ || thread_index == s->thread_index)
+ {
+ ASSERT (s->thread_index == thread_index || thread_index == 0);
+ vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+ memset (evt, 0, sizeof (*evt));
+ evt->session_handle = session_handle (s);
+ evt->event_type = FIFO_EVENT_DISCONNECT;
+ }
+ else
+ session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT);
}
/**
/**
* Cleanup transport and session state.
*
- * Notify transport of the cleanup, wait for a delete notify to actually
- * remove the session state.
+ * Notify transport of the cleanup and free the session. This should
+ * be called only if transport reported some error and is already
+ * closed.
*/
void
stream_session_cleanup (stream_session_t * s)
{
- int rv;
-
s->session_state = SESSION_STATE_CLOSED;
- /* Delete from the main lookup table to avoid more enqueues */
- rv = session_lookup_del_session (s);
- if (rv)
- clib_warning ("hash delete error, rv %d", rv);
-
+ /* Delete from main lookup table before we axe the the transport */
+ session_lookup_del_session (s);
tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
s->thread_index);
+ /* Since we called cleanup, no delete notification will come. So, make
+ * sure the session is properly freed. */
+ segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+ s->server_tx_fifo);
+ session_free (s);
+}
+
+transport_service_type_t
+session_transport_service_type (stream_session_t * s)
+{
+ transport_proto_t tp;
+ tp = session_get_transport_proto (s);
+ return transport_protocol_service_type (tp);
+}
+
+transport_tx_fn_type_t
+session_transport_tx_fn_type (stream_session_t * s)
+{
+ transport_proto_t tp;
+ tp = session_get_transport_proto (s);
+ return transport_protocol_tx_fn_type (tp);
+}
+
+u8
+session_tx_is_dgram (stream_session_t * s)
+{
+ return (session_transport_tx_fn_type (s) == TRANSPORT_TX_DGRAM);
}
/**
void
session_vpp_event_queues_allocate (session_manager_main_t * smm)
{
- u32 evt_q_length = 2048, evt_size = sizeof (session_fifo_event_t);
+ u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
ssvm_private_t *eqs = &smm->evt_qs_segment;
api_main_t *am = &api_main;
u64 eqs_size = 64 << 20;
eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
eqs->requested_va = smm->session_baseva;
- ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD);
+ if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
+ {
+ clib_warning ("failed to initialize queue segment");
+ return;
+ }
}
if (smm->evt_qs_use_memfd_seg)
for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
{
- smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size,
- vpp_pid, 0);
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ u32 notif_q_size = clib_max (16, evt_q_length >> 4);
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ {evt_q_length, evt_size, 0}
+ ,
+ {notif_q_size, 256, 0}
+ };
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = evt_q_length;
+ cfg->ring_cfgs = rc;
+ smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
+ if (smm->evt_qs_use_memfd_seg)
+ {
+ if (svm_msg_q_alloc_consumer_eventfd (smm->vpp_event_queues[i]))
+ clib_warning ("eventfd returned");
+ }
}
if (smm->evt_qs_use_memfd_seg)
return 0;
}
+void
+session_flush_frames_main_thread (vlib_main_t * vm)
+{
+ ASSERT (vlib_get_thread_index () == 0);
+ vlib_process_signal_event_mt (vm, session_queue_process_node.index,
+ SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
+}
+
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
vec_validate (smm->free_event_vector, num_threads - 1);
vec_validate (smm->vpp_event_queues, num_threads - 1);
vec_validate (smm->peekers_rw_locks, num_threads - 1);
+ vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
for (i = 0; i < TRANSPORT_N_PROTO; i++)
{
session_node_enable_disable (u8 is_en)
{
u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ u8 have_workers = vtm->n_threads != 0;
+
/* *INDENT-OFF* */
foreach_vlib_main (({
+ if (have_workers && ii == 0)
+ {
+ vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
+ state);
+ if (is_en)
+ {
+ vlib_node_t *n = vlib_get_node (this_vlib_main,
+ session_queue_process_node.index);
+ vlib_start_process (this_vlib_main, n->runtime_index);
+ }
+ else
+ {
+ vlib_process_signal_event_mt (this_vlib_main,
+ session_queue_process_node.index,
+ SESSION_Q_PROCESS_STOP, 0);
+ }
+
+ continue;
+ }
vlib_node_set_state (this_vlib_main, session_queue_node.index,
state);
}));