vlib: improve node interrupt handling
[vpp.git] / src / vlib / main.c
index 4223474..2e100b2 100644 (file)
@@ -44,9 +44,6 @@
 #include <vppinfra/tw_timer_1t_3w_1024sl_ov.h>
 
 #include <vlib/unix/unix.h>
-#include <vlib/unix/cj.h>
-
-CJ_GLOBAL_LOG_PROTOTYPE;
 
 /* Actually allocate a few extra slots of vector data to support
    speculative vector enqueues which overflow vector data in next frame. */
@@ -190,6 +187,31 @@ vlib_get_frame_to_node (vlib_main_t * vm, u32 to_node_index)
   return vlib_get_frame (vm, f);
 }
 
+static inline void
+vlib_validate_frame_indices (vlib_frame_t * f)
+{
+  if (CLIB_DEBUG > 0)
+    {
+      int i;
+      u32 *from = vlib_frame_vector_args (f);
+
+      /* Check for bad buffer index values */
+      for (i = 0; i < f->n_vectors; i++)
+       {
+         if (from[i] == 0)
+           {
+             clib_warning ("BUG: buffer index 0 at index %d", i);
+             ASSERT (0);
+           }
+         else if (from[i] == 0xfefefefe)
+           {
+             clib_warning ("BUG: frame poison pattern at index %d", i);
+             ASSERT (0);
+           }
+       }
+    }
+}
+
 void
 vlib_put_frame_to_node (vlib_main_t * vm, u32 to_node_index, vlib_frame_t * f)
 {
@@ -199,6 +221,8 @@ vlib_put_frame_to_node (vlib_main_t * vm, u32 to_node_index, vlib_frame_t * f)
   if (f->n_vectors == 0)
     return;
 
+  vlib_validate_frame_indices (f);
+
   to_node = vlib_get_node (vm, to_node_index);
 
   vec_add2 (vm->node_main.pending_frames, p, 1);
@@ -432,6 +456,9 @@ vlib_put_next_frame_validate (vlib_main_t * vm,
   f = vlib_get_frame (vm, nf->frame);
 
   ASSERT (n_vectors_left <= VLIB_FRAME_SIZE);
+
+  vlib_validate_frame_indices (f);
+
   n_after = VLIB_FRAME_SIZE - n_vectors_left;
   n_before = f->n_vectors;
 
@@ -1675,6 +1702,26 @@ vl_api_send_pending_rpc_requests (vlib_main_t * vm)
 {
 }
 
+static_always_inline u64
+dispatch_pending_interrupts (vlib_main_t * vm, vlib_node_main_t * nm,
+                            u64 cpu_time_now)
+{
+  vlib_node_runtime_t *n;
+
+  for (int i = 0; i < _vec_len (nm->pending_local_interrupts); i++)
+    {
+      vlib_node_interrupt_t *in;
+      in = vec_elt_at_index (nm->pending_local_interrupts, i);
+      n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
+                           in->node_runtime_index);
+      n->interrupt_data = in->data;
+      cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
+                                   VLIB_NODE_STATE_INTERRUPT, /* frame */ 0,
+                                   cpu_time_now);
+    }
+  vec_reset_length (nm->pending_local_interrupts);
+  return cpu_time_now;
+}
 
 static_always_inline void
 vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
@@ -1683,8 +1730,8 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
   vlib_thread_main_t *tm = vlib_get_thread_main ();
   uword i;
   u64 cpu_time_now;
+  f64 now;
   vlib_frame_queue_main_t *fqm;
-  u32 *last_node_runtime_indices = 0;
   u32 frame_queue_check_counter = 0;
 
   /* Initialize pending node vector. */
@@ -1704,10 +1751,9 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
     cpu_time_now = clib_cpu_time_now ();
 
   /* Pre-allocate interupt runtime indices and lock. */
-  vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);
-  vec_alloc (last_node_runtime_indices, 32);
-  if (!is_main)
-    clib_spinlock_init (&nm->pending_interrupt_lock);
+  vec_alloc (nm->pending_local_interrupts, 32);
+  vec_alloc (nm->pending_remote_interrupts, 32);
+  clib_spinlock_init (&nm->pending_interrupt_lock);
 
   /* Pre-allocate expired nodes. */
   if (!nm->polling_threshold_vector_length)
@@ -1717,6 +1763,7 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
 
   vm->cpu_id = clib_get_current_cpu_id ();
   vm->numa_node = clib_get_current_numa_node ();
+  os_set_numa_index (vm->numa_node);
 
   /* Start all processes. */
   if (is_main)
@@ -1792,40 +1839,27 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
       if (PREDICT_TRUE (is_main && vm->queue_signal_pending == 0))
        vm->queue_signal_callback (vm);
 
