vpp_lite: add cpu pinning support (VPP-467)
[vpp.git] / vlib / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vlib/vlib.h>
21
22 #include <vlib/threads.h>
23 #include <vlib/unix/cj.h>
24
25
26 #if DPDK==1
27 #include <rte_config.h>
28 #include <rte_common.h>
29 #include <rte_eal.h>
30 #include <rte_launch.h>
31 #include <rte_lcore.h>
32 #endif
33 DECLARE_CJ_GLOBAL_LOG;
34
35 #define FRAME_QUEUE_NELTS 32
36
37
38 #if DPDK==1
39 /*
40  *  Weak definitions of DPDK symbols used in this file.
41  *  Needed for linking test programs without DPDK libs.
42  */
43 unsigned __thread __attribute__ ((weak)) RTE_PER_LCORE (_lcore_id);
44 struct lcore_config __attribute__ ((weak)) lcore_config[];
45 unsigned __attribute__ ((weak)) rte_socket_id ();
46 int __attribute__ ((weak)) rte_eal_remote_launch ();
47 #endif
48 u32
49 vl (void *p)
50 {
51   return vec_len (p);
52 }
53
54 vlib_thread_main_t vlib_thread_main;
55
56 uword
57 os_get_cpu_number (void)
58 {
59   void *sp;
60   uword n;
61   u32 len;
62
63   len = vec_len (vlib_thread_stacks);
64   if (len == 0)
65     return 0;
66
67   /* Get any old stack address. */
68   sp = &sp;
69
70   n = ((uword) sp - (uword) vlib_thread_stacks[0])
71     >> VLIB_LOG2_THREAD_STACK_SIZE;
72
73   /* "processes" have their own stacks, and they always run in thread 0 */
74   n = n >= len ? 0 : n;
75
76   return n;
77 }
78
79 uword
80 os_get_ncpus (void)
81 {
82   u32 len;
83
84   len = vec_len (vlib_thread_stacks);
85   if (len == 0)
86     return 1;
87   else
88     return len;
89 }
90
91 void
92 vlib_set_thread_name (char *name)
93 {
94   int pthread_setname_np (pthread_t __target_thread, const char *__name);
95   int rv;
96   pthread_t thread = pthread_self ();
97
98   if (thread)
99     {
100       rv = pthread_setname_np (thread, name);
101       if (rv)
102         clib_warning ("pthread_setname_np returned %d", rv);
103     }
104 }
105
106 static int
107 sort_registrations_by_no_clone (void *a0, void *a1)
108 {
109   vlib_thread_registration_t **tr0 = a0;
110   vlib_thread_registration_t **tr1 = a1;
111
112   return ((i32) ((*tr0)->no_data_structure_clone)
113           - ((i32) ((*tr1)->no_data_structure_clone)));
114 }
115
116 static uword *
117 vlib_sysfs_list_to_bitmap (char *filename)
118 {
119   FILE *fp;
120   uword *r = 0;
121
122   fp = fopen (filename, "r");
123
124   if (fp != NULL)
125     {
126       u8 *buffer = 0;
127       vec_validate (buffer, 256 - 1);
128       if (fgets ((char *) buffer, 256, fp))
129         {
130           unformat_input_t in;
131           unformat_init_string (&in, (char *) buffer,
132                                 strlen ((char *) buffer));
133           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
134             clib_warning ("unformat_bitmap_list failed");
135           unformat_free (&in);
136         }
137       vec_free (buffer);
138       fclose (fp);
139     }
140   return r;
141 }
142
143
144 /* Called early in the init sequence */
145
146 clib_error_t *
147 vlib_thread_init (vlib_main_t * vm)
148 {
149   vlib_thread_main_t *tm = &vlib_thread_main;
150   vlib_worker_thread_t *w;
151   vlib_thread_registration_t *tr;
152   u32 n_vlib_mains = 1;
153   u32 first_index = 1;
154   u32 i;
155   uword *avail_cpu;
156
157   /* get bitmaps of active cpu cores and sockets */
158   tm->cpu_core_bitmap =
159     vlib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
160   tm->cpu_socket_bitmap =
161     vlib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
162
163   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
164
165   /* skip cores */
166   for (i = 0; i < tm->skip_cores; i++)
167     {
168       uword c = clib_bitmap_first_set (avail_cpu);
169       if (c == ~0)
170         return clib_error_return (0, "no available cpus to skip");
171
172       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
173     }
174
175   /* grab cpu for main thread */
176   if (!tm->main_lcore)
177     {
178       tm->main_lcore = clib_bitmap_first_set (avail_cpu);
179       if (tm->main_lcore == (u8) ~ 0)
180         return clib_error_return (0, "no available cpus to be used for the"
181                                   " main thread");
182     }
183   else
184     {
185       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
186         return clib_error_return (0, "cpu %u is not available to be used"
187                                   " for the main thread", tm->main_lcore);
188     }
189   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
190
191   /* assume that there is socket 0 only if there is no data from sysfs */
192   if (!tm->cpu_socket_bitmap)
193     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
194
195   /* pin main thread to main_lcore  */
196 #if DPDK==0
197   {
198     cpu_set_t cpuset;
199     CPU_ZERO (&cpuset);
200     CPU_SET (tm->main_lcore, &cpuset);
201     pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
202   }
203 #endif
204
205   /* as many threads as stacks... */
206   vec_validate_aligned (vlib_worker_threads, vec_len (vlib_thread_stacks) - 1,
207                         CLIB_CACHE_LINE_BYTES);
208
209   /* Preallocate thread 0 */
210   _vec_len (vlib_worker_threads) = 1;
211   w = vlib_worker_threads;
212   w->thread_mheap = clib_mem_get_heap ();
213   w->thread_stack = vlib_thread_stacks[0];
214   w->lcore_id = tm->main_lcore;
215   w->lwp = syscall (SYS_gettid);
216   w->thread_id = pthread_self ();
217   tm->n_vlib_mains = 1;
218
219   if (tm->sched_policy != ~0)
220     {
221       struct sched_param sched_param;
222       if (!sched_getparam (w->lwp, &sched_param))
223         {
224           if (tm->sched_priority != ~0)
225             sched_param.sched_priority = tm->sched_priority;
226           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
227         }
228     }
229
230   /* assign threads to cores and set n_vlib_mains */
231   tr = tm->next;
232
233   while (tr)
234     {
235       vec_add1 (tm->registrations, tr);
236       tr = tr->next;
237     }
238
239   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
240
241   for (i = 0; i < vec_len (tm->registrations); i++)
242     {
243       int j;
244       tr = tm->registrations[i];
245       tr->first_index = first_index;
246       first_index += tr->count;
247       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
248
249       /* construct coremask */
250       if (tr->use_pthreads || !tr->count)
251         continue;
252
253       if (tr->coremask)
254         {
255           uword c;
256           /* *INDENT-OFF* */
257           clib_bitmap_foreach (c, tr->coremask, ({
258             if (clib_bitmap_get(avail_cpu, c) == 0)
259               return clib_error_return (0, "cpu %u is not available to be used"
260                                         " for the '%s' thread",c, tr->name);
261
262             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
263           }));
264 /* *INDENT-ON* */
265
266         }
267       else
268         {
269           for (j = 0; j < tr->count; j++)
270             {
271               uword c = clib_bitmap_first_set (avail_cpu);
272               if (c == ~0)
273                 return clib_error_return (0,
274                                           "no available cpus to be used for"
275                                           " the '%s' thread", tr->name);
276
277               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
278               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
279             }
280         }
281     }
282
283   clib_bitmap_free (avail_cpu);
284
285   tm->n_vlib_mains = n_vlib_mains;
286
287   vec_validate_aligned (vlib_worker_threads, first_index - 1,
288                         CLIB_CACHE_LINE_BYTES);
289
290
291   tm->efd.enabled = VLIB_EFD_DISABLED;
292   tm->efd.queue_hi_thresh = ((VLIB_EFD_DEF_WORKER_HI_THRESH_PCT *
293                               FRAME_QUEUE_NELTS) / 100);
294   return 0;
295 }
296
297 vlib_worker_thread_t *
298 vlib_alloc_thread (vlib_main_t * vm)
299 {
300   vlib_worker_thread_t *w;
301
302   if (vec_len (vlib_worker_threads) >= vec_len (vlib_thread_stacks))
303     {
304       clib_warning ("out of worker threads... Quitting...");
305       exit (1);
306     }
307   vec_add2 (vlib_worker_threads, w, 1);
308   w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
309   return w;
310 }
311
312 vlib_frame_queue_t *
313 vlib_frame_queue_alloc (int nelts)
314 {
315   vlib_frame_queue_t *fq;
316
317   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
318   memset (fq, 0, sizeof (*fq));
319   fq->nelts = nelts;
320   fq->vector_threshold = 128;   // packets
321   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
322
323   if (1)
324     {
325       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
326         fformat (stderr, "WARNING: fq->tail unaligned\n");
327       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
328         fformat (stderr, "WARNING: fq->head unaligned\n");
329       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
330         fformat (stderr, "WARNING: fq->elts unaligned\n");
331
332       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
333         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
334                  sizeof (fq->elts[0]));
335       if (nelts & (nelts - 1))
336         {
337           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
338           abort ();
339         }
340     }
341
342   return (fq);
343 }
344
345 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
346 void
347 vl_msg_api_handler_no_free (void *v)
348 {
349 }
350
351 /* Turned off, save as reference material... */
352 #if 0
353 static inline int
354 vlib_frame_queue_dequeue_internal (int thread_id,
355                                    vlib_main_t * vm, vlib_node_main_t * nm)
356 {
357   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
358   vlib_frame_queue_elt_t *elt;
359   vlib_frame_t *f;
360   vlib_pending_frame_t *p;
361   vlib_node_runtime_t *r;
362   u32 node_runtime_index;
363   int msg_type;
364   u64 before;
365   int processed = 0;
366
367   ASSERT (vm == vlib_mains[thread_id]);
368
369   while (1)
370     {
371       if (fq->head == fq->tail)
372         return processed;
373
374       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
375
376       if (!elt->valid)
377         return processed;
378
379       before = clib_cpu_time_now ();
380
381       f = elt->frame;
382       node_runtime_index = elt->node_runtime_index;
383       msg_type = elt->msg_type;
384
385       switch (msg_type)
386         {
387         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
388           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
389           /* note fallthrough... */
390         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
391           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
392                                 node_runtime_index);
393           vlib_frame_free (vm, r, f);
394           break;
395         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
396           vec_add2 (vm->node_main.pending_frames, p, 1);
397           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
398           p->node_runtime_index = elt->node_runtime_index;
399           p->frame_index = vlib_frame_index (vm, f);
400           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
401           fq->dequeue_vectors += (u64) f->n_vectors;
402           break;
403         case VLIB_FRAME_QUEUE_ELT_API_MSG:
404           vl_msg_api_handler_no_free (f);
405           break;
406         default:
407           clib_warning ("bogus frame queue message, type %d", msg_type);
408           break;
409         }
410       elt->valid = 0;
411       fq->dequeues++;
412       fq->dequeue_ticks += clib_cpu_time_now () - before;
413       CLIB_MEMORY_BARRIER ();
414       fq->head++;
415       processed++;
416     }
417   ASSERT (0);
418   return processed;
419 }
420
421 int
422 vlib_frame_queue_dequeue (int thread_id,
423                           vlib_main_t * vm, vlib_node_main_t * nm)
424 {
425   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
426 }
427
428 int
429 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
430                           u32 frame_queue_index, vlib_frame_t * frame,
431                           vlib_frame_queue_msg_type_t type)
432 {
433   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
434   vlib_frame_queue_elt_t *elt;
435   u32 save_count;
436   u64 new_tail;
437   u64 before = clib_cpu_time_now ();
438
439   ASSERT (fq);
440
441   new_tail = __sync_add_and_fetch (&fq->tail, 1);
442
443   /* Wait until a ring slot is available */
444   while (new_tail >= fq->head + fq->nelts)
445     {
446       f64 b4 = vlib_time_now_ticks (vm, before);
447       vlib_worker_thread_barrier_check (vm, b4);
448       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
449       // vlib_frame_queue_dequeue (vm->cpu_index, vm, nm);
450     }
451
452   elt = fq->elts + (new_tail & (fq->nelts - 1));
453
454   /* this would be very bad... */
455   while (elt->valid)
456     {
457     }
458
459   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
460   save_count = frame->n_vectors;
461
462   elt->frame = frame;
463   elt->node_runtime_index = node_runtime_index;
464   elt->msg_type = type;
465   CLIB_MEMORY_BARRIER ();
466   elt->valid = 1;
467
468   return save_count;
469 }
470 #endif /* 0 */
471
472 /* To be called by vlib worker threads upon startup */
473 void
474 vlib_worker_thread_init (vlib_worker_thread_t * w)
475 {
476   vlib_thread_main_t *tm = vlib_get_thread_main ();
477
478   /* worker threads wants no signals. */
479   {
480     sigset_t s;
481     sigfillset (&s);
482     pthread_sigmask (SIG_SETMASK, &s, 0);
483   }
484
485   clib_mem_set_heap (w->thread_mheap);
486
487   if (vec_len (tm->thread_prefix) && w->registration->short_name)
488     {
489       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
490                         w->registration->short_name, w->instance_id, '\0');
491       vlib_set_thread_name ((char *) w->name);
492     }
493
494   if (!w->registration->use_pthreads)
495     {
496
497       /* Initial barrier sync, for both worker and i/o threads */
498       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
499
500       while (*vlib_worker_threads->wait_at_barrier)
501         ;
502
503       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
504     }
505 }
506
507 void *
508 vlib_worker_thread_bootstrap_fn (void *arg)
509 {
510   void *rv;
511   vlib_worker_thread_t *w = arg;
512
513   w->lwp = syscall (SYS_gettid);
514   w->thread_id = pthread_self ();
515
516   rv = (void *) clib_calljmp
517     ((uword (*)(uword)) w->thread_function,
518      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
519   /* NOTREACHED, we hope */
520   return rv;
521 }
522
523 static int
524 vlib_launch_thread (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
525 {
526   void *(*fp_arg) (void *) = fp;
527
528   w->lcore_id = lcore_id;
529 #if DPDK==1
530   if (!w->registration->use_pthreads)
531     if (rte_eal_remote_launch)  /* do we have dpdk linked */
532       return rte_eal_remote_launch (fp, (void *) w, lcore_id);
533     else
534       return -1;
535   else
536 #endif
537     {
538       int ret;
539       pthread_t worker;
540       cpu_set_t cpuset;
541       CPU_ZERO (&cpuset);
542       CPU_SET (lcore_id, &cpuset);
543
544       ret = pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w);
545       if (ret == 0)
546         return pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset);
547       else
548         return ret;
549     }
550 }
551
552 static clib_error_t *
553 start_workers (vlib_main_t * vm)
554 {
555   int i, j;
556   vlib_worker_thread_t *w;
557   vlib_main_t *vm_clone;
558   void *oldheap;
559   vlib_frame_queue_t *fq;
560   vlib_thread_main_t *tm = &vlib_thread_main;
561   vlib_thread_registration_t *tr;
562   vlib_node_runtime_t *rt;
563   u32 n_vlib_mains = tm->n_vlib_mains;
564   u32 worker_thread_index;
565   u8 *main_heap = clib_mem_get_per_cpu_heap ();
566   mheap_t *main_heap_header = mheap_header (main_heap);
567
568   vec_reset_length (vlib_worker_threads);
569
570   /* Set up the main thread */
571   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
572   w->elog_track.name = "main thread";
573   elog_track_register (&vm->elog_main, &w->elog_track);
574
575   if (vec_len (tm->thread_prefix))
576     {
577       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
578       vlib_set_thread_name ((char *) w->name);
579     }
580
581   /*
582    * Truth of the matter: we always use at least two
583    * threads. So, make the main heap thread-safe
584    * and make the event log thread-safe.
585    */
586   main_heap_header->flags |= MHEAP_FLAG_THREAD_SAFE;
587   vm->elog_main.lock =
588     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
589   vm->elog_main.lock[0] = 0;
590
591   if (n_vlib_mains > 1)
592     {
593       vec_validate (vlib_mains, tm->n_vlib_mains - 1);
594       _vec_len (vlib_mains) = 0;
595       vec_add1 (vlib_mains, vm);
596
597       vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
598       _vec_len (vlib_frame_queues) = 0;
599       fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
600       vec_add1 (vlib_frame_queues, fq);
601
602       vlib_worker_threads->wait_at_barrier =
603         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
604       vlib_worker_threads->workers_at_barrier =
605         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
606
607       /* Ask for an initial barrier sync */
608       *vlib_worker_threads->workers_at_barrier = 0;
609       *vlib_worker_threads->wait_at_barrier = 1;
610
611       worker_thread_index = 1;
612
613       for (i = 0; i < vec_len (tm->registrations); i++)
614         {
615           vlib_node_main_t *nm, *nm_clone;
616           vlib_buffer_main_t *bm_clone;
617           vlib_buffer_free_list_t *fl_clone, *fl_orig;
618           vlib_buffer_free_list_t *orig_freelist_pool;
619           int k;
620
621           tr = tm->registrations[i];
622
623           if (tr->count == 0)
624             continue;
625
626           for (k = 0; k < tr->count; k++)
627             {
628               vec_add2 (vlib_worker_threads, w, 1);
629               if (tr->mheap_size)
630                 w->thread_mheap =
631                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
632               else
633                 w->thread_mheap = main_heap;
634               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
635               w->thread_function = tr->function;
636               w->thread_function_arg = w;
637               w->instance_id = k;
638               w->registration = tr;
639
640               w->elog_track.name =
641                 (char *) format (0, "%s %d", tr->name, k + 1);
642               vec_add1 (w->elog_track.name, 0);
643               elog_track_register (&vm->elog_main, &w->elog_track);
644
645               if (tr->no_data_structure_clone)
646                 continue;
647
648               /* Allocate "to-worker-N" frame queue */
649               if (tr->frame_queue_nelts)
650                 {
651                   fq = vlib_frame_queue_alloc (tr->frame_queue_nelts);
652                 }
653               else
654                 {
655                   fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
656                 }
657
658               vec_validate (vlib_frame_queues, worker_thread_index);
659               vlib_frame_queues[worker_thread_index] = fq;
660
661               /* Fork vlib_global_main et al. Look for bugs here */
662               oldheap = clib_mem_set_heap (w->thread_mheap);
663
664               vm_clone = clib_mem_alloc (sizeof (*vm_clone));
665               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
666
667               vm_clone->cpu_index = worker_thread_index;
668               vm_clone->heap_base = w->thread_mheap;
669               vm_clone->mbuf_alloc_list = 0;
670               memset (&vm_clone->random_buffer, 0,
671                       sizeof (vm_clone->random_buffer));
672
673               nm = &vlib_mains[0]->node_main;
674               nm_clone = &vm_clone->node_main;
675               /* fork next frames array, preserving node runtime indices */
676               nm_clone->next_frames = vec_dup (nm->next_frames);
677               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
678                 {
679                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
680                   u32 save_node_runtime_index;
681                   u32 save_flags;
682
683                   save_node_runtime_index = nf->node_runtime_index;
684                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
685                   vlib_next_frame_init (nf);
686                   nf->node_runtime_index = save_node_runtime_index;
687                   nf->flags = save_flags;
688                 }
689
690               /* fork the frame dispatch queue */
691               nm_clone->pending_frames = 0;
692               vec_validate (nm_clone->pending_frames, 10);      /* $$$$$?????? */
693               _vec_len (nm_clone->pending_frames) = 0;
694
695               /* fork nodes */
696               nm_clone->nodes = 0;
697               for (j = 0; j < vec_len (nm->nodes); j++)
698                 {
699                   vlib_node_t *n;
700                   n = clib_mem_alloc_no_fail (sizeof (*n));
701                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
702                   /* none of the copied nodes have enqueue rights given out */
703                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
704                   memset (&n->stats_total, 0, sizeof (n->stats_total));
705                   memset (&n->stats_last_clear, 0,
706                           sizeof (n->stats_last_clear));
707                   vec_add1 (nm_clone->nodes, n);
708                 }
709               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
710                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
711
712               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
713                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
714               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
715                 rt->cpu_index = vm_clone->cpu_index;
716
717               nm_clone->processes = vec_dup (nm->processes);
718
719               /* zap the (per worker) frame freelists, etc */
720               nm_clone->frame_sizes = 0;
721               nm_clone->frame_size_hash = 0;
722
723               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
724
725               clib_mem_set_heap (oldheap);
726               vec_add1 (vlib_mains, vm_clone);
727
728               vm_clone->error_main.counters =
729                 vec_dup (vlib_mains[0]->error_main.counters);
730               vm_clone->error_main.counters_last_clear =
731                 vec_dup (vlib_mains[0]->error_main.counters_last_clear);
732
733               /* Fork the vlib_buffer_main_t free lists, etc. */
734               bm_clone = vec_dup (vm_clone->buffer_main);
735               vm_clone->buffer_main = bm_clone;
736
737               orig_freelist_pool = bm_clone->buffer_free_list_pool;
738               bm_clone->buffer_free_list_pool = 0;
739
740             /* *INDENT-OFF* */
741             pool_foreach (fl_orig, orig_freelist_pool,
742                           ({
743                             pool_get_aligned (bm_clone->buffer_free_list_pool,
744                                               fl_clone, CLIB_CACHE_LINE_BYTES);
745                             ASSERT (fl_orig - orig_freelist_pool
746                                     == fl_clone - bm_clone->buffer_free_list_pool);
747
748                             fl_clone[0] = fl_orig[0];
749                             fl_clone->aligned_buffers = 0;
750                             fl_clone->unaligned_buffers = 0;
751                             fl_clone->n_alloc = 0;
752                           }));
753 /* *INDENT-ON* */
754
755               worker_thread_index++;
756             }
757         }
758     }
759   else
760     {
761       /* only have non-data-structure copy threads to create... */
762       for (i = 0; i < vec_len (tm->registrations); i++)
763         {
764           tr = tm->registrations[i];
765
766           for (j = 0; j < tr->count; j++)
767             {
768               vec_add2 (vlib_worker_threads, w, 1);
769               if (tr->mheap_size)
770                 w->thread_mheap =
771                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
772               else
773                 w->thread_mheap = main_heap;
774               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
775               w->thread_function = tr->function;
776               w->thread_function_arg = w;
777               w->instance_id = j;
778               w->elog_track.name =
779                 (char *) format (0, "%s %d", tr->name, j + 1);
780               w->registration = tr;
781               vec_add1 (w->elog_track.name, 0);
782               elog_track_register (&vm->elog_main, &w->elog_track);
783             }
784         }
785     }
786
787   worker_thread_index = 1;
788
789   for (i = 0; i < vec_len (tm->registrations); i++)
790     {
791       int j;
792
793       tr = tm->registrations[i];
794
795       if (tr->use_pthreads || tm->use_pthreads)
796         {
797           for (j = 0; j < tr->count; j++)
798             {
799               w = vlib_worker_threads + worker_thread_index++;
800               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, 0) <
801                   0)
802                 clib_warning ("Couldn't start '%s' pthread ", tr->name);
803             }
804         }
805       else
806         {
807           uword c;
808             /* *INDENT-OFF* */
809             clib_bitmap_foreach (c, tr->coremask, ({
810               w = vlib_worker_threads + worker_thread_index++;
811               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, c) < 0)
812                 clib_warning ("Couldn't start DPDK lcore %d", c);
813
814             }));
815 /* *INDENT-ON* */
816         }
817     }
818   vlib_worker_thread_barrier_sync (vm);
819   vlib_worker_thread_barrier_release (vm);
820   return 0;
821 }
822
823 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
824
825 void
826 vlib_worker_thread_node_runtime_update (void)
827 {
828   int i, j;
829   vlib_worker_thread_t *w;
830   vlib_main_t *vm;
831   vlib_node_main_t *nm, *nm_clone;
832   vlib_node_t **old_nodes_clone;
833   vlib_main_t *vm_clone;
834   vlib_node_runtime_t *rt, *old_rt;
835   void *oldheap;
836   never_inline void
837     vlib_node_runtime_sync_stats (vlib_main_t * vm,
838                                   vlib_node_runtime_t * r,
839                                   uword n_calls,
840                                   uword n_vectors, uword n_clocks);
841
842   ASSERT (os_get_cpu_number () == 0);
843
844   if (vec_len (vlib_mains) == 0)
845     return;
846
847   vm = vlib_mains[0];
848   nm = &vm->node_main;
849
850   ASSERT (os_get_cpu_number () == 0);
851   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
852
853   /*
854    * Scrape all runtime stats, so we don't lose node runtime(s) with
855    * pending counts, or throw away worker / io thread counts.
856    */
857   for (j = 0; j < vec_len (nm->nodes); j++)
858     {
859       vlib_node_t *n;
860       n = nm->nodes[j];
861       vlib_node_sync_stats (vm, n);
862     }
863
864   for (i = 1; i < vec_len (vlib_mains); i++)
865     {
866       vlib_node_t *n;
867
868       vm_clone = vlib_mains[i];
869       nm_clone = &vm_clone->node_main;
870
871       for (j = 0; j < vec_len (nm_clone->nodes); j++)
872         {
873           n = nm_clone->nodes[j];
874
875           rt = vlib_node_get_runtime (vm_clone, n->index);
876           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
877         }
878     }
879
880   for (i = 1; i < vec_len (vlib_mains); i++)
881     {
882       vlib_node_runtime_t *rt;
883       w = vlib_worker_threads + i;
884       oldheap = clib_mem_set_heap (w->thread_mheap);
885
886       vm_clone = vlib_mains[i];
887
888       /* Re-clone error heap */
889       u64 *old_counters = vm_clone->error_main.counters;
890       u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
891       clib_memcpy (&vm_clone->error_main, &vm->error_main,
892                    sizeof (vm->error_main));
893       j = vec_len (vm->error_main.counters) - 1;
894       vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
895       vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
896       vm_clone->error_main.counters = old_counters;
897       vm_clone->error_main.counters_last_clear = old_counters_all_clear;
898
899       nm_clone = &vm_clone->node_main;
900       vec_free (nm_clone->next_frames);
901       nm_clone->next_frames = vec_dup (nm->next_frames);
902
903       for (j = 0; j < vec_len (nm_clone->next_frames); j++)
904         {
905           vlib_next_frame_t *nf = &nm_clone->next_frames[j];
906           u32 save_node_runtime_index;
907           u32 save_flags;
908
909           save_node_runtime_index = nf->node_runtime_index;
910           save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
911           vlib_next_frame_init (nf);
912           nf->node_runtime_index = save_node_runtime_index;
913           nf->flags = save_flags;
914         }
915
916       old_nodes_clone = nm_clone->nodes;
917       nm_clone->nodes = 0;
918
919       /* re-fork nodes */
920       for (j = 0; j < vec_len (nm->nodes); j++)
921         {
922           vlib_node_t *old_n_clone;
923           vlib_node_t *new_n, *new_n_clone;
924
925           new_n = nm->nodes[j];
926           old_n_clone = old_nodes_clone[j];
927
928           new_n_clone = clib_mem_alloc_no_fail (sizeof (*new_n_clone));
929           clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
930           /* none of the copied nodes have enqueue rights given out */
931           new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
932
933           if (j >= vec_len (old_nodes_clone))
934             {
935               /* new node, set to zero */
936               memset (&new_n_clone->stats_total, 0,
937                       sizeof (new_n_clone->stats_total));
938               memset (&new_n_clone->stats_last_clear, 0,
939                       sizeof (new_n_clone->stats_last_clear));
940             }
941           else
942             {
943               /* Copy stats if the old data is valid */
944               clib_memcpy (&new_n_clone->stats_total,
945                            &old_n_clone->stats_total,
946                            sizeof (new_n_clone->stats_total));
947               clib_memcpy (&new_n_clone->stats_last_clear,
948                            &old_n_clone->stats_last_clear,
949                            sizeof (new_n_clone->stats_last_clear));
950
951               /* keep previous node state */
952               new_n_clone->state = old_n_clone->state;
953             }
954           vec_add1 (nm_clone->nodes, new_n_clone);
955         }
956       /* Free the old node clone */
957       for (j = 0; j < vec_len (old_nodes_clone); j++)
958         clib_mem_free (old_nodes_clone[j]);
959       vec_free (old_nodes_clone);
960
961       vec_free (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
962
963       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
964         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
965
966       /* clone input node runtime */
967       old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
968
969       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
970         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
971
972       vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
973       {
974         rt->cpu_index = vm_clone->cpu_index;
975       }
976
977       for (j = 0; j < vec_len (old_rt); j++)
978         {
979           rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
980           rt->state = old_rt[j].state;
981         }
982
983       vec_free (old_rt);
984
985       nm_clone->processes = vec_dup (nm->processes);
986
987       clib_mem_set_heap (oldheap);
988
989       // vnet_main_fork_fixup (i);
990     }
991 }
992
993 u32
994 unformat_sched_policy (unformat_input_t * input, va_list * args)
995 {
996   u32 *r = va_arg (*args, u32 *);
997
998   if (0);
999 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1000   foreach_sched_policy
1001 #undef _
1002     else
1003     return 0;
1004   return 1;
1005 }
1006
1007 static clib_error_t *
1008 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1009 {
1010   vlib_thread_registration_t *tr;
1011   uword *p;
1012   vlib_thread_main_t *tm = &vlib_thread_main;
1013   u8 *name;
1014   u64 coremask;
1015   uword *bitmap;
1016   u32 count;
1017
1018   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1019
1020   tm->n_thread_stacks = 1;      /* account for main thread */
1021   tm->sched_policy = ~0;
1022   tm->sched_priority = ~0;
1023
1024   tr = tm->next;
1025
1026   while (tr)
1027     {
1028       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1029       tr = tr->next;
1030     }
1031
1032   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1033     {
1034       if (unformat (input, "use-pthreads"))
1035         tm->use_pthreads = 1;
1036       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1037         ;
1038       else if (unformat (input, "main-core %u", &tm->main_lcore))
1039         ;
1040       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1041         ;
1042       else if (unformat (input, "coremask-%s %llx", &name, &coremask))
1043         {
1044           p = hash_get_mem (tm->thread_registrations_by_name, name);
1045           if (p == 0)
1046             return clib_error_return (0, "no such thread type '%s'", name);
1047
1048           tr = (vlib_thread_registration_t *) p[0];
1049
1050           if (tr->use_pthreads)
1051             return clib_error_return (0,
1052                                       "coremask cannot be set for '%s' threads",
1053                                       name);
1054
1055           tr->coremask = clib_bitmap_set_multiple
1056             (tr->coremask, 0, coremask, BITS (coremask));
1057           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1058         }
1059       else if (unformat (input, "corelist-%s %U", &name, unformat_bitmap_list,
1060                          &bitmap))
1061         {
1062           p = hash_get_mem (tm->thread_registrations_by_name, name);
1063           if (p == 0)
1064             return clib_error_return (0, "no such thread type '%s'", name);
1065
1066           tr = (vlib_thread_registration_t *) p[0];
1067
1068           if (tr->use_pthreads)
1069             return clib_error_return (0,
1070                                       "corelist cannot be set for '%s' threads",
1071                                       name);
1072
1073           tr->coremask = bitmap;
1074           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1075         }
1076       else
1077         if (unformat
1078             (input, "scheduler-policy %U", unformat_sched_policy,
1079              &tm->sched_policy))
1080         ;
1081       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1082         ;
1083       else if (unformat (input, "%s %u", &name, &count))
1084         {
1085           p = hash_get_mem (tm->thread_registrations_by_name, name);
1086           if (p == 0)
1087             return clib_error_return (0, "no such thread type 3 '%s'", name);
1088
1089           tr = (vlib_thread_registration_t *) p[0];
1090           if (tr->fixed_count)
1091             return clib_error_return
1092               (0, "number of %s threads not configurable", tr->name);
1093           tr->count = count;
1094         }
1095       else
1096         break;
1097     }
1098
1099   if (tm->sched_priority != ~0)
1100     {
1101       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1102         {
1103           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1104           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1105           if (tm->sched_priority > prio_max)
1106             tm->sched_priority = prio_max;
1107           if (tm->sched_priority < prio_min)
1108             tm->sched_priority = prio_min;
1109         }
1110       else
1111         {
1112           return clib_error_return
1113             (0,
1114              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1115              tm->sched_priority);
1116         }
1117     }
1118   tr = tm->next;
1119
1120   if (!tm->thread_prefix)
1121     tm->thread_prefix = format (0, "vpp");
1122
1123   while (tr)
1124     {
1125       tm->n_thread_stacks += tr->count;
1126       tm->n_pthreads += tr->count * tr->use_pthreads;
1127       tm->n_eal_threads += tr->count * (tr->use_pthreads == 0);
1128       tr = tr->next;
1129     }
1130
1131   return 0;
1132 }
1133
1134 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1135
1136 #if !defined (__x86_64__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__)
1137 void
1138 __sync_fetch_and_add_8 (void)
1139 {
1140   fformat (stderr, "%s called\n", __FUNCTION__);
1141   abort ();
1142 }
1143
1144 void
1145 __sync_add_and_fetch_8 (void)
1146 {
1147   fformat (stderr, "%s called\n", __FUNCTION__);
1148   abort ();
1149 }
1150 #endif
1151
1152 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1153 void
1154 vnet_main_fixup (vlib_fork_fixup_t which)
1155 {
1156 }
1157
1158 void
1159 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1160 {
1161   vlib_main_t *vm = vlib_get_main ();
1162
1163   if (vlib_mains == 0)
1164     return;
1165
1166   ASSERT (os_get_cpu_number () == 0);
1167   vlib_worker_thread_barrier_sync (vm);
1168
1169   switch (which)
1170     {
1171     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1172       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1173       break;
1174
1175     default:
1176       ASSERT (0);
1177     }
1178   vlib_worker_thread_barrier_release (vm);
1179 }
1180
1181 void
1182 vlib_worker_thread_barrier_sync (vlib_main_t * vm)
1183 {
1184   f64 deadline;
1185   u32 count;
1186
1187   if (!vlib_mains)
1188     return;
1189
1190   count = vec_len (vlib_mains) - 1;
1191
1192   /* Tolerate recursive calls */
1193   if (++vlib_worker_threads[0].recursion_level > 1)
1194     return;
1195
1196   vlib_worker_threads[0].barrier_sync_count++;
1197
1198   ASSERT (os_get_cpu_number () == 0);
1199
1200   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1201
1202   *vlib_worker_threads->wait_at_barrier = 1;
1203   while (*vlib_worker_threads->workers_at_barrier != count)
1204     {
1205       if (vlib_time_now (vm) > deadline)
1206         {
1207           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1208           os_panic ();
1209         }
1210     }
1211 }
1212
1213 void
1214 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1215 {
1216   f64 deadline;
1217
1218   if (!vlib_mains)
1219     return;
1220
1221   if (--vlib_worker_threads[0].recursion_level > 0)
1222     return;
1223
1224   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1225
1226   *vlib_worker_threads->wait_at_barrier = 0;
1227
1228   while (*vlib_worker_threads->workers_at_barrier > 0)
1229     {
1230       if (vlib_time_now (vm) > deadline)
1231         {
1232           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1233           os_panic ();
1234         }
1235     }
1236 }
1237
1238 /*
1239  * Check the frame queue to see if any frames are available.
1240  * If so, pull the packets off the frames and put them to
1241  * the handoff node.
1242  */
1243 static inline int
1244 vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
1245 {
1246   u32 thread_id = vm->cpu_index;
1247   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
1248   vlib_frame_queue_elt_t *elt;
1249   u32 *from, *to;
1250   vlib_frame_t *f;
1251   int msg_type;
1252   int processed = 0;
1253   u32 n_left_to_node;
1254   u32 vectors = 0;
1255   vlib_thread_main_t *tm = vlib_get_thread_main ();
1256
1257   ASSERT (fq);
1258   ASSERT (vm == vlib_mains[thread_id]);
1259
1260   if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
1261     return 0;
1262   /*
1263    * Gather trace data for frame queues
1264    */
1265   if (PREDICT_FALSE (fq->trace))
1266     {
1267       frame_queue_trace_t *fqt;
1268       frame_queue_nelt_counter_t *fqh;
1269       u32 elix;
1270
1271       fqt = &tm->frame_queue_traces[thread_id];
1272
1273       fqt->nelts = fq->nelts;
1274       fqt->head = fq->head;
1275       fqt->head_hint = fq->head_hint;
1276       fqt->tail = fq->tail;
1277       fqt->threshold = fq->vector_threshold;
1278       fqt->n_in_use = fqt->tail - fqt->head;
1279       if (fqt->n_in_use >= fqt->nelts)
1280         {
1281           // if beyond max then use max
1282           fqt->n_in_use = fqt->nelts - 1;
1283         }
1284
1285       /* Record the number of elements in use in the histogram */
1286       fqh = &tm->frame_queue_histogram[thread_id];
1287       fqh->count[fqt->n_in_use]++;
1288
1289       /* Record a snapshot of the elements in use */
1290       for (elix = 0; elix < fqt->nelts; elix++)
1291         {
1292           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1293           if (1 || elt->valid)
1294             {
1295               fqt->n_vectors[elix] = elt->n_vectors;
1296             }
1297         }
1298       fqt->written = 1;
1299     }
1300
1301   while (1)
1302     {
1303       if (fq->head == fq->tail)
1304         {
1305           fq->head_hint = fq->head;
1306           return processed;
1307         }
1308
1309       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1310
1311       if (!elt->valid)
1312         {
1313           fq->head_hint = fq->head;
1314           return processed;
1315         }
1316
1317       from = elt->buffer_index;
1318       msg_type = elt->msg_type;
1319
1320       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1321       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1322
1323       f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
1324
1325       to = vlib_frame_vector_args (f);
1326
1327       n_left_to_node = elt->n_vectors;
1328
1329       while (n_left_to_node >= 4)
1330         {
1331           to[0] = from[0];
1332           to[1] = from[1];
1333           to[2] = from[2];
1334           to[3] = from[3];
1335           to += 4;
1336           from += 4;
1337           n_left_to_node -= 4;
1338         }
1339
1340       while (n_left_to_node > 0)
1341         {
1342           to[0] = from[0];
1343           to++;
1344           from++;
1345           n_left_to_node--;
1346         }
1347
1348       vectors += elt->n_vectors;
1349       f->n_vectors = elt->n_vectors;
1350       vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
1351
1352       elt->valid = 0;
1353       elt->n_vectors = 0;
1354       elt->msg_type = 0xfefefefe;
1355       CLIB_MEMORY_BARRIER ();
1356       fq->head++;
1357       processed++;
1358
1359       /*
1360        * Limit the number of packets pushed into the graph
1361        */
1362       if (vectors >= fq->vector_threshold)
1363         {
1364           fq->head_hint = fq->head;
1365           return processed;
1366         }
1367     }
1368   ASSERT (0);
1369   return processed;
1370 }
1371
1372 static_always_inline void
1373 vlib_worker_thread_internal (vlib_main_t * vm)
1374 {
1375   vlib_node_main_t *nm = &vm->node_main;
1376   u64 cpu_time_now = clib_cpu_time_now ();
1377
1378   vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);
1379
1380   while (1)
1381     {
1382       vlib_worker_thread_barrier_check ();
1383
1384       vlib_frame_queue_dequeue_internal (vm);
1385
1386       vlib_node_runtime_t *n;
1387       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1388       {
1389         cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
1390                                       VLIB_NODE_STATE_POLLING, /* frame */ 0,
1391                                       cpu_time_now);
1392       }
1393
1394       /* Next handle interrupts. */
1395       {
1396         uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
1397         uword i;
1398         if (l > 0)
1399           {
1400             _vec_len (nm->pending_interrupt_node_runtime_indices) = 0;
1401             for (i = 0; i < l; i++)
1402               {
1403                 n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1404                                       nm->
1405                                       pending_interrupt_node_runtime_indices
1406                                       [i]);
1407                 cpu_time_now =
1408                   dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
1409                                  VLIB_NODE_STATE_INTERRUPT,
1410                                  /* frame */ 0,
1411                                  cpu_time_now);
1412               }
1413           }
1414       }
1415
1416       if (_vec_len (nm->pending_frames))
1417         {
1418           int i;
1419           cpu_time_now = clib_cpu_time_now ();
1420           for (i = 0; i < _vec_len (nm->pending_frames); i++)
1421             {
1422               vlib_pending_frame_t *p;
1423
1424               p = nm->pending_frames + i;
1425
1426               cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
1427             }
1428           _vec_len (nm->pending_frames) = 0;
1429         }
1430       vlib_increment_main_loop_counter (vm);
1431
1432       /* Record time stamp in case there are no enabled nodes and above
1433          calls do not update time stamp. */
1434       cpu_time_now = clib_cpu_time_now ();
1435     }
1436 }
1437
1438 void
1439 vlib_worker_thread_fn (void *arg)
1440 {
1441   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1442   vlib_thread_main_t *tm = vlib_get_thread_main ();
1443   vlib_main_t *vm = vlib_get_main ();
1444
1445   ASSERT (vm->cpu_index == os_get_cpu_number ());
1446
1447   vlib_worker_thread_init (w);
1448   clib_time_init (&vm->clib_time);
1449   clib_mem_set_heap (w->thread_mheap);
1450
1451   /* Wait until the dpdk init sequence is complete */
1452   while (tm->worker_thread_release == 0)
1453     vlib_worker_thread_barrier_check ();
1454
1455   vlib_worker_thread_internal (vm);
1456 }
1457
1458 /* *INDENT-OFF* */
1459 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1460   .name = "workers",
1461   .short_name = "wk",
1462   .function = vlib_worker_thread_fn,
1463 };
1464 /* *INDENT-ON* */
1465
1466 clib_error_t *
1467 threads_init (vlib_main_t * vm)
1468 {
1469   vlib_thread_main_t *tm = vlib_get_thread_main ();
1470
1471   tm->handoff_dispatch_node_index = ~0;
1472
1473   return 0;
1474 }
1475
1476 VLIB_INIT_FUNCTION (threads_init);
1477
1478 /*
1479  * fd.io coding-style-patch-verification: ON
1480  *
1481  * Local Variables:
1482  * eval: (c-set-style "gnu")
1483  * End:
1484  */