vcl: support inter worker rpc
[vpp.git] / src / vcl / vcl_locked.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vcl/vcl_locked.h>
17 #include <vcl/vcl_private.h>
18
19 typedef struct vls_shared_data_
20 {
21   clib_spinlock_t lock;
22   u32 owner_wrk_index;
23   u32 *workers_subscribed;
24   clib_bitmap_t *listeners;
25 } vls_shared_data_t;
26
27 typedef struct vcl_locked_session_
28 {
29   clib_spinlock_t lock;
30   u32 session_index;
31   u32 worker_index;
32   u32 vls_index;
33   u32 shared_data_index;
34 } vcl_locked_session_t;
35
36 typedef struct vls_worker_
37 {
38   vcl_locked_session_t *vls_pool;
39   uword *session_index_to_vlsh_table;
40   u32 wrk_index;
41   volatile int rpc_done;
42 } vls_worker_t;
43
44 typedef struct vls_local_
45 {
46   int vls_wrk_index;
47   volatile int vls_mt_n_threads;
48   pthread_mutex_t vls_mt_mq_mlock;
49   pthread_mutex_t vls_mt_spool_mlock;
50   volatile u8 select_mp_check;
51   volatile u8 epoll_mp_check;
52 } vls_process_local_t;
53
54 static vls_process_local_t vls_local;
55 static vls_process_local_t *vlsl = &vls_local;
56
57 typedef struct vls_main_
58 {
59   vls_worker_t *workers;
60   clib_rwlock_t vls_table_lock;
61   /** Pool of data shared by sessions owned by different workers */
62   vls_shared_data_t *shared_data_pool;
63   clib_rwlock_t shared_data_lock;
64 } vls_main_t;
65
66 vls_main_t *vlsm;
67
68 typedef enum vls_rpc_msg_type_
69 {
70   VLS_RPC_CLONE_AND_SHARE,
71 } vls_rpc_msg_type_e;
72
73 typedef struct vls_rpc_msg_
74 {
75   u8 type;
76   u8 data[0];
77 } vls_rpc_msg_t;
78
79 typedef struct vls_clone_and_share_msg_
80 {
81   u32 vls_index;                /**< vls to be shared */
82   u32 origin_vls_wrk;           /**< worker that initiated the rpc */
83   u32 origin_vls_index;         /**< vls session of the originator */
84 } vls_clone_and_share_msg_t;
85
86 static inline u32
87 vls_get_worker_index (void)
88 {
89   return vcl_get_worker_index ();
90 }
91
92 static u32
93 vls_shared_data_alloc (void)
94 {
95   vls_shared_data_t *vls_shd;
96   u32 shd_index;
97
98   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
99   pool_get_zero (vlsm->shared_data_pool, vls_shd);
100   clib_spinlock_init (&vls_shd->lock);
101   shd_index = vls_shd - vlsm->shared_data_pool;
102   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
103
104   return shd_index;
105 }
106
107 static u32
108 vls_shared_data_index (vls_shared_data_t * vls_shd)
109 {
110   return vls_shd - vlsm->shared_data_pool;
111 }
112
113 vls_shared_data_t *
114 vls_shared_data_get (u32 shd_index)
115 {
116   if (pool_is_free_index (vlsm->shared_data_pool, shd_index))
117     return 0;
118   return pool_elt_at_index (vlsm->shared_data_pool, shd_index);
119 }
120
121 static void
122 vls_shared_data_free (u32 shd_index)
123 {
124   vls_shared_data_t *vls_shd;
125
126   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
127   vls_shd = vls_shared_data_get (shd_index);
128   clib_spinlock_free (&vls_shd->lock);
129   clib_bitmap_free (vls_shd->listeners);
130   vec_free (vls_shd->workers_subscribed);
131   pool_put (vlsm->shared_data_pool, vls_shd);
132   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
133 }
134
135 static inline void
136 vls_shared_data_pool_rlock (void)
137 {
138   clib_rwlock_reader_lock (&vlsm->shared_data_lock);
139 }
140
141 static inline void
142 vls_shared_data_pool_runlock (void)
143 {
144   clib_rwlock_reader_unlock (&vlsm->shared_data_lock);
145 }
146
147 static inline void
148 vls_table_rlock (void)
149 {
150   if (vlsl->vls_mt_n_threads > 1)
151     clib_rwlock_reader_lock (&vlsm->vls_table_lock);
152 }
153
154 static inline void
155 vls_table_runlock (void)
156 {
157   if (vlsl->vls_mt_n_threads > 1)
158     clib_rwlock_reader_unlock (&vlsm->vls_table_lock);
159 }
160
161 static inline void
162 vls_table_wlock (void)
163 {
164   if (vlsl->vls_mt_n_threads > 1)
165     clib_rwlock_writer_lock (&vlsm->vls_table_lock);
166 }
167
168 static inline void
169 vls_table_wunlock (void)
170 {
171   if (vlsl->vls_mt_n_threads > 1)
172     clib_rwlock_writer_unlock (&vlsm->vls_table_lock);
173 }
174
175 typedef enum
176 {
177   VLS_MT_OP_READ,
178   VLS_MT_OP_WRITE,
179   VLS_MT_OP_SPOOL,
180   VLS_MT_OP_XPOLL,
181 } vls_mt_ops_t;
182
183 typedef enum
184 {
185   VLS_MT_LOCK_MQ = 1 << 0,
186   VLS_MT_LOCK_SPOOL = 1 << 1
187 } vls_mt_lock_type_t;
188
189 static void
190 vls_mt_add (void)
191 {
192   vlsl->vls_mt_n_threads += 1;
193   vcl_set_worker_index (vlsl->vls_wrk_index);
194 }
195
196 static inline void
197 vls_mt_mq_lock (void)
198 {
199   pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
200 }
201
202 static inline void
203 vls_mt_mq_unlock (void)
204 {
205   pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
206 }
207
208 static inline void
209 vls_mt_spool_lock (void)
210 {
211   pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
212 }
213
214 static inline void
215 vls_mt_create_unlock (void)
216 {
217   pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
218 }
219
220 static void
221 vls_mt_locks_init (void)
222 {
223   pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
224   pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
225 }
226
227 u8
228 vls_is_shared (vcl_locked_session_t * vls)
229 {
230   return (vls->shared_data_index != ~0);
231 }
232
233 static inline void
234 vls_lock (vcl_locked_session_t * vls)
235 {
236   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
237     clib_spinlock_lock (&vls->lock);
238 }
239
240 static inline void
241 vls_unlock (vcl_locked_session_t * vls)
242 {
243   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
244     clib_spinlock_unlock (&vls->lock);
245 }
246
247 static inline vcl_session_handle_t
248 vls_to_sh (vcl_locked_session_t * vls)
249 {
250   return vcl_session_handle_from_index (vls->session_index);
251 }
252
253 static inline vcl_session_handle_t
254 vls_to_sh_tu (vcl_locked_session_t * vls)
255 {
256   vcl_session_handle_t sh;
257   sh = vls_to_sh (vls);
258   vls_table_runlock ();
259   return sh;
260 }
261
262 static vls_worker_t *
263 vls_worker_get_current (void)
264 {
265   return pool_elt_at_index (vlsm->workers, vls_get_worker_index ());
266 }
267
268 static void
269 vls_worker_alloc (void)
270 {
271   vls_worker_t *wrk;
272
273   pool_get_zero (vlsm->workers, wrk);
274   wrk->wrk_index = vcl_get_worker_index ();
275 }
276
277 static void
278 vls_worker_free (vls_worker_t * wrk)
279 {
280   hash_free (wrk->session_index_to_vlsh_table);
281   pool_free (wrk->vls_pool);
282   pool_put (vlsm->workers, wrk);
283 }
284
285 static vls_worker_t *
286 vls_worker_get (u32 wrk_index)
287 {
288   if (pool_is_free_index (vlsm->workers, wrk_index))
289     return 0;
290   return pool_elt_at_index (vlsm->workers, wrk_index);
291 }
292
293 static vls_handle_t
294 vls_alloc (vcl_session_handle_t sh)
295 {
296   vls_worker_t *wrk = vls_worker_get_current ();
297   vcl_locked_session_t *vls;
298
299   vls_table_wlock ();
300
301   pool_get_zero (wrk->vls_pool, vls);
302   vls->session_index = vppcom_session_index (sh);
303   vls->worker_index = vppcom_session_worker (sh);
304   vls->vls_index = vls - wrk->vls_pool;
305   vls->shared_data_index = ~0;
306   hash_set (wrk->session_index_to_vlsh_table, vls->session_index,
307             vls->vls_index);
308   clib_spinlock_init (&vls->lock);
309
310   vls_table_wunlock ();
311   return vls->vls_index;
312 }
313
314 static vcl_locked_session_t *
315 vls_get (vls_handle_t vlsh)
316 {
317   vls_worker_t *wrk = vls_worker_get_current ();
318   if (pool_is_free_index (wrk->vls_pool, vlsh))
319     return 0;
320   return pool_elt_at_index (wrk->vls_pool, vlsh);
321 }
322
323 static void
324 vls_free (vcl_locked_session_t * vls)
325 {
326   vls_worker_t *wrk = vls_worker_get_current ();
327
328   ASSERT (vls != 0);
329   hash_unset (wrk->session_index_to_vlsh_table, vls->session_index);
330   clib_spinlock_free (&vls->lock);
331   pool_put (wrk->vls_pool, vls);
332 }
333
334 static vcl_locked_session_t *
335 vls_get_and_lock (vls_handle_t vlsh)
336 {
337   vls_worker_t *wrk = vls_worker_get_current ();
338   vcl_locked_session_t *vls;
339   if (pool_is_free_index (wrk->vls_pool, vlsh))
340     return 0;
341   vls = pool_elt_at_index (wrk->vls_pool, vlsh);
342   vls_lock (vls);
343   return vls;
344 }
345
346 static vcl_locked_session_t *
347 vls_get_w_dlock (vls_handle_t vlsh)
348 {
349   vcl_locked_session_t *vls;
350   vls_table_rlock ();
351   vls = vls_get_and_lock (vlsh);
352   if (!vls)
353     vls_table_runlock ();
354   return vls;
355 }
356
357 static inline void
358 vls_get_and_unlock (vls_handle_t vlsh)
359 {
360   vcl_locked_session_t *vls;
361   vls_table_rlock ();
362   vls = vls_get (vlsh);
363   vls_unlock (vls);
364   vls_table_runlock ();
365 }
366
367 static inline void
368 vls_dunlock (vcl_locked_session_t * vls)
369 {
370   vls_unlock (vls);
371   vls_table_runlock ();
372 }
373
374 static vcl_locked_session_t *
375 vls_session_get (vls_worker_t * wrk, u32 vls_index)
376 {
377   if (pool_is_free_index (wrk->vls_pool, vls_index))
378     return 0;
379   return pool_elt_at_index (wrk->vls_pool, vls_index);
380 }
381
382 vcl_session_handle_t
383 vlsh_to_sh (vls_handle_t vlsh)
384 {
385   vcl_locked_session_t *vls;
386   int rv;
387
388   vls = vls_get_w_dlock (vlsh);
389   if (!vls)
390     return INVALID_SESSION_ID;
391   rv = vls_to_sh (vls);
392   vls_dunlock (vls);
393   return rv;
394 }
395
396 vcl_session_handle_t
397 vlsh_to_session_index (vls_handle_t vlsh)
398 {
399   vcl_session_handle_t sh;
400   sh = vlsh_to_sh (vlsh);
401   return vppcom_session_index (sh);
402 }
403
404 vls_handle_t
405 vls_si_to_vlsh (u32 session_index)
406 {
407   vls_worker_t *wrk = vls_worker_get_current ();
408   uword *vlshp;
409   vlshp = hash_get (wrk->session_index_to_vlsh_table, session_index);
410   return vlshp ? *vlshp : VLS_INVALID_HANDLE;
411 }
412
413 vls_handle_t
414 vls_session_index_to_vlsh (uint32_t session_index)
415 {
416   vls_handle_t vlsh;
417
418   vls_table_rlock ();
419   vlsh = vls_si_to_vlsh (session_index);
420   vls_table_runlock ();
421
422   return vlsh;
423 }
424
425 u8
426 vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
427 {
428   vls_shared_data_t *vls_shd;
429   int i;
430
431   if (vls->shared_data_index == ~0)
432     return 0;
433
434   vls_shared_data_pool_rlock ();
435
436   vls_shd = vls_shared_data_get (vls->shared_data_index);
437   clib_spinlock_lock (&vls_shd->lock);
438
439   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
440     if (vls_shd->workers_subscribed[i] == wrk_index)
441       {
442         clib_spinlock_unlock (&vls_shd->lock);
443         vls_shared_data_pool_runlock ();
444         return 1;
445       }
446   clib_spinlock_unlock (&vls_shd->lock);
447
448   vls_shared_data_pool_runlock ();
449   return 0;
450 }
451
452 static void
453 vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
454 {
455   vls_shared_data_t *vls_shd;
456
457   if (vls->shared_data_index == ~0)
458     {
459       clib_warning ("not a shared session");
460       return;
461     }
462
463   vls_shared_data_pool_rlock ();
464
465   vls_shd = vls_shared_data_get (vls->shared_data_index);
466
467   clib_spinlock_lock (&vls_shd->lock);
468   clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
469   clib_spinlock_unlock (&vls_shd->lock);
470
471   vls_shared_data_pool_runlock ();
472 }
473
474 static u32
475 vls_shared_get_owner (vcl_locked_session_t * vls)
476 {
477   vls_shared_data_t *vls_shd;
478   u32 owner_wrk;
479
480   vls_shared_data_pool_rlock ();
481
482   vls_shd = vls_shared_data_get (vls->shared_data_index);
483   owner_wrk = vls_shd->owner_wrk_index;
484
485   vls_shared_data_pool_runlock ();
486
487   return owner_wrk;
488 }
489
490 static u8
491 vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
492 {
493   vls_shared_data_t *vls_shd;
494   u8 is_set;
495
496   if (vls->shared_data_index == ~0)
497     {
498       clib_warning ("not a shared session");
499       return 0;
500     }
501
502   vls_shared_data_pool_rlock ();
503
504   vls_shd = vls_shared_data_get (vls->shared_data_index);
505
506   clib_spinlock_lock (&vls_shd->lock);
507   is_set = clib_bitmap_get (vls_shd->listeners, wrk_index);
508   clib_spinlock_unlock (&vls_shd->lock);
509
510   vls_shared_data_pool_runlock ();
511
512   return (is_set == 1);
513 }
514
515 static void
516 vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
517 {
518   vppcom_session_listen (vls_to_sh (vls), ~0);
519   vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
520 }
521
522 static void
523 vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
524 {
525   vcl_worker_t *wrk;
526   vcl_session_t *s;
527
528   wrk = vcl_worker_get (wrk_index);
529   s = vcl_session_get (wrk, vls->session_index);
530   if (s->session_state != STATE_LISTEN)
531     return;
532   vcl_send_session_unlisten (wrk, s);
533   s->session_state = STATE_LISTEN_NO_MQ;
534   vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
535 }
536
537 static int
538 vls_shared_data_subscriber_position (vls_shared_data_t * vls_shd,
539                                      u32 wrk_index)
540 {
541   int i;
542
543   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
544     {
545       if (vls_shd->workers_subscribed[i] == wrk_index)
546         return i;
547     }
548   return -1;
549 }
550
551 int
552 vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
553 {
554   vls_shared_data_t *vls_shd;
555   int do_disconnect, pos;
556   u32 n_subscribers;
557   vcl_session_t *s;
558
559   ASSERT (vls->shared_data_index != ~0);
560
561   s = vcl_session_get (wrk, vls->session_index);
562   if (s->session_state == STATE_LISTEN)
563     vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
564
565   vls_shared_data_pool_rlock ();
566
567   vls_shd = vls_shared_data_get (vls->shared_data_index);
568   clib_spinlock_lock (&vls_shd->lock);
569
570   pos = vls_shared_data_subscriber_position (vls_shd, wrk->wrk_index);
571   if (pos < 0)
572     {
573       clib_warning ("worker %u not subscribed for vls %u", wrk->wrk_index,
574                     vls->worker_index);
575       goto done;
576     }
577
578   /*
579    * Unsubscribe from share data and fifos
580    */
581   if (s->rx_fifo)
582     {
583       svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
584       svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
585     }
586   vec_del1 (vls_shd->workers_subscribed, pos);
587
588   /*
589    * Cleanup vcl state
590    */
591   n_subscribers = vec_len (vls_shd->workers_subscribed);
592   do_disconnect = s->session_state == STATE_LISTEN || !n_subscribers;
593   vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
594
595   /*
596    * No subscriber left, cleanup shared data
597    */
598   if (!n_subscribers)
599     {
600       u32 shd_index = vls_shared_data_index (vls_shd);
601
602       clib_spinlock_unlock (&vls_shd->lock);
603       vls_shared_data_pool_runlock ();
604
605       vls_shared_data_free (shd_index);
606
607       /* All locks have been dropped */
608       return 0;
609     }
610
611   /* Return, if this is not the owning worker */
612   if (vls_shd->owner_wrk_index != wrk->wrk_index)
613     goto done;
614
615   ASSERT (vec_len (vls_shd->workers_subscribed));
616
617   /*
618    *  Check if we can change owner or close
619    */
620   vls_shd->owner_wrk_index = vls_shd->workers_subscribed[0];
621   vcl_send_session_worker_update (wrk, s, vls_shd->owner_wrk_index);
622
623   /* XXX is this still needed? */
624   if (vec_len (vls_shd->workers_subscribed) > 1)
625     clib_warning ("more workers need to be updated");
626
627 done:
628
629   clib_spinlock_unlock (&vls_shd->lock);
630   vls_shared_data_pool_runlock ();
631
632   return 0;
633 }
634
635 void
636 vls_init_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
637 {
638   vls_shared_data_t *vls_shd;
639
640   u32 vls_shd_index = vls_shared_data_alloc ();
641
642   vls_shared_data_pool_rlock ();
643
644   vls_shd = vls_shared_data_get (vls_shd_index);
645   vls_shd->owner_wrk_index = vls_wrk->wrk_index;
646   vls->shared_data_index = vls_shd_index;
647   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
648
649   vls_shared_data_pool_runlock ();
650 }
651
652 void
653 vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
654 {
655   vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
656   vls_shared_data_t *vls_shd;
657   vcl_session_t *s;
658
659   s = vcl_session_get (vcl_wrk, vls->session_index);
660   if (!s)
661     {
662       clib_warning ("wrk %u session %u vls %u NOT AVAILABLE",
663                     vcl_wrk->wrk_index, vls->session_index, vls->vls_index);
664       return;
665     }
666
667   ASSERT (vls->shared_data_index != ~0);
668
669   /* Reinit session lock */
670   clib_spinlock_init (&vls->lock);
671
672   vls_shared_data_pool_rlock ();
673
674   vls_shd = vls_shared_data_get (vls->shared_data_index);
675
676   clib_spinlock_lock (&vls_shd->lock);
677   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
678   clib_spinlock_unlock (&vls_shd->lock);
679
680   vls_shared_data_pool_runlock ();
681
682   if (s->rx_fifo)
683     {
684       svm_fifo_add_subscriber (s->rx_fifo, vcl_wrk->vpp_wrk_index);
685       svm_fifo_add_subscriber (s->tx_fifo, vcl_wrk->vpp_wrk_index);
686     }
687   else if (s->session_state == STATE_LISTEN)
688     {
689       s->session_state = STATE_LISTEN_NO_MQ;
690     }
691 }
692
693 static void
694 vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
695 {
696   vcl_locked_session_t *vls, *parent_vls;
697
698   /* *INDENT-OFF* */
699   pool_foreach (vls, vls_wrk->vls_pool, ({
700     /* Initialize sharing on parent session */
701     if (vls->shared_data_index == ~0)
702       {
703         parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
704         vls_init_share_session (vls_parent_wrk, parent_vls);
705         vls->shared_data_index = parent_vls->shared_data_index;
706       }
707     vls_share_session (vls_wrk, vls);
708   }));
709   /* *INDENT-ON* */
710 }
711
712 void
713 vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
714 {
715   vls_worker_t *vls_wrk = vls_worker_get_current (), *vls_parent_wrk;
716   vcl_worker_t *wrk = vcl_worker_get_current ();
717
718   /*
719    * init vcl worker
720    */
721   wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
722   wrk->sessions = pool_dup (parent_wrk->sessions);
723   wrk->session_index_by_vpp_handles =
724     hash_dup (parent_wrk->session_index_by_vpp_handles);
725
726   /*
727    * init vls worker
728    */
729   vls_parent_wrk = vls_worker_get (parent_wrk->wrk_index);
730   vls_wrk->session_index_to_vlsh_table =
731     hash_dup (vls_parent_wrk->session_index_to_vlsh_table);
732   vls_wrk->vls_pool = pool_dup (vls_parent_wrk->vls_pool);
733
734   vls_share_sessions (vls_parent_wrk, vls_wrk);
735 }
736
737 static void
738 vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
739 {
740   vcl_worker_t *wrk = vcl_worker_get_current ();
741   vcl_session_t *s = 0;
742   int is_nonblk = 0;
743
744   if (vls)
745     {
746       s = vcl_session_get (wrk, vls->session_index);
747       if (PREDICT_FALSE (!s))
748         return;
749       is_nonblk = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
750     }
751
752   switch (op)
753     {
754     case VLS_MT_OP_READ:
755       if (!is_nonblk)
756         is_nonblk = vcl_session_read_ready (s) != 0;
757       if (!is_nonblk)
758         {
759           vls_mt_mq_lock ();
760           *locks_acq |= VLS_MT_LOCK_MQ;
761         }
762       break;
763     case VLS_MT_OP_WRITE:
764       ASSERT (s);
765       if (!is_nonblk)
766         is_nonblk = vcl_session_write_ready (s) != 0;
767       if (!is_nonblk)
768         {
769           vls_mt_mq_lock ();
770           *locks_acq |= VLS_MT_LOCK_MQ;
771         }
772       break;
773     case VLS_MT_OP_XPOLL:
774       vls_mt_mq_lock ();
775       *locks_acq |= VLS_MT_LOCK_MQ;
776       break;
777     case VLS_MT_OP_SPOOL:
778       vls_mt_spool_lock ();
779       *locks_acq |= VLS_MT_LOCK_SPOOL;
780       break;
781     default:
782       break;
783     }
784 }
785
786 static void
787 vls_mt_rel_locks (int locks_acq)
788 {
789   if (locks_acq & VLS_MT_LOCK_MQ)
790     vls_mt_mq_unlock ();
791   if (locks_acq & VLS_MT_LOCK_SPOOL)
792     vls_mt_create_unlock ();
793 }
794
795 #define vls_mt_guard(_vls, _op)                         \
796   int _locks_acq = 0;                                   \
797   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))    \
798     vls_mt_add ();                                      \
799   if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1))       \
800     vls_mt_acq_locks (_vls, _op, &_locks_acq);          \
801
802 #define vls_mt_unguard()                                \
803   if (PREDICT_FALSE (_locks_acq))                       \
804     vls_mt_rel_locks (_locks_acq)
805
806 int
807 vls_write (vls_handle_t vlsh, void *buf, size_t nbytes)
808 {
809   vcl_locked_session_t *vls;
810   int rv;
811
812   if (!(vls = vls_get_w_dlock (vlsh)))
813     return VPPCOM_EBADFD;
814
815   vls_mt_guard (vls, VLS_MT_OP_WRITE);
816   rv = vppcom_session_write (vls_to_sh_tu (vls), buf, nbytes);
817   vls_mt_unguard ();
818   vls_get_and_unlock (vlsh);
819   return rv;
820 }
821
822 int
823 vls_write_msg (vls_handle_t vlsh, void *buf, size_t nbytes)
824 {
825   vcl_locked_session_t *vls;
826   int rv;
827
828   if (!(vls = vls_get_w_dlock (vlsh)))
829     return VPPCOM_EBADFD;
830   vls_mt_guard (vls, VLS_MT_OP_WRITE);
831   rv = vppcom_session_write_msg (vls_to_sh_tu (vls), buf, nbytes);
832   vls_mt_unguard ();
833   vls_get_and_unlock (vlsh);
834   return rv;
835 }
836
837 int
838 vls_sendto (vls_handle_t vlsh, void *buf, int buflen, int flags,
839             vppcom_endpt_t * ep)
840 {
841   vcl_locked_session_t *vls;
842   int rv;
843
844   if (!(vls = vls_get_w_dlock (vlsh)))
845     return VPPCOM_EBADFD;
846   vls_mt_guard (vls, VLS_MT_OP_WRITE);
847   rv = vppcom_session_sendto (vls_to_sh_tu (vls), buf, buflen, flags, ep);
848   vls_mt_unguard ();
849   vls_get_and_unlock (vlsh);
850   return rv;
851 }
852
853 ssize_t
854 vls_read (vls_handle_t vlsh, void *buf, size_t nbytes)
855 {
856   vcl_locked_session_t *vls;
857   int rv;
858
859   if (!(vls = vls_get_w_dlock (vlsh)))
860     return VPPCOM_EBADFD;
861   vls_mt_guard (vls, VLS_MT_OP_READ);
862   rv = vppcom_session_read (vls_to_sh_tu (vls), buf, nbytes);
863   vls_mt_unguard ();
864   vls_get_and_unlock (vlsh);
865   return rv;
866 }
867
868 ssize_t
869 vls_recvfrom (vls_handle_t vlsh, void *buffer, uint32_t buflen, int flags,
870               vppcom_endpt_t * ep)
871 {
872   vcl_locked_session_t *vls;
873   int rv;
874
875   if (!(vls = vls_get_w_dlock (vlsh)))
876     return VPPCOM_EBADFD;
877   vls_mt_guard (vls, VLS_MT_OP_READ);
878   rv = vppcom_session_recvfrom (vls_to_sh_tu (vls), buffer, buflen, flags,
879                                 ep);
880   vls_mt_unguard ();
881   vls_get_and_unlock (vlsh);
882   return rv;
883 }
884
885 int
886 vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen)
887 {
888   vcl_locked_session_t *vls;
889   int rv;
890
891   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
892     vls_mt_add ();
893
894   if (!(vls = vls_get_w_dlock (vlsh)))
895     return VPPCOM_EBADFD;
896   rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen);
897   vls_get_and_unlock (vlsh);
898   return rv;
899 }
900
901 int
902 vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep)
903 {
904   vcl_locked_session_t *vls;
905   int rv;
906
907   if (!(vls = vls_get_w_dlock (vlsh)))
908     return VPPCOM_EBADFD;
909   rv = vppcom_session_bind (vls_to_sh_tu (vls), ep);
910   vls_get_and_unlock (vlsh);
911   return rv;
912 }
913
914 int
915 vls_listen (vls_handle_t vlsh, int q_len)
916 {
917   vcl_locked_session_t *vls;
918   int rv;
919
920   if (!(vls = vls_get_w_dlock (vlsh)))
921     return VPPCOM_EBADFD;
922   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
923   rv = vppcom_session_listen (vls_to_sh_tu (vls), q_len);
924   vls_mt_unguard ();
925   vls_get_and_unlock (vlsh);
926   return rv;
927 }
928
929 int
930 vls_connect (vls_handle_t vlsh, vppcom_endpt_t * server_ep)
931 {
932   vcl_locked_session_t *vls;
933   int rv;
934
935   if (!(vls = vls_get_w_dlock (vlsh)))
936     return VPPCOM_EBADFD;
937   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
938   rv = vppcom_session_connect (vls_to_sh_tu (vls), server_ep);
939   vls_mt_unguard ();
940   vls_get_and_unlock (vlsh);
941   return rv;
942 }
943
944 static inline void
945 vls_mp_checks (vcl_locked_session_t * vls, int is_add)
946 {
947   vcl_worker_t *wrk = vcl_worker_get_current ();
948   vcl_session_t *s;
949   u32 owner_wrk;
950
951   s = vcl_session_get (wrk, vls->session_index);
952   switch (s->session_state)
953     {
954     case STATE_LISTEN:
955       if (is_add)
956         {
957           vls_listener_wrk_set (vls, vls->worker_index, 1 /* is_active */ );
958           break;
959         }
960       vls_listener_wrk_stop_listen (vls, vls->worker_index);
961       break;
962     case STATE_LISTEN_NO_MQ:
963       if (!is_add)
964         break;
965
966       /* Register worker as listener */
967       vls_listener_wrk_start_listen (vls, wrk->wrk_index);
968
969       /* If owner worker did not attempt to accept/xpoll on the session,
970        * force a listen stop for it, since it may not be interested in
971        * accepting new sessions.
972        * This is pretty much a hack done to give app workers the illusion
973        * that it is fine to listen and not accept new sessions for a
974        * given listener. Without it, we would accumulate unhandled
975        * accepts on the passive worker message queue. */
976       owner_wrk = vls_shared_get_owner (vls);
977       if (!vls_listener_wrk_is_active (vls, owner_wrk))
978         vls_listener_wrk_stop_listen (vls, owner_wrk);
979       break;
980     default:
981       break;
982     }
983 }
984
985 vls_handle_t
986 vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
987 {
988   vls_handle_t accepted_vlsh;
989   vcl_locked_session_t *vls;
990   int sh;
991
992   if (!(vls = vls_get_w_dlock (listener_vlsh)))
993     return VPPCOM_EBADFD;
994   if (vcl_n_workers () > 1)
995     vls_mp_checks (vls, 1 /* is_add */ );
996   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
997   sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags);
998   vls_mt_unguard ();
999   vls_get_and_unlock (listener_vlsh);
1000   if (sh < 0)
1001     return sh;
1002   accepted_vlsh = vls_alloc (sh);
1003   if (PREDICT_FALSE (accepted_vlsh == VLS_INVALID_HANDLE))
1004     vppcom_session_close (sh);
1005   return accepted_vlsh;
1006 }
1007
1008 vls_handle_t
1009 vls_create (uint8_t proto, uint8_t is_nonblocking)
1010 {
1011   vcl_session_handle_t sh;
1012   vls_handle_t vlsh;
1013
1014   vls_mt_guard (0, VLS_MT_OP_SPOOL);
1015   sh = vppcom_session_create (proto, is_nonblocking);
1016   vls_mt_unguard ();
1017   if (sh == INVALID_SESSION_ID)
1018     return VLS_INVALID_HANDLE;
1019
1020   vlsh = vls_alloc (sh);
1021   if (PREDICT_FALSE (vlsh == VLS_INVALID_HANDLE))
1022     vppcom_session_close (sh);
1023
1024   return vlsh;
1025 }
1026
1027 int
1028 vls_close (vls_handle_t vlsh)
1029 {
1030   vcl_locked_session_t *vls;
1031   int rv;
1032
1033   vls_table_wlock ();
1034
1035   vls = vls_get_and_lock (vlsh);
1036   if (!vls)
1037     {
1038       vls_table_wunlock ();
1039       return VPPCOM_EBADFD;
1040     }
1041
1042   vls_mt_guard (0, VLS_MT_OP_SPOOL);
1043
1044   if (vls_is_shared (vls))
1045     rv = vls_unshare_session (vls, vcl_worker_get_current ());
1046   else
1047     rv = vppcom_session_close (vls_to_sh (vls));
1048
1049   vls_free (vls);
1050   vls_mt_unguard ();
1051
1052   vls_table_wunlock ();
1053
1054   return rv;
1055 }
1056
1057 vls_handle_t
1058 vls_epoll_create (void)
1059 {
1060   vcl_session_handle_t sh;
1061   vls_handle_t vlsh;
1062
1063   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
1064     vls_mt_add ();
1065
1066   sh = vppcom_epoll_create ();
1067   if (sh == INVALID_SESSION_ID)
1068     return VLS_INVALID_HANDLE;
1069
1070   vlsh = vls_alloc (sh);
1071   if (vlsh == VLS_INVALID_HANDLE)
1072     vppcom_session_close (sh);
1073
1074   return vlsh;
1075 }
1076
1077 static void
1078 vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op)
1079 {
1080   if (vcl_n_workers () <= 1)
1081     {
1082       vlsl->epoll_mp_check = 1;
1083       return;
1084     }
1085
1086   if (op == EPOLL_CTL_MOD)
1087     return;
1088
1089   vlsl->epoll_mp_check = 1;
1090   vls_mp_checks (vls, op == EPOLL_CTL_ADD);
1091 }
1092
1093 int
1094 vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
1095                struct epoll_event *event)
1096 {
1097   vcl_locked_session_t *ep_vls, *vls;
1098   vcl_session_handle_t ep_sh, sh;
1099   int rv;
1100
1101   vls_table_rlock ();
1102   ep_vls = vls_get_and_lock (ep_vlsh);
1103   vls = vls_get_and_lock (vlsh);
1104   ep_sh = vls_to_sh (ep_vls);
1105   sh = vls_to_sh (vls);
1106
1107   if (PREDICT_FALSE (!vlsl->epoll_mp_check))
1108     vls_epoll_ctl_mp_checks (vls, op);
1109
1110   vls_table_runlock ();
1111
1112   rv = vppcom_epoll_ctl (ep_sh, op, sh, event);
1113
1114   vls_table_rlock ();
1115   ep_vls = vls_get (ep_vlsh);
1116   vls = vls_get (vlsh);
1117   vls_unlock (vls);
1118   vls_unlock (ep_vls);
1119   vls_table_runlock ();
1120   return rv;
1121 }
1122
1123 int
1124 vls_epoll_wait (vls_handle_t ep_vlsh, struct epoll_event *events,
1125                 int maxevents, double wait_for_time)
1126 {
1127   vcl_locked_session_t *vls;
1128   int rv;
1129
1130   if (!(vls = vls_get_w_dlock (ep_vlsh)))
1131     return VPPCOM_EBADFD;
1132   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1133   rv = vppcom_epoll_wait (vls_to_sh_tu (vls), events, maxevents,
1134                           wait_for_time);
1135   vls_mt_unguard ();
1136   vls_get_and_unlock (ep_vlsh);
1137   return rv;
1138 }
1139
1140 static void
1141 vls_select_mp_checks (vcl_si_set * read_map)
1142 {
1143   vcl_locked_session_t *vls;
1144   vcl_worker_t *wrk;
1145   vcl_session_t *s;
1146   u32 si;
1147
1148   if (vcl_n_workers () <= 1)
1149     {
1150       vlsl->select_mp_check = 1;
1151       return;
1152     }
1153
1154   if (!read_map)
1155     return;
1156
1157   vlsl->select_mp_check = 1;
1158   wrk = vcl_worker_get_current ();
1159
1160   /* *INDENT-OFF* */
1161   clib_bitmap_foreach (si, read_map, ({
1162     s = vcl_session_get (wrk, si);
1163     if (s->session_state == STATE_LISTEN)
1164       {
1165         vls = vls_get (vls_session_index_to_vlsh (si));
1166         vls_mp_checks (vls, 1 /* is_add */);
1167       }
1168   }));
1169   /* *INDENT-ON* */
1170 }
1171
1172 int
1173 vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
1174             vcl_si_set * except_map, double wait_for_time)
1175 {
1176   int rv;
1177
1178   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1179   if (PREDICT_FALSE (!vlsl->select_mp_check))
1180     vls_select_mp_checks (read_map);
1181   rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
1182   vls_mt_unguard ();
1183   return rv;
1184 }
1185
1186 static void
1187 vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
1188 {
1189   u32 current_wrk, is_current;
1190   vcl_locked_session_t *vls;
1191   vcl_session_t *s;
1192
1193   if (pool_elts (vcm->workers) <= 1)
1194     return;
1195
1196   current_wrk = vcl_get_worker_index ();
1197   is_current = current_wrk == wrk->wrk_index;
1198
1199   /* *INDENT-OFF* */
1200   pool_foreach (s, wrk->sessions, ({
1201     vls = vls_get (vls_si_to_vlsh (s->session_index));
1202     if (vls && (is_current || vls_is_shared_by_wrk (vls, current_wrk)))
1203       vls_unshare_session (vls, wrk);
1204   }));
1205   /* *INDENT-ON* */
1206 }
1207
1208 static void
1209 vls_cleanup_vcl_worker (vcl_worker_t * wrk)
1210 {
1211   vls_worker_t *vls_wrk = vls_worker_get (wrk->wrk_index);
1212
1213   /* Unshare sessions and also cleanup worker since child may have
1214    * called _exit () and therefore vcl may not catch the event */
1215   vls_unshare_vcl_worker_sessions (wrk);
1216   vcl_worker_cleanup (wrk, 1 /* notify vpp */ );
1217
1218   vls_worker_free (vls_wrk);
1219 }
1220
1221 static void
1222 vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
1223 {
1224   vcl_worker_t *sub_child;
1225   int tries = 0;
1226
1227   if (child_wrk->forked_child != ~0)
1228     {
1229       sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
1230       if (sub_child)
1231         {
1232           /* Wait a bit, maybe the process is going away */
1233           while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
1234             usleep (1e3);
1235           if (kill (sub_child->current_pid, 0) < 0)
1236             vls_cleanup_forked_child (child_wrk, sub_child);
1237         }
1238     }
1239   vls_cleanup_vcl_worker (child_wrk);
1240   VDBG (0, "Cleaned up forked child wrk %u", child_wrk->wrk_index);
1241   wrk->forked_child = ~0;
1242 }
1243
1244 static struct sigaction old_sa;
1245
1246 static void
1247 vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
1248 {
1249   vcl_worker_t *wrk, *child_wrk;
1250
1251   if (vcl_get_worker_index () == ~0)
1252     return;
1253
1254   if (sigaction (SIGCHLD, &old_sa, 0))
1255     {
1256       VERR ("couldn't restore sigchld");
1257       exit (-1);
1258     }
1259
1260   wrk = vcl_worker_get_current ();
1261   if (wrk->forked_child == ~0)
1262     return;
1263
1264   child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
1265   if (!child_wrk)
1266     goto done;
1267
1268   if (si && si->si_pid != child_wrk->current_pid)
1269     {
1270       VDBG (0, "unexpected child pid %u", si->si_pid);
1271       goto done;
1272     }
1273   vls_cleanup_forked_child (wrk, child_wrk);
1274
1275 done:
1276   if (old_sa.sa_flags & SA_SIGINFO)
1277     {
1278       void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
1279       fn (signum, si, uc);
1280     }
1281   else
1282     {
1283       void (*fn) (int) = old_sa.sa_handler;
1284       if (fn)
1285         fn (signum);
1286     }
1287 }
1288
1289 static void
1290 vls_incercept_sigchld ()
1291 {
1292   struct sigaction sa;
1293   clib_memset (&sa, 0, sizeof (sa));
1294   sa.sa_sigaction = vls_intercept_sigchld_handler;
1295   sa.sa_flags = SA_SIGINFO;
1296   if (sigaction (SIGCHLD, &sa, &old_sa))
1297     {
1298       VERR ("couldn't intercept sigchld");
1299       exit (-1);
1300     }
1301 }
1302
1303 static void
1304 vls_app_pre_fork (void)
1305 {
1306   vls_incercept_sigchld ();
1307   vcl_flush_mq_events ();
1308 }
1309
1310 static void
1311 vls_app_fork_child_handler (void)
1312 {
1313   vcl_worker_t *parent_wrk;
1314   int rv, parent_wrk_index;
1315   u8 *child_name;
1316
1317   parent_wrk_index = vcl_get_worker_index ();
1318   VDBG (0, "initializing forked child %u with parent wrk %u", getpid (),
1319         parent_wrk_index);
1320
1321   /*
1322    * Allocate worker vcl
1323    */
1324   vcl_set_worker_index (~0);
1325   if (!vcl_worker_alloc_and_init ())
1326     VERR ("couldn't allocate new worker");
1327
1328   /*
1329    * Attach to binary api
1330    */
1331   child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
1332   vcl_cleanup_bapi ();
1333   vppcom_api_hookup ();
1334   vcm->app_state = STATE_APP_START;
1335   rv = vppcom_connect_to_vpp ((char *) child_name);
1336   vec_free (child_name);
1337   if (rv)
1338     {
1339       VERR ("couldn't connect to VPP!");
1340       return;
1341     }
1342
1343   /*
1344    * Allocate/initialize vls worker
1345    */
1346   vls_worker_alloc ();
1347
1348   /*
1349    * Register worker with vpp and share sessions
1350    */
1351   vcl_worker_register_with_vpp ();
1352   parent_wrk = vcl_worker_get (parent_wrk_index);
1353   vls_worker_copy_on_fork (parent_wrk);
1354   parent_wrk->forked_child = vcl_get_worker_index ();
1355
1356   /* Reset number of threads and set wrk index */
1357   vlsl->vls_mt_n_threads = 0;
1358   vlsl->vls_wrk_index = vcl_get_worker_index ();
1359   vlsl->select_mp_check = 0;
1360   vlsl->epoll_mp_check = 0;
1361   vls_mt_locks_init ();
1362
1363   VDBG (0, "forked child main worker initialized");
1364   vcm->forking = 0;
1365 }
1366
1367 static void
1368 vls_app_fork_parent_handler (void)
1369 {
1370   vcm->forking = 1;
1371   while (vcm->forking)
1372     ;
1373 }
1374
1375 void
1376 vls_app_exit (void)
1377 {
1378   vls_worker_t *wrk = vls_worker_get_current ();
1379
1380   /* Unshare the sessions. VCL will clean up the worker */
1381   vls_unshare_vcl_worker_sessions (vcl_worker_get_current ());
1382   vls_worker_free (wrk);
1383 }
1384
1385 static void
1386 vls_clone_and_share_rpc_handler (void *args)
1387 {
1388   vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
1389   vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
1390   vcl_locked_session_t *vls, *dst_vls;
1391   vcl_worker_t *dst_vcl_wrk;
1392   vcl_session_t *s, *dst_s;
1393
1394   vls = vls_session_get (wrk, msg->vls_index);
1395   vls_init_share_session (wrk, vls);
1396
1397   s = vcl_session_get (vcl_worker_get_current (), vls->session_index);
1398   dst_wrk = vls_worker_get (msg->origin_vls_wrk);
1399   dst_vcl_wrk = vcl_worker_get (msg->origin_vls_wrk);
1400   dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
1401   dst_vls->shared_data_index = vls->shared_data_index;
1402   dst_s = vcl_session_get (dst_vcl_wrk, dst_vls->session_index);
1403   clib_memcpy (dst_s, s, sizeof (*s));
1404
1405   dst_wrk->rpc_done = 1;
1406 }
1407
1408 static void
1409 vls_rpc_handler (void *args)
1410 {
1411   vls_rpc_msg_t *msg = (vls_rpc_msg_t *) args;
1412   switch (msg->type)
1413     {
1414     case VLS_RPC_CLONE_AND_SHARE:
1415       vls_clone_and_share_rpc_handler (msg->data);
1416       break;
1417     default:
1418       break;
1419     }
1420 }
1421
1422 void
1423 vls_send_clone_and_share_rpc (vls_worker_t * wrk, vcl_locked_session_t * vls,
1424                               u32 dst_wrk_index, u32 dst_vls_index)
1425 {
1426   u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
1427   vls_clone_and_share_msg_t *msg;
1428   vls_rpc_msg_t *rpc;
1429
1430   rpc = (vls_rpc_msg_t *) & data;
1431   rpc->type = VLS_RPC_CLONE_AND_SHARE;
1432   msg = (vls_clone_and_share_msg_t *) & rpc->data;
1433   msg->origin_vls_wrk = wrk->wrk_index;
1434   msg->origin_vls_index = vls->vls_index;
1435   msg->vls_index = dst_vls_index;
1436
1437   wrk->rpc_done = 0;
1438   vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
1439   while (!wrk->rpc_done)
1440     ;
1441 }
1442
1443 int
1444 vls_app_create (char *app_name)
1445 {
1446   int rv;
1447
1448   if ((rv = vppcom_app_create (app_name)))
1449     return rv;
1450
1451   vlsm = clib_mem_alloc (sizeof (vls_main_t));
1452   clib_memset (vlsm, 0, sizeof (*vlsm));
1453   clib_rwlock_init (&vlsm->vls_table_lock);
1454   clib_rwlock_init (&vlsm->shared_data_lock);
1455   pool_alloc (vlsm->workers, vcm->cfg.max_workers);
1456
1457   pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
1458                   vls_app_fork_child_handler);
1459   atexit (vls_app_exit);
1460   vls_worker_alloc ();
1461   vlsl->vls_wrk_index = vcl_get_worker_index ();
1462   vls_mt_locks_init ();
1463   vcm->wrk_rpc_fn = vls_rpc_handler;
1464   return VPPCOM_OK;
1465 }
1466
1467 unsigned char
1468 vls_use_eventfd (void)
1469 {
1470   return vcm->cfg.use_mq_eventfd;
1471 }
1472
1473 /*
1474  * fd.io coding-style-patch-verification: ON
1475  *
1476  * Local Variables:
1477  * eval: (c-set-style "gnu")
1478  * End:
1479  */