Repair vlib API socket server
[vpp.git] / src / vlibmemory / memory_vlib.c
index d305ea6..c9b3183 100644 (file)
@@ -96,17 +96,7 @@ vl_api_trace_plugin_msg_ids_t_print (vl_api_trace_plugin_msg_ids_t * a,
 #include <vlibmemory/vl_memory_api_h.h>
 #undef vl_endianfun
 
-void vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
-  __attribute__ ((weak));
-
-void
-vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
-{
-  static int count;
-
-  if (count++ < 5)
-    clib_warning ("need to link against -lvlibsocket, msg not sent!");
-}
+extern void vl_socket_api_send (vl_api_registration_t * rp, u8 * elem);
 
 void
 vl_msg_api_send (vl_api_registration_t * rp, u8 * elem)
@@ -117,7 +107,7 @@ vl_msg_api_send (vl_api_registration_t * rp, u8 * elem)
     }
   else
     {
-      vl_msg_api_send_shmem (rp->vl_input_queue, elem);
+      vl_msg_api_send_shmem (rp->vl_input_queue, (u8 *) & elem);
     }
 }
 
@@ -196,6 +186,7 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   int rv = 0;
   void *oldheap;
   api_main_t *am = &api_main;
+  u8 *serialized_message_table_in_shmem;
 
   /*
    * This is tortured. Maintain a vlib-address-space private
@@ -235,6 +226,8 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   memset (regp, 0, sizeof (*regp));
   regp->registration_type = REGISTRATION_TYPE_SHMEM;
   regp->vl_api_registration_pool_index = regpp - am->vl_clients;
+  regp->vlib_rp = svm;
+  regp->shmem_hdr = am->shmem_hdr;
 
   q = regp->vl_input_queue = (unix_shared_memory_queue_t *) (uword)
     mp->input_queue;
@@ -242,11 +235,11 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
   regp->name = format (0, "%s", mp->name);
   vec_add1 (regp->name, 0);
 
+  serialized_message_table_in_shmem = vl_api_serialize_message_table (am, 0);
+
   pthread_mutex_unlock (&svm->mutex);
   svm_pop_heap (oldheap);
 
-  ASSERT (am->serialized_message_table_in_shmem);
-
   rp = vl_msg_api_alloc (sizeof (*rp));
   rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_REPLY);
   rp->handle = (uword) regp;
@@ -255,8 +248,7 @@ vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t * mp)
      am->shmem_hdr->application_restarts);
   rp->context = mp->context;
   rp->response = ntohl (rv);
-  rp->message_table =
-    pointer_to_uword (am->serialized_message_table_in_shmem);
+  rp->message_table = pointer_to_uword (serialized_message_table_in_shmem);
 
   vl_msg_api_send_shmem (q, (u8 *) & rp);
 }
@@ -313,11 +305,15 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
 
   if (!pool_is_free (am->vl_clients, regpp))
     {
+      int i;
       regp = *regpp;
       svm = am->vlib_rp;
+      int private_registration = 0;
 
-      /* $$$ check the input queue for e.g. punted sf's */
-
+      /*
+       * Note: the API message handling path will set am->vlib_rp
+       * as appropriate for pairwise / private memory segments
+       */
       rp = vl_msg_api_alloc (sizeof (*rp));
       rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_DELETE_REPLY);
       rp->handle = mp->handle;
@@ -333,18 +329,56 @@ vl_api_memclnt_delete_t_handler (vl_api_memclnt_delete_t * mp)
          return;
        }
 
+      /* For horizontal scaling, add a hash table... */
+      for (i = 0; i < vec_len (am->vlib_private_rps); i++)
+       {
+         /* Is this a pairwise / private API segment? */
+         if (am->vlib_private_rps[i] == svm)
+           {
+             /* Note: account for the memfd header page */
+             u64 virtual_base = svm->virtual_base - MMAP_PAGESIZE;
+             u64 virtual_size = svm->virtual_size + MMAP_PAGESIZE;
+
+             /*
+              * Kill the registration pool element before we make
+              * the index vanish forever
+              */
+             pool_put_index (am->vl_clients,
+                             regp->vl_api_registration_pool_index);
+
+             vec_delete (am->vlib_private_rps, 1, i);
+             /* Kill it, accounting for the memfd header page */
+             if (munmap ((void *) virtual_base, virtual_size) < 0)
+               clib_unix_warning ("munmap");
+             /* Reset the queue-length-address cache */
+             vec_reset_length (vl_api_queue_cursizes);
+             private_registration = 1;
+             break;
+           }
+       }
+
       /* No dangling references, please */
       *regpp = 0;
 
