vapi: support api clients within vpp process
[vpp.git] / src / vlibmemory / memory_api.c
index c9eebab..b1d250b 100644 (file)
 #include <vlibmemory/vl_memory_api_h.h>
 #undef vl_endianfun
 
-static inline void *
-vl_api_memclnt_create_t_print (vl_api_memclnt_create_t * a, void *handle)
-{
-  vl_print (handle, "vl_api_memclnt_create_t:\n");
-  vl_print (handle, "name: %s\n", a->name);
-  vl_print (handle, "input_queue: 0x%wx\n", a->input_queue);
-  vl_print (handle, "context: %u\n", (unsigned) a->context);
-  vl_print (handle, "ctx_quota: %ld\n", (long) a->ctx_quota);
-  return handle;
-}
-
-static inline void *
-vl_api_memclnt_delete_t_print (vl_api_memclnt_delete_t * a, void *handle)
-{
-  vl_print (handle, "vl_api_memclnt_delete_t:\n");
-  vl_print (handle, "index: %u\n", (unsigned) a->index);
-  vl_print (handle, "handle: 0x%wx\n", a->handle);
-  return handle;
-}
-
 volatile int **vl_api_queue_cursizes;
 
 static void
 memclnt_queue_callback (vlib_main_t * vm)
 {
   int i;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
+  int have_pending_rpcs;
 
   if (PREDICT_FALSE (vec_len (vl_api_queue_cursizes) !=
                     1 + vec_len (am->vlib_private_rps)))
@@ -103,7 +84,12 @@ memclnt_queue_callback (vlib_main_t * vm)
          break;
        }
     }
