vcl: support inter worker rpc 55/27955/5
authorFlorin Coras <fcoras@cisco.com>
Fri, 17 Jul 2020 03:46:17 +0000 (20:46 -0700)
committerFlorin Coras <florin.coras@gmail.com>
Tue, 11 Aug 2020 18:05:06 +0000 (18:05 +0000)
Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I664cd14c84fc5cf2ffe61efce99c95219b44fad7

src/vcl/vcl_locked.c
src/vcl/vcl_private.h
src/vcl/vppcom.c
src/vnet/session/application_interface.h
src/vnet/session/session_node.c
src/vnet/session/session_types.h

index 19522c0..fc48618 100644 (file)
@@ -38,6 +38,7 @@ typedef struct vls_worker_
   vcl_locked_session_t *vls_pool;
   uword *session_index_to_vlsh_table;
   u32 wrk_index;
+  volatile int rpc_done;
 } vls_worker_t;
 
 typedef struct vls_local_
@@ -64,6 +65,24 @@ typedef struct vls_main_
 
 vls_main_t *vlsm;
 
+typedef enum vls_rpc_msg_type_
+{
+  VLS_RPC_CLONE_AND_SHARE,
+} vls_rpc_msg_type_e;
+
+typedef struct vls_rpc_msg_
+{
+  u8 type;
+  u8 data[0];
+} vls_rpc_msg_t;
+
+typedef struct vls_clone_and_share_msg_
+{
+  u32 vls_index;               /**< vls to be shared */
+  u32 origin_vls_wrk;          /**< worker that initiated the rpc */
+  u32 origin_vls_index;                /**< vls session of the originator */
+} vls_clone_and_share_msg_t;
+
 static inline u32
 vls_get_worker_index (void)
 {
@@ -614,51 +633,50 @@ done:
 }
 
 void
