Decouple worker thread code from dpdk, enable worker threads in vpp_lite
[vpp.git] / vlib / vlib / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define _GNU_SOURCE
16 #include <sched.h>
17
18 #include <signal.h>
19 #include <math.h>
20 #include <vppinfra/format.h>
21 #include <vlib/vlib.h>
22
23 #include <vlib/threads.h>
24 #include <vlib/unix/cj.h>
25
26
27 #if DPDK==1
28 #include <rte_config.h>
29 #include <rte_common.h>
30 #include <rte_eal.h>
31 #include <rte_launch.h>
32 #include <rte_lcore.h>
33 #endif
34 DECLARE_CJ_GLOBAL_LOG;
35
36 #define FRAME_QUEUE_NELTS 32
37
38
39 #if DPDK==1
40 /*
41  *  Weak definitions of DPDK symbols used in this file.
42  *  Needed for linking test programs without DPDK libs.
43  */
44 unsigned __thread      __attribute__((weak)) RTE_PER_LCORE(_lcore_id);
45 struct lcore_config    __attribute__((weak)) lcore_config[];
46 unsigned               __attribute__((weak)) rte_socket_id();
47 int                    __attribute__((weak)) rte_eal_remote_launch();
48 #endif
49 u32 vl(void *p)
50 {
51   return vec_len (p);
52 }
53
54 vlib_thread_main_t vlib_thread_main;
55
56 uword
57 os_get_cpu_number (void)
58 {
59   void * sp;
60   uword n;
61   u32 len;
62
63   len = vec_len (vlib_thread_stacks);
64   if (len == 0)
65     return 0;
66
67   /* Get any old stack address. */
68   sp = &sp;
69
70   n = ((uword)sp - (uword)vlib_thread_stacks[0])
71       >> VLIB_LOG2_THREAD_STACK_SIZE;
72
73   /* "processes" have their own stacks, and they always run in thread 0 */
74   n = n >= len ? 0 : n;
75
76   return n;
77 }
78
79 void
80 vlib_set_thread_name (char *name)
81 {
82   int pthread_setname_np (pthread_t __target_thread, const char *__name);
83   pthread_t thread = pthread_self();
84
85   if (thread) 
86     pthread_setname_np(thread, name);
87 }
88
89 static int sort_registrations_by_no_clone  (void *a0, void * a1)
90
91   vlib_thread_registration_t ** tr0 = a0;
92   vlib_thread_registration_t ** tr1 = a1;
93
94   return ((i32)((*tr0)->no_data_structure_clone) 
95           - ((i32)((*tr1)->no_data_structure_clone)));
96 }
97
98 static uword *
99 vlib_sysfs_list_to_bitmap(char * filename)
100 {
101   FILE *fp;
102   uword *r = 0;
103
104   fp = fopen (filename, "r");
105
106   if (fp != NULL)
107     {
108       u8 * buffer = 0;
109       vec_validate (buffer, 256-1);
110       if (fgets ((char *)buffer, 256, fp))
111         {
112           unformat_input_t in;
113           unformat_init_string (&in, (char *) buffer, strlen ((char *) buffer));
114           unformat(&in, "%U", unformat_bitmap_list, &r);
115           unformat_free (&in);
116         }
117       vec_free(buffer);
118       fclose(fp);
119     }
120   return r;
121 }
122
123
124 /* Called early in the init sequence */
125
126 clib_error_t *
127 vlib_thread_init (vlib_main_t * vm)
128 {
129   vlib_thread_main_t * tm = &vlib_thread_main;
130   vlib_worker_thread_t * w;
131   vlib_thread_registration_t * tr;
132   u32 n_vlib_mains = 1;
133   u32 first_index = 1;
134   u32 i;
135   uword * avail_cpu;
136
137   /* get bitmaps of active cpu cores and sockets */
138   tm->cpu_core_bitmap =
139     vlib_sysfs_list_to_bitmap("/sys/devices/system/cpu/online");
140   tm->cpu_socket_bitmap =
141     vlib_sysfs_list_to_bitmap("/sys/devices/system/node/online");
142
143   avail_cpu = clib_bitmap_dup(tm->cpu_core_bitmap);
144
145   /* skip cores */
146   for (i=0; i < tm->skip_cores; i++)
147     {
148       uword c = clib_bitmap_first_set(avail_cpu);
149       if (c == ~0)
150         return clib_error_return (0, "no available cpus to skip");
151
152       avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
153     }
154
155   /* grab cpu for main thread */
156   if (!tm->main_lcore)
157     {
158       tm->main_lcore = clib_bitmap_first_set(avail_cpu);
159       if (tm->main_lcore == (u8) ~0)
160         return clib_error_return (0, "no available cpus to be used for the"
161                                   " main thread");
162     }
163   else
164     {
165       if (clib_bitmap_get(avail_cpu, tm->main_lcore) == 0)
166         return clib_error_return (0, "cpu %u is not available to be used"
167                                   " for the main thread", tm->main_lcore);
168     }
169   avail_cpu = clib_bitmap_set(avail_cpu, tm->main_lcore, 0);
170
171   /* assume that there is socket 0 only if there is no data from sysfs */
172   if (!tm->cpu_socket_bitmap)
173     tm->cpu_socket_bitmap = clib_bitmap_set(0, 0, 1);
174
175   /* pin main thread to main_lcore  */
176 #if DPDK==0
177   {
178      cpu_set_t cpuset;
179      CPU_ZERO(&cpuset);
180      CPU_SET(tm->main_lcore, &cpuset);
181      pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
182   }
183 #endif
184
185   /* as many threads as stacks... */
186   vec_validate_aligned (vlib_worker_threads, vec_len(vlib_thread_stacks)-1,
187                         CLIB_CACHE_LINE_BYTES);
188
189   /* Preallocate thread 0 */
190   _vec_len(vlib_worker_threads) = 1;
191   w = vlib_worker_threads;
192   w->thread_mheap = clib_mem_get_heap();
193   w->thread_stack = vlib_thread_stacks[0];
194   w->dpdk_lcore_id = -1;
195   w->lwp = syscall(SYS_gettid);
196   tm->n_vlib_mains = 1;
197
198   /* assign threads to cores and set n_vlib_mains */
199   tr = tm->next;
200
201   while (tr)
202     {
203       vec_add1 (tm->registrations, tr);
204       tr = tr->next;
205     }
206
207   vec_sort_with_function
208     (tm->registrations, sort_registrations_by_no_clone);
209
210   for (i = 0; i < vec_len (tm->registrations); i++)
211     {
212       int j;
213       tr = tm->registrations[i];
214       tr->first_index = first_index;
215       first_index += tr->count;
216       n_vlib_mains += (tr->no_data_structure_clone == 0) ? tr->count : 0;
217
218       /* construct coremask */
219       if (tr->use_pthreads || !tr->count)
220         continue;
221
222       if (tr->coremask)
223         {
224           uword c;
225           clib_bitmap_foreach (c, tr->coremask, ({
226             if (clib_bitmap_get(avail_cpu, c) == 0)
227               return clib_error_return (0, "cpu %u is not available to be used"
228                                         " for the '%s' thread",c, tr->name);
229
230             avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
231           }));
232
233         }
234       else
235         {
236           for (j=0; j < tr->count; j++)
237             {
238               uword c = clib_bitmap_first_set(avail_cpu);
239               if (c == ~0)
240               return clib_error_return (0, "no available cpus to be used for"
241                                         " the '%s' thread", tr->name);
242
243               avail_cpu = clib_bitmap_set(avail_cpu, c, 0);
244               tr->coremask = clib_bitmap_set(tr->coremask, c, 1);
245             }
246         }
247     }
248
249   clib_bitmap_free(avail_cpu);
250
251   tm->n_vlib_mains = n_vlib_mains;
252
253   vec_validate_aligned (vlib_worker_threads, first_index-1,
254                         CLIB_CACHE_LINE_BYTES);
255
256
257   tm->efd.enabled = VLIB_EFD_DISABLED;
258   tm->efd.queue_hi_thresh = ((VLIB_EFD_DEF_WORKER_HI_THRESH_PCT *
259                               FRAME_QUEUE_NELTS)/100);
260   return 0;
261 }
262
263 vlib_worker_thread_t *
264 vlib_alloc_thread (vlib_main_t * vm)
265 {
266   vlib_worker_thread_t * w;
267
268   if (vec_len(vlib_worker_threads) >= vec_len (vlib_thread_stacks))
269     {
270       clib_warning ("out of worker threads... Quitting...");
271       exit(1);
272     }
273   vec_add2 (vlib_worker_threads, w, 1);
274   w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
275   return w;
276 }
277
278 vlib_frame_queue_t * vlib_frame_queue_alloc (int nelts)
279 {
280   vlib_frame_queue_t * fq;
281
282   fq = clib_mem_alloc_aligned(sizeof (*fq), CLIB_CACHE_LINE_BYTES);
283   memset (fq, 0, sizeof (*fq));
284   fq->nelts = nelts;
285   fq->vector_threshold = 128; // packets
286   vec_validate_aligned (fq->elts, nelts-1, CLIB_CACHE_LINE_BYTES);
287
288   if (1)
289   {
290     if (((uword)&fq->tail) & (CLIB_CACHE_LINE_BYTES - 1))
291       fformat(stderr, "WARNING: fq->tail unaligned\n");
292     if (((uword)&fq->head) & (CLIB_CACHE_LINE_BYTES - 1))
293       fformat(stderr, "WARNING: fq->head unaligned\n");
294     if (((uword)fq->elts) & (CLIB_CACHE_LINE_BYTES - 1))
295       fformat(stderr, "WARNING: fq->elts unaligned\n");
296     
297     if (sizeof (fq->elts[0]) % CLIB_CACHE_LINE_BYTES)
298       fformat(stderr, "WARNING: fq->elts[0] size %d\n", 
299               sizeof (fq->elts[0]));
300     if (nelts & (nelts -1))
301       {
302         fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
303         abort();
304       }
305   }
306   
307   return (fq);
308 }
309
310 void vl_msg_api_handler_no_free (void *) __attribute__ ((weak));
311 void vl_msg_api_handler_no_free (void *v) { }
312
313 /* Turned off, save as reference material... */
314 #if 0
315 static inline int vlib_frame_queue_dequeue_internal (int thread_id, 
316                                                       vlib_main_t *vm, 
317                                                       vlib_node_main_t *nm)
318 {
319   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
320   vlib_frame_queue_elt_t *elt;
321   vlib_frame_t *f;
322   vlib_pending_frame_t *p;
323   vlib_node_runtime_t *r;
324   u32 node_runtime_index;
325   int msg_type;
326   u64 before;
327   int processed = 0;
328   
329   ASSERT(vm == vlib_mains[thread_id]);
330
331   while (1)
332     {
333       if (fq->head == fq->tail)
334         return processed;
335
336       elt = fq->elts + ((fq->head+1) & (fq->nelts-1));
337
338       if (!elt->valid)
339         return processed;
340
341       before = clib_cpu_time_now();
342
343       f = elt->frame;
344       node_runtime_index = elt->node_runtime_index;
345       msg_type = elt->msg_type;
346
347       switch (msg_type)
348         {
349         case VLIB_FRAME_QUEUE_ELT_FREE_BUFFERS:
350           vlib_buffer_free (vm, vlib_frame_vector_args (f), f->n_vectors);
351           /* note fallthrough... */
352         case VLIB_FRAME_QUEUE_ELT_FREE_FRAME:
353           r = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL], 
354                                 node_runtime_index);
355           vlib_frame_free (vm, r, f);
356           break;
357         case VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME:
358           vec_add2 (vm->node_main.pending_frames, p, 1);
359           f->flags |= (VLIB_FRAME_PENDING | VLIB_FRAME_FREE_AFTER_DISPATCH);
360           p->node_runtime_index = elt->node_runtime_index;
361           p->frame_index = vlib_frame_index (vm, f);
362           p->next_frame_index = VLIB_PENDING_FRAME_NO_NEXT_FRAME;
363           fq->dequeue_vectors += (u64) f->n_vectors;
364           break;
365         case VLIB_FRAME_QUEUE_ELT_API_MSG:
366           vl_msg_api_handler_no_free (f);
367           break;
368         default:
369           clib_warning ("bogus frame queue message, type %d", msg_type);
370           break;
371         }
372       elt->valid = 0;
373       fq->dequeues++;
374       fq->dequeue_ticks += clib_cpu_time_now() - before;
375       CLIB_MEMORY_BARRIER();
376       fq->head++;
377       processed++;
378     }
379   ASSERT(0);
380   return processed;
381 }
382
383 int vlib_frame_queue_dequeue (int thread_id, 
384                                vlib_main_t *vm, 
385                                vlib_node_main_t *nm)
386 {
387   return vlib_frame_queue_dequeue_internal (thread_id, vm, nm);
388 }
389
390 int vlib_frame_queue_enqueue (vlib_main_t *vm, u32 node_runtime_index,
391                               u32 frame_queue_index, vlib_frame_t *frame,
392                               vlib_frame_queue_msg_type_t type)
393 {
394   vlib_frame_queue_t *fq = vlib_frame_queues[frame_queue_index];
395   vlib_frame_queue_elt_t *elt;
396   u32 save_count;
397   u64 new_tail;
398   u64 before = clib_cpu_time_now();
399   
400   ASSERT (fq);
401
402   new_tail = __sync_add_and_fetch (&fq->tail, 1);
403
404   /* Wait until a ring slot is available */
405   while (new_tail >= fq->head + fq->nelts)
406     {
407       f64 b4 = vlib_time_now_ticks (vm, before);
408       vlib_worker_thread_barrier_check (vm, b4);
409       /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */
410       // vlib_frame_queue_dequeue (vm->cpu_index, vm, nm);
411     }
412
413   elt = fq->elts + (new_tail & (fq->nelts-1));
414
415   /* this would be very bad... */
416   while (elt->valid) 
417     {
418     }
419
420   /* Once we enqueue the frame, frame->n_vectors is owned elsewhere... */
421   save_count = frame->n_vectors;
422
423   elt->frame = frame;
424   elt->node_runtime_index = node_runtime_index;
425   elt->msg_type = type;
426   CLIB_MEMORY_BARRIER();
427   elt->valid = 1;
428
429   return save_count;
430 }
431 #endif /* 0 */
432
433 /* To be called by vlib worker threads upon startup */
434 void vlib_worker_thread_init (vlib_worker_thread_t * w)
435 {
436   vlib_thread_main_t *tm = vlib_get_thread_main();
437   
438   /* worker threads wants no signals. */
439   {
440     sigset_t s;
441     sigfillset (&s);
442     pthread_sigmask (SIG_SETMASK, &s, 0);
443   }
444
445   clib_mem_set_heap (w->thread_mheap);
446
447   if (vec_len(tm->thread_prefix) && w->registration->short_name)
448     {
449       w->name = format(0, "%v_%s_%d%c", tm->thread_prefix,
450                                         w->registration->short_name,
451                                         w->instance_id,
452                                         '\0');
453       vlib_set_thread_name((char *)w->name);
454     }
455
456   if (!w->registration->use_pthreads)
457     {
458
459       /* Initial barrier sync, for both worker and i/o threads */
460       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
461
462       while (*vlib_worker_threads->wait_at_barrier)
463           ;
464
465       clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
466     }
467 }
468
469 void *vlib_worker_thread_bootstrap_fn (void *arg)
470 {
471   void *rv;
472   vlib_worker_thread_t *w = arg;
473   
474   w->lwp = syscall(SYS_gettid);
475   w->dpdk_lcore_id = -1;
476 #if DPDK==1
477   if (w->registration && !w->registration->use_pthreads &&
478       rte_socket_id) /* do we really have dpdk linked */
479     {
480       unsigned lcore = rte_lcore_id();
481       lcore = lcore < RTE_MAX_LCORE ? lcore : -1;
482       w->dpdk_lcore_id = lcore;
483     }
484 #endif
485
486   rv = (void *) clib_calljmp 
487       ((uword (*)(uword)) w->thread_function, 
488        (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE);
489   /* NOTREACHED, we hope */
490   return rv;
491 }
492
493 static int
494 vlib_launch_thread (void *fp, vlib_worker_thread_t *w, unsigned lcore_id)
495 {
496   void *(*fp_arg)(void *) = fp;
497
498 #if DPDK==1
499   if (!w->registration->use_pthreads)
500     if (rte_eal_remote_launch) /* do we have dpdk linked */
501       return rte_eal_remote_launch (fp, (void *)w, lcore_id);
502     else
503       return -1;
504   else
505 #endif
506   {
507     int ret;
508     pthread_t worker;
509     cpu_set_t cpuset;
510     CPU_ZERO(&cpuset);
511     CPU_SET(lcore_id, &cpuset);
512
513     ret = pthread_create (&worker, NULL /* attr */, fp_arg, (void *)w);
514     if(ret == 0)
515         return pthread_setaffinity_np(worker, sizeof(cpu_set_t), &cpuset);
516     else
517         return ret;
518   }
519 }
520
521 static clib_error_t * start_workers (vlib_main_t * vm)
522 {
523   int i, j;
524   vlib_worker_thread_t *w;
525   vlib_main_t *vm_clone;
526   void *oldheap;
527   vlib_frame_queue_t *fq;
528   vlib_thread_main_t * tm = &vlib_thread_main;
529   vlib_thread_registration_t * tr; 
530   vlib_node_runtime_t * rt;
531   u32 n_vlib_mains = tm->n_vlib_mains;
532   u32 worker_thread_index;
533   u8 * main_heap = clib_mem_get_per_cpu_heap();
534   mheap_t * main_heap_header = mheap_header (main_heap);
535       
536   vec_reset_length (vlib_worker_threads);
537
538   /* Set up the main thread */
539   vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
540   w->elog_track.name = "main thread";
541   elog_track_register (&vm->elog_main, &w->elog_track);
542
543   if (vec_len(tm->thread_prefix))
544     {
545       w->name = format(0, "%v_main%c", tm->thread_prefix, '\0');
546       vlib_set_thread_name((char *)w->name);
547     }
548
549 #if DPDK==1
550   w->dpdk_lcore_id = -1;
551   if (rte_socket_id) /* do we really have dpdk linked */
552     {
553       unsigned lcore = rte_lcore_id();
554       w->dpdk_lcore_id = lcore < RTE_MAX_LCORE ? lcore : -1;;
555     }
556 #endif
557
558   /* 
559    * Truth of the matter: we always use at least two
560    * threads. So, make the main heap thread-safe 
561    * and make the event log thread-safe.
562    */
563   main_heap_header->flags |= MHEAP_FLAG_THREAD_SAFE;
564   vm->elog_main.lock = 
565     clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, 
566                             CLIB_CACHE_LINE_BYTES);
567   vm->elog_main.lock[0] = 0;
568           
569   if (n_vlib_mains > 1)
570     {
571       vec_validate (vlib_mains, tm->n_vlib_mains - 1);
572       _vec_len (vlib_mains) = 0;
573       vec_add1 (vlib_mains, vm);
574
575       vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
576       _vec_len (vlib_frame_queues) = 0;
577       fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
578       vec_add1 (vlib_frame_queues, fq);
579
580       vlib_worker_threads->wait_at_barrier = 
581         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
582       vlib_worker_threads->workers_at_barrier =
583         clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
584
585       /* Ask for an initial barrier sync */
586       *vlib_worker_threads->workers_at_barrier = 0;
587       *vlib_worker_threads->wait_at_barrier = 1;
588
589       worker_thread_index = 1;
590
591       for (i = 0; i < vec_len(tm->registrations); i++)
592         {
593           vlib_node_main_t *nm, *nm_clone;
594           vlib_buffer_main_t *bm_clone;
595           vlib_buffer_free_list_t *fl_clone, *fl_orig;
596           vlib_buffer_free_list_t *orig_freelist_pool;
597           int k;
598
599           tr = tm->registrations[i];
600
601           if (tr->count == 0)
602             continue;
603
604           for (k = 0; k < tr->count; k++)
605           {
606             vec_add2 (vlib_worker_threads, w, 1);
607             if (tr->mheap_size)
608               w->thread_mheap = mheap_alloc (0 /* use VM */, tr->mheap_size);
609             else
610               w->thread_mheap = main_heap;
611             w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
612             w->thread_function = tr->function;
613             w->thread_function_arg = w;
614             w->instance_id = k;
615             w->registration = tr; 
616             
617             w->elog_track.name = 
618                 (char *) format (0, "%s %d", tr->name, k+1);
619             vec_add1 (w->elog_track.name, 0);
620             elog_track_register (&vm->elog_main, &w->elog_track);
621             
622             if (tr->no_data_structure_clone)
623               continue;
624
625             /* Allocate "to-worker-N" frame queue */
626             if (tr->frame_queue_nelts) {
627                 fq = vlib_frame_queue_alloc (tr->frame_queue_nelts);
628             } else {
629                 fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
630             }
631
632             vec_validate (vlib_frame_queues, worker_thread_index);
633             vlib_frame_queues[worker_thread_index] = fq;
634
635             /* Fork vlib_global_main et al. Look for bugs here */
636             oldheap = clib_mem_set_heap (w->thread_mheap);
637
638             vm_clone = clib_mem_alloc (sizeof (*vm_clone));
639             clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));
640
641             vm_clone->cpu_index = worker_thread_index;
642             vm_clone->heap_base = w->thread_mheap;
643             vm_clone->mbuf_alloc_list = 0;
644             memset (&vm_clone->random_buffer, 0, sizeof (vm_clone->random_buffer));
645
646             nm = &vlib_mains[0]->node_main;
647             nm_clone = &vm_clone->node_main;
648             /* fork next frames array, preserving node runtime indices */
649             nm_clone->next_frames = vec_dup (nm->next_frames);
650             for (j = 0; j < vec_len (nm_clone->next_frames); j++)
651               {
652                 vlib_next_frame_t *nf = &nm_clone->next_frames[j];
653                 u32 save_node_runtime_index;
654                 u32 save_flags;
655
656                 save_node_runtime_index = nf->node_runtime_index;
657                 save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
658                 vlib_next_frame_init (nf);
659                 nf->node_runtime_index = save_node_runtime_index;
660                 nf->flags = save_flags;
661               }
662
663             /* fork the frame dispatch queue */
664             nm_clone->pending_frames = 0;
665             vec_validate (nm_clone->pending_frames, 10); /* $$$$$?????? */
666             _vec_len (nm_clone->pending_frames) = 0;
667
668             /* fork nodes */
669             nm_clone->nodes = 0;
670             for (j = 0; j < vec_len (nm->nodes); j++) 
671               {
672                 vlib_node_t *n;
673                 n = clib_mem_alloc_no_fail (sizeof(*n));
674                 clib_memcpy (n, nm->nodes[j], sizeof (*n));
675                 /* none of the copied nodes have enqueue rights given out */
676                 n->owner_node_index = VLIB_INVALID_NODE_INDEX;
677                 memset (&n->stats_total, 0, sizeof (n->stats_total));
678                 memset (&n->stats_last_clear, 0, sizeof (n->stats_last_clear));
679                 vec_add1 (nm_clone->nodes, n);
680               }
681             nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
682               vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
683
684             nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
685               vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
686             vec_foreach(rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
687               rt->cpu_index = vm_clone->cpu_index;
688
689             nm_clone->processes = vec_dup (nm->processes);
690
691             /* zap the (per worker) frame freelists, etc */
692             nm_clone->frame_sizes = 0;
693             nm_clone->frame_size_hash = 0;
694
695             /* Packet trace buffers are guaranteed to be empty, nothing to do here */
696
697             clib_mem_set_heap (oldheap);
698             vec_add1 (vlib_mains, vm_clone);
699
700             vm_clone->error_main.counters =
701               vec_dup(vlib_mains[0]->error_main.counters);
702             vm_clone->error_main.counters_last_clear =
703               vec_dup(vlib_mains[0]->error_main.counters_last_clear);
704
705             /* Fork the vlib_buffer_main_t free lists, etc. */
706             bm_clone = vec_dup (vm_clone->buffer_main);
707             vm_clone->buffer_main = bm_clone;
708
709             orig_freelist_pool = bm_clone->buffer_free_list_pool;
710             bm_clone->buffer_free_list_pool = 0;
711
712             pool_foreach (fl_orig, orig_freelist_pool,
713                           ({
714                             pool_get_aligned (bm_clone->buffer_free_list_pool, 
715                                               fl_clone, CLIB_CACHE_LINE_BYTES);
716                             ASSERT (fl_orig - orig_freelist_pool 
717                                     == fl_clone - bm_clone->buffer_free_list_pool);
718
719                             fl_clone[0] = fl_orig[0];
720                             fl_clone->aligned_buffers = 0;
721                             fl_clone->unaligned_buffers = 0;
722                             fl_clone->n_alloc = 0;
723                           }));
724
725             worker_thread_index++;
726           }
727         }
728     }
729   else
730     {
731       /* only have non-data-structure copy threads to create... */
732       for (i = 0; i < vec_len(tm->registrations); i++)
733         {
734           tr = tm->registrations[i];
735
736           for (j = 0; j < tr->count; j++)
737             {
738               vec_add2 (vlib_worker_threads, w, 1);
739               if (tr->mheap_size)
740                 w->thread_mheap = mheap_alloc (0 /* use VM */, tr->mheap_size);
741               else
742                 w->thread_mheap = main_heap;
743               w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
744               w->thread_function = tr->function;
745               w->thread_function_arg = w;
746               w->instance_id = j;
747               w->elog_track.name = 
748                   (char *) format (0, "%s %d", tr->name, j+1);
749               w->registration = tr;
750               vec_add1 (w->elog_track.name, 0);
751               elog_track_register (&vm->elog_main, &w->elog_track);
752             }
753         }
754     }
755
756   worker_thread_index = 1;
757
758   for (i = 0; i < vec_len (tm->registrations); i++)
759     {
760       int j;
761
762       tr = tm->registrations[i];
763
764       if (tr->use_pthreads || tm->use_pthreads)
765         {
766           for (j = 0; j < tr->count; j++)
767             {
768               w = vlib_worker_threads + worker_thread_index++;
769               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, 0) < 0)
770                 clib_warning ("Couldn't start '%s' pthread ", tr->name);
771             }
772         }
773       else
774         {
775             uword c;
776             clib_bitmap_foreach (c, tr->coremask, ({
777               w = vlib_worker_threads + worker_thread_index++;
778               if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, c) < 0)
779                 clib_warning ("Couldn't start DPDK lcore %d", c);
780
781             }));
782         }
783     }
784   vlib_worker_thread_barrier_sync(vm);
785   vlib_worker_thread_barrier_release(vm);
786   return 0;
787 }
788
789 VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers);
790
791 void vlib_worker_thread_node_runtime_update(void)
792 {
793   int i, j;
794   vlib_worker_thread_t *w;
795   vlib_main_t *vm;
796   vlib_node_main_t *nm, *nm_clone;
797   vlib_node_t ** old_nodes_clone;
798   vlib_main_t *vm_clone;
799   vlib_node_runtime_t * rt, * old_rt;
800   void *oldheap;
801   never_inline void
802     vlib_node_runtime_sync_stats (vlib_main_t * vm,
803                                   vlib_node_runtime_t * r,
804                                   uword n_calls,
805                                   uword n_vectors,
806                                   uword n_clocks);
807   
808   ASSERT (os_get_cpu_number() == 0);
809
810   if (vec_len (vlib_mains) == 0)
811     return;
812
813   vm = vlib_mains[0];
814   nm = &vm->node_main;
815
816   ASSERT (os_get_cpu_number() == 0);
817   ASSERT (*vlib_worker_threads->wait_at_barrier == 1);
818
819   /* 
820    * Scrape all runtime stats, so we don't lose node runtime(s) with
821    * pending counts, or throw away worker / io thread counts.
822    */
823   for (j = 0; j < vec_len (nm->nodes); j++) 
824     {
825       vlib_node_t * n;
826       n = nm->nodes[j];
827       vlib_node_sync_stats (vm, n);
828     }
829
830   for (i = 1; i < vec_len (vlib_mains); i++)
831     {
832       vlib_node_t * n;
833       
834       vm_clone = vlib_mains[i];
835       nm_clone = &vm_clone->node_main;
836
837       for (j = 0; j < vec_len (nm_clone->nodes); j++) 
838         {
839           n = nm_clone->nodes[j];
840
841           rt = vlib_node_get_runtime (vm_clone, n->index);
842           vlib_node_runtime_sync_stats (vm_clone, rt, 0, 0, 0);
843         }
844     }
845
846   for (i = 1; i < vec_len (vlib_mains); i++)
847     {
848       vlib_node_runtime_t * rt;
849       w = vlib_worker_threads + i;
850       oldheap = clib_mem_set_heap (w->thread_mheap);
851
852       vm_clone = vlib_mains[i];
853
854       /* Re-clone error heap */
855       u64 * old_counters = vm_clone->error_main.counters;
856       u64 * old_counters_all_clear = vm_clone->error_main.counters_last_clear;
857       clib_memcpy (&vm_clone->error_main, &vm->error_main, sizeof (vm->error_main));
858       j = vec_len(vm->error_main.counters) - 1;
859       vec_validate_aligned(old_counters, j, CLIB_CACHE_LINE_BYTES);
860       vec_validate_aligned(old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES);
861       vm_clone->error_main.counters = old_counters;
862       vm_clone->error_main.counters_last_clear = old_counters_all_clear;
863
864       nm_clone = &vm_clone->node_main;
865       vec_free (nm_clone->next_frames);
866       nm_clone->next_frames = vec_dup (nm->next_frames);
867
868       for (j = 0; j < vec_len (nm_clone->next_frames); j++)
869         {
870           vlib_next_frame_t *nf = &nm_clone->next_frames[j];
871           u32 save_node_runtime_index;
872           u32 save_flags;
873
874           save_node_runtime_index = nf->node_runtime_index;
875           save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
876           vlib_next_frame_init (nf);
877           nf->node_runtime_index = save_node_runtime_index;
878           nf->flags = save_flags;
879         }
880
881       old_nodes_clone = nm_clone->nodes;
882       nm_clone->nodes = 0;
883
884       /* re-fork nodes */
885       for (j = 0; j < vec_len (nm->nodes); j++) {
886         vlib_node_t *old_n_clone;
887         vlib_node_t *new_n, *new_n_clone;
888
889         new_n = nm->nodes[j];
890         old_n_clone = old_nodes_clone[j];
891
892         new_n_clone = clib_mem_alloc_no_fail (sizeof(*new_n_clone));
893         clib_memcpy (new_n_clone, new_n, sizeof (*new_n));
894         /* none of the copied nodes have enqueue rights given out */
895         new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX;
896
897         if (j >= vec_len (old_nodes_clone))
898           {
899             /* new node, set to zero */
900             memset (&new_n_clone->stats_total, 0, 
901                     sizeof (new_n_clone->stats_total));
902             memset (&new_n_clone->stats_last_clear, 0, 
903                     sizeof (new_n_clone->stats_last_clear));
904           }
905         else
906           {
907             /* Copy stats if the old data is valid */
908             clib_memcpy (&new_n_clone->stats_total, 
909                     &old_n_clone->stats_total,
910                     sizeof (new_n_clone->stats_total));
911             clib_memcpy (&new_n_clone->stats_last_clear, 
912                     &old_n_clone->stats_last_clear,
913                     sizeof (new_n_clone->stats_last_clear));
914
915             /* keep previous node state */
916             new_n_clone->state = old_n_clone->state;
917           }
918         vec_add1 (nm_clone->nodes, new_n_clone);
919       }
920       /* Free the old node clone */
921       for (j = 0; j < vec_len(old_nodes_clone); j++)
922         clib_mem_free (old_nodes_clone[j]);
923       vec_free (old_nodes_clone);
924       
925       vec_free (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
926
927       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
928           vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);
929
930       /* clone input node runtime */
931       old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT];
932
933       nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
934         vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
935
936       vec_foreach(rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
937         {
938           rt->cpu_index = vm_clone->cpu_index;
939         }
940
941       for (j=0; j < vec_len(old_rt); j++)
942         {
943           rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index);
944           rt->state = old_rt[j].state;
945         }
946
947       vec_free(old_rt);
948
949       nm_clone->processes = vec_dup (nm->processes);
950
951       clib_mem_set_heap (oldheap);
952
953       // vnet_main_fork_fixup (i);
954     }
955 }
956
957 static clib_error_t *
958 cpu_config (vlib_main_t * vm, unformat_input_t * input)
959 {
960   vlib_thread_registration_t *tr;
961   uword * p;
962   vlib_thread_main_t * tm = &vlib_thread_main;
963   u8 * name;
964   u64 coremask;
965   uword * bitmap;
966   u32 count;
967
968   tm->thread_registrations_by_name = hash_create_string (0, sizeof (uword));
969   tm->n_thread_stacks = 1;      /* account for main thread */
970
971   tr = tm->next;
972
973   while (tr)
974     {
975       hash_set_mem (tm->thread_registrations_by_name, tr->name, (uword)tr);
976       tr = tr->next;
977     }
978
979   while (unformat_check_input(input) != UNFORMAT_END_OF_INPUT)
980     {
981       if (unformat (input, "use-pthreads"))
982         tm->use_pthreads = 1;
983       else if (unformat (input, "thread-prefix %v", &tm->thread_prefix))
984           ;
985       else if (unformat (input, "main-core %u", &tm->main_lcore))
986           ;
987       else if (unformat (input, "skip-cores %u", &tm->skip_cores))
988           ;
989       else if (unformat (input, "coremask-%s %llx", &name, &coremask))
990         {
991           p = hash_get_mem (tm->thread_registrations_by_name, name);
992           if (p == 0)
993             return clib_error_return (0, "no such thread type '%s'", name);
994
995           tr = (vlib_thread_registration_t *)p[0];
996
997           if  (tr->use_pthreads)
998             return clib_error_return (0, "coremask cannot be set for '%s' threads",
999                                       name);
1000
1001           tr->coremask = clib_bitmap_set_multiple 
1002             (tr->coremask, 0, coremask, BITS(coremask));
1003           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1004         }
1005       else if (unformat (input, "corelist-%s %U", &name, unformat_bitmap_list,
1006                &bitmap))
1007         {
1008           p = hash_get_mem (tm->thread_registrations_by_name, name);
1009           if (p == 0)
1010             return clib_error_return (0, "no such thread type '%s'", name);
1011
1012           tr = (vlib_thread_registration_t *)p[0];
1013
1014           if  (tr->use_pthreads)
1015             return clib_error_return (0, "corelist cannot be set for '%s' threads",
1016                                       name);
1017
1018           tr->coremask = bitmap;
1019           tr->count = clib_bitmap_count_set_bits (tr->coremask);
1020         }
1021       else if (unformat (input, "%s %u", &name, &count))
1022         {
1023           p = hash_get_mem (tm->thread_registrations_by_name, name);
1024           if (p == 0)
1025               return clib_error_return (0, "no such thread type '%s'", name);
1026                                         
1027           tr = (vlib_thread_registration_t *)p[0];
1028           if (tr->fixed_count)
1029             return clib_error_return 
1030               (0, "number of %s threads not configurable", tr->name);
1031           tr->count = count;
1032         }
1033       else 
1034         break;
1035     }
1036
1037   tr = tm->next;
1038
1039   if (!tm->thread_prefix)
1040     tm->thread_prefix = format(0, "vpp");
1041
1042   while (tr)
1043     {
1044       tm->n_thread_stacks += tr->count;
1045       tm->n_pthreads += tr->count * tr->use_pthreads;
1046       tm->n_eal_threads += tr->count * (tr->use_pthreads == 0);
1047       tr = tr->next;
1048     }
1049
1050   return 0;
1051 }
1052
1053 VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu");
1054
1055 #if !defined (__x86_64__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__)
1056 void __sync_fetch_and_add_8 (void)
1057 {
1058   fformat(stderr, "%s called\n", __FUNCTION__);
1059   abort();
1060 }
1061 void __sync_add_and_fetch_8 (void)
1062 {
1063   fformat(stderr, "%s called\n", __FUNCTION__);
1064   abort();
1065 }
1066 #endif
1067
1068 void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak));
1069 void vnet_main_fixup (vlib_fork_fixup_t which) { }
1070
1071 void vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which)
1072 {
1073   vlib_main_t * vm = vlib_get_main();
1074
1075   if (vlib_mains == 0)
1076     return;
1077
1078   ASSERT(os_get_cpu_number() == 0);
1079   vlib_worker_thread_barrier_sync(vm);
1080
1081   switch (which)
1082     {
1083     case VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX:
1084       vnet_main_fixup (VLIB_WORKER_THREAD_FORK_FIXUP_NEW_SW_IF_INDEX);
1085       break;
1086
1087     default:
1088       ASSERT(0);
1089     }
1090   vlib_worker_thread_barrier_release(vm);
1091 }
1092
1093 void vlib_worker_thread_barrier_sync(vlib_main_t *vm)
1094 {
1095   f64 deadline;
1096   u32 count;
1097   
1098   if (!vlib_mains)
1099       return;
1100
1101   count = vec_len (vlib_mains) - 1;
1102
1103   /* Tolerate recursive calls */
1104   if (++vlib_worker_threads[0].recursion_level > 1)
1105       return;
1106
1107   vlib_worker_threads[0].barrier_sync_count++;
1108
1109   ASSERT (os_get_cpu_number() == 0);
1110
1111   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1112
1113   *vlib_worker_threads->wait_at_barrier = 1;
1114   while (*vlib_worker_threads->workers_at_barrier != count)
1115     {
1116       if (vlib_time_now(vm) > deadline)
1117         {
1118           fformat(stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1119           os_panic();
1120         }
1121     }
1122 }
1123
1124 void vlib_worker_thread_barrier_release(vlib_main_t * vm)
1125 {
1126   f64 deadline;
1127
1128   if (!vlib_mains)
1129       return;
1130
1131   if (--vlib_worker_threads[0].recursion_level > 0)
1132     return;
1133
1134   deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT;
1135
1136   *vlib_worker_threads->wait_at_barrier = 0;
1137
1138   while (*vlib_worker_threads->workers_at_barrier > 0)
1139     {
1140       if (vlib_time_now(vm) > deadline)
1141         {
1142           fformat(stderr, "%s: worker thread deadlock\n", __FUNCTION__);
1143           os_panic();
1144         }
1145     }
1146 }
1147
1148 /*
1149  * Check the frame queue to see if any frames are available.
1150  * If so, pull the packets off the frames and put them to 
1151  * the handoff node.
1152  */
1153 static inline int vlib_frame_queue_dequeue_internal (vlib_main_t *vm)
1154 {
1155   u32 thread_id = vm->cpu_index;
1156   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
1157   vlib_frame_queue_elt_t *elt;
1158   u32 * from, * to;
1159   vlib_frame_t * f;
1160   int msg_type;
1161   int processed = 0;
1162   u32 n_left_to_node;
1163   u32 vectors = 0;
1164   vlib_thread_main_t *tm = vlib_get_thread_main();
1165
1166   ASSERT (fq);
1167   ASSERT(vm == vlib_mains[thread_id]);
1168
1169   if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0))
1170       return 0;
1171   /*
1172    * Gather trace data for frame queues
1173    */
1174   if (PREDICT_FALSE(fq->trace))
1175     {
1176       frame_queue_trace_t *fqt;
1177       frame_queue_nelt_counter_t *fqh;
1178       u32 elix;
1179
1180       fqt = &tm->frame_queue_traces[thread_id];
1181
1182       fqt->nelts = fq->nelts;
1183       fqt->head = fq->head;
1184       fqt->head_hint = fq->head_hint;
1185       fqt->tail = fq->tail;
1186       fqt->threshold = fq->vector_threshold;
1187       fqt->n_in_use = fqt->tail - fqt->head;
1188       if (fqt->n_in_use >= fqt->nelts){
1189         // if beyond max then use max
1190         fqt->n_in_use = fqt->nelts-1;
1191       }
1192
1193       /* Record the number of elements in use in the histogram */
1194       fqh = &tm->frame_queue_histogram[thread_id];
1195       fqh->count[ fqt->n_in_use ]++;
1196
1197       /* Record a snapshot of the elements in use */
1198       for (elix=0; elix<fqt->nelts; elix++) {
1199         elt = fq->elts + ((fq->head+1 + elix) & (fq->nelts-1));
1200         if (1 || elt->valid) 
1201           {
1202             fqt->n_vectors[elix] = elt->n_vectors;
1203           }
1204       }
1205       fqt->written = 1;
1206     }
1207
1208   while (1)
1209     {
1210       if (fq->head == fq->tail)
1211         {
1212           fq->head_hint = fq->head;
1213           return processed;
1214         }
1215
1216       elt = fq->elts + ((fq->head+1) & (fq->nelts-1));
1217
1218       if (!elt->valid)
1219         {
1220           fq->head_hint = fq->head;
1221           return processed;
1222         }
1223
1224       from = elt->buffer_index;
1225       msg_type = elt->msg_type;
1226
1227       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
1228       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
1229
1230       f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index);
1231
1232       to = vlib_frame_vector_args (f);
1233
1234       n_left_to_node = elt->n_vectors;
1235
1236       while (n_left_to_node >= 4)
1237         {
1238           to[0] = from[0];
1239           to[1] = from[1];
1240           to[2] = from[2];
1241           to[3] = from[3];
1242           to += 4;
1243           from += 4;
1244           n_left_to_node -= 4;
1245         }
1246
1247       while (n_left_to_node > 0)
1248         {
1249           to[0] = from[0];
1250           to++;
1251           from++;
1252           n_left_to_node--;
1253         }
1254
1255       vectors += elt->n_vectors;
1256       f->n_vectors = elt->n_vectors;
1257       vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f);
1258
1259       elt->valid = 0;
1260       elt->n_vectors = 0;
1261       elt->msg_type = 0xfefefefe;
1262       CLIB_MEMORY_BARRIER();
1263       fq->head++;
1264       processed++;
1265
1266       /*
1267        * Limit the number of packets pushed into the graph
1268        */
1269       if (vectors >= fq->vector_threshold)
1270         {
1271           fq->head_hint = fq->head;
1272           return processed;
1273         }
1274     }
1275   ASSERT(0);
1276   return processed;
1277 }
1278
1279 static_always_inline void
1280 vlib_worker_thread_internal (vlib_main_t *vm)
1281 {
1282   vlib_node_main_t * nm = &vm->node_main;
1283   u64 cpu_time_now = clib_cpu_time_now ();
1284
1285   while (1)
1286     {
1287       vlib_worker_thread_barrier_check ();
1288
1289       vlib_frame_queue_dequeue_internal (vm);
1290
1291       vlib_node_runtime_t * n;
1292       vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
1293         {
1294           cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
1295                                         VLIB_NODE_STATE_POLLING, /* frame */ 0,
1296                                         cpu_time_now);
1297         }
1298
1299       if (_vec_len (nm->pending_frames))
1300         {
1301           int i;
1302           cpu_time_now = clib_cpu_time_now ();
1303           for (i = 0; i < _vec_len (nm->pending_frames); i++) {
1304             vlib_pending_frame_t *p;
1305
1306             p = nm->pending_frames + i;
1307
1308             cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
1309           }
1310           _vec_len (nm->pending_frames) = 0;
1311         }
1312       vlib_increment_main_loop_counter (vm);
1313
1314       /* Record time stamp in case there are no enabled nodes and above
1315          calls do not update time stamp. */
1316       cpu_time_now = clib_cpu_time_now ();
1317     }
1318 }
1319
1320 void vlib_worker_thread_fn (void * arg)
1321 {
1322   vlib_worker_thread_t * w = (vlib_worker_thread_t *) arg;
1323   vlib_thread_main_t * tm = vlib_get_thread_main();
1324   vlib_main_t * vm = vlib_get_main();
1325
1326   ASSERT(vm->cpu_index == os_get_cpu_number());
1327
1328   vlib_worker_thread_init (w);
1329   clib_time_init (&vm->clib_time);
1330   clib_mem_set_heap (w->thread_mheap);
1331
1332   /* Wait until the dpdk init sequence is complete */
1333   while (tm->worker_thread_release == 0)
1334     vlib_worker_thread_barrier_check ();
1335
1336   vlib_worker_thread_internal(vm);
1337 }
1338
1339 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
1340   .name = "workers",
1341   .short_name = "wk",
1342   .function = vlib_worker_thread_fn,
1343 };
1344
1345 clib_error_t *threads_init (vlib_main_t *vm)
1346 {
1347   vlib_thread_main_t * tm = vlib_get_thread_main();
1348
1349   tm->handoff_dispatch_node_index = ~0;
1350
1351   return 0;
1352 }
1353
1354 VLIB_INIT_FUNCTION (threads_init);