2 * Copyright (c) 2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
16 #include <vcl/vcl_locked.h>
17 #include <vcl/vcl_private.h>
/* NOTE(review): this listing is a sampled extract — interior lines (braces,
 * some members, return types) are not visible; comments below describe only
 * what the visible lines establish. */
12 typedef struct vls_shared_data_
/* Workers subscribed to this session and, for listeners, a bitmap of which
 * workers are actively listening. Presumably guarded by a per-element lock
 * (a clib_spinlock is init'ed/freed with each element) — confirm in full source. */
23 u32 *workers_subscribed;
24 clib_bitmap_t *listeners;
27 typedef struct vcl_locked_session_
/* Index into vlsm->shared_data_pool; ~0 means the session is not shared. */
33 u32 shared_data_index;
34 } vcl_locked_session_t;
36 typedef struct vls_worker_
/* Per-worker pool of locked sessions plus session-index -> vls-handle map. */
38 vcl_locked_session_t *vls_pool;
39 uword *session_index_to_vlsh_table;
43 typedef struct vls_local_
/* Per-process state: number of threads seen, mutexes serializing message-queue
 * and session-pool operations, and one-shot flags for select/epoll
 * multi-process checks. */
46 volatile int vls_mt_n_threads;
47 pthread_mutex_t vls_mt_mq_mlock;
48 pthread_mutex_t vls_mt_spool_mlock;
49 volatile u8 select_mp_check;
50 volatile u8 epoll_mp_check;
51 } vls_process_local_t;
53 static vls_process_local_t vls_local;
54 static vls_process_local_t *vlsl = &vls_local;
56 typedef struct vls_main_
/* Global state: worker pool, table rwlock, and pool of data shared by
 * sessions owned by different workers (with its own rwlock). */
58 vls_worker_t *workers;
59 clib_rwlock_t vls_table_lock;
60 /** Pool of data shared by sessions owned by different workers */
61 vls_shared_data_t *shared_data_pool;
62 clib_rwlock_t shared_data_lock;
/* The vls worker index is simply the current vcl worker index. */
68 vls_get_worker_index (void)
70 return vcl_get_worker_index ();
/* Allocate a zeroed shared-data element under the pool writer lock, init its
 * spinlock, and compute its pool index while the lock is still held. */
74 vls_shared_data_alloc (void)
76 vls_shared_data_t *vls_shd;
79 clib_rwlock_writer_lock (&vlsm->shared_data_lock);
80 pool_get_zero (vlsm->shared_data_pool, vls_shd);
81 clib_spinlock_init (&vls_shd->lock);
82 shd_index = vls_shd - vlsm->shared_data_pool;
83 clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
/* Pool index of a shared-data element (pointer arithmetic on the pool base). */
89 vls_shared_data_index (vls_shared_data_t * vls_shd)
91 return vls_shd - vlsm->shared_data_pool;
/* Index -> element lookup; free indices are guarded against before deref. */
95 vls_shared_data_get (u32 shd_index)
97 if (pool_is_free_index (vlsm->shared_data_pool, shd_index))
99 return pool_elt_at_index (vlsm->shared_data_pool, shd_index);
/* Free a shared-data element and everything it owns (spinlock, listeners
 * bitmap, subscriber vector), all under the pool writer lock. */
103 vls_shared_data_free (u32 shd_index)
105 vls_shared_data_t *vls_shd;
107 clib_rwlock_writer_lock (&vlsm->shared_data_lock);
108 vls_shd = vls_shared_data_get (shd_index);
109 clib_spinlock_free (&vls_shd->lock);
110 clib_bitmap_free (vls_shd->listeners);
111 vec_free (vls_shd->workers_subscribed);
112 pool_put (vlsm->shared_data_pool, vls_shd);
113 clib_rwlock_writer_unlock (&vlsm->shared_data_lock);
/* Reader lock/unlock wrappers for the shared-data pool. */
117 vls_shared_data_pool_rlock (void)
119 clib_rwlock_reader_lock (&vlsm->shared_data_lock);
123 vls_shared_data_pool_runlock (void)
125 clib_rwlock_reader_unlock (&vlsm->shared_data_lock);
/* Table rwlock wrappers. The lock is only taken once the process is actually
 * multi-threaded (> 1 thread registered); single-threaded apps skip the
 * rwlock entirely as an optimization. */
129 vls_table_rlock (void)
131 if (vlsl->vls_mt_n_threads > 1)
132 clib_rwlock_reader_lock (&vlsm->vls_table_lock);
136 vls_table_runlock (void)
138 if (vlsl->vls_mt_n_threads > 1)
139 clib_rwlock_reader_unlock (&vlsm->vls_table_lock);
143 vls_table_wlock (void)
145 if (vlsl->vls_mt_n_threads > 1)
146 clib_rwlock_writer_lock (&vlsm->vls_table_lock);
150 vls_table_wunlock (void)
152 if (vlsl->vls_mt_n_threads > 1)
153 clib_rwlock_writer_unlock (&vlsm->vls_table_lock);
/* Bit flags recording which multi-thread locks an operation has acquired,
 * so they can be released selectively afterwards. */
166 VLS_MT_LOCK_MQ = 1 << 0,
167 VLS_MT_LOCK_SPOOL = 1 << 1
168 } vls_mt_lock_type_t;
/* Register a new application thread: bump the thread count and bind the
 * thread to this process' vcl worker index. */
173 vlsl->vls_mt_n_threads += 1;
174 vcl_set_worker_index (vlsl->vls_wrk_index);
/* Mutex wrappers serializing message-queue use across threads. */
178 vls_mt_mq_lock (void)
180 pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
184 vls_mt_mq_unlock (void)
186 pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
/* Mutex wrappers serializing session-pool operations (session create/accept).
 * Note the asymmetric naming: spool_lock pairs with create_unlock. */
