API refactoring : gre
[vpp.git] / vlib / vlib / threads.c
index 302e201..c5e58bc 100644 (file)
@@ -13,7 +13,6 @@
  * limitations under the License.
  */
 #define _GNU_SOURCE
-#include <sched.h>
 
 #include <signal.h>
 #include <math.h>
@@ -52,6 +51,7 @@ vl (void *p)
   return vec_len (p);
 }
 
+vlib_worker_thread_t *vlib_worker_threads;
 vlib_thread_main_t vlib_thread_main;
 
 uword
@@ -100,7 +100,7 @@ vlib_set_thread_name (char *name)
     {
       rv = pthread_setname_np (thread, name);
       if (rv)
-        clib_warning ("pthread_setname_np returned %d", rv);
+       clib_warning ("pthread_setname_np returned %d", rv);
     }
 }
 
@@ -132,7 +132,7 @@ vlib_sysfs_list_to_bitmap (char *filename)
          unformat_init_string (&in, (char *) buffer,
                                strlen ((char *) buffer));
          if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
-            clib_warning ("unformat_bitmap_list failed");
+           clib_warning ("unformat_bitmap_list failed");
          unformat_free (&in);
        }
       vec_free (buffer);
@@ -212,10 +212,22 @@ vlib_thread_init (vlib_main_t * vm)
   w = vlib_worker_threads;
   w->thread_mheap = clib_mem_get_heap ();
   w->thread_stack = vlib_thread_stacks[0];
-  w->dpdk_lcore_id = -1;
+  w->lcore_id = tm->main_lcore;
   w->lwp = syscall (SYS_gettid);
+  w->thread_id = pthread_self ();
   tm->n_vlib_mains = 1;
 
+  if (tm->sched_policy != ~0)
+    {
+      struct sched_param sched_param;
+      if (!sched_getparam (w->lwp, &sched_param))
+       {
+         if (tm->sched_priority != ~0)
+           sched_param.sched_priority = tm->sched_priority;
+         sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
+       }
+    }
+
   /* assign threads to cores and set n_vlib_mains */
   tr = tm->next;
 
@@ -276,10 +288,6 @@ vlib_thread_init (vlib_main_t * vm)
   vec_validate_aligned (vlib_worker_threads, first_index - 1,
                        CLIB_CACHE_LINE_BYTES);
 
-
-  tm->efd.enabled = VLIB_EFD_DISABLED;
-  tm->efd.queue_hi_thresh = ((VLIB_EFD_DEF_WORKER_HI_THRESH_PCT *
-                             FRAME_QUEUE_NELTS) / 100);
   return 0;
 }
 
