threads: add support for multiple worker handoff queues 29/3729/5
authorDamjan Marion <damarion@cisco.com>
Tue, 8 Nov 2016 16:37:01 +0000 (17:37 +0100)
committerDave Barach <openvpp@barachs.net>
Tue, 15 Nov 2016 20:27:50 +0000 (20:27 +0000)
Change-Id: I2452df3c493eeb0a5078d53a230df6906651c057
Signed-off-by: Damjan Marion <damarion@cisco.com>
vlib/vlib/threads.c
vlib/vlib/threads.h
vlib/vlib/threads_cli.c
vnet/vnet/devices/dpdk/cli.c
vnet/vnet/handoff.c
vnet/vnet/handoff.h

index 5581d43..70d4019 100644 (file)
@@ -556,7 +556,6 @@ start_workers (vlib_main_t * vm)
   vlib_worker_thread_t *w;
   vlib_main_t *vm_clone;
   void *oldheap;
-  vlib_frame_queue_t *fq;
   vlib_thread_main_t *tm = &vlib_thread_main;
   vlib_thread_registration_t *tr;
   vlib_node_runtime_t *rt;
@@ -594,11 +593,6 @@ start_workers (vlib_main_t * vm)
       _vec_len (vlib_mains) = 0;
       vec_add1 (vlib_mains, vm);
 
-      vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
-      _vec_len (vlib_frame_queues) = 0;
-      fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
-      vec_add1 (vlib_frame_queues, fq);
-
       vlib_worker_threads->wait_at_barrier =
        clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
       vlib_worker_threads->workers_at_barrier =
@@ -645,19 +639,6 @@ start_workers (vlib_main_t * vm)
              if (tr->no_data_structure_clone)
                continue;
 
-             /* Allocate "to-worker-N" frame queue */
-             if (tr->frame_queue_nelts)
-               {
-                 fq = vlib_frame_queue_alloc (tr->frame_queue_nelts);
-               }
-             else
-               {
-                 fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
-               }
-
-             vec_validate (vlib_frame_queues, worker_thread_index);
-             vlib_frame_queues[worker_thread_index] = fq;
-
              /* Fork vlib_global_main et al. Look for bugs here */
              oldheap = clib_mem_set_heap (w->thread_mheap);
 
@@ -1241,10 +1222,11 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm)
  * the handoff node.
  */
 static inline int
-vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
+vlib_frame_queue_dequeue_internal (vlib_main_t * vm,
+                                  vlib_frame_queue_main_t * fqm)
 {
   u32 thread_id = vm->cpu_index;
-  vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
+  vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
   vlib_frame_queue_elt_t *elt;
   u32 *from, *to;
   vlib_frame_t *f;
@@ -1252,12 +1234,11 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
   int processed = 0;
   u32 n_left_to_node;
   u32 vectors = 0;
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
 
   ASSERT (fq);
   ASSERT (vm == vlib_mains[thread_id]);
 
-  if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
+  if (PREDICT_FALSE (fqm->node_index == ~0))
     return 0;
   /*
    * Gather trace data for frame queues
@@ -1268,7 +1249,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
       frame_queue_nelt_counter_t *fqh;
       u32 elix;
 
-      fqt = &tm->frame_queue_traces[thread_id];
+      fqt = &fqm->frame_queue_traces[thread_id];
 
       fqt->nelts = fq->nelts;
       fqt->head = fq->head;
@@ -1283,7 +1264,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
        }
 
       /* Record the number of elements in use in the histogram */
-      fqh = &tm->frame_queue_histogram[thread_id];
+      fqh = &fqm->frame_queue_histogram[thread_id];
       fqh->count[fqt->n_in_use]++;
 
       /* Record a snapshot of the elements in use */
@@ -1320,7 +1301,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
 
-      f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
+      f = vlib_get_frame_to_node (vm, fqm->node_index);
 
       to = vlib_frame_vector_args (f);
 
@@ -1347,7 +1328,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
 
       vectors += elt->n_vectors;
       f->n_vectors = elt->n_vectors;
-      vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
+      vlib_put_frame_to_node (vm, fqm->node_index, f);
 
       elt->valid = 0;
       elt->n_vectors = 0;
@@ -1373,7 +1354,9 @@ static_always_inline void
 vlib_worker_thread_internal (vlib_main_t * vm)
 {
   vlib_node_main_t *nm = &vm->node_main;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
   u64 cpu_time_now = clib_cpu_time_now ();
+  vlib_frame_queue_main_t *fqm;
 
   vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);
 
@@ -1381,7 +1364,8 @@ vlib_worker_thread_internal (vlib_main_t * vm)
     {
       vlib_worker_thread_barrier_check ();
 
-      vlib_frame_queue_dequeue_internal (vm);
+      vec_foreach (fqm, tm->frame_queue_mains)
+       vlib_frame_queue_dequeue_internal (vm, fqm);
 
       vlib_node_runtime_t *n;
       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
@@ -1465,13 +1449,35 @@ VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
 };
 /* *INDENT-ON* */
 
-clib_error_t *
-threads_init (vlib_main_t * vm)
+u32
+vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
 {
   vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
+  vlib_frame_queue_t *fq;
+  int i;
+
+  if (frame_queue_nelts == 0)
+    frame_queue_nelts = FRAME_QUEUE_NELTS;
 
-  tm->handoff_dispatch_node_index = ~0;
+  vec_add2 (tm->frame_queue_mains, fqm, 1);
 
+  fqm->node_index = node_index;
+
+  vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
+  _vec_len (fqm->vlib_frame_queues) = 0;
+  for (i = 0; i < tm->n_vlib_mains; i++)
+    {
+      fq = vlib_frame_queue_alloc (frame_queue_nelts);
+      vec_add1 (fqm->vlib_frame_queues, fq);
+    }
+
+  return (fqm - tm->frame_queue_mains);
+}
+
+clib_error_t *
+threads_init (vlib_main_t * vm)
+{
   return 0;
 }
 
index e264435..c2db864 100644 (file)
@@ -141,7 +141,15 @@ typedef struct
 }
 vlib_frame_queue_t;
 
-vlib_frame_queue_t **vlib_frame_queues;
+typedef struct
+{
+  u32 node_index;
+  vlib_frame_queue_t **vlib_frame_queues;
+
+  /* for frame queue tracing */
+  frame_queue_trace_t *frame_queue_traces;
+  frame_queue_nelt_counter_t *frame_queue_histogram;
+} vlib_frame_queue_main_t;
 
 /* Called early, in thread 0's context */
 clib_error_t *vlib_thread_init (vlib_main_t * vm);
@@ -170,6 +178,7 @@ void vlib_create_worker_threads (vlib_main_t * vm, int n,
                                 void (*thread_function) (void *));
 
 void vlib_worker_thread_init (vlib_worker_thread_t * w);
+u32 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts);
 
 /* Check for a barrier sync request every 30ms */
 #define BARRIER_SYNC_DELAY (0.030000)
@@ -321,12 +330,8 @@ typedef struct
 
   vlib_efd_t efd;
 
-  /* handoff node index */
-  u32 handoff_dispatch_node_index;
-
-  /* for frame queue tracing */
-  frame_queue_trace_t *frame_queue_traces;
-  frame_queue_nelt_counter_t *frame_queue_histogram;
+  /* Worker handoff queues */
+  vlib_frame_queue_main_t *frame_queue_mains;
 
   /* worker thread initialization barrier */
   volatile u32 worker_thread_release;
@@ -388,6 +393,94 @@ vlib_get_worker_vlib_main (u32 worker_index)
   return vm;
 }
 
