Refork worker thread data structures in parallel (VPP-970)
[vpp.git] / src / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vlib/vlib.h>
21
22 #include <vlib/threads.h>
23 #include <vlib/unix/cj.h>
24
25 DECLARE_CJ_GLOBAL_LOG;
26
27 #define FRAME_QUEUE_NELTS 32
28
29 u32
30 vl (void *p)
31 {
32   return vec_len (p);
33 }
34
35 vlib_worker_thread_t *vlib_worker_threads;
36 vlib_thread_main_t vlib_thread_main;
37
38 uword
39 os_get_nthreads (void)
40 {
41   u32 len;
42
43   len = vec_len (vlib_thread_stacks);
44   if (len == 0)
45     return 1;
46   else
47     return len;
48 }
49
50 void
51 vlib_set_thread_name (char *name)
52 {
53   int pthread_setname_np (pthread_t __target_thread, const char *__name);
54   int rv;
55   pthread_t thread = pthread_self ();
56
57   if (thread)
58     {
59       rv = pthread_setname_np (thread, name);
60       if (rv)
61         clib_warning ("pthread_setname_np returned %d", rv);
62     }
63 }
64
65 static int
66 sort_registrations_by_no_clone (void *a0, void *a1)
67 {
68   vlib_thread_registration_t **tr0 = a0;
69   vlib_thread_registration_t **tr1 = a1;
70
71   return ((i32) ((*tr0)->no_data_structure_clone)
72           - ((i32) ((*tr1)->no_data_structure_clone)));
73 }
74
75 static uword *
76 vlib_sysfs_list_to_bitmap (char *filename)
77 {
78   FILE *fp;
79   uword *r = 0;
80
81   fp = fopen (filename, "r");
82
83   if (fp != NULL)
84     {
85       u8 *buffer = 0;
86       vec_validate (buffer, 256 - 1);
87       if (fgets ((char *) buffer, 256, fp))
88         {
89           unformat_input_t in;
90           unformat_init_string (&in, (char *) buffer,
91                                 strlen ((char *) buffer));
92           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
93             clib_warning ("unformat_bitmap_list failed");
94           unformat_free (&in);
95         }
96       vec_free (buffer);
97       fclose (fp);
98     }
99   return r;
100 }
101
102
103 /* Called early in the init sequence */
104
105 clib_error_t *
106 vlib_thread_init (vlib_main_t * vm)
107 {
108   vlib_thread_main_t *tm = &vlib_thread_main;
109   vlib_worker_thread_t *w;
110   vlib_thread_registration_t *tr;
111   u32 n_vlib_mains = 1;
112   u32 first_index = 1;
113   u32 i;
114   uword *avail_cpu;
115
116   /* get bitmaps of active cpu cores and sockets */
117   tm->cpu_core_bitmap =
118     vlib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
119   tm->cpu_socket_bitmap =
120     vlib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
121
122   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
123
124   /* skip cores */
125   for (i = 0; i < tm->skip_cores; i++)
126     {
127       uword c = clib_bitmap_first_set (avail_cpu);
128       if (c == ~0)
129         return clib_error_return (0, "no available cpus to skip");
130
131       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
132     }
133
134   /* grab cpu for main thread */
135   if (!tm->main_lcore)
136     {
137       tm->main_lcore = clib_bitmap_first_set (avail_cpu);
138       if (tm->main_lcore == (u8) ~ 0)
139         return clib_error_return (0, "no available cpus to be used for the"
140                                   " main thread");
141     }
142   else
143     {
144       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
145         return clib_error_return (0, "cpu %u is not available to be used"
146                                   " for the main thread", tm->main_lcore);
147     }
148   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
149
150   /* assume that there is socket 0 only if there is no data from sysfs */
151   if (!tm->cpu_socket_bitmap)
152     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
153
154   /* pin main thread to main_lcore  */
155   if (tm->cb.vlib_thread_set_lcore_cb)
156     {
157       tm->cb.vlib_thread_set_lcore_cb (0, tm->main_lcore);
158     }
159   else
160     {
161       cpu_set_t cpuset;
162       CPU_ZERO (&cpuset);
163       CPU_SET (tm->main_lcore, &cpuset);
164       pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
165     }
166
167   /* as many threads as stacks... */
168   vec_validate_aligned (vlib_worker_threads, vec_len (vlib_thread_stacks) - 1,
169                         CLIB_CACHE_LINE_BYTES);
170
171   /* Preallocate thread 0 */
172   _vec_len (vlib_worker_threads) = 1;
173   w = vlib_worker_threads;
174   w->thread_mheap = clib_mem_get_heap ();
175   w->thread_stack = vlib_thread_stacks[0];
176   w->lcore_id = tm->main_lcore;
177   w->lwp = syscall (SYS_gettid);
178   w->thread_id = pthread_self ();
179   tm->n_vlib_mains = 1;
180
181   if (tm->sched_policy != ~0)
182     {
183       struct sched_param sched_param;
184       if (!sched_getparam (w->lwp, &sched_param))
185         {
186           if (tm->sched_priority != ~0)
187             sched_param.sched_priority = tm->sched_priority;
188           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
189         }
190     }
191
192   /* assign threads to cores and set n_vlib_mains */
193   tr = tm->next;
194
195   while (tr)
196     {
197       vec_add1 (tm->registrations, tr);
198       tr = tr->next;
199     }
200
201   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
202
203   for (i = 0; i < vec_len (tm->registrations); i++)
204     {
205       int j;
206       tr = tm->registrations[i];
207       tr->first_index = first_index;
208       first_index += tr->count;
209       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
210
211       /* construct coremask */
212       if (tr->use_pthreads || !tr->count)
213         continue;
214
215       if (tr->coremask)
216         {
217           uword c;
218           /* *INDENT-OFF* */
219           clib_bitmap_foreach (c, tr->coremask, ({
220             if (clib_bitmap_get(avail_cpu, c) == 0)
221               return clib_error_return (0, "cpu %u is not available to be used"
222                                         " for the '%s' thread",c, tr->name);
223
224             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
225           }));
226 /* *INDENT-ON* */
227
228         }
229       else
230         {
231           for (j = 0; j < tr->count; j++)
232             {
233               uword c = clib_bitmap_first_set (avail_cpu);
234               if (c == ~0)
235                 return clib_error_return (0,
236                                           "no available cpus to be used for"
237                                           " the '%s' thread", tr->name);
238
239               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
240               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
241             }
242         }
243     }
244
245   clib_bitmap_free (avail_cpu);
246
247   tm->n_vlib_mains = n_vlib_mains;
248
249   vec_validate_aligned (vlib_worker_threads, first_index - 1,
250                         CLIB_CACHE_LINE_BYTES);
251
252   return 0;
253 }
254
255 vlib_frame_queue_t *
256 vlib_frame_queue_alloc (int nelts)
257 {
258   vlib_frame_queue_t *fq;
259
260   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
261   memset (fq, 0, sizeof (*fq));
262   fq->nelts = nelts;
263   fq->vector_threshold = 128;   // packets
264   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
265
266   if (1)
267     {
268       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
269         fformat (stderr, "WARNING: fq->tail unaligned\n");
270       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
271         fformat (stderr, "WARNING: fq->head unaligned\n");
272       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
273         fformat (stderr, "WARNING: fq->elts unaligned\n");
274
275       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
276         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
277                  sizeof (fq->elts[0]));
278       if (nelts & (nelts - 1))
279         {
280           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
281           abort ();
282         }
283     }
284
285   return (fq);
286 }
287
288 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
289 void
290 vl_msg_api_handler_no_free (void *v)
291 {
292 }
293
294 /* Turned off, save as reference material... */
295 #if 0
296 static inline int
297 vlib_frame_queue_dequeue_internal (int thread_id,
298                                    vlib_main_t * vm, vlib_node_main_t * nm)
299 {
300   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
301   vlib_frame_queue_elt_t *elt;
302   vlib_frame_t *f;
303   vlib_pending_frame_t *p;
304   vlib_node_runtime_t *r;
305   u32 node_runtime_index;
306   int msg_type;
307   u64 before;
308   int processed = 0;
309
310   ASSERT (vm == vlib_mains[thread_id]);
311
312   while (1)
313     {
314       if (fq->head == fq->tail)
315         return processed;
316
317       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
318
319       if (!elt->valid)
320         return processed;
321
322       before = clib_cpu_time_now ();
323
324       f = elt->frame;
325       node_runtime_index = elt->node_runtime_index;
326       msg_type = elt->msg_type;
327
328       switch (msg_type)
329         {
330         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
331           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
332           /* note fallthrough... */
333         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
334           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
335                                 node_runtime_index);
336           vlib_frame_free (vm, r, f);
337           break;
338         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
339           vec_add2 (vm->node_main.pending_frames, p, 1);
340           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
341           p->node_runtime_index = elt->node_runtime_index;
342           p->frame_index = vlib_frame_index (vm, f);
343           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
344           fq->dequeue_vectors += (u64) f->n_vectors;
345           break;
346         case VLIB_FRAME_QUEUE_ELT_API_MSG:
347           vl_msg_api_handler_no_free (f);
348           break;
349         default:
350           clib_warning ("bogus frame queue message, type %d", msg_type);
351           break;
352         }
353       elt->valid = 0;
354       fq->dequeues++;
355       fq->dequeue_ticks += clib_cpu_time_now () - before;
356       CLIB_MEMORY_BARRIER ();
357       fq->head++;
358       processed++;
359     }
360   ASSERT (0);
361   return processed;
362 }
363
364 int
365 vlib_frame_queue_dequeue (int thread_id,
366                           vlib_main_t * vm, vlib_node_main_t * nm)
367 {
368   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
369 }
370
371 int
372 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
373                           u32 frame_queue_index, vlib_frame_t * frame,
374                           vlib_frame_queue_msg_type_t type)
375 {
376   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
377   vlib_frame_queue_elt_t *elt;
378   u32 save_count;
379   u64 new_tail;
380   u64 before = clib_cpu_time_now ();
381
382   ASSERT (fq);
383
384   new_tail = __sync_add_and_fetch (&fq->tail, 1);
385
386   /* Wait until a ring slot is available */
387   while (new_tail >= fq->head + fq->nelts)
388     {
389       f64 b4 = vlib_time_now_ticks (vm, before);
390       vlib_worker_thread_barrier_check (vm, b4);
391       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
392       // vlib_frame_queue_dequeue (vm->thread_index, vm, nm);
393     }
394
395   elt = fq->elts + (new_tail & (fq->nelts - 1));
396
397   /* this would be very bad... */
398   while (elt->valid)
399     {
400     }
401
402   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
403   save_count = frame->n_vectors;
404
405   elt->frame = frame;
406   elt->node_runtime_index = node_runtime_index;
407   elt->msg_type = type;
408   CLIB_MEMORY_BARRIER ();
409   elt->valid = 1;
410
411   return save_count;
412 }
413 #endif /* 0 */
414
415 /* To be called by vlib worker threads upon startup */
416 void
417 vlib_worker_thread_init (vlib_worker_thread_t * w)
418 {
419   vlib_thread_main_t *tm = vlib_get_thread_main ();
420
421   /*
422    * Note: disabling signals in worker threads as follows
423    * prevents the api post-mortem dump scheme from working
424    * {
425    *    sigset_t s;
426    *    sigfillset (&s);
427    *    pthread_sigmask (SIG_SETMASK, &s, 0);
428    *  }
429    */
430
431   clib_mem_set_heap (w->thread_mheap);
432
433   if (vec_len (tm->thread_prefix) && w->registration->short_name)
434     {
435       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
436                         w->registration->short_name, w->instance_id, '\0');
437       vlib_set_thread_name ((char *) w->name);
438     }
439
440   if (!w->registration->use_pthreads)
441     {
442
443       /* Initial barrier sync, for both worker and i/o threads */
444       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
445
446       while (*vlib_worker_threads->wait_at_barrier)
447         ;
448
449       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
450     }
451 }
452
453 void *
454 vlib_worker_thread_bootstrap_fn (void *arg)
455 {
456   void *rv;
457   vlib_worker_thread_t *w = arg;
458
459   w->lwp = syscall (SYS_gettid);
460   w->thread_id = pthread_self ();
461
462   __os_thread_index = w - vlib_worker_threads;
463
464   rv = (void *) clib_calljmp
465     ((uword (*)(uword)) w->thread_function,
466      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
467   /* NOTREACHED, we hope */
468   return rv;
469 }
470
471 static clib_error_t *
472 vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
473 {
474   vlib_thread_main_t *tm = &vlib_thread_main;
475   void *(*fp_arg) (void *) = fp;
476
477   w->lcore_id = lcore_id;
478   if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads)
479     return tm->cb.vlib_launch_thread_cb (fp, (void *) w, lcore_id);
480   else
481     {
482       pthread_t worker;
483       cpu_set_t cpuset;
484       CPU_ZERO (&cpuset);
485       CPU_SET (lcore_id, &cpuset);
486
487       if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w))
488         return clib_error_return_unix (0, "pthread_create");
489
490       if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
491         return clib_error_return_unix (0, "pthread_setaffinity_np");
492
493       return 0;
494     }
495 }
496
497 static clib_error_t *
498 start_workers (vlib_main_t * vm)
499 {
500   int i, j;
501   vlib_worker_thread_t *w;
502   vlib_main_t *vm_clone;
503   void *oldheap;
504   vlib_thread_main_t *tm = &vlib_thread_main;
505   vlib_thread_registration_t *tr;
506   vlib_node_runtime_t *rt;
507   u32 n_vlib_mains = tm->n_vlib_mains;
508   u32 worker_thread_index;
509   u8 *main_heap = clib_mem_get_per_cpu_heap ();
510   mheap_t *main_heap_header = mheap_header (main_heap);
511
512   vec_reset_length (vlib_worker_threads);
513
514   /* Set up the main thread */
515   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
516   w->elog_track.name = "main thread";
517   elog_track_register (&vm->elog_main, &w->elog_track);
518
519   if (vec_len (tm->thread_prefix))
520     {
521       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
522       vlib_set_thread_name ((char *) w->name);
523     }
524
525   /*
526    * Truth of the matter: we always use at least two
527    * threads. So, make the main heap thread-safe
528    * and make the event log thread-safe.
529    */
530   main_heap_header->flags |= MHEAP_FLAG_THREAD_SAFE;
531   vm->elog_main.lock =
532     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
533   vm->elog_main.lock[0] = 0;
534
535   if (n_vlib_mains > 1)
536     {
537       /* Replace hand-crafted length-1 vector with a real vector */
538       vlib_mains = 0;
539
540       vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
541                             CLIB_CACHE_LINE_BYTES);
542       _vec_len (vlib_mains) = 0;
543       vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
544
545       vlib_worker_threads->wait_at_barrier =
546         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
547       vlib_worker_threads->workers_at_barrier =
548         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
549
550       vlib_worker_threads->node_reforks_required =
551         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
552
553       /* Ask for an initial barrier sync */
554       *vlib_worker_threads->workers_at_barrier = 0;
555       *vlib_worker_threads->wait_at_barrier = 1;
556
557       /* Without update or refork */
558       *vlib_worker_threads->node_reforks_required = 0;
559       vm->need_vlib_worker_thread_node_runtime_update = 0;
560
561       worker_thread_index = 1;
562
563       for (i = 0; i < vec_len (tm->registrations); i++)
564         {
565           vlib_node_main_t *nm, *nm_clone;
566           vlib_buffer_main_t *bm_clone;
567           vlib_buffer_free_list_t *fl_clone, *fl_orig;
568           vlib_buffer_free_list_t *orig_freelist_pool;
569           int k;
570
571           tr = tm->registrations[i];
572
573           if (tr->count == 0)
574             continue;
575
576           for (k = 0; k < tr->count; k++)
577             {
578               vlib_node_t *n;
579
580               vec_add2 (vlib_worker_threads, w, 1);
581               if (tr->mheap_size)
582                 w->thread_mheap =
583                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
584               else
585                 w->thread_mheap = main_heap;
586
587               w->thread_stack =
588                 vlib_thread_stack_init (w - vlib_worker_threads);
589               w->thread_function = tr->function;
590               w->thread_function_arg = w;
591               w->instance_id = k;
592               w->registration = tr;
593
594               w->elog_track.name =
595                 (char *) format (0, "%s %d", tr->name, k + 1);
596               vec_add1 (w->elog_track.name, 0);
597               elog_track_register (&vm->elog_main, &w->elog_track);
598
599               if (tr->no_data_structure_clone)
600                 continue;
601
602               /* Fork vlib_global_main et al. Look for bugs here */
603               oldheap = clib_mem_set_heap (w->thread_mheap);
604
605               vm_clone = clib_mem_alloc (sizeof (*vm_clone));
606               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
607
608               vm_clone->thread_index = worker_thread_index;
609               vm_clone->heap_base = w->thread_mheap;
610               vm_clone->mbuf_alloc_list = 0;
611               vm_clone->init_functions_called =
612                 hash_create (0, /* value bytes */ 0);
613               memset (&vm_clone->random_buffer, 0,
614                       sizeof (vm_clone->random_buffer));
615
616               nm = &vlib_mains[0]->node_main;
617               nm_clone = &vm_clone->node_main;
618               /* fork next frames array, preserving node runtime indices */
619               nm_clone->next_frames = vec_dup (nm->next_frames);
620               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
621                 {
622                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
623                   u32 save_node_runtime_index;
624                   u32 save_flags;
625
626                   save_node_runtime_index = nf->node_runtime_index;
627                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
628                   vlib_next_frame_init (nf);
629                   nf->node_runtime_index = save_node_runtime_index;
630                   nf->flags = save_flags;
631                 }
632
633               /* fork the frame dispatch queue */
634               nm_clone->pending_frames = 0;
635               vec_validate (nm_clone->pending_frames, 10);      /* $$$$$?????? */
636               _vec_len (nm_clone->pending_frames) = 0;
637
638               /* fork nodes */
639               nm_clone->nodes = 0;
640
641               /* Allocate all nodes in single block for speed */
642               n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
643
644               for (j = 0; j < vec_len (nm->nodes); j++)
645                 {
646                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
647                   /* none of the copied nodes have enqueue rights given out */
648                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
649                   memset (&n->stats_total, 0, sizeof (n->stats_total));
650                   memset (&n->stats_last_clear, 0,
651                           sizeof (n->stats_last_clear));
652                   vec_add1 (nm_clone->nodes, n);
653                   n++;
654                 }
655               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
656                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
657               vec_foreach (rt,
658                            nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
659               {
660                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
661                 rt->thread_index = vm_clone->thread_index;
662                 /* copy initial runtime_data from node */
663                 if (n->runtime_data && n->runtime_data_bytes > 0)
664                   clib_memcpy (rt->runtime_data, n->runtime_data,
665                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
666                                          n->runtime_data_bytes));
667               }
668
669               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
670                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
671               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
672               {
673                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
674                 rt->thread_index = vm_clone->thread_index;
675                 /* copy initial runtime_data from node */
676                 if (n->runtime_data && n->runtime_data_bytes > 0)
677                   clib_memcpy (rt->runtime_data, n->runtime_data,
678                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
679                                          n->runtime_data_bytes));
680               }
681
682               nm_clone->processes = vec_dup (nm->processes);
683
684               /* zap the (per worker) frame freelists, etc */
685               nm_clone->frame_sizes = 0;
686               nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
687
688               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
689
690               clib_mem_set_heap (oldheap);
691               vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
692
693               vm_clone->error_main.counters =
694                 vec_dup (vlib_mains[0]->error_main.counters);
695               vm_clone->error_main.counters_last_clear =
696                 vec_dup (vlib_mains[0]->error_main.counters_last_clear);
697
698               /* Fork the vlib_buffer_main_t free lists, etc. */
699               bm_clone = vec_dup (vm_clone->buffer_main);
700               vm_clone->buffer_main = bm_clone;
701
702               orig_freelist_pool = bm_clone->buffer_free_list_pool;
703               bm_clone->buffer_free_list_pool = 0;
704
705             /* *INDENT-OFF* */
706             pool_foreach (fl_orig, orig_freelist_pool,
707                           ({
708                             pool_get_aligned (bm_clone->buffer_free_list_pool,
709                                               fl_clone, CLIB_CACHE_LINE_BYTES);
710                             ASSERT (fl_orig - orig_freelist_pool
711                                     == fl_clone - bm_clone->buffer_free_list_pool);
712
713                             fl_clone[0] = fl_orig[0];
714                             fl_clone->buffers = 0;
715                             fl_clone->n_alloc = 0;
716                           }));
717 /* *INDENT-ON* */
718
719               worker_thread_index++;
720             }
721         }
722     }
723   else
724     {
725       /* only have non-data-structure copy threads to create... */
726       for (i = 0; i < vec_len (tm->registrations); i++)
727         {
728           tr = tm->registrations[i];
729
730           for (j = 0; j < tr->count; j++)
731             {
732               vec_add2 (vlib_worker_threads, w, 1);
733               if (tr->mheap_size)
734                 w->thread_mheap =
735                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
736               else
737                 w->thread_mheap = main_heap;
738               w->thread_stack =
739                 vlib_thread_stack_init (w - vlib_worker_threads);
740               w->thread_function = tr->function;
741               w->thread_function_arg = w;
742               w->instance_id = j;
743               w->elog_track.name =
744                 (char *) format (0, "%s %d", tr->name, j + 1);
745               w->registration = tr;
746               vec_add1 (w->elog_track.name, 0);
747               elog_track_register (&vm->elog_main, &w->elog_track);
748             }
749         }
750     }
751
752   worker_thread_index = 1;
753
754   for (i = 0; i < vec_len (tm->registrations); i++)
755     {
756       clib_error_t *err;
757       int j;
758
759       tr = tm->registrations[i];
760
761       if (tr->use_pthreads || tm->use_pthreads)
762         {
763           for (j = 0; j < tr->count; j++)
764             {
765               w = vlib_worker_threads + worker_thread_index++;
766               err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
767                                             w, 0);
768               if (err)
769                 clib_error_report (err);
770             }
771         }
772       else
773         {
774           uword c;
775           /* *INDENT-OFF* */
776           clib_bitmap_foreach (c, tr->coremask, ({
777             w = vlib_worker_threads + worker_thread_index++;
778             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
779                                           w, c);
780             if (err)
781               clib_error_report (err);
782           }));
783           /* *INDENT-ON* */
784         }
785     }
786   vlib_worker_thread_barrier_sync (vm);
787   vlib_worker_thread_barrier_release (vm);
788   return 0;
789 }
790
791 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
792
793 static inline void
794 worker_thread_node_runtime_update_internal (void)
795 {
796   int i, j;
797   vlib_main_t *vm;
798   vlib_node_main_t *nm, *nm_clone;
799   vlib_main_t *vm_clone;
800   vlib_node_runtime_t *rt;
801   never_inline void
802     vlib_node_runtime_sync_stats (vlib_main_t * vm,
803                                   vlib_node_runtime_t * r,
804                                   uword n_calls,
805                                   uword n_vectors, uword n_clocks);
806
807   ASSERT (vlib_get_thread_index () == 0);
808
809   vm = vlib_mains[0];
810   nm = &vm->node_main;
811
812   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
813
814   /*
815    * Scrape all runtime stats, so we don't lose node runtime(s) with
816    * pending counts, or throw away worker / io thread counts.
817    */
818   for (j = 0; j < vec_len (nm->nodes); j++)
819     {
820       vlib_node_t *n;
821       n = nm->nodes[j];
822       vlib_node_sync_stats (vm, n);
823     }
824
825   for (i = 1; i < vec_len (vlib_mains); i++)
826     {
827       vlib_node_t *n;
828
829       vm_clone = vlib_mains[i];
830       nm_clone = &vm_clone->node_main;
831
832       for (j = 0; j < vec_len (nm_clone->nodes); j++)
833         {
834           n = nm_clone->nodes[j];
835
836           rt = vlib_node_get_runtime (vm_clone, n->index);
837           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
838         }
839     }
840
841   /* Per-worker clone rebuilds are now done on each thread */
842 }
843
844
845 void
846 vlib_worker_thread_node_refork (void)
847 {
848   vlib_main_t *vm, *vm_clone;
849   vlib_node_main_t *nm, *nm_clone;
850   vlib_node_t **old_nodes_clone;
851   vlib_node_runtime_t *rt, *old_rt;
852
853   vlib_node_t *new_n_clone;
854
855   int j;
856
857   vm = vlib_mains[0];
858   nm = &vm->node_main;
859   vm_clone = vlib_get_main ();
860   nm_clone = &vm_clone->node_main;
861
862   /* Re-clone error heap */
863   u64 *old_counters = vm_clone->error_main.counters;
864   u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
865
866   clib_memcpy (&vm_clone->error_main, &vm->error_main,
867                sizeof (vm->error_main));
868   j = vec_len (vm->error_main.counters) - 1;
869   vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
870   vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
871   vm_clone->error_main.counters = old_counters;
872   vm_clone->error_main.counters_last_clear = old_counters_all_clear;
873
874   nm_clone = &vm_clone->node_main;
875   vec_free (nm_clone->next_frames);
876   nm_clone->next_frames = vec_dup (nm->next_frames);
877
878   for (j = 0; j < vec_len (nm_clone->next_frames); j++)
879     {
880       vlib_next_frame_t *nf = &nm_clone->next_frames[j];
881       u32 save_node_runtime_index;
882       u32 save_flags;
883
884       save_node_runtime_index = nf->node_runtime_index;
885       save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
886       vlib_next_frame_init (nf);
887       nf->node_runtime_index = save_node_runtime_index;
888       nf->flags = save_flags;
889     }
890
891   old_nodes_clone = nm_clone->nodes;
892   nm_clone->nodes = 0;
893
894   /* re-fork nodes */
895
896   /* Allocate all nodes in single block for speed */
897   new_n_clone =
898     clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone));
899   for (j = 0; j < vec_len (nm->nodes); j++)
900     {
901       vlib_node_t *old_n_clone;
902       vlib_node_t *new_n;
903
904       new_n = nm->nodes[j];
905       old_n_clone = old_nodes_clone[j];
906
907       clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
908       /* none of the copied nodes have enqueue rights given out */
909       new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
910
911       if (j >= vec_len (old_nodes_clone))
912         {
913           /* new node, set to zero */
914           memset (&new_n_clone->stats_total, 0,
915                   sizeof (new_n_clone->stats_total));
916           memset (&new_n_clone->stats_last_clear, 0,
917                   sizeof (new_n_clone->stats_last_clear));
918         }
919       else
920         {
921           /* Copy stats if the old data is valid */
922           clib_memcpy (&new_n_clone->stats_total,
923                        &old_n_clone->stats_total,
924                        sizeof (new_n_clone->stats_total));
925           clib_memcpy (&new_n_clone->stats_last_clear,
926                        &old_n_clone->stats_last_clear,
927                        sizeof (new_n_clone->stats_last_clear));
928
929           /* keep previous node state */
930           new_n_clone->state = old_n_clone->state;
931         }
932       vec_add1 (nm_clone->nodes, new_n_clone);
933       new_n_clone++;
934     }
935   /* Free the old node clones */
936   clib_mem_free (old_nodes_clone[0]);
937
938   vec_free (old_nodes_clone);
939
940
941   /* re-clone internal nodes */
942   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
943   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
944     vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
945
946   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
947   {
948     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
949     rt->thread_index = vm_clone->thread_index;
950     /* copy runtime_data, will be overwritten later for existing rt */
951     if (n->runtime_data && n->runtime_data_bytes > 0)
952       clib_memcpy (rt->runtime_data, n->runtime_data,
953                    clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
954                              n->runtime_data_bytes));
955   }
956
957   for (j = 0; j < vec_len (old_rt); j++)
958     {
959       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
960       rt->state = old_rt[j].state;
961       clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
962                    VLIB_NODE_RUNTIME_DATA_SIZE);
963     }
964
965   vec_free (old_rt);
966
967   /* re-clone input nodes */
968   old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
969   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
970     vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
971
972   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
973   {
974     vlib_node_t *n = vlib_get_node (vm, rt->node_index);
975     rt->thread_index = vm_clone->thread_index;
976     /* copy runtime_data, will be overwritten later for existing rt */
977     if (n->runtime_data && n->runtime_data_bytes > 0)
978       clib_memcpy (rt->runtime_data, n->runtime_data,
979                    clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
980                              n->runtime_data_bytes));
981   }
982
983   for (j = 0; j < vec_len (old_rt); j++)
984     {
985       rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
986       rt->state = old_rt[j].state;
987       clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
988                    VLIB_NODE_RUNTIME_DATA_SIZE);
989     }
990
991   vec_free (old_rt);
992
993   nm_clone->processes = vec_dup (nm->processes);
994 }
995
996
997 void
998 vlib_worker_thread_node_runtime_update (void)
999 {
1000   /*
1001    * Make a note that we need to do a node runtime update
1002    * prior to releasing the barrier.
1003    */
1004   vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1;
1005 }
1006
1007 u32
1008 unformat_sched_policy (unformat_input_t * input, va_list * args)
1009 {
1010   u32 *r = va_arg (*args, u32 *);
1011
1012   if (0);
1013 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
1014   foreach_sched_policy
1015 #undef _
1016     else
1017     return 0;
1018   return 1;
1019 }
1020
1021 static clib_error_t *
1022 cpu_config (vlib_main_t * vm, unformat_input_t * input)
1023 {
1024   vlib_thread_registration_t *tr;
1025   uword *p;
1026   vlib_thread_main_t *tm = &vlib_thread_main;
1027   u8 *name;
1028   u64 coremask;
1029   uword *bitmap;
1030   u32 count;
1031
1032   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1033
1034   tm->n_thread_stacks = 1;      /* account for main thread */
1035   tm->sched_policy = ~0;
1036   tm->sched_priority = ~0;
1037
1038   tr = tm->next;
1039
1040   while (tr)
1041     {
1042       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1043       tr = tr->next;
1044     }
1045
1046   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1047     {
1048       if (unformat (input, "use-pthreads"))
1049         tm->use_pthreads = 1;
1050       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1051         ;
1052       else if (unformat (input, "main-core %u", &tm->main_lcore))
1053         ;
1054       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1055         ;
1056       else if (unformat (input, "coremask-%s %llx", &name, &coremask))
1057         {
1058           p = hash_get_mem (tm->thread_registrations_by_name, name);
1059           if (p == 0)
1060             return clib_error_return (0, "no such thread type '%s'", name);
1061
1062           tr = (vlib_thread_registration_t *) p[0];
1063
1064           if (tr->use_pthreads)
1065             return clib_error_return (0,
1066                                       "coremask cannot be set for '%s' threads",
1067                                       name);
1068
1069           tr->coremask = clib_bitmap_set_multiple
1070             (tr->coremask, 0, coremask, BITS (coremask));
1071           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1072         }
1073       else if (unformat (input, "corelist-%s %U", &name, unformat_bitmap_list,
1074                          &bitmap))
1075         {
1076           p = hash_get_mem (tm->thread_registrations_by_name, name);
1077           if (p == 0)
1078             return clib_error_return (0, "no such thread type '%s'", name);
1079
1080           tr = (vlib_thread_registration_t *) p[0];
1081
1082           if (tr->use_pthreads)
1083             return clib_error_return (0,
1084                                       "corelist cannot be set for '%s' threads",
1085                                       name);
1086
1087           tr->coremask = bitmap;
1088           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1089         }
1090       else
1091         if (unformat
1092             (input, "scheduler-policy %U", unformat_sched_policy,
1093              &tm->sched_policy))
1094         ;
1095       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1096         ;
1097       else if (unformat (input, "%s %u", &name, &count))
1098         {
1099           p = hash_get_mem (tm->thread_registrations_by_name, name);
1100           if (p == 0)
1101             return clib_error_return (0, "no such thread type 3 '%s'", name);
1102
1103           tr = (vlib_thread_registration_t *) p[0];
1104           if (tr->fixed_count)
1105             return clib_error_return
1106               (0, "number of %s threads not configurable", tr->name);
1107           tr->count = count;
1108         }
1109       else
1110         break;
1111     }
1112
1113   if (tm->sched_priority != ~0)
1114     {
1115       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1116         {
1117           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1118           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1119           if (tm->sched_priority > prio_max)
1120             tm->sched_priority = prio_max;
1121           if (tm->sched_priority < prio_min)
1122             tm->sched_priority = prio_min;
1123         }
1124       else
1125         {
1126           return clib_error_return
1127             (0,
1128              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1129              tm->sched_priority);
1130         }
1131     }
1132   tr = tm->next;
1133
1134   if (!tm->thread_prefix)
1135     tm->thread_prefix = format (0, "vpp");
1136
1137   while (tr)
1138     {
1139       tm->n_thread_stacks += tr->count;
1140       tm->n_pthreads += tr->count * tr->use_pthreads;
1141       tm->n_threads += tr->count * (tr->use_pthreads == 0);
1142       tr = tr->next;
1143     }
1144
1145   return 0;
1146 }
1147
1148 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1149
1150 #if !defined (__x86_64__) && !defined (__i386__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__)
1151 void
1152 __sync_fetch_and_add_8 (void)
1153 {
1154   fformat (stderr, "%s called\n", __FUNCTION__);
1155   abort ();
1156 }
1157
1158 void
1159 __sync_add_and_fetch_8 (void)
1160 {
1161   fformat (stderr, "%s called\n", __FUNCTION__);
1162   abort ();
1163 }
1164 #endif
1165
1166 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1167 void
1168 vnet_main_fixup (vlib_fork_fixup_t which)
1169 {
1170 }
1171
1172 void
1173 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1174 {
1175   vlib_main_t *vm = vlib_get_main ();
1176
1177   if (vlib_mains == 0)
1178     return;
1179
1180   ASSERT (vlib_get_thread_index () == 0);
1181   vlib_worker_thread_barrier_sync (vm);
1182
1183   switch (which)
1184     {
1185     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1186       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1187       break;
1188
1189     default:
1190       ASSERT (0);
1191     }
1192   vlib_worker_thread_barrier_release (vm);
1193 }
1194
1195 void
1196 vlib_worker_thread_barrier_sync (vlib_main_t * vm)
1197 {
1198   f64 deadline;
1199   u32 count;
1200
1201   if (vec_len (vlib_mains) < 2)
1202     return;
1203
1204   ASSERT (vlib_get_thread_index () == 0);
1205
1206   count = vec_len (vlib_mains) - 1;
1207
1208   /* Tolerate recursive calls */
1209   if (++vlib_worker_threads[0].recursion_level > 1)
1210     return;
1211
1212   vlib_worker_threads[0].barrier_sync_count++;
1213
1214   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1215
1216   *vlib_worker_threads->wait_at_barrier = 1;
1217   while (*vlib_worker_threads->workers_at_barrier != count)
1218     {
1219       if (vlib_time_now (vm) > deadline)
1220         {
1221           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1222           os_panic ();
1223         }
1224     }
1225 }
1226
1227 void
1228 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1229 {
1230   f64 deadline;
1231   int refork_needed = 0;
1232
1233   if (vec_len (vlib_mains) < 2)
1234     return;
1235
1236   ASSERT (vlib_get_thread_index () == 0);
1237
1238   if (--vlib_worker_threads[0].recursion_level > 0)
1239     return;
1240
1241   /* Update (all) node runtimes before releasing the barrier, if needed */
1242   if (vm->need_vlib_worker_thread_node_runtime_update)
1243     {
1244       /* Do stats elements on main thread */
1245       worker_thread_node_runtime_update_internal ();
1246       vm->need_vlib_worker_thread_node_runtime_update = 0;
1247
1248       /* Do per thread rebuilds in parallel */
1249       refork_needed = 1;
1250       clib_smp_atomic_add (vlib_worker_threads->node_reforks_required,
1251                            (vec_len (vlib_mains) - 1));
1252     }
1253
1254   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1255
1256   *vlib_worker_threads->wait_at_barrier = 0;
1257
1258   while (*vlib_worker_threads->workers_at_barrier > 0)
1259     {
1260       if (vlib_time_now (vm) > deadline)
1261         {
1262           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1263           os_panic ();
1264         }
1265     }
1266
1267   /* Wait for reforks before continuing */
1268   if (refork_needed)
1269     {
1270       deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1271
1272       while (*vlib_worker_threads->node_reforks_required > 0)
1273         {
1274           if (vlib_time_now (vm) > deadline)
1275             {
1276               fformat (stderr, "%s: worker thread refork deadlock\n",
1277                        __FUNCTION__);
1278               os_panic ();
1279             }
1280         }
1281     }
1282 }
1283
1284 /*
1285  * Check the frame queue to see if any frames are available.
1286  * If so, pull the packets off the frames and put them to
1287  * the handoff node.
1288  */
1289 int
1290 vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm)
1291 {
1292   u32 thread_id = vm->thread_index;
1293   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
1294   vlib_frame_queue_elt_t *elt;
1295   u32 *from, *to;
1296   vlib_frame_t *f;
1297   int msg_type;
1298   int processed = 0;
1299   u32 n_left_to_node;
1300   u32 vectors = 0;
1301
1302   ASSERT (fq);
1303   ASSERT (vm == vlib_mains[thread_id]);
1304
1305   if (PREDICT_FALSE (fqm->node_index == ~0))
1306     return 0;
1307   /*
1308    * Gather trace data for frame queues
1309    */
1310   if (PREDICT_FALSE (fq->trace))
1311     {
1312       frame_queue_trace_t *fqt;
1313       frame_queue_nelt_counter_t *fqh;
1314       u32 elix;
1315
1316       fqt = &fqm->frame_queue_traces[thread_id];
1317
1318       fqt->nelts = fq->nelts;
1319       fqt->head = fq->head;
1320       fqt->head_hint = fq->head_hint;
1321       fqt->tail = fq->tail;
1322       fqt->threshold = fq->vector_threshold;
1323       fqt->n_in_use = fqt->tail - fqt->head;
1324       if (fqt->n_in_use >= fqt->nelts)
1325         {
1326           // if beyond max then use max
1327           fqt->n_in_use = fqt->nelts - 1;
1328         }
1329
1330       /* Record the number of elements in use in the histogram */
1331       fqh = &fqm->frame_queue_histogram[thread_id];
1332       fqh->count[fqt->n_in_use]++;
1333
1334       /* Record a snapshot of the elements in use */
1335       for (elix = 0; elix < fqt->nelts; elix++)
1336         {
1337           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1338           if (1 || elt->valid)
1339             {
1340               fqt->n_vectors[elix] = elt->n_vectors;
1341             }
1342         }
1343       fqt->written = 1;
1344     }
1345
1346   while (1)
1347     {
1348       if (fq->head == fq->tail)
1349         {
1350           fq->head_hint = fq->head;
1351           return processed;
1352         }
1353
1354       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1355
1356       if (!elt->valid)
1357         {
1358           fq->head_hint = fq->head;
1359           return processed;
1360         }
1361
1362       from = elt->buffer_index;
1363       msg_type = elt->msg_type;
1364
1365       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1366       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1367
1368       f = vlib_get_frame_to_node (vm, fqm->node_index);
1369
1370       to = vlib_frame_vector_args (f);
1371
1372       n_left_to_node = elt->n_vectors;
1373
1374       while (n_left_to_node >= 4)
1375         {
1376           to[0] = from[0];
1377           to[1] = from[1];
1378           to[2] = from[2];
1379           to[3] = from[3];
1380           to += 4;
1381           from += 4;
1382           n_left_to_node -= 4;
1383         }
1384
1385       while (n_left_to_node > 0)
1386         {
1387           to[0] = from[0];
1388           to++;
1389           from++;
1390           n_left_to_node--;
1391         }
1392
1393       vectors += elt->n_vectors;
1394       f->n_vectors = elt->n_vectors;
1395       vlib_put_frame_to_node (vm, fqm->node_index, f);
1396
1397       elt->valid = 0;
1398       elt->n_vectors = 0;
1399       elt->msg_type = 0xfefefefe;
1400       CLIB_MEMORY_BARRIER ();
1401       fq->head++;
1402       processed++;
1403
1404       /*
1405        * Limit the number of packets pushed into the graph
1406        */
1407       if (vectors >= fq->vector_threshold)
1408         {
1409           fq->head_hint = fq->head;
1410           return processed;
1411         }
1412     }
1413   ASSERT (0);
1414   return processed;
1415 }
1416
1417 void
1418 vlib_worker_thread_fn (void *arg)
1419 {
1420   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1421   vlib_thread_main_t *tm = vlib_get_thread_main ();
1422   vlib_main_t *vm = vlib_get_main ();
1423   clib_error_t *e;
1424
1425   ASSERT (vm->thread_index == vlib_get_thread_index ());
1426
1427   vlib_worker_thread_init (w);
1428   clib_time_init (&vm->clib_time);
1429   clib_mem_set_heap (w->thread_mheap);
1430
1431   /* Wait until the dpdk init sequence is complete */
1432   while (tm->extern_thread_mgmt && tm->worker_thread_release == 0)
1433     vlib_worker_thread_barrier_check ();
1434
1435   e = vlib_call_init_exit_functions
1436     (vm, vm->worker_init_function_registrations, 1 /* call_once */ );
1437   if (e)
1438     clib_error_report (e);
1439
1440   vlib_worker_loop (vm);
1441 }
1442
1443 /* *INDENT-OFF* */
1444 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1445   .name = "workers",
1446   .short_name = "wk",
1447   .function = vlib_worker_thread_fn,
1448 };
1449 /* *INDENT-ON* */
1450
1451 u32
1452 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
1453 {
1454   vlib_thread_main_t *tm = vlib_get_thread_main ();
1455   vlib_frame_queue_main_t *fqm;
1456   vlib_frame_queue_t *fq;
1457   int i;
1458
1459   if (frame_queue_nelts == 0)
1460     frame_queue_nelts = FRAME_QUEUE_NELTS;
1461
1462   vec_add2 (tm->frame_queue_mains, fqm, 1);
1463
1464   fqm->node_index = node_index;
1465
1466   vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
1467   _vec_len (fqm->vlib_frame_queues) = 0;
1468   for (i = 0; i < tm->n_vlib_mains; i++)
1469     {
1470       fq = vlib_frame_queue_alloc (frame_queue_nelts);
1471       vec_add1 (fqm->vlib_frame_queues, fq);
1472     }
1473
1474   return (fqm - tm->frame_queue_mains);
1475 }
1476
1477
1478 int
1479 vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb)
1480 {
1481   vlib_thread_main_t *tm = vlib_get_thread_main ();
1482
1483   if (tm->extern_thread_mgmt)
1484     return -1;
1485
1486   tm->cb.vlib_launch_thread_cb = cb->vlib_launch_thread_cb;
1487   tm->extern_thread_mgmt = 1;
1488   return 0;
1489 }
1490
1491 clib_error_t *
1492 threads_init (vlib_main_t * vm)
1493 {
1494   return 0;
1495 }
1496
1497 VLIB_INIT_FUNCTION (threads_init);
1498
1499 /*
1500  * fd.io coding-style-patch-verification: ON
1501  *
1502  * Local Variables:
1503  * eval: (c-set-style "gnu")
1504  * End:
1505  */