Use thread local storage for thread index
[vpp.git] / src / vlib / threads.c
index c5e58bc..4a111f8 100644 (file)
 #include <vlib/threads.h>
 #include <vlib/unix/cj.h>
 
-
-#if DPDK==1
-#include <rte_config.h>
-#include <rte_common.h>
-#include <rte_eal.h>
-#include <rte_launch.h>
-#include <rte_lcore.h>
-#endif
 DECLARE_CJ_GLOBAL_LOG;
 
 #define FRAME_QUEUE_NELTS 32
 
-
-#if DPDK==1
-/*
- *  Weak definitions of DPDK symbols used in this file.
- *  Needed for linking test programs without DPDK libs.
- */
-unsigned __thread __attribute__ ((weak)) RTE_PER_LCORE (_lcore_id);
-struct lcore_config __attribute__ ((weak)) lcore_config[];
-unsigned __attribute__ ((weak)) rte_socket_id ();
-int __attribute__ ((weak)) rte_eal_remote_launch ();
-#endif
 u32
 vl (void *p)
 {
@@ -54,27 +35,12 @@ vl (void *p)
 vlib_worker_thread_t *vlib_worker_threads;
 vlib_thread_main_t vlib_thread_main;
 
+__thread uword vlib_thread_index = 0;
+
 uword
 os_get_cpu_number (void)
 {
-  void *sp;
-  uword n;
-  u32 len;
-
-  len = vec_len (vlib_thread_stacks);
-  if (len == 0)
-    return 0;
-
-  /* Get any old stack address. */
-  sp = &sp;
-
-  n = ((uword) sp - (uword) vlib_thread_stacks[0])
-    >> VLIB_LOG2_THREAD_STACK_SIZE;
-
-  /* "processes" have their own stacks, and they always run in thread 0 */
-  n = n >= len ? 0 : n;
-
-  return n;
+  return vlib_thread_index;
 }
 
 uword
@@ -194,14 +160,17 @@ vlib_thread_init (vlib_main_t * vm)
     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
 
   /* pin main thread to main_lcore  */
-#if DPDK==0
-  {
-    cpu_set_t cpuset;
-    CPU_ZERO (&cpuset);
-    CPU_SET (tm->main_lcore, &cpuset);
-    pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
-  }
-#endif
+  if (tm->cb.vlib_thread_set_lcore_cb)
+    {
+      tm->cb.vlib_thread_set_lcore_cb (0, tm->main_lcore);
+    }
+  else
+    {
+      cpu_set_t cpuset;
+      CPU_ZERO (&cpuset);
+      CPU_SET (tm->main_lcore, &cpuset);
+      pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
+    }
 
   /* as many threads as stacks... */
   vec_validate_aligned (vlib_worker_threads, vec_len (vlib_thread_stacks) - 1,
@@ -291,21 +260,6 @@ vlib_thread_init (vlib_main_t * vm)
   return 0;
 }
 
-vlib_worker_thread_t *
-vlib_alloc_thread (vlib_main_t * vm)
-{
-  vlib_worker_thread_t *w;
-
-  if (vec_len (vlib_worker_threads) >= vec_len (vlib_thread_stacks))
-    {
-      clib_warning ("out of worker threads... Quitting...");
-      exit (1);
-    }
-  vec_add2 (vlib_worker_threads, w, 1);
-  w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
-  return w;
-}
-
 vlib_frame_queue_t *
 vlib_frame_queue_alloc (int nelts)
 {
@@ -443,7 +397,7 @@ vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
       f64 b4 = vlib_time_now_ticks (vm, before);
       vlib_worker_thread_barrier_check (vm, b4);
       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
-      // vlib_frame_queue_dequeue (vm->cpu_index, vm, nm);
+      // vlib_frame_queue_dequeue (vm->thread_index, vm, nm);
     }
 
   elt = fq->elts + (new_tail & (fq->nelts - 1));
@@ -513,6 +467,8 @@ vlib_worker_thread_bootstrap_fn (void *arg)
   w->lwp = syscall (SYS_gettid);
   w->thread_id = pthread_self ();
 
+  vlib_thread_index = w - vlib_worker_threads;
+
   rv = (void *) clib_calljmp
     ((uword (*)(uword)) w->thread_function,
      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
@@ -520,32 +476,29 @@ vlib_worker_thread_bootstrap_fn (void *arg)
   return rv;
 }
 
-static int
-vlib_launch_thread (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
+static clib_error_t *
+vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
 {
+  vlib_thread_main_t *tm = &vlib_thread_main;
   void *(*fp_arg) (void *) = fp;
 
   w->lcore_id = lcore_id;
-#if DPDK==1
-  if (!w->registration->use_pthreads)
-    if (rte_eal_remote_launch) /* do we have dpdk linked */
-      return rte_eal_remote_launch (fp, (void *) w, lcore_id);
-    else
-      return -1;
+  if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads)
+    return tm->cb.vlib_launch_thread_cb (fp, (void *) w, lcore_id);
   else
-#endif
     {
-      int ret;
       pthread_t worker;
       cpu_set_t cpuset;
       CPU_ZERO (&cpuset);
       CPU_SET (lcore_id, &cpuset);
 
-      ret = pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w);
-      if (ret == 0)
-       return pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset);
-      else
-       return ret;
+      if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w))
+       return clib_error_return_unix (0, "pthread_create");
+
+      if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
+       return clib_error_return_unix (0, "pthread_setaffinity_np");
+
+      return 0;
     }
 }
 