-      pool_put_index (am->vl_clients, regp->vl_api_registration_pool_index);
-
-      pthread_mutex_lock (&svm->mutex);
-      oldheap = svm_push_data_heap (svm);
-      /* Poison the old registration */
-      memset (regp, 0xF1, sizeof (*regp));
-      clib_mem_free (regp);
-      pthread_mutex_unlock (&svm->mutex);
-      svm_pop_heap (oldheap);
+      if (private_registration == 0)
+       {
+         pool_put_index (am->vl_clients,
+                         regp->vl_api_registration_pool_index);
+         pthread_mutex_lock (&svm->mutex);
+         oldheap = svm_push_data_heap (svm);
+         /* Poison the old registration */
+         memset (regp, 0xF1, sizeof (*regp));
+         clib_mem_free (regp);
+         pthread_mutex_unlock (&svm->mutex);
+         svm_pop_heap (oldheap);
+         /*
+          * These messages must be freed manually, since they're set up
+          * as "bounce" messages. In the private_registration == 1 case,
+          * we kill the shared-memory segment which contains the message
+          * with munmap.
+          */
+         vl_msg_api_free (mp);
+       }
     }
   else
     {
@@ -392,10 +426,54 @@ out:
   vl_msg_api_send_shmem (q, (u8 *) & rmp);
 }
 
-#define foreach_vlib_api_msg                    \
-_(MEMCLNT_CREATE, memclnt_create)               \
-_(MEMCLNT_DELETE, memclnt_delete)               \
-_(GET_FIRST_MSG_ID, get_first_msg_id)
+/**
+ * client answered a ping, stave off the grim reaper...
+ */
+
+void
+  vl_api_memclnt_keepalive_reply_t_handler
+  (vl_api_memclnt_keepalive_reply_t * mp)
+{
+  vl_api_registration_t *regp;
+  vlib_main_t *vm = vlib_get_main ();
+
+  regp = vl_api_client_index_to_registration (mp->context);
+  if (regp)
+    {
+      regp->last_heard = vlib_time_now (vm);
+      regp->unanswered_pings = 0;
+    }
+  else
+    clib_warning ("BUG: anonymous memclnt_keepalive_reply");
+}
+
+/**
+ * We can send ourselves these messages if someone uses the
+ * builtin binary api test tool...
+ */
+static void
+vl_api_memclnt_keepalive_t_handler (vl_api_memclnt_keepalive_t * mp)
+{
+  vl_api_memclnt_keepalive_reply_t *rmp;
+  api_main_t *am;
+  vl_shmem_hdr_t *shmem_hdr;
+
+  am = &api_main;
+  shmem_hdr = am->shmem_hdr;
+
+  rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
+  memset (rmp, 0, sizeof (*rmp));
+  rmp->_vl_msg_id = ntohs (VL_API_MEMCLNT_KEEPALIVE_REPLY);
+  rmp->context = mp->context;
+  vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) & rmp);
+}
+
+#define foreach_vlib_api_msg                            \
+_(MEMCLNT_CREATE, memclnt_create)                       \
+_(MEMCLNT_DELETE, memclnt_delete)                       \
+_(GET_FIRST_MSG_ID, get_first_msg_id)                   \
+_(MEMCLNT_KEEPALIVE, memclnt_keepalive)                 \
+_(MEMCLNT_KEEPALIVE_REPLY, memclnt_keepalive_reply)
 
 /*
  * vl_api_init
@@ -404,6 +482,7 @@ static int
 memory_api_init (const char *region_name)
 {
   int rv;
+  api_main_t *am = &api_main;
   vl_msg_api_msg_config_t cfg;
   vl_msg_api_msg_config_t *c = &cfg;
 
@@ -428,6 +507,13 @@ memory_api_init (const char *region_name)
   foreach_vlib_api_msg;
 #undef _
 
+  /*
+   * special-case freeing of memclnt_delete messages, so we can
+   * simply munmap pairwise / private API segments...
+   */
+  am->message_bounce[VL_API_MEMCLNT_DELETE] = 1;
+  am->is_mp_safe[VL_API_MEMCLNT_KEEPALIVE_REPLY] = 1;
+
   return 0;
 }
 
