threads: add support for multiple worker handoff queues
[vpp.git] / vlib / vlib / threads.c
index 5581d43..70d4019 100644 (file)
@@ -556,7 +556,6 @@ start_workers (vlib_main_t * vm)
   vlib_worker_thread_t *w;
   vlib_main_t *vm_clone;
   void *oldheap;
-  vlib_frame_queue_t *fq;
   vlib_thread_main_t *tm = &vlib_thread_main;
   vlib_thread_registration_t *tr;
   vlib_node_runtime_t *rt;
@@ -594,11 +593,6 @@ start_workers (vlib_main_t * vm)
       _vec_len (vlib_mains) = 0;
       vec_add1 (vlib_mains, vm);
 
-      vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
-      _vec_len (vlib_frame_queues) = 0;
-      fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
-      vec_add1 (vlib_frame_queues, fq);
-
       vlib_worker_threads->wait_at_barrier =
        clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
       vlib_worker_threads->workers_at_barrier =
@@ -645,19 +639,6 @@ start_workers (vlib_main_t * vm)
              if (tr->no_data_structure_clone)
                continue;
 
-             /* Allocate "to-worker-N" frame queue */
-             if (tr->frame_queue_nelts)
-               {
-                 fq = vlib_frame_queue_alloc (tr->frame_queue_nelts);
-               }
-             else
-               {
-                 fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
-               }
-
-             vec_validate (vlib_frame_queues, worker_thread_index);
-             vlib_frame_queues[worker_thread_index] = fq;
-
              /* Fork vlib_global_main et al. Look for bugs here */
              oldheap = clib_mem_set_heap (w->thread_mheap);
 
@@ -1241,10 +1222,11 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm)
  * the handoff node.
  */
 static inline int
-vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
+vlib_frame_queue_dequeue_internal (vlib_main_t * vm,
+                                  vlib_frame_queue_main_t * fqm)
 {
   u32 thread_id = vm->cpu_index;
-  vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
+  vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
   vlib_frame_queue_elt_t *elt;
   u32 *from, *to;
   vlib_frame_t *f;
@@ -1252,12 +1234,11 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
   int processed = 0;
   u32 n_left_to_node;
   u32 vectors = 0;
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
 
   ASSERT (fq);
   ASSERT (vm == vlib_mains[thread_id]);
 
-  if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
+  if (PREDICT_FALSE (fqm->node_index == ~0))
     return 0;
   /*
    * Gather trace data for frame queues
@@ -1268,7 +1249,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
       frame_queue_nelt_counter_t *fqh;
       u32 elix;
 
-      fqt = &tm->frame_queue_traces[thread_id];
+      fqt = &fqm->frame_queue_traces[thread_id];
 
       fqt->nelts = fq->nelts;
       fqt->head = fq->head;
@@ -1283,7 +1264,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
        }
 
       /* Record the number of elements in use in the histogram */
-      fqh = &tm->frame_queue_histogram[thread_id];
+      fqh = &fqm->frame_queue_histogram[thread_id];
       fqh->count[fqt->n_in_use]++;
 
       /* Record a snapshot of the elements in use */
@@ -1320,7 +1301,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
 
-      f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
+      f = vlib_get_frame_to_node (vm, fqm->node_index);
 
       to = vlib_frame_vector_args (f);
 
@@ -1347,7 +1328,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
 
       vectors += elt->n_vectors;
       f->n_vectors = elt->n_vectors;
-      vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
+      vlib_put_frame_to_node (vm, fqm->node_index, f);
 
       elt->valid = 0;
       elt->n_vectors = 0;
@@ -1373,7 +1354,9 @@ static_always_inline void
 vlib_worker_thread_internal (vlib_main_t * vm)
 {
   vlib_node_main_t *nm = &vm->node_main;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
   u64 cpu_time_now = clib_cpu_time_now ();
+  vlib_frame_queue_main_t *fqm;
 
   vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);
 
@@ -1381,7 +1364,8 @@ vlib_worker_thread_internal (vlib_main_t * vm)
     {
       vlib_worker_thread_barrier_check ();
 
-      vlib_frame_queue_dequeue_internal (vm);
+      vec_foreach (fqm, tm->frame_queue_mains)
+       vlib_frame_queue_dequeue_internal (vm, fqm);
 
       vlib_node_runtime_t *n;
       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
@@ -1465,13 +1449,35 @@ VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
 };
 /* *INDENT-ON* */
 
-clib_error_t *
-threads_init (vlib_main_t * vm)
+u32
+vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
 {
   vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
+  vlib_frame_queue_t *fq;
+  int i;
+
+  if (frame_queue_nelts == 0)
+    frame_queue_nelts = FRAME_QUEUE_NELTS;
 
-  tm->handoff_dispatch_node_index = ~0;
+  vec_add2 (tm->frame_queue_mains, fqm, 1);
 
+  fqm->node_index = node_index;
+
+  vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
+  _vec_len (fqm->vlib_frame_queues) = 0;
+  for (i = 0; i < tm->n_vlib_mains; i++)
+    {
+      fq = vlib_frame_queue_alloc (frame_queue_nelts);
+      vec_add1 (fqm->vlib_frame_queues, fq);
+    }
+
+  return (fqm - tm->frame_queue_mains);
+}
+
+clib_error_t *
+threads_init (vlib_main_t * vm)
+{
   return 0;
 }