-  if (vec_len (vm->pending_rpc_requests))
+
+  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
+  have_pending_rpcs = vec_len (vm->pending_rpc_requests) > 0;
+  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);
+
+  if (have_pending_rpcs)
     {
       vm->queue_signal_pending = 1;
       vm->api_queue_nonempty = 1;
@@ -121,31 +107,27 @@ vl_api_memclnt_create_internal (char *name, svm_queue_t * q)
 {
   vl_api_registration_t **regpp;
   vl_api_registration_t *regp;
-  svm_region_t *svm;
   void *oldheap;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
 
   ASSERT (vlib_get_thread_index () == 0);
   pool_get (am->vl_clients, regpp);
 
-  svm = am->vlib_rp;
 
-  pthread_mutex_lock (&svm->mutex);
-  oldheap = svm_push_data_heap (svm);
+  oldheap = vl_msg_push_heap ();
   *regpp = clib_mem_alloc (sizeof (vl_api_registration_t));
 
   regp = *regpp;
   clib_memset (regp, 0, sizeof (*regp));
   regp->registration_type = REGISTRATION_TYPE_SHMEM;
   regp->vl_api_registration_pool_index = regpp - am->vl_clients;
-  regp->vlib_rp = svm;
+  regp->vlib_rp = am->vlib_rp;
   regp->shmem_hdr = am->shmem_hdr;
 
   regp->vl_input_queue = q;
   regp->name = format (0, "%s%c", name, 0);
 
-  pthread_mutex_unlock (&svm->mutex);
-  svm_pop_heap (oldheap);
+  vl_msg_pop_heap (oldheap);
   return vl_msg_api_handle_from_index_and_epoch
     (regp->vl_api_registration_pool_index,
      am->shmem_hdr->application_restarts);
@@ -160,11 +142,10 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   vl_api_registration_t **regpp;
   vl_api_registration_t *regp;
   vl_api_memclnt_create_reply_t *rp;
-  svm_region_t *svm;
   svm_queue_t *q;
   int rv = 0;
   void *oldheap;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   u8 *msg_table;
 
   /*
@@ -195,24 +176,23 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
 
   pool_get (am->vl_clients, regpp);
 
-  svm = am->vlib_rp;
-
-  pthread_mutex_lock (&svm->mutex);
-  oldheap = svm_push_data_heap (svm);
+  oldheap = vl_msg_push_heap ();
   *regpp = clib_mem_alloc (sizeof (vl_api_registration_t));
 
   regp = *regpp;
   clib_memset (regp, 0, sizeof (*regp));
   regp->registration_type = REGISTRATION_TYPE_SHMEM;
   regp->vl_api_registration_pool_index = regpp - am->vl_clients;
-  regp->vlib_rp = svm;
+  regp->vlib_rp = am->vlib_rp;
   regp->shmem_hdr = am->shmem_hdr;
   regp->clib_file_index = am->shmem_hdr->clib_file_index;
 
   q = regp->vl_input_queue = (svm_queue_t *) (uword) mp->input_queue;
+  VL_MSG_API_SVM_QUEUE_UNPOISON (q);
 
   regp->name = format (0, "%s", mp->name);
   vec_add1 (regp->name, 0);
+  regp->keepalive = true;
 
   if (am->serialized_message_table_in_shmem == 0)
     am->serialized_message_table_in_shmem =
@@ -223,8 +203,7 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   else
     msg_table = am->serialized_message_table_in_shmem;
 
-  pthread_mutex_unlock (&svm->mutex);
-  svm_pop_heap (oldheap);
+  vl_msg_pop_heap (oldheap);
 
   rp = vl_msg_api_alloc (sizeof (*rp));
   rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_REPLY);
@@ -239,13 +218,94 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   vl_msg_api_send_shmem (q, (u8 *) & rp);
 }
 
-int
+void
+vl_api_memclnt_create_v2_t_handler (vl_api_memclnt_create_v2_t *mp)
+{
+  vl_api_registration_t **regpp;
+  vl_api_registration_t *regp;
+  vl_api_memclnt_create_v2_reply_t *rp;
+  svm_queue_t *q;
+  int rv = 0;
+  void *oldheap;
+  api_main_t *am = vlibapi_get_main ();
+  u8 *msg_table;
+
+  /*
+   * This is tortured. Maintain a vlib-address-space private
+   * pool of client registrations. We use the shared-memory virtual
+   * address of client structure as a handle, to allow direct
+   * manipulation of context quota vbls from the client library.
+   *
+   * This scheme causes trouble w/ API message trace replay, since
+   * some random VA from clib_mem_alloc() certainly won't
+   * occur in the Linux sim. The (very) few places
+   * that care need to use the pool index.
+   *
+   * Putting the registration object(s) into a pool in shared memory and
+   * using the pool index as a handle seems like a great idea.
+   * Unfortunately, each and every reference to that pool would need
+   * to be protected by a mutex:
+   *
+   *     Client                      VLIB
+   *     ------                      ----
+   *     convert pool index to
+   *     pointer.
+   *     <deschedule>
+   *                                 expand pool
+   *                                 <deschedule>
+   *     kaboom!
+   */
+
+  pool_get (am->vl_clients, regpp);
+
+  oldheap = vl_msg_push_heap ();
+  *regpp = clib_mem_alloc (sizeof (vl_api_registration_t));
+
+  regp = *regpp;
+  clib_memset (regp, 0, sizeof (*regp));
+  regp->registration_type = REGISTRATION_TYPE_SHMEM;
+  regp->vl_api_registration_pool_index = regpp - am->vl_clients;
+  regp->vlib_rp = am->vlib_rp;
+  regp->shmem_hdr = am->shmem_hdr;
+  regp->clib_file_index = am->shmem_hdr->clib_file_index;
+
+  q = regp->vl_input_queue = (svm_queue_t *) (uword) mp->input_queue;
+  VL_MSG_API_SVM_QUEUE_UNPOISON (q);
+
+  regp->name = format (0, "%s", mp->name);
+  vec_add1 (regp->name, 0);
+  regp->keepalive = mp->keepalive;
+
+  if (am->serialized_message_table_in_shmem == 0)
+    am->serialized_message_table_in_shmem =
+      vl_api_serialize_message_table (am, 0);
+
+  if (am->vlib_rp != am->vlib_primary_rp)
+    msg_table = vl_api_serialize_message_table (am, 0);
+  else
+    msg_table = am->serialized_message_table_in_shmem;
+
+  vl_msg_pop_heap (oldheap);
+
+  rp = vl_msg_api_alloc (sizeof (*rp));
+  rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_V2_REPLY);
+  rp->handle = (uword) regp;
+  rp->index = vl_msg_api_handle_from_index_and_epoch (
+    regp->vl_api_registration_pool_index, am->shmem_hdr->application_restarts);
+  rp->context = mp->context;
+  rp->response = ntohl (rv);
+  rp->message_table = pointer_to_uword (msg_table);
+
+  vl_msg_api_send_shmem (q, (u8 *) &rp);
+}
+
+void
 vl_api_call_reaper_functions (u32 client_index)
 {
   clib_error_t *error = 0;
   _vl_msg_api_function_list_elt_t *i;
 
-  i = api_main.reaper_function_registrations;
+  i = vlibapi_get_main ()->reaper_function_registrations;
   while (i)
     {
       error = i->f (client_index);
@@ -253,7 +313,6 @@ vl_api_call_reaper_functions (u32 client_index)
        clib_error_report (error);
       i = i->next_init_function;
     }
-  return 0;
 }
 
 /*
@@ -265,15 +324,13 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
   vl_api_registration_t **regpp;
   vl_api_registration_t *regp;
   vl_api_memclnt_delete_reply_t *rp;
-  svm_region_t *svm;
   void *oldheap;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   u32 handle, client_index, epoch;
 
   handle = mp->index;
 
-  if (vl_api_call_reaper_functions (handle))
-    return;
+  vl_api_call_reaper_functions (handle);
 
   epoch = vl_msg_api_handle_get_epoch (handle);
   client_index = vl_msg_api_handle_get_index (handle);
@@ -293,7 +350,6 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
     {
       int i;
       regp = *regpp;
-      svm = am->vlib_rp;
       int private_registration = 0;
 
       /* Send reply unless client asked us to do the cleanup */
