+/**
+ * Move dgram session to the right thread
+ *
+ * Clones the session onto the thread owning @a tc, repoints the rx fifo's
+ * master indices at the clone and registers it in the lookup table, then
+ * asks the old owner thread (via rpc) to clean up the original session and
+ * hand over tx fifo ownership.
+ *
+ * @param tc               transport connection that changed threads
+ * @param old_thread_index thread that currently owns the session
+ * @param new_session      output; set to the cloned session
+ * @return 0 on success
+ */
+int
+session_dgram_connect_notify (transport_connection_t * tc,
+ u32 old_thread_index,
+ stream_session_t ** new_session)
+{
+ stream_session_t *new_s;
+ session_switch_pool_args_t *rpc_args;
+
+ /*
+ * Clone half-open session to the right thread.
+ */
+ new_s = session_clone_safe (tc->s_index, old_thread_index);
+ new_s->connection_index = tc->c_index;
+ new_s->server_rx_fifo->master_session_index = new_s->session_index;
+ new_s->server_rx_fifo->master_thread_index = new_s->thread_index;
+ new_s->session_state = SESSION_STATE_READY;
+ session_lookup_add_connection (tc, session_handle (new_s));
+
+ /*
+ * Ask thread owning the old session to clean it up and make us the tx
+ * fifo owner
+ */
+ rpc_args = clib_mem_alloc (sizeof (*rpc_args));
+ rpc_args->new_session_index = new_s->session_index;
+ rpc_args->new_thread_index = new_s->thread_index;
+ rpc_args->session_index = tc->s_index;
+ rpc_args->thread_index = old_thread_index;
+ session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
+ rpc_args);
+
+ /* Point the transport at the clone. connection_index was already set
+  * right after the clone above; the original duplicated that assignment
+  * here — removed. */
+ tc->s_index = new_s->session_index;
+ *new_session = new_s;
+ return 0;
+}
+
+/** Notify the owning application that a session was accepted. */
+void
+stream_session_accept_notify (transport_connection_t * tc)
+{
+ stream_session_t *s = session_get (tc->s_index, tc->thread_index);
+ application_t *app = application_get (s->app_index);
+
+ /* Shoulder-tap the app's accept callback */
+ app->cb_fns.session_accept_callback (s);
+}
+
+/**
+ * Notification from transport that connection is being closed.
+ *
+ * The application is told about the disconnect but session state is kept.
+ * Once the app acknowledges, session disconnect runs, which ultimately
+ * results in the transport being closed (passive close).
+ */
+void
+stream_session_disconnect_notify (transport_connection_t * tc)
+{
+ stream_session_t *s;
+ application_t *app;
+
+ s = session_get (tc->s_index, tc->thread_index);
+ app = application_get (s->app_index);
+ /* Tell the app first, then flag the session as closing */
+ app->cb_fns.session_disconnect_callback (s);
+ s->session_state = SESSION_STATE_CLOSING;
+}
+
+/**
+ * Cleans up session and lookup table.
+ *
+ * Transport connection must still be valid.
+ */
+void
+stream_session_delete (stream_session_t * s)
+{
+ /* Delete from the main lookup table. */
+ int rv = session_lookup_del_session (s);
+ if (rv)
+ clib_warning ("hash delete error, rv %d", rv);
+
+ /* Cleanup fifo segments, then release the session itself */
+ segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+ s->server_tx_fifo);
+ session_free (s);
+}
+
+/**
+ * Notification from transport that connection is being deleted
+ *
+ * Removes the session if it is still valid. Should only be called on
+ * previously fully established sessions; failed connects should go through
+ * stream_session_connect_notify with a failure indication instead.
+ */
+void
+stream_session_delete_notify (transport_connection_t * tc)
+{
+ stream_session_t *s;
+
+ /* App might've been removed already */
+ s = session_get_if_valid (tc->s_index, tc->thread_index);
+ if (s)
+ stream_session_delete (s);
+}
+
+/**
+ * Notify application that connection has been reset.
+ */
+void
+stream_session_reset_notify (transport_connection_t * tc)
+{
+ stream_session_t *s = session_get (tc->s_index, tc->thread_index);
+ application_t *app;
+
+ /* Mark the session closed before handing it to the app */
+ s->session_state = SESSION_STATE_CLOSED;
+ app = application_get (s->app_index);
+ app->cb_fns.session_reset_callback (s);
+}
+
+/**
+ * Accept a stream session. Optionally ping the server by callback.
+ *
+ * @param tc             transport connection for the new session
+ * @param listener_index index of the listening session being accepted on
+ * @param notify         if set, invoke the server's accept callback
+ * @return 0 on success, else the session alloc/init error
+ */
+int
+stream_session_accept (transport_connection_t * tc, u32 listener_index,
+ u8 notify)
+{
+ stream_session_t *s, *listener;
+ application_t *server;
+ segment_manager_t *sm;
+ int rv;
+
+ /* Find the server */
+ listener = listen_session_get (listener_index);
+ server = application_get (listener->app_index);
+
+ /* Allocate session and fifos from the listener's segment manager */
+ sm = application_get_listen_segment_manager (server, listener);
+ rv = session_alloc_and_init (sm, tc, 1, &s);
+ if (rv)
+ return rv;
+
+ s->app_index = server->index;
+ s->listener_index = listener_index;
+ s->session_state = SESSION_STATE_ACCEPTING;
+
+ /* Shoulder-tap the server */
+ if (notify)
+ server->cb_fns.session_accept_callback (s);
+
+ return 0;
+}
+
+/**
+ * Connectionless (dgram) open: session and fifos are allocated immediately
+ * after the transport open, and the app is notified right away.
+ *
+ * @param app_index index of application requesting the connect
+ * @param rmt       remote session endpoint
+ * @param opaque    app-provided context returned in the connected callback
+ * @return 0 on success, negative on failure
+ */
+int
+session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+{
+ transport_connection_t *tc;
+ transport_endpoint_t *tep;
+ segment_manager_t *sm;
+ stream_session_t *s;
+ application_t *app;
+ int rv;
+
+ tep = session_endpoint_to_transport (rmt);
+ rv = tp_vfts[rmt->transport_proto].open (tep);
+ if (rv < 0)
+ {
+ SESSION_DBG ("Transport failed to open connection.");
+ return VNET_API_ERROR_SESSION_CONNECT;
+ }
+
+ /* Non-negative rv is the index handed back by the transport's open;
+  * use it to retrieve the half-open connection */
+ tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv);
+
+ /* For dgram type of service, allocate session and fifos now.
+ */
+ app = application_get (app_index);
+ sm = application_get_connect_segment_manager (app);
+
+ /* NOTE(review): returns bare -1 here vs VNET_API_ERROR_SESSION_CONNECT
+  * above — confirm callers only test for negative values */
+ if (session_alloc_and_init (sm, tc, 1, &s))
+ return -1;
+ s->app_index = app->index;
+ s->session_state = SESSION_STATE_OPENED;
+
+ /* Tell the app about the new event fifo for this session */
+ app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
+
+ return 0;
+}
+
+/**
+ * Connection-oriented (stream) open: no session is allocated yet; the
+ * half-open connection is tracked until the transport reports establishment.
+ *
+ * @param app_index index of application requesting the connect
+ * @param rmt       remote session endpoint
+ * @param opaque    app-provided context, stashed for the connect notify
+ * @return 0 on success, negative on failure
+ */
+int
+session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+{
+ transport_connection_t *tc;
+ transport_endpoint_t *tep;
+ u64 handle;
+ int rv;
+
+ tep = session_endpoint_to_transport (rmt);
+ rv = tp_vfts[rmt->transport_proto].open (tep);
+ if (rv < 0)
+ {
+ SESSION_DBG ("Transport failed to open connection.");
+ return VNET_API_ERROR_SESSION_CONNECT;
+ }
+
+ /* Non-negative rv is the transport's half-open connection index */
+ tc = tp_vfts[rmt->transport_proto].get_half_open ((u32) rv);
+
+ /* If transport offers a stream service, only allocate session once the
+ * connection has been established.
+ * Add connection to half-open table and save app and tc index. The
+ * latter is needed to help establish the connection while the former
+ * is needed when the connect notify comes and we have to notify the
+ * external app
+ */
+ /* Pack app index (high 32 bits) and connection index (low 32 bits) */
+ handle = (((u64) app_index) << 32) | (u64) tc->c_index;
+ session_lookup_add_half_open (tc, handle);
+
+ /* Store api_context (opaque) for when the reply comes. Not the nicest
+ * thing but better than allocating a separate half-open pool.
+ */
+ tc->s_index = opaque;
+ return 0;
+}
+
+/**
+ * App-transport open: the transport consumes the extended endpoint
+ * directly, so just stash app index and opaque inside it and forward.
+ */
+int
+session_open_app (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+{
+ session_endpoint_extended_t *sep;
+
+ sep = (session_endpoint_extended_t *) rmt;
+ sep->app_index = app_index;
+ sep->opaque = opaque;
+ return tp_vfts[rmt->transport_proto].open ((transport_endpoint_t *) sep);
+}
+
+/* Signature shared by the per-service-type open functions below */
+typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);
+
+/* *INDENT-OFF* */
+/* NOTE(review): entry order must match the transport_service_type_t enum
+ * (presumably vc, cl, app) — confirm against its definition */
+static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
+ session_open_vc,
+ session_open_cl,
+ session_open_app,
+};
+/* *INDENT-ON* */
+
+/**
+ * Ask transport to open connection to remote transport endpoint.
+ *
+ * Stores handle for matching request with reply since the call can be
+ * asynchronous. For instance, for TCP the 3-way handshake must complete
+ * before reply comes. Session is only created once connection is established.
+ *
+ * @param app_index Index of the application requesting the connect
+ * @param rmt Remote session endpoint to connect to
+ * @param opaque Opaque data (typically, api_context) the application expects
+ * on open completion.
+ */
+int
+session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
+{
+ session_open_service_fn open_fn;
+
+ /* Dispatch on the transport's declared service type */
+ open_fn = session_open_srv_fns[tp_vfts[rmt->transport_proto].service_type];
+ return open_fn (app_index, rmt, opaque);
+}
+
+int
+session_listen_vc (stream_session_t * s, session_endpoint_t * sep)
+{
+ transport_connection_t *tc;
+ u32 tci;
+
+ /* Transport bind/listen */
+ tci = tp_vfts[sep->transport_proto].bind (s->session_index,
+ session_endpoint_to_transport
+ (sep));
+
+ if (tci == (u32) ~ 0)
+ return -1;
+
+ /* Attach transport to session */
+ s->connection_index = tci;
+ tc = tp_vfts[sep->transport_proto].get_listener (tci);
+
+ /* Weird but handle it ... */
+ if (tc == 0)
+ return -1;
+
+ /* Add to the main lookup table */
+ session_lookup_add_connection (tc, s->session_index);
+ return 0;