api: API trace improvements
[vpp.git] / src / vlibmemory / memory_api.c
index 7a7644a..6c066a1 100644 (file)
 #include <vlibmemory/vl_memory_api_h.h>
 #undef vl_endianfun
 
-static inline void *
-vl_api_memclnt_create_t_print (vl_api_memclnt_create_t * a, void *handle)
-{
-  vl_print (handle, "vl_api_memclnt_create_t:\n");
-  vl_print (handle, "name: %s\n", a->name);
-  vl_print (handle, "input_queue: 0x%wx\n", a->input_queue);
-  vl_print (handle, "context: %u\n", (unsigned) a->context);
-  vl_print (handle, "ctx_quota: %ld\n", (long) a->ctx_quota);
-  return handle;
-}
-
-static inline void *
-vl_api_memclnt_delete_t_print (vl_api_memclnt_delete_t * a, void *handle)
-{
-  vl_print (handle, "vl_api_memclnt_delete_t:\n");
-  vl_print (handle, "index: %u\n", (unsigned) a->index);
-  vl_print (handle, "handle: 0x%wx\n", a->handle);
-  return handle;
-}
-
 volatile int **vl_api_queue_cursizes;
 
 static void
 memclnt_queue_callback (vlib_main_t * vm)
 {
   int i;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
+  int have_pending_rpcs;
 
   if (PREDICT_FALSE (vec_len (vl_api_queue_cursizes) !=
                     1 + vec_len (am->vlib_private_rps)))
@@ -103,7 +84,12 @@ memclnt_queue_callback (vlib_main_t * vm)
          break;
        }
     }