@@ -326,11 +382,11 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
       for (i = 0; i < vec_len (am->vlib_private_rps); i++)
        {
          /* Is this a pairwise / private API segment? */
-         if (am->vlib_private_rps[i] == svm)
+         if (am->vlib_private_rps[i] == am->vlib_rp)
            {
              /* Note: account for the memfd header page */
-             uword virtual_base = svm->virtual_base - MMAP_PAGESIZE;
-             uword virtual_size = svm->virtual_size + MMAP_PAGESIZE;
+             uword virtual_base = am->vlib_rp->virtual_base - MMAP_PAGESIZE;
+             uword virtual_size = am->vlib_rp->virtual_size + MMAP_PAGESIZE;
 
              /*
               * Kill the registration pool element before we make
@@ -354,16 +410,14 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
        {
          pool_put_index (am->vl_clients,
                          regp->vl_api_registration_pool_index);
-         pthread_mutex_lock (&svm->mutex);
-         oldheap = svm_push_data_heap (svm);
+         oldheap = vl_msg_push_heap ();
          if (mp->do_cleanup)
            svm_queue_free (regp->vl_input_queue);
          vec_free (regp->name);
          /* Poison the old registration */
          clib_memset (regp, 0xF1, sizeof (*regp));
          clib_mem_free (regp);
-         pthread_mutex_unlock (&svm->mutex);
-         svm_pop_heap (oldheap);
+         vl_msg_pop_heap (oldheap);
          /*
           * These messages must be freed manually, since they're set up
           * as "bounce" messages. In the private_registration == 1 case,
@@ -410,7 +464,7 @@ vl_api_memclnt_keepalive_t_handler (vl_api_memclnt_keepalive_t * mp)
   api_main_t *am;
   vl_shmem_hdr_t *shmem_hdr;
 
-  am = &api_main;
+  am = vlibapi_get_main ();
   shmem_hdr = am->shmem_hdr;
 
   rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
@@ -425,11 +479,12 @@ vl_api_memclnt_keepalive_t_handler (vl_api_memclnt_keepalive_t * mp)
  * don't trace memclnt_keepalive[_reply] msgs
  */
 
-#define foreach_vlib_api_msg                            \
-_(MEMCLNT_CREATE, memclnt_create, 1)                    \
-_(MEMCLNT_DELETE, memclnt_delete, 1)                    \
-_(MEMCLNT_KEEPALIVE, memclnt_keepalive, 0)              \
-_(MEMCLNT_KEEPALIVE_REPLY, memclnt_keepalive_reply, 0)
+#define foreach_vlib_api_msg                                                  \
+  _ (MEMCLNT_CREATE, memclnt_create, 0)                                       \
+  _ (MEMCLNT_CREATE_V2, memclnt_create_v2, 0)                                 \
+  _ (MEMCLNT_DELETE, memclnt_delete, 0)                                       \
+  _ (MEMCLNT_KEEPALIVE, memclnt_keepalive, 0)                                 \
+  _ (MEMCLNT_KEEPALIVE_REPLY, memclnt_keepalive_reply, 0)
 
 /*
  * memory_api_init
@@ -438,7 +493,7 @@ int
 vl_mem_api_init (const char *region_name)
 {
   int rv;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   vl_msg_api_msg_config_t cfg;
   vl_msg_api_msg_config_t *c = &cfg;
   vl_shmem_hdr_t *shm;
@@ -465,6 +520,14 @@ vl_mem_api_init (const char *region_name)
   foreach_vlib_api_msg;
 #undef _
 
+#define vl_msg_name_crc_list
+#include <vlibmemory/memclnt.api.h>
+#undef vl_msg_name_crc_list
+
+#define _(id, n, crc) vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id);
+  foreach_vl_msg_name_crc_memclnt;
+#undef _
+
   /*
    * special-case freeing of memclnt_delete messages, so we can
    * simply munmap pairwise / private API segments...
@@ -487,7 +550,7 @@ vl_mem_api_init (const char *region_name)
 clib_error_t *
 map_api_segment_init (vlib_main_t * vm)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   int rv;
 
   if ((rv = vl_mem_api_init (am->region_name)) < 0)
@@ -503,9 +566,7 @@ send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
 {
   vl_api_memclnt_keepalive_t *mp;
   svm_queue_t *q;
-  api_main_t *am = &api_main;
-  svm_region_t *save_vlib_rp = am->vlib_rp;
-  vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
+  api_main_t *am = vlibapi_get_main ();
 
   q = regp->vl_input_queue;
 
@@ -529,10 +590,7 @@ send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
    * memory clients..
    */
 
-  am->vlib_rp = regp->vlib_rp;
-  am->shmem_hdr = regp->shmem_hdr;
-
-  mp = vl_msg_api_alloc (sizeof (*mp));
+  mp = vl_mem_api_alloc_as_if_client_w_reg (regp, sizeof (*mp));
   clib_memset (mp, 0, sizeof (*mp));
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_MEMCLNT_KEEPALIVE);
   mp->context = mp->client_index =
@@ -544,10 +602,7 @@ send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
 
   /* Failure-to-send due to a stuffed queue is absolutely expected */
   if (svm_queue_add (q, (u8 *) & mp, 1 /* nowait */ ))
-    vl_msg_api_free (mp);
-
-  am->vlib_rp = save_vlib_rp;
-  am->shmem_hdr = save_shmem_hdr;
+    vl_msg_api_free_w_region (regp->vlib_rp, mp);
 }
 
 static void
