vlib: don't leak node frames on refork
[vpp.git] / src / vlib / main.c
index 02fdc89..964bc4a 100644 (file)
 #include <vppinfra/format.h>
 #include <vlib/vlib.h>
 #include <vlib/threads.h>
+#include <vlib/stats/stats.h>
 #include <vppinfra/tw_timer_1t_3w_1024sl_ov.h>
 
 #include <vlib/unix/unix.h>
 
-/* Actually allocate a few extra slots of vector data to support
-   speculative vector enqueues which overflow vector data in next frame. */
-#define VLIB_FRAME_SIZE_ALLOC (VLIB_FRAME_SIZE + 4)
-
-always_inline u32
-vlib_frame_bytes (u32 n_scalar_bytes, u32 n_vector_bytes)
-{
-  u32 n_bytes;
-
-  /* Make room for vlib_frame_t plus scalar arguments. */
-  n_bytes = vlib_frame_vector_byte_offset (n_scalar_bytes);
-
-  /* Make room for vector arguments.
-     Allocate a few extra slots of vector data to support
-     speculative vector enqueues which overflow vector data in next frame. */
-#define VLIB_FRAME_SIZE_EXTRA 4
-  n_bytes += (VLIB_FRAME_SIZE + VLIB_FRAME_SIZE_EXTRA) * n_vector_bytes;
-
-  /* Magic number is first 32bit number after vector data.
-     Used to make sure that vector data is never overrun. */
 #define VLIB_FRAME_MAGIC (0xabadc0ed)
-  n_bytes += sizeof (u32);
-
-  /* Pad to cache line. */
-  n_bytes = round_pow2 (n_bytes, CLIB_CACHE_LINE_BYTES);
-
-  return n_bytes;
-}
 
 always_inline u32 *
 vlib_frame_find_magic (vlib_frame_t * f, vlib_node_t * node)
 {
-  void *p = f;
-
-  p += vlib_frame_vector_byte_offset (node->scalar_size);
-
-  p += (VLIB_FRAME_SIZE + VLIB_FRAME_SIZE_EXTRA) * node->vector_size;
-
-  return p;
-}
-
-static inline vlib_frame_size_t *
-get_frame_size_info (vlib_node_main_t * nm,
-                    u32 n_scalar_bytes, u32 n_vector_bytes)
-{
-#ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES
-  uword key = (n_scalar_bytes << 16) | n_vector_bytes;
-  uword *p, i;
-
-  p = hash_get (nm->frame_size_hash, key);
-  if (p)
-    i = p[0];
-  else
-    {
-      i = vec_len (nm->frame_sizes);
-      vec_validate (nm->frame_sizes, i);
-      hash_set (nm->frame_size_hash, key, i);
-    }
-
-  return vec_elt_at_index (nm->frame_sizes, i);
-#else
-  ASSERT (vlib_frame_bytes (n_scalar_bytes, n_vector_bytes)
-         == (vlib_frame_bytes (0, 4)));
-  return vec_elt_at_index (nm->frame_sizes, 0);
-#endif
+  return (void *) f + node->magic_offset;
 }
 
 static vlib_frame_t *