-vls_share_session (vcl_locked_session_t * vls, vls_worker_t * vls_wrk,
-                  vls_worker_t * vls_parent_wrk, vcl_worker_t * vcl_wrk)
+vls_init_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
 {
-  vcl_locked_session_t *parent_vls;
+  vls_shared_data_t *vls_shd;
+
+  u32 vls_shd_index = vls_shared_data_alloc ();
+
+  vls_shared_data_pool_rlock ();
+
+  vls_shd = vls_shared_data_get (vls_shd_index);
+  vls_shd->owner_wrk_index = vls_wrk->wrk_index;
+  vls->shared_data_index = vls_shd_index;
+  vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
+
+  vls_shared_data_pool_runlock ();
+}
+
+void
+vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
+{
+  vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
   vls_shared_data_t *vls_shd;
   vcl_session_t *s;
 
   s = vcl_session_get (vcl_wrk, vls->session_index);
   if (!s)
     {
-      clib_warning ("wrk %u parent %u session %u vls %u NOT AVAILABLE",
-                   vcl_wrk->wrk_index, vls_parent_wrk->wrk_index,
-                   vls->session_index, vls->vls_index);
+      clib_warning ("wrk %u session %u vls %u NOT AVAILABLE",
+                   vcl_wrk->wrk_index, vls->session_index, vls->vls_index);
       return;
     }
 
+  ASSERT (vls->shared_data_index != ~0);
+
   /* Reinit session lock */
   clib_spinlock_init (&vls->lock);
 
-  if (vls->shared_data_index != ~0)
-    {
-      vls_shared_data_pool_rlock ();
-      vls_shd = vls_shared_data_get (vls->shared_data_index);
-    }
-  else
-    {
-      u32 vls_shd_index = vls_shared_data_alloc ();
-
-      vls_shared_data_pool_rlock ();
-
-      vls_shd = vls_shared_data_get (vls_shd_index);
-      vls_shd->owner_wrk_index = vls_parent_wrk->wrk_index;
-      vls->shared_data_index = vls_shd_index;
+  vls_shared_data_pool_rlock ();
 
-      /* Update parent shared data */
-      parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
-      parent_vls->shared_data_index = vls_shd_index;
-      vec_add1 (vls_shd->workers_subscribed, vls_parent_wrk->wrk_index);
-    }
+  vls_shd = vls_shared_data_get (vls->shared_data_index);
 
   clib_spinlock_lock (&vls_shd->lock);
-
   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
-
   clib_spinlock_unlock (&vls_shd->lock);
+
   vls_shared_data_pool_runlock ();
 
   if (s->rx_fifo)
@@ -675,12 +693,18 @@ vls_share_session (vcl_locked_session_t * vls, vls_worker_t * vls_wrk,
 static void
 vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
 {
-  vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
-  vcl_locked_session_t *vls;
+  vcl_locked_session_t *vls, *parent_vls;
 
   /* *INDENT-OFF* */
   pool_foreach (vls, vls_wrk->vls_pool, ({
-    vls_share_session (vls, vls_wrk, vls_parent_wrk, vcl_wrk);
+    /* Initialize sharing on parent session */
+    if (vls->shared_data_index == ~0)
+      {
+       parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
+       vls_init_share_session (vls_parent_wrk, parent_vls);
+       vls->shared_data_index = parent_vls->shared_data_index;
+      }
+    vls_share_session (vls_wrk, vls);
   }));
   /* *INDENT-ON* */
 }
@@ -1358,6 +1382,64 @@ vls_app_exit (void)
   vls_worker_free (wrk);
 }
 
+static void
+vls_clone_and_share_rpc_handler (void *args)
+{
+  vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
+  vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
+  vcl_locked_session_t *vls, *dst_vls;
+  vcl_worker_t *dst_vcl_wrk;
+  vcl_session_t *s, *dst_s;
+
+  vls = vls_session_get (wrk, msg->vls_index);
+  vls_init_share_session (wrk, vls);
+
+  s = vcl_session_get (vcl_worker_get_current (), vls->session_index);
+  dst_wrk = vls_worker_get (msg->origin_vls_wrk);
+  dst_vcl_wrk = vcl_worker_get (msg->origin_vls_wrk);
+  dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
+  dst_vls->shared_data_index = vls->shared_data_index;
+  dst_s = vcl_session_get (dst_vcl_wrk, dst_vls->session_index);
+  clib_memcpy (dst_s, s, sizeof (*s));
+
+  dst_wrk->rpc_done = 1;
+}
+
+static void
+vls_rpc_handler (void *args)
+{
+  vls_rpc_msg_t *msg = (vls_rpc_msg_t *) args;
+  switch (msg->type)
+    {
+    case VLS_RPC_CLONE_AND_SHARE:
+      vls_clone_and_share_rpc_handler (msg->data);
+      break;
+    default:
+      break;
+    }
+}
+
+void
+vls_send_clone_and_share_rpc (vls_worker_t * wrk, vcl_locked_session_t * vls,
+                             u32 dst_wrk_index, u32 dst_vls_index)
+{
+  u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
+  vls_clone_and_share_msg_t *msg;
+  vls_rpc_msg_t *rpc;
+
+  rpc = (vls_rpc_msg_t *) & data;
+  rpc->type = VLS_RPC_CLONE_AND_SHARE;
+  msg = (vls_clone_and_share_msg_t *) & rpc->data;
+  msg->origin_vls_wrk = wrk->wrk_index;
+  msg->origin_vls_index = vls->vls_index;
+  msg->vls_index = dst_vls_index;
+
+  wrk->rpc_done = 0;
+  vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
+  while (!wrk->rpc_done)
+    ;
+}
+
 int
 vls_app_create (char *app_name)
 {
@@ -1378,6 +1460,7 @@ vls_app_create (char *app_name)
   vls_worker_alloc ();
   vlsl->vls_wrk_index = vcl_get_worker_index ();
   vls_mt_locks_init ();
+  vcm->wrk_rpc_fn = vls_rpc_handler;
   return VPPCOM_OK;
 }
 
index 4a739e6..4a25632 100644 (file)
@@ -309,6 +309,8 @@ typedef struct vcl_worker_
   api_main_t bapi_api_ctx;
 } vcl_worker_t;
 
+typedef void (vcl_rpc_fn_t) (void *args);
+
 typedef struct vppcom_main_t_
 {
   u8 is_init;
@@ -357,6 +359,8 @@ typedef struct vppcom_main_t_
   /* VNET_API_ERROR_FOO -> "Foo" hash table */
   uword *error_string_by_error_number;
 
+  vcl_rpc_fn_t *wrk_rpc_fn;
+
 } vppcom_main_t;
 
 extern vppcom_main_t *vcm;
