+int
+session_manager_flush_all_enqueue_events (u8 transport_proto)
+{
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ int i, errors = 0;
+ for (i = 0; i < 1 + vtm->n_threads; i++)
+ errors += session_manager_flush_enqueue_events (transport_proto, i);
+ return errors;
+}
+
+/**
+ * Init fifo tail and head pointers
+ *
+ * Useful if transport uses absolute offsets for tracking ooo segments.
+ */
+void
+stream_session_init_fifos_pointers (transport_connection_t * tc,
+ u32 rx_pointer, u32 tx_pointer)
+{
+ session_t *s;
+ s = session_get (tc->s_index, tc->thread_index);
+ svm_fifo_init_pointers (s->rx_fifo, rx_pointer);
+ svm_fifo_init_pointers (s->tx_fifo, tx_pointer);
+}
+
+int
+session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
+{
+ u32 opaque = 0, new_ti, new_si;
+ session_t *new_s = 0;
+ segment_manager_t *sm;
+ app_worker_t *app_wrk;
+ application_t *app;
+ u8 alloc_fifos;
+ int error = 0;
+ u64 handle;
+
+ /*
+ * Find connection handle and cleanup half-open table
+ */
+ handle = session_lookup_half_open_handle (tc);
+ if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
+ {
+ SESSION_DBG ("half-open was removed!");
+ return -1;
+ }
+ session_lookup_del_half_open (tc);
+
+ /* Get the app's index from the handle we stored when opening connection
+ * and the opaque (api_context for external apps) from transport session
+ * index */
+ app_wrk = app_worker_get_if_valid (handle >> 32);
+ if (!app_wrk)
+ return -1;
+ opaque = tc->s_index;
+ app = application_get (app_wrk->app_index);
+
+ /*
+ * Allocate new session with fifos (svm segments are allocated if needed)
+ */
+ if (!is_fail)
+ {
+ sm = app_worker_get_connect_segment_manager (app_wrk);
+ alloc_fifos = !application_is_builtin_proxy (app);
+ if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s))
+ {
+ is_fail = 1;
+ error = -1;
+ }
+ else
+ {
+ new_s->session_state = SESSION_STATE_CONNECTING;
+ new_s->app_wrk_index = app_wrk->wrk_index;
+ new_si = new_s->session_index;
+ new_ti = new_s->thread_index;
+ }
+ }
+
+ /*
+ * Notify client application
+ */
+ if (app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
+ new_s, is_fail))
+ {
+ SESSION_DBG ("failed to notify app");
+ if (!is_fail)
+ {
+ new_s = session_get (new_si, new_ti);
+ session_transport_close (new_s);
+ }
+ }
+ else
+ {
+ if (!is_fail)
+ {
+ new_s = session_get (new_si, new_ti);
+ new_s->session_state = SESSION_STATE_READY;
+ }
+ }
+
+ return error;
+}
+
+typedef struct _session_switch_pool_args
+{
+ u32 session_index;
+ u32 thread_index;
+ u32 new_thread_index;
+ u32 new_session_index;
+} session_switch_pool_args_t;
+
+static void
+session_switch_pool (void *cb_args)
+{
+ session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
+ transport_proto_t tp;
+ session_t *s;
+ ASSERT (args->thread_index == vlib_get_thread_index ());
+ s = session_get (args->session_index, args->thread_index);
+ s->tx_fifo->master_session_index = args->new_session_index;
+ s->tx_fifo->master_thread_index = args->new_thread_index;
+ tp = session_get_transport_proto (s);
+ tp_vfts[tp].cleanup (s->connection_index, s->thread_index);
+ session_free (s);
+ clib_mem_free (cb_args);
+}
+
+/**
+ * Move dgram session to the right thread
+ */
+int
+session_dgram_connect_notify (transport_connection_t * tc,
+ u32 old_thread_index, session_t ** new_session)
+{
+ session_t *new_s;
+ session_switch_pool_args_t *rpc_args;
+
+ /*
+ * Clone half-open session to the right thread.
+ */
+ new_s = session_clone_safe (tc->s_index, old_thread_index);
+ new_s->connection_index = tc->c_index;
+ new_s->rx_fifo->master_session_index = new_s->session_index;
+ new_s->rx_fifo->master_thread_index = new_s->thread_index;
+ new_s->session_state = SESSION_STATE_READY;
+ session_lookup_add_connection (tc, session_handle (new_s));
+
+ /*
+ * Ask thread owning the old session to clean it up and make us the tx
+ * fifo owner
+ */
+ rpc_args = clib_mem_alloc (sizeof (*rpc_args));
+ rpc_args->new_session_index = new_s->session_index;
+ rpc_args->new_thread_index = new_s->thread_index;
+ rpc_args->session_index = tc->s_index;
+ rpc_args->thread_index = old_thread_index;
+ session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
+ rpc_args);
+
+ tc->s_index = new_s->session_index;
+ new_s->connection_index = tc->c_index;
+ *new_session = new_s;
+ return 0;
+}
+
+int
+stream_session_accept_notify (transport_connection_t * tc)
+{
+ app_worker_t *app_wrk;
+ application_t *app;
+ session_t *s;
+
+ s = session_get (tc->s_index, tc->thread_index);
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return -1;
+ s->session_state = SESSION_STATE_ACCEPTING;
+ app = application_get (app_wrk->app_index);
+ return app->cb_fns.session_accept_callback (s);
+}
+
+/**
+ * Notification from transport that connection is being closed.
+ *
+ * A disconnect is sent to application but state is not removed. Once
+ * disconnect is acknowledged by application, session disconnect is called.
+ * Ultimately this leads to close being called on transport (passive close).
+ */
+void
+session_transport_closing_notify (transport_connection_t * tc)
+{
+ app_worker_t *app_wrk;
+ application_t *app;
+ session_t *s;
+
+ s = session_get (tc->s_index, tc->thread_index);
+ if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+ return;
+ s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return;
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_disconnect_callback (s);
+}
+
+/**
+ * Notification from transport that connection is being deleted
+ *
+ * This removes the session if it is still valid. It should be called only on
+ * previously fully established sessions. For instance failed connects should
+ * call stream_session_connect_notify and indicate that the connect has
+ * failed.
+ */
+void
+session_transport_delete_notify (transport_connection_t * tc)
+{
+ session_t *s;
+
+ /* App might've been removed already */
+ if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
+ return;
+
+ /* Make sure we don't try to send anything more */
+ svm_fifo_dequeue_drop_all (s->tx_fifo);
+
+ switch (s->session_state)
+ {
+ case SESSION_STATE_ACCEPTING:
+ case SESSION_STATE_TRANSPORT_CLOSING:
+ /* If transport finishes or times out before we get a reply
+ * from the app, mark transport as closed and wait for reply
+ * before removing the session. Cleanup session table in advance
+ * because transport will soon be closed and closed sessions
+ * are assumed to have been removed from the lookup table */
+ session_lookup_del_session (s);
+ s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
+ break;
+ case SESSION_STATE_CLOSING:
+ case SESSION_STATE_CLOSED_WAITING:
+ /* Cleanup lookup table as transport needs to still be valid.
+ * Program transport close to ensure that all session events
+ * have been cleaned up. Once transport close is called, the
+ * session is just removed because both transport and app have
+ * confirmed the close*/
+ session_lookup_del_session (s);
+ s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
+ session_program_transport_close (s);
+ break;
+ case SESSION_STATE_TRANSPORT_CLOSED:
+ break;
+ case SESSION_STATE_CLOSED:
+ session_delete (s);
+ break;
+ default:
+ clib_warning ("session state %u", s->session_state);
+ session_delete (s);
+ break;
+ }
+}
+
+/**
+ * Notification from transport that session can be closed
+ *
+ * Should be called by transport only if it was closed with non-empty
+ * tx fifo and once it decides to begin the closing procedure prior to
+ * issuing a delete notify. This gives the chance to the session layer
+ * to cleanup any outstanding events.
+ */
+void
+session_transport_closed_notify (transport_connection_t * tc)
+{
+ session_t *s;
+
+ if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
+ return;