@@ -464,12 +472,15 @@ vlib_worker_thread_init (vlib_worker_thread_t * w)
 {
   vlib_thread_main_t *tm = vlib_get_thread_main ();
 
-  /* worker threads wants no signals. */
-  {
-    sigset_t s;
-    sigfillset (&s);
-    pthread_sigmask (SIG_SETMASK, &s, 0);
-  }
+  /*
+   * Note: disabling signals in worker threads as follows
+   * prevents the api post-mortem dump scheme from working
+   * {
+   *    sigset_t s;
+   *    sigfillset (&s);
+   *    pthread_sigmask (SIG_SETMASK, &s, 0);
+   *  }
+   */
 
   clib_mem_set_heap (w->thread_mheap);
 
@@ -500,15 +511,7 @@ vlib_worker_thread_bootstrap_fn (void *arg)
   vlib_worker_thread_t *w = arg;
 
   w->lwp = syscall (SYS_gettid);
-  w->dpdk_lcore_id = -1;
-#if DPDK==1
-  if (w->registration && !w->registration->use_pthreads && rte_socket_id)      /* do we really have dpdk linked */
-    {
-      unsigned lcore = rte_lcore_id ();
-      lcore = lcore < RTE_MAX_LCORE ? lcore : -1;
-      w->dpdk_lcore_id = lcore;
-    }
-#endif
+  w->thread_id = pthread_self ();
 
   rv = (void *) clib_calljmp
     ((uword (*)(uword)) w->thread_function,
@@ -522,6 +525,7 @@ vlib_launch_thread (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
 {
   void *(*fp_arg) (void *) = fp;
 
+  w->lcore_id = lcore_id;
 #if DPDK==1
   if (!w->registration->use_pthreads)
     if (rte_eal_remote_launch) /* do we have dpdk linked */
@@ -552,7 +556,6 @@ start_workers (vlib_main_t * vm)
   vlib_worker_thread_t *w;
   vlib_main_t *vm_clone;
   void *oldheap;
-  vlib_frame_queue_t *fq;
   vlib_thread_main_t *tm = &vlib_thread_main;
   vlib_thread_registration_t *tr;
   vlib_node_runtime_t *rt;
@@ -574,15 +577,6 @@ start_workers (vlib_main_t * vm)
       vlib_set_thread_name ((char *) w->name);
     }
 
-#if DPDK==1
-  w->dpdk_lcore_id = -1;
-  if (rte_socket_id)           /* do we really have dpdk linked */
-    {
-      unsigned lcore = rte_lcore_id ();
-      w->dpdk_lcore_id = lcore < RTE_MAX_LCORE ? lcore : -1;;
-    }
-#endif
-
   /*
    * Truth of the matter: we always use at least two
    * threads. So, make the main heap thread-safe
@@ -599,11 +593,6 @@ start_workers (vlib_main_t * vm)
       _vec_len (vlib_mains) = 0;
       vec_add1 (vlib_mains, vm);
 
-      vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
-      _vec_len (vlib_frame_queues) = 0;
-      fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
-      vec_add1 (vlib_frame_queues, fq);
-
       vlib_worker_threads->wait_at_barrier =
        clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
       vlib_worker_threads->workers_at_barrier =
@@ -650,19 +639,6 @@ start_workers (vlib_main_t * vm)
              if (tr->no_data_structure_clone)
                continue;
 
-             /* Allocate "to-worker-N" frame queue */
-             if (tr->frame_queue_nelts)
-               {
-                 fq = vlib_frame_queue_alloc (tr->frame_queue_nelts);
-               }
-             else
-               {
-                 fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
-               }
-
-             vec_validate (vlib_frame_queues, worker_thread_index);
-             vlib_frame_queues[worker_thread_index] = fq;
-
              /* Fork vlib_global_main et al. Look for bugs here */
              oldheap = clib_mem_set_heap (w->thread_mheap);
 
@@ -995,6 +971,20 @@ vlib_worker_thread_node_runtime_update (void)
     }
 }
 
+u32
+unformat_sched_policy (unformat_input_t * input, va_list * args)
+{
+  u32 *r = va_arg (*args, u32 *);
+
+  if (0);
+#define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
+  foreach_sched_policy
+#undef _
+    else
+    return 0;
+  return 1;
+}
+
 static clib_error_t *
 cpu_config (vlib_main_t * vm, unformat_input_t * input)
 {
@@ -1007,7 +997,10 @@ cpu_config (vlib_main_t * vm, unformat_input_t * input)
   u32 count;
 
   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
+
   tm->n_thread_stacks = 1;     /* account for main thread */
+  tm->sched_policy = ~0;
+  tm->sched_priority = ~0;
 
   tr = tm->next;
 
@@ -1061,11 +1054,18 @@ cpu_config (vlib_main_t * vm, unformat_input_t * input)
          tr->coremask = bitmap;
          tr->count = clib_bitmap_count_set_bits (tr->coremask);
        }
+      else
+       if (unformat
+           (input, "scheduler-policy %U", unformat_sched_policy,
+            &tm->sched_policy))
+       ;
+      else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
+       ;
       else if (unformat (input, "%s %u", &name, &count))
        {
          p = hash_get_mem (tm->thread_registrations_by_name, name);
          if (p == 0)
-           return clib_error_return (0, "no such thread type '%s'", name);
+           return clib_error_return (0, "no such thread type '%s'", name);
 
          tr = (vlib_thread_registration_t *) p[0];
          if (tr->fixed_count)
@@ -1077,6 +1077,25 @@ cpu_config (vlib_main_t * vm, unformat_input_t * input)
        break;
     }
 
+  if (tm->sched_priority != ~0)
+    {
+      if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
+       {
+         u32 prio_max = sched_get_priority_max (tm->sched_policy);
+         u32 prio_min = sched_get_priority_min (tm->sched_policy);
+         if (tm->sched_priority > prio_max)
+           tm->sched_priority = prio_max;
+         if (tm->sched_priority < prio_min)
+           tm->sched_priority = prio_min;
+       }
+      else
+       {
+         return clib_error_return
+           (0,
+            "scheduling priority (%d) is not allowed for `normal` scheduling policy",
+            tm->sched_priority);
+       }
+    }
   tr = tm->next;
 
   if (!tm->thread_prefix)
@@ -1203,10 +1222,11 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm)
  * the handoff node.
  */
 static inline int
-vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
+vlib_frame_queue_dequeue_internal (vlib_main_t * vm,
+                                  vlib_frame_queue_main_t * fqm)
 {
   u32 thread_id = vm->cpu_index;
-  vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
+  vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
   vlib_frame_queue_elt_t *elt;
   u32 *from, *to;
   vlib_frame_t *f;
@@ -1214,12 +1234,11 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
   int processed = 0;
   u32 n_left_to_node;
   u32 vectors = 0;
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
 
   ASSERT (fq);
   ASSERT (vm == vlib_mains[thread_id]);
 
-  if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
+  if (PREDICT_FALSE (fqm->node_index == ~0))
     return 0;
   /*
    * Gather trace data for frame queues
@@ -1230,7 +1249,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
       frame_queue_nelt_counter_t *fqh;
       u32 elix;
 
-      fqt = &tm->frame_queue_traces[thread_id];
+      fqt = &fqm->frame_queue_traces[thread_id];
 
       fqt->nelts = fq->nelts;
       fqt->head = fq->head;
@@ -1245,7 +1264,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
        }
 
       /* Record the number of elements in use in the histogram */
-      fqh = &tm->frame_queue_histogram[thread_id];
+      fqh = &fqm->frame_queue_histogram[thread_id];
       fqh->count[fqt->n_in_use]++;
 
       /* Record a snapshot of the elements in use */
@@ -1282,7 +1301,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
 
-      f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
+      f = vlib_get_frame_to_node (vm, fqm->node_index);
 
       to = vlib_frame_vector_args (f);
 
@@ -1309,7 +1328,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
 
       vectors += elt->n_vectors;
       f->n_vectors = elt->n_vectors;
-      vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
+      vlib_put_frame_to_node (vm, fqm->node_index, f);
 
       elt->valid = 0;
       elt->n_vectors = 0;
@@ -1335,13 +1354,18 @@ static_always_inline void
 vlib_worker_thread_internal (vlib_main_t * vm)
 {
   vlib_node_main_t *nm = &vm->node_main;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
   u64 cpu_time_now = clib_cpu_time_now ();
+  vlib_frame_queue_main_t *fqm;
+
+  vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);
 
   while (1)
     {
       vlib_worker_thread_barrier_check ();
 
-      vlib_frame_queue_dequeue_internal (vm);
+      vec_foreach (fqm, tm->frame_queue_mains)
+       vlib_frame_queue_dequeue_internal (vm, fqm);
 
       vlib_node_runtime_t *n;
       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
@@ -1351,6 +1375,28 @@ vlib_worker_thread_internal (vlib_main_t * vm)
                                      cpu_time_now);
       }
 
+      /* Next handle interrupts. */
+      {
+       uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
+       uword i;
+       if (l > 0)
+         {
+           _vec_len (nm->pending_interrupt_node_runtime_indices) = 0;
+           for (i = 0; i < l; i++)
+             {
+               n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
+                                     nm->
+                                     pending_interrupt_node_runtime_indices
+                                     [i]);
+               cpu_time_now =
+                 dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
+                                VLIB_NODE_STATE_INTERRUPT,
+                                /* frame */ 0,
+                                cpu_time_now);
+             }
+         }
+      }
+
       if (_vec_len (nm->pending_frames))
        {
          int i;
@@ -1377,7 +1423,6 @@ void
 vlib_worker_thread_fn (void *arg)
 {
   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
   vlib_main_t *vm = vlib_get_main ();
 
   ASSERT (vm->cpu_index == os_get_cpu_number ());
@@ -1386,9 +1431,12 @@ vlib_worker_thread_fn (void *arg)
   clib_time_init (&vm->clib_time);
   clib_mem_set_heap (w->thread_mheap);
 
+#if DPDK > 0
   /* Wait until the dpdk init sequence is complete */
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
   while (tm->worker_thread_release == 0)
     vlib_worker_thread_barrier_check ();
+#endif
 
   vlib_worker_thread_internal (vm);
 }
@@ -1401,13 +1449,35 @@ VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
 };
 /* *INDENT-ON* */
 
-clib_error_t *
-threads_init (vlib_main_t * vm)
+u32
+vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
 {
   vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
+  vlib_frame_queue_t *fq;
+  int i;
+
+  if (frame_queue_nelts == 0)
+    frame_queue_nelts = FRAME_QUEUE_NELTS;
+
+  vec_add2 (tm->frame_queue_mains, fqm, 1);
 
-  tm->handoff_dispatch_node_index = ~0;
+  fqm->node_index = node_index;
 
+  vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
+  _vec_len (fqm->vlib_frame_queues) = 0;
+  for (i = 0; i < tm->n_vlib_mains; i++)
+    {
+      fq = vlib_frame_queue_alloc (frame_queue_nelts);
+      vec_add1 (fqm->vlib_frame_queues, fq);
+    }
+
+  return (fqm - tm->frame_queue_mains);
+}
+
+clib_error_t *
+threads_init (vlib_main_t * vm)
+{
   return 0;
 }