190 vls_mt_spool_lock (void)
192 pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
196 vls_mt_create_unlock (void)
198 pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
/* One-time init of the two process-local mutexes. */
202 vls_mt_locks_init (void)
204 pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
205 pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
/* A session is shared iff it points at a shared-data pool element. */
209 vls_is_shared (vcl_locked_session_t * vls)
211 return (vls->shared_data_index != ~0);
/* Per-session spinlock, taken only when it can actually race: the process is
 * multi-threaded or the session is shared across workers. */
215 vls_lock (vcl_locked_session_t * vls)
217 if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
218 clib_spinlock_lock (&vls->lock);
222 vls_unlock (vcl_locked_session_t * vls)
224 if ((vlsl->vls_mt_n_threads > 1) || vls_is_shared (vls))
225 clib_spinlock_unlock (&vls->lock);
/* Locked session -> vcl session handle. */
228 static inline vcl_session_handle_t
229 vls_to_sh (vcl_locked_session_t * vls)
231 return vcl_session_handle_from_index (vls->session_index);
/* Same conversion, but also drops the table reader lock afterwards ("tu" =
 * then-unlock); used by the API wrappers right before calling into vppcom. */
234 static inline vcl_session_handle_t
235 vls_to_sh_tu (vcl_locked_session_t * vls)
237 vcl_session_handle_t sh;
238 sh = vls_to_sh (vls);
239 vls_table_runlock ();
/* vls worker for the calling thread's current vcl worker index. */
243 static vls_worker_t *
244 vls_worker_get_current (void)
246 return pool_elt_at_index (vlsm->workers, vls_get_worker_index ());
/* Allocate a zeroed vls worker bound to the current vcl worker index. */
250 vls_worker_alloc (void)
254 pool_get_zero (vlsm->workers, wrk);
255 wrk->wrk_index = vcl_get_worker_index ();
/* Free a worker: its lookup table, its session pool, then the pool element. */
259 vls_worker_free (vls_worker_t * wrk)
261 hash_free (wrk->session_index_to_vlsh_table);
262 pool_free (wrk->vls_pool);
263 pool_put (vlsm->workers, wrk);
/* Index -> worker lookup; free indices are guarded against before deref. */
266 static vls_worker_t *
267 vls_worker_get (u32 wrk_index)
269 if (pool_is_free_index (vlsm->workers, wrk_index))
271 return pool_elt_at_index (vlsm->workers, wrk_index);
/* Wrap a vcl session handle in a new locked session owned by the current
 * worker: record session/worker indices, mark it unshared (~0), register the
 * session-index -> vlsh mapping and init the per-session spinlock. Returns
 * the new vls handle. */
