67c57a60f41e76655c35aa248ceabf6dce0407c9
[vpp.git] / vlib / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vlib/vlib.h>
21
22 #include <vlib/threads.h>
23 #include <vlib/unix/cj.h>
24
25
26 #if DPDK==1
27 #include <rte_config.h>
28 #include <rte_common.h>
29 #include <rte_eal.h>
30 #include <rte_launch.h>
31 #include <rte_lcore.h>
32 #endif
33 DECLARE_CJ_GLOBAL_LOG;
34
35 #define FRAME_QUEUE_NELTS 32
36
37
38 #if DPDK==1
39 /*
40  *  Weak definitions of DPDK symbols used in this file.
41  *  Needed for linking test programs without DPDK libs.
42  */
43 unsigned __thread __attribute__ ((weak)) RTE_PER_LCORE (_lcore_id);
44 struct lcore_config __attribute__ ((weak)) lcore_config[];
45 unsigned __attribute__ ((weak)) rte_socket_id ();
46 int __attribute__ ((weak)) rte_eal_remote_launch ();
47 #endif
48 u32
49 vl (void *p)
50 {
51   return vec_len (p);
52 }
53
54 vlib_thread_main_t vlib_thread_main;
55
56 uword
57 os_get_cpu_number (void)
58 {
59   void *sp;
60   uword n;
61   u32 len;
62
63   len = vec_len (vlib_thread_stacks);
64   if (len == 0)
65     return 0;
66
67   /* Get any old stack address. */
68   sp = &sp;
69
70   n = ((uword) sp - (uword) vlib_thread_stacks[0])
71     >> VLIB_LOG2_THREAD_STACK_SIZE;
72
73   /* "processes" have their own stacks, and they always run in thread 0 */
74   n = n >= len ? 0 : n;
75
76   return n;
77 }
78
79 uword
80 os_get_ncpus (void)
81 {
82   u32 len;
83
84   len = vec_len (vlib_thread_stacks);
85   if (len == 0)
86     return 1;
87   else
88     return len;
89 }
90
91 void
92 vlib_set_thread_name (char *name)
93 {
94   int pthread_setname_np (pthread_t __target_thread, const char *__name);
95   int rv;
96   pthread_t thread = pthread_self ();
97
98   if (thread)
99     {
100       rv = pthread_setname_np (thread, name);
101       if (rv)
102         clib_warning ("pthread_setname_np returned %d", rv);
103     }
104 }
105
106 static int
107 sort_registrations_by_no_clone (void *a0, void *a1)
108 {
109   vlib_thread_registration_t **tr0 = a0;
110   vlib_thread_registration_t **tr1 = a1;
111
112   return ((i32) ((*tr0)->no_data_structure_clone)
113           - ((i32) ((*tr1)->no_data_structure_clone)));
114 }
115
116 static uword *
117 vlib_sysfs_list_to_bitmap (char *filename)
118 {
119   FILE *fp;
120   uword *r = 0;
121
122   fp = fopen (filename, "r");
123
124   if (fp != NULL)
125     {
126       u8 *buffer = 0;
127       vec_validate (buffer, 256 - 1);
128       if (fgets ((char *) buffer, 256, fp))
129         {
130           unformat_input_t in;
131           unformat_init_string (&in, (char *) buffer,
132                                 strlen ((char *) buffer));
133           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
134             clib_warning ("unformat_bitmap_list failed");
135           unformat_free (&in);
136         }
137       vec_free (buffer);
138       fclose (fp);
139     }
140   return r;
141 }
142
143
144 /* Called early in the init sequence */
145
146 clib_error_t *
147 vlib_thread_init (vlib_main_t * vm)
148 {
149   vlib_thread_main_t *tm = &vlib_thread_main;
150   vlib_worker_thread_t *w;
151   vlib_thread_registration_t *tr;
152   u32 n_vlib_mains = 1;
153   u32 first_index = 1;
154   u32 i;
155   uword *avail_cpu;
156
157   /* get bitmaps of active cpu cores and sockets */
158   tm->cpu_core_bitmap =
159     vlib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
160   tm->cpu_socket_bitmap =
161     vlib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
162
163   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
164
165   /* skip cores */
166   for (i = 0; i < tm->skip_cores; i++)
167     {
168       uword c = clib_bitmap_first_set (avail_cpu);
169       if (c == ~0)
170         return clib_error_return (0, "no available cpus to skip");
171
172       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
173     }
174
175   /* grab cpu for main thread */
176   if (!tm->main_lcore)
177     {
178       tm->main_lcore = clib_bitmap_first_set (avail_cpu);
179       if (tm->main_lcore == (u8) ~ 0)
180         return clib_error_return (0, "no available cpus to be used for the"
181                                   " main thread");
182     }
183   else
184     {
185       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
186         return clib_error_return (0, "cpu %u is not available to be used"
187                                   " for the main thread", tm->main_lcore);
188     }
189   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
190
191   /* assume that there is socket 0 only if there is no data from sysfs */
192   if (!tm->cpu_socket_bitmap)
193     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
194
195   /* pin main thread to main_lcore  */
196 #if DPDK==0
197   {
198     cpu_set_t cpuset;
199     CPU_ZERO (&cpuset);
200     CPU_SET (tm->main_lcore, &cpuset);
201     pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
202   }
203 #endif
204
205   /* as many threads as stacks... */
206   vec_validate_aligned (vlib_worker_threads, vec_len (vlib_thread_stacks) - 1,
207                         CLIB_CACHE_LINE_BYTES);
208
209   /* Preallocate thread 0 */
210   _vec_len (vlib_worker_threads) = 1;
211   w = vlib_worker_threads;
212   w->thread_mheap = clib_mem_get_heap ();
213   w->thread_stack = vlib_thread_stacks[0];
214   w->dpdk_lcore_id = -1;
215   w->lwp = syscall (SYS_gettid);
216   tm->n_vlib_mains = 1;
217
218   if (tm->sched_policy != ~0)
219     {
220       struct sched_param sched_param;
221       if (!sched_getparam (w->lwp, &sched_param))
222         {
223           if (tm->sched_priority != ~0)
224             sched_param.sched_priority = tm->sched_priority;
225           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
226         }
227     }
228
229   /* assign threads to cores and set n_vlib_mains */
230   tr = tm->next;
231
232   while (tr)
233     {
234       vec_add1 (tm->registrations, tr);
235       tr = tr->next;
236     }
237
238   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
239
240   for (i = 0; i < vec_len (tm->registrations); i++)
241     {
242       int j;
243       tr = tm->registrations[i];
244       tr->first_index = first_index;
245       first_index += tr->count;
246       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
247
248       /* construct coremask */
249       if (tr->use_pthreads || !tr->count)
250         continue;
251
252       if (tr->coremask)
253         {
254           uword c;
255           /* *INDENT-OFF* */
256           clib_bitmap_foreach (c, tr->coremask, ({
257             if (clib_bitmap_get(avail_cpu, c) == 0)
258               return clib_error_return (0, "cpu %u is not available to be used"
259                                         " for the '%s' thread",c, tr->name);
260
261             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
262           }));
263 /* *INDENT-ON* */
264
265         }
266       else
267         {
268           for (j = 0; j < tr->count; j++)
269             {
270               uword c = clib_bitmap_first_set (avail_cpu);
271               if (c == ~0)
272                 return clib_error_return (0,
273                                           "no available cpus to be used for"
274                                           " the '%s' thread", tr->name);
275
276               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
277               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
278             }
279         }
280     }
281
282   clib_bitmap_free (avail_cpu);
283
284   tm->n_vlib_mains = n_vlib_mains;
285
286   vec_validate_aligned (vlib_worker_threads, first_index - 1,
287                         CLIB_CACHE_LINE_BYTES);
288
289
290   tm->efd.enabled = VLIB_EFD_DISABLED;
291   tm->efd.queue_hi_thresh = ((VLIB_EFD_DEF_WORKER_HI_THRESH_PCT *
292                               FRAME_QUEUE_NELTS) / 100);
293   return 0;
294 }
295
296 vlib_worker_thread_t *
297 vlib_alloc_thread (vlib_main_t * vm)
298 {
299   vlib_worker_thread_t *w;
300
301   if (vec_len (vlib_worker_threads) >= vec_len (vlib_thread_stacks))
302     {
303       clib_warning ("out of worker threads... Quitting...");
304       exit (1);
305     }
306   vec_add2 (vlib_worker_threads, w, 1);
307   w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
308   return w;
309 }
310
311 vlib_frame_queue_t *
312 vlib_frame_queue_alloc (int nelts)
313 {
314   vlib_frame_queue_t *fq;
315
316   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
317   memset (fq, 0, sizeof (*fq));
318   fq->nelts = nelts;
319   fq->vector_threshold = 128;   // packets
320   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
321
322   if (1)
323     {
324       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
325         fformat (stderr, "WARNING: fq->tail unaligned\n");
326       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
327         fformat (stderr, "WARNING: fq->head unaligned\n");
328       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
329         fformat (stderr, "WARNING: fq->elts unaligned\n");
330
331       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
332         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
333                  sizeof (fq->elts[0]));
334       if (nelts & (nelts - 1))
335         {
336           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
337           abort ();
338         }
339     }
340
341   return (fq);
342 }
343
344 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
345 void
346 vl_msg_api_handler_no_free (void *v)
347 {
348 }
349
350 /* Turned off, save as reference material... */
351 #if 0
352 static inline int
353 vlib_frame_queue_dequeue_internal (int thread_id,
354                                    vlib_main_t * vm, vlib_node_main_t * nm)
355 {
356   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
357   vlib_frame_queue_elt_t *elt;
358   vlib_frame_t *f;
359   vlib_pending_frame_t *p;
360   vlib_node_runtime_t *r;
361   u32 node_runtime_index;
362   int msg_type;
363   u64 before;
364   int processed = 0;
365
366   ASSERT (vm == vlib_mains[thread_id]);
367
368   while (1)
369     {
370       if (fq->head == fq->tail)
371         return processed;
372
373       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
374
375       if (!elt->valid)
376         return processed;
377
378       before = clib_cpu_time_now ();
379
380       f = elt->frame;
381       node_runtime_index = elt->node_runtime_index;
382       msg_type = elt->msg_type;
383
384       switch (msg_type)
385         {
386         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
387           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
388           /* note fallthrough... */
389         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
390           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
391                                 node_runtime_index);
392           vlib_frame_free (vm, r, f);
393           break;
394         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
395           vec_add2 (vm->node_main.pending_frames, p, 1);
396           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
397           p->node_runtime_index = elt->node_runtime_index;
398           p->frame_index = vlib_frame_index (vm, f);
399           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
400           fq->dequeue_vectors += (u64) f->n_vectors;
401           break;
402         case VLIB_FRAME_QUEUE_ELT_API_MSG:
403           vl_msg_api_handler_no_free (f);
404           break;
405         default:
406           clib_warning ("bogus frame queue message, type %d", msg_type);
407           break;
408         }
409       elt->valid = 0;
410       fq->dequeues++;
411       fq->dequeue_ticks += clib_cpu_time_now () - before;
412       CLIB_MEMORY_BARRIER ();
413       fq->head++;
414       processed++;
415     }
416   ASSERT (0);
417   return processed;
418 }
419
420 int
421 vlib_frame_queue_dequeue (int thread_id,
422                           vlib_main_t * vm, vlib_node_main_t * nm)
423 {
424   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
425 }
426
427 int
428 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
429                           u32 frame_queue_index, vlib_frame_t * frame,
430                           vlib_frame_queue_msg_type_t type)
431 {
432   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
433   vlib_frame_queue_elt_t *elt;
434   u32 save_count;
435   u64 new_tail;
436   u64 before = clib_cpu_time_now ();
437
438   ASSERT (fq);
439
440   new_tail = __sync_add_and_fetch (&fq->tail, 1);
441
442   /* Wait until a ring slot is available */
443   while (new_tail >= fq->head + fq->nelts)
444     {
445       f64 b4 = vlib_time_now_ticks (vm, before);
446       vlib_worker_thread_barrier_check (vm, b4);
447       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
448       // vlib_frame_queue_dequeue (vm->cpu_index, vm, nm);
449     }
450
451   elt = fq->elts + (new_tail & (fq->nelts - 1));
452
453   /* this would be very bad... */
454   while (elt->valid)
455     {
456     }
457
458   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
459   save_count = frame->n_vectors;
460
461   elt->frame = frame;
462   elt->node_runtime_index = node_runtime_index;
463   elt->msg_type = type;
464   CLIB_MEMORY_BARRIER ();
465   elt->valid = 1;
466
467   return save_count;
468 }
469 #endif /* 0 */
470
471 /* To be called by vlib worker threads upon startup */
472 void
473 vlib_worker_thread_init (vlib_worker_thread_t * w)
474 {
475   vlib_thread_main_t *tm = vlib_get_thread_main ();
476
477   /* worker threads wants no signals. */
478   {
479     sigset_t s;
480     sigfillset (&s);
481     pthread_sigmask (SIG_SETMASK, &s, 0);
482   }
483
484   clib_mem_set_heap (w->thread_mheap);
485
486   if (vec_len (tm->thread_prefix) && w->registration->short_name)
487     {
488       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
489                         w->registration->short_name, w->instance_id, '\0');
490       vlib_set_thread_name ((char *) w->name);
491     }
492
493   if (!w->registration->use_pthreads)
494     {
495
496       /* Initial barrier sync, for both worker and i/o threads */
497       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
498
499       while (*vlib_worker_threads->wait_at_barrier)
500         ;
501
502       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
503     }
504 }
505
506 void *
507 vlib_worker_thread_bootstrap_fn (void *arg)
508 {
509   void *rv;
510   vlib_worker_thread_t *w = arg;
511
512   w->lwp = syscall (SYS_gettid);
513   w->dpdk_lcore_id = -1;
514 #if DPDK==1
515   if (w->registration && !w->registration->use_pthreads && rte_socket_id)       /* do we really have dpdk linked */
516     {
517       unsigned lcore = rte_lcore_id ();
518       lcore = lcore < RTE_MAX_LCORE ? lcore : -1;
519       w->dpdk_lcore_id = lcore;
520     }
521 #endif
522
523   rv = (void *) clib_calljmp
524     ((uword (*)(uword)) w->thread_function,
525      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
526   /* NOTREACHED, we hope */
527   return rv;
528 }
529
530 static int
531 vlib_launch_thread (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
532 {
533   void *(*fp_arg) (void *) = fp;
534
535 #if DPDK==1
536   if (!w->registration->use_pthreads)
537     if (rte_eal_remote_launch)  /* do we have dpdk linked */
538       return rte_eal_remote_launch (fp, (void *) w, lcore_id);
539     else
540       return -1;
541   else
542 #endif
543     {
544       int ret;
545       pthread_t worker;
546       cpu_set_t cpuset;
547       CPU_ZERO (&cpuset);
548       CPU_SET (lcore_id, &cpuset);
549
550       ret = pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w);
551       if (ret == 0)
552         return pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset);
553       else
554         return ret;
555     }
556 }
557
558 static clib_error_t *
559 start_workers (vlib_main_t * vm)
560 {
561   int i, j;
562   vlib_worker_thread_t *w;
563   vlib_main_t *vm_clone;
564   void *oldheap;
565   vlib_frame_queue_t *fq;
566   vlib_thread_main_t *tm = &vlib_thread_main;
567   vlib_thread_registration_t *tr;
568   vlib_node_runtime_t *rt;
569   u32 n_vlib_mains = tm->n_vlib_mains;
570   u32 worker_thread_index;
571   u8 *main_heap = clib_mem_get_per_cpu_heap ();
572   mheap_t *main_heap_header = mheap_header (main_heap);
573
574   vec_reset_length (vlib_worker_threads);
575
576   /* Set up the main thread */
577   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
578   w->elog_track.name = "main thread";
579   elog_track_register (&vm->elog_main, &w->elog_track);
580
581   if (vec_len (tm->thread_prefix))
582     {
583       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
584       vlib_set_thread_name ((char *) w->name);
585     }
586
587 #if DPDK==1
588   w->dpdk_lcore_id = -1;
589   if (rte_socket_id)            /* do we really have dpdk linked */
590     {
591       unsigned lcore = rte_lcore_id ();
592       w->dpdk_lcore_id = lcore < RTE_MAX_LCORE ? lcore : -1;;
593     }
594 #endif
595
596   /*
597    * Truth of the matter: we always use at least two
598    * threads. So, make the main heap thread-safe
599    * and make the event log thread-safe.
600    */
601   main_heap_header->flags |= MHEAP_FLAG_THREAD_SAFE;
602   vm->elog_main.lock =
603     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
604   vm->elog_main.lock[0] = 0;
605
606   if (n_vlib_mains > 1)
607     {
608       vec_validate (vlib_mains, tm->n_vlib_mains - 1);
609       _vec_len (vlib_mains) = 0;
610       vec_add1 (vlib_mains, vm);
611
612       vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
613       _vec_len (vlib_frame_queues) = 0;
614       fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
615       vec_add1 (vlib_frame_queues, fq);
616
617       vlib_worker_threads->wait_at_barrier =
618         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
619       vlib_worker_threads->workers_at_barrier =
620         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
621
622       /* Ask for an initial barrier sync */
623       *vlib_worker_threads->workers_at_barrier = 0;
624       *vlib_worker_threads->wait_at_barrier = 1;
625
626       worker_thread_index = 1;
627
628       for (i = 0; i < vec_len (tm->registrations); i++)
629         {
630           vlib_node_main_t *nm, *nm_clone;
631           vlib_buffer_main_t *bm_clone;
632           vlib_buffer_free_list_t *fl_clone, *fl_orig;
633           vlib_buffer_free_list_t *orig_freelist_pool;
634           int k;
635
636           tr = tm->registrations[i];
637
638           if (tr->count == 0)
639             continue;
640
641           for (k = 0; k < tr->count; k++)
642             {
643               vec_add2 (vlib_worker_threads, w, 1);
644               if (tr->mheap_size)
645                 w->thread_mheap =
646                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
647               else
648                 w->thread_mheap = main_heap;
649               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
650               w->thread_function = tr->function;
651               w->thread_function_arg = w;
652               w->instance_id = k;
653               w->registration = tr;
654
655               w->elog_track.name =
656                 (char *) format (0, "%s %d", tr->name, k + 1);
657               vec_add1 (w->elog_track.name, 0);
658               elog_track_register (&vm->elog_main, &w->elog_track);
659
660               if (tr->no_data_structure_clone)
661                 continue;
662
663               /* Allocate "to-worker-N" frame queue */
664               if (tr->frame_queue_nelts)
665                 {
666                   fq = vlib_frame_queue_alloc (tr->frame_queue_nelts);
667                 }
668               else
669                 {
670                   fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
671                 }
672
673               vec_validate (vlib_frame_queues, worker_thread_index);
674               vlib_frame_queues[worker_thread_index] = fq;
675
676               /* Fork vlib_global_main et al. Look for bugs here */
677               oldheap = clib_mem_set_heap (w->thread_mheap);
678
679               vm_clone = clib_mem_alloc (sizeof (*vm_clone));
680               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
681
682               vm_clone->cpu_index = worker_thread_index;
683               vm_clone->heap_base = w->thread_mheap;
684               vm_clone->mbuf_alloc_list = 0;
685               memset (&vm_clone->random_buffer, 0,
686                       sizeof (vm_clone->random_buffer));
687
688               nm = &vlib_mains[0]->node_main;
689               nm_clone = &vm_clone->node_main;
690               /* fork next frames array, preserving node runtime indices */
691               nm_clone->next_frames = vec_dup (nm->next_frames);
692               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
693                 {
694                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
695                   u32 save_node_runtime_index;
696                   u32 save_flags;
697
698                   save_node_runtime_index = nf->node_runtime_index;
699                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
700                   vlib_next_frame_init (nf);
701                   nf->node_runtime_index = save_node_runtime_index;
702                   nf->flags = save_flags;
703                 }
704
705               /* fork the frame dispatch queue */
706               nm_clone->pending_frames = 0;
707               vec_validate (nm_clone->pending_frames, 10);      /* $$$$$?????? */
708               _vec_len (nm_clone->pending_frames) = 0;
709
710               /* fork nodes */
711               nm_clone->nodes = 0;
712               for (j = 0; j < vec_len (nm->nodes); j++)
713                 {
714                   vlib_node_t *n;
715                   n = clib_mem_alloc_no_fail (sizeof (*n));
716                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
717                   /* none of the copied nodes have enqueue rights given out */
718                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
719                   memset (&n->stats_total, 0, sizeof (n->stats_total));
720                   memset (&n->stats_last_clear, 0,
721                           sizeof (n->stats_last_clear));
722                   vec_add1 (nm_clone->nodes, n);
723                 }
724               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
725                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
726
727               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
728                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
729               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
730                 rt->cpu_index = vm_clone->cpu_index;
731
732               nm_clone->processes = vec_dup (nm->processes);
733
734               /* zap the (per worker) frame freelists, etc */
735               nm_clone->frame_sizes = 0;
736               nm_clone->frame_size_hash = 0;
737
738               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
739
740               clib_mem_set_heap (oldheap);
741               vec_add1 (vlib_mains, vm_clone);
742
743               vm_clone->error_main.counters =
744                 vec_dup (vlib_mains[0]->error_main.counters);
745               vm_clone->error_main.counters_last_clear =
746                 vec_dup (vlib_mains[0]->error_main.counters_last_clear);
747
748               /* Fork the vlib_buffer_main_t free lists, etc. */
749               bm_clone = vec_dup (vm_clone->buffer_main);
750               vm_clone->buffer_main = bm_clone;
751
752               orig_freelist_pool = bm_clone->buffer_free_list_pool;
753               bm_clone->buffer_free_list_pool = 0;
754
755             /* *INDENT-OFF* */
756             pool_foreach (fl_orig, orig_freelist_pool,
757                           ({
758                             pool_get_aligned (bm_clone->buffer_free_list_pool,
759                                               fl_clone, CLIB_CACHE_LINE_BYTES);
760                             ASSERT (fl_orig - orig_freelist_pool
761                                     == fl_clone - bm_clone->buffer_free_list_pool);
762
763                             fl_clone[0] = fl_orig[0];
764                             fl_clone->aligned_buffers = 0;
765                             fl_clone->unaligned_buffers = 0;
766                             fl_clone->n_alloc = 0;
767                           }));
768 /* *INDENT-ON* */
769
770               worker_thread_index++;
771             }
772         }
773     }
774   else
775     {
776       /* only have non-data-structure copy threads to create... */
777       for (i = 0; i < vec_len (tm->registrations); i++)
778         {
779           tr = tm->registrations[i];
780
781           for (j = 0; j < tr->count; j++)
782             {
783               vec_add2 (vlib_worker_threads, w, 1);
784               if (tr->mheap_size)
785                 w->thread_mheap =
786                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
787               else
788                 w->thread_mheap = main_heap;
789               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
790               w->thread_function = tr->function;
791               w->thread_function_arg = w;
792               w->instance_id = j;
793               w->elog_track.name =
794                 (char *) format (0, "%s %d", tr->name, j + 1);
795               w->registration = tr;
796               vec_add1 (w->elog_track.name, 0);
797               elog_track_register (&vm->elog_main, &w->elog_track);
798             }
799         }
800     }
801
802   worker_thread_index = 1;
803
804   for (i = 0; i < vec_len (tm->registrations); i++)
805     {
806       int j;
807
808       tr = tm->registrations[i];
809
810       if (tr->use_pthreads || tm->use_pthreads)
811         {
812           for (j = 0; j < tr->count; j++)
813             {
814               w = vlib_worker_threads + worker_thread_index++;
815               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, 0) <
816                   0)
817                 clib_warning ("Couldn't start '%s' pthread ", tr->name);
818             }
819         }
820       else
821         {
822           uword c;
823             /* *INDENT-OFF* */
824             clib_bitmap_foreach (c, tr->coremask, ({
825               w = vlib_worker_threads + worker_thread_index++;
826               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, c) < 0)
827                 clib_warning ("Couldn't start DPDK lcore %d", c);
828
829             }));
830 /* *INDENT-ON* */
831         }
832     }
833   vlib_worker_thread_barrier_sync (vm);
834   vlib_worker_thread_barrier_release (vm);
835   return 0;
836 }
837
838 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
839
840 void
841 vlib_worker_thread_node_runtime_update (void)
842 {
843   int i, j;
844   vlib_worker_thread_t *w;
845   vlib_main_t *vm;
846   vlib_node_main_t *nm, *nm_clone;
847   vlib_node_t **old_nodes_clone;
848   vlib_main_t *vm_clone;
849   vlib_node_runtime_t *rt, *old_rt;
850   void *oldheap;
851   never_inline void
852     vlib_node_runtime_sync_stats (vlib_main_t * vm,
853                                   vlib_node_runtime_t * r,
854                                   uword n_calls,
855                                   uword n_vectors, uword n_clocks);
856
857   ASSERT (os_get_cpu_number () == 0);
858
859   if (vec_len (vlib_mains) == 0)
860     return;
861
862   vm = vlib_mains[0];
863   nm = &vm->node_main;
864
865   ASSERT (os_get_cpu_number () == 0);
866   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
867
868   /*
869    * Scrape all runtime stats, so we don't lose node runtime(s) with
870    * pending counts, or throw away worker / io thread counts.
871    */
872   for (j = 0; j < vec_len (nm->nodes); j++)
873     {
874       vlib_node_t *n;
875       n = nm->nodes[j];
876       vlib_node_sync_stats (vm, n);
877     }
878
879   for (i = 1; i < vec_len (vlib_mains); i++)
880     {
881       vlib_node_t *n;
882
883       vm_clone = vlib_mains[i];
884       nm_clone = &vm_clone->node_main;
885
886       for (j = 0; j < vec_len (nm_clone->nodes); j++)
887         {
888           n = nm_clone->nodes[j];
889
890           rt = vlib_node_get_runtime (vm_clone, n->index);
891           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
892         }
893     }
894
895   for (i = 1; i < vec_len (vlib_mains); i++)
896     {
897       vlib_node_runtime_t *rt;
898       w = vlib_worker_threads + i;
899       oldheap = clib_mem_set_heap (w->thread_mheap);
900
901       vm_clone = vlib_mains[i];
902
903       /* Re-clone error heap */
904       u64 *old_counters = vm_clone->error_main.counters;
905       u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
906       clib_memcpy (&vm_clone->error_main, &vm->error_main,
907                    sizeof (vm->error_main));
908       j = vec_len (vm->error_main.counters) - 1;
909       vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
910       vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
911       vm_clone->error_main.counters = old_counters;
912       vm_clone->error_main.counters_last_clear = old_counters_all_clear;
913
914       nm_clone = &vm_clone->node_main;
915       vec_free (nm_clone->next_frames);
916       nm_clone->next_frames = vec_dup (nm->next_frames);
917
918       for (j = 0; j < vec_len (nm_clone->next_frames); j++)
919         {
920           vlib_next_frame_t *nf = &nm_clone->next_frames[j];
921           u32 save_node_runtime_index;
922           u32 save_flags;
923
924           save_node_runtime_index = nf->node_runtime_index;
925           save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
926           vlib_next_frame_init (nf);
927           nf->node_runtime_index = save_node_runtime_index;
928           nf->flags = save_flags;
929         }
930
931       old_nodes_clone = nm_clone->nodes;
932       nm_clone->nodes = 0;
933
934       /* re-fork nodes */
935       for (j = 0; j < vec_len (nm->nodes); j++)
936         {
937           vlib_node_t *old_n_clone;
938           vlib_node_t *new_n, *new_n_clone;
939
940           new_n = nm->nodes[j];
941           old_n_clone = old_nodes_clone[j];
942
943           new_n_clone = clib_mem_alloc_no_fail (sizeof (*new_n_clone));
944           clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
945           /* none of the copied nodes have enqueue rights given out */
946           new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
947
948           if (j >= vec_len (old_nodes_clone))
949             {
950               /* new node, set to zero */
951               memset (&new_n_clone->stats_total, 0,
952                       sizeof (new_n_clone->stats_total));
953               memset (&new_n_clone->stats_last_clear, 0,
954                       sizeof (new_n_clone->stats_last_clear));
955             }
956           else
957             {
958               /* Copy stats if the old data is valid */
959               clib_memcpy (&new_n_clone->stats_total,
960                            &old_n_clone->stats_total,
961                            sizeof (new_n_clone->stats_total));
962               clib_memcpy (&new_n_clone->stats_last_clear,
963                            &old_n_clone->stats_last_clear,
964                            sizeof (new_n_clone->stats_last_clear));
965
966               /* keep previous node state */
967               new_n_clone->state = old_n_clone->state;
968             }
969           vec_add1 (nm_clone->nodes, new_n_clone);
970         }
971       /* Free the old node clone */
972       for (j = 0; j < vec_len (old_nodes_clone); j++)
973         clib_mem_free (old_nodes_clone[j]);
974       vec_free (old_nodes_clone);
975
976       vec_free (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
977
978       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
979         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
980
981       /* clone input node runtime */
982       old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
983
984       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
985         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
986
987       vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
988       {
989         rt->cpu_index = vm_clone->cpu_index;
990       }
991
992       for (j = 0; j < vec_len (old_rt); j++)
993         {
994           rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
995           rt->state = old_rt[j].state;
996         }
997
998       vec_free (old_rt);
999
1000       nm_clone->processes = vec_dup (nm->processes);
1001
1002       clib_mem_set_heap (oldheap);
1003
1004       // vnet_main_fork_fixup (i);
1005     }
1006 }
1007
1008 u32
1009 unformat_sched_policy (unformat_input_t * input, va_list * args)
1010 {
1011   u32 *r = va_arg (*args, u32 *);
1012
1013   if (0);
1014 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1015   foreach_sched_policy
1016 #undef _
1017     else
1018     return 0;
1019   return 1;
1020 }
1021
1022 static clib_error_t *
1023 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1024 {
1025   vlib_thread_registration_t *tr;
1026   uword *p;
1027   vlib_thread_main_t *tm = &vlib_thread_main;
1028   u8 *name;
1029   u64 coremask;
1030   uword *bitmap;
1031   u32 count;
1032
1033   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1034
1035   tm->n_thread_stacks = 1;      /* account for main thread */
1036   tm->sched_policy = ~0;
1037   tm->sched_priority = ~0;
1038
1039   tr = tm->next;
1040
1041   while (tr)
1042     {
1043       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1044       tr = tr->next;
1045     }
1046
1047   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1048     {
1049       if (unformat (input, "use-pthreads"))
1050         tm->use_pthreads = 1;
1051       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1052         ;
1053       else if (unformat (input, "main-core %u", &tm->main_lcore))
1054         ;
1055       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1056         ;
1057       else if (unformat (input, "coremask-%s %llx", &name, &coremask))
1058         {
1059           p = hash_get_mem (tm->thread_registrations_by_name, name);
1060           if (p == 0)
1061             return clib_error_return (0, "no such thread type '%s'", name);
1062
1063           tr = (vlib_thread_registration_t *) p[0];
1064
1065           if (tr->use_pthreads)
1066             return clib_error_return (0,
1067                                       "coremask cannot be set for '%s' threads",
1068                                       name);
1069
1070           tr->coremask = clib_bitmap_set_multiple
1071             (tr->coremask, 0, coremask, BITS (coremask));
1072           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1073         }
1074       else if (unformat (input, "corelist-%s %U", &name, unformat_bitmap_list,
1075                          &bitmap))
1076         {
1077           p = hash_get_mem (tm->thread_registrations_by_name, name);
1078           if (p == 0)
1079             return clib_error_return (0, "no such thread type '%s'", name);
1080
1081           tr = (vlib_thread_registration_t *) p[0];
1082
1083           if (tr->use_pthreads)
1084             return clib_error_return (0,
1085                                       "corelist cannot be set for '%s' threads",
1086                                       name);
1087
1088           tr->coremask = bitmap;
1089           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1090         }
1091       else
1092         if (unformat
1093             (input, "scheduler-policy %U", unformat_sched_policy,
1094              &tm->sched_policy))
1095         ;
1096       else if (unformat (input, "scheduler-prio %u", &tm->sched_priority))
1097         ;
1098       else if (unformat (input, "%s %u", &name, &count))
1099         {
1100           p = hash_get_mem (tm->thread_registrations_by_name, name);
1101           if (p == 0)
1102             return clib_error_return (0, "no such thread type 3 '%s'", name);
1103
1104           tr = (vlib_thread_registration_t *) p[0];
1105           if (tr->fixed_count)
1106             return clib_error_return
1107               (0, "number of %s threads not configurable", tr->name);
1108           tr->count = count;
1109         }
1110       else
1111         break;
1112     }
1113
1114   if (tm->sched_policy != ~0)
1115     {
1116       if (tm->sched_priority != ~0
1117           && (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR))
1118         {
1119           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1120           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1121           if (tm->sched_priority > prio_max)
1122             tm->sched_priority = prio_max;
1123           if (tm->sched_priority < prio_min)
1124             tm->sched_priority = prio_min;
1125         }
1126       else
1127         tm->sched_priority = 0;
1128     }
1129   tr = tm->next;
1130
1131   if (!tm->thread_prefix)
1132     tm->thread_prefix = format (0, "vpp");
1133
1134   while (tr)
1135     {
1136       tm->n_thread_stacks += tr->count;
1137       tm->n_pthreads += tr->count * tr->use_pthreads;
1138       tm->n_eal_threads += tr->count * (tr->use_pthreads == 0);
1139       tr = tr->next;
1140     }
1141
1142   return 0;
1143 }
1144
1145 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1146
1147 #if !defined (__x86_64__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__)
1148 void
1149 __sync_fetch_and_add_8 (void)
1150 {
1151   fformat (stderr, "%s called\n", __FUNCTION__);
1152   abort ();
1153 }
1154
1155 void
1156 __sync_add_and_fetch_8 (void)
1157 {
1158   fformat (stderr, "%s called\n", __FUNCTION__);
1159   abort ();
1160 }
1161 #endif
1162
1163 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1164 void
1165 vnet_main_fixup (vlib_fork_fixup_t which)
1166 {
1167 }
1168
1169 void
1170 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1171 {
1172   vlib_main_t *vm = vlib_get_main ();
1173
1174   if (vlib_mains == 0)
1175     return;
1176
1177   ASSERT (os_get_cpu_number () == 0);
1178   vlib_worker_thread_barrier_sync (vm);
1179
1180   switch (which)
1181     {
1182     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1183       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1184       break;
1185
1186     default:
1187       ASSERT (0);
1188     }
1189   vlib_worker_thread_barrier_release (vm);
1190 }
1191
1192 void
1193 vlib_worker_thread_barrier_sync (vlib_main_t * vm)
1194 {
1195   f64 deadline;
1196   u32 count;
1197
1198   if (!vlib_mains)
1199     return;
1200
1201   count = vec_len (vlib_mains) - 1;
1202
1203   /* Tolerate recursive calls */
1204   if (++vlib_worker_threads[0].recursion_level > 1)
1205     return;
1206
1207   vlib_worker_threads[0].barrier_sync_count++;
1208
1209   ASSERT (os_get_cpu_number () == 0);
1210
1211   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1212
1213   *vlib_worker_threads->wait_at_barrier = 1;
1214   while (*vlib_worker_threads->workers_at_barrier != count)
1215     {
1216       if (vlib_time_now (vm) > deadline)
1217         {
1218           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1219           os_panic ();
1220         }
1221     }
1222 }
1223
1224 void
1225 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1226 {
1227   f64 deadline;
1228
1229   if (!vlib_mains)
1230     return;
1231
1232   if (--vlib_worker_threads[0].recursion_level > 0)
1233     return;
1234
1235   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1236
1237   *vlib_worker_threads->wait_at_barrier = 0;
1238
1239   while (*vlib_worker_threads->workers_at_barrier > 0)
1240     {
1241       if (vlib_time_now (vm) > deadline)
1242         {
1243           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1244           os_panic ();
1245         }
1246     }
1247 }
1248
1249 /*
1250  * Check the frame queue to see if any frames are available.
1251  * If so, pull the packets off the frames and put them to
1252  * the handoff node.
1253  */
1254 static inline int
1255 vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
1256 {
1257   u32 thread_id = vm->cpu_index;
1258   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
1259   vlib_frame_queue_elt_t *elt;
1260   u32 *from, *to;
1261   vlib_frame_t *f;
1262   int msg_type;
1263   int processed = 0;
1264   u32 n_left_to_node;
1265   u32 vectors = 0;
1266   vlib_thread_main_t *tm = vlib_get_thread_main ();
1267
1268   ASSERT (fq);
1269   ASSERT (vm == vlib_mains[thread_id]);
1270
1271   if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
1272     return 0;
1273   /*
1274    * Gather trace data for frame queues
1275    */
1276   if (PREDICT_FALSE (fq->trace))
1277     {
1278       frame_queue_trace_t *fqt;
1279       frame_queue_nelt_counter_t *fqh;
1280       u32 elix;
1281
1282       fqt = &tm->frame_queue_traces[thread_id];
1283
1284       fqt->nelts = fq->nelts;
1285       fqt->head = fq->head;
1286       fqt->head_hint = fq->head_hint;
1287       fqt->tail = fq->tail;
1288       fqt->threshold = fq->vector_threshold;
1289       fqt->n_in_use = fqt->tail - fqt->head;
1290       if (fqt->n_in_use >= fqt->nelts)
1291         {
1292           // if beyond max then use max
1293           fqt->n_in_use = fqt->nelts - 1;
1294         }
1295
1296       /* Record the number of elements in use in the histogram */
1297       fqh = &tm->frame_queue_histogram[thread_id];
1298       fqh->count[fqt->n_in_use]++;
1299
1300       /* Record a snapshot of the elements in use */
1301       for (elix = 0; elix < fqt->nelts; elix++)
1302         {
1303           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1304           if (1 || elt->valid)
1305             {
1306               fqt->n_vectors[elix] = elt->n_vectors;
1307             }
1308         }
1309       fqt->written = 1;
1310     }
1311
1312   while (1)
1313     {
1314       if (fq->head == fq->tail)
1315         {
1316           fq->head_hint = fq->head;
1317           return processed;
1318         }
1319
1320       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1321
1322       if (!elt->valid)
1323         {
1324           fq->head_hint = fq->head;
1325           return processed;
1326         }
1327
1328       from = elt->buffer_index;
1329       msg_type = elt->msg_type;
1330
1331       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1332       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1333
1334       f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
1335
1336       to = vlib_frame_vector_args (f);
1337
1338       n_left_to_node = elt->n_vectors;
1339
1340       while (n_left_to_node >= 4)
1341         {
1342           to[0] = from[0];
1343           to[1] = from[1];
1344           to[2] = from[2];
1345           to[3] = from[3];
1346           to += 4;
1347           from += 4;
1348           n_left_to_node -= 4;
1349         }
1350
1351       while (n_left_to_node > 0)
1352         {
1353           to[0] = from[0];
1354           to++;
1355           from++;
1356           n_left_to_node--;
1357         }
1358
1359       vectors += elt->n_vectors;
1360       f->n_vectors = elt->n_vectors;
1361       vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
1362
1363       elt->valid = 0;
1364       elt->n_vectors = 0;
1365       elt->msg_type = 0xfefefefe;
1366       CLIB_MEMORY_BARRIER ();
1367       fq->head++;
1368       processed++;
1369
1370       /*
1371        * Limit the number of packets pushed into the graph
1372        */
1373       if (vectors >= fq->vector_threshold)
1374         {
1375           fq->head_hint = fq->head;
1376           return processed;
1377         }
1378     }
1379   ASSERT (0);
1380   return processed;
1381 }
1382
1383 static_always_inline void
1384 vlib_worker_thread_internal (vlib_main_t * vm)
1385 {
1386   vlib_node_main_t *nm = &vm->node_main;
1387   u64 cpu_time_now = clib_cpu_time_now ();
1388
1389   vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);
1390
1391   while (1)
1392     {
1393       vlib_worker_thread_barrier_check ();
1394
1395       vlib_frame_queue_dequeue_internal (vm);
1396
1397       vlib_node_runtime_t *n;
1398       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1399       {
1400         cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
1401                                       VLIB_NODE_STATE_POLLING, /* frame */ 0,
1402                                       cpu_time_now);
1403       }
1404
1405       /* Next handle interrupts. */
1406       {
1407         uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
1408         uword i;
1409         if (l > 0)
1410           {
1411             _vec_len (nm->pending_interrupt_node_runtime_indices) = 0;
1412             for (i = 0; i < l; i++)
1413               {
1414                 n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1415                                       nm->
1416                                       pending_interrupt_node_runtime_indices
1417                                       [i]);
1418                 cpu_time_now =
1419                   dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
1420                                  VLIB_NODE_STATE_INTERRUPT,
1421                                  /* frame */ 0,
1422                                  cpu_time_now);
1423               }
1424           }
1425       }
1426
1427       if (_vec_len (nm->pending_frames))
1428         {
1429           int i;
1430           cpu_time_now = clib_cpu_time_now ();
1431           for (i = 0; i < _vec_len (nm->pending_frames); i++)
1432             {
1433               vlib_pending_frame_t *p;
1434
1435               p = nm->pending_frames + i;
1436
1437               cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
1438             }
1439           _vec_len (nm->pending_frames) = 0;
1440         }
1441       vlib_increment_main_loop_counter (vm);
1442
1443       /* Record time stamp in case there are no enabled nodes and above
1444          calls do not update time stamp. */
1445       cpu_time_now = clib_cpu_time_now ();
1446     }
1447 }
1448
1449 void
1450 vlib_worker_thread_fn (void *arg)
1451 {
1452   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1453   vlib_thread_main_t *tm = vlib_get_thread_main ();
1454   vlib_main_t *vm = vlib_get_main ();
1455
1456   ASSERT (vm->cpu_index == os_get_cpu_number ());
1457
1458   vlib_worker_thread_init (w);
1459   clib_time_init (&vm->clib_time);
1460   clib_mem_set_heap (w->thread_mheap);
1461
1462   /* Wait until the dpdk init sequence is complete */
1463   while (tm->worker_thread_release == 0)
1464     vlib_worker_thread_barrier_check ();
1465
1466   vlib_worker_thread_internal (vm);
1467 }
1468
1469 /* *INDENT-OFF* */
1470 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1471   .name = "workers",
1472   .short_name = "wk",
1473   .function = vlib_worker_thread_fn,
1474 };
1475 /* *INDENT-ON* */
1476
1477 clib_error_t *
1478 threads_init (vlib_main_t * vm)
1479 {
1480   vlib_thread_main_t *tm = vlib_get_thread_main ();
1481
1482   tm->handoff_dispatch_node_index = ~0;
1483
1484   return 0;
1485 }
1486
1487 VLIB_INIT_FUNCTION (threads_init);
1488
1489 /*
1490  * fd.io coding-style-patch-verification: ON
1491  *
1492  * Local Variables:
1493  * eval: (c-set-style "gnu")
1494  * End:
1495  */