@@ -589,9 +542,13 @@ start_workers (vlib_main_t * vm)
 
   if (n_vlib_mains > 1)
     {
-      vec_validate (vlib_mains, tm->n_vlib_mains - 1);
+      /* Replace hand-crafted length-1 vector with a real vector */
+      vlib_mains = 0;
+
+      vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
+                           CLIB_CACHE_LINE_BYTES);
       _vec_len (vlib_mains) = 0;
-      vec_add1 (vlib_mains, vm);
+      vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
 
       vlib_worker_threads->wait_at_barrier =
        clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
@@ -625,7 +582,9 @@ start_workers (vlib_main_t * vm)
                  mheap_alloc (0 /* use VM */ , tr->mheap_size);
              else
                w->thread_mheap = main_heap;
-             w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
+
+             w->thread_stack =
+               vlib_thread_stack_init (w - vlib_worker_threads);
              w->thread_function = tr->function;
              w->thread_function_arg = w;
              w->instance_id = k;
@@ -645,9 +604,11 @@ start_workers (vlib_main_t * vm)
              vm_clone = clib_mem_alloc (sizeof (*vm_clone));
              clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
 
-             vm_clone->cpu_index = worker_thread_index;
+             vm_clone->thread_index = worker_thread_index;
              vm_clone->heap_base = w->thread_mheap;
              vm_clone->mbuf_alloc_list = 0;
+             vm_clone->init_functions_called =
+               hash_create (0, /* value bytes */ 0);
              memset (&vm_clone->random_buffer, 0,
                      sizeof (vm_clone->random_buffer));
 
@@ -689,11 +650,29 @@ start_workers (vlib_main_t * vm)
                }
              nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
                vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
+             vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
+             {
+               vlib_node_t *n = vlib_get_node (vm, rt->node_index);
+               rt->thread_index = vm_clone->thread_index;
+               /* copy initial runtime_data from node */
+               if (n->runtime_data && n->runtime_data_bytes > 0)
+                 clib_memcpy (rt->runtime_data, n->runtime_data,
+                              clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
+                                        n->runtime_data_bytes));
+             }
 
              nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
                vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
              vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
-               rt->cpu_index = vm_clone->cpu_index;
+             {
+               vlib_node_t *n = vlib_get_node (vm, rt->node_index);
+               rt->thread_index = vm_clone->thread_index;
+               /* copy initial runtime_data from node */
+               if (n->runtime_data && n->runtime_data_bytes > 0)
+                 clib_memcpy (rt->runtime_data, n->runtime_data,
+                              clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
+                                        n->runtime_data_bytes));
+             }
 
              nm_clone->processes = vec_dup (nm->processes);
 
@@ -704,7 +683,7 @@ start_workers (vlib_main_t * vm)
              /* Packet trace buffers are guaranteed to be empty, nothing to do here */
 
              clib_mem_set_heap (oldheap);
-             vec_add1 (vlib_mains, vm_clone);
+             vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
 
              vm_clone->error_main.counters =
                vec_dup (vlib_mains[0]->error_main.counters);
@@ -727,8 +706,7 @@ start_workers (vlib_main_t * vm)
                                     == fl_clone - bm_clone->buffer_free_list_pool);
 
                             fl_clone[0] = fl_orig[0];
-                            fl_clone->aligned_buffers = 0;
-                            fl_clone->unaligned_buffers = 0;
+                            fl_clone->buffers = 0;
                             fl_clone->n_alloc = 0;
                           }));
 /* *INDENT-ON* */
@@ -752,7 +730,8 @@ start_workers (vlib_main_t * vm)
                  mheap_alloc (0 /* use VM */ , tr->mheap_size);
              else
                w->thread_mheap = main_heap;
-             w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
+             w->thread_stack =
+               vlib_thread_stack_init (w - vlib_worker_threads);
              w->thread_function = tr->function;
              w->thread_function_arg = w;
              w->instance_id = j;
