+ for (i = 0; i < 1 + vtm->n_threads; i++)
+ errors += session_manager_flush_enqueue_events (transport_proto, i);
+ return errors;
+}
+
+/**
+ * Init fifo tail and head pointers
+ *
+ * Useful if transport uses absolute offsets for tracking ooo segments.
+ */
+void
+stream_session_init_fifos_pointers (transport_connection_t * tc,
+ u32 rx_pointer, u32 tx_pointer)
+{
+ session_t *s;
+ s = session_get (tc->s_index, tc->thread_index);
+ svm_fifo_init_pointers (s->rx_fifo, rx_pointer);
+ svm_fifo_init_pointers (s->tx_fifo, tx_pointer);
+}
+
+int
+session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
+{
+ u32 opaque = 0, new_ti, new_si;
+ app_worker_t *app_wrk;
+ session_t *s = 0;
+ u64 ho_handle;
+
+ /*
+ * Find connection handle and cleanup half-open table
+ */
+ ho_handle = session_lookup_half_open_handle (tc);
+ if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
+ {
+ SESSION_DBG ("half-open was removed!");
+ return -1;
+ }
+ session_lookup_del_half_open (tc);
+
+ /* Get the app's index from the handle we stored when opening connection
+ * and the opaque (api_context for external apps) from transport session
+ * index */
+ app_wrk = app_worker_get_if_valid (ho_handle >> 32);
+ if (!app_wrk)
+ return -1;
+
+ opaque = tc->s_index;
+
+ if (is_fail)
+ return app_worker_connect_notify (app_wrk, s, opaque);
+
+ s = session_alloc_for_connection (tc);
+ s->session_state = SESSION_STATE_CONNECTING;
+ s->app_wrk_index = app_wrk->wrk_index;
+ new_si = s->session_index;
+ new_ti = s->thread_index;
+
+ if (app_worker_init_connected (app_wrk, s))
+ {
+ session_free (s);
+ app_worker_connect_notify (app_wrk, 0, opaque);
+ return -1;
+ }
+
+ if (app_worker_connect_notify (app_wrk, s, opaque))
+ {
+ s = session_get (new_si, new_ti);
+ session_free_w_fifos (s);
+ return -1;
+ }
+
+ s = session_get (new_si, new_ti);
+ s->session_state = SESSION_STATE_READY;
+ session_lookup_add_connection (tc, session_handle (s));
+
+ return 0;
+}
+
+typedef struct _session_switch_pool_args
+{
+ u32 session_index;
+ u32 thread_index;
+ u32 new_thread_index;
+ u32 new_session_index;
+} session_switch_pool_args_t;
+
+static void
+session_switch_pool (void *cb_args)
+{
+ session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
+ session_t *s;
+ ASSERT (args->thread_index == vlib_get_thread_index ());
+ s = session_get (args->session_index, args->thread_index);
+ s->tx_fifo->master_session_index = args->new_session_index;
+ s->tx_fifo->master_thread_index = args->new_thread_index;
+ transport_cleanup (session_get_transport_proto (s), s->connection_index,
+ s->thread_index);
+ session_free (s);
+ clib_mem_free (cb_args);
+}
+
+/**
+ * Move dgram session to the right thread
+ */
+int
+session_dgram_connect_notify (transport_connection_t * tc,
+ u32 old_thread_index, session_t ** new_session)
+{
+ session_t *new_s;
+ session_switch_pool_args_t *rpc_args;
+
+ /*
+ * Clone half-open session to the right thread.
+ */
+ new_s = session_clone_safe (tc->s_index, old_thread_index);
+ new_s->connection_index = tc->c_index;
+ new_s->rx_fifo->master_session_index = new_s->session_index;
+ new_s->rx_fifo->master_thread_index = new_s->thread_index;
+ new_s->session_state = SESSION_STATE_READY;
+ session_lookup_add_connection (tc, session_handle (new_s));
+
+ /*
+ * Ask thread owning the old session to clean it up and make us the tx
+ * fifo owner
+ */
+ rpc_args = clib_mem_alloc (sizeof (*rpc_args));
+ rpc_args->new_session_index = new_s->session_index;
+ rpc_args->new_thread_index = new_s->thread_index;
+ rpc_args->session_index = tc->s_index;
+ rpc_args->thread_index = old_thread_index;
+ session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
+ rpc_args);
+
+ tc->s_index = new_s->session_index;
+ new_s->connection_index = tc->c_index;
+ *new_session = new_s;
+ return 0;
+}
+
+/**
+ * Notification from transport that connection is being closed.
+ *
+ * A disconnect is sent to application but state is not removed. Once
+ * disconnect is acknowledged by application, session disconnect is called.
+ * Ultimately this leads to close being called on transport (passive close).
+ */
+void
+session_transport_closing_notify (transport_connection_t * tc)
+{
+ app_worker_t *app_wrk;
+ application_t *app;
+ session_t *s;
+
+ s = session_get (tc->s_index, tc->thread_index);
+ if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+ return;
+ s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ return;
+ app = application_get (app_wrk->app_index);
+ app->cb_fns.session_disconnect_callback (s);
+}
+
+/**
+ * Notification from transport that connection is being deleted
+ *
+ * This removes the session if it is still valid. It should be called only on
+ * previously fully established sessions. For instance failed connects should
+ * call stream_session_connect_notify and indicate that the connect has
+ * failed.
+ */
+void
+session_transport_delete_notify (transport_connection_t * tc)
+{
+ session_t *s;
+
+ /* App might've been removed already */
+ if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
+ return;
+
+ /* Make sure we don't try to send anything more */
+ svm_fifo_dequeue_drop_all (s->tx_fifo);
+
+ switch (s->session_state)
+ {
+ case SESSION_STATE_CREATED:
+ /* Session was created but accept notification was not yet sent to the
+ * app. Cleanup everything. */
+ session_lookup_del_session (s);
+ session_free_w_fifos (s);
+ break;
+ case SESSION_STATE_ACCEPTING:
+ case SESSION_STATE_TRANSPORT_CLOSING:
+ /* If transport finishes or times out before we get a reply
+ * from the app, mark transport as closed and wait for reply
+ * before removing the session. Cleanup session table in advance
+ * because transport will soon be closed and closed sessions
+ * are assumed to have been removed from the lookup table */
+ session_lookup_del_session (s);
+ s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
+ break;
+ case SESSION_STATE_CLOSING:
+ case SESSION_STATE_CLOSED_WAITING:
+ /* Cleanup lookup table as transport needs to still be valid.
+ * Program transport close to ensure that all session events
+ * have been cleaned up. Once transport close is called, the
+ * session is just removed because both transport and app have
+ * confirmed the close*/
+ session_lookup_del_session (s);
+ s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
+ session_program_transport_close (s);
+ break;
+ case SESSION_STATE_TRANSPORT_CLOSED:
+ break;
+ case SESSION_STATE_CLOSED:
+ session_delete (s);
+ break;
+ default:
+ clib_warning ("session state %u", s->session_state);
+ session_delete (s);
+ break;
+ }
+}
+
+/**
+ * Notification from transport that session can be closed
+ *
+ * Should be called by transport only if it was closed with non-empty
+ * tx fifo and once it decides to begin the closing procedure prior to
+ * issuing a delete notify. This gives the chance to the session layer
+ * to cleanup any outstanding events.
+ */
+void
+session_transport_closed_notify (transport_connection_t * tc)
+{
+ session_t *s;