vppinfra: refactor clib_timebase_t
[vpp.git] / src / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vppinfra/time_range.h>
21 #include <vppinfra/linux/sysfs.h>
22 #include <vlib/vlib.h>
23
24 #include <vlib/threads.h>
25 #include <vlib/unix/cj.h>
26
27 #include <vlib/stat_weak_inlines.h>
28
29 DECLARE_CJ_GLOBAL_LOG;
30
31
32 u32
33 vl (void *p)
34 {
35   return vec_len (p);
36 }
37
38 vlib_worker_thread_t *vlib_worker_threads;
39 vlib_thread_main_t vlib_thread_main;
40
41 /*
42  * Barrier tracing can be enabled on a normal build to collect information
43  * on barrier use, including timings and call stacks.  Deliberately not
44  * keyed off CLIB_DEBUG, because that can add significant overhead which
45  * imapacts observed timings.
46  */
47
48 static inline void
49 barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed)
50 {
51   if (!vlib_worker_threads->barrier_elog_enabled)
52     return;
53
54     /* *INDENT-OFF* */
55     ELOG_TYPE_DECLARE (e) =
56       {
57         .format = "bar-trace-%s-#%d",
58         .format_args = "T4i4",
59       };
60     /* *INDENT-ON* */
61   struct
62   {
63     u32 caller, count, t_entry, t_open, t_closed;
64   } *ed = 0;
65
66   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
67   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
68   ed->caller = elog_string (&vlib_global_main.elog_main,
69                             (char *) vlib_worker_threads[0].barrier_caller);
70   ed->t_entry = (int) (1000000.0 * t_entry);
71   ed->t_open = (int) (1000000.0 * t_open);
72   ed->t_closed = (int) (1000000.0 * t_closed);
73 }
74
75 static inline void
76 barrier_trace_sync_rec (f64 t_entry)
77 {
78   if (!vlib_worker_threads->barrier_elog_enabled)
79     return;
80
81     /* *INDENT-OFF* */
82     ELOG_TYPE_DECLARE (e) =
83       {
84         .format = "bar-syncrec-%s-#%d",
85         .format_args = "T4i4",
86       };
87     /* *INDENT-ON* */
88   struct
89   {
90     u32 caller, depth;
91   } *ed = 0;
92
93   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
94   ed->depth = (int) vlib_worker_threads[0].recursion_level - 1;
95   ed->caller = elog_string (&vlib_global_main.elog_main,
96                             (char *) vlib_worker_threads[0].barrier_caller);
97 }
98
99 static inline void
100 barrier_trace_release_rec (f64 t_entry)
101 {
102   if (!vlib_worker_threads->barrier_elog_enabled)
103     return;
104
105     /* *INDENT-OFF* */
106     ELOG_TYPE_DECLARE (e) =
107       {
108         .format = "bar-relrrec-#%d",
109         .format_args = "i4",
110       };
111     /* *INDENT-ON* */
112   struct
113   {
114     u32 depth;
115   } *ed = 0;
116
117   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
118   ed->depth = (int) vlib_worker_threads[0].recursion_level;
119 }
120
121 static inline void
122 barrier_trace_release (f64 t_entry, f64 t_closed_total, f64 t_update_main)
123 {
124   if (!vlib_worker_threads->barrier_elog_enabled)
125     return;
126
127     /* *INDENT-OFF* */
128     ELOG_TYPE_DECLARE (e) =
129       {
130         .format = "bar-rel-#%d-e%d-u%d-t%d",
131         .format_args = "i4i4i4i4",
132       };
133     /* *INDENT-ON* */
134   struct
135   {
136     u32 count, t_entry, t_update_main, t_closed_total;
137   } *ed = 0;
138
139   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
140   ed->t_entry = (int) (1000000.0 * t_entry);
141   ed->t_update_main = (int) (1000000.0 * t_update_main);
142   ed->t_closed_total = (int) (1000000.0 * t_closed_total);
143   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
144
145   /* Reset context for next trace */
146   vlib_worker_threads[0].barrier_context = NULL;
147 }
148
149 uword
150 os_get_nthreads (void)
151 {
152   return vec_len (vlib_thread_stacks);
153 }
154
155 void
156 vlib_set_thread_name (char *name)
157 {
158   int pthread_setname_np (pthread_t __target_thread, const char *__name);
159   int rv;
160   pthread_t thread = pthread_self ();
161
162   if (thread)
163     {
164       rv = pthread_setname_np (thread, name);
165       if (rv)
166         clib_warning ("pthread_setname_np returned %d", rv);
167     }
168 }
169
170 static int
171 sort_registrations_by_no_clone (void *a0, void *a1)
172 {
173   vlib_thread_registration_t **tr0 = a0;
174   vlib_thread_registration_t **tr1 = a1;
175
176   return ((i32) ((*tr0)->no_data_structure_clone)
177           - ((i32) ((*tr1)->no_data_structure_clone)));
178 }
179
180 static uword *
181 clib_sysfs_list_to_bitmap (char *filename)
182 {
183   FILE *fp;
184   uword *r = 0;
185
186   fp = fopen (filename, "r");
187
188   if (fp != NULL)
189     {
190       u8 *buffer = 0;
191       vec_validate (buffer, 256 - 1);
192       if (fgets ((char *) buffer, 256, fp))
193         {
194           unformat_input_t in;
195           unformat_init_string (&in, (char *) buffer,
196                                 strlen ((char *) buffer));
197           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
198             clib_warning ("unformat_bitmap_list failed");
199           unformat_free (&in);
200         }
201       vec_free (buffer);
202       fclose (fp);
203     }
204   return r;
205 }
206
207
208 /* Called early in the init sequence */
209
210 clib_error_t *
211 vlib_thread_init (vlib_main_t * vm)
212 {
213   vlib_thread_main_t *tm = &vlib_thread_main;
214   vlib_worker_thread_t *w;
215   vlib_thread_registration_t *tr;
216   u32 n_vlib_mains = 1;
217   u32 first_index = 1;
218   u32 i;
219   uword *avail_cpu;
220
221   /* get bitmaps of active cpu cores and sockets */
222   tm->cpu_core_bitmap =
223     clib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
224   tm->cpu_socket_bitmap =
225     clib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
226
227   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
228
229   /* skip cores */
230   for (i = 0; i < tm->skip_cores; i++)
231     {
232       uword c = clib_bitmap_first_set (avail_cpu);
233       if (c == ~0)
234         return clib_error_return (0, "no available cpus to skip");
235
236       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
237     }
238
239   /* grab cpu for main thread */
240   if (tm->main_lcore == ~0)
241     {
242       /* if main-lcore is not set, we try to use lcore 1 */
243       if (clib_bitmap_get (avail_cpu, 1))
244         tm->main_lcore = 1;
245       else
246         tm->main_lcore = clib_bitmap_first_set (avail_cpu);
247       if (tm->main_lcore == (u8) ~ 0)
248         return clib_error_return (0, "no available cpus to be used for the"
249                                   " main thread");
250     }
251   else
252     {
253       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
254         return clib_error_return (0, "cpu %u is not available to be used"
255                                   " for the main thread", tm->main_lcore);
256     }
257   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
258
259   /*
260    * Determine if the number of workers is greater than 0.
261    * If so, mark CPU 0 unavailable so workers will be numbered after main.
262    */
263   u32 n_workers = 0;
264   uword *p = hash_get_mem (tm->thread_registrations_by_name, "workers");
265   if (p != 0)
266     {
267       vlib_thread_registration_t *tr = (vlib_thread_registration_t *) p[0];
268       int worker_thread_count = tr->count;
269       n_workers = worker_thread_count;
270     }
271   if (tm->skip_cores == 0 && n_workers)
272     avail_cpu = clib_bitmap_set (avail_cpu, 0, 0);
273
274   /* assume that there is socket 0 only if there is no data from sysfs */
275   if (!tm->cpu_socket_bitmap)
276     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
277
278   /* pin main thread to main_lcore  */
279   if (tm->cb.vlib_thread_set_lcore_cb)
280     {
281       tm->cb.vlib_thread_set_lcore_cb (0, tm->main_lcore);
282     }
283   else
284     {
285       cpu_set_t cpuset;
286       CPU_ZERO (&cpuset);
287       CPU_SET (tm->main_lcore, &cpuset);
288       pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
289     }
290
291   /* Set up thread 0 */
292   vec_validate_aligned (vlib_worker_threads, 0, CLIB_CACHE_LINE_BYTES);
293   _vec_len (vlib_worker_threads) = 1;
294   w = vlib_worker_threads;
295   w->thread_mheap = clib_mem_get_heap ();
296   w->thread_stack = vlib_thread_stacks[0];
297   w->cpu_id = tm->main_lcore;
298   w->lwp = syscall (SYS_gettid);
299   w->thread_id = pthread_self ();
300   tm->n_vlib_mains = 1;
301
302   vlib_get_thread_core_numa (w, w->cpu_id);
303
304   if (tm->sched_policy != ~0)
305     {
306       struct sched_param sched_param;
307       if (!sched_getparam (w->lwp, &sched_param))
308         {
309           if (tm->sched_priority != ~0)
310             sched_param.sched_priority = tm->sched_priority;
311           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
312         }
313     }
314
315   /* assign threads to cores and set n_vlib_mains */
316   tr = tm->next;
317
318   while (tr)
319     {
320       vec_add1 (tm->registrations, tr);
321       tr = tr->next;
322     }
323
324   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
325
326   for (i = 0; i < vec_len (tm->registrations); i++)
327     {
328       int j;
329       tr = tm->registrations[i];
330       tr->first_index = first_index;
331       first_index += tr->count;
332       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
333
334       /* construct coremask */
335       if (tr->use_pthreads || !tr->count)
336         continue;
337
338       if (tr->coremask)
339         {
340           uword c;
341           /* *INDENT-OFF* */
342           clib_bitmap_foreach (c, tr->coremask, ({
343             if (clib_bitmap_get(avail_cpu, c) == 0)
344               return clib_error_return (0, "cpu %u is not available to be used"
345                                         " for the '%s' thread",c, tr->name);
346
347             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
348           }));
349           /* *INDENT-ON* */
350         }
351       else
352         {
353           for (j = 0; j < tr->count; j++)
354             {
355               uword c = clib_bitmap_first_set (avail_cpu);
356               if (c == ~0)
357                 return clib_error_return (0,
358                                           "no available cpus to be used for"
359                                           " the '%s' thread", tr->name);
360
361               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
362               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
363             }
364         }
365     }
366
367   clib_bitmap_free (avail_cpu);
368
369   tm->n_vlib_mains = n_vlib_mains;
370
371   /*
372    * Allocate the remaining worker threads, and thread stack vector slots
373    * from now on, calls to os_get_nthreads() will return the correct
374    * answer.
375    */
376   vec_validate_aligned (vlib_worker_threads, first_index - 1,
377                         CLIB_CACHE_LINE_BYTES);
378   vec_validate (vlib_thread_stacks, vec_len (vlib_worker_threads) - 1);
379   return 0;
380 }
381
382 vlib_frame_queue_t *
383 vlib_frame_queue_alloc (int nelts)
384 {
385   vlib_frame_queue_t *fq;
386
387   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
388   clib_memset (fq, 0, sizeof (*fq));
389   fq->nelts = nelts;
390   fq->vector_threshold = 128;   // packets
391   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
392
393   if (1)
394     {
395       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
396         fformat (stderr, "WARNING: fq->tail unaligned\n");
397       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
398         fformat (stderr, "WARNING: fq->head unaligned\n");
399       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
400         fformat (stderr, "WARNING: fq->elts unaligned\n");
401
402       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
403         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
404                  sizeof (fq->elts[0]));
405       if (nelts & (nelts - 1))
406         {
407           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
408           abort ();
409         }
410     }
411
412   return (fq);
413 }
414
415 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
416 void
417 vl_msg_api_handler_no_free (void *v)
418 {
419 }
420
421 /* Turned off, save as reference material... */
422 #if 0
423 static inline int
424 vlib_frame_queue_dequeue_internal (int thread_id,
425                                    vlib_main_t * vm, vlib_node_main_t * nm)
426 {
427   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
428   vlib_frame_queue_elt_t *elt;
429   vlib_frame_t *f;
430   vlib_pending_frame_t *p;
431   vlib_node_runtime_t *r;
432   u32 node_runtime_index;
433   int msg_type;
434   u64 before;
435   int processed = 0;
436
437   ASSERT (vm == vlib_mains[thread_id]);
438
439   while (1)
440     {
441       if (fq->head == fq->tail)
442         return processed;
443
444       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
445
446       if (!elt->valid)
447         return processed;
448
449       before = clib_cpu_time_now ();
450
451       f = elt->frame;
452       node_runtime_index = elt->node_runtime_index;
453       msg_type = elt->msg_type;
454
455       switch (msg_type)
456         {
457         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
458           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
459           /* note fallthrough... */
460         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
461           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
462                                 node_runtime_index);
463           vlib_frame_free (vm, r, f);
464           break;
465         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
466           vec_add2 (vm->node_main.pending_frames, p, 1);
467           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
468           p->node_runtime_index = elt->node_runtime_index;
469           p->frame_index = vlib_frame_index (vm, f);
470           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
471           fq->dequeue_vectors += (u64) f->n_vectors;
472           break;
473         case VLIB_FRAME_QUEUE_ELT_API_MSG:
474           vl_msg_api_handler_no_free (f);
475           break;
476         default:
477           clib_warning ("bogus frame queue message, type %d", msg_type);
478           break;
479         }
480       elt->valid = 0;
481       fq->dequeues++;
482       fq->dequeue_ticks += clib_cpu_time_now () - before;
483       CLIB_MEMORY_BARRIER ();
484       fq->head++;
485       processed++;
486     }
487   ASSERT (0);
488   return processed;
489 }
490
491 int
492 vlib_frame_queue_dequeue (int thread_id,
493                           vlib_main_t * vm, vlib_node_main_t * nm)
494 {
495   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
496 }
497
498 int
499 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
500                           u32 frame_queue_index, vlib_frame_t * frame,
501                           vlib_frame_queue_msg_type_t type)
502 {
503   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
504   vlib_frame_queue_elt_t *elt;
505   u32 save_count;
506   u64 new_tail;
507   u64 before = clib_cpu_time_now ();
508
509   ASSERT (fq);
510
511   new_tail = clib_atomic_add_fetch (&fq->tail, 1);
512
513   /* Wait until a ring slot is available */
514   while (new_tail >= fq->head + fq->nelts)
515     {
516       f64 b4 = vlib_time_now_ticks (vm, before);
517       vlib_worker_thread_barrier_check (vm, b4);
518       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
519       // vlib_frame_queue_dequeue (vm->thread_index, vm, nm);
520     }
521
522   elt = fq->elts + (new_tail & (fq->nelts - 1));
523
524   /* this would be very bad... */
525   while (elt->valid)
526     {
527     }
528
529   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
530   save_count = frame->n_vectors;
531
532   elt->frame = frame;
533   elt->node_runtime_index = node_runtime_index;
534   elt->msg_type = type;
535   CLIB_MEMORY_BARRIER ();
536   elt->valid = 1;
537
538   return save_count;
539 }
540 #endif /* 0 */
541
542 /* To be called by vlib worker threads upon startup */
543 void
544 vlib_worker_thread_init (vlib_worker_thread_t * w)
545 {
546   vlib_thread_main_t *tm = vlib_get_thread_main ();
547
548   /*
549    * Note: disabling signals in worker threads as follows
550    * prevents the api post-mortem dump scheme from working
551    * {
552    *    sigset_t s;
553    *    sigfillset (&s);
554    *    pthread_sigmask (SIG_SETMASK, &s, 0);
555    *  }
556    */
557
558   clib_mem_set_heap (w->thread_mheap);
559
560   if (vec_len (tm->thread_prefix) && w->registration->short_name)
561     {
562       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
563                         w->registration->short_name, w->instance_id, '\0');
564       vlib_set_thread_name ((char *) w->name);
565     }
566
567   if (!w->registration->use_pthreads)
568     {
569
570       /* Initial barrier sync, for both worker and i/o threads */
571       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
572
573       while (*vlib_worker_threads->wait_at_barrier)
574         ;
575
576       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
577     }
578 }
579
580 void *
581 vlib_worker_thread_bootstrap_fn (void *arg)
582 {
583   void *rv;
584   vlib_worker_thread_t *w = arg;
585
586   w->lwp = syscall (SYS_gettid);
587   w->thread_id = pthread_self ();
588
589   __os_thread_index = w - vlib_worker_threads;
590
591   rv = (void *) clib_calljmp
592     ((uword (*)(uword)) w->thread_function,
593      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
594   /* NOTREACHED, we hope */
595   return rv;
596 }
597
598 void
599 vlib_get_thread_core_numa (vlib_worker_thread_t * w, unsigned cpu_id)
600 {
601   const char *sys_cpu_path = "/sys/devices/system/cpu/cpu";
602   const char *sys_node_path = "/sys/devices/system/node/node";
603   clib_bitmap_t *nbmp = 0, *cbmp = 0;
604   u32 node;
605   u8 *p = 0;
606   int core_id = -1, numa_id = -1;
607
608   p = format (p, "%s%u/topology/core_id%c", sys_cpu_path, cpu_id, 0);
609   clib_sysfs_read ((char *) p, "%d", &core_id);
610   vec_reset_length (p);
611
612   /* *INDENT-OFF* */
613   clib_sysfs_read ("/sys/devices/system/node/online", "%U",
614         unformat_bitmap_list, &nbmp);
615   clib_bitmap_foreach (node, nbmp, ({
616     p = format (p, "%s%u/cpulist%c", sys_node_path, node, 0);
617     clib_sysfs_read ((char *) p, "%U", unformat_bitmap_list, &cbmp);
618     if (clib_bitmap_get (cbmp, cpu_id))
619       numa_id = node;
620     vec_reset_length (cbmp);
621     vec_reset_length (p);
622   }));
623   /* *INDENT-ON* */
624   vec_free (nbmp);
625   vec_free (cbmp);
626   vec_free (p);
627
628   w->core_id = core_id;
629   w->numa_id = numa_id;
630 }
631
632 static clib_error_t *
633 vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned cpu_id)
634 {
635   vlib_thread_main_t *tm = &vlib_thread_main;
636   void *(*fp_arg) (void *) = fp;
637   void *numa_heap;
638
639   w->cpu_id = cpu_id;
640   vlib_get_thread_core_numa (w, cpu_id);
641
642   /* Set up NUMA-bound heap if indicated */
643   if (clib_per_numa_mheaps[w->numa_id] == 0)
644     {
645       /* If the user requested a NUMA heap, create it... */
646       if (tm->numa_heap_size)
647         {
648           numa_heap = clib_mem_init_thread_safe_numa
649             (0 /* DIY */ , tm->numa_heap_size, w->numa_id);
650           clib_per_numa_mheaps[w->numa_id] = numa_heap;
651         }
652       else
653         {
654           /* Or, use the main heap */
655           clib_per_numa_mheaps[w->numa_id] = w->thread_mheap;
656         }
657     }
658
659   if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads)
660     return tm->cb.vlib_launch_thread_cb (fp, (void *) w, cpu_id);
661   else
662     {
663       pthread_t worker;
664       cpu_set_t cpuset;
665       CPU_ZERO (&cpuset);
666       CPU_SET (cpu_id, &cpuset);
667
668       if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w))
669         return clib_error_return_unix (0, "pthread_create");
670
671       if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
672         return clib_error_return_unix (0, "pthread_setaffinity_np");
673
674       return 0;
675     }
676 }
677
678 static clib_error_t *
679 start_workers (vlib_main_t * vm)
680 {
681   int i, j;
682   vlib_worker_thread_t *w;
683   vlib_main_t *vm_clone;
684   void *oldheap;
685   vlib_thread_main_t *tm = &vlib_thread_main;
686   vlib_thread_registration_t *tr;
687   vlib_node_runtime_t *rt;
688   u32 n_vlib_mains = tm->n_vlib_mains;
689   u32 worker_thread_index;
690   u8 *main_heap = clib_mem_get_per_cpu_heap ();
691
692   vec_reset_length (vlib_worker_threads);
693
694   /* Set up the main thread */
695   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
696   w->elog_track.name = "main thread";
697   elog_track_register (&vm->elog_main, &w->elog_track);
698
699   if (vec_len (tm->thread_prefix))
700     {
701       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
702       vlib_set_thread_name ((char *) w->name);
703     }
704
705   vm->elog_main.lock =
706     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
707   vm->elog_main.lock[0] = 0;
708
709   if (n_vlib_mains > 1)
710     {
711       /* Replace hand-crafted length-1 vector with a real vector */
712       vlib_mains = 0;
713
714       vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
715                             CLIB_CACHE_LINE_BYTES);
716       _vec_len (vlib_mains) = 0;
717       vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
718
719       vlib_worker_threads->wait_at_barrier =
720         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
721       vlib_worker_threads->workers_at_barrier =
722         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
723
724       vlib_worker_threads->node_reforks_required =
725         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
726
727       /* We'll need the rpc vector lock... */
728       clib_spinlock_init (&vm->pending_rpc_lock);
729
730       /* Ask for an initial barrier sync */
731       *vlib_worker_threads->workers_at_barrier = 0;
732       *vlib_worker_threads->wait_at_barrier = 1;
733
734       /* Without update or refork */
735       *vlib_worker_threads->node_reforks_required = 0;
736       vm->need_vlib_worker_thread_node_runtime_update = 0;
737
738       /* init timing */
739       vm->barrier_epoch = 0;
740       vm->barrier_no_close_before = 0;
741
742       worker_thread_index = 1;
743
744       for (i = 0; i < vec_len (tm->registrations); i++)
745         {
746           vlib_node_main_t *nm, *nm_clone;
747           int k;
748
749           tr = tm->registrations[i];
750
751           if (tr->count == 0)
752             continue;
753
754           for (k = 0; k < tr->count; k++)
755             {
756               vlib_node_t *n;
757
758               vec_add2 (vlib_worker_threads, w, 1);
759               /* Currently unused, may not really work */
760               if (tr->mheap_size)
761                 w->thread_mheap = create_mspace (tr->mheap_size,
762                                                  0 /* unlocked */ );
763               else
764                 w->thread_mheap = main_heap;
765
766               w->thread_stack =
767                 vlib_thread_stack_init (w - vlib_worker_threads);
768               w->thread_function = tr->function;
769               w->thread_function_arg = w;
770               w->instance_id = k;
771               w->registration = tr;
772
773               w->elog_track.name =
774                 (char *) format (0, "%s %d", tr->name, k + 1);
775               vec_add1 (w->elog_track.name, 0);
776               elog_track_register (&vm->elog_main, &w->elog_track);
777
778               if (tr->no_data_structure_clone)
779                 continue;
780
781               /* Fork vlib_global_main et al. Look for bugs here */
782               oldheap = clib_mem_set_heap (w->thread_mheap);
783
784               vm_clone = clib_mem_alloc_aligned (sizeof (*vm_clone),
785                                                  CLIB_CACHE_LINE_BYTES);
786               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
787
788               vm_clone->thread_index = worker_thread_index;
789               vm_clone->heap_base = w->thread_mheap;
790               vm_clone->heap_aligned_base = (void *)
791                 (((uword) w->thread_mheap) & ~(VLIB_FRAME_ALIGN - 1));
792               vm_clone->init_functions_called =
793                 hash_create (0, /* value bytes */ 0);
794               vm_clone->pending_rpc_requests = 0;
795               vec_validate (vm_clone->pending_rpc_requests, 0);
796               _vec_len (vm_clone->pending_rpc_requests) = 0;
797               clib_memset (&vm_clone->random_buffer, 0,
798                            sizeof (vm_clone->random_buffer));
799
800               nm = &vlib_mains[0]->node_main;
801               nm_clone = &vm_clone->node_main;
802               /* fork next frames array, preserving node runtime indices */
803               nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
804                                                        CLIB_CACHE_LINE_BYTES);
805               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
806                 {
807                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
808                   u32 save_node_runtime_index;
809                   u32 save_flags;
810
811                   save_node_runtime_index = nf->node_runtime_index;
812                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
813                   vlib_next_frame_init (nf);
814                   nf->node_runtime_index = save_node_runtime_index;
815                   nf->flags = save_flags;
816                 }
817
818               /* fork the frame dispatch queue */
819               nm_clone->pending_frames = 0;
820               vec_validate (nm_clone->pending_frames, 10);
821               _vec_len (nm_clone->pending_frames) = 0;
822
823               /* fork nodes */
824               nm_clone->nodes = 0;
825
826               /* Allocate all nodes in single block for speed */
827               n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
828
829               for (j = 0; j < vec_len (nm->nodes); j++)
830                 {
831                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
832                   /* none of the copied nodes have enqueue rights given out */
833                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
834                   clib_memset (&n->stats_total, 0, sizeof (n->stats_total));
835                   clib_memset (&n->stats_last_clear, 0,
836                                sizeof (n->stats_last_clear));
837                   vec_add1 (nm_clone->nodes, n);
838                   n++;
839                 }
840               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
841                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
842                                  CLIB_CACHE_LINE_BYTES);
843               vec_foreach (rt,
844                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
845               {
846                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
847                 rt->thread_index = vm_clone->thread_index;
848                 /* copy initial runtime_data from node */
849                 if (n->runtime_data && n->runtime_data_bytes > 0)
850                   clib_memcpy (rt->runtime_data, n->runtime_data,
851                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
852                                          n->runtime_data_bytes));
853               }
854
855               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
856                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
857                                  CLIB_CACHE_LINE_BYTES);
858               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
859               {
860                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
861                 rt->thread_index = vm_clone->thread_index;
862                 /* copy initial runtime_data from node */
863                 if (n->runtime_data && n->runtime_data_bytes > 0)
864                   clib_memcpy (rt->runtime_data, n->runtime_data,
865                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
866                                          n->runtime_data_bytes));
867               }
868
869               nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
870                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
871                                  CLIB_CACHE_LINE_BYTES);
872               vec_foreach (rt,
873                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
874               {
875                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
876                 rt->thread_index = vm_clone->thread_index;
877                 /* copy initial runtime_data from node */
878                 if (n->runtime_data && n->runtime_data_bytes > 0)
879                   clib_memcpy (rt->runtime_data, n->runtime_data,
880                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
881                                          n->runtime_data_bytes));
882               }
883
884               nm_clone->processes = vec_dup_aligned (nm->processes,
885                                                      CLIB_CACHE_LINE_BYTES);
886
887               /* Create per-thread frame freelist */
888               nm_clone->frame_sizes = vec_new (vlib_frame_size_t, 1);
889 #ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES
890               nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
891 #endif
892               nm_clone->node_by_error = nm->node_by_error;
893
894               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
895
896               clib_mem_set_heap (oldheap);
897               vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
898
899               /* Switch to the stats segment ... */
900               void *oldheap = vlib_stats_push_heap (0);
901               vm_clone->error_main.counters = vec_dup_aligned
902                 (vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES);
903               vlib_stats_pop_heap2 (vm_clone->error_main.counters,
904                                     worker_thread_index, oldheap, 1);
905
906               vm_clone->error_main.counters_last_clear = vec_dup_aligned
907                 (vlib_mains[0]->error_main.counters_last_clear,
908                  CLIB_CACHE_LINE_BYTES);
909
910               worker_thread_index++;
911             }
912         }
913     }
914   else
915     {
916       /* only have non-data-structure copy threads to create... */
917       for (i = 0; i < vec_len (tm->registrations); i++)
918         {
919           tr = tm->registrations[i];
920
921           for (j = 0; j < tr->count; j++)
922             {
923               vec_add2 (vlib_worker_threads, w, 1);
924               if (tr->mheap_size)
925                 {
926                   w->thread_mheap =
927                     create_mspace (tr->mheap_size, 0 /* locked */ );
928                 }
929               else
930                 w->thread_mheap = main_heap;
931               w->thread_stack =
932                 vlib_thread_stack_init (w - vlib_worker_threads);
933               w->thread_function = tr->function;
934               w->thread_function_arg = w;
935               w->instance_id = j;
936               w->elog_track.name =
937                 (char *) format (0, "%s %d", tr->name, j + 1);
938               w->registration = tr;
939               vec_add1 (w->elog_track.name, 0);
940               elog_track_register (&vm->elog_main, &w->elog_track);
941             }
942         }
943     }
944
945   worker_thread_index = 1;
946
947   for (i = 0; i < vec_len (tm->registrations); i++)
948     {
949       clib_error_t *err;
950       int j;
951
952       tr = tm->registrations[i];
953
954       if (tr->use_pthreads || tm->use_pthreads)
955         {
956           for (j = 0; j < tr->count; j++)
957             {
958               w = vlib_worker_threads + worker_thread_index++;
959               err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
960                                             w, 0);
961               if (err)
962                 clib_error_report (err);
963             }
964         }
965       else
966         {
967           uword c;
968           /* *INDENT-OFF* */
969           clib_bitmap_foreach (c, tr->coremask, ({
970             w = vlib_worker_threads + worker_thread_index++;
971             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
972                                           w, c);
973             if (err)
974               clib_error_report (err);
975           }));
976           /* *INDENT-ON* */
977         }
978     }
979   vlib_worker_thread_barrier_sync (vm);
980   vlib_worker_thread_barrier_release (vm);
981   return 0;
982 }
983
984 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
985
986
987 static inline void
988 worker_thread_node_runtime_update_internal (void)
989 {
990   int i, j;
991   vlib_main_t *vm;
992   vlib_node_main_t *nm, *nm_clone;
993   vlib_main_t *vm_clone;
994   vlib_node_runtime_t *rt;
995   never_inline void
996     vlib_node_runtime_sync_stats (vlib_main_t * vm,
997                                   vlib_node_runtime_t * r,
998                                   uword n_calls,
999                                   uword n_vectors, uword n_clocks);
1000
1001   ASSERT (vlib_get_thread_index () == 0);
1002
1003   vm = vlib_mains[0];
1004   nm = &vm->node_main;
1005
1006   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
1007
1008   /*
1009    * Scrape all runtime stats, so we don't lose node runtime(s) with
1010    * pending counts, or throw away worker / io thread counts.
1011    */
1012   for (j = 0; j < vec_len (nm->nodes); j++)
1013     {
1014       vlib_node_t *n;
1015       n = nm->nodes[j];
1016       vlib_node_sync_stats (vm, n);
1017     }
1018
1019   for (i = 1; i < vec_len (vlib_mains); i++)
1020     {
1021       vlib_node_t *n;
1022
1023       vm_clone = vlib_mains[i];
1024       nm_clone = &vm_clone->node_main;
1025
1026       for (j = 0; j < vec_len (nm_clone->nodes); j++)
1027         {
1028           n = nm_clone->nodes[j];
1029
1030           rt = vlib_node_get_runtime (vm_clone, n->index);
1031           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
1032         }
1033     }
1034
1035   /* Per-worker clone rebuilds are now done on each thread */
1036 }
1037
1038
1039 void
1040 vlib_worker_thread_node_refork (void)
1041 {
1042   vlib_main_t *vm, *vm_clone;
1043   vlib_node_main_t *nm, *nm_clone;
1044   vlib_node_t **old_nodes_clone;
1045   vlib_node_runtime_t *rt, *old_rt;
1046
1047   vlib_node_t *new_n_clone;
1048
1049   int j;
1050
1051   vm = vlib_mains[0];
1052   nm = &vm->node_main;
1053   vm_clone = vlib_get_main ();
1054   nm_clone = &vm_clone->node_main;
1055
1056   /* Re-clone error heap */
1057   u64 *old_counters = vm_clone->error_main.counters;
1058   u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
1059
1060   clib_memcpy_fast (&vm_clone->error_main, &vm->error_main,
1061                     sizeof (vm->error_main));
1062   j = vec_len (vm->error_main.counters) - 1;
1063
1064   /* Switch to the stats segment ... */
1065   void *oldheap = vlib_stats_push_heap (0);
1066   vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
1067   vm_clone->error_main.counters = old_counters;
1068   vlib_stats_pop_heap2 (vm_clone->error_main.counters, vm_clone->thread_index,
1069                         oldheap, 0);
1070
1071   vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
1072   vm_clone->error_main.counters_last_clear = old_counters_all_clear;
1073
1074   nm_clone = &vm_clone->node_main;
1075   vec_free (nm_clone->next_frames);
1076   nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
1077                                            CLIB_CACHE_LINE_BYTES);
1078
1079   for (j = 0; j < vec_len (nm_clone->next_frames); j++)
1080     {
1081       vlib_next_frame_t *nf = &nm_clone->next_frames[j];
1082       u32 save_node_runtime_index;
1083       u32 save_flags;
1084
1085       save_node_runtime_index = nf->node_runtime_index;
1086       save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
1087       vlib_next_frame_init (nf);
1088       nf->node_runtime_index = save_node_runtime_index;
1089       nf->flags = save_flags;
1090     }
1091
1092   old_nodes_clone = nm_clone->nodes;
1093   nm_clone->nodes = 0;
1094
1095   /* re-fork nodes */
1096
1097   /* Allocate all nodes in single block for speed */
1098   new_n_clone =
1099     clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone));
1100   for (j = 0; j < vec_len (nm->nodes); j++)
1101     {
1102       vlib_node_t *new_n = nm->nodes[j];
1103
1104       clib_memcpy_fast (new_n_clone, new_n, sizeof (*new_n));
1105       /* none of the copied nodes have enqueue rights given out */
1106       new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
1107
1108       if (j >= vec_len (old_nodes_clone))
1109         {
1110           /* new node, set to zero */
1111           clib_memset (&new_n_clone->stats_total, 0,
1112                        sizeof (new_n_clone->stats_total));
1113           clib_memset (&new_n_clone->stats_last_clear, 0,
1114                        sizeof (new_n_clone->stats_last_clear));
1115         }
1116       else
1117         {
1118           vlib_node_t *old_n_clone = old_nodes_clone[j];
1119           /* Copy stats if the old data is valid */
1120           clib_memcpy_fast (&new_n_clone->stats_total,
1121                             &old_n_clone->stats_total,
1122                             sizeof (new_n_clone->stats_total));
1123           clib_memcpy_fast (&new_n_clone->stats_last_clear,
1124                             &old_n_clone->stats_last_clear,
1125                             sizeof (new_n_clone->stats_last_clear));
1126
1127           /* keep previous node state */
1128           new_n_clone->state = old_n_clone->state;
1129         }
1130       vec_add1 (nm_clone->nodes, new_n_clone);
1131       new_n_clone++;
1132     }
1133   /* Free the old node clones */
1134   clib_mem_free (old_nodes_clone[0]);
1135
1136   vec_free (old_nodes_clone);
1137
1138
1139   /* re-clone internal nodes */
1140   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
1141   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
1142     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
1143                      CLIB_CACHE_LINE_BYTES);
1144
1145   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
1146   {
1147     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1148     rt->thread_index = vm_clone->thread_index;
1149     /* copy runtime_data, will be overwritten later for existing rt */
1150     if (n->runtime_data && n->runtime_data_bytes > 0)
1151       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1152                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1153                                   n->runtime_data_bytes));
1154   }
1155
1156   for (j = 0; j < vec_len (old_rt); j++)
1157     {
1158       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1159       rt->state = old_rt[j].state;
1160       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1161                         VLIB_NODE_RUNTIME_DATA_SIZE);
1162     }
1163
1164   vec_free (old_rt);
1165
1166   /* re-clone input nodes */
1167   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
1168   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
1169     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1170                      CLIB_CACHE_LINE_BYTES);
1171
1172   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1173   {
1174     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1175     rt->thread_index = vm_clone->thread_index;
1176     /* copy runtime_data, will be overwritten later for existing rt */
1177     if (n->runtime_data && n->runtime_data_bytes > 0)
1178       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1179                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1180                                   n->runtime_data_bytes));
1181   }
1182
1183   for (j = 0; j < vec_len (old_rt); j++)
1184     {
1185       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1186       rt->state = old_rt[j].state;
1187       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1188                         VLIB_NODE_RUNTIME_DATA_SIZE);
1189     }
1190
1191   vec_free (old_rt);
1192
1193   /* re-clone pre-input nodes */
1194   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT];
1195   nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
1196     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
1197                      CLIB_CACHE_LINE_BYTES);
1198
1199   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
1200   {
1201     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1202     rt->thread_index = vm_clone->thread_index;
1203     /* copy runtime_data, will be overwritten later for existing rt */
1204     if (n->runtime_data && n->runtime_data_bytes > 0)
1205       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1206                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1207                                   n->runtime_data_bytes));
1208   }
1209
1210   for (j = 0; j < vec_len (old_rt); j++)
1211     {
1212       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1213       rt->state = old_rt[j].state;
1214       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1215                         VLIB_NODE_RUNTIME_DATA_SIZE);
1216     }
1217
1218   vec_free (old_rt);
1219
1220   nm_clone->processes = vec_dup_aligned (nm->processes,
1221                                          CLIB_CACHE_LINE_BYTES);
1222   nm_clone->node_by_error = nm->node_by_error;
1223 }
1224
1225 void
1226 vlib_worker_thread_node_runtime_update (void)
1227 {
1228   /*
1229    * Make a note that we need to do a node runtime update
1230    * prior to releasing the barrier.
1231    */
1232   vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1;
1233 }
1234
1235 u32
1236 unformat_sched_policy (unformat_input_t * input, va_list * args)
1237 {
1238   u32 *r = va_arg (*args, u32 *);
1239
1240   if (0);
1241 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1242   foreach_sched_policy
1243 #undef _
1244     else
1245     return 0;
1246   return 1;
1247 }
1248
1249 static clib_error_t *
1250 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1251 {
1252   vlib_thread_registration_t *tr;
1253   uword *p;
1254   vlib_thread_main_t *tm = &vlib_thread_main;
1255   u8 *name;
1256   uword *bitmap;
1257   u32 count;
1258
1259   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1260
1261   tm->n_thread_stacks = 1;      /* account for main thread */
1262   tm->sched_policy = ~0;
1263   tm->sched_priority = ~0;
1264   tm->main_lcore = ~0;
1265
1266   tr = tm->next;
1267
1268   while (tr)
1269     {
1270       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1271       tr = tr->next;
1272     }
1273
1274   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1275     {
1276       if (unformat (input, "use-pthreads"))
1277         tm->use_pthreads = 1;
1278       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1279         ;
1280       else if (unformat (input, "main-core %u", &tm->main_lcore))
1281         ;
1282       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1283         ;
1284       else if (unformat (input, "numa-heap-size %U",
1285                          unformat_memory_size, &tm->numa_heap_size))
1286         ;
1287       else if (unformat (input, "coremask-%s %U", &name,
1288                          unformat_bitmap_mask, &bitmap) ||
1289                unformat (input, "corelist-%s %U", &name,
1290                          unformat_bitmap_list, &bitmap))
1291         {
1292           p = hash_get_mem (tm->thread_registrations_by_name, name);
1293           if (p == 0)
1294             return clib_error_return (0, "no such thread type '%s'", name);
1295
1296           tr = (vlib_thread_registration_t *) p[0];
1297
1298           if (tr->use_pthreads)
1299             return clib_error_return (0,
1300                                       "corelist cannot be set for '%s' threads",
1301                                       name);
1302
1303           tr->coremask = bitmap;
1304           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1305         }
1306       else
1307         if (unformat
1308             (input, "scheduler-policy %U", unformat_sched_policy,
1309              &tm->sched_policy))
1310         ;
1311       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1312         ;
1313       else if (unformat (input, "%s %u", &name, &count))
1314         {
1315           p = hash_get_mem (tm->thread_registrations_by_name, name);
1316           if (p == 0)
1317             return clib_error_return (0, "no such thread type 3 '%s'", name);
1318
1319           tr = (vlib_thread_registration_t *) p[0];
1320           if (tr->fixed_count)
1321             return clib_error_return
1322               (0, "number of %s threads not configurable", tr->name);
1323           tr->count = count;
1324         }
1325       else
1326         break;
1327     }
1328
1329   if (tm->sched_priority != ~0)
1330     {
1331       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1332         {
1333           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1334           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1335           if (tm->sched_priority > prio_max)
1336             tm->sched_priority = prio_max;
1337           if (tm->sched_priority < prio_min)
1338             tm->sched_priority = prio_min;
1339         }
1340       else
1341         {
1342           return clib_error_return
1343             (0,
1344              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1345              tm->sched_priority);
1346         }
1347     }
1348   tr = tm->next;
1349
1350   if (!tm->thread_prefix)
1351     tm->thread_prefix = format (0, "vpp");
1352
1353   while (tr)
1354     {
1355       tm->n_thread_stacks += tr->count;
1356       tm->n_pthreads += tr->count * tr->use_pthreads;
1357       tm->n_threads += tr->count * (tr->use_pthreads == 0);
1358       tr = tr->next;
1359     }
1360
1361   return 0;
1362 }
1363
1364 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1365
1366 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1367 void
1368 vnet_main_fixup (vlib_fork_fixup_t which)
1369 {
1370 }
1371
1372 void
1373 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1374 {
1375   vlib_main_t *vm = vlib_get_main ();
1376
1377   if (vlib_mains == 0)
1378     return;
1379
1380   ASSERT (vlib_get_thread_index () == 0);
1381   vlib_worker_thread_barrier_sync (vm);
1382
1383   switch (which)
1384     {
1385     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1386       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1387       break;
1388
1389     default:
1390       ASSERT (0);
1391     }
1392   vlib_worker_thread_barrier_release (vm);
1393 }
1394
1395   /*
1396    * Enforce minimum open time to minimize packet loss due to Rx overflow,
1397    * based on a test based heuristic that barrier should be open for at least
1398    * 3 time as long as it is closed (with an upper bound of 1ms because by that
1399    *  point it is probably too late to make a difference)
1400    */
1401
1402 #ifndef BARRIER_MINIMUM_OPEN_LIMIT
1403 #define BARRIER_MINIMUM_OPEN_LIMIT 0.001
1404 #endif
1405
1406 #ifndef BARRIER_MINIMUM_OPEN_FACTOR
1407 #define BARRIER_MINIMUM_OPEN_FACTOR 3
1408 #endif
1409
1410 void
1411 vlib_worker_thread_initial_barrier_sync_and_release (vlib_main_t * vm)
1412 {
1413   f64 deadline;
1414   f64 now = vlib_time_now (vm);
1415   u32 count = vec_len (vlib_mains) - 1;
1416
1417   /* No worker threads? */
1418   if (count == 0)
1419     return;
1420
1421   deadline = now + BARRIER_SYNC_TIMEOUT;
1422   *vlib_worker_threads->wait_at_barrier = 1;
1423   while (*vlib_worker_threads->workers_at_barrier != count)
1424     {
1425       if ((now = vlib_time_now (vm)) > deadline)
1426         {
1427           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1428           os_panic ();
1429         }
1430       CLIB_PAUSE ();
1431     }
1432   *vlib_worker_threads->wait_at_barrier = 0;
1433 }
1434
1435 void
1436 vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
1437 {
1438   f64 deadline;
1439   f64 now;
1440   f64 t_entry;
1441   f64 t_open;
1442   f64 t_closed;
1443   f64 max_vector_rate;
1444   u32 count;
1445   int i;
1446
1447   if (vec_len (vlib_mains) < 2)
1448     return;
1449
1450   ASSERT (vlib_get_thread_index () == 0);
1451
1452   vlib_worker_threads[0].barrier_caller = func_name;
1453   count = vec_len (vlib_mains) - 1;
1454
1455   /* Record entry relative to last close */
1456   now = vlib_time_now (vm);
1457   t_entry = now - vm->barrier_epoch;
1458
1459   /* Tolerate recursive calls */
1460   if (++vlib_worker_threads[0].recursion_level > 1)
1461     {
1462       barrier_trace_sync_rec (t_entry);
1463       return;
1464     }
1465
1466   /*
1467    * Need data to decide if we're working hard enough to honor
1468    * the barrier hold-down timer.
1469    */
1470   max_vector_rate = 0.0;
1471   for (i = 1; i < vec_len (vlib_mains); i++)
1472     max_vector_rate =
1473       clib_max (max_vector_rate,
1474                 (f64) vlib_last_vectors_per_main_loop (vlib_mains[i]));
1475
1476   vlib_worker_threads[0].barrier_sync_count++;
1477
1478   /* Enforce minimum barrier open time to minimize packet loss */
1479   ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));
1480
1481   /*
1482    * If any worker thread seems busy, which we define
1483    * as a vector rate above 10, we enforce the barrier hold-down timer
1484    */
1485   if (max_vector_rate > 10.0)
1486     {
1487       while (1)
1488         {
1489           now = vlib_time_now (vm);
1490           /* Barrier hold-down timer expired? */
1491           if (now >= vm->barrier_no_close_before)
1492             break;
1493           if ((vm->barrier_no_close_before - now)
1494               > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
1495             {
1496               clib_warning
1497                 ("clock change: would have waited for %.4f seconds",
1498                  (vm->barrier_no_close_before - now));
1499               break;
1500             }
1501         }
1502     }
1503   /* Record time of closure */
1504   t_open = now - vm->barrier_epoch;
1505   vm->barrier_epoch = now;
1506
1507   deadline = now + BARRIER_SYNC_TIMEOUT;
1508
1509   *vlib_worker_threads->wait_at_barrier = 1;
1510   while (*vlib_worker_threads->workers_at_barrier != count)
1511     {
1512       if ((now = vlib_time_now (vm)) > deadline)
1513         {
1514           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1515           os_panic ();
1516         }
1517     }
1518
1519   t_closed = now - vm->barrier_epoch;
1520
1521   barrier_trace_sync (t_entry, t_open, t_closed);
1522
1523 }
1524
1525 void
1526 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1527 {
1528   f64 deadline;
1529   f64 now;
1530   f64 minimum_open;
1531   f64 t_entry;
1532   f64 t_closed_total;
1533   f64 t_update_main = 0.0;
1534   int refork_needed = 0;
1535
1536   if (vec_len (vlib_mains) < 2)
1537     return;
1538
1539   ASSERT (vlib_get_thread_index () == 0);
1540
1541
1542   now = vlib_time_now (vm);
1543   t_entry = now - vm->barrier_epoch;
1544
1545   if (--vlib_worker_threads[0].recursion_level > 0)
1546     {
1547       barrier_trace_release_rec (t_entry);
1548       return;
1549     }
1550
1551   /* Update (all) node runtimes before releasing the barrier, if needed */
1552   if (vm->need_vlib_worker_thread_node_runtime_update)
1553     {
1554       /*
1555        * Lock stat segment here, so we's safe when
1556        * rebuilding the stat segment node clones from the
1557        * stat thread...
1558        */
1559       vlib_stat_segment_lock ();
1560
1561       /* Do stats elements on main thread */
1562       worker_thread_node_runtime_update_internal ();
1563       vm->need_vlib_worker_thread_node_runtime_update = 0;
1564
1565       /* Do per thread rebuilds in parallel */
1566       refork_needed = 1;
1567       clib_atomic_fetch_add (vlib_worker_threads->node_reforks_required,
1568                              (vec_len (vlib_mains) - 1));
1569       now = vlib_time_now (vm);
1570       t_update_main = now - vm->barrier_epoch;
1571     }
1572
1573   deadline = now + BARRIER_SYNC_TIMEOUT;
1574
1575   /*
1576    * Note when we let go of the barrier.
1577    * Workers can use this to derive a reasonably accurate
1578    * time offset. See vlib_time_now(...)
1579    */
1580   vm->time_last_barrier_release = vlib_time_now (vm);
1581   CLIB_MEMORY_STORE_BARRIER ();
1582
1583   *vlib_worker_threads->wait_at_barrier = 0;
1584
1585   while (*vlib_worker_threads->workers_at_barrier > 0)
1586     {
1587       if ((now = vlib_time_now (vm)) > deadline)
1588         {
1589           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1590           os_panic ();
1591         }
1592     }
1593
1594   /* Wait for reforks before continuing */
1595   if (refork_needed)
1596     {
1597       now = vlib_time_now (vm);
1598
1599       deadline = now + BARRIER_SYNC_TIMEOUT;
1600
1601       while (*vlib_worker_threads->node_reforks_required > 0)
1602         {
1603           if ((now = vlib_time_now (vm)) > deadline)
1604             {
1605               fformat (stderr, "%s: worker thread refork deadlock\n",
1606                        __FUNCTION__);
1607               os_panic ();
1608             }
1609         }
1610       vlib_stat_segment_unlock ();
1611     }
1612
1613   t_closed_total = now - vm->barrier_epoch;
1614
1615   minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;
1616
1617   if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
1618     {
1619       minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
1620     }
1621
1622   vm->barrier_no_close_before = now + minimum_open;
1623
1624   /* Record barrier epoch (used to enforce minimum open time) */
1625   vm->barrier_epoch = now;
1626
1627   barrier_trace_release (t_entry, t_closed_total, t_update_main);
1628
1629 }
1630
1631 /*
1632  * Check the frame queue to see if any frames are available.
1633  * If so, pull the packets off the frames and put them to
1634  * the handoff node.
1635  */
1636 int
1637 vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm)
1638 {
1639   u32 thread_id = vm->thread_index;
1640   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
1641   vlib_frame_queue_elt_t *elt;
1642   u32 *from, *to;
1643   vlib_frame_t *f;
1644   int msg_type;
1645   int processed = 0;
1646   u32 n_left_to_node;
1647   u32 vectors = 0;
1648
1649   ASSERT (fq);
1650   ASSERT (vm == vlib_mains[thread_id]);
1651
1652   if (PREDICT_FALSE (fqm->node_index == ~0))
1653     return 0;
1654   /*
1655    * Gather trace data for frame queues
1656    */
1657   if (PREDICT_FALSE (fq->trace))
1658     {
1659       frame_queue_trace_t *fqt;
1660       frame_queue_nelt_counter_t *fqh;
1661       u32 elix;
1662
1663       fqt = &fqm->frame_queue_traces[thread_id];
1664
1665       fqt->nelts = fq->nelts;
1666       fqt->head = fq->head;
1667       fqt->head_hint = fq->head_hint;
1668       fqt->tail = fq->tail;
1669       fqt->threshold = fq->vector_threshold;
1670       fqt->n_in_use = fqt->tail - fqt->head;
1671       if (fqt->n_in_use >= fqt->nelts)
1672         {
1673           // if beyond max then use max
1674           fqt->n_in_use = fqt->nelts - 1;
1675         }
1676
1677       /* Record the number of elements in use in the histogram */
1678       fqh = &fqm->frame_queue_histogram[thread_id];
1679       fqh->count[fqt->n_in_use]++;
1680
1681       /* Record a snapshot of the elements in use */
1682       for (elix = 0; elix < fqt->nelts; elix++)
1683         {
1684           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1685           if (1 || elt->valid)
1686             {
1687               fqt->n_vectors[elix] = elt->n_vectors;
1688             }
1689         }
1690       fqt->written = 1;
1691     }
1692
1693   while (1)
1694     {
1695       vlib_buffer_t *b;
1696       if (fq->head == fq->tail)
1697         {
1698           fq->head_hint = fq->head;
1699           return processed;
1700         }
1701
1702       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1703
1704       if (!elt->valid)
1705         {
1706           fq->head_hint = fq->head;
1707           return processed;
1708         }
1709
1710       from = elt->buffer_index;
1711       msg_type = elt->msg_type;
1712
1713       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1714       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1715
1716       f = vlib_get_frame_to_node (vm, fqm->node_index);
1717
1718       /* If the first vector is traced, set the frame trace flag */
1719       b = vlib_get_buffer (vm, from[0]);
1720       if (b->flags & VLIB_BUFFER_IS_TRACED)
1721         f->frame_flags |= VLIB_NODE_FLAG_TRACE;
1722
1723       to = vlib_frame_vector_args (f);
1724
1725       n_left_to_node = elt->n_vectors;
1726
1727       while (n_left_to_node >= 4)
1728         {
1729           to[0] = from[0];
1730           to[1] = from[1];
1731           to[2] = from[2];
1732           to[3] = from[3];
1733           to += 4;
1734           from += 4;
1735           n_left_to_node -= 4;
1736         }
1737
1738       while (n_left_to_node > 0)
1739         {
1740           to[0] = from[0];
1741           to++;
1742           from++;
1743           n_left_to_node--;
1744         }
1745
1746       vectors += elt->n_vectors;
1747       f->n_vectors = elt->n_vectors;
1748       vlib_put_frame_to_node (vm, fqm->node_index, f);
1749
1750       elt->valid = 0;
1751       elt->n_vectors = 0;
1752       elt->msg_type = 0xfefefefe;
1753       CLIB_MEMORY_BARRIER ();
1754       fq->head++;
1755       processed++;
1756
1757       /*
1758        * Limit the number of packets pushed into the graph
1759        */
1760       if (vectors >= fq->vector_threshold)
1761         {
1762           fq->head_hint = fq->head;
1763           return processed;
1764         }
1765     }
1766   ASSERT (0);
1767   return processed;
1768 }
1769
1770 void
1771 vlib_worker_thread_fn (void *arg)
1772 {
1773   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1774   vlib_thread_main_t *tm = vlib_get_thread_main ();
1775   vlib_main_t *vm = vlib_get_main ();
1776   clib_error_t *e;
1777
1778   ASSERT (vm->thread_index == vlib_get_thread_index ());
1779
1780   vlib_worker_thread_init (w);
1781   clib_time_init (&vm->clib_time);
1782   clib_mem_set_heap (w->thread_mheap);
1783
1784   e = vlib_call_init_exit_functions_no_sort
1785     (vm, &vm->worker_init_function_registrations, 1 /* call_once */ );
1786   if (e)
1787     clib_error_report (e);
1788
1789   /* Wait until the dpdk init sequence is complete */
1790   while (tm->extern_thread_mgmt && tm->worker_thread_release == 0)
1791     vlib_worker_thread_barrier_check ();
1792
1793   vlib_worker_loop (vm);
1794 }
1795
1796 /* *INDENT-OFF* */
1797 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1798   .name = "workers",
1799   .short_name = "wk",
1800   .function = vlib_worker_thread_fn,
1801 };
1802 /* *INDENT-ON* */
1803
1804 u32
1805 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
1806 {
1807   vlib_thread_main_t *tm = vlib_get_thread_main ();
1808   vlib_frame_queue_main_t *fqm;
1809   vlib_frame_queue_t *fq;
1810   int i;
1811
1812   if (frame_queue_nelts == 0)
1813     frame_queue_nelts = FRAME_QUEUE_MAX_NELTS;
1814
1815   ASSERT (frame_queue_nelts >= 8);
1816
1817   vec_add2 (tm->frame_queue_mains, fqm, 1);
1818
1819   fqm->node_index = node_index;
1820   fqm->frame_queue_nelts = frame_queue_nelts;
1821   fqm->queue_hi_thresh = frame_queue_nelts - 2;
1822
1823   vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
1824   vec_validate (fqm->per_thread_data, tm->n_vlib_mains - 1);
1825   _vec_len (fqm->vlib_frame_queues) = 0;
1826   for (i = 0; i < tm->n_vlib_mains; i++)
1827     {
1828       vlib_frame_queue_per_thread_data_t *ptd;
1829       fq = vlib_frame_queue_alloc (frame_queue_nelts);
1830       vec_add1 (fqm->vlib_frame_queues, fq);
1831
1832       ptd = vec_elt_at_index (fqm->per_thread_data, i);
1833       vec_validate (ptd->handoff_queue_elt_by_thread_index,
1834                     tm->n_vlib_mains - 1);
1835       vec_validate_init_empty (ptd->congested_handoff_queue_by_thread_index,
1836                                tm->n_vlib_mains - 1,
1837                                (vlib_frame_queue_t *) (~0));
1838     }
1839
1840   return (fqm - tm->frame_queue_mains);
1841 }
1842
1843 int
1844 vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb)
1845 {
1846   vlib_thread_main_t *tm = vlib_get_thread_main ();
1847
1848   if (tm->extern_thread_mgmt)
1849     return -1;
1850
1851   tm->cb.vlib_launch_thread_cb = cb->vlib_launch_thread_cb;
1852   tm->extern_thread_mgmt = 1;
1853   return 0;
1854 }
1855
1856 void
1857 vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t *
1858                                      args)
1859 {
1860   ASSERT (vlib_get_thread_index () == 0);
1861   vlib_process_signal_event (vlib_get_main (), args->node_index,
1862                              args->type_opaque, args->data);
1863 }
1864
1865 void *rpc_call_main_thread_cb_fn;
1866
1867 void
1868 vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
1869 {
1870   if (rpc_call_main_thread_cb_fn)
1871     {
1872       void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
1873       (*fp) (callback, args, arg_size);
1874     }
1875   else
1876     clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
1877 }
1878
1879 clib_error_t *
1880 threads_init (vlib_main_t * vm)
1881 {
1882   return 0;
1883 }
1884
1885 VLIB_INIT_FUNCTION (threads_init);
1886
1887
1888 static clib_error_t *
1889 show_clock_command_fn (vlib_main_t * vm,
1890                        unformat_input_t * input, vlib_cli_command_t * cmd)
1891 {
1892   int i;
1893   int verbose = 0;
1894   clib_timebase_t _tb, *tb = &_tb;
1895
1896   (void) unformat (input, "verbose %=", &verbose, 1);
1897
1898   clib_timebase_init (tb, 0 /* GMT */ , CLIB_TIMEBASE_DAYLIGHT_NONE,
1899                       &vm->clib_time);
1900
1901   vlib_cli_output (vm, "%U, %U GMT", format_clib_time, &vm->clib_time,
1902                    verbose, format_clib_timebase_time,
1903                    clib_timebase_now (tb));
1904
1905   if (vec_len (vlib_mains) == 1)
1906     return 0;
1907
1908   vlib_cli_output (vm, "Time last barrier release %.9f",
1909                    vm->time_last_barrier_release);
1910
1911   for (i = 1; i < vec_len (vlib_mains); i++)
1912     {
1913       if (vlib_mains[i] == 0)
1914         continue;
1915
1916       vlib_cli_output (vm, "%d: %U", i, format_clib_time,
1917                        &vlib_mains[i]->clib_time, verbose);
1918
1919       vlib_cli_output (vm, "Thread %d offset %.9f error %.9f", i,
1920                        vlib_mains[i]->time_offset,
1921                        vm->time_last_barrier_release -
1922                        vlib_mains[i]->time_last_barrier_release);
1923     }
1924   return 0;
1925 }
1926
1927 /* *INDENT-OFF* */
1928 VLIB_CLI_COMMAND (f_command, static) =
1929 {
1930   .path = "show clock",
1931   .short_help = "show clock",
1932   .function = show_clock_command_fn,
1933 };
1934 /* *INDENT-ON* */
1935
1936 /*
1937  * fd.io coding-style-patch-verification: ON
1938  *
1939  * Local Variables:
1940  * eval: (c-set-style "gnu")
1941  * End:
1942  */