@@ -120,31 +62,35 @@ vlib_frame_alloc_to_node (vlib_main_t * vm, u32 to_node_index,
   vlib_frame_size_t *fs;
   vlib_node_t *to_node;
   vlib_frame_t *f;
-  u32 l, n, scalar_size, vector_size;
+  u32 l, n;
 
   ASSERT (vm == vlib_get_main ());
 
   to_node = vlib_get_node (vm, to_node_index);
 
-  scalar_size = to_node->scalar_size;
-  vector_size = to_node->vector_size;
+  vec_validate (nm->frame_sizes, to_node->frame_size_index);
+  fs = vec_elt_at_index (nm->frame_sizes, to_node->frame_size_index);
+
+  if (fs->frame_size == 0)
+    fs->frame_size = to_node->frame_size;
+  else
+    ASSERT (fs->frame_size == to_node->frame_size);
 
-  fs = get_frame_size_info (nm, scalar_size, vector_size);
-  n = vlib_frame_bytes (scalar_size, vector_size);
+  n = fs->frame_size;
   if ((l = vec_len (fs->free_frames)) > 0)
     {
       /* Allocate from end of free list. */
       f = fs->free_frames[l - 1];
-      _vec_len (fs->free_frames) = l - 1;
+      vec_set_len (fs->free_frames, l - 1);
     }
   else
     {
-      f = clib_mem_alloc_aligned_no_fail (n, VLIB_FRAME_ALIGN);
+      f = clib_mem_alloc_aligned_no_fail (n, CLIB_CACHE_LINE_BYTES);
     }
 
   /* Poison frame when debugging. */
   if (CLIB_DEBUG > 0)
-    clib_memset (f, 0xfe, n);
+    clib_memset_u8 (f, 0xfe, n);
 
   /* Insert magic number. */
   {
@@ -156,9 +102,11 @@ vlib_frame_alloc_to_node (vlib_main_t * vm, u32 to_node_index,
 
   f->frame_flags = VLIB_FRAME_IS_ALLOCATED | frame_flags;
   f->n_vectors = 0;
-  f->scalar_size = scalar_size;
-  f->vector_size = vector_size;
+  f->scalar_offset = to_node->scalar_offset;
+  f->vector_offset = to_node->vector_offset;
+  f->aux_offset = to_node->aux_offset;
   f->flags = 0;
+  f->frame_size_index = to_node->frame_size_index;
 
   fs->n_alloc_frames += 1;
 
@@ -239,17 +187,15 @@ vlib_put_frame_to_node (vlib_main_t * vm, u32 to_node_index, vlib_frame_t * f)
 
 /* Free given frame. */
 void
-vlib_frame_free (vlib_main_t * vm, vlib_node_runtime_t * r, vlib_frame_t * f)
+vlib_frame_free (vlib_main_t *vm, vlib_frame_t *f)
 {
   vlib_node_main_t *nm = &vm->node_main;
-  vlib_node_t *node;
   vlib_frame_size_t *fs;
 
   ASSERT (vm == vlib_get_main ());
   ASSERT (f->frame_flags & VLIB_FRAME_IS_ALLOCATED);
 
-  node = vlib_get_node (vm, r->node_index);
-  fs = get_frame_size_info (nm, node->scalar_size, node->vector_size);
+  fs = vec_elt_at_index (nm->frame_sizes, f->frame_size_index);
 
   ASSERT (f->frame_flags & VLIB_FRAME_IS_ALLOCATED);
 
@@ -261,6 +207,7 @@ vlib_frame_free (vlib_main_t * vm, vlib_node_runtime_t * r, vlib_frame_t * f)
     }
 
   f->frame_flags &= ~(VLIB_FRAME_IS_ALLOCATED | VLIB_FRAME_NO_APPEND);
+  f->flags = 0;
 
   vec_add1 (fs->free_frames, f);
   ASSERT (fs->n_alloc_frames > 0);
@@ -271,19 +218,24 @@ static clib_error_t *
 show_frame_stats (vlib_main_t * vm,
                  unformat_input_t * input, vlib_cli_command_t * cmd)
 {
-  vlib_node_main_t *nm = &vm->node_main;
   vlib_frame_size_t *fs;
 
-  vlib_cli_output (vm, "%=6s%=12s%=12s", "Size", "# Alloc", "# Free");
-  vec_foreach (fs, nm->frame_sizes)
-  {
-    u32 n_alloc = fs->n_alloc_frames;
-    u32 n_free = vec_len (fs->free_frames);
+  vlib_cli_output (vm, "%=8s%=6s%=12s%=12s", "Thread", "Size", "# Alloc",
+                  "# Free");
+  foreach_vlib_main ()
+    {
+      vlib_node_main_t *nm = &this_vlib_main->node_main;
+      vec_foreach (fs, nm->frame_sizes)
+       {
+         u32 n_alloc = fs->n_alloc_frames;
+         u32 n_free = vec_len (fs->free_frames);
 
-    if (n_alloc + n_free > 0)
-      vlib_cli_output (vm, "%=6d%=12d%=12d",
-                      fs - nm->frame_sizes, n_alloc, n_free);
-  }
+         if (n_alloc + n_free > 0)
+           vlib_cli_output (vm, "%=8d%=6d%=12d%=12d",
+                            this_vlib_main->thread_index, fs->frame_size,
+                            n_alloc, n_free);
+       }
+    }
 
   return 0;
 }
@@ -525,12 +477,8 @@ vlib_put_next_frame (vlib_main_t * vm,
       if (!(f->frame_flags & VLIB_FRAME_PENDING))
        {
          __attribute__ ((unused)) vlib_node_t *node;
-         vlib_node_t *next_node;
-         vlib_node_runtime_t *next_runtime;
 
          node = vlib_get_node (vm, r->node_index);
-         next_node = vlib_get_next_node (vm, r->node_index, next_index);
-         next_runtime = vlib_node_get_runtime (vm, next_node->index);
 
          vec_add2 (nm->pending_frames, p, 1);
 
@@ -539,18 +487,6 @@ vlib_put_next_frame (vlib_main_t * vm,
          p->next_frame_index = nf - nm->next_frames;
          nf->flags |= VLIB_FRAME_PENDING;
          f->frame_flags |= VLIB_FRAME_PENDING;
-
-         /*
-          * If we're going to dispatch this frame on another thread,
-          * force allocation of a new frame. Otherwise, we create
-          * a dangling frame reference. Each thread has its own copy of
-          * the next_frames vector.
-          */
-         if (0 && r->thread_index != next_runtime->thread_index)
-           {
-             nf->frame = NULL;
-             nf->flags &= ~(VLIB_FRAME_PENDING | VLIB_FRAME_IS_ALLOCATED);
-           }
        }
 
       /* Copy trace flag from next_frame and from runtime. */
@@ -570,13 +506,11 @@ vlib_put_next_frame (vlib_main_t * vm,
 }
 
 /* Sync up runtime (32 bit counters) and main node stats (64 bit counters). */
-never_inline void
-vlib_node_runtime_sync_stats (vlib_main_t * vm,
-                             vlib_node_runtime_t * r,
-                             uword n_calls, uword n_vectors, uword n_clocks)
+void
+vlib_node_runtime_sync_stats_node (vlib_node_t *n, vlib_node_runtime_t *r,
+                                  uword n_calls, uword n_vectors,
+                                  uword n_clocks)
 {
-  vlib_node_t *n = vlib_get_node (vm, r->node_index);
-
   n->stats_total.calls += n_calls + r->calls_since_last_overflow;
   n->stats_total.vectors += n_vectors + r->vectors_since_last_overflow;
   n->stats_total.clocks += n_clocks + r->clocks_since_last_overflow;
@@ -588,6 +522,14 @@ vlib_node_runtime_sync_stats (vlib_main_t * vm,
   r->clocks_since_last_overflow = 0;
 }
 
+void
+vlib_node_runtime_sync_stats (vlib_main_t *vm, vlib_node_runtime_t *r,
+                             uword n_calls, uword n_vectors, uword n_clocks)
+{
+  vlib_node_t *n = vlib_get_node (vm, r->node_index);
+  vlib_node_runtime_sync_stats_node (n, r, n_calls, n_vectors, n_clocks);
+}
+
 always_inline void __attribute__ ((unused))
 vlib_process_sync_stats (vlib_main_t * vm,
                         vlib_process_t * p,
@@ -608,7 +550,7 @@ vlib_node_sync_stats (vlib_main_t * vm, vlib_node_t * n)
   if (n->type == VLIB_NODE_TYPE_PROCESS)
     {
       /* Nothing to do for PROCESS nodes except in main thread */
-      if (vm != &vlib_global_main)
+      if (vm != vlib_get_first_main ())
        return;
 
       vlib_process_t *p = vlib_get_process_from_node (vm, n);
@@ -688,7 +630,7 @@ static clib_error_t *
 vlib_cli_elog_clear (vlib_main_t * vm,
                     unformat_input_t * input, vlib_cli_command_t * cmd)
 {
-  elog_reset_buffer (&vm->elog_main);
+  elog_reset_buffer (&vlib_global_main.elog_main);
   return 0;
 }
 
@@ -705,7 +647,7 @@ static clib_error_t *
 elog_save_buffer (vlib_main_t * vm,
                  unformat_input_t * input, vlib_cli_command_t * cmd)
 {
-  elog_main_t *em = &vm->elog_main;
+  elog_main_t *em = &vlib_global_main.elog_main;
   char *file, *chroot_file;
   clib_error_t *error = 0;
 
@@ -741,10 +683,10 @@ elog_save_buffer (vlib_main_t * vm,
 void
 vlib_post_mortem_dump (void)
 {
-  vlib_main_t *vm = &vlib_global_main;
+  vlib_global_main_t *vgm = vlib_get_global_main ();
 
-  for (int i = 0; i < vec_len (vm->post_mortem_callbacks); i++)
-    (vm->post_mortem_callbacks[i]) ();
+  for (int i = 0; i < vec_len (vgm->post_mortem_callbacks); i++)
+    (vgm->post_mortem_callbacks[i]) ();
 }
 
 /* *INDENT-OFF* */
@@ -759,7 +701,7 @@ static clib_error_t *
 elog_stop (vlib_main_t * vm,
           unformat_input_t * input, vlib_cli_command_t * cmd)
 {
-  elog_main_t *em = &vm->elog_main;
+  elog_main_t *em = &vlib_global_main.elog_main;
 
   em->n_total_events_disable_limit = em->n_total_events;
 
@@ -779,7 +721,7 @@ static clib_error_t *
 elog_restart (vlib_main_t * vm,
              unformat_input_t * input, vlib_cli_command_t * cmd)
 {
-  elog_main_t *em = &vm->elog_main;
+  elog_main_t *em = &vlib_global_main.elog_main;
 
   em->n_total_events_disable_limit = ~0;
 
@@ -799,11 +741,11 @@ static clib_error_t *
 elog_resize_command_fn (vlib_main_t * vm,
                        unformat_input_t * input, vlib_cli_command_t * cmd)
 {
-  elog_main_t *em = &vm->elog_main;
+  elog_main_t *em = &vlib_global_main.elog_main;
   u32 tmp;
 
   /* Stop the parade */
-  elog_reset_buffer (&vm->elog_main);
+  elog_reset_buffer (em);
 
   if (unformat (input, "%d", &tmp))
     {
@@ -830,7 +772,7 @@ VLIB_CLI_COMMAND (elog_resize_cli, static) = {
 static void
 elog_show_buffer_internal (vlib_main_t * vm, u32 n_events_to_show)
 {
-  elog_main_t *em = &vm->elog_main;
+  elog_main_t *em = &vlib_global_main.elog_main;
   elog_event_t *e, *es;
   f64 dt;
 
@@ -895,8 +837,8 @@ vlib_elog_main_loop_event (vlib_main_t * vm,
                           u32 node_index,
                           u64 time, u32 n_vectors, u32 is_return)
 {
-  vlib_main_t *evm = &vlib_global_main;
-  elog_main_t *em = &evm->elog_main;
+  vlib_main_t *evm = vlib_get_first_main ();
+  elog_main_t *em = vlib_get_elog_main ();
   int enabled = evm->elog_trace_graph_dispatch |
     evm->elog_trace_graph_circuit;
 
@@ -928,29 +870,14 @@ vlib_elog_main_loop_event (vlib_main_t * vm,
     }
 }
 
-#if VLIB_BUFFER_TRACE_TRAJECTORY > 0
-void (*vlib_buffer_trace_trajectory_cb) (vlib_buffer_t * b, u32 node_index);
-void (*vlib_buffer_trace_trajectory_init_cb) (vlib_buffer_t * b);
-
-void
-vlib_buffer_trace_trajectory_init (vlib_buffer_t * b)
-{
-  if (PREDICT_TRUE (vlib_buffer_trace_trajectory_init_cb != 0))
-    {
-      (*vlib_buffer_trace_trajectory_init_cb) (b);
-    }
-}
-
-#endif
-
 static inline void
 add_trajectory_trace (vlib_buffer_t * b, u32 node_index)
 {
 #if VLIB_BUFFER_TRACE_TRAJECTORY > 0
-  if (PREDICT_TRUE (vlib_buffer_trace_trajectory_cb != 0))
-    {
-      (*vlib_buffer_trace_trajectory_cb) (b, node_index);
-    }
+  if (PREDICT_FALSE (b->trajectory_nb >= VLIB_BUFFER_TRACE_TRAJECTORY_MAX))
+    return;
+  b->trajectory_trace[b->trajectory_nb] = node_index;
+  b->trajectory_nb++;
 #endif
 }
 
@@ -1050,13 +977,9 @@ dispatch_node (vlib_main_t * vm,
                                      /* n_vectors */ n,
                                      /* n_clocks */ t - last_time_stamp);
 
-  /* When in interrupt mode and vector rate crosses threshold switch to
-     polling mode. */
-  if (PREDICT_FALSE ((dispatch_state == VLIB_NODE_STATE_INTERRUPT)
-                    || (dispatch_state == VLIB_NODE_STATE_POLLING
-                        && (node->flags
-                            &
-                            VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE))))
+  /* When in adaptive mode and vector rate crosses threshold switch to
+     polling mode and vice versa. */
+  if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_ADAPTIVE_MODE))
     {
       /* *INDENT-OFF* */
       ELOG_TYPE_DECLARE (e) =
@@ -1089,7 +1012,8 @@ dispatch_node (vlib_main_t * vm,
          nm->input_node_counts_by_state[VLIB_NODE_STATE_INTERRUPT] -= 1;
          nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] += 1;
 
-         if (PREDICT_FALSE (vlib_global_main.elog_trace_graph_dispatch))
+         if (PREDICT_FALSE (
+               vlib_get_first_main ()->elog_trace_graph_dispatch))
            {
              vlib_worker_thread_t *w = vlib_worker_threads
                + vm->thread_index;
@@ -1124,7 +1048,8 @@ dispatch_node (vlib_main_t * vm,
                + vm->thread_index;
              node->flags |=
                VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE;
-             if (PREDICT_FALSE (vlib_global_main.elog_trace_graph_dispatch))
+             if (PREDICT_FALSE (
+                   vlib_get_first_main ()->elog_trace_graph_dispatch))
                {
                  ed = ELOG_TRACK_DATA (&vlib_global_main.elog_main, e,
                                        w->elog_track);
@@ -1238,13 +1163,14 @@ dispatch_pending_node (vlib_main_t * vm, uword pending_frame_index,
          /* no new frame has been assigned to this node, use the saved one */
          nf->frame = restore_frame;
          f->n_vectors = 0;
+         f->flags = 0;
        }
       else
        {
          /* The node has gained a frame, implying packets from the current frame
             were re-queued to this same node. we don't need the saved one
             anymore */
-         vlib_frame_free (vm, n, f);
+         vlib_frame_free (vm, f);
        }
     }
   else
@@ -1252,7 +1178,7 @@ dispatch_pending_node (vlib_main_t * vm, uword pending_frame_index,
       if (f->frame_flags & VLIB_FRAME_FREE_AFTER_DISPATCH)
        {
          ASSERT (!(n->flags & VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH));
-         vlib_frame_free (vm, n, f);
+         vlib_frame_free (vm, f);
        }
     }
 
@@ -1533,7 +1459,7 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
   if (is_main)
     {
       vec_resize (nm->pending_frames, 32);
-      _vec_len (nm->pending_frames) = 0;
+      vec_set_len (nm->pending_frames, 0);
     }
 
   /* Mark time of main loop start. */
@@ -1592,6 +1518,7 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
       if (PREDICT_FALSE (vm->check_frame_queues + frame_queue_check_counter))
        {
          u32 processed = 0;
+         vlib_frame_queue_dequeue_fn_t *fn;
 
          if (vm->check_frame_queues)
            {
@@ -1600,7 +1527,10 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
            }
 
          vec_foreach (fqm, tm->frame_queue_mains)
-           processed += vlib_frame_queue_dequeue (vm, fqm);
+           {
+             fn = fqm->frame_queue_dequeue_fn;
+             processed += (fn) (vm, fqm);
+           }
 
          /* No handoff queue work found? */
          if (processed)
@@ -1657,7 +1587,7 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
       for (i = 0; i < _vec_len (nm->pending_frames); i++)
        cpu_time_now = dispatch_pending_node (vm, i, cpu_time_now);
       /* Reset pending vector for next iteration. */
-      _vec_len (nm->pending_frames) = 0;
+      vec_set_len (nm->pending_frames, 0);
 
       if (is_main)
        {
@@ -1743,7 +1673,7 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
                        dispatch_suspended_process (vm, di, cpu_time_now);
                    }
                }
-             _vec_len (nm->data_from_advancing_timing_wheel) = 0;
+             vec_set_len (nm->data_from_advancing_timing_wheel, 0);
            }
        }
       vlib_increment_main_loop_counter (vm);
@@ -1792,33 +1722,32 @@ vlib_worker_loop (vlib_main_t * vm)
   vlib_main_or_worker_loop (vm, /* is_main */ 0);
 }
 
-vlib_main_t vlib_global_main;
+vlib_global_main_t vlib_global_main;
 
 void
 vlib_add_del_post_mortem_callback (void *cb, int is_add)
 {
-  vlib_main_t *vm = &vlib_global_main;
+  vlib_global_main_t *vgm = vlib_get_global_main ();
   int i;
 
   if (is_add == 0)
     {
-      for (i = vec_len (vm->post_mortem_callbacks) - 1; i >= 0; i--)
-       if (vm->post_mortem_callbacks[i] == cb)
-         vec_del1 (vm->post_mortem_callbacks, i);
+      for (i = vec_len (vgm->post_mortem_callbacks) - 1; i >= 0; i--)
+       if (vgm->post_mortem_callbacks[i] == cb)
+         vec_del1 (vgm->post_mortem_callbacks, i);
       return;
     }
 
-  for (i = 0; i < vec_len (vm->post_mortem_callbacks); i++)
-    if (vm->post_mortem_callbacks[i] == cb)
+  for (i = 0; i < vec_len (vgm->post_mortem_callbacks); i++)
+    if (vgm->post_mortem_callbacks[i] == cb)
       return;
-  vec_add1 (vm->post_mortem_callbacks, cb);
+  vec_add1 (vgm->post_mortem_callbacks, cb);
 }
 
 static void
 elog_post_mortem_dump (void)
 {
-  vlib_main_t *vm = &vlib_global_main;
-  elog_main_t *em = &vm->elog_main;
+  elog_main_t *em = vlib_get_elog_main ();
 
   u8 *filename;
   clib_error_t *error;
@@ -1835,6 +1764,7 @@ elog_post_mortem_dump (void)
 static clib_error_t *
 vlib_main_configure (vlib_main_t * vm, unformat_input_t * input)
 {
+  vlib_global_main_t *vgm = vlib_get_global_main ();
   int turn_on_mem_trace = 0;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -1843,9 +1773,9 @@ vlib_main_configure (vlib_main_t * vm, unformat_input_t * input)
        turn_on_mem_trace = 1;
 
       else if (unformat (input, "elog-events %d",
-                        &vm->configured_elog_ring_size))
-       vm->configured_elog_ring_size =
-         1 << max_log2 (vm->configured_elog_ring_size);
+                        &vgm->configured_elog_ring_size))
+       vgm->configured_elog_ring_size =
+         1 << max_log2 (vgm->configured_elog_ring_size);
       else if (unformat (input, "elog-post-mortem-dump"))
        vlib_add_del_post_mortem_callback (elog_post_mortem_dump,
                                           /* is_add */ 1);
@@ -1884,7 +1814,6 @@ placeholder_queue_signal_callback (vlib_main_t * vm)
 }
 
 #define foreach_weak_reference_stub             \
-_(vlib_map_stat_segment_init)                   \
 _(vpe_api_init)                                 \
 _(vlibmemory_init)                              \
 _(map_api_segment_init)
@@ -1922,21 +1851,22 @@ vl_api_get_elog_trace_api_messages (void)
 int
 vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
 {
+  vlib_global_main_t *vgm = vlib_get_global_main ();
   clib_error_t *volatile error;
   vlib_node_main_t *nm = &vm->node_main;
 
   vm->queue_signal_callback = placeholder_queue_signal_callback;
 
   /* Reconfigure event log which is enabled very early */
-  if (vm->configured_elog_ring_size &&
-      vm->configured_elog_ring_size != vm->elog_main.event_ring_size)
-    elog_resize (&vm->elog_main, vm->configured_elog_ring_size);
-  vl_api_set_elog_main (&vm->elog_main);
+  if (vgm->configured_elog_ring_size &&
+      vgm->configured_elog_ring_size != vgm->elog_main.event_ring_size)
+    elog_resize (&vgm->elog_main, vgm->configured_elog_ring_size);
+  vl_api_set_elog_main (vlib_get_elog_main ());
   (void) vl_api_set_elog_trace_api_messages (1);
 
   /* Default name. */
-  if (!vm->name)
-    vm->name = "VLIB";
+  if (!vgm->name)
+    vgm->name = "VLIB";
 
   if ((error = vlib_physmem_init (vm)))
     {
@@ -1944,7 +1874,13 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
       goto done;
     }
 
-  if ((error = vlib_map_stat_segment_init (vm)))
+  if ((error = vlib_log_init (vm)))
+    {
+      clib_error_report (error);
+      goto done;
+    }
+
+  if ((error = vlib_stats_init (vm)))
     {
       clib_error_report (error);
       goto done;
@@ -2004,8 +1940,8 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
     }
 
   /* See unix/main.c; most likely already set up */
-  if (vm->init_functions_called == 0)
-    vm->init_functions_called = hash_create (0, /* value bytes */ 0);
+  if (vgm->init_functions_called == 0)
+    vgm->init_functions_called = hash_create (0, /* value bytes */ 0);
   if ((error = vlib_call_all_init_functions (vm)))
     goto done;
 
@@ -2013,7 +1949,7 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
                                             CLIB_CACHE_LINE_BYTES);
 
   vec_validate (nm->data_from_advancing_timing_wheel, 10);
-  _vec_len (nm->data_from_advancing_timing_wheel) = 0;
+  vec_set_len (nm->data_from_advancing_timing_wheel, 0);
 
   /* Create the process timing wheel */
   TW (tw_timer_wheel_init) ((TWT (tw_timer_wheel) *) nm->timing_wheel,
@@ -2022,9 +1958,9 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
                            ~0 /* max expirations per call */ );
 
   vec_validate (vm->pending_rpc_requests, 0);
-  _vec_len (vm->pending_rpc_requests) = 0;
+  vec_set_len (vm->pending_rpc_requests, 0);
   vec_validate (vm->processing_rpc_requests, 0);
-  _vec_len (vm->processing_rpc_requests) = 0;
+  vec_set_len (vm->processing_rpc_requests, 0);
 
   /* Default params for the buffer allocator fault injector, if configured */
   if (VLIB_BUFFER_ALLOC_FAULT_INJECTOR > 0)
@@ -2047,7 +1983,7 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
   vm->damping_constant = exp (-1.0 / 20.0);
 
   /* Sort per-thread init functions before we start threads */
-  vlib_sort_init_exit_functions (&vm->worker_init_function_registrations);
+  vlib_sort_init_exit_functions (&vgm->worker_init_function_registrations);
 
   /* Call all main loop enter functions. */
   {
@@ -2087,7 +2023,7 @@ done:
   if (error)
     clib_error_report (error);
 
-  return 0;
+  return vm->main_loop_exit_status;
 }
 
 vlib_main_t *
@@ -2102,6 +2038,13 @@ vlib_get_elog_main_not_inline ()
   return &vlib_global_main.elog_main;
 }
 
+void
+vlib_exit_with_status (vlib_main_t *vm, int status)
+{
+  vm->main_loop_exit_status = status;
+  __atomic_store_n (&vm->main_loop_exit_now, 1, __ATOMIC_RELEASE);
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *