API / CLI event-log tracing 73/15073/2
authorDave Barach <dave@barachs.net>
Mon, 1 Oct 2018 13:25:32 +0000 (09:25 -0400)
committerDamjan Marion <dmarion@me.com>
Mon, 1 Oct 2018 15:59:35 +0000 (15:59 +0000)
Add an "elog trace [api][cli][barrier]" debug CLI command. Removed the
barrier elog test command. Remove unused reliable multicast code.

Change-Id: Ib3ecde901b7c49fe92b313d0087cd7e776adcdce
Signed-off-by: Dave Barach <dave@barachs.net>
15 files changed:
src/vlib/CMakeLists.txt
src/vlib/buffer_funcs.h
src/vlib/cli.c
src/vlib/main.h
src/vlib/mc.c [deleted file]
src/vlib/mc.h [deleted file]
src/vlib/threads.c
src/vlib/threads.h
src/vlib/threads_cli.c
src/vlib/vlib.h
src/vlibapi/api_shared.c
src/vnet/interface.c
src/vnet/ip/ip6.h
src/vnet/srp/interface.c
src/vnet/srp/node.c

index b187f98..72c73f3 100644 (file)
@@ -43,7 +43,6 @@ add_vpp_library(vlib
   linux/vfio.c
   log.c
   main.c
-  mc.c
   node.c
   node_cli.c
   node_format.c
@@ -55,7 +54,6 @@ add_vpp_library(vlib
   unix/cli.c
   unix/input.c
   unix/main.c
-  unix/mc_socket.c
   unix/plugin.c
   unix/util.c
 
@@ -77,7 +75,6 @@ add_vpp_library(vlib
   linux/vfio.h
   log.h
   main.h
-  mc.h
   node_funcs.h
   node.h
   pci/pci_config.h
index 5306af6..d8abdf3 100644 (file)
@@ -41,6 +41,7 @@
 #define included_vlib_buffer_funcs_h
 
 #include <vppinfra/hash.h>
+#include <vppinfra/fifo.h>
 
 /** \file
     vlib buffer access methods.
index ef02d27..d7b2a46 100644 (file)
@@ -40,6 +40,7 @@
 #include <vlib/vlib.h>
 #include <vlib/unix/unix.h>
 #include <vppinfra/cpu.h>
+#include <vppinfra/elog.h>
 #include <unistd.h>
 #include <ctype.h>
 
@@ -583,6 +584,23 @@ vlib_cli_dispatch_sub_commands (vlib_main_t * vm,
            }
          else
            {
+             if (PREDICT_FALSE (vm->elog_trace_cli_commands))
+               {
+                  /* *INDENT-OFF* */
+                  ELOG_TYPE_DECLARE (e) =
+                    {
+                      .format = "cli-cmd: %s",
+                      .format_args = "T4",
+                    };
+                  /* *INDENT-ON* */
+                 struct
+                 {
+                   u32 c;
+                 } *ed;
+                 ed = ELOG_DATA (&vm->elog_main, e);
+                 ed->c = elog_global_id_for_msg_name (c->path);
+               }
+
              if (!c->is_mp_safe)
                vlib_worker_thread_barrier_sync (vm);
 
@@ -591,6 +609,32 @@ vlib_cli_dispatch_sub_commands (vlib_main_t * vm,
              if (!c->is_mp_safe)
                vlib_worker_thread_barrier_release (vm);
 
+             if (PREDICT_FALSE (vm->elog_trace_cli_commands))
+               {
+                  /* *INDENT-OFF* */
+                  ELOG_TYPE_DECLARE (e) =
+                    {
+                      .format = "cli-cmd: %s %s",
+                      .format_args = "T4T4",
+                    };
+                  /* *INDENT-ON* */
+                 struct
+                 {
+                   u32 c, err;
+                 } *ed;
+                 ed = ELOG_DATA (&vm->elog_main, e);
+                 ed->c = elog_global_id_for_msg_name (c->path);
+                 if (c_error)
+                   {
+                     vec_add1 (c_error->what, 0);
+                     ed->err = elog_global_id_for_msg_name
+                       ((const char *) c_error->what);
+                     _vec_len (c_error->what) -= 1;
+                   }
+                 else
+                   ed->err = elog_global_id_for_msg_name ("OK");
+               }
+
              if (c_error)
                {
                  error =
@@ -1415,6 +1459,75 @@ VLIB_CLI_COMMAND (show_cli_command, static) = {
 };
 /* *INDENT-ON* */
 
+static clib_error_t *
+elog_trace_command_fn (vlib_main_t * vm,
+                      unformat_input_t * input, vlib_cli_command_t * cmd)
+{
+  unformat_input_t _line_input, *line_input = &_line_input;
+  int enable = 1;
+  int api = 0, cli = 0, barrier = 0;
+
+  if (!unformat_user (input, unformat_line_input, line_input))
+    goto print_status;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (line_input, "api"))
+       api = 1;
+      else if (unformat (line_input, "cli"))
+       cli = 1;
+      else if (unformat (line_input, "barrier"))
+       barrier = 1;
+      else if (unformat (line_input, "disable"))
+       enable = 0;
+      else if (unformat (line_input, "enable"))
+       enable = 1;
+      else
+       break;
+    }
+  unformat_free (line_input);
+
+  vm->elog_trace_api_messages = api ? enable : vm->elog_trace_api_messages;
+  vm->elog_trace_cli_commands = cli ? enable : vm->elog_trace_cli_commands;
+  vlib_worker_threads->barrier_elog_enabled =
+    barrier ? enable : vlib_worker_threads->barrier_elog_enabled;
+
+print_status:
+  vlib_cli_output (vm, "Current status:");
+
+  vlib_cli_output
+    (vm, "    Event log API message trace: %s\n    CLI command trace: %s",
+     vm->elog_trace_api_messages ? "on" : "off",
+     vm->elog_trace_cli_commands ? "on" : "off");
+  vlib_cli_output
+    (vm, "    Barrier sync trace: %s",
+     vlib_worker_threads->barrier_elog_enabled ? "on" : "off");
+
+  return 0;
+}
+
+/*?
+ * Control event logging of api, cli, and thread barrier events
+ * With no arguments, displays the current trace status.
+ * Name the event groups you wish to trace or stop tracing.
+ *
+ * @cliexpar
+ * @clistart
+ * elog trace api cli barrier
+ * elog trace api cli barrier disable
+ * elog trace
+ * @cliend
+ * @cliexcmd{elog trace [api][cli][barrier][disable]}
+?*/
+/* *INDENT-OFF* */
+VLIB_CLI_COMMAND (elog_trace_command, static) =
+{
+  .path = "elog trace",
+  .short_help = "elog trace [api][cli][barrier][disable]",
+  .function = elog_trace_command_fn,
+};
+/* *INDENT-ON* */
+
 static clib_error_t *
 vlib_cli_init (vlib_main_t * vm)
 {
index d21227a..ce42b6e 100644 (file)
@@ -148,9 +148,6 @@ typedef struct vlib_main_t
                         struct vlib_node_runtime_t * node,
                         vlib_frame_t * frame);
 
-  /* Multicast distribution.  Set to zero for MC disabled. */
-  mc_main_t *mc_main;
-
   /* Stream index to use for distribution when MC is enabled. */
   u32 mc_stream_index;
 
@@ -159,6 +156,10 @@ typedef struct vlib_main_t
   /* Event logger. */
   elog_main_t elog_main;
 
+  /* Event logger trace flags */
+  int elog_trace_api_messages;
+  int elog_trace_cli_commands;
+
   /* Node call and return event types. */
   elog_event_type_t *node_call_elog_event_types;
   elog_event_type_t *node_return_elog_event_types;
@@ -184,7 +185,6 @@ typedef struct vlib_main_t
   _vlib_init_function_list_elt_t *main_loop_exit_function_registrations;
   _vlib_init_function_list_elt_t *api_init_function_registrations;
   vlib_config_function_runtime_t *config_function_registrations;
-  mc_serialize_msg_t *mc_msg_registrations;    /* mc_main is a pointer... */
 
   /* control-plane API queue signal pending, length indication */
   volatile u32 queue_signal_pending;
diff --git a/src/vlib/mc.c b/src/vlib/mc.c
deleted file mode 100644 (file)
index a289871..0000000
+++ /dev/null
@@ -1,2609 +0,0 @@
-/*
- * mc.c: vlib reliable sequenced multicast distributed applications
- *
- * Copyright (c) 2010 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <vlib/vlib.h>
-
-/*
- * 1 to enable msg id training wheels, which are useful for tracking
- * down catchup and/or partitioned network problems
- */
-#define MSG_ID_DEBUG 0
-
-static format_function_t format_mc_stream_state;
-
-static u32
-elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
-{
-  uword *p, r;
-  mhash_t *h = &m->elog_id_by_peer_id;
-
-  if (!m->elog_id_by_peer_id.hash)
-    mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
-
-  p = mhash_get (h, &peer_id);
-  if (p)
-    return p[0];
-  r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
-  mhash_set (h, &peer_id, r, /* old_value */ 0);
-  return r;
-}
-
-static u32
-elog_id_for_msg_name (mc_main_t * m, char *msg_name)
-{
-  uword *p, r;
-  uword *h = m->elog_id_by_msg_name;
-  u8 *name_copy;
-
-  if (!h)
-    h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
-
-  p = hash_get_mem (h, msg_name);
-  if (p)
-    return p[0];
-  r = elog_string (m->elog_main, "%s", msg_name);
-
-  name_copy = format (0, "%s%c", msg_name, 0);
-
-  hash_set_mem (h, name_copy, r);
-  m->elog_id_by_msg_name = h;
-
-  return r;
-}
-
-static void
-elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
-            u32 retry_count)
-{
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "tx-msg: stream %d local seq %d attempt %d",
-          .format_args = "i4i4i4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 stream_id, local_sequence, retry_count;
-      } *ed;
-      ed = ELOG_DATA (m->elog_main, e);
-      ed->stream_id = stream_id;
-      ed->local_sequence = local_sequence;
-      ed->retry_count = retry_count;
-    }
-}
-
-/*
- * seq_cmp
- * correctly compare two unsigned sequence numbers.
- * This function works so long as x and y are within 2**(n-1) of each
- * other, where n = bits(x, y).
- *
- * Magic decoder ring:
- * seq_cmp == 0 => x and y are equal
- * seq_cmp < 0 => x is "in the past" with respect to y
- * seq_cmp > 0 => x is "in the future" with respect to y
- */
-always_inline i32
-mc_seq_cmp (u32 x, u32 y)
-{
-  return (i32) x - (i32) y;
-}
-
-void *
-mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
-{
-  u32 n_alloc, bi = 0;
-  vlib_buffer_t *b;
-
-  n_alloc = vlib_buffer_alloc (vm, &bi, 1);
-  ASSERT (n_alloc == 1);
-
-  b = vlib_get_buffer (vm, bi);
-  b->current_length = n_bytes;
-  *bi_return = bi;
-  return (void *) b->data;
-}
-
-static void
-delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
-                       uword index, int notify_application)
-{
-  mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
-  ASSERT (p != 0);
-  if (s->config.peer_died && notify_application)
-    s->config.peer_died (mcm, s, p->id);
-
-  s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "delete peer %s from all_peer_bitmap",
-          .format_args = "T4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 peer;
-      } *ed = 0;
-
-      ed = ELOG_DATA (mcm->elog_main, e);
-      ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
-    }
-  /* Do not delete the pool / hash table entries, or we lose sequence number state */
-}
-
-static mc_stream_peer_t *
-get_or_create_peer_with_id (mc_main_t * mcm,
-                           mc_stream_t * s, mc_peer_id_t id, int *created)
-{
-  uword *q = mhash_get (&s->peer_index_by_id, &id);
-  mc_stream_peer_t *p;
-
-  if (q)
-    {
-      p = pool_elt_at_index (s->peers, q[0]);
-      goto done;
-    }
-
-  pool_get (s->peers, p);
-  memset (p, 0, sizeof (p[0]));
-  p->id = id;
-  p->last_sequence_received = ~0;
-  mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
-  if (created)
-    *created = 1;
-
-done:
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "get_or_create %s peer %s stream %d seq %d",
-          .format_args = "t4T4i4i4",
-          .n_enum_strings = 2,
-          .enum_strings = {
-            "old", "new",
-          },
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 is_new, peer, stream_index, rx_sequence;
-      } *ed = 0;
-
-      ed = ELOG_DATA (mcm->elog_main, e);
-      ed->is_new = q ? 0 : 1;
-      ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
-      ed->stream_index = s->index;
-      ed->rx_sequence = p->last_sequence_received;
-    }
-  /* $$$$ Enable or reenable this peer */
-  s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
-  return p;
-}
-
-static void
-maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
-{
-  vlib_one_time_waiting_process_t *p;
-
-  if (pool_elts (stream->retry_pool) >= stream->config.window_size)
-    return;
-
-  vec_foreach (p, stream->procs_waiting_for_open_window)
-    vlib_signal_one_time_waiting_process (vm, p);
-
-  if (stream->procs_waiting_for_open_window)
-    _vec_len (stream->procs_waiting_for_open_window) = 0;
-}
-
-static void
-mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r)
-{
-  mc_retry_t record, *retp;
-
-  if (r->unacked_by_peer_bitmap)
-    _vec_len (r->unacked_by_peer_bitmap) = 0;
-
-  if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
-    {
-      clib_fifo_sub1 (s->retired_fifo, record);
-      vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
-    }
-
-  clib_fifo_add2 (s->retired_fifo, retp);
-
-  retp->buffer_index = r->buffer_index;
-  retp->local_sequence = r->local_sequence;
-
-  r->buffer_index = ~0;                /* poison buffer index in this retry */
-}
-
-static void
-mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
-{
-  mc_retry_t *retry;
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "resend-retired: search for local seq %d",
-          .format_args = "i4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 local_sequence;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, e);
-      ed->local_sequence = local_sequence;
-    }
-
-  /* *INDENT-OFF* */
-  clib_fifo_foreach (retry, s->retired_fifo,
-  ({
-    if (retry->local_sequence == local_sequence)
-      {
-        elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
-        mcm->transport.tx_buffer (mcm->transport.opaque,
-                                  MC_TRANSPORT_USER_REQUEST_TO_RELAY,
-                                  retry->buffer_index);
-        return;
-      }
-  }));
-  /* *INDENT-ON* */
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "resend-retired: FAILED search for local seq %d",
-          .format_args = "i4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 local_sequence;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, e);
-      ed->local_sequence = local_sequence;
-    }
-}
-
-static uword *
-delete_retry_fifo_elt (mc_main_t * mcm,
-                      mc_stream_t * stream,
-                      mc_retry_t * r, uword * dead_peer_bitmap)
-{
-  mc_stream_peer_t *p;
-
-  /* *INDENT-OFF* */
-  pool_foreach (p, stream->peers, ({
-    uword pi = p - stream->peers;
-    uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
-
-    if (! is_alive)
-      dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
-
-    if (MC_EVENT_LOGGING > 0)
-      {
-        ELOG_TYPE_DECLARE (e) = {
-          .format = "delete_retry_fifo_elt: peer %s is %s",
-          .format_args = "T4t4",
-          .n_enum_strings = 2,
-          .enum_strings = { "alive", "dead", },
-        };
-        struct { u32 peer, is_alive; } * ed;
-        ed = ELOG_DATA (mcm->elog_main, e);
-        ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
-        ed->is_alive = is_alive;
-      }
-  }));
-  /* *INDENT-ON* */
-
-  hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
-  mc_retry_free (mcm, stream, r);
-
-  return dead_peer_bitmap;
-}
-
-always_inline mc_retry_t *
-prev_retry (mc_stream_t * s, mc_retry_t * r)
-{
-  return (r->prev_index != ~0
-         ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
-}
-
-always_inline mc_retry_t *
-next_retry (mc_stream_t * s, mc_retry_t * r)
-{
-  return (r->next_index != ~0
-         ? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
-}
-
-always_inline void
-remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
-{
-  mc_retry_t *p = prev_retry (s, r);
-  mc_retry_t *n = next_retry (s, r);
-
-  if (p)
-    p->next_index = r->next_index;
-  else
-    s->retry_head_index = r->next_index;
-  if (n)
-    n->prev_index = r->prev_index;
-  else
-    s->retry_tail_index = r->prev_index;
-
-  pool_put_index (s->retry_pool, r - s->retry_pool);
-}
-
-static void
-check_retry (mc_main_t * mcm, mc_stream_t * s)
-{
-  mc_retry_t *r;
-  vlib_main_t *vm = mcm->vlib_main;
-  f64 now = vlib_time_now (vm);
-  uword *dead_peer_bitmap = 0;
-  u32 ri, ri_next;
-
-  for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
-    {
-      r = pool_elt_at_index (s->retry_pool, ri);
-      ri_next = r->next_index;
-
-      if (now < r->sent_at + s->config.retry_interval)
-       continue;
-
-      r->n_retries += 1;
-      if (r->n_retries > s->config.retry_limit)
-       {
-         dead_peer_bitmap =
-           delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
-         remove_retry_from_pool (s, r);
-       }
-      else
-       {
-         if (MC_EVENT_LOGGING > 0)
-           {
-             mc_stream_peer_t *p;
-
-              /* *INDENT-OFF* */
-             ELOG_TYPE_DECLARE (t) =
-                {
-                  .format = "resend local seq %d attempt %d",
-                  .format_args = "i4i4",
-                };
-              /* *INDENT-ON* */
-
-              /* *INDENT-OFF* */
-             pool_foreach (p, s->peers, ({
-               if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
-                 {
-                   ELOG_TYPE_DECLARE (ev) = {
-                     .format = "resend: needed by peer %s local seq %d",
-                     .format_args = "T4i4",
-                   };
-                   struct { u32 peer, rx_sequence; } * ed;
-                   ed = ELOG_DATA (mcm->elog_main, ev);
-                   ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
-                   ed->rx_sequence = r->local_sequence;
-                 }
-             }));
-              /* *INDENT-ON* */
-
-             struct
-             {
-               u32 sequence;
-               u32 trail;
-             } *ed;
-             ed = ELOG_DATA (mcm->elog_main, t);
-             ed->sequence = r->local_sequence;
-             ed->trail = r->n_retries;
-           }
-
-         r->sent_at = vlib_time_now (vm);
-         s->stats.n_retries += 1;
-
-         elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
-
-         mcm->transport.tx_buffer
-           (mcm->transport.opaque,
-            MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index);
-       }
-    }
-
-  maybe_send_window_open_event (mcm->vlib_main, s);
-
-  /* Delete any dead peers we've found. */
-  if (!clib_bitmap_is_zero (dead_peer_bitmap))
-    {
-      uword i;
-
-      /* *INDENT-OFF* */
-      clib_bitmap_foreach (i, dead_peer_bitmap, ({
-       delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
-
-       /* Delete any references to just deleted peer in retry pool. */
-       pool_foreach (r, s->retry_pool, ({
-         r->unacked_by_peer_bitmap =
-           clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
-       }));
-      }));
-/* *INDENT-ON* */
-      clib_bitmap_free (dead_peer_bitmap);
-    }
-}
-
-always_inline mc_main_t *
-mc_node_get_main (vlib_node_runtime_t * node)
-{
-  mc_main_t **p = (void *) node->runtime_data;
-  return p[0];
-}
-
-static uword
-mc_retry_process (vlib_main_t * vm,
-                 vlib_node_runtime_t * node, vlib_frame_t * f)
-{
-  mc_main_t *mcm = mc_node_get_main (node);
-  mc_stream_t *s;
-
-  while (1)
-    {
-      vlib_process_suspend (vm, 1.0);
-      vec_foreach (s, mcm->stream_vector)
-      {
-       if (s->state != MC_STREAM_STATE_invalid)
-         check_retry (mcm, s);
-      }
-    }
-  return 0;                    /* not likely */
-}
-
-static void
-send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
-{
-  vlib_main_t *vm = mcm->vlib_main;
-  mc_msg_join_or_leave_request_t *mp;
-  u32 bi;
-
-  mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
-  memset (mp, 0, sizeof (*mp));
-  mp->type = MC_MSG_TYPE_join_or_leave_request;
-  mp->peer_id = mcm->transport.our_ack_peer_id;
-  mp->stream_index = stream_index;
-  mp->is_join = is_join;
-
-  mc_byte_swap_msg_join_or_leave_request (mp);
-
-  /*
-   * These msgs are unnumbered, unordered so send on the from-relay
-   * channel.
-   */
-  mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
-}
-
-static uword
-mc_join_ager_process (vlib_main_t * vm,
-                     vlib_node_runtime_t * node, vlib_frame_t * f)
-{
-  mc_main_t *mcm = mc_node_get_main (node);
-
-  while (1)
-    {
-      if (mcm->joins_in_progress)
-       {
-         mc_stream_t *s;
-         vlib_one_time_waiting_process_t *p;
-         f64 now = vlib_time_now (vm);
-
-         vec_foreach (s, mcm->stream_vector)
-         {
-           if (s->state != MC_STREAM_STATE_join_in_progress)
-             continue;
-
-           if (now > s->join_timeout)
-             {
-               s->state = MC_STREAM_STATE_ready;
-
-               if (MC_EVENT_LOGGING > 0)
-                 {
-                    /* *INDENT-OFF* */
-                   ELOG_TYPE_DECLARE (e) =
-                      {
-                        .format = "stream %d join timeout",
-                      };
-                    /* *INDENT-ON* */
-                   ELOG (mcm->elog_main, e, s->index);
-                 }
-               /* Make sure that this app instance exists as a stream peer,
-                  or we may answer a catchup request with a NULL
-                  all_peer_bitmap... */
-               (void) get_or_create_peer_with_id
-                 (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
-
-               vec_foreach (p, s->procs_waiting_for_join_done)
-                 vlib_signal_one_time_waiting_process (vm, p);
-               if (s->procs_waiting_for_join_done)
-                 _vec_len (s->procs_waiting_for_join_done) = 0;
-
-               mcm->joins_in_progress--;
-               ASSERT (mcm->joins_in_progress >= 0);
-             }
-           else
-             {
-               /* Resent join request which may have been lost. */
-               send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
-
-               /* We're *not* alone, retry for as long as it takes */
-               if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
-                 s->join_timeout = vlib_time_now (vm) + 2.0;
-
-
-               if (MC_EVENT_LOGGING > 0)
-                 {
-                    /* *INDENT-OFF* */
-                   ELOG_TYPE_DECLARE (e) =
-                      {
-                        .format = "stream %d resend join request",
-                      };
-                    /* *INDENT-ON* */
-                   ELOG (mcm->elog_main, e, s->index);
-                 }
-             }
-         }
-       }
-
-      vlib_process_suspend (vm, .5);
-    }
-
-  return 0;                    /* not likely */
-}
-
-static void
-serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
-{
-  char *name = va_arg (*va, char *);
-  serialize_cstring (m, name);
-}
-
-static void
-elog_stream_name (char *buf, int n_buf_bytes, char *v)
-{
-  clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
-  buf[n_buf_bytes - 1] = 0;
-}
-
-static void
-unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
-{
-  mc_main_t *mcm = va_arg (*va, mc_main_t *);
-  char *name;
-  mc_stream_t *s;
-  uword *p;
-
-  unserialize_cstring (m, &name);
-
-  if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
-    {
-      if (MC_EVENT_LOGGING > 0)
-       {
-          /* *INDENT-OFF* */
-         ELOG_TYPE_DECLARE (e) =
-            {
-              .format = "stream index %d already named %s",
-              .format_args = "i4s16",
-            };
-          /* *INDENT-ON* */
-         struct
-         {
-           u32 stream_index;
-           char name[16];
-         } *ed;
-         ed = ELOG_DATA (mcm->elog_main, e);
-         ed->stream_index = p[0];
-         elog_stream_name (ed->name, sizeof (ed->name), name);
-       }
-
-      vec_free (name);
-      return;
-    }
-
-  vec_add2 (mcm->stream_vector, s, 1);
-  mc_stream_init (s);
-  s->state = MC_STREAM_STATE_name_known;
-  s->index = s - mcm->stream_vector;
-  s->config.name = name;
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "stream index %d named %s",
-          .format_args = "i4s16",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 stream_index;
-       char name[16];
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, e);
-      ed->stream_index = s->index;
-      elog_stream_name (ed->name, sizeof (ed->name), name);
-    }
-
-  hash_set_mem (mcm->stream_index_by_name, name, s->index);
-
-  p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
-  if (p)
-    {
-      vlib_one_time_waiting_process_t *wp, **w;
-      w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
-      vec_foreach (wp, w[0])
-       vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
-      pool_put (mcm->procs_waiting_for_stream_name_pool, w);
-      hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
-    }
-}
-
-/* *INDENT-OFF* */
-MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
-{
-  .name = "mc_register_stream_name",
-  .serialize = serialize_mc_register_stream_name,
-  .unserialize = unserialize_mc_register_stream_name,
-};
-/* *INDENT-ON* */
-
-void
-mc_rx_buffer_unserialize (mc_main_t * mcm,
-                         mc_stream_t * stream,
-                         mc_peer_id_t peer_id, u32 buffer_index)
-{
-  return mc_unserialize (mcm, stream, buffer_index);
-}
-
-static u8 *
-mc_internal_catchup_snapshot (mc_main_t * mcm,
-                             u8 * data_vector,
-                             u32 last_global_sequence_processed)
-{
-  serialize_main_t m;
-
-  /* Append serialized data to data vector. */
-  serialize_open_vector (&m, data_vector);
-  m.stream.current_buffer_index = vec_len (data_vector);
-
-  serialize (&m, serialize_mc_main, mcm);
-  return serialize_close_vector (&m);
-}
-
-static void
-mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
-{
-  serialize_main_t s;
-
-  unserialize_open_data (&s, data, n_data_bytes);
-
-  unserialize (&s, unserialize_mc_main, mcm);
-}
-
-/* Overridden from the application layer, not actually used here */
-void mc_stream_join_process_hold (void) __attribute__ ((weak));
-void
-mc_stream_join_process_hold (void)
-{
-}
-
-static u32
-mc_stream_join_helper (mc_main_t * mcm,
-                      mc_stream_config_t * config, u32 is_internal)
-{
-  mc_stream_t *s;
-  vlib_main_t *vm = mcm->vlib_main;
-
-  s = 0;
-  if (!is_internal)
-    {
-      uword *p;
-
-      /* Already have a stream with given name? */
-      if ((s = mc_stream_by_name (mcm, config->name)))
-       {
-         /* Already joined and ready? */
-         if (s->state == MC_STREAM_STATE_ready)
-           return s->index;
-       }
-
-      /* First join MC internal stream. */
-      if (!mcm->stream_vector
-         || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
-             == MC_STREAM_STATE_invalid))
-       {
-         static mc_stream_config_t c = {
-           .name = "mc-internal",
-           .rx_buffer = mc_rx_buffer_unserialize,
-           .catchup = mc_internal_catchup,
-           .catchup_snapshot = mc_internal_catchup_snapshot,
-         };
-
-         c.save_snapshot = config->save_snapshot;
-
-         mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
-       }
-
-      /* If stream is still unknown register this name and wait for
-         sequenced message to name stream.  This way all peers agree
-         on stream name to index mappings. */
-      s = mc_stream_by_name (mcm, config->name);
-      if (!s)
-       {
-         vlib_one_time_waiting_process_t *wp, **w;
-         u8 *name_copy = format (0, "%s", config->name);
-
-         mc_serialize_stream (mcm,
-                              MC_STREAM_INDEX_INTERNAL,
-                              &mc_register_stream_name_msg, config->name);
-
-         /* Wait for this stream to be named. */
-         p =
-           hash_get_mem (mcm->procs_waiting_for_stream_name_by_name,
-                         name_copy);
-         if (p)
-           w =
-             pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool,
-                                p[0]);
-         else
-           {
-             pool_get (mcm->procs_waiting_for_stream_name_pool, w);
-             if (!mcm->procs_waiting_for_stream_name_by_name)
-               mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
-                                                                                sizeof
-                                                                                (uword));
-             hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
-                           name_copy,
-                           w - mcm->procs_waiting_for_stream_name_pool);
-             w[0] = 0;
-           }
-
-         vec_add2 (w[0], wp, 1);
-         vlib_current_process_wait_for_one_time_event (vm, wp);
-         vec_free (name_copy);
-       }
-
-      /* Name should be known now. */
-      s = mc_stream_by_name (mcm, config->name);
-      ASSERT (s != 0);
-      ASSERT (s->state == MC_STREAM_STATE_name_known);
-    }
-
-  if (!s)
-    {
-      vec_add2 (mcm->stream_vector, s, 1);
-      mc_stream_init (s);
-      s->index = s - mcm->stream_vector;
-    }
-
-  {
-    /* Save name since we could have already used it as hash key. */
-    char *name_save = s->config.name;
-
-    s->config = config[0];
-
-    if (name_save)
-      s->config.name = name_save;
-  }
-
-  if (s->config.window_size == 0)
-    s->config.window_size = 8;
-
-  if (s->config.retry_interval == 0.0)
-    s->config.retry_interval = 1.0;
-
-  /* Sanity. */
-  ASSERT (s->config.retry_interval < 30);
-
-  if (s->config.retry_limit == 0)
-    s->config.retry_limit = 7;
-
-  s->state = MC_STREAM_STATE_join_in_progress;
-  if (!s->peer_index_by_id.hash)
-    mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
-
-  /* If we don't hear from someone in 5 seconds, we're alone */
-  s->join_timeout = vlib_time_now (vm) + 5.0;
-  mcm->joins_in_progress++;
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-      {
-        .format = "stream index %d join request %s",
-        .format_args = "i4s16",
-      };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 stream_index;
-       char name[16];
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, e);
-      ed->stream_index = s->index;
-      elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
-    }
-
-  send_join_or_leave_request (mcm, s->index, 1 /* join */ );
-
-  vlib_current_process_wait_for_one_time_event_vector
-    (vm, &s->procs_waiting_for_join_done);
-
-  if (MC_EVENT_LOGGING)
-    {
-      ELOG_TYPE (e, "join complete stream %d");
-      ELOG (mcm->elog_main, e, s->index);
-    }
-
-  return s->index;
-}
-
-u32
-mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
-{
-  return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
-}
-
-void
-mc_stream_leave (mc_main_t * mcm, u32 stream_index)
-{
-  mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
-
-  if (!s)
-    return;
-
-  if (MC_EVENT_LOGGING)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (t) =
-        {
-          .format = "leave-stream: %d",.format_args = "i4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 index;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, t);
-      ed->index = stream_index;
-    }
-
-  send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
-  mc_stream_free (s);
-  s->state = MC_STREAM_STATE_name_known;
-}
-
-void
-mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
-                                     mc_msg_join_or_leave_request_t * req,
-                                     u32 buffer_index)
-{
-  mc_stream_t *s;
-  mc_msg_join_reply_t *rep;
-  u32 bi;
-
-  mc_byte_swap_msg_join_or_leave_request (req);
-
-  s = mc_stream_by_index (mcm, req->stream_index);
-  if (!s || s->state != MC_STREAM_STATE_ready)
-    return;
-
-  /* If the peer is joining, create it */
-  if (req->is_join)
-    {
-      mc_stream_t *this_s;
-
-      /* We're not in a position to catch up a peer until all
-         stream joins are complete. */
-      if (0)
-       {
-         /* XXX This is hard to test so we've. */
-         vec_foreach (this_s, mcm->stream_vector)
-         {
-           if (this_s->state != MC_STREAM_STATE_ready
-               && this_s->state != MC_STREAM_STATE_name_known)
-             return;
-         }
-       }
-      else if (mcm->joins_in_progress > 0)
-       return;
-
-      (void) get_or_create_peer_with_id (mcm, s, req->peer_id,
-                                        /* created */ 0);
-
-      rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
-      memset (rep, 0, sizeof (rep[0]));
-      rep->type = MC_MSG_TYPE_join_reply;
-      rep->stream_index = req->stream_index;
-
-      mc_byte_swap_msg_join_reply (rep);
-      /* These two are already in network byte order... */
-      rep->peer_id = mcm->transport.our_ack_peer_id;
-      rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
-
-      mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
-    }
-  else
-    {
-      if (s->config.peer_died)
-       s->config.peer_died (mcm, s, req->peer_id);
-    }
-}
-
-void
-mc_msg_join_reply_handler (mc_main_t * mcm,
-                          mc_msg_join_reply_t * mp, u32 buffer_index)
-{
-  mc_stream_t *s;
-
-  mc_byte_swap_msg_join_reply (mp);
-
-  s = mc_stream_by_index (mcm, mp->stream_index);
-
-  if (!s || s->state != MC_STREAM_STATE_join_in_progress)
-    return;
-
-  /* Switch to catchup state; next join reply
-     for this stream will be ignored. */
-  s->state = MC_STREAM_STATE_catchup;
-
-  mcm->joins_in_progress--;
-  mcm->transport.catchup_request_fun (mcm->transport.opaque,
-                                     mp->stream_index, mp->catchup_peer_id);
-}
-
-void
-mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
-{
-  mc_stream_t *s;
-
-  while (1)
-    {
-      s = mc_stream_by_name (m, stream_name);
-      if (s)
-       break;
-      vlib_process_suspend (m->vlib_main, .1);
-    }
-
-  /* It's OK to send a message in catchup and ready states. */
-  if (s->state == MC_STREAM_STATE_catchup
-      || s->state == MC_STREAM_STATE_ready)
-    return;
-
-  /* Otherwise we are waiting for a join to finish. */
-  vlib_current_process_wait_for_one_time_event_vector
-    (m->vlib_main, &s->procs_waiting_for_join_done);
-}
-
-u32
-mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
-{
-  mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
-  vlib_main_t *vm = mcm->vlib_main;
-  mc_retry_t *r;
-  mc_msg_user_request_t *mp;
-  vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
-  u32 ri;
-
-  if (!s)
-    return 0;
-
-  if (s->state != MC_STREAM_STATE_ready)
-    vlib_current_process_wait_for_one_time_event_vector
-      (vm, &s->procs_waiting_for_join_done);
-
-  while (pool_elts (s->retry_pool) >= s->config.window_size)
-    {
-      vlib_current_process_wait_for_one_time_event_vector
-       (vm, &s->procs_waiting_for_open_window);
-    }
-
-  pool_get (s->retry_pool, r);
-  ri = r - s->retry_pool;
-
-  r->prev_index = s->retry_tail_index;
-  r->next_index = ~0;
-  s->retry_tail_index = ri;
-
-  if (r->prev_index == ~0)
-    s->retry_head_index = ri;
-  else
-    {
-      mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index);
-      p->next_index = ri;
-    }
-
-  vlib_buffer_advance (b, -sizeof (mp[0]));
-  mp = vlib_buffer_get_current (b);
-
-  mp->peer_id = mcm->transport.our_ack_peer_id;
-  /* mp->transport.global_sequence set by relay agent. */
-  mp->global_sequence = 0xdeadbeef;
-  mp->stream_index = s->index;
-  mp->local_sequence = s->our_local_sequence++;
-  mp->n_data_bytes =
-    vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
-
-  r->buffer_index = buffer_index;
-  r->local_sequence = mp->local_sequence;
-  r->sent_at = vlib_time_now (vm);
-  r->n_retries = 0;
-
-  /* Retry will be freed when all currently known peers have acked. */
-  vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
-  vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
-
-  hash_set (s->retry_index_by_local_sequence, r->local_sequence,
-           r - s->retry_pool);
-
-  elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
-
-  mc_byte_swap_msg_user_request (mp);
-
-  mcm->transport.tx_buffer (mcm->transport.opaque,
-                           MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
-
-  s->user_requests_sent++;
-
-  /* return amount of window remaining */
-  return s->config.window_size - pool_elts (s->retry_pool);
-}
-
-void
-mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
-                            u32 buffer_index)
-{
-  vlib_main_t *vm = mcm->vlib_main;
-  mc_stream_t *s;
-  mc_stream_peer_t *peer;
-  i32 seq_cmp_result;
-  static int once = 0;
-
-  mc_byte_swap_msg_user_request (mp);
-
-  s = mc_stream_by_index (mcm, mp->stream_index);
-
-  /* Not signed up for this stream? Turf-o-matic */
-  if (!s || s->state != MC_STREAM_STATE_ready)
-    {
-      vlib_buffer_free_one (vm, buffer_index);
-      return;
-    }
-
-  /* Find peer, including ourselves. */
-  peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
-                                    /* created */ 0);
-
-  seq_cmp_result = mc_seq_cmp (mp->local_sequence,
-                              peer->last_sequence_received + 1);
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
-          .format_args = "T4i4i4i4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 peer, stream_index, rx_sequence;
-       i32 seq_cmp_result;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, e);
-      ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
-      ed->stream_index = mp->stream_index;
-      ed->rx_sequence = mp->local_sequence;
-      ed->seq_cmp_result = seq_cmp_result;
-    }
-
-  if (0 && mp->stream_index == 1 && once == 0)
-    {
-      once = 1;
-      ELOG_TYPE (e, "FAKE lost msg on stream 1");
-      ELOG (mcm->elog_main, e, 0);
-      return;
-    }
-
-  peer->last_sequence_received += seq_cmp_result == 0;
-  s->user_requests_received++;
-
-  if (seq_cmp_result > 0)
-    peer->stats.n_msgs_from_future += 1;
-
-  /* Send ack even if msg from future */
-  if (1)
-    {
-      mc_msg_user_ack_t *rp;
-      u32 bi;
-
-      rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
-      rp->peer_id = mcm->transport.our_ack_peer_id;
-      rp->stream_index = s->index;
-      rp->local_sequence = mp->local_sequence;
-      rp->seq_cmp_result = seq_cmp_result;
-
-      if (MC_EVENT_LOGGING > 0)
-       {
-          /* *INDENT-OFF* */
-         ELOG_TYPE_DECLARE (e) =
-            {
-              .format = "tx-ack: stream %d local seq %d",
-              .format_args = "i4i4",
-            };
-          /* *INDENT-ON* */
-         struct
-         {
-           u32 stream_index;
-           u32 local_sequence;
-         } *ed;
-         ed = ELOG_DATA (mcm->elog_main, e);
-         ed->stream_index = rp->stream_index;
-         ed->local_sequence = rp->local_sequence;
-       }
-
-      mc_byte_swap_msg_user_ack (rp);
-
-      mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
-      /* Msg from past? If so, free the buffer... */
-      if (seq_cmp_result < 0)
-       {
-         vlib_buffer_free_one (vm, buffer_index);
-         peer->stats.n_msgs_from_past += 1;
-       }
-    }
-
-  if (seq_cmp_result == 0)
-    {
-      vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
-      switch (s->state)
-       {
-       case MC_STREAM_STATE_ready:
-         vlib_buffer_advance (b, sizeof (mp[0]));
-         s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
-
-         /* Stream vector can change address via rx callback for mc-internal
-            stream. */
-         s = mc_stream_by_index (mcm, mp->stream_index);
-         ASSERT (s != 0);
-         s->last_global_sequence_processed = mp->global_sequence;
-         break;
-
-       case MC_STREAM_STATE_catchup:
-         clib_fifo_add1 (s->catchup_fifo, buffer_index);
-         break;
-
-       default:
-         clib_warning ("stream in unknown state %U",
-                       format_mc_stream_state, s->state);
-         break;
-       }
-    }
-}
-
-void
-mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
-                        u32 buffer_index)
-{
-  vlib_main_t *vm = mcm->vlib_main;
-  uword *p;
-  mc_stream_t *s;
-  mc_stream_peer_t *peer;
-  mc_retry_t *r;
-  int peer_created = 0;
-
-  mc_byte_swap_msg_user_ack (mp);
-
-  s = mc_stream_by_index (mcm, mp->stream_index);
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (t) =
-        {
-          .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
-          .format_args = "i4T4i4",
-        };
-      /* *INDENT-ON* */
-
-      struct
-      {
-       u32 local_sequence;
-       u32 peer;
-       i32 seq_cmp_result;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, t);
-      ed->local_sequence = mp->local_sequence;
-      ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
-      ed->seq_cmp_result = mp->seq_cmp_result;
-    }
-
-  /* Unknown stream? */
-  if (!s)
-    return;
-
-  /* Find the peer which just ack'ed. */
-  peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
-                                    /* created */ &peer_created);
-
-  /*
-   * Peer reports message from the future. If it's not in the retry
-   * fifo, look for a retired message.
-   */
-  if (mp->seq_cmp_result > 0)
-    {
-      p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
-                   mp->seq_cmp_result);
-      if (p == 0)
-       mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
-
-      /* Normal retry should fix it... */
-      return;
-    }
-
-  /*
-   * Pointer to the indicated retry fifo entry.
-   * Worth hashing because we could use a window size of 100 or 1000.
-   */
-  p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
-
-  /*
-   * Is this a duplicate ACK, received after we've retired the
-   * fifo entry. This can happen when learning about new
-   * peers.
-   */
-  if (p == 0)
-    {
-      if (MC_EVENT_LOGGING > 0)
-       {
-          /* *INDENT-OFF* */
-         ELOG_TYPE_DECLARE (t) =
-            {
-              .format = "ack: for seq %d from peer %s no fifo elt",
-              .format_args = "i4T4",
-            };
-          /* *INDENT-ON* */
-
-         struct
-         {
-           u32 seq;
-           u32 peer;
-         } *ed;
-         ed = ELOG_DATA (mcm->elog_main, t);
-         ed->seq = mp->local_sequence;
-         ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
-       }
-
-      return;
-    }
-
-  r = pool_elt_at_index (s->retry_pool, p[0]);
-
-  /* Make sure that this new peer ACKs our msgs from now on */
-  if (peer_created)
-    {
-      mc_retry_t *later_retry = next_retry (s, r);
-
-      while (later_retry)
-       {
-         later_retry->unacked_by_peer_bitmap =
-           clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
-                            peer - s->peers);
-         later_retry = next_retry (s, later_retry);
-       }
-    }
-
-  ASSERT (mp->local_sequence == r->local_sequence);
-
-  /* If we weren't expecting to hear from this peer */
-  if (!peer_created &&
-      !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
-    {
-      if (MC_EVENT_LOGGING > 0)
-       {
-          /* *INDENT-OFF* */
-         ELOG_TYPE_DECLARE (t) =
-            {
-              .format = "dup-ack: for seq %d from peer %s",
-              .format_args = "i4T4",
-            };
-          /* *INDENT-ON* */
-         struct
-         {
-           u32 seq;
-           u32 peer;
-         } *ed;
-         ed = ELOG_DATA (mcm->elog_main, t);
-         ed->seq = r->local_sequence;
-         ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
-       }
-      if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
-       return;
-    }
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (t) =
-        {
-          .format = "ack: for seq %d from peer %s",
-          .format_args = "i4T4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 seq;
-       u32 peer;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, t);
-      ed->seq = mp->local_sequence;
-      ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
-    }
-
-  r->unacked_by_peer_bitmap =
-    clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
-
-  /* Not all clients have ack'ed */
-  if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
-    {
-      return;
-    }
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (t) =
-        {
-          .format = "ack: retire fifo elt loc seq %d after %d acks",
-          .format_args = "i4i4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 seq;
-       u32 npeers;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, t);
-      ed->seq = r->local_sequence;
-      ed->npeers = pool_elts (s->peers);
-    }
-
-  hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
-  mc_retry_free (mcm, s, r);
-  remove_retry_from_pool (s, r);
-  maybe_send_window_open_event (vm, s);
-}
-
-#define EVENT_MC_SEND_CATCHUP_DATA 0
-
-static uword
-mc_catchup_process (vlib_main_t * vm,
-                   vlib_node_runtime_t * node, vlib_frame_t * f)
-{
-  mc_main_t *mcm = mc_node_get_main (node);
-  uword *event_data = 0;
-  mc_catchup_process_arg_t *args;
-  int i;
-
-  while (1)
-    {
-      if (event_data)
-       _vec_len (event_data) = 0;
-      vlib_process_wait_for_event_with_type (vm, &event_data,
-                                            EVENT_MC_SEND_CATCHUP_DATA);
-
-      for (i = 0; i < vec_len (event_data); i++)
-       {
-         args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
-
-         mcm->transport.catchup_send_fun (mcm->transport.opaque,
-                                          args->catchup_opaque,
-                                          args->catchup_snapshot);
-
-         /* Send function will free snapshot data vector. */
-         pool_put (mcm->catchup_process_args, args);
-       }
-    }
-
-  return 0;                    /* not likely */
-}
-
-static void
-serialize_mc_stream (serialize_main_t * m, va_list * va)
-{
-  mc_stream_t *s = va_arg (*va, mc_stream_t *);
-  mc_stream_peer_t *p;
-
-  serialize_integer (m, pool_elts (s->peers), sizeof (u32));
-  /* *INDENT-OFF* */
-  pool_foreach (p, s->peers, ({
-    u8 * x = serialize_get (m, sizeof (p->id));
-    clib_memcpy (x, p->id.as_u8, sizeof (p->id));
-    serialize_integer (m, p->last_sequence_received,
-                       sizeof (p->last_sequence_received));
-  }));
-/* *INDENT-ON* */
-  serialize_bitmap (m, s->all_peer_bitmap);
-}
-
-void
-unserialize_mc_stream (serialize_main_t * m, va_list * va)
-{
-  mc_stream_t *s = va_arg (*va, mc_stream_t *);
-  u32 i, n_peers;
-  mc_stream_peer_t *p;
-
-  unserialize_integer (m, &n_peers, sizeof (u32));
-  mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
-  for (i = 0; i < n_peers; i++)
-    {
-      u8 *x;
-      pool_get (s->peers, p);
-      x = unserialize_get (m, sizeof (p->id));
-      clib_memcpy (p->id.as_u8, x, sizeof (p->id));
-      unserialize_integer (m, &p->last_sequence_received,
-                          sizeof (p->last_sequence_received));
-      mhash_set (&s->peer_index_by_id, &p->id, p - s->peers,   /* old_value */
-                0);
-    }
-  s->all_peer_bitmap = unserialize_bitmap (m);
-
-  /* This is really bad. */
-  if (!s->all_peer_bitmap)
-    clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
-}
-
-void
-mc_msg_catchup_request_handler (mc_main_t * mcm,
-                               mc_msg_catchup_request_t * req,
-                               u32 catchup_opaque)
-{
-  vlib_main_t *vm = mcm->vlib_main;
-  mc_stream_t *s;
-  mc_catchup_process_arg_t *args;
-
-  mc_byte_swap_msg_catchup_request (req);
-
-  s = mc_stream_by_index (mcm, req->stream_index);
-  if (!s || s->state != MC_STREAM_STATE_ready)
-    return;
-
-  if (MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (t) =
-        {
-          .format = "catchup-request: from %s stream %d",
-          .format_args = "T4i4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 peer, stream;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, t);
-      ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
-      ed->stream = req->stream_index;
-    }
-
-  /*
-   * The application has to snapshoot its data structures right
-   * here, right now. If we process any messages after
-   * noting the last global sequence we've processed, the client
-   * won't be able to accurately reconstruct our data structures.
-   *
-   * Once the data structures are e.g. vec_dup()'ed, we
-   * send the resulting messages from a separate process, to
-   * make sure that we don't cause a bunch of message retransmissions
-   */
-  pool_get (mcm->catchup_process_args, args);
-
-  args->stream_index = s - mcm->stream_vector;
-  args->catchup_opaque = catchup_opaque;
-  args->catchup_snapshot = 0;
-
-  /* Construct catchup reply and snapshot state for stream to send as
-     catchup reply payload. */
-  {
-    mc_msg_catchup_reply_t *rep;
-    serialize_main_t m;
-
-    vec_resize (args->catchup_snapshot, sizeof (rep[0]));
-
-    rep = (void *) args->catchup_snapshot;
-
-    rep->peer_id = req->peer_id;
-    rep->stream_index = req->stream_index;
-    rep->last_global_sequence_included = s->last_global_sequence_processed;
-
-    /* Setup for serialize to append to catchup snapshot. */
-    serialize_open_vector (&m, args->catchup_snapshot);
-    m.stream.current_buffer_index = vec_len (m.stream.buffer);
-
-    serialize (&m, serialize_mc_stream, s);
-
-    args->catchup_snapshot = serialize_close_vector (&m);
-
-    /* Actually copy internal state */
-    args->catchup_snapshot = s->config.catchup_snapshot
-      (mcm, args->catchup_snapshot, rep->last_global_sequence_included);
-
-    rep = (void *) args->catchup_snapshot;
-    rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
-
-    mc_byte_swap_msg_catchup_reply (rep);
-  }
-
-  /* now go send it... */
-  vlib_process_signal_event (vm, mcm->catchup_process,
-                            EVENT_MC_SEND_CATCHUP_DATA,
-                            args - mcm->catchup_process_args);
-}
-
-#define EVENT_MC_UNSERIALIZE_BUFFER 0
-#define EVENT_MC_UNSERIALIZE_CATCHUP 1
-
-void
-mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
-                             u32 catchup_opaque)
-{
-  vlib_process_signal_event (mcm->vlib_main,
-                            mcm->unserialize_process,
-                            EVENT_MC_UNSERIALIZE_CATCHUP,
-                            pointer_to_uword (mp));
-}
-
-static void
-perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
-{
-  mc_stream_t *s;
-  i32 seq_cmp_result;
-
-  mc_byte_swap_msg_catchup_reply (mp);
-
-  s = mc_stream_by_index (mcm, mp->stream_index);
-
-  /* Never heard of this stream or already caught up. */
-  if (!s || s->state == MC_STREAM_STATE_ready)
-    return;
-
-  {
-    serialize_main_t m;
-    mc_stream_peer_t *p;
-    u32 n_stream_bytes;
-
-    /* For offline sim replay: save the entire catchup snapshot... */
-    if (s->config.save_snapshot)
-      s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
-                              mp->n_data_bytes);
-
-    unserialize_open_data (&m, mp->data, mp->n_data_bytes);
-    unserialize (&m, unserialize_mc_stream, s);
-
-    /* Make sure we start numbering our messages as expected */
-    /* *INDENT-OFF* */
-    pool_foreach (p, s->peers, ({
-      if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
-        s->our_local_sequence = p->last_sequence_received + 1;
-    }));
-/* *INDENT-ON* */
-
-    n_stream_bytes = m.stream.current_buffer_index;
-
-    /* No need to unserialize close; nothing to free. */
-
-    /* After serialized stream is user's catchup data. */
-    s->config.catchup (mcm, mp->data + n_stream_bytes,
-                      mp->n_data_bytes - n_stream_bytes);
-  }
-
-  /* Vector could have been moved by catchup.
-     This can only happen for mc-internal stream. */
-  s = mc_stream_by_index (mcm, mp->stream_index);
-
-  s->last_global_sequence_processed = mp->last_global_sequence_included;
-
-  while (clib_fifo_elts (s->catchup_fifo))
-    {
-      mc_msg_user_request_t *gp;
-      u32 bi;
-      vlib_buffer_t *b;
-
-      clib_fifo_sub1 (s->catchup_fifo, bi);
-
-      b = vlib_get_buffer (mcm->vlib_main, bi);
-      gp = vlib_buffer_get_current (b);
-
-      /* Make sure we're replaying "new" news */
-      seq_cmp_result = mc_seq_cmp (gp->global_sequence,
-                                  mp->last_global_sequence_included);
-
-      if (seq_cmp_result > 0)
-       {
-         vlib_buffer_advance (b, sizeof (gp[0]));
-         s->config.rx_buffer (mcm, s, gp->peer_id, bi);
-         s->last_global_sequence_processed = gp->global_sequence;
-
-         if (MC_EVENT_LOGGING)
-           {
-              /* *INDENT-OFF* */
-             ELOG_TYPE_DECLARE (t) =
-                {
-                  .format = "catchup replay local sequence 0x%x",
-                  .format_args = "i4",
-                };
-              /* *INDENT-ON* */
-             struct
-             {
-               u32 local_sequence;
-             } *ed;
-             ed = ELOG_DATA (mcm->elog_main, t);
-             ed->local_sequence = gp->local_sequence;
-           }
-       }
-      else
-       {
-         if (MC_EVENT_LOGGING)
-           {
-              /* *INDENT-OFF* */
-             ELOG_TYPE_DECLARE (t) =
-                {
-                  .format = "catchup discard local sequence 0x%x",
-                  .format_args = "i4",
-                };
-              /* *INDENT-ON* */
-             struct
-             {
-               u32 local_sequence;
-             } *ed;
-             ed = ELOG_DATA (mcm->elog_main, t);
-             ed->local_sequence = gp->local_sequence;
-           }
-
-         vlib_buffer_free_one (mcm->vlib_main, bi);
-       }
-    }
-
-  s->state = MC_STREAM_STATE_ready;
-
-  /* Now that we are caught up wake up joining process. */
-  {
-    vlib_one_time_waiting_process_t *wp;
-    vec_foreach (wp, s->procs_waiting_for_join_done)
-      vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
-    if (s->procs_waiting_for_join_done)
-      _vec_len (s->procs_waiting_for_join_done) = 0;
-  }
-}
-
-static void
-this_node_maybe_master (mc_main_t * mcm)
-{
-  vlib_main_t *vm = mcm->vlib_main;
-  mc_msg_master_assert_t *mp;
-  uword event_type;
-  int timeouts = 0;
-  int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
-  clib_error_t *error;
-  f64 now, time_last_master_assert = -1;
-  u32 bi;
-
-  while (1)
-    {
-      if (!mcm->we_can_be_relay_master)
-       {
-         mcm->relay_state = MC_RELAY_STATE_SLAVE;
-         if (MC_EVENT_LOGGING)
-           {
-             ELOG_TYPE (e, "become slave (config)");
-             ELOG (mcm->elog_main, e, 0);
-           }
-         return;
-       }
-
-      now = vlib_time_now (vm);
-      if (now >= time_last_master_assert + 1)
-       {
-         time_last_master_assert = now;
-         mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
-
-         mp->peer_id = mcm->transport.our_ack_peer_id;
-         mp->global_sequence = mcm->relay_global_sequence;
-
-         /*
-          * these messages clog the event log, set MC_EVENT_LOGGING higher
-          * if you want them
-          */
-         if (MC_EVENT_LOGGING > 1)
-           {
-              /* *INDENT-OFF* */
-             ELOG_TYPE_DECLARE (e) =
-                {
-                  .format = "tx-massert: peer %s global seq %u",
-                  .format_args = "T4i4",
-                };
-              /* *INDENT-ON* */
-             struct
-             {
-               u32 peer, global_sequence;
-             } *ed;
-             ed = ELOG_DATA (mcm->elog_main, e);
-             ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
-             ed->global_sequence = mp->global_sequence;
-           }
-
-         mc_byte_swap_msg_master_assert (mp);
-
-         error =
-           mcm->transport.tx_buffer (mcm->transport.opaque,
-                                     MC_TRANSPORT_MASTERSHIP, bi);
-         if (error)
-           clib_error_report (error);
-       }
-
-      vlib_process_wait_for_event_or_clock (vm, 1.0);
-      event_type = vlib_process_get_events (vm, /* no event data */ 0);
-
-      switch (event_type)
-       {
-       case ~0:
-         if (!is_master && timeouts++ > 2)
-           {
-             mcm->relay_state = MC_RELAY_STATE_MASTER;
-             mcm->relay_master_peer_id =
-               mcm->transport.our_ack_peer_id.as_u64;
-             if (MC_EVENT_LOGGING)
-               {
-                 ELOG_TYPE (e, "become master (was maybe_master)");
-                 ELOG (mcm->elog_main, e, 0);
-               }
-             return;
-           }
-         break;
-
-       case MC_RELAY_STATE_SLAVE:
-         mcm->relay_state = MC_RELAY_STATE_SLAVE;
-         if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
-           {
-             ELOG_TYPE (e, "become slave (was maybe_master)");
-             ELOG (mcm->elog_main, e, 0);
-           }
-         return;
-       }
-    }
-}
-
-static void
-this_node_slave (mc_main_t * mcm)
-{
-  vlib_main_t *vm = mcm->vlib_main;
-  uword event_type;
-  int timeouts = 0;
-
-  if (MC_EVENT_LOGGING)
-    {
-      ELOG_TYPE (e, "become slave");
-      ELOG (mcm->elog_main, e, 0);
-    }
-
-  while (1)
-    {
-      vlib_process_wait_for_event_or_clock (vm, 1.0);
-      event_type = vlib_process_get_events (vm, /* no event data */ 0);
-
-      switch (event_type)
-       {
-       case ~0:
-         if (timeouts++ > 2)
-           {
-             mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
-             mcm->relay_master_peer_id = ~0ULL;
-             if (MC_EVENT_LOGGING)
-               {
-                 ELOG_TYPE (e, "timeouts; negoitate mastership");
-                 ELOG (mcm->elog_main, e, 0);
-               }
-             return;
-           }
-         break;
-
-       case MC_RELAY_STATE_SLAVE:
-         mcm->relay_state = MC_RELAY_STATE_SLAVE;
-         timeouts = 0;
-         break;
-       }
-    }
-}
-
-static uword
-mc_mastership_process (vlib_main_t * vm,
-                      vlib_node_runtime_t * node, vlib_frame_t * f)
-{
-  mc_main_t *mcm = mc_node_get_main (node);
-
-  while (1)
-    {
-      switch (mcm->relay_state)
-       {
-       case MC_RELAY_STATE_NEGOTIATE:
-       case MC_RELAY_STATE_MASTER:
-         this_node_maybe_master (mcm);
-         break;
-
-       case MC_RELAY_STATE_SLAVE:
-         this_node_slave (mcm);
-         break;
-       }
-    }
-  return 0;                    /* not likely */
-}
-
-void
-mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
-{
-  if (we_can_be_master != mcm->we_can_be_relay_master)
-    {
-      mcm->we_can_be_relay_master = we_can_be_master;
-      vlib_process_signal_event (mcm->vlib_main,
-                                mcm->mastership_process,
-                                MC_RELAY_STATE_NEGOTIATE, 0);
-    }
-}
-
-void
-mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
-                             u32 buffer_index)
-{
-  mc_peer_id_t his_peer_id, our_peer_id;
-  i32 seq_cmp_result;
-  u8 signal_slave = 0;
-  u8 update_global_sequence = 0;
-
-  mc_byte_swap_msg_master_assert (mp);
-
-  his_peer_id = mp->peer_id;
-  our_peer_id = mcm->transport.our_ack_peer_id;
-
-  /* compare the incoming global sequence with ours */
-  seq_cmp_result = mc_seq_cmp (mp->global_sequence,
-                              mcm->relay_global_sequence);
-
-  /* If the sender has a lower peer id and the sender's sequence >=
-     our global sequence, we become a slave.  Otherwise we are master. */
-  if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
-      && seq_cmp_result >= 0)
-    {
-      vlib_process_signal_event (mcm->vlib_main,
-                                mcm->mastership_process,
-                                MC_RELAY_STATE_SLAVE, 0);
-      signal_slave = 1;
-    }
-
-  /* Update our global sequence. */
-  if (seq_cmp_result > 0)
-    {
-      mcm->relay_global_sequence = mp->global_sequence;
-      update_global_sequence = 1;
-    }
-
-  {
-    uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
-    mc_mastership_peer_t *p;
-
-    if (q)
-      p = vec_elt_at_index (mcm->mastership_peers, q[0]);
-    else
-      {
-       vec_add2 (mcm->mastership_peers, p, 1);
-       p->peer_id = his_peer_id;
-       mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id,
-                  p - mcm->mastership_peers,
-                  /* old_value */ 0);
-      }
-    p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
-  }
-
-  /*
-   * these messages clog the event log, set MC_EVENT_LOGGING higher
-   * if you want them.
-   */
-  if (MC_EVENT_LOGGING > 1)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "rx-massert: peer %s global seq %u upd %d slave %d",
-          .format_args = "T4i4i1i1",
-        };
-      /* *INDENT-ON* */
-
-      struct
-      {
-       u32 peer;
-       u32 global_sequence;
-       u8 update_sequence;
-       u8 slave;
-      } *ed;
-      ed = ELOG_DATA (mcm->elog_main, e);
-      ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
-      ed->global_sequence = mp->global_sequence;
-      ed->update_sequence = update_global_sequence;
-      ed->slave = signal_slave;
-    }
-}
-
-static void
-mc_serialize_init (mc_main_t * mcm)
-{
-  mc_serialize_msg_t *m;
-  vlib_main_t *vm = vlib_get_main ();
-
-  mcm->global_msg_index_by_name
-    = hash_create_string ( /* elts */ 0, sizeof (uword));
-
-  m = vm->mc_msg_registrations;
-
-  while (m)
-    {
-      m->global_index = vec_len (mcm->global_msgs);
-      hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
-      vec_add1 (mcm->global_msgs, m);
-      m = m->next_registration;
-    }
-}
-
-clib_error_t *
-mc_serialize_va (mc_main_t * mc,
-                u32 stream_index,
-                u32 multiple_messages_per_vlib_buffer,
-                mc_serialize_msg_t * msg, va_list * va)
-{
-  mc_stream_t *s;
-  clib_error_t *error;
-  serialize_main_t *m = &mc->serialize_mains[VLIB_TX];
-  vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX];
-  u32 bi, n_before, n_after, n_total, n_this_msg;
-  u32 si, gi;
-
-  if (!sbm->vlib_main)
-    {
-      sbm->tx.max_n_data_bytes_per_chain = 4096;
-      sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
-    }
-
-  if (sbm->first_buffer == 0)
-    serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
-
-  n_before = serialize_vlib_buffer_n_bytes (m);
-
-  s = mc_stream_by_index (mc, stream_index);
-  gi = msg->global_index;
-  ASSERT (msg == vec_elt (mc->global_msgs, gi));
-
-  si = ~0;
-  if (gi < vec_len (s->stream_msg_index_by_global_index))
-    si = s->stream_msg_index_by_global_index[gi];
-
-  serialize_likely_small_unsigned_integer (m, si);
-
-  /* For first time message is sent, use name to identify message. */
-  if (si == ~0 || MSG_ID_DEBUG)
-    serialize_cstring (m, msg->name);
-
-  if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
-    {
-      /* *INDENT-OFF* */
-      ELOG_TYPE_DECLARE (e) =
-        {
-          .format = "serialize-msg: %s index %d",
-          .format_args = "T4i4",
-        };
-      /* *INDENT-ON* */
-      struct
-      {
-       u32 c[2];
-      } *ed;
-      ed = ELOG_DATA (mc->elog_main, e);
-      ed->c[0] = elog_id_for_msg_name (mc, msg->name);
-      ed->c[1] = si;
-    }
-
-  error = va_serialize (m, va);
-
-  n_after = serialize_vlib_buffer_n_bytes (m);
-  n_this_msg = n_after - n_before;
-  n_total = n_after + sizeof (mc_msg_user_request_t);
-
-  /* For max message size ignore first message where string name is sent. */
-  if (si != ~0)
-    msg->max_n_bytes_serialized =
-      clib_max (msg->max_n_bytes_serialized, n_this_msg);
-
-  if (!multiple_messages_per_vlib_buffer
-      || si == ~0
-      || n_total + msg->max_n_bytes_serialized >
-      mc->transport.max_packet_size)
-    {
-      bi = serialize_close_vlib_buffer (m);
-      sbm->first_buffer = 0;
-      if (!error)
-       mc_stream_send (mc, stream_index, bi);
-      else if (bi != ~0)
-       vlib_buffer_free_one (mc->vlib_main, bi);
-    }
-
-  return error;
-}
-
-clib_error_t *
-mc_serialize_internal (mc_main_t * mc,
-                      u32 stream_index,
-                      u32 multiple_messages_per_vlib_buffer,
-                      mc_serialize_msg_t * msg, ...)
-{
-  vlib_main_t *vm = mc->vlib_main;
-  va_list va;
-  clib_error_t *error;
-
-  if (stream_index == ~0)
-    {
-      if (vm->mc_main && vm->mc_stream_index == ~0)
-       vlib_current_process_wait_for_one_time_event_vector
-         (vm, &vm->procs_waiting_for_mc_stream_join);
-      stream_index = vm->mc_stream_index;
-    }
-
-  va_start (va, msg);
-  error = mc_serialize_va (mc, stream_index,
-                          multiple_messages_per_vlib_buffer, msg, &va);
-  va_end (va);
-  return error;
-}
-
-uword
-mc_unserialize_message (mc_main_t * mcm,
-                       mc_stream_t * s, serialize_main_t * m)
-{
-  mc_serialize_stream_msg_t *sm;
-  u32 gi, si;
-
-  si = unserialize_likely_small_unsigned_integer (m);
-
-  if (!(si == ~0 || MSG_ID_DEBUG))
-    {
-      sm = vec_elt_at_index (s->stream_msgs, si);
-      gi = sm->global_index;
-    }
-  else
-    {
-      char *name;
-
-      unserialize_cstring (m, &name);
-
-      if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
-       {
-          /* *INDENT-OFF* */
-         ELOG_TYPE_DECLARE (e) =
-            {
-              .format = "unserialize-msg: %s rx index %d",
-              .format_args = "T4i4",
-            };
-          /* *INDENT-ON* */
-         struct
-         {
-           u32 c[2];
-         } *ed;
-         ed = ELOG_DATA (mcm->elog_main, e);
-         ed->c[0] = elog_id_for_msg_name (mcm, name);
-         ed->c[1] = si;
-       }
-
-      {
-       uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
-       gi = p ? p[0] : ~0;
-      }
-
-      /* Unknown message? */
-      if (gi == ~0)
-       {
-         vec_free (name);
-         goto done;
-       }
-
-      vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
-      si = s->stream_msg_index_by_global_index[gi];
-
-      /* Stream local index unknown?  Create it. */
-      if (si == ~0)
-       {
-         vec_add2 (s->stream_msgs, sm, 1);
-
-         si = sm - s->stream_msgs;
-         sm->global_index = gi;
-         s->stream_msg_index_by_global_index[gi] = si;
-
-         if (MC_EVENT_LOGGING > 0)
-           {
-              /* *INDENT-OFF* */
-             ELOG_TYPE_DECLARE (e) =
-                {
-                  .format = "msg-bind: stream %d %s to index %d",
-                  .format_args = "i4T4i4",
-                };
-              /* *INDENT-ON* */
-             struct
-             {
-               u32 c[3];
-             } *ed;
-             ed = ELOG_DATA (mcm->elog_main, e);
-             ed->c[0] = s->index;
-             ed->c[1] = elog_id_for_msg_name (mcm, name);
-             ed->c[2] = si;
-           }
-       }
-      else
-       {
-         sm = vec_elt_at_index (s->stream_msgs, si);
-         if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
-           {
-              /* *INDENT-OFF* */
-             ELOG_TYPE_DECLARE (e) =
-                {
-                  .format = "msg-id-ERROR: %s index %d expected %d",
-                  .format_args = "T4i4i4",
-                };
-              /* *INDENT-ON* */
-             struct
-             {
-               u32 c[3];
-             } *ed;
-             ed = ELOG_DATA (mcm->elog_main, e);
-             ed->c[0] = elog_id_for_msg_name (mcm, name);
-             ed->c[1] = si;
-             ed->c[2] = ~0;
-             if (sm->global_index <
-                 vec_len (s->stream_msg_index_by_global_index))
-               ed->c[2] =
-                 s->stream_msg_index_by_global_index[sm->global_index];
-           }
-       }
-
-      vec_free (name);
-    }
-
-  if (gi != ~0)
-    {
-      mc_serialize_msg_t *msg;
-      msg = vec_elt (mcm->global_msgs, gi);
-      unserialize (m, msg->unserialize, mcm);
-    }
-
-done:
-  return gi != ~0;
-}
-
-void
-mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
-{
-  vlib_main_t *vm = mcm->vlib_main;
-  serialize_main_t *m = &mcm->serialize_mains[VLIB_RX];
-  vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX];
-  mc_stream_and_buffer_t *sb;
-  mc_stream_t *stream;
-  u32 buffer_index;
-
-  sb =
-    pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers,
-                      stream_and_buffer_index);
-  buffer_index = sb->buffer_index;
-  stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
-  pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
-
-  if (stream->config.save_snapshot)
-    {
-      u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
-      static u8 *contents;
-      vec_reset_length (contents);
-      vec_validate (contents, n_bytes - 1);
-      vlib_buffer_contents (vm, buffer_index, contents);
-      stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
-                                   n_bytes);
-    }
-
-  ASSERT (vlib_in_process_context (vm));
-
-  unserialize_open_vlib_buffer (m, vm, sbm);
-
-  clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
-
-  while (unserialize_vlib_buffer_n_bytes (m) > 0)
-    mc_unserialize_message (mcm, stream, m);
-
-  /* Frees buffer. */
-  unserialize_close_vlib_buffer (m);
-}
-
-void
-mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
-{
-  vlib_main_t *vm = mcm->vlib_main;
-  mc_stream_and_buffer_t *sb;
-  pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
-  sb->stream_index = s->index;
-  sb->buffer_index = buffer_index;
-  vlib_process_signal_event (vm, mcm->unserialize_process,
-                            EVENT_MC_UNSERIALIZE_BUFFER,
-                            sb - mcm->mc_unserialize_stream_and_buffers);
-}
-
-static uword
-mc_unserialize_process (vlib_main_t * vm,
-                       vlib_node_runtime_t * node, vlib_frame_t * f)
-{
-  mc_main_t *mcm = mc_node_get_main (node);
-  uword event_type, *event_data = 0;
-  int i;
-
-  while (1)
-    {
-      if (event_data)
-       _vec_len (event_data) = 0;
-
-      vlib_process_wait_for_event (vm);
-      event_type = vlib_process_get_events (vm, &event_data);
-      switch (event_type)
-       {
-       case EVENT_MC_UNSERIALIZE_BUFFER:
-         for (i = 0; i < vec_len (event_data); i++)
-           mc_unserialize_internal (mcm, event_data[i]);
-         break;
-
-       case EVENT_MC_UNSERIALIZE_CATCHUP:
-         for (i = 0; i < vec_len (event_data); i++)
-           {
-             u8 *mp = uword_to_pointer (event_data[i], u8 *);
-             perform_catchup (mcm, (void *) mp);
-             vec_free (mp);
-           }
-         break;
-
-       default:
-         break;
-       }
-    }
-
-  return 0;                    /* not likely */
-}
-
-void
-serialize_mc_main (serialize_main_t * m, va_list * va)
-{
-  mc_main_t *mcm = va_arg (*va, mc_main_t *);
-  mc_stream_t *s;
-  mc_serialize_stream_msg_t *sm;
-  mc_serialize_msg_t *msg;
-
-  serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
-  vec_foreach (s, mcm->stream_vector)
-  {
-    /* Stream name. */
-    serialize_cstring (m, s->config.name);
-
-    /* Serialize global names for all sent messages. */
-    serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
-    vec_foreach (sm, s->stream_msgs)
-    {
-      msg = vec_elt (mcm->global_msgs, sm->global_index);
-      serialize_cstring (m, msg->name);
-    }
-  }
-}
-
-void
-unserialize_mc_main (serialize_main_t * m, va_list * va)
-{
-  mc_main_t *mcm = va_arg (*va, mc_main_t *);
-  u32 i, n_streams, n_stream_msgs;
-  char *name;
-  mc_stream_t *s;
-  mc_serialize_stream_msg_t *sm;
-
-  unserialize_integer (m, &n_streams, sizeof (u32));
-  for (i = 0; i < n_streams; i++)
-    {
-      unserialize_cstring (m, &name);
-      if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
-       {
-         vec_validate (mcm->stream_vector, i);
-         s = vec_elt_at_index (mcm->stream_vector, i);
-         mc_stream_init (s);
-         s->index = s - mcm->stream_vector;
-         s->config.name = name;
-         s->state = MC_STREAM_STATE_name_known;
-         hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
-       }
-      else
-       vec_free (name);
-
-      s = vec_elt_at_index (mcm->stream_vector, i);
-
-      vec_free (s->stream_msgs);
-      vec_free (s->stream_msg_index_by_global_index);
-
-      unserialize_integer (m, &n_stream_msgs, sizeof (u32));
-      vec_resize (s->stream_msgs, n_stream_msgs);
-      vec_foreach (sm, s->stream_msgs)
-      {
-       uword *p;
-       u32 si, gi;
-
-       unserialize_cstring (m, &name);
-       p = hash_get (mcm->global_msg_index_by_name, name);
-       gi = p ? p[0] : ~0;
-       si = sm - s->stream_msgs;
-
-       if (MC_EVENT_LOGGING > 0)
-         {
-            /* *INDENT-OFF* */
-           ELOG_TYPE_DECLARE (e) =
-              {
-                .format = "catchup-bind: %s to %d global index %d stream %d",
-                .format_args = "T4i4i4i4",
-              };
-            /* *INDENT-ON* */
-
-           struct
-           {
-             u32 c[4];
-           } *ed;
-           ed = ELOG_DATA (mcm->elog_main, e);
-           ed->c[0] = elog_id_for_msg_name (mcm, name);
-           ed->c[1] = si;
-           ed->c[2] = gi;
-           ed->c[3] = s->index;
-         }
-
-       vec_free (name);
-
-       sm->global_index = gi;
-       if (gi != ~0)
-         {
-           vec_validate_init_empty (s->stream_msg_index_by_global_index,
-                                    gi, ~0);
-           s->stream_msg_index_by_global_index[gi] = si;
-         }
-      }
-    }
-}
-
-void
-mc_main_init (mc_main_t * mcm, char *tag)
-{
-  vlib_main_t *vm = vlib_get_main ();
-
-  mcm->vlib_main = vm;
-  mcm->elog_main = &vm->elog_main;
-
-  mcm->relay_master_peer_id = ~0ULL;
-  mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
-
-  mcm->stream_index_by_name
-    = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
-
-  {
-    vlib_node_registration_t r;
-
-    memset (&r, 0, sizeof (r));
-
-    r.type = VLIB_NODE_TYPE_PROCESS;
-
-    /* Point runtime data to main instance. */
-    r.runtime_data = &mcm;
-    r.runtime_data_bytes = sizeof (&mcm);
-
-    r.name = (char *) format (0, "mc-mastership-%s", tag);
-    r.function = mc_mastership_process;
-    mcm->mastership_process = vlib_register_node (vm, &r);
-
-    r.name = (char *) format (0, "mc-join-ager-%s", tag);
-    r.function = mc_join_ager_process;
-    mcm->join_ager_process = vlib_register_node (vm, &r);
-
-    r.name = (char *) format (0, "mc-retry-%s", tag);
-    r.function = mc_retry_process;
-    mcm->retry_process = vlib_register_node (vm, &r);
-
-    r.name = (char *) format (0, "mc-catchup-%s", tag);
-    r.function = mc_catchup_process;
-    mcm->catchup_process = vlib_register_node (vm, &r);
-
-    r.name = (char *) format (0, "mc-unserialize-%s", tag);
-    r.function = mc_unserialize_process;
-    mcm->unserialize_process = vlib_register_node (vm, &r);
-  }
-
-  if (MC_EVENT_LOGGING > 0)
-    mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
-               sizeof (mc_peer_id_t));
-
-  mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
-             sizeof (mc_peer_id_t));
-  mc_serialize_init (mcm);
-}
-
-static u8 *
-format_mc_relay_state (u8 * s, va_list * args)
-{
-  mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
-  char *t = 0;
-  switch (state)
-    {
-    case MC_RELAY_STATE_NEGOTIATE:
-      t = "negotiate";
-      break;
-    case MC_RELAY_STATE_MASTER:
-      t = "master";
-      break;
-    case MC_RELAY_STATE_SLAVE:
-      t = "slave";
-      break;
-    default:
-      return format (s, "unknown 0x%x", state);
-    }
-
-  return format (s, "%s", t);
-}
-
-static u8 *
-format_mc_stream_state (u8 * s, va_list * args)
-{
-  mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
-  char *t = 0;
-  switch (state)
-    {
-#define _(f) case MC_STREAM_STATE_##f: t = #f; break;
-      foreach_mc_stream_state
-#undef _
-    default:
-      return format (s, "unknown 0x%x", state);
-    }
-
-  return format (s, "%s", t);
-}
-
-static int
-mc_peer_comp (void *a1, void *a2)
-{
-  mc_stream_peer_t *p1 = a1;
-  mc_stream_peer_t *p2 = a2;
-
-  return mc_peer_id_compare (p1->id, p2->id);
-}
-
-u8 *
-format_mc_main (u8 * s, va_list * args)
-{
-  mc_main_t *mcm = va_arg (*args, mc_main_t *);
-  mc_stream_t *t;
-  mc_stream_peer_t *p, *ps;
-  u32 indent = format_get_indent (s);
-
-  s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
-             format_mc_relay_state, mcm->relay_state,
-             vec_len (mcm->stream_vector), mcm->relay_global_sequence);
-
-  {
-    mc_mastership_peer_t *mp;
-    f64 now = vlib_time_now (mcm->vlib_main);
-    s = format (s, "\n%UMost recent mastership peers:",
-               format_white_space, indent + 2);
-    vec_foreach (mp, mcm->mastership_peers)
-    {
-      s = format (s, "\n%U%-30U%.4e",
-                 format_white_space, indent + 4,
-                 mcm->transport.format_peer_id, mp->peer_id,
-                 now - mp->time_last_master_assert_received);
-    }
-  }
-
-  vec_foreach (t, mcm->stream_vector)
-  {
-    s = format (s, "\n%Ustream `%s' index %d",
-               format_white_space, indent + 2, t->config.name, t->index);
-
-    s = format (s, "\n%Ustate %U",
-               format_white_space, indent + 4,
-               format_mc_stream_state, t->state);
-
-    s =
-      format (s,
-             "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
-             format_white_space, indent + 4, t->config.retry_interval,
-             t->config.retry_limit, pool_elts (t->retry_pool),
-             t->stats.n_retries - t->stats_last_clear.n_retries);
-
-    s = format (s, "\n%U%Ld/%Ld user requests sent/received",
-               format_white_space, indent + 4,
-               t->user_requests_sent, t->user_requests_received);
-
-    s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
-               format_white_space, indent + 4,
-               pool_elts (t->peers),
-               t->our_local_sequence, t->last_global_sequence_processed);
-
-    ps = 0;
-    /* *INDENT-OFF* */
-    pool_foreach (p, t->peers,
-    ({
-      if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
-        vec_add1 (ps, p[0]);
-    }));
-    /* *INDENT-ON* */
-    vec_sort_with_function (ps, mc_peer_comp);
-    s = format (s, "\n%U%=30s%10s%16s%16s",
-               format_white_space, indent + 6,
-               "Peer", "Last seq", "Retries", "Future");
-
-    vec_foreach (p, ps)
-    {
-      s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
-                 format_white_space, indent + 6,
-                 mcm->transport.format_peer_id, p->id.as_u64,
-                 p->last_sequence_received,
-                 p->stats.n_msgs_from_past -
-                 p->stats_last_clear.n_msgs_from_past,
-                 p->stats.n_msgs_from_future -
-                 p->stats_last_clear.n_msgs_from_future,
-                 (mcm->transport.our_ack_peer_id.as_u64 ==
-                  p->id.as_u64 ? " (self)" : ""));
-    }
-    vec_free (ps);
-  }
-
-  return s;
-}
-
-/*
- * fd.io coding-style-patch-verification: ON
- *
- * Local Variables:
- * eval: (c-set-style "gnu")
- * End:
- */
diff --git a/src/vlib/mc.h b/src/vlib/mc.h
deleted file mode 100644 (file)
index 28f9435..0000000
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * mc.h: vlib reliable sequenced multicast distributed applications
- *
- * Copyright (c) 2010 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef included_vlib_mc_h
-#define included_vlib_mc_h
-
-#include <vppinfra/elog.h>
-#include <vppinfra/fifo.h>
-#include <vppinfra/mhash.h>
-#include <vlib/node.h>
-
-#ifndef MC_EVENT_LOGGING
-#define MC_EVENT_LOGGING 1
-#endif
-
-always_inline uword
-mc_need_byte_swap (void)
-{
-  return CLIB_ARCH_IS_LITTLE_ENDIAN;
-}
-
-/*
- * Used to uniquely identify hosts.
- * For IP4 this would be ip4_address plus tcp/udp port.
- */
-typedef union
-{
-  u8 as_u8[8];
-  u64 as_u64;
-} mc_peer_id_t;
-
-always_inline mc_peer_id_t
-mc_byte_swap_peer_id (mc_peer_id_t i)
-{
-  /* Peer id is already in network byte order. */
-  return i;
-}
-
-always_inline int
-mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b)
-{
-  return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8));
-}
-
-/* Assert mastership.  Lowest peer_id amount all peers wins mastership.
-   Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP).
-   So, we don't need a message opcode. */
-typedef CLIB_PACKED (struct
-                    {
-                    /* Peer id asserting mastership. */
-                    mc_peer_id_t peer_id;
-                    /* Global sequence number asserted. */
-                    u32 global_sequence;}) mc_msg_master_assert_t;
-
-always_inline void
-mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r)
-{
-  if (mc_need_byte_swap ())
-    {
-      r->peer_id = mc_byte_swap_peer_id (r->peer_id);
-      r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
-    }
-}
-
-#define foreach_mc_msg_type                    \
-  _ (master_assert)                            \
-  _ (join_or_leave_request)                    \
-  _ (join_reply)                               \
-  _ (user_request)                             \
-  _ (user_ack)                                 \
-  _ (catchup_request)                          \
-  _ (catchup_reply)
-
-typedef enum
-{
-#define _(f) MC_MSG_TYPE_##f,
-  foreach_mc_msg_type
-#undef _
-} mc_relay_msg_type_t;
-
-/* Request to join a given stream.  Multicast over MC_TRANSPORT_JOIN. */
-typedef CLIB_PACKED (struct
-                    {
-mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
-                    /* MC_MSG_TYPE_join_or_leave_request */
-                    /* Stream to join or leave. */
-                    u32 stream_index;
-                    /* join = 1, leave = 0 */
-                    u8 is_join;}) mc_msg_join_or_leave_request_t;
-
-always_inline void
-mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r)
-{
-  if (mc_need_byte_swap ())
-    {
-      r->peer_id = mc_byte_swap_peer_id (r->peer_id);
-      r->type = clib_byte_swap_u32 (r->type);
-      r->stream_index = clib_byte_swap_u32 (r->stream_index);
-    }
-}
-
-/* Join reply.  Multicast over MC_TRANSPORT_JOIN. */
-typedef CLIB_PACKED (struct
-                    {
-mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
-                    /* MC_MSG_TYPE_join_reply */
-                    u32 stream_index;
-                    /* Peer ID to contact to catchup with this stream. */
-                    mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t;
-
-always_inline void
-mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r)
-{
-  if (mc_need_byte_swap ())
-    {
-      r->peer_id = mc_byte_swap_peer_id (r->peer_id);
-      r->type = clib_byte_swap_u32 (r->type);
-      r->stream_index = clib_byte_swap_u32 (r->stream_index);
-      r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id);
-    }
-}
-
-/* Generic (application) request.  Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then
-   relayed by relay master after filling in global sequence number. */
-typedef CLIB_PACKED (struct
-                    {
-                    mc_peer_id_t peer_id; u32 stream_index;
-                    /* Global sequence number as filled in by relay master. */
-                    u32 global_sequence;
-                    /* Local sequence number as filled in by peer sending message. */
-                    u32 local_sequence;
-                    /* Size of request data. */
-                    u32 n_data_bytes;
-                    /* Opaque request data. */
-                    u8 data[0];}) mc_msg_user_request_t;
-
-always_inline void
-mc_byte_swap_msg_user_request (mc_msg_user_request_t * r)
-{
-  if (mc_need_byte_swap ())
-    {
-      r->peer_id = mc_byte_swap_peer_id (r->peer_id);
-      r->stream_index = clib_byte_swap_u32 (r->stream_index);
-      r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
-      r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
-      r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
-    }
-}
-
-/* Sent unicast over ACK channel. */
-typedef CLIB_PACKED (struct
-                    {
-                    mc_peer_id_t peer_id;
-                    u32 global_sequence; u32 stream_index;
-                    u32 local_sequence;
-                    i32 seq_cmp_result;}) mc_msg_user_ack_t;
-
-always_inline void
-mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r)
-{
-  if (mc_need_byte_swap ())
-    {
-      r->peer_id = mc_byte_swap_peer_id (r->peer_id);
-      r->stream_index = clib_byte_swap_u32 (r->stream_index);
-      r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
-      r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
-      r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result);
-    }
-}
-
-/* Sent/received unicast over catchup channel (e.g. using TCP). */
-typedef CLIB_PACKED (struct
-                    {
-                    mc_peer_id_t peer_id;
-                    u32 stream_index;}) mc_msg_catchup_request_t;
-
-always_inline void
-mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r)
-{
-  if (mc_need_byte_swap ())
-    {
-      r->peer_id = mc_byte_swap_peer_id (r->peer_id);
-      r->stream_index = clib_byte_swap_u32 (r->stream_index);
-    }
-}
-
-/* Sent/received unicast over catchup channel. */
-typedef CLIB_PACKED (struct
-                    {
-                    mc_peer_id_t peer_id; u32 stream_index;
-                    /* Last global sequence number included in catchup data. */
-                    u32 last_global_sequence_included;
-                    /* Size of catchup data. */
-                    u32 n_data_bytes;
-                    /* Catchup data. */
-                    u8 data[0];}) mc_msg_catchup_reply_t;
-
-always_inline void
-mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r)
-{
-  if (mc_need_byte_swap ())
-    {
-      r->peer_id = mc_byte_swap_peer_id (r->peer_id);
-      r->stream_index = clib_byte_swap_u32 (r->stream_index);
-      r->last_global_sequence_included =
-       clib_byte_swap_u32 (r->last_global_sequence_included);
-      r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
-    }
-}
-
-typedef struct _mc_serialize_msg
-{
-  /* Name for this type. */
-  char *name;
-
-  /* Functions to serialize/unserialize data. */
-  serialize_function_t *serialize;
-  serialize_function_t *unserialize;
-
-  /* Maximum message size in bytes when serialized.
-     If zero then this will be set to the largest sent message. */
-  u32 max_n_bytes_serialized;
-
-  /* Opaque to use for first argument to serialize/unserialize function. */
-  u32 opaque;
-
-  /* Index in global message vector. */
-  u32 global_index;
-
-  /* Registration list */
-  struct _mc_serialize_msg *next_registration;
-} mc_serialize_msg_t;
-
-typedef struct
-{
-  /* Index into global message vector. */
-  u32 global_index;
-} mc_serialize_stream_msg_t;
-
-#define MC_SERIALIZE_MSG(x,...)                                 \
-    __VA_ARGS__ mc_serialize_msg_t x;                           \
-static void __mc_serialize_msg_registration_##x (void)          \
-    __attribute__((__constructor__)) ;                          \
-static void __mc_serialize_msg_registration_##x (void)          \
-{                                                               \
-    vlib_main_t * vm = vlib_get_main();                         \
-    x.next_registration = vm->mc_msg_registrations;             \
-    vm->mc_msg_registrations = &x;                              \
-}                                                               \
-static void __mc_serialize_msg_unregistration_##x (void)        \
-    __attribute__((__destructor__)) ;                           \
-static void __mc_serialize_msg_unregistration_##x (void)        \
-{                                                               \
-    vlib_main_t * vm = vlib_get_main();                         \
-    VLIB_REMOVE_FROM_LINKED_LIST (vm->mc_msg_registrations, &x, \
-                                  next_registration);           \
-}                                                               \
-__VA_ARGS__ mc_serialize_msg_t x
-
-typedef enum
-{
-  MC_TRANSPORT_MASTERSHIP,
-  MC_TRANSPORT_JOIN,
-  MC_TRANSPORT_USER_REQUEST_TO_RELAY,
-  MC_TRANSPORT_USER_REQUEST_FROM_RELAY,
-  MC_N_TRANSPORT_TYPE,
-} mc_transport_type_t;
-
-typedef struct
-{
-  clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type,
-                             u32 buffer_index);
-
-  clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id,
-                          u32 buffer_index);
-
-  /* Returns catchup opaque. */
-    uword (*catchup_request_fun) (void *opaque, u32 stream_index,
-                                 mc_peer_id_t catchup_peer_id);
-
-  void (*catchup_send_fun) (void *opaque, uword catchup_opaque,
-                           u8 * data_vector);
-
-  /* Opaque passed to callbacks. */
-  void *opaque;
-
-  mc_peer_id_t our_ack_peer_id;
-  mc_peer_id_t our_catchup_peer_id;
-
-  /* Max packet size (MTU) for this transport.
-     For IP this is interface MTU less IP + UDP header size. */
-  u32 max_packet_size;
-
-  format_function_t *format_peer_id;
-} mc_transport_t;
-
-typedef struct
-{
-  /* Count of messages received from this peer from the past/future
-     (with seq_cmp != 0). */
-  u64 n_msgs_from_past;
-  u64 n_msgs_from_future;
-} mc_stream_peer_stats_t;
-
-typedef struct
-{
-  /* ID of this peer. */
-  mc_peer_id_t id;
-
-  /* The last sequence we received from this peer. */
-  u32 last_sequence_received;
-
-  mc_stream_peer_stats_t stats, stats_last_clear;
-} mc_stream_peer_t;
-
-typedef struct
-{
-  u32 buffer_index;
-
-  /* Cached copy of local sequence number from buffer. */
-  u32 local_sequence;
-
-  /* Number of times this buffer has been sent (retried). */
-  u32 n_retries;
-
-  /* Previous/next retries in doubly-linked list. */
-  u32 prev_index, next_index;
-
-  /* Bitmap of all peers which have acked this msg */
-  uword *unacked_by_peer_bitmap;
-
-  /* Message send or resend time */
-  f64 sent_at;
-} mc_retry_t;
-
-typedef struct
-{
-  /* Number of retries sent for this stream. */
-  u64 n_retries;
-} mc_stream_stats_t;
-
-struct mc_main_t;
-struct mc_stream_t;
-
-typedef struct
-{
-  /* Stream name. */
-  char *name;
-
-  /* Number of outstanding messages. */
-  u32 window_size;
-
-  /* Retry interval, in seconds */
-  f64 retry_interval;
-
-  /* Retry limit */
-  u32 retry_limit;
-
-  /* User rx buffer callback */
-  void (*rx_buffer) (struct mc_main_t * mc_main,
-                    struct mc_stream_t * stream,
-                    mc_peer_id_t peer_id, u32 buffer_index);
-
-  /* User callback to create a snapshot */
-  u8 *(*catchup_snapshot) (struct mc_main_t * mc_main,
-                          u8 * snapshot_vector,
-                          u32 last_global_sequence_included);
-
-  /* User callback to replay a snapshot */
-  void (*catchup) (struct mc_main_t * mc_main,
-                  u8 * snapshot_data, u32 n_snapshot_data_bytes);
-
-  /* Callback to save a snapshot for offline replay */
-  void (*save_snapshot) (struct mc_main_t * mc_main,
-                        u32 is_catchup,
-                        u8 * snapshot_data, u32 n_snapshot_data_bytes);
-
-  /* Called when a peer dies */
-  void (*peer_died) (struct mc_main_t * mc_main,
-                    struct mc_stream_t * stream, mc_peer_id_t peer_id);
-} mc_stream_config_t;
-
-#define foreach_mc_stream_state                        \
-  _ (invalid)                                  \
-  _ (name_known)                               \
-  _ (join_in_progress)                         \
-  _ (catchup)                                  \
-  _ (ready)
-
-typedef enum
-{
-#define _(f) MC_STREAM_STATE_##f,
-  foreach_mc_stream_state
-#undef _
-} mc_stream_state_t;
-
-typedef struct mc_stream_t
-{
-  mc_stream_config_t config;
-
-  mc_stream_state_t state;
-
-  /* Index in stream pool. */
-  u32 index;
-
-  /* Stream index 0 is always for MC internal use. */
-#define MC_STREAM_INDEX_INTERNAL 0
-
-  mc_retry_t *retry_pool;
-
-  /* Head and tail index of retry pool. */
-  u32 retry_head_index, retry_tail_index;
-
-  /*
-   * Country club for recently retired messages
-   * If the set of peers is expanding and a new peer
-   * misses a message, we can easily retire the FIFO
-   * element before we even know about the new peer
-   */
-  mc_retry_t *retired_fifo;
-
-  /* Hash mapping local sequence to retry pool index. */
-  uword *retry_index_by_local_sequence;
-
-  /* catch-up fifo of VLIB buffer indices.
-     start recording when catching up. */
-  u32 *catchup_fifo;
-
-  mc_stream_stats_t stats, stats_last_clear;
-
-  /* Peer pool. */
-  mc_stream_peer_t *peers;
-
-  /* Bitmap with ones for all peers in peer pool. */
-  uword *all_peer_bitmap;
-
-  /* Map of 64 bit id to index in stream pool. */
-  mhash_t peer_index_by_id;
-
-  /* Timeout, in case we're alone in the world */
-  f64 join_timeout;
-
-  vlib_one_time_waiting_process_t *procs_waiting_for_join_done;
-
-  vlib_one_time_waiting_process_t *procs_waiting_for_open_window;
-
-  /* Next sequence number to use */
-  u32 our_local_sequence;
-
-  /*
-   * Last global sequence we processed.
-   * When supplying catchup data, we need to tell
-   * the client precisely where to start replaying
-   */
-  u32 last_global_sequence_processed;
-
-  /* Vector of unique messages we've sent on this stream. */
-  mc_serialize_stream_msg_t *stream_msgs;
-
-  /* Vector global message index into per stream message index. */
-  u32 *stream_msg_index_by_global_index;
-
-  /* Hashed by message name. */
-  uword *stream_msg_index_by_name;
-
-  u64 user_requests_sent;
-  u64 user_requests_received;
-} mc_stream_t;
-
-always_inline void
-mc_stream_free (mc_stream_t * s)
-{
-  pool_free (s->retry_pool);
-  hash_free (s->retry_index_by_local_sequence);
-  clib_fifo_free (s->catchup_fifo);
-  pool_free (s->peers);
-  mhash_free (&s->peer_index_by_id);
-  vec_free (s->procs_waiting_for_join_done);
-  vec_free (s->procs_waiting_for_open_window);
-}
-
-always_inline void
-mc_stream_init (mc_stream_t * s)
-{
-  memset (s, 0, sizeof (s[0]));
-  s->retry_head_index = s->retry_tail_index = ~0;
-}
-
-typedef struct
-{
-  u32 stream_index;
-  u32 catchup_opaque;
-  u8 *catchup_snapshot;
-} mc_catchup_process_arg_t;
-
-typedef enum
-{
-  MC_RELAY_STATE_NEGOTIATE,
-  MC_RELAY_STATE_MASTER,
-  MC_RELAY_STATE_SLAVE,
-} mc_relay_state_t;
-
-typedef struct
-{
-  mc_peer_id_t peer_id;
-
-  f64 time_last_master_assert_received;
-} mc_mastership_peer_t;
-
-typedef struct
-{
-  u32 stream_index;
-  u32 buffer_index;
-} mc_stream_and_buffer_t;
-
-typedef struct mc_main_t
-{
-  mc_relay_state_t relay_state;
-
-  /* Mastership */
-  u32 we_can_be_relay_master;
-
-  u64 relay_master_peer_id;
-
-  mc_mastership_peer_t *mastership_peers;
-
-  /* Map of 64 bit id to index in stream pool. */
-  mhash_t mastership_peer_index_by_id;
-
-  /* The transport we're using. */
-  mc_transport_t transport;
-
-  /* Last-used global sequence number. */
-  u32 relay_global_sequence;
-
-  /* Vector of streams. */
-  mc_stream_t *stream_vector;
-
-  /* Hash table mapping stream name to pool index. */
-  uword *stream_index_by_name;
-
-  uword *procs_waiting_for_stream_name_by_name;
-
-  vlib_one_time_waiting_process_t **procs_waiting_for_stream_name_pool;
-
-  int joins_in_progress;
-
-  mc_catchup_process_arg_t *catchup_process_args;
-
-  /* Node indices for mastership, join ager,
-     retry and catchup processes. */
-  u32 mastership_process;
-  u32 join_ager_process;
-  u32 retry_process;
-  u32 catchup_process;
-  u32 unserialize_process;
-
-  /* Global vector of messages. */
-  mc_serialize_msg_t **global_msgs;
-
-  /* Hash table mapping message name to index. */
-  uword *global_msg_index_by_name;
-
-  /* Shared serialize/unserialize main. */
-  serialize_main_t serialize_mains[VLIB_N_RX_TX];
-
-  vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX];
-
-  /* Convenience variables */
-  struct vlib_main_t *vlib_main;
-  elog_main_t *elog_main;
-
-  /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */
-  mhash_t elog_id_by_peer_id;
-
-  uword *elog_id_by_msg_name;
-
-  /* For mc_unserialize. */
-  mc_stream_and_buffer_t *mc_unserialize_stream_and_buffers;
-} mc_main_t;
-
-always_inline mc_stream_t *
-mc_stream_by_name (mc_main_t * m, char *name)
-{
-  uword *p = hash_get (m->stream_index_by_name, name);
-  return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0;
-}
-
-always_inline mc_stream_t *
-mc_stream_by_index (mc_main_t * m, u32 i)
-{
-  return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0;
-}
-
-always_inline void
-mc_clear_stream_stats (mc_main_t * m)
-{
-  mc_stream_t *s;
-  mc_stream_peer_t *p;
-  vec_foreach (s, m->stream_vector)
-  {
-    s->stats_last_clear = s->stats;
-      /* *INDENT-OFF* */
-      pool_foreach (p, s->peers, ({
-       p->stats_last_clear = p->stats;
-      }));
-      /* *INDENT-ON* */
-  }
-}
-
-/* Declare all message handlers. */
-#define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index);
-foreach_mc_msg_type
-#undef _
-  u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *);
-
-void mc_stream_leave (mc_main_t * mcm, u32 stream_index);
-
-void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name);
-
-u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index);
-
-void mc_main_init (mc_main_t * mcm, char *tag);
-
-void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master);
-
-void *mc_get_vlib_buffer (struct vlib_main_t *vm, u32 n_bytes,
-                         u32 * bi_return);
-
-format_function_t format_mc_main;
-
-clib_error_t *mc_serialize_internal (mc_main_t * mc,
-                                    u32 stream_index,
-                                    u32 multiple_messages_per_vlib_buffer,
-                                    mc_serialize_msg_t * msg, ...);
-
-clib_error_t *mc_serialize_va (mc_main_t * mc,
-                              u32 stream_index,
-                              u32 multiple_messages_per_vlib_buffer,
-                              mc_serialize_msg_t * msg, va_list * va);
-
-#define mc_serialize_stream(mc,si,msg,args...)                 \
-  mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize,args)
-
-#define mc_serialize(mc,msg,args...)                           \
-  mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize,args)
-
-#define mc_serialize2(mc,add,msg,args...)                              \
-  mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize,args)
-
-void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index);
-uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s,
-                             serialize_main_t * m);
-
-serialize_function_t serialize_mc_main, unserialize_mc_main;
-
-always_inline uword
-mc_max_message_size_in_bytes (mc_main_t * mcm)
-{
-  return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t);
-}
-
-always_inline word
-mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m)
-{
-  return mc_max_message_size_in_bytes (mcm) -
-    serialize_vlib_buffer_n_bytes (m);
-}
-
-void unserialize_mc_stream (serialize_main_t * m, va_list * va);
-void mc_stream_join_process_hold (void);
-
-#endif /* included_vlib_mc_h */
-
-/*
- * fd.io coding-style-patch-verification: ON
- *
- * Local Variables:
- * eval: (c-set-style "gnu")
- * End:
- */
index 055998a..981209b 100644 (file)
@@ -43,8 +43,8 @@ vlib_thread_main_t vlib_thread_main;
  * imapacts observed timings.
  */
 