-  if (vec_len (vm->pending_rpc_requests))
+
+  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
+  have_pending_rpcs = vec_len (vm->pending_rpc_requests) > 0;
+  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);
+
+  if (have_pending_rpcs)
     {
       vm->queue_signal_pending = 1;
       vm->api_queue_nonempty = 1;
@@ -121,31 +107,27 @@ vl_api_memclnt_create_internal (char *name, svm_queue_t * q)
 {
   vl_api_registration_t **regpp;
   vl_api_registration_t *regp;
-  svm_region_t *svm;
   void *oldheap;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
 
   ASSERT (vlib_get_thread_index () == 0);
   pool_get (am->vl_clients, regpp);
 
-  svm = am->vlib_rp;
 
-  pthread_mutex_lock (&svm->mutex);
-  oldheap = svm_push_data_heap (svm);
+  oldheap = vl_msg_push_heap ();
   *regpp = clib_mem_alloc (sizeof (vl_api_registration_t));
 
   regp = *regpp;
   clib_memset (regp, 0, sizeof (*regp));
   regp->registration_type = REGISTRATION_TYPE_SHMEM;
   regp->vl_api_registration_pool_index = regpp - am->vl_clients;
-  regp->vlib_rp = svm;
+  regp->vlib_rp = am->vlib_rp;
   regp->shmem_hdr = am->shmem_hdr;
 
   regp->vl_input_queue = q;
   regp->name = format (0, "%s%c", name, 0);
 
-  pthread_mutex_unlock (&svm->mutex);
-  svm_pop_heap (oldheap);
+  vl_msg_pop_heap (oldheap);
   return vl_msg_api_handle_from_index_and_epoch
     (regp->vl_api_registration_pool_index,
      am->shmem_hdr->application_restarts);
@@ -160,11 +142,10 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   vl_api_registration_t **regpp;
   vl_api_registration_t *regp;
   vl_api_memclnt_create_reply_t *rp;
-  svm_region_t *svm;
   svm_queue_t *q;
   int rv = 0;
   void *oldheap;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   u8 *msg_table;
 
   /*
@@ -195,21 +176,19 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
 
   pool_get (am->vl_clients, regpp);
 
-  svm = am->vlib_rp;
-
-  pthread_mutex_lock (&svm->mutex);
-  oldheap = svm_push_data_heap (svm);
+  oldheap = vl_msg_push_heap ();
   *regpp = clib_mem_alloc (sizeof (vl_api_registration_t));
 
   regp = *regpp;
   clib_memset (regp, 0, sizeof (*regp));
   regp->registration_type = REGISTRATION_TYPE_SHMEM;
   regp->vl_api_registration_pool_index = regpp - am->vl_clients;
-  regp->vlib_rp = svm;
+  regp->vlib_rp = am->vlib_rp;
   regp->shmem_hdr = am->shmem_hdr;
   regp->clib_file_index = am->shmem_hdr->clib_file_index;
 
   q = regp->vl_input_queue = (svm_queue_t *) (uword) mp->input_queue;
+  VL_MSG_API_SVM_QUEUE_UNPOISON (q);
 
   regp->name = format (0, "%s", mp->name);
   vec_add1 (regp->name, 0);
@@ -223,8 +202,7 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   else
     msg_table = am->serialized_message_table_in_shmem;
 
-  pthread_mutex_unlock (&svm->mutex);
-  svm_pop_heap (oldheap);
+  vl_msg_pop_heap (oldheap);
 
   rp = vl_msg_api_alloc (sizeof (*rp));
   rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_REPLY);
@@ -239,13 +217,13 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   vl_msg_api_send_shmem (q, (u8 *) & rp);
 }
 
-int
+void
 vl_api_call_reaper_functions (u32 client_index)
 {
   clib_error_t *error = 0;
   _vl_msg_api_function_list_elt_t *i;
 
-  i = api_main.reaper_function_registrations;
+  i = vlibapi_get_main ()->reaper_function_registrations;
   while (i)
     {
       error = i->f (client_index);
@@ -253,7 +231,6 @@ vl_api_call_reaper_functions (u32 client_index)
        clib_error_report (error);
       i = i->next_init_function;
     }
-  return 0;
 }
 
 /*
@@ -265,15 +242,13 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
   vl_api_registration_t **regpp;
   vl_api_registration_t *regp;
   vl_api_memclnt_delete_reply_t *rp;
-  svm_region_t *svm;
   void *oldheap;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   u32 handle, client_index, epoch;
 
   handle = mp->index;
 
-  if (vl_api_call_reaper_functions (handle))
-    return;
+  vl_api_call_reaper_functions (handle);
 
   epoch = vl_msg_api_handle_get_epoch (handle);
   client_index = vl_msg_api_handle_get_index (handle);
@@ -293,37 +268,43 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
     {
       int i;
       regp = *regpp;
-      svm = am->vlib_rp;
       int private_registration = 0;
 
-      /*
-       * Note: the API message handling path will set am->vlib_rp
-       * as appropriate for pairwise / private memory segments
-       */
-      rp = vl_msg_api_alloc (sizeof (*rp));
-      rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_DELETE_REPLY);
-      rp->handle = mp->handle;
-      rp->response = 1;
-
-      vl_msg_api_send_shmem (regp->vl_input_queue, (u8 *) & rp);
-
-      if (client_index != regp->vl_api_registration_pool_index)
+      /* Send reply unless client asked us to do the cleanup */
+      if (!mp->do_cleanup)
        {
-         clib_warning ("mismatch client_index %d pool_index %d",
-                       client_index, regp->vl_api_registration_pool_index);
-         vl_msg_api_free (rp);
-         return;
+         /*
+          * Note: the API message handling path will set am->vlib_rp
+          * as appropriate for pairwise / private memory segments
+          */
+         rp = vl_msg_api_alloc (sizeof (*rp));
+         rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_DELETE_REPLY);
+         rp->handle = mp->handle;
+         rp->response = 1;
+
+         vl_msg_api_send_shmem (regp->vl_input_queue, (u8 *) & rp);
+         if (client_index != regp->vl_api_registration_pool_index)
+           {
+             clib_warning ("mismatch client_index %d pool_index %d",
+                           client_index,
+                           regp->vl_api_registration_pool_index);
+             vl_msg_api_free (rp);
+             return;
+           }
        }
 