@@ -474,6 +560,203 @@ send_one_plugin_msg_ids_msg (u8 * name, u16 first_msg_id, u16 last_msg_id)
   vl_msg_api_send_shmem (q, (u8 *) & mp);
 }
 
+static void
+send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
+{
+  vl_api_memclnt_keepalive_t *mp;
+  unix_shared_memory_queue_t *q;
+  api_main_t *am = &api_main;
+  svm_region_t *save_vlib_rp = am->vlib_rp;
+  vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
+
+  q = regp->vl_input_queue;
+
+  /*
+   * If the queue head is moving, assume that the client is processing
+   * messages and skip the ping. This heuristic may fail if the queue
+   * is in the same position as last time, net of wrapping; in which
+   * case, the client will receive a keepalive.
+   */
+  if (regp->last_queue_head != q->head)
+    {
+      regp->last_heard = now;
+      regp->unanswered_pings = 0;
+      regp->last_queue_head = q->head;
+      return;
+    }
+
+  /*
+   * push/pop shared memory segment, so this routine
+   * will work with "normal" as well as "private segment"
+   * memory clients..
+   */
+
+  am->vlib_rp = regp->vlib_rp;
+  am->shmem_hdr = regp->shmem_hdr;
+
+  mp = vl_msg_api_alloc (sizeof (*mp));
+  memset (mp, 0, sizeof (*mp));
+  mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_MEMCLNT_KEEPALIVE);
+  mp->context = mp->client_index =
+    vl_msg_api_handle_from_index_and_epoch
+    (regp->vl_api_registration_pool_index,
+     am->shmem_hdr->application_restarts);
+
+  regp->unanswered_pings++;
+
+  /* Failure-to-send due to a stuffed queue is absolutely expected */
+  if (unix_shared_memory_queue_add (q, (u8 *) & mp, 1 /* nowait */ ))
+    vl_msg_api_free (mp);
+
+  am->vlib_rp = save_vlib_rp;
+  am->shmem_hdr = save_shmem_hdr;
+}
+
+static void
+dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
+{
+
+  vl_api_registration_t **regpp;
+  vl_api_registration_t *regp;
+  static u32 *dead_indices;
+  static u32 *confused_indices;
+
+  vec_reset_length (dead_indices);
+  vec_reset_length (confused_indices);
+
+  /* *INDENT-OFF* */
+  pool_foreach (regpp, am->vl_clients,
+  ({
+    regp = *regpp;
+    if (regp)
+      {
+        /* If we haven't heard from this client recently... */
+        if (regp->last_heard < (now - 10.0))
+          {
+            if (regp->unanswered_pings == 2)
+              {
+                unix_shared_memory_queue_t *q;
+                q = regp->vl_input_queue;
+                if (kill (q->consumer_pid, 0) >=0)
+                  {
+                    clib_warning ("REAPER: lazy binary API client '%s'",
+                                  regp->name);
+                    regp->unanswered_pings = 0;
+                    regp->last_heard = now;
+                  }
+                else
+                  {
+                    clib_warning ("REAPER: binary API client '%s' died",
+                                  regp->name);
+                    vec_add1(dead_indices, regpp - am->vl_clients);
+                  }
+              }
+            else
+              send_memclnt_keepalive (regp, now);
+          }
+        else
+          regp->unanswered_pings = 0;
+      }
+    else
+      {
+        clib_warning ("NULL client registration index %d",
+                      regpp - am->vl_clients);
+        vec_add1 (confused_indices, regpp - am->vl_clients);
+      }
+  }));
+  /* *INDENT-ON* */
+  /* This should "never happen," but if it does, fix it... */
+  if (PREDICT_FALSE (vec_len (confused_indices) > 0))
+    {
+      int i;
+      for (i = 0; i < vec_len (confused_indices); i++)
+       {
+         pool_put_index (am->vl_clients, confused_indices[i]);
+       }
+    }
+
+  if (PREDICT_FALSE (vec_len (dead_indices) > 0))
+    {
+      int i;
+      svm_region_t *svm;
+      void *oldheap;
+
+      /* Allow the application to clean up its registrations */
+      for (i = 0; i < vec_len (dead_indices); i++)
+       {
+         regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
+         if (regpp)
+           {
+             u32 handle;
+
+             handle = vl_msg_api_handle_from_index_and_epoch
+               (dead_indices[i], shm->application_restarts);
+             (void) call_reaper_functions (handle);
+           }
+       }
+
+      svm = am->vlib_rp;
+      pthread_mutex_lock (&svm->mutex);
+      oldheap = svm_push_data_heap (svm);
+
+      for (i = 0; i < vec_len (dead_indices); i++)
+       {
+         regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
+         if (regpp)
+           {
+             /* Is this a pairwise SVM segment? */
+             if ((*regpp)->vlib_rp != svm)
+               {
+                 int i;
+                 svm_region_t *dead_rp = (*regpp)->vlib_rp;
+                 /* Note: account for the memfd header page */
+                 u64 virtual_base = dead_rp->virtual_base - MMAP_PAGESIZE;
+                 u64 virtual_size = dead_rp->virtual_size + MMAP_PAGESIZE;
+
+                 /* For horizontal scaling, add a hash table... */
+                 for (i = 0; i < vec_len (am->vlib_private_rps); i++)
+                   if (am->vlib_private_rps[i] == dead_rp)
+                     {
+                       vec_delete (am->vlib_private_rps, 1, i);
+                       goto found;
+                     }
+                 clib_warning ("private rp %llx AWOL", dead_rp);
+
+               found:
+                 /* Kill it, accounting for the memfd header page */
+                 if (munmap ((void *) virtual_base, virtual_size) < 0)
+                   clib_unix_warning ("munmap");
+                 /* Reset the queue-length-address cache */
+                 vec_reset_length (vl_api_queue_cursizes);
+               }
+             else
+               {
+                 /* Poison the old registration */
+                 memset (*regpp, 0xF3, sizeof (**regpp));
+                 clib_mem_free (*regpp);
+               }
+             /* no dangling references, please */
+             *regpp = 0;
+           }
+         else
+           {
+             svm_pop_heap (oldheap);
+             clib_warning ("Duplicate free, client index %d",
+                           regpp - am->vl_clients);
+             oldheap = svm_push_data_heap (svm);
+           }
+       }
+
+      svm_client_scan_this_region_nolock (am->vlib_rp);
+
+      pthread_mutex_unlock (&svm->mutex);
+      svm_pop_heap (oldheap);
+      for (i = 0; i < vec_len (dead_indices); i++)
+       pool_put_index (am->vl_clients, dead_indices[i]);
+    }
+}
+
+
 static uword
 memclnt_process (vlib_main_t * vm,
                 vlib_node_runtime_t * node, vlib_frame_t * f)