-static u32
-elog_id_for_msg_name (const char *msg_name)
+u32
+elog_global_id_for_msg_name (const char *msg_name)
 {
   uword *p, r;
   static uword *h;
@@ -85,7 +85,8 @@ barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed)
 
   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
-  ed->caller = elog_id_for_msg_name (vlib_worker_threads[0].barrier_caller);
+  ed->caller = elog_global_id_for_msg_name
+    (vlib_worker_threads[0].barrier_caller);
   ed->t_entry = (int) (1000000.0 * t_entry);
   ed->t_open = (int) (1000000.0 * t_open);
   ed->t_closed = (int) (1000000.0 * t_closed);
@@ -111,7 +112,8 @@ barrier_trace_sync_rec (f64 t_entry)
 
   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
   ed->depth = (int) vlib_worker_threads[0].recursion_level - 1;
-  ed->caller = elog_id_for_msg_name (vlib_worker_threads[0].barrier_caller);
+  ed->caller = elog_global_id_for_msg_name
+    (vlib_worker_threads[0].barrier_caller);
 }
 
 static inline void
index 71b5d0c..bb7c164 100644 (file)
@@ -597,6 +597,7 @@ vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t *
                                     args);
 void vlib_rpc_call_main_thread (void *function, u8 * args, u32 size);
 