@@ -655,6 +659,8 @@ vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
 
 void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
                                     u32 wrk_index);
+void vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len);
+
 /*
  * VCL Binary API
  */
index f1478f8..41d2f31 100644 (file)
@@ -351,6 +351,36 @@ vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
   app_send_ctrl_evt_to_vpp (mq, app_evt);
 }
 
+void
+vcl_send_worker_rpc (u32 dst_wrk_index, void *data, u32 data_len)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_app_wrk_rpc_msg_t *mp;
+  vcl_worker_t *dst_wrk, *wrk;
+  svm_msg_q_t *mq;
+
+  if (data_len > sizeof (mp->data))
+    goto done;
+
+  clib_spinlock_lock (&vcm->workers_lock);
+
+  dst_wrk = vcl_worker_get_if_valid (dst_wrk_index);
+  if (!dst_wrk)
+    goto done;
+
+  wrk = vcl_worker_get_current ();
+  mq = vcl_worker_ctrl_mq (wrk);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_APP_WRK_RPC);
+  mp = (session_app_wrk_rpc_msg_t *) app_evt->evt->data;
+  mp->client_index = wrk->my_client_index;
+  mp->wrk_index = dst_wrk->vpp_wrk_index;
+  clib_memcpy (mp->data, data, data_len);
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+
+done:
+  clib_spinlock_unlock (&vcm->workers_lock);
+}
+
 static u32
 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
                              u32 ls_index)
@@ -866,6 +896,15 @@ vcl_session_app_del_segment_handler (vcl_worker_t * wrk, void *data)
   VDBG (1, "Unmapped segment: %d", msg->segment_handle);
 }
 
+static void
+vcl_worker_rpc_handler (vcl_worker_t * wrk, void *data)
+{
+  if (!vcm->wrk_rpc_fn)
+    return;
+
+  (vcm->wrk_rpc_fn) (data);
+}
+
 static int
 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
 {
@@ -923,6 +962,9 @@ vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled %u", e->event_type);
     }
@@ -2259,6 +2301,9 @@ vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled: %u", e->event_type);
       break;
@@ -2875,6 +2920,9 @@ vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
     case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
       vcl_session_app_del_segment_handler (wrk, e->data);
       break;
+    case SESSION_CTRL_EVT_RPC:
+      vcl_worker_rpc_handler (wrk, e->data);
+      break;
     default:
       VDBG (0, "unhandled: %u", e->event_type);
       break;
index 7a3eeeb..d189793 100644 (file)
@@ -527,6 +527,13 @@ typedef struct session_cleanup_msg_
   u8 type;
 } __clib_packed session_cleanup_msg_t;
 
+typedef struct session_app_wrk_rpc_msg_
+{
+  u32 client_index;    /**< app client index */
+  u32 wrk_index;       /**< dst worker index */
+  u8 data[252];                /**< rpc data */
+} __clib_packed session_app_wrk_rpc_msg_t;
+
 typedef struct app_session_event_
 {
   svm_msg_q_msg_t msg;
index 738f0b9..763b789 100644 (file)
@@ -490,6 +490,33 @@ session_mq_worker_update_handler (void *data)
     app_worker_close_notify (app_wrk, s);
 }
 
+static void
+session_mq_app_wrk_rpc_handler (void *data)
+{
+  session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_app_wrk_rpc_msg_t *rmp;
+  app_worker_t *app_wrk;
+  session_event_t *evt;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  app_wrk = application_get_worker (app, mp->wrk_index);
+
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+                                      SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
+                                      msg);
+  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
+  rmp = (session_app_wrk_rpc_msg_t *) evt->data;
+  clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
+  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -1226,6 +1253,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
     case SESSION_CTRL_EVT_APP_DETACH:
       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
+      break;
     default:
       clib_warning ("unhandled event type %d", e->event_type);
     }
index 784312d..b3a7eb2 100644 (file)
@@ -358,6 +358,7 @@ typedef enum
   SESSION_CTRL_EVT_APP_DEL_SEGMENT,
   SESSION_CTRL_EVT_MIGRATED,
   SESSION_CTRL_EVT_CLEANUP,
+  SESSION_CTRL_EVT_APP_WRK_RPC,
 } session_evt_type_t;
 
 #define foreach_session_ctrl_evt                               \