session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
u32 thread_index, void *fp, void *rpc_args)
{
- u32 tries = 0;
session_fifo_event_t evt = { {0}, };
svm_queue_t *q;
+ u32 tries = 0, max_tries;
evt.event_type = evt_type;
if (evt_type == FIFO_EVENT_RPC)
q = session_manager_get_vpp_event_queue (thread_index);
while (svm_queue_add (q, (u8 *) & evt, 1))
{
- if (tries++ == 3)
+ max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
+ if (tries++ == max_tries)
{
SESSION_DBG ("failed to enqueue evt");
break;
stream_session_t *s;
u32 thread_index = tc->thread_index;
- ASSERT (thread_index == vlib_get_thread_index ());
+ ASSERT (thread_index == vlib_get_thread_index ()
+ || transport_protocol_is_cl (tc->proto));
s = session_alloc (thread_index);
s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
}
u32
-stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
+session_tx_fifo_max_dequeue (transport_connection_t * tc)
{
stream_session_t *s = session_get (tc->s_index, tc->thread_index);
if (!s->server_tx_fifo)
if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
{
/* Session is closed so app will never clean up. Flush rx fifo */
- u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
- if (to_dequeue)
- svm_fifo_dequeue_drop (s->server_rx_fifo, to_dequeue);
+ svm_fifo_dequeue_drop_all (s->server_rx_fifo);
return 0;
}
s = session_get (tc->s_index, tc->thread_index);
server = application_get (s->app_index);
server->cb_fns.session_disconnect_callback (s);
+ s->session_state = SESSION_STATE_CLOSING;
}
/**
* Cleans up session and lookup table.
+ *
+ * Transport connection must still be valid.
*/
void
stream_session_delete (stream_session_t * s)
stream_session_t *s;
application_t *app;
s = session_get (tc->s_index, tc->thread_index);
-
+ s->session_state = SESSION_STATE_CLOSED;
app = application_get (s->app_index);
app->cb_fns.session_reset_callback (s);
}
void
stream_session_disconnect (stream_session_t * s)
{
- if (!s || s->session_state == SESSION_STATE_CLOSED)
+ u32 thread_index = vlib_get_thread_index ();
+ session_manager_main_t *smm = &session_manager_main;
+ session_fifo_event_t *evt;
+
+ if (!s)
return;
- s->session_state = SESSION_STATE_CLOSED;
- session_send_session_evt_to_thread (session_handle (s),
- FIFO_EVENT_DISCONNECT, s->thread_index);
+
+ if (s->session_state >= SESSION_STATE_CLOSING)
+ {
+ /* Session already closed. Clear the tx fifo */
+ if (s->session_state == SESSION_STATE_CLOSED)
+ svm_fifo_dequeue_drop_all (s->server_tx_fifo);
+ return;
+ }
+
+ s->session_state = SESSION_STATE_CLOSING;
+
+ /* If we are in the handler thread, or being called with the worker barrier
+ * held (api/cli), just append a new event to pending disconnects vector. */
+ if ((thread_index == 0 && !vlib_get_current_process (vlib_get_main ()))
+ || thread_index == s->thread_index)
+ {
+ ASSERT (s->thread_index == thread_index || thread_index == 0);
+ vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+ memset (evt, 0, sizeof (*evt));
+ evt->session_handle = session_handle (s);
+ evt->event_type = FIFO_EVENT_DISCONNECT;
+ }
+ else
+ session_send_session_evt_to_thread (session_handle (s),
+ FIFO_EVENT_DISCONNECT,
+ s->thread_index);
}
/**
/**
* Cleanup transport and session state.
*
- * Notify transport of the cleanup, wait for a delete notify to actually
- * remove the session state.
+ * Notify transport of the cleanup and free the session. This should
+ * be called only if transport reported some error and is already
+ * closed.
*/
void
stream_session_cleanup (stream_session_t * s)
{
- int rv;
-
s->session_state = SESSION_STATE_CLOSED;
- /* Delete from the main lookup table to avoid more enqueues */
- rv = session_lookup_del_session (s);
- if (rv)
- clib_warning ("hash delete error, rv %d", rv);
-
+ /* Delete from main lookup table before we axe the the transport */
+ session_lookup_del_session (s);
tp_vfts[session_get_transport_proto (s)].cleanup (s->connection_index,
s->thread_index);
+ /* Since we called cleanup, no delete notification will come. So, make
+ * sure the session is properly freed. */
+ segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+ s->server_tx_fifo);
+ session_free (s);
+}
+
+transport_service_type_t
+session_transport_service_type (stream_session_t * s)
+{
+ transport_proto_t tp;
+ tp = session_get_transport_proto (s);
+ return transport_protocol_service_type (tp);
+}
+
+transport_tx_fn_type_t
+session_transport_tx_fn_type (stream_session_t * s)
+{
+ transport_proto_t tp;
+ tp = session_get_transport_proto (s);
+ return transport_protocol_tx_fn_type (tp);
+}
+
+u8
+session_tx_is_dgram (stream_session_t * s)
+{
+ return (session_transport_tx_fn_type (s) == TRANSPORT_TX_DGRAM);
}
/**
return 0;
}
+void
+session_flush_frames_main_thread (vlib_main_t * vm)
+{
+ ASSERT (vlib_get_thread_index () == 0);
+ vlib_process_signal_event_mt (vm, session_queue_process_node.index,
+ SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
+}
+
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
session_node_enable_disable (u8 is_en)
{
u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ u8 have_workers = vtm->n_threads != 0;
+
/* *INDENT-OFF* */
foreach_vlib_main (({
+ if (have_workers && ii == 0)
+ {
+ vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
+ state);
+ if (is_en)
+ {
+ vlib_node_t *n = vlib_get_node (this_vlib_main,
+ session_queue_process_node.index);
+ vlib_start_process (this_vlib_main, n->runtime_index);
+ }
+ else
+ {
+ vlib_process_signal_event_mt (this_vlib_main,
+ session_queue_process_node.index,
+ SESSION_Q_PROCESS_STOP, 0);
+ }
+
+ continue;
+ }
vlib_node_set_state (this_vlib_main, session_queue_node.index,
state);
}));