+u32 elog_global_id_for_msg_name (const char *msg_name);
 #endif /* included_vlib_threads_h */
 
 /*
index a47d864..86c0247 100644 (file)
@@ -532,29 +532,6 @@ VLIB_CLI_COMMAND (cmd_test_frame_queue_threshold,static) = {
 };
 /* *INDENT-ON* */
 
-static clib_error_t *
-test_threads_barrier_elog_command_fn (vlib_main_t * vm,
-                                     unformat_input_t * input,
-                                     vlib_cli_command_t * cmd)
-{
-  if (unformat (input, "enable"))
-    vlib_worker_threads->barrier_elog_enabled = 1;
-  else if (unformat (input, "disable"))
-    vlib_worker_threads->barrier_elog_enabled = 0;
-  else
-    return clib_error_return (0, "please choose enable or disable");
-  return 0;
-}
-
-/* *INDENT-OFF* */
-VLIB_CLI_COMMAND (test_elog_vector_length_trigger, static) =
-{
-  .path = "test threads barrier-elog",
-  .short_help = "test threads barrier-elog",
-  .function = test_threads_barrier_elog_command_fn,
-};
-/* *INDENT-ON* */
-
 /*
  * fd.io coding-style-patch-verification: ON
  *
index 21c5b2a..ba72f46 100644 (file)
@@ -56,7 +56,6 @@ struct vlib_main_t;
 #include <vlib/counter.h>
 #include <vlib/error.h>
 #include <vlib/init.h>
-#include <vlib/mc.h>
 #include <vlib/node.h>
 #include <vlib/trace.h>
 #include <vlib/log.h>
index 24ec33b..2c1d4e7 100644 (file)
@@ -436,12 +436,8 @@ msg_handler_internal (api_main_t * am,
     vl_msg_api_free (the_msg);
 }
 
-/* set to 1 if you want before/after message handler event logging */
-#define ELOG_API_MESSAGE_HANDLERS 0
-
-#if ELOG_API_MESSAGE_HANDLERS > 0
 static u32
