trace frame-queue on trigger out of memory
[vpp.git] / src / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vppinfra/linux/sysfs.h>
21 #include <vlib/vlib.h>
22
23 #include <vlib/threads.h>
24 #include <vlib/unix/cj.h>
25
26 #include <vlib/stat_weak_inlines.h>
27
28 DECLARE_CJ_GLOBAL_LOG;
29
30
31 u32
32 vl (void *p)
33 {
34   return vec_len (p);
35 }
36
37 vlib_worker_thread_t *vlib_worker_threads;
38 vlib_thread_main_t vlib_thread_main;
39
40 /*
41  * Barrier tracing can be enabled on a normal build to collect information
42  * on barrier use, including timings and call stacks.  Deliberately not
43  * keyed off CLIB_DEBUG, because that can add significant overhead which
44  * imapacts observed timings.
45  */
46
47 u32
48 elog_global_id_for_msg_name (const char *msg_name)
49 {
50   uword *p, r;
51   static uword *h;
52   u8 *name_copy;
53
54   if (!h)
55     h = hash_create_string (0, sizeof (uword));
56
57   p = hash_get_mem (h, msg_name);
58   if (p)
59     return p[0];
60   r = elog_string (&vlib_global_main.elog_main, "%s", msg_name);
61
62   name_copy = format (0, "%s%c", msg_name, 0);
63
64   hash_set_mem (h, name_copy, r);
65
66   return r;
67 }
68
69 static inline void
70 barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed)
71 {
72   if (!vlib_worker_threads->barrier_elog_enabled)
73     return;
74
75     /* *INDENT-OFF* */
76     ELOG_TYPE_DECLARE (e) =
77       {
78         .format = "bar-trace-%s-#%d",
79         .format_args = "T4i4",
80       };
81     /* *INDENT-ON* */
82   struct
83   {
84     u32 caller, count, t_entry, t_open, t_closed;
85   } *ed = 0;
86
87   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
88   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
89   ed->caller = elog_global_id_for_msg_name
90     (vlib_worker_threads[0].barrier_caller);
91   ed->t_entry = (int) (1000000.0 * t_entry);
92   ed->t_open = (int) (1000000.0 * t_open);
93   ed->t_closed = (int) (1000000.0 * t_closed);
94 }
95
96 static inline void
97 barrier_trace_sync_rec (f64 t_entry)
98 {
99   if (!vlib_worker_threads->barrier_elog_enabled)
100     return;
101
102     /* *INDENT-OFF* */
103     ELOG_TYPE_DECLARE (e) =
104       {
105         .format = "bar-syncrec-%s-#%d",
106         .format_args = "T4i4",
107       };
108     /* *INDENT-ON* */
109   struct
110   {
111     u32 caller, depth;
112   } *ed = 0;
113
114   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
115   ed->depth = (int) vlib_worker_threads[0].recursion_level - 1;
116   ed->caller = elog_global_id_for_msg_name
117     (vlib_worker_threads[0].barrier_caller);
118 }
119
120 static inline void
121 barrier_trace_release_rec (f64 t_entry)
122 {
123   if (!vlib_worker_threads->barrier_elog_enabled)
124     return;
125
126     /* *INDENT-OFF* */
127     ELOG_TYPE_DECLARE (e) =
128       {
129         .format = "bar-relrrec-#%d",
130         .format_args = "i4",
131       };
132     /* *INDENT-ON* */
133   struct
134   {
135     u32 depth;
136   } *ed = 0;
137
138   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
139   ed->depth = (int) vlib_worker_threads[0].recursion_level;
140 }
141
142 static inline void
143 barrier_trace_release (f64 t_entry, f64 t_closed_total, f64 t_update_main)
144 {
145   if (!vlib_worker_threads->barrier_elog_enabled)
146     return;
147
148     /* *INDENT-OFF* */
149     ELOG_TYPE_DECLARE (e) =
150       {
151         .format = "bar-rel-#%d-e%d-u%d-t%d",
152         .format_args = "i4i4i4i4",
153       };
154     /* *INDENT-ON* */
155   struct
156   {
157     u32 count, t_entry, t_update_main, t_closed_total;
158   } *ed = 0;
159
160   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
161   ed->t_entry = (int) (1000000.0 * t_entry);
162   ed->t_update_main = (int) (1000000.0 * t_update_main);
163   ed->t_closed_total = (int) (1000000.0 * t_closed_total);
164   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
165
166   /* Reset context for next trace */
167   vlib_worker_threads[0].barrier_context = NULL;
168 }
169
170 uword
171 os_get_nthreads (void)
172 {
173   return vec_len (vlib_thread_stacks);
174 }
175
176 void
177 vlib_set_thread_name (char *name)
178 {
179   int pthread_setname_np (pthread_t __target_thread, const char *__name);
180   int rv;
181   pthread_t thread = pthread_self ();
182
183   if (thread)
184     {
185       rv = pthread_setname_np (thread, name);
186       if (rv)
187         clib_warning ("pthread_setname_np returned %d", rv);
188     }
189 }
190
191 static int
192 sort_registrations_by_no_clone (void *a0, void *a1)
193 {
194   vlib_thread_registration_t **tr0 = a0;
195   vlib_thread_registration_t **tr1 = a1;
196
197   return ((i32) ((*tr0)->no_data_structure_clone)
198           - ((i32) ((*tr1)->no_data_structure_clone)));
199 }
200
201 static uword *
202 clib_sysfs_list_to_bitmap (char *filename)
203 {
204   FILE *fp;
205   uword *r = 0;
206
207   fp = fopen (filename, "r");
208
209   if (fp != NULL)
210     {
211       u8 *buffer = 0;
212       vec_validate (buffer, 256 - 1);
213       if (fgets ((char *) buffer, 256, fp))
214         {
215           unformat_input_t in;
216           unformat_init_string (&in, (char *) buffer,
217                                 strlen ((char *) buffer));
218           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
219             clib_warning ("unformat_bitmap_list failed");
220           unformat_free (&in);
221         }
222       vec_free (buffer);
223       fclose (fp);
224     }
225   return r;
226 }
227
228
229 /* Called early in the init sequence */
230
231 clib_error_t *
232 vlib_thread_init (vlib_main_t * vm)
233 {
234   vlib_thread_main_t *tm = &vlib_thread_main;
235   vlib_worker_thread_t *w;
236   vlib_thread_registration_t *tr;
237   u32 n_vlib_mains = 1;
238   u32 first_index = 1;
239   u32 i;
240   uword *avail_cpu;
241
242   /* get bitmaps of active cpu cores and sockets */
243   tm->cpu_core_bitmap =
244     clib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
245   tm->cpu_socket_bitmap =
246     clib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
247
248   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
249
250   /* skip cores */
251   for (i = 0; i < tm->skip_cores; i++)
252     {
253       uword c = clib_bitmap_first_set (avail_cpu);
254       if (c == ~0)
255         return clib_error_return (0, "no available cpus to skip");
256
257       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
258     }
259
260   /* grab cpu for main thread */
261   if (tm->main_lcore == ~0)
262     {
263       /* if main-lcore is not set, we try to use lcore 1 */
264       if (clib_bitmap_get (avail_cpu, 1))
265         tm->main_lcore = 1;
266       else
267         tm->main_lcore = clib_bitmap_first_set (avail_cpu);
268       if (tm->main_lcore == (u8) ~ 0)
269         return clib_error_return (0, "no available cpus to be used for the"
270                                   " main thread");
271     }
272   else
273     {
274       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
275         return clib_error_return (0, "cpu %u is not available to be used"
276                                   " for the main thread", tm->main_lcore);
277     }
278   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
279
280   /* assume that there is socket 0 only if there is no data from sysfs */
281   if (!tm->cpu_socket_bitmap)
282     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
283
284   /* pin main thread to main_lcore  */
285   if (tm->cb.vlib_thread_set_lcore_cb)
286     {
287       tm->cb.vlib_thread_set_lcore_cb (0, tm->main_lcore);
288     }
289   else
290     {
291       cpu_set_t cpuset;
292       CPU_ZERO (&cpuset);
293       CPU_SET (tm->main_lcore, &cpuset);
294       pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
295     }
296
297   /* Set up thread 0 */
298   vec_validate_aligned (vlib_worker_threads, 0, CLIB_CACHE_LINE_BYTES);
299   _vec_len (vlib_worker_threads) = 1;
300   w = vlib_worker_threads;
301   w->thread_mheap = clib_mem_get_heap ();
302   w->thread_stack = vlib_thread_stacks[0];
303   w->cpu_id = tm->main_lcore;
304   w->lwp = syscall (SYS_gettid);
305   w->thread_id = pthread_self ();
306   tm->n_vlib_mains = 1;
307
308   if (tm->sched_policy != ~0)
309     {
310       struct sched_param sched_param;
311       if (!sched_getparam (w->lwp, &sched_param))
312         {
313           if (tm->sched_priority != ~0)
314             sched_param.sched_priority = tm->sched_priority;
315           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
316         }
317     }
318
319   /* assign threads to cores and set n_vlib_mains */
320   tr = tm->next;
321
322   while (tr)
323     {
324       vec_add1 (tm->registrations, tr);
325       tr = tr->next;
326     }
327
328   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
329
330   for (i = 0; i < vec_len (tm->registrations); i++)
331     {
332       int j;
333       tr = tm->registrations[i];
334       tr->first_index = first_index;
335       first_index += tr->count;
336       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
337
338       /* construct coremask */
339       if (tr->use_pthreads || !tr->count)
340         continue;
341
342       if (tr->coremask)
343         {
344           uword c;
345           /* *INDENT-OFF* */
346           clib_bitmap_foreach (c, tr->coremask, ({
347             if (clib_bitmap_get(avail_cpu, c) == 0)
348               return clib_error_return (0, "cpu %u is not available to be used"
349                                         " for the '%s' thread",c, tr->name);
350
351             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
352           }));
353           /* *INDENT-ON* */
354         }
355       else
356         {
357           for (j = 0; j < tr->count; j++)
358             {
359               uword c = clib_bitmap_first_set (avail_cpu);
360               if (c == ~0)
361                 return clib_error_return (0,
362                                           "no available cpus to be used for"
363                                           " the '%s' thread", tr->name);
364
365               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
366               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
367             }
368         }
369     }
370
371   clib_bitmap_free (avail_cpu);
372
373   tm->n_vlib_mains = n_vlib_mains;
374
375   /*
376    * Allocate the remaining worker threads, and thread stack vector slots
377    * from now on, calls to os_get_nthreads() will return the correct
378    * answer.
379    */
380   vec_validate_aligned (vlib_worker_threads, first_index - 1,
381                         CLIB_CACHE_LINE_BYTES);
382   vec_validate (vlib_thread_stacks, vec_len (vlib_worker_threads) - 1);
383   return 0;
384 }
385
386 vlib_frame_queue_t *
387 vlib_frame_queue_alloc (int nelts)
388 {
389   vlib_frame_queue_t *fq;
390
391   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
392   clib_memset (fq, 0, sizeof (*fq));
393   fq->nelts = nelts;
394   fq->vector_threshold = 128;   // packets
395   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
396
397   if (1)
398     {
399       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
400         fformat (stderr, "WARNING: fq->tail unaligned\n");
401       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
402         fformat (stderr, "WARNING: fq->head unaligned\n");
403       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
404         fformat (stderr, "WARNING: fq->elts unaligned\n");
405
406       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
407         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
408                  sizeof (fq->elts[0]));
409       if (nelts & (nelts - 1))
410         {
411           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
412           abort ();
413         }
414     }
415
416   return (fq);
417 }
418
419 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
420 void
421 vl_msg_api_handler_no_free (void *v)
422 {
423 }
424
425 /* Turned off, save as reference material... */
426 #if 0
427 static inline int
428 vlib_frame_queue_dequeue_internal (int thread_id,
429                                    vlib_main_t * vm, vlib_node_main_t * nm)
430 {
431   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
432   vlib_frame_queue_elt_t *elt;
433   vlib_frame_t *f;
434   vlib_pending_frame_t *p;
435   vlib_node_runtime_t *r;
436   u32 node_runtime_index;
437   int msg_type;
438   u64 before;
439   int processed = 0;
440
441   ASSERT (vm == vlib_mains[thread_id]);
442
443   while (1)
444     {
445       if (fq->head == fq->tail)
446         return processed;
447
448       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
449
450       if (!elt->valid)
451         return processed;
452
453       before = clib_cpu_time_now ();
454
455       f = elt->frame;
456       node_runtime_index = elt->node_runtime_index;
457       msg_type = elt->msg_type;
458
459       switch (msg_type)
460         {
461         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
462           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
463           /* note fallthrough... */
464         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
465           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
466                                 node_runtime_index);
467           vlib_frame_free (vm, r, f);
468           break;
469         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
470           vec_add2 (vm->node_main.pending_frames, p, 1);
471           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
472           p->node_runtime_index = elt->node_runtime_index;
473           p->frame_index = vlib_frame_index (vm, f);
474           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
475           fq->dequeue_vectors += (u64) f->n_vectors;
476           break;
477         case VLIB_FRAME_QUEUE_ELT_API_MSG:
478           vl_msg_api_handler_no_free (f);
479           break;
480         default:
481           clib_warning ("bogus frame queue message, type %d", msg_type);
482           break;
483         }
484       elt->valid = 0;
485       fq->dequeues++;
486       fq->dequeue_ticks += clib_cpu_time_now () - before;
487       CLIB_MEMORY_BARRIER ();
488       fq->head++;
489       processed++;
490     }
491   ASSERT (0);
492   return processed;
493 }
494
495 int
496 vlib_frame_queue_dequeue (int thread_id,
497                           vlib_main_t * vm, vlib_node_main_t * nm)
498 {
499   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
500 }
501
502 int
503 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
504                           u32 frame_queue_index, vlib_frame_t * frame,
505                           vlib_frame_queue_msg_type_t type)
506 {
507   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
508   vlib_frame_queue_elt_t *elt;
509   u32 save_count;
510   u64 new_tail;
511   u64 before = clib_cpu_time_now ();
512
513   ASSERT (fq);
514
515   new_tail = clib_atomic_add_fetch (&fq->tail, 1);
516
517   /* Wait until a ring slot is available */
518   while (new_tail >= fq->head + fq->nelts)
519     {
520       f64 b4 = vlib_time_now_ticks (vm, before);
521       vlib_worker_thread_barrier_check (vm, b4);
522       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
523       // vlib_frame_queue_dequeue (vm->thread_index, vm, nm);
524     }
525
526   elt = fq->elts + (new_tail & (fq->nelts - 1));
527
528   /* this would be very bad... */
529   while (elt->valid)
530     {
531     }
532
533   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
534   save_count = frame->n_vectors;
535
536   elt->frame = frame;
537   elt->node_runtime_index = node_runtime_index;
538   elt->msg_type = type;
539   CLIB_MEMORY_BARRIER ();
540   elt->valid = 1;
541
542   return save_count;
543 }
544 #endif /* 0 */
545
546 /* To be called by vlib worker threads upon startup */
547 void
548 vlib_worker_thread_init (vlib_worker_thread_t * w)
549 {
550   vlib_thread_main_t *tm = vlib_get_thread_main ();
551
552   /*
553    * Note: disabling signals in worker threads as follows
554    * prevents the api post-mortem dump scheme from working
555    * {
556    *    sigset_t s;
557    *    sigfillset (&s);
558    *    pthread_sigmask (SIG_SETMASK, &s, 0);
559    *  }
560    */
561
562   clib_mem_set_heap (w->thread_mheap);
563
564   if (vec_len (tm->thread_prefix) && w->registration->short_name)
565     {
566       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
567                         w->registration->short_name, w->instance_id, '\0');
568       vlib_set_thread_name ((char *) w->name);
569     }
570
571   if (!w->registration->use_pthreads)
572     {
573
574       /* Initial barrier sync, for both worker and i/o threads */
575       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
576
577       while (*vlib_worker_threads->wait_at_barrier)
578         ;
579
580       clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
581     }
582 }
583
584 void *
585 vlib_worker_thread_bootstrap_fn (void *arg)
586 {
587   void *rv;
588   vlib_worker_thread_t *w = arg;
589
590   w->lwp = syscall (SYS_gettid);
591   w->thread_id = pthread_self ();
592
593   __os_thread_index = w - vlib_worker_threads;
594
595   rv = (void *) clib_calljmp
596     ((uword (*)(uword)) w->thread_function,
597      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
598   /* NOTREACHED, we hope */
599   return rv;
600 }
601
602 static void
603 vlib_get_thread_core_socket (vlib_worker_thread_t * w, unsigned cpu_id)
604 {
605   const char *sys_cpu_path = "/sys/devices/system/cpu/cpu";
606   u8 *p = 0;
607   int core_id = -1, socket_id = -1;
608
609   p = format (p, "%s%u/topology/core_id%c", sys_cpu_path, cpu_id, 0);
610   clib_sysfs_read ((char *) p, "%d", &core_id);
611   vec_reset_length (p);
612   p =
613     format (p, "%s%u/topology/physical_package_id%c", sys_cpu_path, cpu_id,
614             0);
615   clib_sysfs_read ((char *) p, "%d", &socket_id);
616   vec_free (p);
617
618   w->core_id = core_id;
619   w->socket_id = socket_id;
620 }
621
622 static clib_error_t *
623 vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned cpu_id)
624 {
625   vlib_thread_main_t *tm = &vlib_thread_main;
626   void *(*fp_arg) (void *) = fp;
627
628   w->cpu_id = cpu_id;
629   vlib_get_thread_core_socket (w, cpu_id);
630   if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads)
631     return tm->cb.vlib_launch_thread_cb (fp, (void *) w, cpu_id);
632   else
633     {
634       pthread_t worker;
635       cpu_set_t cpuset;
636       CPU_ZERO (&cpuset);
637       CPU_SET (cpu_id, &cpuset);
638
639       if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w))
640         return clib_error_return_unix (0, "pthread_create");
641
642       if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
643         return clib_error_return_unix (0, "pthread_setaffinity_np");
644
645       return 0;
646     }
647 }
648
649 static clib_error_t *
650 start_workers (vlib_main_t * vm)
651 {
652   int i, j;
653   vlib_worker_thread_t *w;
654   vlib_main_t *vm_clone;
655   void *oldheap;
656   vlib_thread_main_t *tm = &vlib_thread_main;
657   vlib_thread_registration_t *tr;
658   vlib_node_runtime_t *rt;
659   u32 n_vlib_mains = tm->n_vlib_mains;
660   u32 worker_thread_index;
661   u8 *main_heap = clib_mem_get_per_cpu_heap ();
662
663   vec_reset_length (vlib_worker_threads);
664
665   /* Set up the main thread */
666   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
667   w->elog_track.name = "main thread";
668   elog_track_register (&vm->elog_main, &w->elog_track);
669
670   if (vec_len (tm->thread_prefix))
671     {
672       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
673       vlib_set_thread_name ((char *) w->name);
674     }
675
676   vm->elog_main.lock =
677     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
678   vm->elog_main.lock[0] = 0;
679
680   if (n_vlib_mains > 1)
681     {
682       /* Replace hand-crafted length-1 vector with a real vector */
683       vlib_mains = 0;
684
685       vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
686                             CLIB_CACHE_LINE_BYTES);
687       _vec_len (vlib_mains) = 0;
688       vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
689
690       vlib_worker_threads->wait_at_barrier =
691         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
692       vlib_worker_threads->workers_at_barrier =
693         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
694
695       vlib_worker_threads->node_reforks_required =
696         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
697
698       /* We'll need the rpc vector lock... */
699       clib_spinlock_init (&vm->pending_rpc_lock);
700
701       /* Ask for an initial barrier sync */
702       *vlib_worker_threads->workers_at_barrier = 0;
703       *vlib_worker_threads->wait_at_barrier = 1;
704
705       /* Without update or refork */
706       *vlib_worker_threads->node_reforks_required = 0;
707       vm->need_vlib_worker_thread_node_runtime_update = 0;
708
709       /* init timing */
710       vm->barrier_epoch = 0;
711       vm->barrier_no_close_before = 0;
712
713       worker_thread_index = 1;
714
715       for (i = 0; i < vec_len (tm->registrations); i++)
716         {
717           vlib_node_main_t *nm, *nm_clone;
718           int k;
719
720           tr = tm->registrations[i];
721
722           if (tr->count == 0)
723             continue;
724
725           for (k = 0; k < tr->count; k++)
726             {
727               vlib_node_t *n;
728
729               vec_add2 (vlib_worker_threads, w, 1);
730               /* Currently unused, may not really work */
731               if (tr->mheap_size)
732                 {
733 #if USE_DLMALLOC == 0
734                   w->thread_mheap =
735                     mheap_alloc (0 /* use VM */ , tr->mheap_size);
736 #else
737                   w->thread_mheap = create_mspace (tr->mheap_size,
738                                                    0 /* unlocked */ );
739 #endif
740                 }
741               else
742                 w->thread_mheap = main_heap;
743
744               w->thread_stack =
745                 vlib_thread_stack_init (w - vlib_worker_threads);
746               w->thread_function = tr->function;
747               w->thread_function_arg = w;
748               w->instance_id = k;
749               w->registration = tr;
750
751               w->elog_track.name =
752                 (char *) format (0, "%s %d", tr->name, k + 1);
753               vec_add1 (w->elog_track.name, 0);
754               elog_track_register (&vm->elog_main, &w->elog_track);
755
756               if (tr->no_data_structure_clone)
757                 continue;
758
759               /* Fork vlib_global_main et al. Look for bugs here */
760               oldheap = clib_mem_set_heap (w->thread_mheap);
761
762               vm_clone = clib_mem_alloc_aligned (sizeof (*vm_clone),
763                                                  CLIB_CACHE_LINE_BYTES);
764               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
765
766               vm_clone->thread_index = worker_thread_index;
767               vm_clone->heap_base = w->thread_mheap;
768               vm_clone->heap_aligned_base = (void *)
769                 (((uword) w->thread_mheap) & ~(VLIB_FRAME_ALIGN - 1));
770               vm_clone->init_functions_called =
771                 hash_create (0, /* value bytes */ 0);
772               vm_clone->pending_rpc_requests = 0;
773               vec_validate (vm_clone->pending_rpc_requests, 0);
774               _vec_len (vm_clone->pending_rpc_requests) = 0;
775               clib_memset (&vm_clone->random_buffer, 0,
776                            sizeof (vm_clone->random_buffer));
777
778               nm = &vlib_mains[0]->node_main;
779               nm_clone = &vm_clone->node_main;
780               /* fork next frames array, preserving node runtime indices */
781               nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
782                                                        CLIB_CACHE_LINE_BYTES);
783               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
784                 {
785                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
786                   u32 save_node_runtime_index;
787                   u32 save_flags;
788
789                   save_node_runtime_index = nf->node_runtime_index;
790                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
791                   vlib_next_frame_init (nf);
792                   nf->node_runtime_index = save_node_runtime_index;
793                   nf->flags = save_flags;
794                 }
795
796               /* fork the frame dispatch queue */
797               nm_clone->pending_frames = 0;
798               vec_validate (nm_clone->pending_frames, 10);
799               _vec_len (nm_clone->pending_frames) = 0;
800
801               /* fork nodes */
802               nm_clone->nodes = 0;
803
804               /* Allocate all nodes in single block for speed */
805               n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
806
807               for (j = 0; j < vec_len (nm->nodes); j++)
808                 {
809                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
810                   /* none of the copied nodes have enqueue rights given out */
811                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
812                   clib_memset (&n->stats_total, 0, sizeof (n->stats_total));
813                   clib_memset (&n->stats_last_clear, 0,
814                                sizeof (n->stats_last_clear));
815                   vec_add1 (nm_clone->nodes, n);
816                   n++;
817                 }
818               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
819                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
820                                  CLIB_CACHE_LINE_BYTES);
821               vec_foreach (rt,
822                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
823               {
824                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
825                 rt->thread_index = vm_clone->thread_index;
826                 /* copy initial runtime_data from node */
827                 if (n->runtime_data && n->runtime_data_bytes > 0)
828                   clib_memcpy (rt->runtime_data, n->runtime_data,
829                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
830                                          n->runtime_data_bytes));
831               }
832
833               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
834                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
835                                  CLIB_CACHE_LINE_BYTES);
836               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
837               {
838                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
839                 rt->thread_index = vm_clone->thread_index;
840                 /* copy initial runtime_data from node */
841                 if (n->runtime_data && n->runtime_data_bytes > 0)
842                   clib_memcpy (rt->runtime_data, n->runtime_data,
843                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
844                                          n->runtime_data_bytes));
845               }
846
847               nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
848                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
849                                  CLIB_CACHE_LINE_BYTES);
850               vec_foreach (rt,
851                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
852               {
853                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
854                 rt->thread_index = vm_clone->thread_index;
855                 /* copy initial runtime_data from node */
856                 if (n->runtime_data && n->runtime_data_bytes > 0)
857                   clib_memcpy (rt->runtime_data, n->runtime_data,
858                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
859                                          n->runtime_data_bytes));
860               }
861
862               nm_clone->processes = vec_dup_aligned (nm->processes,
863                                                      CLIB_CACHE_LINE_BYTES);
864
865               /* Create per-thread frame freelist */
866               nm_clone->frame_sizes = vec_new (vlib_frame_size_t, 1);
867 #ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES
868               nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
869 #endif
870
871               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
872
873               clib_mem_set_heap (oldheap);
874               vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
875
876               /* Switch to the stats segment ... */
877               void *oldheap = vlib_stats_push_heap (0);
878               vm_clone->error_main.counters = vec_dup_aligned
879                 (vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES);
880               vlib_stats_pop_heap2 (vm_clone->error_main.counters,
881                                     worker_thread_index, oldheap, 1);
882
883               vm_clone->error_main.counters_last_clear = vec_dup_aligned
884                 (vlib_mains[0]->error_main.counters_last_clear,
885                  CLIB_CACHE_LINE_BYTES);
886
887               worker_thread_index++;
888             }
889         }
890     }
891   else
892     {
893       /* only have non-data-structure copy threads to create... */
894       for (i = 0; i < vec_len (tm->registrations); i++)
895         {
896           tr = tm->registrations[i];
897
898           for (j = 0; j < tr->count; j++)
899             {
900               vec_add2 (vlib_worker_threads, w, 1);
901               if (tr->mheap_size)
902                 {
903 #if USE_DLMALLOC == 0
904                   w->thread_mheap =
905                     mheap_alloc (0 /* use VM */ , tr->mheap_size);
906 #else
907                   w->thread_mheap =
908                     create_mspace (tr->mheap_size, 0 /* locked */ );
909 #endif
910                 }
911               else
912                 w->thread_mheap = main_heap;
913               w->thread_stack =
914                 vlib_thread_stack_init (w - vlib_worker_threads);
915               w->thread_function = tr->function;
916               w->thread_function_arg = w;
917               w->instance_id = j;
918               w->elog_track.name =
919                 (char *) format (0, "%s %d", tr->name, j + 1);
920               w->registration = tr;
921               vec_add1 (w->elog_track.name, 0);
922               elog_track_register (&vm->elog_main, &w->elog_track);
923             }
924         }
925     }
926
927   worker_thread_index = 1;
928
929   for (i = 0; i < vec_len (tm->registrations); i++)
930     {
931       clib_error_t *err;
932       int j;
933
934       tr = tm->registrations[i];
935
936       if (tr->use_pthreads || tm->use_pthreads)
937         {
938           for (j = 0; j < tr->count; j++)
939             {
940               w = vlib_worker_threads + worker_thread_index++;
941               err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
942                                             w, 0);
943               if (err)
944                 clib_error_report (err);
945             }
946         }
947       else
948         {
949           uword c;
950           /* *INDENT-OFF* */
951           clib_bitmap_foreach (c, tr->coremask, ({
952             w = vlib_worker_threads + worker_thread_index++;
953             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
954                                           w, c);
955             if (err)
956               clib_error_report (err);
957           }));
958           /* *INDENT-ON* */
959         }
960     }
961   vlib_worker_thread_barrier_sync (vm);
962   vlib_worker_thread_barrier_release (vm);
963   return 0;
964 }
965
966 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
967
968
969 static inline void
970 worker_thread_node_runtime_update_internal (void)
971 {
972   int i, j;
973   vlib_main_t *vm;
974   vlib_node_main_t *nm, *nm_clone;
975   vlib_main_t *vm_clone;
976   vlib_node_runtime_t *rt;
977   never_inline void
978     vlib_node_runtime_sync_stats (vlib_main_t * vm,
979                                   vlib_node_runtime_t * r,
980                                   uword n_calls,
981                                   uword n_vectors, uword n_clocks);
982
983   ASSERT (vlib_get_thread_index () == 0);
984
985   vm = vlib_mains[0];
986   nm = &vm->node_main;
987
988   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
989
990   /*
991    * Scrape all runtime stats, so we don't lose node runtime(s) with
992    * pending counts, or throw away worker / io thread counts.
993    */
994   for (j = 0; j < vec_len (nm->nodes); j++)
995     {
996       vlib_node_t *n;
997       n = nm->nodes[j];
998       vlib_node_sync_stats (vm, n);
999     }
1000
1001   for (i = 1; i < vec_len (vlib_mains); i++)
1002     {
1003       vlib_node_t *n;
1004
1005       vm_clone = vlib_mains[i];
1006       nm_clone = &vm_clone->node_main;
1007
1008       for (j = 0; j < vec_len (nm_clone->nodes); j++)
1009         {
1010           n = nm_clone->nodes[j];
1011
1012           rt = vlib_node_get_runtime (vm_clone, n->index);
1013           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
1014         }
1015     }
1016
1017   /* Per-worker clone rebuilds are now done on each thread */
1018 }
1019
1020
1021 void
1022 vlib_worker_thread_node_refork (void)
1023 {
1024   vlib_main_t *vm, *vm_clone;
1025   vlib_node_main_t *nm, *nm_clone;
1026   vlib_node_t **old_nodes_clone;
1027   vlib_node_runtime_t *rt, *old_rt;
1028
1029   vlib_node_t *new_n_clone;
1030
1031   int j;
1032
1033   vm = vlib_mains[0];
1034   nm = &vm->node_main;
1035   vm_clone = vlib_get_main ();
1036   nm_clone = &vm_clone->node_main;
1037
1038   /* Re-clone error heap */
1039   u64 *old_counters = vm_clone->error_main.counters;
1040   u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
1041
1042   clib_memcpy_fast (&vm_clone->error_main, &vm->error_main,
1043                     sizeof (vm->error_main));
1044   j = vec_len (vm->error_main.counters) - 1;
1045
1046   /* Switch to the stats segment ... */
1047   void *oldheap = vlib_stats_push_heap (0);
1048   vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
1049   vm_clone->error_main.counters = old_counters;
1050   vlib_stats_pop_heap2 (vm_clone->error_main.counters, vm_clone->thread_index,
1051                         oldheap, 0);
1052
1053   vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
1054   vm_clone->error_main.counters_last_clear = old_counters_all_clear;
1055
1056   nm_clone = &vm_clone->node_main;
1057   vec_free (nm_clone->next_frames);
1058   nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
1059                                            CLIB_CACHE_LINE_BYTES);
1060
1061   for (j = 0; j < vec_len (nm_clone->next_frames); j++)
1062     {
1063       vlib_next_frame_t *nf = &nm_clone->next_frames[j];
1064       u32 save_node_runtime_index;
1065       u32 save_flags;
1066
1067       save_node_runtime_index = nf->node_runtime_index;
1068       save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
1069       vlib_next_frame_init (nf);
1070       nf->node_runtime_index = save_node_runtime_index;
1071       nf->flags = save_flags;
1072     }
1073
1074   old_nodes_clone = nm_clone->nodes;
1075   nm_clone->nodes = 0;
1076
1077   /* re-fork nodes */
1078
1079   /* Allocate all nodes in single block for speed */
1080   new_n_clone =
1081     clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone));
1082   for (j = 0; j < vec_len (nm->nodes); j++)
1083     {
1084       vlib_node_t *old_n_clone;
1085       vlib_node_t *new_n;
1086
1087       new_n = nm->nodes[j];
1088       old_n_clone = old_nodes_clone[j];
1089
1090       clib_memcpy_fast (new_n_clone, new_n, sizeof (*new_n));
1091       /* none of the copied nodes have enqueue rights given out */
1092       new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
1093
1094       if (j >= vec_len (old_nodes_clone))
1095         {
1096           /* new node, set to zero */
1097           clib_memset (&new_n_clone->stats_total, 0,
1098                        sizeof (new_n_clone->stats_total));
1099           clib_memset (&new_n_clone->stats_last_clear, 0,
1100                        sizeof (new_n_clone->stats_last_clear));
1101         }
1102       else
1103         {
1104           /* Copy stats if the old data is valid */
1105           clib_memcpy_fast (&new_n_clone->stats_total,
1106                             &old_n_clone->stats_total,
1107                             sizeof (new_n_clone->stats_total));
1108           clib_memcpy_fast (&new_n_clone->stats_last_clear,
1109                             &old_n_clone->stats_last_clear,
1110                             sizeof (new_n_clone->stats_last_clear));
1111
1112           /* keep previous node state */
1113           new_n_clone->state = old_n_clone->state;
1114         }
1115       vec_add1 (nm_clone->nodes, new_n_clone);
1116       new_n_clone++;
1117     }
1118   /* Free the old node clones */
1119   clib_mem_free (old_nodes_clone[0]);
1120
1121   vec_free (old_nodes_clone);
1122
1123
1124   /* re-clone internal nodes */
1125   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
1126   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
1127     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
1128                      CLIB_CACHE_LINE_BYTES);
1129
1130   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
1131   {
1132     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1133     rt->thread_index = vm_clone->thread_index;
1134     /* copy runtime_data, will be overwritten later for existing rt */
1135     if (n->runtime_data && n->runtime_data_bytes > 0)
1136       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1137                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1138                                   n->runtime_data_bytes));
1139   }
1140
1141   for (j = 0; j < vec_len (old_rt); j++)
1142     {
1143       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1144       rt->state = old_rt[j].state;
1145       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1146                         VLIB_NODE_RUNTIME_DATA_SIZE);
1147     }
1148
1149   vec_free (old_rt);
1150
1151   /* re-clone input nodes */
1152   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
1153   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
1154     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1155                      CLIB_CACHE_LINE_BYTES);
1156
1157   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1158   {
1159     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1160     rt->thread_index = vm_clone->thread_index;
1161     /* copy runtime_data, will be overwritten later for existing rt */
1162     if (n->runtime_data && n->runtime_data_bytes > 0)
1163       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1164                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1165                                   n->runtime_data_bytes));
1166   }
1167
1168   for (j = 0; j < vec_len (old_rt); j++)
1169     {
1170       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1171       rt->state = old_rt[j].state;
1172       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1173                         VLIB_NODE_RUNTIME_DATA_SIZE);
1174     }
1175
1176   vec_free (old_rt);
1177
1178   /* re-clone pre-input nodes */
1179   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT];
1180   nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
1181     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
1182                      CLIB_CACHE_LINE_BYTES);
1183
1184   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
1185   {
1186     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1187     rt->thread_index = vm_clone->thread_index;
1188     /* copy runtime_data, will be overwritten later for existing rt */
1189     if (n->runtime_data && n->runtime_data_bytes > 0)
1190       clib_memcpy_fast (rt->runtime_data, n->runtime_data,
1191                         clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1192                                   n->runtime_data_bytes));
1193   }
1194
1195   for (j = 0; j < vec_len (old_rt); j++)
1196     {
1197       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1198       rt->state = old_rt[j].state;
1199       clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data,
1200                         VLIB_NODE_RUNTIME_DATA_SIZE);
1201     }
1202
1203   vec_free (old_rt);
1204
1205   nm_clone->processes = vec_dup_aligned (nm->processes,
1206                                          CLIB_CACHE_LINE_BYTES);
1207 }
1208
1209 void
1210 vlib_worker_thread_node_runtime_update (void)
1211 {
1212   /*
1213    * Make a note that we need to do a node runtime update
1214    * prior to releasing the barrier.
1215    */
1216   vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1;
1217 }
1218
1219 u32
1220 unformat_sched_policy (unformat_input_t * input, va_list * args)
1221 {
1222   u32 *r = va_arg (*args, u32 *);
1223
1224   if (0);
1225 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1226   foreach_sched_policy
1227 #undef _
1228     else
1229     return 0;
1230   return 1;
1231 }
1232
1233 static clib_error_t *
1234 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1235 {
1236   vlib_thread_registration_t *tr;
1237   uword *p;
1238   vlib_thread_main_t *tm = &vlib_thread_main;
1239   u8 *name;
1240   uword *bitmap;
1241   u32 count;
1242
1243   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1244
1245   tm->n_thread_stacks = 1;      /* account for main thread */
1246   tm->sched_policy = ~0;
1247   tm->sched_priority = ~0;
1248   tm->main_lcore = ~0;
1249
1250   tr = tm->next;
1251
1252   while (tr)
1253     {
1254       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1255       tr = tr->next;
1256     }
1257
1258   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1259     {
1260       if (unformat (input, "use-pthreads"))
1261         tm->use_pthreads = 1;
1262       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1263         ;
1264       else if (unformat (input, "main-core %u", &tm->main_lcore))
1265         ;
1266       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1267         ;
1268       else if (unformat (input, "coremask-%s %U", &name,
1269                          unformat_bitmap_mask, &bitmap) ||
1270                unformat (input, "corelist-%s %U", &name,
1271                          unformat_bitmap_list, &bitmap))
1272         {
1273           p = hash_get_mem (tm->thread_registrations_by_name, name);
1274           if (p == 0)
1275             return clib_error_return (0, "no such thread type '%s'", name);
1276
1277           tr = (vlib_thread_registration_t *) p[0];
1278
1279           if (tr->use_pthreads)
1280             return clib_error_return (0,
1281                                       "corelist cannot be set for '%s' threads",
1282                                       name);
1283
1284           tr->coremask = bitmap;
1285           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1286         }
1287       else
1288         if (unformat
1289             (input, "scheduler-policy %U", unformat_sched_policy,
1290              &tm->sched_policy))
1291         ;
1292       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1293         ;
1294       else if (unformat (input, "%s %u", &name, &count))
1295         {
1296           p = hash_get_mem (tm->thread_registrations_by_name, name);
1297           if (p == 0)
1298             return clib_error_return (0, "no such thread type 3 '%s'", name);
1299
1300           tr = (vlib_thread_registration_t *) p[0];
1301           if (tr->fixed_count)
1302             return clib_error_return
1303               (0, "number of %s threads not configurable", tr->name);
1304           tr->count = count;
1305         }
1306       else
1307         break;
1308     }
1309
1310   if (tm->sched_priority != ~0)
1311     {
1312       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1313         {
1314           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1315           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1316           if (tm->sched_priority > prio_max)
1317             tm->sched_priority = prio_max;
1318           if (tm->sched_priority < prio_min)
1319             tm->sched_priority = prio_min;
1320         }
1321       else
1322         {
1323           return clib_error_return
1324             (0,
1325              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1326              tm->sched_priority);
1327         }
1328     }
1329   tr = tm->next;
1330
1331   if (!tm->thread_prefix)
1332     tm->thread_prefix = format (0, "vpp");
1333
1334   while (tr)
1335     {
1336       tm->n_thread_stacks += tr->count;
1337       tm->n_pthreads += tr->count * tr->use_pthreads;
1338       tm->n_threads += tr->count * (tr->use_pthreads == 0);
1339       tr = tr->next;
1340     }
1341
1342   return 0;
1343 }
1344
1345 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1346
1347 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1348 void
1349 vnet_main_fixup (vlib_fork_fixup_t which)
1350 {
1351 }
1352
1353 void
1354 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1355 {
1356   vlib_main_t *vm = vlib_get_main ();
1357
1358   if (vlib_mains == 0)
1359     return;
1360
1361   ASSERT (vlib_get_thread_index () == 0);
1362   vlib_worker_thread_barrier_sync (vm);
1363
1364   switch (which)
1365     {
1366     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1367       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1368       break;
1369
1370     default:
1371       ASSERT (0);
1372     }
1373   vlib_worker_thread_barrier_release (vm);
1374 }
1375
1376   /*
1377    * Enforce minimum open time to minimize packet loss due to Rx overflow,
1378    * based on a test based heuristic that barrier should be open for at least
1379    * 3 time as long as it is closed (with an upper bound of 1ms because by that
1380    *  point it is probably too late to make a difference)
1381    */
1382
1383 #ifndef BARRIER_MINIMUM_OPEN_LIMIT
1384 #define BARRIER_MINIMUM_OPEN_LIMIT 0.001
1385 #endif
1386
1387 #ifndef BARRIER_MINIMUM_OPEN_FACTOR
1388 #define BARRIER_MINIMUM_OPEN_FACTOR 3
1389 #endif
1390
1391 void
1392 vlib_worker_thread_initial_barrier_sync_and_release (vlib_main_t * vm)
1393 {
1394   f64 deadline;
1395   f64 now = vlib_time_now (vm);
1396   u32 count = vec_len (vlib_mains) - 1;
1397
1398   /* No worker threads? */
1399   if (count == 0)
1400     return;
1401
1402   deadline = now + BARRIER_SYNC_TIMEOUT;
1403   *vlib_worker_threads->wait_at_barrier = 1;
1404   while (*vlib_worker_threads->workers_at_barrier != count)
1405     {
1406       if ((now = vlib_time_now (vm)) > deadline)
1407         {
1408           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1409           os_panic ();
1410         }
1411       CLIB_PAUSE ();
1412     }
1413   *vlib_worker_threads->wait_at_barrier = 0;
1414 }
1415
1416 void
1417 vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
1418 {
1419   f64 deadline;
1420   f64 now;
1421   f64 t_entry;
1422   f64 t_open;
1423   f64 t_closed;
1424   f64 max_vector_rate;
1425   u32 count;
1426   int i;
1427
1428   if (vec_len (vlib_mains) < 2)
1429     return;
1430
1431   ASSERT (vlib_get_thread_index () == 0);
1432
1433   vlib_worker_threads[0].barrier_caller = func_name;
1434   count = vec_len (vlib_mains) - 1;
1435
1436   /* Record entry relative to last close */
1437   now = vlib_time_now (vm);
1438   t_entry = now - vm->barrier_epoch;
1439
1440   /* Tolerate recursive calls */
1441   if (++vlib_worker_threads[0].recursion_level > 1)
1442     {
1443       barrier_trace_sync_rec (t_entry);
1444       return;
1445     }
1446
1447   /*
1448    * Need data to decide if we're working hard enough to honor
1449    * the barrier hold-down timer.
1450    */
1451   max_vector_rate = 0.0;
1452   for (i = 1; i < vec_len (vlib_mains); i++)
1453     max_vector_rate =
1454       clib_max (max_vector_rate,
1455                 vlib_last_vectors_per_main_loop_as_f64 (vlib_mains[i]));
1456
1457   vlib_worker_threads[0].barrier_sync_count++;
1458
1459   /* Enforce minimum barrier open time to minimize packet loss */
1460   ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));
1461
1462   /*
1463    * If any worker thread seems busy, which we define
1464    * as a vector rate above 10, we enforce the barrier hold-down timer
1465    */
1466   if (max_vector_rate > 10.0)
1467     {
1468       while (1)
1469         {
1470           now = vlib_time_now (vm);
1471           /* Barrier hold-down timer expired? */
1472           if (now >= vm->barrier_no_close_before)
1473             break;
1474           if ((vm->barrier_no_close_before - now)
1475               > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
1476             {
1477               clib_warning
1478                 ("clock change: would have waited for %.4f seconds",
1479                  (vm->barrier_no_close_before - now));
1480               break;
1481             }
1482         }
1483     }
1484   /* Record time of closure */
1485   t_open = now - vm->barrier_epoch;
1486   vm->barrier_epoch = now;
1487
1488   deadline = now + BARRIER_SYNC_TIMEOUT;
1489
1490   *vlib_worker_threads->wait_at_barrier = 1;
1491   while (*vlib_worker_threads->workers_at_barrier != count)
1492     {
1493       if ((now = vlib_time_now (vm)) > deadline)
1494         {
1495           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1496           os_panic ();
1497         }
1498     }
1499
1500   t_closed = now - vm->barrier_epoch;
1501
1502   barrier_trace_sync (t_entry, t_open, t_closed);
1503
1504 }
1505
1506 void
1507 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1508 {
1509   f64 deadline;
1510   f64 now;
1511   f64 minimum_open;
1512   f64 t_entry;
1513   f64 t_closed_total;
1514   f64 t_update_main = 0.0;
1515   int refork_needed = 0;
1516
1517   if (vec_len (vlib_mains) < 2)
1518     return;
1519
1520   ASSERT (vlib_get_thread_index () == 0);
1521
1522
1523   now = vlib_time_now (vm);
1524   t_entry = now - vm->barrier_epoch;
1525
1526   if (--vlib_worker_threads[0].recursion_level > 0)
1527     {
1528       barrier_trace_release_rec (t_entry);
1529       return;
1530     }
1531
1532   /* Update (all) node runtimes before releasing the barrier, if needed */
1533   if (vm->need_vlib_worker_thread_node_runtime_update)
1534     {
1535       /*
1536        * Lock stat segment here, so we's safe when
1537        * rebuilding the stat segment node clones from the
1538        * stat thread...
1539        */
1540       vlib_stat_segment_lock ();
1541
1542       /* Do stats elements on main thread */
1543       worker_thread_node_runtime_update_internal ();
1544       vm->need_vlib_worker_thread_node_runtime_update = 0;
1545
1546       /* Do per thread rebuilds in parallel */
1547       refork_needed = 1;
1548       clib_atomic_fetch_add (vlib_worker_threads->node_reforks_required,
1549                              (vec_len (vlib_mains) - 1));
1550       now = vlib_time_now (vm);
1551       t_update_main = now - vm->barrier_epoch;
1552     }
1553
1554   deadline = now + BARRIER_SYNC_TIMEOUT;
1555
1556   /*
1557    * Note when we let go of the barrier.
1558    * Workers can use this to derive a reasonably accurate
1559    * time offset. See vlib_time_now(...)
1560    */
1561   vm->time_last_barrier_release = vlib_time_now (vm);
1562   CLIB_MEMORY_STORE_BARRIER ();
1563
1564   *vlib_worker_threads->wait_at_barrier = 0;
1565
1566   while (*vlib_worker_threads->workers_at_barrier > 0)
1567     {
1568       if ((now = vlib_time_now (vm)) > deadline)
1569         {
1570           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1571           os_panic ();
1572         }
1573     }
1574
1575   /* Wait for reforks before continuing */
1576   if (refork_needed)
1577     {
1578       now = vlib_time_now (vm);
1579
1580       deadline = now + BARRIER_SYNC_TIMEOUT;
1581
1582       while (*vlib_worker_threads->node_reforks_required > 0)
1583         {
1584           if ((now = vlib_time_now (vm)) > deadline)
1585             {
1586               fformat (stderr, "%s: worker thread refork deadlock\n",
1587                        __FUNCTION__);
1588               os_panic ();
1589             }
1590         }
1591       vlib_stat_segment_unlock ();
1592     }
1593
1594   t_closed_total = now - vm->barrier_epoch;
1595
1596   minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;
1597
1598   if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
1599     {
1600       minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
1601     }
1602
1603   vm->barrier_no_close_before = now + minimum_open;
1604
1605   /* Record barrier epoch (used to enforce minimum open time) */
1606   vm->barrier_epoch = now;
1607
1608   barrier_trace_release (t_entry, t_closed_total, t_update_main);
1609
1610 }
1611
1612 /*
1613  * Check the frame queue to see if any frames are available.
1614  * If so, pull the packets off the frames and put them to
1615  * the handoff node.
1616  */
1617 int
1618 vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm)
1619 {
1620   u32 thread_id = vm->thread_index;
1621   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
1622   vlib_frame_queue_elt_t *elt;
1623   u32 *from, *to;
1624   vlib_frame_t *f;
1625   int msg_type;
1626   int processed = 0;
1627   u32 n_left_to_node;
1628   u32 vectors = 0;
1629
1630   ASSERT (fq);
1631   ASSERT (vm == vlib_mains[thread_id]);
1632
1633   if (PREDICT_FALSE (fqm->node_index == ~0))
1634     return 0;
1635   /*
1636    * Gather trace data for frame queues
1637    */
1638   if (PREDICT_FALSE (fq->trace))
1639     {
1640       frame_queue_trace_t *fqt;
1641       frame_queue_nelt_counter_t *fqh;
1642       u32 elix;
1643
1644       fqt = &fqm->frame_queue_traces[thread_id];
1645
1646       fqt->nelts = fq->nelts;
1647       fqt->head = fq->head;
1648       fqt->head_hint = fq->head_hint;
1649       fqt->tail = fq->tail;
1650       fqt->threshold = fq->vector_threshold;
1651       fqt->n_in_use = fqt->tail - fqt->head;
1652       if (fqt->n_in_use >= fqt->nelts)
1653         {
1654           // if beyond max then use max
1655           fqt->n_in_use = fqt->nelts - 1;
1656         }
1657
1658       /* Record the number of elements in use in the histogram */
1659       fqh = &fqm->frame_queue_histogram[thread_id];
1660       fqh->count[fqt->n_in_use]++;
1661
1662       /* Record a snapshot of the elements in use */
1663       for (elix = 0; elix < fqt->nelts; elix++)
1664         {
1665           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1666           if (1 || elt->valid)
1667             {
1668               fqt->n_vectors[elix] = elt->n_vectors;
1669             }
1670         }
1671       fqt->written = 1;
1672     }
1673
1674   while (1)
1675     {
1676       if (fq->head == fq->tail)
1677         {
1678           fq->head_hint = fq->head;
1679           return processed;
1680         }
1681
1682       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1683
1684       if (!elt->valid)
1685         {
1686           fq->head_hint = fq->head;
1687           return processed;
1688         }
1689
1690       from = elt->buffer_index;
1691       msg_type = elt->msg_type;
1692
1693       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1694       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1695
1696       f = vlib_get_frame_to_node (vm, fqm->node_index);
1697
1698       to = vlib_frame_vector_args (f);
1699
1700       n_left_to_node = elt->n_vectors;
1701
1702       while (n_left_to_node >= 4)
1703         {
1704           to[0] = from[0];
1705           to[1] = from[1];
1706           to[2] = from[2];
1707           to[3] = from[3];
1708           to += 4;
1709           from += 4;
1710           n_left_to_node -= 4;
1711         }
1712
1713       while (n_left_to_node > 0)
1714         {
1715           to[0] = from[0];
1716           to++;
1717           from++;
1718           n_left_to_node--;
1719         }
1720
1721       vectors += elt->n_vectors;
1722       f->n_vectors = elt->n_vectors;
1723       vlib_put_frame_to_node (vm, fqm->node_index, f);
1724
1725       elt->valid = 0;
1726       elt->n_vectors = 0;
1727       elt->msg_type = 0xfefefefe;
1728       CLIB_MEMORY_BARRIER ();
1729       fq->head++;
1730       processed++;
1731
1732       /*
1733        * Limit the number of packets pushed into the graph
1734        */
1735       if (vectors >= fq->vector_threshold)
1736         {
1737           fq->head_hint = fq->head;
1738           return processed;
1739         }
1740     }
1741   ASSERT (0);
1742   return processed;
1743 }
1744
1745 void
1746 vlib_worker_thread_fn (void *arg)
1747 {
1748   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1749   vlib_thread_main_t *tm = vlib_get_thread_main ();
1750   vlib_main_t *vm = vlib_get_main ();
1751   clib_error_t *e;
1752
1753   ASSERT (vm->thread_index == vlib_get_thread_index ());
1754
1755   vlib_worker_thread_init (w);
1756   clib_time_init (&vm->clib_time);
1757   clib_mem_set_heap (w->thread_mheap);
1758
1759   e = vlib_call_init_exit_functions_no_sort
1760     (vm, &vm->worker_init_function_registrations, 1 /* call_once */ );
1761   if (e)
1762     clib_error_report (e);
1763
1764   /* Wait until the dpdk init sequence is complete */
1765   while (tm->extern_thread_mgmt && tm->worker_thread_release == 0)
1766     vlib_worker_thread_barrier_check ();
1767
1768   vlib_worker_loop (vm);
1769 }
1770
1771 /* *INDENT-OFF* */
1772 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1773   .name = "workers",
1774   .short_name = "wk",
1775   .function = vlib_worker_thread_fn,
1776 };
1777 /* *INDENT-ON* */
1778
1779 u32
1780 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
1781 {
1782   vlib_thread_main_t *tm = vlib_get_thread_main ();
1783   vlib_frame_queue_main_t *fqm;
1784   vlib_frame_queue_t *fq;
1785   int i;
1786
1787   if (frame_queue_nelts == 0)
1788     frame_queue_nelts = FRAME_QUEUE_MAX_NELTS;
1789
1790   ASSERT (frame_queue_nelts >= 8);
1791
1792   vec_add2 (tm->frame_queue_mains, fqm, 1);
1793
1794   fqm->node_index = node_index;
1795   fqm->frame_queue_nelts = frame_queue_nelts;
1796   fqm->queue_hi_thresh = frame_queue_nelts - 2;
1797
1798   vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
1799   vec_validate (fqm->per_thread_data, tm->n_vlib_mains - 1);
1800   _vec_len (fqm->vlib_frame_queues) = 0;
1801   for (i = 0; i < tm->n_vlib_mains; i++)
1802     {
1803       vlib_frame_queue_per_thread_data_t *ptd;
1804       fq = vlib_frame_queue_alloc (frame_queue_nelts);
1805       vec_add1 (fqm->vlib_frame_queues, fq);
1806
1807       ptd = vec_elt_at_index (fqm->per_thread_data, i);
1808       vec_validate (ptd->handoff_queue_elt_by_thread_index,
1809                     tm->n_vlib_mains - 1);
1810       vec_validate_init_empty (ptd->congested_handoff_queue_by_thread_index,
1811                                tm->n_vlib_mains - 1,
1812                                (vlib_frame_queue_t *) (~0));
1813     }
1814
1815   return (fqm - tm->frame_queue_mains);
1816 }
1817
1818 int
1819 vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb)
1820 {
1821   vlib_thread_main_t *tm = vlib_get_thread_main ();
1822
1823   if (tm->extern_thread_mgmt)
1824     return -1;
1825
1826   tm->cb.vlib_launch_thread_cb = cb->vlib_launch_thread_cb;
1827   tm->extern_thread_mgmt = 1;
1828   return 0;
1829 }
1830
1831 void
1832 vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t *
1833                                      args)
1834 {
1835   ASSERT (vlib_get_thread_index () == 0);
1836   vlib_process_signal_event (vlib_get_main (), args->node_index,
1837                              args->type_opaque, args->data);
1838 }
1839
1840 void *rpc_call_main_thread_cb_fn;
1841
1842 void
1843 vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
1844 {
1845   if (rpc_call_main_thread_cb_fn)
1846     {
1847       void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
1848       (*fp) (callback, args, arg_size);
1849     }
1850   else
1851     clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
1852 }
1853
1854 clib_error_t *
1855 threads_init (vlib_main_t * vm)
1856 {
1857   return 0;
1858 }
1859
1860 VLIB_INIT_FUNCTION (threads_init);
1861
1862
1863 static clib_error_t *
1864 show_clock_command_fn (vlib_main_t * vm,
1865                        unformat_input_t * input, vlib_cli_command_t * cmd)
1866 {
1867   int i;
1868   f64 now;
1869
1870   now = vlib_time_now (vm);
1871
1872   vlib_cli_output (vm, "Time now %.9f", now);
1873
1874   if (vec_len (vlib_mains) == 1)
1875     return 0;
1876
1877   vlib_cli_output (vm, "Time last barrier release %.9f",
1878                    vm->time_last_barrier_release);
1879
1880   for (i = 1; i < vec_len (vlib_mains); i++)
1881     {
1882       if (vlib_mains[i] == 0)
1883         continue;
1884       vlib_cli_output (vm, "Thread %d offset %.9f error %.9f", i,
1885                        vlib_mains[i]->time_offset,
1886                        vm->time_last_barrier_release -
1887                        vlib_mains[i]->time_last_barrier_release);
1888     }
1889   return 0;
1890 }
1891
1892 /* *INDENT-OFF* */
1893 VLIB_CLI_COMMAND (f_command, static) =
1894 {
1895   .path = "show clock",
1896   .short_help = "show clock",
1897   .function = show_clock_command_fn,
1898 };
1899 /* *INDENT-ON* */
1900
1901 /*
1902  * fd.io coding-style-patch-verification: ON
1903  *
1904  * Local Variables:
1905  * eval: (c-set-style "gnu")
1906  * End:
1907  */