+      /* No dangling references, please */
+      *regpp = 0;
+
       /* For horizontal scaling, add a hash table... */
       for (i = 0; i < vec_len (am->vlib_private_rps); i++)
        {
          /* Is this a pairwise / private API segment? */
-         if (am->vlib_private_rps[i] == svm)
+         if (am->vlib_private_rps[i] == am->vlib_rp)
            {
              /* Note: account for the memfd header page */
-             u64 virtual_base = svm->virtual_base - MMAP_PAGESIZE;
-             u64 virtual_size = svm->virtual_size + MMAP_PAGESIZE;
+             uword virtual_base = am->vlib_rp->virtual_base - MMAP_PAGESIZE;
+             uword virtual_size = am->vlib_rp->virtual_size + MMAP_PAGESIZE;
 
              /*
               * Kill the registration pool element before we make
@@ -343,21 +324,18 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
            }
        }
 
-      /* No dangling references, please */
-      *regpp = 0;
-
       if (private_registration == 0)
        {
          pool_put_index (am->vl_clients,
                          regp->vl_api_registration_pool_index);
-         pthread_mutex_lock (&svm->mutex);
-         oldheap = svm_push_data_heap (svm);
+         oldheap = vl_msg_push_heap ();
+         if (mp->do_cleanup)
+           svm_queue_free (regp->vl_input_queue);
          vec_free (regp->name);
          /* Poison the old registration */
          clib_memset (regp, 0xF1, sizeof (*regp));
          clib_mem_free (regp);
-         pthread_mutex_unlock (&svm->mutex);
-         svm_pop_heap (oldheap);
+         vl_msg_pop_heap (oldheap);
          /*
           * These messages must be freed manually, since they're set up
           * as "bounce" messages. In the private_registration == 1 case,
@@ -404,7 +382,7 @@ vl_api_memclnt_keepalive_t_handler (vl_api_memclnt_keepalive_t * mp)
   api_main_t *am;
   vl_shmem_hdr_t *shmem_hdr;
 
-  am = &api_main;
+  am = vlibapi_get_main ();
   shmem_hdr = am->shmem_hdr;
 
   rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
@@ -414,11 +392,16 @@ vl_api_memclnt_keepalive_t_handler (vl_api_memclnt_keepalive_t * mp)
   vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) & rmp);
 }
 
-#define foreach_vlib_api_msg                            \
-_(MEMCLNT_CREATE, memclnt_create)                       \
-_(MEMCLNT_DELETE, memclnt_delete)                       \
-_(MEMCLNT_KEEPALIVE, memclnt_keepalive)                 \
-_(MEMCLNT_KEEPALIVE_REPLY, memclnt_keepalive_reply)    \
+/*
+ * To avoid filling the API trace buffer with boring messages,
+ * don't trace memclnt_keepalive[_reply] msgs
+ */
+
+#define foreach_vlib_api_msg                                                  \
+  _ (MEMCLNT_CREATE, memclnt_create, 0)                                       \
+  _ (MEMCLNT_DELETE, memclnt_delete, 0)                                       \
+  _ (MEMCLNT_KEEPALIVE, memclnt_keepalive, 0)                                 \
+  _ (MEMCLNT_KEEPALIVE_REPLY, memclnt_keepalive_reply, 0)
 
 /*
  * memory_api_init
@@ -427,7 +410,7 @@ int
 vl_mem_api_init (const char *region_name)
 {
   int rv;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   vl_msg_api_msg_config_t cfg;
   vl_msg_api_msg_config_t *c = &cfg;
   vl_shmem_hdr_t *shm;
@@ -438,7 +421,7 @@ vl_mem_api_init (const char *region_name)
   if ((rv = vl_map_shmem (region_name, 1 /* is_vlib */ )) < 0)
     return rv;
 
-#define _(N,n) do {                                             \
+#define _(N,n,t) do {                                            \
     c->id = VL_API_##N;                                         \
     c->name = #n;                                               \
     c->handler = vl_api_##n##_t_handler;                        \
@@ -446,7 +429,7 @@ vl_mem_api_init (const char *region_name)
     c->endian = vl_api_##n##_t_endian;                          \
     c->print = vl_api_##n##_t_print;                            \
     c->size = sizeof(vl_api_##n##_t);                           \
-    c->traced = 1; /* trace, so these msgs print */             \
+    c->traced = t; /* trace, so these msgs print */             \
     c->replay = 0; /* don't replay client create/delete msgs */ \
     c->message_bounce = 0; /* don't bounce this message */     \
     vl_msg_api_config(c);} while (0);
