session: api to add new transport types
[vpp.git] / src / vcl / vcl_locked.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vcl/vcl_locked.h>
17 #include <vcl/vcl_private.h>
18
19 typedef struct vls_shared_data_
20 {
21   clib_spinlock_t lock;
22   u32 owner_wrk_index;
23   u32 *workers_subscribed;
24   clib_bitmap_t *listeners;
25 } vls_shared_data_t;
26
27 typedef struct vcl_locked_session_
28 {
29   clib_spinlock_t lock;
30   u32 session_index;
31   u32 worker_index;
32   u32 vls_index;
33   u32 shared_data_index;
34 } vcl_locked_session_t;
35
36 typedef struct vls_worker_
37 {
38   vcl_locked_session_t *vls_pool;
39   uword *session_index_to_vlsh_table;
40   u32 wrk_index;
41 } vls_worker_t;
42
43 typedef struct vls_local_
44 {
45   int vls_wrk_index;
46   volatile int vls_mt_n_threads;
47   pthread_mutex_t vls_mt_mq_mlock;
48   pthread_mutex_t vls_mt_spool_mlock;
49   volatile u8 select_mp_check;
50   volatile u8 epoll_mp_check;
51 } vls_process_local_t;
52
53 static vls_process_local_t vls_local;
54 static vls_process_local_t *vlsl = &vls_local;
55
56 typedef struct vls_main_
57 {
58   vls_worker_t *workers;
59   clib_rwlock_t vls_table_lock;
60   /** Pool of data shared by sessions owned by different workers */
61   vls_shared_data_t *shared_data_pool;
62   clib_rwlock_t shared_data_lock;
63 } vls_main_t;
64
65 vls_main_t *vlsm;
66
67 static inline u32
68 vls_get_worker_index (void)
69 {
70   return vcl_get_worker_index ();
71 }
72
73 static u32
74 vls_shared_data_alloc (void)
75 {
76   vls_shared_data_t *vls_shd;
77   u32 shd_index;
78
79   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
80   pool_get_zero (vlsm->shared_data_pool, vls_shd);
81   clib_spinlock_init (&vls_shd->lock);
82   shd_index = vls_shd - vlsm->shared_data_pool;
83   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
84
85   return shd_index;
86 }
87
88 static u32
89 vls_shared_data_index (vls_shared_data_t * vls_shd)
90 {
91   return vls_shd - vlsm->shared_data_pool;
92 }
93
94 vls_shared_data_t *
95 vls_shared_data_get (u32 shd_index)
96 {
97   if (pool_is_free_index (vlsm->shared_data_pool, shd_index))
98     return 0;
99   return pool_elt_at_index (vlsm->shared_data_pool, shd_index);
100 }
101
102 static void
103 vls_shared_data_free (u32 shd_index)
104 {
105   vls_shared_data_t *vls_shd;
106
107   clib_rwlock_writer_lock (&vlsm->shared_data_lock);
108   vls_shd = vls_shared_data_get (shd_index);
109   clib_spinlock_free (&vls_shd->lock);
110   clib_bitmap_free (vls_shd->listeners);
111   vec_free (vls_shd->workers_subscribed);
112   pool_put (vlsm->shared_data_pool, vls_shd);
113   clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
114 }
115
116 static inline void
117 vls_shared_data_pool_rlock (void)
118 {
119   clib_rwlock_reader_lock (&vlsm->shared_data_lock);
120 }
121
122 static inline void
123 vls_shared_data_pool_runlock (void)
124 {
125   clib_rwlock_reader_unlock (&vlsm->shared_data_lock);
126 }
127
128 static inline void
129 vls_table_rlock (void)
130 {
131   if (vlsl->vls_mt_n_threads > 1)
132     clib_rwlock_reader_lock (&vlsm->vls_table_lock);
133 }
134
135 static inline void
136 vls_table_runlock (void)
137 {
138   if (vlsl->vls_mt_n_threads > 1)
139     clib_rwlock_reader_unlock (&vlsm->vls_table_lock);
140 }
141
142 static inline void
143 vls_table_wlock (void)
144 {
145   if (vlsl->vls_mt_n_threads > 1)
146     clib_rwlock_writer_lock (&vlsm->vls_table_lock);
147 }
148
149 static inline void
150 vls_table_wunlock (void)
151 {
152   if (vlsl->vls_mt_n_threads > 1)
153     clib_rwlock_writer_unlock (&vlsm->vls_table_lock);
154 }
155
156 typedef enum
157 {
158   VLS_MT_OP_READ,
159   VLS_MT_OP_WRITE,
160   VLS_MT_OP_SPOOL,
161   VLS_MT_OP_XPOLL,
162 } vls_mt_ops_t;
163
164 typedef enum
165 {
166   VLS_MT_LOCK_MQ = 1 << 0,
167   VLS_MT_LOCK_SPOOL = 1 << 1
168 } vls_mt_lock_type_t;
169
170 static void
171 vls_mt_add (void)
172 {
173   vlsl->vls_mt_n_threads += 1;
174   vcl_set_worker_index (vlsl->vls_wrk_index);
175 }
176
177 static inline void
178 vls_mt_mq_lock (void)
179 {
180   pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
181 }
182
183 static inline void
184 vls_mt_mq_unlock (void)
185 {
186   pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
187 }
188
189 static inline void
190 vls_mt_spool_lock (void)
191 {
192   pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
193 }
194
195 static inline void
196 vls_mt_create_unlock (void)
197 {
198   pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
199 }
200
201 static void
202 vls_mt_locks_init (void)
203 {
204   pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
205   pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
206 }
207
208 u8
209 vls_is_shared (vcl_locked_session_t * vls)
210 {
211   return (vls->shared_data_index != ~0);
212 }
213
214 static inline void
215 vls_lock (vcl_locked_session_t * vls)
216 {
217   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
218     clib_spinlock_lock (&vls->lock);
219 }
220
221 static inline void
222 vls_unlock (vcl_locked_session_t * vls)
223 {
224   if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
225     clib_spinlock_unlock (&vls->lock);
226 }
227
228 static inline vcl_session_handle_t
229 vls_to_sh (vcl_locked_session_t * vls)
230 {
231   return vcl_session_handle_from_index (vls->session_index);
232 }
233
234 static inline vcl_session_handle_t
235 vls_to_sh_tu (vcl_locked_session_t * vls)
236 {
237   vcl_session_handle_t sh;
238   sh = vls_to_sh (vls);
239   vls_table_runlock ();
240   return sh;
241 }
242
243 static vls_worker_t *
244 vls_worker_get_current (void)
245 {
246   return pool_elt_at_index (vlsm->workers, vls_get_worker_index ());
247 }
248
249 static void
250 vls_worker_alloc (void)
251 {
252   vls_worker_t *wrk;
253
254   pool_get_zero (vlsm->workers, wrk);
255   wrk->wrk_index = vcl_get_worker_index ();
256 }
257
258 static void
259 vls_worker_free (vls_worker_t * wrk)
260 {
261   hash_free (wrk->session_index_to_vlsh_table);
262   pool_free (wrk->vls_pool);
263   pool_put (vlsm->workers, wrk);
264 }
265
266 static vls_worker_t *
267 vls_worker_get (u32 wrk_index)
268 {
269   if (pool_is_free_index (vlsm->workers, wrk_index))
270     return 0;
271   return pool_elt_at_index (vlsm->workers, wrk_index);
272 }
273
274 static vls_handle_t
275 vls_alloc (vcl_session_handle_t sh)
276 {
277   vls_worker_t *wrk = vls_worker_get_current ();
278   vcl_locked_session_t *vls;
279
280   vls_table_wlock ();
281
282   pool_get_zero (wrk->vls_pool, vls);
283   vls->session_index = vppcom_session_index (sh);
284   vls->worker_index = vppcom_session_worker (sh);
285   vls->vls_index = vls - wrk->vls_pool;
286   vls->shared_data_index = ~0;
287   hash_set (wrk->session_index_to_vlsh_table, vls->session_index,
288             vls->vls_index);
289   clib_spinlock_init (&vls->lock);
290
291   vls_table_wunlock ();
292   return vls->vls_index;
293 }
294
295 static vcl_locked_session_t *
296 vls_get (vls_handle_t vlsh)
297 {
298   vls_worker_t *wrk = vls_worker_get_current ();
299   if (pool_is_free_index (wrk->vls_pool, vlsh))
300     return 0;
301   return pool_elt_at_index (wrk->vls_pool, vlsh);
302 }
303
304 static void
305 vls_free (vcl_locked_session_t * vls)
306 {
307   vls_worker_t *wrk = vls_worker_get_current ();
308
309   ASSERT (vls != 0);
310   hash_unset (wrk->session_index_to_vlsh_table, vls->session_index);
311   clib_spinlock_free (&vls->lock);
312   pool_put (wrk->vls_pool, vls);
313 }
314
315 static vcl_locked_session_t *
316 vls_get_and_lock (vls_handle_t vlsh)
317 {
318   vls_worker_t *wrk = vls_worker_get_current ();
319   vcl_locked_session_t *vls;
320   if (pool_is_free_index (wrk->vls_pool, vlsh))
321     return 0;
322   vls = pool_elt_at_index (wrk->vls_pool, vlsh);
323   vls_lock (vls);
324   return vls;
325 }
326
327 static vcl_locked_session_t *
328 vls_get_w_dlock (vls_handle_t vlsh)
329 {
330   vcl_locked_session_t *vls;
331   vls_table_rlock ();
332   vls = vls_get_and_lock (vlsh);
333   if (!vls)
334     vls_table_runlock ();
335   return vls;
336 }
337
338 static inline void
339 vls_get_and_unlock (vls_handle_t vlsh)
340 {
341   vcl_locked_session_t *vls;
342   vls_table_rlock ();
343   vls = vls_get (vlsh);
344   vls_unlock (vls);
345   vls_table_runlock ();
346 }
347
348 static inline void
349 vls_dunlock (vcl_locked_session_t * vls)
350 {
351   vls_unlock (vls);
352   vls_table_runlock ();
353 }
354
355 static vcl_locked_session_t *
356 vls_session_get (vls_worker_t * wrk, u32 vls_index)
357 {
358   if (pool_is_free_index (wrk->vls_pool, vls_index))
359     return 0;
360   return pool_elt_at_index (wrk->vls_pool, vls_index);
361 }
362
363 vcl_session_handle_t
364 vlsh_to_sh (vls_handle_t vlsh)
365 {
366   vcl_locked_session_t *vls;
367   int rv;
368
369   vls = vls_get_w_dlock (vlsh);
370   if (!vls)
371     return INVALID_SESSION_ID;
372   rv = vls_to_sh (vls);
373   vls_dunlock (vls);
374   return rv;
375 }
376
377 vcl_session_handle_t
378 vlsh_to_session_index (vls_handle_t vlsh)
379 {
380   vcl_session_handle_t sh;
381   sh = vlsh_to_sh (vlsh);
382   return vppcom_session_index (sh);
383 }
384
385 vls_handle_t
386 vls_si_to_vlsh (u32 session_index)
387 {
388   vls_worker_t *wrk = vls_worker_get_current ();
389   uword *vlshp;
390   vlshp = hash_get (wrk->session_index_to_vlsh_table, session_index);
391   return vlshp ? *vlshp : VLS_INVALID_HANDLE;
392 }
393
394 vls_handle_t
395 vls_session_index_to_vlsh (uint32_t session_index)
396 {
397   vls_handle_t vlsh;
398
399   vls_table_rlock ();
400   vlsh = vls_si_to_vlsh (session_index);
401   vls_table_runlock ();
402
403   return vlsh;
404 }
405
406 u8
407 vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
408 {
409   vls_shared_data_t *vls_shd;
410   int i;
411
412   if (vls->shared_data_index == ~0)
413     return 0;
414
415   vls_shared_data_pool_rlock ();
416
417   vls_shd = vls_shared_data_get (vls->shared_data_index);
418   clib_spinlock_lock (&vls_shd->lock);
419
420   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
421     if (vls_shd->workers_subscribed[i] == wrk_index)
422       {
423         clib_spinlock_unlock (&vls_shd->lock);
424         vls_shared_data_pool_runlock ();
425         return 1;
426       }
427   clib_spinlock_unlock (&vls_shd->lock);
428
429   vls_shared_data_pool_runlock ();
430   return 0;
431 }
432
433 static void
434 vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
435 {
436   vls_shared_data_t *vls_shd;
437
438   if (vls->shared_data_index == ~0)
439     {
440       clib_warning ("not a shared session");
441       return;
442     }
443
444   vls_shared_data_pool_rlock ();
445
446   vls_shd = vls_shared_data_get (vls->shared_data_index);
447
448   clib_spinlock_lock (&vls_shd->lock);
449   clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
450   clib_spinlock_unlock (&vls_shd->lock);
451
452   vls_shared_data_pool_runlock ();
453 }
454
455 static u32
456 vls_shared_get_owner (vcl_locked_session_t * vls)
457 {
458   vls_shared_data_t *vls_shd;
459   u32 owner_wrk;
460
461   vls_shared_data_pool_rlock ();
462
463   vls_shd = vls_shared_data_get (vls->shared_data_index);
464   owner_wrk = vls_shd->owner_wrk_index;
465
466   vls_shared_data_pool_runlock ();
467
468   return owner_wrk;
469 }
470
471 static u8
472 vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
473 {
474   vls_shared_data_t *vls_shd;
475   u8 is_set;
476
477   if (vls->shared_data_index == ~0)
478     {
479       clib_warning ("not a shared session");
480       return 0;
481     }
482
483   vls_shared_data_pool_rlock ();
484
485   vls_shd = vls_shared_data_get (vls->shared_data_index);
486
487   clib_spinlock_lock (&vls_shd->lock);
488   is_set = clib_bitmap_get (vls_shd->listeners, wrk_index);
489   clib_spinlock_unlock (&vls_shd->lock);
490
491   vls_shared_data_pool_runlock ();
492
493   return (is_set == 1);
494 }
495
496 static void
497 vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
498 {
499   vppcom_session_listen (vls_to_sh (vls), ~0);
500   vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
501 }
502
503 static void
504 vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
505 {
506   vcl_worker_t *wrk;
507   vcl_session_t *s;
508
509   wrk = vcl_worker_get (wrk_index);
510   s = vcl_session_get (wrk, vls->session_index);
511   if (s->session_state != STATE_LISTEN)
512     return;
513   vcl_send_session_unlisten (wrk, s);
514   s->session_state = STATE_LISTEN_NO_MQ;
515   vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
516 }
517
518 static int
519 vls_shared_data_subscriber_position (vls_shared_data_t * vls_shd,
520                                      u32 wrk_index)
521 {
522   int i;
523
524   for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
525     {
526       if (vls_shd->workers_subscribed[i] == wrk_index)
527         return i;
528     }
529   return -1;
530 }
531
532 int
533 vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
534 {
535   vls_shared_data_t *vls_shd;
536   int do_disconnect, pos;
537   u32 n_subscribers;
538   vcl_session_t *s;
539
540   ASSERT (vls->shared_data_index != ~0);
541
542   s = vcl_session_get (wrk, vls->session_index);
543   if (s->session_state == STATE_LISTEN)
544     vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
545
546   vls_shared_data_pool_rlock ();
547
548   vls_shd = vls_shared_data_get (vls->shared_data_index);
549   clib_spinlock_lock (&vls_shd->lock);
550
551   pos = vls_shared_data_subscriber_position (vls_shd, wrk->wrk_index);
552   if (pos < 0)
553     {
554       clib_warning ("worker %u not subscribed for vls %u", wrk->wrk_index,
555                     vls->worker_index);
556       goto done;
557     }
558
559   /*
560    * Unsubscribe from share data and fifos
561    */
562   if (s->rx_fifo)
563     {
564       svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
565       svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
566     }
567   vec_del1 (vls_shd->workers_subscribed, pos);
568
569   /*
570    * Cleanup vcl state
571    */
572   n_subscribers = vec_len (vls_shd->workers_subscribed);
573   do_disconnect = s->session_state == STATE_LISTEN || !n_subscribers;
574   vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
575
576   /*
577    * No subscriber left, cleanup shared data
578    */
579   if (!n_subscribers)
580     {
581       u32 shd_index = vls_shared_data_index (vls_shd);
582
583       clib_spinlock_unlock (&vls_shd->lock);
584       vls_shared_data_pool_runlock ();
585
586       vls_shared_data_free (shd_index);
587
588       /* All locks have been dropped */
589       return 0;
590     }
591
592   /* Return, if this is not the owning worker */
593   if (vls_shd->owner_wrk_index != wrk->wrk_index)
594     goto done;
595
596   ASSERT (vec_len (vls_shd->workers_subscribed));
597
598   /*
599    *  Check if we can change owner or close
600    */
601   vls_shd->owner_wrk_index = vls_shd->workers_subscribed[0];
602   vcl_send_session_worker_update (wrk, s, vls_shd->owner_wrk_index);
603
604   /* XXX is this still needed? */
605   if (vec_len (vls_shd->workers_subscribed) > 1)
606     clib_warning ("more workers need to be updated");
607
608 done:
609
610   clib_spinlock_unlock (&vls_shd->lock);
611   vls_shared_data_pool_runlock ();
612
613   return 0;
614 }
615
616 void
617 vls_share_session (vcl_locked_session_t * vls, vls_worker_t * vls_wrk,
618                    vls_worker_t * vls_parent_wrk, vcl_worker_t * vcl_wrk)
619 {
620   vcl_locked_session_t *parent_vls;
621   vls_shared_data_t *vls_shd;
622   vcl_session_t *s;
623
624   s = vcl_session_get (vcl_wrk, vls->session_index);
625   if (!s)
626     {
627       clib_warning ("wrk %u parent %u session %u vls %u NOT AVAILABLE",
628                     vcl_wrk->wrk_index, vls_parent_wrk->wrk_index,
629                     vls->session_index, vls->vls_index);
630       return;
631     }
632
633   /* Reinit session lock */
634   clib_spinlock_init (&vls->lock);
635
636   if (vls->shared_data_index != ~0)
637     {
638       vls_shared_data_pool_rlock ();
639       vls_shd = vls_shared_data_get (vls->shared_data_index);
640     }
641   else
642     {
643       u32 vls_shd_index = vls_shared_data_alloc ();
644
645       vls_shared_data_pool_rlock ();
646
647       vls_shd = vls_shared_data_get (vls_shd_index);
648       vls_shd->owner_wrk_index = vls_parent_wrk->wrk_index;
649       vls->shared_data_index = vls_shd_index;
650
651       /* Update parent shared data */
652       parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
653       parent_vls->shared_data_index = vls_shd_index;
654       vec_add1 (vls_shd->workers_subscribed, vls_parent_wrk->wrk_index);
655     }
656
657   clib_spinlock_lock (&vls_shd->lock);
658
659   vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
660
661   clib_spinlock_unlock (&vls_shd->lock);
662   vls_shared_data_pool_runlock ();
663
664   if (s->rx_fifo)
665     {
666       svm_fifo_add_subscriber (s->rx_fifo, vcl_wrk->vpp_wrk_index);
667       svm_fifo_add_subscriber (s->tx_fifo, vcl_wrk->vpp_wrk_index);
668     }
669   else if (s->session_state == STATE_LISTEN)
670     {
671       s->session_state = STATE_LISTEN_NO_MQ;
672     }
673 }
674
675 static void
676 vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
677 {
678   vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
679   vcl_locked_session_t *vls;
680
681   /* *INDENT-OFF* */
682   pool_foreach (vls, vls_wrk->vls_pool, ({
683     vls_share_session (vls, vls_wrk, vls_parent_wrk, vcl_wrk);
684   }));
685   /* *INDENT-ON* */
686 }
687
688 void
689 vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
690 {
691   vls_worker_t *vls_wrk = vls_worker_get_current (), *vls_parent_wrk;
692   vcl_worker_t *wrk = vcl_worker_get_current ();
693
694   /*
695    * init vcl worker
696    */
697   wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
698   wrk->sessions = pool_dup (parent_wrk->sessions);
699   wrk->session_index_by_vpp_handles =
700     hash_dup (parent_wrk->session_index_by_vpp_handles);
701
702   /*
703    * init vls worker
704    */
705   vls_parent_wrk = vls_worker_get (parent_wrk->wrk_index);
706   vls_wrk->session_index_to_vlsh_table =
707     hash_dup (vls_parent_wrk->session_index_to_vlsh_table);
708   vls_wrk->vls_pool = pool_dup (vls_parent_wrk->vls_pool);
709
710   vls_share_sessions (vls_parent_wrk, vls_wrk);
711 }
712
713 static void
714 vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
715 {
716   vcl_worker_t *wrk = vcl_worker_get_current ();
717   vcl_session_t *s = 0;
718   int is_nonblk = 0;
719
720   if (vls)
721     {
722       s = vcl_session_get (wrk, vls->session_index);
723       if (PREDICT_FALSE (!s))
724         return;
725       is_nonblk = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
726     }
727
728   switch (op)
729     {
730     case VLS_MT_OP_READ:
731       if (!is_nonblk)
732         is_nonblk = vcl_session_read_ready (s) != 0;
733       if (!is_nonblk)
734         {
735           vls_mt_mq_lock ();
736           *locks_acq |= VLS_MT_LOCK_MQ;
737         }
738       break;
739     case VLS_MT_OP_WRITE:
740       ASSERT (s);
741       if (!is_nonblk)
742         is_nonblk = vcl_session_write_ready (s) != 0;
743       if (!is_nonblk)
744         {
745           vls_mt_mq_lock ();
746           *locks_acq |= VLS_MT_LOCK_MQ;
747         }
748       break;
749     case VLS_MT_OP_XPOLL:
750       vls_mt_mq_lock ();
751       *locks_acq |= VLS_MT_LOCK_MQ;
752       break;
753     case VLS_MT_OP_SPOOL:
754       vls_mt_spool_lock ();
755       *locks_acq |= VLS_MT_LOCK_SPOOL;
756       break;
757     default:
758       break;
759     }
760 }
761
762 static void
763 vls_mt_rel_locks (int locks_acq)
764 {
765   if (locks_acq & VLS_MT_LOCK_MQ)
766     vls_mt_mq_unlock ();
767   if (locks_acq & VLS_MT_LOCK_SPOOL)
768     vls_mt_create_unlock ();
769 }
770
771 #define vls_mt_guard(_vls, _op)                         \
772   int _locks_acq = 0;                                   \
773   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))    \
774     vls_mt_add ();                                      \
775   if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1))       \
776     vls_mt_acq_locks (_vls, _op, &_locks_acq);          \
777
778 #define vls_mt_unguard()                                \
779   if (PREDICT_FALSE (_locks_acq))                       \
780     vls_mt_rel_locks (_locks_acq)
781
782 int
783 vls_write (vls_handle_t vlsh, void *buf, size_t nbytes)
784 {
785   vcl_locked_session_t *vls;
786   int rv;
787
788   if (!(vls = vls_get_w_dlock (vlsh)))
789     return VPPCOM_EBADFD;
790
791   vls_mt_guard (vls, VLS_MT_OP_WRITE);
792   rv = vppcom_session_write (vls_to_sh_tu (vls), buf, nbytes);
793   vls_mt_unguard ();
794   vls_get_and_unlock (vlsh);
795   return rv;
796 }
797
798 int
799 vls_write_msg (vls_handle_t vlsh, void *buf, size_t nbytes)
800 {
801   vcl_locked_session_t *vls;
802   int rv;
803
804   if (!(vls = vls_get_w_dlock (vlsh)))
805     return VPPCOM_EBADFD;
806   vls_mt_guard (vls, VLS_MT_OP_WRITE);
807   rv = vppcom_session_write_msg (vls_to_sh_tu (vls), buf, nbytes);
808   vls_mt_unguard ();
809   vls_get_and_unlock (vlsh);
810   return rv;
811 }
812
813 int
814 vls_sendto (vls_handle_t vlsh, void *buf, int buflen, int flags,
815             vppcom_endpt_t * ep)
816 {
817   vcl_locked_session_t *vls;
818   int rv;
819
820   if (!(vls = vls_get_w_dlock (vlsh)))
821     return VPPCOM_EBADFD;
822   vls_mt_guard (vls, VLS_MT_OP_WRITE);
823   rv = vppcom_session_sendto (vls_to_sh_tu (vls), buf, buflen, flags, ep);
824   vls_mt_unguard ();
825   vls_get_and_unlock (vlsh);
826   return rv;
827 }
828
829 ssize_t
830 vls_read (vls_handle_t vlsh, void *buf, size_t nbytes)
831 {
832   vcl_locked_session_t *vls;
833   int rv;
834
835   if (!(vls = vls_get_w_dlock (vlsh)))
836     return VPPCOM_EBADFD;
837   vls_mt_guard (vls, VLS_MT_OP_READ);
838   rv = vppcom_session_read (vls_to_sh_tu (vls), buf, nbytes);
839   vls_mt_unguard ();
840   vls_get_and_unlock (vlsh);
841   return rv;
842 }
843
844 ssize_t
845 vls_recvfrom (vls_handle_t vlsh, void *buffer, uint32_t buflen, int flags,
846               vppcom_endpt_t * ep)
847 {
848   vcl_locked_session_t *vls;
849   int rv;
850
851   if (!(vls = vls_get_w_dlock (vlsh)))
852     return VPPCOM_EBADFD;
853   vls_mt_guard (vls, VLS_MT_OP_READ);
854   rv = vppcom_session_recvfrom (vls_to_sh_tu (vls), buffer, buflen, flags,
855                                 ep);
856   vls_mt_unguard ();
857   vls_get_and_unlock (vlsh);
858   return rv;
859 }
860
861 int
862 vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen)
863 {
864   vcl_locked_session_t *vls;
865   int rv;
866
867   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
868     vls_mt_add ();
869
870   if (!(vls = vls_get_w_dlock (vlsh)))
871     return VPPCOM_EBADFD;
872   rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen);
873   vls_get_and_unlock (vlsh);
874   return rv;
875 }
876
877 int
878 vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep)
879 {
880   vcl_locked_session_t *vls;
881   int rv;
882
883   if (!(vls = vls_get_w_dlock (vlsh)))
884     return VPPCOM_EBADFD;
885   rv = vppcom_session_bind (vls_to_sh_tu (vls), ep);
886   vls_get_and_unlock (vlsh);
887   return rv;
888 }
889
890 int
891 vls_listen (vls_handle_t vlsh, int q_len)
892 {
893   vcl_locked_session_t *vls;
894   int rv;
895
896   if (!(vls = vls_get_w_dlock (vlsh)))
897     return VPPCOM_EBADFD;
898   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
899   rv = vppcom_session_listen (vls_to_sh_tu (vls), q_len);
900   vls_mt_unguard ();
901   vls_get_and_unlock (vlsh);
902   return rv;
903 }
904
905 int
906 vls_connect (vls_handle_t vlsh, vppcom_endpt_t * server_ep)
907 {
908   vcl_locked_session_t *vls;
909   int rv;
910
911   if (!(vls = vls_get_w_dlock (vlsh)))
912     return VPPCOM_EBADFD;
913   vls_mt_guard (vls, VLS_MT_OP_XPOLL);
914   rv = vppcom_session_connect (vls_to_sh_tu (vls), server_ep);
915   vls_mt_unguard ();
916   vls_get_and_unlock (vlsh);
917   return rv;
918 }
919
920 static inline void
921 vls_mp_checks (vcl_locked_session_t * vls, int is_add)
922 {
923   vcl_worker_t *wrk = vcl_worker_get_current ();
924   vcl_session_t *s;
925   u32 owner_wrk;
926
927   s = vcl_session_get (wrk, vls->session_index);
928   switch (s->session_state)
929     {
930     case STATE_LISTEN:
931       if (is_add)
932         {
933           vls_listener_wrk_set (vls, vls->worker_index, 1 /* is_active */ );
934           break;
935         }
936       vls_listener_wrk_stop_listen (vls, vls->worker_index);
937       break;
938     case STATE_LISTEN_NO_MQ:
939       if (!is_add)
940         break;
941
942       /* Register worker as listener */
943       vls_listener_wrk_start_listen (vls, wrk->wrk_index);
944
945       /* If owner worker did not attempt to accept/xpoll on the session,
946        * force a listen stop for it, since it may not be interested in
947        * accepting new sessions.
948        * This is pretty much a hack done to give app workers the illusion
949        * that it is fine to listen and not accept new sessions for a
950        * given listener. Without it, we would accumulate unhandled
951        * accepts on the passive worker message queue. */
952       owner_wrk = vls_shared_get_owner (vls);
953       if (!vls_listener_wrk_is_active (vls, owner_wrk))
954         vls_listener_wrk_stop_listen (vls, owner_wrk);
955       break;
956     default:
957       break;
958     }
959 }
960
961 vls_handle_t
962 vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
963 {
964   vls_handle_t accepted_vlsh;
965   vcl_locked_session_t *vls;
966   int sh;
967
968   if (!(vls = vls_get_w_dlock (listener_vlsh)))
969     return VPPCOM_EBADFD;
970   if (vcl_n_workers () > 1)
971     vls_mp_checks (vls, 1 /* is_add */ );
972   vls_mt_guard (vls, VLS_MT_OP_SPOOL);
973   sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags);
974   vls_mt_unguard ();
975   vls_get_and_unlock (listener_vlsh);
976   if (sh < 0)
977     return sh;
978   accepted_vlsh = vls_alloc (sh);
979   if (PREDICT_FALSE (accepted_vlsh == VLS_INVALID_HANDLE))
980     vppcom_session_close (sh);
981   return accepted_vlsh;
982 }
983
984 vls_handle_t
985 vls_create (uint8_t proto, uint8_t is_nonblocking)
986 {
987   vcl_session_handle_t sh;
988   vls_handle_t vlsh;
989
990   vls_mt_guard (0, VLS_MT_OP_SPOOL);
991   sh = vppcom_session_create (proto, is_nonblocking);
992   vls_mt_unguard ();
993   if (sh == INVALID_SESSION_ID)
994     return VLS_INVALID_HANDLE;
995
996   vlsh = vls_alloc (sh);
997   if (PREDICT_FALSE (vlsh == VLS_INVALID_HANDLE))
998     vppcom_session_close (sh);
999
1000   return vlsh;
1001 }
1002
1003 int
1004 vls_close (vls_handle_t vlsh)
1005 {
1006   vcl_locked_session_t *vls;
1007   int rv;
1008
1009   vls_table_wlock ();
1010
1011   vls = vls_get_and_lock (vlsh);
1012   if (!vls)
1013     {
1014       vls_table_wunlock ();
1015       return VPPCOM_EBADFD;
1016     }
1017
1018   vls_mt_guard (0, VLS_MT_OP_SPOOL);
1019
1020   if (vls_is_shared (vls))
1021     rv = vls_unshare_session (vls, vcl_worker_get_current ());
1022   else
1023     rv = vppcom_session_close (vls_to_sh (vls));
1024
1025   vls_free (vls);
1026   vls_mt_unguard ();
1027
1028   vls_table_wunlock ();
1029
1030   return rv;
1031 }
1032
1033 vls_handle_t
1034 vls_epoll_create (void)
1035 {
1036   vcl_session_handle_t sh;
1037   vls_handle_t vlsh;
1038
1039   if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
1040     vls_mt_add ();
1041
1042   sh = vppcom_epoll_create ();
1043   if (sh == INVALID_SESSION_ID)
1044     return VLS_INVALID_HANDLE;
1045
1046   vlsh = vls_alloc (sh);
1047   if (vlsh == VLS_INVALID_HANDLE)
1048     vppcom_session_close (sh);
1049
1050   return vlsh;
1051 }
1052
1053 static void
1054 vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op)
1055 {
1056   if (vcl_n_workers () <= 1)
1057     {
1058       vlsl->epoll_mp_check = 1;
1059       return;
1060     }
1061
1062   if (op == EPOLL_CTL_MOD)
1063     return;
1064
1065   vlsl->epoll_mp_check = 1;
1066   vls_mp_checks (vls, op == EPOLL_CTL_ADD);
1067 }
1068
1069 int
1070 vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
1071                struct epoll_event *event)
1072 {
1073   vcl_locked_session_t *ep_vls, *vls;
1074   vcl_session_handle_t ep_sh, sh;
1075   int rv;
1076
1077   vls_table_rlock ();
1078   ep_vls = vls_get_and_lock (ep_vlsh);
1079   vls = vls_get_and_lock (vlsh);
1080   ep_sh = vls_to_sh (ep_vls);
1081   sh = vls_to_sh (vls);
1082
1083   if (PREDICT_FALSE (!vlsl->epoll_mp_check))
1084     vls_epoll_ctl_mp_checks (vls, op);
1085
1086   vls_table_runlock ();
1087
1088   rv = vppcom_epoll_ctl (ep_sh, op, sh, event);
1089
1090   vls_table_rlock ();
1091   ep_vls = vls_get (ep_vlsh);
1092   vls = vls_get (vlsh);
1093   vls_unlock (vls);
1094   vls_unlock (ep_vls);
1095   vls_table_runlock ();
1096   return rv;
1097 }
1098
1099 int
1100 vls_epoll_wait (vls_handle_t ep_vlsh, struct epoll_event *events,
1101                 int maxevents, double wait_for_time)
1102 {
1103   vcl_locked_session_t *vls;
1104   int rv;
1105
1106   if (!(vls = vls_get_w_dlock (ep_vlsh)))
1107     return VPPCOM_EBADFD;
1108   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1109   rv = vppcom_epoll_wait (vls_to_sh_tu (vls), events, maxevents,
1110                           wait_for_time);
1111   vls_mt_unguard ();
1112   vls_get_and_unlock (ep_vlsh);
1113   return rv;
1114 }
1115
1116 static void
1117 vls_select_mp_checks (vcl_si_set * read_map)
1118 {
1119   vcl_locked_session_t *vls;
1120   vcl_worker_t *wrk;
1121   vcl_session_t *s;
1122   u32 si;
1123
1124   if (vcl_n_workers () <= 1)
1125     {
1126       vlsl->select_mp_check = 1;
1127       return;
1128     }
1129
1130   if (!read_map)
1131     return;
1132
1133   vlsl->select_mp_check = 1;
1134   wrk = vcl_worker_get_current ();
1135
1136   /* *INDENT-OFF* */
1137   clib_bitmap_foreach (si, read_map, ({
1138     s = vcl_session_get (wrk, si);
1139     if (s->session_state == STATE_LISTEN)
1140       {
1141         vls = vls_get (vls_session_index_to_vlsh (si));
1142         vls_mp_checks (vls, 1 /* is_add */);
1143       }
1144   }));
1145   /* *INDENT-ON* */
1146 }
1147
1148 int
1149 vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
1150             vcl_si_set * except_map, double wait_for_time)
1151 {
1152   int rv;
1153
1154   vls_mt_guard (0, VLS_MT_OP_XPOLL);
1155   if (PREDICT_FALSE (!vlsl->select_mp_check))
1156     vls_select_mp_checks (read_map);
1157   rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
1158   vls_mt_unguard ();
1159   return rv;
1160 }
1161
1162 static void
1163 vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
1164 {
1165   u32 current_wrk, is_current;
1166   vcl_locked_session_t *vls;
1167   vcl_session_t *s;
1168
1169   if (pool_elts (vcm->workers) <= 1)
1170     return;
1171
1172   current_wrk = vcl_get_worker_index ();
1173   is_current = current_wrk == wrk->wrk_index;
1174
1175   /* *INDENT-OFF* */
1176   pool_foreach (s, wrk->sessions, ({
1177     vls = vls_get (vls_si_to_vlsh (s->session_index));
1178     if (vls && (is_current || vls_is_shared_by_wrk (vls, current_wrk)))
1179       vls_unshare_session (vls, wrk);
1180   }));
1181   /* *INDENT-ON* */
1182 }
1183
1184 static void
1185 vls_cleanup_vcl_worker (vcl_worker_t * wrk)
1186 {
1187   vls_worker_t *vls_wrk = vls_worker_get (wrk->wrk_index);
1188
1189   /* Unshare sessions and also cleanup worker since child may have
1190    * called _exit () and therefore vcl may not catch the event */
1191   vls_unshare_vcl_worker_sessions (wrk);
1192   vcl_worker_cleanup (wrk, 1 /* notify vpp */ );
1193
1194   vls_worker_free (vls_wrk);
1195 }
1196
1197 static void
1198 vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
1199 {
1200   vcl_worker_t *sub_child;
1201   int tries = 0;
1202
1203   if (child_wrk->forked_child != ~0)
1204     {
1205       sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
1206       if (sub_child)
1207         {
1208           /* Wait a bit, maybe the process is going away */
1209           while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
1210             usleep (1e3);
1211           if (kill (sub_child->current_pid, 0) < 0)
1212             vls_cleanup_forked_child (child_wrk, sub_child);
1213         }
1214     }
1215   vls_cleanup_vcl_worker (child_wrk);
1216   VDBG (0, "Cleaned up forked child wrk %u", child_wrk->wrk_index);
1217   wrk->forked_child = ~0;
1218 }
1219
1220 static struct sigaction old_sa;
1221
1222 static void
1223 vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
1224 {
1225   vcl_worker_t *wrk, *child_wrk;
1226
1227   if (vcl_get_worker_index () == ~0)
1228     return;
1229
1230   if (sigaction (SIGCHLD, &old_sa, 0))
1231     {
1232       VERR ("couldn't restore sigchld");
1233       exit (-1);
1234     }
1235
1236   wrk = vcl_worker_get_current ();
1237   if (wrk->forked_child == ~0)
1238     return;
1239
1240   child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
1241   if (!child_wrk)
1242     goto done;
1243
1244   if (si && si->si_pid != child_wrk->current_pid)
1245     {
1246       VDBG (0, "unexpected child pid %u", si->si_pid);
1247       goto done;
1248     }
1249   vls_cleanup_forked_child (wrk, child_wrk);
1250
1251 done:
1252   if (old_sa.sa_flags & SA_SIGINFO)
1253     {
1254       void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
1255       fn (signum, si, uc);
1256     }
1257   else
1258     {
1259       void (*fn) (int) = old_sa.sa_handler;
1260       if (fn)
1261         fn (signum);
1262     }
1263 }
1264
1265 static void
1266 vls_incercept_sigchld ()
1267 {
1268   struct sigaction sa;
1269   clib_memset (&sa, 0, sizeof (sa));
1270   sa.sa_sigaction = vls_intercept_sigchld_handler;
1271   sa.sa_flags = SA_SIGINFO;
1272   if (sigaction (SIGCHLD, &sa, &old_sa))
1273     {
1274       VERR ("couldn't intercept sigchld");
1275       exit (-1);
1276     }
1277 }
1278
1279 static void
1280 vls_app_pre_fork (void)
1281 {
1282   vls_incercept_sigchld ();
1283   vcl_flush_mq_events ();
1284 }
1285
1286 static void
1287 vls_app_fork_child_handler (void)
1288 {
1289   vcl_worker_t *parent_wrk;
1290   int rv, parent_wrk_index;
1291   u8 *child_name;
1292
1293   parent_wrk_index = vcl_get_worker_index ();
1294   VDBG (0, "initializing forked child %u with parent wrk %u", getpid (),
1295         parent_wrk_index);
1296
1297   /*
1298    * Allocate worker vcl
1299    */
1300   vcl_set_worker_index (~0);
1301   if (!vcl_worker_alloc_and_init ())
1302     VERR ("couldn't allocate new worker");
1303
1304   /*
1305    * Attach to binary api
1306    */
1307   child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
1308   vcl_cleanup_bapi ();
1309   vppcom_api_hookup ();
1310   vcm->app_state = STATE_APP_START;
1311   rv = vppcom_connect_to_vpp ((char *) child_name);
1312   vec_free (child_name);
1313   if (rv)
1314     {
1315       VERR ("couldn't connect to VPP!");
1316       return;
1317     }
1318
1319   /*
1320    * Allocate/initialize vls worker
1321    */
1322   vls_worker_alloc ();
1323
1324   /*
1325    * Register worker with vpp and share sessions
1326    */
1327   vcl_worker_register_with_vpp ();
1328   parent_wrk = vcl_worker_get (parent_wrk_index);
1329   vls_worker_copy_on_fork (parent_wrk);
1330   parent_wrk->forked_child = vcl_get_worker_index ();
1331
1332   /* Reset number of threads and set wrk index */
1333   vlsl->vls_mt_n_threads = 0;
1334   vlsl->vls_wrk_index = vcl_get_worker_index ();
1335   vlsl->select_mp_check = 0;
1336   vlsl->epoll_mp_check = 0;
1337   vls_mt_locks_init ();
1338
1339   VDBG (0, "forked child main worker initialized");
1340   vcm->forking = 0;
1341 }
1342
1343 static void
1344 vls_app_fork_parent_handler (void)
1345 {
1346   vcm->forking = 1;
1347   while (vcm->forking)
1348     ;
1349 }
1350
1351 void
1352 vls_app_exit (void)
1353 {
1354   vls_worker_t *wrk = vls_worker_get_current ();
1355
1356   /* Unshare the sessions. VCL will clean up the worker */
1357   vls_unshare_vcl_worker_sessions (vcl_worker_get_current ());
1358   vls_worker_free (wrk);
1359 }
1360
1361 int
1362 vls_app_create (char *app_name)
1363 {
1364   int rv;
1365
1366   if ((rv = vppcom_app_create (app_name)))
1367     return rv;
1368
1369   vlsm = clib_mem_alloc (sizeof (vls_main_t));
1370   clib_memset (vlsm, 0, sizeof (*vlsm));
1371   clib_rwlock_init (&vlsm->vls_table_lock);
1372   clib_rwlock_init (&vlsm->shared_data_lock);
1373   pool_alloc (vlsm->workers, vcm->cfg.max_workers);
1374
1375   pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
1376                   vls_app_fork_child_handler);
1377   atexit (vls_app_exit);
1378   vls_worker_alloc ();
1379   vlsl->vls_wrk_index = vcl_get_worker_index ();
1380   vls_mt_locks_init ();
1381   return VPPCOM_OK;
1382 }
1383
1384 /*
1385  * fd.io coding-style-patch-verification: ON
1386  *
1387  * Local Variables:
1388  * eval: (c-set-style "gnu")
1389  * End:
1390  */