+ vls_mt_pool_runlock ();
+}
+
+static vcl_locked_session_t *
+vls_session_get (vls_worker_t * wrk, u32 vls_index)
+{
+ if (pool_is_free_index (wrk->vls_pool, vls_index))
+ return 0;
+ return pool_elt_at_index (wrk->vls_pool, vls_index);
+}
+
+vcl_session_handle_t
+vlsh_to_sh (vls_handle_t vlsh)
+{
+ vcl_locked_session_t *vls;
+ int rv;
+
+ vls = vls_get_w_dlock (vlsh);
+ if (!vls)
+ return INVALID_SESSION_ID;
+ rv = vls_to_sh (vls);
+ vls_dunlock (vls);
+ return rv;
+}
+
+vcl_session_handle_t
+vlsh_to_session_index (vls_handle_t vlsh)
+{
+ vcl_session_handle_t sh;
+ sh = vlsh_to_sh (vlsh);
+ return vppcom_session_index (sh);
+}
+
+vls_handle_t
+vls_si_wi_to_vlsh (u32 session_index, u32 vcl_wrk_index)
+{
+ vls_worker_t *wrk = vls_worker_get_current ();
+ uword *vlshp = vls_sh_to_vlsh_table_get (
+ wrk,
+ vcl_session_handle_from_wrk_session_index (session_index, vcl_wrk_index));
+
+ return vlshp ? *vlshp : VLS_INVALID_HANDLE;
+}
+
+vls_handle_t
+vls_session_index_to_vlsh (uint32_t session_index)
+{
+ vls_handle_t vlsh;
+
+ vls_mt_pool_rlock ();
+ vlsh = vls_si_wi_to_vlsh (session_index, vcl_get_worker_index ());
+ vls_mt_pool_runlock ();
+
+ return vlsh;
+}
+
+u8
+vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ vls_shared_data_t *vls_shd;
+ int i;
+
+ if (vls->shared_data_index == ~0)
+ return 0;
+
+ vls_shared_data_pool_rlock ();
+
+ vls_shd = vls_shared_data_get (vls->shared_data_index);
+ clib_spinlock_lock (&vls_shd->lock);
+
+ for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
+ if (vls_shd->workers_subscribed[i] == wrk_index)
+ {
+ clib_spinlock_unlock (&vls_shd->lock);
+ vls_shared_data_pool_runlock ();
+ return 1;
+ }
+ clib_spinlock_unlock (&vls_shd->lock);
+
+ vls_shared_data_pool_runlock ();
+ return 0;
+}
+
+static void
+vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
+{
+ vls_shared_data_t *vls_shd;
+
+ if (vls->shared_data_index == ~0)
+ {
+ clib_warning ("not a shared session");
+ return;
+ }
+
+ vls_shared_data_pool_rlock ();
+
+ vls_shd = vls_shared_data_get (vls->shared_data_index);
+
+ clib_spinlock_lock (&vls_shd->lock);
+ vls_shd->listeners =
+ clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
+ clib_spinlock_unlock (&vls_shd->lock);
+
+ vls_shared_data_pool_runlock ();
+}
+
+static u32
+vls_shared_get_owner (vcl_locked_session_t * vls)
+{
+ vls_shared_data_t *vls_shd;
+ u32 owner_wrk;
+
+ vls_shared_data_pool_rlock ();
+
+ vls_shd = vls_shared_data_get (vls->shared_data_index);
+ owner_wrk = vls_shd->owner_wrk_index;
+
+ vls_shared_data_pool_runlock ();
+
+ return owner_wrk;
+}
+
+static u8
+vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ vls_shared_data_t *vls_shd;
+ u8 is_set;
+
+ if (vls->shared_data_index == ~0)
+ {
+ clib_warning ("not a shared session");
+ return 0;
+ }
+
+ vls_shared_data_pool_rlock ();
+
+ vls_shd = vls_shared_data_get (vls->shared_data_index);
+
+ clib_spinlock_lock (&vls_shd->lock);
+ is_set = clib_bitmap_get (vls_shd->listeners, wrk_index);
+ clib_spinlock_unlock (&vls_shd->lock);
+
+ vls_shared_data_pool_runlock ();
+
+ return (is_set == 1);
+}
+
+static void
+vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ vcl_worker_t *wrk;
+ vcl_session_t *ls;
+
+ wrk = vcl_worker_get (wrk_index);
+ ls = vcl_session_get (wrk, vls->session_index);
+
+ /* Listen request already sent */
+ if (ls->flags & VCL_SESSION_F_PENDING_LISTEN)
+ return;
+
+ vcl_send_session_listen (wrk, ls);
+
+ vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */);
+}
+
+static void
+vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ vcl_worker_t *wrk;
+ vcl_session_t *s;
+
+ wrk = vcl_worker_get (wrk_index);
+ s = vcl_session_get (wrk, vls->session_index);
+ if (s->session_state != VCL_STATE_LISTEN)
+ return;
+ vcl_send_session_unlisten (wrk, s);
+ s->session_state = VCL_STATE_LISTEN_NO_MQ;
+ vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
+}
+
+static int
+vls_shared_data_subscriber_position (vls_shared_data_t * vls_shd,
+ u32 wrk_index)
+{
+ int i;
+
+ for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
+ {
+ if (vls_shd->workers_subscribed[i] == wrk_index)
+ return i;
+ }
+ return -1;
+}
+
+int
+vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
+{
+ vls_shared_data_t *vls_shd;
+ int do_disconnect, pos;
+ u32 n_subscribers;
+ vcl_session_t *s;
+
+ if (vls->shared_data_index == ~0)
+ return 0;
+
+ s = vcl_session_get (wrk, vls->session_index);
+ if (s->session_state == VCL_STATE_LISTEN)
+ vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
+
+ vls_shared_data_pool_rlock ();
+
+ vls_shd = vls_shared_data_get (vls->shared_data_index);
+ clib_spinlock_lock (&vls_shd->lock);
+
+ pos = vls_shared_data_subscriber_position (vls_shd, wrk->wrk_index);
+ if (pos < 0)
+ {
+ clib_warning ("worker %u not subscribed for vls %u", wrk->wrk_index,
+ vls->vcl_wrk_index);
+ goto done;
+ }
+
+ /*
+ * Unsubscribe from share data and fifos
+ */
+ if (s->rx_fifo)
+ {
+ svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
+ svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
+ }
+ vec_del1 (vls_shd->workers_subscribed, pos);
+
+ /*
+ * Cleanup vcl state
+ */
+ n_subscribers = vec_len (vls_shd->workers_subscribed);
+ do_disconnect = s->session_state == VCL_STATE_LISTEN || !n_subscribers;
+ vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
+
+ /*
+ * No subscriber left, cleanup shared data
+ */
+ if (!n_subscribers)
+ {
+ u32 shd_index = vls_shared_data_index (vls_shd);
+
+ clib_spinlock_unlock (&vls_shd->lock);
+ vls_shared_data_pool_runlock ();
+
+ vls_shared_data_free (shd_index);
+
+ /* All locks have been dropped */
+ return 0;
+ }
+
+ /* Return, if this is not the owning worker */
+ if (vls_shd->owner_wrk_index != wrk->wrk_index)
+ goto done;
+
+ ASSERT (vec_len (vls_shd->workers_subscribed));
+
+ /*
+ * Check if we can change owner or close
+ */
+ vls_shd->owner_wrk_index = vls_shd->workers_subscribed[0];
+ if (s->vpp_evt_q)
+ vcl_send_session_worker_update (wrk, s, vls_shd->owner_wrk_index);
+
+ /* XXX is this still needed? */
+ if (vec_len (vls_shd->workers_subscribed) > 1)
+ clib_warning ("more workers need to be updated");
+
+done:
+
+ clib_spinlock_unlock (&vls_shd->lock);
+ vls_shared_data_pool_runlock ();
+
+ return 0;
+}
+
+void
+vls_init_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
+{
+ vls_shared_data_t *vls_shd;
+
+ u32 vls_shd_index = vls_shared_data_alloc ();
+
+ vls_shared_data_pool_rlock ();
+
+ vls_shd = vls_shared_data_get (vls_shd_index);
+ vls_shd->owner_wrk_index = vls_wrk->vcl_wrk_index;
+ vls->shared_data_index = vls_shd_index;
+ vec_add1 (vls_shd->workers_subscribed, vls_wrk->vcl_wrk_index);
+
+ vls_shared_data_pool_runlock ();
+}
+
+void
+vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
+{
+ vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->vcl_wrk_index);
+ vls_shared_data_t *vls_shd;
+ vcl_session_t *s;
+
+ s = vcl_session_get (vcl_wrk, vls->session_index);
+ if (!s)
+ {
+ clib_warning ("wrk %u session %u vls %u NOT AVAILABLE",
+ vcl_wrk->wrk_index, vls->session_index, vls->vls_index);
+ return;
+ }
+
+ ASSERT (vls->shared_data_index != ~0);
+
+ /* Reinit session lock */
+ clib_spinlock_init (&vls->lock);
+
+ vls_shared_data_pool_rlock ();
+
+ vls_shd = vls_shared_data_get (vls->shared_data_index);
+
+ clib_spinlock_lock (&vls_shd->lock);
+ vec_add1 (vls_shd->workers_subscribed, vls_wrk->vcl_wrk_index);
+ clib_spinlock_unlock (&vls_shd->lock);
+
+ vls_shared_data_pool_runlock ();
+
+ if (s->session_state == VCL_STATE_LISTEN)
+ {
+ s->session_state = VCL_STATE_LISTEN_NO_MQ;
+ s->rx_fifo = s->tx_fifo = 0;
+ }
+ else if (s->rx_fifo)
+ {
+ vcl_session_share_fifos (s, s->rx_fifo, s->tx_fifo);
+ }
+}
+
+static void
+vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
+{
+ vcl_locked_session_t *vls, *parent_vls;
+
+ pool_foreach (vls, vls_wrk->vls_pool) {
+ /* Initialize sharing on parent session */
+ if (vls->shared_data_index == ~0)
+ {
+ parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
+ vls_init_share_session (vls_parent_wrk, parent_vls);
+ vls->shared_data_index = parent_vls->shared_data_index;
+ }
+ vls_share_session (vls_wrk, vls);
+ }