@@ -454,12 +437,21 @@ vl_mem_api_init (const char *region_name)
   foreach_vlib_api_msg;
 #undef _
 
+#define vl_msg_name_crc_list
+#include <vlibmemory/memclnt.api.h>
+#undef vl_msg_name_crc_list
+
+#define _(id, n, crc) vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id);
+  foreach_vl_msg_name_crc_memclnt;
+#undef _
+
   /*
    * special-case freeing of memclnt_delete messages, so we can
    * simply munmap pairwise / private API segments...
    */
   am->message_bounce[VL_API_MEMCLNT_DELETE] = 1;
   am->is_mp_safe[VL_API_MEMCLNT_KEEPALIVE_REPLY] = 1;
+  am->is_mp_safe[VL_API_MEMCLNT_KEEPALIVE] = 1;
 
   vlib_set_queue_signal_callback (vm, memclnt_queue_callback);
 
@@ -475,7 +467,7 @@ vl_mem_api_init (const char *region_name)
 clib_error_t *
 map_api_segment_init (vlib_main_t * vm)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   int rv;
 
   if ((rv = vl_mem_api_init (am->region_name)) < 0)
@@ -491,9 +483,7 @@ send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
 {
   vl_api_memclnt_keepalive_t *mp;
   svm_queue_t *q;
-  api_main_t *am = &api_main;
-  svm_region_t *save_vlib_rp = am->vlib_rp;
-  vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
+  api_main_t *am = vlibapi_get_main ();
 
   q = regp->vl_input_queue;
 
@@ -517,10 +507,7 @@ send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
    * memory clients..
    */
 
-  am->vlib_rp = regp->vlib_rp;
-  am->shmem_hdr = regp->shmem_hdr;
-
-  mp = vl_msg_api_alloc (sizeof (*mp));
+  mp = vl_mem_api_alloc_as_if_client_w_reg (regp, sizeof (*mp));
   clib_memset (mp, 0, sizeof (*mp));
   mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_MEMCLNT_KEEPALIVE);
   mp->context = mp->client_index =
@@ -532,10 +519,7 @@ send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
 
   /* Failure-to-send due to a stuffed queue is absolutely expected */
   if (svm_queue_add (q, (u8 *) & mp, 1 /* nowait */ ))
-    vl_msg_api_free (mp);
-
-  am->vlib_rp = save_vlib_rp;
-  am->shmem_hdr = save_shmem_hdr;
+    vl_msg_api_free_w_region (regp->vlib_rp, mp);
 }
 
 static void
@@ -593,10 +577,10 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
   vec_reset_length (confused_indices);
 
   /* *INDENT-OFF* */
-  pool_foreach (regpp, am->vl_clients, ({
+  pool_foreach (regpp, am->vl_clients)  {
       vl_mem_send_client_keepalive_w_reg (am, now, regpp, &dead_indices,
                                           &confused_indices);
-  }));
+  }
   /* *INDENT-ON* */
 
   /* This should "never happen," but if it does, fix it... */
@@ -612,7 +596,6 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
   if (PREDICT_FALSE (vec_len (dead_indices) > 0))
     {
       int i;
-      svm_region_t *svm;
       void *oldheap;
 
       /* Allow the application to clean up its registrations */
@@ -625,13 +608,11 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
 
              handle = vl_msg_api_handle_from_index_and_epoch
                (dead_indices[i], shm->application_restarts);
-             (void) vl_api_call_reaper_functions (handle);
+             vl_api_call_reaper_functions (handle);
            }
        }
 