275 vls_alloc (vcl_session_handle_t sh)
277 vls_worker_t *wrk = vls_worker_get_current ();
278 vcl_locked_session_t *vls;
282 pool_get_zero (wrk->vls_pool, vls);
283 vls->session_index = vppcom_session_index (sh);
284 vls->worker_index = vppcom_session_worker (sh);
285 vls->vls_index = vls - wrk->vls_pool;
/* ~0 marks the session as not (yet) shared with other workers. */
286 vls->shared_data_index = ~0;
287 hash_set (wrk->session_index_to_vlsh_table, vls->session_index,
289 clib_spinlock_init (&vls->lock);
291 vls_table_wunlock ();
292 return vls->vls_index;
/* Handle -> locked session in the current worker's pool; guards free index. */
295 static vcl_locked_session_t *
296 vls_get (vls_handle_t vlsh)
298 vls_worker_t *wrk = vls_worker_get_current ();
299 if (pool_is_free_index (wrk->vls_pool, vlsh))
301 return pool_elt_at_index (wrk->vls_pool, vlsh);
/* Undo vls_alloc: drop the lookup entry, free the spinlock, return element. */
305 vls_free (vcl_locked_session_t * vls)
307 vls_worker_t *wrk = vls_worker_get_current ();
310 hash_unset (wrk->session_index_to_vlsh_table, vls->session_index);
311 clib_spinlock_free (&vls->lock);
312 pool_put (wrk->vls_pool, vls);
/* Lookup and take the per-session lock in one step. */
315 static vcl_locked_session_t *
316 vls_get_and_lock (vls_handle_t vlsh)
318 vls_worker_t *wrk = vls_worker_get_current ();
319 vcl_locked_session_t *vls;
320 if (pool_is_free_index (wrk->vls_pool, vlsh))
322 vls = pool_elt_at_index (wrk->vls_pool, vlsh);
/* Lookup + per-session lock performed under the table reader lock; the
 * visible vls_table_runlock suggests the table lock is dropped on at least
 * one path here — confirm exact paths in full source. */
327 static vcl_locked_session_t *
328 vls_get_w_dlock (vls_handle_t vlsh)
330 vcl_locked_session_t *vls;
332 vls = vls_get_and_lock (vlsh);
334 vls_table_runlock ();
/* Re-lookup the session and release locks taken by vls_get_w_dlock. */
339 vls_get_and_unlock (vls_handle_t vlsh)
341 vcl_locked_session_t *vls;
343 vls = vls_get (vlsh);
345 vls_table_runlock ();
/* Unlock a session obtained with a double lock (session + table). */
349 vls_dunlock (vcl_locked_session_t * vls)
352 vls_table_runlock ();
/* Lookup in an explicitly-given worker's pool (used e.g. while sharing
 * sessions on fork). */
355 static vcl_locked_session_t *
356 vls_session_get (vls_worker_t * wrk, u32 vls_index)
358 if (pool_is_free_index (wrk->vls_pool, vls_index))
360 return pool_elt_at_index (wrk->vls_pool, vls_index);
/* vls handle -> vcl session handle; INVALID_SESSION_ID when lookup fails. */
364 vlsh_to_sh (vls_handle_t vlsh)
366 vcl_locked_session_t *vls;
369 vls = vls_get_w_dlock (vlsh);
371 return INVALID_SESSION_ID;
372 rv = vls_to_sh (vls);
/* vls handle -> vcl session index (via the session handle). */
378 vlsh_to_session_index (vls_handle_t vlsh)
380 vcl_session_handle_t sh;
381 sh = vlsh_to_sh (vlsh);
382 return vppcom_session_index (sh);
/* Session index -> vls handle using the current worker's lookup table;
 * VLS_INVALID_HANDLE when the session is unknown. */
386 vls_si_to_vlsh (u32 session_index)
388 vls_worker_t *wrk = vls_worker_get_current ();
390 vlshp = hash_get (wrk->session_index_to_vlsh_table, session_index);
391 return vlshp ? *vlshp : VLS_INVALID_HANDLE;
/* Same lookup, performed under the table reader lock. */
395 vls_session_index_to_vlsh (uint32_t session_index)
400 vlsh = vls_si_to_vlsh (session_index);
401 vls_table_runlock ();
/* True if wrk_index appears in the session's subscriber vector. Lock order
 * used throughout this section: shared-data pool reader lock first, then the
 * per-element spinlock. */
407 vls_is_shared_by_wrk (vcl_locked_session_t * vls, u32 wrk_index)
409 vls_shared_data_t *vls_shd;
412 if (vls->shared_data_index == ~0)
415 vls_shared_data_pool_rlock ();
417 vls_shd = vls_shared_data_get (vls->shared_data_index);
418 clib_spinlock_lock (&vls_shd->lock);
420 for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
421 if (vls_shd->workers_subscribed[i] == wrk_index)
423 clib_spinlock_unlock (&vls_shd->lock);
424 vls_shared_data_pool_runlock ();
427 clib_spinlock_unlock (&vls_shd->lock);
429 vls_shared_data_pool_runlock ();
/* Mark/unmark wrk_index as an active listener in the shared listeners bitmap.
 * Warns (and presumably bails) if the session is not shared. */
434 vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
436 vls_shared_data_t *vls_shd;
438 if (vls->shared_data_index == ~0)
440 clib_warning ("not a shared session");
444 vls_shared_data_pool_rlock ();
446 vls_shd = vls_shared_data_get (vls->shared_data_index);
448 clib_spinlock_lock (&vls_shd->lock);
449 clib_bitmap_set (vls_shd->listeners, wrk_index, is_active);
450 clib_spinlock_unlock (&vls_shd->lock);
452 vls_shared_data_pool_runlock ();
/* Read the owner worker index out of the shared data (pool rlock only;
 * no spinlock taken here in the visible lines). */
456 vls_shared_get_owner (vcl_locked_session_t * vls)
458 vls_shared_data_t *vls_shd;
461 vls_shared_data_pool_rlock ();
463 vls_shd = vls_shared_data_get (vls->shared_data_index);
464 owner_wrk = vls_shd->owner_wrk_index;
466 vls_shared_data_pool_runlock ();
/* True if wrk_index's bit is set in the shared listeners bitmap. */
472 vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
474 vls_shared_data_t *vls_shd;
477 if (vls->shared_data_index == ~0)
479 clib_warning ("not a shared session");
483 vls_shared_data_pool_rlock ();
485 vls_shd = vls_shared_data_get (vls->shared_data_index);
487 clib_spinlock_lock (&vls_shd->lock);
488 is_set = clib_bitmap_get (vls_shd->listeners, wrk_index);
489 clib_spinlock_unlock (&vls_shd->lock);
491 vls_shared_data_pool_runlock ();
493 return (is_set == 1);
/* Start listening on behalf of wrk_index and mark it active in the shared
 * listeners bitmap. The ~0 queue length presumably means "default" — confirm
 * against vppcom_session_listen. */
497 vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
499 vppcom_session_listen (vls_to_sh (vls), ~0);
500 vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
/* Stop listening for wrk_index: send an unlisten to vpp, park the session in
 * STATE_LISTEN_NO_MQ and clear the worker's listener bit. The != STATE_LISTEN
 * guard presumably early-returns when there is nothing to stop. */
504 vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
509 wrk = vcl_worker_get (wrk_index);
510 s = vcl_session_get (wrk, vls->session_index);
511 if (s->session_state != STATE_LISTEN)
513 vcl_send_session_unlisten (wrk, s);
514 s->session_state = STATE_LISTEN_NO_MQ;
515 vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
/* Position of wrk_index in the subscriber vector (linear scan). */
519 vls_shared_data_subscriber_position (vls_shared_data_t * vls_shd,
524 for (i = 0; i < vec_len (vls_shd->workers_subscribed); i++)
526 if (vls_shd->workers_subscribed[i] == wrk_index)
/* Remove this worker's subscription to a shared session: deregister as
 * listener if needed, unsubscribe from the rx/tx fifos, clean up the vcl
 * session, and — when no subscriber is left — free the shared data entirely.
 * Lock choreography is intricate; several paths drop the spinlock and pool
 * rlock in different orders (see the "All locks have been dropped" note). */
533 vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
535 vls_shared_data_t *vls_shd;
536 int do_disconnect, pos;
540 ASSERT (vls->shared_data_index != ~0);
542 s = vcl_session_get (wrk, vls->session_index);
543 if (s->session_state == STATE_LISTEN)
544 vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
546 vls_shared_data_pool_rlock ();
548 vls_shd = vls_shared_data_get (vls->shared_data_index);
549 clib_spinlock_lock (&vls_shd->lock);
551 pos = vls_shared_data_subscriber_position (vls_shd, wrk->wrk_index);
554 clib_warning ("worker %u not subscribed for vls %u", wrk->wrk_index,
560 * Unsubscribe from share data and fifos
/* Drop this worker's fifo subscriptions before removing it from the list. */
564 svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
565 svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
567 vec_del1 (vls_shd->workers_subscribed, pos);
/* Disconnect when this was a listener or the last subscriber. */
572 n_subscribers = vec_len (vls_shd->workers_subscribed);
573 do_disconnect = s->session_state == STATE_LISTEN || !n_subscribers;
574 vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
577 * No subscriber left, cleanup shared data
581 u32 shd_index = vls_shared_data_index (vls_shd);
/* Must release spinlock and pool rlock before freeing — free takes the
 * pool writer lock. */
583 clib_spinlock_unlock (&vls_shd->lock);
584 vls_shared_data_pool_runlock ();
586 vls_shared_data_free (shd_index);
588 /* All locks have been dropped */
592 /* Return, if this is not the owning worker */
593 if (vls_shd->owner_wrk_index != wrk->wrk_index)
596 ASSERT (vec_len (vls_shd->workers_subscribed));
599 * Check if we can change owner or close
/* Hand ownership to the first remaining subscriber and tell vpp. */
601 vls_shd->owner_wrk_index = vls_shd->workers_subscribed[0];
602 vcl_send_session_worker_update (wrk, s, vls_shd->owner_wrk_index);
604 /* XXX is this still needed? */
605 if (vec_len (vls_shd->workers_subscribed) > 1)
606 clib_warning ("more workers need to be updated");
610 clib_spinlock_unlock (&vls_shd->lock);
611 vls_shared_data_pool_runlock ();
/* Share one session between a forked child worker (vls_wrk/vcl_wrk) and its
 * parent (vls_parent_wrk): reuse the parent's shared data if it exists,
 * otherwise allocate one, make the parent the owner and first subscriber,
 * then subscribe the child and its fifos. */
617 vls_share_session (vcl_locked_session_t * vls, vls_worker_t * vls_wrk,
618 vls_worker_t * vls_parent_wrk, vcl_worker_t * vcl_wrk)
620 vcl_locked_session_t *parent_vls;
621 vls_shared_data_t *vls_shd;
624 s = vcl_session_get (vcl_wrk, vls->session_index);
627 clib_warning ("wrk %u parent %u session %u vls %u NOT AVAILABLE",
628 vcl_wrk->wrk_index, vls_parent_wrk->wrk_index,
629 vls->session_index, vls->vls_index);
/* The child's pool was pool_dup'ed from the parent, so the copied spinlock
 * must be re-initialized for this process. */
633 /* Reinit session lock */
634 clib_spinlock_init (&vls->lock);
636 if (vls->shared_data_index != ~0)
638 vls_shared_data_pool_rlock ();
639 vls_shd = vls_shared_data_get (vls->shared_data_index);
/* First share: allocate shared data, parent becomes owner + subscriber. */
643 u32 vls_shd_index = vls_shared_data_alloc ();
645 vls_shared_data_pool_rlock ();
647 vls_shd = vls_shared_data_get (vls_shd_index);
648 vls_shd->owner_wrk_index = vls_parent_wrk->wrk_index;
649 vls->shared_data_index = vls_shd_index;
651 /* Update parent shared data */
652 parent_vls = vls_session_get (vls_parent_wrk, vls->vls_index);
653 parent_vls->shared_data_index = vls_shd_index;
654 vec_add1 (vls_shd->workers_subscribed, vls_parent_wrk->wrk_index);
/* Subscribe the child worker under the element spinlock. */
657 clib_spinlock_lock (&vls_shd->lock);
659 vec_add1 (vls_shd->workers_subscribed, vls_wrk->wrk_index);
661 clib_spinlock_unlock (&vls_shd->lock);
662 vls_shared_data_pool_runlock ();
/* Child also subscribes to the session's rx/tx fifos. */
666 svm_fifo_add_subscriber (s->rx_fifo, vcl_wrk->vpp_wrk_index);
667 svm_fifo_add_subscriber (s->tx_fifo, vcl_wrk->vpp_wrk_index);
/* Listeners in the child start without a message queue attached. */
669 else if (s->session_state == STATE_LISTEN)
671 s->session_state = STATE_LISTEN_NO_MQ;
/* Share every session in the child worker's pool with its parent (used
 * after fork, once the pools have been duplicated). */
676 vls_share_sessions (vls_worker_t * vls_parent_wrk, vls_worker_t * vls_wrk)
678 vcl_worker_t *vcl_wrk = vcl_worker_get (vls_wrk->wrk_index);
679 vcl_locked_session_t *vls;
682 pool_foreach (vls, vls_wrk->vls_pool, ({
683 vls_share_session (vls, vls_wrk, vls_parent_wrk, vcl_wrk);
/* After fork: duplicate the parent's vcl state (event queues, session pool,
 * vpp-handle map) and vls state (lookup table, session pool) into the child's
 * workers, then share all sessions between parent and child. */
689 vls_worker_copy_on_fork (vcl_worker_t * parent_wrk)
691 vls_worker_t *vls_wrk = vls_worker_get_current (), *vls_parent_wrk;
692 vcl_worker_t *wrk = vcl_worker_get_current ();
697 wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
698 wrk->sessions = pool_dup (parent_wrk->sessions);
699 wrk->session_index_by_vpp_handles =
700 hash_dup (parent_wrk->session_index_by_vpp_handles);
705 vls_parent_wrk = vls_worker_get (parent_wrk->wrk_index);
706 vls_wrk->session_index_to_vlsh_table =
707 hash_dup (vls_parent_wrk->session_index_to_vlsh_table);
708 vls_wrk->vls_pool = pool_dup (vls_parent_wrk->vls_pool);
710 vls_share_sessions (vls_parent_wrk, vls_wrk);
/* Acquire the multi-thread locks an operation needs, recording what was taken
 * in *locks_acq so vls_mt_rel_locks can undo it. Blocking read/write on a
 * non-ready session apparently takes the mq lock; session create/accept takes
 * the spool lock. Several case labels are not visible in this extract. */
714 vls_mt_acq_locks (vcl_locked_session_t * vls, vls_mt_ops_t op, int *locks_acq)
716 vcl_worker_t *wrk = vcl_worker_get_current ();
717 vcl_session_t *s = 0;
722 s = vcl_session_get (wrk, vls->session_index);
723 if (PREDICT_FALSE (!s))
725 is_nonblk = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
/* For reads: treat "data already available" like nonblocking. */
732 is_nonblk = vcl_session_read_ready (s) != 0;
736 *locks_acq |= VLS_MT_LOCK_MQ;
739 case VLS_MT_OP_WRITE:
742 is_nonblk = vcl_session_write_ready (s) != 0;
746 *locks_acq |= VLS_MT_LOCK_MQ;
/* epoll/select always serialize on the mq lock. */
749 case VLS_MT_OP_XPOLL:
751 *locks_acq |= VLS_MT_LOCK_MQ;
753 case VLS_MT_OP_SPOOL:
754 vls_mt_spool_lock ();
755 *locks_acq |= VLS_MT_LOCK_SPOOL;
/* Release exactly the locks recorded by vls_mt_acq_locks. */
763 vls_mt_rel_locks (int locks_acq)
765 if (locks_acq & VLS_MT_LOCK_MQ)
767 if (locks_acq & VLS_MT_LOCK_SPOOL)
768 vls_mt_create_unlock ();
/* Guard macro wrapping vppcom calls: declares _locks_acq in the enclosing
 * scope, registers unknown threads (worker index ~0), and acquires op locks
 * only on the multi-threaded path. Must be paired with vls_mt_unguard in the
 * same scope. Note: not do{}while(0)-wrapped because _locks_acq must remain
 * visible to vls_mt_unguard. */
771 #define vls_mt_guard(_vls, _op) \
772 int _locks_acq = 0; \
773 if (PREDICT_FALSE (vcl_get_worker_index () == ~0)) \
775 if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1)) \
776 vls_mt_acq_locks (_vls, _op, &_locks_acq); \
778 #define vls_mt_unguard() \
779 if (PREDICT_FALSE (_locks_acq)) \
780 vls_mt_rel_locks (_locks_acq)
/* Write-path wrappers. Common shape: lookup+lock under table rlock, guard the
 * multi-thread locks, call into vppcom via vls_to_sh_tu (which drops the
 * table rlock), then unlock. Returns VPPCOM_EBADFD for unknown handles. */
783 vls_write (vls_handle_t vlsh, void *buf, size_t nbytes)
785 vcl_locked_session_t *vls;
788 if (!(vls = vls_get_w_dlock (vlsh)))
789 return VPPCOM_EBADFD;
791 vls_mt_guard (vls, VLS_MT_OP_WRITE);
792 rv = vppcom_session_write (vls_to_sh_tu (vls), buf, nbytes);
794 vls_get_and_unlock (vlsh);
/* As vls_write, for message-oriented writes. */
799 vls_write_msg (vls_handle_t vlsh, void *buf, size_t nbytes)
801 vcl_locked_session_t *vls;
804 if (!(vls = vls_get_w_dlock (vlsh)))
805 return VPPCOM_EBADFD;
806 vls_mt_guard (vls, VLS_MT_OP_WRITE);
807 rv = vppcom_session_write_msg (vls_to_sh_tu (vls), buf, nbytes);
809 vls_get_and_unlock (vlsh);
/* As vls_write, with an explicit destination endpoint. */
814 vls_sendto (vls_handle_t vlsh, void *buf, int buflen, int flags,
817 vcl_locked_session_t *vls;
820 if (!(vls = vls_get_w_dlock (vlsh)))
821 return VPPCOM_EBADFD;
822 vls_mt_guard (vls, VLS_MT_OP_WRITE);
823 rv = vppcom_session_sendto (vls_to_sh_tu (vls), buf, buflen, flags, ep);
825 vls_get_and_unlock (vlsh);
/* Read-path wrappers; same lock choreography as the write wrappers. */
830 vls_read (vls_handle_t vlsh, void *buf, size_t nbytes)
832 vcl_locked_session_t *vls;
835 if (!(vls = vls_get_w_dlock (vlsh)))
836 return VPPCOM_EBADFD;
837 vls_mt_guard (vls, VLS_MT_OP_READ);
838 rv = vppcom_session_read (vls_to_sh_tu (vls), buf, nbytes);
840 vls_get_and_unlock (vlsh);
/* As vls_read, also reporting the peer endpoint. */
845 vls_recvfrom (vls_handle_t vlsh, void *buffer, uint32_t buflen, int flags,
848 vcl_locked_session_t *vls;
851 if (!(vls = vls_get_w_dlock (vlsh)))
852 return VPPCOM_EBADFD;
853 vls_mt_guard (vls, VLS_MT_OP_READ);
854 rv = vppcom_session_recvfrom (vls_to_sh_tu (vls), buffer, buflen, flags,
857 vls_get_and_unlock (vlsh);
/* Get/set session attributes. Unlike the data-path wrappers, no mt guard is
 * taken here (only the worker-index registration check is visible). */
862 vls_attr (vls_handle_t vlsh, uint32_t op, void *buffer, uint32_t * buflen)
864 vcl_locked_session_t *vls;
867 if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
870 if (!(vls = vls_get_w_dlock (vlsh)))
871 return VPPCOM_EBADFD;
872 rv = vppcom_session_attr (vls_to_sh_tu (vls), op, buffer, buflen);
873 vls_get_and_unlock (vlsh);
/* Bind wrapper; no mt guard in the visible lines. */
878 vls_bind (vls_handle_t vlsh, vppcom_endpt_t * ep)
880 vcl_locked_session_t *vls;
883 if (!(vls = vls_get_w_dlock (vlsh)))
884 return VPPCOM_EBADFD;
885 rv = vppcom_session_bind (vls_to_sh_tu (vls), ep);
886 vls_get_and_unlock (vlsh);
/* Listen wrapper; guarded as an XPOLL-class operation (mq lock). */
891 vls_listen (vls_handle_t vlsh, int q_len)
893 vcl_locked_session_t *vls;
896 if (!(vls = vls_get_w_dlock (vlsh)))
897 return VPPCOM_EBADFD;
898 vls_mt_guard (vls, VLS_MT_OP_XPOLL);
899 rv = vppcom_session_listen (vls_to_sh_tu (vls), q_len);
901 vls_get_and_unlock (vlsh);
/* Connect wrapper; guarded as an XPOLL-class operation (mq lock). */
906 vls_connect (vls_handle_t vlsh, vppcom_endpt_t * server_ep)
908 vcl_locked_session_t *vls;
911 if (!(vls = vls_get_w_dlock (vlsh)))
912 return VPPCOM_EBADFD;
913 vls_mt_guard (vls, VLS_MT_OP_XPOLL);
914 rv = vppcom_session_connect (vls_to_sh_tu (vls), server_ep);
916 vls_get_and_unlock (vlsh);
/* Multi-process bookkeeping for listeners: on add, register the current
 * worker as listener and possibly force a listen stop on the owner; on
 * remove, deactivate this session's worker. Several case labels are not
 * visible in this extract. */
921 vls_mp_checks (vcl_locked_session_t * vls, int is_add)
923 vcl_worker_t *wrk = vcl_worker_get_current ();
927 s = vcl_session_get (wrk, vls->session_index);
928 switch (s->session_state)
933 vls_listener_wrk_set (vls, vls->worker_index, 1 /* is_active */ );
936 vls_listener_wrk_stop_listen (vls, vls->worker_index);
938 case STATE_LISTEN_NO_MQ:
942 /* Register worker as listener */
943 vls_listener_wrk_start_listen (vls, wrk->wrk_index);
945 /* If owner worker did not attempt to accept/xpoll on the session,
946 * force a listen stop for it, since it may not be interested in
947 * accepting new sessions.
948 * This is pretty much a hack done to give app workers the illusion
949 * that it is fine to listen and not accept new sessions for a
950 * given listener. Without it, we would accumulate unhandled
951 * accepts on the passive worker message queue. */
952 owner_wrk = vls_shared_get_owner (vls);
953 if (!vls_listener_wrk_is_active (vls, owner_wrk))
954 vls_listener_wrk_stop_listen (vls, owner_wrk);
/* Accept on a listener: run the multi-process checks when the app has more
 * than one worker, accept via vppcom (spool-guarded), then wrap the new vcl
 * handle in a fresh vls handle. The accepted session is closed again if vls
 * allocation fails. */
962 vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
964 vls_handle_t accepted_vlsh;
965 vcl_locked_session_t *vls;
968 if (!(vls = vls_get_w_dlock (listener_vlsh)))
969 return VPPCOM_EBADFD;
970 if (vcl_n_workers () > 1)
971 vls_mp_checks (vls, 1 /* is_add */ );
972 vls_mt_guard (vls, VLS_MT_OP_SPOOL);
973 sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags);
975 vls_get_and_unlock (listener_vlsh);
978 accepted_vlsh = vls_alloc (sh);
979 if (PREDICT_FALSE (accepted_vlsh == VLS_INVALID_HANDLE))
980 vppcom_session_close (sh);
981 return accepted_vlsh;
/* Create a session (spool-guarded) and wrap it in a vls handle; the vcl
 * session is closed again if vls allocation fails. */
985 vls_create (uint8_t proto, uint8_t is_nonblocking)
987 vcl_session_handle_t sh;
990 vls_mt_guard (0, VLS_MT_OP_SPOOL);
991 sh = vppcom_session_create (proto, is_nonblocking);
993 if (sh == INVALID_SESSION_ID)
994 return VLS_INVALID_HANDLE;
996 vlsh = vls_alloc (sh);
997 if (PREDICT_FALSE (vlsh == VLS_INVALID_HANDLE))
998 vppcom_session_close (sh);
/* Close a session. Shared sessions are unshared (which handles cleanup/
 * disconnect per subscriber count) instead of being closed directly. Uses the
 * table writer lock around the whole operation. */
1004 vls_close (vls_handle_t vlsh)
1006 vcl_locked_session_t *vls;
1011 vls = vls_get_and_lock (vlsh);
1014 vls_table_wunlock ();
1015 return VPPCOM_EBADFD;
1018 vls_mt_guard (0, VLS_MT_OP_SPOOL);
1020 if (vls_is_shared (vls))
1021 rv = vls_unshare_session (vls, vcl_worker_get_current ());
1023 rv = vppcom_session_close (vls_to_sh (vls));
1028 vls_table_wunlock ();
/* Create an epoll session and wrap it in a vls handle; close the vcl session
 * if vls allocation fails. Unregistered threads (worker index ~0) are
 * handled first. */
1034 vls_epoll_create (void)
1036 vcl_session_handle_t sh;
1039 if (PREDICT_FALSE (vcl_get_worker_index () == ~0))
1042 sh = vppcom_epoll_create ();
1043 if (sh == INVALID_SESSION_ID)
1044 return VLS_INVALID_HANDLE;
1046 vlsh = vls_alloc (sh);
1047 if (vlsh == VLS_INVALID_HANDLE)
1048 vppcom_session_close (sh);
/* One-shot multi-process check for epoll_ctl: a no-op for single-worker apps
 * and for EPOLL_CTL_MOD; otherwise sets the flag and runs vls_mp_checks with
 * is_add derived from the op. */
1054 vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op)
1056 if (vcl_n_workers () <= 1)
1058 vlsl->epoll_mp_check = 1;
1062 if (op == EPOLL_CTL_MOD)
1065 vlsl->epoll_mp_check = 1;
1066 vls_mp_checks (vls, op == EPOLL_CTL_ADD);
/* epoll_ctl wrapper: locks both the epoll session and the target session,
 * resolves their vcl handles, drops the table rlock across the vppcom call,
 * then re-looks both sessions up before unlocking (handles may have been
 * looked up again because the table lock was released). */
1070 vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
1071 struct epoll_event *event)
1073 vcl_locked_session_t *ep_vls, *vls;
1074 vcl_session_handle_t ep_sh, sh;
1078 ep_vls = vls_get_and_lock (ep_vlsh);
1079 vls = vls_get_and_lock (vlsh);
1080 ep_sh = vls_to_sh (ep_vls);
1081 sh = vls_to_sh (vls);
/* Run the one-shot multi-process check the first time through. */
1083 if (PREDICT_FALSE (!vlsl->epoll_mp_check))
1084 vls_epoll_ctl_mp_checks (vls, op);
1086 vls_table_runlock ();
1088 rv = vppcom_epoll_ctl (ep_sh, op, sh, event);
1091 ep_vls = vls_get (ep_vlsh);
1092 vls = vls_get (vlsh);
1094 vls_unlock (ep_vls);
1095 vls_table_runlock ();
/* epoll_wait wrapper; XPOLL-guarded (mq lock) around the vppcom call. */
1100 vls_epoll_wait (vls_handle_t ep_vlsh, struct epoll_event *events,
1101 int maxevents, double wait_for_time)
1103 vcl_locked_session_t *vls;
1106 if (!(vls = vls_get_w_dlock (ep_vlsh)))
1107 return VPPCOM_EBADFD;
1108 vls_mt_guard (0, VLS_MT_OP_XPOLL);
1109 rv = vppcom_epoll_wait (vls_to_sh_tu (vls), events, maxevents,
1112 vls_get_and_unlock (ep_vlsh);
/* One-shot multi-process check for select: for multi-worker apps, walk the
 * read set and run listener checks on every listening session found. */
1117 vls_select_mp_checks (vcl_si_set * read_map)
1119 vcl_locked_session_t *vls;
1124 if (vcl_n_workers () <= 1)
1126 vlsl->select_mp_check = 1;
1133 vlsl->select_mp_check = 1;
1134 wrk = vcl_worker_get_current ();
1137 clib_bitmap_foreach (si, read_map, ({
1138 s = vcl_session_get (wrk, si);
1139 if (s->session_state == STATE_LISTEN)
1141 vls = vls_get (vls_session_index_to_vlsh (si));
1142 vls_mp_checks (vls, 1 /* is_add */);
/* select wrapper: XPOLL-guarded; runs the one-shot multi-process check on
 * the read set the first time through, then defers to vppcom_select. */
1149 vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
1150 vcl_si_set * except_map, double wait_for_time)
1154 vls_mt_guard (0, VLS_MT_OP_XPOLL);
1155 if (PREDICT_FALSE (!vlsl->select_mp_check))
1156 vls_select_mp_checks (read_map);
1157 rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
/* Unshare all of a worker's sessions (no-op for single-worker apps). When
 * cleaning up a worker other than the current one, only sessions actually
 * shared with the current worker are unshared. */
1163 vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
1165 u32 current_wrk, is_current;
1166 vcl_locked_session_t *vls;
1169 if (pool_elts (vcm->workers) <= 1)
1172 current_wrk = vcl_get_worker_index ();
1173 is_current = current_wrk == wrk->wrk_index;
1176 pool_foreach (s, wrk->sessions, ({
1177 vls = vls_get (vls_si_to_vlsh (s->session_index));
1178 if (vls && (is_current || vls_is_shared_by_wrk (vls, current_wrk)))
1179 vls_unshare_session (vls, wrk);
/* Full cleanup of a vcl worker and its vls counterpart. */
1185 vls_cleanup_vcl_worker (vcl_worker_t * wrk)
1187 vls_worker_t *vls_wrk = vls_worker_get (wrk->wrk_index);
1189 /* Unshare sessions and also cleanup worker since child may have
1190 * called _exit () and therefore vcl may not catch the event */
1191 vls_unshare_vcl_worker_sessions (wrk);
1192 vcl_worker_cleanup (wrk, 1 /* notify vpp */ );
1194 vls_worker_free (vls_wrk);
/* Clean up a forked child worker; recurses into grandchildren first. Uses
 * kill(pid, 0) purely as an existence probe, polling up to 50 tries for the
 * grandchild process to exit before recursing. */
1198 vls_cleanup_forked_child (vcl_worker_t * wrk, vcl_worker_t * child_wrk)
1200 vcl_worker_t *sub_child;
1203 if (child_wrk->forked_child != ~0)
1205 sub_child = vcl_worker_get_if_valid (child_wrk->forked_child);
1208 /* Wait a bit, maybe the process is going away */
1209 while (kill (sub_child->current_pid, 0) >= 0 && tries++ < 50)
1211 if (kill (sub_child->current_pid, 0) < 0)
1212 vls_cleanup_forked_child (child_wrk, sub_child);
1215 vls_cleanup_vcl_worker (child_wrk);
1216 VDBG (0, "Cleaned up forked child wrk %u", child_wrk->wrk_index);
1217 wrk->forked_child = ~0;
/* Previous SIGCHLD disposition, saved by vls_incercept_sigchld so it can be
 * restored and chained to from the interception handler. */
1220 static struct sigaction old_sa;
/* SIGCHLD handler installed around fork: restores the saved disposition,
 * cleans up the forked child's worker state, then chains to the old handler
 * (sa_sigaction or sa_handler depending on SA_SIGINFO). */
1223 vls_intercept_sigchld_handler (int signum, siginfo_t * si, void *uc)
1225 vcl_worker_t *wrk, *child_wrk;
1227 if (vcl_get_worker_index () == ~0)
1230 if (sigaction (SIGCHLD, &old_sa, 0))
1232 VERR ("couldn't restore sigchld");
1236 wrk = vcl_worker_get_current ();
1237 if (wrk->forked_child == ~0)
1240 child_wrk = vcl_worker_get_if_valid (wrk->forked_child);
/* Ignore SIGCHLD for processes other than our tracked fork child. */
1244 if (si && si->si_pid != child_wrk->current_pid)
1246 VDBG (0, "unexpected child pid %u", si->si_pid);
1249 vls_cleanup_forked_child (wrk, child_wrk);
1252 if (old_sa.sa_flags & SA_SIGINFO)
1254 void (*fn) (int, siginfo_t *, void *) = old_sa.sa_sigaction;
1255 fn (signum, si, uc);
1259 void (*fn) (int) = old_sa.sa_handler;
/* Install the SIGCHLD interception handler, saving the old disposition into
 * old_sa. (Function name "incercept" is a long-standing typo kept for ABI/
 * call-site consistency.) */
1266 vls_incercept_sigchld ()
1268 struct sigaction sa;
1269 clib_memset (&sa, 0, sizeof (sa));
1270 sa.sa_sigaction = vls_intercept_sigchld_handler;
1271 sa.sa_flags = SA_SIGINFO;
1272 if (sigaction (SIGCHLD, &sa, &old_sa))
1274 VERR ("couldn't intercept sigchld");
/* pthread_atfork prepare hook: arm SIGCHLD interception and drain pending
 * mq events before the process forks. */
1280 vls_app_pre_fork (void)
1282 vls_incercept_sigchld ();
1283 vcl_flush_mq_events ();
/* pthread_atfork child hook: build a fresh vcl/vls worker for the child,
 * reattach to the binary API under a "-child-<pid>" name, register with vpp,
 * copy and share the parent's sessions, then reset per-process thread state. */
1287 vls_app_fork_child_handler (void)
1289 vcl_worker_t *parent_wrk;
1290 int rv, parent_wrk_index;
/* The pre-fork worker index identifies the parent worker. */
1293 parent_wrk_index = vcl_get_worker_index ();
1294 VDBG (0, "initializing forked child %u with parent wrk %u", getpid (),
1298 * Allocate worker vcl
1300 vcl_set_worker_index (~0);
1301 if (!vcl_worker_alloc_and_init ())
1302 VERR ("couldn't allocate new worker");
1305 * Attach to binary api
1307 child_name = format (0, "%v-child-%u%c", vcm->app_name, getpid (), 0);
1308 vcl_cleanup_bapi ();
1309 vppcom_api_hookup ();
1310 vcm->app_state = STATE_APP_START;
1311 rv = vppcom_connect_to_vpp ((char *) child_name);
1312 vec_free (child_name);
1315 VERR ("couldn't connect to VPP!");
1320 * Allocate/initialize vls worker
1322 vls_worker_alloc ();
1325 * Register worker with vpp and share sessions
1327 vcl_worker_register_with_vpp ();
1328 parent_wrk = vcl_worker_get (parent_wrk_index);
1329 vls_worker_copy_on_fork (parent_wrk);
1330 parent_wrk->forked_child = vcl_get_worker_index ();
1332 /* Reset number of threads and set wrk index */
1333 vlsl->vls_mt_n_threads = 0;
1334 vlsl->vls_wrk_index = vcl_get_worker_index ();
1335 vlsl->select_mp_check = 0;
1336 vlsl->epoll_mp_check = 0;
1337 vls_mt_locks_init ();
1339 VDBG (0, "forked child main worker initialized");
/* pthread_atfork parent hook: wait until vcl's fork handling completes. */
1344 vls_app_fork_parent_handler (void)
1347 while (vcm->forking)
/* atexit-style cleanup fragment: unshare this worker's sessions (VCL cleans
 * up the vcl worker itself) and free the vls worker. The enclosing function
 * header is not visible in this extract — presumably vls_app_exit. */
1354 vls_worker_t *wrk = vls_worker_get_current ();
1356 /* Unshare the sessions. VCL will clean up the worker */
1357 vls_unshare_vcl_worker_sessions (vcl_worker_get_current ());
1358 vls_worker_free (wrk);
/* App-level init: create the vppcom app, allocate and zero the global vls
 * main, init its rwlocks, preallocate the worker pool, register fork/exit
 * handlers, and set up the first worker plus the process-local mutexes. */
1362 vls_app_create (char *app_name)
1366 if ((rv = vppcom_app_create (app_name)))
1369 vlsm = clib_mem_alloc (sizeof (vls_main_t));
1370 clib_memset (vlsm, 0, sizeof (*vlsm));
1371 clib_rwlock_init (&vlsm->vls_table_lock);
1372 clib_rwlock_init (&vlsm->shared_data_lock);
1373 pool_alloc (vlsm->workers, vcm->cfg.max_workers);
1375 pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
1376 vls_app_fork_child_handler);
1377 atexit (vls_app_exit);
1378 vls_worker_alloc ();
1379 vlsl->vls_wrk_index = vcl_get_worker_index ();
1380 vls_mt_locks_init ();
1385 * fd.io coding-style-patch-verification: ON
1388 * eval: (c-set-style "gnu")