Revert "Keep RPC traffic off the shared-memory API queue" 40/15540/2
authorFlorin Coras <florin.coras@gmail.com>
Thu, 25 Oct 2018 19:03:46 +0000 (19:03 +0000)
committerFlorin Coras <florin.coras@gmail.com>
Thu, 25 Oct 2018 19:34:03 +0000 (19:34 +0000)
This reverts commit 71615399e194847d7833b744caedab9b841733e5.

There seems to be an issue with ARPs when running with multiple workers.

Change-Id: Iaa68081512362945a9caf24dcb8d70fc7c5b75df
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/vlib/main.h
src/vlib/threads.c
src/vlibmemory/memory_api.c
src/vlibmemory/memory_api.h
src/vlibmemory/vlib_api.c

index 64f2859..7c34fb6 100644 (file)
@@ -209,7 +209,6 @@ typedef struct vlib_main_t
 
   /* Vector of pending RPC requests */
   uword *pending_rpc_requests;
-  clib_spinlock_t pending_rpc_lock;
 
 } vlib_main_t;
 
index 7ecfa30..c99458d 100644 (file)
@@ -699,9 +699,6 @@ start_workers (vlib_main_t * vm)
       vlib_worker_threads->node_reforks_required =
        clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
 
-      /* We'll need the rpc vector lock... */
-      clib_spinlock_init (&vm->pending_rpc_lock);
-
       /* Ask for an initial barrier sync */
       *vlib_worker_threads->workers_at_barrier = 0;
       *vlib_worker_threads->wait_at_barrier = 1;
index aa0e25b..a444ec7 100644 (file)
@@ -703,26 +703,6 @@ vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node)
                                    am->shmem_hdr->vl_input_queue);
 }
 
-int
-vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
-{
-  api_main_t *am = &api_main;
-  int i;
-  uword *rpc_requests, mp;
-
-  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
-  rpc_requests = vm->pending_rpc_requests;
-
-  for (i = 0; i < vec_len (rpc_requests); i++)
-    {
-      mp = rpc_requests[i];
-      vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
-    }
-  vec_reset_length (vm->pending_rpc_requests);
-  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);
-  return 0;
-}
-
 int
 vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node,
                               u32 reg_index)
index f658006..4cda04b 100644 (file)
@@ -32,8 +32,6 @@ void vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm,
 int vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node);
 int vl_mem_api_handle_msg_private (vlib_main_t * vm,
                                   vlib_node_runtime_t * node, u32 reg_index);
-int vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node);
-
 vl_api_registration_t *vl_mem_api_client_index_to_registration (u32 handle);
 void vl_mem_api_enable_disable (vlib_main_t * vm, int yesno);
 u32 vl_api_memclnt_create_internal (char *, svm_queue_t *);
index 21f112b..b72f133 100644 (file)
@@ -346,8 +346,7 @@ vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
       start_time = vlib_time_now (vm);
       while (1)
        {
-         if (vl_mem_api_handle_rpc (vm, node)
-             || vl_mem_api_handle_msg_main (vm, node))
+         if (vl_mem_api_handle_msg_main (vm, node))
            {
              vm->api_queue_nonempty = 0;
              VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
@@ -565,16 +564,36 @@ vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
 void
 vl_api_send_pending_rpc_requests (vlib_main_t * vm)
 {
-  vlib_main_t *vm_global = &vlib_global_main;
+  api_main_t *am = &api_main;
+  vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
+  svm_queue_t *q;
+  int i;
 
-  /* Our own RPCs are already pending */
-  if (vm == vm_global)
-    return;
+  /*
+   * Use the "normal" control-plane mechanism for the main thread.
+   * Well, almost. if the main input queue is full, we cannot
+   * block. Otherwise, we can expect a barrier sync timeout.
+   */
+  q = shmem_hdr->vl_input_queue;
 
-  clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
-  vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
-  vec_reset_length (vm->pending_rpc_requests);
-  clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
+  for (i = 0; i < vec_len (vm->pending_rpc_requests); i++)
+    {
+      while (pthread_mutex_trylock (&q->mutex))
+       vlib_worker_thread_barrier_check ();
+
+      while (PREDICT_FALSE (svm_queue_is_full (q)))
+       {
+         pthread_mutex_unlock (&q->mutex);
+         vlib_worker_thread_barrier_check ();
+         while (pthread_mutex_trylock (&q->mutex))
+           vlib_worker_thread_barrier_check ();
+       }
+
+      vl_msg_api_send_shmem_nolock (q, (u8 *) (vm->pending_rpc_requests + i));
+
+      pthread_mutex_unlock (&q->mutex);
+    }
+  _vec_len (vm->pending_rpc_requests) = 0;
 }
 
 always_inline void
@@ -582,7 +601,6 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
                                    u8 force_rpc)
 {
   vl_api_rpc_call_t *mp;
-  vlib_main_t *vm_global = &vlib_global_main;
   vlib_main_t *vm = vlib_get_main ();
 
   /* Main thread and not a forced RPC: call the function directly */
@@ -608,12 +626,7 @@ vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
   mp->function = pointer_to_uword (fp);
   mp->need_barrier_sync = 1;
 
-  /* Add to the pending vector. Thread 0 requires locking. */
-  if (vm == vm_global)
-    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
   vec_add1 (vm->pending_rpc_requests, (uword) mp);
-  if (vm == vm_global)
-    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
 }
 
 /*