vcl: more session struct cleanup
[vpp.git] / src / vcl / vcl_locked.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vcl/vcl_locked.h>
17 #include <vcl/vcl_private.h>
18
19 typedef struct vls_shared_data_
20 {
21   clib_spinlock_t lock;
22   u32 owner_wrk_index;
23   u32 *workers_subscribed;
24   clib_bitmap_t *listeners;
25 } vls_shared_data_t;
26
27 typedef struct vcl_locked_session_
28 {
29   clib_spinlock_t lock;
30   u32 session_index;
31   u32 worker_index;
32   u32 vls_index;
33   u32 shared_data_index;
34   /** VCL session owned by different workers because of migration */
35   u32 owner_vcl_wrk_index;
36   uword *vcl_wrk_index_to_session_index;
37 } vcl_locked_session_t;
38
39 typedef struct vls_worker_
40 {
41   vcl_locked_session_t *vls_pool;
42   uword *session_handle_to_vlsh_table;
43   u32 wrk_index;
44 } vls_worker_t;
45
46 typedef struct vls_local_
47 {
48   int vls_wrk_index;
49   volatile int vls_mt_n_threads;
50   pthread_mutex_t vls_mt_mq_mlock;
51   pthread_mutex_t vls_mt_spool_mlock;
52   volatile u8 select_mp_check;
53   volatile u8 epoll_mp_check;
54 } vls_process_local_t;
55
56 static vls_process_local_t vls_local;
57 static vls_process_local_t *vlsl = &vls_local;
58
59 typedef struct vls_main_
60 {
61   vls_worker_t *workers;
62   clib_rwlock_t vls_table_lock;
63   /** Pool of data shared by sessions owned by different workers */
64   vls_shared_data_t *shared_data_pool;
65   clib_rwlock_t shared_data_lock;
66 } vls_main_t;
67
68 vls_main_t *vlsm;
69
70 typedef enum vls_rpc_msg_type_
71 {
72   VLS_RPC_CLONE_AND_SHARE,
73   VLS_RPC_SESS_CLEANUP,
74 } vls_rpc_msg_type_e;
75
76 typedef struct vls_rpc_msg_
77 {
78   u8 type;
79   u8 data[0];
80 } vls_rpc_msg_t;
81
82 typedef struct vls_clone_and_share_msg_
83 {
84   u32 vls_index;                /**< vls to be shared */
85   u32 session_index;            /**< vcl session to be shared */
86   u32 origin_vls_wrk;           /**< vls worker that initiated the rpc */
87   u32 origin_vls_index;         /**< vls session of the originator */
88   u32 origin_vcl_wrk;           /**< vcl worker that initiated the rpc */
89   u32 origin_session_index;     /**< vcl session of the originator */
90 } vls_clone_and_share_msg_t;
91
92 typedef struct vls_sess_cleanup_msg_
93 {
94   u32 session_index;            /**< vcl session to be cleaned */
95   u32 origin_vcl_wrk;           /**< worker that initiated the rpc */
96 } vls_sess_cleanup_msg_t;
97
98 void vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
99                                    u32 dst_wrk_index, u32 dst_session_index);
100 void vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
101                                    vcl_locked_session_t * vls,
102                                    u32 session_index, u32 vls_wrk_index,
103                                    u32 dst_wrk_index, u32 dst_vls_index,
104                                    u32 dst_session_index);
105
106
107 static inline u32
108 vls_get_worker_index (void)
109 {
110   if (vls_mt_wrk_supported ())
111     return vlsl->vls_wrk_index;
112   else
113     return vcl_get_worker_index ();
114 }
115
116 static u32
117 vls_shared_data_alloc (void)
118 {
119   vls_shared_data_t *vls_shd;
120   u32 shd_index;
121
122   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
123   pool_get_zero (vlsm->shared_data_pool, vls_shd);
124   clib_spinlock_init (&vls_shd->lock);
125   shd_index = vls_shd - vlsm->shared_data_pool;
126   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
127
128   return shd_index;
129 }
130
131 static u32
132 vls_shared_data_index (vls_shared_data_t * vls_shd)
133 {
134   return vls_shd - vlsm->shared_data_pool;
135 }
136
137 vls_shared_data_t *
138 vls_shared_data_get (u32 shd_index)
139 {
140   if (pool_is_free_index (vlsm->shared_data_pool, shd_index))
141     return 0;
142   return pool_elt_at_index (vlsm->shared_data_pool, shd_index);
143 }
144
145 static void
146 vls_shared_data_free (u32 shd_index)
147 {
148   vls_shared_data_t *vls_shd;
149
150   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
151   vls_shd = vls_shared_data_get (shd_index);
152   clib_spinlock_free (&vls_shd->lock);
153   clib_bitmap_free (vls_shd->listeners);
154   vec_free (vls_shd->workers_subscribed);
155   pool_put (vlsm->shared_data_pool, vls_shd);
156   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
157 }
158
159 static inline void
160 vls_shared_data_pool_rlock (void)
161 {
162   clib_rwlock_reader_lock (&vlsm->shared_data_lock);
163 }
164
165 static inline void
166 vls_shared_data_pool_runlock (void)
167 {
168   clib_rwlock_reader_unlock (&vlsm->shared_data_lock);
169 }
170
171 static inline void
172 vls_mt_table_rlock (void)
173 {
174   if (vlsl->vls_mt_n_threads > 1)
175     clib_rwlock_reader_lock (&vlsm->vls_table_lock);
176 }
177
178 static inline void
179 vls_mt_table_runlock (void)
180 {
181   if (vlsl->vls_mt_n_threads > 1)
182     clib_rwlock_reader_unlock (&vlsm->vls_table_lock);
183 }
184
185 static inline void
186 vls_mt_table_wlock (void)
187 {
188   if (vlsl->vls_mt_n_threads > 1)
189     clib_rwlock_writer_lock (&vlsm->vls_table_lock);
190 }
191
192 static inline void
193 vls_mt_table_wunlock (void)
194 {
195   if (vlsl->vls_mt_n_threads > 1)
196     clib_rwlock_writer_unlock (&vlsm->vls_table_lock);
197 }
198
199 typedef enum
200 {
201   VLS_MT_OP_READ,
202   VLS_MT_OP_WRITE,
203   VLS_MT_OP_SPOOL,
204   VLS_MT_OP_XPOLL,
205 } vls_mt_ops_t;
206
207 typedef enum
208 {
209   VLS_MT_LOCK_MQ = 1 << 0,
210   VLS_MT_LOCK_SPOOL = 1 << 1
211 } vls_mt_lock_type_t;
212
213 static void
214 vls_mt_add (void)
215 {
216   vlsl->vls_mt_n_threads += 1;
217
218   /* If multi-thread workers are supported, for each new thread register a new
219    * vcl worker with vpp. Otherwise, all threads use the same vcl worker, so
220    * update the vcl worker's thread local worker index variable */
221   if (vls_mt_wrk_supported ())
222     vls_register_vcl_worker ();
223   else
224     vcl_set_worker_index (vlsl->vls_wrk_index);
225 }
226
227 static inline void
228 vls_mt_mq_lock (void)
229 {
230   pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
231 }
232
233 static inline void
234 vls_mt_mq_unlock (void)
235 {
236   pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
237 }
238
239 static inline void
240 vls_mt_spool_lock (void)
241 {
242   pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
243 }
244
245 static inline void
246 vls_mt_create_unlock (void)
247 {
248   pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
249 }
250
251 static void
252 vls_mt_locks_init (void)
253 {
254   pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
255   pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
256 }
257
258 u8
259 vls_is_shared (vcl_locked_session_t * vls)
260 {
261   return (vls->shared_data_index != ~0);
262 }
263
264 static inline void
265 vls_lock (vcl_locked_session_t * vls)
266 {
267   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
268     clib_spinlock_lock (&vls->lock);
269 }
270
271 static inline void
272 vls_unlock (vcl_locked_session_t * vls)
273 {
274   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
275     clib_spinlock_unlock (&vls->lock);
276 }
277
278 static inline vcl_session_handle_t
279 vls_to_sh (vcl_locked_session_t * vls)
280 {
281   return vcl_session_handle_from_index (vls->session_index);
282 }
283
284 static inline vcl_session_handle_t
285 vls_to_sh_tu (vcl_locked_session_t * vls)
286 {
287   vcl_session_handle_t sh;
288   sh = vls_to_sh (vls);
289   vls_mt_table_runlock ();
290   return sh;
291 }
292
293 static vls_worker_t *
294 vls_worker_get_current (void)
295 {
296   return pool_elt_at_index (vlsm->workers, vls_get_worker_index ());
297 }
298
299 static void
300 vls_worker_alloc (void)
301 {
302   vls_worker_t *wrk;
303
304   pool_get_zero (vlsm->workers, wrk);
305   wrk->wrk_index = vcl_get_worker_index ();
306 }
307
308 static void
309 vls_worker_free (vls_worker_t * wrk)
310 {
311   hash_free (wrk->session_handle_to_vlsh_table);
312   pool_free (wrk->vls_pool);
313   pool_put (vlsm->workers, wrk);
314 }
315
316 static vls_worker_t *
317 vls_worker_get (u32 wrk_index)
318 {
319   if (pool_is_free_index (vlsm->workers, wrk_index))
320     return 0;
321   return pool_elt_at_index (vlsm->workers, wrk_index);
322 }
323
324 static vls_handle_t
325 vls_alloc (vcl_session_handle_t sh)
326 {
327   vls_worker_t *wrk = vls_worker_get_current ();
328   vcl_locked_session_t *vls;
329
330   vls_mt_table_wlock ();
331
332   pool_get_zero (wrk->vls_pool, vls);
333   vls->session_index = vppcom_session_index (sh);
334   vls->worker_index = vppcom_session_worker (sh);
335   vls->vls_index = vls - wrk->vls_pool;
336   vls->shared_data_index = ~0;
337   hash_set (wrk->session_handle_to_vlsh_table, sh, vls->vls_index);
338   if (vls_mt_wrk_supported ())
339     {
340       hash_set (vls->vcl_wrk_index_to_session_index, vls->worker_index,
341                 vls->session_index);
342       vls->owner_vcl_wrk_index = vls->worker_index;
343     }
344   clib_spinlock_init (&vls->lock);
345
346   vls_mt_table_wunlock ();
347   return vls->vls_index;
348 }
349
350 static vcl_locked_session_t *
351 vls_get (vls_handle_t vlsh)
352 {
353   vls_worker_t *wrk = vls_worker_get_current ();
354   if (pool_is_free_index (wrk->vls_pool, vlsh))
355     return 0;
356   return pool_elt_at_index (wrk->vls_pool, vlsh);
357 }
358
359 static void
360 vls_free (vcl_locked_session_t * vls)
361 {
362   vls_worker_t *wrk = vls_worker_get_current ();
363
364   ASSERT (vls != 0);
365   hash_unset (wrk->session_handle_to_vlsh_table,
366               vcl_session_handle_from_index (vls->session_index));
367   clib_spinlock_free (&vls->lock);
368   pool_put (wrk->vls_pool, vls);
369 }
370
371 static vcl_locked_session_t *
372 vls_get_and_lock (vls_handle_t vlsh)
373 {
374   vls_worker_t *wrk = vls_worker_get_current ();
375   vcl_locked_session_t *vls;
376   if (pool_is_free_index (wrk->vls_pool, vlsh))
377     return 0;
378   vls = pool_elt_at_index (wrk->vls_pool, vlsh);
379   vls_lock (vls);
380   return vls;
381 }
382
383 static vcl_locked_session_t *
384 vls_get_w_dlock (vls_handle_t vlsh)
385 {
386   vcl_locked_session_t *vls;
387   vls_mt_table_rlock ();
388   vls = vls_get_and_lock (vlsh);
389   if (!vls)
390     vls_mt_table_runlock ();
391   return vls;
392 }
393
394 static inline void
395 vls_get_and_unlock (vls_handle_t vlsh)
396 {
397   vcl_locked_session_t *vls;
398   vls_mt_table_rlock ();
399   vls = vls_get (vlsh);
400   vls_unlock (vls);
401   vls_mt_table_runlock ();
402 }
403
404 static inline void
405 vls_dunlock (vcl_locked_session_t * vls)
406 {
407   vls_unlock (vls);
408   vls_mt_table_runlock ();
409 }
410
411 static vcl_locked_session_t *
412 vls_session_get (vls_worker_t * wrk, u32 vls_index)
413 {
414   if (pool_is_free_index (wrk->vls_pool, vls_index))
415     return 0;
416   return pool_elt_at_index (wrk->vls_pool, vls_index);
417 }
418
419 vcl_session_handle_t
420 vlsh_to_sh (vls_handle_t vlsh)
421 {
422   vcl_locked_session_t *vls;
423   int rv;
424
425   vls = vls_get_w_dlock (vlsh);
426   if (!vls)
427     return INVALID_SESSION_ID;
428   rv = vls_to_sh (vls);
429   vls_dunlock (vls);
430   return rv;
431 }
432
433 vcl_session_handle_t
434 vlsh_to_session_index (vls_handle_t vlsh)
435 {
436   vcl_session_handle_t sh;
437   sh = vlsh_to_sh (vlsh);
438   return vppcom_session_index (sh);
439 }
440
441 vls_handle_t
442 vls_si_wi_to_vlsh (u32 session_index, u32 vcl_wrk_index)
443 {
444   vls_worker_t *wrk = vls_worker_get_current ();
445   uword *vlshp;
446   vlshp = hash_get (wrk->session_handle_to_vlsh_table,
447                     vcl_session_handle_from_wrk_session_index (session_index,
448                                                                vcl_wrk_index));
449   return vlshp ? *vlshp : VLS_INVALID_HANDLE;
450 }
451
452 vls_handle_t
453 vls_session_index_to_vlsh (uint32_t session_index)
454 {
455   vls_handle_t vlsh;
456
457   vls_mt_table_rlock ();
458   vlsh = vls_si_wi_to_vlsh (session_index, vcl_get_worker_index ());
459   vls_mt_table_runlock ();
460
461   return vlsh;
462 }
463
464 u8
465 vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
466 {
467   vls_shared_data_t *vls_shd;
468   int i;
469
470   if (vls->shared_data_index == ~0)
471     return 0;
472
473   vls_shared_data_pool_rlock ();
474
475   vls_shd = vls_shared_data_get (vls->shared_data_index);
476   clib_spinlock_lock (&vls_shd->lock);
477
478   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
479     if (vls_shd->workers_subscribed[i] == wrk_index)
480       {
481         clib_spinlock_unlock (&vls_shd->lock);
482         vls_shared_data_pool_runlock ();
483         return 1;
484       }
485   clib_spinlock_unlock (&vls_shd->lock);
486
487   vls_shared_data_pool_runlock ();
488   return 0;
489 }
490
491 static void
492 vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
493 {
494   vls_shared_data_t *vls_shd;
495
496   if (vls->shared_data_index == ~0)
497     {
498       clib_warning ("not a shared session");
499       return;
500     }
501
502   vls_shared_data_pool_rlock ();
503
504   vls_shd = vls_shared_data_get (vls->shared_data_index);
505
506   clib_spinlock_lock (&vls_shd->lock);
507   clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
508   clib_spinlock_unlock (&vls_shd->lock);
509
510   vls_shared_data_pool_runlock ();
511 }
512
513 static u32
514 vls_shared_get_owner (vcl_locked_session_t * vls)
515 {
516   vls_shared_data_t *vls_shd;
517   u32 owner_wrk;
518
519   vls_shared_data_pool_rlock ();
520
521   vls_shd = vls_shared_data_get (vls->shared_data_index);
522   owner_wrk = vls_shd->owner_wrk_index;
523
524   vls_shared_data_pool_runlock ();
525
526   return owner_wrk;
527 }
528
529 static u8
530 vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
531 {
532   vls_shared_data_t *vls_shd;
533   u8 is_set;
534
535   if (vls->shared_data_index == ~0)
536     {
537       clib_warning ("not a shared session");
538       return 0;
539     }
540
541   vls_shared_data_pool_rlock ();
542
543   vls_shd = vls_shared_data_get (vls->shared_data_index);
544
545   clib_spinlock_lock (&vls_shd->lock);
546   is_set = clib_bitmap_get (vls_shd->listeners, wrk_index);
547   clib_spinlock_unlock (&vls_shd->lock);
548
549   vls_shared_data_pool_runlock ();
550
551   return (is_set == 1);
552 }
553
554 static void
555 vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
556 {
557   vppcom_session_listen (vls_to_sh (vls), ~0);
558   vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
559 }
560
561 static void
562 vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
563 {
564   vcl_worker_t *wrk;
565   vcl_session_t *s;
566
567   wrk = vcl_worker_get (wrk_index);
568   s = vcl_session_get (wrk, vls->session_index);
569   if (s->session_state != VCL_STATE_LISTEN)
570     return;
571   vcl_send_session_unlisten (wrk, s);
572   s->session_state = VCL_STATE_LISTEN_NO_MQ;
573   vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
574 }
575
576 static int
577 vls_shared_data_subscriber_position (vls_shared_data_t * vls_shd,
578                                      u32 wrk_index)
579 {
580   int i;
581
582   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
583     {
584       if (vls_shd->workers_subscribed[i] == wrk_index)
585         return i;
586     }
587   return -1;
588 }
589
590 int
591 vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
592 {
593   vls_shared_data_t *vls_shd;
594   int do_disconnect, pos;
595   u32 n_subscribers;
596   vcl_session_t *s;
597
598   if (vls->shared_data_index == ~0)
599     return 0;
600
601   s = vcl_session_get (wrk, vls->session_index);
602   if (s->session_state == VCL_STATE_LISTEN)
603     vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
604
605   vls_shared_data_pool_rlock ();
606
607   vls_shd = vls_shared_data_get (vls->shared_data_index);
608   clib_spinlock_lock (&vls_shd->lock);
609
610   pos = vls_shared_data_subscriber_position (vls_shd, wrk->wrk_index);
611   if (pos < 0)
612     {
613       clib_warning ("worker %u not subscribed for vls %u", wrk->wrk_index,
614                     vls->worker_index);
615       goto done;
616     }
617
618   /*
619    * Unsubscribe from share data and fifos
620    */
621   if (s->rx_fifo)
622     {
623       svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
624       svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
625     }
626   vec_del1 (vls_shd->workers_subscribed, pos);
627
628   /*
629    * Cleanup vcl state
630    */
631   n_subscribers = vec_len (vls_shd->workers_subscribed);
632   do_disconnect = s->session_state == VCL_STATE_LISTEN || !n_subscribers;
633   vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
634
635   /*
636    * No subscriber left, cleanup shared data
637    */
638   if (!n_subscribers)
639     {
640       u32 shd_index = vls_shared_data_index (vls_shd);
641
642       clib_spinlock_unlock (&vls_shd->lock);
643       vls_shared_data_pool_runlock ();
644
645       vls_shared_data_free (shd_index);
646
647       /* All locks have been dropped */
648       return 0;
649     }
650
651   /* Return, if this is not the owning worker */
652   if (vls_shd->owner_wrk_index != wrk->wrk_index)
653     goto done;
654
655   ASSERT (vec_len (vls_shd->workers_subscribed));
656
657   /*
658    *  Check if we can change owner or close
659    */
660   vls_shd->owner_wrk_index = vls_shd->workers_subscribed[0];
661   vcl_send_session_worker_update (wrk, s, vls_shd->owner_wrk_index);
662
663   /* XXX is this still needed? */
664   if (vec_len (vls_shd->workers_subscribed) > 1)
665     clib_warning ("more workers need to be updated");
666
667 done:
668
669   clib_spinlock_unlock (&vls_shd->lock);
670   vls_shared_data_pool_runlock ();
671
672   return 0;
673 }
674
675 void
676 vls_init_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
677 {
678   vls_shared_data_t *vls_shd;
679
680   u32 vls_shd_index = vls_shared_data_alloc ();
681
682   vls_shared_data_pool_rlock ();
683
684   vls_shd = vls_shared_data_get (vls_shd_index);
685   vls_shd->owner_wrk_index = vls_wrk->wrk_index;
686   vls->shared_data_index = vls_shd_index;
687   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
688
689   vls_shared_data_pool_runlock ();
690 }
691
692 void
693 vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
694 {
695   vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
696   vls_shared_data_t *vls_shd;
697   vcl_session_t *s;
698
699   s = vcl_session_get (vcl_wrk, vls->session_index);
700   if (!s)
701     {
702       clib_warning ("wrk %u session %u vls %u NOT AVAILABLE",
703                     vcl_wrk->wrk_index, vls->session_index, vls->vls_index);
704       return;
705     }
706
707   ASSERT (vls->shared_data_index != ~0);
708
709   /* Reinit session lock */
710   clib_spinlock_init (&vls->lock);
711
712   vls_shared_data_pool_rlock ();
713
714   vls_shd = vls_shared_data_get (vls->shared_data_index);
715
716   clib_spinlock_lock (&vls_shd->lock);
717   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
718   clib_spinlock_unlock (&vls_shd->lock);
719
720   vls_shared_data_pool_runlock ();
721
722   if (s->rx_fifo)
723     {
724       svm_fifo_add_subscriber (s->rx_fifo, vcl_wrk->vpp_wrk_index);
725       svm_fifo_add_subscriber (s->tx_fifo, vcl_wrk->vpp_wrk_index);
726     }
727   else if (s->session_state == VCL_STATE_LISTEN)
728     {
729       s->session_state = VCL_STATE_LISTEN_NO_MQ;
730     }
731 }
732
733 static void
734 vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
735 {
736   vcl_locked_session_t *vls, *parent_vls;
737
738   /* *INDENT-OFF* */
739   pool_foreach (vls, vls_wrk->vls_pool, ({
740     /* Initialize sharing on parent session */
741     if (vls->shared_data_index == ~0)
742       {
743         parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
744         vls_init_share_session (vls_parent_wrk, parent_vls);
745         vls->shared_data_index = parent_vls->shared_data_index;
746       }
747     vls_share_session (vls_wrk, vls);
748   }));
749   /* *INDENT-ON* */
750 }
751
752 void
753 vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
754 {
755   vls_worker_t *vls_wrk = vls_worker_get_current (), *vls_parent_wrk;
756   vcl_worker_t *wrk = vcl_worker_get_current ();
757   u32 vls_index, session_index, wrk_index;
758   vcl_session_handle_t sh;
759
760   /*
761    * init vcl worker
762    */
763   wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
764   wrk->sessions = pool_dup (parent_wrk->sessions);
765   wrk->session_index_by_vpp_handles =
766     hash_dup (parent_wrk->session_index_by_vpp_handles);
767
768   /*
769    * init vls worker
770    */
771   vls_parent_wrk = vls_worker_get (parent_wrk->wrk_index);
772   /* *INDENT-OFF* */
773   hash_foreach (sh, vls_index, vls_parent_wrk->session_handle_to_vlsh_table,
774     ({
775       vcl_session_handle_parse (sh, &wrk_index, &session_index);
776       hash_set (vls_wrk->session_handle_to_vlsh_table,
777                 vcl_session_handle_from_index (session_index), vls_index);
778     }));
779   /* *INDENT-ON* */
780   vls_wrk->vls_pool = pool_dup (vls_parent_wrk->vls_pool);
781
782   vls_share_sessions (vls_parent_wrk, vls_wrk);
783 }
784
785 static void
786 vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
787 {
788   vcl_worker_t *wrk = vcl_worker_get_current ();
789   vcl_session_t *s = 0;
790   int is_nonblk = 0;
791
792   if (vls)
793     {
794       s = vcl_session_get (wrk, vls->session_index);
795       if (PREDICT_FALSE (!s))
796         return;
797       is_nonblk = vcl_session_has_attr (s, VCL_SESS_ATTR_NONBLOCK);
798     }
799
800   switch (op)
801     {
802     case VLS_MT_OP_READ:
803       if (!is_nonblk)
804         is_nonblk = vcl_session_read_ready (s) != 0;
805       if (!is_nonblk)
806         {
807           vls_mt_mq_lock ();
808           *locks_acq |= VLS_MT_LOCK_MQ;
809         }
810       break;
811     case VLS_MT_OP_WRITE:
812       ASSERT (s);
813       if (!is_nonblk)
814         is_nonblk = vcl_session_write_ready (s) != 0;
815       if (!is_nonblk)
816         {
817           vls_mt_mq_lock ();
818           *locks_acq |= VLS_MT_LOCK_MQ;
819         }
820       break;
821     case VLS_MT_OP_XPOLL:
822       vls_mt_mq_lock ();
823       *locks_acq |= VLS_MT_LOCK_MQ;
824       break;
825     case VLS_MT_OP_SPOOL:
826       vls_mt_spool_lock ();
827       *locks_acq |= VLS_MT_LOCK_SPOOL;
828       break;
829     default:
830       break;
831     }
832 }
833
834 static void
835 vls_mt_rel_locks (int locks_acq)
836 {
837   if (locks_acq & VLS_MT_LOCK_MQ)
838     vls_mt_mq_unlock ();
839   if (locks_acq & VLS_MT_LOCK_SPOOL)
840     vls_mt_create_unlock ();
841 }
842
843 static inline u8
844 vls_mt_session_should_migrate (vcl_locked_session_t * vls)
845 {
846   return (vls_mt_wrk_supported ()
847           && vls->worker_index != vcl_get_worker_index ());
848 }
849
850 static void
851 vls_mt_session_migrate (vcl_locked_session_t * vls)
852 {
853   u32 wrk_index = vcl_get_worker_index ();
854   vcl_worker_t *wrk;
855   u32 src_sid, sid;
856   vcl_session_t *session;
857   uword *p;
858
859   ASSERT (vls_mt_wrk_supported () && vls->worker_index != wrk_index);
860
861   /*
862    * VCL session on current vcl worker already allocated. Update current
863    * owner worker and index and return
864    */
865   if ((p = hash_get (vls->vcl_wrk_index_to_session_index, wrk_index)))
866     {
867       vls->worker_index = wrk_index;
868       vls->session_index = (u32) p[0];
869       return;
870     }
871
872   /*
873    * Ask vcl worker that owns the original vcl session to clone it into
874    * current vcl worker session pool
875    */
876
877   if (!(p = hash_get (vls->vcl_wrk_index_to_session_index,
878                       vls->owner_vcl_wrk_index)))
879     {
880       VERR ("session in owner worker(%u) is free", vls->owner_vcl_wrk_index);
881       ASSERT (0);
882       return;
883     }
884
885   src_sid = (u32) p[0];
886   wrk = vcl_worker_get_current ();
887   session = vcl_session_alloc (wrk);
888   sid = session->session_index;
889   vls_send_clone_and_share_rpc (wrk, vls, sid, vls_get_worker_index (),
890                                 vls->owner_vcl_wrk_index, vls->vls_index,
891                                 src_sid);
892   session->session_index = sid;
893   vls->worker_index = wrk_index;
894   vls->session_index = sid;
895   hash_set (vls->vcl_wrk_index_to_session_index, wrk_index, sid);
896   VDBG (1, "migrate session of worker (session): %u (%u) -> %u (%u)",
897         vls->owner_vcl_wrk_index, src_sid, wrk_index, sid);
898
899   if (PREDICT_FALSE ((session->flags & VCL_SESSION_F_IS_VEP)
900                      && session->vep.next_sh != ~0))
901     {
902       /* TODO: rollback? */
903       VERR ("can't migrate nonempty epoll session");
904       ASSERT (0);
905       return;
906     }
907   else if (PREDICT_FALSE (!(session->flags & VCL_SESSION_F_IS_VEP) &&
908                           session->session_state != VCL_STATE_CLOSED))
909     {
910       /* TODO: rollback? */
911       VERR ("migrate NOT supported, session_status (%u)",
912             session->session_state);
913       ASSERT (0);
914       return;
915     }
916 }
917
918 static inline void
919 vls_mt_detect (void)
920 {
921   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
922     vls_mt_add ();
923 }
924
925 #define vls_mt_guard(_vls, _op)                                         \
926   int _locks_acq = 0;                                                   \
927   if (vls_mt_wrk_supported ())                                          \
928     {                                                                   \
929       if (PREDICT_FALSE (_vls                                           \
930             && ((vcl_locked_session_t *)_vls)->worker_index !=          \
931                 vcl_get_worker_index ()))                               \
932           vls_mt_session_migrate (_vls);                                \
933     }                                                                   \
934   else                                                                  \
935     {                                                                   \
936       if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1))                   \
937         vls_mt_acq_locks (_vls, _op, &_locks_acq);                      \
938     }                                                                   \
939
940 #define vls_mt_unguard()                                                \
941   if (PREDICT_FALSE (_locks_acq))                                       \
942     vls_mt_rel_locks (_locks_acq)
943
944 int
945 vls_write (vls_handle_t vlsh, void *buf, size_t nbytes)
946 {
947   vcl_locked_session_t *vls;
948   int rv;
949
950   vls_mt_detect ();
951   if (!(vls = vls_get_w_dlock (vlsh)))
952     return VPPCOM_EBADFD;
953
954   vls_mt_guard (vls, VLS_MT_OP_WRITE);
955   rv = vppcom_session_write (vls_to_sh_tu (vls), buf, nbytes);
956   vls_mt_unguard ();
957   vls_get_and_unlock (vlsh);
958   return rv;
959 }
960
961 int
962 vls_write_msg (vls_handle_t vlsh, void *buf, size_t nbytes)
963 {
964   vcl_locked_session_t *vls;
965   int rv;
966
967   vls_mt_detect ();
968   if (!(vls = vls_get_w_dlock (vlsh)))
969     return VPPCOM_EBADFD;
970   vls_mt_guard (vls, VLS_MT_OP_WRITE);
971   rv = vppcom_session_write_msg (vls_to_sh_tu (vls), buf, nbytes);
972   vls_mt_unguard ();
973   vls_get_and_unlock (vlsh);
974   return rv;
975 }
976
977 int
978 vls_sendto (vls_handle_t vlsh, void *buf, int buflen, int flags,
979             vppcom_endpt_t * ep)
980 {
981   vcl_locked_session_t *vls;
982   int rv;
983
984   vls_mt_detect ();
985   if (!(vls = vls_get_w_dlock (vlsh)))
986     return VPPCOM_EBADFD;
987   vls_mt_guard (vls, VLS_MT_OP_WRITE);
988   rv = vppcom_session_sendto (vls_to_sh_tu (vls), buf, buflen, flags, ep);
989   vls_mt_unguard ();
990   vls_get_and_unlock (vlsh);
991   return rv;
992 }
993
994 ssize_t
995 vls_read (vls_handle_t vlsh, void *buf, size_t nbytes)
996 {
997   vcl_locked_session_t *vls;
998   int rv;
999
1000   vls_mt_detect ();
1001   if (!(vls = vls_get_w_dlock (vlsh)))
1002     return VPPCOM_EBADFD;
1003   vls_mt_guard (vls, VLS_MT_OP_READ);
1004   rv = vppcom_session_read (vls_to_sh_tu (vls), buf, nbytes);
1005   vls_mt_unguard ();
1006   vls_get_and_unlock (vlsh);
1007   return rv;
1008 }
1009
1010 ssize_t
1011 vls_recvfrom (vls_handle_t vlsh, void *buffer, uint32_t buflen, int flags,
1012               vppcom_endpt_t * ep)
1013 {
1014   vcl_locked_session_t *vls;
1015   int rv;
1016
1017   vls_mt_detect ();
1018   if (!(vls = vls_get_w_dlock (vlsh)))
1019     return VPPCOM_EBADFD;
1020   vls_mt_guard (vls, VLS_MT_OP_READ);
1021   rv = vppcom_session_recvfrom (vls_to_sh_tu (vls), buffer, buflen, flags,
1022                                 ep);
1023   vls_mt_unguard ();
1024   vls_get_and_unlock (vlsh);
1025   return rv;
1026 }
1027
1028 int
1029 vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen)
1030 {
1031   vcl_locked_session_t *vls;
1032   int rv;
1033
1034   vls_mt_detect ();
1035   if (!(vls = vls_get_w_dlock (vlsh)))
1036     return VPPCOM_EBADFD;
1037   if (vls_mt_session_should_migrate (vls))
1038     vls_mt_session_migrate (vls);
1039   rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen);
1040   vls_get_and_unlock (vlsh);
1041   return rv;
1042 }
1043
1044 int
1045 vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep)
1046 {
1047   vcl_locked_session_t *vls;
1048   int rv;
1049
1050   vls_mt_detect ();
1051   if (!(vls = vls_get_w_dlock (vlsh)))
1052     return VPPCOM_EBADFD;
1053   rv = vppcom_session_bind (vls_to_sh_tu (vls), ep);
1054   vls_get_and_unlock (vlsh);
1055   return rv;
1056 }
1057
1058 int
1059 vls_listen (vls_handle_t vlsh, int q_len)
1060 {
1061   vcl_locked_session_t *vls;
1062   int rv;
1063
1064   vls_mt_detect ();
1065   if (!(vls = vls_get_w_dlock (vlsh)))
1066     return VPPCOM_EBADFD;
1067   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
1068   rv = vppcom_session_listen (vls_to_sh_tu (vls), q_len);
1069   vls_mt_unguard ();
1070   vls_get_and_unlock (vlsh);
1071   return rv;
1072 }
1073
1074 int
1075 vls_connect (vls_handle_t vlsh, vppcom_endpt_t * server_ep)
1076 {
1077   vcl_locked_session_t *vls;
1078   int rv;
1079
1080   vls_mt_detect ();
1081   if (!(vls = vls_get_w_dlock (vlsh)))
1082     return VPPCOM_EBADFD;
1083   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
1084   rv = vppcom_session_connect (vls_to_sh_tu (vls), server_ep);
1085   vls_mt_unguard ();
1086   vls_get_and_unlock (vlsh);
1087   return rv;
1088 }
1089
1090 static inline void
1091 vls_mp_checks (vcl_locked_session_t * vls, int is_add)
1092 {
1093   vcl_worker_t *wrk = vcl_worker_get_current ();
1094   vcl_session_t *s;
1095   u32 owner_wrk;
1096
1097   if (vls_mt_wrk_supported ())
1098     return;
1099
1100   s = vcl_session_get (wrk, vls->session_index);
1101   switch (s->session_state)
1102     {
1103     case VCL_STATE_LISTEN:
1104       if (is_add)
1105         {
1106           vls_listener_wrk_set (vls, vls->worker_index, 1 /* is_active */ );
1107           break;
1108         }
1109       vls_listener_wrk_stop_listen (vls, vls->worker_index);
1110       break;
1111     case VCL_STATE_LISTEN_NO_MQ:
1112       if (!is_add)
1113         break;
1114
1115       /* Register worker as listener */
1116       vls_listener_wrk_start_listen (vls, wrk->wrk_index);
1117
1118       /* If owner worker did not attempt to accept/xpoll on the session,
1119        * force a listen stop for it, since it may not be interested in
1120        * accepting new sessions.
1121        * This is pretty much a hack done to give app workers the illusion
1122        * that it is fine to listen and not accept new sessions for a
1123        * given listener. Without it, we would accumulate unhandled
1124        * accepts on the passive worker message queue. */
1125       owner_wrk = vls_shared_get_owner (vls);
1126       if (!vls_listener_wrk_is_active (vls, owner_wrk))
1127         vls_listener_wrk_stop_listen (vls, owner_wrk);
1128       break;
1129     default:
1130       break;
1131     }
1132 }
1133
1134 vls_handle_t
1135 vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
1136 {
1137   vls_handle_t accepted_vlsh;
1138   vcl_locked_session_t *vls;
1139   int sh;
1140
1141   vls_mt_detect ();
1142   if (!(vls = vls_get_w_dlock (listener_vlsh)))
1143     return VPPCOM_EBADFD;
1144   if (vcl_n_workers () > 1)
1145     vls_mp_checks (vls, 1 /* is_add */ );
1146   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
1147   sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags);
1148   vls_mt_unguard ();
1149   vls_get_and_unlock (listener_vlsh);
1150   if (sh < 0)
1151     return sh;
1152   accepted_vlsh = vls_alloc (sh);
1153   if (PREDICT_FALSE (accepted_vlsh == VLS_INVALID_HANDLE))
1154     vppcom_session_close (sh);
1155   return accepted_vlsh;
1156 }
1157
1158 vls_handle_t
1159 vls_create (uint8_t proto, uint8_t is_nonblocking)
1160 {
1161   vcl_session_handle_t sh;
1162   vls_handle_t vlsh;
1163
1164   vls_mt_detect ();
1165   vls_mt_guard (0, VLS_MT_OP_SPOOL);
1166   sh = vppcom_session_create (proto, is_nonblocking);
1167   vls_mt_unguard ();
1168   if (sh == INVALID_SESSION_ID)
1169     return VLS_INVALID_HANDLE;
1170
1171   vlsh = vls_alloc (sh);
1172   if (PREDICT_FALSE (vlsh == VLS_INVALID_HANDLE))
1173     vppcom_session_close (sh);
1174
1175   return vlsh;
1176 }
1177
1178 static void
1179 vls_mt_session_cleanup (vcl_locked_session_t * vls)
1180 {
1181   u32 session_index, wrk_index, current_vcl_wrk;
1182   vcl_worker_t *wrk = vcl_worker_get_current ();
1183
1184   ASSERT (vls_mt_wrk_supported ());
1185
1186   current_vcl_wrk = vcl_get_worker_index ();
1187
1188   /* *INDENT-OFF* */
1189   hash_foreach (wrk_index, session_index, vls->vcl_wrk_index_to_session_index,
1190     ({
1191       if (current_vcl_wrk != wrk_index)
1192         vls_send_session_cleanup_rpc (wrk, wrk_index, session_index);
1193     }));
1194   /* *INDENT-ON* */
1195   hash_free (vls->vcl_wrk_index_to_session_index);
1196 }
1197
1198 int
1199 vls_close (vls_handle_t vlsh)
1200 {
1201   vcl_locked_session_t *vls;
1202   int rv;
1203
1204   vls_mt_detect ();
1205   vls_mt_table_wlock ();
1206
1207   vls = vls_get_and_lock (vlsh);
1208   if (!vls)
1209     {
1210       vls_mt_table_wunlock ();
1211       return VPPCOM_EBADFD;
1212     }
1213
1214   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
1215
1216   if (vls_is_shared (vls))
1217     rv = vls_unshare_session (vls, vcl_worker_get_current ());
1218   else
1219     rv = vppcom_session_close (vls_to_sh (vls));
1220
1221   if (vls_mt_wrk_supported ())
1222     vls_mt_session_cleanup (vls);
1223
1224   vls_free (vls);
1225   vls_mt_unguard ();
1226
1227   vls_mt_table_wunlock ();
1228
1229   return rv;
1230 }
1231
1232 vls_handle_t
1233 vls_epoll_create (void)
1234 {
1235   vcl_session_handle_t sh;
1236   vls_handle_t vlsh;
1237
1238   vls_mt_detect ();
1239
1240   sh = vppcom_epoll_create ();
1241   if (sh == INVALID_SESSION_ID)
1242     return VLS_INVALID_HANDLE;
1243
1244   vlsh = vls_alloc (sh);
1245   if (vlsh == VLS_INVALID_HANDLE)
1246     vppcom_session_close (sh);
1247
1248   return vlsh;
1249 }
1250
1251 static void
1252 vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op)
1253 {
1254   if (vcl_n_workers () <= 1)
1255     {
1256       vlsl->epoll_mp_check = 1;
1257       return;
1258     }
1259
1260   if (op == EPOLL_CTL_MOD)
1261     return;
1262
1263   vlsl->epoll_mp_check = 1;
1264   vls_mp_checks (vls, op == EPOLL_CTL_ADD);
1265 }
1266
1267 int
1268 vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
1269                struct epoll_event *event)
1270 {
1271   vcl_locked_session_t *ep_vls, *vls;
1272   vcl_session_handle_t ep_sh, sh;
1273   int rv;
1274
1275   vls_mt_detect ();
1276   vls_mt_table_rlock ();
1277   ep_vls = vls_get_and_lock (ep_vlsh);
1278   vls = vls_get_and_lock (vlsh);
1279
1280   if (vls_mt_session_should_migrate (ep_vls))
1281     vls_mt_session_migrate (ep_vls);
1282
1283   ep_sh = vls_to_sh (ep_vls);
1284   sh = vls_to_sh (vls);
1285
1286   if (PREDICT_FALSE (!vlsl->epoll_mp_check))
1287     vls_epoll_ctl_mp_checks (vls, op);
1288
1289   vls_mt_table_runlock ();
1290
1291   rv = vppcom_epoll_ctl (ep_sh, op, sh, event);
1292
1293   vls_mt_table_rlock ();
1294   ep_vls = vls_get (ep_vlsh);
1295   vls = vls_get (vlsh);
1296   vls_unlock (vls);
1297   vls_unlock (ep_vls);
1298   vls_mt_table_runlock ();
1299   return rv;
1300 }
1301
1302 int
1303 vls_epoll_wait (vls_handle_t ep_vlsh, struct epoll_event *events,
1304                 int maxevents, double wait_for_time)
1305 {
1306   vcl_locked_session_t *vls;
1307   int rv;
1308
1309   vls_mt_detect ();
1310   if (!(vls = vls_get_w_dlock (ep_vlsh)))
1311     return VPPCOM_EBADFD;
1312   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1313   rv = vppcom_epoll_wait (vls_to_sh_tu (vls), events, maxevents,
1314                           wait_for_time);
1315   vls_mt_unguard ();
1316   vls_get_and_unlock (ep_vlsh);
1317   return rv;
1318 }
1319
1320 static void
1321 vls_select_mp_checks (vcl_si_set * read_map)
1322 {
1323   vcl_locked_session_t *vls;
1324   vcl_worker_t *wrk;
1325   vcl_session_t *s;
1326   u32 si;
1327
1328   if (vcl_n_workers () <= 1)
1329     {
1330       vlsl->select_mp_check = 1;
1331       return;
1332     }
1333
1334   if (!read_map)
1335     return;
1336
1337   vlsl->select_mp_check = 1;
1338   wrk = vcl_worker_get_current ();
1339
1340   /* *INDENT-OFF* */
1341   clib_bitmap_foreach (si, read_map, ({
1342     s = vcl_session_get (wrk, si);
1343     if (s->session_state == VCL_STATE_LISTEN)
1344       {
1345         vls = vls_get (vls_session_index_to_vlsh (si));
1346         vls_mp_checks (vls, 1 /* is_add */);
1347       }
1348   }));
1349   /* *INDENT-ON* */
1350 }
1351
1352 int
1353 vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
1354             vcl_si_set * except_map, double wait_for_time)
1355 {
1356   int rv;
1357
1358   vls_mt_detect ();
1359   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1360   if (PREDICT_FALSE (!vlsl->select_mp_check))
1361     vls_select_mp_checks (read_map);
1362   rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
1363   vls_mt_unguard ();
1364   return rv;
1365 }
1366
1367 static void
1368 vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
1369 {
1370   u32 current_wrk, is_current;
1371   vcl_locked_session_t *vls;
1372   vcl_session_t *s;
1373
1374   if (pool_elts (vcm->workers) <= 1)
1375     return;
1376
1377   current_wrk = vcl_get_worker_index ();
1378   is_current = current_wrk == wrk->wrk_index;
1379
1380   /* *INDENT-OFF* */
1381   pool_foreach (s, wrk->sessions, ({
1382     vls = vls_get (vls_si_wi_to_vlsh (s->session_index, wrk->wrk_index));
1383     if (vls && (is_current || vls_is_shared_by_wrk (vls, current_wrk)))
1384       vls_unshare_session (vls, wrk);
1385   }));
1386   /* *INDENT-ON* */
1387 }
1388
1389 static void
1390 vls_cleanup_vcl_worker (vcl_worker_t * wrk)
1391 {
1392   vls_worker_t *vls_wrk = vls_worker_get (wrk->wrk_index);
1393
1394   /* Unshare sessions and also cleanup worker since child may have
1395    * called _exit () and therefore vcl may not catch the event */
1396   vls_unshare_vcl_worker_sessions (wrk);
1397   vcl_worker_cleanup (wrk, 1 /* notify vpp */ );
1398
1399   vls_worker_free (vls_wrk);
1400 }
1401
1402 static void
1403 vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
1404 {
1405   vcl_worker_t *sub_child;
1406   int tries = 0;
1407
1408   if (child_wrk->forked_child != ~0)
1409     {
1410       sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
1411       if (sub_child)
1412         {
1413           /* Wait a bit, maybe the process is going away */
1414           while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
1415             usleep (1e3);
1416           if (kill (sub_child->current_pid, 0) < 0)
1417             vls_cleanup_forked_child (child_wrk, sub_child);
1418         }
1419     }
1420   vls_cleanup_vcl_worker (child_wrk);
1421   VDBG (0, "Cleaned up forked child wrk %u", child_wrk->wrk_index);
1422   wrk->forked_child = ~0;
1423 }
1424
1425 static struct sigaction old_sa;
1426
1427 static void
1428 vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
1429 {
1430   vcl_worker_t *wrk, *child_wrk;
1431
1432   if (vcl_get_worker_index () == ~0)
1433     return;
1434
1435   if (sigaction (SIGCHLD, &old_sa, 0))
1436     {
1437       VERR ("couldn't restore sigchld");
1438       exit (-1);
1439     }
1440
1441   wrk = vcl_worker_get_current ();
1442   if (wrk->forked_child == ~0)
1443     return;
1444
1445   child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
1446   if (!child_wrk)
1447     goto done;
1448
1449   if (si && si->si_pid != child_wrk->current_pid)
1450     {
1451       VDBG (0, "unexpected child pid %u", si->si_pid);
1452       goto done;
1453     }
1454   vls_cleanup_forked_child (wrk, child_wrk);
1455
1456 done:
1457   if (old_sa.sa_flags & SA_SIGINFO)
1458     {
1459       void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
1460       fn (signum, si, uc);
1461     }
1462   else
1463     {
1464       void (*fn) (int) = old_sa.sa_handler;
1465       if (fn)
1466         fn (signum);
1467     }
1468 }
1469
1470 static void
1471 vls_incercept_sigchld ()
1472 {
1473   struct sigaction sa;
1474   clib_memset (&sa, 0, sizeof (sa));
1475   sa.sa_sigaction = vls_intercept_sigchld_handler;
1476   sa.sa_flags = SA_SIGINFO;
1477   if (sigaction (SIGCHLD, &sa, &old_sa))
1478     {
1479       VERR ("couldn't intercept sigchld");
1480       exit (-1);
1481     }
1482 }
1483
1484 static void
1485 vls_app_pre_fork (void)
1486 {
1487   vls_incercept_sigchld ();
1488   vcl_flush_mq_events ();
1489 }
1490
1491 static void
1492 vls_app_fork_child_handler (void)
1493 {
1494   vcl_worker_t *parent_wrk;
1495   int parent_wrk_index;
1496
1497   parent_wrk_index = vcl_get_worker_index ();
1498   VDBG (0, "initializing forked child %u with parent wrk %u", getpid (),
1499         parent_wrk_index);
1500
1501   /*
1502    * Clear old state
1503    */
1504   vcl_set_worker_index (~0);
1505
1506   /*
1507    * Allocate and register vcl worker with vpp
1508    */
1509   if (vppcom_worker_register ())
1510     {
1511       VERR ("couldn't register new worker!");
1512       return;
1513     }
1514
1515   /*
1516    * Allocate/initialize vls worker and share sessions
1517    */
1518   vls_worker_alloc ();
1519   parent_wrk = vcl_worker_get (parent_wrk_index);
1520   vls_worker_copy_on_fork (parent_wrk);
1521   parent_wrk->forked_child = vcl_get_worker_index ();
1522
1523   /* Reset number of threads and set wrk index */
1524   vlsl->vls_mt_n_threads = 0;
1525   vlsl->vls_wrk_index = vcl_get_worker_index ();
1526   vlsl->select_mp_check = 0;
1527   vlsl->epoll_mp_check = 0;
1528   vls_mt_locks_init ();
1529
1530   VDBG (0, "forked child main worker initialized");
1531   vcm->forking = 0;
1532 }
1533
1534 static void
1535 vls_app_fork_parent_handler (void)
1536 {
1537   vcm->forking = 1;
1538   while (vcm->forking)
1539     ;
1540 }
1541
1542 void
1543 vls_app_exit (void)
1544 {
1545   vls_worker_t *wrk = vls_worker_get_current ();
1546
1547   /* Unshare the sessions. VCL will clean up the worker */
1548   vls_unshare_vcl_worker_sessions (vcl_worker_get_current ());
1549   vls_worker_free (wrk);
1550 }
1551
1552 static void
1553 vls_clone_and_share_rpc_handler (void *args)
1554 {
1555   vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
1556   vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
1557   vcl_locked_session_t *vls, *dst_vls;
1558   vcl_worker_t *vcl_wrk = vcl_worker_get_current (), *dst_vcl_wrk;
1559   vcl_session_t *s, *dst_s;
1560
1561   vls = vls_session_get (wrk, msg->vls_index);
1562
1563   if (!vls_mt_wrk_supported ())
1564     vls_init_share_session (wrk, vls);
1565
1566   s = vcl_session_get (vcl_wrk, msg->session_index);
1567   dst_wrk = vls_worker_get (msg->origin_vls_wrk);
1568   dst_vcl_wrk = vcl_worker_get (msg->origin_vcl_wrk);
1569   dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
1570   dst_vls->shared_data_index = vls->shared_data_index;
1571   dst_s = vcl_session_get (dst_vcl_wrk, msg->origin_session_index);
1572   clib_memcpy (dst_s, s, sizeof (*s));
1573
1574   dst_vcl_wrk->rpc_done = 1;
1575
1576   VDBG (1, "proces session clone of worker (session): %u (%u) -> %u (%u)",
1577         vcl_wrk->wrk_index, msg->session_index, dst_vcl_wrk->wrk_index,
1578         msg->origin_session_index);
1579 }
1580
1581 static void
1582 vls_session_cleanup_rpc_handler (void *args)
1583 {
1584   vls_sess_cleanup_msg_t *msg = (vls_sess_cleanup_msg_t *) args;
1585   vcl_worker_t *wrk = vcl_worker_get_current ();
1586   vcl_worker_t *dst_wrk = vcl_worker_get (msg->origin_vcl_wrk);
1587
1588   vppcom_session_close (vcl_session_handle_from_index (msg->session_index));
1589
1590   dst_wrk->rpc_done = 1;
1591
1592   VDBG (1, "proces session cleanup of worker (session): %u (%u) from %u ()",
1593         wrk->wrk_index, msg->session_index, dst_wrk->wrk_index);
1594 }
1595
1596 static void
1597 vls_rpc_handler (void *args)
1598 {
1599   vls_rpc_msg_t *msg = (vls_rpc_msg_t *) args;
1600   switch (msg->type)
1601     {
1602     case VLS_RPC_CLONE_AND_SHARE:
1603       vls_clone_and_share_rpc_handler (msg->data);
1604       break;
1605     case VLS_RPC_SESS_CLEANUP:
1606       vls_session_cleanup_rpc_handler (msg->data);
1607       break;
1608     default:
1609       break;
1610     }
1611 }
1612
1613 void
1614 vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
1615                               vcl_locked_session_t * vls, u32 session_index,
1616                               u32 vls_wrk_index, u32 dst_wrk_index,
1617                               u32 dst_vls_index, u32 dst_session_index)
1618 {
1619   u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
1620   vls_clone_and_share_msg_t *msg;
1621   vls_rpc_msg_t *rpc;
1622   int ret;
1623
1624   rpc = (vls_rpc_msg_t *) & data;
1625   rpc->type = VLS_RPC_CLONE_AND_SHARE;
1626   msg = (vls_clone_and_share_msg_t *) & rpc->data;
1627   msg->origin_vls_wrk = vls_wrk_index;
1628   msg->origin_vls_index = vls->vls_index;
1629   msg->origin_vcl_wrk = wrk->wrk_index;
1630   msg->origin_session_index = session_index;
1631   msg->vls_index = dst_vls_index;
1632   msg->session_index = dst_session_index;
1633
1634   wrk->rpc_done = 0;
1635   ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
1636
1637   VDBG (1, "send session clone to wrk (session): %u (%u) -> %u (%u), ret=%d",
1638         dst_wrk_index, msg->session_index, msg->origin_vcl_wrk,
1639         msg->origin_session_index, ret);
1640   while (!ret && !wrk->rpc_done)
1641     ;
1642 }
1643
1644 void
1645 vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
1646                               u32 dst_wrk_index, u32 dst_session_index)
1647 {
1648   u8 data[sizeof (u8) + sizeof (vls_sess_cleanup_msg_t)];
1649   vls_sess_cleanup_msg_t *msg;
1650   vls_rpc_msg_t *rpc;
1651   int ret;
1652
1653   rpc = (vls_rpc_msg_t *) & data;
1654   rpc->type = VLS_RPC_SESS_CLEANUP;
1655   msg = (vls_sess_cleanup_msg_t *) & rpc->data;
1656   msg->origin_vcl_wrk = wrk->wrk_index;
1657   msg->session_index = dst_session_index;
1658
1659   wrk->rpc_done = 0;
1660   ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
1661
1662   VDBG (1, "send session cleanup to wrk (session): %u (%u) from %u, ret=%d",
1663         dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
1664   while (!ret && !wrk->rpc_done)
1665     ;
1666 }
1667
1668 int
1669 vls_app_create (char *app_name)
1670 {
1671   int rv;
1672
1673   if ((rv = vppcom_app_create (app_name)))
1674     return rv;
1675
1676   vlsm = clib_mem_alloc (sizeof (vls_main_t));
1677   clib_memset (vlsm, 0, sizeof (*vlsm));
1678   clib_rwlock_init (&vlsm->vls_table_lock);
1679   clib_rwlock_init (&vlsm->shared_data_lock);
1680   pool_alloc (vlsm->workers, vcm->cfg.max_workers);
1681
1682   pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
1683                   vls_app_fork_child_handler);
1684   atexit (vls_app_exit);
1685   vls_worker_alloc ();
1686   vlsl->vls_wrk_index = vcl_get_worker_index ();
1687   vls_mt_locks_init ();
1688   vcm->wrk_rpc_fn = vls_rpc_handler;
1689   return VPPCOM_OK;
1690 }
1691
1692 unsigned char
1693 vls_use_eventfd (void)
1694 {
1695   return vcm->cfg.use_mq_eventfd;
1696 }
1697
1698 unsigned char
1699 vls_mt_wrk_supported (void)
1700 {
1701   return vcm->cfg.mt_wrk_supported;
1702 }
1703
1704 int
1705 vls_use_real_epoll (void)
1706 {
1707   if (vcl_get_worker_index () == ~0)
1708     return 0;
1709
1710   return vcl_worker_get_current ()->vcl_needs_real_epoll;
1711 }
1712
1713 void
1714 vls_register_vcl_worker (void)
1715 {
1716   if (vppcom_worker_register () != VPPCOM_OK)
1717     {
1718       VERR ("failed to register worker");
1719       return;
1720     }
1721 }
1722
1723 /*
1724  * fd.io coding-style-patch-verification: ON
1725  *
1726  * Local Variables:
1727  * eval: (c-set-style "gnu")
1728  * End:
1729  */