-      svm = am->vlib_rp;
-      pthread_mutex_lock (&svm->mutex);
-      oldheap = svm_push_data_heap (svm);
+      oldheap = vl_msg_push_heap ();
 
       for (i = 0; i < vec_len (dead_indices); i++)
        {
@@ -639,13 +620,13 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
          if (regpp)
            {
              /* Is this a pairwise SVM segment? */
-             if ((*regpp)->vlib_rp != svm)
+             if ((*regpp)->vlib_rp != am->vlib_rp)
                {
                  int i;
                  svm_region_t *dead_rp = (*regpp)->vlib_rp;
                  /* Note: account for the memfd header page */
-                 u64 virtual_base = dead_rp->virtual_base - MMAP_PAGESIZE;
-                 u64 virtual_size = dead_rp->virtual_size + MMAP_PAGESIZE;
+                 uword virtual_base = dead_rp->virtual_base - MMAP_PAGESIZE;
+                 uword virtual_size = dead_rp->virtual_size + MMAP_PAGESIZE;
 
                  /* For horizontal scaling, add a hash table... */
                  for (i = 0; i < vec_len (am->vlib_private_rps); i++)
@@ -654,14 +635,18 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
                        vec_delete (am->vlib_private_rps, 1, i);
                        goto found;
                      }
+                 svm_pop_heap (oldheap);
                  clib_warning ("private rp %llx AWOL", dead_rp);
+                 oldheap = svm_push_data_heap (am->vlib_rp);
 
                found:
                  /* Kill it, accounting for the memfd header page */
+                 svm_pop_heap (oldheap);
                  if (munmap ((void *) virtual_base, virtual_size) < 0)
                    clib_unix_warning ("munmap");
                  /* Reset the queue-length-address cache */
                  vec_reset_length (vl_api_queue_cursizes);
+                 oldheap = svm_push_data_heap (am->vlib_rp);
                }
              else
                {
@@ -677,27 +662,33 @@ vl_mem_api_dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
              svm_pop_heap (oldheap);
              clib_warning ("Duplicate free, client index %d",
                            regpp - am->vl_clients);
-             oldheap = svm_push_data_heap (svm);
+             oldheap = svm_push_data_heap (am->vlib_rp);
            }
        }
 
       svm_client_scan_this_region_nolock (am->vlib_rp);
 
-      pthread_mutex_unlock (&svm->mutex);
-      svm_pop_heap (oldheap);
+      vl_msg_pop_heap (oldheap);
       for (i = 0; i < vec_len (dead_indices); i++)
        pool_put_index (am->vl_clients, dead_indices[i]);
     }
 }
 
 static inline int
-void_mem_api_handle_msg_i (api_main_t * am, vlib_main_t * vm,
-                          vlib_node_runtime_t * node, svm_queue_t * q)
+void_mem_api_handle_msg_i (api_main_t * am, svm_region_t * vlib_rp,
+                          vlib_main_t * vm, vlib_node_runtime_t * node,
+                          u8 is_private)
 {
+  svm_queue_t *q;
   uword mp;
+
+  q = ((vl_shmem_hdr_t *) (void *) vlib_rp->user_ctx)->vl_input_queue;
+
   if (!svm_queue_sub2 (q, (u8 *) & mp))
     {
-      vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+      VL_MSG_API_UNPOISON ((void *) mp);
+      vl_msg_api_handler_with_vm_node (am, vlib_rp, (void *) mp, vm, node,
+                                      is_private);
       return 0;
     }
   return -1;
@@ -706,15 +697,15 @@ void_mem_api_handle_msg_i (api_main_t * am, vlib_main_t * vm,
 int
 vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node)
 {
-  api_main_t *am = &api_main;
-  return void_mem_api_handle_msg_i (am, vm, node,
-                                   am->shmem_hdr->vl_input_queue);
+  api_main_t *am = vlibapi_get_main ();
+  return void_mem_api_handle_msg_i (am, am->vlib_rp, vm, node,
+                                   0 /* is_private */ );
 }
 
 int
 vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   int i;
   uword *tmp, mp;
 
@@ -729,11 +720,29 @@ vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
   vm->pending_rpc_requests = tmp;
   clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);
 
-  for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)
+  /*
+   * RPCs are used to reflect function calls to thread 0
+   * when the underlying code is not thread-safe.
+   *
+   * Grabbing the thread barrier across a set of RPCs
+   * greatly increases efficiency, and avoids
+   * running afoul of the barrier sync holddown timer.
+   * The barrier sync code supports recursive locking.
+   *
+   * We really need to rewrite RPC-based code...
+   */
+  if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests)))
     {
-      mp = vm->processing_rpc_requests[i];
-      vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+      vl_msg_api_barrier_sync ();
+      for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)
+       {
+         mp = vm->processing_rpc_requests[i];
+         vl_msg_api_handler_with_vm_node (am, am->vlib_rp, (void *) mp, vm,
+                                          node, 0 /* is_private */ );
+       }
+      vl_msg_api_barrier_release ();
     }