@@ -487,17 +770,29 @@ memclnt_process (vlib_main_t * vm,
   f64 dead_client_scan_time;
   f64 sleep_time, start_time;
   f64 vector_rate;
+  clib_error_t *socksvr_api_init (vlib_main_t * vm);
+  clib_error_t *error;
   int i;
-  u8 *serialized_message_table = 0;
-  svm_region_t *svm;
-  void *oldheap;
+  vl_socket_args_for_process_t *a;
+  uword event_type;
+  uword *event_data = 0;
+  int private_segment_rotor = 0;
+  svm_region_t *vlib_rp;
+  f64 now;
 
   vlib_set_queue_signal_callback (vm, memclnt_queue_callback);
 
   if ((rv = memory_api_init (am->region_name)) < 0)
     {
-      clib_warning ("memory_api_init returned %d, wait for godot...", rv);
-      vlib_process_suspend (vm, 1e70);
+      clib_warning ("memory_api_init returned %d, quitting...", rv);
+      return 0;
+    }
+
+  if ((error = socksvr_api_init (vm)))
+    {
+      clib_error_report (error);
+      clib_warning ("socksvr_api_init failed, quitting...");
+      return 0;
     }
 
   shm = am->shmem_hdr;
@@ -510,8 +805,8 @@ memclnt_process (vlib_main_t * vm,
   if (e)
     clib_error_report (e);
 
-  sleep_time = 20.0;
-  dead_client_scan_time = vlib_time_now (vm) + 20.0;
+  sleep_time = 10.0;
+  dead_client_scan_time = vlib_time_now (vm) + 10.0;
 
   /*
    * Send plugin message range messages for each plugin we loaded
@@ -523,20 +818,6 @@ memclnt_process (vlib_main_t * vm,
                                   rp->last_msg_id);
     }
 
-  /*
-   * Snapshoot the api message table.
-   */
-  serialized_message_table = vl_api_serialize_message_table (am, 0);
-
-  svm = am->vlib_rp;
-  pthread_mutex_lock (&svm->mutex);
-  oldheap = svm_push_data_heap (svm);
-
-  am->serialized_message_table_in_shmem = vec_dup (serialized_message_table);
-
-  pthread_mutex_unlock (&svm->mutex);
-  svm_pop_heap (oldheap);
-
   /*
    * Save the api message table snapshot, if configured
    */
@@ -544,6 +825,11 @@ memclnt_process (vlib_main_t * vm,
     {
       int fd, rv;
       u8 *chroot_file;
+      u8 *serialized_message_table;
+
+      /*
+       * Snapshoot the api message table.
+       */
       if (strstr ((char *) am->save_msg_table_filename, "..")
          || index ((char *) am->save_msg_table_filename, '/'))
        {
@@ -561,6 +847,9 @@ memclnt_process (vlib_main_t * vm,
          clib_unix_warning ("creat");
          goto skip_save;
        }
+
+      serialized_message_table = vl_api_serialize_message_table (am, 0);
+
       rv = write (fd, serialized_message_table,
                  vec_len (serialized_message_table));
 
@@ -572,15 +861,14 @@ memclnt_process (vlib_main_t * vm,
        clib_unix_warning ("close");
 
       vec_free (chroot_file);
+      vec_free (serialized_message_table);
     }
 
 skip_save:
-  vec_free (serialized_message_table);
 
   /* $$$ pay attention to frame size, control CPU usage */
   while (1)
     {
-      uword event_type __attribute__ ((unused));
       i8 *headp;
       int need_broadcast;
 
@@ -665,104 +953,89 @@ skip_save:
            }
        }
 
-      event_type = vlib_process_wait_for_event_or_clock (vm, sleep_time);
-      vm->queue_signal_pending = 0;
-      vlib_process_get_events (vm, 0 /* event_data */ );
-
-      if (vlib_time_now (vm) > dead_client_scan_time)
+      /*
+       * see if we have any private api shared-memory segments
+       * If so, push required context variables, and process
+       * a message.
+       */
+      if (PREDICT_FALSE (vec_len (am->vlib_private_rps)))
        {
-         vl_api_registration_t **regpp;
-         vl_api_registration_t *regp;
-         unix_shared_memory_queue_t *q;
-         static u32 *dead_indices;
-         static u32 *confused_indices;
+         unix_shared_memory_queue_t *save_vlib_input_queue = q;
+         vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
+         svm_region_t *save_vlib_rp = am->vlib_rp;
 
-         vec_reset_length (dead_indices);
-         vec_reset_length (confused_indices);
+         vlib_rp = am->vlib_rp = am->vlib_private_rps[private_segment_rotor];
 
-          /* *INDENT-OFF* */
-          pool_foreach (regpp, am->vl_clients,
-          ({
-            regp = *regpp;
-            if (regp)
-              {
-                q = regp->vl_input_queue;
-                if (kill (q->consumer_pid, 0) < 0)
-                  {
-                    vec_add1(dead_indices, regpp - am->vl_clients);
-                  }
-              }
-            else
-              {
-                clib_warning ("NULL client registration index %d",
-                              regpp - am->vl_clients);
-                vec_add1 (confused_indices, regpp - am->vl_clients);
-              }
-          }));
-          /* *INDENT-ON* */
-         /* This should "never happen," but if it does, fix it... */
-         if (PREDICT_FALSE (vec_len (confused_indices) > 0))
-           {
-             int i;
-             for (i = 0; i < vec_len (confused_indices); i++)
-               {
-                 pool_put_index (am->vl_clients, confused_indices[i]);
-               }
-           }
+         am->shmem_hdr = (void *) vlib_rp->user_ctx;
+         q = am->shmem_hdr->vl_input_queue;
 
-         if (PREDICT_FALSE (vec_len (dead_indices) > 0))
+         pthread_mutex_lock (&q->mutex);
+         if (q->cursize > 0)
            {
-             int i;
-             svm_region_t *svm;
-             void *oldheap;
+             headp = (i8 *) (q->data + sizeof (uword) * q->head);
+             clib_memcpy (&mp, headp, sizeof (uword));
 
-             /* Allow the application to clean up its registrations */
-             for (i = 0; i < vec_len (dead_indices); i++)
-               {
-                 regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
-                 if (regpp)
-                   {
-                     u32 handle;
-
-                     handle = vl_msg_api_handle_from_index_and_epoch
-                       (dead_indices[i], shm->application_restarts);
-                     (void) call_reaper_functions (handle);
-                   }
-               }
+             q->head++;
+             need_broadcast = (q->cursize == q->maxsize / 2);
+             q->cursize--;
 
-             svm = am->vlib_rp;
-             pthread_mutex_lock (&svm->mutex);
-             oldheap = svm_push_data_heap (svm);
+             if (PREDICT_FALSE (q->head == q->maxsize))
+               q->head = 0;
+             pthread_mutex_unlock (&q->mutex);
+             if (need_broadcast)
+               (void) pthread_cond_broadcast (&q->condvar);
 
-             for (i = 0; i < vec_len (dead_indices); i++)
-               {
-                 regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
-                 if (regpp)
-                   {
-                     /* Poison the old registration */
-                     memset (*regpp, 0xF3, sizeof (**regpp));
-                     clib_mem_free (*regpp);
-                     /* no dangling references, please */
-                     *regpp = 0;
-                   }
-                 else
-                   {
-                     svm_pop_heap (oldheap);
-                     clib_warning ("Duplicate free, client index %d",
-                                   regpp - am->vl_clients);
-                     oldheap = svm_push_data_heap (svm);
-                   }
-               }
+             pthread_mutex_unlock (&q->mutex);
 
-             svm_client_scan_this_region_nolock (am->vlib_rp);
+             vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+           }
+         else
+           pthread_mutex_unlock (&q->mutex);
 
-             pthread_mutex_unlock (&svm->mutex);
-             svm_pop_heap (oldheap);
-             for (i = 0; i < vec_len (dead_indices); i++)
-               pool_put_index (am->vl_clients, dead_indices[i]);
+         q = save_vlib_input_queue;
+         am->shmem_hdr = save_shmem_hdr;
+         am->vlib_rp = save_vlib_rp;
+
+         private_segment_rotor++;
+         if (private_segment_rotor >= vec_len (am->vlib_private_rps))
+           private_segment_rotor = 0;
+       }
+
+      vlib_process_wait_for_event_or_clock (vm, sleep_time);
+      vec_reset_length (event_data);
+      event_type = vlib_process_get_events (vm, &event_data);
+      now = vlib_time_now (vm);
+
+      switch (event_type)
+       {
+       case QUEUE_SIGNAL_EVENT:
+         vm->queue_signal_pending = 0;
+         break;
+
+       case SOCKET_READ_EVENT:
+         for (i = 0; i < vec_len (event_data); i++)
+           {
+             a = pool_elt_at_index (socket_main.process_args, event_data[i]);
+             vl_api_socket_process_msg (a->clib_file, a->regp,
+                                        (i8 *) a->data);
+             vec_free (a->data);
+             pool_put (socket_main.process_args, a);
            }
+         break;
 
-         dead_client_scan_time = vlib_time_now (vm) + 20.0;
+         /* Timeout... */
+       case -1:
+         break;
+
+       default:
+         clib_warning ("unknown event type %d", event_type);
+         break;
+       }
+
+      if (now > dead_client_scan_time)
+       {
+         dead_client_scan (am, shm, now);
+         dead_client_scan_time = vlib_time_now (vm) + 10.0;
        }
 
       if (TRACE_VLIB_MEMORY_QUEUE)
@@ -785,11 +1058,12 @@ skip_save:
   return 0;
 }
 /* *INDENT-OFF* */
-VLIB_REGISTER_NODE (memclnt_node,static) = {
-    .function = memclnt_process,
-    .type = VLIB_NODE_TYPE_PROCESS,
-    .name = "api-rx-from-ring",
-    .state = VLIB_NODE_STATE_DISABLED,
+VLIB_REGISTER_NODE (memclnt_node) =
+{
+  .function = memclnt_process,
+  .type = VLIB_NODE_TYPE_PROCESS,
+  .name = "api-rx-from-ring",
+  .state = VLIB_NODE_STATE_DISABLED,
 };
 /* *INDENT-ON* */
 
@@ -865,14 +1139,17 @@ VLIB_CLI_COMMAND (cli_clear_api_histogram_command, static) =
 };
 /* *INDENT-ON* */
 
+volatile int **vl_api_queue_cursizes;
+
 static void
 memclnt_queue_callback (vlib_main_t * vm)
 {
-  static volatile int *cursizep;
+  int i;
+  api_main_t *am = &api_main;
 
-  if (PREDICT_FALSE (cursizep == 0))
+  if (PREDICT_FALSE (vec_len (vl_api_queue_cursizes) !=
+                    1 + vec_len (am->vlib_private_rps)))
     {
-      api_main_t *am = &api_main;
       vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
       unix_shared_memory_queue_t *q;
 
@@ -882,15 +1159,30 @@ memclnt_queue_callback (vlib_main_t * vm)
       q = shmem_hdr->vl_input_queue;
       if (q == 0)
        return;
-      cursizep = &q->cursize;
+
+      vec_add1 (vl_api_queue_cursizes, &q->cursize);
+
+      for (i = 0; i < vec_len (am->vlib_private_rps); i++)
+       {
+         svm_region_t *vlib_rp = am->vlib_private_rps[i];
+
+         shmem_hdr = (void *) vlib_rp->user_ctx;
+         q = shmem_hdr->vl_input_queue;
+         vec_add1 (vl_api_queue_cursizes, &q->cursize);
+       }
     }
 
-  if (*cursizep >= 1)
+  for (i = 0; i < vec_len (vl_api_queue_cursizes); i++)
     {
-      vm->queue_signal_pending = 1;
-      vm->api_queue_nonempty = 1;
-      vlib_process_signal_event (vm, memclnt_node.index,
-                                /* event_type */ 0, /* event_data */ 0);
+      if (*vl_api_queue_cursizes[i])
+       {
+         vm->queue_signal_pending = 1;
+         vm->api_queue_nonempty = 1;
+         vlib_process_signal_event (vm, memclnt_node.index,
+                                    /* event_type */ QUEUE_SIGNAL_EVENT,
+                                    /* event_data */ 0);
+         break;
+       }
     }
 }
 
@@ -971,13 +1263,55 @@ setup_memclnt_exit (vlib_main_t * vm)
 
 VLIB_INIT_FUNCTION (setup_memclnt_exit);
 
+u8 *
+format_api_message_rings (u8 * s, va_list * args)
+{
+  api_main_t *am = va_arg (*args, api_main_t *);
+  vl_shmem_hdr_t *shmem_hdr = va_arg (*args, vl_shmem_hdr_t *);
+  int main_segment = va_arg (*args, int);
+  ring_alloc_t *ap;
+  int i;
+
+  if (shmem_hdr == 0)
+    return format (s, "%8s %8s %8s %8s %8s\n",
+                  "Owner", "Size", "Nitems", "Hits", "Misses");
+
+  ap = shmem_hdr->vl_rings;
+
+  for (i = 0; i < vec_len (shmem_hdr->vl_rings); i++)
+    {
+      s = format (s, "%8s %8d %8d %8d %8d\n",
+                 "vlib", ap->size, ap->nitems, ap->hits, ap->misses);
+      ap++;
+    }
+
+  ap = shmem_hdr->client_rings;
+
+  for (i = 0; i < vec_len (shmem_hdr->client_rings); i++)
+    {
+      s = format (s, "%8s %8d %8d %8d %8d\n",
+                 "clnt", ap->size, ap->nitems, ap->hits, ap->misses);
+      ap++;
+    }
+
+  if (main_segment)
+    {
+      s = format (s, "%d ring miss fallback allocations\n", am->ring_misses);
+      s = format
+       (s,
+        "%d application restarts, %d reclaimed msgs, %d garbage collects\n",
+        shmem_hdr->application_restarts, shmem_hdr->restart_reclaims,
+        shmem_hdr->garbage_collects);
+    }
+  return s;
+}
+
 
 static clib_error_t *
 vl_api_ring_command (vlib_main_t * vm,
                     unformat_input_t * input, vlib_cli_command_t * cli_cmd)
 {
   int i;
-  ring_alloc_t *ap;
   vl_shmem_hdr_t *shmem_hdr;
   api_main_t *am = &api_main;
 
@@ -989,34 +1323,38 @@ vl_api_ring_command (vlib_main_t * vm,
       return 0;
     }
 
-  vlib_cli_output (vm, "%8s %8s %8s %8s %8s\n",
-                  "Owner", "Size", "Nitems", "Hits", "Misses");
-
-  ap = shmem_hdr->vl_rings;
+  vlib_cli_output (vm, "Main API segment rings:");
 
-  for (i = 0; i < vec_len (shmem_hdr->vl_rings); i++)
-    {
-      vlib_cli_output (vm, "%8s %8d %8d %8d %8d\n",
-                      "vlib", ap->size, ap->nitems, ap->hits, ap->misses);
-      ap++;
-    }
+  vlib_cli_output (vm, "%U", format_api_message_rings, am,
+                  0 /* print header */ , 0 /* notused */ );
 
-  ap = shmem_hdr->client_rings;
+  vlib_cli_output (vm, "%U", format_api_message_rings, am,
+                  shmem_hdr, 1 /* main segment */ );
 
-  for (i = 0; i < vec_len (shmem_hdr->client_rings); i++)
+  for (i = 0; i < vec_len (am->vlib_private_rps); i++)
     {
-      vlib_cli_output (vm, "%8s %8d %8d %8d %8d\n",
-                      "clnt", ap->size, ap->nitems, ap->hits, ap->misses);
-      ap++;
+      svm_region_t *vlib_rp = am->vlib_private_rps[i];
+      shmem_hdr = (void *) vlib_rp->user_ctx;
+      vl_api_registration_t **regpp;
+      vl_api_registration_t *regp;
+
+      /* For horizontal scaling, add a hash table... */
+      /* *INDENT-OFF* */
+      pool_foreach (regpp, am->vl_clients,
+      ({
+        regp = *regpp;
+        if (regp && regp->vlib_rp == vlib_rp)
+          {
+            vlib_cli_output (vm, "%s segment rings:", regp->name);
+            goto found;
+          }
+      }));
+      /* *INDENT-ON* */
+    found:
+      vlib_cli_output (vm, "%U", format_api_message_rings, am,
+                      shmem_hdr, 0 /* main segment */ );
     }
 
-  vlib_cli_output (vm, "%d ring miss fallback allocations\n",
-                  am->ring_misses);
-
-  vlib_cli_output
-    (vm, "%d application restarts, %d reclaimed msgs, %d garbage collects\n",
-     shmem_hdr->application_restarts,
-     shmem_hdr->restart_reclaims, shmem_hdr->garbage_collects);
   return 0;
 }
 
@@ -1051,15 +1389,13 @@ vl_api_client_command (vlib_main_t * vm,
 
     if (regp)
       {
-        q = regp->vl_input_queue;
-        if (kill (q->consumer_pid, 0) < 0)
-          {
-            health = "DEAD";
-          }
+        if (regp->unanswered_pings > 0)
+          health = "questionable";
         else
-          {
-            health = "alive";
-          }
+          health = "OK";
+
+        q = regp->vl_input_queue;
+
         vlib_cli_output (vm, "%16s %8d %14d 0x%016llx %s\n",
                          regp->name, q->consumer_pid, q->cursize,
                          q, health);
@@ -1306,6 +1642,7 @@ vlibmemory_init (vlib_main_t * vm)
 {
   api_main_t *am = &api_main;
   svm_map_region_args_t _a, *a = &_a;
+  clib_error_t *error;
 
   memset (a, 0, sizeof (*a));
   a->root_path = am->root_path;
@@ -1321,7 +1658,10 @@ vlibmemory_init (vlib_main_t * vm)
      0) ? am->global_pvt_heap_size : SVM_PVT_MHEAP_SIZE;
 
   svm_region_init_args (a);
-  return 0;
+
+  error = vlib_call_init_function (vm, vlibsocket_init);
+
+  return error;
 }
 
 VLIB_INIT_FUNCTION (vlibmemory_init);
@@ -2227,7 +2567,7 @@ dump_api_table_file_command_fn (vlib_main_t * vm,
 
   /* Load the serialized message table from the table dump */
 
-  error = unserialize_open_unix_file (sm, (char *) filename);
+  error = unserialize_open_clib_file (sm, (char *) filename);
 
   if (error)
     return error;
@@ -2251,7 +2591,7 @@ dump_api_table_file_command_fn (vlib_main_t * vm,
   if (compare_current)
     {
       /* Append the current message table */
-      u8 *tblv = vec_dup (am->serialized_message_table_in_shmem);
+      u8 *tblv = vl_api_serialize_message_table (am, 0);
 
       serialize_open_vector (sm, tblv);
       unserialize_integer (sm, &nmsgs, sizeof (u32));
@@ -2268,6 +2608,7 @@ dump_api_table_file_command_fn (vlib_main_t * vm,
          item->crc = extract_crc (name_and_crc);
          item->which = 1;      /* current_image */
        }
+      vec_free (tblv);
     }
 
   /* Sort the table. */