02da8cd2cf5b942335c57ad920940e055ae955c0
[vpp.git] / src / vcl / vcl_locked.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vcl/vcl_locked.h>
17 #include <vcl/vcl_private.h>
18
19 typedef struct vls_shared_data_
20 {
21   clib_spinlock_t lock;
22   u32 owner_wrk_index;
23   u32 *workers_subscribed;
24   clib_bitmap_t *listeners;
25 } vls_shared_data_t;
26
27 typedef struct vcl_locked_session_
28 {
29   clib_spinlock_t lock;
30   u32 session_index;
31   u32 worker_index;
32   u32 vls_index;
33   u32 shared_data_index;
34   /** VCL session owned by different workers because of migration */
35   u32 owner_vcl_wrk_index;
36   uword *vcl_wrk_index_to_session_index;
37 } vcl_locked_session_t;
38
39 typedef struct vls_worker_
40 {
41   vcl_locked_session_t *vls_pool;
42   uword *session_index_to_vlsh_table;
43   u32 wrk_index;
44 } vls_worker_t;
45
46 typedef struct vls_local_
47 {
48   int vls_wrk_index;
49   volatile int vls_mt_n_threads;
50   pthread_mutex_t vls_mt_mq_mlock;
51   pthread_mutex_t vls_mt_spool_mlock;
52   volatile u8 select_mp_check;
53   volatile u8 epoll_mp_check;
54 } vls_process_local_t;
55
56 static vls_process_local_t vls_local;
57 static vls_process_local_t *vlsl = &vls_local;
58
59 typedef struct vls_main_
60 {
61   vls_worker_t *workers;
62   clib_rwlock_t vls_table_lock;
63   /** Pool of data shared by sessions owned by different workers */
64   vls_shared_data_t *shared_data_pool;
65   clib_rwlock_t shared_data_lock;
66 } vls_main_t;
67
68 vls_main_t *vlsm;
69
70 typedef enum vls_rpc_msg_type_
71 {
72   VLS_RPC_CLONE_AND_SHARE,
73   VLS_RPC_SESS_CLEANUP,
74 } vls_rpc_msg_type_e;
75
76 typedef struct vls_rpc_msg_
77 {
78   u8 type;
79   u8 data[0];
80 } vls_rpc_msg_t;
81
82 typedef struct vls_clone_and_share_msg_
83 {
84   u32 vls_index;                /**< vls to be shared */
85   u32 session_index;            /**< vcl session to be shared */
86   u32 origin_vls_wrk;           /**< vls worker that initiated the rpc */
87   u32 origin_vls_index;         /**< vls session of the originator */
88   u32 origin_vcl_wrk;           /**< vcl worker that initiated the rpc */
89   u32 origin_session_index;     /**< vcl session of the originator */
90 } vls_clone_and_share_msg_t;
91
92 typedef struct vls_sess_cleanup_msg_
93 {
94   u32 session_index;            /**< vcl session to be cleaned */
95   u32 origin_vcl_wrk;           /**< worker that initiated the rpc */
96 } vls_sess_cleanup_msg_t;
97
98 void vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
99                                    u32 dst_wrk_index, u32 dst_session_index);
100 void vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
101                                    vcl_locked_session_t * vls,
102                                    u32 session_index, u32 vls_wrk_index,
103                                    u32 dst_wrk_index, u32 dst_vls_index,
104                                    u32 dst_session_index);
105
106
107 static inline u32
108 vls_get_worker_index (void)
109 {
110   if (vls_mt_wrk_supported ())
111     return vlsl->vls_wrk_index;
112   else
113     return vcl_get_worker_index ();
114 }
115
116 static u32
117 vls_shared_data_alloc (void)
118 {
119   vls_shared_data_t *vls_shd;
120   u32 shd_index;
121
122   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
123   pool_get_zero (vlsm->shared_data_pool, vls_shd);
124   clib_spinlock_init (&vls_shd->lock);
125   shd_index = vls_shd - vlsm->shared_data_pool;
126   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
127
128   return shd_index;
129 }
130
131 static u32
132 vls_shared_data_index (vls_shared_data_t * vls_shd)
133 {
134   return vls_shd - vlsm->shared_data_pool;
135 }
136
137 vls_shared_data_t *
138 vls_shared_data_get (u32 shd_index)
139 {
140   if (pool_is_free_index (vlsm->shared_data_pool, shd_index))
141     return 0;
142   return pool_elt_at_index (vlsm->shared_data_pool, shd_index);
143 }
144
145 static void
146 vls_shared_data_free (u32 shd_index)
147 {
148   vls_shared_data_t *vls_shd;
149
150   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
151   vls_shd = vls_shared_data_get (shd_index);
152   clib_spinlock_free (&vls_shd->lock);
153   clib_bitmap_free (vls_shd->listeners);
154   vec_free (vls_shd->workers_subscribed);
155   pool_put (vlsm->shared_data_pool, vls_shd);
156   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
157 }
158
159 static inline void
160 vls_shared_data_pool_rlock (void)
161 {
162   clib_rwlock_reader_lock (&vlsm->shared_data_lock);
163 }
164
165 static inline void
166 vls_shared_data_pool_runlock (void)
167 {
168   clib_rwlock_reader_unlock (&vlsm->shared_data_lock);
169 }
170
171 static inline void
172 vls_table_rlock (void)
173 {
174   if (vlsl->vls_mt_n_threads > 1)
175     clib_rwlock_reader_lock (&vlsm->vls_table_lock);
176 }
177
178 static inline void
179 vls_table_runlock (void)
180 {
181   if (vlsl->vls_mt_n_threads > 1)
182     clib_rwlock_reader_unlock (&vlsm->vls_table_lock);
183 }
184
185 static inline void
186 vls_table_wlock (void)
187 {
188   if (vlsl->vls_mt_n_threads > 1)
189     clib_rwlock_writer_lock (&vlsm->vls_table_lock);
190 }
191
192 static inline void
193 vls_table_wunlock (void)
194 {
195   if (vlsl->vls_mt_n_threads > 1)
196     clib_rwlock_writer_unlock (&vlsm->vls_table_lock);
197 }
198
199 typedef enum
200 {
201   VLS_MT_OP_READ,
202   VLS_MT_OP_WRITE,
203   VLS_MT_OP_SPOOL,
204   VLS_MT_OP_XPOLL,
205 } vls_mt_ops_t;
206
207 typedef enum
208 {
209   VLS_MT_LOCK_MQ = 1 << 0,
210   VLS_MT_LOCK_SPOOL = 1 << 1
211 } vls_mt_lock_type_t;
212
213 static void
214 vls_mt_add (void)
215 {
216   vlsl->vls_mt_n_threads += 1;
217   if (vls_mt_wrk_supported ())
218     vls_register_vcl_worker ();
219   else
220     vcl_set_worker_index (vlsl->vls_wrk_index);
221 }
222
223 static inline void
224 vls_mt_mq_lock (void)
225 {
226   pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
227 }
228
229 static inline void
230 vls_mt_mq_unlock (void)
231 {
232   pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
233 }
234
235 static inline void
236 vls_mt_spool_lock (void)
237 {
238   pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
239 }
240
241 static inline void
242 vls_mt_create_unlock (void)
243 {
244   pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
245 }
246
247 static void
248 vls_mt_locks_init (void)
249 {
250   pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
251   pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
252 }
253
254 u8
255 vls_is_shared (vcl_locked_session_t * vls)
256 {
257   return (vls->shared_data_index != ~0);
258 }
259
260 static inline void
261 vls_lock (vcl_locked_session_t * vls)
262 {
263   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
264     clib_spinlock_lock (&vls->lock);
265 }
266
267 static inline void
268 vls_unlock (vcl_locked_session_t * vls)
269 {
270   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
271     clib_spinlock_unlock (&vls->lock);
272 }
273
274 static inline vcl_session_handle_t
275 vls_to_sh (vcl_locked_session_t * vls)
276 {
277   return vcl_session_handle_from_index (vls->session_index);
278 }
279
280 static inline vcl_session_handle_t
281 vls_to_sh_tu (vcl_locked_session_t * vls)
282 {
283   vcl_session_handle_t sh;
284   sh = vls_to_sh (vls);
285   vls_table_runlock ();
286   return sh;
287 }
288
289 static vls_worker_t *
290 vls_worker_get_current (void)
291 {
292   return pool_elt_at_index (vlsm->workers, vls_get_worker_index ());
293 }
294
295 static void
296 vls_worker_alloc (void)
297 {
298   vls_worker_t *wrk;
299
300   pool_get_zero (vlsm->workers, wrk);
301   wrk->wrk_index = vcl_get_worker_index ();
302 }
303
304 static void
305 vls_worker_free (vls_worker_t * wrk)
306 {
307   hash_free (wrk->session_index_to_vlsh_table);
308   pool_free (wrk->vls_pool);
309   pool_put (vlsm->workers, wrk);
310 }
311
312 static vls_worker_t *
313 vls_worker_get (u32 wrk_index)
314 {
315   if (pool_is_free_index (vlsm->workers, wrk_index))
316     return 0;
317   return pool_elt_at_index (vlsm->workers, wrk_index);
318 }
319
320 static vls_handle_t
321 vls_alloc (vcl_session_handle_t sh)
322 {
323   vls_worker_t *wrk = vls_worker_get_current ();
324   vcl_locked_session_t *vls;
325
326   vls_table_wlock ();
327
328   pool_get_zero (wrk->vls_pool, vls);
329   vls->session_index = vppcom_session_index (sh);
330   vls->worker_index = vppcom_session_worker (sh);
331   vls->vls_index = vls - wrk->vls_pool;
332   vls->shared_data_index = ~0;
333   hash_set (wrk->session_index_to_vlsh_table, vls->session_index,
334             vls->vls_index);
335   if (vls_mt_wrk_supported ())
336     {
337       hash_set (vls->vcl_wrk_index_to_session_index, vls->worker_index,
338                 vls->session_index);
339       vls->owner_vcl_wrk_index = vls->worker_index;
340     }
341   clib_spinlock_init (&vls->lock);
342
343   vls_table_wunlock ();
344   return vls->vls_index;
345 }
346
347 static vcl_locked_session_t *
348 vls_get (vls_handle_t vlsh)
349 {
350   vls_worker_t *wrk = vls_worker_get_current ();
351   if (pool_is_free_index (wrk->vls_pool, vlsh))
352     return 0;
353   return pool_elt_at_index (wrk->vls_pool, vlsh);
354 }
355
356 static void
357 vls_free (vcl_locked_session_t * vls)
358 {
359   vls_worker_t *wrk = vls_worker_get_current ();
360
361   ASSERT (vls != 0);
362   hash_unset (wrk->session_index_to_vlsh_table, vls->session_index);
363   clib_spinlock_free (&vls->lock);
364   pool_put (wrk->vls_pool, vls);
365 }
366
367 static vcl_locked_session_t *
368 vls_get_and_lock (vls_handle_t vlsh)
369 {
370   vls_worker_t *wrk = vls_worker_get_current ();
371   vcl_locked_session_t *vls;
372   if (pool_is_free_index (wrk->vls_pool, vlsh))
373     return 0;
374   vls = pool_elt_at_index (wrk->vls_pool, vlsh);
375   vls_lock (vls);
376   return vls;
377 }
378
379 static vcl_locked_session_t *
380 vls_get_w_dlock (vls_handle_t vlsh)
381 {
382   vcl_locked_session_t *vls;
383   vls_table_rlock ();
384   vls = vls_get_and_lock (vlsh);
385   if (!vls)
386     vls_table_runlock ();
387   return vls;
388 }
389
390 static inline void
391 vls_get_and_unlock (vls_handle_t vlsh)
392 {
393   vcl_locked_session_t *vls;
394   vls_table_rlock ();
395   vls = vls_get (vlsh);
396   vls_unlock (vls);
397   vls_table_runlock ();
398 }
399
400 static inline void
401 vls_dunlock (vcl_locked_session_t * vls)
402 {
403   vls_unlock (vls);
404   vls_table_runlock ();
405 }
406
407 static vcl_locked_session_t *
408 vls_session_get (vls_worker_t * wrk, u32 vls_index)
409 {
410   if (pool_is_free_index (wrk->vls_pool, vls_index))
411     return 0;
412   return pool_elt_at_index (wrk->vls_pool, vls_index);
413 }
414
415 vcl_session_handle_t
416 vlsh_to_sh (vls_handle_t vlsh)
417 {
418   vcl_locked_session_t *vls;
419   int rv;
420
421   vls = vls_get_w_dlock (vlsh);
422   if (!vls)
423     return INVALID_SESSION_ID;
424   rv = vls_to_sh (vls);
425   vls_dunlock (vls);
426   return rv;
427 }
428
429 vcl_session_handle_t
430 vlsh_to_session_index (vls_handle_t vlsh)
431 {
432   vcl_session_handle_t sh;
433   sh = vlsh_to_sh (vlsh);
434   return vppcom_session_index (sh);
435 }
436
437 vls_handle_t
438 vls_si_to_vlsh (u32 session_index)
439 {
440   vls_worker_t *wrk = vls_worker_get_current ();
441   uword *vlshp;
442   vlshp = hash_get (wrk->session_index_to_vlsh_table, session_index);
443   return vlshp ? *vlshp : VLS_INVALID_HANDLE;
444 }
445
446 vls_handle_t
447 vls_session_index_to_vlsh (uint32_t session_index)
448 {
449   vls_handle_t vlsh;
450
451   vls_table_rlock ();
452   vlsh = vls_si_to_vlsh (session_index);
453   vls_table_runlock ();
454
455   return vlsh;
456 }
457
458 u8
459 vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
460 {
461   vls_shared_data_t *vls_shd;
462   int i;
463
464   if (vls->shared_data_index == ~0)
465     return 0;
466
467   vls_shared_data_pool_rlock ();
468
469   vls_shd = vls_shared_data_get (vls->shared_data_index);
470   clib_spinlock_lock (&vls_shd->lock);
471
472   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
473     if (vls_shd->workers_subscribed[i] == wrk_index)
474       {
475         clib_spinlock_unlock (&vls_shd->lock);
476         vls_shared_data_pool_runlock ();
477         return 1;
478       }
479   clib_spinlock_unlock (&vls_shd->lock);
480
481   vls_shared_data_pool_runlock ();
482   return 0;
483 }
484
485 static void
486 vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
487 {
488   vls_shared_data_t *vls_shd;
489
490   if (vls->shared_data_index == ~0)
491     {
492       clib_warning ("not a shared session");
493       return;
494     }
495
496   vls_shared_data_pool_rlock ();
497
498   vls_shd = vls_shared_data_get (vls->shared_data_index);
499
500   clib_spinlock_lock (&vls_shd->lock);
501   clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
502   clib_spinlock_unlock (&vls_shd->lock);
503
504   vls_shared_data_pool_runlock ();
505 }
506
507 static u32
508 vls_shared_get_owner (vcl_locked_session_t * vls)
509 {
510   vls_shared_data_t *vls_shd;
511   u32 owner_wrk;
512
513   vls_shared_data_pool_rlock ();
514
515   vls_shd = vls_shared_data_get (vls->shared_data_index);
516   owner_wrk = vls_shd->owner_wrk_index;
517
518   vls_shared_data_pool_runlock ();
519
520   return owner_wrk;
521 }
522
523 static u8
524 vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
525 {
526   vls_shared_data_t *vls_shd;
527   u8 is_set;
528
529   if (vls->shared_data_index == ~0)
530     {
531       clib_warning ("not a shared session");
532       return 0;
533     }
534
535   vls_shared_data_pool_rlock ();
536
537   vls_shd = vls_shared_data_get (vls->shared_data_index);
538
539   clib_spinlock_lock (&vls_shd->lock);
540   is_set = clib_bitmap_get (vls_shd->listeners, wrk_index);
541   clib_spinlock_unlock (&vls_shd->lock);
542
543   vls_shared_data_pool_runlock ();
544
545   return (is_set == 1);
546 }
547
548 static void
549 vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
550 {
551   vppcom_session_listen (vls_to_sh (vls), ~0);
552   vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
553 }
554
555 static void
556 vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
557 {
558   vcl_worker_t *wrk;
559   vcl_session_t *s;
560
561   wrk = vcl_worker_get (wrk_index);
562   s = vcl_session_get (wrk, vls->session_index);
563   if (s->session_state != STATE_LISTEN)
564     return;
565   vcl_send_session_unlisten (wrk, s);
566   s->session_state = STATE_LISTEN_NO_MQ;
567   vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
568 }
569
570 static int
571 vls_shared_data_subscriber_position (vls_shared_data_t * vls_shd,
572                                      u32 wrk_index)
573 {
574   int i;
575
576   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
577     {
578       if (vls_shd->workers_subscribed[i] == wrk_index)
579         return i;
580     }
581   return -1;
582 }
583
584 int
585 vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
586 {
587   vls_shared_data_t *vls_shd;
588   int do_disconnect, pos;
589   u32 n_subscribers;
590   vcl_session_t *s;
591
592   if (vls->shared_data_index == ~0)
593     return 0;
594
595   s = vcl_session_get (wrk, vls->session_index);
596   if (s->session_state == STATE_LISTEN)
597     vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
598
599   vls_shared_data_pool_rlock ();
600
601   vls_shd = vls_shared_data_get (vls->shared_data_index);
602   clib_spinlock_lock (&vls_shd->lock);
603
604   pos = vls_shared_data_subscriber_position (vls_shd, wrk->wrk_index);
605   if (pos < 0)
606     {
607       clib_warning ("worker %u not subscribed for vls %u", wrk->wrk_index,
608                     vls->worker_index);
609       goto done;
610     }
611
612   /*
613    * Unsubscribe from share data and fifos
614    */
615   if (s->rx_fifo)
616     {
617       svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
618       svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
619     }
620   vec_del1 (vls_shd->workers_subscribed, pos);
621
622   /*
623    * Cleanup vcl state
624    */
625   n_subscribers = vec_len (vls_shd->workers_subscribed);
626   do_disconnect = s->session_state == STATE_LISTEN || !n_subscribers;
627   vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
628
629   /*
630    * No subscriber left, cleanup shared data
631    */
632   if (!n_subscribers)
633     {
634       u32 shd_index = vls_shared_data_index (vls_shd);
635
636       clib_spinlock_unlock (&vls_shd->lock);
637       vls_shared_data_pool_runlock ();
638
639       vls_shared_data_free (shd_index);
640
641       /* All locks have been dropped */
642       return 0;
643     }
644
645   /* Return, if this is not the owning worker */
646   if (vls_shd->owner_wrk_index != wrk->wrk_index)
647     goto done;
648
649   ASSERT (vec_len (vls_shd->workers_subscribed));
650
651   /*
652    *  Check if we can change owner or close
653    */
654   vls_shd->owner_wrk_index = vls_shd->workers_subscribed[0];
655   vcl_send_session_worker_update (wrk, s, vls_shd->owner_wrk_index);
656
657   /* XXX is this still needed? */
658   if (vec_len (vls_shd->workers_subscribed) > 1)
659     clib_warning ("more workers need to be updated");
660
661 done:
662
663   clib_spinlock_unlock (&vls_shd->lock);
664   vls_shared_data_pool_runlock ();
665
666   return 0;
667 }
668
669 void
670 vls_init_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
671 {
672   vls_shared_data_t *vls_shd;
673
674   u32 vls_shd_index = vls_shared_data_alloc ();
675
676   vls_shared_data_pool_rlock ();
677
678   vls_shd = vls_shared_data_get (vls_shd_index);
679   vls_shd->owner_wrk_index = vls_wrk->wrk_index;
680   vls->shared_data_index = vls_shd_index;
681   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
682
683   vls_shared_data_pool_runlock ();
684 }
685
686 void
687 vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
688 {
689   vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
690   vls_shared_data_t *vls_shd;
691   vcl_session_t *s;
692
693   s = vcl_session_get (vcl_wrk, vls->session_index);
694   if (!s)
695     {
696       clib_warning ("wrk %u session %u vls %u NOT AVAILABLE",
697                     vcl_wrk->wrk_index, vls->session_index, vls->vls_index);
698       return;
699     }
700
701   ASSERT (vls->shared_data_index != ~0);
702
703   /* Reinit session lock */
704   clib_spinlock_init (&vls->lock);
705
706   vls_shared_data_pool_rlock ();
707
708   vls_shd = vls_shared_data_get (vls->shared_data_index);
709
710   clib_spinlock_lock (&vls_shd->lock);
711   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
712   clib_spinlock_unlock (&vls_shd->lock);
713
714   vls_shared_data_pool_runlock ();
715
716   if (s->rx_fifo)
717     {
718       svm_fifo_add_subscriber (s->rx_fifo, vcl_wrk->vpp_wrk_index);
719       svm_fifo_add_subscriber (s->tx_fifo, vcl_wrk->vpp_wrk_index);
720     }
721   else if (s->session_state == STATE_LISTEN)
722     {
723       s->session_state = STATE_LISTEN_NO_MQ;
724     }
725 }
726
727 static void
728 vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
729 {
730   vcl_locked_session_t *vls, *parent_vls;
731
732   /* *INDENT-OFF* */
733   pool_foreach (vls, vls_wrk->vls_pool, ({
734     /* Initialize sharing on parent session */
735     if (vls->shared_data_index == ~0)
736       {
737         parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
738         vls_init_share_session (vls_parent_wrk, parent_vls);
739         vls->shared_data_index = parent_vls->shared_data_index;
740       }
741     vls_share_session (vls_wrk, vls);
742   }));
743   /* *INDENT-ON* */
744 }
745
746 void
747 vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
748 {
749   vls_worker_t *vls_wrk = vls_worker_get_current (), *vls_parent_wrk;
750   vcl_worker_t *wrk = vcl_worker_get_current ();
751
752   /*
753    * init vcl worker
754    */
755   wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
756   wrk->sessions = pool_dup (parent_wrk->sessions);
757   wrk->session_index_by_vpp_handles =
758     hash_dup (parent_wrk->session_index_by_vpp_handles);
759
760   /*
761    * init vls worker
762    */
763   vls_parent_wrk = vls_worker_get (parent_wrk->wrk_index);
764   vls_wrk->session_index_to_vlsh_table =
765     hash_dup (vls_parent_wrk->session_index_to_vlsh_table);
766   vls_wrk->vls_pool = pool_dup (vls_parent_wrk->vls_pool);
767
768   vls_share_sessions (vls_parent_wrk, vls_wrk);
769 }
770
771 static void
772 vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
773 {
774   vcl_worker_t *wrk = vcl_worker_get_current ();
775   vcl_session_t *s = 0;
776   int is_nonblk = 0;
777
778   if (vls)
779     {
780       s = vcl_session_get (wrk, vls->session_index);
781       if (PREDICT_FALSE (!s))
782         return;
783       is_nonblk = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
784     }
785
786   switch (op)
787     {
788     case VLS_MT_OP_READ:
789       if (!is_nonblk)
790         is_nonblk = vcl_session_read_ready (s) != 0;
791       if (!is_nonblk)
792         {
793           vls_mt_mq_lock ();
794           *locks_acq |= VLS_MT_LOCK_MQ;
795         }
796       break;
797     case VLS_MT_OP_WRITE:
798       ASSERT (s);
799       if (!is_nonblk)
800         is_nonblk = vcl_session_write_ready (s) != 0;
801       if (!is_nonblk)
802         {
803           vls_mt_mq_lock ();
804           *locks_acq |= VLS_MT_LOCK_MQ;
805         }
806       break;
807     case VLS_MT_OP_XPOLL:
808       vls_mt_mq_lock ();
809       *locks_acq |= VLS_MT_LOCK_MQ;
810       break;
811     case VLS_MT_OP_SPOOL:
812       vls_mt_spool_lock ();
813       *locks_acq |= VLS_MT_LOCK_SPOOL;
814       break;
815     default:
816       break;
817     }
818 }
819
820 static void
821 vls_mt_rel_locks (int locks_acq)
822 {
823   if (locks_acq & VLS_MT_LOCK_MQ)
824     vls_mt_mq_unlock ();
825   if (locks_acq & VLS_MT_LOCK_SPOOL)
826     vls_mt_create_unlock ();
827 }
828
829 static void
830 vls_session_migrate (vcl_locked_session_t * vls)
831 {
832   u32 wrk_index = vcl_get_worker_index ();
833   vcl_worker_t *wrk;
834   u32 src_sid, sid;
835   vcl_session_t *session;
836   uword *p;
837
838   if (!vls_mt_wrk_supported ())
839     return;
840
841   if (PREDICT_TRUE (vls->worker_index == wrk_index))
842     return;
843   if ((p = hash_get (vls->vcl_wrk_index_to_session_index, wrk_index)))
844     {
845       vls->worker_index = wrk_index;
846       vls->session_index = (u32) p[0];
847       return;
848     }
849
850   /* migrate from orignal vls */
851   if (!(p = hash_get (vls->vcl_wrk_index_to_session_index,
852                       vls->owner_vcl_wrk_index)))
853     {
854       VERR ("session in owner worker(%u) is free", vls->owner_vcl_wrk_index);
855       ASSERT (0);
856       return;
857     }
858
859   src_sid = (u32) p[0];
860   wrk = vcl_worker_get_current ();
861   session = vcl_session_alloc (wrk);
862   sid = session->session_index;
863   vls_send_clone_and_share_rpc (wrk, vls, sid, vls_get_worker_index (),
864                                 vls->owner_vcl_wrk_index, vls->vls_index,
865                                 src_sid);
866   session->session_index = sid;
867   vls->worker_index = wrk_index;
868   vls->session_index = sid;
869   hash_set (vls->vcl_wrk_index_to_session_index, wrk_index, sid);
870   VDBG (1, "migrate session of worker (session): %u (%u) -> %u (%u)",
871         vls->owner_vcl_wrk_index, src_sid, wrk_index, sid);
872
873   if (PREDICT_FALSE (session->is_vep && session->vep.next_sh != ~0))
874     {
875       /* TODO: rollback? */
876       VERR ("can't migrate nonempty epoll session");
877       ASSERT (0);
878       return;
879     }
880   else if (PREDICT_FALSE (!session->is_vep &&
881                           session->session_state != STATE_CLOSED))
882     {
883       /* TODO: rollback? */
884       VERR ("migrate NOT supported, session_status (%u)",
885             session->session_state);
886       ASSERT (0);
887       return;
888     }
889 }
890
891 #define vls_mt_guard(_vls, _op)                         \
892   int _locks_acq = 0;                                   \
893   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))    \
894     vls_mt_add ();                                      \
895   if (PREDICT_FALSE (_vls && vls_mt_wrk_supported () && \
896       ((vcl_locked_session_t *)_vls)->worker_index !=   \
897       vcl_get_worker_index ()))                         \
898     vls_session_migrate (_vls);                         \
899   if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1))       \
900     vls_mt_acq_locks (_vls, _op, &_locks_acq);          \
901
902 #define vls_mt_unguard()                                \
903   if (PREDICT_FALSE (_locks_acq))                       \
904     vls_mt_rel_locks (_locks_acq)
905
906 int
907 vls_write (vls_handle_t vlsh, void *buf, size_t nbytes)
908 {
909   vcl_locked_session_t *vls;
910   int rv;
911
912   if (!(vls = vls_get_w_dlock (vlsh)))
913     return VPPCOM_EBADFD;
914
915   vls_mt_guard (vls, VLS_MT_OP_WRITE);
916   rv = vppcom_session_write (vls_to_sh_tu (vls), buf, nbytes);
917   vls_mt_unguard ();
918   vls_get_and_unlock (vlsh);
919   return rv;
920 }
921
922 int
923 vls_write_msg (vls_handle_t vlsh, void *buf, size_t nbytes)
924 {
925   vcl_locked_session_t *vls;
926   int rv;
927
928   if (!(vls = vls_get_w_dlock (vlsh)))
929     return VPPCOM_EBADFD;
930   vls_mt_guard (vls, VLS_MT_OP_WRITE);
931   rv = vppcom_session_write_msg (vls_to_sh_tu (vls), buf, nbytes);
932   vls_mt_unguard ();
933   vls_get_and_unlock (vlsh);
934   return rv;
935 }
936
937 int
938 vls_sendto (vls_handle_t vlsh, void *buf, int buflen, int flags,
939             vppcom_endpt_t * ep)
940 {
941   vcl_locked_session_t *vls;
942   int rv;
943
944   if (!(vls = vls_get_w_dlock (vlsh)))
945     return VPPCOM_EBADFD;
946   vls_mt_guard (vls, VLS_MT_OP_WRITE);
947   rv = vppcom_session_sendto (vls_to_sh_tu (vls), buf, buflen, flags, ep);
948   vls_mt_unguard ();
949   vls_get_and_unlock (vlsh);
950   return rv;
951 }
952
953 ssize_t
954 vls_read (vls_handle_t vlsh, void *buf, size_t nbytes)
955 {
956   vcl_locked_session_t *vls;
957   int rv;
958
959   if (!(vls = vls_get_w_dlock (vlsh)))
960     return VPPCOM_EBADFD;
961   vls_mt_guard (vls, VLS_MT_OP_READ);
962   rv = vppcom_session_read (vls_to_sh_tu (vls), buf, nbytes);
963   vls_mt_unguard ();
964   vls_get_and_unlock (vlsh);
965   return rv;
966 }
967
968 ssize_t
969 vls_recvfrom (vls_handle_t vlsh, void *buffer, uint32_t buflen, int flags,
970               vppcom_endpt_t * ep)
971 {
972   vcl_locked_session_t *vls;
973   int rv;
974
975   if (!(vls = vls_get_w_dlock (vlsh)))
976     return VPPCOM_EBADFD;
977   vls_mt_guard (vls, VLS_MT_OP_READ);
978   rv = vppcom_session_recvfrom (vls_to_sh_tu (vls), buffer, buflen, flags,
979                                 ep);
980   vls_mt_unguard ();
981   vls_get_and_unlock (vlsh);
982   return rv;
983 }
984
985 int
986 vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen)
987 {
988   vcl_locked_session_t *vls;
989   int rv;
990
991   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
992     vls_mt_add ();
993
994   if (!(vls = vls_get_w_dlock (vlsh)))
995     return VPPCOM_EBADFD;
996   vls_session_migrate (vls);
997   rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen);
998   vls_get_and_unlock (vlsh);
999   return rv;
1000 }
1001
1002 int
1003 vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep)
1004 {
1005   vcl_locked_session_t *vls;
1006   int rv;
1007
1008   if (!(vls = vls_get_w_dlock (vlsh)))
1009     return VPPCOM_EBADFD;
1010   rv = vppcom_session_bind (vls_to_sh_tu (vls), ep);
1011   vls_get_and_unlock (vlsh);
1012   return rv;
1013 }
1014
1015 int
1016 vls_listen (vls_handle_t vlsh, int q_len)
1017 {
1018   vcl_locked_session_t *vls;
1019   int rv;
1020
1021   if (!(vls = vls_get_w_dlock (vlsh)))
1022     return VPPCOM_EBADFD;
1023   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
1024   rv = vppcom_session_listen (vls_to_sh_tu (vls), q_len);
1025   vls_mt_unguard ();
1026   vls_get_and_unlock (vlsh);
1027   return rv;
1028 }
1029
1030 int
1031 vls_connect (vls_handle_t vlsh, vppcom_endpt_t * server_ep)
1032 {
1033   vcl_locked_session_t *vls;
1034   int rv;
1035
1036   if (!(vls = vls_get_w_dlock (vlsh)))
1037     return VPPCOM_EBADFD;
1038   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
1039   rv = vppcom_session_connect (vls_to_sh_tu (vls), server_ep);
1040   vls_mt_unguard ();
1041   vls_get_and_unlock (vlsh);
1042   return rv;
1043 }
1044
1045 static inline void
1046 vls_mp_checks (vcl_locked_session_t * vls, int is_add)
1047 {
1048   vcl_worker_t *wrk = vcl_worker_get_current ();
1049   vcl_session_t *s;
1050   u32 owner_wrk;
1051
1052   if (vls_mt_wrk_supported ())
1053     return;
1054
1055   s = vcl_session_get (wrk, vls->session_index);
1056   switch (s->session_state)
1057     {
1058     case STATE_LISTEN:
1059       if (is_add)
1060         {
1061           vls_listener_wrk_set (vls, vls->worker_index, 1 /* is_active */ );
1062           break;
1063         }
1064       vls_listener_wrk_stop_listen (vls, vls->worker_index);
1065       break;
1066     case STATE_LISTEN_NO_MQ:
1067       if (!is_add)
1068         break;
1069
1070       /* Register worker as listener */
1071       vls_listener_wrk_start_listen (vls, wrk->wrk_index);
1072
1073       /* If owner worker did not attempt to accept/xpoll on the session,
1074        * force a listen stop for it, since it may not be interested in
1075        * accepting new sessions.
1076        * This is pretty much a hack done to give app workers the illusion
1077        * that it is fine to listen and not accept new sessions for a
1078        * given listener. Without it, we would accumulate unhandled
1079        * accepts on the passive worker message queue. */
1080       owner_wrk = vls_shared_get_owner (vls);
1081       if (!vls_listener_wrk_is_active (vls, owner_wrk))
1082         vls_listener_wrk_stop_listen (vls, owner_wrk);
1083       break;
1084     default:
1085       break;
1086     }
1087 }
1088
1089 vls_handle_t
1090 vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
1091 {
1092   vls_handle_t accepted_vlsh;
1093   vcl_locked_session_t *vls;
1094   int sh;
1095
1096   if (!(vls = vls_get_w_dlock (listener_vlsh)))
1097     return VPPCOM_EBADFD;
1098   if (vcl_n_workers () > 1)
1099     vls_mp_checks (vls, 1 /* is_add */ );
1100   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
1101   sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags);
1102   vls_mt_unguard ();
1103   vls_get_and_unlock (listener_vlsh);
1104   if (sh < 0)
1105     return sh;
1106   accepted_vlsh = vls_alloc (sh);
1107   if (PREDICT_FALSE (accepted_vlsh == VLS_INVALID_HANDLE))
1108     vppcom_session_close (sh);
1109   return accepted_vlsh;
1110 }
1111
1112 vls_handle_t
1113 vls_create (uint8_t proto, uint8_t is_nonblocking)
1114 {
1115   vcl_session_handle_t sh;
1116   vls_handle_t vlsh;
1117
1118   vls_mt_guard (0, VLS_MT_OP_SPOOL);
1119   sh = vppcom_session_create (proto, is_nonblocking);
1120   vls_mt_unguard ();
1121   if (sh == INVALID_SESSION_ID)
1122     return VLS_INVALID_HANDLE;
1123
1124   vlsh = vls_alloc (sh);
1125   if (PREDICT_FALSE (vlsh == VLS_INVALID_HANDLE))
1126     vppcom_session_close (sh);
1127
1128   return vlsh;
1129 }
1130
1131 static void
1132 vls_migrate_session_close (vcl_locked_session_t * vls)
1133 {
1134   u32 session_index, wrk_index;
1135   vcl_worker_t *wrk = vcl_worker_get_current ();
1136
1137   if (!vls_mt_wrk_supported ())
1138     return;
1139
1140   /* *INDENT-OFF* */
1141   hash_foreach (wrk_index, session_index, vls->vcl_wrk_index_to_session_index,
1142     ({
1143       if (vcl_get_worker_index () != wrk_index)
1144         {
1145           vls_send_session_cleanup_rpc (wrk, wrk_index, session_index);
1146         }
1147     }));
1148   /* *INDENT-ON* */
1149   hash_free (vls->vcl_wrk_index_to_session_index);
1150 }
1151
1152 int
1153 vls_close (vls_handle_t vlsh)
1154 {
1155   vcl_locked_session_t *vls;
1156   int rv;
1157
1158   vls_table_wlock ();
1159
1160   vls = vls_get_and_lock (vlsh);
1161   if (!vls)
1162     {
1163       vls_table_wunlock ();
1164       return VPPCOM_EBADFD;
1165     }
1166
1167   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
1168
1169   if (vls_is_shared (vls))
1170     rv = vls_unshare_session (vls, vcl_worker_get_current ());
1171   else
1172     rv = vppcom_session_close (vls_to_sh (vls));
1173
1174   vls_migrate_session_close (vls);
1175
1176   vls_free (vls);
1177   vls_mt_unguard ();
1178
1179   vls_table_wunlock ();
1180
1181   return rv;
1182 }
1183
1184 vls_handle_t
1185 vls_epoll_create (void)
1186 {
1187   vcl_session_handle_t sh;
1188   vls_handle_t vlsh;
1189
1190   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
1191     vls_mt_add ();
1192
1193   sh = vppcom_epoll_create ();
1194   if (sh == INVALID_SESSION_ID)
1195     return VLS_INVALID_HANDLE;
1196
1197   vlsh = vls_alloc (sh);
1198   if (vlsh == VLS_INVALID_HANDLE)
1199     vppcom_session_close (sh);
1200
1201   return vlsh;
1202 }
1203
1204 static void
1205 vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op)
1206 {
1207   if (vcl_n_workers () <= 1)
1208     {
1209       vlsl->epoll_mp_check = 1;
1210       return;
1211     }
1212
1213   if (op == EPOLL_CTL_MOD)
1214     return;
1215
1216   vlsl->epoll_mp_check = 1;
1217   vls_mp_checks (vls, op == EPOLL_CTL_ADD);
1218 }
1219
1220 int
1221 vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
1222                struct epoll_event *event)
1223 {
1224   vcl_locked_session_t *ep_vls, *vls;
1225   vcl_session_handle_t ep_sh, sh;
1226   int rv;
1227
1228   vls_table_rlock ();
1229   ep_vls = vls_get_and_lock (ep_vlsh);
1230   vls = vls_get_and_lock (vlsh);
1231   vls_session_migrate (ep_vls);
1232   ep_sh = vls_to_sh (ep_vls);
1233   sh = vls_to_sh (vls);
1234
1235   if (PREDICT_FALSE (!vlsl->epoll_mp_check))
1236     vls_epoll_ctl_mp_checks (vls, op);
1237
1238   vls_table_runlock ();
1239
1240   rv = vppcom_epoll_ctl (ep_sh, op, sh, event);
1241
1242   vls_table_rlock ();
1243   ep_vls = vls_get (ep_vlsh);
1244   vls = vls_get (vlsh);
1245   vls_unlock (vls);
1246   vls_unlock (ep_vls);
1247   vls_table_runlock ();
1248   return rv;
1249 }
1250
1251 int
1252 vls_epoll_wait (vls_handle_t ep_vlsh, struct epoll_event *events,
1253                 int maxevents, double wait_for_time)
1254 {
1255   vcl_locked_session_t *vls;
1256   int rv;
1257
1258   if (!(vls = vls_get_w_dlock (ep_vlsh)))
1259     return VPPCOM_EBADFD;
1260   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1261   rv = vppcom_epoll_wait (vls_to_sh_tu (vls), events, maxevents,
1262                           wait_for_time);
1263   vls_mt_unguard ();
1264   vls_get_and_unlock (ep_vlsh);
1265   return rv;
1266 }
1267
1268 static void
1269 vls_select_mp_checks (vcl_si_set * read_map)
1270 {
1271   vcl_locked_session_t *vls;
1272   vcl_worker_t *wrk;
1273   vcl_session_t *s;
1274   u32 si;
1275
1276   if (vcl_n_workers () <= 1)
1277     {
1278       vlsl->select_mp_check = 1;
1279       return;
1280     }
1281
1282   if (!read_map)
1283     return;
1284
1285   vlsl->select_mp_check = 1;
1286   wrk = vcl_worker_get_current ();
1287
1288   /* *INDENT-OFF* */
1289   clib_bitmap_foreach (si, read_map, ({
1290     s = vcl_session_get (wrk, si);
1291     if (s->session_state == STATE_LISTEN)
1292       {
1293         vls = vls_get (vls_session_index_to_vlsh (si));
1294         vls_mp_checks (vls, 1 /* is_add */);
1295       }
1296   }));
1297   /* *INDENT-ON* */
1298 }
1299
1300 int
1301 vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
1302             vcl_si_set * except_map, double wait_for_time)
1303 {
1304   int rv;
1305
1306   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1307   if (PREDICT_FALSE (!vlsl->select_mp_check))
1308     vls_select_mp_checks (read_map);
1309   rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
1310   vls_mt_unguard ();
1311   return rv;
1312 }
1313
1314 static void
1315 vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
1316 {
1317   u32 current_wrk, is_current;
1318   vcl_locked_session_t *vls;
1319   vcl_session_t *s;
1320
1321   if (pool_elts (vcm->workers) <= 1)
1322     return;
1323
1324   current_wrk = vcl_get_worker_index ();
1325   is_current = current_wrk == wrk->wrk_index;
1326
1327   /* *INDENT-OFF* */
1328   pool_foreach (s, wrk->sessions, ({
1329     vls = vls_get (vls_si_to_vlsh (s->session_index));
1330     if (vls && (is_current || vls_is_shared_by_wrk (vls, current_wrk)))
1331       vls_unshare_session (vls, wrk);
1332   }));
1333   /* *INDENT-ON* */
1334 }
1335
1336 static void
1337 vls_cleanup_vcl_worker (vcl_worker_t * wrk)
1338 {
1339   vls_worker_t *vls_wrk = vls_worker_get (wrk->wrk_index);
1340
1341   /* Unshare sessions and also cleanup worker since child may have
1342    * called _exit () and therefore vcl may not catch the event */
1343   vls_unshare_vcl_worker_sessions (wrk);
1344   vcl_worker_cleanup (wrk, 1 /* notify vpp */ );
1345
1346   vls_worker_free (vls_wrk);
1347 }
1348
1349 static void
1350 vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
1351 {
1352   vcl_worker_t *sub_child;
1353   int tries = 0;
1354
1355   if (child_wrk->forked_child != ~0)
1356     {
1357       sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
1358       if (sub_child)
1359         {
1360           /* Wait a bit, maybe the process is going away */
1361           while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
1362             usleep (1e3);
1363           if (kill (sub_child->current_pid, 0) < 0)
1364             vls_cleanup_forked_child (child_wrk, sub_child);
1365         }
1366     }
1367   vls_cleanup_vcl_worker (child_wrk);
1368   VDBG (0, "Cleaned up forked child wrk %u", child_wrk->wrk_index);
1369   wrk->forked_child = ~0;
1370 }
1371
1372 static struct sigaction old_sa;
1373
1374 static void
1375 vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
1376 {
1377   vcl_worker_t *wrk, *child_wrk;
1378
1379   if (vcl_get_worker_index () == ~0)
1380     return;
1381
1382   if (sigaction (SIGCHLD, &old_sa, 0))
1383     {
1384       VERR ("couldn't restore sigchld");
1385       exit (-1);
1386     }
1387
1388   wrk = vcl_worker_get_current ();
1389   if (wrk->forked_child == ~0)
1390     return;
1391
1392   child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
1393   if (!child_wrk)
1394     goto done;
1395
1396   if (si && si->si_pid != child_wrk->current_pid)
1397     {
1398       VDBG (0, "unexpected child pid %u", si->si_pid);
1399       goto done;
1400     }
1401   vls_cleanup_forked_child (wrk, child_wrk);
1402
1403 done:
1404   if (old_sa.sa_flags & SA_SIGINFO)
1405     {
1406       void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
1407       fn (signum, si, uc);
1408     }
1409   else
1410     {
1411       void (*fn) (int) = old_sa.sa_handler;
1412       if (fn)
1413         fn (signum);
1414     }
1415 }
1416
1417 static void
1418 vls_incercept_sigchld ()
1419 {
1420   struct sigaction sa;
1421   clib_memset (&sa, 0, sizeof (sa));
1422   sa.sa_sigaction = vls_intercept_sigchld_handler;
1423   sa.sa_flags = SA_SIGINFO;
1424   if (sigaction (SIGCHLD, &sa, &old_sa))
1425     {
1426       VERR ("couldn't intercept sigchld");
1427       exit (-1);
1428     }
1429 }
1430
1431 static void
1432 vls_app_pre_fork (void)
1433 {
1434   vls_incercept_sigchld ();
1435   vcl_flush_mq_events ();
1436 }
1437
1438 static void
1439 vls_app_fork_child_handler (void)
1440 {
1441   vcl_worker_t *parent_wrk;
1442   int rv, parent_wrk_index;
1443   u8 *child_name;
1444
1445   parent_wrk_index = vcl_get_worker_index ();
1446   VDBG (0, "initializing forked child %u with parent wrk %u", getpid (),
1447         parent_wrk_index);
1448
1449   /*
1450    * Allocate worker vcl
1451    */
1452   vcl_set_worker_index (~0);
1453   if (!vcl_worker_alloc_and_init ())
1454     VERR ("couldn't allocate new worker");
1455
1456   /*
1457    * Attach to binary api
1458    */
1459   child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
1460   vcl_cleanup_bapi ();
1461   vppcom_api_hookup ();
1462   vcm->app_state = STATE_APP_START;
1463   rv = vppcom_connect_to_vpp ((char *) child_name);
1464   vec_free (child_name);
1465   if (rv)
1466     {
1467       VERR ("couldn't connect to VPP!");
1468       return;
1469     }
1470
1471   /*
1472    * Allocate/initialize vls worker
1473    */
1474   vls_worker_alloc ();
1475
1476   /*
1477    * Register worker with vpp and share sessions
1478    */
1479   vcl_worker_register_with_vpp ();
1480   parent_wrk = vcl_worker_get (parent_wrk_index);
1481   vls_worker_copy_on_fork (parent_wrk);
1482   parent_wrk->forked_child = vcl_get_worker_index ();
1483
1484   /* Reset number of threads and set wrk index */
1485   vlsl->vls_mt_n_threads = 0;
1486   vlsl->vls_wrk_index = vcl_get_worker_index ();
1487   vlsl->select_mp_check = 0;
1488   vlsl->epoll_mp_check = 0;
1489   vls_mt_locks_init ();
1490
1491   VDBG (0, "forked child main worker initialized");
1492   vcm->forking = 0;
1493 }
1494
1495 static void
1496 vls_app_fork_parent_handler (void)
1497 {
1498   vcm->forking = 1;
1499   while (vcm->forking)
1500     ;
1501 }
1502
1503 void
1504 vls_app_exit (void)
1505 {
1506   vls_worker_t *wrk = vls_worker_get_current ();
1507
1508   /* Unshare the sessions. VCL will clean up the worker */
1509   vls_unshare_vcl_worker_sessions (vcl_worker_get_current ());
1510   vls_worker_free (wrk);
1511 }
1512
1513 static void
1514 vls_clone_and_share_rpc_handler (void *args)
1515 {
1516   vls_clone_and_share_msg_t *msg = (vls_clone_and_share_msg_t *) args;
1517   vls_worker_t *wrk = vls_worker_get_current (), *dst_wrk;
1518   vcl_locked_session_t *vls, *dst_vls;
1519   vcl_worker_t *vcl_wrk = vcl_worker_get_current (), *dst_vcl_wrk;
1520   vcl_session_t *s, *dst_s;
1521
1522   vls = vls_session_get (wrk, msg->vls_index);
1523
1524   if (!vls_mt_wrk_supported ())
1525     vls_init_share_session (wrk, vls);
1526
1527   s = vcl_session_get (vcl_wrk, msg->session_index);
1528   dst_wrk = vls_worker_get (msg->origin_vls_wrk);
1529   dst_vcl_wrk = vcl_worker_get (msg->origin_vcl_wrk);
1530   dst_vls = vls_session_get (dst_wrk, msg->origin_vls_index);
1531   dst_vls->shared_data_index = vls->shared_data_index;
1532   dst_s = vcl_session_get (dst_vcl_wrk, msg->origin_session_index);
1533   clib_memcpy (dst_s, s, sizeof (*s));
1534
1535   dst_vcl_wrk->rpc_done = 1;
1536
1537   VDBG (1, "proces session clone of worker (session): %u (%u) -> %u (%u)",
1538         vcl_wrk->wrk_index, msg->session_index, dst_vcl_wrk->wrk_index,
1539         msg->origin_session_index);
1540 }
1541
1542 static void
1543 vls_session_cleanup_rpc_handler (void *args)
1544 {
1545   vls_sess_cleanup_msg_t *msg = (vls_sess_cleanup_msg_t *) args;
1546   vcl_worker_t *wrk = vcl_worker_get_current ();
1547   vcl_worker_t *dst_wrk = vcl_worker_get (msg->origin_vcl_wrk);
1548
1549   vppcom_session_close (vcl_session_handle_from_index (msg->session_index));
1550
1551   dst_wrk->rpc_done = 1;
1552
1553   VDBG (1, "proces session cleanup of worker (session): %u (%u) from %u ()",
1554         wrk->wrk_index, msg->session_index, dst_wrk->wrk_index);
1555 }
1556
1557 static void
1558 vls_rpc_handler (void *args)
1559 {
1560   vls_rpc_msg_t *msg = (vls_rpc_msg_t *) args;
1561   switch (msg->type)
1562     {
1563     case VLS_RPC_CLONE_AND_SHARE:
1564       vls_clone_and_share_rpc_handler (msg->data);
1565       break;
1566     case VLS_RPC_SESS_CLEANUP:
1567       vls_session_cleanup_rpc_handler (msg->data);
1568       break;
1569     default:
1570       break;
1571     }
1572 }
1573
1574 void
1575 vls_send_clone_and_share_rpc (vcl_worker_t * wrk,
1576                               vcl_locked_session_t * vls, u32 session_index,
1577                               u32 vls_wrk_index, u32 dst_wrk_index,
1578                               u32 dst_vls_index, u32 dst_session_index)
1579 {
1580   u8 data[sizeof (u8) + sizeof (vls_clone_and_share_msg_t)];
1581   vls_clone_and_share_msg_t *msg;
1582   vls_rpc_msg_t *rpc;
1583   int ret;
1584
1585   rpc = (vls_rpc_msg_t *) & data;
1586   rpc->type = VLS_RPC_CLONE_AND_SHARE;
1587   msg = (vls_clone_and_share_msg_t *) & rpc->data;
1588   msg->origin_vls_wrk = vls_wrk_index;
1589   msg->origin_vls_index = vls->vls_index;
1590   msg->origin_vcl_wrk = wrk->wrk_index;
1591   msg->origin_session_index = session_index;
1592   msg->vls_index = dst_vls_index;
1593   msg->session_index = dst_session_index;
1594
1595   wrk->rpc_done = 0;
1596   ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
1597
1598   VDBG (1,
1599         "send session clone of worker (session): %u (%u) -> %u (%u), ret=%d",
1600         dst_wrk_index, msg->session_index, msg->origin_vcl_wrk,
1601         msg->origin_session_index, ret);
1602   while (!ret && !wrk->rpc_done)
1603     ;
1604 }
1605
1606 void
1607 vls_send_session_cleanup_rpc (vcl_worker_t * wrk,
1608                               u32 dst_wrk_index, u32 dst_session_index)
1609 {
1610   u8 data[sizeof (u8) + sizeof (vls_sess_cleanup_msg_t)];
1611   vls_sess_cleanup_msg_t *msg;
1612   vls_rpc_msg_t *rpc;
1613   int ret;
1614
1615   rpc = (vls_rpc_msg_t *) & data;
1616   rpc->type = VLS_RPC_SESS_CLEANUP;
1617   msg = (vls_sess_cleanup_msg_t *) & rpc->data;
1618   msg->origin_vcl_wrk = wrk->wrk_index;
1619   msg->session_index = dst_session_index;
1620
1621   wrk->rpc_done = 0;
1622   ret = vcl_send_worker_rpc (dst_wrk_index, rpc, sizeof (data));
1623
1624   VDBG (1,
1625         "send session cleanup of worker (session): %u (%u) from %u (), ret=%d",
1626         dst_wrk_index, msg->session_index, msg->origin_vcl_wrk, ret);
1627   while (!ret && !wrk->rpc_done)
1628     ;
1629 }
1630
1631 int
1632 vls_app_create (char *app_name)
1633 {
1634   int rv;
1635
1636   if ((rv = vppcom_app_create (app_name)))
1637     return rv;
1638
1639   vlsm = clib_mem_alloc (sizeof (vls_main_t));
1640   clib_memset (vlsm, 0, sizeof (*vlsm));
1641   clib_rwlock_init (&vlsm->vls_table_lock);
1642   clib_rwlock_init (&vlsm->shared_data_lock);
1643   pool_alloc (vlsm->workers, vcm->cfg.max_workers);
1644
1645   pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
1646                   vls_app_fork_child_handler);
1647   atexit (vls_app_exit);
1648   vls_worker_alloc ();
1649   vlsl->vls_wrk_index = vcl_get_worker_index ();
1650   vls_mt_locks_init ();
1651   vcm->wrk_rpc_fn = vls_rpc_handler;
1652   return VPPCOM_OK;
1653 }
1654
1655 unsigned char
1656 vls_use_eventfd (void)
1657 {
1658   return vcm->cfg.use_mq_eventfd;
1659 }
1660
1661 unsigned char
1662 vls_mt_wrk_supported (void)
1663 {
1664   return vcm->cfg.mt_supported;
1665 }
1666
1667 int
1668 vls_use_real_epoll (void)
1669 {
1670   if (vcl_get_worker_index () == ~0)
1671     return 0;
1672
1673   return vcl_worker_get_current ()->vcl_needs_real_epoll;
1674 }
1675
1676 void
1677 vls_register_vcl_worker (void)
1678 {
1679   if (vppcom_worker_register () != VPPCOM_OK)
1680     {
1681       VERR ("failed to register worker");
1682       return;
1683     }
1684 }
1685
1686 /*
1687  * fd.io coding-style-patch-verification: ON
1688  *
1689  * Local Variables:
1690  * eval: (c-set-style "gnu")
1691  * End:
1692  */