session_manager_main_t session_manager_main;
extern transport_proto_vft_t *tp_vfts;
-static void
-session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
- u32 thread_index, void *fp, void *rpc_args)
+static inline int
+session_send_evt_to_thread (void *data, void *args, u32 thread_index,
+ session_evt_type_t evt_type)
{
- u32 tries = 0;
- session_fifo_event_t evt = { {0}, };
- svm_queue_t *q;
-
- evt.event_type = evt_type;
- if (evt_type == FIFO_EVENT_RPC)
- {
- evt.rpc_args.fp = fp;
- evt.rpc_args.arg = rpc_args;
- }
- else
- evt.session_handle = session_handle;
+ session_fifo_event_t *evt;
+ svm_msg_q_msg_t msg;
+ svm_msg_q_t *mq;
+ u32 tries = 0, max_tries;
- q = session_manager_get_vpp_event_queue (thread_index);
- while (svm_queue_add (q, (u8 *) & evt, 1))
+ mq = session_manager_get_vpp_event_queue (thread_index);
+ while (svm_msg_q_try_lock (mq))
{
- if (tries++ == 3)
+ max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
+ if (tries++ == max_tries)
{
SESSION_DBG ("failed to enqueue evt");
- break;
+ return -1;
}
}
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+ {
+ svm_msg_q_unlock (mq);
+ return -2;
+ }
+ evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->event_type = evt_type;
+ switch (evt_type)
+ {
+ case FIFO_EVENT_RPC:
+ evt->rpc_args.fp = data;
+ evt->rpc_args.arg = args;
+ break;
+ case FIFO_EVENT_APP_TX:
+ case FIFO_EVENT_BUILTIN_RX:
+ evt->fifo = data;
+ break;
+ case FIFO_EVENT_DISCONNECT:
+ evt->session_handle = session_handle ((stream_session_t *) data);
+ break;
+ default:
+ clib_warning ("evt unhandled!");
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ svm_msg_q_add_w_lock (mq, &msg);
+ svm_msg_q_unlock (mq);
+ return 0;
}
-void
-session_send_session_evt_to_thread (u64 session_handle,
- fifo_event_type_t evt_type,
- u32 thread_index)
+int
+session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
{
- session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
+ return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+}
+
+int
+session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+ session_evt_type_t evt_type)
+{
+ return session_send_evt_to_thread (f, 0, thread_index, evt_type);
+}
+
+int
+session_send_ctrl_evt_to_thread (stream_session_t * s,
+ session_evt_type_t evt_type)
+{
+ /* only event supported for now is disconnect */
+ ASSERT (evt_type == FIFO_EVENT_DISCONNECT);
+ return session_send_evt_to_thread (s, 0, s->thread_index,
+ FIFO_EVENT_DISCONNECT);
}
void
session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
{
if (thread_index != vlib_get_thread_index ())
- session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
- rpc_args);
+ session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC);
else
{
void (*fnp) (void *) = fp;
stream_session_t *s;
u32 thread_index = tc->thread_index;
- ASSERT (thread_index == vlib_get_thread_index ());
+ ASSERT (thread_index == vlib_get_thread_index ()
+ || transport_protocol_is_cl (tc->proto));
s = session_alloc (thread_index);
s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
return enqueued;
}
+
int
-session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
- u8 proto, u8 queue_event)
+session_enqueue_dgram_connection (stream_session_t * s,
+ session_dgram_hdr_t * hdr,
+ vlib_buffer_t * b, u8 proto, u8 queue_event)
{
int enqueued = 0, rv, in_order_off;
- if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length)
- return -1;
+ ASSERT (svm_fifo_max_enqueue (s->server_rx_fifo)
+ >= b->current_length + sizeof (*hdr));
+
+ svm_fifo_enqueue_nowait (s->server_rx_fifo, sizeof (session_dgram_hdr_t),
+ (u8 *) hdr);
enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
}
u32
-stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
+session_tx_fifo_max_dequeue (transport_connection_t * tc)
{
stream_session_t *s = session_get (tc->s_index, tc->thread_index);
if (!s->server_tx_fifo)
/**
* Notify session peer that new data has been enqueued.
*
- * @param s Stream session for which the event is to be generated.
- * @param block Flag to indicate if call should block if event queue is full.
+ * @param s Stream session for which the event is to be generated.
+ * @param lock Flag to indicate if call should lock message queue.
*
- * @return 0 on succes or negative number if failed to send notification.
+ * @return 0 on success or negative number if failed to send notification.
*/
-static int
-session_enqueue_notify (stream_session_t * s, u8 block)
+static inline int
+session_enqueue_notify (stream_session_t * s, u8 lock)
{
application_t *app;
- session_fifo_event_t evt;
- svm_queue_t *q;
-
- if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
- {
- /* Session is closed so app will never clean up. Flush rx fifo */
- u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
- if (to_dequeue)
- svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
- return 0;
- }
- /* Get session's server */
app = application_get_if_valid (s->app_index);
-
if (PREDICT_FALSE (app == 0))
{
clib_warning ("invalid s->app_index = %d", s->app_index);
return 0;
}
- /* Built-in app? Hand event to the callback... */
- if (app->cb_fns.builtin_app_rx_callback)
- return app->cb_fns.builtin_app_rx_callback (s);
-
- /* If no event, send one */
- if (svm_fifo_set_event (s->server_rx_fifo))
- {
- /* Fabricate event */
- evt.fifo = s->server_rx_fifo;
- evt.event_type = FIFO_EVENT_APP_RX;
-
- /* Add event to server's event queue */
- q = app->event_queue;
-
- /* Based on request block (or not) for lack of space */
- if (block || PREDICT_TRUE (q->cursize < q->maxsize))
- svm_queue_add (app->event_queue, (u8 *) & evt,
- 0 /* do wait for mutex */ );
- else
- {
- clib_warning ("fifo full");
- return -1;
- }
- }
-
/* *INDENT-OFF* */
SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
- ed->data[0] = evt.event_type;
+ ed->data[0] = FIFO_EVENT_APP_RX;
ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
}));
/* *INDENT-ON* */
- return 0;
+ if (lock)
+ return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+
+ return application_send_event (app, s, FIFO_EVENT_APP_RX);
+}
+
+int
+session_dequeue_notify (stream_session_t * s)
+{
+ application_t *app;
+
+ app = application_get_if_valid (s->app_index);
+ if (PREDICT_FALSE (!app))
+ return -1;
+
+ if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
+ return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+
+ return application_send_event (app, s, FIFO_EVENT_APP_TX);
}
/**
session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
{
session_manager_main_t *smm = &session_manager_main;
- u32 *indices;
+ transport_service_type_t tp_service;
+ int i, errors = 0, lock;
stream_session_t *s;
- int i, errors = 0;
+ u32 *indices;
indices = smm->session_to_enqueue[transport_proto][thread_index];
+ tp_service = transport_protocol_service_type (transport_proto);
+ lock = tp_service == TRANSPORT_SERVICE_CL;
for (i = 0; i < vec_len (indices); i++)
{
s = session_get_if_valid (indices[i], thread_index);
- if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
+ if (PREDICT_FALSE (!s))
+ {
+ errors++;
+ continue;
+ }
+ if (PREDICT_FALSE (session_enqueue_notify (s, lock)))
errors++;
}
return errors;
}
+int
+session_manager_flush_all_enqueue_events (u8 transport_proto)
+{
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ int i, errors = 0;
+ for (i = 0; i < 1 + vtm->n_threads; i++)
+ errors += session_manager_flush_enqueue_events (transport_proto, i);
+ return errors;
+}
+
/**
* Init fifo tail and head pointers
*
s = session_get (tc->s_index, tc->thread_index);
server = application_get (s->app_index);
server->cb_fns.session_disconnect_callback (s);
+ s->session_state = SESSION_STATE_CLOSING;
}
/**
* Cleans up session and lookup table.
+ *
+ * Transport connection must still be valid.
*/
void
stream_session_delete (stream_session_t * s)
stream_session_t *s;
application_t *app;
s = session_get (tc->s_index, tc->thread_index);
-
+ s->session_state = SESSION_STATE_CLOSED;
app = application_get (s->app_index);
app->cb_fns.session_reset_callback (s);
}
application_t *server;
stream_session_t *s, *listener;
segment_manager_t *sm;
- session_type_t sst;
int rv;
- sst = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
-
/* Find the server */
- listener = listen_session_get (sst, listener_index);
+ listener = listen_session_get (listener_index);
server = application_get (listener->app_index);
sm = application_get_listen_segment_manager (server, listener);
if (session_alloc_and_init (sm, tc, 1, &s))
return -1;
s->app_index = app->index;
- s->session_state = SESSION_STATE_CONNECTING_READY;
+ s->session_state = SESSION_STATE_OPENED;
/* Tell the app about the new event fifo for this session */
app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
u64 handle;
int rv;
- /* TODO until udp is fixed */
- if (rmt->transport_proto == TRANSPORT_PROTO_UDP)
- return session_open_cl (app_index, rmt, opaque);
-
tep = session_endpoint_to_transport (rmt);
rv = tp_vfts[rmt->transport_proto].open (tep);
if (rv < 0)
return session_open_srv_fns[tst] (app_index, rmt, opaque);
}
-/**
- * Ask transport to listen on local transport endpoint.
- *
- * @param s Session for which listen will be called. Note that unlike
- * established sessions, listen sessions are not associated to a
- * thread.
- * @param tep Local endpoint to be listened on.
- */
int
session_listen_vc (stream_session_t * s, session_endpoint_t * sep)
{
return 0;
}
+int
+session_listen_cl (stream_session_t * s, session_endpoint_t * sep)
+{
+ transport_connection_t *tc;
+ application_t *server;
+ segment_manager_t *sm;
+ u32 tci;
+
+ /* Transport bind/listen */
+ tci = tp_vfts[sep->transport_proto].bind (s->session_index,
+ session_endpoint_to_transport
+ (sep));
+
+ if (tci == (u32) ~ 0)
+ return -1;
+
+ /* Attach transport to session */
+ s->connection_index = tci;
+ tc = tp_vfts[sep->transport_proto].get_listener (tci);
+
+ /* Weird but handle it ... */
+ if (tc == 0)
+ return -1;
+
+ server = application_get (s->app_index);
+ sm = application_get_listen_segment_manager (server, s);
+ if (session_alloc_fifos (sm, s))
+ return -1;
+
+ /* Add to the main lookup table */
+ session_lookup_add_connection (tc, s->session_index);
+ return 0;
+}
+
int
session_listen_app (stream_session_t * s, session_endpoint_t * sep)
{
static session_listen_service_fn
session_listen_srv_fns[TRANSPORT_N_SERVICES] = {
session_listen_vc,
- session_listen_vc,
+ session_listen_cl,
session_listen_app,
};
/* *INDENT-ON* */
+/**
+ * Ask transport to listen on local transport endpoint.
+ *
+ * @param s Session for which listen will be called. Note that unlike
+ * established sessions, listen sessions are not associated to a
+ * thread.
+ * @param tep Local endpoint to be listened on.
+ */
int
stream_session_listen (stream_session_t * s, session_endpoint_t * sep)
{
void
stream_session_disconnect (stream_session_t * s)
{
- if (!s || s->session_state == SESSION_STATE_CLOSED)
+ u32 thread_index = vlib_get_thread_index ();
+ session_manager_main_t *smm = &session_manager_main;
+ session_fifo_event_t *evt;
+
+ if (!s)
return;
- s->session_state = SESSION_STATE_CLOSED;
- session_send_session_evt_to_thread (session_handle (s),
- FIFO_EVENT_DISCONNECT, s->thread_index);
+
+ if (s->session_state >= SESSION_STATE_CLOSING)
+ {
+ /* Session already closed. Clear the tx fifo */
+ if (s->session_state == SESSION_STATE_CLOSED)
+ svm_fifo_dequeue_drop_all (s->server_tx_fifo);
+ return;
+ }
+
+ s->session_state = SESSION_STATE_CLOSING;
+
+ /* If we are in the handler thread, or being called with the worker barrier
+ * held (api/cli), just append a new event to pending disconnects vector. */
+ if ((thread_index == 0 && !vlib_get_current_process (vlib_get_main ()))
+ || thread_index == s->thread_index)
+ {
+ ASSERT (s->thread_index == thread_index || thread_index == 0);
+ vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+ memset (evt, 0, sizeof (*evt));
+ evt->session_handle = session_handle (s);
+ evt->event_type = FIFO_EVENT_DISCONNECT;
+ }
+ else
+ session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT);
}
/**
/**
* Cleanup transport and session state.
*
- * Notify transport of the cleanup, wait for a delete notify to actually
- * remove the session state.
+ * Notify transport of the cleanup and free the session. This should
+ * be called only if transport reported some error and is already
+ * closed.
*/
void
stream_session_cleanup (stream_session_t * s)
{
- int rv;
-
s->session_state = SESSION_STATE_CLOSED;
- /* Delete from the main lookup table to avoid more enqueues */
- rv = session_lookup_del_session (s);
- if (rv)
- clib_warning ("hash delete error, rv %d", rv);
-
+ /* Delete from main lookup table before we axe the the transport */
+ session_lookup_del_session (s);
tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
s->thread_index);
+ /* Since we called cleanup, no delete notification will come. So, make
+ * sure the session is properly freed. */
+ segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+ s->server_tx_fifo);
+ session_free (s);
+}
+
+transport_service_type_t
+session_transport_service_type (stream_session_t * s)
+{
+ transport_proto_t tp;
+ tp = session_get_transport_proto (s);
+ return transport_protocol_service_type (tp);
+}
+
+transport_tx_fn_type_t
+session_transport_tx_fn_type (stream_session_t * s)
+{
+ transport_proto_t tp;
+ tp = session_get_transport_proto (s);
+ return transport_protocol_tx_fn_type (tp);
+}
+
+u8
+session_tx_is_dgram (stream_session_t * s)
+{
+ return (session_transport_tx_fn_type (s) == TRANSPORT_TX_DGRAM);
}
/**
eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
eqs->requested_va = smm->session_baseva;
- ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD);
+ if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
+ {
+ clib_warning ("failed to initialize queue segment");
+ return;
+ }
}
if (smm->evt_qs_use_memfd_seg)
for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
{
- smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size,
- vpp_pid, 0);
+ svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+ u32 notif_q_size = clib_max (16, evt_q_length >> 4);
+ svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+ {evt_q_length, evt_size, 0}
+ ,
+ {notif_q_size, 256, 0}
+ };
+ cfg->consumer_pid = 0;
+ cfg->n_rings = 2;
+ cfg->q_nitems = evt_q_length;
+ cfg->ring_cfgs = rc;
+ smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
}
if (smm->evt_qs_use_memfd_seg)
static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
session_tx_fifo_peek_and_snd,
session_tx_fifo_dequeue_and_snd,
- session_tx_fifo_dequeue_internal
+ session_tx_fifo_dequeue_internal,
+ session_tx_fifo_dequeue_and_snd
};
/* *INDENT-ON* */
session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
vec_validate (smm->session_type_to_next, session_type);
- vec_validate (smm->listen_sessions, session_type);
vec_validate (smm->session_tx_fns, session_type);
/* *INDENT-OFF* */
return 0;
}
+void
+session_flush_frames_main_thread (vlib_main_t * vm)
+{
+ ASSERT (vlib_get_thread_index () == 0);
+ vlib_process_signal_event_mt (vm, session_queue_process_node.index,
+ SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
+}
+
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
vec_validate (smm->free_event_vector, num_threads - 1);
vec_validate (smm->vpp_event_queues, num_threads - 1);
vec_validate (smm->peekers_rw_locks, num_threads - 1);
+ vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
for (i = 0; i < TRANSPORT_N_PROTO; i++)
- for (j = 0; j < num_threads; j++)
- {
- vec_validate (smm->session_to_enqueue[i], num_threads - 1);
- vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
- }
+ {
+ vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
+ vec_validate (smm->session_to_enqueue[i], num_threads - 1);
+ for (j = 0; j < num_threads; j++)
+ smm->current_enqueue_epoch[i][j] = 1;
+ }
for (i = 0; i < num_threads; i++)
{
clib_rwlock_init (&smm->peekers_rw_locks[i]);
}
-#if SESSION_DBG
+#if SESSION_DEBUG
vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
#endif
session_node_enable_disable (u8 is_en)
{
u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ u8 have_workers = vtm->n_threads != 0;
+
/* *INDENT-OFF* */
foreach_vlib_main (({
+ if (have_workers && ii == 0)
+ {
+ vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
+ state);
+ if (is_en)
+ {
+ vlib_node_t *n = vlib_get_node (this_vlib_main,
+ session_queue_process_node.index);
+ vlib_start_process (this_vlib_main, n->runtime_index);
+ }
+ else
+ {
+ vlib_process_signal_event_mt (this_vlib_main,
+ session_queue_process_node.index,
+ SESSION_Q_PROCESS_STOP, 0);
+ }
+
+ continue;
+ }
vlib_node_set_state (this_vlib_main, session_queue_node.index,
state);
}));