vcl: mt detection and cleanup
[vpp.git] / src / vcl / vcl_locked.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vcl/vcl_locked.h>
17 #include <vcl/vcl_private.h>
18
19 typedef struct vls_shared_data_
20 {
21   clib_spinlock_t lock;
22   u32 owner_wrk_index;
23   u32 *workers_subscribed;
24   clib_bitmap_t *listeners;
25 } vls_shared_data_t;
26
27 typedef struct vcl_locked_session_
28 {
29   clib_spinlock_t lock;
30   u32 session_index;
31   u32 worker_index;
32   u32 vls_index;
33   u32 shared_data_index;
34   /** VCL session owned by different workers because of migration */
35   u32 owner_vcl_wrk_index;
36   uword *vcl_wrk_index_to_session_index;
37 } vcl_locked_session_t;
38
39 typedef struct vls_worker_
40 {
41   vcl_locked_session_t *vls_pool;
42   uword *session_index_to_vlsh_table;
43   u32 wrk_index;
44 } vls_worker_t;
45
46 typedef struct vls_local_
47 {
48   int vls_wrk_index;
49   volatile int vls_mt_n_threads;
50   pthread_mutex_t vls_mt_mq_mlock;
51   pthread_mutex_t vls_mt_spool_mlock;
52   volatile u8 select_mp_check;
53   volatile u8 epoll_mp_check;
54 } vls_process_local_t;
55
56 static vls_process_local_t vls_local;
57 static vls_process_local_t *vlsl = &vls_local;
58
59 typedef struct vls_main_
60 {
61   vls_worker_t *workers;
62   clib_rwlock_t vls_table_lock;
63   /** Pool of data shared by sessions owned by different workers */
64   vls_shared_data_t *shared_data_pool;
65   clib_rwlock_t shared_data_lock;
66 } vls_main_t;
67
68 vls_main_t *vlsm;
69
70 typedef enum vls_rpc_msg_type_
71 {
72   VLS_RPC_CLONE_AND_SHARE,
73   VLS_RPC_SESS_CLEANUP,
74 } vls_rpc_msg_type_e;
75
76 typedef struct vls_rpc_msg_
77 {
78   u8 type;
79   u8 data[0];
80 } vls_rpc_msg_t;
81
82 typedef struct vls_clone_and_share_msg_
83 {
84   u32 vls_index;                /**< vls to be shared */
85   u32 session_index;            /**< vcl session to be shared */
86   u32 origin_vls_wrk;           /**< vls worker that initiated the rpc */
87   u32 origin_vls_index;         /**< vls session of the originator */
88   u32 origin_vcl_wrk;           /**< vcl worker that initiated the rpc */
89   u32 origin_session_index;     /**< vcl session of the originator */
90 } vls_clone_and_share_msg_t;
91
92 typedef struct vls_sess_cleanup_msg_
93 {
94   u32 session_index;            /**< vcl session to be cleaned */
95   u32 origin_vcl_wrk;           /**< worker that initiated the rpc */
96 } vls_sess_cleanup_msg_t;
97
98 void vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
99                                    u32 dst_wrk_index, u32 dst_session_index);
100 void vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
101                                    vcl_locked_session_t * vls,
102                                    u32 session_index, u32 vls_wrk_index,
103                                    u32 dst_wrk_index, u32 dst_vls_index,
104                                    u32 dst_session_index);
105
106
107 static inline u32
108 vls_get_worker_index (void)
109 {
110   if (vls_mt_wrk_supported ())
111     return vlsl->vls_wrk_index;
112   else
113     return vcl_get_worker_index ();
114 }
115
116 static u32
117 vls_shared_data_alloc (void)
118 {
119   vls_shared_data_t *vls_shd;
120   u32 shd_index;
121
122   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
123   pool_get_zero (vlsm->shared_data_pool, vls_shd);
124   clib_spinlock_init (&vls_shd->lock);
125   shd_index = vls_shd - vlsm->shared_data_pool;
126   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
127
128   return shd_index;
129 }
130
131 static u32
132 vls_shared_data_index (vls_shared_data_t * vls_shd)
133 {
134   return vls_shd - vlsm->shared_data_pool;
135 }
136
137 vls_shared_data_t *
138 vls_shared_data_get (u32 shd_index)
139 {
140   if (pool_is_free_index (vlsm->shared_data_pool, shd_index))
141     return 0;
142   return pool_elt_at_index (vlsm->shared_data_pool, shd_index);
143 }
144
145 static void
146 vls_shared_data_free (u32 shd_index)
147 {
148   vls_shared_data_t *vls_shd;
149
150   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
151   vls_shd = vls_shared_data_get (shd_index);
152   clib_spinlock_free (&vls_shd->lock);
153   clib_bitmap_free (vls_shd->listeners);
154   vec_free (vls_shd->workers_subscribed);
155   pool_put (vlsm->shared_data_pool, vls_shd);
156   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
157 }
158
159 static inline void
160 vls_shared_data_pool_rlock (void)
161 {
162   clib_rwlock_reader_lock (&vlsm->shared_data_lock);
163 }
164
165 static inline void
166 vls_shared_data_pool_runlock (void)
167 {
168   clib_rwlock_reader_unlock (&vlsm->shared_data_lock);
169 }
170
171 static inline void
172 vls_mt_table_rlock (void)
173 {
174   if (vlsl->vls_mt_n_threads > 1)
175     clib_rwlock_reader_lock (&vlsm->vls_table_lock);
176 }
177
178 static inline void
179 vls_mt_table_runlock (void)
180 {
181   if (vlsl->vls_mt_n_threads > 1)
182     clib_rwlock_reader_unlock (&vlsm->vls_table_lock);
183 }
184
185 static inline void
186 vls_mt_table_wlock (void)
187 {
188   if (vlsl->vls_mt_n_threads > 1)
189     clib_rwlock_writer_lock (&vlsm->vls_table_lock);
190 }
191
192 static inline void
193 vls_mt_table_wunlock (void)
194 {
195   if (vlsl->vls_mt_n_threads > 1)
196     clib_rwlock_writer_unlock (&vlsm->vls_table_lock);
197 }
198
199 typedef enum
200 {
201   VLS_MT_OP_READ,
202   VLS_MT_OP_WRITE,
203   VLS_MT_OP_SPOOL,
204   VLS_MT_OP_XPOLL,
205 } vls_mt_ops_t;
206
207 typedef enum
208 {
209   VLS_MT_LOCK_MQ = 1 << 0,
210   VLS_MT_LOCK_SPOOL = 1 << 1
211 } vls_mt_lock_type_t;
212
213 static void
214 vls_mt_add (void)
215 {
216   vlsl->vls_mt_n_threads += 1;
217
218   /* If multi-thread workers are supported, for each new thread register a new
219    * vcl worker with vpp. Otherwise, all threads use the same vcl worker, so
220    * update the vcl worker's thread local worker index variable */
221   if (vls_mt_wrk_supported ())
222     vls_register_vcl_worker ();
223   else
224     vcl_set_worker_index (vlsl->vls_wrk_index);
225 }
226
227 static inline void
228 vls_mt_mq_lock (void)
229 {
230   pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
231 }
232
233 static inline void
234 vls_mt_mq_unlock (void)
235 {
236   pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
237 }
238
239 static inline void
240 vls_mt_spool_lock (void)
241 {
242   pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
243 }
244
245 static inline void
246 vls_mt_create_unlock (void)
247 {
248   pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
249 }
250
251 static void
252 vls_mt_locks_init (void)
253 {
254   pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
255   pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
256 }
257
258 u8
259 vls_is_shared (vcl_locked_session_t * vls)
260 {
261   return (vls->shared_data_index != ~0);
262 }
263
264 static inline void
265 vls_lock (vcl_locked_session_t * vls)
266 {
267   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
268     clib_spinlock_lock (&vls->lock);
269 }
270
271 static inline void
272 vls_unlock (vcl_locked_session_t * vls)
273 {
274   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
275     clib_spinlock_unlock (&vls->lock);
276 }
277
278 static inline vcl_session_handle_t
279 vls_to_sh (vcl_locked_session_t * vls)
280 {
281   return vcl_session_handle_from_index (vls->session_index);
282 }
283
284 static inline vcl_session_handle_t
285 vls_to_sh_tu (vcl_locked_session_t * vls)
286 {
287   vcl_session_handle_t sh;
288   sh = vls_to_sh (vls);
289   vls_mt_table_runlock ();
290   return sh;
291 }
292
293 static vls_worker_t *
294 vls_worker_get_current (void)
295 {
296   return pool_elt_at_index (vlsm->workers, vls_get_worker_index ());
297 }
298
299 static void
300 vls_worker_alloc (void)
301 {
302   vls_worker_t *wrk;
303
304   pool_get_zero (vlsm->workers, wrk);
305   wrk->wrk_index = vcl_get_worker_index ();
306 }
307
308 static void
309 vls_worker_free (vls_worker_t * wrk)
310 {
311   hash_free (wrk->session_index_to_vlsh_table);
312   pool_free (wrk->vls_pool);
313   pool_put (vlsm->workers, wrk);
314 }
315
316 static vls_worker_t *
317 vls_worker_get (u32 wrk_index)
318 {
319   if (pool_is_free_index (vlsm->workers, wrk_index))
320     return 0;
321   return pool_elt_at_index (vlsm->workers, wrk_index);
322 }
323
324 static vls_handle_t
325 vls_alloc (vcl_session_handle_t sh)
326 {
327   vls_worker_t *wrk = vls_worker_get_current ();
328   vcl_locked_session_t *vls;
329
330   vls_mt_table_wlock ();
331
332   pool_get_zero (wrk->vls_pool, vls);
333   vls->session_index = vppcom_session_index (sh);
334   vls->worker_index = vppcom_session_worker (sh);
335   vls->vls_index = vls - wrk->vls_pool;
336   vls->shared_data_index = ~0;
337   hash_set (wrk->session_index_to_vlsh_table, vls->session_index,
338             vls->vls_index);
339   if (vls_mt_wrk_supported ())
340     {
341       hash_set (vls->vcl_wrk_index_to_session_index, vls->worker_index,
342                 vls->session_index);
343       vls->owner_vcl_wrk_index = vls->worker_index;
344     }
345   clib_spinlock_init (&vls->lock);
346
347   vls_mt_table_wunlock ();
348   return vls->vls_index;
349 }
350
351 static vcl_locked_session_t *
352 vls_get (vls_handle_t vlsh)
353 {
354   vls_worker_t *wrk = vls_worker_get_current ();
355   if (pool_is_free_index (wrk->vls_pool, vlsh))
356     return 0;
357   return pool_elt_at_index (wrk->vls_pool, vlsh);
358 }
359
360 static void
361 vls_free (vcl_locked_session_t * vls)
362 {
363   vls_worker_t *wrk = vls_worker_get_current ();
364
365   ASSERT (vls != 0);
366   hash_unset (wrk->session_index_to_vlsh_table, vls->session_index);
367   clib_spinlock_free (&vls->lock);
368   pool_put (wrk->vls_pool, vls);
369 }
370
371 static vcl_locked_session_t *
372 vls_get_and_lock (vls_handle_t vlsh)
373 {
374   vls_worker_t *wrk = vls_worker_get_current ();
375   vcl_locked_session_t *vls;
376   if (pool_is_free_index (wrk->vls_pool, vlsh))
377     return 0;
378   vls = pool_elt_at_index (wrk->vls_pool, vlsh);
379   vls_lock (vls);
380   return vls;
381 }
382
383 static vcl_locked_session_t *
384 vls_get_w_dlock (vls_handle_t vlsh)
385 {
386   vcl_locked_session_t *vls;
387   vls_mt_table_rlock ();
388   vls = vls_get_and_lock (vlsh);
389   if (!vls)
390     vls_mt_table_runlock ();
391   return vls;
392 }
393
394 static inline void
395 vls_get_and_unlock (vls_handle_t vlsh)
396 {
397   vcl_locked_session_t *vls;
398   vls_mt_table_rlock ();
399   vls = vls_get (vlsh);
400   vls_unlock (vls);
401   vls_mt_table_runlock ();
402 }
403
404 static inline void
405 vls_dunlock (vcl_locked_session_t * vls)
406 {
407   vls_unlock (vls);
408   vls_mt_table_runlock ();
409 }
410
411 static vcl_locked_session_t *
412 vls_session_get (vls_worker_t * wrk, u32 vls_index)
413 {
414   if (pool_is_free_index (wrk->vls_pool, vls_index))
415     return 0;
416   return pool_elt_at_index (wrk->vls_pool, vls_index);
417 }
418
419 vcl_session_handle_t
420 vlsh_to_sh (vls_handle_t vlsh)
421 {
422   vcl_locked_session_t *vls;
423   int rv;
424
425   vls = vls_get_w_dlock (vlsh);
426   if (!vls)
427     return INVALID_SESSION_ID;
428   rv = vls_to_sh (vls);
429   vls_dunlock (vls);
430   return rv;
431 }
432
433 vcl_session_handle_t
434 vlsh_to_session_index (vls_handle_t vlsh)
435 {
436   vcl_session_handle_t sh;
437   sh = vlsh_to_sh (vlsh);
438   return vppcom_session_index (sh);
439 }
440
441 vls_handle_t
442 vls_si_to_vlsh (u32 session_index)
443 {
444   vls_worker_t *wrk = vls_worker_get_current ();
445   uword *vlshp;
446   vlshp = hash_get (wrk->session_index_to_vlsh_table, session_index);
447   return vlshp ? *vlshp : VLS_INVALID_HANDLE;
448 }
449
450 vls_handle_t
451 vls_session_index_to_vlsh (uint32_t session_index)
452 {
453   vls_handle_t vlsh;
454
455   vls_mt_table_rlock ();
456   vlsh = vls_si_to_vlsh (session_index);
457   vls_mt_table_runlock ();
458
459   return vlsh;
460 }
461
462 u8
463 vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
464 {
465   vls_shared_data_t *vls_shd;
466   int i;
467
468   if (vls->shared_data_index == ~0)
469     return 0;
470
471   vls_shared_data_pool_rlock ();
472
473   vls_shd = vls_shared_data_get (vls->shared_data_index);
474   clib_spinlock_lock (&vls_shd->lock);
475
476   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
477     if (vls_shd->workers_subscribed[i] == wrk_index)
478       {
479         clib_spinlock_unlock (&vls_shd->lock);
480         vls_shared_data_pool_runlock ();
481         return 1;
482       }
483   clib_spinlock_unlock (&vls_shd->lock);
484
485   vls_shared_data_pool_runlock ();
486   return 0;
487 }
488
489 static void
490 vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
491 {
492   vls_shared_data_t *vls_shd;
493
494   if (vls->shared_data_index == ~0)
495     {
496       clib_warning ("not a shared session");
497       return;
498     }
499
500   vls_shared_data_pool_rlock ();
501
502   vls_shd = vls_shared_data_get (vls->shared_data_index);
503
504   clib_spinlock_lock (&vls_shd->lock);
505   clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
506   clib_spinlock_unlock (&vls_shd->lock);
507
508   vls_shared_data_pool_runlock ();
509 }
510
511 static u32
512 vls_shared_get_owner (vcl_locked_session_t * vls)
513 {
514   vls_shared_data_t *vls_shd;
515   u32 owner_wrk;
516
517   vls_shared_data_pool_rlock ();
518
519   vls_shd = vls_shared_data_get (vls->shared_data_index);
520   owner_wrk = vls_shd->owner_wrk_index;
521
522   vls_shared_data_pool_runlock ();
523
524   return owner_wrk;
525 }
526
527 static u8
528 vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
529 {
530   vls_shared_data_t *vls_shd;
531   u8 is_set;
532
533   if (vls->shared_data_index == ~0)
534     {
535       clib_warning ("not a shared session");
536       return 0;
537     }
538
539   vls_shared_data_pool_rlock ();
540
541   vls_shd = vls_shared_data_get (vls->shared_data_index);
542
543   clib_spinlock_lock (&vls_shd->lock);
544   is_set = clib_bitmap_get (vls_shd->listeners, wrk_index);
545   clib_spinlock_unlock (&vls_shd->lock);
546
547   vls_shared_data_pool_runlock ();
548
549   return (is_set == 1);
550 }
551
552 static void
553 vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
554 {
555   vppcom_session_listen (vls_to_sh (vls), ~0);
556   vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
557 }
558
559 static void
560 vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
561 {
562   vcl_worker_t *wrk;
563   vcl_session_t *s;
564
565   wrk = vcl_worker_get (wrk_index);
566   s = vcl_session_get (wrk, vls->session_index);
567   if (s->session_state != STATE_LISTEN)
568     return;
569   vcl_send_session_unlisten (wrk, s);
570   s->session_state = STATE_LISTEN_NO_MQ;
571   vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
572 }
573
574 static int
575 vls_shared_data_subscriber_position (vls_shared_data_t * vls_shd,
576                                      u32 wrk_index)
577 {
578   int i;
579
580   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
581     {
582       if (vls_shd->workers_subscribed[i] == wrk_index)
583         return i;
584     }
585   return -1;
586 }
587
588 int
589 vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
590 {
591   vls_shared_data_t *vls_shd;
592   int do_disconnect, pos;
593   u32 n_subscribers;
594   vcl_session_t *s;
595
596   if (vls->shared_data_index == ~0)
597     return 0;
598
599   s = vcl_session_get (wrk, vls->session_index);
600   if (s->session_state == STATE_LISTEN)
601     vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
602
603   vls_shared_data_pool_rlock ();
604
605   vls_shd = vls_shared_data_get (vls->shared_data_index);
606   clib_spinlock_lock (&vls_shd->lock);
607
608   pos = vls_shared_data_subscriber_position (vls_shd, wrk->wrk_index);
609   if (pos < 0)
610     {
611       clib_warning ("worker %u not subscribed for vls %u", wrk->wrk_index,
612                     vls->worker_index);
613       goto done;
614     }
615
616   /*
617    * Unsubscribe from share data and fifos
618    */
619   if (s->rx_fifo)
620     {
621       svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
622       svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
623     }
624   vec_del1 (vls_shd->workers_subscribed, pos);
625
626   /*
627    * Cleanup vcl state
628    */
629   n_subscribers = vec_len (vls_shd->workers_subscribed);
630   do_disconnect = s->session_state == STATE_LISTEN || !n_subscribers;
631   vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
632
633   /*
634    * No subscriber left, cleanup shared data
635    */
636   if (!n_subscribers)
637     {
638       u32 shd_index = vls_shared_data_index (vls_shd);
639
640       clib_spinlock_unlock (&vls_shd->lock);
641       vls_shared_data_pool_runlock ();
642
643       vls_shared_data_free (shd_index);
644
645       /* All locks have been dropped */
646       return 0;
647     }
648
649   /* Return, if this is not the owning worker */
650   if (vls_shd->owner_wrk_index != wrk->wrk_index)
651     goto done;
652
653   ASSERT (vec_len (vls_shd->workers_subscribed));
654
655   /*
656    *  Check if we can change owner or close
657    */
658   vls_shd->owner_wrk_index = vls_shd->workers_subscribed[0];
659   vcl_send_session_worker_update (wrk, s, vls_shd->owner_wrk_index);
660
661   /* XXX is this still needed? */
662   if (vec_len (vls_shd->workers_subscribed) > 1)
663     clib_warning ("more workers need to be updated");
664
665 done:
666
667   clib_spinlock_unlock (&vls_shd->lock);
668   vls_shared_data_pool_runlock ();
669
670   return 0;
671 }
672
673 void
674 vls_init_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
675 {
676   vls_shared_data_t *vls_shd;
677
678   u32 vls_shd_index = vls_shared_data_alloc ();
679
680   vls_shared_data_pool_rlock ();
681
682   vls_shd = vls_shared_data_get (vls_shd_index);
683   vls_shd->owner_wrk_index = vls_wrk->wrk_index;
684   vls->shared_data_index = vls_shd_index;
685   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
686
687   vls_shared_data_pool_runlock ();
688 }
689
690 void
691 vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
692 {
693   vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
694   vls_shared_data_t *vls_shd;
695   vcl_session_t *s;
696
697   s = vcl_session_get (vcl_wrk, vls->session_index);
698   if (!s)
699     {
700       clib_warning ("wrk %u session %u vls %u NOT AVAILABLE",
701                     vcl_wrk->wrk_index, vls->session_index, vls->vls_index);
702       return;
703     }
704
705   ASSERT (vls->shared_data_index != ~0);
706
707   /* Reinit session lock */
708   clib_spinlock_init (&vls->lock);
709
710   vls_shared_data_pool_rlock ();
711
712   vls_shd = vls_shared_data_get (vls->shared_data_index);
713
714   clib_spinlock_lock (&vls_shd->lock);
715   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
716   clib_spinlock_unlock (&vls_shd->lock);
717
718   vls_shared_data_pool_runlock ();
719
720   if (s->rx_fifo)
721     {
722       svm_fifo_add_subscriber (s->rx_fifo, vcl_wrk->vpp_wrk_index);
723       svm_fifo_add_subscriber (s->tx_fifo, vcl_wrk->vpp_wrk_index);
724     }
725   else if (s->session_state == STATE_LISTEN)
726     {
727       s->session_state = STATE_LISTEN_NO_MQ;
728     }
729 }
730
731 static void
732 vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
733 {
734   vcl_locked_session_t *vls, *parent_vls;
735
736   /* *INDENT-OFF* */
737   pool_foreach (vls, vls_wrk->vls_pool, ({
738     /* Initialize sharing on parent session */
739     if (vls->shared_data_index == ~0)
740       {
741         parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
742         vls_init_share_session (vls_parent_wrk, parent_vls);
743         vls->shared_data_index = parent_vls->shared_data_index;
744       }
745     vls_share_session (vls_wrk, vls);
746   }));
747   /* *INDENT-ON* */
748 }
749
750 void
751 vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
752 {
753   vls_worker_t *vls_wrk = vls_worker_get_current (), *vls_parent_wrk;
754   vcl_worker_t *wrk = vcl_worker_get_current ();
755
756   /*
757    * init vcl worker
758    */
759   wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
760   wrk->sessions = pool_dup (parent_wrk->sessions);
761   wrk->session_index_by_vpp_handles =
762     hash_dup (parent_wrk->session_index_by_vpp_handles);
763
764   /*
765    * init vls worker
766    */
767   vls_parent_wrk = vls_worker_get (parent_wrk->wrk_index);
768   vls_wrk->session_index_to_vlsh_table =
769     hash_dup (vls_parent_wrk->session_index_to_vlsh_table);
770   vls_wrk->vls_pool = pool_dup (vls_parent_wrk->vls_pool);
771
772   vls_share_sessions (vls_parent_wrk, vls_wrk);
773 }
774
775 static void
776 vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
777 {
778   vcl_worker_t *wrk = vcl_worker_get_current ();
779   vcl_session_t *s = 0;
780   int is_nonblk = 0;
781
782   if (vls)
783     {
784       s = vcl_session_get (wrk, vls->session_index);
785       if (PREDICT_FALSE (!s))
786         return;
787       is_nonblk = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
788     }
789
790   switch (op)
791     {
792     case VLS_MT_OP_READ:
793       if (!is_nonblk)
794         is_nonblk = vcl_session_read_ready (s) != 0;
795       if (!is_nonblk)
796         {
797           vls_mt_mq_lock ();
798           *locks_acq |= VLS_MT_LOCK_MQ;
799         }
800       break;
801     case VLS_MT_OP_WRITE:
802       ASSERT (s);
803       if (!is_nonblk)
804         is_nonblk = vcl_session_write_ready (s) != 0;
805       if (!is_nonblk)
806         {
807           vls_mt_mq_lock ();
808           *locks_acq |= VLS_MT_LOCK_MQ;
809         }
810       break;
811     case VLS_MT_OP_XPOLL:
812       vls_mt_mq_lock ();
813       *locks_acq |= VLS_MT_LOCK_MQ;
814       break;
815     case VLS_MT_OP_SPOOL:
816       vls_mt_spool_lock ();
817       *locks_acq |= VLS_MT_LOCK_SPOOL;
818       break;
819     default:
820       break;
821     }
822 }
823
824 static void
825 vls_mt_rel_locks (int locks_acq)
826 {
827   if (locks_acq & VLS_MT_LOCK_MQ)
828     vls_mt_mq_unlock ();
829   if (locks_acq & VLS_MT_LOCK_SPOOL)
830     vls_mt_create_unlock ();
831 }
832
833 static inline u8
834 vls_mt_session_should_migrate (vcl_locked_session_t * vls)
835 {
836   return (vls_mt_wrk_supported ()
837           && vls->worker_index != vcl_get_worker_index ());
838 }
839
840 static void
841 vls_mt_session_migrate (vcl_locked_session_t * vls)
842 {
843   u32 wrk_index = vcl_get_worker_index ();
844   vcl_worker_t *wrk;
845   u32 src_sid, sid;
846   vcl_session_t *session;
847   uword *p;
848
849   ASSERT (vls_mt_wrk_supported () && vls->worker_index != wrk_index);
850
851   /*
852    * VCL session on current vcl worker already allocated. Update current
853    * owner worker and index and return
854    */
855   if ((p = hash_get (vls->vcl_wrk_index_to_session_index, wrk_index)))
856     {
857       vls->worker_index = wrk_index;
858       vls->session_index = (u32) p[0];
859       return;
860     }
861
862   /*
863    * Ask vcl worker that owns the original vcl session to clone it into
864    * current vcl worker session pool
865    */
866
867   if (!(p = hash_get (vls->vcl_wrk_index_to_session_index,
868                       vls->owner_vcl_wrk_index)))
869     {
870       VERR ("session in owner worker(%u) is free", vls->owner_vcl_wrk_index);
871       ASSERT (0);
872       return;
873     }
874
875   src_sid = (u32) p[0];
876   wrk = vcl_worker_get_current ();
877   session = vcl_session_alloc (wrk);
878   sid = session->session_index;
879   vls_send_clone_and_share_rpc (wrk, vls, sid, vls_get_worker_index (),
880                                 vls->owner_vcl_wrk_index, vls->vls_index,
881                                 src_sid);
882   session->session_index = sid;
883   vls->worker_index = wrk_index;
884   vls->session_index = sid;
885   hash_set (vls->vcl_wrk_index_to_session_index, wrk_index, sid);
886   VDBG (1, "migrate session of worker (session): %u (%u) -> %u (%u)",
887         vls->owner_vcl_wrk_index, src_sid, wrk_index, sid);
888
889   if (PREDICT_FALSE (session->is_vep && session->vep.next_sh != ~0))
890     {
891       /* TODO: rollback? */
892       VERR ("can't migrate nonempty epoll session");
893       ASSERT (0);
894       return;
895     }
896   else if (PREDICT_FALSE (!session->is_vep &&
897                           session->session_state != STATE_CLOSED))
898     {
899       /* TODO: rollback? */
900       VERR ("migrate NOT supported, session_status (%u)",
901             session->session_state);
902       ASSERT (0);
903       return;
904     }
905 }
906
907 static inline void
908 vls_mt_detect (void)
909 {
910   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
911     vls_mt_add ();
912 }
913
914 #define vls_mt_guard(_vls, _op)                                         \
915   int _locks_acq = 0;                                                   \
916   if (vls_mt_wrk_supported ())                                          \
917     {                                                                   \
918       if (PREDICT_FALSE (_vls                                           \
919             && ((vcl_locked_session_t *)_vls)->worker_index !=          \
920                 vcl_get_worker_index ()))                               \
921           vls_mt_session_migrate (_vls);                                \
922     }                                                                   \
923   else                                                                  \
924     {                                                                   \
925       if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1))                   \
926         vls_mt_acq_locks (_vls, _op, &_locks_acq);                      \
927     }                                                                   \
928
929 #define vls_mt_unguard()                                                \
930   if (PREDICT_FALSE (_locks_acq))                                       \
931     vls_mt_rel_locks (_locks_acq)
932
933 int
934 vls_write (vls_handle_t vlsh, void *buf, size_t nbytes)
935 {
936   vcl_locked_session_t *vls;
937   int rv;
938
939   vls_mt_detect ();
940   if (!(vls = vls_get_w_dlock (vlsh)))
941     return VPPCOM_EBADFD;
942
943   vls_mt_guard (vls, VLS_MT_OP_WRITE);
944   rv = vppcom_session_write (vls_to_sh_tu (vls), buf, nbytes);
945   vls_mt_unguard ();
946   vls_get_and_unlock (vlsh);
947   return rv;
948 }
949
950 int
951 vls_write_msg (vls_handle_t vlsh, void *buf, size_t nbytes)
952 {
953   vcl_locked_session_t *vls;
954   int rv;
955
956   vls_mt_detect ();
957   if (!(vls = vls_get_w_dlock (vlsh)))
958     return VPPCOM_EBADFD;
959   vls_mt_guard (vls, VLS_MT_OP_WRITE);
960   rv = vppcom_session_write_msg (vls_to_sh_tu (vls), buf, nbytes);
961   vls_mt_unguard ();
962   vls_get_and_unlock (vlsh);
963   return rv;
964 }
965
966 int
967 vls_sendto (vls_handle_t vlsh, void *buf, int buflen, int flags,
968             vppcom_endpt_t * ep)
969 {
970   vcl_locked_session_t *vls;
971   int rv;
972
973   vls_mt_detect ();
974   if (!(vls = vls_get_w_dlock (vlsh)))
975     return VPPCOM_EBADFD;
976   vls_mt_guard (vls, VLS_MT_OP_WRITE);
977   rv = vppcom_session_sendto (vls_to_sh_tu (vls), buf, buflen, flags, ep);
978   vls_mt_unguard ();
979   vls_get_and_unlock (vlsh);
980   return rv;
981 }
982
983 ssize_t
984 vls_read (vls_handle_t vlsh, void *buf, size_t nbytes)
985 {
986   vcl_locked_session_t *vls;
987   int rv;
988
989   vls_mt_detect ();
990   if (!(vls = vls_get_w_dlock (vlsh)))
991     return VPPCOM_EBADFD;
992   vls_mt_guard (vls, VLS_MT_OP_READ);
993   rv = vppcom_session_read (vls_to_sh_tu (vls), buf, nbytes);
994   vls_mt_unguard ();
995   vls_get_and_unlock (vlsh);
996   return rv;
997 }
998
999 ssize_t
1000 vls_recvfrom (vls_handle_t vlsh, void *buffer, uint32_t buflen, int flags,
1001               vppcom_endpt_t * ep)
1002 {
1003   vcl_locked_session_t *vls;
1004   int rv;
1005
1006   vls_mt_detect ();
1007   if (!(vls = vls_get_w_dlock (vlsh)))
1008     return VPPCOM_EBADFD;
1009   vls_mt_guard (vls, VLS_MT_OP_READ);
1010   rv = vppcom_session_recvfrom (vls_to_sh_tu (vls), buffer, buflen, flags,
1011                                 ep);
1012   vls_mt_unguard ();
1013   vls_get_and_unlock (vlsh);
1014   return rv;
1015 }
1016
1017 int
1018 vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen)
1019 {
1020   vcl_locked_session_t *vls;
1021   int rv;
1022
1023   vls_mt_detect ();
1024   if (!(vls = vls_get_w_dlock (vlsh)))
1025     return VPPCOM_EBADFD;
1026   if (vls_mt_session_should_migrate (vls))
1027     vls_mt_session_migrate (vls);
1028   rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen);
1029   vls_get_and_unlock (vlsh);
1030   return rv;
1031 }
1032
1033 int
1034 vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep)
1035 {
1036   vcl_locked_session_t *vls;
1037   int rv;
1038
1039   vls_mt_detect ();
1040   if (!(vls = vls_get_w_dlock (vlsh)))
1041     return VPPCOM_EBADFD;
1042   rv = vppcom_session_bind (vls_to_sh_tu (vls), ep);
1043   vls_get_and_unlock (vlsh);
1044   return rv;
1045 }
1046
1047 int
1048 vls_listen (vls_handle_t vlsh, int q_len)
1049 {
1050   vcl_locked_session_t *vls;
1051   int rv;
1052
1053   vls_mt_detect ();
1054   if (!(vls = vls_get_w_dlock (vlsh)))
1055     return VPPCOM_EBADFD;
1056   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
1057   rv = vppcom_session_listen (vls_to_sh_tu (vls), q_len);
1058   vls_mt_unguard ();
1059   vls_get_and_unlock (vlsh);
1060   return rv;
1061 }
1062
1063 int
1064 vls_connect (vls_handle_t vlsh, vppcom_endpt_t * server_ep)
1065 {
1066   vcl_locked_session_t *vls;
1067   int rv;
1068
1069   vls_mt_detect ();
1070   if (!(vls = vls_get_w_dlock (vlsh)))
1071     return VPPCOM_EBADFD;
1072   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
1073   rv = vppcom_session_connect (vls_to_sh_tu (vls), server_ep);
1074   vls_mt_unguard ();
1075   vls_get_and_unlock (vlsh);
1076   return rv;
1077 }
1078
1079 static inline void
1080 vls_mp_checks (vcl_locked_session_t * vls, int is_add)
1081 {
1082   vcl_worker_t *wrk = vcl_worker_get_current ();
1083   vcl_session_t *s;
1084   u32 owner_wrk;
1085
1086   if (vls_mt_wrk_supported ())
1087     return;
1088
1089   s = vcl_session_get (wrk, vls->session_index);
1090   switch (s->session_state)
1091     {
1092     case STATE_LISTEN:
1093       if (is_add)
1094         {
1095           vls_listener_wrk_set (vls, vls->worker_index, 1 /* is_active */ );
1096           break;
1097         }
1098       vls_listener_wrk_stop_listen (vls, vls->worker_index);
1099       break;
1100     case STATE_LISTEN_NO_MQ:
1101       if (!is_add)
1102         break;
1103
1104       /* Register worker as listener */
1105       vls_listener_wrk_start_listen (vls, wrk->wrk_index);
1106
1107       /* If owner worker did not attempt to accept/xpoll on the session,
1108        * force a listen stop for it, since it may not be interested in
1109        * accepting new sessions.
1110        * This is pretty much a hack done to give app workers the illusion
1111        * that it is fine to listen and not accept new sessions for a
1112        * given listener. Without it, we would accumulate unhandled
1113        * accepts on the passive worker message queue. */
1114       owner_wrk = vls_shared_get_owner (vls);
1115       if (!vls_listener_wrk_is_active (vls, owner_wrk))
1116         vls_listener_wrk_stop_listen (vls, owner_wrk);
1117       break;
1118     default:
1119       break;
1120     }
1121 }
1122
1123 vls_handle_t
1124 vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
1125 {
1126   vls_handle_t accepted_vlsh;
1127   vcl_locked_session_t *vls;
1128   int sh;
1129
1130   vls_mt_detect ();
1131   if (!(vls = vls_get_w_dlock (listener_vlsh)))
1132     return VPPCOM_EBADFD;
1133   if (vcl_n_workers () > 1)
1134     vls_mp_checks (vls, 1 /* is_add */ );
1135   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
1136   sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags);
1137   vls_mt_unguard ();
1138   vls_get_and_unlock (listener_vlsh);
1139   if (sh < 0)
1140     return sh;
1141   accepted_vlsh = vls_alloc (sh);
1142   if (PREDICT_FALSE (accepted_vlsh == VLS_INVALID_HANDLE))
1143     vppcom_session_close (sh);
1144   return accepted_vlsh;
1145 }
1146
1147 vls_handle_t
1148 vls_create (uint8_t proto, uint8_t is_nonblocking)
1149 {
1150   vcl_session_handle_t sh;
1151   vls_handle_t vlsh;
1152
1153   vls_mt_detect ();
1154   vls_mt_guard (0, VLS_MT_OP_SPOOL);
1155   sh = vppcom_session_create (proto, is_nonblocking);
1156   vls_mt_unguard ();
1157   if (sh == INVALID_SESSION_ID)
1158     return VLS_INVALID_HANDLE;
1159
1160   vlsh = vls_alloc (sh);
1161   if (PREDICT_FALSE (vlsh == VLS_INVALID_HANDLE))
1162     vppcom_session_close (sh);
1163
1164   return vlsh;
1165 }
1166
1167 static void
1168 vls_mt_session_cleanup (vcl_locked_session_t * vls)
1169 {
1170   u32 session_index, wrk_index, current_vcl_wrk;
1171   vcl_worker_t *wrk = vcl_worker_get_current ();
1172
1173   ASSERT (vls_mt_wrk_supported ());
1174
1175   current_vcl_wrk = vcl_get_worker_index ();
1176
1177   /* *INDENT-OFF* */
1178   hash_foreach (wrk_index, session_index, vls->vcl_wrk_index_to_session_index,
1179     ({
1180       if (current_vcl_wrk != wrk_index)
1181         vls_send_session_cleanup_rpc (wrk, wrk_index, session_index);
1182     }));
1183   /* *INDENT-ON* */
1184   hash_free (vls->vcl_wrk_index_to_session_index);
1185 }
1186
1187 int
1188 vls_close (vls_handle_t vlsh)
1189 {
1190   vcl_locked_session_t *vls;
1191   int rv;
1192
1193   vls_mt_detect ();
1194   vls_mt_table_wlock ();
1195
1196   vls = vls_get_and_lock (vlsh);
1197   if (!vls)
1198     {
1199       vls_mt_table_wunlock ();
1200       return VPPCOM_EBADFD;
1201     }
1202
1203   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
1204
1205   if (vls_is_shared (vls))
1206     rv = vls_unshare_session (vls, vcl_worker_get_current ());
1207   else
1208     rv = vppcom_session_close (vls_to_sh (vls));
1209
1210   if (vls_mt_wrk_supported ())
1211     vls_mt_session_cleanup (vls);
1212
1213   vls_free (vls);
1214   vls_mt_unguard ();
1215
1216   vls_mt_table_wunlock ();
1217
1218   return rv;
1219 }
1220
1221 vls_handle_t
1222 vls_epoll_create (void)
1223 {
1224   vcl_session_handle_t sh;
1225   vls_handle_t vlsh;
1226
1227   vls_mt_detect ();
1228
1229   sh = vppcom_epoll_create ();
1230   if (sh == INVALID_SESSION_ID)
1231     return VLS_INVALID_HANDLE;
1232
1233   vlsh = vls_alloc (sh);
1234   if (vlsh == VLS_INVALID_HANDLE)
1235     vppcom_session_close (sh);
1236
1237   return vlsh;
1238 }
1239
1240 static void
1241 vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op)
1242 {
1243   if (vcl_n_workers () <= 1)
1244     {
1245       vlsl->epoll_mp_check = 1;
1246       return;
1247     }
1248
1249   if (op == EPOLL_CTL_MOD)
1250     return;
1251
1252   vlsl->epoll_mp_check = 1;
1253   vls_mp_checks (vls, op == EPOLL_CTL_ADD);
1254 }
1255
1256 int
1257 vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
1258                struct epoll_event *event)
1259 {
1260   vcl_locked_session_t *ep_vls, *vls;
1261   vcl_session_handle_t ep_sh, sh;
1262   int rv;
1263
1264   vls_mt_detect ();
1265   vls_mt_table_rlock ();
1266   ep_vls = vls_get_and_lock (ep_vlsh);
1267   vls = vls_get_and_lock (vlsh);
1268
1269   if (vls_mt_session_should_migrate (ep_vls))
1270     vls_mt_session_migrate (ep_vls);
1271
1272   ep_sh = vls_to_sh (ep_vls);
1273   sh = vls_to_sh (vls);
1274
1275   if (PREDICT_FALSE (!vlsl->epoll_mp_check))
1276     vls_epoll_ctl_mp_checks (vls, op);
1277
1278   vls_mt_table_runlock ();
1279
1280   rv = vppcom_epoll_ctl (ep_sh, op, sh, event);
1281
1282   vls_mt_table_rlock ();
1283   ep_vls = vls_get (ep_vlsh);
1284   vls = vls_get (vlsh);
1285   vls_unlock (vls);
1286   vls_unlock (ep_vls);
1287   vls_mt_table_runlock ();
1288   return rv;
1289 }
1290
1291 int
1292 vls_epoll_wait (vls_handle_t ep_vlsh, struct epoll_event *events,
1293                 int maxevents, double wait_for_time)
1294 {
1295   vcl_locked_session_t *vls;
1296   int rv;
1297
1298   vls_mt_detect ();
1299   if (!(vls = vls_get_w_dlock (ep_vlsh)))
1300     return VPPCOM_EBADFD;
1301   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1302   rv = vppcom_epoll_wait (vls_to_sh_tu (vls), events, maxevents,
1303                           wait_for_time);
1304   vls_mt_unguard ();
1305   vls_get_and_unlock (ep_vlsh);
1306   return rv;
1307 }
1308
1309 static void
1310 vls_select_mp_checks (vcl_si_set * read_map)
1311 {
1312   vcl_locked_session_t *vls;
1313   vcl_worker_t *wrk;
1314   vcl_session_t *s;
1315   u32 si;
1316
1317   if (vcl_n_workers () <= 1)
1318     {
1319       vlsl->select_mp_check = 1;
1320       return;
1321     }
1322
1323   if (!read_map)
1324     return;
1325
1326   vlsl->select_mp_check = 1;
1327   wrk = vcl_worker_get_current ();
1328
1329   /* *INDENT-OFF* */
1330   clib_bitmap_foreach (si, read_map, ({
1331     s = vcl_session_get (wrk, si);
1332     if (s->session_state == STATE_LISTEN)
1333       {
1334         vls = vls_get (vls_session_index_to_vlsh (si));
1335         vls_mp_checks (vls, 1 /* is_add */);
1336       }
1337   }));
1338   /* *INDENT-ON* */
1339 }
1340
1341 int
1342 vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
1343             vcl_si_set * except_map, double wait_for_time)
1344 {
1345   int rv;
1346
1347   vls_mt_detect ();
1348   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1349   if (PREDICT_FALSE (!vlsl->select_mp_check))
1350     vls_select_mp_checks (read_map);
1351   rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
1352   vls_mt_unguard ();
1353   return rv;
1354 }
1355
1356 static void
1357 vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
1358 {
1359   u32 current_wrk, is_current;
1360   vcl_locked_session_t *vls;
1361   vcl_session_t *s;
1362
1363   if (pool_elts (vcm->workers) <= 1)
1364     return;
1365
1366   current_wrk = vcl_get_worker_index ();
1367   is_current = current_wrk == wrk->wrk_index;
1368
1369   /* *INDENT-OFF* */
1370   pool_foreach (s, wrk->sessions, ({
1371     vls = vls_get (vls_si_to_vlsh (s->session_index));
1372     if (vls && (is_current || vls_is_shared_by_wrk (vls, current_wrk)))
1373       vls_unshare_session (vls, wrk);
1374   }));
1375   /* *INDENT-ON* */
1376 }
1377
1378 static void
1379 vls_cleanup_vcl_worker (vcl_worker_t * wrk)
1380 {
1381   vls_worker_t *vls_wrk = vls_worker_get (wrk->wrk_index);
1382
1383   /* Unshare sessions and also cleanup worker since child may have
1384    * called _exit () and therefore vcl may not catch the event */
1385   vls_unshare_vcl_worker_sessions (wrk);
1386   vcl_worker_cleanup (wrk, 1 /* notify vpp */ );
1387
1388   vls_worker_free (vls_wrk);
1389 }
1390
1391 static void
1392 vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
1393 {
1394   vcl_worker_t *sub_child;
1395   int tries = 0;
1396
1397   if (child_wrk->forked_child != ~0)
1398     {
1399       sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
1400       if (sub_child)
1401         {
1402           /* Wait a bit, maybe the process is going away */
1403           while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
1404             usleep (1e3);
1405           if (kill (sub_child->current_pid, 0) < 0)
1406             vls_cleanup_forked_child (child_wrk, sub_child);
1407         }
1408     }
1409   vls_cleanup_vcl_worker (child_wrk);
1410   VDBG (0, "Cleaned up forked child wrk %u", child_wrk->wrk_index);
1411   wrk->forked_child = ~0;
1412 }
1413
1414 static struct sigaction old_sa;
1415
1416 static void
1417 vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
1418 {
1419   vcl_worker_t *wrk, *child_wrk;
1420
1421   if (vcl_get_worker_index () == ~0)
1422     return;
1423
1424   if (sigaction (SIGCHLD, &old_sa, 0))
1425     {
1426       VERR ("couldn't restore sigchld");
1427       exit (-1);
1428     }
1429
1430   wrk = vcl_worker_get_current ();
1431   if (wrk->forked_child == ~0)
1432     return;
1433
1434   child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
1435   if (!child_wrk)
1436     goto done;
1437
1438   if (si && si->si_pid != child_wrk->current_pid)
1439     {
1440       VDBG (0, "unexpected child pid %u", si->si_pid);
1441       goto done;
1442     }
1443   vls_cleanup_forked_child (wrk, child_wrk);
1444
1445 done:
1446   if (old_sa.sa_flags & SA_SIGINFO)
1447     {
1448       void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
1449       fn (signum, si, uc);
1450     }
1451   else
1452     {
1453       void (*fn) (int) = old_sa.sa_handler;
1454       if (fn)
1455         fn (signum);
1456     }
1457 }
1458
1459 static void
1460 vls_incercept_sigchld ()
1461 {
1462   struct sigaction sa;
1463   clib_memset (&sa, 0, sizeof (sa));
1464   sa.sa_sigaction = vls_intercept_sigchld_handler;
1465   sa.sa_flags = SA_SIGINFO;
1466   if (sigaction (SIGCHLD, &sa, &old_sa))
1467     {
1468       VERR ("couldn't intercept sigchld");
1469       exit (-1);
1470     }
1471 }
1472
1473 static void
1474 vls_app_pre_fork (void)
1475 {
1476   vls_incercept_sigchld ();
1477   vcl_flush_mq_events ();
1478 }
1479
1480 static void
1481 vls_app_fork_child_handler (void)
1482 {
1483   vcl_worker_t *parent_wrk;
1484   int rv, parent_wrk_index;
1485   u8 *child_name;
1486
1487   parent_wrk_index = vcl_get_worker_index ();
1488   VDBG (0, "initializing forked child %u with parent wrk %u", getpid (),
1489         parent_wrk_index);
1490
1491   /*
1492    * Allocate worker vcl
1493    */
1494   vcl_set_worker_index (~0);
1495   if (!vcl_worker_alloc_and_init ())
1496     VERR ("couldn't allocate new worker");
1497
1498   /*
1499    * Attach to binary api
1500    */
1501   child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
1502   vcl_cleanup_bapi ();
1503   vppcom_api_hookup ();
1504   vcm->app_state = STATE_APP_START;
1505   rv = vppcom_connect_to_vpp ((char *) child_name);
1506   vec_free (child_name);
1507   if (rv)
1508     {
1509       VERR ("couldn't connect to VPP!");
1510       return;
1511     }
1512
1513   /*
1514    * Allocate/initialize vls worker
1515    */
1516   vls_worker_alloc ();
1517
1518   /*
1519    * Register worker with vpp and share sessions
1520    */
1521   vcl_worker_register_with_vpp ();
1522   parent_wrk = vcl_worker_get (parent_wrk_index);
1523   vls_worker_copy_on_fork (parent_wrk);
1524   parent_wrk->forked_child = vcl_get_worker_index ();
1525
1526   /* Reset number of threads and set wrk index */
1527   vlsl->vls_mt_n_threads = 0;
1528   vlsl->vls_wrk_index = vcl_get_worker_index ();
1529   vlsl->select_mp_check = 0;
1530   vlsl->epoll_mp_check = 0;
1531   vls_mt_locks_init ();
1532
1533   VDBG (0, "forked child main worker initialized");
1534   vcm->forking = 0;
1535 }
1536
1537 static void
1538 vls_app_fork_parent_handler (void)
1539 {
1540   vcm->forking = 1;
1541   while (vcm->forking)
1542     ;
1543 }
1544
1545 void
1546 vls_app_exit (void)
1547 {
1548   vls_worker_t *wrk = vls_worker_get_current ();
1549
1550   /* Unshare the sessions. VCL will clean up the worker */
1551   vls_unshare_vcl_worker_sessions (vcl_worker_get_current ());
1552   vls_worker_free (wrk);
1553 }
1554
1555 static void
1556 vls_clone_and_share_rpc_handler (void *args)
1557 {
1558   vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
1559   vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
1560   vcl_locked_session_t *vls, *dst_vls;
1561   vcl_worker_t *vcl_wrk = vcl_worker_get_current (), *dst_vcl_wrk;
1562   vcl_session_t *s, *dst_s;
1563
1564   vls = vls_session_get (wrk, msg->vls_index);
1565
1566   if (!vls_mt_wrk_supported ())
1567     vls_init_share_session (wrk, vls);
1568
1569   s = vcl_session_get (vcl_wrk, msg->session_index);
1570   dst_wrk = vls_worker_get (msg->origin_vls_wrk);
1571   dst_vcl_wrk = vcl_worker_get (msg->origin_vcl_wrk);
1572   dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
1573   dst_vls->shared_data_index = vls->shared_data_index;
1574   dst_s = vcl_session_get (dst_vcl_wrk, msg->origin_session_index);
1575   clib_memcpy (dst_s, s, sizeof (*s));
1576
1577   dst_vcl_wrk->rpc_done = 1;
1578
1579   VDBG (1, "proces session clone of worker (session): %u (%u) -> %u (%u)",
1580         vcl_wrk->wrk_index, msg->session_index, dst_vcl_wrk->wrk_index,
1581         msg->origin_session_index);
1582 }
1583
1584 static void
1585 vls_session_cleanup_rpc_handler (void *args)
1586 {
1587   vls_sess_cleanup_msg_t *msg = (vls_sess_cleanup_msg_t *) args;
1588   vcl_worker_t *wrk = vcl_worker_get_current ();
1589   vcl_worker_t *dst_wrk = vcl_worker_get (msg->origin_vcl_wrk);
1590
1591   vppcom_session_close (vcl_session_handle_from_index (msg->session_index));
1592
1593   dst_wrk->rpc_done = 1;
1594
1595   VDBG (1, "proces session cleanup of worker (session): %u (%u) from %u ()",
1596         wrk->wrk_index, msg->session_index, dst_wrk->wrk_index);
1597 }
1598
1599 static void
1600 vls_rpc_handler (void *args)
1601 {
1602   vls_rpc_msg_t *msg = (vls_rpc_msg_t *) args;
1603   switch (msg->type)
1604     {
1605     case VLS_RPC_CLONE_AND_SHARE:
1606       vls_clone_and_share_rpc_handler (msg->data);
1607       break;
1608     case VLS_RPC_SESS_CLEANUP:
1609       vls_session_cleanup_rpc_handler (msg->data);
1610       break;
1611     default:
1612       break;
1613     }
1614 }
1615
1616 void
1617 vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
1618                               vcl_locked_session_t * vls, u32 session_index,
1619                               u32 vls_wrk_index, u32 dst_wrk_index,
1620                               u32 dst_vls_index, u32 dst_session_index)
1621 {
1622   u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
1623   vls_clone_and_share_msg_t *msg;
1624   vls_rpc_msg_t *rpc;
1625   int ret;
1626
1627   rpc = (vls_rpc_msg_t *) & data;
1628   rpc->type = VLS_RPC_CLONE_AND_SHARE;
1629   msg = (vls_clone_and_share_msg_t *) & rpc->data;
1630   msg->origin_vls_wrk = vls_wrk_index;
1631   msg->origin_vls_index = vls->vls_index;
1632   msg->origin_vcl_wrk = wrk->wrk_index;
1633   msg->origin_session_index = session_index;
1634   msg->vls_index = dst_vls_index;
1635   msg->session_index = dst_session_index;
1636
1637   wrk->rpc_done = 0;
1638   ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
1639
1640   VDBG (1, "send session clone to wrk (session): %u (%u) -> %u (%u), ret=%d",
1641         dst_wrk_index, msg->session_index, msg->origin_vcl_wrk,
1642         msg->origin_session_index, ret);
1643   while (!ret && !wrk->rpc_done)
1644     ;
1645 }
1646
1647 void
1648 vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
1649                               u32 dst_wrk_index, u32 dst_session_index)
1650 {
1651   u8 data[sizeof (u8) + sizeof (vls_sess_cleanup_msg_t)];
1652   vls_sess_cleanup_msg_t *msg;
1653   vls_rpc_msg_t *rpc;
1654   int ret;
1655
1656   rpc = (vls_rpc_msg_t *) & data;
1657   rpc->type = VLS_RPC_SESS_CLEANUP;
1658   msg = (vls_sess_cleanup_msg_t *) & rpc->data;
1659   msg->origin_vcl_wrk = wrk->wrk_index;
1660   msg->session_index = dst_session_index;
1661
1662   wrk->rpc_done = 0;
1663   ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
1664
1665   VDBG (1, "send session cleanup to wrk (session): %u (%u) from %u, ret=%d",
1666         dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
1667   while (!ret && !wrk->rpc_done)
1668     ;
1669 }
1670
1671 int
1672 vls_app_create (char *app_name)
1673 {
1674   int rv;
1675
1676   if ((rv = vppcom_app_create (app_name)))
1677     return rv;
1678
1679   vlsm = clib_mem_alloc (sizeof (vls_main_t));
1680   clib_memset (vlsm, 0, sizeof (*vlsm));
1681   clib_rwlock_init (&vlsm->vls_table_lock);
1682   clib_rwlock_init (&vlsm->shared_data_lock);
1683   pool_alloc (vlsm->workers, vcm->cfg.max_workers);
1684
1685   pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
1686                   vls_app_fork_child_handler);
1687   atexit (vls_app_exit);
1688   vls_worker_alloc ();
1689   vlsl->vls_wrk_index = vcl_get_worker_index ();
1690   vls_mt_locks_init ();
1691   vcm->wrk_rpc_fn = vls_rpc_handler;
1692   return VPPCOM_OK;
1693 }
1694
1695 unsigned char
1696 vls_use_eventfd (void)
1697 {
1698   return vcm->cfg.use_mq_eventfd;
1699 }
1700
1701 unsigned char
1702 vls_mt_wrk_supported (void)
1703 {
1704   return vcm->cfg.mt_wrk_supported;
1705 }
1706
1707 int
1708 vls_use_real_epoll (void)
1709 {
1710   if (vcl_get_worker_index () == ~0)
1711     return 0;
1712
1713   return vcl_worker_get_current ()->vcl_needs_real_epoll;
1714 }
1715
1716 void
1717 vls_register_vcl_worker (void)
1718 {
1719   if (vppcom_worker_register () != VPPCOM_OK)
1720     {
1721       VERR ("failed to register worker");
1722       return;
1723     }
1724 }
1725
1726 /*
1727  * fd.io coding-style-patch-verification: ON
1728  *
1729  * Local Variables:
1730  * eval: (c-set-style "gnu")
1731  * End:
1732  */