sort worker-thread init functions in advance
[vpp.git] / src / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vppinfra/linux/sysfs.h>
21 #include <vlib/vlib.h>
22
23 #include <vlib/threads.h>
24 #include <vlib/unix/cj.h>
25
26 #include <vlib/stat_weak_inlines.h>
27
28 DECLARE_CJ_GLOBAL_LOG;
29
30 #define FRAME_QUEUE_NELTS 64
31
32 u32
33 vl (void *p)
34 {
35   return vec_len (p);
36 }
37
38 vlib_worker_thread_t *vlib_worker_threads;
39 vlib_thread_main_t vlib_thread_main;
40
41 /*
42  * Barrier tracing can be enabled on a normal build to collect information
43  * on barrier use, including timings and call stacks.  Deliberately not
44  * keyed off CLIB_DEBUG, because that can add significant overhead which
45  * imapacts observed timings.
46  */
47
48 u32
49 elog_global_id_for_msg_name (const char *msg_name)
50 {
51   uword *p, r;
52   static uword *h;
53   u8 *name_copy;
54
55   if (!h)
56     h = hash_create_string (0, sizeof (uword));
57
58   p = hash_get_mem (h, msg_name);
59   if (p)
60     return p[0];
61   r = elog_string (&vlib_global_main.elog_main, "%s", msg_name);
62
63   name_copy = format (0, "%s%c", msg_name, 0);
64
65   hash_set_mem (h, name_copy, r);
66
67   return r;
68 }
69
70 static inline void
71 barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed)
72 {
73   if (!vlib_worker_threads->barrier_elog_enabled)
74     return;
75
76     /* *INDENT-OFF* */
77     ELOG_TYPE_DECLARE (e) =
78       {
79         .format = "bar-trace-%s-#%d",
80         .format_args = "T4i4",
81       };
82     /* *INDENT-ON* */
83   struct
84   {
85     u32 caller, count, t_entry, t_open, t_closed;
86   } *ed = 0;
87
88   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
89   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
90   ed->caller = elog_global_id_for_msg_name
91     (vlib_worker_threads[0].barrier_caller);
92   ed->t_entry = (int) (1000000.0 * t_entry);
93   ed->t_open = (int) (1000000.0 * t_open);
94   ed->t_closed = (int) (1000000.0 * t_closed);
95 }
96
97 static inline void
98 barrier_trace_sync_rec (f64 t_entry)
99 {
100   if (!vlib_worker_threads->barrier_elog_enabled)
101     return;
102
103     /* *INDENT-OFF* */
104     ELOG_TYPE_DECLARE (e) =
105       {
106         .format = "bar-syncrec-%s-#%d",
107         .format_args = "T4i4",
108       };
109     /* *INDENT-ON* */
110   struct
111   {
112     u32 caller, depth;
113   } *ed = 0;
114
115   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
116   ed->depth = (int) vlib_worker_threads[0].recursion_level - 1;
117   ed->caller = elog_global_id_for_msg_name
118     (vlib_worker_threads[0].barrier_caller);
119 }
120
121 static inline void
122 barrier_trace_release_rec (f64 t_entry)
123 {
124   if (!vlib_worker_threads->barrier_elog_enabled)
125     return;
126
127     /* *INDENT-OFF* */
128     ELOG_TYPE_DECLARE (e) =
129       {
130         .format = "bar-relrrec-#%d",
131         .format_args = "i4",
132       };
133     /* *INDENT-ON* */
134   struct
135   {
136     u32 depth;
137   } *ed = 0;
138
139   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
140   ed->depth = (int) vlib_worker_threads[0].recursion_level;
141 }
142
143 static inline void
144 barrier_trace_release (f64 t_entry, f64 t_closed_total, f64 t_update_main)
145 {
146   if (!vlib_worker_threads->barrier_elog_enabled)
147     return;
148
149     /* *INDENT-OFF* */
150     ELOG_TYPE_DECLARE (e) =
151       {
152         .format = "bar-rel-#%d-e%d-u%d-t%d",
153         .format_args = "i4i4i4i4",
154       };
155     /* *INDENT-ON* */
156   struct
157   {
158     u32 count, t_entry, t_update_main, t_closed_total;
159   } *ed = 0;
160
161   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
162   ed->t_entry = (int) (1000000.0 * t_entry);
163   ed->t_update_main = (int) (1000000.0 * t_update_main);
164   ed->t_closed_total = (int) (1000000.0 * t_closed_total);
165   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
166
167   /* Reset context for next trace */
168   vlib_worker_threads[0].barrier_context = NULL;
169 }
170
171 uword
172 os_get_nthreads (void)
173 {
174   return vec_len (vlib_thread_stacks);
175 }
176
177 void
178 vlib_set_thread_name (char *name)
179 {
180   int pthread_setname_np (pthread_t __target_thread, const char *__name);
181   int rv;
182   pthread_t thread = pthread_self ();
183
184   if (thread)
185     {
186       rv = pthread_setname_np (thread, name);
187       if (rv)
188         clib_warning ("pthread_setname_np returned %d", rv);
189     }
190 }
191
192 static int
193 sort_registrations_by_no_clone (void *a0, void *a1)
194 {
195   vlib_thread_registration_t **tr0 = a0;
196   vlib_thread_registration_t **tr1 = a1;
197
198   return ((i32) ((*tr0)->no_data_structure_clone)
199           - ((i32) ((*tr1)->no_data_structure_clone)));
200 }
201
202 static uword *
203 clib_sysfs_list_to_bitmap (char *filename)
204 {
205   FILE *fp;
206   uword *r = 0;
207
208   fp = fopen (filename, "r");
209
210   if (fp != NULL)
211     {
212       u8 *buffer = 0;
213       vec_validate (buffer, 256 - 1);
214       if (fgets ((char *) buffer, 256, fp))
215         {
216           unformat_input_t in;
217           unformat_init_string (&in, (char *) buffer,
218                                 strlen ((char *) buffer));
219           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
220             clib_warning ("unformat_bitmap_list failed");
221           unformat_free (&in);
222         }
223       vec_free (buffer);
224       fclose (fp);
225     }
226   return r;
227 }
228
229
230 /* Called early in the init sequence */
231
232 clib_error_t *
233 vlib_thread_init (vlib_main_t * vm)
234 {
235   vlib_thread_main_t *tm = &vlib_thread_main;
236   vlib_worker_thread_t *w;
237   vlib_thread_registration_t *tr;
238   u32 n_vlib_mains = 1;
239   u32 first_index = 1;
240   u32 i;
241   uword *avail_cpu;
242
243   /* get bitmaps of active cpu cores and sockets */
244   tm->cpu_core_bitmap =
245     clib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
246   tm->cpu_socket_bitmap =
247     clib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
248
249   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
250
251   /* skip cores */
252   for (i = 0; i < tm->skip_cores; i++)
253     {
254       uword c = clib_bitmap_first_set (avail_cpu);
255       if (c == ~0)
256         return clib_error_return (0, "no available cpus to skip");
257
258       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
259     }
260
261   /* grab cpu for main thread */
262   if (tm->main_lcore == ~0)
263     {
264       /* if main-lcore is not set, we try to use lcore 1 */
265       if (clib_bitmap_get (avail_cpu, 1))
266         tm->main_lcore = 1;
267       else
268         tm->main_lcore = clib_bitmap_first_set (avail_cpu);
269       if (tm->main_lcore == (u8) ~ 0)
270         return clib_error_return (0, "no available cpus to be used for the"
271                                   " main thread");
272     }
273   else
274     {
275       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
276         return clib_error_return (0, "cpu %u is not available to be used"
277                                   " for the main thread", tm->main_lcore);
278     }
279   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
280
281   /* assume that there is socket 0 only if there is no data from sysfs */
282   if (!tm->cpu_socket_bitmap)
283     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
284
285   /* pin main thread to main_lcore  */
286   if (tm->cb.vlib_thread_set_lcore_cb)
287     {
288       tm->cb.vlib_thread_set_lcore_cb (0, tm->main_lcore);
289     }
290   else
291     {
292       cpu_set_t cpuset;
293       CPU_ZERO (&cpuset);
294       CPU_SET (tm->main_lcore, &cpuset);
295       pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
296     }
297
298   /* Set up thread 0 */
299   vec_validate_aligned (vlib_worker_threads, 0, CLIB_CACHE_LINE_BYTES);
300   _vec_len (vlib_worker_threads) = 1;
301   w = vlib_worker_threads;
302   w->thread_mheap = clib_mem_get_heap ();
303   w->thread_stack = vlib_thread_stacks[0];
304   w->cpu_id = tm->main_lcore;
305   w->lwp = syscall (SYS_gettid);
306   w->thread_id = pthread_self ();
307   tm->n_vlib_mains = 1;
308
309   if (tm->sched_policy != ~0)
310     {
311       struct sched_param sched_param;
312       if (!sched_getparam (w->lwp, &sched_param))
313         {
314           if (tm->sched_priority != ~0)
315             sched_param.sched_priority = tm->sched_priority;
316           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
317         }
318     }
319
320   /* assign threads to cores and set n_vlib_mains */
321   tr = tm->next;
322
323   while (tr)
324     {
325       vec_add1 (tm->registrations, tr);
326       tr = tr->next;
327     }
328
329   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
330
331   for (i = 0; i < vec_len (tm->registrations); i++)
332     {
333       int j;
334       tr = tm->registrations[i];
335       tr->first_index = first_index;
336       first_index += tr->count;
337       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
338
339       /* construct coremask */
340       if (tr->use_pthreads || !tr->count)
341         continue;
342
343       if (tr->coremask)
344         {
345           uword c;
346           /* *INDENT-OFF* */
347           clib_bitmap_foreach (c, tr->coremask, ({
348             if (clib_bitmap_get(avail_cpu, c) == 0)
349               return clib_error_return (0, "cpu %u is not available to be used"
350                                         " for the '%s' thread",c, tr->name);
351
352             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
353           }));
354           /* *INDENT-ON* */
355         }
356       else
357         {
358           for (j = 0; j < tr->count; j++)
359             {
360               uword c = clib_bitmap_first_set (avail_cpu);
361               if (c == ~0)
362                 return clib_error_return (0,
363                                           "no available cpus to be used for"
364                                           " the '%s' thread", tr->name);
365
366               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
367               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
368             }
369         }
370     }
371
372   clib_bitmap_free (avail_cpu);
373
374   tm->n_vlib_mains = n_vlib_mains;
375
376   /*
377    * Allocate the remaining worker threads, and thread stack vector slots
378    * from now on, calls to os_get_nthreads() will return the correct
379    * answer.
380    */
381   vec_validate_aligned (vlib_worker_threads, first_index - 1,
382                         CLIB_CACHE_LINE_BYTES);
383   vec_validate (vlib_thread_stacks, vec_len (vlib_worker_threads) - 1);
384   return 0;
385 }
386
387 vlib_frame_queue_t *
388 vlib_frame_queue_alloc (int nelts)
389 {
390   vlib_frame_queue_t *fq;
391
392   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
393   clib_memset (fq, 0, sizeof (*fq));
394   fq->nelts = nelts;
395   fq->vector_threshold = 128;   // packets
396   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
397
398   if (1)
399     {
400       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
401         fformat (stderr, "WARNING: fq->tail unaligned\n");
402       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
403         fformat (stderr, "WARNING: fq->head unaligned\n");
404       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
405         fformat (stderr, "WARNING: fq->elts unaligned\n");
406
407       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
408         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
409                  sizeof (fq->elts[0]));
410       if (nelts & (nelts - 1))
411         {
412           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
413           abort ();
414         }
415     }
416
417   return (fq);
418 }
419
420 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
421 void
422 vl_msg_api_handler_no_free (void *v)
423 {
424 }
425
426 /* Turned off, save as reference material... */
427 #if 0
428 static inline int
429 vlib_frame_queue_dequeue_internal (int thread_id,
430                                    vlib_main_t * vm, vlib_node_main_t * nm)
431 {
432   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
433   vlib_frame_queue_elt_t *elt;
434   vlib_frame_t *f;
435   vlib_pending_frame_t *p;
436   vlib_node_runtime_t *r;
437   u32 node_runtime_index;
438   int msg_type;
439   u64 before;
440   int processed = 0;
441
442   ASSERT (vm == vlib_mains[thread_id]);
443
444   while (1)
445     {
446       if (fq->head == fq->tail)
447         return processed;
448
449       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
450
451       if (!elt->valid)
452         return processed;
453
454       before = clib_cpu_time_now ();
455
456       f = elt->frame;
457       node_runtime_index = elt->node_runtime_index;
458       msg_type = elt->msg_type;
459
460       switch (msg_type)
461         {
462         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
463           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
464           /* note fallthrough... */
465         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
466           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
467                                 node_runtime_index);
468           vlib_frame_free (vm, r, f);
469           break;
470         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
471           vec_add2 (vm->node_main.pending_frames, p, 1);
472           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
473           p->node_runtime_index = elt->node_runtime_index;
474           p->frame_index = vlib_frame_index (vm, f);
475           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
476           fq->dequeue_vectors += (u64) f->n_vectors;
477           break;
478         case VLIB_FRAME_QUEUE_ELT_API_MSG:
479           vl_msg_api_handler_no_free (f);
480           break;
481         default:
482           clib_warning ("bogus frame queue message, type %d", msg_type);
483           break;
484         }
485       elt->valid = 0;
486       fq->dequeues++;
487       fq->dequeue_ticks += clib_cpu_time_now () - before;
488       CLIB_MEMORY_BARRIER ();
489       fq->head++;
490       processed++;
491     }
492   ASSERT (0);
493   return processed;
494 }
495
496 int
497 vlib_frame_queue_dequeue (int thread_id,
498                           vlib_main_t * vm, vlib_node_main_t * nm)
499 {
500   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
501 }
502
503 int
504 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
505                           u32 frame_queue_index, vlib_frame_t * frame,
506                           vlib_frame_queue_msg_type_t type)
507 {
508   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
509   vlib_frame_queue_elt_t *elt;
510   u32 save_count;
511   u64 new_tail;
512   u64 before = clib_cpu_time_now ();
513
514   ASSERT (fq);
515
516   new_tail = clib_atomic_add_fetch (&fq->tail, 1);
517
518   /* Wait until a ring slot is available */
519   while (new_tail >= fq->head + fq->nelts)
520     {
521       f64 b4 = vlib_time_now_ticks (vm, before);
522       vlib_worker_thread_barrier_check (vm, b4);
523       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
524       // vlib_frame_queue_dequeue (vm->thread_index, vm, nm);
525     }
526
527   elt = fq->elts + (new_tail & (fq->nelts - 1));
528
529   /* this would be very bad... */
530   while (elt->valid)
531     {
532     }
533
534   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
535   save_count = frame->n_vectors;
536
537   elt->frame = frame;
538   elt->node_runtime_index = node_runtime_index;
539   elt->msg_type = type;
540   CLIB_MEMORY_BARRIER ();
541   elt->valid = 1;
542
543   return save_count;
544 }
545 #endif /* 0 */
546
547 /* To be called by vlib worker threads upon startup */
548 void
549 vlib_worker_thread_init (vlib_worker_thread_t * w)
550 {
551   vlib_thread_main_t *tm = vlib_get_thread_main ();
552
553   /*
554    * Note: disabling signals in worker threads as follows
555    * prevents the api post-mortem dump scheme from working
556    * {
557    *    sigset_t s;
558    *    sigfillset (&s);
559    *    pthread_sigmask (SIG_SETMASK, &s, 0);
560    *  }
561    */
562
563   clib_mem_set_heap (w->thread_mheap);
564
565   if (vec_len (tm->thread_prefix) && w->registration->short_name)
566     {
567       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
568                         w->registration->short_name, w->instance_id, '\0');
569       vlib_set_thread_name ((char *) w->name);
570     }
571
572   if (!w->registration->use_pthreads)
573     {
574
575       /* Initial barrier sync, for both worker and i/o threads */
576       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
577
578       while (*vlib_worker_threads->wait_at_barrier)
579         ;
580
581       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
582     }
583 }
584
585 void *
586 vlib_worker_thread_bootstrap_fn (void *arg)
587 {
588   void *rv;
589   vlib_worker_thread_t *w = arg;
590
591   w->lwp = syscall (SYS_gettid);
592   w->thread_id = pthread_self ();
593
594   __os_thread_index = w - vlib_worker_threads;
595
596   rv = (void *) clib_calljmp
597     ((uword (*)(uword)) w->thread_function,
598      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
599   /* NOTREACHED, we hope */
600   return rv;
601 }
602
603 static void
604 vlib_get_thread_core_socket (vlib_worker_thread_t * w, unsigned cpu_id)
605 {
606   const char *sys_cpu_path = "/sys/devices/system/cpu/cpu";
607   u8 *p = 0;
608   int core_id = -1, socket_id = -1;
609
610   p = format (p, "%s%u/topology/core_id%c", sys_cpu_path, cpu_id, 0);
611   clib_sysfs_read ((char *) p, "%d", &core_id);
612   vec_reset_length (p);
613   p =
614     format (p, "%s%u/topology/physical_package_id%c", sys_cpu_path, cpu_id,
615             0);
616   clib_sysfs_read ((char *) p, "%d", &socket_id);
617   vec_free (p);
618
619   w->core_id = core_id;
620   w->socket_id = socket_id;
621 }
622
623 static clib_error_t *
624 vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned cpu_id)
625 {
626   vlib_thread_main_t *tm = &vlib_thread_main;
627   void *(*fp_arg) (void *) = fp;
628
629   w->cpu_id = cpu_id;
630   vlib_get_thread_core_socket (w, cpu_id);
631   if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads)
632     return tm->cb.vlib_launch_thread_cb (fp, (void *) w, cpu_id);
633   else
634     {
635       pthread_t worker;
636       cpu_set_t cpuset;
637       CPU_ZERO (&cpuset);
638       CPU_SET (cpu_id, &cpuset);
639
640       if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w))
641         return clib_error_return_unix (0, "pthread_create");
642
643       if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
644         return clib_error_return_unix (0, "pthread_setaffinity_np");
645
646       return 0;
647     }
648 }
649
650 static clib_error_t *
651 start_workers (vlib_main_t * vm)
652 {
653   int i, j;
654   vlib_worker_thread_t *w;
655   vlib_main_t *vm_clone;
656   void *oldheap;
657   vlib_thread_main_t *tm = &vlib_thread_main;
658   vlib_thread_registration_t *tr;
659   vlib_node_runtime_t *rt;
660   u32 n_vlib_mains = tm->n_vlib_mains;
661   u32 worker_thread_index;
662   u8 *main_heap = clib_mem_get_per_cpu_heap ();
663
664   vec_reset_length (vlib_worker_threads);
665
666   /* Set up the main thread */
667   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
668   w->elog_track.name = "main thread";
669   elog_track_register (&vm->elog_main, &w->elog_track);
670
671   if (vec_len (tm->thread_prefix))
672     {
673       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
674       vlib_set_thread_name ((char *) w->name);
675     }
676
677   vm->elog_main.lock =
678     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
679   vm->elog_main.lock[0] = 0;
680
681   if (n_vlib_mains > 1)
682     {
683       /* Replace hand-crafted length-1 vector with a real vector */
684       vlib_mains = 0;
685
686       vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
687                             CLIB_CACHE_LINE_BYTES);
688       _vec_len (vlib_mains) = 0;
689       vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
690
691       vlib_worker_threads->wait_at_barrier =
692         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
693       vlib_worker_threads->workers_at_barrier =
694         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
695
696       vlib_worker_threads->node_reforks_required =
697         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
698
699       /* We'll need the rpc vector lock... */
700       clib_spinlock_init (&vm->pending_rpc_lock);
701
702       /* Ask for an initial barrier sync */
703       *vlib_worker_threads->workers_at_barrier = 0;
704       *vlib_worker_threads->wait_at_barrier = 1;
705
706       /* Without update or refork */
707       *vlib_worker_threads->node_reforks_required = 0;
708       vm->need_vlib_worker_thread_node_runtime_update = 0;
709
710       /* init timing */
711       vm->barrier_epoch = 0;
712       vm->barrier_no_close_before = 0;
713
714       worker_thread_index = 1;
715
716       for (i = 0; i < vec_len (tm->registrations); i++)
717         {
718           vlib_node_main_t *nm, *nm_clone;
719           int k;
720
721           tr = tm->registrations[i];
722
723           if (tr->count == 0)
724             continue;
725
726           for (k = 0; k < tr->count; k++)
727             {
728               vlib_node_t *n;
729
730               vec_add2 (vlib_worker_threads, w, 1);
731               /* Currently unused, may not really work */
732               if (tr->mheap_size)
733                 {
734 #if USE_DLMALLOC == 0
735                   w->thread_mheap =
736                     mheap_alloc (0 /* use VM */ , tr->mheap_size);
737 #else
738                   w->thread_mheap = create_mspace (tr->mheap_size,
739                                                    0 /* unlocked */ );
740 #endif
741                 }
742               else
743                 w->thread_mheap = main_heap;
744
745               w->thread_stack =
746                 vlib_thread_stack_init (w - vlib_worker_threads);
747               w->thread_function = tr->function;
748               w->thread_function_arg = w;
749               w->instance_id = k;
750               w->registration = tr;
751
752               w->elog_track.name =
753                 (char *) format (0, "%s %d", tr->name, k + 1);
754               vec_add1 (w->elog_track.name, 0);
755               elog_track_register (&vm->elog_main, &w->elog_track);
756
757               if (tr->no_data_structure_clone)
758                 continue;
759
760               /* Fork vlib_global_main et al. Look for bugs here */
761               oldheap = clib_mem_set_heap (w->thread_mheap);
762
763               vm_clone = clib_mem_alloc_aligned (sizeof (*vm_clone),
764                                                  CLIB_CACHE_LINE_BYTES);
765               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
766
767               vm_clone->thread_index = worker_thread_index;
768               vm_clone->heap_base = w->thread_mheap;
769               vm_clone->heap_aligned_base = (void *)
770                 (((uword) w->thread_mheap) & ~(VLIB_FRAME_ALIGN - 1));
771               vm_clone->init_functions_called =
772                 hash_create (0, /* value bytes */ 0);
773               vm_clone->pending_rpc_requests = 0;
774               vec_validate (vm_clone->pending_rpc_requests, 0);
775               _vec_len (vm_clone->pending_rpc_requests) = 0;
776               clib_memset (&vm_clone->random_buffer, 0,
777                            sizeof (vm_clone->random_buffer));
778
779               nm = &vlib_mains[0]->node_main;
780               nm_clone = &vm_clone->node_main;
781               /* fork next frames array, preserving node runtime indices */
782               nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
783                                                        CLIB_CACHE_LINE_BYTES);
784               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
785                 {
786                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
787                   u32 save_node_runtime_index;
788                   u32 save_flags;
789
790                   save_node_runtime_index = nf->node_runtime_index;
791                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
792                   vlib_next_frame_init (nf);
793                   nf->node_runtime_index = save_node_runtime_index;
794                   nf->flags = save_flags;
795                 }
796
797               /* fork the frame dispatch queue */
798               nm_clone->pending_frames = 0;
799               vec_validate (nm_clone->pending_frames, 10);
800               _vec_len (nm_clone->pending_frames) = 0;
801
802               /* fork nodes */
803               nm_clone->nodes = 0;
804
805               /* Allocate all nodes in single block for speed */
806               n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
807
808               for (j = 0; j < vec_len (nm->nodes); j++)
809                 {
810                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
811                   /* none of the copied nodes have enqueue rights given out */
812                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
813                   clib_memset (&n->stats_total, 0, sizeof (n->stats_total));
814                   clib_memset (&n->stats_last_clear, 0,
815                                sizeof (n->stats_last_clear));
816                   vec_add1 (nm_clone->nodes, n);
817                   n++;
818                 }
819               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
820                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
821                                  CLIB_CACHE_LINE_BYTES);
822               vec_foreach (rt,
823                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
824               {
825                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
826                 rt->thread_index = vm_clone->thread_index;
827                 /* copy initial runtime_data from node */
828                 if (n->runtime_data && n->runtime_data_bytes > 0)
829                   clib_memcpy (rt->runtime_data, n->runtime_data,
830                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
831                                          n->runtime_data_bytes));
832               }
833
834               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
835                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
836                                  CLIB_CACHE_LINE_BYTES);
837               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
838               {
839                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
840                 rt->thread_index = vm_clone->thread_index;
841                 /* copy initial runtime_data from node */
842                 if (n->runtime_data && n->runtime_data_bytes > 0)
843                   clib_memcpy (rt->runtime_data, n->runtime_data,
844                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
845                                          n->runtime_data_bytes));
846               }
847
848               nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
849                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
850                                  CLIB_CACHE_LINE_BYTES);
851               vec_foreach (rt,
852                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
853               {
854                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
855                 rt->thread_index = vm_clone->thread_index;
856                 /* copy initial runtime_data from node */
857                 if (n->runtime_data && n->runtime_data_bytes > 0)
858                   clib_memcpy (rt->runtime_data, n->runtime_data,
859                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
860                                          n->runtime_data_bytes));
861               }
862
863               nm_clone->processes = vec_dup_aligned (nm->processes,
864                                                      CLIB_CACHE_LINE_BYTES);
865
866               /* Create per-thread frame freelist */
867               nm_clone->frame_sizes = vec_new (vlib_frame_size_t, 1);
868 #ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES
869               nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
870 #endif
871
872               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
873
874               clib_mem_set_heap (oldheap);
875               vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
876
877               /* Switch to the stats segment ... */
878               void *oldheap = vlib_stats_push_heap (0);
879               vm_clone->error_main.counters = vec_dup_aligned
880                 (vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES);
881               vlib_stats_pop_heap2 (vm_clone->error_main.counters,
882                                     worker_thread_index, oldheap, 1);
883
884               vm_clone->error_main.counters_last_clear = vec_dup_aligned
885                 (vlib_mains[0]->error_main.counters_last_clear,
886                  CLIB_CACHE_LINE_BYTES);
887
888               worker_thread_index++;
889             }
890         }
891     }
892   else
893     {
894       /* only have non-data-structure copy threads to create... */
895       for (i = 0; i < vec_len (tm->registrations); i++)
896         {
897           tr = tm->registrations[i];
898
899           for (j = 0; j < tr->count; j++)
900             {
901               vec_add2 (vlib_worker_threads, w, 1);
902               if (tr->mheap_size)
903                 {
904 #if USE_DLMALLOC == 0
905                   w->thread_mheap =
906                     mheap_alloc (0 /* use VM */ , tr->mheap_size);
907 #else
908                   w->thread_mheap =
909                     create_mspace (tr->mheap_size, 0 /* locked */ );
910 #endif
911                 }
912               else
913                 w->thread_mheap = main_heap;
914               w->thread_stack =
915                 vlib_thread_stack_init (w - vlib_worker_threads);
916               w->thread_function = tr->function;
917               w->thread_function_arg = w;
918               w->instance_id = j;
919               w->elog_track.name =
920                 (char *) format (0, "%s %d", tr->name, j + 1);
921               w->registration = tr;
922               vec_add1 (w->elog_track.name, 0);
923               elog_track_register (&vm->elog_main, &w->elog_track);
924             }
925         }
926     }
927
928   worker_thread_index = 1;
929
930   for (i = 0; i < vec_len (tm->registrations); i++)
931     {
932       clib_error_t *err;
933       int j;
934
935       tr = tm->registrations[i];
936
937       if (tr->use_pthreads || tm->use_pthreads)
938         {
939           for (j = 0; j < tr->count; j++)
940             {
941               w = vlib_worker_threads + worker_thread_index++;
942               err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
943                                             w, 0);
944               if (err)
945                 clib_error_report (err);
946             }
947         }
948       else
949         {
950           uword c;
951           /* *INDENT-OFF* */
952           clib_bitmap_foreach (c, tr->coremask, ({
953             w = vlib_worker_threads + worker_thread_index++;
954             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
955                                           w, c);
956             if (err)
957               clib_error_report (err);
958           }));
959           /* *INDENT-ON* */
960         }
961     }
962   vlib_worker_thread_barrier_sync (vm);
963   vlib_worker_thread_barrier_release (vm);
964   return 0;
965 }
966
967 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
968
969
970 static inline void
971 worker_thread_node_runtime_update_internal (void)
972 {
973   int i, j;
974   vlib_main_t *vm;
975   vlib_node_main_t *nm, *nm_clone;
976   vlib_main_t *vm_clone;
977   vlib_node_runtime_t *rt;
978   never_inline void
979     vlib_node_runtime_sync_stats (vlib_main_t * vm,
980                                   vlib_node_runtime_t * r,
981                                   uword n_calls,
982                                   uword n_vectors, uword n_clocks);
983
984   ASSERT (vlib_get_thread_index () == 0);
985
986   vm = vlib_mains[0];
987   nm = &vm->node_main;
988
989   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
990
991   /*
992    * Scrape all runtime stats, so we don't lose node runtime(s) with
993    * pending counts, or throw away worker / io thread counts.
994    */
995   for (j = 0; j < vec_len (nm->nodes); j++)
996     {
997       vlib_node_t *n;
998       n = nm->nodes[j];
999       vlib_node_sync_stats (vm, n);
1000     }
1001
1002   for (i = 1; i < vec_len (vlib_mains); i++)
1003     {
1004       vlib_node_t *n;
1005
1006       vm_clone = vlib_mains[i];
1007       nm_clone = &vm_clone->node_main;
1008
1009       for (j = 0; j < vec_len (nm_clone->nodes); j++)
1010         {
1011           n = nm_clone->nodes[j];
1012
1013           rt = vlib_node_get_runtime (vm_clone, n->index);
1014           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
1015         }
1016     }
1017
1018   /* Per-worker clone rebuilds are now done on each thread */
1019 }
1020
1021
1022 void
1023 vlib_worker_thread_node_refork (void)
1024 {
1025   vlib_main_t *vm, *vm_clone;
1026   vlib_node_main_t *nm, *nm_clone;
1027   vlib_node_t **old_nodes_clone;
1028   vlib_node_runtime_t *rt, *old_rt;
1029
1030   vlib_node_t *new_n_clone;
1031
1032   int j;
1033
1034   vm = vlib_mains[0];
1035   nm = &vm->node_main;
1036   vm_clone = vlib_get_main ();
1037   nm_clone = &vm_clone->node_main;
1038
1039   /* Re-clone error heap */
1040   u64 *old_counters = vm_clone->error_main.counters;
1041   u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
1042
1043   clib_memcpy_fast (&vm_clone->error_main, &vm->error_main,
1044                     sizeof (vm->error_main));
1045   j = vec_len (vm->error_main.counters) - 1;
1046
1047   /* Switch to the stats segment ... */
1048   void *oldheap = vlib_stats_push_heap (0);
1049   vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
1050   vm_clone->error_main.counters = old_counters;
1051   vlib_stats_pop_heap2 (vm_clone->error_main.counters, vm_clone->thread_index,
1052                         oldheap, 0);
1053
1054   vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
1055   vm_clone->error_main.counters_last_clear = old_counters_all_clear;
1056
1057   nm_clone = &vm_clone->node_main;
1058   vec_free (nm_clone->next_frames);
1059   nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
1060                                            CLIB_CACHE_LINE_BYTES);
1061
1062   for (j = 0; j < vec_len (nm_clone->next_frames); j++)
1063     {
1064       vlib_next_frame_t *nf = &nm_clone->next_frames[j];
1065       u32 save_node_runtime_index;
1066       u32 save_flags;
1067
1068       save_node_runtime_index = nf->node_runtime_index;
1069       save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
1070       vlib_next_frame_init (nf);
1071       nf->node_runtime_index = save_node_runtime_index;
1072       nf->flags = save_flags;
1073     }
1074
1075   old_nodes_clone = nm_clone->nodes;
1076   nm_clone->nodes = 0;
1077
1078   /* re-fork nodes */
1079
1080   /* Allocate all nodes in single block for speed */
1081   new_n_clone =
1082     clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone));
1083   for (j = 0; j < vec_len (nm->nodes); j++)
1084     {
1085       vlib_node_t *old_n_clone;
1086       vlib_node_t *new_n;
1087
1088       new_n = nm->nodes[j];
1089       old_n_clone = old_nodes_clone[j];
1090
1091       clib_memcpy_fast (new_n_clone, new_n, sizeof (*new_n));
1092       /* none of the copied nodes have enqueue rights given out */
1093       new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
1094
1095       if (j >= vec_len (old_nodes_clone))
1096         {
1097           /* new node, set to zero */
1098           clib_memset (&new_n_clone->stats_total, 0,
1099                        sizeof (new_n_clone->stats_total));
1100           clib_memset (&new_n_clone->stats_last_clear, 0,
1101                        sizeof (new_n_clone->stats_last_clear));
1102         }
1103       else
1104         {
1105           /* Copy stats if the old data is valid */
1106           clib_memcpy_fast (&new_n_clone->stats_total,
1107                             &old_n_clone->stats_total,
1108                             sizeof (new_n_clone->stats_total));
1109           clib_memcpy_fast (&new_n_clone->stats_last_clear,
1110                             &old_n_clone->stats_last_clear,
1111                             sizeof (new_n_clone->stats_last_clear));
1112
1113           /* keep previous node state */
1114           new_n_clone->state = old_n_clone->state;
1115         }
1116       vec_add1 (nm_clone->nodes, new_n_clone);
1117       new_n_clone++;
1118     }
1119   /* Free the old node clones */
1120   clib_mem_free (old_nodes_clone[0]);
1121
1122   vec_free (old_nodes_clone);
1123
1124
1125   /* re-clone internal nodes */
1126   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
1127   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
1128     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
1129                      CLIB_CACHE_LINE_BYTES);
1130
1131   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
1132   {
1133     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1134     rt->thread_index = vm_clone->thread_index;
1135     /* copy runtime_data, will be overwritten later for existing rt */
1136     if (n->runtime_data && n->runtime_data_bytes > 0)
1137       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1138                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1139                                   n->runtime_data_bytes));
1140   }
1141
1142   for (j = 0; j < vec_len (old_rt); j++)
1143     {
1144       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1145       rt->state = old_rt[j].state;
1146       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1147                         VLIB_NODE_RUNTIME_DATA_SIZE);
1148     }
1149
1150   vec_free (old_rt);
1151
1152   /* re-clone input nodes */
1153   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
1154   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
1155     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1156                      CLIB_CACHE_LINE_BYTES);
1157
1158   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1159   {
1160     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1161     rt->thread_index = vm_clone->thread_index;
1162     /* copy runtime_data, will be overwritten later for existing rt */
1163     if (n->runtime_data && n->runtime_data_bytes > 0)
1164       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1165                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1166                                   n->runtime_data_bytes));
1167   }
1168
1169   for (j = 0; j < vec_len (old_rt); j++)
1170     {
1171       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1172       rt->state = old_rt[j].state;
1173       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1174                         VLIB_NODE_RUNTIME_DATA_SIZE);
1175     }
1176
1177   vec_free (old_rt);
1178
1179   /* re-clone pre-input nodes */
1180   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT];
1181   nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
1182     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
1183                      CLIB_CACHE_LINE_BYTES);
1184
1185   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
1186   {
1187     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1188     rt->thread_index = vm_clone->thread_index;
1189     /* copy runtime_data, will be overwritten later for existing rt */
1190     if (n->runtime_data && n->runtime_data_bytes > 0)
1191       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1192                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1193                                   n->runtime_data_bytes));
1194   }
1195
1196   for (j = 0; j < vec_len (old_rt); j++)
1197     {
1198       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1199       rt->state = old_rt[j].state;
1200       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1201                         VLIB_NODE_RUNTIME_DATA_SIZE);
1202     }
1203
1204   vec_free (old_rt);
1205
1206   nm_clone->processes = vec_dup_aligned (nm->processes,
1207                                          CLIB_CACHE_LINE_BYTES);
1208 }
1209
1210 void
1211 vlib_worker_thread_node_runtime_update (void)
1212 {
1213   /*
1214    * Make a note that we need to do a node runtime update
1215    * prior to releasing the barrier.
1216    */
1217   vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1;
1218 }
1219
1220 u32
1221 unformat_sched_policy (unformat_input_t * input, va_list * args)
1222 {
1223   u32 *r = va_arg (*args, u32 *);
1224
1225   if (0);
1226 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1227   foreach_sched_policy
1228 #undef _
1229     else
1230     return 0;
1231   return 1;
1232 }
1233
1234 static clib_error_t *
1235 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1236 {
1237   vlib_thread_registration_t *tr;
1238   uword *p;
1239   vlib_thread_main_t *tm = &vlib_thread_main;
1240   u8 *name;
1241   uword *bitmap;
1242   u32 count;
1243
1244   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1245
1246   tm->n_thread_stacks = 1;      /* account for main thread */
1247   tm->sched_policy = ~0;
1248   tm->sched_priority = ~0;
1249   tm->main_lcore = ~0;
1250
1251   tr = tm->next;
1252
1253   while (tr)
1254     {
1255       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1256       tr = tr->next;
1257     }
1258
1259   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1260     {
1261       if (unformat (input, "use-pthreads"))
1262         tm->use_pthreads = 1;
1263       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1264         ;
1265       else if (unformat (input, "main-core %u", &tm->main_lcore))
1266         ;
1267       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1268         ;
1269       else if (unformat (input, "coremask-%s %U", &name,
1270                          unformat_bitmap_mask, &bitmap) ||
1271                unformat (input, "corelist-%s %U", &name,
1272                          unformat_bitmap_list, &bitmap))
1273         {
1274           p = hash_get_mem (tm->thread_registrations_by_name, name);
1275           if (p == 0)
1276             return clib_error_return (0, "no such thread type '%s'", name);
1277
1278           tr = (vlib_thread_registration_t *) p[0];
1279
1280           if (tr->use_pthreads)
1281             return clib_error_return (0,
1282                                       "corelist cannot be set for '%s' threads",
1283                                       name);
1284
1285           tr->coremask = bitmap;
1286           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1287         }
1288       else
1289         if (unformat
1290             (input, "scheduler-policy %U", unformat_sched_policy,
1291              &tm->sched_policy))
1292         ;
1293       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1294         ;
1295       else if (unformat (input, "%s %u", &name, &count))
1296         {
1297           p = hash_get_mem (tm->thread_registrations_by_name, name);
1298           if (p == 0)
1299             return clib_error_return (0, "no such thread type 3 '%s'", name);
1300
1301           tr = (vlib_thread_registration_t *) p[0];
1302           if (tr->fixed_count)
1303             return clib_error_return
1304               (0, "number of %s threads not configurable", tr->name);
1305           tr->count = count;
1306         }
1307       else
1308         break;
1309     }
1310
1311   if (tm->sched_priority != ~0)
1312     {
1313       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1314         {
1315           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1316           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1317           if (tm->sched_priority > prio_max)
1318             tm->sched_priority = prio_max;
1319           if (tm->sched_priority < prio_min)
1320             tm->sched_priority = prio_min;
1321         }
1322       else
1323         {
1324           return clib_error_return
1325             (0,
1326              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1327              tm->sched_priority);
1328         }
1329     }
1330   tr = tm->next;
1331
1332   if (!tm->thread_prefix)
1333     tm->thread_prefix = format (0, "vpp");
1334
1335   while (tr)
1336     {
1337       tm->n_thread_stacks += tr->count;
1338       tm->n_pthreads += tr->count * tr->use_pthreads;
1339       tm->n_threads += tr->count * (tr->use_pthreads == 0);
1340       tr = tr->next;
1341     }
1342
1343   return 0;
1344 }
1345
1346 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1347
1348 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1349 void
1350 vnet_main_fixup (vlib_fork_fixup_t which)
1351 {
1352 }
1353
1354 void
1355 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1356 {
1357   vlib_main_t *vm = vlib_get_main ();
1358
1359   if (vlib_mains == 0)
1360     return;
1361
1362   ASSERT (vlib_get_thread_index () == 0);
1363   vlib_worker_thread_barrier_sync (vm);
1364
1365   switch (which)
1366     {
1367     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1368       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1369       break;
1370
1371     default:
1372       ASSERT (0);
1373     }
1374   vlib_worker_thread_barrier_release (vm);
1375 }
1376
1377   /*
1378    * Enforce minimum open time to minimize packet loss due to Rx overflow,
1379    * based on a test based heuristic that barrier should be open for at least
1380    * 3 time as long as it is closed (with an upper bound of 1ms because by that
1381    *  point it is probably too late to make a difference)
1382    */
1383
1384 #ifndef BARRIER_MINIMUM_OPEN_LIMIT
1385 #define BARRIER_MINIMUM_OPEN_LIMIT 0.001
1386 #endif
1387
1388 #ifndef BARRIER_MINIMUM_OPEN_FACTOR
1389 #define BARRIER_MINIMUM_OPEN_FACTOR 3
1390 #endif
1391
1392 void
1393 vlib_worker_thread_initial_barrier_sync_and_release (vlib_main_t * vm)
1394 {
1395   f64 deadline;
1396   f64 now = vlib_time_now (vm);
1397   u32 count = vec_len (vlib_mains) - 1;
1398
1399   /* No worker threads? */
1400   if (count == 0)
1401     return;
1402
1403   deadline = now + BARRIER_SYNC_TIMEOUT;
1404   *vlib_worker_threads->wait_at_barrier = 1;
1405   while (*vlib_worker_threads->workers_at_barrier != count)
1406     {
1407       if ((now = vlib_time_now (vm)) > deadline)
1408         {
1409           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1410           os_panic ();
1411         }
1412       CLIB_PAUSE ();
1413     }
1414   *vlib_worker_threads->wait_at_barrier = 0;
1415 }
1416
1417 void
1418 vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
1419 {
1420   f64 deadline;
1421   f64 now;
1422   f64 t_entry;
1423   f64 t_open;
1424   f64 t_closed;
1425   f64 max_vector_rate;
1426   u32 count;
1427   int i;
1428
1429   if (vec_len (vlib_mains) < 2)
1430     return;
1431
1432   ASSERT (vlib_get_thread_index () == 0);
1433
1434   vlib_worker_threads[0].barrier_caller = func_name;
1435   count = vec_len (vlib_mains) - 1;
1436
1437   /* Record entry relative to last close */
1438   now = vlib_time_now (vm);
1439   t_entry = now - vm->barrier_epoch;
1440
1441   /* Tolerate recursive calls */
1442   if (++vlib_worker_threads[0].recursion_level > 1)
1443     {
1444       barrier_trace_sync_rec (t_entry);
1445       return;
1446     }
1447
1448   /*
1449    * Need data to decide if we're working hard enough to honor
1450    * the barrier hold-down timer.
1451    */
1452   max_vector_rate = 0.0;
1453   for (i = 1; i < vec_len (vlib_mains); i++)
1454     max_vector_rate =
1455       clib_max (max_vector_rate,
1456                 vlib_last_vectors_per_main_loop_as_f64 (vlib_mains[i]));
1457
1458   vlib_worker_threads[0].barrier_sync_count++;
1459
1460   /* Enforce minimum barrier open time to minimize packet loss */
1461   ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));
1462
1463   /*
1464    * If any worker thread seems busy, which we define
1465    * as a vector rate above 10, we enforce the barrier hold-down timer
1466    */
1467   if (max_vector_rate > 10.0)
1468     {
1469       while (1)
1470         {
1471           now = vlib_time_now (vm);
1472           /* Barrier hold-down timer expired? */
1473           if (now >= vm->barrier_no_close_before)
1474             break;
1475           if ((vm->barrier_no_close_before - now)
1476               > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
1477             {
1478               clib_warning
1479                 ("clock change: would have waited for %.4f seconds",
1480                  (vm->barrier_no_close_before - now));
1481               break;
1482             }
1483         }
1484     }
1485   /* Record time of closure */
1486   t_open = now - vm->barrier_epoch;
1487   vm->barrier_epoch = now;
1488
1489   deadline = now + BARRIER_SYNC_TIMEOUT;
1490
1491   *vlib_worker_threads->wait_at_barrier = 1;
1492   while (*vlib_worker_threads->workers_at_barrier != count)
1493     {
1494       if ((now = vlib_time_now (vm)) > deadline)
1495         {
1496           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1497           os_panic ();
1498         }
1499     }
1500
1501   t_closed = now - vm->barrier_epoch;
1502
1503   barrier_trace_sync (t_entry, t_open, t_closed);
1504
1505 }
1506
1507 void
1508 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1509 {
1510   f64 deadline;
1511   f64 now;
1512   f64 minimum_open;
1513   f64 t_entry;
1514   f64 t_closed_total;
1515   f64 t_update_main = 0.0;
1516   int refork_needed = 0;
1517
1518   if (vec_len (vlib_mains) < 2)
1519     return;
1520
1521   ASSERT (vlib_get_thread_index () == 0);
1522
1523
1524   now = vlib_time_now (vm);
1525   t_entry = now - vm->barrier_epoch;
1526
1527   if (--vlib_worker_threads[0].recursion_level > 0)
1528     {
1529       barrier_trace_release_rec (t_entry);
1530       return;
1531     }
1532
1533   /* Update (all) node runtimes before releasing the barrier, if needed */
1534   if (vm->need_vlib_worker_thread_node_runtime_update)
1535     {
1536       /*
1537        * Lock stat segment here, so we's safe when
1538        * rebuilding the stat segment node clones from the
1539        * stat thread...
1540        */
1541       vlib_stat_segment_lock ();
1542
1543       /* Do stats elements on main thread */
1544       worker_thread_node_runtime_update_internal ();
1545       vm->need_vlib_worker_thread_node_runtime_update = 0;
1546
1547       /* Do per thread rebuilds in parallel */
1548       refork_needed = 1;
1549       clib_atomic_fetch_add (vlib_worker_threads->node_reforks_required,
1550                              (vec_len (vlib_mains) - 1));
1551       now = vlib_time_now (vm);
1552       t_update_main = now - vm->barrier_epoch;
1553     }
1554
1555   deadline = now + BARRIER_SYNC_TIMEOUT;
1556
1557   /*
1558    * Note when we let go of the barrier.
1559    * Workers can use this to derive a reasonably accurate
1560    * time offset. See vlib_time_now(...)
1561    */
1562   vm->time_last_barrier_release = vlib_time_now (vm);
1563   CLIB_MEMORY_STORE_BARRIER ();
1564
1565   *vlib_worker_threads->wait_at_barrier = 0;
1566
1567   while (*vlib_worker_threads->workers_at_barrier > 0)
1568     {
1569       if ((now = vlib_time_now (vm)) > deadline)
1570         {
1571           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1572           os_panic ();
1573         }
1574     }
1575
1576   /* Wait for reforks before continuing */
1577   if (refork_needed)
1578     {
1579       now = vlib_time_now (vm);
1580
1581       deadline = now + BARRIER_SYNC_TIMEOUT;
1582
1583       while (*vlib_worker_threads->node_reforks_required > 0)
1584         {
1585           if ((now = vlib_time_now (vm)) > deadline)
1586             {
1587               fformat (stderr, "%s: worker thread refork deadlock\n",
1588                        __FUNCTION__);
1589               os_panic ();
1590             }
1591         }
1592       vlib_stat_segment_unlock ();
1593     }
1594
1595   t_closed_total = now - vm->barrier_epoch;
1596
1597   minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;
1598
1599   if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
1600     {
1601       minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
1602     }
1603
1604   vm->barrier_no_close_before = now + minimum_open;
1605
1606   /* Record barrier epoch (used to enforce minimum open time) */
1607   vm->barrier_epoch = now;
1608
1609   barrier_trace_release (t_entry, t_closed_total, t_update_main);
1610
1611 }
1612
1613 /*
1614  * Check the frame queue to see if any frames are available.
1615  * If so, pull the packets off the frames and put them to
1616  * the handoff node.
1617  */
1618 int
1619 vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm)
1620 {
1621   u32 thread_id = vm->thread_index;
1622   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
1623   vlib_frame_queue_elt_t *elt;
1624   u32 *from, *to;
1625   vlib_frame_t *f;
1626   int msg_type;
1627   int processed = 0;
1628   u32 n_left_to_node;
1629   u32 vectors = 0;
1630
1631   ASSERT (fq);
1632   ASSERT (vm == vlib_mains[thread_id]);
1633
1634   if (PREDICT_FALSE (fqm->node_index == ~0))
1635     return 0;
1636   /*
1637    * Gather trace data for frame queues
1638    */
1639   if (PREDICT_FALSE (fq->trace))
1640     {
1641       frame_queue_trace_t *fqt;
1642       frame_queue_nelt_counter_t *fqh;
1643       u32 elix;
1644
1645       fqt = &fqm->frame_queue_traces[thread_id];
1646
1647       fqt->nelts = fq->nelts;
1648       fqt->head = fq->head;
1649       fqt->head_hint = fq->head_hint;
1650       fqt->tail = fq->tail;
1651       fqt->threshold = fq->vector_threshold;
1652       fqt->n_in_use = fqt->tail - fqt->head;
1653       if (fqt->n_in_use >= fqt->nelts)
1654         {
1655           // if beyond max then use max
1656           fqt->n_in_use = fqt->nelts - 1;
1657         }
1658
1659       /* Record the number of elements in use in the histogram */
1660       fqh = &fqm->frame_queue_histogram[thread_id];
1661       fqh->count[fqt->n_in_use]++;
1662
1663       /* Record a snapshot of the elements in use */
1664       for (elix = 0; elix < fqt->nelts; elix++)
1665         {
1666           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1667           if (1 || elt->valid)
1668             {
1669               fqt->n_vectors[elix] = elt->n_vectors;
1670             }
1671         }
1672       fqt->written = 1;
1673     }
1674
1675   while (1)
1676     {
1677       if (fq->head == fq->tail)
1678         {
1679           fq->head_hint = fq->head;
1680           return processed;
1681         }
1682
1683       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1684
1685       if (!elt->valid)
1686         {
1687           fq->head_hint = fq->head;
1688           return processed;
1689         }
1690
1691       from = elt->buffer_index;
1692       msg_type = elt->msg_type;
1693
1694       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1695       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1696
1697       f = vlib_get_frame_to_node (vm, fqm->node_index);
1698
1699       to = vlib_frame_vector_args (f);
1700
1701       n_left_to_node = elt->n_vectors;
1702
1703       while (n_left_to_node >= 4)
1704         {
1705           to[0] = from[0];
1706           to[1] = from[1];
1707           to[2] = from[2];
1708           to[3] = from[3];
1709           to += 4;
1710           from += 4;
1711           n_left_to_node -= 4;
1712         }
1713
1714       while (n_left_to_node > 0)
1715         {
1716           to[0] = from[0];
1717           to++;
1718           from++;
1719           n_left_to_node--;
1720         }
1721
1722       vectors += elt->n_vectors;
1723       f->n_vectors = elt->n_vectors;
1724       vlib_put_frame_to_node (vm, fqm->node_index, f);
1725
1726       elt->valid = 0;
1727       elt->n_vectors = 0;
1728       elt->msg_type = 0xfefefefe;
1729       CLIB_MEMORY_BARRIER ();
1730       fq->head++;
1731       processed++;
1732
1733       /*
1734        * Limit the number of packets pushed into the graph
1735        */
1736       if (vectors >= fq->vector_threshold)
1737         {
1738           fq->head_hint = fq->head;
1739           return processed;
1740         }
1741     }
1742   ASSERT (0);
1743   return processed;
1744 }
1745
1746 void
1747 vlib_worker_thread_fn (void *arg)
1748 {
1749   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1750   vlib_thread_main_t *tm = vlib_get_thread_main ();
1751   vlib_main_t *vm = vlib_get_main ();
1752   clib_error_t *e;
1753
1754   ASSERT (vm->thread_index == vlib_get_thread_index ());
1755
1756   vlib_worker_thread_init (w);
1757   clib_time_init (&vm->clib_time);
1758   clib_mem_set_heap (w->thread_mheap);
1759
1760   e = vlib_call_init_exit_functions_no_sort
1761     (vm, &vm->worker_init_function_registrations, 1 /* call_once */ );
1762   if (e)
1763     clib_error_report (e);
1764
1765   /* Wait until the dpdk init sequence is complete */
1766   while (tm->extern_thread_mgmt && tm->worker_thread_release == 0)
1767     vlib_worker_thread_barrier_check ();
1768
1769   vlib_worker_loop (vm);
1770 }
1771
1772 /* *INDENT-OFF* */
1773 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1774   .name = "workers",
1775   .short_name = "wk",
1776   .function = vlib_worker_thread_fn,
1777 };
1778 /* *INDENT-ON* */
1779
1780 u32
1781 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
1782 {
1783   vlib_thread_main_t *tm = vlib_get_thread_main ();
1784   vlib_frame_queue_main_t *fqm;
1785   vlib_frame_queue_t *fq;
1786   int i;
1787
1788   if (frame_queue_nelts == 0)
1789     frame_queue_nelts = FRAME_QUEUE_NELTS;
1790
1791   ASSERT (frame_queue_nelts >= 8);
1792
1793   vec_add2 (tm->frame_queue_mains, fqm, 1);
1794
1795   fqm->node_index = node_index;
1796   fqm->frame_queue_nelts = frame_queue_nelts;
1797   fqm->queue_hi_thresh = frame_queue_nelts - 2;
1798
1799   vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
1800   vec_validate (fqm->per_thread_data, tm->n_vlib_mains - 1);
1801   _vec_len (fqm->vlib_frame_queues) = 0;
1802   for (i = 0; i < tm->n_vlib_mains; i++)
1803     {
1804       vlib_frame_queue_per_thread_data_t *ptd;
1805       fq = vlib_frame_queue_alloc (frame_queue_nelts);
1806       vec_add1 (fqm->vlib_frame_queues, fq);
1807
1808       ptd = vec_elt_at_index (fqm->per_thread_data, i);
1809       vec_validate (ptd->handoff_queue_elt_by_thread_index,
1810                     tm->n_vlib_mains - 1);
1811       vec_validate_init_empty (ptd->congested_handoff_queue_by_thread_index,
1812                                tm->n_vlib_mains - 1,
1813                                (vlib_frame_queue_t *) (~0));
1814     }
1815
1816   return (fqm - tm->frame_queue_mains);
1817 }
1818
1819 int
1820 vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb)
1821 {
1822   vlib_thread_main_t *tm = vlib_get_thread_main ();
1823
1824   if (tm->extern_thread_mgmt)
1825     return -1;
1826
1827   tm->cb.vlib_launch_thread_cb = cb->vlib_launch_thread_cb;
1828   tm->extern_thread_mgmt = 1;
1829   return 0;
1830 }
1831
1832 void
1833 vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t *
1834                                      args)
1835 {
1836   ASSERT (vlib_get_thread_index () == 0);
1837   vlib_process_signal_event (vlib_get_main (), args->node_index,
1838                              args->type_opaque, args->data);
1839 }
1840
1841 void *rpc_call_main_thread_cb_fn;
1842
1843 void
1844 vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
1845 {
1846   if (rpc_call_main_thread_cb_fn)
1847     {
1848       void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
1849       (*fp) (callback, args, arg_size);
1850     }
1851   else
1852     clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
1853 }
1854
1855 clib_error_t *
1856 threads_init (vlib_main_t * vm)
1857 {
1858   return 0;
1859 }
1860
1861 VLIB_INIT_FUNCTION (threads_init);
1862
1863
1864 static clib_error_t *
1865 show_clock_command_fn (vlib_main_t * vm,
1866                        unformat_input_t * input, vlib_cli_command_t * cmd)
1867 {
1868   int i;
1869   f64 now;
1870
1871   now = vlib_time_now (vm);
1872
1873   vlib_cli_output (vm, "Time now %.9f", now);
1874
1875   if (vec_len (vlib_mains) == 1)
1876     return 0;
1877
1878   vlib_cli_output (vm, "Time last barrier release %.9f",
1879                    vm->time_last_barrier_release);
1880
1881   for (i = 1; i < vec_len (vlib_mains); i++)
1882     {
1883       if (vlib_mains[i] == 0)
1884         continue;
1885       vlib_cli_output (vm, "Thread %d offset %.9f error %.9f", i,
1886                        vlib_mains[i]->time_offset,
1887                        vm->time_last_barrier_release -
1888                        vlib_mains[i]->time_last_barrier_release);
1889     }
1890   return 0;
1891 }
1892
1893 /* *INDENT-OFF* */
1894 VLIB_CLI_COMMAND (f_command, static) =
1895 {
1896   .path = "show clock",
1897   .short_help = "show clock",
1898   .function = show_clock_command_fn,
1899 };
1900 /* *INDENT-ON* */
1901
1902 /*
1903  * fd.io coding-style-patch-verification: ON
1904  *
1905  * Local Variables:
1906  * eval: (c-set-style "gnu")
1907  * End:
1908  */