-      /* Next handle interrupts. */
-      {
-       /* unlocked read, for performance */
-       uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
-       uword i;
-       if (PREDICT_FALSE (l > 0))
-         {
-           u32 *tmp;
-           if (!is_main)
-             {
-               clib_spinlock_lock (&nm->pending_interrupt_lock);
-               /* Re-read w/ lock held, in case another thread added an item */
-               l = _vec_len (nm->pending_interrupt_node_runtime_indices);
-             }
+      /* handle local interruots */
+      if (_vec_len (nm->pending_local_interrupts))
+       cpu_time_now = dispatch_pending_interrupts (vm, nm, cpu_time_now);
+
+      /* handle remote interruots */
+      if (_vec_len (nm->pending_remote_interrupts))
+       {
+         vlib_node_interrupt_t *in;
+
+         /* at this point it is known that
+          * vec_len (nm->pending_local_interrupts) is zero so we quickly swap
+          * local and remote vector under the spinlock */
+         clib_spinlock_lock (&nm->pending_interrupt_lock);
+         in = nm->pending_local_interrupts;
+         nm->pending_local_interrupts = nm->pending_remote_interrupts;
+         nm->pending_remote_interrupts = in;
+         clib_spinlock_unlock (&nm->pending_interrupt_lock);
+
+         cpu_time_now = dispatch_pending_interrupts (vm, nm, cpu_time_now);
+       }
 
-           tmp = nm->pending_interrupt_node_runtime_indices;
-           nm->pending_interrupt_node_runtime_indices =
-             last_node_runtime_indices;
-           last_node_runtime_indices = tmp;
-           _vec_len (last_node_runtime_indices) = 0;
-           if (!is_main)
-             clib_spinlock_unlock (&nm->pending_interrupt_lock);
-           for (i = 0; i < l; i++)
-             {
-               n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
-                                     last_node_runtime_indices[i]);
-               cpu_time_now =
-                 dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
-                                VLIB_NODE_STATE_INTERRUPT,
-                                /* frame */ 0,
-                                cpu_time_now);
-             }
-         }
-      }
       /* Input nodes may have added work to the pending vector.
          Process pending vector until there is nothing left.
          All pending vectors will be processed from input -> output. */
@@ -1925,6 +1959,33 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
       /* Record time stamp in case there are no enabled nodes and above
          calls do not update time stamp. */
       cpu_time_now = clib_cpu_time_now ();
+      vm->loops_this_reporting_interval++;
+      now = clib_time_now_internal (&vm->clib_time, cpu_time_now);
+      /* Time to update loops_per_second? */
+      if (PREDICT_FALSE (now >= vm->loop_interval_end))
+       {
+         /* Next sample ends in 20ms */
+         if (vm->loop_interval_start)
+           {
+             f64 this_loops_per_second;
+
+             this_loops_per_second =
+               ((f64) vm->loops_this_reporting_interval) / (now -
+                                                            vm->loop_interval_start);
+
+             vm->loops_per_second =
+               vm->loops_per_second * vm->damping_constant +
+               (1.0 - vm->damping_constant) * this_loops_per_second;
+             if (vm->loops_per_second != 0.0)
+               vm->seconds_per_loop = 1.0 / vm->loops_per_second;
+             else
+               vm->seconds_per_loop = 0.0;
+           }
+         /* New interval starts now, and ends in 20ms */
+         vm->loop_interval_start = now;
+         vm->loop_interval_end = now + 2e-4;
+         vm->loops_this_reporting_interval = 0;
+       }
     }
 }
 
@@ -1957,6 +2018,20 @@ vlib_main_configure (vlib_main_t * vm, unformat_input_t * input)
        ;
       else if (unformat (input, "elog-post-mortem-dump"))
        vm->elog_post_mortem_dump = 1;
+      else if (unformat (input, "buffer-alloc-success-rate %f",
+                        &vm->buffer_alloc_success_rate))
+       {
+         if (VLIB_BUFFER_ALLOC_FAULT_INJECTOR == 0)
+           return clib_error_return
+             (0, "Buffer fault injection not configured");
+       }
+      else if (unformat (input, "buffer-alloc-success-seed %u",
+                        &vm->buffer_alloc_success_seed))
+       {
+         if (VLIB_BUFFER_ALLOC_FAULT_INJECTOR == 0)
+           return clib_error_return
+             (0, "Buffer fault injection not configured");
+       }
       else
        return unformat_parse_error (input);
     }
@@ -2021,8 +2096,6 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
 
   vm->queue_signal_callback = dummy_queue_signal_callback;
 
-  clib_time_init (&vm->clib_time);
-
   /* Turn on event log. */
   if (!vm->elog_main.event_ring_size)
     vm->elog_main.event_ring_size = 128 << 10;
@@ -2120,9 +2193,26 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
   vec_validate (vm->processing_rpc_requests, 0);
   _vec_len (vm->processing_rpc_requests) = 0;
 
+  /* Default params for the buffer allocator fault injector, if configured */
+  if (VLIB_BUFFER_ALLOC_FAULT_INJECTOR > 0)
+    {
+      vm->buffer_alloc_success_seed = 0xdeaddabe;
+      vm->buffer_alloc_success_rate = 0.80;
+    }
+
   if ((error = vlib_call_all_config_functions (vm, input, 0 /* is_early */ )))
     goto done;
 
+  /*
+   * Use exponential smoothing, with a half-life of 1 second
+   * reported_rate(t) = reported_rate(t-1) * K + rate(t)*(1-K)
+   *
+   * Sample every 20ms, aka 50 samples per second
+   * K = exp (-1.0/20.0);
+   * K = 0.95
+   */
+  vm->damping_constant = exp (-1.0 / 20.0);
+
   /* Sort per-thread init functions before we start threads */
   vlib_sort_init_exit_functions (&vm->worker_init_function_registrations);