vlib: calculate per-worker loops/second metric
[vpp.git] / src / vlib / main.c
index 4dcf63e..f723d10 100644 (file)
@@ -52,8 +52,6 @@ CJ_GLOBAL_LOG_PROTOTYPE;
    speculative vector enqueues which overflow vector data in next frame. */
 #define VLIB_FRAME_SIZE_ALLOC (VLIB_FRAME_SIZE + 4)
 
-u32 wraps;
-
 always_inline u32
 vlib_frame_bytes (u32 n_scalar_bytes, u32 n_vector_bytes)
 {
@@ -117,7 +115,7 @@ get_frame_size_info (vlib_node_main_t * nm,
 #endif
 }
 
-static u32
+static vlib_frame_t *
 vlib_frame_alloc_to_node (vlib_main_t * vm, u32 to_node_index,
                          u32 frame_flags)
 {
@@ -125,7 +123,7 @@ vlib_frame_alloc_to_node (vlib_main_t * vm, u32 to_node_index,
   vlib_frame_size_t *fs;
   vlib_node_t *to_node;
   vlib_frame_t *f;
-  u32 fi, l, n, scalar_size, vector_size;
+  u32 l, n, scalar_size, vector_size;
 
   to_node = vlib_get_node (vm, to_node_index);
 
@@ -134,17 +132,15 @@ vlib_frame_alloc_to_node (vlib_main_t * vm, u32 to_node_index,
 
   fs = get_frame_size_info (nm, scalar_size, vector_size);
   n = vlib_frame_bytes (scalar_size, vector_size);
-  if ((l = vec_len (fs->free_frame_indices)) > 0)
+  if ((l = vec_len (fs->free_frames)) > 0)
     {
       /* Allocate from end of free list. */
-      fi = fs->free_frame_indices[l - 1];
-      f = vlib_get_frame_no_check (vm, fi);
-      _vec_len (fs->free_frame_indices) = l - 1;
+      f = fs->free_frames[l - 1];
+      _vec_len (fs->free_frames) = l - 1;
     }
   else
     {
       f = clib_mem_alloc_aligned_no_fail (n, VLIB_FRAME_ALIGN);
-      fi = vlib_frame_index_no_check (vm, f);
     }
 
   /* Poison frame when debugging. */
@@ -167,12 +163,12 @@ vlib_frame_alloc_to_node (vlib_main_t * vm, u32 to_node_index,
 
   fs->n_alloc_frames += 1;
 
-  return fi;
+  return f;
 }
 
 /* Allocate a frame for from FROM_NODE to TO_NODE via TO_NEXT_INDEX.
    Returns frame index. */
-static u32
+static vlib_frame_t *
 vlib_frame_alloc (vlib_main_t * vm, vlib_node_runtime_t * from_node_runtime,
                  u32 to_next_index)
 {
@@ -188,10 +184,10 @@ vlib_frame_alloc (vlib_main_t * vm, vlib_node_runtime_t * from_node_runtime,
 vlib_frame_t *
 vlib_get_frame_to_node (vlib_main_t * vm, u32 to_node_index)
 {
-  u32 fi = vlib_frame_alloc_to_node (vm, to_node_index,
-                                    /* frame_flags */
-                                    VLIB_FRAME_FREE_AFTER_DISPATCH);
-  return vlib_get_frame (vm, fi);
+  vlib_frame_t *f = vlib_frame_alloc_to_node (vm, to_node_index,
+                                             /* frame_flags */
+                                             VLIB_FRAME_FREE_AFTER_DISPATCH);
+  return vlib_get_frame (vm, f);
 }
 
 void
@@ -208,7 +204,7 @@ vlib_put_frame_to_node (vlib_main_t * vm, u32 to_node_index, vlib_frame_t * f)
   vec_add2 (vm->node_main.pending_frames, p, 1);
 
   f->frame_flags |= VLIB_FRAME_PENDING;
-  p->frame_index = vlib_frame_index (vm, f);
+  p->frame = vlib_get_frame (vm, f);
   p->node_runtime_index = to_node->runtime_index;
   p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
 }
@@ -220,28 +216,24 @@ vlib_frame_free (vlib_main_t * vm, vlib_node_runtime_t * r, vlib_frame_t * f)
   vlib_node_main_t *nm = &vm->node_main;
   vlib_node_t *node;
   vlib_frame_size_t *fs;
-  u32 frame_index;
 
   ASSERT (f->frame_flags & VLIB_FRAME_IS_ALLOCATED);
 
   node = vlib_get_node (vm, r->node_index);
   fs = get_frame_size_info (nm, node->scalar_size, node->vector_size);
 
-  frame_index = vlib_frame_index (vm, f);
-
   ASSERT (f->frame_flags & VLIB_FRAME_IS_ALLOCATED);
 
   /* No next frames may point to freed frame. */
   if (CLIB_DEBUG > 0)
     {
       vlib_next_frame_t *nf;
-      vec_foreach (nf, vm->node_main.next_frames)
-       ASSERT (nf->frame_index != frame_index);
+      vec_foreach (nf, vm->node_main.next_frames) ASSERT (nf->frame != f);
     }
 
   f->frame_flags &= ~(VLIB_FRAME_IS_ALLOCATED | VLIB_FRAME_NO_APPEND);
 
-  vec_add1 (fs->free_frame_indices, frame_index);
+  vec_add1 (fs->free_frames, f);
   ASSERT (fs->n_alloc_frames > 0);
   fs->n_alloc_frames -= 1;
 }
@@ -257,7 +249,7 @@ show_frame_stats (vlib_main_t * vm,
   vec_foreach (fs, nm->frame_sizes)
   {
     u32 n_alloc = fs->n_alloc_frames;
-    u32 n_free = vec_len (fs->free_frame_indices);
+    u32 n_free = vec_len (fs->free_frames);
 
     if (n_alloc + n_free > 0)
       vlib_cli_output (vm, "%=6d%=12d%=12d",
@@ -321,11 +313,11 @@ vlib_next_frame_change_ownership (vlib_main_t * vm,
       if (next_frame->flags & VLIB_FRAME_PENDING)
        {
          vlib_pending_frame_t *p;
-         if (next_frame->frame_index != ~0)
+         if (next_frame->frame != NULL)
            {
              vec_foreach (p, nm->pending_frames)
              {
-               if (p->frame_index == next_frame->frame_index)
+               if (p->frame == next_frame->frame)
                  {
                    p->next_frame_index =
                      next_frame - vm->node_main.next_frames;
@@ -377,11 +369,11 @@ vlib_get_next_frame_internal (vlib_main_t * vm,
   /* ??? Don't need valid flag: can use frame_index == ~0 */
   if (PREDICT_FALSE (!(nf->flags & VLIB_FRAME_IS_ALLOCATED)))
     {
-      nf->frame_index = vlib_frame_alloc (vm, node, next_index);
+      nf->frame = vlib_frame_alloc (vm, node, next_index);
       nf->flags |= VLIB_FRAME_IS_ALLOCATED;
     }
 
-  f = vlib_get_frame (vm, nf->frame_index);
+  f = nf->frame;
 
   /* Has frame been removed from pending vector (e.g. finished dispatching)?
      If so we can reuse frame. */
@@ -403,13 +395,12 @@ vlib_get_next_frame_internal (vlib_main_t * vm,
          two redundant frames from node -> next node. */
       if (!(nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH))
        {
-         vlib_frame_t *f_old = vlib_get_frame (vm, nf->frame_index);
+         vlib_frame_t *f_old = vlib_get_frame (vm, nf->frame);
          f_old->frame_flags |= VLIB_FRAME_FREE_AFTER_DISPATCH;
        }
 
       /* Allocate new frame to replace full one. */
-      nf->frame_index = vlib_frame_alloc (vm, node, next_index);
-      f = vlib_get_frame (vm, nf->frame_index);
+      f = nf->frame = vlib_frame_alloc (vm, node, next_index);
       n_used = f->n_vectors;
     }
 
@@ -438,7 +429,7 @@ vlib_put_next_frame_validate (vlib_main_t * vm,
   u32 n_before, n_after;
 
   nf = vlib_node_runtime_get_next_frame (vm, rt, next_index);
-  f = vlib_get_frame (vm, nf->frame_index);
+  f = vlib_get_frame (vm, nf->frame);
 
   ASSERT (n_vectors_left <= VLIB_FRAME_SIZE);
   n_after = VLIB_FRAME_SIZE - n_vectors_left;
@@ -475,7 +466,7 @@ vlib_put_next_frame (vlib_main_t * vm,
     vlib_put_next_frame_validate (vm, r, next_index, n_vectors_left);
 
   nf = vlib_node_runtime_get_next_frame (vm, r, next_index);
-  f = vlib_get_frame (vm, nf->frame_index);
+  f = vlib_get_frame (vm, nf->frame);
 
   /* Make sure that magic number is still there.  Otherwise, caller
      has overrun frame meta data. */
@@ -511,7 +502,7 @@ vlib_put_next_frame (vlib_main_t * vm,
 
          vec_add2 (nm->pending_frames, p, 1);
 
-         p->frame_index = nf->frame_index;
+         p->frame = nf->frame;
          p->node_runtime_index = nf->node_runtime_index;
          p->next_frame_index = nf - nm->next_frames;
          nf->flags |= VLIB_FRAME_PENDING;
@@ -525,7 +516,7 @@ vlib_put_next_frame (vlib_main_t * vm,
           */
          if (0 && r->thread_index != next_runtime->thread_index)
            {
-             nf->frame_index = ~0;
+             nf->frame = NULL;
              nf->flags &= ~(VLIB_FRAME_PENDING | VLIB_FRAME_IS_ALLOCATED);
            }
        }
@@ -680,13 +671,16 @@ vlib_node_runtime_update_stats (vlib_main_t * vm,
   return r;
 }
 
-static inline void
-vlib_node_runtime_perf_counter (vlib_main_t * vm, u64 * pmc0, u64 * pmc1)
+always_inline void
+vlib_node_runtime_perf_counter (vlib_main_t * vm, u64 * pmc0, u64 * pmc1,
+                               vlib_node_runtime_t * node,
+                               vlib_frame_t * frame, int before_or_after)
 {
   *pmc0 = 0;
   *pmc1 = 0;
-  if (PREDICT_FALSE (vm->vlib_node_runtime_perf_counter_cb != 0))
-    (*vm->vlib_node_runtime_perf_counter_cb) (vm, pmc0, pmc1);
+  if (PREDICT_FALSE (vec_len (vm->vlib_node_runtime_perf_counter_cbs) != 0))
+    clib_call_callbacks (vm->vlib_node_runtime_perf_counter_cbs, vm, pmc0,
+                        pmc1, node, frame, before_or_after);
 }
 
 always_inline void
@@ -1010,14 +1004,17 @@ format_buffer_metadata (u8 * s, va_list * args)
   s = format (s, "flags: %U\n", format_vnet_buffer_flags, b);
   s = format (s, "current_data: %d, current_length: %d\n",
              (i32) (b->current_data), (i32) (b->current_length));
-  s = format (s, "current_config_index: %d, flow_id: %x, next_buffer: %x\n",
-             b->current_config_index, b->flow_id, b->next_buffer);
-  s = format (s, "error: %d, ref_count: %d, buffer_pool_index: %d\n",
-             (u32) (b->error), (u32) (b->ref_count),
-             (u32) (b->buffer_pool_index));
-  s = format (s,
-             "trace_index: %d, len_not_first_buf: %d\n",
-             b->trace_index, b->total_length_not_including_first_buffer);
+  s = format
+    (s,
+     "current_config_index/punt_reason: %d, flow_id: %x, next_buffer: %x\n",
+     b->current_config_index, b->flow_id, b->next_buffer);
+  s =
+    format (s, "error: %d, ref_count: %d, buffer_pool_index: %d\n",
+           (u32) (b->error), (u32) (b->ref_count),
+           (u32) (b->buffer_pool_index));
+  s =
+    format (s, "trace_handle: 0x%x, len_not_first_buf: %d\n", b->trace_handle,
+           b->total_length_not_including_first_buffer);
   return s;
 }
 
@@ -1029,7 +1026,7 @@ dispatch_pcap_trace (vlib_main_t * vm,
 {
   int i;
   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **bufp, *b;
-  pcap_main_t *pm = &vm->dispatch_pcap_main;
+  pcap_main_t *pm = &vlib_global_main.dispatch_pcap_main;
   vlib_trace_main_t *tm = &vm->trace_main;
   u32 capture_size;
   vlib_node_t *n;
@@ -1088,7 +1085,8 @@ dispatch_pcap_trace (vlib_main_t * vm,
          if (PREDICT_FALSE (b->flags & VLIB_BUFFER_IS_TRACED))
            {
              vlib_trace_header_t **h
-               = pool_elt_at_index (tm->trace_buffer_pool, b->trace_index);
+               = pool_elt_at_index (tm->trace_buffer_pool,
+                                    vlib_buffer_get_trace_index (b));
 
              vm->pcap_buffer = format (vm->pcap_buffer, "%U%c",
                                        format_vlib_trace, vm, h[0], 0);
@@ -1181,7 +1179,8 @@ dispatch_node (vlib_main_t * vm,
                             last_time_stamp, frame ? frame->n_vectors : 0,
                             /* is_after */ 0);
 
-  vlib_node_runtime_perf_counter (vm, &pmc_before[0], &pmc_before[1]);
+  vlib_node_runtime_perf_counter (vm, &pmc_before[0], &pmc_before[1],
+                                 node, frame, 0 /* before */ );
 
   /*
    * Turn this on if you run into
@@ -1215,7 +1214,8 @@ dispatch_node (vlib_main_t * vm,
    * To validate accounting: pmc_delta = t - pmc_before;
    * perf ticks should equal clocks/pkt...
    */
-  vlib_node_runtime_perf_counter (vm, &pmc_after[0], &pmc_after[1]);
+  vlib_node_runtime_perf_counter (vm, &pmc_after[0], &pmc_after[1], node,
+                                 frame, 1 /* after */ );
 
   pmc_delta[0] = pmc_after[0] - pmc_before[0];
   pmc_delta[1] = pmc_after[1] - pmc_before[1];
@@ -1329,7 +1329,7 @@ dispatch_pending_node (vlib_main_t * vm, uword pending_frame_index,
   vlib_frame_t *f;
   vlib_next_frame_t *nf, nf_dummy;
   vlib_node_runtime_t *n;
-  u32 restore_frame_index;
+  vlib_frame_t *restore_frame;
   vlib_pending_frame_t *p;
 
   /* See comment below about dangling references to nm->pending_frames */
@@ -1338,13 +1338,13 @@ dispatch_pending_node (vlib_main_t * vm, uword pending_frame_index,
   n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
                        p->node_runtime_index);
 
-  f = vlib_get_frame (vm, p->frame_index);
+  f = vlib_get_frame (vm, p->frame);
   if (p->next_frame_index == VLIB_PENDING_FRAME_NO_NEXT_FRAME)
     {
       /* No next frame: so use dummy on stack. */
       nf = &nf_dummy;
       nf->flags = f->frame_flags & VLIB_NODE_FLAG_TRACE;
-      nf->frame_index = ~p->frame_index;
+      nf->frame = NULL;
     }
   else
     nf = vec_elt_at_index (nm->next_frames, p->next_frame_index);
@@ -1353,13 +1353,13 @@ dispatch_pending_node (vlib_main_t * vm, uword pending_frame_index,
 
   /* Force allocation of new frame while current frame is being
      dispatched. */
-  restore_frame_index = ~0;
-  if (nf->frame_index == p->frame_index)
+  restore_frame = NULL;
+  if (nf->frame == p->frame)
     {
-      nf->frame_index = ~0;
+      nf->frame = NULL;
       nf->flags &= ~VLIB_FRAME_IS_ALLOCATED;
       if (!(n->flags & VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH))
-       restore_frame_index = p->frame_index;
+       restore_frame = p->frame;
     }
 
   /* Frame must be pending. */
@@ -1377,11 +1377,17 @@ dispatch_pending_node (vlib_main_t * vm, uword pending_frame_index,
                                   VLIB_NODE_TYPE_INTERNAL,
                                   VLIB_NODE_STATE_POLLING,
                                   f, last_time_stamp);
+  /* Internal node vector-rate accounting, for summary stats */
+  vm->internal_node_vectors += f->n_vectors;
+  vm->internal_node_calls++;
+  vm->internal_node_last_vectors_per_main_loop =
+    (f->n_vectors > vm->internal_node_last_vectors_per_main_loop) ?
+    f->n_vectors : vm->internal_node_last_vectors_per_main_loop;
 
   f->frame_flags &= ~(VLIB_FRAME_PENDING | VLIB_FRAME_NO_APPEND);
 
   /* Frame is ready to be used again, so restore it. */
-  if (restore_frame_index != ~0)
+  if (restore_frame != NULL)
     {
       /*
        * We musn't restore a frame that is flagged to be freed. This
@@ -1409,10 +1415,10 @@ dispatch_pending_node (vlib_main_t * vm, uword pending_frame_index,
       nf = vec_elt_at_index (nm->next_frames, p->next_frame_index);
       nf->flags |= VLIB_FRAME_IS_ALLOCATED;
 
-      if (~0 == nf->frame_index)
+      if (NULL == nf->frame)
        {
          /* no new frame has been assigned to this node, use the saved one */
-         nf->frame_index = restore_frame_index;
+         nf->frame = restore_frame;
          f->n_vectors = 0;
        }
       else
@@ -1546,7 +1552,7 @@ dispatch_process (vlib_main_t * vm,
       n_vectors = 0;
       pool_get (nm->suspended_process_frames, pf);
       pf->node_runtime_index = node->runtime_index;
-      pf->frame_index = f ? vlib_frame_index (vm, f) : ~0;
+      pf->frame = f;
       pf->next_frame_index = ~0;
 
       p->n_suspends += 1;
@@ -1614,7 +1620,7 @@ dispatch_suspended_process (vlib_main_t * vm,
 
   node_runtime = &p->node_runtime;
   node = vlib_get_node (vm, node_runtime->node_index);
-  f = pf->frame_index != ~0 ? vlib_get_frame (vm, pf->frame_index) : 0;
+  f = pf->frame;
 
   vlib_elog_main_loop_event (vm, node_runtime->node_index, t,
                             f ? f->n_vectors : 0, /* is_after */ 0);
@@ -1677,8 +1683,10 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
   vlib_thread_main_t *tm = vlib_get_thread_main ();
   uword i;
   u64 cpu_time_now;
+  f64 now;
   vlib_frame_queue_main_t *fqm;
   u32 *last_node_runtime_indices = 0;
+  u32 frame_queue_check_counter = 0;
 
   /* Initialize pending node vector. */
   if (is_main)
@@ -1710,11 +1718,20 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
 
   vm->cpu_id = clib_get_current_cpu_id ();
   vm->numa_node = clib_get_current_numa_node ();
+  os_set_numa_index (vm->numa_node);
 
   /* Start all processes. */
   if (is_main)
     {
       uword i;
+
+      /*
+       * Perform an initial barrier sync. Pays no attention to
+       * the barrier sync hold-down timer scheme, which won't work
+       * at this point in time.
+       */
+      vlib_worker_thread_initial_barrier_sync_and_release (vm);
+
       nm->current_process_index = ~0;
       for (i = 0; i < vec_len (nm->processes); i++)
        cpu_time_now = dispatch_process (vm, nm->processes[i], /* frame */ 0,
@@ -1734,11 +1751,28 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
       if (!is_main)
        {
          vlib_worker_thread_barrier_check ();
-         vec_foreach (fqm, tm->frame_queue_mains)
-           vlib_frame_queue_dequeue (vm, fqm);
-         if (PREDICT_FALSE (vm->worker_thread_main_loop_callback != 0))
-           ((void (*)(vlib_main_t *)) vm->worker_thread_main_loop_callback)
-             (vm);
+         if (PREDICT_FALSE (vm->check_frame_queues +
+                            frame_queue_check_counter))
+           {
+             u32 processed = 0;
+
+             if (vm->check_frame_queues)
+               {
+                 frame_queue_check_counter = 100;
+                 vm->check_frame_queues = 0;
+               }
+
+             vec_foreach (fqm, tm->frame_queue_mains)
+               processed += vlib_frame_queue_dequeue (vm, fqm);
+
+             /* No handoff queue work found? */
+             if (processed)
+               frame_queue_check_counter = 100;
+             else
+               frame_queue_check_counter--;
+           }
+         if (PREDICT_FALSE (vec_len (vm->worker_thread_main_loop_callbacks)))
+           clib_call_callbacks (vm->worker_thread_main_loop_callbacks, vm);
        }
 
       /* Process pre-input nodes. */
@@ -1890,10 +1924,36 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
            }
        }
       vlib_increment_main_loop_counter (vm);
-
       /* Record time stamp in case there are no enabled nodes and above
          calls do not update time stamp. */
       cpu_time_now = clib_cpu_time_now ();
+      vm->loops_this_reporting_interval++;
+      now = clib_time_now_internal (&vm->clib_time, cpu_time_now);
+      /* Time to update loops_per_second? */
+      if (PREDICT_FALSE (now >= vm->loop_interval_end))
+       {
+         /* Next sample ends in 20ms */
+         if (vm->loop_interval_start)
+           {
+             f64 this_loops_per_second;
+
+             this_loops_per_second =
+               ((f64) vm->loops_this_reporting_interval) / (now -
+                                                            vm->loop_interval_start);
+
+             vm->loops_per_second =
+               vm->loops_per_second * vm->damping_constant +
+               (1.0 - vm->damping_constant) * this_loops_per_second;
+             if (vm->loops_per_second != 0.0)
+               vm->seconds_per_loop = 1.0 / vm->loops_per_second;
+             else
+               vm->seconds_per_loop = 0.0;
+           }
+         /* New interval starts now, and ends in 20ms */
+         vm->loop_interval_start = now;
+         vm->loop_interval_end = now + 2e-4;
+         vm->loops_this_reporting_interval = 0;
+       }
     }
 }
 
@@ -1958,6 +2018,29 @@ clib_error_t *name (vlib_main_t *vm) { return 0; }
 foreach_weak_reference_stub;
 #undef _
 
+void vl_api_set_elog_main (elog_main_t * m) __attribute__ ((weak));
+void
+vl_api_set_elog_main (elog_main_t * m)
+{
+  clib_warning ("STUB");
+}
+
+int vl_api_set_elog_trace_api_messages (int enable) __attribute__ ((weak));
+int
+vl_api_set_elog_trace_api_messages (int enable)
+{
+  clib_warning ("STUB");
+  return 0;
+}
+
+int vl_api_get_elog_trace_api_messages (void) __attribute__ ((weak));
+int
+vl_api_get_elog_trace_api_messages (void)
+{
+  clib_warning ("STUB");
+  return 0;
+}
+
 /* Main function. */
 int
 vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
@@ -1967,13 +2050,13 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
 
   vm->queue_signal_callback = dummy_queue_signal_callback;
 
-  clib_time_init (&vm->clib_time);
-
   /* Turn on event log. */
   if (!vm->elog_main.event_ring_size)
     vm->elog_main.event_ring_size = 128 << 10;
   elog_init (&vm->elog_main, vm->elog_main.event_ring_size);
   elog_enable_disable (&vm->elog_main, 1);
+  vl_api_set_elog_main (&vm->elog_main);
+  (void) vl_api_set_elog_trace_api_messages (1);
 
   /* Default name. */
   if (!vm->name)
@@ -2064,6 +2147,30 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
   vec_validate (vm->processing_rpc_requests, 0);
   _vec_len (vm->processing_rpc_requests) = 0;
 
+  if ((error = vlib_call_all_config_functions (vm, input, 0 /* is_early */ )))
+    goto done;
+
+  /*
+   * Use exponential smoothing, with a half-life of 1 second
+   * reported_rate(t) = reported_rate(t-1) * K + rate(t)*(1-K)
+   *
+   * Sample every 20ms, aka 50 samples per second
+   * K = exp (-1.0/20.0);
+   * K = 0.95
+   */
+  vm->damping_constant = exp (-1.0 / 20.0);
+
+  /* Sort per-thread init functions before we start threads */
+  vlib_sort_init_exit_functions (&vm->worker_init_function_registrations);
+
+  /* Call all main loop enter functions. */
+  {
+    clib_error_t *sub_error;
+    sub_error = vlib_call_all_main_loop_enter_functions (vm);
+    if (sub_error)
+      clib_error_report (sub_error);
+  }
+
   switch (clib_setjmp (&vm->main_loop_exit, VLIB_MAIN_LOOP_EXIT_NONE))
     {
     case VLIB_MAIN_LOOP_EXIT_NONE:
@@ -2078,17 +2185,6 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
       goto done;
     }
 
-  if ((error = vlib_call_all_config_functions (vm, input, 0 /* is_early */ )))
-    goto done;
-
-  /* Call all main loop enter functions. */
-  {
-    clib_error_t *sub_error;
-    sub_error = vlib_call_all_main_loop_enter_functions (vm);
-    if (sub_error)
-      clib_error_report (sub_error);
-  }
-
   vlib_main_loop (vm);
 
 done:
@@ -2106,169 +2202,207 @@ done:
   return 0;
 }
 
-static inline clib_error_t *
-pcap_dispatch_trace_command_internal (vlib_main_t * vm,
-                                     unformat_input_t * input,
-                                     vlib_cli_command_t * cmd, int rx_tx)
+int
+vlib_pcap_dispatch_trace_configure (vlib_pcap_dispatch_trace_args_t * a)
 {
-  unformat_input_t _line_input, *line_input = &_line_input;
+  vlib_main_t *vm = vlib_get_main ();
   pcap_main_t *pm = &vm->dispatch_pcap_main;
-  u8 *filename = 0;
-  u32 max = 1000;
-  int enabled = 0;
-  int is_error = 0;
-  clib_error_t *error = 0;
-  u32 node_index, add;
   vlib_trace_main_t *tm;
   vlib_trace_node_t *tn;
 
-  /* Get a line of input. */
-  if (!unformat_user (input, unformat_line_input, line_input))
-    return 0;
-
-  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+  if (a->status)
     {
-      if (unformat (line_input, "on"))
-       {
-         if (vm->dispatch_pcap_enable == 0)
-           {
-             enabled = 1;
-           }
-         else
-           {
-             vlib_cli_output (vm, "pcap dispatch capture already on...");
-             is_error = 1;
-             break;
-           }
-       }
-      else if (unformat (line_input, "off"))
-       {
-         if (vm->dispatch_pcap_enable)
-           {
-             vlib_cli_output
-               (vm, "captured %d pkts...", pm->n_packets_captured);
-             if (pm->n_packets_captured)
-               {
-                 pm->n_packets_to_capture = pm->n_packets_captured;
-                 error = pcap_write (pm);
-                 if (error)
-                   clib_error_report (error);
-                 else
-                   vlib_cli_output (vm, "saved to %s...", pm->file_name);
-               }
-             vm->dispatch_pcap_enable = 0;
-           }
-         else
-           {
-             vlib_cli_output (vm, "pcap tx capture already off...");
-             is_error = 1;
-             break;
-           }
-       }
-      else if (unformat (line_input, "max %d", &max))
+      if (vm->dispatch_pcap_enable)
        {
-         if (vm->dispatch_pcap_enable)
+         int i;
+         vlib_cli_output
+           (vm, "pcap dispatch capture enabled: %d of %d pkts...",
+            pm->n_packets_captured, pm->n_packets_to_capture);
+         vlib_cli_output (vm, "capture to file %s", pm->file_name);
+
+         for (i = 0; i < vec_len (vm->dispatch_buffer_trace_nodes); i++)
            {
-             vlib_cli_output
-               (vm,
-                "can't change max value while pcap tx capture active...");
-             is_error = 1;
-             break;
+             vlib_cli_output (vm,
+                              "Buffer trace of %d pkts from %U enabled...",
+                              a->buffer_traces_to_capture,
+                              format_vlib_node_name, vm,
+                              vm->dispatch_buffer_trace_nodes[i]);
            }
-         pm->n_packets_to_capture = max;
        }
       else
-       if (unformat
-           (line_input, "file %U", unformat_vlib_tmpfile, &filename))
-       {
-         if (vm->dispatch_pcap_enable)
-           {
-             vlib_cli_output
-               (vm, "can't change file while pcap tx capture active...");
-             is_error = 1;
-             break;
-           }
-       }
-      else if (unformat (line_input, "status"))
-       {
-         if (vm->dispatch_pcap_enable)
-           {
-             vlib_cli_output
-               (vm, "pcap dispatch capture is on: %d of %d pkts...",
-                pm->n_packets_captured, pm->n_packets_to_capture);
-             vlib_cli_output (vm, "Capture to file %s", pm->file_name);
-           }
-         else
-           {
-             vlib_cli_output (vm, "pcap dispatch capture is off...");
-           }
-         break;
-       }
-      else if (unformat (line_input, "buffer-trace %U %d",
-                        unformat_vlib_node, vm, &node_index, &add))
-       {
-         if (vnet_trace_dummy == 0)
-           vec_validate_aligned (vnet_trace_dummy, 2048,
-                                 CLIB_CACHE_LINE_BYTES);
-         vlib_cli_output (vm, "Buffer tracing of %d pkts from %U enabled...",
-                          add, format_vlib_node_name, vm, node_index);
+       vlib_cli_output (vm, "pcap dispatch capture disabled");
+      return 0;
+    }
 
-          /* *INDENT-OFF* */
-          foreach_vlib_main ((
-            {
-              tm = &this_vlib_main->trace_main;
-              tm->verbose = 0;  /* not sure this ever did anything... */
-              vec_validate (tm->nodes, node_index);
-              tn = tm->nodes + node_index;
-              tn->limit += add;
-              tm->trace_enable = 1;
-            }));
-          /* *INDENT-ON* */
-       }
+  /* Consistency checks */
 
-      else
-       {
-         error = clib_error_return (0, "unknown input `%U'",
-                                    format_unformat_error, line_input);
-         is_error = 1;
-         break;
-       }
+  /* Enable w/ capture already enabled not allowed */
+  if (vm->dispatch_pcap_enable && a->enable)
+    return -7;                 /* VNET_API_ERROR_INVALID_VALUE */
+
+  /* Disable capture with capture already disabled, not interesting */
+  if (vm->dispatch_pcap_enable == 0 && a->enable == 0)
+    return -81;                        /* VNET_API_ERROR_VALUE_EXIST */
+
+  /* Change number of packets to capture while capturing */
+  if (vm->dispatch_pcap_enable && a->enable
+      && (pm->n_packets_to_capture != a->packets_to_capture))
+    return -8;                 /* VNET_API_ERROR_INVALID_VALUE_2 */
+
+  /* Independent of enable/disable, to allow buffer trace multi nodes */
+  if (a->buffer_trace_node_index != ~0)
+    {
+      /* *INDENT-OFF* */
+      foreach_vlib_main ((
+        {
+          tm = &this_vlib_main->trace_main;
+          tm->verbose = 0;  /* not sure this ever did anything... */
+          vec_validate (tm->nodes, a->buffer_trace_node_index);
+          tn = tm->nodes + a->buffer_trace_node_index;
+          tn->limit += a->buffer_traces_to_capture;
+          tm->trace_enable = 1;
+        }));
+      /* *INDENT-ON* */
+      vec_add1 (vm->dispatch_buffer_trace_nodes, a->buffer_trace_node_index);
     }
-  unformat_free (line_input);
 
-  if (is_error == 0)
+  if (a->enable)
     {
-      /* Clean up from previous run */
+      /* Clean up from previous run, if any */
       vec_free (pm->file_name);
       vec_free (pm->pcap_data);
-
       memset (pm, 0, sizeof (*pm));
-      pm->n_packets_to_capture = max;
 
-      if (enabled)
+      vec_validate_aligned (vnet_trace_dummy, 2048, CLIB_CACHE_LINE_BYTES);
+      if (pm->lock == 0)
+       clib_spinlock_init (&(pm->lock));
+
+      if (a->filename == 0)
+       a->filename = format (0, "/tmp/dispatch.pcap%c", 0);
+
+      pm->file_name = (char *) a->filename;
+      pm->n_packets_captured = 0;
+      pm->packet_type = PCAP_PACKET_TYPE_vpp;
+      pm->n_packets_to_capture = a->packets_to_capture;
+      /* *INDENT-OFF* */
+      foreach_vlib_main (({this_vlib_main->dispatch_pcap_enable = 1;}));
+      /* *INDENT-ON* */
+    }
+  else
+    {
+      /* *INDENT-OFF* */
+      foreach_vlib_main (({this_vlib_main->dispatch_pcap_enable = 0;}));
+      /* *INDENT-ON* */
+      vec_reset_length (vm->dispatch_buffer_trace_nodes);
+      if (pm->n_packets_captured)
        {
-         if (filename == 0)
-           filename = format (0, "/tmp/dispatch.pcap%c", 0);
-
-         pm->file_name = (char *) filename;
-         pm->n_packets_captured = 0;
-         pm->packet_type = PCAP_PACKET_TYPE_vpp;
-         if (pm->lock == 0)
-           clib_spinlock_init (&(pm->lock));
-         vm->dispatch_pcap_enable = 1;
-         vlib_cli_output (vm, "pcap dispatch capture on...");
+         clib_error_t *error;
+         pm->n_packets_to_capture = pm->n_packets_captured;
+         vlib_cli_output (vm, "Write %d packets to %s, and stop capture...",
+                          pm->n_packets_captured, pm->file_name);
+         error = pcap_write (pm);
+         if (pm->flags & PCAP_MAIN_INIT_DONE)
+           pcap_close (pm);
+         /* Report I/O errors... */
+         if (error)
+           {
+             clib_error_report (error);
+             return -11;       /* VNET_API_ERROR_SYSCALL_ERROR_1 */
+           }
+         return 0;
        }
+      else
+       return -6;              /* VNET_API_ERROR_NO_SUCH_ENTRY */
     }
 
-  return error;
+  return 0;
 }
 
 static clib_error_t *
-pcap_dispatch_trace_command_fn (vlib_main_t * vm,
-                               unformat_input_t * input,
-                               vlib_cli_command_t * cmd)
+dispatch_trace_command_fn (vlib_main_t * vm,
+                          unformat_input_t * input, vlib_cli_command_t * cmd)
 {
-  return pcap_dispatch_trace_command_internal (vm, input, cmd, VLIB_RX);
+  unformat_input_t _line_input, *line_input = &_line_input;
+  vlib_pcap_dispatch_trace_args_t _a, *a = &_a;
+  u8 *filename = 0;
+  u32 max = 1000;
+  int rv;
+  int enable = 0;
+  int status = 0;
+  u32 node_index = ~0, buffer_traces_to_capture = 100;
+
+  /* Get a line of input. */
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (line_input, "on %=", &enable, 1))
+       ;
+      else if (unformat (line_input, "enable %=", &enable, 1))
+       ;
+      else if (unformat (line_input, "off %=", &enable, 0))
+       ;
+      else if (unformat (line_input, "disable %=", &enable, 0))
+       ;
+      else if (unformat (line_input, "max %d", &max))
+       ;
+      else if (unformat (line_input, "packets-to-capture %d", &max))
+       ;
+      else if (unformat (line_input, "file %U", unformat_vlib_tmpfile,
+                        &filename))
+       ;
+      else if (unformat (line_input, "status %=", &status, 1))
+       ;
+      else if (unformat (line_input, "buffer-trace %U %d",
+                        unformat_vlib_node, vm, &node_index,
+                        &buffer_traces_to_capture))
+       ;
+      else
+       {
+         return clib_error_return (0, "unknown input `%U'",
+                                   format_unformat_error, line_input);
+       }
+    }
+
+  unformat_free (line_input);
+
+  /* no need for memset (a, 0, sizeof (*a)), set all fields here. */
+  a->filename = filename;
+  a->enable = enable;
+  a->status = status;
+  a->packets_to_capture = max;
+  a->buffer_trace_node_index = node_index;
+  a->buffer_traces_to_capture = buffer_traces_to_capture;
+
+  rv = vlib_pcap_dispatch_trace_configure (a);
+
+  switch (rv)
+    {
+    case 0:
+      break;
+
+    case -7:
+      return clib_error_return (0, "dispatch trace already enabled...");
+
+    case -81:
+      return clib_error_return (0, "dispatch trace already disabled...");
+
+    case -8:
+      return clib_error_return
+       (0, "can't change number of records to capture while tracing...");
+
+    case -11:
+      return clib_error_return (0, "I/O writing trace capture...");
+
+    case -6:
+      return clib_error_return (0, "No packets captured...");
+
+    default:
+      vlib_cli_output (vm, "WARNING: trace configure returned %d", rv);
+      break;
+    }
+  return 0;
 }
 
 /*?
@@ -2328,7 +2462,7 @@ VLIB_CLI_COMMAND (pcap_dispatch_trace_command, static) = {
     .short_help =
     "pcap dispatch trace [on|off] [max <nn>] [file <name>] [status]\n"
     "              [buffer-trace <input-node-name> <nn>]",
-    .function = pcap_dispatch_trace_command_fn,
+    .function = dispatch_trace_command_fn,
 };
 /* *INDENT-ON* */