Scheduler policy & priority config, few minor fixes (VPP-425)
[vpp.git] / vlib / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vlib/vlib.h>
21
22 #include <vlib/threads.h>
23 #include <vlib/unix/cj.h>
24
25
26 #if DPDK==1
27 #include <rte_config.h>
28 #include <rte_common.h>
29 #include <rte_eal.h>
30 #include <rte_launch.h>
31 #include <rte_lcore.h>
32 #endif
33 DECLARE_CJ_GLOBAL_LOG;
34
35 #define FRAME_QUEUE_NELTS 32
36
37
38 #if DPDK==1
39 /*
40  *  Weak definitions of DPDK symbols used in this file.
41  *  Needed for linking test programs without DPDK libs.
42  */
43 unsigned __thread __attribute__ ((weak)) RTE_PER_LCORE (_lcore_id);
44 struct lcore_config __attribute__ ((weak)) lcore_config[];
45 unsigned __attribute__ ((weak)) rte_socket_id ();
46 int __attribute__ ((weak)) rte_eal_remote_launch ();
47 #endif
48 u32
49 vl (void *p)
50 {
51   return vec_len (p);
52 }
53
54 vlib_thread_main_t vlib_thread_main;
55
56 uword
57 os_get_cpu_number (void)
58 {
59   void *sp;
60   uword n;
61   u32 len;
62
63   len = vec_len (vlib_thread_stacks);
64   if (len == 0)
65     return 0;
66
67   /* Get any old stack address. */
68   sp = &sp;
69
70   n = ((uword) sp - (uword) vlib_thread_stacks[0])
71     >> VLIB_LOG2_THREAD_STACK_SIZE;
72
73   /* "processes" have their own stacks, and they always run in thread 0 */
74   n = n >= len ? 0 : n;
75
76   return n;
77 }
78
79 uword
80 os_get_ncpus (void)
81 {
82   u32 len;
83
84   len = vec_len (vlib_thread_stacks);
85   if (len == 0)
86     return 1;
87   else
88     return len;
89 }
90
91 void
92 vlib_set_thread_name (char *name)
93 {
94   int pthread_setname_np (pthread_t __target_thread, const char *__name);
95   int rv;
96   pthread_t thread = pthread_self ();
97
98   if (thread)
99     {
100       rv = pthread_setname_np (thread, name);
101       if (rv)
102         clib_warning ("pthread_setname_np returned %d", rv);
103     }
104 }
105
106 static int
107 sort_registrations_by_no_clone (void *a0, void *a1)
108 {
109   vlib_thread_registration_t **tr0 = a0;
110   vlib_thread_registration_t **tr1 = a1;
111
112   return ((i32) ((*tr0)->no_data_structure_clone)
113           - ((i32) ((*tr1)->no_data_structure_clone)));
114 }
115
116 static uword *
117 vlib_sysfs_list_to_bitmap (char *filename)
118 {
119   FILE *fp;
120   uword *r = 0;
121
122   fp = fopen (filename, "r");
123
124   if (fp != NULL)
125     {
126       u8 *buffer = 0;
127       vec_validate (buffer, 256 - 1);
128       if (fgets ((char *) buffer, 256, fp))
129         {
130           unformat_input_t in;
131           unformat_init_string (&in, (char *) buffer,
132                                 strlen ((char *) buffer));
133           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
134             clib_warning ("unformat_bitmap_list failed");
135           unformat_free (&in);
136         }
137       vec_free (buffer);
138       fclose (fp);
139     }
140   return r;
141 }
142
143
144 /* Called early in the init sequence */
145
146 clib_error_t *
147 vlib_thread_init (vlib_main_t * vm)
148 {
149   vlib_thread_main_t *tm = &vlib_thread_main;
150   vlib_worker_thread_t *w;
151   vlib_thread_registration_t *tr;
152   u32 n_vlib_mains = 1;
153   u32 first_index = 1;
154   u32 i;
155   uword *avail_cpu;
156
157   /* get bitmaps of active cpu cores and sockets */
158   tm->cpu_core_bitmap =
159     vlib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
160   tm->cpu_socket_bitmap =
161     vlib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
162
163   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
164
165   /* skip cores */
166   for (i = 0; i < tm->skip_cores; i++)
167     {
168       uword c = clib_bitmap_first_set (avail_cpu);
169       if (c == ~0)
170         return clib_error_return (0, "no available cpus to skip");
171
172       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
173     }
174
175   /* grab cpu for main thread */
176   if (!tm->main_lcore)
177     {
178       tm->main_lcore = clib_bitmap_first_set (avail_cpu);
179       if (tm->main_lcore == (u8) ~ 0)
180         return clib_error_return (0, "no available cpus to be used for the"
181                                   " main thread");
182     }
183   else
184     {
185       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
186         return clib_error_return (0, "cpu %u is not available to be used"
187                                   " for the main thread", tm->main_lcore);
188     }
189   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
190
191   /* assume that there is socket 0 only if there is no data from sysfs */
192   if (!tm->cpu_socket_bitmap)
193     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
194
195   /* pin main thread to main_lcore  */
196 #if DPDK==0
197   {
198     cpu_set_t cpuset;
199     CPU_ZERO (&cpuset);
200     CPU_SET (tm->main_lcore, &cpuset);
201     pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
202   }
203 #endif
204
205   /* as many threads as stacks... */
206   vec_validate_aligned (vlib_worker_threads, vec_len (vlib_thread_stacks) - 1,
207                         CLIB_CACHE_LINE_BYTES);
208
209   /* Preallocate thread 0 */
210   _vec_len (vlib_worker_threads) = 1;
211   w = vlib_worker_threads;
212   w->thread_mheap = clib_mem_get_heap ();
213   w->thread_stack = vlib_thread_stacks[0];
214   w->dpdk_lcore_id = -1;
215   w->lwp = syscall (SYS_gettid);
216   tm->n_vlib_mains = 1;
217
218   if (tm->sched_policy != ~0)
219     {
220       struct sched_param sched_param;
221       if (!sched_getparam (w->lwp, &sched_param))
222         {
223           if (tm->sched_priority != ~0)
224             sched_param.sched_priority = tm->sched_priority;
225           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
226         }
227     }
228
229   /* assign threads to cores and set n_vlib_mains */
230   tr = tm->next;
231
232   while (tr)
233     {
234       vec_add1 (tm->registrations, tr);
235       tr = tr->next;
236     }
237
238   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
239
240   for (i = 0; i < vec_len (tm->registrations); i++)
241     {
242       int j;
243       tr = tm->registrations[i];
244       tr->first_index = first_index;
245       first_index += tr->count;
246       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
247
248       /* construct coremask */
249       if (tr->use_pthreads || !tr->count)
250         continue;
251
252       if (tr->coremask)
253         {
254           uword c;
255           /* *INDENT-OFF* */
256           clib_bitmap_foreach (c, tr->coremask, ({
257             if (clib_bitmap_get(avail_cpu, c) == 0)
258               return clib_error_return (0, "cpu %u is not available to be used"
259                                         " for the '%s' thread",c, tr->name);
260
261             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
262           }));
263 /* *INDENT-ON* */
264
265         }
266       else
267         {
268           for (j = 0; j < tr->count; j++)
269             {
270               uword c = clib_bitmap_first_set (avail_cpu);
271               if (c == ~0)
272                 return clib_error_return (0,
273                                           "no available cpus to be used for"
274                                           " the '%s' thread", tr->name);
275
276               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
277               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
278             }
279         }
280     }
281
282   clib_bitmap_free (avail_cpu);
283
284   tm->n_vlib_mains = n_vlib_mains;
285
286   vec_validate_aligned (vlib_worker_threads, first_index - 1,
287                         CLIB_CACHE_LINE_BYTES);
288
289
290   tm->efd.enabled = VLIB_EFD_DISABLED;
291   tm->efd.queue_hi_thresh = ((VLIB_EFD_DEF_WORKER_HI_THRESH_PCT *
292                               FRAME_QUEUE_NELTS) / 100);
293   return 0;
294 }
295
296 vlib_worker_thread_t *
297 vlib_alloc_thread (vlib_main_t * vm)
298 {
299   vlib_worker_thread_t *w;
300
301   if (vec_len (vlib_worker_threads) >= vec_len (vlib_thread_stacks))
302     {
303       clib_warning ("out of worker threads... Quitting...");
304       exit (1);
305     }
306   vec_add2 (vlib_worker_threads, w, 1);
307   w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
308   return w;
309 }
310
311 vlib_frame_queue_t *
312 vlib_frame_queue_alloc (int nelts)
313 {
314   vlib_frame_queue_t *fq;
315
316   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
317   memset (fq, 0, sizeof (*fq));
318   fq->nelts = nelts;
319   fq->vector_threshold = 128;   // packets
320   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
321
322   if (1)
323     {
324       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
325         fformat (stderr, "WARNING: fq->tail unaligned\n");
326       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
327         fformat (stderr, "WARNING: fq->head unaligned\n");
328       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
329         fformat (stderr, "WARNING: fq->elts unaligned\n");
330
331       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
332         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
333                  sizeof (fq->elts[0]));
334       if (nelts & (nelts - 1))
335         {
336           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
337           abort ();
338         }
339     }
340
341   return (fq);
342 }
343
344 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
345 void
346 vl_msg_api_handler_no_free (void *v)
347 {
348 }
349
350 /* Turned off, save as reference material... */
351 #if 0
352 static inline int
353 vlib_frame_queue_dequeue_internal (int thread_id,
354                                    vlib_main_t * vm, vlib_node_main_t * nm)
355 {
356   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
357   vlib_frame_queue_elt_t *elt;
358   vlib_frame_t *f;
359   vlib_pending_frame_t *p;
360   vlib_node_runtime_t *r;
361   u32 node_runtime_index;
362   int msg_type;
363   u64 before;
364   int processed = 0;
365
366   ASSERT (vm == vlib_mains[thread_id]);
367
368   while (1)
369     {
370       if (fq->head == fq->tail)
371         return processed;
372
373       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
374
375       if (!elt->valid)
376         return processed;
377
378       before = clib_cpu_time_now ();
379
380       f = elt->frame;
381       node_runtime_index = elt->node_runtime_index;
382       msg_type = elt->msg_type;
383
384       switch (msg_type)
385         {
386         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
387           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
388           /* note fallthrough... */
389         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
390           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
391                                 node_runtime_index);
392           vlib_frame_free (vm, r, f);
393           break;
394         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
395           vec_add2 (vm->node_main.pending_frames, p, 1);
396           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
397           p->node_runtime_index = elt->node_runtime_index;
398           p->frame_index = vlib_frame_index (vm, f);
399           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
400           fq->dequeue_vectors += (u64) f->n_vectors;
401           break;
402         case VLIB_FRAME_QUEUE_ELT_API_MSG:
403           vl_msg_api_handler_no_free (f);
404           break;
405         default:
406           clib_warning ("bogus frame queue message, type %d", msg_type);
407           break;
408         }
409       elt->valid = 0;
410       fq->dequeues++;
411       fq->dequeue_ticks += clib_cpu_time_now () - before;
412       CLIB_MEMORY_BARRIER ();
413       fq->head++;
414       processed++;
415     }
416   ASSERT (0);
417   return processed;
418 }
419
420 int
421 vlib_frame_queue_dequeue (int thread_id,
422                           vlib_main_t * vm, vlib_node_main_t * nm)
423 {
424   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
425 }
426
427 int
428 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
429                           u32 frame_queue_index, vlib_frame_t * frame,
430                           vlib_frame_queue_msg_type_t type)
431 {
432   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
433   vlib_frame_queue_elt_t *elt;
434   u32 save_count;
435   u64 new_tail;
436   u64 before = clib_cpu_time_now ();
437
438   ASSERT (fq);
439
440   new_tail = __sync_add_and_fetch (&fq->tail, 1);
441
442   /* Wait until a ring slot is available */
443   while (new_tail >= fq->head + fq->nelts)
444     {
445       f64 b4 = vlib_time_now_ticks (vm, before);
446       vlib_worker_thread_barrier_check (vm, b4);
447       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
448       // vlib_frame_queue_dequeue (vm->cpu_index, vm, nm);
449     }
450
451   elt = fq->elts + (new_tail & (fq->nelts - 1));
452
453   /* this would be very bad... */
454   while (elt->valid)
455     {
456     }
457
458   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
459   save_count = frame->n_vectors;
460
461   elt->frame = frame;
462   elt->node_runtime_index = node_runtime_index;
463   elt->msg_type = type;
464   CLIB_MEMORY_BARRIER ();
465   elt->valid = 1;
466
467   return save_count;
468 }
469 #endif /* 0 */
470
471 /* To be called by vlib worker threads upon startup */
472 void
473 vlib_worker_thread_init (vlib_worker_thread_t * w)
474 {
475   vlib_thread_main_t *tm = vlib_get_thread_main ();
476
477   /* worker threads wants no signals. */
478   {
479     sigset_t s;
480     sigfillset (&s);
481     pthread_sigmask (SIG_SETMASK, &s, 0);
482   }
483
484   clib_mem_set_heap (w->thread_mheap);
485
486   if (vec_len (tm->thread_prefix) && w->registration->short_name)
487     {
488       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
489                         w->registration->short_name, w->instance_id, '\0');
490       vlib_set_thread_name ((char *) w->name);
491     }
492
493   if (!w->registration->use_pthreads)
494     {
495
496       /* Initial barrier sync, for both worker and i/o threads */
497       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
498
499       while (*vlib_worker_threads->wait_at_barrier)
500         ;
501
502       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
503     }
504 }
505
506 void *
507 vlib_worker_thread_bootstrap_fn (void *arg)
508 {
509   void *rv;
510   vlib_worker_thread_t *w = arg;
511
512   w->lwp = syscall (SYS_gettid);
513   w->dpdk_lcore_id = -1;
514 #if DPDK==1
515   if (w->registration && !w->registration->use_pthreads && rte_socket_id)       /* do we really have dpdk linked */
516     {
517       unsigned lcore = rte_lcore_id ();
518       lcore = lcore < RTE_MAX_LCORE ? lcore : -1;
519       w->dpdk_lcore_id = lcore;
520     }
521 #endif
522
523   rv = (void *) clib_calljmp
524     ((uword (*)(uword)) w->thread_function,
525      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
526   /* NOTREACHED, we hope */
527   return rv;
528 }
529
530 static int
531 vlib_launch_thread (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
532 {
533   void *(*fp_arg) (void *) = fp;
534
535 #if DPDK==1
536   if (!w->registration->use_pthreads)
537     if (rte_eal_remote_launch)  /* do we have dpdk linked */
538       return rte_eal_remote_launch (fp, (void *) w, lcore_id);
539     else
540       return -1;
541   else
542 #endif
543     {
544       int ret;
545       pthread_t worker;
546       cpu_set_t cpuset;
547       CPU_ZERO (&cpuset);
548       CPU_SET (lcore_id, &cpuset);
549
550       ret = pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w);
551       if (ret == 0)
552         return pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset);
553       else
554         return ret;
555     }
556 }
557
558 static clib_error_t *
559 start_workers (vlib_main_t * vm)
560 {
561   int i, j;
562   vlib_worker_thread_t *w;
563   vlib_main_t *vm_clone;
564   void *oldheap;
565   vlib_frame_queue_t *fq;
566   vlib_thread_main_t *tm = &vlib_thread_main;
567   vlib_thread_registration_t *tr;
568   vlib_node_runtime_t *rt;
569   u32 n_vlib_mains = tm->n_vlib_mains;
570   u32 worker_thread_index;
571   u8 *main_heap = clib_mem_get_per_cpu_heap ();
572   mheap_t *main_heap_header = mheap_header (main_heap);
573
574   vec_reset_length (vlib_worker_threads);
575
576   /* Set up the main thread */
577   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
578   w->elog_track.name = "main thread";
579   elog_track_register (&vm->elog_main, &w->elog_track);
580
581   if (vec_len (tm->thread_prefix))
582     {
583       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
584       vlib_set_thread_name ((char *) w->name);
585     }
586
587 #if DPDK==1
588   w->dpdk_lcore_id = -1;
589   if (rte_socket_id)            /* do we really have dpdk linked */
590     {
591       unsigned lcore = rte_lcore_id ();
592       w->dpdk_lcore_id = lcore < RTE_MAX_LCORE ? lcore : -1;;
593     }
594 #endif
595
596   /*
597    * Truth of the matter: we always use at least two
598    * threads. So, make the main heap thread-safe
599    * and make the event log thread-safe.
600    */
601   main_heap_header->flags |= MHEAP_FLAG_THREAD_SAFE;
602   vm->elog_main.lock =
603     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
604   vm->elog_main.lock[0] = 0;
605
606   if (n_vlib_mains > 1)
607     {
608       vec_validate (vlib_mains, tm->n_vlib_mains - 1);
609       _vec_len (vlib_mains) = 0;
610       vec_add1 (vlib_mains, vm);
611
612       vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
613       _vec_len (vlib_frame_queues) = 0;
614       fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
615       vec_add1 (vlib_frame_queues, fq);
616
617       vlib_worker_threads->wait_at_barrier =
618         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
619       vlib_worker_threads->workers_at_barrier =
620         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
621
622       /* Ask for an initial barrier sync */
623       *vlib_worker_threads->workers_at_barrier = 0;
624       *vlib_worker_threads->wait_at_barrier = 1;
625
626       worker_thread_index = 1;
627
628       for (i = 0; i < vec_len (tm->registrations); i++)
629         {
630           vlib_node_main_t *nm, *nm_clone;
631           vlib_buffer_main_t *bm_clone;
632           vlib_buffer_free_list_t *fl_clone, *fl_orig;
633           vlib_buffer_free_list_t *orig_freelist_pool;
634           int k;
635
636           tr = tm->registrations[i];
637
638           if (tr->count == 0)
639             continue;
640
641           for (k = 0; k < tr->count; k++)
642             {
643               vec_add2 (vlib_worker_threads, w, 1);
644               if (tr->mheap_size)
645                 w->thread_mheap =
646                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
647               else
648                 w->thread_mheap = main_heap;
649               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
650               w->thread_function = tr->function;
651               w->thread_function_arg = w;
652               w->instance_id = k;
653               w->registration = tr;
654
655               w->elog_track.name =
656                 (char *) format (0, "%s %d", tr->name, k + 1);
657               vec_add1 (w->elog_track.name, 0);
658               elog_track_register (&vm->elog_main, &w->elog_track);
659
660               if (tr->no_data_structure_clone)
661                 continue;
662
663               /* Allocate "to-worker-N" frame queue */
664               if (tr->frame_queue_nelts)
665                 {
666                   fq = vlib_frame_queue_alloc (tr->frame_queue_nelts);
667                 }
668               else
669                 {
670                   fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
671                 }
672
673               vec_validate (vlib_frame_queues, worker_thread_index);
674               vlib_frame_queues[worker_thread_index] = fq;
675
676               /* Fork vlib_global_main et al. Look for bugs here */
677               oldheap = clib_mem_set_heap (w->thread_mheap);
678
679               vm_clone = clib_mem_alloc (sizeof (*vm_clone));
680               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
681
682               vm_clone->cpu_index = worker_thread_index;
683               vm_clone->heap_base = w->thread_mheap;
684               vm_clone->mbuf_alloc_list = 0;
685               memset (&vm_clone->random_buffer, 0,
686                       sizeof (vm_clone->random_buffer));
687
688               nm = &vlib_mains[0]->node_main;
689               nm_clone = &vm_clone->node_main;
690               /* fork next frames array, preserving node runtime indices */
691               nm_clone->next_frames = vec_dup (nm->next_frames);
692               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
693                 {
694                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
695                   u32 save_node_runtime_index;
696                   u32 save_flags;
697
698                   save_node_runtime_index = nf->node_runtime_index;
699                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
700                   vlib_next_frame_init (nf);
701                   nf->node_runtime_index = save_node_runtime_index;
702                   nf->flags = save_flags;
703                 }
704
705               /* fork the frame dispatch queue */
706               nm_clone->pending_frames = 0;
707               vec_validate (nm_clone->pending_frames, 10);      /* $$$$$?????? */
708               _vec_len (nm_clone->pending_frames) = 0;
709
710               /* fork nodes */
711               nm_clone->nodes = 0;
712               for (j = 0; j < vec_len (nm->nodes); j++)
713                 {
714                   vlib_node_t *n;
715                   n = clib_mem_alloc_no_fail (sizeof (*n));
716                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
717                   /* none of the copied nodes have enqueue rights given out */
718                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
719                   memset (&n->stats_total, 0, sizeof (n->stats_total));
720                   memset (&n->stats_last_clear, 0,
721                           sizeof (n->stats_last_clear));
722                   vec_add1 (nm_clone->nodes, n);
723                 }
724               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
725                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
726
727               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
728                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
729               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
730                 rt->cpu_index = vm_clone->cpu_index;
731
732               nm_clone->processes = vec_dup (nm->processes);
733
734               /* zap the (per worker) frame freelists, etc */
735               nm_clone->frame_sizes = 0;
736               nm_clone->frame_size_hash = 0;
737
738               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
739
740               clib_mem_set_heap (oldheap);
741               vec_add1 (vlib_mains, vm_clone);
742
743               vm_clone->error_main.counters =
744                 vec_dup (vlib_mains[0]->error_main.counters);
745               vm_clone->error_main.counters_last_clear =
746                 vec_dup (vlib_mains[0]->error_main.counters_last_clear);
747
748               /* Fork the vlib_buffer_main_t free lists, etc. */
749               bm_clone = vec_dup (vm_clone->buffer_main);
750               vm_clone->buffer_main = bm_clone;
751
752               orig_freelist_pool = bm_clone->buffer_free_list_pool;
753               bm_clone->buffer_free_list_pool = 0;
754
755             /* *INDENT-OFF* */
756             pool_foreach (fl_orig, orig_freelist_pool,
757                           ({
758                             pool_get_aligned (bm_clone->buffer_free_list_pool,
759                                               fl_clone, CLIB_CACHE_LINE_BYTES);
760                             ASSERT (fl_orig - orig_freelist_pool
761                                     == fl_clone - bm_clone->buffer_free_list_pool);
762
763                             fl_clone[0] = fl_orig[0];
764                             fl_clone->aligned_buffers = 0;
765                             fl_clone->unaligned_buffers = 0;
766                             fl_clone->n_alloc = 0;
767                           }));
768 /* *INDENT-ON* */
769
770               worker_thread_index++;
771             }
772         }
773     }
774   else
775     {
776       /* only have non-data-structure copy threads to create... */
777       for (i = 0; i < vec_len (tm->registrations); i++)
778         {
779           tr = tm->registrations[i];
780
781           for (j = 0; j < tr->count; j++)
782             {
783               vec_add2 (vlib_worker_threads, w, 1);
784               if (tr->mheap_size)
785                 w->thread_mheap =
786                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
787               else
788                 w->thread_mheap = main_heap;
789               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
790               w->thread_function = tr->function;
791               w->thread_function_arg = w;
792               w->instance_id = j;
793               w->elog_track.name =
794                 (char *) format (0, "%s %d", tr->name, j + 1);
795               w->registration = tr;
796               vec_add1 (w->elog_track.name, 0);
797               elog_track_register (&vm->elog_main, &w->elog_track);
798             }
799         }
800     }
801
802   worker_thread_index = 1;
803
804   for (i = 0; i < vec_len (tm->registrations); i++)
805     {
806       int j;
807
808       tr = tm->registrations[i];
809
810       if (tr->use_pthreads || tm->use_pthreads)
811         {
812           for (j = 0; j < tr->count; j++)
813             {
814               w = vlib_worker_threads + worker_thread_index++;
815               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, 0) <
816                   0)
817                 clib_warning ("Couldn't start '%s' pthread ", tr->name);
818             }
819         }
820       else
821         {
822           uword c;
823             /* *INDENT-OFF* */
824             clib_bitmap_foreach (c, tr->coremask, ({
825               w = vlib_worker_threads + worker_thread_index++;
826               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, c) < 0)
827                 clib_warning ("Couldn't start DPDK lcore %d", c);
828
829             }));
830 /* *INDENT-ON* */
831         }
832     }
833   vlib_worker_thread_barrier_sync (vm);
834   vlib_worker_thread_barrier_release (vm);
835   return 0;
836 }
837
838 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
839
840 void
841 vlib_worker_thread_node_runtime_update (void)
842 {
843   int i, j;
844   vlib_worker_thread_t *w;
845   vlib_main_t *vm;
846   vlib_node_main_t *nm, *nm_clone;
847   vlib_node_t **old_nodes_clone;
848   vlib_main_t *vm_clone;
849   vlib_node_runtime_t *rt, *old_rt;
850   void *oldheap;
851   never_inline void
852     vlib_node_runtime_sync_stats (vlib_main_t * vm,
853                                   vlib_node_runtime_t * r,
854                                   uword n_calls,
855                                   uword n_vectors, uword n_clocks);
856
857   ASSERT (os_get_cpu_number () == 0);
858
859   if (vec_len (vlib_mains) == 0)
860     return;
861
862   vm = vlib_mains[0];
863   nm = &vm->node_main;
864
865   ASSERT (os_get_cpu_number () == 0);
866   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
867
868   /*
869    * Scrape all runtime stats, so we don't lose node runtime(s) with
870    * pending counts, or throw away worker / io thread counts.
871    */
872   for (j = 0; j < vec_len (nm->nodes); j++)
873     {
874       vlib_node_t *n;
875       n = nm->nodes[j];
876       vlib_node_sync_stats (vm, n);
877     }
878
879   for (i = 1; i < vec_len (vlib_mains); i++)
880     {
881       vlib_node_t *n;
882
883       vm_clone = vlib_mains[i];
884       nm_clone = &vm_clone->node_main;
885
886       for (j = 0; j < vec_len (nm_clone->nodes); j++)
887         {
888           n = nm_clone->nodes[j];
889
890           rt = vlib_node_get_runtime (vm_clone, n->index);
891           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
892         }
893     }
894
895   for (i = 1; i < vec_len (vlib_mains); i++)
896     {
897       vlib_node_runtime_t *rt;
898       w = vlib_worker_threads + i;
899       oldheap = clib_mem_set_heap (w->thread_mheap);
900
901       vm_clone = vlib_mains[i];
902
903       /* Re-clone error heap */
904       u64 *old_counters = vm_clone->error_main.counters;
905       u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
906       clib_memcpy (&vm_clone->error_main, &vm->error_main,
907                    sizeof (vm->error_main));
908       j = vec_len (vm->error_main.counters) - 1;
909       vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
910       vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
911       vm_clone->error_main.counters = old_counters;
912       vm_clone->error_main.counters_last_clear = old_counters_all_clear;
913
914       nm_clone = &vm_clone->node_main;
915       vec_free (nm_clone->next_frames);
916       nm_clone->next_frames = vec_dup (nm->next_frames);
917
918       for (j = 0; j < vec_len (nm_clone->next_frames); j++)
919         {
920           vlib_next_frame_t *nf = &nm_clone->next_frames[j];
921           u32 save_node_runtime_index;
922           u32 save_flags;
923
924           save_node_runtime_index = nf->node_runtime_index;
925           save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
926           vlib_next_frame_init (nf);
927           nf->node_runtime_index = save_node_runtime_index;
928           nf->flags = save_flags;
929         }
930
931       old_nodes_clone = nm_clone->nodes;
932       nm_clone->nodes = 0;
933
934       /* re-fork nodes */
935       for (j = 0; j < vec_len (nm->nodes); j++)
936         {
937           vlib_node_t *old_n_clone;
938           vlib_node_t *new_n, *new_n_clone;
939
940           new_n = nm->nodes[j];
941           old_n_clone = old_nodes_clone[j];
942
943           new_n_clone = clib_mem_alloc_no_fail (sizeof (*new_n_clone));
944           clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
945           /* none of the copied nodes have enqueue rights given out */
946           new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
947
948           if (j >= vec_len (old_nodes_clone))
949             {
950               /* new node, set to zero */
951               memset (&new_n_clone->stats_total, 0,
952                       sizeof (new_n_clone->stats_total));
953               memset (&new_n_clone->stats_last_clear, 0,
954                       sizeof (new_n_clone->stats_last_clear));
955             }
956           else
957             {
958               /* Copy stats if the old data is valid */
959               clib_memcpy (&new_n_clone->stats_total,
960                            &old_n_clone->stats_total,
961                            sizeof (new_n_clone->stats_total));
962               clib_memcpy (&new_n_clone->stats_last_clear,
963                            &old_n_clone->stats_last_clear,
964                            sizeof (new_n_clone->stats_last_clear));
965
966               /* keep previous node state */
967               new_n_clone->state = old_n_clone->state;
968             }
969           vec_add1 (nm_clone->nodes, new_n_clone);
970         }
971       /* Free the old node clone */
972       for (j = 0; j < vec_len (old_nodes_clone); j++)
973         clib_mem_free (old_nodes_clone[j]);
974       vec_free (old_nodes_clone);
975
976       vec_free (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
977
978       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
979         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
980
981       /* clone input node runtime */
982       old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
983
984       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
985         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
986
987       vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
988       {
989         rt->cpu_index = vm_clone->cpu_index;
990       }
991
992       for (j = 0; j < vec_len (old_rt); j++)
993         {
994           rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
995           rt->state = old_rt[j].state;
996         }
997
998       vec_free (old_rt);
999
1000       nm_clone->processes = vec_dup (nm->processes);
1001
1002       clib_mem_set_heap (oldheap);
1003
1004       // vnet_main_fork_fixup (i);
1005     }
1006 }
1007
1008 u32
1009 unformat_sched_policy (unformat_input_t * input, va_list * args)
1010 {
1011   u32 *r = va_arg (*args, u32 *);
1012
1013   if (0);
1014 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1015   foreach_sched_policy
1016 #undef _
1017     else
1018     return 0;
1019   return 1;
1020 }
1021
1022 static clib_error_t *
1023 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1024 {
1025   vlib_thread_registration_t *tr;
1026   uword *p;
1027   vlib_thread_main_t *tm = &vlib_thread_main;
1028   u8 *name;
1029   u64 coremask;
1030   uword *bitmap;
1031   u32 count;
1032
1033   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1034
1035   tm->n_thread_stacks = 1;      /* account for main thread */
1036   tm->sched_policy = ~0;
1037   tm->sched_priority = ~0;
1038
1039   tr = tm->next;
1040
1041   while (tr)
1042     {
1043       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1044       tr = tr->next;
1045     }
1046
1047   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1048     {
1049       if (unformat (input, "use-pthreads"))
1050         tm->use_pthreads = 1;
1051       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1052         ;
1053       else if (unformat (input, "main-core %u", &tm->main_lcore))
1054         ;
1055       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1056         ;
1057       else if (unformat (input, "coremask-%s %llx", &name, &coremask))
1058         {
1059           p = hash_get_mem (tm->thread_registrations_by_name, name);
1060           if (p == 0)
1061             return clib_error_return (0, "no such thread type '%s'", name);
1062
1063           tr = (vlib_thread_registration_t *) p[0];
1064
1065           if (tr->use_pthreads)
1066             return clib_error_return (0,
1067                                       "coremask cannot be set for '%s' threads",
1068                                       name);
1069
1070           tr->coremask = clib_bitmap_set_multiple
1071             (tr->coremask, 0, coremask, BITS (coremask));
1072           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1073         }
1074       else if (unformat (input, "corelist-%s %U", &name, unformat_bitmap_list,
1075                          &bitmap))
1076         {
1077           p = hash_get_mem (tm->thread_registrations_by_name, name);
1078           if (p == 0)
1079             return clib_error_return (0, "no such thread type '%s'", name);
1080
1081           tr = (vlib_thread_registration_t *) p[0];
1082
1083           if (tr->use_pthreads)
1084             return clib_error_return (0,
1085                                       "corelist cannot be set for '%s' threads",
1086                                       name);
1087
1088           tr->coremask = bitmap;
1089           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1090         }
1091       else
1092         if (unformat
1093             (input, "scheduler-policy %U", unformat_sched_policy,
1094              &tm->sched_policy))
1095         ;
1096       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1097         ;
1098       else if (unformat (input, "%s %u", &name, &count))
1099         {
1100           p = hash_get_mem (tm->thread_registrations_by_name, name);
1101           if (p == 0)
1102             return clib_error_return (0, "no such thread type 3 '%s'", name);
1103
1104           tr = (vlib_thread_registration_t *) p[0];
1105           if (tr->fixed_count)
1106             return clib_error_return
1107               (0, "number of %s threads not configurable", tr->name);
1108           tr->count = count;
1109         }
1110       else
1111         break;
1112     }
1113
1114   if (tm->sched_priority != ~0)
1115     {
1116       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1117         {
1118           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1119           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1120           if (tm->sched_priority > prio_max)
1121             tm->sched_priority = prio_max;
1122           if (tm->sched_priority < prio_min)
1123             tm->sched_priority = prio_min;
1124         }
1125       else
1126         {
1127           return clib_error_return
1128             (0,
1129              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1130              tm->sched_priority);
1131         }
1132     }
1133   tr = tm->next;
1134
1135   if (!tm->thread_prefix)
1136     tm->thread_prefix = format (0, "vpp");
1137
1138   while (tr)
1139     {
1140       tm->n_thread_stacks += tr->count;
1141       tm->n_pthreads += tr->count * tr->use_pthreads;
1142       tm->n_eal_threads += tr->count * (tr->use_pthreads == 0);
1143       tr = tr->next;
1144     }
1145
1146   return 0;
1147 }
1148
1149 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1150
1151 #if !defined (__x86_64__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__)
1152 void
1153 __sync_fetch_and_add_8 (void)
1154 {
1155   fformat (stderr, "%s called\n", __FUNCTION__);
1156   abort ();
1157 }
1158
1159 void
1160 __sync_add_and_fetch_8 (void)
1161 {
1162   fformat (stderr, "%s called\n", __FUNCTION__);
1163   abort ();
1164 }
1165 #endif
1166
1167 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1168 void
1169 vnet_main_fixup (vlib_fork_fixup_t which)
1170 {
1171 }
1172
1173 void
1174 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1175 {
1176   vlib_main_t *vm = vlib_get_main ();
1177
1178   if (vlib_mains == 0)
1179     return;
1180
1181   ASSERT (os_get_cpu_number () == 0);
1182   vlib_worker_thread_barrier_sync (vm);
1183
1184   switch (which)
1185     {
1186     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1187       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1188       break;
1189
1190     default:
1191       ASSERT (0);
1192     }
1193   vlib_worker_thread_barrier_release (vm);
1194 }
1195
1196 void
1197 vlib_worker_thread_barrier_sync (vlib_main_t * vm)
1198 {
1199   f64 deadline;
1200   u32 count;
1201
1202   if (!vlib_mains)
1203     return;
1204
1205   count = vec_len (vlib_mains) - 1;
1206
1207   /* Tolerate recursive calls */
1208   if (++vlib_worker_threads[0].recursion_level > 1)
1209     return;
1210
1211   vlib_worker_threads[0].barrier_sync_count++;
1212
1213   ASSERT (os_get_cpu_number () == 0);
1214
1215   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1216
1217   *vlib_worker_threads->wait_at_barrier = 1;
1218   while (*vlib_worker_threads->workers_at_barrier != count)
1219     {
1220       if (vlib_time_now (vm) > deadline)
1221         {
1222           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1223           os_panic ();
1224         }
1225     }
1226 }
1227
1228 void
1229 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1230 {
1231   f64 deadline;
1232
1233   if (!vlib_mains)
1234     return;
1235
1236   if (--vlib_worker_threads[0].recursion_level > 0)
1237     return;
1238
1239   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1240
1241   *vlib_worker_threads->wait_at_barrier = 0;
1242
1243   while (*vlib_worker_threads->workers_at_barrier > 0)
1244     {
1245       if (vlib_time_now (vm) > deadline)
1246         {
1247           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1248           os_panic ();
1249         }
1250     }
1251 }
1252
1253 /*
1254  * Check the frame queue to see if any frames are available.
1255  * If so, pull the packets off the frames and put them to
1256  * the handoff node.
1257  */
1258 static inline int
1259 vlib_frame_queue_dequeue_internal (vlib_main_t * vm)
1260 {
1261   u32 thread_id = vm->cpu_index;
1262   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
1263   vlib_frame_queue_elt_t *elt;
1264   u32 *from, *to;
1265   vlib_frame_t *f;
1266   int msg_type;
1267   int processed = 0;
1268   u32 n_left_to_node;
1269   u32 vectors = 0;
1270   vlib_thread_main_t *tm = vlib_get_thread_main ();
1271
1272   ASSERT (fq);
1273   ASSERT (vm == vlib_mains[thread_id]);
1274
1275   if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
1276     return 0;
1277   /*
1278    * Gather trace data for frame queues
1279    */
1280   if (PREDICT_FALSE (fq->trace))
1281     {
1282       frame_queue_trace_t *fqt;
1283       frame_queue_nelt_counter_t *fqh;
1284       u32 elix;
1285
1286       fqt = &tm->frame_queue_traces[thread_id];
1287
1288       fqt->nelts = fq->nelts;
1289       fqt->head = fq->head;
1290       fqt->head_hint = fq->head_hint;
1291       fqt->tail = fq->tail;
1292       fqt->threshold = fq->vector_threshold;
1293       fqt->n_in_use = fqt->tail - fqt->head;
1294       if (fqt->n_in_use >= fqt->nelts)
1295         {
1296           // if beyond max then use max
1297           fqt->n_in_use = fqt->nelts - 1;
1298         }
1299
1300       /* Record the number of elements in use in the histogram */
1301       fqh = &tm->frame_queue_histogram[thread_id];
1302       fqh->count[fqt->n_in_use]++;
1303
1304       /* Record a snapshot of the elements in use */
1305       for (elix = 0; elix < fqt->nelts; elix++)
1306         {
1307           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1308           if (1 || elt->valid)
1309             {
1310               fqt->n_vectors[elix] = elt->n_vectors;
1311             }
1312         }
1313       fqt->written = 1;
1314     }
1315
1316   while (1)
1317     {
1318       if (fq->head == fq->tail)
1319         {
1320           fq->head_hint = fq->head;
1321           return processed;
1322         }
1323
1324       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1325
1326       if (!elt->valid)
1327         {
1328           fq->head_hint = fq->head;
1329           return processed;
1330         }
1331
1332       from = elt->buffer_index;
1333       msg_type = elt->msg_type;
1334
1335       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1336       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1337
1338       f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
1339
1340       to = vlib_frame_vector_args (f);
1341
1342       n_left_to_node = elt->n_vectors;
1343
1344       while (n_left_to_node >= 4)
1345         {
1346           to[0] = from[0];
1347           to[1] = from[1];
1348           to[2] = from[2];
1349           to[3] = from[3];
1350           to += 4;
1351           from += 4;
1352           n_left_to_node -= 4;
1353         }
1354
1355       while (n_left_to_node > 0)
1356         {
1357           to[0] = from[0];
1358           to++;
1359           from++;
1360           n_left_to_node--;
1361         }
1362
1363       vectors += elt->n_vectors;
1364       f->n_vectors = elt->n_vectors;
1365       vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
1366
1367       elt->valid = 0;
1368       elt->n_vectors = 0;
1369       elt->msg_type = 0xfefefefe;
1370       CLIB_MEMORY_BARRIER ();
1371       fq->head++;
1372       processed++;
1373
1374       /*
1375        * Limit the number of packets pushed into the graph
1376        */
1377       if (vectors >= fq->vector_threshold)
1378         {
1379           fq->head_hint = fq->head;
1380           return processed;
1381         }
1382     }
1383   ASSERT (0);
1384   return processed;
1385 }
1386
1387 static_always_inline void
1388 vlib_worker_thread_internal (vlib_main_t * vm)
1389 {
1390   vlib_node_main_t *nm = &vm->node_main;
1391   u64 cpu_time_now = clib_cpu_time_now ();
1392
1393   vec_alloc (nm->pending_interrupt_node_runtime_indices, 32);
1394
1395   while (1)
1396     {
1397       vlib_worker_thread_barrier_check ();
1398
1399       vlib_frame_queue_dequeue_internal (vm);
1400
1401       vlib_node_runtime_t *n;
1402       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1403       {
1404         cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
1405                                       VLIB_NODE_STATE_POLLING, /* frame */ 0,
1406                                       cpu_time_now);
1407       }
1408
1409       /* Next handle interrupts. */
1410       {
1411         uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
1412         uword i;
1413         if (l > 0)
1414           {
1415             _vec_len (nm->pending_interrupt_node_runtime_indices) = 0;
1416             for (i = 0; i < l; i++)
1417               {
1418                 n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
1419                                       nm->
1420                                       pending_interrupt_node_runtime_indices
1421                                       [i]);
1422                 cpu_time_now =
1423                   dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
1424                                  VLIB_NODE_STATE_INTERRUPT,
1425                                  /* frame */ 0,
1426                                  cpu_time_now);
1427               }
1428           }
1429       }
1430
1431       if (_vec_len (nm->pending_frames))
1432         {
1433           int i;
1434           cpu_time_now = clib_cpu_time_now ();
1435           for (i = 0; i < _vec_len (nm->pending_frames); i++)
1436             {
1437               vlib_pending_frame_t *p;
1438
1439               p = nm->pending_frames + i;
1440
1441               cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
1442             }
1443           _vec_len (nm->pending_frames) = 0;
1444         }
1445       vlib_increment_main_loop_counter (vm);
1446
1447       /* Record time stamp in case there are no enabled nodes and above
1448          calls do not update time stamp. */
1449       cpu_time_now = clib_cpu_time_now ();
1450     }
1451 }
1452
1453 void
1454 vlib_worker_thread_fn (void *arg)
1455 {
1456   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1457   vlib_thread_main_t *tm = vlib_get_thread_main ();
1458   vlib_main_t *vm = vlib_get_main ();
1459
1460   ASSERT (vm->cpu_index == os_get_cpu_number ());
1461
1462   vlib_worker_thread_init (w);
1463   clib_time_init (&vm->clib_time);
1464   clib_mem_set_heap (w->thread_mheap);
1465
1466   /* Wait until the dpdk init sequence is complete */
1467   while (tm->worker_thread_release == 0)
1468     vlib_worker_thread_barrier_check ();
1469
1470   vlib_worker_thread_internal (vm);
1471 }
1472
1473 /* *INDENT-OFF* */
1474 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1475   .name = "workers",
1476   .short_name = "wk",
1477   .function = vlib_worker_thread_fn,
1478 };
1479 /* *INDENT-ON* */
1480
1481 clib_error_t *
1482 threads_init (vlib_main_t * vm)
1483 {
1484   vlib_thread_main_t *tm = vlib_get_thread_main ();
1485
1486   tm->handoff_dispatch_node_index = ~0;
1487
1488   return 0;
1489 }
1490
1491 VLIB_INIT_FUNCTION (threads_init);
1492
1493 /*
1494  * fd.io coding-style-patch-verification: ON
1495  *
1496  * Local Variables:
1497  * eval: (c-set-style "gnu")
1498  * End:
1499  */