-elog_id_for_msg_name (vlib_main_t * vm, char *msg_name)
+elog_id_for_msg_name (vlib_main_t * vm, const char *msg_name)
 {
   uword *p, r;
   static uword *h;
@@ -461,7 +457,6 @@ elog_id_for_msg_name (vlib_main_t * vm, char *msg_name)
 
   return r;
 }
-#endif
 
 /* This is only to be called from a vlib/vnet app */
 void
@@ -472,26 +467,25 @@ vl_msg_api_handler_with_vm_node (api_main_t * am,
   u16 id = ntohs (*((u16 *) the_msg));
   u8 *(*handler) (void *, void *, void *);
 
-#if ELOG_API_MESSAGE_HANDLERS > 0
-  {
-    /* *INDENT-OFF* */
-    ELOG_TYPE_DECLARE (e) =
+  if (PREDICT_FALSE (vm->elog_trace_api_messages))
+    {
+      /* *INDENT-OFF* */
+      ELOG_TYPE_DECLARE (e) =
+        {
+          .format = "api-msg: %s",
+          .format_args = "T4",
+        };
+      /* *INDENT-ON* */
+      struct
       {
-        .format = "api-msg: %s",
-        .format_args = "T4",
-      };
-    /* *INDENT-ON* */
-    struct
-    {
-      u32 c;
-    } *ed;
-    ed = ELOG_DATA (&vm->elog_main, e);
-    if (id < vec_len (am->msg_names))
-      ed->c = elog_id_for_msg_name (vm, am->msg_names[id]);
-    else
-      ed->c = elog_id_for_msg_name (vm, "BOGUS");
-  }
-#endif
+       u32 c;
+      } *ed;
+      ed = ELOG_DATA (&vm->elog_main, e);
+      if (id < vec_len (am->msg_names))
+       ed->c = elog_id_for_msg_name (vm, (const char *) am->msg_names[id]);
+      else
+       ed->c = elog_id_for_msg_name (vm, "BOGUS");
+    }
 
   if (id < vec_len (am->msg_handlers) && am->msg_handlers[id])
     {
@@ -521,26 +515,25 @@ vl_msg_api_handler_with_vm_node (api_main_t * am,
   if (!(am->message_bounce[id]))
     vl_msg_api_free (the_msg);
 
-#if ELOG_API_MESSAGE_HANDLERS > 0
-  {
-  /* *INDENT-OFF* */
-  ELOG_TYPE_DECLARE (e) = {
-    .format = "api-msg-done: %s",
-    .format_args = "T4",
-  };
-  /* *INDENT-ON* */
-
-    struct
-    {
-      u32 c;
-    } *ed;
-    ed = ELOG_DATA (&vm->elog_main, e);
-    if (id < vec_len (am->msg_names))
-      ed->c = elog_id_for_msg_name (vm, am->msg_names[id]);
-    else
-      ed->c = elog_id_for_msg_name (vm, "BOGUS");
-  }
-#endif
+  if (PREDICT_FALSE (vm->elog_trace_api_messages))
+    {
+      /* *INDENT-OFF* */
+      ELOG_TYPE_DECLARE (e) = {
+        .format = "api-msg-done: %s",
+        .format_args = "T4",
+      };
+      /* *INDENT-ON* */
+
+      struct
+      {
+       u32 c;
+      } *ed;
+      ed = ELOG_DATA (&vm->elog_main, e);
+      if (id < vec_len (am->msg_names))
+       ed->c = elog_id_for_msg_name (vm, (const char *) am->msg_names[id]);
+      else
+       ed->c = elog_id_for_msg_name (vm, "BOGUS");
+    }
 }
 
 void
index 5cbbbf9..45a265c 100644 (file)
@@ -102,52 +102,6 @@ unserialize_vec_vnet_sw_hw_interface_state (serialize_main_t * m,
     }
 }
 
