vlib: improvement to automatic core pinning
[vpp.git] / src / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #ifdef __FreeBSD__
20 #include <pthread_np.h>
21 #endif /* __FreeBSD__ */
22 #include <vppinfra/format.h>
23 #include <vppinfra/time_range.h>
24 #include <vppinfra/interrupt.h>
25 #include <vppinfra/bitmap.h>
26 #include <vppinfra/unix.h>
27 #include <vlib/vlib.h>
28
29 #include <vlib/threads.h>
30
31 #include <vlib/stats/stats.h>
32
33 u32
34 vl (void *p)
35 {
36   return vec_len (p);
37 }
38
39 vlib_worker_thread_t *vlib_worker_threads;
40 vlib_thread_main_t vlib_thread_main;
41
42 /*
43  * Barrier tracing can be enabled on a normal build to collect information
44  * on barrier use, including timings and call stacks.  Deliberately not
45  * keyed off CLIB_DEBUG, because that can add significant overhead which
46  * imapacts observed timings.
47  */
48
49 static inline void
50 barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed)
51 {
52   if (!vlib_worker_threads->barrier_elog_enabled)
53     return;
54
55   ELOG_TYPE_DECLARE (e) = {
56     .format = "bar-trace-%s-#%d",
57     .format_args = "T4i4",
58   };
59
60   struct
61   {
62     u32 caller, count, t_entry, t_open, t_closed;
63   } *ed = 0;
64
65   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
66   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
67   ed->caller = elog_string (&vlib_global_main.elog_main,
68                             (char *) vlib_worker_threads[0].barrier_caller);
69   ed->t_entry = (int) (1000000.0 * t_entry);
70   ed->t_open = (int) (1000000.0 * t_open);
71   ed->t_closed = (int) (1000000.0 * t_closed);
72 }
73
74 static inline void
75 barrier_trace_sync_rec (f64 t_entry)
76 {
77   if (!vlib_worker_threads->barrier_elog_enabled)
78     return;
79
80   ELOG_TYPE_DECLARE (e) = {
81     .format = "bar-syncrec-%s-#%d",
82     .format_args = "T4i4",
83   };
84
85   struct
86   {
87     u32 caller, depth;
88   } *ed = 0;
89
90   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
91   ed->depth = (int) vlib_worker_threads[0].recursion_level - 1;
92   ed->caller = elog_string (&vlib_global_main.elog_main,
93                             (char *) vlib_worker_threads[0].barrier_caller);
94 }
95
96 static inline void
97 barrier_trace_release_rec (f64 t_entry)
98 {
99   if (!vlib_worker_threads->barrier_elog_enabled)
100     return;
101
102   ELOG_TYPE_DECLARE (e) = {
103     .format = "bar-relrrec-#%d",
104     .format_args = "i4",
105   };
106
107   struct
108   {
109     u32 depth;
110   } *ed = 0;
111
112   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
113   ed->depth = (int) vlib_worker_threads[0].recursion_level;
114 }
115
116 static inline void
117 barrier_trace_release (f64 t_entry, f64 t_closed_total, f64 t_update_main)
118 {
119   if (!vlib_worker_threads->barrier_elog_enabled)
120     return;
121
122   ELOG_TYPE_DECLARE (e) = {
123     .format = "bar-rel-#%d-e%d-u%d-t%d",
124     .format_args = "i4i4i4i4",
125   };
126
127   struct
128   {
129     u32 count, t_entry, t_update_main, t_closed_total;
130   } *ed = 0;
131
132   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
133   ed->t_entry = (int) (1000000.0 * t_entry);
134   ed->t_update_main = (int) (1000000.0 * t_update_main);
135   ed->t_closed_total = (int) (1000000.0 * t_closed_total);
136   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
137
138   /* Reset context for next trace */
139   vlib_worker_threads[0].barrier_context = NULL;
140 }
141
142 uword
143 os_get_nthreads (void)
144 {
145   return vec_len (vlib_thread_stacks);
146 }
147
148 void
149 vlib_set_thread_name (char *name)
150 {
151   int pthread_setname_np (pthread_t __target_thread, const char *__name);
152   int rv;
153   pthread_t thread = pthread_self ();
154
155   if (thread)
156     {
157       rv = pthread_setname_np (thread, name);
158       if (rv)
159         clib_warning ("pthread_setname_np returned %d", rv);
160     }
161 }
162
163 static int
164 sort_registrations_by_no_clone (void *a0, void *a1)
165 {
166   vlib_thread_registration_t **tr0 = a0;
167   vlib_thread_registration_t **tr1 = a1;
168
169   return ((i32) ((*tr0)->no_data_structure_clone)
170           - ((i32) ((*tr1)->no_data_structure_clone)));
171 }
172
173
174 /* Called early in the init sequence */
175
176 clib_error_t *
177 vlib_thread_init (vlib_main_t * vm)
178 {
179   vlib_thread_main_t *tm = &vlib_thread_main;
180   vlib_worker_thread_t *w;
181   vlib_thread_registration_t *tr;
182   u32 n_vlib_mains = 1;
183   u32 first_index = 1;
184   u32 i;
185   uword *avail_cpu;
186   u32 stats_num_worker_threads_dir_index;
187
188   stats_num_worker_threads_dir_index =
189     vlib_stats_add_gauge ("/sys/num_worker_threads");
190   ASSERT (stats_num_worker_threads_dir_index != ~0);
191
192   /* get bitmaps of active cpu cores and sockets */
193   tm->cpu_core_bitmap = os_get_online_cpu_core_bitmap ();
194   tm->cpu_socket_bitmap = os_get_online_cpu_node_bitmap ();
195
196   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
197
198   /* skip cores */
199   for (i = 0; i < tm->skip_cores; i++)
200     {
201       uword c = clib_bitmap_first_set (avail_cpu);
202       if (c == ~0)
203         return clib_error_return (0, "no available cpus to skip");
204
205       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
206     }
207
208   /* if main thread affinity is unspecified, set to current running cpu */
209   if (tm->main_lcore == ~0)
210     tm->main_lcore = sched_getcpu ();
211
212   /* grab cpu for main thread */
213   if (tm->main_lcore != ~0)
214     {
215       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
216         return clib_error_return (0, "cpu %u is not available to be used"
217                                   " for the main thread", tm->main_lcore);
218       avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
219     }
220
221   /* assume that there is socket 0 only if there is no data from sysfs */
222   if (!tm->cpu_socket_bitmap)
223     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
224
225   /* pin main thread to main_lcore  */
226   if (tm->main_lcore != ~0)
227     {
228       cpu_set_t cpuset;
229       CPU_ZERO (&cpuset);
230       CPU_SET (tm->main_lcore, &cpuset);
231       if (pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t),
232                                   &cpuset))
233         {
234           return clib_error_return (0, "could not pin main thread to cpu %u",
235                                     tm->main_lcore);
236         }
237     }
238
239   /* Set up thread 0 */
240   vec_validate_aligned (vlib_worker_threads, 0, CLIB_CACHE_LINE_BYTES);
241   vec_set_len (vlib_worker_threads, 1);
242   w = vlib_worker_threads;
243   w->thread_mheap = clib_mem_get_heap ();
244   w->thread_stack = vlib_thread_stacks[0];
245   w->cpu_id = tm->main_lcore;
246 #ifdef __FreeBSD__
247   w->lwp = pthread_getthreadid_np ();
248 #else
249   w->lwp = syscall (SYS_gettid);
250 #endif /* __FreeBSD__ */
251   w->thread_id = pthread_self ();
252   tm->n_vlib_mains = 1;
253
254   vlib_get_thread_core_numa (w, w->cpu_id);
255
256   if (tm->sched_policy != ~0)
257     {
258       struct sched_param sched_param;
259       if (!sched_getparam (w->lwp, &sched_param))
260         {
261           if (tm->sched_priority != ~0)
262             sched_param.sched_priority = tm->sched_priority;
263           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
264         }
265     }
266
267   /* assign threads to cores and set n_vlib_mains */
268   tr = tm->next;
269
270   while (tr)
271     {
272       vec_add1 (tm->registrations, tr);
273       tr = tr->next;
274     }
275
276   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
277
278   for (i = 0; i < vec_len (tm->registrations); i++)
279     {
280       int j;
281       tr = tm->registrations[i];
282       tr->first_index = first_index;
283       first_index += tr->count;
284       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
285
286       /* construct coremask */
287       if (tr->use_pthreads || !tr->count)
288         continue;
289
290       if (tr->coremask)
291         {
292           uword c;
293           clib_bitmap_foreach (c, tr->coremask)  {
294             if (clib_bitmap_get(avail_cpu, c) == 0)
295               return clib_error_return (0, "cpu %u is not available to be used"
296                                         " for the '%s' thread",c, tr->name);
297
298             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
299           }
300         }
301       else
302         {
303           for (j = 0; j < tr->count; j++)
304             {
305               /* Do not use CPU 0 by default - leave it to the host and IRQs */
306               uword avail_c0 = clib_bitmap_get (avail_cpu, 0);
307               avail_cpu = clib_bitmap_set (avail_cpu, 0, 0);
308
309               uword c = clib_bitmap_first_set (avail_cpu);
310               /* Use CPU 0 as a last resort */
311               if (c == ~0 && avail_c0)
312                 {
313                   c = 0;
314                   avail_c0 = 0;
315                 }
316
317               if (c == ~0)
318                 return clib_error_return (0,
319                                           "no available cpus to be used for"
320                                           " the '%s' thread #%u",
321                                           tr->name, tr->count);
322
323               avail_cpu = clib_bitmap_set (avail_cpu, 0, avail_c0);
324               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
325               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
326             }
327         }
328     }
329
330   clib_bitmap_free (avail_cpu);
331
332   tm->n_vlib_mains = n_vlib_mains;
333   vlib_stats_set_gauge (stats_num_worker_threads_dir_index, n_vlib_mains - 1);
334
335   /*
336    * Allocate the remaining worker threads, and thread stack vector slots
337    * from now on, calls to os_get_nthreads() will return the correct
338    * answer.
339    */
340   vec_validate_aligned (vlib_worker_threads, first_index - 1,
341                         CLIB_CACHE_LINE_BYTES);
342   vec_validate (vlib_thread_stacks, vec_len (vlib_worker_threads) - 1);
343   return 0;
344 }
345
346 vlib_frame_queue_t *
347 vlib_frame_queue_alloc (int nelts)
348 {
349   vlib_frame_queue_t *fq;
350
351   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
352   clib_memset (fq, 0, sizeof (*fq));
353   fq->nelts = nelts;
354   fq->vector_threshold = 2 * VLIB_FRAME_SIZE;
355   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
356
357   if (nelts & (nelts - 1))
358     {
359       fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
360       abort ();
361     }
362
363   return (fq);
364 }
365
366 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
367 void
368 vl_msg_api_handler_no_free (void *v)
369 {
370 }
371
372 /* To be called by vlib worker threads upon startup */
373 void
374 vlib_worker_thread_init (vlib_worker_thread_t * w)
375 {
376   vlib_thread_main_t *tm = vlib_get_thread_main ();
377   sigset_t signals;
378   int rv;
379
380   /*
381    * Note: disabling signals in worker threads as follows
382    * prevents the api post-mortem dump scheme from working
383    * {
384    *    sigset_t s;
385    *    sigfillset (&s);
386    *    pthread_sigmask (SIG_SETMASK, &s, 0);
387    *  }
388    * We can still disable signals for SIGINT,SIGHUP and SIGTERM as they don't
389    * trigger post-dump handlers anyway.
390    */
391   sigemptyset (&signals);
392   sigaddset (&signals, SIGINT);
393   sigaddset (&signals, SIGHUP);
394   sigaddset (&signals, SIGTERM);
395   rv = pthread_sigmask (SIG_BLOCK, &signals, NULL);
396
397   if (rv)
398     clib_warning ("Failed to set the worker signal mask");
399
400   clib_mem_set_heap (w->thread_mheap);
401
402   if (vec_len (tm->thread_prefix) && w->registration->short_name)
403     {
404       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
405                         w->registration->short_name, w->instance_id, '\0');
406       vlib_set_thread_name ((char *) w->name);
407     }
408
409   if (!w->registration->use_pthreads)
410     {
411
412       /* Initial barrier sync, for both worker and i/o threads */
413       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
414
415       while (*vlib_worker_threads->wait_at_barrier)
416         ;
417
418       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
419     }
420 }
421
422 void *
423 vlib_worker_thread_bootstrap_fn (void *arg)
424 {
425   vlib_worker_thread_t *w = arg;
426
427 #ifdef __FreeBSD__
428   w->lwp = pthread_getthreadid_np ();
429 #else
430   w->lwp = syscall (SYS_gettid);
431 #endif /* __FreeBSD__ */
432   w->thread_id = pthread_self ();
433
434   __os_thread_index = w - vlib_worker_threads;
435
436   if (CLIB_DEBUG > 0)
437     {
438       void *frame_addr = __builtin_frame_address (0);
439       if (frame_addr < (void *) w->thread_stack ||
440           frame_addr > (void *) w->thread_stack + VLIB_THREAD_STACK_SIZE)
441         {
442           /* heap is not set yet */
443           fprintf (stderr, "thread stack is not set properly\n");
444           exit (1);
445         }
446     }
447
448   w->thread_function (arg);
449
450   return 0;
451 }
452
453 void
454 vlib_get_thread_core_numa (vlib_worker_thread_t * w, unsigned cpu_id)
455 {
456   clib_bitmap_t *nbmp = 0, *cbmp = 0;
457   int node, core_id = -1, numa_id = -1;
458
459   core_id = os_get_cpu_phys_core_id (cpu_id);
460   nbmp = os_get_online_cpu_node_bitmap ();
461
462   clib_bitmap_foreach (node, nbmp)  {
463       cbmp = os_get_cpu_on_node_bitmap (node);
464       if (clib_bitmap_get (cbmp, cpu_id))
465         numa_id = node;
466       vec_reset_length (cbmp);
467   }
468
469   vec_free (nbmp);
470   vec_free (cbmp);
471
472   w->core_id = core_id;
473   w->numa_id = numa_id;
474 }
475
476 static clib_error_t *
477 vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned cpu_id)
478 {
479   clib_mem_main_t *mm = &clib_mem_main;
480   vlib_thread_main_t *tm = &vlib_thread_main;
481   pthread_t worker;
482   pthread_attr_t attr;
483   cpu_set_t cpuset;
484   void *(*fp_arg) (void *) = fp;
485   void *numa_heap;
486
487   w->cpu_id = cpu_id;
488   vlib_get_thread_core_numa (w, cpu_id);
489
490   /* Set up NUMA-bound heap if indicated */
491   if (mm->per_numa_mheaps[w->numa_id] == 0)
492     {
493       /* If the user requested a NUMA heap, create it... */
494       if (tm->numa_heap_size)
495         {
496           clib_mem_set_numa_affinity (w->numa_id, 1 /* force */ );
497           numa_heap = clib_mem_create_heap (0 /* DIY */ , tm->numa_heap_size,
498                                             1 /* is_locked */ ,
499                                             "numa %u heap", w->numa_id);
500           clib_mem_set_default_numa_affinity ();
501           mm->per_numa_mheaps[w->numa_id] = numa_heap;
502         }
503       else
504         {
505           /* Or, use the main heap */
506           mm->per_numa_mheaps[w->numa_id] = w->thread_mheap;
507         }
508     }
509
510       CPU_ZERO (&cpuset);
511       CPU_SET (cpu_id, &cpuset);
512
513       if (pthread_attr_init (&attr))
514         return clib_error_return_unix (0, "pthread_attr_init");
515
516       if (pthread_attr_setstack (&attr, w->thread_stack,
517                                  VLIB_THREAD_STACK_SIZE))
518         return clib_error_return_unix (0, "pthread_attr_setstack");
519
520       if (pthread_create (&worker, &attr, fp_arg, (void *) w))
521         return clib_error_return_unix (0, "pthread_create");
522
523       if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
524         return clib_error_return_unix (0, "pthread_setaffinity_np");
525
526       if (pthread_attr_destroy (&attr))
527         return clib_error_return_unix (0, "pthread_attr_destroy");
528
529       return 0;
530 }
531
532 static clib_error_t *
533 start_workers (vlib_main_t * vm)
534 {
535   vlib_global_main_t *vgm = vlib_get_global_main ();
536   vlib_main_t *fvm = vlib_get_first_main ();
537   int i, j;
538   vlib_worker_thread_t *w;
539   vlib_main_t *vm_clone;
540   void *oldheap;
541   vlib_thread_main_t *tm = &vlib_thread_main;
542   vlib_thread_registration_t *tr;
543   vlib_node_runtime_t *rt;
544   u32 n_vlib_mains = tm->n_vlib_mains;
545   u32 worker_thread_index;
546   u32 stats_err_entry_index = fvm->error_main.stats_err_entry_index;
547   clib_mem_heap_t *main_heap = clib_mem_get_per_cpu_heap ();
548   vlib_stats_register_mem_heap (main_heap);
549
550   vec_reset_length (vlib_worker_threads);
551
552   /* Set up the main thread */
553   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
554   w->elog_track.name = "main thread";
555   elog_track_register (vlib_get_elog_main (), &w->elog_track);
556
557   if (vec_len (tm->thread_prefix))
558     {
559       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
560       vlib_set_thread_name ((char *) w->name);
561     }
562
563   vgm->elog_main.lock =
564     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
565   vgm->elog_main.lock[0] = 0;
566
567   clib_callback_data_init (&vm->vlib_node_runtime_perf_callbacks,
568                            &vm->worker_thread_main_loop_callback_lock);
569
570   vec_validate_aligned (vgm->vlib_mains, n_vlib_mains - 1,
571                         CLIB_CACHE_LINE_BYTES);
572   vec_set_len (vgm->vlib_mains, 0);
573   vec_add1_aligned (vgm->vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
574
575   if (n_vlib_mains > 1)
576     {
577       vlib_worker_threads->wait_at_barrier =
578         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
579       vlib_worker_threads->workers_at_barrier =
580         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
581
582       vlib_worker_threads->node_reforks_required =
583         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
584
585       /* We'll need the rpc vector lock... */
586       clib_spinlock_init (&vm->pending_rpc_lock);
587
588       /* Ask for an initial barrier sync */
589       *vlib_worker_threads->workers_at_barrier = 0;
590       *vlib_worker_threads->wait_at_barrier = 1;
591
592       /* Without update or refork */
593       *vlib_worker_threads->node_reforks_required = 0;
594       vgm->need_vlib_worker_thread_node_runtime_update = 0;
595
596       /* init timing */
597       vm->barrier_epoch = 0;
598       vm->barrier_no_close_before = 0;
599
600       worker_thread_index = 1;
601       clib_spinlock_init (&vm->worker_thread_main_loop_callback_lock);
602
603       for (i = 0; i < vec_len (tm->registrations); i++)
604         {
605           vlib_node_main_t *nm, *nm_clone;
606           int k;
607
608           tr = tm->registrations[i];
609
610           if (tr->count == 0)
611             continue;
612
613           for (k = 0; k < tr->count; k++)
614             {
615               vlib_node_t *n;
616               u64 **c;
617
618               vec_add2 (vlib_worker_threads, w, 1);
619               /* Currently unused, may not really work */
620               if (tr->mheap_size)
621                 w->thread_mheap = clib_mem_create_heap (0, tr->mheap_size,
622                                                         /* unlocked */ 0,
623                                                         "%s%d heap",
624                                                         tr->name, k);
625               else
626                 w->thread_mheap = main_heap;
627
628               w->thread_stack =
629                 vlib_thread_stack_init (w - vlib_worker_threads);
630               w->thread_function = tr->function;
631               w->thread_function_arg = w;
632               w->instance_id = k;
633               w->registration = tr;
634
635               w->elog_track.name =
636                 (char *) format (0, "%s %d", tr->name, k + 1);
637               vec_add1 (w->elog_track.name, 0);
638               elog_track_register (vlib_get_elog_main (), &w->elog_track);
639
640               if (tr->no_data_structure_clone)
641                 continue;
642
643               /* Fork vlib_global_main et al. Look for bugs here */
644               oldheap = clib_mem_set_heap (w->thread_mheap);
645
646               vm_clone = clib_mem_alloc_aligned (sizeof (*vm_clone),
647                                                  CLIB_CACHE_LINE_BYTES);
648               clib_memcpy (vm_clone, vlib_get_first_main (),
649                            sizeof (*vm_clone));
650
651               vm_clone->thread_index = worker_thread_index;
652               vm_clone->pending_rpc_requests = 0;
653               vec_validate (vm_clone->pending_rpc_requests, 0);
654               vec_set_len (vm_clone->pending_rpc_requests, 0);
655               clib_memset (&vm_clone->random_buffer, 0,
656                            sizeof (vm_clone->random_buffer));
657               clib_spinlock_init
658                 (&vm_clone->worker_thread_main_loop_callback_lock);
659               clib_callback_data_init
660                 (&vm_clone->vlib_node_runtime_perf_callbacks,
661                  &vm_clone->worker_thread_main_loop_callback_lock);
662
663               nm = &vlib_get_first_main ()->node_main;
664               nm_clone = &vm_clone->node_main;
665               /* fork next frames array, preserving node runtime indices */
666               nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
667                                                        CLIB_CACHE_LINE_BYTES);
668               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
669                 {
670                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
671                   u32 save_node_runtime_index;
672                   u32 save_flags;
673
674                   save_node_runtime_index = nf->node_runtime_index;
675                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
676                   vlib_next_frame_init (nf);
677                   nf->node_runtime_index = save_node_runtime_index;
678                   nf->flags = save_flags;
679                 }
680
681               /* fork the frame dispatch queue */
682               nm_clone->pending_frames = 0;
683               vec_validate (nm_clone->pending_frames, 10);
684               vec_set_len (nm_clone->pending_frames, 0);
685
686               /* fork nodes */
687               nm_clone->nodes = 0;
688
689               /* Allocate all nodes in single block for speed */
690               n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
691
692               for (j = 0; j < vec_len (nm->nodes); j++)
693                 {
694                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
695                   /* none of the copied nodes have enqueue rights given out */
696                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
697                   clib_memset (&n->stats_total, 0, sizeof (n->stats_total));
698                   clib_memset (&n->stats_last_clear, 0,
699                                sizeof (n->stats_last_clear));
700                   vec_add1 (nm_clone->nodes, n);
701                   n++;
702                 }
703               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
704                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
705                                  CLIB_CACHE_LINE_BYTES);
706               vec_foreach (rt,
707                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
708               {
709                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
710                 /* copy initial runtime_data from node */
711                 if (n->runtime_data && n->runtime_data_bytes > 0)
712                   clib_memcpy (rt->runtime_data, n->runtime_data,
713                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
714                                          n->runtime_data_bytes));
715               }
716
717               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
718                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
719                                  CLIB_CACHE_LINE_BYTES);
720               clib_interrupt_init (
721                 &nm_clone->input_node_interrupts,
722                 vec_len (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]));
723               clib_interrupt_init (
724                 &nm_clone->pre_input_node_interrupts,
725                 vec_len (nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT]));
726               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
727               {
728                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
729                 /* copy initial runtime_data from node */
730                 if (n->runtime_data && n->runtime_data_bytes > 0)
731                   clib_memcpy (rt->runtime_data, n->runtime_data,
732                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
733                                          n->runtime_data_bytes));
734               }
735
736               nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
737                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
738                                  CLIB_CACHE_LINE_BYTES);
739               vec_foreach (rt,
740                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
741               {
742                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
743                 /* copy initial runtime_data from node */
744                 if (n->runtime_data && n->runtime_data_bytes > 0)
745                   clib_memcpy (rt->runtime_data, n->runtime_data,
746                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
747                                          n->runtime_data_bytes));
748               }
749
750               nm_clone->processes = vec_dup_aligned (nm->processes,
751                                                      CLIB_CACHE_LINE_BYTES);
752
753               /* Create per-thread frame freelist */
754               nm_clone->frame_sizes = 0;
755               nm_clone->node_by_error = nm->node_by_error;
756
757               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
758
759               clib_mem_set_heap (oldheap);
760               vec_add1_aligned (vgm->vlib_mains, vm_clone,
761                                 CLIB_CACHE_LINE_BYTES);
762
763               /* Switch to the stats segment ... */
764               vlib_stats_validate (stats_err_entry_index, worker_thread_index,
765                                    vec_len (fvm->error_main.counters) - 1);
766               c = vlib_stats_get_entry_data_pointer (stats_err_entry_index);
767               vm_clone->error_main.counters = c[worker_thread_index];
768
769               vm_clone->error_main.counters_last_clear = vec_dup_aligned (
770                 vlib_get_first_main ()->error_main.counters_last_clear,
771                 CLIB_CACHE_LINE_BYTES);
772
773               worker_thread_index++;
774             }
775         }
776     }
777   else
778     {
779       /* only have non-data-structure copy threads to create... */
780       for (i = 0; i < vec_len (tm->registrations); i++)
781         {
782           tr = tm->registrations[i];
783
784           for (j = 0; j < tr->count; j++)
785             {
786               vec_add2 (vlib_worker_threads, w, 1);
787               if (tr->mheap_size)
788                 {
789                   w->thread_mheap = clib_mem_create_heap (0, tr->mheap_size,
790                                                           /* locked */ 0,
791                                                           "%s%d heap",
792                                                           tr->name, j);
793                 }
794               else
795                 w->thread_mheap = main_heap;
796               w->thread_stack =
797                 vlib_thread_stack_init (w - vlib_worker_threads);
798               w->thread_function = tr->function;
799               w->thread_function_arg = w;
800               w->instance_id = j;
801               w->elog_track.name =
802                 (char *) format (0, "%s %d", tr->name, j + 1);
803               w->registration = tr;
804               vec_add1 (w->elog_track.name, 0);
805               elog_track_register (vlib_get_elog_main (), &w->elog_track);
806             }
807         }
808     }
809
810   worker_thread_index = 1;
811
812   for (i = 0; i < vec_len (tm->registrations); i++)
813     {
814       clib_error_t *err;
815       int j;
816
817       tr = tm->registrations[i];
818
819       if (tr->use_pthreads || tm->use_pthreads)
820         {
821           for (j = 0; j < tr->count; j++)
822             {
823
824               w = vlib_worker_threads + worker_thread_index++;
825               err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
826                                             w, 0);
827               if (err)
828                 clib_unix_error ("%U, thread %s init on cpu %d failed",
829                                  format_clib_error, err, tr->name, 0);
830             }
831         }
832       else
833         {
834           uword c;
835           clib_bitmap_foreach (c, tr->coremask)  {
836             w = vlib_worker_threads + worker_thread_index++;
837             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
838                                           w, c);
839             if (err)
840               clib_unix_error ("%U, thread %s init on cpu %d failed",
841                                format_clib_error, err, tr->name, c);
842             }
843         }
844     }
845   vlib_worker_thread_barrier_sync (vm);
846   {
847     clib_error_t *err;
848     err = vlib_call_init_exit_functions (
849       vm, &vgm->num_workers_change_function_registrations, 1 /* call_once */,
850       1 /* is_global */);
851     if (err)
852       clib_error_report (err);
853   }
854   vlib_worker_thread_barrier_release (vm);
855   return 0;
856 }
857
858 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
859
860
861 static inline void
862 worker_thread_node_runtime_update_internal (void)
863 {
864   int i, j;
865   vlib_main_t *vm;
866   vlib_node_main_t *nm, *nm_clone;
867   vlib_main_t *vm_clone;
868   vlib_node_runtime_t *rt;
869
870   ASSERT (vlib_get_thread_index () == 0);
871
872   vm = vlib_get_first_main ();
873   nm = &vm->node_main;
874
875   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
876
877   /*
878    * Scrape all runtime stats, so we don't lose node runtime(s) with
879    * pending counts, or throw away worker / io thread counts.
880    */
881   for (j = 0; j < vec_len (nm->nodes); j++)
882     {
883       vlib_node_t *n;
884       n = nm->nodes[j];
885       vlib_node_sync_stats (vm, n);
886     }
887
888   for (i = 1; i < vlib_get_n_threads (); i++)
889     {
890       vlib_node_t *n;
891
892       vm_clone = vlib_get_main_by_index (i);
893       nm_clone = &vm_clone->node_main;
894
895       for (j = 0; j < vec_len (nm_clone->nodes); j++)
896         {
897           n = nm_clone->nodes[j];
898
899           rt = vlib_node_get_runtime (vm_clone, n->index);
900           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
901         }
902     }
903
904   /* Per-worker clone rebuilds are now done on each thread */
905 }
906
907
908 void
909 vlib_worker_thread_node_refork (void)
910 {
911   vlib_main_t *vm, *vm_clone;
912   vlib_node_main_t *nm, *nm_clone;
913   vlib_node_t **old_nodes_clone;
914   vlib_node_runtime_t *rt, *old_rt;
915   u64 **c;
916
917   vlib_node_t *new_n_clone;
918
919   int j;
920
921   vm = vlib_get_first_main ();
922   nm = &vm->node_main;
923   vm_clone = vlib_get_main ();
924   nm_clone = &vm_clone->node_main;
925
926   /* Re-clone error heap */
927   u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
928
929   clib_memcpy_fast (&vm_clone->error_main, &vm->error_main,
930                     sizeof (vm->error_main));
931   j = vec_len (vm->error_main.counters) - 1;
932
933   c = vlib_stats_get_entry_data_pointer (vm->error_main.stats_err_entry_index);
934   vm_clone->error_main.counters = c[vm_clone->thread_index];
935
936   vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
937   vm_clone->error_main.counters_last_clear = old_counters_all_clear;
938
939   for (j = 0; j < vec_len (nm_clone->next_frames); j++)
940     {
941       vlib_next_frame_t *nf = &nm_clone->next_frames[j];
942       if ((nf->flags & VLIB_FRAME_IS_ALLOCATED) && nf->frame != NULL)
943         {
944           vlib_frame_t *f = nf->frame;
945           nf->frame = NULL;
946           vlib_frame_free (vm_clone, f);
947         }
948     }
949
950   vec_free (nm_clone->next_frames);
951   nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
952                                            CLIB_CACHE_LINE_BYTES);
953
954   for (j = 0; j < vec_len (nm_clone->next_frames); j++)
955     {
956       vlib_next_frame_t *nf = &nm_clone->next_frames[j];
957       u32 save_node_runtime_index;
958       u32 save_flags;
959
960       save_node_runtime_index = nf->node_runtime_index;
961       save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
962       vlib_next_frame_init (nf);
963       nf->node_runtime_index = save_node_runtime_index;
964       nf->flags = save_flags;
965     }
966
967   old_nodes_clone = nm_clone->nodes;
968   nm_clone->nodes = 0;
969
970   /* re-fork nodes */
971
972   /* Allocate all nodes in single block for speed */
973   new_n_clone =
974     clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone));
975   for (j = 0; j < vec_len (nm->nodes); j++)
976     {
977       vlib_node_t *new_n = nm->nodes[j];
978
979       clib_memcpy_fast (new_n_clone, new_n, sizeof (*new_n));
980       /* none of the copied nodes have enqueue rights given out */
981       new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
982
983       if (j >= vec_len (old_nodes_clone))
984         {
985           /* new node, set to zero */
986           clib_memset (&new_n_clone->stats_total, 0,
987                        sizeof (new_n_clone->stats_total));
988           clib_memset (&new_n_clone->stats_last_clear, 0,
989                        sizeof (new_n_clone->stats_last_clear));
990         }
991       else
992         {
993           vlib_node_t *old_n_clone = old_nodes_clone[j];
994           /* Copy stats if the old data is valid */
995           clib_memcpy_fast (&new_n_clone->stats_total,
996                             &old_n_clone->stats_total,
997                             sizeof (new_n_clone->stats_total));
998           clib_memcpy_fast (&new_n_clone->stats_last_clear,
999                             &old_n_clone->stats_last_clear,
1000                             sizeof (new_n_clone->stats_last_clear));
1001
1002           /* keep previous node state */
1003           new_n_clone->state = old_n_clone->state;
1004           new_n_clone->flags = old_n_clone->flags;
1005         }
1006       vec_add1 (nm_clone->nodes, new_n_clone);
1007       new_n_clone++;
1008     }
1009   /* Free the old node clones */
1010   clib_mem_free (old_nodes_clone[0]);
1011
1012   vec_free (old_nodes_clone);
1013
1014
1015   /* re-clone internal nodes */
1016   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
1017   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
1018     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
1019                      CLIB_CACHE_LINE_BYTES);
1020
1021   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
1022   {
1023     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1024     /* copy runtime_data, will be overwritten later for existing rt */
1025     if (n->runtime_data && n->runtime_data_bytes > 0)
1026       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1027                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1028                                   n->runtime_data_bytes));
1029   }
1030
1031   for (j = 0; j < vec_len (old_rt); j++)
1032     {
1033       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1034       rt->state = old_rt[j].state;
1035       rt->flags = old_rt[j].flags;
1036       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1037                         VLIB_NODE_RUNTIME_DATA_SIZE);
1038     }
1039
1040   vec_free (old_rt);
1041
1042   /* re-clone input nodes */
1043   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
1044   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
1045     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1046                      CLIB_CACHE_LINE_BYTES);
1047   clib_interrupt_resize (
1048     &nm_clone->input_node_interrupts,
1049     vec_len (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]));
1050   clib_interrupt_resize (
1051     &nm_clone->pre_input_node_interrupts,
1052     vec_len (nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT]));
1053
1054   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1055   {
1056     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1057     /* copy runtime_data, will be overwritten later for existing rt */
1058     if (n->runtime_data && n->runtime_data_bytes > 0)
1059       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1060                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1061                                   n->runtime_data_bytes));
1062   }
1063
1064   for (j = 0; j < vec_len (old_rt); j++)
1065     {
1066       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1067       rt->state = old_rt[j].state;
1068       rt->flags = old_rt[j].flags;
1069       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1070                         VLIB_NODE_RUNTIME_DATA_SIZE);
1071     }
1072
1073   vec_free (old_rt);
1074
1075   /* re-clone pre-input nodes */
1076   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT];
1077   nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
1078     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
1079                      CLIB_CACHE_LINE_BYTES);
1080
1081   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
1082   {
1083     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1084     /* copy runtime_data, will be overwritten later for existing rt */
1085     if (n->runtime_data && n->runtime_data_bytes > 0)
1086       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1087                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1088                                   n->runtime_data_bytes));
1089   }
1090
1091   for (j = 0; j < vec_len (old_rt); j++)
1092     {
1093       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1094       rt->state = old_rt[j].state;
1095       rt->flags = old_rt[j].flags;
1096       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1097                         VLIB_NODE_RUNTIME_DATA_SIZE);
1098     }
1099
1100   vec_free (old_rt);
1101
1102   vec_free (nm_clone->processes);
1103   nm_clone->processes = vec_dup_aligned (nm->processes,
1104                                          CLIB_CACHE_LINE_BYTES);
1105   nm_clone->node_by_error = nm->node_by_error;
1106 }
1107
1108 void
1109 vlib_worker_thread_node_runtime_update (void)
1110 {
1111   /*
1112    * Make a note that we need to do a node runtime update
1113    * prior to releasing the barrier.
1114    */
1115   vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1;
1116 }
1117
1118 u32
1119 unformat_sched_policy (unformat_input_t * input, va_list * args)
1120 {
1121   u32 *r = va_arg (*args, u32 *);
1122
1123   if (0);
1124 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1125   foreach_sched_policy
1126 #undef _
1127     else
1128     return 0;
1129   return 1;
1130 }
1131
1132 static clib_error_t *
1133 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1134 {
1135   vlib_thread_registration_t *tr;
1136   uword *p;
1137   vlib_thread_main_t *tm = &vlib_thread_main;
1138   u8 *name;
1139   uword *bitmap;
1140   u32 count;
1141   int use_corelist = 0;
1142
1143   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1144
1145   tm->n_thread_stacks = 1;      /* account for main thread */
1146   tm->sched_policy = ~0;
1147   tm->sched_priority = ~0;
1148   tm->main_lcore = ~0;
1149
1150   tr = tm->next;
1151
1152   while (tr)
1153     {
1154       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1155       tr = tr->next;
1156     }
1157
1158   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1159     {
1160       if (unformat (input, "use-pthreads"))
1161         tm->use_pthreads = 1;
1162       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1163         ;
1164       else if (unformat (input, "main-core %u", &tm->main_lcore))
1165         ;
1166       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1167         ;
1168       else if (unformat (input, "numa-heap-size %U",
1169                          unformat_memory_size, &tm->numa_heap_size))
1170         ;
1171       else if (unformat (input, "coremask-%s %U", &name,
1172                          unformat_bitmap_mask, &bitmap) ||
1173                unformat (input, "corelist-%s %U", &name,
1174                          unformat_bitmap_list, &bitmap))
1175         {
1176           p = hash_get_mem (tm->thread_registrations_by_name, name);
1177           if (p == 0)
1178             return clib_error_return (0, "no such thread type '%s'", name);
1179
1180           tr = (vlib_thread_registration_t *) p[0];
1181
1182           if (tr->use_pthreads)
1183             return clib_error_return (0,
1184                                       "corelist cannot be set for '%s' threads",
1185                                       name);
1186           if (tr->count)
1187             return clib_error_return
1188               (0, "core placement of '%s' threads is already configured",
1189                name);
1190
1191           tr->coremask = bitmap;
1192           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1193           use_corelist = 1;
1194         }
1195       else
1196         if (unformat
1197             (input, "scheduler-policy %U", unformat_sched_policy,
1198              &tm->sched_policy))
1199         ;
1200       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1201         ;
1202       else if (unformat (input, "%s %u", &name, &count))
1203         {
1204           p = hash_get_mem (tm->thread_registrations_by_name, name);
1205           if (p == 0)
1206             return clib_error_return (0, "no such thread type 3 '%s'", name);
1207
1208           tr = (vlib_thread_registration_t *) p[0];
1209
1210           if (tr->fixed_count)
1211             return clib_error_return
1212               (0, "number of '%s' threads not configurable", name);
1213           if (tr->count)
1214             return clib_error_return
1215               (0, "number of '%s' threads is already configured", name);
1216
1217           tr->count = count;
1218         }
1219       else
1220         break;
1221     }
1222
1223   if (use_corelist && tm->main_lcore == ~0)
1224     return clib_error_return (0, "main-core must be specified when using "
1225                                  "corelist-* or coremask-* attribute");
1226   if (tm->sched_priority != ~0)
1227     {
1228       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1229         {
1230           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1231           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1232           if (tm->sched_priority > prio_max)
1233             tm->sched_priority = prio_max;
1234           if (tm->sched_priority < prio_min)
1235             tm->sched_priority = prio_min;
1236         }
1237       else
1238         {
1239           return clib_error_return
1240             (0,
1241              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1242              tm->sched_priority);
1243         }
1244     }
1245   tr = tm->next;
1246
1247   if (!tm->thread_prefix)
1248     tm->thread_prefix = format (0, "vpp");
1249
1250   while (tr)
1251     {
1252       tm->n_thread_stacks += tr->count;
1253       tm->n_pthreads += tr->count * tr->use_pthreads;
1254       tm->n_threads += tr->count * (tr->use_pthreads == 0);
1255       tr = tr->next;
1256     }
1257
1258   return 0;
1259 }
1260
1261 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1262
1263   /*
1264    * Enforce minimum open time to minimize packet loss due to Rx overflow,
1265    * based on a test based heuristic that barrier should be open for at least
1266    * 3 time as long as it is closed (with an upper bound of 1ms because by that
1267    *  point it is probably too late to make a difference)
1268    */
1269
1270 #ifndef BARRIER_MINIMUM_OPEN_LIMIT
1271 #define BARRIER_MINIMUM_OPEN_LIMIT 0.001
1272 #endif
1273
1274 #ifndef BARRIER_MINIMUM_OPEN_FACTOR
1275 #define BARRIER_MINIMUM_OPEN_FACTOR 3
1276 #endif
1277
1278 void
1279 vlib_worker_thread_initial_barrier_sync_and_release (vlib_main_t * vm)
1280 {
1281   f64 deadline;
1282   f64 now = vlib_time_now (vm);
1283   u32 count = vlib_get_n_threads () - 1;
1284
1285   /* No worker threads? */
1286   if (count == 0)
1287     return;
1288
1289   deadline = now + BARRIER_SYNC_TIMEOUT;
1290   *vlib_worker_threads->wait_at_barrier = 1;
1291   while (*vlib_worker_threads->workers_at_barrier != count)
1292     {
1293       if ((now = vlib_time_now (vm)) > deadline)
1294         {
1295           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1296           os_panic ();
1297         }
1298       CLIB_PAUSE ();
1299     }
1300   *vlib_worker_threads->wait_at_barrier = 0;
1301 }
1302
1303 /**
1304  * Return true if the wroker thread barrier is held
1305  */
1306 u8
1307 vlib_worker_thread_barrier_held (void)
1308 {
1309   if (vlib_get_n_threads () < 2)
1310     return (1);
1311
1312   return (*vlib_worker_threads->wait_at_barrier == 1);
1313 }
1314
1315 void
1316 vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
1317 {
1318   f64 deadline;
1319   f64 now;
1320   f64 t_entry;
1321   f64 t_open;
1322   f64 t_closed;
1323   f64 max_vector_rate;
1324   u32 count;
1325   int i;
1326
1327   if (vlib_get_n_threads () < 2)
1328     return;
1329
1330   ASSERT (vlib_get_thread_index () == 0);
1331
1332   vlib_worker_threads[0].barrier_caller = func_name;
1333   count = vlib_get_n_threads () - 1;
1334
1335   /* Record entry relative to last close */
1336   now = vlib_time_now (vm);
1337   t_entry = now - vm->barrier_epoch;
1338
1339   /* Tolerate recursive calls */
1340   if (++vlib_worker_threads[0].recursion_level > 1)
1341     {
1342       barrier_trace_sync_rec (t_entry);
1343       return;
1344     }
1345
1346   if (PREDICT_FALSE (vec_len (vm->barrier_perf_callbacks) != 0))
1347     clib_call_callbacks (vm->barrier_perf_callbacks, vm,
1348                          vm->clib_time.last_cpu_time, 0 /* enter */ );
1349
1350   /*
1351    * Need data to decide if we're working hard enough to honor
1352    * the barrier hold-down timer.
1353    */
1354   max_vector_rate = 0.0;
1355   for (i = 1; i < vlib_get_n_threads (); i++)
1356     {
1357       vlib_main_t *ovm = vlib_get_main_by_index (i);
1358       max_vector_rate = clib_max (max_vector_rate,
1359                                   (f64) vlib_last_vectors_per_main_loop (ovm));
1360     }
1361
1362   vlib_worker_threads[0].barrier_sync_count++;
1363
1364   /* Enforce minimum barrier open time to minimize packet loss */
1365   ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));
1366
1367   /*
1368    * If any worker thread seems busy, which we define
1369    * as a vector rate above 10, we enforce the barrier hold-down timer
1370    */
1371   if (max_vector_rate > 10.0)
1372     {
1373       while (1)
1374         {
1375           now = vlib_time_now (vm);
1376           /* Barrier hold-down timer expired? */
1377           if (now >= vm->barrier_no_close_before)
1378             break;
1379           if ((vm->barrier_no_close_before - now)
1380               > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
1381             {
1382               clib_warning
1383                 ("clock change: would have waited for %.4f seconds",
1384                  (vm->barrier_no_close_before - now));
1385               break;
1386             }
1387         }
1388     }
1389   /* Record time of closure */
1390   t_open = now - vm->barrier_epoch;
1391   vm->barrier_epoch = now;
1392
1393   deadline = now + BARRIER_SYNC_TIMEOUT;
1394
1395   *vlib_worker_threads->wait_at_barrier = 1;
1396   while (*vlib_worker_threads->workers_at_barrier != count)
1397     {
1398       if ((now = vlib_time_now (vm)) > deadline)
1399         {
1400           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1401           os_panic ();
1402         }
1403     }
1404
1405   t_closed = now - vm->barrier_epoch;
1406
1407   barrier_trace_sync (t_entry, t_open, t_closed);
1408
1409 }
1410
1411 void
1412 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1413 {
1414   vlib_global_main_t *vgm = vlib_get_global_main ();
1415   f64 deadline;
1416   f64 now;
1417   f64 minimum_open;
1418   f64 t_entry;
1419   f64 t_closed_total;
1420   f64 t_update_main = 0.0;
1421   int refork_needed = 0;
1422
1423   if (vlib_get_n_threads () < 2)
1424     return;
1425
1426   ASSERT (vlib_get_thread_index () == 0);
1427
1428
1429   now = vlib_time_now (vm);
1430   t_entry = now - vm->barrier_epoch;
1431
1432   if (--vlib_worker_threads[0].recursion_level > 0)
1433     {
1434       barrier_trace_release_rec (t_entry);
1435       return;
1436     }
1437
1438   /* Update (all) node runtimes before releasing the barrier, if needed */
1439   if (vgm->need_vlib_worker_thread_node_runtime_update)
1440     {
1441       /*
1442        * Lock stat segment here, so we's safe when
1443        * rebuilding the stat segment node clones from the
1444        * stat thread...
1445        */
1446       vlib_stats_segment_lock ();
1447
1448       /* Do stats elements on main thread */
1449       worker_thread_node_runtime_update_internal ();
1450       vgm->need_vlib_worker_thread_node_runtime_update = 0;
1451
1452       /* Do per thread rebuilds in parallel */
1453       refork_needed = 1;
1454       clib_atomic_fetch_add (vlib_worker_threads->node_reforks_required,
1455                              (vlib_get_n_threads () - 1));
1456       now = vlib_time_now (vm);
1457       t_update_main = now - vm->barrier_epoch;
1458     }
1459
1460   deadline = now + BARRIER_SYNC_TIMEOUT;
1461
1462   /*
1463    * Note when we let go of the barrier.
1464    * Workers can use this to derive a reasonably accurate
1465    * time offset. See vlib_time_now(...)
1466    */
1467   vm->time_last_barrier_release = vlib_time_now (vm);
1468   CLIB_MEMORY_STORE_BARRIER ();
1469
1470   *vlib_worker_threads->wait_at_barrier = 0;
1471
1472   while (*vlib_worker_threads->workers_at_barrier > 0)
1473     {
1474       if ((now = vlib_time_now (vm)) > deadline)
1475         {
1476           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1477           os_panic ();
1478         }
1479     }
1480
1481   /* Wait for reforks before continuing */
1482   if (refork_needed)
1483     {
1484       now = vlib_time_now (vm);
1485
1486       deadline = now + BARRIER_SYNC_TIMEOUT;
1487
1488       while (*vlib_worker_threads->node_reforks_required > 0)
1489         {
1490           if ((now = vlib_time_now (vm)) > deadline)
1491             {
1492               fformat (stderr, "%s: worker thread refork deadlock\n",
1493                        __FUNCTION__);
1494               os_panic ();
1495             }
1496         }
1497       vlib_stats_segment_unlock ();
1498     }
1499
1500   t_closed_total = now - vm->barrier_epoch;
1501
1502   minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;
1503
1504   if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
1505     {
1506       minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
1507     }
1508
1509   vm->barrier_no_close_before = now + minimum_open;
1510
1511   /* Record barrier epoch (used to enforce minimum open time) */
1512   vm->barrier_epoch = now;
1513
1514   barrier_trace_release (t_entry, t_closed_total, t_update_main);
1515
1516   if (PREDICT_FALSE (vec_len (vm->barrier_perf_callbacks) != 0))
1517     clib_call_callbacks (vm->barrier_perf_callbacks, vm,
1518                          vm->clib_time.last_cpu_time, 1 /* leave */ );
1519 }
1520
1521 static void
1522 vlib_worker_sync_rpc (void *args)
1523 {
1524   ASSERT (vlib_thread_is_main_w_barrier ());
1525   vlib_worker_threads->wait_before_barrier = 0;
1526 }
1527
1528 void
1529 vlib_workers_sync (void)
1530 {
1531   if (PREDICT_FALSE (!vlib_num_workers ()))
1532     return;
1533
1534   if (!(*vlib_worker_threads->wait_at_barrier) &&
1535       !clib_atomic_swap_rel_n (&vlib_worker_threads->wait_before_barrier, 1))
1536     {
1537       u32 thread_index = vlib_get_thread_index ();
1538       vlib_rpc_call_main_thread (vlib_worker_sync_rpc, (u8 *) &thread_index,
1539                                  sizeof (thread_index));
1540       vlib_worker_flush_pending_rpc_requests (vlib_get_main ());
1541     }
1542
1543   /* Wait until main thread asks for barrier */
1544   while (!(*vlib_worker_threads->wait_at_barrier))
1545     ;
1546
1547   /* Stop before barrier and make sure all threads are either
1548    * at worker barrier or the barrier before it */
1549   clib_atomic_fetch_add (&vlib_worker_threads->workers_before_barrier, 1);
1550   while (vlib_num_workers () > (*vlib_worker_threads->workers_at_barrier +
1551                                 vlib_worker_threads->workers_before_barrier))
1552     ;
1553 }
1554
1555 void
1556 vlib_workers_continue (void)
1557 {
1558   if (PREDICT_FALSE (!vlib_num_workers ()))
1559     return;
1560
1561   clib_atomic_fetch_add (&vlib_worker_threads->done_work_before_barrier, 1);
1562
1563   /* Wait until all workers are done with work before barrier */
1564   while (vlib_worker_threads->done_work_before_barrier <
1565          vlib_worker_threads->workers_before_barrier)
1566     ;
1567
1568   clib_atomic_fetch_add (&vlib_worker_threads->done_work_before_barrier, -1);
1569   clib_atomic_fetch_add (&vlib_worker_threads->workers_before_barrier, -1);
1570 }
1571
1572 /**
1573  * Wait until each of the workers has been once around the track
1574  */
1575 void
1576 vlib_worker_wait_one_loop (void)
1577 {
1578   vlib_global_main_t *vgm = vlib_get_global_main ();
1579   ASSERT (vlib_get_thread_index () == 0);
1580
1581   if (vlib_get_n_threads () < 2)
1582     return;
1583
1584   if (vlib_worker_thread_barrier_held ())
1585     return;
1586
1587   u32 *counts = 0;
1588   u32 ii;
1589
1590   vec_validate (counts, vlib_get_n_threads () - 1);
1591
1592   /* record the current loop counts */
1593   vec_foreach_index (ii, vgm->vlib_mains)
1594     counts[ii] = vgm->vlib_mains[ii]->main_loop_count;
1595
1596   /* spin until each changes, apart from the main thread, or we'd be
1597    * a while */
1598   for (ii = 1; ii < vec_len (counts); ii++)
1599     {
1600       while (counts[ii] == vgm->vlib_mains[ii]->main_loop_count)
1601         CLIB_PAUSE ();
1602     }
1603
1604   vec_free (counts);
1605   return;
1606 }
1607
1608 void
1609 vlib_worker_flush_pending_rpc_requests (vlib_main_t *vm)
1610 {
1611   vlib_main_t *vm_global = vlib_get_first_main ();
1612
1613   ASSERT (vm != vm_global);
1614
1615   clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
1616   vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
1617   vec_reset_length (vm->pending_rpc_requests);
1618   clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
1619 }
1620
1621 void
1622 vlib_worker_thread_fn (void *arg)
1623 {
1624   vlib_global_main_t *vgm = vlib_get_global_main ();
1625   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1626   vlib_main_t *vm = vlib_get_main ();
1627   clib_error_t *e;
1628
1629   ASSERT (vm->thread_index == vlib_get_thread_index ());
1630
1631   vlib_worker_thread_init (w);
1632   clib_time_init (&vm->clib_time);
1633   clib_mem_set_heap (w->thread_mheap);
1634
1635   vm->worker_init_functions_called = hash_create (0, 0);
1636
1637   e = vlib_call_init_exit_functions_no_sort (
1638     vm, &vgm->worker_init_function_registrations, 1 /* call_once */,
1639     0 /* is_global */);
1640   if (e)
1641     clib_error_report (e);
1642
1643   vlib_worker_loop (vm);
1644 }
1645
1646 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1647   .name = "workers",
1648   .short_name = "wk",
1649   .function = vlib_worker_thread_fn,
1650 };
1651
1652 extern clib_march_fn_registration
1653   *vlib_frame_queue_dequeue_with_aux_fn_march_fn_registrations;
1654 extern clib_march_fn_registration
1655   *vlib_frame_queue_dequeue_fn_march_fn_registrations;
1656 u32
1657 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
1658 {
1659   vlib_thread_main_t *tm = vlib_get_thread_main ();
1660   vlib_main_t *vm = vlib_get_main ();
1661   vlib_frame_queue_main_t *fqm;
1662   vlib_frame_queue_t *fq;
1663   vlib_node_t *node;
1664   int i;
1665   u32 num_threads;
1666
1667   if (frame_queue_nelts == 0)
1668     frame_queue_nelts = FRAME_QUEUE_MAX_NELTS;
1669
1670   num_threads = 1 /* main thread */  + tm->n_threads;
1671   ASSERT (frame_queue_nelts >= 8 + num_threads);
1672
1673   vec_add2 (tm->frame_queue_mains, fqm, 1);
1674
1675   node = vlib_get_node (vm, fqm->node_index);
1676   ASSERT (node);
1677   if (node->aux_offset)
1678     {
1679       fqm->frame_queue_dequeue_fn =
1680         CLIB_MARCH_FN_VOID_POINTER (vlib_frame_queue_dequeue_with_aux_fn);
1681     }
1682   else
1683     {
1684       fqm->frame_queue_dequeue_fn =
1685         CLIB_MARCH_FN_VOID_POINTER (vlib_frame_queue_dequeue_fn);
1686     }
1687
1688   fqm->node_index = node_index;
1689   fqm->frame_queue_nelts = frame_queue_nelts;
1690
1691   vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
1692   vec_set_len (fqm->vlib_frame_queues, 0);
1693   for (i = 0; i < tm->n_vlib_mains; i++)
1694     {
1695       fq = vlib_frame_queue_alloc (frame_queue_nelts);
1696       vec_add1 (fqm->vlib_frame_queues, fq);
1697     }
1698
1699   return (fqm - tm->frame_queue_mains);
1700 }
1701
1702 void
1703 vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t *
1704                                      args)
1705 {
1706   ASSERT (vlib_get_thread_index () == 0);
1707   vlib_process_signal_event (vlib_get_main (), args->node_index,
1708                              args->type_opaque, args->data);
1709 }
1710
1711 void *rpc_call_main_thread_cb_fn;
1712
1713 void
1714 vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
1715 {
1716   if (rpc_call_main_thread_cb_fn)
1717     {
1718       void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
1719       (*fp) (callback, args, arg_size);
1720     }
1721   else
1722     clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
1723 }
1724
1725 clib_error_t *
1726 threads_init (vlib_main_t * vm)
1727 {
1728   const vlib_thread_main_t *tm = vlib_get_thread_main ();
1729
1730   if (tm->main_lcore == ~0 && tm->n_vlib_mains > 1)
1731     return clib_error_return (0, "Configuration error, a main core must "
1732                                  "be specified when using worker threads");
1733
1734   return 0;
1735 }
1736
1737 VLIB_INIT_FUNCTION (threads_init);
1738
1739 static clib_error_t *
1740 show_clock_command_fn (vlib_main_t * vm,
1741                        unformat_input_t * input, vlib_cli_command_t * cmd)
1742 {
1743   int verbose = 0;
1744   clib_timebase_t _tb, *tb = &_tb;
1745
1746   (void) unformat (input, "verbose %=", &verbose, 1);
1747
1748   clib_timebase_init (tb, 0 /* GMT */ , CLIB_TIMEBASE_DAYLIGHT_NONE,
1749                       &vm->clib_time);
1750
1751   vlib_cli_output (vm, "%U, %U GMT", format_clib_time, &vm->clib_time,
1752                    verbose, format_clib_timebase_time,
1753                    clib_timebase_now (tb));
1754
1755   vlib_cli_output (vm, "Time last barrier release %.9f",
1756                    vm->time_last_barrier_release);
1757
1758   foreach_vlib_main ()
1759     {
1760       vlib_cli_output (vm, "%d: %U", this_vlib_main->thread_index,
1761                        format_clib_time, &this_vlib_main->clib_time, verbose);
1762
1763       vlib_cli_output (vm, "Thread %d offset %.9f error %.9f",
1764                        this_vlib_main->thread_index,
1765                        this_vlib_main->time_offset,
1766                        vm->time_last_barrier_release -
1767                          this_vlib_main->time_last_barrier_release);
1768     }
1769   return 0;
1770 }
1771
1772 VLIB_CLI_COMMAND (f_command, static) =
1773 {
1774   .path = "show clock",
1775   .short_help = "show clock",
1776   .function = show_clock_command_fn,
1777 };
1778
1779 vlib_thread_main_t *
1780 vlib_get_thread_main_not_inline (void)
1781 {
1782   return vlib_get_thread_main ();
1783 }
1784
1785 /*
1786  * fd.io coding-style-patch-verification: ON
1787  *
1788  * Local Variables:
1789  * eval: (c-set-style "gnu")
1790  * End:
1791  */