Move RPC calls off the binary API input queue
[vpp.git] / src / vlibmemory / memory_api.c
index a444ec7..7a7644a 100644 (file)
@@ -103,6 +103,14 @@ memclnt_queue_callback (vlib_main_t * vm)
          break;
        }
     }
+  if (vec_len (vm->pending_rpc_requests))
+    {
+      vm->queue_signal_pending = 1;
+      vm->api_queue_nonempty = 1;
+      vlib_process_signal_event (vm, vl_api_clnt_node.index,
+                                /* event_type */ QUEUE_SIGNAL_EVENT,
+                                /* event_data */ 0);
+    }
 }
 
 /*
@@ -703,6 +711,32 @@ vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node)
                                    am->shmem_hdr->vl_input_queue);
 }
 
+int
+vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
+{
+  api_main_t *am = &api_main;
+  int i;
+  uword *tmp, mp;
+
+  /*
+   * Swap pending and processing vectors, then process the RPCs
+   * Avoid deadlock conditions by construction.
+   */
+  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
+  tmp = vm->processing_rpc_requests;
+  vec_reset_length (tmp);
+  vm->processing_rpc_requests = vm->pending_rpc_requests;
+  vm->pending_rpc_requests = tmp;
+  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);
+
+  for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)
+    {
+      mp = vm->processing_rpc_requests[i];
+      vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+    }
+  return 0;
+}
+
 int
 vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node,
                               u32 reg_index)