completelly deprecate os_get_cpu_number, replace new occurences
[vpp.git] / src / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16
17 #include <signal.h>
18 #include <math.h>
19 #include <vppinfra/format.h>
20 #include <vlib/vlib.h>
21
22 #include <vlib/threads.h>
23 #include <vlib/unix/cj.h>
24
25 DECLARE_CJ_GLOBAL_LOG;
26
27 #define FRAME_QUEUE_NELTS 32
28
29 u32
30 vl (void *p)
31 {
32   return vec_len (p);
33 }
34
35 vlib_worker_thread_t *vlib_worker_threads;
36 vlib_thread_main_t vlib_thread_main;
37
38 uword
39 os_get_nthreads (void)
40 {
41   u32 len;
42
43   len = vec_len (vlib_thread_stacks);
44   if (len == 0)
45     return 1;
46   else
47     return len;
48 }
49
50 void
51 vlib_set_thread_name (char *name)
52 {
53   int pthread_setname_np (pthread_t __target_thread, const char *__name);
54   int rv;
55   pthread_t thread = pthread_self ();
56
57   if (thread)
58     {
59       rv = pthread_setname_np (thread, name);
60       if (rv)
61         clib_warning ("pthread_setname_np returned %d", rv);
62     }
63 }
64
65 static int
66 sort_registrations_by_no_clone (void *a0, void *a1)
67 {
68   vlib_thread_registration_t **tr0 = a0;
69   vlib_thread_registration_t **tr1 = a1;
70
71   return ((i32) ((*tr0)->no_data_structure_clone)
72           - ((i32) ((*tr1)->no_data_structure_clone)));
73 }
74
75 static uword *
76 vlib_sysfs_list_to_bitmap (char *filename)
77 {
78   FILE *fp;
79   uword *r = 0;
80
81   fp = fopen (filename, "r");
82
83   if (fp != NULL)
84     {
85       u8 *buffer = 0;
86       vec_validate (buffer, 256 - 1);
87       if (fgets ((char *) buffer, 256, fp))
88         {
89           unformat_input_t in;
90           unformat_init_string (&in, (char *) buffer,
91                                 strlen ((char *) buffer));
92           if (unformat (&in, "%U", unformat_bitmap_list, &r) != 1)
93             clib_warning ("unformat_bitmap_list failed");
94           unformat_free (&in);
95         }
96       vec_free (buffer);
97       fclose (fp);
98     }
99   return r;
100 }
101
102
103 /* Called early in the init sequence */
104
105 clib_error_t *
106 vlib_thread_init (vlib_main_t * vm)
107 {
108   vlib_thread_main_t *tm = &vlib_thread_main;
109   vlib_worker_thread_t *w;
110   vlib_thread_registration_t *tr;
111   u32 n_vlib_mains = 1;
112   u32 first_index = 1;
113   u32 i;
114   uword *avail_cpu;
115
116   /* get bitmaps of active cpu cores and sockets */
117   tm->cpu_core_bitmap =
118     vlib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online");
119   tm->cpu_socket_bitmap =
120     vlib_sysfs_list_to_bitmap ("/sys/devices/system/node/online");
121
122   avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap);
123
124   /* skip cores */
125   for (i = 0; i < tm->skip_cores; i++)
126     {
127       uword c = clib_bitmap_first_set (avail_cpu);
128       if (c == ~0)
129         return clib_error_return (0, "no available cpus to skip");
130
131       avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
132     }
133
134   /* grab cpu for main thread */
135   if (!tm->main_lcore)
136     {
137       tm->main_lcore = clib_bitmap_first_set (avail_cpu);
138       if (tm->main_lcore == (u8) ~ 0)
139         return clib_error_return (0, "no available cpus to be used for the"
140                                   " main thread");
141     }
142   else
143     {
144       if (clib_bitmap_get (avail_cpu, tm->main_lcore) == 0)
145         return clib_error_return (0, "cpu %u is not available to be used"
146                                   " for the main thread", tm->main_lcore);
147     }
148   avail_cpu = clib_bitmap_set (avail_cpu, tm->main_lcore, 0);
149
150   /* assume that there is socket 0 only if there is no data from sysfs */
151   if (!tm->cpu_socket_bitmap)
152     tm->cpu_socket_bitmap = clib_bitmap_set (0, 0, 1);
153
154   /* pin main thread to main_lcore  */
155   if (tm->cb.vlib_thread_set_lcore_cb)
156     {
157       tm->cb.vlib_thread_set_lcore_cb (0, tm->main_lcore);
158     }
159   else
160     {
161       cpu_set_t cpuset;
162       CPU_ZERO (&cpuset);
163       CPU_SET (tm->main_lcore, &cpuset);
164       pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
165     }
166
167   /* as many threads as stacks... */
168   vec_validate_aligned (vlib_worker_threads, vec_len (vlib_thread_stacks) - 1,
169                         CLIB_CACHE_LINE_BYTES);
170
171   /* Preallocate thread 0 */
172   _vec_len (vlib_worker_threads) = 1;
173   w = vlib_worker_threads;
174   w->thread_mheap = clib_mem_get_heap ();
175   w->thread_stack = vlib_thread_stacks[0];
176   w->lcore_id = tm->main_lcore;
177   w->lwp = syscall (SYS_gettid);
178   w->thread_id = pthread_self ();
179   tm->n_vlib_mains = 1;
180
181   if (tm->sched_policy != ~0)
182     {
183       struct sched_param sched_param;
184       if (!sched_getparam (w->lwp, &sched_param))
185         {
186           if (tm->sched_priority != ~0)
187             sched_param.sched_priority = tm->sched_priority;
188           sched_setscheduler (w->lwp, tm->sched_policy, &sched_param);
189         }
190     }
191
192   /* assign threads to cores and set n_vlib_mains */
193   tr = tm->next;
194
195   while (tr)
196     {
197       vec_add1 (tm->registrations, tr);
198       tr = tr->next;
199     }
200
201   vec_sort_with_function (tm->registrations, sort_registrations_by_no_clone);
202
203   for (i = 0; i < vec_len (tm->registrations); i++)
204     {
205       int j;
206       tr = tm->registrations[i];
207       tr->first_index = first_index;
208       first_index += tr->count;
209       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
210
211       /* construct coremask */
212       if (tr->use_pthreads || !tr->count)
213         continue;
214
215       if (tr->coremask)
216         {
217           uword c;
218           /* *INDENT-OFF* */
219           clib_bitmap_foreach (c, tr->coremask, ({
220             if (clib_bitmap_get(avail_cpu, c) == 0)
221               return clib_error_return (0, "cpu %u is not available to be used"
222                                         " for the '%s' thread",c, tr->name);
223
224             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
225           }));
226 /* *INDENT-ON* */
227
228         }
229       else
230         {
231           for (j = 0; j < tr->count; j++)
232             {
233               uword c = clib_bitmap_first_set (avail_cpu);
234               if (c == ~0)
235                 return clib_error_return (0,
236                                           "no available cpus to be used for"
237                                           " the '%s' thread", tr->name);
238
239               avail_cpu = clib_bitmap_set (avail_cpu, c, 0);
240               tr->coremask = clib_bitmap_set (tr->coremask, c, 1);
241             }
242         }
243     }
244
245   clib_bitmap_free (avail_cpu);
246
247   tm->n_vlib_mains = n_vlib_mains;
248
249   vec_validate_aligned (vlib_worker_threads, first_index - 1,
250                         CLIB_CACHE_LINE_BYTES);
251
252   return 0;
253 }
254
255 vlib_frame_queue_t *
256 vlib_frame_queue_alloc (int nelts)
257 {
258   vlib_frame_queue_t *fq;
259
260   fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES);
261   memset (fq, 0, sizeof (*fq));
262   fq->nelts = nelts;
263   fq->vector_threshold = 128;   // packets
264   vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES);
265
266   if (1)
267     {
268       if (((uword) & fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
269         fformat (stderr, "WARNING: fq->tail unaligned\n");
270       if (((uword) & fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
271         fformat (stderr, "WARNING: fq->head unaligned\n");
272       if (((uword) fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
273         fformat (stderr, "WARNING: fq->elts unaligned\n");
274
275       if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
276         fformat (stderr, "WARNING: fq->elts[0] size %d\n",
277                  sizeof (fq->elts[0]));
278       if (nelts & (nelts - 1))
279         {
280           fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
281           abort ();
282         }
283     }
284
285   return (fq);
286 }
287
288 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
289 void
290 vl_msg_api_handler_no_free (void *v)
291 {
292 }
293
294 /* Turned off, save as reference material... */
295 #if 0
296 static inline int
297 vlib_frame_queue_dequeue_internal (int thread_id,
298                                    vlib_main_t * vm, vlib_node_main_t * nm)
299 {
300   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
301   vlib_frame_queue_elt_t *elt;
302   vlib_frame_t *f;
303   vlib_pending_frame_t *p;
304   vlib_node_runtime_t *r;
305   u32 node_runtime_index;
306   int msg_type;
307   u64 before;
308   int processed = 0;
309
310   ASSERT (vm == vlib_mains[thread_id]);
311
312   while (1)
313     {
314       if (fq->head == fq->tail)
315         return processed;
316
317       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
318
319       if (!elt->valid)
320         return processed;
321
322       before = clib_cpu_time_now ();
323
324       f = elt->frame;
325       node_runtime_index = elt->node_runtime_index;
326       msg_type = elt->msg_type;
327
328       switch (msg_type)
329         {
330         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
331           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
332           /* note fallthrough... */
333         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
334           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
335                                 node_runtime_index);
336           vlib_frame_free (vm, r, f);
337           break;
338         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
339           vec_add2 (vm->node_main.pending_frames, p, 1);
340           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
341           p->node_runtime_index = elt->node_runtime_index;
342           p->frame_index = vlib_frame_index (vm, f);
343           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
344           fq->dequeue_vectors += (u64) f->n_vectors;
345           break;
346         case VLIB_FRAME_QUEUE_ELT_API_MSG:
347           vl_msg_api_handler_no_free (f);
348           break;
349         default:
350           clib_warning ("bogus frame queue message, type %d", msg_type);
351           break;
352         }
353       elt->valid = 0;
354       fq->dequeues++;
355       fq->dequeue_ticks += clib_cpu_time_now () - before;
356       CLIB_MEMORY_BARRIER ();
357       fq->head++;
358       processed++;
359     }
360   ASSERT (0);
361   return processed;
362 }
363
364 int
365 vlib_frame_queue_dequeue (int thread_id,
366                           vlib_main_t * vm, vlib_node_main_t * nm)
367 {
368   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
369 }
370
371 int
372 vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index,
373                           u32 frame_queue_index, vlib_frame_t * frame,
374                           vlib_frame_queue_msg_type_t type)
375 {
376   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
377   vlib_frame_queue_elt_t *elt;
378   u32 save_count;
379   u64 new_tail;
380   u64 before = clib_cpu_time_now ();
381
382   ASSERT (fq);
383
384   new_tail = __sync_add_and_fetch (&fq->tail, 1);
385
386   /* Wait until a ring slot is available */
387   while (new_tail >= fq->head + fq->nelts)
388     {
389       f64 b4 = vlib_time_now_ticks (vm, before);
390       vlib_worker_thread_barrier_check (vm, b4);
391       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
392       // vlib_frame_queue_dequeue (vm->thread_index, vm, nm);
393     }
394
395   elt = fq->elts + (new_tail & (fq->nelts - 1));
396
397   /* this would be very bad... */
398   while (elt->valid)
399     {
400     }
401
402   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
403   save_count = frame->n_vectors;
404
405   elt->frame = frame;
406   elt->node_runtime_index = node_runtime_index;
407   elt->msg_type = type;
408   CLIB_MEMORY_BARRIER ();
409   elt->valid = 1;
410
411   return save_count;
412 }
413 #endif /* 0 */
414
415 /* To be called by vlib worker threads upon startup */
416 void
417 vlib_worker_thread_init (vlib_worker_thread_t * w)
418 {
419   vlib_thread_main_t *tm = vlib_get_thread_main ();
420
421   /*
422    * Note: disabling signals in worker threads as follows
423    * prevents the api post-mortem dump scheme from working
424    * {
425    *    sigset_t s;
426    *    sigfillset (&s);
427    *    pthread_sigmask (SIG_SETMASK, &s, 0);
428    *  }
429    */
430
431   clib_mem_set_heap (w->thread_mheap);
432
433   if (vec_len (tm->thread_prefix) && w->registration->short_name)
434     {
435       w->name = format (0, "%v_%s_%d%c", tm->thread_prefix,
436                         w->registration->short_name, w->instance_id, '\0');
437       vlib_set_thread_name ((char *) w->name);
438     }
439
440   if (!w->registration->use_pthreads)
441     {
442
443       /* Initial barrier sync, for both worker and i/o threads */
444       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
445
446       while (*vlib_worker_threads->wait_at_barrier)
447         ;
448
449       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
450     }
451 }
452
453 void *
454 vlib_worker_thread_bootstrap_fn (void *arg)
455 {
456   void *rv;
457   vlib_worker_thread_t *w = arg;
458
459   w->lwp = syscall (SYS_gettid);
460   w->thread_id = pthread_self ();
461
462   __os_thread_index = w - vlib_worker_threads;
463
464   rv = (void *) clib_calljmp
465     ((uword (*)(uword)) w->thread_function,
466      (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
467   /* NOTREACHED, we hope */
468   return rv;
469 }
470
471 static clib_error_t *
472 vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned lcore_id)
473 {
474   vlib_thread_main_t *tm = &vlib_thread_main;
475   void *(*fp_arg) (void *) = fp;
476
477   w->lcore_id = lcore_id;
478   if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads)
479     return tm->cb.vlib_launch_thread_cb (fp, (void *) w, lcore_id);
480   else
481     {
482       pthread_t worker;
483       cpu_set_t cpuset;
484       CPU_ZERO (&cpuset);
485       CPU_SET (lcore_id, &cpuset);
486
487       if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w))
488         return clib_error_return_unix (0, "pthread_create");
489
490       if (pthread_setaffinity_np (worker, sizeof (cpu_set_t), &cpuset))
491         return clib_error_return_unix (0, "pthread_setaffinity_np");
492
493       return 0;
494     }
495 }
496
497 static clib_error_t *
498 start_workers (vlib_main_t * vm)
499 {
500   int i, j;
501   vlib_worker_thread_t *w;
502   vlib_main_t *vm_clone;
503   void *oldheap;
504   vlib_thread_main_t *tm = &vlib_thread_main;
505   vlib_thread_registration_t *tr;
506   vlib_node_runtime_t *rt;
507   u32 n_vlib_mains = tm->n_vlib_mains;
508   u32 worker_thread_index;
509   u8 *main_heap = clib_mem_get_per_cpu_heap ();
510   mheap_t *main_heap_header = mheap_header (main_heap);
511
512   vec_reset_length (vlib_worker_threads);
513
514   /* Set up the main thread */
515   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
516   w->elog_track.name = "main thread";
517   elog_track_register (&vm->elog_main, &w->elog_track);
518
519   if (vec_len (tm->thread_prefix))
520     {
521       w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
522       vlib_set_thread_name ((char *) w->name);
523     }
524
525   /*
526    * Truth of the matter: we always use at least two
527    * threads. So, make the main heap thread-safe
528    * and make the event log thread-safe.
529    */
530   main_heap_header->flags |= MHEAP_FLAG_THREAD_SAFE;
531   vm->elog_main.lock =
532     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
533   vm->elog_main.lock[0] = 0;
534
535   if (n_vlib_mains > 1)
536     {
537       /* Replace hand-crafted length-1 vector with a real vector */
538       vlib_mains = 0;
539
540       vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
541                             CLIB_CACHE_LINE_BYTES);
542       _vec_len (vlib_mains) = 0;
543       vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
544
545       vlib_worker_threads->wait_at_barrier =
546         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
547       vlib_worker_threads->workers_at_barrier =
548         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
549
550       /* Ask for an initial barrier sync */
551       *vlib_worker_threads->workers_at_barrier = 0;
552       *vlib_worker_threads->wait_at_barrier = 1;
553
554       worker_thread_index = 1;
555
556       for (i = 0; i < vec_len (tm->registrations); i++)
557         {
558           vlib_node_main_t *nm, *nm_clone;
559           vlib_buffer_main_t *bm_clone;
560           vlib_buffer_free_list_t *fl_clone, *fl_orig;
561           vlib_buffer_free_list_t *orig_freelist_pool;
562           int k;
563
564           tr = tm->registrations[i];
565
566           if (tr->count == 0)
567             continue;
568
569           for (k = 0; k < tr->count; k++)
570             {
571               vec_add2 (vlib_worker_threads, w, 1);
572               if (tr->mheap_size)
573                 w->thread_mheap =
574                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
575               else
576                 w->thread_mheap = main_heap;
577
578               w->thread_stack =
579                 vlib_thread_stack_init (w - vlib_worker_threads);
580               w->thread_function = tr->function;
581               w->thread_function_arg = w;
582               w->instance_id = k;
583               w->registration = tr;
584
585               w->elog_track.name =
586                 (char *) format (0, "%s %d", tr->name, k + 1);
587               vec_add1 (w->elog_track.name, 0);
588               elog_track_register (&vm->elog_main, &w->elog_track);
589
590               if (tr->no_data_structure_clone)
591                 continue;
592
593               /* Fork vlib_global_main et al. Look for bugs here */
594               oldheap = clib_mem_set_heap (w->thread_mheap);
595
596               vm_clone = clib_mem_alloc (sizeof (*vm_clone));
597               clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
598
599               vm_clone->thread_index = worker_thread_index;
600               vm_clone->heap_base = w->thread_mheap;
601               vm_clone->mbuf_alloc_list = 0;
602               vm_clone->init_functions_called =
603                 hash_create (0, /* value bytes */ 0);
604               memset (&vm_clone->random_buffer, 0,
605                       sizeof (vm_clone->random_buffer));
606
607               nm = &vlib_mains[0]->node_main;
608               nm_clone = &vm_clone->node_main;
609               /* fork next frames array, preserving node runtime indices */
610               nm_clone->next_frames = vec_dup (nm->next_frames);
611               for (j = 0; j < vec_len (nm_clone->next_frames); j++)
612                 {
613                   vlib_next_frame_t *nf = &nm_clone->next_frames[j];
614                   u32 save_node_runtime_index;
615                   u32 save_flags;
616
617                   save_node_runtime_index = nf->node_runtime_index;
618                   save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
619                   vlib_next_frame_init (nf);
620                   nf->node_runtime_index = save_node_runtime_index;
621                   nf->flags = save_flags;
622                 }
623
624               /* fork the frame dispatch queue */
625               nm_clone->pending_frames = 0;
626               vec_validate (nm_clone->pending_frames, 10);      /* $$$$$?????? */
627               _vec_len (nm_clone->pending_frames) = 0;
628
629               /* fork nodes */
630               nm_clone->nodes = 0;
631               for (j = 0; j < vec_len (nm->nodes); j++)
632                 {
633                   vlib_node_t *n;
634                   n = clib_mem_alloc_no_fail (sizeof (*n));
635                   clib_memcpy (n, nm->nodes[j], sizeof (*n));
636                   /* none of the copied nodes have enqueue rights given out */
637                   n->owner_node_index = VLIB_INVALID_NODE_INDEX;
638                   memset (&n->stats_total, 0, sizeof (n->stats_total));
639                   memset (&n->stats_last_clear, 0,
640                           sizeof (n->stats_last_clear));
641                   vec_add1 (nm_clone->nodes, n);
642                 }
643               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
644                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
645               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
646               {
647                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
648                 rt->thread_index = vm_clone->thread_index;
649                 /* copy initial runtime_data from node */
650                 if (n->runtime_data && n->runtime_data_bytes > 0)
651                   clib_memcpy (rt->runtime_data, n->runtime_data,
652                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
653                                          n->runtime_data_bytes));
654               }
655
656               nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
657                 vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
658               vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
659               {
660                 vlib_node_t *n = vlib_get_node (vm, rt->node_index);
661                 rt->thread_index = vm_clone->thread_index;
662                 /* copy initial runtime_data from node */
663                 if (n->runtime_data && n->runtime_data_bytes > 0)
664                   clib_memcpy (rt->runtime_data, n->runtime_data,
665                                clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
666                                          n->runtime_data_bytes));
667               }
668
669               nm_clone->processes = vec_dup (nm->processes);
670
671               /* zap the (per worker) frame freelists, etc */
672               nm_clone->frame_sizes = 0;
673               nm_clone->frame_size_hash = 0;
674
675               /* Packet trace buffers are guaranteed to be empty, nothing to do here */
676
677               clib_mem_set_heap (oldheap);
678               vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
679
680               vm_clone->error_main.counters =
681                 vec_dup (vlib_mains[0]->error_main.counters);
682               vm_clone->error_main.counters_last_clear =
683                 vec_dup (vlib_mains[0]->error_main.counters_last_clear);
684
685               /* Fork the vlib_buffer_main_t free lists, etc. */
686               bm_clone = vec_dup (vm_clone->buffer_main);
687               vm_clone->buffer_main = bm_clone;
688
689               orig_freelist_pool = bm_clone->buffer_free_list_pool;
690               bm_clone->buffer_free_list_pool = 0;
691
692             /* *INDENT-OFF* */
693             pool_foreach (fl_orig, orig_freelist_pool,
694                           ({
695                             pool_get_aligned (bm_clone->buffer_free_list_pool,
696                                               fl_clone, CLIB_CACHE_LINE_BYTES);
697                             ASSERT (fl_orig - orig_freelist_pool
698                                     == fl_clone - bm_clone->buffer_free_list_pool);
699
700                             fl_clone[0] = fl_orig[0];
701                             fl_clone->buffers = 0;
702                             fl_clone->n_alloc = 0;
703                           }));
704 /* *INDENT-ON* */
705
706               worker_thread_index++;
707             }
708         }
709     }
710   else
711     {
712       /* only have non-data-structure copy threads to create... */
713       for (i = 0; i < vec_len (tm->registrations); i++)
714         {
715           tr = tm->registrations[i];
716
717           for (j = 0; j < tr->count; j++)
718             {
719               vec_add2 (vlib_worker_threads, w, 1);
720               if (tr->mheap_size)
721                 w->thread_mheap =
722                   mheap_alloc (0 /* use VM */ , tr->mheap_size);
723               else
724                 w->thread_mheap = main_heap;
725               w->thread_stack =
726                 vlib_thread_stack_init (w - vlib_worker_threads);
727               w->thread_function = tr->function;
728               w->thread_function_arg = w;
729               w->instance_id = j;
730               w->elog_track.name =
731                 (char *) format (0, "%s %d", tr->name, j + 1);
732               w->registration = tr;
733               vec_add1 (w->elog_track.name, 0);
734               elog_track_register (&vm->elog_main, &w->elog_track);
735             }
736         }
737     }
738
739   worker_thread_index = 1;
740
741   for (i = 0; i < vec_len (tm->registrations); i++)
742     {
743       clib_error_t *err;
744       int j;
745
746       tr = tm->registrations[i];
747
748       if (tr->use_pthreads || tm->use_pthreads)
749         {
750           for (j = 0; j < tr->count; j++)
751             {
752               w = vlib_worker_threads + worker_thread_index++;
753               err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
754                                             w, 0);
755               if (err)
756                 clib_error_report (err);
757             }
758         }
759       else
760         {
761           uword c;
762           /* *INDENT-OFF* */
763           clib_bitmap_foreach (c, tr->coremask, ({
764             w = vlib_worker_threads + worker_thread_index++;
765             err = vlib_launch_thread_int (vlib_worker_thread_bootstrap_fn,
766                                           w, c);
767             if (err)
768               clib_error_report (err);
769           }));
770           /* *INDENT-ON* */
771         }
772     }
773   vlib_worker_thread_barrier_sync (vm);
774   vlib_worker_thread_barrier_release (vm);
775   return 0;
776 }
777
778 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
779
780 void
781 vlib_worker_thread_node_runtime_update (void)
782 {
783   int i, j;
784   vlib_worker_thread_t *w;
785   vlib_main_t *vm;
786   vlib_node_main_t *nm, *nm_clone;
787   vlib_node_t **old_nodes_clone;
788   vlib_main_t *vm_clone;
789   vlib_node_runtime_t *rt, *old_rt;
790   void *oldheap;
791   never_inline void
792     vlib_node_runtime_sync_stats (vlib_main_t * vm,
793                                   vlib_node_runtime_t * r,
794                                   uword n_calls,
795                                   uword n_vectors, uword n_clocks);
796
797   ASSERT (vlib_get_thread_index () == 0);
798
799   if (vec_len (vlib_mains) == 1)
800     return;
801
802   vm = vlib_mains[0];
803   nm = &vm->node_main;
804
805   ASSERT (vlib_get_thread_index () == 0);
806   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
807
808   /*
809    * Scrape all runtime stats, so we don't lose node runtime(s) with
810    * pending counts, or throw away worker / io thread counts.
811    */
812   for (j = 0; j < vec_len (nm->nodes); j++)
813     {
814       vlib_node_t *n;
815       n = nm->nodes[j];
816       vlib_node_sync_stats (vm, n);
817     }
818
819   for (i = 1; i < vec_len (vlib_mains); i++)
820     {
821       vlib_node_t *n;
822
823       vm_clone = vlib_mains[i];
824       nm_clone = &vm_clone->node_main;
825
826       for (j = 0; j < vec_len (nm_clone->nodes); j++)
827         {
828           n = nm_clone->nodes[j];
829
830           rt = vlib_node_get_runtime (vm_clone, n->index);
831           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
832         }
833     }
834
835   for (i = 1; i < vec_len (vlib_mains); i++)
836     {
837       vlib_node_runtime_t *rt;
838       w = vlib_worker_threads + i;
839       oldheap = clib_mem_set_heap (w->thread_mheap);
840
841       vm_clone = vlib_mains[i];
842
843       /* Re-clone error heap */
844       u64 *old_counters = vm_clone->error_main.counters;
845       u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear;
846       clib_memcpy (&vm_clone->error_main, &vm->error_main,
847                    sizeof (vm->error_main));
848       j = vec_len (vm->error_main.counters) - 1;
849       vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES);
850       vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
851       vm_clone->error_main.counters = old_counters;
852       vm_clone->error_main.counters_last_clear = old_counters_all_clear;
853
854       nm_clone = &vm_clone->node_main;
855       vec_free (nm_clone->next_frames);
856       nm_clone->next_frames = vec_dup (nm->next_frames);
857
858       for (j = 0; j < vec_len (nm_clone->next_frames); j++)
859         {
860           vlib_next_frame_t *nf = &nm_clone->next_frames[j];
861           u32 save_node_runtime_index;
862           u32 save_flags;
863
864           save_node_runtime_index = nf->node_runtime_index;
865           save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
866           vlib_next_frame_init (nf);
867           nf->node_runtime_index = save_node_runtime_index;
868           nf->flags = save_flags;
869         }
870
871       old_nodes_clone = nm_clone->nodes;
872       nm_clone->nodes = 0;
873
874       /* re-fork nodes */
875       for (j = 0; j < vec_len (nm->nodes); j++)
876         {
877           vlib_node_t *old_n_clone;
878           vlib_node_t *new_n, *new_n_clone;
879
880           new_n = nm->nodes[j];
881           old_n_clone = old_nodes_clone[j];
882
883           new_n_clone = clib_mem_alloc_no_fail (sizeof (*new_n_clone));
884           clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
885           /* none of the copied nodes have enqueue rights given out */
886           new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
887
888           if (j >= vec_len (old_nodes_clone))
889             {
890               /* new node, set to zero */
891               memset (&new_n_clone->stats_total, 0,
892                       sizeof (new_n_clone->stats_total));
893               memset (&new_n_clone->stats_last_clear, 0,
894                       sizeof (new_n_clone->stats_last_clear));
895             }
896           else
897             {
898               /* Copy stats if the old data is valid */
899               clib_memcpy (&new_n_clone->stats_total,
900                            &old_n_clone->stats_total,
901                            sizeof (new_n_clone->stats_total));
902               clib_memcpy (&new_n_clone->stats_last_clear,
903                            &old_n_clone->stats_last_clear,
904                            sizeof (new_n_clone->stats_last_clear));
905
906               /* keep previous node state */
907               new_n_clone->state = old_n_clone->state;
908             }
909           vec_add1 (nm_clone->nodes, new_n_clone);
910         }
911       /* Free the old node clone */
912       for (j = 0; j < vec_len (old_nodes_clone); j++)
913         clib_mem_free (old_nodes_clone[j]);
914       vec_free (old_nodes_clone);
915
916
917       /* re-clone internal nodes */
918       old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL];
919       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
920         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
921
922       vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
923       {
924         vlib_node_t *n = vlib_get_node (vm, rt->node_index);
925         rt->thread_index = vm_clone->thread_index;
926         /* copy runtime_data, will be overwritten later for existing rt */
927         if (n->runtime_data && n->runtime_data_bytes > 0)
928           clib_memcpy (rt->runtime_data, n->runtime_data,
929                        clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
930                                  n->runtime_data_bytes));
931       }
932
933       for (j = 0; j < vec_len (old_rt); j++)
934         {
935           rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
936           rt->state = old_rt[j].state;
937           clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
938                        VLIB_NODE_RUNTIME_DATA_SIZE);
939         }
940
941       vec_free (old_rt);
942
943       /* re-clone input nodes */
944       old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
945       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
946         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
947
948       vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
949       {
950         vlib_node_t *n = vlib_get_node (vm, rt->node_index);
951         rt->thread_index = vm_clone->thread_index;
952         /* copy runtime_data, will be overwritten later for existing rt */
953         if (n->runtime_data && n->runtime_data_bytes > 0)
954           clib_memcpy (rt->runtime_data, n->runtime_data,
955                        clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
956                                  n->runtime_data_bytes));
957       }
958
959       for (j = 0; j < vec_len (old_rt); j++)
960         {
961           rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
962           rt->state = old_rt[j].state;
963           clib_memcpy (rt->runtime_data, old_rt[j].runtime_data,
964                        VLIB_NODE_RUNTIME_DATA_SIZE);
965         }
966
967       vec_free (old_rt);
968
969       nm_clone->processes = vec_dup (nm->processes);
970
971       clib_mem_set_heap (oldheap);
972
973       // vnet_main_fork_fixup (i);
974     }
975 }
976
977 u32
978 unformat_sched_policy (unformat_input_t * input, va_list * args)
979 {
980   u32 *r = va_arg (*args, u32 *);
981
982   if (0);
983 #define _(v,f,s) else if (unformat (input, s)) *r = SCHED_POLICY_##f;
984   foreach_sched_policy
985 #undef _
986     else
987     return 0;
988   return 1;
989 }
990
991 static clib_error_t *
992 cpu_config (vlib_main_t * vm, unformat_input_t * input)
993 {
994   vlib_thread_registration_t *tr;
995   uword *p;
996   vlib_thread_main_t *tm = &vlib_thread_main;
997   u8 *name;
998   u64 coremask;
999   uword *bitmap;
1000   u32 count;
1001
1002   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
1003
1004   tm->n_thread_stacks = 1;      /* account for main thread */
1005   tm->sched_policy = ~0;
1006   tm->sched_priority = ~0;
1007
1008   tr = tm->next;
1009
1010   while (tr)
1011     {
1012       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword) tr);
1013       tr = tr->next;
1014     }
1015
1016   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
1017     {
1018       if (unformat (input, "use-pthreads"))
1019         tm->use_pthreads = 1;
1020       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
1021         ;
1022       else if (unformat (input, "main-core %u", &tm->main_lcore))
1023         ;
1024       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
1025         ;
1026       else if (unformat (input, "coremask-%s %llx", &name, &coremask))
1027         {
1028           p = hash_get_mem (tm->thread_registrations_by_name, name);
1029           if (p == 0)
1030             return clib_error_return (0, "no such thread type '%s'", name);
1031
1032           tr = (vlib_thread_registration_t *) p[0];
1033
1034           if (tr->use_pthreads)
1035             return clib_error_return (0,
1036                                       "coremask cannot be set for '%s' threads",
1037                                       name);
1038
1039           tr->coremask = clib_bitmap_set_multiple
1040             (tr->coremask, 0, coremask, BITS (coremask));
1041           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1042         }
1043       else if (unformat (input, "corelist-%s %U", &name, unformat_bitmap_list,
1044                          &bitmap))
1045         {
1046           p = hash_get_mem (tm->thread_registrations_by_name, name);
1047           if (p == 0)
1048             return clib_error_return (0, "no such thread type '%s'", name);
1049
1050           tr = (vlib_thread_registration_t *) p[0];
1051
1052           if (tr->use_pthreads)
1053             return clib_error_return (0,
1054                                       "corelist cannot be set for '%s' threads",
1055                                       name);
1056
1057           tr->coremask = bitmap;
1058           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1059         }
1060       else
1061         if (unformat
1062             (input, "scheduler-policy %U", unformat_sched_policy,
1063              &tm->sched_policy))
1064         ;
1065       else if (unformat (input, "scheduler-priority %u", &tm->sched_priority))
1066         ;
1067       else if (unformat (input, "%s %u", &name, &count))
1068         {
1069           p = hash_get_mem (tm->thread_registrations_by_name, name);
1070           if (p == 0)
1071             return clib_error_return (0, "no such thread type 3 '%s'", name);
1072
1073           tr = (vlib_thread_registration_t *) p[0];
1074           if (tr->fixed_count)
1075             return clib_error_return
1076               (0, "number of %s threads not configurable", tr->name);
1077           tr->count = count;
1078         }
1079       else
1080         break;
1081     }
1082
1083   if (tm->sched_priority != ~0)
1084     {
1085       if (tm->sched_policy == SCHED_FIFO || tm->sched_policy == SCHED_RR)
1086         {
1087           u32 prio_max = sched_get_priority_max (tm->sched_policy);
1088           u32 prio_min = sched_get_priority_min (tm->sched_policy);
1089           if (tm->sched_priority > prio_max)
1090             tm->sched_priority = prio_max;
1091           if (tm->sched_priority < prio_min)
1092             tm->sched_priority = prio_min;
1093         }
1094       else
1095         {
1096           return clib_error_return
1097             (0,
1098              "scheduling priority (%d) is not allowed for `normal` scheduling policy",
1099              tm->sched_priority);
1100         }
1101     }
1102   tr = tm->next;
1103
1104   if (!tm->thread_prefix)
1105     tm->thread_prefix = format (0, "vpp");
1106
1107   while (tr)
1108     {
1109       tm->n_thread_stacks += tr->count;
1110       tm->n_pthreads += tr->count * tr->use_pthreads;
1111       tm->n_threads += tr->count * (tr->use_pthreads == 0);
1112       tr = tr->next;
1113     }
1114
1115   return 0;
1116 }
1117
1118 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1119
1120 #if !defined (__x86_64__) && !defined (__i386__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__)
1121 void
1122 __sync_fetch_and_add_8 (void)
1123 {
1124   fformat (stderr, "%s called\n", __FUNCTION__);
1125   abort ();
1126 }
1127
1128 void
1129 __sync_add_and_fetch_8 (void)
1130 {
1131   fformat (stderr, "%s called\n", __FUNCTION__);
1132   abort ();
1133 }
1134 #endif
1135
1136 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1137 void
1138 vnet_main_fixup (vlib_fork_fixup_t which)
1139 {
1140 }
1141
1142 void
1143 vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1144 {
1145   vlib_main_t *vm = vlib_get_main ();
1146
1147   if (vlib_mains == 0)
1148     return;
1149
1150   ASSERT (vlib_get_thread_index () == 0);
1151   vlib_worker_thread_barrier_sync (vm);
1152
1153   switch (which)
1154     {
1155     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1156       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1157       break;
1158
1159     default:
1160       ASSERT (0);
1161     }
1162   vlib_worker_thread_barrier_release (vm);
1163 }
1164
1165 void
1166 vlib_worker_thread_barrier_sync (vlib_main_t * vm)
1167 {
1168   f64 deadline;
1169   u32 count;
1170
1171   if (vec_len (vlib_mains) < 2)
1172     return;
1173
1174   count = vec_len (vlib_mains) - 1;
1175
1176   /* Tolerate recursive calls */
1177   if (++vlib_worker_threads[0].recursion_level > 1)
1178     return;
1179
1180   vlib_worker_threads[0].barrier_sync_count++;
1181
1182   ASSERT (vlib_get_thread_index () == 0);
1183
1184   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1185
1186   *vlib_worker_threads->wait_at_barrier = 1;
1187   while (*vlib_worker_threads->workers_at_barrier != count)
1188     {
1189       if (vlib_time_now (vm) > deadline)
1190         {
1191           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1192           os_panic ();
1193         }
1194     }
1195 }
1196
1197 void
1198 vlib_worker_thread_barrier_release (vlib_main_t * vm)
1199 {
1200   f64 deadline;
1201
1202   if (vec_len (vlib_mains) < 2)
1203     return;
1204
1205   if (--vlib_worker_threads[0].recursion_level > 0)
1206     return;
1207
1208   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1209
1210   *vlib_worker_threads->wait_at_barrier = 0;
1211
1212   while (*vlib_worker_threads->workers_at_barrier > 0)
1213     {
1214       if (vlib_time_now (vm) > deadline)
1215         {
1216           fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1217           os_panic ();
1218         }
1219     }
1220 }
1221
1222 /*
1223  * Check the frame queue to see if any frames are available.
1224  * If so, pull the packets off the frames and put them to
1225  * the handoff node.
1226  */
1227 int
1228 vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm)
1229 {
1230   u32 thread_id = vm->thread_index;
1231   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
1232   vlib_frame_queue_elt_t *elt;
1233   u32 *from, *to;
1234   vlib_frame_t *f;
1235   int msg_type;
1236   int processed = 0;
1237   u32 n_left_to_node;
1238   u32 vectors = 0;
1239
1240   ASSERT (fq);
1241   ASSERT (vm == vlib_mains[thread_id]);
1242
1243   if (PREDICT_FALSE (fqm->node_index == ~0))
1244     return 0;
1245   /*
1246    * Gather trace data for frame queues
1247    */
1248   if (PREDICT_FALSE (fq->trace))
1249     {
1250       frame_queue_trace_t *fqt;
1251       frame_queue_nelt_counter_t *fqh;
1252       u32 elix;
1253
1254       fqt = &fqm->frame_queue_traces[thread_id];
1255
1256       fqt->nelts = fq->nelts;
1257       fqt->head = fq->head;
1258       fqt->head_hint = fq->head_hint;
1259       fqt->tail = fq->tail;
1260       fqt->threshold = fq->vector_threshold;
1261       fqt->n_in_use = fqt->tail - fqt->head;
1262       if (fqt->n_in_use >= fqt->nelts)
1263         {
1264           // if beyond max then use max
1265           fqt->n_in_use = fqt->nelts - 1;
1266         }
1267
1268       /* Record the number of elements in use in the histogram */
1269       fqh = &fqm->frame_queue_histogram[thread_id];
1270       fqh->count[fqt->n_in_use]++;
1271
1272       /* Record a snapshot of the elements in use */
1273       for (elix = 0; elix < fqt->nelts; elix++)
1274         {
1275           elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1));
1276           if (1 || elt->valid)
1277             {
1278               fqt->n_vectors[elix] = elt->n_vectors;
1279             }
1280         }
1281       fqt->written = 1;
1282     }
1283
1284   while (1)
1285     {
1286       if (fq->head == fq->tail)
1287         {
1288           fq->head_hint = fq->head;
1289           return processed;
1290         }
1291
1292       elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1));
1293
1294       if (!elt->valid)
1295         {
1296           fq->head_hint = fq->head;
1297           return processed;
1298         }
1299
1300       from = elt->buffer_index;
1301       msg_type = elt->msg_type;
1302
1303       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1304       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1305
1306       f = vlib_get_frame_to_node (vm, fqm->node_index);
1307
1308       to = vlib_frame_vector_args (f);
1309
1310       n_left_to_node = elt->n_vectors;
1311
1312       while (n_left_to_node >= 4)
1313         {
1314           to[0] = from[0];
1315           to[1] = from[1];
1316           to[2] = from[2];
1317           to[3] = from[3];
1318           to += 4;
1319           from += 4;
1320           n_left_to_node -= 4;
1321         }
1322
1323       while (n_left_to_node > 0)
1324         {
1325           to[0] = from[0];
1326           to++;
1327           from++;
1328           n_left_to_node--;
1329         }
1330
1331       vectors += elt->n_vectors;
1332       f->n_vectors = elt->n_vectors;
1333       vlib_put_frame_to_node (vm, fqm->node_index, f);
1334
1335       elt->valid = 0;
1336       elt->n_vectors = 0;
1337       elt->msg_type = 0xfefefefe;
1338       CLIB_MEMORY_BARRIER ();
1339       fq->head++;
1340       processed++;
1341
1342       /*
1343        * Limit the number of packets pushed into the graph
1344        */
1345       if (vectors >= fq->vector_threshold)
1346         {
1347           fq->head_hint = fq->head;
1348           return processed;
1349         }
1350     }
1351   ASSERT (0);
1352   return processed;
1353 }
1354
1355 void
1356 vlib_worker_thread_fn (void *arg)
1357 {
1358   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
1359   vlib_thread_main_t *tm = vlib_get_thread_main ();
1360   vlib_main_t *vm = vlib_get_main ();
1361   clib_error_t *e;
1362
1363   ASSERT (vm->thread_index == vlib_get_thread_index ());
1364
1365   vlib_worker_thread_init (w);
1366   clib_time_init (&vm->clib_time);
1367   clib_mem_set_heap (w->thread_mheap);
1368
1369   /* Wait until the dpdk init sequence is complete */
1370   while (tm->extern_thread_mgmt && tm->worker_thread_release == 0)
1371     vlib_worker_thread_barrier_check ();
1372
1373   e = vlib_call_init_exit_functions
1374     (vm, vm->worker_init_function_registrations, 1 /* call_once */ );
1375   if (e)
1376     clib_error_report (e);
1377
1378   vlib_worker_loop (vm);
1379 }
1380
1381 /* *INDENT-OFF* */
1382 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1383   .name = "workers",
1384   .short_name = "wk",
1385   .function = vlib_worker_thread_fn,
1386 };
1387 /* *INDENT-ON* */
1388
1389 u32
1390 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts)
1391 {
1392   vlib_thread_main_t *tm = vlib_get_thread_main ();
1393   vlib_frame_queue_main_t *fqm;
1394   vlib_frame_queue_t *fq;
1395   int i;
1396
1397   if (frame_queue_nelts == 0)
1398     frame_queue_nelts = FRAME_QUEUE_NELTS;
1399
1400   vec_add2 (tm->frame_queue_mains, fqm, 1);
1401
1402   fqm->node_index = node_index;
1403
1404   vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1);
1405   _vec_len (fqm->vlib_frame_queues) = 0;
1406   for (i = 0; i < tm->n_vlib_mains; i++)
1407     {
1408       fq = vlib_frame_queue_alloc (frame_queue_nelts);
1409       vec_add1 (fqm->vlib_frame_queues, fq);
1410     }
1411
1412   return (fqm - tm->frame_queue_mains);
1413 }
1414
1415
1416 int
1417 vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb)
1418 {
1419   vlib_thread_main_t *tm = vlib_get_thread_main ();
1420
1421   if (tm->extern_thread_mgmt)
1422     return -1;
1423
1424   tm->cb.vlib_launch_thread_cb = cb->vlib_launch_thread_cb;
1425   tm->extern_thread_mgmt = 1;
1426   return 0;
1427 }
1428
1429 clib_error_t *
1430 threads_init (vlib_main_t * vm)
1431 {
1432   return 0;
1433 }
1434
1435 VLIB_INIT_FUNCTION (threads_init);
1436
1437 /*
1438  * fd.io coding-style-patch-verification: ON
1439  *
1440  * Local Variables:
1441  * eval: (c-set-style "gnu")
1442  * End:
1443  */