+static inline void
+vlib_put_frame_queue_elt (vlib_frame_queue_elt_t * hf)
+{
+  CLIB_MEMORY_BARRIER ();
+  hf->valid = 1;
+}
+
+static inline vlib_frame_queue_elt_t *
+vlib_get_frame_queue_elt (u32 frame_queue_index, u32 index)
+{
+  vlib_frame_queue_t *fq;
+  vlib_frame_queue_elt_t *elt;
+  vlib_thread_main_t *tm = &vlib_thread_main;
+  vlib_frame_queue_main_t *fqm =
+    vec_elt_at_index (tm->frame_queue_mains, frame_queue_index);
+  u64 new_tail;
+
+  fq = fqm->vlib_frame_queues[index];
+  ASSERT (fq);
+
+  new_tail = __sync_add_and_fetch (&fq->tail, 1);
+
+  /* Wait until a ring slot is available */
+  while (new_tail >= fq->head_hint + fq->nelts)
+    vlib_worker_thread_barrier_check ();
+
+  elt = fq->elts + (new_tail & (fq->nelts - 1));
+
+  /* this would be very bad... */
+  while (elt->valid)
+    ;
+
+  elt->msg_type = VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME;
+  elt->last_n_vectors = elt->n_vectors = 0;
+
+  return elt;
+}
+
+static inline vlib_frame_queue_t *
+is_vlib_frame_queue_congested (u32 frame_queue_index,
+                              u32 index,
+                              u32 queue_hi_thresh,
+                              vlib_frame_queue_t **
+                              handoff_queue_by_worker_index)
+{
+  vlib_frame_queue_t *fq;
+  vlib_thread_main_t *tm = &vlib_thread_main;
+  vlib_frame_queue_main_t *fqm =
+    vec_elt_at_index (tm->frame_queue_mains, frame_queue_index);
+
+  fq = handoff_queue_by_worker_index[index];
+  if (fq != (vlib_frame_queue_t *) (~0))
+    return fq;
+
+  fq = fqm->vlib_frame_queues[index];
+  ASSERT (fq);
+
+  if (PREDICT_FALSE (fq->tail >= (fq->head_hint + queue_hi_thresh)))
+    {
+      /* a valid entry in the array will indicate the queue has reached
+       * the specified threshold and is congested
+       */
+      handoff_queue_by_worker_index[index] = fq;
+      fq->enqueue_full_events++;
+      return fq;
+    }
+
+  return NULL;
+}
+
+static inline vlib_frame_queue_elt_t *
+vlib_get_worker_handoff_queue_elt (u32 frame_queue_index,
+                                  u32 vlib_worker_index,
+                                  vlib_frame_queue_elt_t **
+                                  handoff_queue_elt_by_worker_index)
+{
+  vlib_frame_queue_elt_t *elt;
+
+  if (handoff_queue_elt_by_worker_index[vlib_worker_index])
+    return handoff_queue_elt_by_worker_index[vlib_worker_index];
+
+  elt = vlib_get_frame_queue_elt (frame_queue_index, vlib_worker_index);
+
+  handoff_queue_elt_by_worker_index[vlib_worker_index] = elt;
+
+  return elt;
+}
+
 #endif /* included_vlib_threads_h */
 
 /*
index 70bf729..aef6757 100644 (file)
@@ -157,28 +157,48 @@ static clib_error_t *
 trace_frame_queue (vlib_main_t * vm, unformat_input_t * input,
                   vlib_cli_command_t * cmd)
 {
+  unformat_input_t _line_input, *line_input = &_line_input;
   clib_error_t *error = NULL;
   frame_queue_trace_t *fqt;
   frame_queue_nelt_counter_t *fqh;
   vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
   u32 num_fq;
   u32 fqix;
-  u32 enable = 0;
+  u32 enable = 2;
+  u32 index = ~(u32) 0;
 
-  if (unformat (input, "on"))
-    {
-      enable = 1;
-    }
-  else if (unformat (input, "off"))
-    {
-      enable = 0;
-    }
-  else
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
     {
-      return clib_error_return (0, "expecting on or off");
+      if (unformat (line_input, "on"))
+       enable = 1;
+      else if (unformat (line_input, "off"))
+       enable = 0;
+      else if (unformat (line_input, "index %u"), &index)
+       ;
+      else
+       return clib_error_return (0, "parse error: '%U'",
+                                 format_unformat_error, line_input);
     }
 
-  num_fq = vec_len (vlib_frame_queues);
+  unformat_free (line_input);
+
+  if (enable > 1)
+    return clib_error_return (0, "expecting on or off");
+
+  if (vec_len (tm->frame_queue_mains) == 0)
+    return clib_error_return (0, "no worker handoffs exist");
+
+  if (index > vec_len (tm->frame_queue_mains) - 1)
+    return clib_error_return (0,
+                             "expecting valid worker handoff queue index");
+
+  fqm = vec_elt_at_index (tm->frame_queue_mains, index);
+
+  num_fq = vec_len (fqm->vlib_frame_queues);
   if (num_fq == 0)
     {
       vlib_cli_output (vm, "No frame queues exist\n");
@@ -186,20 +206,20 @@ trace_frame_queue (vlib_main_t * vm, unformat_input_t * input,
     }
 
   // Allocate storage for trace if necessary
-  vec_validate_aligned (tm->frame_queue_traces, num_fq - 1,
+  vec_validate_aligned (fqm->frame_queue_traces, num_fq - 1,
                        CLIB_CACHE_LINE_BYTES);
-  vec_validate_aligned (tm->frame_queue_histogram, num_fq - 1,
+  vec_validate_aligned (fqm->frame_queue_histogram, num_fq - 1,
                        CLIB_CACHE_LINE_BYTES);
 
   for (fqix = 0; fqix < num_fq; fqix++)
     {
-      fqt = &tm->frame_queue_traces[fqix];
-      fqh = &tm->frame_queue_histogram[fqix];
+      fqt = &fqm->frame_queue_traces[fqix];
+      fqh = &fqm->frame_queue_histogram[fqix];
 
       memset (fqt->n_vectors, 0xff, sizeof (fqt->n_vectors));
       fqt->written = 0;
       memset (fqh, 0, sizeof (*fqh));
-      vlib_frame_queues[fqix]->trace = enable;
+      fqm->vlib_frame_queues[fqix]->trace = enable;
     }
   return error;
 }
@@ -236,16 +256,16 @@ compute_percent (u64 * two_counters, u64 total)
  * Display frame queue trace data gathered by threads.
  */
 static clib_error_t *