@@ -605,10 +660,12 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
   vec_reset_length (confused_indices);
 
   /* *INDENT-OFF* */
-  pool_foreach (regpp, am->vl_clients, ({
+  pool_foreach (regpp, am->vl_clients)  {
+      if (!(*regpp)->keepalive)
+       continue;
       vl_mem_send_client_keepalive_w_reg (am, now, regpp, &dead_indices,
-                                          &confused_indices);
-  }));
+                                         &confused_indices);
+  }
   /* *INDENT-ON* */
 
   /* This should "never happen," but if it does, fix it... */
@@ -624,7 +681,6 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
   if (PREDICT_FALSE (vec_len (dead_indices) > 0))
     {
       int i;
-      svm_region_t *svm;
       void *oldheap;
 
       /* Allow the application to clean up its registrations */
@@ -637,13 +693,11 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
 
              handle = vl_msg_api_handle_from_index_and_epoch
                (dead_indices[i], shm->application_restarts);
-             (void) vl_api_call_reaper_functions (handle);
+             vl_api_call_reaper_functions (handle);
            }
        }
 
-      svm = am->vlib_rp;
-      pthread_mutex_lock (&svm->mutex);
-      oldheap = svm_push_data_heap (svm);
+      oldheap = vl_msg_push_heap ();
 
       for (i = 0; i < vec_len (dead_indices); i++)
        {
@@ -651,7 +705,7 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
          if (regpp)
            {
              /* Is this a pairwise SVM segment? */
-             if ((*regpp)->vlib_rp != svm)
+             if ((*regpp)->vlib_rp != am->vlib_rp)
                {
                  int i;
                  svm_region_t *dead_rp = (*regpp)->vlib_rp;
@@ -668,7 +722,7 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
                      }
                  svm_pop_heap (oldheap);
                  clib_warning ("private rp %llx AWOL", dead_rp);
-                 oldheap = svm_push_data_heap (svm);
+                 oldheap = svm_push_data_heap (am->vlib_rp);
 
                found:
                  /* Kill it, accounting for the memfd header page */
@@ -677,7 +731,7 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
                    clib_unix_warning ("munmap");
                  /* Reset the queue-length-address cache */
                  vec_reset_length (vl_api_queue_cursizes);
-                 oldheap = svm_push_data_heap (svm);
+                 oldheap = svm_push_data_heap (am->vlib_rp);
                }
              else
                {
@@ -693,27 +747,174 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
              svm_pop_heap (oldheap);
              clib_warning ("Duplicate free, client index %d",
                            regpp - am->vl_clients);
-             oldheap = svm_push_data_heap (svm);
+             oldheap = svm_push_data_heap (am->vlib_rp);
            }
        }
 
       svm_client_scan_this_region_nolock (am->vlib_rp);
 