+
   return 0;
 }
 
@@ -741,23 +750,9 @@ int
 vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node,
                               u32 reg_index)
 {
-  api_main_t *am = &api_main;
-  vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
-  svm_region_t *vlib_rp, *save_vlib_rp = am->vlib_rp;
-  svm_queue_t *q;
-  int rv;
-
-  vlib_rp = am->vlib_rp = am->vlib_private_rps[reg_index];
-
-  am->shmem_hdr = (void *) vlib_rp->user_ctx;
-  q = am->shmem_hdr->vl_input_queue;
-
-  rv = void_mem_api_handle_msg_i (am, vm, node, q);
-
-  am->shmem_hdr = save_shmem_hdr;
-  am->vlib_rp = save_vlib_rp;
-
-  return rv;
+  api_main_t *am = vlibapi_get_main ();
+  return void_mem_api_handle_msg_i (am, am->vlib_private_rps[reg_index], vm,
+                                   node, 1 /* is_private */ );
 }
 
 vl_api_registration_t *
@@ -765,7 +760,7 @@ vl_mem_api_client_index_to_registration (u32 handle)
 {
   vl_api_registration_t **regpp;
   vl_api_registration_t *regp;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   vl_shmem_hdr_t *shmem_hdr;
   u32 index;
 
@@ -793,7 +788,7 @@ svm_queue_t *
 vl_api_client_index_to_input_queue (u32 index)
 {
   vl_api_registration_t *regp;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
 
   /* Special case: vlib trying to send itself a message */
   if (index == (u32) ~ 0)
@@ -863,7 +858,7 @@ vl_api_ring_command (vlib_main_t * vm,
 {
   int i;
   vl_shmem_hdr_t *shmem_hdr;
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
 
   /* First, dump the primary region rings.. */
 
@@ -892,15 +887,15 @@ vl_api_ring_command (vlib_main_t * vm,
 
       /* For horizontal scaling, add a hash table... */
       /* *INDENT-OFF* */
-      pool_foreach (regpp, am->vl_clients,
-      ({
+      pool_foreach (regpp, am->vl_clients)
+       {
         regp = *regpp;
         if (regp && regp->vlib_rp == vlib_rp)
           {
             vlib_cli_output (vm, "%s segment rings:", regp->name);
             goto found;
           }
-      }));
+      }
       vlib_cli_output (vm, "regp %llx not found?", regp);
       continue;
       /* *INDENT-ON* */
@@ -929,9 +924,33 @@ VLIB_CLI_COMMAND (cli_show_api_ring_command, static) =
 clib_error_t *
 vlibmemory_init (vlib_main_t * vm)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   svm_map_region_args_t _a, *a = &_a;
-  clib_error_t *error;
+  u8 *remove_path1, *remove_path2;
+  void vlibsocket_reference (void);
+
+  vlibsocket_reference ();
+
+  /*
+   * By popular request / to avoid support fires, remove any old api segment
+   * files Right Here.
+   */
+  if (am->root_path == 0)
+    {
+      remove_path1 = format (0, "/dev/shm/global_vm%c", 0);
+      remove_path2 = format (0, "/dev/shm/vpe-api%c", 0);
+    }
+  else
+    {
+      remove_path1 = format (0, "/dev/shm/%s-global_vm%c", am->root_path, 0);
+      remove_path2 = format (0, "/dev/shm/%s-vpe-api%c", am->root_path, 0);
+    }
+
+  (void) unlink ((char *) remove_path1);
+  (void) unlink ((char *) remove_path2);
+
+  vec_free (remove_path1);
+  vec_free (remove_path2);
 
   clib_memset (a, 0, sizeof (*a));
   a->root_path = am->root_path;
@@ -948,15 +967,13 @@ vlibmemory_init (vlib_main_t * vm)
 
   svm_region_init_args (a);
 
-  error = vlib_call_init_function (vm, vlibsocket_init);
-
-  return error;
+  return 0;
 }
 
 void
 vl_set_memory_region_name (const char *name)
 {
-  api_main_t *am = &api_main;
+  api_main_t *am = vlibapi_get_main ();
   am->region_name = name;
 }