-show_frame_queue_internal (vlib_main_t * vm, u32 histogram)
+show_frame_queue_internal (vlib_main_t * vm,
+                          vlib_frame_queue_main_t * fqm, u32 histogram)
 {
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
   clib_error_t *error = NULL;
   frame_queue_trace_t *fqt;
   frame_queue_nelt_counter_t *fqh;
   u32 num_fq;
   u32 fqix;
 
-  num_fq = vec_len (tm->frame_queue_traces);
+  num_fq = vec_len (fqm->frame_queue_traces);
   if (num_fq == 0)
     {
       vlib_cli_output (vm, "No trace data for frame queues\n");
@@ -260,7 +280,7 @@ show_frame_queue_internal (vlib_main_t * vm, u32 histogram)
 
   for (fqix = 0; fqix < num_fq; fqix++)
     {
-      fqt = &(tm->frame_queue_traces[fqix]);
+      fqt = &(fqm->frame_queue_traces[fqix]);
 
       vlib_cli_output (vm, "Thread %d %v\n", fqix,
                       vlib_worker_threads[fqix].name);
@@ -273,7 +293,7 @@ show_frame_queue_internal (vlib_main_t * vm, u32 histogram)
 
       if (histogram)
        {
-         fqh = &(tm->frame_queue_histogram[fqix]);
+         fqh = &(fqm->frame_queue_histogram[fqix]);
          u32 nelt;
          u64 total = 0;
 
@@ -350,14 +370,40 @@ static clib_error_t *
 show_frame_queue_trace (vlib_main_t * vm, unformat_input_t * input,
                        vlib_cli_command_t * cmd)
 {
-  return show_frame_queue_internal (vm, 0);
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
+  clib_error_t *error;
+
+  vec_foreach (fqm, tm->frame_queue_mains)
+  {
+    vlib_cli_output (vm, "Worker handoff queue index %u (next node '%U'):",
+                    fqm - tm->frame_queue_mains,
+                    format_vlib_node_name, vm, fqm->node_index);
+    error = show_frame_queue_internal (vm, fqm, 0);
+    if (error)
+      return error;
+  }
+  return 0;
 }
 
 static clib_error_t *
 show_frame_queue_histogram (vlib_main_t * vm, unformat_input_t * input,
                            vlib_cli_command_t * cmd)
 {
-  return show_frame_queue_internal (vm, 1);
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
+  clib_error_t *error;
+
+  vec_foreach (fqm, tm->frame_queue_mains)
+  {
+    vlib_cli_output (vm, "Worker handoff queue index %u (next node '%U'):",
+                    fqm - tm->frame_queue_mains,
+                    format_vlib_node_name, vm, fqm->node_index);
+    error = show_frame_queue_internal (vm, fqm, 1);
+    if (error)
+      return error;
+  }
+  return 0;
 }
 
 /* *INDENT-OFF* */
@@ -384,18 +430,43 @@ static clib_error_t *
 test_frame_queue_nelts (vlib_main_t * vm, unformat_input_t * input,
                        vlib_cli_command_t * cmd)
 {
+  unformat_input_t _line_input, *line_input = &_line_input;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
   clib_error_t *error = NULL;
   u32 num_fq;
   u32 fqix;
   u32 nelts = 0;
+  u32 index = ~(u32) 0;
+
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (line_input, "nelts %u"), &nelts)
+       ;
+      else if (unformat (line_input, "index %u"), &index)
+       ;
+      else
+       return clib_error_return (0, "parse error: '%U'",
+                                 format_unformat_error, line_input);
+    }
+
+  unformat_free (line_input);
+
+  if (index > vec_len (tm->frame_queue_mains) - 1)
+    return clib_error_return (0,
+                             "expecting valid worker handoff queue index");
+
+  fqm = vec_elt_at_index (tm->frame_queue_mains, index);
 
-  if ((unformat (input, "%d", &nelts) != 1) ||
-      ((nelts != 4) && (nelts != 8) && (nelts != 16) && (nelts != 32)))
+  if ((nelts != 4) && (nelts != 8) && (nelts != 16) && (nelts != 32))
     {
       return clib_error_return (0, "expecting 4,8,16,32");
     }
 
-  num_fq = vec_len (vlib_frame_queues);
+  num_fq = vec_len (fqm->vlib_frame_queues);
   if (num_fq == 0)
     {
       vlib_cli_output (vm, "No frame queues exist\n");
@@ -404,7 +475,7 @@ test_frame_queue_nelts (vlib_main_t * vm, unformat_input_t * input,
 
   for (fqix = 0; fqix < num_fq; fqix++)
     {
-      vlib_frame_queues[fqix]->nelts = nelts;
+      fqm->vlib_frame_queues[fqix]->nelts = nelts;
     }
 
   return error;
@@ -426,15 +497,39 @@ static clib_error_t *
 test_frame_queue_threshold (vlib_main_t * vm, unformat_input_t * input,
                            vlib_cli_command_t * cmd)
 {
+  unformat_input_t _line_input, *line_input = &_line_input;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
   clib_error_t *error = NULL;
   u32 num_fq;
   u32 fqix;
-  u32 threshold = 0;
+  u32 threshold = ~(u32) 0;
+  u32 index = ~(u32) 0;
+
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
 
-  if (unformat (input, "%d", &threshold))
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
     {
+      if (unformat (line_input, "threshold %u"), &threshold)
+       ;
+      else if (unformat (line_input, "index %u"), &index)
+       ;
+      else
+       return clib_error_return (0, "parse error: '%U'",
+                                 format_unformat_error, line_input);
     }
-  else
+
+  unformat_free (line_input);
+
+  if (index > vec_len (tm->frame_queue_mains) - 1)
+    return clib_error_return (0,
+                             "expecting valid worker handoff queue index");
+
+  fqm = vec_elt_at_index (tm->frame_queue_mains, index);
+
+
+  if (threshold == ~(u32) 0)
     {
       vlib_cli_output (vm, "expecting threshold value\n");
       return error;
@@ -443,7 +538,7 @@ test_frame_queue_threshold (vlib_main_t * vm, unformat_input_t * input,
   if (threshold == 0)
     threshold = ~0;
 
-  num_fq = vec_len (vlib_frame_queues);
+  num_fq = vec_len (fqm->vlib_frame_queues);
   if (num_fq == 0)
     {
       vlib_cli_output (vm, "No frame queues exist\n");
@@ -452,7 +547,7 @@ test_frame_queue_threshold (vlib_main_t * vm, unformat_input_t * input,
 
   for (fqix = 0; fqix < num_fq; fqix++)
     {
-      vlib_frame_queues[fqix]->vector_threshold = threshold;
+      fqm->vlib_frame_queues[fqix]->vector_threshold = threshold;
     }
 
   return error;
index 8f99b27..5e53a98 100644 (file)
@@ -346,9 +346,7 @@ show_efd (vlib_main_t * vm,
   else if (unformat (input, "worker"))
     {
       vlib_thread_main_t *tm = vlib_get_thread_main ();
-      vlib_frame_queue_t *fq;
       vlib_thread_registration_t *tr;
-      int thread_id;
       u32 num_workers = 0;
       u32 first_worker_index = 0;
       uword *p;
@@ -364,27 +362,9 @@ show_efd (vlib_main_t * vm,
 
       vlib_cli_output (vm,
                       "num_workers               %d\n"
-                      "first_worker_index        %d\n"
-                      "vlib_frame_queues[%d]:\n",
-                      num_workers, first_worker_index, tm->n_vlib_mains);
+                      "first_worker_index        %d\n",
+                      num_workers, first_worker_index);
 
-      for (thread_id = 0; thread_id < tm->n_vlib_mains; thread_id++)
-       {
-         fq = vlib_frame_queues[thread_id];
-         if (fq)
-           {
-             vlib_cli_output (vm,
-                              "%2d: frames_queued         %u\n"
-                              "    frames_queued_hint    %u\n"
-                              "    enqueue_full_events   %u\n"
-                              "    enqueue_efd_discards  %u\n",
-                              thread_id,
-                              (fq->tail - fq->head),
-                              (fq->tail - fq->head_hint),
-                              fq->enqueue_full_events,
-                              fq->enqueue_efd_discards);
-           }
-       }
     }
   else if (unformat (input, "help"))
     {
@@ -413,9 +393,6 @@ clear_efd (vlib_main_t * vm,
 {
   dpdk_main_t *dm = &dpdk_main;
   dpdk_device_t *xd;
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
-  vlib_frame_queue_t *fq;
-  int thread_id;
 
     /* *INDENT-OFF* */
     vec_foreach (xd, dm->devices)
@@ -432,16 +409,6 @@ clear_efd (vlib_main_t * vm,
       }
     /* *INDENT-ON* */
 
-  for (thread_id = 0; thread_id < tm->n_vlib_mains; thread_id++)
-    {
-      fq = vlib_frame_queues[thread_id];
-      if (fq)
-       {
-         fq->enqueue_full_events = 0;
-         fq->enqueue_efd_discards = 0;
-       }
-    }
-
   return 0;
 }
 
index 5593aa7..22d2ea9 100644 (file)
@@ -34,12 +34,16 @@ typedef struct
 
   per_inteface_handoff_data_t *if_data;
 
+  /* Worker handoff index */
+  u32 frame_queue_index;
+
   /* convenience variables */
   vlib_main_t *vlib_main;
   vnet_main_t *vnet_main;
 } handoff_main_t;
 
 handoff_main_t handoff_main;
+vlib_node_registration_t handoff_dispatch_node;
 
 typedef struct
 {
@@ -147,8 +151,9 @@ worker_handoff_node_fn (vlib_main_t * vm,
          if (hf)
            hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker;
 
-         hf = dpdk_get_handoff_queue_elt (next_worker_index,
-                                          handoff_queue_elt_by_worker_index);
+         hf = vlib_get_worker_handoff_queue_elt (hm->frame_queue_index,
+                                                 next_worker_index,
+                                                 handoff_queue_elt_by_worker_index);
 
          n_left_to_next_worker = VLIB_FRAME_SIZE - hf->n_vectors;
          to_next_worker = &hf->buffer_index[hf->n_vectors];
@@ -163,7 +168,7 @@ worker_handoff_node_fn (vlib_main_t * vm,
       if (n_left_to_next_worker == 0)
        {
          hf->n_vectors = VLIB_FRAME_SIZE;
-         vlib_put_handoff_queue_elt (hf);
+         vlib_put_frame_queue_elt (hf);
          current_worker_index = ~0;
          handoff_queue_elt_by_worker_index[next_worker_index] = 0;
          hf = 0;
@@ -196,7 +201,7 @@ worker_handoff_node_fn (vlib_main_t * vm,
           */
          if (1 || hf->n_vectors == hf->last_n_vectors)
            {
-             vlib_put_handoff_queue_elt (hf);
+             vlib_put_frame_queue_elt (hf);
              handoff_queue_elt_by_worker_index[i] = 0;
            }
          else
@@ -247,6 +252,10 @@ interface_handoff_enable_disable (vlib_main_t * vm, u32 sw_if_index,
   if (clib_bitmap_last_set (bitmap) >= hm->num_workers)
     return VNET_API_ERROR_INVALID_WORKER;
 
+  if (hm->frame_queue_index == ~0)
+    hm->frame_queue_index =
+      vlib_frame_queue_main_init (handoff_dispatch_node.index, 0);
+
   vec_validate (hm->if_data, sw_if_index);
   d = vec_elt_at_index (hm->if_data, sw_if_index);
 
@@ -356,9 +365,6 @@ format_handoff_dispatch_trace (u8 * s, va_list * args)
   return s;
 }
 
-
-vlib_node_registration_t handoff_dispatch_node;
-
 #define foreach_handoff_dispatch_error \
 _(EXAMPLE, "example packets")
 
@@ -556,8 +562,7 @@ handoff_init (vlib_main_t * vm)
   hm->vlib_main = vm;
   hm->vnet_main = &vnet_main;
 
-  ASSERT (tm->handoff_dispatch_node_index == ~0);
-  tm->handoff_dispatch_node_index = handoff_dispatch_node.index;
+  hm->frame_queue_index = ~0;
 
   return 0;
 }
index 9320f56..4fefb36 100644 (file)
@@ -32,85 +32,6 @@ typedef enum
   HANDOFF_DISPATCH_N_NEXT,
 } handoff_dispatch_next_t;
 
-static inline void
-vlib_put_handoff_queue_elt (vlib_frame_queue_elt_t * hf)
-{
-  CLIB_MEMORY_BARRIER ();
-  hf->valid = 1;
-}
-
-static inline vlib_frame_queue_elt_t *
-vlib_get_handoff_queue_elt (u32 vlib_worker_index)
-{
-  vlib_frame_queue_t *fq;
-  vlib_frame_queue_elt_t *elt;
-  u64 new_tail;
-
-  fq = vlib_frame_queues[vlib_worker_index];
-  ASSERT (fq);
-
-  new_tail = __sync_add_and_fetch (&fq->tail, 1);
-
-  /* Wait until a ring slot is available */
-  while (new_tail >= fq->head_hint + fq->nelts)
-    vlib_worker_thread_barrier_check ();
-
-  elt = fq->elts + (new_tail & (fq->nelts - 1));
-
-  /* this would be very bad... */
-  while (elt->valid)
-    ;
-
-  elt->msg_type = VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME;
-  elt->last_n_vectors = elt->n_vectors = 0;
-
-  return elt;
-}
-
-static inline vlib_frame_queue_t *
-is_vlib_handoff_queue_congested (u32 vlib_worker_index,
-                                u32 queue_hi_thresh,
-                                vlib_frame_queue_t **
-                                handoff_queue_by_worker_index)
-{
-  vlib_frame_queue_t *fq;
-
-  fq = handoff_queue_by_worker_index[vlib_worker_index];
-  if (fq != (vlib_frame_queue_t *) (~0))
-    return fq;
-
-  fq = vlib_frame_queues[vlib_worker_index];
-  ASSERT (fq);
-
-  if (PREDICT_FALSE (fq->tail >= (fq->head_hint + queue_hi_thresh)))
-    {
-      /* a valid entry in the array will indicate the queue has reached
-       * the specified threshold and is congested
-       */
-      handoff_queue_by_worker_index[vlib_worker_index] = fq;
-      fq->enqueue_full_events++;
-      return fq;
-    }
-
-  return NULL;
-}
-
-static inline vlib_frame_queue_elt_t *
-dpdk_get_handoff_queue_elt (u32 vlib_worker_index,
-                           vlib_frame_queue_elt_t **
-                           handoff_queue_elt_by_worker_index)
-{
-  vlib_frame_queue_elt_t *elt;
-
-  if (handoff_queue_elt_by_worker_index[vlib_worker_index])
-    return handoff_queue_elt_by_worker_index[vlib_worker_index];
-
-  elt = vlib_get_handoff_queue_elt (vlib_worker_index);
-
-  handoff_queue_elt_by_worker_index[vlib_worker_index] = elt;
-
-  return elt;
-}
 
 static inline u64
 ipv4_get_key (ip4_header_t * ip)