Move RPC calls off the binary API input queue
[vpp.git] / src / vlibmemory / vlib_api.c
index b72f133..16e6402 100644 (file)
@@ -346,7 +346,8 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
       start_time = vlib_time_now (vm);
       while (1)
        {
-         if (vl_mem_api_handle_msg_main (vm, node))
+         if (vl_mem_api_handle_rpc (vm, node)
+             || vl_mem_api_handle_msg_main (vm, node))
            {
              vm->api_queue_nonempty = 0;
              VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
@@ -564,36 +565,14 @@ vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
 void
 vl_api_send_pending_rpc_requests (vlib_main_t * vm)
 {
-  api_main_t *am = &api_main;
-  vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
-  svm_queue_t *q;
-  int i;
-
-  /*
-   * Use the "normal" control-plane mechanism for the main thread.
-   * Well, almost. if the main input queue is full, we cannot
-   * block. Otherwise, we can expect a barrier sync timeout.
-   */
-  q = shmem_hdr->vl_input_queue;
+  vlib_main_t *vm_global = &vlib_global_main;
 
-  for (i = 0; i < vec_len (vm->pending_rpc_requests); i++)
-    {
-      while (pthread_mutex_trylock (&q->mutex))
-       vlib_worker_thread_barrier_check ();
+  ASSERT (vm != vm_global);
 
-      while (PREDICT_FALSE (svm_queue_is_full (q)))
-       {
-         pthread_mutex_unlock (&q->mutex);
-         vlib_worker_thread_barrier_check ();
-         while (pthread_mutex_trylock (&q->mutex))
-           vlib_worker_thread_barrier_check ();
-       }
-
-      vl_msg_api_send_shmem_nolock (q, (u8 *) (vm->pending_rpc_requests + i));
-
-      pthread_mutex_unlock (&q->mutex);
-    }
-  _vec_len (vm->pending_rpc_requests) = 0;
+  clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
+  vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
+  vec_reset_length (vm->pending_rpc_requests);
+  clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
 }
 
 always_inline void
@@ -601,6 +580,7 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
                                    u8 force_rpc)
 {
   vl_api_rpc_call_t *mp;
+  vlib_main_t *vm_global = &vlib_global_main;
   vlib_main_t *vm = vlib_get_main ();
 
   /* Main thread and not a forced RPC: call the function directly */
@@ -626,7 +606,12 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
   mp->function = pointer_to_uword (fp);
   mp->need_barrier_sync = 1;
 
+  /* Add to the pending vector. Thread 0 requires locking. */
+  if (vm == vm_global)
+    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
   vec_add1 (vm->pending_rpc_requests, (uword) mp);
+  if (vm == vm_global)
+    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
 }
 
 /*