-      pthread_mutex_unlock (&svm->mutex);
-      svm_pop_heap (oldheap);
+      vl_msg_pop_heap (oldheap);
       for (i = 0; i < vec_len (dead_indices); i++)
        pool_put_index (am->vl_clients, dead_indices[i]);
     }
 }
 
+void (*vl_mem_api_fuzz_hook) (u16, void *);
+
+/* This is only to be called from a vlib/vnet app */
+static void
+vl_mem_api_handler_with_vm_node (api_main_t *am, svm_region_t *vlib_rp,
+                                void *the_msg, vlib_main_t *vm,
+                                vlib_node_runtime_t *node, u8 is_private)
+{
+  u16 id = clib_net_to_host_u16 (*((u16 *) the_msg));
+  u8 *(*handler) (void *, void *, void *);
+  u8 *(*print_fp) (void *, void *);
+  svm_region_t *old_vlib_rp;
+  void *save_shmem_hdr;
+  int is_mp_safe = 1;
+
+  if (PREDICT_FALSE (am->elog_trace_api_messages))
+    {
+      ELOG_TYPE_DECLARE (e) = {
+       .format = "api-msg: %s",
+       .format_args = "T4",
+      };
+      struct
+      {
+       u32 c;
+      } * ed;
+      ed = ELOG_DATA (am->elog_main, e);
+      if (id < vec_len (am->msg_names) && am->msg_names[id])
+       ed->c = elog_string (am->elog_main, (char *) am->msg_names[id]);
+      else
+       ed->c = elog_string (am->elog_main, "BOGUS");
+    }
+
+  if (id < vec_len (am->msg_handlers) && am->msg_handlers[id])
+    {
+      handler = (void *) am->msg_handlers[id];
+
+      if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled))
+       vl_msg_api_trace (am, am->rx_trace, the_msg);
+
+      if (PREDICT_FALSE (am->msg_print_flag))
+       {
+         fformat (stdout, "[%d]: %s\n", id, am->msg_names[id]);
+         print_fp = (void *) am->msg_print_handlers[id];
+         if (print_fp == 0)
+           {
+             fformat (stdout, "  [no registered print fn for msg %d]\n", id);
+           }
+         else
+           {
+             (*print_fp) (the_msg, vm);
+           }
+       }
+      is_mp_safe = am->is_mp_safe[id];
+
+      if (!is_mp_safe)
+       {
+         vl_msg_api_barrier_trace_context (am->msg_names[id]);
+         vl_msg_api_barrier_sync ();
+       }
+      if (is_private)
+       {
+         old_vlib_rp = am->vlib_rp;
+         save_shmem_hdr = am->shmem_hdr;
+         am->vlib_rp = vlib_rp;
+         am->shmem_hdr = (void *) vlib_rp->user_ctx;
+       }
+
+      if (PREDICT_FALSE (vl_mem_api_fuzz_hook != 0))
+       (*vl_mem_api_fuzz_hook) (id, the_msg);
+
+      if (am->is_autoendian[id])
+       {
+         void (*endian_fp) (void *);
+         endian_fp = am->msg_endian_handlers[id];
+         (*endian_fp) (the_msg);
+       }
+      if (PREDICT_FALSE (vec_len (am->perf_counter_cbs) != 0))
+       clib_call_callbacks (am->perf_counter_cbs, am, id, 0 /* before */);
+
+      (*handler) (the_msg, vm, node);
+
+      if (PREDICT_FALSE (vec_len (am->perf_counter_cbs) != 0))
+       clib_call_callbacks (am->perf_counter_cbs, am, id, 1 /* after */);
+      if (is_private)
+       {
+         am->vlib_rp = old_vlib_rp;
+         am->shmem_hdr = save_shmem_hdr;
+       }
+      if (!is_mp_safe)
+       vl_msg_api_barrier_release ();
+    }
+  else
+    {
+      clib_warning ("no handler for msg id %d", id);
+    }
+
+  /*
+   * Special-case, so we can e.g. bounce messages off the vnet
+   * main thread without copying them...
+   */
+  if (id >= vec_len (am->message_bounce) || !(am->message_bounce[id]))
+    {
+      if (is_private)
+       {
+         old_vlib_rp = am->vlib_rp;
+         save_shmem_hdr = am->shmem_hdr;
+         am->vlib_rp = vlib_rp;
+         am->shmem_hdr = (void *) vlib_rp->user_ctx;
+       }
+      vl_msg_api_free (the_msg);
+      if (is_private)
+       {
+         am->vlib_rp = old_vlib_rp;
+         am->shmem_hdr = save_shmem_hdr;
+       }
+    }
+
+  if (PREDICT_FALSE (am->elog_trace_api_messages))
+    {
+      ELOG_TYPE_DECLARE (e) = { .format = "api-msg-done(%s): %s",
+                               .format_args = "t4T4",
+                               .n_enum_strings = 2,
+                               .enum_strings = {
+                                 "barrier",
+                                 "mp-safe",
+                               } };
+
+      struct
+      {
+       u32 barrier;
+       u32 c;
+      } * ed;
+      ed = ELOG_DATA (am->elog_main, e);
+      if (id < vec_len (am->msg_names) && am->msg_names[id])
+       ed->c = elog_string (am->elog_main, (char *) am->msg_names[id]);
+      else
+       ed->c = elog_string (am->elog_main, "BOGUS");
+      ed->barrier = is_mp_safe;
+    }
+}
+
 static inline int
-void_mem_api_handle_msg_i (api_main_t * am, vlib_main_t * vm,
-                          vlib_node_runtime_t * node, svm_queue_t * q)
+void_mem_api_handle_msg_i (api_main_t * am, svm_region_t * vlib_rp,
+                          vlib_main_t * vm, vlib_node_runtime_t * node,
+                          u8 is_private)
 {
+  svm_queue_t *q;
   uword mp;
+
+  q = ((vl_shmem_hdr_t *) (void *) vlib_rp->user_ctx)->vl_input_queue;
+
   if (!svm_queue_sub2 (q, (u8 *) & mp))
     {
-      vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+      VL_MSG_API_UNPOISON ((void *) mp);
+      vl_mem_api_handler_with_vm_node (am, vlib_rp, (void *) mp, vm, node,
+                                      is_private);
       return 0;
     }
   return -1;
@@ -722,15 +923,15 @@ void_mem_api_handle_msg_i (api_main_t * am, vlib_main_t * vm,
 int
 vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node)
 {
-  api_main_t *am = &api_main;
-  return void_mem_api_handle_msg_i (am, vm, node,
-                                   am->shmem_hdr->vl_input_queue);
+  api_main_t *am = vlibapi_get_main ();
+  return void_mem_api_handle_msg_i (am, am->vlib_rp, vm, node,
+                                   0 /* is_private */ );
 }
 
 int
 vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   int i;
   uword *tmp, mp;
 
@@ -762,7 +963,8 @@ vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
       for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)
        {
          mp = vm->processing_rpc_requests[i];
-         vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+         vl_mem_api_handler_with_vm_node (am, am->vlib_rp, (void *) mp, vm,
+                                          node, 0 /* is_private */);
        }
       vl_msg_api_barrier_release ();
     }