@@ -769,6 +748,7 @@ start_workers (vlib_main_t * vm)
 
   for (i = 0; i < vec_len (tm->registrations); i++)
     {
+      clib_error_t *err;
       int j;
 
       tr = tm->registrations[i];
@@ -778,22 +758,24 @@ start_workers (vlib_main_t * vm)
          for (j = 0; j < tr->count; j++)
            {
              w = vlib_worker_threads + worker_thread_index++;
-             if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, 0) <
-                 0)
-               clib_warning ("Couldn't start '%s' pthread ", tr->name);
+             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
+                                           w, 0);
+             if (err)
+               clib_error_report (err);
            }
        }
       else
        {
          uword c;
-            /* *INDENT-OFF* */
-            clib_bitmap_foreach (c, tr->coremask, ({
-              w = vlib_worker_threads + worker_thread_index++;
-              if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, c) < 0)
-                clib_warning ("Couldn't start DPDK lcore %d", c);
-
-            }));
-/* *INDENT-ON* */
+          /* *INDENT-OFF* */
+          clib_bitmap_foreach (c, tr->coremask, ({
+            w = vlib_worker_threads + worker_thread_index++;
+           err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
+                                         w, c);
+           if (err)
+             clib_error_report (err);
+          }));
+          /* *INDENT-ON* */
        }
     }
   vlib_worker_thread_barrier_sync (vm);
@@ -820,15 +802,15 @@ vlib_worker_thread_node_runtime_update (void)
                                  uword n_calls,
                                  uword n_vectors, uword n_clocks);
 
-  ASSERT (os_get_cpu_number () == 0);
+  ASSERT (vlib_get_thread_index () == 0);
 
-  if (vec_len (vlib_mains) == 0)
+  if (vec_len (vlib_mains) == 1)
     return;
 
   vm = vlib_mains[0];
   nm = &vm->node_main;
 
-  ASSERT (os_get_cpu_number () == 0);
+  ASSERT (vlib_get_thread_index () == 0);
   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
 
   /*
@@ -939,26 +921,55 @@ vlib_worker_thread_node_runtime_update (void)
        clib_mem_free (old_nodes_clone[j]);
       vec_free (old_nodes_clone);
 
-      vec_free (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
 
+      /* re-clone internal nodes */
+      old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
        vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
 
-      /* clone input node runtime */
-      old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
+      vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
+      {
+       vlib_node_t *n = vlib_get_node (vm, rt->node_index);
+       rt->thread_index = vm_clone->thread_index;
+       /* copy runtime_data, will be overwritten later for existing rt */
+       if (n->runtime_data && n->runtime_data_bytes > 0)
+         clib_memcpy (rt->runtime_data, n->runtime_data,
+                      clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
+                                n->runtime_data_bytes));
+      }
+
+      for (j = 0; j < vec_len (old_rt); j++)
+       {
+         rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
+         rt->state = old_rt[j].state;
+         clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
+                      VLIB_NODE_RUNTIME_DATA_SIZE);
+       }
+
+      vec_free (old_rt);
 
+      /* re-clone input nodes */
+      old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
        vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
 
       vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
       {
-       rt->cpu_index = vm_clone->cpu_index;
+       vlib_node_t *n = vlib_get_node (vm, rt->node_index);
+       rt->thread_index = vm_clone->thread_index;
+       /* copy runtime_data, will be overwritten later for existing rt */
+       if (n->runtime_data && n->runtime_data_bytes > 0)
+         clib_memcpy (rt->runtime_data, n->runtime_data,
+                      clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
+                                n->runtime_data_bytes));
       }
 
       for (j = 0; j < vec_len (old_rt); j++)
        {
          rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
          rt->state = old_rt[j].state;
+         clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
+                      VLIB_NODE_RUNTIME_DATA_SIZE);
        }
 
       vec_free (old_rt);
@@ -1105,7 +1116,7 @@ cpu_config (vlib_main_t * vm, unformat_input_t * input)
     {
       tm->n_thread_stacks += tr->count;
       tm->n_pthreads += tr->count * tr->use_pthreads;
-      tm->n_eal_threads += tr->count * (tr->use_pthreads == 0);
+      tm->n_threads += tr->count * (tr->use_pthreads == 0);
       tr = tr->next;
     }
 
@@ -1144,7 +1155,7 @@ vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
   if (vlib_mains == 0)
     return;
 
-  ASSERT (os_get_cpu_number () == 0);
+  ASSERT (vlib_get_thread_index () == 0);
   vlib_worker_thread_barrier_sync (vm);
 
   switch (which)
@@ -1165,7 +1176,7 @@ vlib_worker_thread_barrier_sync (vlib_main_t * vm)
   f64 deadline;
   u32 count;
 