-static void
-serialize_vnet_sw_hw_interface_set_flags (serialize_main_t * m, va_list * va)
-{
-  vnet_sw_hw_interface_state_t *s =
-    va_arg (*va, vnet_sw_hw_interface_state_t *);
-  serialize (m, serialize_vec_vnet_sw_hw_interface_state, s, 1);
-}
-
-static void
-unserialize_vnet_sw_interface_set_flags (serialize_main_t * m, va_list * va)
-{
-  CLIB_UNUSED (mc_main_t * mc) = va_arg (*va, mc_main_t *);
-  vnet_sw_hw_interface_state_t s;
-
-  unserialize (m, unserialize_vec_vnet_sw_hw_interface_state, &s, 1);
-
-  vnet_sw_interface_set_flags_helper
-    (vnet_get_main (), s.sw_hw_if_index, s.flags,
-     /* helper_flags no redistribution */ 0);
-}
-
-static void
-unserialize_vnet_hw_interface_set_flags (serialize_main_t * m, va_list * va)
-{
-  CLIB_UNUSED (mc_main_t * mc) = va_arg (*va, mc_main_t *);
-  vnet_sw_hw_interface_state_t s;
-
-  unserialize (m, unserialize_vec_vnet_sw_hw_interface_state, &s, 1);
-
-  vnet_hw_interface_set_flags_helper
-    (vnet_get_main (), s.sw_hw_if_index, s.flags,
-     /* helper_flags no redistribution */ 0);
-}
-
-MC_SERIALIZE_MSG (vnet_sw_interface_set_flags_msg, static) =
-{
-.name = "vnet_sw_interface_set_flags",.serialize =
-    serialize_vnet_sw_hw_interface_set_flags,.unserialize =
-    unserialize_vnet_sw_interface_set_flags,};
-
-MC_SERIALIZE_MSG (vnet_hw_interface_set_flags_msg, static) =
-{
-.name = "vnet_hw_interface_set_flags",.serialize =
-    serialize_vnet_sw_hw_interface_set_flags,.unserialize =
-    unserialize_vnet_hw_interface_set_flags,};
-
 void
 serialize_vnet_interface_state (serialize_main_t * m, va_list * va)
 {
@@ -311,9 +265,6 @@ vnet_hw_interface_set_flags_helper (vnet_main_t * vnm, u32 hw_if_index,
   vnet_hw_interface_t *hi = vnet_get_hw_interface (vnm, hw_if_index);
   vnet_hw_interface_class_t *hw_class =
     vnet_get_hw_interface_class (vnm, hi->hw_class_index);
-  vnet_device_class_t *dev_class =
-    vnet_get_device_class (vnm, hi->dev_class_index);
-  vlib_main_t *vm = vnm->vlib_main;
   u32 mask;
   clib_error_t *error = 0;
   u32 is_create =
@@ -332,19 +283,6 @@ vnet_hw_interface_set_flags_helper (vnet_main_t * vnm, u32 hw_if_index,
   if (!is_create && (hi->flags & mask) == flags)
     goto done;
 
-  /* Some interface classes do not redistribute (e.g. are local). */
-  if (!dev_class->redistribute)
-    helper_flags &= ~VNET_INTERFACE_SET_FLAGS_HELPER_WANT_REDISTRIBUTE;
-
-  if (vm->mc_main
-      && (helper_flags & VNET_INTERFACE_SET_FLAGS_HELPER_WANT_REDISTRIBUTE))
-    {
-      vnet_sw_hw_interface_state_t s;
-      s.sw_hw_if_index = hw_if_index;
-      s.flags = flags;
-      mc_serialize (vm->mc_main, &vnet_hw_interface_set_flags_msg, &s);
-    }
-
   if ((hi->flags & VNET_HW_INTERFACE_FLAG_LINK_UP) !=
       (flags & VNET_HW_INTERFACE_FLAG_LINK_UP))
     {
@@ -373,7 +311,6 @@ vnet_sw_interface_set_flags_helper (vnet_main_t * vnm, u32 sw_if_index,
                                    u32 flags, u32 helper_flags)
 {
   vnet_sw_interface_t *si = vnet_get_sw_interface (vnm, sw_if_index);
-  vlib_main_t *vm = vnm->vlib_main;
   u32 mask;
   clib_error_t *error = 0;
   u32 is_create =
@@ -451,16 +388,6 @@ vnet_sw_interface_set_flags_helper (vnet_main_t * vnm, u32 sw_if_index,
              ~VNET_INTERFACE_SET_FLAGS_HELPER_WANT_REDISTRIBUTE;
        }
 
-      if (vm->mc_main
-         && (helper_flags &
-             VNET_INTERFACE_SET_FLAGS_HELPER_WANT_REDISTRIBUTE))
-       {
-         vnet_sw_hw_interface_state_t s;
-         s.sw_hw_if_index = sw_if_index;
-         s.flags = flags;
-         mc_serialize (vm->mc_main, &vnet_sw_interface_set_flags_msg, &s);
-       }
-
       /* set the flags now before invoking the registered clients
        * so that the state they query is consistent with the state here notified */
       old_flags = si->flags;
@@ -1110,42 +1037,6 @@ vnet_sw_interface_walk (vnet_main_t * vnm,
   /* *INDENT-ON* */
 }
 
-static void
-serialize_vnet_hw_interface_set_class (serialize_main_t * m, va_list * va)
-{
-  u32 hw_if_index = va_arg (*va, u32);
-  char *hw_class_name = va_arg (*va, char *);
-  serialize_integer (m, hw_if_index, sizeof (hw_if_index));
-  serialize_cstring (m, hw_class_name);
-}
-
-static void
-unserialize_vnet_hw_interface_set_class (serialize_main_t * m, va_list * va)
-{
-  CLIB_UNUSED (mc_main_t * mc) = va_arg (*va, mc_main_t *);
-  vnet_main_t *vnm = vnet_get_main ();
-  u32 hw_if_index;
-  char *hw_class_name;
-  uword *p;
-  clib_error_t *error;
-
-  unserialize_integer (m, &hw_if_index, sizeof (hw_if_index));
-  unserialize_cstring (m, &hw_class_name);
-  p =
-    hash_get (vnm->interface_main.hw_interface_class_by_name, hw_class_name);
-  ASSERT (p != 0);
-  error = vnet_hw_interface_set_class_helper (vnm, hw_if_index, p[0],
-                                             /* redistribute */ 0);
-  if (error)
-    clib_error_report (error);
-}
-
-MC_SERIALIZE_MSG (vnet_hw_interface_set_class_msg, static) =
-{
-.name = "vnet_hw_interface_set_class",.serialize =
-    serialize_vnet_hw_interface_set_class,.unserialize =
-    unserialize_vnet_hw_interface_set_class,};
-
 void
 vnet_hw_interface_init_for_class (vnet_main_t * vnm, u32 hw_if_index,
                                  u32 hw_class_index, u32 hw_instance)
@@ -1197,13 +1088,6 @@ vnet_hw_interface_set_class_helper (vnet_main_t * vnm, u32 hw_if_index,
                                  "%v class cannot be changed from %s to %s",
                                  hi->name, old_class->name, new_class->name);
 
-      if (vnm->vlib_main->mc_main)
-       {
-         mc_serialize (vnm->vlib_main->mc_main,
-                       &vnet_hw_interface_set_class_msg, hw_if_index,
-                       new_class->name);
-         return 0;
-       }
     }
 
   if (old_class->hw_class_change)
@@ -1241,11 +1125,6 @@ vnet_hw_interface_rx_redirect_to_node_helper (vnet_main_t * vnm,
   vnet_device_class_t *dev_class = vnet_get_device_class
     (vnm, hi->dev_class_index);
 
-  if (redistribute)
-    {
-      /* $$$$ fixme someday maybe */
-      ASSERT (vnm->vlib_main->mc_main == 0);
-    }
   if (dev_class->rx_redirect_to_node)
     {
       dev_class->rx_redirect_to_node (vnm, hw_if_index, node_index);
index a6feec5..bc89a08 100644 (file)
@@ -40,7 +40,6 @@
 #ifndef included_ip_ip6_h
 #define included_ip_ip6_h
 
-#include <vlib/mc.h>
 #include <vlib/buffer.h>
 #include <vnet/ethernet/packet.h>
 #include <vnet/ip/ip6_packet.h>
index 735f960..bac2e75 100644 (file)
@@ -111,63 +111,14 @@ void unserialize_srp_main (serialize_main_t * m, va_list * va)
     }
 }
 
-static void serialize_srp_register_interface_msg (serialize_main_t * m, va_list * va)
-{
-  u32 * hw_if_indices = va_arg (*va, u32 *);
-  serialize_integer (m, hw_if_indices[SRP_SIDE_A], sizeof (hw_if_indices[SRP_SIDE_A]));
-  serialize_integer (m, hw_if_indices[SRP_SIDE_B], sizeof (hw_if_indices[SRP_SIDE_B]));
-}
-
-static void unserialize_srp_register_interface_msg (serialize_main_t * m, va_list * va)
-{
-  CLIB_UNUSED (mc_main_t * mcm) = va_arg (*va, mc_main_t *);
-  u32 hw_if_indices[SRP_N_SIDE];
-  srp_main_t * sm = &srp_main;
-  uword * p;
-
-  unserialize_integer (m, &hw_if_indices[SRP_SIDE_A], sizeof (hw_if_indices[SRP_SIDE_A]));
-  unserialize_integer (m, &hw_if_indices[SRP_SIDE_B], sizeof (hw_if_indices[SRP_SIDE_B]));
-
-  p = hash_get (sm->srp_register_interface_waiting_process_pool_index_by_hw_if_index,
-               hw_if_indices[0]);
-  if (p)
-    {
-      vlib_one_time_waiting_process_t * wp = pool_elt_at_index (sm->srp_register_interface_waiting_process_pool, p[0]);
-      vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
-      pool_put (sm->srp_register_interface_waiting_process_pool, wp);
-      hash_unset (sm->srp_register_interface_waiting_process_pool_index_by_hw_if_index,
-                 hw_if_indices[0]);
-    }
-  else
-    srp_register_interface_helper (hw_if_indices, /* redistribute */ 0);
-}
-
-MC_SERIALIZE_MSG (srp_register_interface_msg, static) = {
-  .name = "vnet_srp_register_interface",
-  .serialize = serialize_srp_register_interface_msg,
-  .unserialize = unserialize_srp_register_interface_msg,
-};
-
 static void srp_register_interface_helper (u32 * hw_if_indices_by_side, u32 redistribute)
 {
   vnet_main_t * vnm = vnet_get_main();
   srp_main_t * sm = &srp_main;
-  vlib_main_t * vm = sm->vlib_main;
   srp_interface_t * si;
   vnet_hw_interface_t * hws[SRP_N_RING];
   uword s, * p;
 
-  if (vm->mc_main && redistribute)
-    {
-      vlib_one_time_waiting_process_t * wp;
-      mc_serialize (vm->mc_main, &srp_register_interface_msg, hw_if_indices_by_side);
-      pool_get (sm->srp_register_interface_waiting_process_pool, wp);
-      hash_set (sm->srp_register_interface_waiting_process_pool_index_by_hw_if_index,
-               hw_if_indices_by_side[0],
-               wp - sm->srp_register_interface_waiting_process_pool);
-      vlib_current_process_wait_for_one_time_event (vm, wp);
-    }
-
   /* Check if interface has already been registered. */
   p = hash_get (sm->interface_index_by_hw_if_index, hw_if_indices_by_side[0]);
   if (p)
@@ -298,36 +249,6 @@ VNET_HW_INTERFACE_CLASS (srp_hw_interface_class) = {
   .hw_class_change = srp_interface_hw_class_change,
 };
 
-static void serialize_srp_interface_config_msg (serialize_main_t * m, va_list * va)
-{
-  srp_interface_t * si = va_arg (*va, srp_interface_t *);
-  srp_main_t * sm = &srp_main;
-
-  ASSERT (! pool_is_free (sm->interface_pool, si));
-  serialize_integer (m, si - sm->interface_pool, sizeof (u32));
-  serialize (m, serialize_f64, si->config.wait_to_restore_idle_delay);
-  serialize (m, serialize_f64, si->config.ips_tx_interval);
-}
-
-static void unserialize_srp_interface_config_msg (serialize_main_t * m, va_list * va)
-{
-  CLIB_UNUSED (mc_main_t * mcm) = va_arg (*va, mc_main_t *);
-  srp_main_t * sm = &srp_main;
-  srp_interface_t * si;
-  u32 si_index;
-
-  unserialize_integer (m, &si_index, sizeof (u32));
-  si = pool_elt_at_index (sm->interface_pool, si_index);
-  unserialize (m, unserialize_f64, &si->config.wait_to_restore_idle_delay);
-  unserialize (m, unserialize_f64, &si->config.ips_tx_interval);
-}
-
-MC_SERIALIZE_MSG (srp_interface_config_msg, static) = {
-  .name = "vnet_srp_interface_config",
-  .serialize = serialize_srp_interface_config_msg,
-  .unserialize = unserialize_srp_interface_config_msg,
-};
-
 void srp_interface_get_interface_config (u32 hw_if_index, srp_interface_config_t * c)
 {
   srp_interface_t * si = srp_get_interface_from_vnet_hw_interface (hw_if_index);
@@ -337,22 +258,17 @@ void srp_interface_get_interface_config (u32 hw_if_index, srp_interface_config_t
 
 void srp_interface_set_interface_config (u32 hw_if_index, srp_interface_config_t * c)
 {
-  srp_main_t * sm = &srp_main;
-  vlib_main_t * vm = sm->vlib_main;
   srp_interface_t * si = srp_get_interface_from_vnet_hw_interface (hw_if_index);
   ASSERT (si != 0);
   if (memcmp (&si->config, &c[0], sizeof (c[0])))
     {
       si->config = c[0];
-      if (vm->mc_main)
-       mc_serialize (vm->mc_main, &srp_interface_config_msg, si);
     }
 }
 
 #if DEBUG > 0
 
 #define VNET_SIMULATED_SRP_TX_NEXT_SRP_INPUT VNET_INTERFACE_TX_N_NEXT
-
 /* Echo packets back to srp input. */
 static uword
 simulated_srp_interface_tx (vlib_main_t * vm,
index acb770e..25cf0c0 100644 (file)
@@ -640,63 +640,6 @@ static void tx_ips_packet (srp_interface_t * si,
   vlib_put_frame_to_node (vm, hi->output_node_index, f);
 }
 
-static void serialize_srp_interface_state_msg (serialize_main_t * m, va_list * va)
-{
-  srp_interface_t * si = va_arg (*va, srp_interface_t *);
-  srp_main_t * sm = &srp_main;
-  int r;
-
-  ASSERT (! pool_is_free (sm->interface_pool, si));
-  serialize_integer (m, si - sm->interface_pool, sizeof (u32));
-  serialize_likely_small_unsigned_integer (m, si->current_ips_state);
-  for (r = 0; r < SRP_N_RING; r++)
-    {
-      srp_interface_ring_t * ir = &si->rings[r];
-      void * p;
-      serialize_likely_small_unsigned_integer (m, ir->rx_neighbor_address_valid);
-      if (ir->rx_neighbor_address_valid)
-       {
-         p = serialize_get (m, sizeof (ir->rx_neighbor_address));
-         clib_memcpy (p, ir->rx_neighbor_address, sizeof (ir->rx_neighbor_address));
-       }
-      serialize_likely_small_unsigned_integer (m, ir->waiting_to_restore);
-      if (ir->waiting_to_restore)
-       serialize (m, serialize_f64, ir->wait_to_restore_start_time);
-    }
-}
-
-static void unserialize_srp_interface_state_msg (serialize_main_t * m, va_list * va)
-{
-  CLIB_UNUSED (mc_main_t * mcm) = va_arg (*va, mc_main_t *);
-  srp_main_t * sm = &srp_main;
-  srp_interface_t * si;
-  u32 si_index, r;
-
-  unserialize_integer (m, &si_index, sizeof (u32));
-  si = pool_elt_at_index (sm->interface_pool, si_index);
-  si->current_ips_state = unserialize_likely_small_unsigned_integer (m);
-  for (r = 0; r < SRP_N_RING; r++)
-    {
-      srp_interface_ring_t * ir = &si->rings[r];
-      void * p;
-      ir->rx_neighbor_address_valid = unserialize_likely_small_unsigned_integer (m);
-      if (ir->rx_neighbor_address_valid)
-       {
-         p = unserialize_get (m, sizeof (ir->rx_neighbor_address));
-         clib_memcpy (ir->rx_neighbor_address, p, sizeof (ir->rx_neighbor_address));
-       }
-      ir->waiting_to_restore = unserialize_likely_small_unsigned_integer (m);
-      if (ir->waiting_to_restore)
-       unserialize (m, unserialize_f64, &ir->wait_to_restore_start_time);
-    }
-}
-
-MC_SERIALIZE_MSG (srp_interface_state_msg, static) = {
-  .name = "vnet_srp_interface_state",
-  .serialize = serialize_srp_interface_state_msg,
-  .unserialize = unserialize_srp_interface_state_msg,
-};
-
 static int requests_switch (srp_ips_request_type_t r)
 {
   static u8 t[16] = {
@@ -716,7 +659,6 @@ void srp_ips_rx_packet (u32 sw_if_index, srp_ips_header_t * h)
   srp_ring_type_t rx_ring;
   srp_interface_t * si = srp_get_interface (sw_if_index, &rx_ring);
   srp_interface_ring_t * ir = &si->rings[rx_ring];
-  int si_needs_broadcast = 0;
 
   /* FIXME trace. */
   if (0)
@@ -751,7 +693,6 @@ void srp_ips_rx_packet (u32 sw_if_index, srp_ips_header_t * h)
        {
          srp_ips_header_t to_tx[2];
 
-         si_needs_broadcast = 1;
          si->current_ips_state = SRP_IPS_STATE_wrapped;
          si->hw_wrap_function (si->rings[SRP_SIDE_A].hw_if_index, /* enable_wrap */ 1);
          si->hw_wrap_function (si->rings[SRP_SIDE_B].hw_if_index, /* enable_wrap */ 1);
@@ -775,7 +716,6 @@ void srp_ips_rx_packet (u32 sw_if_index, srp_ips_header_t * h)
          && h->request_type == SRP_IPS_REQUEST_idle
          && h->status == SRP_IPS_STATUS_idle)
        {
-         si_needs_broadcast = 1;
          si->current_ips_state = SRP_IPS_STATE_idle;
          si->hw_wrap_function (si->rings[SRP_SIDE_A].hw_if_index, /* enable_wrap */ 0);
          si->hw_wrap_function (si->rings[SRP_SIDE_B].hw_if_index, /* enable_wrap */ 0);
@@ -790,10 +730,8 @@ void srp_ips_rx_packet (u32 sw_if_index, srp_ips_header_t * h)
       abort ();
       break;
     }
-
  done:
-  if (vm->mc_main && si_needs_broadcast)
-    mc_serialize (vm->mc_main, &srp_interface_state_msg, si);
+  ;
 }
 
 /* Preform local IPS request on given interface. */
@@ -801,11 +739,9 @@ void srp_ips_local_request (u32 sw_if_index, srp_ips_request_type_t request)
 {
   vnet_main_t * vnm = vnet_get_main();
   srp_main_t * sm = &srp_main;
-  vlib_main_t * vm = sm->vlib_main;
   srp_ring_type_t rx_ring;
   srp_interface_t * si = srp_get_interface (sw_if_index, &rx_ring);
   srp_interface_ring_t * ir = &si->rings[rx_ring];
-  int si_needs_broadcast = 0;
 
   if (request == SRP_IPS_REQUEST_wait_to_restore)
     {
@@ -815,13 +751,11 @@ void srp_ips_local_request (u32 sw_if_index, srp_ips_request_type_t request)
        {
          ir->wait_to_restore_start_time = vlib_time_now (sm->vlib_main);
          ir->waiting_to_restore = 1;
-         si_needs_broadcast = 1;
        }
     }
   else
     {
       /* FIXME handle local signal fail. */
-      si_needs_broadcast = ir->waiting_to_restore;
       ir->wait_to_restore_start_time = 0;
       ir->waiting_to_restore = 0;
     }
@@ -832,8 +766,6 @@ void srp_ips_local_request (u32 sw_if_index, srp_ips_request_type_t request)
                  format_vnet_sw_if_index_name, vnm, sw_if_index,
                  format_srp_ips_request_type, request);
 
-  if (vm->mc_main && si_needs_broadcast)
-    mc_serialize (vm->mc_main, &srp_interface_state_msg, si);
 }
 
 static void maybe_send_ips_message (srp_interface_t * si)