@@ -774,23 +976,9 @@ int
 vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node,
                               u32 reg_index)
 {
-  api_main_t *am = &api_main;
-  vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
-  svm_region_t *vlib_rp, *save_vlib_rp = am->vlib_rp;
-  svm_queue_t *q;
-  int rv;
-
-  vlib_rp = am->vlib_rp = am->vlib_private_rps[reg_index];
-
-  am->shmem_hdr = (void *) vlib_rp->user_ctx;
-  q = am->shmem_hdr->vl_input_queue;
-
-  rv = void_mem_api_handle_msg_i (am, vm, node, q);
-
-  am->shmem_hdr = save_shmem_hdr;
-  am->vlib_rp = save_vlib_rp;
-
-  return rv;
+  api_main_t *am = vlibapi_get_main ();
+  return void_mem_api_handle_msg_i (am, am->vlib_private_rps[reg_index], vm,
+                                   node, 1 /* is_private */ );
 }
 
 vl_api_registration_t *
@@ -798,7 +986,7 @@ vl_mem_api_client_index_to_registration (u32 handle)
 {
   vl_api_registration_t **regpp;
   vl_api_registration_t *regp;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   vl_shmem_hdr_t *shmem_hdr;
   u32 index;
 
@@ -826,7 +1014,7 @@ svm_queue_t *
 vl_api_client_index_to_input_queue (u32 index)
 {
   vl_api_registration_t *regp;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
 
   /* Special case: vlib trying to send itself a message */
   if (index == (u32) ~ 0)
@@ -841,7 +1029,7 @@ vl_api_client_index_to_input_queue (u32 index)
 static clib_error_t *
 setup_memclnt_exit (vlib_main_t * vm)
 {
-  atexit (vl_unmap_shmem);
+  atexit (vl_unmap_shmem_client);
   return 0;
 }
 
@@ -896,7 +1084,7 @@ vl_api_ring_command (vlib_main_t * vm,
 {
   int i;
   vl_shmem_hdr_t *shmem_hdr;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
 
   /* First, dump the primary region rings.. */
 
@@ -925,15 +1113,15 @@ vl_api_ring_command (vlib_main_t * vm,
 
       /* For horizontal scaling, add a hash table... */
       /* *INDENT-OFF* */
-      pool_foreach (regpp, am->vl_clients,
-      ({
+      pool_foreach (regpp, am->vl_clients)
+       {
         regp = *regpp;
         if (regp && regp->vlib_rp == vlib_rp)
           {
             vlib_cli_output (vm, "%s segment rings:", regp->name);
             goto found;
           }
-      }));
+      }
       vlib_cli_output (vm, "regp %llx not found?", regp);
       continue;
       /* *INDENT-ON* */
@@ -962,7 +1150,7 @@ VLIB_CLI_COMMAND (cli_show_api_ring_command, static) =
 clib_error_t *
 vlibmemory_init (vlib_main_t * vm)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   svm_map_region_args_t _a, *a = &_a;
   u8 *remove_path1, *remove_path2;
   void vlibsocket_reference (void);
@@ -1011,7 +1199,7 @@ vlibmemory_init (vlib_main_t * vm)
 void
 vl_set_memory_region_name (const char *name)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   am->region_name = name;
 }