vcl_locked_session_t *vls_pool;
uword *session_index_to_vlsh_table;
u32 wrk_index;
+ volatile int rpc_done;
} vls_worker_t;
typedef struct vls_local_
vls_main_t *vlsm;
+typedef enum vls_rpc_msg_type_
+{
+ VLS_RPC_CLONE_AND_SHARE,
+} vls_rpc_msg_type_e;
+
+typedef struct vls_rpc_msg_
+{
+ u8 type;
+ u8 data[0];
+} vls_rpc_msg_t;
+
+typedef struct vls_clone_and_share_msg_
+{
+ u32 vls_index; /**< vls to be shared */
+ u32 origin_vls_wrk; /**< worker that initiated the rpc */
+ u32 origin_vls_index; /**< vls session of the originator */
+} vls_clone_and_share_msg_t;
+
static inline u32
vls_get_worker_index (void)
{
}
void
-vls_share_session (vcl_locked_session_t * vls, vls_worker_t * vls_wrk,
- vls_worker_t * vls_parent_wrk, vcl_worker_t * vcl_wrk)
+vls_init_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
{
- vcl_locked_session_t *parent_vls;
+ vls_shared_data_t *vls_shd;
+
+ u32 vls_shd_index = vls_shared_data_alloc ();
+
+ vls_shared_data_pool_rlock ();
+
+ vls_shd = vls_shared_data_get (vls_shd_index);
+ vls_shd->owner_wrk_index = vls_wrk->wrk_index;
+ vls->shared_data_index = vls_shd_index;
+ vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
+
+ vls_shared_data_pool_runlock ();
+}
+
+void
+vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
+{
+ vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
vls_shared_data_t *vls_shd;
vcl_session_t *s;
s = vcl_session_get (vcl_wrk, vls->session_index);
if (!s)
{
- clib_warning ("wrk %u parent %u session %u vls %u NOT AVAILABLE",
- vcl_wrk->wrk_index, vls_parent_wrk->wrk_index,
- vls->session_index, vls->vls_index);
+ clib_warning ("wrk %u session %u vls %u NOT AVAILABLE",
+ vcl_wrk->wrk_index, vls->session_index, vls->vls_index);
return;
}
+ ASSERT (vls->shared_data_index != ~0);
+
/* Reinit session lock */
clib_spinlock_init (&vls->lock);
- if (vls->shared_data_index != ~0)
- {
- vls_shared_data_pool_rlock ();
- vls_shd = vls_shared_data_get (vls->shared_data_index);
- }
- else
- {
- u32 vls_shd_index = vls_shared_data_alloc ();
-
- vls_shared_data_pool_rlock ();
-
- vls_shd = vls_shared_data_get (vls_shd_index);
- vls_shd->owner_wrk_index = vls_parent_wrk->wrk_index;
- vls->shared_data_index = vls_shd_index;
+ vls_shared_data_pool_rlock ();
- /* Update parent shared data */
- parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
- parent_vls->shared_data_index = vls_shd_index;
- vec_add1 (vls_shd->workers_subscribed, vls_parent_wrk->wrk_index);
- }
+ vls_shd = vls_shared_data_get (vls->shared_data_index);
clib_spinlock_lock (&vls_shd->lock);
-
vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
-
clib_spinlock_unlock (&vls_shd->lock);
+
vls_shared_data_pool_runlock ();
if (s->rx_fifo)
static void
vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
{
- vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
- vcl_locked_session_t *vls;
+ vcl_locked_session_t *vls, *parent_vls;
/* *INDENT-OFF* */
pool_foreach (vls, vls_wrk->vls_pool, ({
- vls_share_session (vls, vls_wrk, vls_parent_wrk, vcl_wrk);
+ /* Initialize sharing on parent session */
+ if (vls->shared_data_index == ~0)
+ {
+ parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
+ vls_init_share_session (vls_parent_wrk, parent_vls);
+ vls->shared_data_index = parent_vls->shared_data_index;
+ }
+ vls_share_session (vls_wrk, vls);
}));
/* *INDENT-ON* */
}
vls_worker_free (wrk);
}
+static void
+vls_clone_and_share_rpc_handler (void *args)
+{
+ vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
+ vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
+ vcl_locked_session_t *vls, *dst_vls;
+ vcl_worker_t *dst_vcl_wrk;
+ vcl_session_t *s, *dst_s;
+
+ vls = vls_session_get (wrk, msg->vls_index);
+ vls_init_share_session (wrk, vls);
+
+ s = vcl_session_get (vcl_worker_get_current (), vls->session_index);
+ dst_wrk = vls_worker_get (msg->origin_vls_wrk);
+ dst_vcl_wrk = vcl_worker_get (msg->origin_vls_wrk);
+ dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
+ dst_vls->shared_data_index = vls->shared_data_index;
+ dst_s = vcl_session_get (dst_vcl_wrk, dst_vls->session_index);
+ clib_memcpy (dst_s, s, sizeof (*s));
+
+ dst_wrk->rpc_done = 1;
+}
+
+static void
+vls_rpc_handler (void *args)
+{
+ vls_rpc_msg_t *msg = (vls_rpc_msg_t *) args;
+ switch (msg->type)
+ {
+ case VLS_RPC_CLONE_AND_SHARE:
+ vls_clone_and_share_rpc_handler (msg->data);
+ break;
+ default:
+ break;
+ }
+}
+
+void
+vls_send_clone_and_share_rpc (vls_worker_t * wrk, vcl_locked_session_t * vls,
+ u32 dst_wrk_index, u32 dst_vls_index)
+{
+ u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
+ vls_clone_and_share_msg_t *msg;
+ vls_rpc_msg_t *rpc;
+
+ rpc = (vls_rpc_msg_t *) & data;
+ rpc->type = VLS_RPC_CLONE_AND_SHARE;
+ msg = (vls_clone_and_share_msg_t *) & rpc->data;
+ msg->origin_vls_wrk = wrk->wrk_index;
+ msg->origin_vls_index = vls->vls_index;
+ msg->vls_index = dst_vls_index;
+
+ wrk->rpc_done = 0;
+ vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
+ while (!wrk->rpc_done)
+ ;
+}
+
int
vls_app_create (char *app_name)
{
vls_worker_alloc ();
vlsl->vls_wrk_index = vcl_get_worker_index ();
vls_mt_locks_init ();
+ vcm->wrk_rpc_fn = vls_rpc_handler;
return VPPCOM_OK;
}
app_send_ctrl_evt_to_vpp (mq, app_evt);
}
+void
+vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
+{
+ app_session_evt_t _app_evt, *app_evt = &_app_evt;
+ session_app_wrk_rpc_msg_t *mp;
+ vcl_worker_t *dst_wrk, *wrk;
+ svm_msg_q_t *mq;
+
+ if (data_len > sizeof (mp->data))
+ goto done;
+
+ clib_spinlock_lock (&vcm->workers_lock);
+
+ dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
+ if (!dst_wrk)
+ goto done;
+
+ wrk = vcl_worker_get_current ();
+ mq = vcl_worker_ctrl_mq (wrk);
+ app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
+ mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
+ mp->client_index = wrk->my_client_index;
+ mp->wrk_index = dst_wrk->vpp_wrk_index;
+ clib_memcpy (mp->data, data, data_len);
+ app_send_ctrl_evt_to_vpp (mq, app_evt);
+
+done:
+ clib_spinlock_unlock (&vcm->workers_lock);
+}
+
static u32
vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
u32 ls_index)
VDBG (1, "Unmapped segment: %d", msg->segment_handle);
}
+static void
+vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
+{
+ if (!vcm->wrk_rpc_fn)
+ return;
+
+ (vcm->wrk_rpc_fn) (data);
+}
+
static int
vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
{
case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
vcl_session_app_del_segment_handler (wrk, e->data);
break;
+ case SESSION_CTRL_EVT_RPC:
+ vcl_worker_rpc_handler (wrk, e->data);
+ break;
default:
clib_warning ("unhandled %u", e->event_type);
}
case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
vcl_session_app_del_segment_handler (wrk, e->data);
break;
+ case SESSION_CTRL_EVT_RPC:
+ vcl_worker_rpc_handler (wrk, e->data);
+ break;
default:
clib_warning ("unhandled: %u", e->event_type);
break;
case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
vcl_session_app_del_segment_handler (wrk, e->data);
break;
+ case SESSION_CTRL_EVT_RPC:
+ vcl_worker_rpc_handler (wrk, e->data);
+ break;
default:
VDBG (0, "unhandled: %u", e->event_type);
break;
app_worker_close_notify (app_wrk, s);
}
+static void
+session_mq_app_wrk_rpc_handler (void *data)
+{
+ session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ session_app_wrk_rpc_msg_t *rmp;
+ app_worker_t *app_wrk;
+ session_event_t *evt;
+ application_t *app;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ app_wrk = application_get_worker (app, mp->wrk_index);
+
+ svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+ SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
+ msg);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
+ rmp = (session_app_wrk_rpc_msg_t *) evt->data;
+ clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
+ svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
case SESSION_CTRL_EVT_APP_DETACH:
app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
break;
+ case SESSION_CTRL_EVT_APP_WRK_RPC:
+ session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
+ break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}