d9d781070a37d0e2b3820511d00e72c9343aa457
[vpp.git] / src / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vppinfra/linux/sysfs.h>
21 #include <vlib/vlib.h>
22
23 #include <vlib/threads.h>
24 #include <vlib/unix/cj.h>
25
26 #include <vlib/stat_weak_inlines.h>
27
28 DECLARE_CJ_GLOBAL_LOG;
29
30
31 u32
32 vl (void *p)
33 {
34   return vec_len (p);
35 }
36
37 vlib_worker_thread_t *vlib_worker_threads;
38 vlib_thread_main_t vlib_thread_main;
39
40 /*
41  * Barrier tracing can be enabled on a normal build to collect information
42  * on barrier use, including timings and call stacks.  Deliberately not
43  * keyed off CLIB_DEBUG, because that can add significant overhead which
44  * imapacts observed timings.
45  */
46
47 static inline void
48 barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed)
49 {
50   if (!vlib_worker_threads->barrier_elog_enabled)
51     return;
52
53     /* *INDENT-OFF* */
54     ELOG_TYPE_DECLARE (e) =
55       {
56         .format = "bar-trace-%s-#%d",
57         .format_args = "T4i4",
58       };
59     /* *INDENT-ON* */
60   struct
61   {
62     u32 caller, count, t_entry, t_open, t_closed;
63   } *ed = 0;
64
65   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
66   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
67   ed->caller = elog_string (&vlib_global_main.elog_main,
68                             (char *) vlib_worker_threads[0].barrier_caller);
69   ed->t_entry = (int) (1000000.0 * t_entry);
70   ed->t_open = (int) (1000000.0 * t_open);
71   ed->t_closed = (int) (1000000.0 * t_closed);
72 }
73
74 static inline void
75 barrier_trace_sync_rec (f64 t_entry)
76 {
77   if (!vlib_worker_threads->barrier_elog_enabled)
78     return;
79
80     /* *INDENT-OFF* */
81     ELOG_TYPE_DECLARE (e) =
82       {
83         .format = "bar-syncrec-%s-#%d",
84         .format_args = "T4i4",
85       };
86     /* *INDENT-ON* */
87   struct
88   {
89     u32 caller, depth;
90   } *ed = 0;
91
92   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
93   ed->depth = (int) vlib_worker_threads[0].recursion_level - 1;
94   ed->caller = elog_string (&vlib_global_main.elog_main,
95                             (char *) vlib_worker_threads[0].barrier_caller);
96 }
97
98 static inline void
99 barrier_trace_release_rec (f64 t_entry)
100 {
101   if (!vlib_worker_threads->barrier_elog_enabled)
102     return;
103
104     /* *INDENT-OFF* */
105     ELOG_TYPE_DECLARE (e) =
106       {
107         .format = "bar-relrrec-#%d",
108         .format_args = "i4",
109       };
110     /* *INDENT-ON* */
111   struct
112   {
113     u32 depth;
114   } *ed = 0;
115
116   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
117   ed->depth = (int) vlib_worker_threads[0].recursion_level;
118 }
119
120 static inline void
121 barrier_trace_release (f64 t_entry, f64 t_closed_total, f64 t_update_main)
122 {
123   if (!vlib_worker_threads->barrier_elog_enabled)
124     return;
125
126     /* *INDENT-OFF* */
127     ELOG_TYPE_DECLARE (e) =
128       {
129         .format = "bar-rel-#%d-e%d-u%d-t%d",
130         .format_args = "i4i4i4i4",
131       };
132     /* *INDENT-ON* */
133   struct
134   {
135     u32 count, t_entry, t_update_main, t_closed_total;
136   } *ed = 0;
137
138   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
139   ed->t_entry = (int) (1000000.0 * t_entry);
140   ed->t_update_main = (int) (1000000.0 * t_update_main);
141   ed->t_closed_total = (int) (1000000.0 * t_closed_total);
142   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
143
144   /* Reset context for next trace */
145   vlib_worker_threads[0].barrier_context = NULL;
146 }
147
148 uword
149 os_get_nthreads (void)
150 {
151   return vec_len (vlib_thread_stacks);
152 }
153
154 void
155 vlib_set_thread_name (char *name)
156 {
157   int pthread_setname_np (pthread_t __target_thread, const char *__name);
158   int rv;
159   pthread_t thread = pthread_self ();
160
161   if (thread)
162     {
163       rv = pthread_setname_np (thread, name);
164       if (rv)
165         clib_warning ("pthread_setname_np returned %d", rv);
166     }
167 }
168
169 static int
170 sort_registrations_by_no_clone (void *a0, void *a1)
171 {
172   vlib_thread_registration_t **tr0 = a0;
173   vlib_thread_registration_t **tr1 = a1;
174
175   return ((i32) ((*tr0)->no_data_structure_clone)
176           - ((i32) ((*tr1)->no_data_structure_clone)));
177 }
178
179 static uword *
180 clib_sysfs_list_to_bitmap (char *filename)
181 {
182   FILE *fp;
183   uword *r = 0;
184
185   fp = fopen (filename, "r");
186
187   if (fp != NULL)
188     {
189       u8 *buffer = 0;
190       vec_validate (buffer, 256 - 1);
191       if (fgets ((char *) buffer, 256, fp))
192         {
193           unformat_input_t in;
194           unformat_init_string (&in, (char *) buffer,
195                                 strlen ((char *) buffer));
196           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
197             clib_warning ("unformat_bitmap_list failed");
198           unformat_free (&in);
199         }
200       vec_free (buffer);
201       fclose (fp);
202     }
203   return r;
204 }
205
206
207 /* Called early in the init sequence */
208
209 clib_error_t *
210 vlib_thread_init (vlib_main_t * vm)
211 {
212   vlib_thread_main_t *tm = &vlib_thread_main;
213   vlib_worker_thread_t *w;
214   vlib_thread_registration_t *tr;
215   u32 n_vlib_mains = 1;
216   u32 first_index = 1;
217   u32 i;
218   uword *avail_cpu;
219
220   /* get bitmaps of active cpu cores and sockets */
221   tm->cpu_core_bitmap =
222     clib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
223   tm->cpu_socket_bitmap =
224     clib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
225
226   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
227
228   /* skip cores */
229   for (i = 0; i < tm->skip_cores; i++)
230     {
231       uword c = clib_bitmap_first_set (avail_cpu);
232       if (c == ~0)
233         return clib_error_return (0, "no available cpus to skip");
234
235       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
236     }
237
238   /* grab cpu for main thread */
239   if (tm->main_lcore == ~0)
240     {
241       /* if main-lcore is not set, we try to use lcore 1 */
242       if (clib_bitmap_get (avail_cpu, 1))
243         tm->main_lcore = 1;
244       else
245         tm->main_lcore = clib_bitmap_first_set (avail_cpu);
246       if (tm->main_lcore == (u8) ~ 0)
247         return clib_error_return (0, "no available cpus to be used for the"
248                                   " main thread");
249     }
250   else
251     {
252       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
253         return clib_error_return (0, "cpu %u is not available to be used"
254                                   " for the main thread", tm->main_lcore);
255     }
256   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
257
258   /* assume that there is socket 0 only if there is no data from sysfs */
259   if (!tm->cpu_socket_bitmap)
260     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
261
262   /* pin main thread to main_lcore  */
263   if (tm->cb.vlib_thread_set_lcore_cb)
264     {
265       tm->cb.vlib_thread_set_lcore_cb (0, tm->main_lcore);
266     }
267   else
268     {
269       cpu_set_t cpuset;
270       CPU_ZERO (&cpuset);
271       CPU_SET (tm->main_lcore, &cpuset);
272       pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
273     }
274
275   /* Set up thread 0 */
276   vec_validate_aligned (vlib_worker_threads, 0, CLIB_CACHE_LINE_BYTES);
277   _vec_len (vlib_worker_threads) = 1;
278   w = vlib_worker_threads;
279   w->thread_mheap = clib_mem_get_heap ();
280   w->thread_stack = vlib_thread_stacks[0];
281   w->cpu_id = tm->main_lcore;
282   w->lwp = syscall (SYS_gettid);
283   w->thread_id = pthread_self ();
284   tm->n_vlib_mains = 1;
285
286   vlib_get_thread_core_numa (w, w->cpu_id);
287
288   if (tm->sched_policy != ~0)
289     {
290       struct sched_param sched_param;
291       if (!sched_getparam (w->lwp, &sched_param))
292         {
293           if (tm->sched_priority != ~0)
294             sched_param.sched_priority = tm->sched_priority;
295           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
296         }
297     }
298
299   /* assign threads to cores and set n_vlib_mains */
300   tr = tm->next;
301
302   while (tr)
303     {
304       vec_add1 (tm->registrations, tr);
305       tr = tr->next;
306     }
307
308   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
309
310   for (i = 0; i < vec_len (tm->registrations); i++)
311     {
312       int j;
313       tr = tm->registrations[i];
314       tr->first_index = first_index;
315       first_index += tr->count;
316       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
317
318       /* construct coremask */
319       if (tr->use_pthreads || !tr->count)
320         continue;
321
322       if (tr->coremask)
323         {
324           uword c;
325           /* *INDENT-OFF* */
326           clib_bitmap_foreach (c, tr->coremask, ({
327             if (clib_bitmap_get(avail_cpu, c) == 0)
328               return clib_error_return (0, "cpu %u is not available to be used"
329                                         " for the '%s' thread",c, tr->name);
330
331             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
332           }));
333           /* *INDENT-ON* */
334         }
335       else
336         {
337           for (j = 0; j < tr->count; j++)
338             {
339               uword c = clib_bitmap_first_set (avail_cpu);
340               if (c == ~0)
341                 return clib_error_return (0,
342                                           "no available cpus to be used for"
343                                           " the '%s' thread", tr->name);
344
345               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
346               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
347             }
348         }
349     }
350
351   clib_bitmap_free (avail_cpu);
352
353   tm->n_vlib_mains = n_vlib_mains;
354
355   /*
356    * Allocate the remaining worker threads, and thread stack vector slots
357    * from now on, calls to os_get_nthreads() will return the correct
358    * answer.
359    */
360   vec_validate_aligned (vlib_worker_threads, first_index - 1,
361                         CLIB_CACHE_LINE_BYTES);
362   vec_validate (vlib_thread_stacks, vec_len (vlib_worker_threads) - 1);
363   return 0;
364 }
365
366 vlib_frame_queue_t *
367 vlib_frame_queue_alloc (int nelts)
368 {
369   vlib_frame_queue_t *fq;
370
371   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
372   clib_memset (fq, 0, sizeof (*fq));
373   fq->nelts = nelts;
374   fq->vector_threshold = 128;   // packets
375   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
376
377   if (1)
378     {
379       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
380         fformat (stderr, "WARNING: fq->tail unaligned\n");
381       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
382         fformat (stderr, "WARNING: fq->head unaligned\n");
383       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
384         fformat (stderr, "WARNING: fq->elts unaligned\n");
385
386       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
387         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
388                  sizeof (fq->elts[0]));
389       if (nelts & (nelts - 1))
390         {
391           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
392           abort ();
393         }
394     }
395
396   return (fq);
397 }
398
399 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
400 void
401 vl_msg_api_handler_no_free (void *v)
402 {
403 }
404
405 /* Turned off, save as reference material... */
406 #if 0
407 static inline int
408 vlib_frame_queue_dequeue_internal (int thread_id,
409                                    vlib_main_t * vm, vlib_node_main_t * nm)
410 {
411   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
412   vlib_frame_queue_elt_t *elt;
413   vlib_frame_t *f;
414   vlib_pending_frame_t *p;
415   vlib_node_runtime_t *r;
416   u32 node_runtime_index;
417   int msg_type;
418   u64 before;
419   int processed = 0;
420
421   ASSERT (vm == vlib_mains[thread_id]);
422
423   while (1)
424     {
425       if (fq->head == fq->tail)
426         return processed;
427
428       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
429
430       if (!elt->valid)
431         return processed;
432
433       before = clib_cpu_time_now ();
434
435       f = elt->frame;
436       node_runtime_index = elt->node_runtime_index;
437       msg_type = elt->msg_type;
438
439       switch (msg_type)
440         {
441         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
442           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
443           /* note fallthrough... */
444         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
445           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
446                                 node_runtime_index);
447           vlib_frame_free (vm, r, f);
448           break;
449         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
450           vec_add2 (vm->node_main.pending_frames, p, 1);
451           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
452           p->node_runtime_index = elt->node_runtime_index;
453           p->frame_index = vlib_frame_index (vm, f);
454           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
455           fq->dequeue_vectors += (u64) f->n_vectors;
456           break;
457         case VLIB_FRAME_QUEUE_ELT_API_MSG:
458           vl_msg_api_handler_no_free (f);
459           break;
460         default:
461           clib_warning ("bogus frame queue message, type %d", msg_type);
462           break;
463         }
464       elt->valid = 0;
465       fq->dequeues++;
466       fq->dequeue_ticks += clib_cpu_time_now () - before;
467       CLIB_MEMORY_BARRIER ();
468       fq->head++;
469       processed++;
470     }
471   ASSERT (0);
472   return processed;
473 }
474
475 int
476 vlib_frame_queue_dequeue (int thread_id,
477                           vlib_main_t * vm, vlib_node_main_t * nm)
478 {
479   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
480 }
481
482 int
483 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
484                           u32 frame_queue_index, vlib_frame_t * frame,
485                           vlib_frame_queue_msg_type_t type)
486 {
487   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
488   vlib_frame_queue_elt_t *elt;
489   u32 save_count;
490   u64 new_tail;
491   u64 before = clib_cpu_time_now ();
492
493   ASSERT (fq);
494
495   new_tail = clib_atomic_add_fetch (&fq->tail, 1);
496
497   /* Wait until a ring slot is available */
498   while (new_tail >= fq->head + fq->nelts)
499     {
500       f64 b4 = vlib_time_now_ticks (vm, before);
501       vlib_worker_thread_barrier_check (vm, b4);
502       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
503       // vlib_frame_queue_dequeue (vm->thread_index, vm, nm);
504     }
505
506   elt = fq->elts + (new_tail & (fq->nelts - 1));
507
508   /* this would be very bad... */
509   while (elt->valid)
510     {
511     }
512
513   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
514   save_count = frame->n_vectors;
515
516   elt->frame = frame;
517   elt->node_runtime_index = node_runtime_index;
518   elt->msg_type = type;
519   CLIB_MEMORY_BARRIER ();
520   elt->valid = 1;
521
522   return save_count;
523 }
524 #endif /* 0 */
525
526 /* To be called by vlib worker threads upon startup */
527 void
528 vlib_worker_thread_init (vlib_worker_thread_t * w)
529 {
530   vlib_thread_main_t *tm = vlib_get_thread_main ();
531
532   /*
533    * Note: disabling signals in worker threads as follows
534    * prevents the api post-mortem dump scheme from working
535    * {
536    *    sigset_t s;
537    *    sigfillset (&s);
538    *    pthread_sigmask (SIG_SETMASK, &s, 0);
539    *  }
540    */
541
542   clib_mem_set_heap (w->thread_mheap);
543
544   if (vec_len (tm->thread_prefix) && w->registration->short_name)
545     {
546       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
547                         w->registration->short_name, w->instance_id, '\0');
548       vlib_set_thread_name ((char *) w->name);
549     }
550
551   if (!w->registration->use_pthreads)
552     {
553
554       /* Initial barrier sync, for both worker and i/o threads */
555       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
556
557       while (*vlib_worker_threads->wait_at_barrier)
558         ;
559
560       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
561     }
562 }
563
564 void *
565 vlib_worker_thread_bootstrap_fn (void *arg)
566 {
567   void *rv;
568   vlib_worker_thread_t *w = arg;
569
570   w->lwp = syscall (SYS_gettid);
571   w->thread_id = pthread_self ();
572
573   __os_thread_index = w - vlib_worker_threads;
574
575   rv = (void *) clib_calljmp
576     ((uword (*)(uword)) w->thread_function,
577      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
578   /* NOTREACHED, we hope */
579   return rv;
580 }
581
582 void
583 vlib_get_thread_core_numa (vlib_worker_thread_t * w, unsigned cpu_id)
584 {
585   const char *sys_cpu_path = "/sys/devices/system/cpu/cpu";
586   u8 *p = 0;
587   int core_id = -1, numa_id = -1;
588
589   p = format (p, "%s%u/topology/core_id%c", sys_cpu_path, cpu_id, 0);
590   clib_sysfs_read ((char *) p, "%d", &core_id);
591   vec_reset_length (p);
592   p = format (p, "%s%u/topology/physical_package_id%c", sys_cpu_path,
593               cpu_id, 0);
594   clib_sysfs_read ((char *) p, "%d", &numa_id);
595   vec_free (p);
596
597   w->core_id = core_id;
598   w->numa_id = numa_id;
599 }
600
601 static clib_error_t *
602 vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned cpu_id)
603 {
604   vlib_thread_main_t *tm = &vlib_thread_main;
605   void *(*fp_arg) (void *) = fp;
606   void *numa_heap;
607
608   w->cpu_id = cpu_id;
609   vlib_get_thread_core_numa (w, cpu_id);
610
611   /* Set up NUMA-bound heap if indicated */
612   if (clib_per_numa_mheaps[w->numa_id] == 0)
613     {
614       /* If the user requested a NUMA heap, create it... */
615       if (tm->numa_heap_size)
616         {
617           numa_heap = clib_mem_init_thread_safe_numa
618             (0 /* DIY */ , tm->numa_heap_size, w->numa_id);
619           clib_per_numa_mheaps[w->numa_id] = numa_heap;
620         }
621       else
622         {
623           /* Or, use the main heap */
624           clib_per_numa_mheaps[w->numa_id] = w->thread_mheap;
625         }
626     }
627
628   if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads)
629     return tm->cb.vlib_launch_thread_cb (fp, (void *) w, cpu_id);
630   else
631     {
632       pthread_t worker;
633       cpu_set_t cpuset;
634       CPU_ZERO (&cpuset);
635       CPU_SET (cpu_id, &cpuset);
636
637       if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w))
638         return clib_error_return_unix (0, "pthread_create");
639
640       if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
641         return clib_error_return_unix (0, "pthread_setaffinity_np");
642
643       return 0;
644     }
645 }
646
647 static clib_error_t *
648 start_workers (vlib_main_t * vm)
649 {
650   int i, j;
651   vlib_worker_thread_t *w;
652   vlib_main_t *vm_clone;
653   void *oldheap;
654   vlib_thread_main_t *tm = &vlib_thread_main;
655   vlib_thread_registration_t *tr;
656   vlib_node_runtime_t *rt;
657   u32 n_vlib_mains = tm->n_vlib_mains;
658   u32 worker_thread_index;
659   u8 *main_heap = clib_mem_get_per_cpu_heap ();
660
661   vec_reset_length (vlib_worker_threads);
662
663   /* Set up the main thread */
664   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
665   w->elog_track.name = "main thread";
666   elog_track_register (&vm->elog_main, &w->elog_track);
667
668   if (vec_len (tm->thread_prefix))
669     {
670       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
671       vlib_set_thread_name ((char *) w->name);
672     }
673
674   vm->elog_main.lock =
675     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
676   vm->elog_main.lock[0] = 0;
677
678   if (n_vlib_mains > 1)
679     {
680       /* Replace hand-crafted length-1 vector with a real vector */
681       vlib_mains = 0;
682
683       vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
684                             CLIB_CACHE_LINE_BYTES);
685       _vec_len (vlib_mains) = 0;
686       vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
687
688       vlib_worker_threads->wait_at_barrier =
689         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
690       vlib_worker_threads->workers_at_barrier =
691         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
692
693       vlib_worker_threads->node_reforks_required =
694         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
695
696       /* We'll need the rpc vector lock... */
697       clib_spinlock_init (&vm->pending_rpc_lock);
698
699       /* Ask for an initial barrier sync */
700       *vlib_worker_threads->workers_at_barrier = 0;
701       *vlib_worker_threads->wait_at_barrier = 1;
702
703       /* Without update or refork */
704       *vlib_worker_threads->node_reforks_required = 0;
705       vm->need_vlib_worker_thread_node_runtime_update = 0;
706
707       /* init timing */
708       vm->barrier_epoch = 0;
709       vm->barrier_no_close_before = 0;
710
711       worker_thread_index = 1;
712
713       for (i = 0; i < vec_len (tm->registrations); i++)
714         {
715           vlib_node_main_t *nm, *nm_clone;
716           int k;
717
718           tr = tm->registrations[i];
719
720           if (tr->count == 0)
721             continue;
722
723           for (k = 0; k < tr->count; k++)
724             {
725               vlib_node_t *n;
726
727               vec_add2 (vlib_worker_threads, w, 1);
728               /* Currently unused, may not really work */
729               if (tr->mheap_size)
730                 {
731 #if USE_DLMALLOC == 0
732                   w->thread_mheap =
733                     mheap_alloc (0 /* use VM */ , tr->mheap_size);
734 #else
735                   w->thread_mheap = create_mspace (tr->mheap_size,
736                                                    0 /* unlocked */ );
737 #endif
738                 }
739               else
740                 w->thread_mheap = main_heap;
741
742               w->thread_stack =
743                 vlib_thread_stack_init (w - vlib_worker_threads);
744               w->thread_function = tr->function;
745               w->thread_function_arg = w;
746               w->instance_id = k;
747               w->registration = tr;
748
749               w->elog_track.name =
750                 (char *) format (0, "%s %d", tr->name, k + 1);
751               vec_add1 (w->elog_track.name, 0);
752               elog_track_register (&vm->elog_main, &w->elog_track);
753
754               if (tr->no_data_structure_clone)
755                 continue;
756
757               /* Fork vlib_global_main et al. Look for bugs here */
758               oldheap = clib_mem_set_heap (w->thread_mheap);
759
760               vm_clone = clib_mem_alloc_aligned (sizeof (*vm_clone),
761                                                  CLIB_CACHE_LINE_BYTES);
762               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
763
764               vm_clone->thread_index = worker_thread_index;
765               vm_clone->heap_base = w->thread_mheap;
766               vm_clone->heap_aligned_base = (void *)
767                 (((uword) w->thread_mheap) & ~(VLIB_FRAME_ALIGN - 1));
768               vm_clone->init_functions_called =
769                 hash_create (0, /* value bytes */ 0);
770               vm_clone->pending_rpc_requests = 0;
771               vec_validate (vm_clone->pending_rpc_requests, 0);
772               _vec_len (vm_clone->pending_rpc_requests) = 0;
773               clib_memset (&vm_clone->random_buffer, 0,
774                            sizeof (vm_clone->random_buffer));
775
776               nm = &vlib_mains[0]->node_main;
777               nm_clone = &vm_clone->node_main;
778               /* fork next frames array, preserving node runtime indices */
779               nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
780                                                        CLIB_CACHE_LINE_BYTES);
781               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
782                 {
783                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
784                   u32 save_node_runtime_index;
785                   u32 save_flags;
786
787                   save_node_runtime_index = nf->node_runtime_index;
788                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
789                   vlib_next_frame_init (nf);
790                   nf->node_runtime_index = save_node_runtime_index;
791                   nf->flags = save_flags;
792                 }
793
794               /* fork the frame dispatch queue */
795               nm_clone->pending_frames = 0;
796               vec_validate (nm_clone->pending_frames, 10);
797               _vec_len (nm_clone->pending_frames) = 0;
798
799               /* fork nodes */
800               nm_clone->nodes = 0;
801
802               /* Allocate all nodes in single block for speed */
803               n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
804
805               for (j = 0; j < vec_len (nm->nodes); j++)
806                 {
807                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
808                   /* none of the copied nodes have enqueue rights given out */
809                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
810                   clib_memset (&n->stats_total, 0, sizeof (n->stats_total));
811                   clib_memset (&n->stats_last_clear, 0,
812                                sizeof (n->stats_last_clear));
813                   vec_add1 (nm_clone->nodes, n);
814                   n++;
815                 }
816               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
817                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
818                                  CLIB_CACHE_LINE_BYTES);
819               vec_foreach (rt,
820                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
821               {
822                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
823                 rt->thread_index = vm_clone->thread_index;
824                 /* copy initial runtime_data from node */
825                 if (n->runtime_data && n->runtime_data_bytes > 0)
826                   clib_memcpy (rt->runtime_data, n->runtime_data,
827                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
828                                          n->runtime_data_bytes));
829               }
830
831               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
832                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
833                                  CLIB_CACHE_LINE_BYTES);
834               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
835               {
836                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
837                 rt->thread_index = vm_clone->thread_index;
838                 /* copy initial runtime_data from node */
839                 if (n->runtime_data && n->runtime_data_bytes > 0)
840                   clib_memcpy (rt->runtime_data, n->runtime_data,
841                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
842                                          n->runtime_data_bytes));
843               }
844
845               nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
846                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
847                                  CLIB_CACHE_LINE_BYTES);
848               vec_foreach (rt,
849                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
850               {
851                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
852                 rt->thread_index = vm_clone->thread_index;
853                 /* copy initial runtime_data from node */
854                 if (n->runtime_data && n->runtime_data_bytes > 0)
855                   clib_memcpy (rt->runtime_data, n->runtime_data,
856                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
857                                          n->runtime_data_bytes));
858               }
859
860               nm_clone->processes = vec_dup_aligned (nm->processes,
861                                                      CLIB_CACHE_LINE_BYTES);
862
863               /* Create per-thread frame freelist */
864               nm_clone->frame_sizes = vec_new (vlib_frame_size_t, 1);
865 #ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES
866               nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
867 #endif
868               nm_clone->node_by_error = nm->node_by_error;
869
870               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
871
872               clib_mem_set_heap (oldheap);
873               vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
874
875               /* Switch to the stats segment ... */
876               void *oldheap = vlib_stats_push_heap (0);
877               vm_clone->error_main.counters = vec_dup_aligned
878                 (vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES);
879               vlib_stats_pop_heap2 (vm_clone->error_main.counters,
880                                     worker_thread_index, oldheap, 1);
881
882               vm_clone->error_main.counters_last_clear = vec_dup_aligned
883                 (vlib_mains[0]->error_main.counters_last_clear,
884                  CLIB_CACHE_LINE_BYTES);
885
886               worker_thread_index++;
887             }
888         }
889     }
890   else
891     {
892       /* only have non-data-structure copy threads to create... */
893       for (i = 0; i < vec_len (tm->registrations); i++)
894         {
895           tr = tm->registrations[i];
896
897           for (j = 0; j < tr->count; j++)
898             {
899               vec_add2 (vlib_worker_threads, w, 1);
900               if (tr->mheap_size)
901                 {
902 #if USE_DLMALLOC == 0
903                   w->thread_mheap =
904                     mheap_alloc (0 /* use VM */ , tr->mheap_size);
905 #else
906                   w->thread_mheap =
907                     create_mspace (tr->mheap_size, 0 /* locked */ );
908 #endif
909                 }
910               else
911                 w->thread_mheap = main_heap;
912               w->thread_stack =
913                 vlib_thread_stack_init (w - vlib_worker_threads);
914               w->thread_function = tr->function;
915               w->thread_function_arg = w;
916               w->instance_id = j;
917               w->elog_track.name =
918                 (char *) format (0, "%s %d", tr->name, j + 1);
919               w->registration = tr;
920               vec_add1 (w->elog_track.name, 0);
921               elog_track_register (&vm->elog_main, &w->elog_track);
922             }
923         }
924     }
925
926   worker_thread_index = 1;
927
928   for (i = 0; i < vec_len (tm->registrations); i++)
929     {
930       clib_error_t *err;
931       int j;
932
933       tr = tm->registrations[i];
934
935       if (tr->use_pthreads || tm->use_pthreads)
936         {
937           for (j = 0; j < tr->count; j++)
938             {
939               w = vlib_worker_threads + worker_thread_index++;
940               err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
941                                             w, 0);
942               if (err)
943                 clib_error_report (err);
944             }
945         }
946       else
947         {
948           uword c;
949           /* *INDENT-OFF* */
950           clib_bitmap_foreach (c, tr->coremask, ({
951             w = vlib_worker_threads + worker_thread_index++;
952             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
953                                           w, c);
954             if (err)
955               clib_error_report (err);
956           }));
957           /* *INDENT-ON* */
958         }
959     }
960   vlib_worker_thread_barrier_sync (vm);
961   vlib_worker_thread_barrier_release (vm);
962   return 0;
963 }
964
965 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
966
967
968 static inline void
969 worker_thread_node_runtime_update_internal (void)
970 {
971   int i, j;
972   vlib_main_t *vm;
973   vlib_node_main_t *nm, *nm_clone;
974   vlib_main_t *vm_clone;
975   vlib_node_runtime_t *rt;
976   never_inline void
977     vlib_node_runtime_sync_stats (vlib_main_t * vm,
978                                   vlib_node_runtime_t * r,
979                                   uword n_calls,
980                                   uword n_vectors, uword n_clocks);
981
982   ASSERT (vlib_get_thread_index () == 0);
983
984   vm = vlib_mains[0];
985   nm = &vm->node_main;
986
987   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
988
989   /*
990    * Scrape all runtime stats, so we don't lose node runtime(s) with
991    * pending counts, or throw away worker / io thread counts.
992    */
993   for (j = 0; j < vec_len (nm->nodes); j++)
994     {
995       vlib_node_t *n;
996       n = nm->nodes[j];
997       vlib_node_sync_stats (vm, n);
998     }
999
1000   for (i = 1; i < vec_len (vlib_mains); i++)
1001     {
1002       vlib_node_t *n;
1003
1004       vm_clone = vlib_mains[i];
1005       nm_clone = &vm_clone->node_main;
1006
1007       for (j = 0; j < vec_len (nm_clone->nodes); j++)
1008         {
1009           n = nm_clone->nodes[j];
1010
1011           rt = vlib_node_get_runtime (vm_clone, n->index);
1012           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
1013         }
1014     }
1015
1016   /* Per-worker clone rebuilds are now done on each thread */
1017 }
1018
1019
1020 void
1021 vlib_worker_thread_node_refork (void)
1022 {
1023   vlib_main_t *vm, *vm_clone;
1024   vlib_node_main_t *nm, *nm_clone;
1025   vlib_node_t **old_nodes_clone;
1026   vlib_node_runtime_t *rt, *old_rt;
1027
1028   vlib_node_t *new_n_clone;
1029
1030   int j;
1031
1032   vm = vlib_mains[0];
1033   nm = &vm->node_main;
1034   vm_clone = vlib_get_main ();
1035   nm_clone = &vm_clone->node_main;
1036
1037   /* Re-clone error heap */
1038   u64 *old_counters = vm_clone->error_main.counters;
1039   u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
1040
1041   clib_memcpy_fast (&vm_clone->error_main, &vm->error_main,
1042                     sizeof (vm->error_main));
1043   j = vec_len (vm->error_main.counters) - 1;
1044
1045   /* Switch to the stats segment ... */
1046   void *oldheap = vlib_stats_push_heap (0);
1047   vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
1048   vm_clone->error_main.counters = old_counters;
1049   vlib_stats_pop_heap2 (vm_clone->error_main.counters, vm_clone->thread_index,
1050                         oldheap, 0);
1051
1052   vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
1053   vm_clone->error_main.counters_last_clear = old_counters_all_clear;
1054
1055   nm_clone = &vm_clone->node_main;
1056   vec_free (nm_clone->next_frames);
1057   nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
1058                                            CLIB_CACHE_LINE_BYTES);
1059
1060   for (j = 0; j < vec_len (nm_clone->next_frames); j++)
1061     {
1062       vlib_next_frame_t *nf = &nm_clone->next_frames[j];
1063       u32 save_node_runtime_index;
1064       u32 save_flags;
1065
1066       save_node_runtime_index = nf->node_runtime_index;
1067       save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
1068       vlib_next_frame_init (nf);
1069       nf->node_runtime_index = save_node_runtime_index;
1070       nf->flags = save_flags;
1071     }
1072
1073   old_nodes_clone = nm_clone->nodes;
1074   nm_clone->nodes = 0;
1075
1076   /* re-fork nodes */
1077
1078   /* Allocate all nodes in single block for speed */
1079   new_n_clone =
1080     clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone));
1081   for (j = 0; j < vec_len (nm->nodes); j++)
1082     {
1083       vlib_node_t *new_n = nm->nodes[j];
1084
1085       clib_memcpy_fast (new_n_clone, new_n, sizeof (*new_n));
1086       /* none of the copied nodes have enqueue rights given out */
1087       new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
1088
1089       if (j >= vec_len (old_nodes_clone))
1090         {
1091           /* new node, set to zero */
1092           clib_memset (&new_n_clone->stats_total, 0,
1093                        sizeof (new_n_clone->stats_total));
1094           clib_memset (&new_n_clone->stats_last_clear, 0,
1095                        sizeof (new_n_clone->stats_last_clear));
1096         }
1097       else
1098         {
1099           vlib_node_t *old_n_clone = old_nodes_clone[j];
1100           /* Copy stats if the old data is valid */
1101           clib_memcpy_fast (&new_n_clone->stats_total,
1102                             &old_n_clone->stats_total,
1103                             sizeof (new_n_clone->stats_total));
1104           clib_memcpy_fast (&new_n_clone->stats_last_clear,
1105                             &old_n_clone->stats_last_clear,
1106                             sizeof (new_n_clone->stats_last_clear));
1107
1108           /* keep previous node state */
1109           new_n_clone->state = old_n_clone->state;
1110         }
1111       vec_add1 (nm_clone->nodes, new_n_clone);
1112       new_n_clone++;
1113     }
1114   /* Free the old node clones */
1115   clib_mem_free (old_nodes_clone[0]);
1116
1117   vec_free (old_nodes_clone);
1118
1119
1120   /* re-clone internal nodes */
1121   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
1122   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
1123     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
1124                      CLIB_CACHE_LINE_BYTES);
1125
1126   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
1127   {
1128     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1129     rt->thread_index = vm_clone->thread_index;
1130     /* copy runtime_data, will be overwritten later for existing rt */
1131     if (n->runtime_data && n->runtime_data_bytes > 0)
1132       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1133                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1134                                   n->runtime_data_bytes));
1135   }
1136
1137   for (j = 0; j < vec_len (old_rt); j++)
1138     {
1139       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1140       rt->state = old_rt[j].state;
1141       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1142                         VLIB_NODE_RUNTIME_DATA_SIZE);
1143     }
1144
1145   vec_free (old_rt);
1146
1147   /* re-clone input nodes */
1148   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
1149   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
1150     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1151                      CLIB_CACHE_LINE_BYTES);
1152
1153   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1154   {
1155     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1156     rt->thread_index = vm_clone->thread_index;
1157     /* copy runtime_data, will be overwritten later for existing rt */
1158     if (n->runtime_data && n->runtime_data_bytes > 0)
1159       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1160                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1161                                   n->runtime_data_bytes));
1162   }
1163
1164   for (j = 0; j < vec_len (old_rt); j++)
1165     {
1166       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1167       rt->state = old_rt[j].state;
1168       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1169                         VLIB_NODE_RUNTIME_DATA_SIZE);
1170     }
1171
1172   vec_free (old_rt);
1173
1174   /* re-clone pre-input nodes */
1175   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT];
1176   nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
1177     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
1178                      CLIB_CACHE_LINE_BYTES);
1179
1180   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
1181   {
1182     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1183     rt->thread_index = vm_clone->thread_index;
1184     /* copy runtime_data, will be overwritten later for existing rt */
1185     if (n->runtime_data && n->runtime_data_bytes > 0)
1186       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1187                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1188                                   n->runtime_data_bytes));
1189   }
1190
1191   for (j = 0; j < vec_len (old_rt); j++)
1192     {
1193       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1194       rt->state = old_rt[j].state;
1195       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1196                         VLIB_NODE_RUNTIME_DATA_SIZE);
1197     }
1198
1199   vec_free (old_rt);
1200
1201   nm_clone->processes = vec_dup_aligned (nm->processes,
1202                                          CLIB_CACHE_LINE_BYTES);
1203   nm_clone->node_by_error = nm->node_by_error;
1204 }
1205
1206 void
1207 vlib_worker_thread_node_runtime_update (void)
1208 {
1209   /*
1210    * Make a note that we need to do a node runtime update
1211    * prior to releasing the barrier.
1212    */
1213   vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1;
1214 }
1215
1216 u32
1217 unformat_sched_policy (unformat_input_t * input, va_list * args)
1218 {
1219   u32 *r = va_arg (*args, u32 *);
1220
1221   if (0);
1222 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1223   foreach_sched_policy
1224 #undef _
1225     else
1226     return 0;
1227   return 1;
1228 }
1229
1230 static clib_error_t *
1231 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1232 {
1233   vlib_thread_registration_t *tr;
1234   uword *p;
1235   vlib_thread_main_t *tm = &vlib_thread_main;
1236   u8 *name;
1237   uword *bitmap;
1238   u32 count;
1239
1240   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1241
1242   tm->n_thread_stacks = 1;      /* account for main thread */
1243   tm->sched_policy = ~0;
1244   tm->sched_priority = ~0;
1245   tm->main_lcore = ~0;
1246
1247   tr = tm->next;
1248
1249   while (tr)
1250     {
1251       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1252       tr = tr->next;
1253     }
1254
1255   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1256     {
1257       if (unformat (input, "use-pthreads"))
1258         tm->use_pthreads = 1;
1259       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1260         ;
1261       else if (unformat (input, "main-core %u", &tm->main_lcore))
1262         ;
1263       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1264         ;
1265       else if (unformat (input, "numa-heap-size %U",
1266                          unformat_memory_size, &tm->numa_heap_size))
1267         ;
1268       else if (unformat (input, "coremask-%s %U", &name,
1269                          unformat_bitmap_mask, &bitmap) ||
1270                unformat (input, "corelist-%s %U", &name,
1271                          unformat_bitmap_list, &bitmap))
1272         {
1273           p = hash_get_mem (tm->thread_registrations_by_name, name);
1274           if (p == 0)
1275             return clib_error_return (0, "no such thread type '%s'", name);
1276
1277           tr = (vlib_thread_registration_t *) p[0];
1278
1279           if (tr->use_pthreads)
1280             return clib_error_return (0,
1281                                       "corelist cannot be set for '%s' threads",
1282                                       name);
1283
1284           tr->coremask = bitmap;
1285           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1286         }
1287       else
1288         if (unformat
1289             (input, "scheduler-policy %U", unformat_sched_policy,
1290              &tm->sched_policy))
1291         ;
1292       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1293         ;
1294       else if (unformat (input, "%s %u", &name, &count))
1295         {
1296           p = hash_get_mem (tm->thread_registrations_by_name, name);
1297           if (p == 0)
1298             return clib_error_return (0, "no such thread type 3 '%s'", name);
1299
1300           tr = (vlib_thread_registration_t *) p[0];
1301           if (tr->fixed_count)
1302             return clib_error_return
1303               (0, "number of %s threads not configurable", tr->name);
1304           tr->count = count;
1305         }
1306       else
1307         break;
1308     }
1309
1310   if (tm->sched_priority != ~0)
1311     {
1312       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1313         {
1314           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1315           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1316           if (tm->sched_priority > prio_max)
1317             tm->sched_priority = prio_max;
1318           if (tm->sched_priority < prio_min)
1319             tm->sched_priority = prio_min;
1320         }
1321       else
1322         {
1323           return clib_error_return
1324             (0,
1325              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1326              tm->sched_priority);
1327         }
1328     }
1329   tr = tm->next;
1330
1331   if (!tm->thread_prefix)
1332     tm->thread_prefix = format (0, "vpp");
1333
1334   while (tr)
1335     {
1336       tm->n_thread_stacks += tr->count;
1337       tm->n_pthreads += tr->count * tr->use_pthreads;
1338       tm->n_threads += tr->count * (tr->use_pthreads == 0);
1339       tr = tr->next;
1340     }
1341
1342   return 0;
1343 }
1344
1345 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1346
1347 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1348 void
1349 vnet_main_fixup (vlib_fork_fixup_t which)
1350 {
1351 }
1352
1353 void
1354 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1355 {
1356   vlib_main_t *vm = vlib_get_main ();
1357
1358   if (vlib_mains == 0)
1359     return;
1360
1361   ASSERT (vlib_get_thread_index () == 0);
1362   vlib_worker_thread_barrier_sync (vm);
1363
1364   switch (which)
1365     {
1366     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1367       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1368       break;
1369
1370     default:
1371       ASSERT (0);
1372     }
1373   vlib_worker_thread_barrier_release (vm);
1374 }
1375
1376   /*
1377    * Enforce minimum open time to minimize packet loss due to Rx overflow,
1378    * based on a test based heuristic that barrier should be open for at least
1379    * 3 time as long as it is closed (with an upper bound of 1ms because by that
1380    *  point it is probably too late to make a difference)
1381    */
1382
1383 #ifndef BARRIER_MINIMUM_OPEN_LIMIT
1384 #define BARRIER_MINIMUM_OPEN_LIMIT 0.001
1385 #endif
1386
1387 #ifndef BARRIER_MINIMUM_OPEN_FACTOR
1388 #define BARRIER_MINIMUM_OPEN_FACTOR 3
1389 #endif
1390
1391 void
1392 vlib_worker_thread_initial_barrier_sync_and_release (vlib_main_t * vm)
1393 {
1394   f64 deadline;
1395   f64 now = vlib_time_now (vm);
1396   u32 count = vec_len (vlib_mains) - 1;
1397
1398   /* No worker threads? */
1399   if (count == 0)
1400     return;
1401
1402   deadline = now + BARRIER_SYNC_TIMEOUT;
1403   *vlib_worker_threads->wait_at_barrier = 1;
1404   while (*vlib_worker_threads->workers_at_barrier != count)
1405     {
1406       if ((now = vlib_time_now (vm)) > deadline)
1407         {
1408           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1409           os_panic ();
1410         }
1411       CLIB_PAUSE ();
1412     }
1413   *vlib_worker_threads->wait_at_barrier = 0;
1414 }
1415
1416 void
1417 vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
1418 {
1419   f64 deadline;
1420   f64 now;
1421   f64 t_entry;
1422   f64 t_open;
1423   f64 t_closed;
1424   f64 max_vector_rate;
1425   u32 count;
1426   int i;
1427
1428   if (vec_len (vlib_mains) < 2)
1429     return;
1430
1431   ASSERT (vlib_get_thread_index () == 0);
1432
1433   vlib_worker_threads[0].barrier_caller = func_name;
1434   count = vec_len (vlib_mains) - 1;
1435
1436   /* Record entry relative to last close */
1437   now = vlib_time_now (vm);
1438   t_entry = now - vm->barrier_epoch;
1439
1440   /* Tolerate recursive calls */
1441   if (++vlib_worker_threads[0].recursion_level > 1)
1442     {
1443       barrier_trace_sync_rec (t_entry);
1444       return;
1445     }
1446
1447   /*
1448    * Need data to decide if we're working hard enough to honor
1449    * the barrier hold-down timer.
1450    */
1451   max_vector_rate = 0.0;
1452   for (i = 1; i < vec_len (vlib_mains); i++)
1453     max_vector_rate =
1454       clib_max (max_vector_rate,
1455                 (f64) vlib_last_vectors_per_main_loop (vlib_mains[i]));
1456
1457   vlib_worker_threads[0].barrier_sync_count++;
1458
1459   /* Enforce minimum barrier open time to minimize packet loss */
1460   ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));
1461
1462   /*
1463    * If any worker thread seems busy, which we define
1464    * as a vector rate above 10, we enforce the barrier hold-down timer
1465    */
1466   if (max_vector_rate > 10.0)
1467     {
1468       while (1)
1469         {
1470           now = vlib_time_now (vm);
1471           /* Barrier hold-down timer expired? */
1472           if (now >= vm->barrier_no_close_before)
1473             break;
1474           if ((vm->barrier_no_close_before - now)
1475               > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
1476             {
1477               clib_warning
1478                 ("clock change: would have waited for %.4f seconds",
1479                  (vm->barrier_no_close_before - now));
1480               break;
1481             }
1482         }
1483     }
1484   /* Record time of closure */
1485   t_open = now - vm->barrier_epoch;
1486   vm->barrier_epoch = now;
1487
1488   deadline = now + BARRIER_SYNC_TIMEOUT;
1489
1490   *vlib_worker_threads->wait_at_barrier = 1;
1491   while (*vlib_worker_threads->workers_at_barrier != count)
1492     {
1493       if ((now = vlib_time_now (vm)) > deadline)
1494         {
1495           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1496           os_panic ();
1497         }
1498     }
1499
1500   t_closed = now - vm->barrier_epoch;
1501
1502   barrier_trace_sync (t_entry, t_open, t_closed);
1503
1504 }
1505
1506 void
1507 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1508 {
1509   f64 deadline;
1510   f64 now;
1511   f64 minimum_open;
1512   f64 t_entry;
1513   f64 t_closed_total;
1514   f64 t_update_main = 0.0;
1515   int refork_needed = 0;
1516
1517   if (vec_len (vlib_mains) < 2)
1518     return;
1519
1520   ASSERT (vlib_get_thread_index () == 0);
1521
1522
1523   now = vlib_time_now (vm);
1524   t_entry = now - vm->barrier_epoch;
1525
1526   if (--vlib_worker_threads[0].recursion_level > 0)
1527     {
1528       barrier_trace_release_rec (t_entry);
1529       return;
1530     }
1531
1532   /* Update (all) node runtimes before releasing the barrier, if needed */
1533   if (vm->need_vlib_worker_thread_node_runtime_update)
1534     {
1535       /*
1536        * Lock stat segment here, so we's safe when
1537        * rebuilding the stat segment node clones from the
1538        * stat thread...
1539        */
1540       vlib_stat_segment_lock ();
1541
1542       /* Do stats elements on main thread */
1543       worker_thread_node_runtime_update_internal ();
1544       vm->need_vlib_worker_thread_node_runtime_update = 0;
1545
1546       /* Do per thread rebuilds in parallel */
1547       refork_needed = 1;
1548       clib_atomic_fetch_add (vlib_worker_threads->node_reforks_required,
1549                              (vec_len (vlib_mains) - 1));
1550       now = vlib_time_now (vm);
1551       t_update_main = now - vm->barrier_epoch;
1552     }
1553
1554   deadline = now + BARRIER_SYNC_TIMEOUT;
1555
1556   /*
1557    * Note when we let go of the barrier.
1558    * Workers can use this to derive a reasonably accurate
1559    * time offset. See vlib_time_now(...)
1560    */
1561   vm->time_last_barrier_release = vlib_time_now (vm);
1562   CLIB_MEMORY_STORE_BARRIER ();
1563
1564   *vlib_worker_threads->wait_at_barrier = 0;
1565
1566   while (*vlib_worker_threads->workers_at_barrier > 0)
1567     {
1568       if ((now = vlib_time_now (vm)) > deadline)
1569         {
1570           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1571           os_panic ();
1572         }
1573     }
1574
1575   /* Wait for reforks before continuing */
1576   if (refork_needed)
1577     {
1578       now = vlib_time_now (vm);
1579
1580       deadline = now + BARRIER_SYNC_TIMEOUT;
1581
1582       while (*vlib_worker_threads->node_reforks_required > 0)
1583         {
1584           if ((now = vlib_time_now (vm)) > deadline)
1585             {
1586               fformat (stderr, "%s: worker thread refork deadlock\n",
1587                        __FUNCTION__);
1588               os_panic ();
1589             }
1590         }
1591       vlib_stat_segment_unlock ();
1592     }
1593
1594   t_closed_total = now - vm->barrier_epoch;
1595
1596   minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;
1597
1598   if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
1599     {
1600       minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
1601     }
1602
1603   vm->barrier_no_close_before = now + minimum_open;
1604
1605   /* Record barrier epoch (used to enforce minimum open time) */
1606   vm->barrier_epoch = now;
1607
1608   barrier_trace_release (t_entry, t_closed_total, t_update_main);
1609
1610 }
1611
1612 /*
1613  * Check the frame queue to see if any frames are available.
1614  * If so, pull the packets off the frames and put them to
1615  * the handoff node.
1616  */
1617 int
1618 vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm)
1619 {
1620   u32 thread_id = vm->thread_index;
1621   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
1622   vlib_frame_queue_elt_t *elt;
1623   u32 *from, *to;
1624   vlib_frame_t *f;
1625   int msg_type;
1626   int processed = 0;
1627   u32 n_left_to_node;
1628   u32 vectors = 0;
1629
1630   ASSERT (fq);
1631   ASSERT (vm == vlib_mains[thread_id]);
1632
1633   if (PREDICT_FALSE (fqm->node_index == ~0))
1634     return 0;
1635   /*
1636    * Gather trace data for frame queues
1637    */
1638   if (PREDICT_FALSE (fq->trace))
1639     {
1640       frame_queue_trace_t *fqt;
1641       frame_queue_nelt_counter_t *fqh;
1642       u32 elix;
1643
1644       fqt = &fqm->frame_queue_traces[thread_id];
1645
1646       fqt->nelts = fq->nelts;
1647       fqt->head = fq->head;
1648       fqt->head_hint = fq->head_hint;
1649       fqt->tail = fq->tail;
1650       fqt->threshold = fq->vector_threshold;
1651       fqt->n_in_use = fqt->tail - fqt->head;
1652       if (fqt->n_in_use >= fqt->nelts)
1653         {
1654           // if beyond max then use max
1655           fqt->n_in_use = fqt->nelts - 1;
1656         }
1657
1658       /* Record the number of elements in use in the histogram */
1659       fqh = &fqm->frame_queue_histogram[thread_id];
1660       fqh->count[fqt->n_in_use]++;
1661
1662       /* Record a snapshot of the elements in use */
1663       for (elix = 0; elix < fqt->nelts; elix++)
1664         {
1665           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1666           if (1 || elt->valid)
1667             {
1668               fqt->n_vectors[elix] = elt->n_vectors;
1669             }
1670         }
1671       fqt->written = 1;
1672     }
1673
1674   while (1)
1675     {
1676       vlib_buffer_t *b;
1677       if (fq->head == fq->tail)
1678         {
1679           fq->head_hint = fq->head;
1680           return processed;
1681         }
1682
1683       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1684
1685       if (!elt->valid)
1686         {
1687           fq->head_hint = fq->head;
1688           return processed;
1689         }
1690
1691       from = elt->buffer_index;
1692       msg_type = elt->msg_type;
1693
1694       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1695       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1696
1697       f = vlib_get_frame_to_node (vm, fqm->node_index);
1698
1699       /* If the first vector is traced, set the frame trace flag */
1700       b = vlib_get_buffer (vm, from[0]);
1701       if (b->flags & VLIB_BUFFER_IS_TRACED)
1702         f->frame_flags |= VLIB_NODE_FLAG_TRACE;
1703
1704       to = vlib_frame_vector_args (f);
1705
1706       n_left_to_node = elt->n_vectors;
1707
1708       while (n_left_to_node >= 4)
1709         {
1710           to[0] = from[0];
1711           to[1] = from[1];
1712           to[2] = from[2];
1713           to[3] = from[3];
1714           to += 4;
1715           from += 4;
1716           n_left_to_node -= 4;
1717         }
1718
1719       while (n_left_to_node > 0)
1720         {
1721           to[0] = from[0];
1722           to++;
1723           from++;
1724           n_left_to_node--;
1725         }
1726
1727       vectors += elt->n_vectors;
1728       f->n_vectors = elt->n_vectors;
1729       vlib_put_frame_to_node (vm, fqm->node_index, f);
1730
1731       elt->valid = 0;
1732       elt->n_vectors = 0;
1733       elt->msg_type = 0xfefefefe;
1734       CLIB_MEMORY_BARRIER ();
1735       fq->head++;
1736       processed++;
1737
1738       /*
1739        * Limit the number of packets pushed into the graph
1740        */
1741       if (vectors >= fq->vector_threshold)
1742         {
1743           fq->head_hint = fq->head;
1744           return processed;
1745         }
1746     }
1747   ASSERT (0);
1748   return processed;
1749 }
1750
1751 void
1752 vlib_worker_thread_fn (void *arg)
1753 {
1754   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1755   vlib_thread_main_t *tm = vlib_get_thread_main ();
1756   vlib_main_t *vm = vlib_get_main ();
1757   clib_error_t *e;
1758
1759   ASSERT (vm->thread_index == vlib_get_thread_index ());
1760
1761   vlib_worker_thread_init (w);
1762   clib_time_init (&vm->clib_time);
1763   clib_mem_set_heap (w->thread_mheap);
1764
1765   e = vlib_call_init_exit_functions_no_sort
1766     (vm, &vm->worker_init_function_registrations, 1 /* call_once */ );
1767   if (e)
1768     clib_error_report (e);
1769
1770   /* Wait until the dpdk init sequence is complete */
1771   while (tm->extern_thread_mgmt && tm->worker_thread_release == 0)
1772     vlib_worker_thread_barrier_check ();
1773
1774   vlib_worker_loop (vm);
1775 }
1776
1777 /* *INDENT-OFF* */
1778 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1779   .name = "workers",
1780   .short_name = "wk",
1781   .function = vlib_worker_thread_fn,
1782 };
1783 /* *INDENT-ON* */
1784
1785 u32
1786 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
1787 {
1788   vlib_thread_main_t *tm = vlib_get_thread_main ();
1789   vlib_frame_queue_main_t *fqm;
1790   vlib_frame_queue_t *fq;
1791   int i;
1792
1793   if (frame_queue_nelts == 0)
1794     frame_queue_nelts = FRAME_QUEUE_MAX_NELTS;
1795
1796   ASSERT (frame_queue_nelts >= 8);
1797
1798   vec_add2 (tm->frame_queue_mains, fqm, 1);
1799
1800   fqm->node_index = node_index;
1801   fqm->frame_queue_nelts = frame_queue_nelts;
1802   fqm->queue_hi_thresh = frame_queue_nelts - 2;
1803
1804   vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
1805   vec_validate (fqm->per_thread_data, tm->n_vlib_mains - 1);
1806   _vec_len (fqm->vlib_frame_queues) = 0;
1807   for (i = 0; i < tm->n_vlib_mains; i++)
1808     {
1809       vlib_frame_queue_per_thread_data_t *ptd;
1810       fq = vlib_frame_queue_alloc (frame_queue_nelts);
1811       vec_add1 (fqm->vlib_frame_queues, fq);
1812
1813       ptd = vec_elt_at_index (fqm->per_thread_data, i);
1814       vec_validate (ptd->handoff_queue_elt_by_thread_index,
1815                     tm->n_vlib_mains - 1);
1816       vec_validate_init_empty (ptd->congested_handoff_queue_by_thread_index,
1817                                tm->n_vlib_mains - 1,
1818                                (vlib_frame_queue_t *) (~0));
1819     }
1820
1821   return (fqm - tm->frame_queue_mains);
1822 }
1823
1824 int
1825 vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb)
1826 {
1827   vlib_thread_main_t *tm = vlib_get_thread_main ();
1828
1829   if (tm->extern_thread_mgmt)
1830     return -1;
1831
1832   tm->cb.vlib_launch_thread_cb = cb->vlib_launch_thread_cb;
1833   tm->extern_thread_mgmt = 1;
1834   return 0;
1835 }
1836
1837 void
1838 vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t *
1839                                      args)
1840 {
1841   ASSERT (vlib_get_thread_index () == 0);
1842   vlib_process_signal_event (vlib_get_main (), args->node_index,
1843                              args->type_opaque, args->data);
1844 }
1845
1846 void *rpc_call_main_thread_cb_fn;
1847
1848 void
1849 vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
1850 {
1851   if (rpc_call_main_thread_cb_fn)
1852     {
1853       void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
1854       (*fp) (callback, args, arg_size);
1855     }
1856   else
1857     clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
1858 }
1859
1860 clib_error_t *
1861 threads_init (vlib_main_t * vm)
1862 {
1863   return 0;
1864 }
1865
1866 VLIB_INIT_FUNCTION (threads_init);
1867
1868
1869 static clib_error_t *
1870 show_clock_command_fn (vlib_main_t * vm,
1871                        unformat_input_t * input, vlib_cli_command_t * cmd)
1872 {
1873   int i;
1874   int verbose = 0;
1875
1876   (void) unformat (input, "verbose %=", &verbose, 1);
1877
1878   vlib_cli_output (vm, "%U", format_clib_time, &vm->clib_time, verbose);
1879
1880   if (vec_len (vlib_mains) == 1)
1881     return 0;
1882
1883   vlib_cli_output (vm, "Time last barrier release %.9f",
1884                    vm->time_last_barrier_release);
1885
1886   for (i = 1; i < vec_len (vlib_mains); i++)
1887     {
1888       if (vlib_mains[i] == 0)
1889         continue;
1890
1891       vlib_cli_output (vm, "%d: %U", i, format_clib_time,
1892                        &vlib_mains[i]->clib_time, verbose);
1893
1894       vlib_cli_output (vm, "Thread %d offset %.9f error %.9f", i,
1895                        vlib_mains[i]->time_offset,
1896                        vm->time_last_barrier_release -
1897                        vlib_mains[i]->time_last_barrier_release);
1898     }
1899   return 0;
1900 }
1901
1902 /* *INDENT-OFF* */
1903 VLIB_CLI_COMMAND (f_command, static) =
1904 {
1905   .path = "show clock",
1906   .short_help = "show clock",
1907   .function = show_clock_command_fn,
1908 };
1909 /* *INDENT-ON* */
1910
1911 /*
1912  * fd.io coding-style-patch-verification: ON
1913  *
1914  * Local Variables:
1915  * eval: (c-set-style "gnu")
1916  * End:
1917  */