Run interior graph nodes before process nodes
[vpp.git] / src / vlib / main.c
index fd9937f..6783068 100644 (file)
@@ -460,7 +460,7 @@ vlib_put_next_frame (vlib_main_t * vm,
   vlib_frame_t *f;
   u32 n_vectors_in_frame;
 
-  if (vm->buffer_main->callbacks_registered == 0 && CLIB_DEBUG > 0)
+  if (buffer_main.callbacks_registered == 0 && CLIB_DEBUG > 0)
     vlib_put_next_frame_validate (vm, r, next_index, n_vectors_left);
 
   nf = vlib_node_runtime_get_next_frame (vm, r, next_index);
@@ -1276,6 +1276,7 @@ dispatch_process (vlib_main_t * vm,
   vlib_node_main_t *nm = &vm->node_main;
   vlib_node_runtime_t *node_runtime = &p->node_runtime;
   vlib_node_t *node = vlib_get_node (vm, node_runtime->node_index);
+  u32 old_process_index;
   u64 t;
   uword n_vectors, is_suspend;
 
@@ -1291,11 +1292,12 @@ dispatch_process (vlib_main_t * vm,
                             f ? f->n_vectors : 0, /* is_after */ 0);
 
   /* Save away current process for suspend. */
+  old_process_index = nm->current_process_index;
   nm->current_process_index = node->runtime_index;
 
   n_vectors = vlib_process_startup (vm, p, f);
 
-  nm->current_process_index = ~0;
+  nm->current_process_index = old_process_index;
 
   ASSERT (n_vectors != VLIB_PROCESS_RETURN_LONGJMP_RETURN);
   is_suspend = n_vectors == VLIB_PROCESS_RETURN_LONGJMP_SUSPEND;
@@ -1423,6 +1425,13 @@ dispatch_suspended_process (vlib_main_t * vm,
   return t;
 }
 
+void vl_api_send_pending_rpc_requests (vlib_main_t *) __attribute__ ((weak));
+void
+vl_api_send_pending_rpc_requests (vlib_main_t * vm)
+{
+}
+
+
 static_always_inline void
 vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
 {
@@ -1475,6 +1484,9 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
     {
       vlib_node_runtime_t *n;
 
+      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
+       vl_api_send_pending_rpc_requests (vm);
+
       if (!is_main)
        {
          vlib_worker_thread_barrier_check ();
@@ -1483,13 +1495,12 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
        }
 
       /* Process pre-input nodes. */
-      if (is_main)
-       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
-         cpu_time_now = dispatch_node (vm, n,
-                                       VLIB_NODE_TYPE_PRE_INPUT,
-                                       VLIB_NODE_STATE_POLLING,
-                                       /* frame */ 0,
-                                       cpu_time_now);
+      vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
+       cpu_time_now = dispatch_node (vm, n,
+                                     VLIB_NODE_TYPE_PRE_INPUT,
+                                     VLIB_NODE_STATE_POLLING,
+                                     /* frame */ 0,
+                                     cpu_time_now);
 
       /* Next process input nodes. */
       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
@@ -1504,13 +1515,19 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
 
       /* Next handle interrupts. */
       {
+       /* unlocked read, for performance */
        uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
        uword i;
-       if (l > 0)
+       if (PREDICT_FALSE (l > 0))
          {
            u32 *tmp;
            if (!is_main)
-             clib_spinlock_lock (&nm->pending_interrupt_lock);
+             {
+               clib_spinlock_lock (&nm->pending_interrupt_lock);
+               /* Re-read w/ lock held, in case another thread added an item */
+               l = _vec_len (nm->pending_interrupt_node_runtime_indices);
+             }
+
            tmp = nm->pending_interrupt_node_runtime_indices;
            nm->pending_interrupt_node_runtime_indices =
              last_node_runtime_indices;
@@ -1530,6 +1547,13 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
              }
          }
       }
+      /* Input nodes may have added work to the pending vector.
+         Process pending vector until there is nothing left.
+         All pending vectors will be processed from input -> output. */
+      for (i = 0; i < _vec_len (nm->pending_frames); i++)
+       cpu_time_now = dispatch_pending_node (vm, i, cpu_time_now);
+      /* Reset pending vector for next iteration. */
+      _vec_len (nm->pending_frames) = 0;
 
       if (is_main)
        {
@@ -1548,7 +1572,6 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
            {
              uword i;
 
-           processes_timing_wheel_data:
              for (i = 0; i < _vec_len (nm->data_from_advancing_timing_wheel);
                   i++)
                {
@@ -1591,19 +1614,6 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
              _vec_len (nm->data_from_advancing_timing_wheel) = 0;
            }
        }
-
-      /* Input nodes may have added work to the pending vector.
-         Process pending vector until there is nothing left.
-         All pending vectors will be processed from input -> output. */
-      for (i = 0; i < _vec_len (nm->pending_frames); i++)
-       cpu_time_now = dispatch_pending_node (vm, i, cpu_time_now);
-      /* Reset pending vector for next iteration. */
-      _vec_len (nm->pending_frames) = 0;
-
-      /* Pending internal nodes may resume processes. */
-      if (is_main && _vec_len (nm->data_from_advancing_timing_wheel) > 0)
-       goto processes_timing_wheel_data;
-
       vlib_increment_main_loop_counter (vm);
 
       /* Record time stamp in case there are no enabled nodes and above
@@ -1661,6 +1671,18 @@ dummy_queue_signal_callback (vlib_main_t * vm)
 {
 }
 
+#define foreach_weak_reference_stub             \
+_(vlib_map_stat_segment_init)                   \
+_(vpe_api_init)                                 \
+_(vlibmemory_init)                              \
+_(map_api_segment_init)
+
+#define _(name)                                                 \
+clib_error_t *name (vlib_main_t *vm) __attribute__((weak));     \
+clib_error_t *name (vlib_main_t *vm) { return 0; }
+foreach_weak_reference_stub;
+#undef _
+
 /* Main function. */
 int
 vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
@@ -1700,6 +1722,12 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
       goto done;
     }
 
+  if ((error = vlib_map_stat_segment_init (vm)))
+    {
+      clib_error_report (error);
+      goto done;
+    }
+
   /* Register static nodes so that init functions may use them. */
   vlib_register_all_static_nodes (vm);
 
@@ -1719,6 +1747,25 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
        goto done;
     }
 
+  /* Direct call / weak reference, for vlib standalone use-cases */
+  if ((error = vpe_api_init (vm)))
+    {
+      clib_error_report (error);
+      goto done;
+    }
+
+  if ((error = vlibmemory_init (vm)))
+    {
+      clib_error_report (error);
+      goto done;
+    }
+
+  if ((error = map_api_segment_init (vm)))
+    {
+      clib_error_report (error);
+      goto done;
+    }
+
   /* See unix/main.c; most likely already set up */
   if (vm->init_functions_called == 0)
     vm->init_functions_called = hash_create (0, /* value bytes */ 0);
@@ -1726,9 +1773,8 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
     goto done;
 
   /* Create default buffer free list. */
-  vlib_buffer_get_or_create_free_list (vm,
-                                      VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES,
-                                      "default");
+  vlib_buffer_create_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_BYTES,
+                               "default");
 
   nm->timing_wheel = clib_mem_alloc_aligned (sizeof (TWT (tw_timer_wheel)),
                                             CLIB_CACHE_LINE_BYTES);
@@ -1742,6 +1788,9 @@ vlib_main (vlib_main_t * volatile vm, unformat_input_t * input)
                            10e-6 /* timer period 10us */ ,
                            ~0 /* max expirations per call */ );
 
+  vec_validate (vm->pending_rpc_requests, 0);
+  _vec_len (vm->pending_rpc_requests) = 0;
+
   switch (clib_setjmp (&vm->main_loop_exit, VLIB_MAIN_LOOP_EXIT_NONE))
     {
     case VLIB_MAIN_LOOP_EXIT_NONE: