fd-io-styleify pass
[vpp.git] / vlib / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16 #include <sched.h>
17
18 #include <signal.h>
19 #include <math.h>
20 #include <vppinfra/format.h>
21 #include <vlib/vlib.h>
22
23 #include <vlib/threads.h>
24 #include <vlib/unix/cj.h>
25
26
27 #if DPDK==1
28 #include <rte_config.h>
29 #include <rte_common.h>
30 #include <rte_eal.h>
31 #include <rte_launch.h>
32 #include <rte_lcore.h>
33 #endif
34 DECLARE_CJ_GLOBAL_LOG;
35
36 #define FRAME_QUEUE_NELTS 32
37
38
39 #if DPDK==1
40 /*
41  *  Weak definitions of DPDK symbols used in this file.
42  *  Needed for linking test programs without DPDK libs.
43  */
44 unsigned __thread __attribute__ ((weak)) RTE_PER_LCORE (_lcore_id);
45 struct lcore_config __attribute__ ((weak)) lcore_config[];
46 unsigned __attribute__ ((weak)) rte_socket_id ();
47 int __attribute__ ((weak)) rte_eal_remote_launch ();
48 #endif
49 u32
50 vl (void *p)
51 {
52   return vec_len (p);
53 }
54
55 vlib_thread_main_t vlib_thread_main;
56
57 uword
58 os_get_cpu_number (void)
59 {
60   void *sp;
61   uword n;
62   u32 len;
63
64   len = vec_len (vlib_thread_stacks);
65   if (len == 0)
66     return 0;
67
68   /* Get any old stack address. */
69   sp = &sp;
70
71   n = ((uword) sp - (uword) vlib_thread_stacks[0])
72     >> VLIB_LOG2_THREAD_STACK_SIZE;
73
74   /* "processes" have their own stacks, and they always run in thread 0 */
75   n = n >= len ? 0 : n;
76
77   return n;
78 }
79
80 void
81 vlib_set_thread_name (char *name)
82 {
83   int pthread_setname_np (pthread_t __target_thread, const char *__name);
84   pthread_t thread = pthread_self ();
85
86   if (thread)
87     pthread_setname_np (thread, name);
88 }
89
90 static int
91 sort_registrations_by_no_clone (void *a0, void *a1)
92 {
93   vlib_thread_registration_t **tr0 = a0;
94   vlib_thread_registration_t **tr1 = a1;
95
96   return ((i32) ((*tr0)->no_data_structure_clone)
97           - ((i32) ((*tr1)->no_data_structure_clone)));
98 }
99
100 static uword *
101 vlib_sysfs_list_to_bitmap (char *filename)
102 {
103   FILE *fp;
104   uword *r = 0;
105
106   fp = fopen (filename, "r");
107
108   if (fp != NULL)
109     {
110       u8 *buffer = 0;
111       vec_validate (buffer, 256 - 1);
112       if (fgets ((char *) buffer, 256, fp))
113         {
114           unformat_input_t in;
115           unformat_init_string (&in, (char *) buffer,
116                                 strlen ((char *) buffer));
117           unformat (&in, "%U", unformat_bitmap_list, &r);
118           unformat_free (&in);
119         }
120       vec_free (buffer);
121       fclose (fp);
122     }
123   return r;
124 }
125
126
127 /* Called early in the init sequence */
128
129 clib_error_t *
130 vlib_thread_init (vlib_main_t * vm)
131 {
132   vlib_thread_main_t *tm = &vlib_thread_main;
133   vlib_worker_thread_t *w;
134   vlib_thread_registration_t *tr;
135   u32 n_vlib_mains = 1;
136   u32 first_index = 1;
137   u32 i;
138   uword *avail_cpu;
139
140   /* get bitmaps of active cpu cores and sockets */
141   tm->cpu_core_bitmap =
142     vlib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
143   tm->cpu_socket_bitmap =
144     vlib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
145
146   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
147
148   /* skip cores */
149   for (i = 0; i < tm->skip_cores; i++)
150     {
151       uword c = clib_bitmap_first_set (avail_cpu);
152       if (c == ~0)
153         return clib_error_return (0, "no available cpus to skip");
154
155       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
156     }
157
158   /* grab cpu for main thread */
159   if (!tm->main_lcore)
160     {
161       tm->main_lcore = clib_bitmap_first_set (avail_cpu);
162       if (tm->main_lcore == (u8) ~ 0)
163         return clib_error_return (0, "no available cpus to be used for the"
164                                   " main thread");
165     }
166   else
167     {
168       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
169         return clib_error_return (0, "cpu %u is not available to be used"
170                                   " for the main thread", tm->main_lcore);
171     }
172   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
173
174   /* assume that there is socket 0 only if there is no data from sysfs */
175   if (!tm->cpu_socket_bitmap)
176     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
177
178   /* pin main thread to main_lcore  */
179 #if DPDK==0
180   {
181     cpu_set_t cpuset;
182     CPU_ZERO (&cpuset);
183     CPU_SET (tm->main_lcore, &cpuset);
184     pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
185   }
186 #endif
187
188   /* as many threads as stacks... */
189   vec_validate_aligned (vlib_worker_threads, vec_len (vlib_thread_stacks) - 1,
190                         CLIB_CACHE_LINE_BYTES);
191
192   /* Preallocate thread 0 */
193   _vec_len (vlib_worker_threads) = 1;
194   w = vlib_worker_threads;
195   w->thread_mheap = clib_mem_get_heap ();
196   w->thread_stack = vlib_thread_stacks[0];
197   w->dpdk_lcore_id = -1;
198   w->lwp = syscall (SYS_gettid);
199   tm->n_vlib_mains = 1;
200
201   /* assign threads to cores and set n_vlib_mains */
202   tr = tm->next;
203
204   while (tr)
205     {
206       vec_add1 (tm->registrations, tr);
207       tr = tr->next;
208     }
209
210   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
211
212   for (i = 0; i < vec_len (tm->registrations); i++)
213     {
214       int j;
215       tr = tm->registrations[i];
216       tr->first_index = first_index;
217       first_index += tr->count;
218       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
219
220       /* construct coremask */
221       if (tr->use_pthreads || !tr->count)
222         continue;
223
224       if (tr->coremask)
225         {
226           uword c;
227           /* *INDENT-OFF* */
228           clib_bitmap_foreach (c, tr->coremask, ({
229             if (clib_bitmap_get(avail_cpu, c) == 0)
230               return clib_error_return (0, "cpu %u is not available to be used"
231                                         " for the '%s' thread",c, tr->name);
232
233             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
234           }));
235 /* *INDENT-ON* */
236
237         }
238       else
239         {
240           for (j = 0; j < tr->count; j++)
241             {
242               uword c = clib_bitmap_first_set (avail_cpu);
243               if (c == ~0)
244                 return clib_error_return (0,
245                                           "no available cpus to be used for"
246                                           " the '%s' thread", tr->name);
247
248               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
249               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
250             }
251         }
252     }
253
254   clib_bitmap_free (avail_cpu);
255
256   tm->n_vlib_mains = n_vlib_mains;
257
258   vec_validate_aligned (vlib_worker_threads, first_index - 1,
259                         CLIB_CACHE_LINE_BYTES);
260
261
262   tm->efd.enabled = VLIB_EFD_DISABLED;
263   tm->efd.queue_hi_thresh = ((VLIB_EFD_DEF_WORKER_HI_THRESH_PCT *
264                               FRAME_QUEUE_NELTS) / 100);
265   return 0;
266 }
267
268 vlib_worker_thread_t *
269 vlib_alloc_thread (vlib_main_t * vm)
270 {
271   vlib_worker_thread_t *w;
272
273   if (vec_len (vlib_worker_threads) >= vec_len (vlib_thread_stacks))
274     {
275       clib_warning ("out of worker threads... Quitting...");
276       exit (1);
277     }
278   vec_add2 (vlib_worker_threads, w, 1);
279   w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
280   return w;
281 }
282
283 vlib_frame_queue_t *
284 vlib_frame_queue_alloc (int nelts)
285 {
286   vlib_frame_queue_t *fq;
287
288   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
289   memset (fq, 0, sizeof (*fq));
290   fq->nelts = nelts;
291   fq->vector_threshold = 128;   // packets
292   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
293
294   if (1)
295     {
296       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
297         fformat (stderr, "WARNING: fq->tail unaligned\n");
298       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
299         fformat (stderr, "WARNING: fq->head unaligned\n");
300       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
301         fformat (stderr, "WARNING: fq->elts unaligned\n");
302
303       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
304         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
305                  sizeof (fq->elts[0]));
306       if (nelts & (nelts - 1))
307         {
308           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
309           abort ();
310         }
311     }
312
313   return (fq);
314 }
315
316 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
317 void
318 vl_msg_api_handler_no_free (void *v)
319 {
320 }
321
322 /* Turned off, save as reference material... */
323 #if 0
324 static inline int
325 vlib_frame_queue_dequeue_internal (int thread_id,
326                                    vlib_main_t * vm, vlib_node_main_t * nm)
327 {
328   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
329   vlib_frame_queue_elt_t *elt;
330   vlib_frame_t *f;
331   vlib_pending_frame_t *p;
332   vlib_node_runtime_t *r;
333   u32 node_runtime_index;
334   int msg_type;
335   u64 before;
336   int processed = 0;
337
338   ASSERT (vm == vlib_mains[thread_id]);
339
340   while (1)
341     {
342       if (fq->head == fq->tail)
343         return processed;
344
345       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
346
347       if (!elt->valid)
348         return processed;
349
350       before = clib_cpu_time_now ();
351
352       f = elt->frame;
353       node_runtime_index = elt->node_runtime_index;
354       msg_type = elt->msg_type;
355
356       switch (msg_type)
357         {
358         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
359           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
360           /* note fallthrough... */
361         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
362           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
363                                 node_runtime_index);
364           vlib_frame_free (vm, r, f);
365           break;
366         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
367           vec_add2 (vm->node_main.pending_frames, p, 1);
368           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
369           p->node_runtime_index = elt->node_runtime_index;
370           p->frame_index = vlib_frame_index (vm, f);
371           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
372           fq->dequeue_vectors += (u64) f->n_vectors;
373           break;
374         case VLIB_FRAME_QUEUE_ELT_API_MSG:
375           vl_msg_api_handler_no_free (f);
376           break;
377         default:
378           clib_warning ("bogus frame queue message, type %d", msg_type);
379           break;
380         }
381       elt->valid = 0;
382       fq->dequeues++;
383       fq->dequeue_ticks += clib_cpu_time_now () - before;
384       CLIB_MEMORY_BARRIER ();
385       fq->head++;
386       processed++;
387     }
388   ASSERT (0);
389   return processed;
390 }
391
392 int
393 vlib_frame_queue_dequeue (int thread_id,
394                           vlib_main_t * vm, vlib_node_main_t * nm)
395 {
396   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
397 }
398
399 int
400 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
401                           u32 frame_queue_index, vlib_frame_t * frame,
402                           vlib_frame_queue_msg_type_t type)
403 {
404   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
405   vlib_frame_queue_elt_t *elt;
406   u32 save_count;
407   u64 new_tail;
408   u64 before = clib_cpu_time_now ();
409
410   ASSERT (fq);
411
412   new_tail = __sync_add_and_fetch (&fq->tail, 1);
413
414   /* Wait until a ring slot is available */
415   while (new_tail >= fq->head + fq->nelts)
416     {
417       f64 b4 = vlib_time_now_ticks (vm, before);
418       vlib_worker_thread_barrier_check (vm, b4);
419       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
420       // vlib_frame_queue_dequeue (vm->cpu_index, vm, nm);
421     }
422
423   elt = fq->elts + (new_tail & (fq->nelts - 1));
424
425   /* this would be very bad... */
426   while (elt->valid)
427     {
428     }
429
430   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
431   save_count = frame->n_vectors;
432
433   elt->frame = frame;
434   elt->node_runtime_index = node_runtime_index;
435   elt->msg_type = type;
436   CLIB_MEMORY_BARRIER ();
437   elt->valid = 1;
438
439   return save_count;
440 }
441 #endif /* 0 */
442
443 /* To be called by vlib worker threads upon startup */
444 void
445 vlib_worker_thread_init (vlib_worker_thread_t * w)
446 {
447   vlib_thread_main_t *tm = vlib_get_thread_main ();
448
449   /* worker threads wants no signals. */
450   {
451     sigset_t s;
452     sigfillset (&s);
453     pthread_sigmask (SIG_SETMASK, &s, 0);
454   }
455
456   clib_mem_set_heap (w->thread_mheap);
457
458   if (vec_len (tm->thread_prefix) && w->registration->short_name)
459     {
460       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
461                         w->registration->short_name, w->instance_id, '\0');
462       vlib_set_thread_name ((char *) w->name);
463     }
464
465   if (!w->registration->use_pthreads)
466     {
467
468       /* Initial barrier sync, for both worker and i/o threads */
469       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
470
471       while (*vlib_worker_threads->wait_at_barrier)
472         ;
473
474       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
475     }
476 }
477
478 void *
479 vlib_worker_thread_bootstrap_fn (void *arg)
480 {
481   void *rv;
482   vlib_worker_thread_t *w = arg;
483
484   w->lwp = syscall (SYS_gettid);
485   w->dpdk_lcore_id = -1;
486 #if DPDK==1
487   if (w->registration && !w->registration->use_pthreads && rte_socket_id)       /* do we really have dpdk linked */
488     {
489       unsigned lcore = rte_lcore_id ();
490       lcore = lcore < RTE_MAX_LCORE ? lcore : -1;
491       w->dpdk_lcore_id = lcore;
492     }
493 #endif
494
495   rv = (void *) clib_calljmp
496     ((uword (*)(uword)) w->thread_function,
497      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
498   /* NOTREACHED, we hope */
499   return rv;
500 }
501
502 static int
503 vlib_launch_thread (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
504 {
505   void *(*fp_arg) (void *) = fp;
506
507 #if DPDK==1
508   if (!w->registration->use_pthreads)
509     if (rte_eal_remote_launch)  /* do we have dpdk linked */
510       return rte_eal_remote_launch (fp, (void *) w, lcore_id);
511     else
512       return -1;
513   else
514 #endif
515     {
516       int ret;
517       pthread_t worker;
518       cpu_set_t cpuset;
519       CPU_ZERO (&cpuset);
520       CPU_SET (lcore_id, &cpuset);
521
522       ret = pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w);
523       if (ret == 0)
524         return pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset);
525       else
526         return ret;
527     }
528 }
529
530 static clib_error_t *
531 start_workers (vlib_main_t * vm)
532 {
533   int i, j;
534   vlib_worker_thread_t *w;
535   vlib_main_t *vm_clone;
536   void *oldheap;
537   vlib_frame_queue_t *fq;
538   vlib_thread_main_t *tm = &vlib_thread_main;
539   vlib_thread_registration_t *tr;
540   vlib_node_runtime_t *rt;
541   u32 n_vlib_mains = tm->n_vlib_mains;
542   u32 worker_thread_index;
543   u8 *main_heap = clib_mem_get_per_cpu_heap ();
544   mheap_t *main_heap_header = mheap_header (main_heap);
545
546   vec_reset_length (vlib_worker_threads);
547
548   /* Set up the main thread */
549   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
550   w->elog_track.name = "main thread";
551   elog_track_register (&vm->elog_main, &w->elog_track);
552
553   if (vec_len (tm->thread_prefix))
554     {
555       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
556       vlib_set_thread_name ((char *) w->name);
557     }
558
559 #if DPDK==1
560   w->dpdk_lcore_id = -1;
561   if (rte_socket_id)            /* do we really have dpdk linked */
562     {
563       unsigned lcore = rte_lcore_id ();
564       w->dpdk_lcore_id = lcore < RTE_MAX_LCORE ? lcore : -1;;
565     }
566 #endif
567
568   /*
569    * Truth of the matter: we always use at least two
570    * threads. So, make the main heap thread-safe
571    * and make the event log thread-safe.
572    */
573   main_heap_header->flags |= MHEAP_FLAG_THREAD_SAFE;
574   vm->elog_main.lock =
575     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
576   vm->elog_main.lock[0] = 0;
577
578   if (n_vlib_mains > 1)
579     {
580       vec_validate (vlib_mains, tm->n_vlib_mains - 1);
581       _vec_len (vlib_mains) = 0;
582       vec_add1 (vlib_mains, vm);
583
584       vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
585       _vec_len (vlib_frame_queues) = 0;
586       fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
587       vec_add1 (vlib_frame_queues, fq);
588
589       vlib_worker_threads->wait_at_barrier =
590         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
591       vlib_worker_threads->workers_at_barrier =
592         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
593
594       /* Ask for an initial barrier sync */
595       *vlib_worker_threads->workers_at_barrier = 0;
596       *vlib_worker_threads->wait_at_barrier = 1;
597
598       worker_thread_index = 1;
599
600       for (i = 0; i < vec_len (tm->registrations); i++)
601         {
602           vlib_node_main_t *nm, *nm_clone;
603           vlib_buffer_main_t *bm_clone;
604           vlib_buffer_free_list_t *fl_clone, *fl_orig;
605           vlib_buffer_free_list_t *orig_freelist_pool;
606           int k;
607
608           tr = tm->registrations[i];
609
610           if (tr->count == 0)
611             continue;
612
613           for (k = 0; k < tr->count; k++)
614             {
615               vec_add2 (vlib_worker_threads, w, 1);
616               if (tr->mheap_size)
617                 w->thread_mheap =
618                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
619               else
620                 w->thread_mheap = main_heap;
621               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
622               w->thread_function = tr->function;
623               w->thread_function_arg = w;
624               w->instance_id = k;
625               w->registration = tr;
626
627               w->elog_track.name =
628                 (char *) format (0, "%s %d", tr->name, k + 1);
629               vec_add1 (w->elog_track.name, 0);
630               elog_track_register (&vm->elog_main, &w->elog_track);
631
632               if (tr->no_data_structure_clone)
633                 continue;
634
635               /* Allocate "to-worker-N" frame queue */
636               if (tr->frame_queue_nelts)
637                 {
638                   fq = vlib_frame_queue_alloc (tr->frame_queue_nelts);
639                 }
640               else
641                 {
642                   fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
643                 }
644
645               vec_validate (vlib_frame_queues, worker_thread_index);
646               vlib_frame_queues[worker_thread_index] = fq;
647
648               /* Fork vlib_global_main et al. Look for bugs here */
649               oldheap = clib_mem_set_heap (w->thread_mheap);
650
651               vm_clone = clib_mem_alloc (sizeof (*vm_clone));
652               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
653
654               vm_clone->cpu_index = worker_thread_index;
655               vm_clone->heap_base = w->thread_mheap;
656               vm_clone->mbuf_alloc_list = 0;
657               memset (&vm_clone->random_buffer, 0,
658                       sizeof (vm_clone->random_buffer));
659
660               nm = &vlib_mains[0]->node_main;
661               nm_clone = &vm_clone->node_main;
662               /* fork next frames array, preserving node runtime indices */
663               nm_clone->next_frames = vec_dup (nm->next_frames);
664               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
665                 {
666                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
667                   u32 save_node_runtime_index;
668                   u32 save_flags;
669
670                   save_node_runtime_index = nf->node_runtime_index;
671                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
672                   vlib_next_frame_init (nf);
673                   nf->node_runtime_index = save_node_runtime_index;
674                   nf->flags = save_flags;
675                 }
676
677               /* fork the frame dispatch queue */
678               nm_clone->pending_frames = 0;
679               vec_validate (nm_clone->pending_frames, 10);      /* $$$$$?????? */
680               _vec_len (nm_clone->pending_frames) = 0;
681
682               /* fork nodes */
683               nm_clone->nodes = 0;
684               for (j = 0; j < vec_len (nm->nodes); j++)
685                 {
686                   vlib_node_t *n;
687                   n = clib_mem_alloc_no_fail (sizeof (*n));
688                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
689                   /* none of the copied nodes have enqueue rights given out */
690                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
691                   memset (&n->stats_total, 0, sizeof (n->stats_total));
692                   memset (&n->stats_last_clear, 0,
693                           sizeof (n->stats_last_clear));
694                   vec_add1 (nm_clone->nodes, n);
695                 }
696               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
697                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
698
699               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
700                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
701               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
702                 rt->cpu_index = vm_clone->cpu_index;
703
704               nm_clone->processes = vec_dup (nm->processes);
705
706               /* zap the (per worker) frame freelists, etc */
707               nm_clone->frame_sizes = 0;
708               nm_clone->frame_size_hash = 0;
709
710               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
711
712               clib_mem_set_heap (oldheap);
713               vec_add1 (vlib_mains, vm_clone);
714
715               vm_clone->error_main.counters =
716                 vec_dup (vlib_mains[0]->error_main.counters);
717               vm_clone->error_main.counters_last_clear =
718                 vec_dup (vlib_mains[0]->error_main.counters_last_clear);
719
720               /* Fork the vlib_buffer_main_t free lists, etc. */
721               bm_clone = vec_dup (vm_clone->buffer_main);
722               vm_clone->buffer_main = bm_clone;
723
724               orig_freelist_pool = bm_clone->buffer_free_list_pool;
725               bm_clone->buffer_free_list_pool = 0;
726
727             /* *INDENT-OFF* */
728             pool_foreach (fl_orig, orig_freelist_pool,
729                           ({
730                             pool_get_aligned (bm_clone->buffer_free_list_pool,
731                                               fl_clone, CLIB_CACHE_LINE_BYTES);
732                             ASSERT (fl_orig - orig_freelist_pool
733                                     == fl_clone - bm_clone->buffer_free_list_pool);
734
735                             fl_clone[0] = fl_orig[0];
736                             fl_clone->aligned_buffers = 0;
737                             fl_clone->unaligned_buffers = 0;
738                             fl_clone->n_alloc = 0;
739                           }));
740 /* *INDENT-ON* */
741
742               worker_thread_index++;
743             }
744         }
745     }
746   else
747     {
748       /* only have non-data-structure copy threads to create... */
749       for (i = 0; i < vec_len (tm->registrations); i++)
750         {
751           tr = tm->registrations[i];
752
753           for (j = 0; j < tr->count; j++)
754             {
755               vec_add2 (vlib_worker_threads, w, 1);
756               if (tr->mheap_size)
757                 w->thread_mheap =
758                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
759               else
760                 w->thread_mheap = main_heap;
761               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
762               w->thread_function = tr->function;
763               w->thread_function_arg = w;
764               w->instance_id = j;
765               w->elog_track.name =
766                 (char *) format (0, "%s %d", tr->name, j + 1);
767               w->registration = tr;
768               vec_add1 (w->elog_track.name, 0);
769               elog_track_register (&vm->elog_main, &w->elog_track);
770             }
771         }
772     }
773
774   worker_thread_index = 1;
775
776   for (i = 0; i < vec_len (tm->registrations); i++)
777     {
778       int j;
779
780       tr = tm->registrations[i];
781
782       if (tr->use_pthreads || tm->use_pthreads)
783         {
784           for (j = 0; j < tr->count; j++)
785             {
786               w = vlib_worker_threads + worker_thread_index++;
787               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, 0) <
788                   0)
789                 clib_warning ("Couldn't start '%s' pthread ", tr->name);
790             }
791         }
792       else
793         {
794           uword c;
795             /* *INDENT-OFF* */
796             clib_bitmap_foreach (c, tr->coremask, ({
797               w = vlib_worker_threads + worker_thread_index++;
798               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, c) < 0)
799                 clib_warning ("Couldn't start DPDK lcore %d", c);
800
801             }));
802 /* *INDENT-ON* */
803         }
804     }
805   vlib_worker_thread_barrier_sync (vm);
806   vlib_worker_thread_barrier_release (vm);
807   return 0;
808 }
809
810 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
811
812 void
813 vlib_worker_thread_node_runtime_update (void)
814 {
815   int i, j;
816   vlib_worker_thread_t *w;
817   vlib_main_t *vm;
818   vlib_node_main_t *nm, *nm_clone;
819   vlib_node_t **old_nodes_clone;
820   vlib_main_t *vm_clone;
821   vlib_node_runtime_t *rt, *old_rt;
822   void *oldheap;
823   never_inline void
824     vlib_node_runtime_sync_stats (vlib_main_t * vm,
825                                   vlib_node_runtime_t * r,
826                                   uword n_calls,
827                                   uword n_vectors, uword n_clocks);
828
829   ASSERT (os_get_cpu_number () == 0);
830
831   if (vec_len (vlib_mains) == 0)
832     return;
833
834   vm = vlib_mains[0];
835   nm = &vm->node_main;
836
837   ASSERT (os_get_cpu_number () == 0);
838   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
839
840   /*
841    * Scrape all runtime stats, so we don't lose node runtime(s) with
842    * pending counts, or throw away worker / io thread counts.
843    */
844   for (j = 0; j < vec_len (nm->nodes); j++)
845     {
846       vlib_node_t *n;
847       n = nm->nodes[j];
848       vlib_node_sync_stats (vm, n);
849     }
850
851   for (i = 1; i < vec_len (vlib_mains); i++)
852     {
853       vlib_node_t *n;
854
855       vm_clone = vlib_mains[i];
856       nm_clone = &vm_clone->node_main;
857
858       for (j = 0; j < vec_len (nm_clone->nodes); j++)
859         {
860           n = nm_clone->nodes[j];
861
862           rt = vlib_node_get_runtime (vm_clone, n->index);
863           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
864         }
865     }
866
867   for (i = 1; i < vec_len (vlib_mains); i++)
868     {
869       vlib_node_runtime_t *rt;
870       w = vlib_worker_threads + i;
871       oldheap = clib_mem_set_heap (w->thread_mheap);
872
873       vm_clone = vlib_mains[i];
874
875       /* Re-clone error heap */
876       u64 *old_counters = vm_clone->error_main.counters;
877       u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
878       clib_memcpy (&vm_clone->error_main, &vm->error_main,
879                    sizeof (vm->error_main));
880       j = vec_len (vm->error_main.counters) - 1;
881       vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
882       vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
883       vm_clone->error_main.counters = old_counters;
884       vm_clone->error_main.counters_last_clear = old_counters_all_clear;
885
886       nm_clone = &vm_clone->node_main;
887       vec_free (nm_clone->next_frames);
888       nm_clone->next_frames = vec_dup (nm->next_frames);
889
890       for (j = 0; j < vec_len (nm_clone->next_frames); j++)
891         {
892           vlib_next_frame_t *nf = &nm_clone->next_frames[j];
893           u32 save_node_runtime_index;
894           u32 save_flags;
895
896           save_node_runtime_index = nf->node_runtime_index;
897           save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
898           vlib_next_frame_init (nf);
899           nf->node_runtime_index = save_node_runtime_index;
900           nf->flags = save_flags;
901         }
902
903       old_nodes_clone = nm_clone->nodes;
904       nm_clone->nodes = 0;
905
906       /* re-fork nodes */
907       for (j = 0; j < vec_len (nm->nodes); j++)
908         {
909           vlib_node_t *old_n_clone;
910           vlib_node_t *new_n, *new_n_clone;
911
912           new_n = nm->nodes[j];
913           old_n_clone = old_nodes_clone[j];
914
915           new_n_clone = clib_mem_alloc_no_fail (sizeof (*new_n_clone));
916           clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
917           /* none of the copied nodes have enqueue rights given out */
918           new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
919
920           if (j >= vec_len (old_nodes_clone))
921             {
922               /* new node, set to zero */
923               memset (&new_n_clone->stats_total, 0,
924                       sizeof (new_n_clone->stats_total));
925               memset (&new_n_clone->stats_last_clear, 0,
926                       sizeof (new_n_clone->stats_last_clear));
927             }
928           else
929             {
930               /* Copy stats if the old data is valid */
931               clib_memcpy (&new_n_clone->stats_total,
932                            &old_n_clone->stats_total,
933                            sizeof (new_n_clone->stats_total));
934               clib_memcpy (&new_n_clone->stats_last_clear,
935                            &old_n_clone->stats_last_clear,
936                            sizeof (new_n_clone->stats_last_clear));
937
938               /* keep previous node state */
939               new_n_clone->state = old_n_clone->state;
940             }
941           vec_add1 (nm_clone->nodes, new_n_clone);
942         }
943       /* Free the old node clone */
944       for (j = 0; j < vec_len (old_nodes_clone); j++)
945         clib_mem_free (old_nodes_clone[j]);
946       vec_free (old_nodes_clone);
947
948       vec_free (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
949
950       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
951         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
952
953       /* clone input node runtime */
954       old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
955
956       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
957         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
958
959       vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
960       {
961         rt->cpu_index = vm_clone->cpu_index;
962       }
963
964       for (j = 0; j < vec_len (old_rt); j++)
965         {
966           rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
967           rt->state = old_rt[j].state;
968         }
969
970       vec_free (old_rt);
971
972       nm_clone->processes = vec_dup (nm->processes);
973
974       clib_mem_set_heap (oldheap);
975
976       // vnet_main_fork_fixup (i);
977     }
978 }
979
980 static clib_error_t *
981 cpu_config (vlib_main_t * vm, unformat_input_t * input)
982 {
983   vlib_thread_registration_t *tr;
984   uword *p;
985   vlib_thread_main_t *tm = &vlib_thread_main;
986   u8 *name;
987   u64 coremask;
988   uword *bitmap;
989   u32 count;
990
991   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
992   tm->n_thread_stacks = 1;      /* account for main thread */
993
994   tr = tm->next;
995
996   while (tr)
997     {
998       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
999       tr = tr->next;
1000     }
1001
1002   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1003     {
1004       if (unformat (input, "use-pthreads"))
1005         tm->use_pthreads = 1;
1006       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1007         ;
1008       else if (unformat (input, "main-core %u", &tm->main_lcore))
1009         ;
1010       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1011         ;
1012       else if (unformat (input, "coremask-%s %llx", &name, &coremask))
1013         {
1014           p = hash_get_mem (tm->thread_registrations_by_name, name);
1015           if (p == 0)
1016             return clib_error_return (0, "no such thread type '%s'", name);
1017
1018           tr = (vlib_thread_registration_t *) p[0];
1019
1020           if (tr->use_pthreads)
1021             return clib_error_return (0,
1022                                       "coremask cannot be set for '%s' threads",
1023                                       name);
1024
1025           tr->coremask = clib_bitmap_set_multiple
1026             (tr->coremask, 0, coremask, BITS (coremask));
1027           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1028         }
1029       else if (unformat (input, "corelist-%s %U", &name, unformat_bitmap_list,
1030                          &bitmap))
1031         {
1032           p = hash_get_mem (tm->thread_registrations_by_name, name);
1033           if (p == 0)
1034             return clib_error_return (0, "no such thread type '%s'", name);
1035
1036           tr = (vlib_thread_registration_t *) p[0];
1037
1038           if (tr->use_pthreads)
1039             return clib_error_return (0,
1040                                       "corelist cannot be set for '%s' threads",
1041                                       name);
1042
1043           tr->coremask = bitmap;
1044           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1045         }
1046       else if (unformat (input, "%s %u", &name, &count))
1047         {
1048           p = hash_get_mem (tm->thread_registrations_by_name, name);
1049           if (p == 0)
1050             return clib_error_return (0, "no such thread type '%s'", name);
1051
1052           tr = (vlib_thread_registration_t *) p[0];
1053           if (tr->fixed_count)
1054             return clib_error_return
1055               (0, "number of %s threads not configurable", tr->name);
1056           tr->count = count;
1057         }
1058       else
1059         break;
1060     }
1061
1062   tr = tm->next;
1063
1064   if (!tm->thread_prefix)
1065     tm->thread_prefix = format (0, "vpp");
1066
1067   while (tr)
1068     {
1069       tm->n_thread_stacks += tr->count;
1070       tm->n_pthreads += tr->count * tr->use_pthreads;
1071       tm->n_eal_threads += tr->count * (tr->use_pthreads == 0);
1072       tr = tr->next;
1073     }
1074
1075   return 0;
1076 }
1077
1078 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1079
1080 #if !defined (__x86_64__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__)
1081 void
1082 __sync_fetch_and_add_8 (void)
1083 {
1084   fformat (stderr, "%s called\n", __FUNCTION__);
1085   abort ();
1086 }
1087
1088 void
1089 __sync_add_and_fetch_8 (void)
1090 {
1091   fformat (stderr, "%s called\n", __FUNCTION__);
1092   abort ();
1093 }
1094 #endif
1095
1096 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1097 void
1098 vnet_main_fixup (vlib_fork_fixup_t which)
1099 {
1100 }
1101
1102 void
1103 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1104 {
1105   vlib_main_t *vm = vlib_get_main ();
1106
1107   if (vlib_mains == 0)
1108     return;
1109
1110   ASSERT (os_get_cpu_number () == 0);
1111   vlib_worker_thread_barrier_sync (vm);
1112
1113   switch (which)
1114     {
1115     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1116       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1117       break;
1118
1119     default:
1120       ASSERT (0);
1121     }
1122   vlib_worker_thread_barrier_release (vm);
1123 }
1124
1125 void
1126 vlib_worker_thread_barrier_sync (vlib_main_t * vm)
1127 {
1128   f64 deadline;
1129   u32 count;
1130
1131   if (!vlib_mains)
1132     return;
1133
1134   count = vec_len (vlib_mains) - 1;
1135
1136   /* Tolerate recursive calls */
1137   if (++vlib_worker_threads[0].recursion_level > 1)
1138     return;
1139
1140   vlib_worker_threads[0].barrier_sync_count++;
1141
1142   ASSERT (os_get_cpu_number () == 0);
1143
1144   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1145
1146   *vlib_worker_threads->wait_at_barrier = 1;
1147   while (*vlib_worker_threads->workers_at_barrier != count)
1148     {
1149       if (vlib_time_now (vm) > deadline)
1150         {
1151           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1152           os_panic ();
1153         }
1154     }
1155 }
1156
1157 void
1158 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1159 {
1160   f64 deadline;
1161
1162   if (!vlib_mains)
1163     return;
1164
1165   if (--vlib_worker_threads[0].recursion_level > 0)
1166     return;
1167
1168   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1169
1170   *vlib_worker_threads->wait_at_barrier = 0;
1171
1172   while (*vlib_worker_threads->workers_at_barrier > 0)
1173     {
1174       if (vlib_time_now (vm) > deadline)
1175         {
1176           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1177           os_panic ();
1178         }
1179     }
1180 }
1181
1182 /*
1183  * Check the frame queue to see if any frames are available.
1184  * If so, pull the packets off the frames and put them to
1185  * the handoff node.
1186  */
1187 static inline int
1188 vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
1189 {
1190   u32 thread_id = vm->cpu_index;
1191   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
1192   vlib_frame_queue_elt_t *elt;
1193   u32 *from, *to;
1194   vlib_frame_t *f;
1195   int msg_type;
1196   int processed = 0;
1197   u32 n_left_to_node;
1198   u32 vectors = 0;
1199   vlib_thread_main_t *tm = vlib_get_thread_main ();
1200
1201   ASSERT (fq);
1202   ASSERT (vm == vlib_mains[thread_id]);
1203
1204   if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
1205     return 0;
1206   /*
1207    * Gather trace data for frame queues
1208    */
1209   if (PREDICT_FALSE (fq->trace))
1210     {
1211       frame_queue_trace_t *fqt;
1212       frame_queue_nelt_counter_t *fqh;
1213       u32 elix;
1214
1215       fqt = &tm->frame_queue_traces[thread_id];
1216
1217       fqt->nelts = fq->nelts;
1218       fqt->head = fq->head;
1219       fqt->head_hint = fq->head_hint;
1220       fqt->tail = fq->tail;
1221       fqt->threshold = fq->vector_threshold;
1222       fqt->n_in_use = fqt->tail - fqt->head;
1223       if (fqt->n_in_use >= fqt->nelts)
1224         {
1225           // if beyond max then use max
1226           fqt->n_in_use = fqt->nelts - 1;
1227         }
1228
1229       /* Record the number of elements in use in the histogram */
1230       fqh = &tm->frame_queue_histogram[thread_id];
1231       fqh->count[fqt->n_in_use]++;
1232
1233       /* Record a snapshot of the elements in use */
1234       for (elix = 0; elix < fqt->nelts; elix++)
1235         {
1236           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1237           if (1 || elt->valid)
1238             {
1239               fqt->n_vectors[elix] = elt->n_vectors;
1240             }
1241         }
1242       fqt->written = 1;
1243     }
1244
1245   while (1)
1246     {
1247       if (fq->head == fq->tail)
1248         {
1249           fq->head_hint = fq->head;
1250           return processed;
1251         }
1252
1253       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1254
1255       if (!elt->valid)
1256         {
1257           fq->head_hint = fq->head;
1258           return processed;
1259         }
1260
1261       from = elt->buffer_index;
1262       msg_type = elt->msg_type;
1263
1264       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1265       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1266
1267       f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
1268
1269       to = vlib_frame_vector_args (f);
1270
1271       n_left_to_node = elt->n_vectors;
1272
1273       while (n_left_to_node >= 4)
1274         {
1275           to[0] = from[0];
1276           to[1] = from[1];
1277           to[2] = from[2];
1278           to[3] = from[3];
1279           to += 4;
1280           from += 4;
1281           n_left_to_node -= 4;
1282         }
1283
1284       while (n_left_to_node > 0)
1285         {
1286           to[0] = from[0];
1287           to++;
1288           from++;
1289           n_left_to_node--;
1290         }
1291
1292       vectors += elt->n_vectors;
1293       f->n_vectors = elt->n_vectors;
1294       vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
1295
1296       elt->valid = 0;
1297       elt->n_vectors = 0;
1298       elt->msg_type = 0xfefefefe;
1299       CLIB_MEMORY_BARRIER ();
1300       fq->head++;
1301       processed++;
1302
1303       /*
1304        * Limit the number of packets pushed into the graph
1305        */
1306       if (vectors >= fq->vector_threshold)
1307         {
1308           fq->head_hint = fq->head;
1309           return processed;
1310         }
1311     }
1312   ASSERT (0);
1313   return processed;
1314 }
1315
1316 static_always_inline void
1317 vlib_worker_thread_internal (vlib_main_t * vm)
1318 {
1319   vlib_node_main_t *nm = &vm->node_main;
1320   u64 cpu_time_now = clib_cpu_time_now ();
1321
1322   while (1)
1323     {
1324       vlib_worker_thread_barrier_check ();
1325
1326       vlib_frame_queue_dequeue_internal (vm);
1327
1328       vlib_node_runtime_t *n;
1329       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1330       {
1331         cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
1332                                       VLIB_NODE_STATE_POLLING, /* frame */ 0,
1333                                       cpu_time_now);
1334       }
1335
1336       if (_vec_len (nm->pending_frames))
1337         {
1338           int i;
1339           cpu_time_now = clib_cpu_time_now ();
1340           for (i = 0; i < _vec_len (nm->pending_frames); i++)
1341             {
1342               vlib_pending_frame_t *p;
1343
1344               p = nm->pending_frames + i;
1345
1346               cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
1347             }
1348           _vec_len (nm->pending_frames) = 0;
1349         }
1350       vlib_increment_main_loop_counter (vm);
1351
1352       /* Record time stamp in case there are no enabled nodes and above
1353          calls do not update time stamp. */
1354       cpu_time_now = clib_cpu_time_now ();
1355     }
1356 }
1357
1358 void
1359 vlib_worker_thread_fn (void *arg)
1360 {
1361   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1362   vlib_thread_main_t *tm = vlib_get_thread_main ();
1363   vlib_main_t *vm = vlib_get_main ();
1364
1365   ASSERT (vm->cpu_index == os_get_cpu_number ());
1366
1367   vlib_worker_thread_init (w);
1368   clib_time_init (&vm->clib_time);
1369   clib_mem_set_heap (w->thread_mheap);
1370
1371   /* Wait until the dpdk init sequence is complete */
1372   while (tm->worker_thread_release == 0)
1373     vlib_worker_thread_barrier_check ();
1374
1375   vlib_worker_thread_internal (vm);
1376 }
1377
1378 /* *INDENT-OFF* */
1379 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1380   .name = "workers",
1381   .short_name = "wk",
1382   .function = vlib_worker_thread_fn,
1383 };
1384 /* *INDENT-ON* */
1385
1386 clib_error_t *
1387 threads_init (vlib_main_t * vm)
1388 {
1389   vlib_thread_main_t *tm = vlib_get_thread_main ();
1390
1391   tm->handoff_dispatch_node_index = ~0;
1392
1393   return 0;
1394 }
1395
1396 VLIB_INIT_FUNCTION (threads_init);
1397
1398 /*
1399  * fd.io coding-style-patch-verification: ON
1400  *
1401  * Local Variables:
1402  * eval: (c-set-style "gnu")
1403  * End:
1404  */