-  if (!vlib_mains)
+  if (vec_len (vlib_mains) < 2)
     return;
 
   count = vec_len (vlib_mains) - 1;
@@ -1176,7 +1187,7 @@ vlib_worker_thread_barrier_sync (vlib_main_t * vm)
 
   vlib_worker_threads[0].barrier_sync_count++;
 
-  ASSERT (os_get_cpu_number () == 0);
+  ASSERT (vlib_get_thread_index () == 0);
 
   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
 
@@ -1196,7 +1207,7 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm)
 {
   f64 deadline;
 
-  if (!vlib_mains)
+  if (vec_len (vlib_mains) < 2)
     return;
 
   if (--vlib_worker_threads[0].recursion_level > 0)
@@ -1221,11 +1232,10 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm)
  * If so, pull the packets off the frames and put them to
  * the handoff node.
  */
-static inline int
-vlib_frame_queue_dequeue_internal (vlib_main_t * vm,
-                                  vlib_frame_queue_main_t * fqm)
+int
+vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm)
 {
-  u32 thread_id = vm->cpu_index;
+  u32 thread_id = vm->thread_index;
   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
   vlib_frame_queue_elt_t *elt;
   u32 *from, *to;
@@ -1350,95 +1360,30 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm,
   return processed;
 }
 
-static_always_inline void
-vlib_worker_thread_internal (vlib_main_t * vm)
-{
-  vlib_node_main_t *nm = &vm->node_main;
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
-  u64 cpu_time_now = clib_cpu_time_now ();
-  vlib_frame_queue_main_t *fqm;
-
-  vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);
-
-  while (1)
-    {
-      vlib_worker_thread_barrier_check ();
-
-      vec_foreach (fqm, tm->frame_queue_mains)
-       vlib_frame_queue_dequeue_internal (vm, fqm);
-
-      vlib_node_runtime_t *n;
-      vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
-      {
-       cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
-                                     VLIB_NODE_STATE_POLLING, /* frame */ 0,
-                                     cpu_time_now);
-      }
-
-      /* Next handle interrupts. */
-      {
-       uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
-       uword i;
-       if (l > 0)
-         {
-           _vec_len (nm->pending_interrupt_node_runtime_indices) = 0;
-           for (i = 0; i < l; i++)
-             {
-               n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
-                                     nm->
-                                     pending_interrupt_node_runtime_indices
-                                     [i]);
-               cpu_time_now =
-                 dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
-                                VLIB_NODE_STATE_INTERRUPT,
-                                /* frame */ 0,
-                                cpu_time_now);
-             }
-         }
-      }
-
-      if (_vec_len (nm->pending_frames))
-       {
-         int i;
-         cpu_time_now = clib_cpu_time_now ();
-         for (i = 0; i < _vec_len (nm->pending_frames); i++)
-           {
-             vlib_pending_frame_t *p;
-
-             p = nm->pending_frames + i;
-
-             cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
-           }
-         _vec_len (nm->pending_frames) = 0;
-       }
-      vlib_increment_main_loop_counter (vm);
-
-      /* Record time stamp in case there are no enabled nodes and above
-         calls do not update time stamp. */
-      cpu_time_now = clib_cpu_time_now ();
-    }
-}
-
 void
 vlib_worker_thread_fn (void *arg)
 {
   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
   vlib_main_t *vm = vlib_get_main ();
+  clib_error_t *e;
 
-  ASSERT (vm->cpu_index == os_get_cpu_number ());
+  ASSERT (vm->thread_index == vlib_get_thread_index ());
 
   vlib_worker_thread_init (w);
   clib_time_init (&vm->clib_time);
   clib_mem_set_heap (w->thread_mheap);
 
-#if DPDK > 0
   /* Wait until the dpdk init sequence is complete */
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
-  while (tm->worker_thread_release == 0)
+  while (tm->extern_thread_mgmt && tm->worker_thread_release == 0)
     vlib_worker_thread_barrier_check ();
-#endif
 
-  vlib_worker_thread_internal (vm);
+  e = vlib_call_init_exit_functions
+    (vm, vm->worker_init_function_registrations, 1 /* call_once */ );
+  if (e)
+    clib_error_report (e);
+
+  vlib_worker_loop (vm);
 }
 
 /* *INDENT-OFF* */
@@ -1475,6 +1420,20 @@ vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
   return (fqm - tm->frame_queue_mains);
 }
 
+
+int
+vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb)
+{
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+
+  if (tm->extern_thread_mgmt)
+    return -1;
+
+  tm->cb.vlib_launch_thread_cb = cb->vlib_launch_thread_cb;
+  tm->extern_thread_mgmt = 1;
+  return 0;
+}
+
 clib_error_t *
 threads_init (vlib_main_t * vm)
 {