Decouple worker thread code from dpdk, enable worker threads in vpp_lite
[vpp.git] / vlib / vlib / threads.c
index 1808f36..d2ce449 100644 (file)
@@ -51,11 +51,6 @@ u32 vl(void *p)
   return vec_len (p);
 }
 
-void debug_hex_bytes (u8 *s, u32 n)
-{
-    fformat (stderr, "%U\n", format_hex_bytes, s, n);
-}
-
 vlib_thread_main_t vlib_thread_main;
 
 uword
@@ -1150,66 +1145,210 @@ void vlib_worker_thread_barrier_release(vlib_main_t * vm)
     }
 }
 
-static clib_error_t *
-show_threads_fn (vlib_main_t * vm,
-       unformat_input_t * input,
-       vlib_cli_command_t * cmd)
+/*
+ * Check the frame queue for vm->cpu_index for available frames. If any
+ * are valid, copy their buffer indices into frames for the handoff
+ * dispatch node. Returns the number of queue elements consumed.
+ */
+static inline int vlib_frame_queue_dequeue_internal (vlib_main_t *vm)
 {
-  vlib_worker_thread_t * w;
-  int i;
+  u32 thread_id = vm->cpu_index;
+  vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
+  vlib_frame_queue_elt_t *elt;
+  u32 * from, * to;
+  vlib_frame_t * f;
+  int msg_type;
+  int processed = 0;
+  u32 n_left_to_node;
+  u32 vectors = 0;
+  vlib_thread_main_t *tm = vlib_get_thread_main();
 
-  vlib_cli_output (vm, "%-7s%-20s%-12s%-8s%-7s%-7s%-7s%-10s",
-                   "ID", "Name", "Type", "LWP",
-                   "lcore", "Core", "Socket", "State");
+  ASSERT (fq);
+  ASSERT(vm == vlib_mains[thread_id]);
 
-#if !defined(__powerpc64__)
-  for (i = 0; i < vec_len(vlib_worker_threads); i++)
+  if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
+      return 0;
+  /*
+   * Gather trace data for frame queues
+   */
+  if (PREDICT_FALSE(fq->trace))
     {
-      w = vlib_worker_threads + i;
-      u8 * line = NULL;
+      frame_queue_trace_t *fqt;
+      frame_queue_nelt_counter_t *fqh;
+      u32 elix;
+
+      fqt = &tm->frame_queue_traces[thread_id];
+
+      fqt->nelts = fq->nelts;
+      fqt->head = fq->head;
+      fqt->head_hint = fq->head_hint;
+      fqt->tail = fq->tail;
+      fqt->threshold = fq->vector_threshold;
+      fqt->n_in_use = fqt->tail - fqt->head;
+      if (fqt->n_in_use >= fqt->nelts){
+        // clamp to nelts-1; n_in_use is used as a histogram index below
+        fqt->n_in_use = fqt->nelts-1;
+      }
 
-      line = format(line, "%-7d%-20s%-12s%-8d",
-                    i,
-                    w->name ? w->name : (u8 *) "",
-                    w->registration ? w->registration->name : "",
-                    w->lwp);
+      /* Record the number of elements in use in the histogram */
+      fqh = &tm->frame_queue_histogram[thread_id];
+      fqh->count[ fqt->n_in_use ]++;
 
-#if DPDK==1
-      int lcore = w->dpdk_lcore_id;
-      if (lcore > -1)
+      /* Record a snapshot of n_vectors for every slot, starting at head+1 */
+      for (elix=0; elix<fqt->nelts; elix++) {
+        elt = fq->elts + ((fq->head+1 + elix) & (fq->nelts-1));
+        if (1 || elt->valid) 
+          {
+            fqt->n_vectors[elix] = elt->n_vectors;
+          }
+      }
+      fqt->written = 1;
+    }
+
+  while (1)
+    {
+      if (fq->head == fq->tail)
         {
-          line = format(line, "%-7u%-7u%-7u",
-                        lcore,
-                        lcore_config[lcore].core_id,
-                        lcore_config[lcore].socket_id);
+          fq->head_hint = fq->head;
+          return processed;
+        }
 
-          switch(lcore_config[lcore].state)
-            {
-              case WAIT:
-                line = format(line, "wait");
-                break;
-              case RUNNING:
-                line = format(line, "running");
-                break;
-              case FINISHED:
-                line = format(line, "finished");
-                break;
-              default:
-                line = format(line, "unknown");
-            }
+      elt = fq->elts + ((fq->head+1) & (fq->nelts-1));
+
+      if (!elt->valid)
+        {
+          fq->head_hint = fq->head;
+          return processed;
+        }
+
+      from = elt->buffer_index;
+      msg_type = elt->msg_type;
+
+      ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
+      ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
+
+      f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
+
+      to = vlib_frame_vector_args (f);
+
+      n_left_to_node = elt->n_vectors;
+
+      while (n_left_to_node >= 4)
+        {
+          to[0] = from[0];
+          to[1] = from[1];
+          to[2] = from[2];
+          to[3] = from[3];
+          to += 4;
+          from += 4;
+          n_left_to_node -= 4;
+        }
+
+      while (n_left_to_node > 0)
+        {
+          to[0] = from[0];
+          to++;
+          from++;
+          n_left_to_node--;
+        }
+
+      vectors += elt->n_vectors;
+      f->n_vectors = elt->n_vectors;
+      vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
+
+      elt->valid = 0;
+      elt->n_vectors = 0;
+      elt->msg_type = 0xfefefefe;
+      CLIB_MEMORY_BARRIER();
+      fq->head++;
+      processed++;
+
+      /*
+       * Limit the number of packets pushed into the graph
+       */
+      if (vectors >= fq->vector_threshold)
+        {
+          fq->head_hint = fq->head;
+          return processed;
         }
-#endif
-      vlib_cli_output(vm, "%v", line);
-      vec_free(line);
     }
-#endif
+  ASSERT(0);
+  return processed;
+}
 
