+session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
+{
+ return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+}
+
+int
+session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
+ session_evt_type_t evt_type)
+{
+ return session_send_evt_to_thread (data, 0, thread_index, evt_type);
+}
+
+int
+session_send_ctrl_evt_to_thread (stream_session_t * s,
+ session_evt_type_t evt_type)
+{
+ /* only event supported for now is disconnect */
+ ASSERT (evt_type == FIFO_EVENT_DISCONNECT);
+ return session_send_evt_to_thread (s, 0, s->thread_index,
+ FIFO_EVENT_DISCONNECT);
+}
+
+void
+session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
+{
+ if (thread_index != vlib_get_thread_index ())
+ session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC);
+ else
+ {
+ void (*fnp) (void *) = fp;
+ fnp (rpc_args);
+ }
+}
+
+stream_session_t *
+session_alloc (u32 thread_index)
+{
+ session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
+ stream_session_t *s;
+ u8 will_expand = 0;
+ pool_get_aligned_will_expand (wrk->sessions, will_expand,
+ CLIB_CACHE_LINE_BYTES);
+ /* If we have peekers, let them finish */
+ if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
+ {
+ clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
+ pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
+ clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
+ }
+ else
+ {
+ pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
+ }
+ clib_memset (s, 0, sizeof (*s));
+ s->session_index = s - wrk->sessions;
+ s->thread_index = thread_index;
+ return s;
+}
+
+void
+session_free (stream_session_t * s)
+{
+ pool_put (session_manager_main.wrk[s->thread_index].sessions, s);
+ if (CLIB_DEBUG)
+ clib_memset (s, 0xFA, sizeof (*s));
+}
+
+void
+session_free_w_fifos (stream_session_t * s)
+{
+ segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+ s->server_tx_fifo);
+ session_free (s);
+}
+
+int
+session_alloc_fifos (segment_manager_t * sm, stream_session_t * s)