thread: Add show threads api
[vpp.git] / src / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vppinfra/linux/sysfs.h>
21 #include <vlib/vlib.h>
22
23 #include <vlib/threads.h>
24 #include <vlib/unix/cj.h>
25
26 DECLARE_CJ_GLOBAL_LOG;
27
28 #define FRAME_QUEUE_NELTS 64
29
30 u32
31 vl (void *p)
32 {
33   return vec_len (p);
34 }
35
36 vlib_worker_thread_t *vlib_worker_threads;
37 vlib_thread_main_t vlib_thread_main;
38
39 /*
40  * Barrier tracing can be enabled on a normal build to collect information
41  * on barrier use, including timings and call stacks.  Deliberately not
42  * keyed off CLIB_DEBUG, because that can add significant overhead which
43  * imapacts observed timings.
44  */
45
46 static u32
47 elog_id_for_msg_name (const char *msg_name)
48 {
49   uword *p, r;
50   static uword *h;
51   u8 *name_copy;
52
53   if (!h)
54     h = hash_create_string (0, sizeof (uword));
55
56   p = hash_get_mem (h, msg_name);
57   if (p)
58     return p[0];
59   r = elog_string (&vlib_global_main.elog_main, "%s", msg_name);
60
61   name_copy = format (0, "%s%c", msg_name, 0);
62
63   hash_set_mem (h, name_copy, r);
64
65   return r;
66 }
67
68 static inline void
69 barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed)
70 {
71   if (!vlib_worker_threads->barrier_elog_enabled)
72     return;
73
74     /* *INDENT-OFF* */
75     ELOG_TYPE_DECLARE (e) =
76       {
77         .format = "bar-trace-%s-#%d",
78         .format_args = "T4i4",
79       };
80     /* *INDENT-ON* */
81   struct
82   {
83     u32 caller, count, t_entry, t_open, t_closed;
84   } *ed = 0;
85
86   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
87   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
88   ed->caller = elog_id_for_msg_name (vlib_worker_threads[0].barrier_caller);
89   ed->t_entry = (int) (1000000.0 * t_entry);
90   ed->t_open = (int) (1000000.0 * t_open);
91   ed->t_closed = (int) (1000000.0 * t_closed);
92 }
93
94 static inline void
95 barrier_trace_sync_rec (f64 t_entry)
96 {
97   if (!vlib_worker_threads->barrier_elog_enabled)
98     return;
99
100     /* *INDENT-OFF* */
101     ELOG_TYPE_DECLARE (e) =
102       {
103         .format = "bar-syncrec-%s-#%d",
104         .format_args = "T4i4",
105       };
106     /* *INDENT-ON* */
107   struct
108   {
109     u32 caller, depth;
110   } *ed = 0;
111
112   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
113   ed->depth = (int) vlib_worker_threads[0].recursion_level - 1;
114   ed->caller = elog_id_for_msg_name (vlib_worker_threads[0].barrier_caller);
115 }
116
117 static inline void
118 barrier_trace_release_rec (f64 t_entry)
119 {
120   if (!vlib_worker_threads->barrier_elog_enabled)
121     return;
122
123     /* *INDENT-OFF* */
124     ELOG_TYPE_DECLARE (e) =
125       {
126         .format = "bar-relrrec-#%d",
127         .format_args = "i4",
128       };
129     /* *INDENT-ON* */
130   struct
131   {
132     u32 depth;
133   } *ed = 0;
134
135   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
136   ed->depth = (int) vlib_worker_threads[0].recursion_level;
137 }
138
139 static inline void
140 barrier_trace_release (f64 t_entry, f64 t_closed_total, f64 t_update_main)
141 {
142   if (!vlib_worker_threads->barrier_elog_enabled)
143     return;
144
145     /* *INDENT-OFF* */
146     ELOG_TYPE_DECLARE (e) =
147       {
148         .format = "bar-rel-#%d-e%d-u%d-t%d",
149         .format_args = "i4i4i4i4",
150       };
151     /* *INDENT-ON* */
152   struct
153   {
154     u32 count, t_entry, t_update_main, t_closed_total;
155   } *ed = 0;
156
157   ed = ELOG_DATA (&vlib_global_main.elog_main, e);
158   ed->t_entry = (int) (1000000.0 * t_entry);
159   ed->t_update_main = (int) (1000000.0 * t_update_main);
160   ed->t_closed_total = (int) (1000000.0 * t_closed_total);
161   ed->count = (int) vlib_worker_threads[0].barrier_sync_count;
162
163   /* Reset context for next trace */
164   vlib_worker_threads[0].barrier_context = NULL;
165 }
166
167 uword
168 os_get_nthreads (void)
169 {
170   u32 len;
171
172   len = vec_len (vlib_thread_stacks);
173   if (len == 0)
174     return 1;
175   else
176     return len;
177 }
178
179 void
180 vlib_set_thread_name (char *name)
181 {
182   int pthread_setname_np (pthread_t __target_thread, const char *__name);
183   int rv;
184   pthread_t thread = pthread_self ();
185
186   if (thread)
187     {
188       rv = pthread_setname_np (thread, name);
189       if (rv)
190         clib_warning ("pthread_setname_np returned %d", rv);
191     }
192 }
193
194 static int
195 sort_registrations_by_no_clone (void *a0, void *a1)
196 {
197   vlib_thread_registration_t **tr0 = a0;
198   vlib_thread_registration_t **tr1 = a1;
199
200   return ((i32) ((*tr0)->no_data_structure_clone)
201           - ((i32) ((*tr1)->no_data_structure_clone)));
202 }
203
204 static uword *
205 clib_sysfs_list_to_bitmap (char *filename)
206 {
207   FILE *fp;
208   uword *r = 0;
209
210   fp = fopen (filename, "r");
211
212   if (fp != NULL)
213     {
214       u8 *buffer = 0;
215       vec_validate (buffer, 256 - 1);
216       if (fgets ((char *) buffer, 256, fp))
217         {
218           unformat_input_t in;
219           unformat_init_string (&in, (char *) buffer,
220                                 strlen ((char *) buffer));
221           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
222             clib_warning ("unformat_bitmap_list failed");
223           unformat_free (&in);
224         }
225       vec_free (buffer);
226       fclose (fp);
227     }
228   return r;
229 }
230
231
232 /* Called early in the init sequence */
233
234 clib_error_t *
235 vlib_thread_init (vlib_main_t * vm)
236 {
237   vlib_thread_main_t *tm = &vlib_thread_main;
238   vlib_worker_thread_t *w;
239   vlib_thread_registration_t *tr;
240   u32 n_vlib_mains = 1;
241   u32 first_index = 1;
242   u32 i;
243   uword *avail_cpu;
244
245   /* get bitmaps of active cpu cores and sockets */
246   tm->cpu_core_bitmap =
247     clib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
248   tm->cpu_socket_bitmap =
249     clib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
250
251   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
252
253   /* skip cores */
254   for (i = 0; i < tm->skip_cores; i++)
255     {
256       uword c = clib_bitmap_first_set (avail_cpu);
257       if (c == ~0)
258         return clib_error_return (0, "no available cpus to skip");
259
260       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
261     }
262
263   /* grab cpu for main thread */
264   if (tm->main_lcore == ~0)
265     {
266       /* if main-lcore is not set, we try to use lcore 1 */
267       if (clib_bitmap_get (avail_cpu, 1))
268         tm->main_lcore = 1;
269       else
270         tm->main_lcore = clib_bitmap_first_set (avail_cpu);
271       if (tm->main_lcore == (u8) ~ 0)
272         return clib_error_return (0, "no available cpus to be used for the"
273                                   " main thread");
274     }
275   else
276     {
277       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
278         return clib_error_return (0, "cpu %u is not available to be used"
279                                   " for the main thread", tm->main_lcore);
280     }
281   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
282
283   /* assume that there is socket 0 only if there is no data from sysfs */
284   if (!tm->cpu_socket_bitmap)
285     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
286
287   /* pin main thread to main_lcore  */
288   if (tm->cb.vlib_thread_set_lcore_cb)
289     {
290       tm->cb.vlib_thread_set_lcore_cb (0, tm->main_lcore);
291     }
292   else
293     {
294       cpu_set_t cpuset;
295       CPU_ZERO (&cpuset);
296       CPU_SET (tm->main_lcore, &cpuset);
297       pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
298     }
299
300   /* as many threads as stacks... */
301   vec_validate_aligned (vlib_worker_threads, vec_len (vlib_thread_stacks) - 1,
302                         CLIB_CACHE_LINE_BYTES);
303
304   /* Preallocate thread 0 */
305   _vec_len (vlib_worker_threads) = 1;
306   w = vlib_worker_threads;
307   w->thread_mheap = clib_mem_get_heap ();
308   w->thread_stack = vlib_thread_stacks[0];
309   w->cpu_id = tm->main_lcore;
310   w->lwp = syscall (SYS_gettid);
311   w->thread_id = pthread_self ();
312   tm->n_vlib_mains = 1;
313
314   if (tm->sched_policy != ~0)
315     {
316       struct sched_param sched_param;
317       if (!sched_getparam (w->lwp, &sched_param))
318         {
319           if (tm->sched_priority != ~0)
320             sched_param.sched_priority = tm->sched_priority;
321           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
322         }
323     }
324
325   /* assign threads to cores and set n_vlib_mains */
326   tr = tm->next;
327
328   while (tr)
329     {
330       vec_add1 (tm->registrations, tr);
331       tr = tr->next;
332     }
333
334   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
335
336   for (i = 0; i < vec_len (tm->registrations); i++)
337     {
338       int j;
339       tr = tm->registrations[i];
340       tr->first_index = first_index;
341       first_index += tr->count;
342       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
343
344       /* construct coremask */
345       if (tr->use_pthreads || !tr->count)
346         continue;
347
348       if (tr->coremask)
349         {
350           uword c;
351           /* *INDENT-OFF* */
352           clib_bitmap_foreach (c, tr->coremask, ({
353             if (clib_bitmap_get(avail_cpu, c) == 0)
354               return clib_error_return (0, "cpu %u is not available to be used"
355                                         " for the '%s' thread",c, tr->name);
356
357             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
358           }));
359 /* *INDENT-ON* */
360
361         }
362       else
363         {
364           for (j = 0; j < tr->count; j++)
365             {
366               uword c = clib_bitmap_first_set (avail_cpu);
367               if (c == ~0)
368                 return clib_error_return (0,
369                                           "no available cpus to be used for"
370                                           " the '%s' thread", tr->name);
371
372               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
373               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
374             }
375         }
376     }
377
378   clib_bitmap_free (avail_cpu);
379
380   tm->n_vlib_mains = n_vlib_mains;
381
382   vec_validate_aligned (vlib_worker_threads, first_index - 1,
383                         CLIB_CACHE_LINE_BYTES);
384
385   return 0;
386 }
387
388 vlib_frame_queue_t *
389 vlib_frame_queue_alloc (int nelts)
390 {
391   vlib_frame_queue_t *fq;
392
393   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
394   memset (fq, 0, sizeof (*fq));
395   fq->nelts = nelts;
396   fq->vector_threshold = 128;   // packets
397   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
398
399   if (1)
400     {
401       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
402         fformat (stderr, "WARNING: fq->tail unaligned\n");
403       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
404         fformat (stderr, "WARNING: fq->head unaligned\n");
405       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
406         fformat (stderr, "WARNING: fq->elts unaligned\n");
407
408       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
409         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
410                  sizeof (fq->elts[0]));
411       if (nelts & (nelts - 1))
412         {
413           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
414           abort ();
415         }
416     }
417
418   return (fq);
419 }
420
421 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
422 void
423 vl_msg_api_handler_no_free (void *v)
424 {
425 }
426
427 /* Turned off, save as reference material... */
428 #if 0
429 static inline int
430 vlib_frame_queue_dequeue_internal (int thread_id,
431                                    vlib_main_t * vm, vlib_node_main_t * nm)
432 {
433   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
434   vlib_frame_queue_elt_t *elt;
435   vlib_frame_t *f;
436   vlib_pending_frame_t *p;
437   vlib_node_runtime_t *r;
438   u32 node_runtime_index;
439   int msg_type;
440   u64 before;
441   int processed = 0;
442
443   ASSERT (vm == vlib_mains[thread_id]);
444
445   while (1)
446     {
447       if (fq->head == fq->tail)
448         return processed;
449
450       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
451
452       if (!elt->valid)
453         return processed;
454
455       before = clib_cpu_time_now ();
456
457       f = elt->frame;
458       node_runtime_index = elt->node_runtime_index;
459       msg_type = elt->msg_type;
460
461       switch (msg_type)
462         {
463         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
464           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
465           /* note fallthrough... */
466         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
467           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
468                                 node_runtime_index);
469           vlib_frame_free (vm, r, f);
470           break;
471         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
472           vec_add2 (vm->node_main.pending_frames, p, 1);
473           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
474           p->node_runtime_index = elt->node_runtime_index;
475           p->frame_index = vlib_frame_index (vm, f);
476           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
477           fq->dequeue_vectors += (u64) f->n_vectors;
478           break;
479         case VLIB_FRAME_QUEUE_ELT_API_MSG:
480           vl_msg_api_handler_no_free (f);
481           break;
482         default:
483           clib_warning ("bogus frame queue message, type %d", msg_type);
484           break;
485         }
486       elt->valid = 0;
487       fq->dequeues++;
488       fq->dequeue_ticks += clib_cpu_time_now () - before;
489       CLIB_MEMORY_BARRIER ();
490       fq->head++;
491       processed++;
492     }
493   ASSERT (0);
494   return processed;
495 }
496
497 int
498 vlib_frame_queue_dequeue (int thread_id,
499                           vlib_main_t * vm, vlib_node_main_t * nm)
500 {
501   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
502 }
503
504 int
505 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
506                           u32 frame_queue_index, vlib_frame_t * frame,
507                           vlib_frame_queue_msg_type_t type)
508 {
509   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
510   vlib_frame_queue_elt_t *elt;
511   u32 save_count;
512   u64 new_tail;
513   u64 before = clib_cpu_time_now ();
514
515   ASSERT (fq);
516
517   new_tail = __sync_add_and_fetch (&fq->tail, 1);
518
519   /* Wait until a ring slot is available */
520   while (new_tail >= fq->head + fq->nelts)
521     {
522       f64 b4 = vlib_time_now_ticks (vm, before);
523       vlib_worker_thread_barrier_check (vm, b4);
524       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
525       // vlib_frame_queue_dequeue (vm->thread_index, vm, nm);
526     }
527
528   elt = fq->elts + (new_tail & (fq->nelts - 1));
529
530   /* this would be very bad... */
531   while (elt->valid)
532     {
533     }
534
535   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
536   save_count = frame->n_vectors;
537
538   elt->frame = frame;
539   elt->node_runtime_index = node_runtime_index;
540   elt->msg_type = type;
541   CLIB_MEMORY_BARRIER ();
542   elt->valid = 1;
543
544   return save_count;
545 }
546 #endif /* 0 */
547
548 /* To be called by vlib worker threads upon startup */
549 void
550 vlib_worker_thread_init (vlib_worker_thread_t * w)
551 {
552   vlib_thread_main_t *tm = vlib_get_thread_main ();
553
554   /*
555    * Note: disabling signals in worker threads as follows
556    * prevents the api post-mortem dump scheme from working
557    * {
558    *    sigset_t s;
559    *    sigfillset (&s);
560    *    pthread_sigmask (SIG_SETMASK, &s, 0);
561    *  }
562    */
563
564   clib_mem_set_heap (w->thread_mheap);
565
566   if (vec_len (tm->thread_prefix) && w->registration->short_name)
567     {
568       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
569                         w->registration->short_name, w->instance_id, '\0');
570       vlib_set_thread_name ((char *) w->name);
571     }
572
573   if (!w->registration->use_pthreads)
574     {
575
576       /* Initial barrier sync, for both worker and i/o threads */
577       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
578
579       while (*vlib_worker_threads->wait_at_barrier)
580         ;
581
582       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
583     }
584 }
585
586 void *
587 vlib_worker_thread_bootstrap_fn (void *arg)
588 {
589   void *rv;
590   vlib_worker_thread_t *w = arg;
591
592   w->lwp = syscall (SYS_gettid);
593   w->thread_id = pthread_self ();
594
595   __os_thread_index = w - vlib_worker_threads;
596
597   rv = (void *) clib_calljmp
598     ((uword (*)(uword)) w->thread_function,
599      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
600   /* NOTREACHED, we hope */
601   return rv;
602 }
603
604 static void
605 vlib_get_thread_core_socket (vlib_worker_thread_t * w, unsigned cpu_id)
606 {
607   const char *sys_cpu_path = "/sys/devices/system/cpu/cpu";
608   u8 *p = 0;
609   int core_id = -1, socket_id = -1;
610
611   p = format (p, "%s%u/topology/core_id%c", sys_cpu_path, cpu_id, 0);
612   clib_sysfs_read ((char *) p, "%d", &core_id);
613   vec_reset_length (p);
614   p =
615     format (p, "%s%u/topology/physical_package_id%c", sys_cpu_path, cpu_id,
616             0);
617   clib_sysfs_read ((char *) p, "%d", &socket_id);
618   vec_free (p);
619
620   w->core_id = core_id;
621   w->socket_id = socket_id;
622 }
623
624 static clib_error_t *
625 vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned cpu_id)
626 {
627   vlib_thread_main_t *tm = &vlib_thread_main;
628   void *(*fp_arg) (void *) = fp;
629
630   w->cpu_id = cpu_id;
631   vlib_get_thread_core_socket (w, cpu_id);
632   if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads)
633     return tm->cb.vlib_launch_thread_cb (fp, (void *) w, cpu_id);
634   else
635     {
636       pthread_t worker;
637       cpu_set_t cpuset;
638       CPU_ZERO (&cpuset);
639       CPU_SET (cpu_id, &cpuset);
640
641       if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w))
642         return clib_error_return_unix (0, "pthread_create");
643
644       if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
645         return clib_error_return_unix (0, "pthread_setaffinity_np");
646
647       return 0;
648     }
649 }
650
651 static clib_error_t *
652 start_workers (vlib_main_t * vm)
653 {
654   int i, j;
655   vlib_worker_thread_t *w;
656   vlib_main_t *vm_clone;
657   void *oldheap;
658   vlib_thread_main_t *tm = &vlib_thread_main;
659   vlib_thread_registration_t *tr;
660   vlib_node_runtime_t *rt;
661   u32 n_vlib_mains = tm->n_vlib_mains;
662   u32 worker_thread_index;
663   u8 *main_heap = clib_mem_get_per_cpu_heap ();
664
665   vec_reset_length (vlib_worker_threads);
666
667   /* Set up the main thread */
668   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
669   w->elog_track.name = "main thread";
670   elog_track_register (&vm->elog_main, &w->elog_track);
671
672   if (vec_len (tm->thread_prefix))
673     {
674       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
675       vlib_set_thread_name ((char *) w->name);
676     }
677
678   vm->elog_main.lock =
679     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
680   vm->elog_main.lock[0] = 0;
681
682   if (n_vlib_mains > 1)
683     {
684       /* Replace hand-crafted length-1 vector with a real vector */
685       vlib_mains = 0;
686
687       vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
688                             CLIB_CACHE_LINE_BYTES);
689       _vec_len (vlib_mains) = 0;
690       vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
691
692       vlib_worker_threads->wait_at_barrier =
693         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
694       vlib_worker_threads->workers_at_barrier =
695         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
696
697       vlib_worker_threads->node_reforks_required =
698         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
699
700       /* Ask for an initial barrier sync */
701       *vlib_worker_threads->workers_at_barrier = 0;
702       *vlib_worker_threads->wait_at_barrier = 1;
703
704       /* Without update or refork */
705       *vlib_worker_threads->node_reforks_required = 0;
706       vm->need_vlib_worker_thread_node_runtime_update = 0;
707
708       /* init timing */
709       vm->barrier_epoch = 0;
710       vm->barrier_no_close_before = 0;
711
712       worker_thread_index = 1;
713
714       for (i = 0; i < vec_len (tm->registrations); i++)
715         {
716           vlib_node_main_t *nm, *nm_clone;
717           vlib_buffer_free_list_t *fl_clone, *fl_orig;
718           vlib_buffer_free_list_t *orig_freelist_pool;
719           int k;
720
721           tr = tm->registrations[i];
722
723           if (tr->count == 0)
724             continue;
725
726           for (k = 0; k < tr->count; k++)
727             {
728               vlib_node_t *n;
729
730               vec_add2 (vlib_worker_threads, w, 1);
731               /* Currently unused, may not really work */
732               if (tr->mheap_size)
733                 {
734 #if USE_DLMALLOC == 0
735                   w->thread_mheap =
736                     mheap_alloc (0 /* use VM */ , tr->mheap_size);
737 #else
738                   w->thread_mheap = create_mspace (tr->mheap_size,
739                                                    0 /* unlocked */ );
740 #endif
741                 }
742               else
743                 w->thread_mheap = main_heap;
744
745               w->thread_stack =
746                 vlib_thread_stack_init (w - vlib_worker_threads);
747               w->thread_function = tr->function;
748               w->thread_function_arg = w;
749               w->instance_id = k;
750               w->registration = tr;
751
752               w->elog_track.name =
753                 (char *) format (0, "%s %d", tr->name, k + 1);
754               vec_add1 (w->elog_track.name, 0);
755               elog_track_register (&vm->elog_main, &w->elog_track);
756
757               if (tr->no_data_structure_clone)
758                 continue;
759
760               /* Fork vlib_global_main et al. Look for bugs here */
761               oldheap = clib_mem_set_heap (w->thread_mheap);
762
763               vm_clone = clib_mem_alloc_aligned (sizeof (*vm_clone),
764                                                  CLIB_CACHE_LINE_BYTES);
765               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
766
767               vm_clone->thread_index = worker_thread_index;
768               vm_clone->heap_base = w->thread_mheap;
769               vm_clone->heap_aligned_base = (void *)
770                 (((uword) w->thread_mheap) & ~(VLIB_FRAME_ALIGN - 1));
771               vm_clone->init_functions_called =
772                 hash_create (0, /* value bytes */ 0);
773               vm_clone->pending_rpc_requests = 0;
774               vec_validate (vm_clone->pending_rpc_requests, 0);
775               _vec_len (vm_clone->pending_rpc_requests) = 0;
776               memset (&vm_clone->random_buffer, 0,
777                       sizeof (vm_clone->random_buffer));
778
779               nm = &vlib_mains[0]->node_main;
780               nm_clone = &vm_clone->node_main;
781               /* fork next frames array, preserving node runtime indices */
782               nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
783                                                        CLIB_CACHE_LINE_BYTES);
784               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
785                 {
786                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
787                   u32 save_node_runtime_index;
788                   u32 save_flags;
789
790                   save_node_runtime_index = nf->node_runtime_index;
791                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
792                   vlib_next_frame_init (nf);
793                   nf->node_runtime_index = save_node_runtime_index;
794                   nf->flags = save_flags;
795                 }
796
797               /* fork the frame dispatch queue */
798               nm_clone->pending_frames = 0;
799               vec_validate (nm_clone->pending_frames, 10);      /* $$$$$?????? */
800               _vec_len (nm_clone->pending_frames) = 0;
801
802               /* fork nodes */
803               nm_clone->nodes = 0;
804
805               /* Allocate all nodes in single block for speed */
806               n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
807
808               for (j = 0; j < vec_len (nm->nodes); j++)
809                 {
810                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
811                   /* none of the copied nodes have enqueue rights given out */
812                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
813                   memset (&n->stats_total, 0, sizeof (n->stats_total));
814                   memset (&n->stats_last_clear, 0,
815                           sizeof (n->stats_last_clear));
816                   vec_add1 (nm_clone->nodes, n);
817                   n++;
818                 }
819               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
820                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
821                                  CLIB_CACHE_LINE_BYTES);
822               vec_foreach (rt,
823                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
824               {
825                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
826                 rt->thread_index = vm_clone->thread_index;
827                 /* copy initial runtime_data from node */
828                 if (n->runtime_data && n->runtime_data_bytes > 0)
829                   clib_memcpy (rt->runtime_data, n->runtime_data,
830                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
831                                          n->runtime_data_bytes));
832               }
833
834               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
835                 vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
836                                  CLIB_CACHE_LINE_BYTES);
837               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
838               {
839                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
840                 rt->thread_index = vm_clone->thread_index;
841                 /* copy initial runtime_data from node */
842                 if (n->runtime_data && n->runtime_data_bytes > 0)
843                   clib_memcpy (rt->runtime_data, n->runtime_data,
844                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
845                                          n->runtime_data_bytes));
846               }
847
848               nm_clone->processes = vec_dup_aligned (nm->processes,
849                                                      CLIB_CACHE_LINE_BYTES);
850
851               /* zap the (per worker) frame freelists, etc */
852               nm_clone->frame_sizes = 0;
853               nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
854
855               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
856
857               clib_mem_set_heap (oldheap);
858               vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
859
860               vm_clone->error_main.counters = vec_dup_aligned
861                 (vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES);
862               vm_clone->error_main.counters_last_clear = vec_dup_aligned
863                 (vlib_mains[0]->error_main.counters_last_clear,
864                  CLIB_CACHE_LINE_BYTES);
865
866               /* Fork the vlib_buffer_main_t free lists, etc. */
867               orig_freelist_pool = vm_clone->buffer_free_list_pool;
868               vm_clone->buffer_free_list_pool = 0;
869
870             /* *INDENT-OFF* */
871             pool_foreach (fl_orig, orig_freelist_pool,
872                           ({
873                             pool_get_aligned (vm_clone->buffer_free_list_pool,
874                                               fl_clone, CLIB_CACHE_LINE_BYTES);
875                             ASSERT (fl_orig - orig_freelist_pool
876                                     == fl_clone - vm_clone->buffer_free_list_pool);
877
878                             fl_clone[0] = fl_orig[0];
879                             fl_clone->buffers = 0;
880                             fl_clone->n_alloc = 0;
881                           }));
882 /* *INDENT-ON* */
883
884               worker_thread_index++;
885             }
886         }
887     }
888   else
889     {
890       /* only have non-data-structure copy threads to create... */
891       for (i = 0; i < vec_len (tm->registrations); i++)
892         {
893           tr = tm->registrations[i];
894
895           for (j = 0; j < tr->count; j++)
896             {
897               vec_add2 (vlib_worker_threads, w, 1);
898               if (tr->mheap_size)
899                 {
900 #if USE_DLMALLOC == 0
901                   w->thread_mheap =
902                     mheap_alloc (0 /* use VM */ , tr->mheap_size);
903 #else
904                   w->thread_mheap =
905                     create_mspace (tr->mheap_size, 0 /* locked */ );
906 #endif
907                 }
908               else
909                 w->thread_mheap = main_heap;
910               w->thread_stack =
911                 vlib_thread_stack_init (w - vlib_worker_threads);
912               w->thread_function = tr->function;
913               w->thread_function_arg = w;
914               w->instance_id = j;
915               w->elog_track.name =
916                 (char *) format (0, "%s %d", tr->name, j + 1);
917               w->registration = tr;
918               vec_add1 (w->elog_track.name, 0);
919               elog_track_register (&vm->elog_main, &w->elog_track);
920             }
921         }
922     }
923
924   worker_thread_index = 1;
925
926   for (i = 0; i < vec_len (tm->registrations); i++)
927     {
928       clib_error_t *err;
929       int j;
930
931       tr = tm->registrations[i];
932
933       if (tr->use_pthreads || tm->use_pthreads)
934         {
935           for (j = 0; j < tr->count; j++)
936             {
937               w = vlib_worker_threads + worker_thread_index++;
938               err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
939                                             w, 0);
940               if (err)
941                 clib_error_report (err);
942             }
943         }
944       else
945         {
946           uword c;
947           /* *INDENT-OFF* */
948           clib_bitmap_foreach (c, tr->coremask, ({
949             w = vlib_worker_threads + worker_thread_index++;
950             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
951                                           w, c);
952             if (err)
953               clib_error_report (err);
954           }));
955           /* *INDENT-ON* */
956         }
957     }
958   vlib_worker_thread_barrier_sync (vm);
959   vlib_worker_thread_barrier_release (vm);
960   return 0;
961 }
962
963 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
964
965
966 static inline void
967 worker_thread_node_runtime_update_internal (void)
968 {
969   int i, j;
970   vlib_main_t *vm;
971   vlib_node_main_t *nm, *nm_clone;
972   vlib_main_t *vm_clone;
973   vlib_node_runtime_t *rt;
974   never_inline void
975     vlib_node_runtime_sync_stats (vlib_main_t * vm,
976                                   vlib_node_runtime_t * r,
977                                   uword n_calls,
978                                   uword n_vectors, uword n_clocks);
979
980   ASSERT (vlib_get_thread_index () == 0);
981
982   vm = vlib_mains[0];
983   nm = &vm->node_main;
984
985   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
986
987   /*
988    * Scrape all runtime stats, so we don't lose node runtime(s) with
989    * pending counts, or throw away worker / io thread counts.
990    */
991   for (j = 0; j < vec_len (nm->nodes); j++)
992     {
993       vlib_node_t *n;
994       n = nm->nodes[j];
995       vlib_node_sync_stats (vm, n);
996     }
997
998   for (i = 1; i < vec_len (vlib_mains); i++)
999     {
1000       vlib_node_t *n;
1001
1002       vm_clone = vlib_mains[i];
1003       nm_clone = &vm_clone->node_main;
1004
1005       for (j = 0; j < vec_len (nm_clone->nodes); j++)
1006         {
1007           n = nm_clone->nodes[j];
1008
1009           rt = vlib_node_get_runtime (vm_clone, n->index);
1010           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
1011         }
1012     }
1013
1014   /* Per-worker clone rebuilds are now done on each thread */
1015 }
1016
1017
1018 void
1019 vlib_worker_thread_node_refork (void)
1020 {
1021   vlib_main_t *vm, *vm_clone;
1022   vlib_node_main_t *nm, *nm_clone;
1023   vlib_node_t **old_nodes_clone;
1024   vlib_node_runtime_t *rt, *old_rt;
1025
1026   vlib_node_t *new_n_clone;
1027
1028   int j;
1029
1030   vm = vlib_mains[0];
1031   nm = &vm->node_main;
1032   vm_clone = vlib_get_main ();
1033   nm_clone = &vm_clone->node_main;
1034
1035   /* Re-clone error heap */
1036   u64 *old_counters = vm_clone->error_main.counters;
1037   u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
1038
1039   clib_memcpy (&vm_clone->error_main, &vm->error_main,
1040                sizeof (vm->error_main));
1041   j = vec_len (vm->error_main.counters) - 1;
1042   vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
1043   vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
1044   vm_clone->error_main.counters = old_counters;
1045   vm_clone->error_main.counters_last_clear = old_counters_all_clear;
1046
1047   nm_clone = &vm_clone->node_main;
1048   vec_free (nm_clone->next_frames);
1049   nm_clone->next_frames = vec_dup_aligned (nm->next_frames,
1050                                            CLIB_CACHE_LINE_BYTES);
1051
1052   for (j = 0; j < vec_len (nm_clone->next_frames); j++)
1053     {
1054       vlib_next_frame_t *nf = &nm_clone->next_frames[j];
1055       u32 save_node_runtime_index;
1056       u32 save_flags;
1057
1058       save_node_runtime_index = nf->node_runtime_index;
1059       save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
1060       vlib_next_frame_init (nf);
1061       nf->node_runtime_index = save_node_runtime_index;
1062       nf->flags = save_flags;
1063     }
1064
1065   old_nodes_clone = nm_clone->nodes;
1066   nm_clone->nodes = 0;
1067
1068   /* re-fork nodes */
1069
1070   /* Allocate all nodes in single block for speed */
1071   new_n_clone =
1072     clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone));
1073   for (j = 0; j < vec_len (nm->nodes); j++)
1074     {
1075       vlib_node_t *old_n_clone;
1076       vlib_node_t *new_n;
1077
1078       new_n = nm->nodes[j];
1079       old_n_clone = old_nodes_clone[j];
1080
1081       clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
1082       /* none of the copied nodes have enqueue rights given out */
1083       new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
1084
1085       if (j >= vec_len (old_nodes_clone))
1086         {
1087           /* new node, set to zero */
1088           memset (&new_n_clone->stats_total, 0,
1089                   sizeof (new_n_clone->stats_total));
1090           memset (&new_n_clone->stats_last_clear, 0,
1091                   sizeof (new_n_clone->stats_last_clear));
1092         }
1093       else
1094         {
1095           /* Copy stats if the old data is valid */
1096           clib_memcpy (&new_n_clone->stats_total,
1097                        &old_n_clone->stats_total,
1098                        sizeof (new_n_clone->stats_total));
1099           clib_memcpy (&new_n_clone->stats_last_clear,
1100                        &old_n_clone->stats_last_clear,
1101                        sizeof (new_n_clone->stats_last_clear));
1102
1103           /* keep previous node state */
1104           new_n_clone->state = old_n_clone->state;
1105         }
1106       vec_add1 (nm_clone->nodes, new_n_clone);
1107       new_n_clone++;
1108     }
1109   /* Free the old node clones */
1110   clib_mem_free (old_nodes_clone[0]);
1111
1112   vec_free (old_nodes_clone);
1113
1114
1115   /* re-clone internal nodes */
1116   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
1117   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
1118     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
1119                      CLIB_CACHE_LINE_BYTES);
1120
1121   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
1122   {
1123     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1124     rt->thread_index = vm_clone->thread_index;
1125     /* copy runtime_data, will be overwritten later for existing rt */
1126     if (n->runtime_data && n->runtime_data_bytes > 0)
1127       clib_memcpy (rt->runtime_data, n->runtime_data,
1128                    clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1129                              n->runtime_data_bytes));
1130   }
1131
1132   for (j = 0; j < vec_len (old_rt); j++)
1133     {
1134       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1135       rt->state = old_rt[j].state;
1136       clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
1137                    VLIB_NODE_RUNTIME_DATA_SIZE);
1138     }
1139
1140   vec_free (old_rt);
1141
1142   /* re-clone input nodes */
1143   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
1144   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
1145     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1146                      CLIB_CACHE_LINE_BYTES);
1147
1148   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1149   {
1150     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
1151     rt->thread_index = vm_clone->thread_index;
1152     /* copy runtime_data, will be overwritten later for existing rt */
1153     if (n->runtime_data && n->runtime_data_bytes > 0)
1154       clib_memcpy (rt->runtime_data, n->runtime_data,
1155                    clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
1156                              n->runtime_data_bytes));
1157   }
1158
1159   for (j = 0; j < vec_len (old_rt); j++)
1160     {
1161       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
1162       rt->state = old_rt[j].state;
1163       clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
1164                    VLIB_NODE_RUNTIME_DATA_SIZE);
1165     }
1166
1167   vec_free (old_rt);
1168
1169   nm_clone->processes = vec_dup_aligned (nm->processes,
1170                                          CLIB_CACHE_LINE_BYTES);
1171 }
1172
1173 void
1174 vlib_worker_thread_node_runtime_update (void)
1175 {
1176   /*
1177    * Make a note that we need to do a node runtime update
1178    * prior to releasing the barrier.
1179    */
1180   vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1;
1181 }
1182
1183 u32
1184 unformat_sched_policy (unformat_input_t * input, va_list * args)
1185 {
1186   u32 *r = va_arg (*args, u32 *);
1187
1188   if (0);
1189 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1190   foreach_sched_policy
1191 #undef _
1192     else
1193     return 0;
1194   return 1;
1195 }
1196
1197 static clib_error_t *
1198 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1199 {
1200   vlib_thread_registration_t *tr;
1201   uword *p;
1202   vlib_thread_main_t *tm = &vlib_thread_main;
1203   u8 *name;
1204   uword *bitmap;
1205   u32 count;
1206
1207   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1208
1209   tm->n_thread_stacks = 1;      /* account for main thread */
1210   tm->sched_policy = ~0;
1211   tm->sched_priority = ~0;
1212   tm->main_lcore = ~0;
1213
1214   tr = tm->next;
1215
1216   while (tr)
1217     {
1218       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1219       tr = tr->next;
1220     }
1221
1222   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1223     {
1224       if (unformat (input, "use-pthreads"))
1225         tm->use_pthreads = 1;
1226       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1227         ;
1228       else if (unformat (input, "main-core %u", &tm->main_lcore))
1229         ;
1230       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1231         ;
1232       else if (unformat (input, "coremask-%s %U", &name,
1233                          unformat_bitmap_mask, &bitmap) ||
1234                unformat (input, "corelist-%s %U", &name,
1235                          unformat_bitmap_list, &bitmap))
1236         {
1237           p = hash_get_mem (tm->thread_registrations_by_name, name);
1238           if (p == 0)
1239             return clib_error_return (0, "no such thread type '%s'", name);
1240
1241           tr = (vlib_thread_registration_t *) p[0];
1242
1243           if (tr->use_pthreads)
1244             return clib_error_return (0,
1245                                       "corelist cannot be set for '%s' threads",
1246                                       name);
1247
1248           tr->coremask = bitmap;
1249           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1250         }
1251       else
1252         if (unformat
1253             (input, "scheduler-policy %U", unformat_sched_policy,
1254              &tm->sched_policy))
1255         ;
1256       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1257         ;
1258       else if (unformat (input, "%s %u", &name, &count))
1259         {
1260           p = hash_get_mem (tm->thread_registrations_by_name, name);
1261           if (p == 0)
1262             return clib_error_return (0, "no such thread type 3 '%s'", name);
1263
1264           tr = (vlib_thread_registration_t *) p[0];
1265           if (tr->fixed_count)
1266             return clib_error_return
1267               (0, "number of %s threads not configurable", tr->name);
1268           tr->count = count;
1269         }
1270       else
1271         break;
1272     }
1273
1274   if (tm->sched_priority != ~0)
1275     {
1276       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1277         {
1278           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1279           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1280           if (tm->sched_priority > prio_max)
1281             tm->sched_priority = prio_max;
1282           if (tm->sched_priority < prio_min)
1283             tm->sched_priority = prio_min;
1284         }
1285       else
1286         {
1287           return clib_error_return
1288             (0,
1289              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1290              tm->sched_priority);
1291         }
1292     }
1293   tr = tm->next;
1294
1295   if (!tm->thread_prefix)
1296     tm->thread_prefix = format (0, "vpp");
1297
1298   while (tr)
1299     {
1300       tm->n_thread_stacks += tr->count;
1301       tm->n_pthreads += tr->count * tr->use_pthreads;
1302       tm->n_threads += tr->count * (tr->use_pthreads == 0);
1303       tr = tr->next;
1304     }
1305
1306   return 0;
1307 }
1308
1309 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1310
1311 #if !defined (__x86_64__) && !defined (__i386__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__)
1312 void
1313 __sync_fetch_and_add_8 (void)
1314 {
1315   fformat (stderr, "%s called\n", __FUNCTION__);
1316   abort ();
1317 }
1318
1319 void
1320 __sync_add_and_fetch_8 (void)
1321 {
1322   fformat (stderr, "%s called\n", __FUNCTION__);
1323   abort ();
1324 }
1325 #endif
1326
1327 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1328 void
1329 vnet_main_fixup (vlib_fork_fixup_t which)
1330 {
1331 }
1332
1333 void
1334 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1335 {
1336   vlib_main_t *vm = vlib_get_main ();
1337
1338   if (vlib_mains == 0)
1339     return;
1340
1341   ASSERT (vlib_get_thread_index () == 0);
1342   vlib_worker_thread_barrier_sync (vm);
1343
1344   switch (which)
1345     {
1346     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1347       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1348       break;
1349
1350     default:
1351       ASSERT (0);
1352     }
1353   vlib_worker_thread_barrier_release (vm);
1354 }
1355
1356   /*
1357    * Enforce minimum open time to minimize packet loss due to Rx overflow,
1358    * based on a test based heuristic that barrier should be open for at least
1359    * 3 time as long as it is closed (with an upper bound of 1ms because by that
1360    *  point it is probably too late to make a difference)
1361    */
1362
1363 #ifndef BARRIER_MINIMUM_OPEN_LIMIT
1364 #define BARRIER_MINIMUM_OPEN_LIMIT 0.001
1365 #endif
1366
1367 #ifndef BARRIER_MINIMUM_OPEN_FACTOR
1368 #define BARRIER_MINIMUM_OPEN_FACTOR 3
1369 #endif
1370
1371 void
1372 vlib_worker_thread_barrier_sync_int (vlib_main_t * vm)
1373 {
1374   f64 deadline;
1375   f64 now;
1376   f64 t_entry;
1377   f64 t_open;
1378   f64 t_closed;
1379   u32 count;
1380
1381   if (vec_len (vlib_mains) < 2)
1382     return;
1383
1384   ASSERT (vlib_get_thread_index () == 0);
1385
1386   count = vec_len (vlib_mains) - 1;
1387
1388   /* Record entry relative to last close */
1389   now = vlib_time_now (vm);
1390   t_entry = now - vm->barrier_epoch;
1391
1392   /* Tolerate recursive calls */
1393   if (++vlib_worker_threads[0].recursion_level > 1)
1394     {
1395       barrier_trace_sync_rec (t_entry);
1396       return;
1397     }
1398
1399   vlib_worker_threads[0].barrier_sync_count++;
1400
1401   /* Enforce minimum barrier open time to minimize packet loss */
1402   ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));
1403
1404   while (1)
1405     {
1406       now = vlib_time_now (vm);
1407       /* Barrier hold-down timer expired? */
1408       if (now >= vm->barrier_no_close_before)
1409         break;
1410       if ((vm->barrier_no_close_before - now)
1411           > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
1412         {
1413           clib_warning ("clock change: would have waited for %.4f seconds",
1414                         (vm->barrier_no_close_before - now));
1415           break;
1416         }
1417     }
1418   /* Record time of closure */
1419   t_open = now - vm->barrier_epoch;
1420   vm->barrier_epoch = now;
1421
1422   deadline = now + BARRIER_SYNC_TIMEOUT;
1423
1424   *vlib_worker_threads->wait_at_barrier = 1;
1425   while (*vlib_worker_threads->workers_at_barrier != count)
1426     {
1427       if ((now = vlib_time_now (vm)) > deadline)
1428         {
1429           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1430           os_panic ();
1431         }
1432     }
1433
1434   t_closed = now - vm->barrier_epoch;
1435
1436   barrier_trace_sync (t_entry, t_open, t_closed);
1437
1438 }
1439
1440 void vlib_stat_segment_lock (void) __attribute__ ((weak));
1441 void
1442 vlib_stat_segment_lock (void)
1443 {
1444 }
1445
1446 void vlib_stat_segment_unlock (void) __attribute__ ((weak));
1447 void
1448 vlib_stat_segment_unlock (void)
1449 {
1450 }
1451
1452 void
1453 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1454 {
1455   f64 deadline;
1456   f64 now;
1457   f64 minimum_open;
1458   f64 t_entry;
1459   f64 t_closed_total;
1460   f64 t_update_main = 0.0;
1461   int refork_needed = 0;
1462
1463   if (vec_len (vlib_mains) < 2)
1464     return;
1465
1466   ASSERT (vlib_get_thread_index () == 0);
1467
1468
1469   now = vlib_time_now (vm);
1470   t_entry = now - vm->barrier_epoch;
1471
1472   if (--vlib_worker_threads[0].recursion_level > 0)
1473     {
1474       barrier_trace_release_rec (t_entry);
1475       return;
1476     }
1477
1478   /* Update (all) node runtimes before releasing the barrier, if needed */
1479   if (vm->need_vlib_worker_thread_node_runtime_update)
1480     {
1481       /*
1482        * Lock stat segment here, so we's safe when
1483        * rebuilding the stat segment node clones from the
1484        * stat thread...
1485        */
1486       vlib_stat_segment_lock ();
1487
1488       /* Do stats elements on main thread */
1489       worker_thread_node_runtime_update_internal ();
1490       vm->need_vlib_worker_thread_node_runtime_update = 0;
1491
1492       /* Do per thread rebuilds in parallel */
1493       refork_needed = 1;
1494       clib_smp_atomic_add (vlib_worker_threads->node_reforks_required,
1495                            (vec_len (vlib_mains) - 1));
1496       now = vlib_time_now (vm);
1497       t_update_main = now - vm->barrier_epoch;
1498     }
1499
1500   deadline = now + BARRIER_SYNC_TIMEOUT;
1501
1502   *vlib_worker_threads->wait_at_barrier = 0;
1503
1504   while (*vlib_worker_threads->workers_at_barrier > 0)
1505     {
1506       if ((now = vlib_time_now (vm)) > deadline)
1507         {
1508           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1509           os_panic ();
1510         }
1511     }
1512
1513   /* Wait for reforks before continuing */
1514   if (refork_needed)
1515     {
1516       now = vlib_time_now (vm);
1517
1518       deadline = now + BARRIER_SYNC_TIMEOUT;
1519
1520       while (*vlib_worker_threads->node_reforks_required > 0)
1521         {
1522           if ((now = vlib_time_now (vm)) > deadline)
1523             {
1524               fformat (stderr, "%s: worker thread refork deadlock\n",
1525                        __FUNCTION__);
1526               os_panic ();
1527             }
1528         }
1529       vlib_stat_segment_unlock ();
1530     }
1531
1532   t_closed_total = now - vm->barrier_epoch;
1533
1534   minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;
1535
1536   if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
1537     {
1538       minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
1539     }
1540
1541   vm->barrier_no_close_before = now + minimum_open;
1542
1543   /* Record barrier epoch (used to enforce minimum open time) */
1544   vm->barrier_epoch = now;
1545
1546   barrier_trace_release (t_entry, t_closed_total, t_update_main);
1547
1548 }
1549
1550 /*
1551  * Check the frame queue to see if any frames are available.
1552  * If so, pull the packets off the frames and put them to
1553  * the handoff node.
1554  */
1555 int
1556 vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm)
1557 {
1558   u32 thread_id = vm->thread_index;
1559   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
1560   vlib_frame_queue_elt_t *elt;
1561   u32 *from, *to;
1562   vlib_frame_t *f;
1563   int msg_type;
1564   int processed = 0;
1565   u32 n_left_to_node;
1566   u32 vectors = 0;
1567
1568   ASSERT (fq);
1569   ASSERT (vm == vlib_mains[thread_id]);
1570
1571   if (PREDICT_FALSE (fqm->node_index == ~0))
1572     return 0;
1573   /*
1574    * Gather trace data for frame queues
1575    */
1576   if (PREDICT_FALSE (fq->trace))
1577     {
1578       frame_queue_trace_t *fqt;
1579       frame_queue_nelt_counter_t *fqh;
1580       u32 elix;
1581
1582       fqt = &fqm->frame_queue_traces[thread_id];
1583
1584       fqt->nelts = fq->nelts;
1585       fqt->head = fq->head;
1586       fqt->head_hint = fq->head_hint;
1587       fqt->tail = fq->tail;
1588       fqt->threshold = fq->vector_threshold;
1589       fqt->n_in_use = fqt->tail - fqt->head;
1590       if (fqt->n_in_use >= fqt->nelts)
1591         {
1592           // if beyond max then use max
1593           fqt->n_in_use = fqt->nelts - 1;
1594         }
1595
1596       /* Record the number of elements in use in the histogram */
1597       fqh = &fqm->frame_queue_histogram[thread_id];
1598       fqh->count[fqt->n_in_use]++;
1599
1600       /* Record a snapshot of the elements in use */
1601       for (elix = 0; elix < fqt->nelts; elix++)
1602         {
1603           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1604           if (1 || elt->valid)
1605             {
1606               fqt->n_vectors[elix] = elt->n_vectors;
1607             }
1608         }
1609       fqt->written = 1;
1610     }
1611
1612   while (1)
1613     {
1614       if (fq->head == fq->tail)
1615         {
1616           fq->head_hint = fq->head;
1617           return processed;
1618         }
1619
1620       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1621
1622       if (!elt->valid)
1623         {
1624           fq->head_hint = fq->head;
1625           return processed;
1626         }
1627
1628       from = elt->buffer_index;
1629       msg_type = elt->msg_type;
1630
1631       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1632       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1633
1634       f = vlib_get_frame_to_node (vm, fqm->node_index);
1635
1636       to = vlib_frame_vector_args (f);
1637
1638       n_left_to_node = elt->n_vectors;
1639
1640       while (n_left_to_node >= 4)
1641         {
1642           to[0] = from[0];
1643           to[1] = from[1];
1644           to[2] = from[2];
1645           to[3] = from[3];
1646           to += 4;
1647           from += 4;
1648           n_left_to_node -= 4;
1649         }
1650
1651       while (n_left_to_node > 0)
1652         {
1653           to[0] = from[0];
1654           to++;
1655           from++;
1656           n_left_to_node--;
1657         }
1658
1659       vectors += elt->n_vectors;
1660       f->n_vectors = elt->n_vectors;
1661       vlib_put_frame_to_node (vm, fqm->node_index, f);
1662
1663       elt->valid = 0;
1664       elt->n_vectors = 0;
1665       elt->msg_type = 0xfefefefe;
1666       CLIB_MEMORY_BARRIER ();
1667       fq->head++;
1668       processed++;
1669
1670       /*
1671        * Limit the number of packets pushed into the graph
1672        */
1673       if (vectors >= fq->vector_threshold)
1674         {
1675           fq->head_hint = fq->head;
1676           return processed;
1677         }
1678     }
1679   ASSERT (0);
1680   return processed;
1681 }
1682
1683 void
1684 vlib_worker_thread_fn (void *arg)
1685 {
1686   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1687   vlib_thread_main_t *tm = vlib_get_thread_main ();
1688   vlib_main_t *vm = vlib_get_main ();
1689   clib_error_t *e;
1690
1691   ASSERT (vm->thread_index == vlib_get_thread_index ());
1692
1693   vlib_worker_thread_init (w);
1694   clib_time_init (&vm->clib_time);
1695   clib_mem_set_heap (w->thread_mheap);
1696
1697   /* Wait until the dpdk init sequence is complete */
1698   while (tm->extern_thread_mgmt && tm->worker_thread_release == 0)
1699     vlib_worker_thread_barrier_check ();
1700
1701   e = vlib_call_init_exit_functions
1702     (vm, vm->worker_init_function_registrations, 1 /* call_once */ );
1703   if (e)
1704     clib_error_report (e);
1705
1706   vlib_worker_loop (vm);
1707 }
1708
1709 /* *INDENT-OFF* */
1710 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1711   .name = "workers",
1712   .short_name = "wk",
1713   .function = vlib_worker_thread_fn,
1714 };
1715 /* *INDENT-ON* */
1716
1717 u32
1718 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
1719 {
1720   vlib_thread_main_t *tm = vlib_get_thread_main ();
1721   vlib_frame_queue_main_t *fqm;
1722   vlib_frame_queue_t *fq;
1723   int i;
1724
1725   if (frame_queue_nelts == 0)
1726     frame_queue_nelts = FRAME_QUEUE_NELTS;
1727
1728   ASSERT (frame_queue_nelts >= 8);
1729
1730   vec_add2 (tm->frame_queue_mains, fqm, 1);
1731
1732   fqm->node_index = node_index;
1733   fqm->frame_queue_nelts = frame_queue_nelts;
1734   fqm->queue_hi_thresh = frame_queue_nelts - 2;
1735
1736   vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
1737   vec_validate (fqm->per_thread_data, tm->n_vlib_mains - 1);
1738   _vec_len (fqm->vlib_frame_queues) = 0;
1739   for (i = 0; i < tm->n_vlib_mains; i++)
1740     {
1741       vlib_frame_queue_per_thread_data_t *ptd;
1742       fq = vlib_frame_queue_alloc (frame_queue_nelts);
1743       vec_add1 (fqm->vlib_frame_queues, fq);
1744
1745       ptd = vec_elt_at_index (fqm->per_thread_data, i);
1746       vec_validate (ptd->handoff_queue_elt_by_thread_index,
1747                     tm->n_vlib_mains - 1);
1748       vec_validate_init_empty (ptd->congested_handoff_queue_by_thread_index,
1749                                tm->n_vlib_mains - 1,
1750                                (vlib_frame_queue_t *) (~0));
1751     }
1752
1753   return (fqm - tm->frame_queue_mains);
1754 }
1755
1756 int
1757 vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb)
1758 {
1759   vlib_thread_main_t *tm = vlib_get_thread_main ();
1760
1761   if (tm->extern_thread_mgmt)
1762     return -1;
1763
1764   tm->cb.vlib_launch_thread_cb = cb->vlib_launch_thread_cb;
1765   tm->extern_thread_mgmt = 1;
1766   return 0;
1767 }
1768
1769 void
1770 vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t *
1771                                      args)
1772 {
1773   ASSERT (vlib_get_thread_index () == 0);
1774   vlib_process_signal_event (vlib_get_main (), args->node_index,
1775                              args->type_opaque, args->data);
1776 }
1777
1778 void *rpc_call_main_thread_cb_fn;
1779
1780 void
1781 vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
1782 {
1783   if (rpc_call_main_thread_cb_fn)
1784     {
1785       void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
1786       (*fp) (callback, args, arg_size);
1787     }
1788   else
1789     clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
1790 }
1791
1792 clib_error_t *
1793 threads_init (vlib_main_t * vm)
1794 {
1795   return 0;
1796 }
1797
1798 VLIB_INIT_FUNCTION (threads_init);
1799
1800 /*
1801  * fd.io coding-style-patch-verification: ON
1802  *
1803  * Local Variables:
1804  * eval: (c-set-style "gnu")
1805  * End:
1806  */