-  return 0;
+static_always_inline void
+vlib_worker_thread_internal (vlib_main_t *vm)
+{
+  vlib_node_main_t * nm = &vm->node_main;
+  u64 cpu_time_now = clib_cpu_time_now ();
+
+  while (1)
+    {
+      vlib_worker_thread_barrier_check ();
+
+      vlib_frame_queue_dequeue_internal (vm);
+
+      vlib_node_runtime_t * n;
+      vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
+       {
+         cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
+                                       VLIB_NODE_STATE_POLLING, /* frame */ 0,
+                                       cpu_time_now);
+       }
+
+      if (_vec_len (nm->pending_frames))
+        {
+          int i;
+          cpu_time_now = clib_cpu_time_now ();
+          for (i = 0; i < _vec_len (nm->pending_frames); i++) {
+            vlib_pending_frame_t *p;
+
+            p = nm->pending_frames + i;
+
+            cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
+          }
+          _vec_len (nm->pending_frames) = 0;
+        }
+      vlib_increment_main_loop_counter (vm);
+
+      /* Refresh the time stamp here in case no nodes are enabled and the
+         calls above did not update it. */
+      cpu_time_now = clib_cpu_time_now ();
+    }
 }
 
+void vlib_worker_thread_fn (void * arg)
+{
+  vlib_worker_thread_t * w = (vlib_worker_thread_t *) arg;
+  vlib_thread_main_t * tm = vlib_get_thread_main();
+  vlib_main_t * vm = vlib_get_main();
+
+  ASSERT(vm->cpu_index == os_get_cpu_number());
+
+  vlib_worker_thread_init (w);
+  clib_time_init (&vm->clib_time);
+  clib_mem_set_heap (w->thread_mheap);
+
+  /* Wait until worker_thread_release is set (e.g. after dpdk init completes) */
+  while (tm->worker_thread_release == 0)
+    vlib_worker_thread_barrier_check ();
+
+  vlib_worker_thread_internal(vm);
+}
 
-VLIB_CLI_COMMAND (show_threads_command, static) = {
-  .path = "show threads",
-  .short_help = "Show threads",
-  .function = show_threads_fn,
+VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
+  .name = "workers",
+  .short_name = "wk",
+  .function = vlib_worker_thread_fn,
 };
+
+clib_error_t *threads_init (vlib_main_t *vm)
+{
+  vlib_thread_main_t * tm = vlib_get_thread_main();
+
+  tm->handoff_dispatch_node_index = ~0;
+
+  return 0;
+}
+
+VLIB_INIT_FUNCTION (threads_init);