X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=7513aa32ed8e7ca906492a0f39175441666f30f1;hb=7da8829d8;hp=9a4d29bdf29dc7e47359e7d9ce42eb9762ff9820;hpb=b462418890240b2e38dbf522f9dd0196b79e0fa8;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 9a4d29bdf29..7513aa32ed8 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -28,11 +28,12 @@ static inline int session_send_evt_to_thread (void *data, void *args, u32 thread_index, session_evt_type_t evt_type) { + session_worker_t *wrk = session_main_get_worker (thread_index); session_event_t *evt; svm_msg_q_msg_t msg; svm_msg_q_t *mq; - mq = session_main_get_vpp_event_queue (thread_index); + mq = wrk->vpp_event_queue; if (PREDICT_FALSE (svm_msg_q_lock (mq))) return -1; if (PREDICT_FALSE (svm_msg_q_is_full (mq) @@ -72,6 +73,10 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index, evt->event_type = evt_type; svm_msg_q_add_and_unlock (mq, &msg); + + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index); + return 0; } @@ -121,19 +126,20 @@ session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) void session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) { - session_t *s; + session_t *s = session_get (tc->s_index, tc->thread_index); - s = session_get (tc->s_index, tc->thread_index); ASSERT (s->thread_index == vlib_get_thread_index ()); ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED); + if (!(s->flags & SESSION_F_CUSTOM_TX)) { s->flags |= SESSION_F_CUSTOM_TX; if (svm_fifo_set_event (s->tx_fifo) || transport_connection_is_descheduled (tc)) { - session_worker_t *wrk; session_evt_elt_t *elt; + session_worker_t *wrk; + wrk = session_main_get_worker (tc->thread_index); if (has_prio) elt = session_evt_alloc_new (wrk); @@ -142,6 +148,10 @@ session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio) elt->evt.session_index = tc->s_index; elt->evt.event_type = SESSION_IO_EVT_TX; tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED; + + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, + session_queue_node.index); } } } @@ -157,6 +167,9 @@ sesssion_reschedule_tx (transport_connection_t * tc) elt = session_evt_alloc_new (wrk); elt->evt.session_index = tc->s_index; elt->evt.event_type = SESSION_IO_EVT_TX; + + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index); } static void @@ -175,6 +188,9 @@ session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt) clib_memset (&elt->evt, 0, sizeof (session_event_t)); elt->evt.session_handle = session_handle (s); elt->evt.event_type = evt; + + if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT)) + vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index); } else session_send_ctrl_evt_to_thread (s, evt); @@ -228,9 +244,6 @@ session_is_valid (u32 si, u8 thread_index) s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si); - if (!s) - return 1; - if (s->thread_index != thread_index || s->session_index != si) return 0; @@ -854,23 +867,12 @@ static void session_switch_pool_reply (void *arg) { u32 session_index = pointer_to_uword (arg); - segment_manager_t *sm; - app_worker_t *app_wrk; session_t *s; s = session_get_if_valid (session_index, vlib_get_thread_index ()); if (!s) return; - app_wrk = app_worker_get_if_valid (s->app_wrk_index); - if (!app_wrk) - return; - - /* Attach fifos to the right session and segment slice */ - sm = app_worker_get_connect_segment_manager (app_wrk); - segment_manager_attach_fifo (sm, s->rx_fifo, s); - segment_manager_attach_fifo (sm, s->tx_fifo, s); - /* Notify app that it has data on the new session */ session_enqueue_notify (s); } @@ -910,8 +912,8 @@ session_switch_pool (void *cb_args) { /* Cleanup fifo segment slice state for fifos */ sm = app_worker_get_connect_segment_manager (app_wrk); - segment_manager_detach_fifo (sm, s->rx_fifo); - segment_manager_detach_fifo (sm, s->tx_fifo); + segment_manager_detach_fifo (sm, &s->rx_fifo); + segment_manager_detach_fifo (sm, &s->tx_fifo); /* Notify app, using old session, about the migration event */ app_worker_migrate_notify (app_wrk, s, new_sh); @@ -935,6 +937,8 @@ session_dgram_connect_notify (transport_connection_t * tc, { session_t *new_s; session_switch_pool_args_t *rpc_args; + segment_manager_t *sm; + app_worker_t *app_wrk; /* * Clone half-open session to the right thread. @@ -944,7 +948,17 @@ session_dgram_connect_notify (transport_connection_t * tc, new_s->session_state = SESSION_STATE_READY; new_s->flags |= SESSION_F_IS_MIGRATING; - session_lookup_add_connection (tc, session_handle (new_s)); + if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP)) + session_lookup_add_connection (tc, session_handle (new_s)); + + app_wrk = app_worker_get_if_valid (new_s->app_wrk_index); + if (app_wrk) + { + /* New set of fifos attached to the same shared memory */ + sm = app_worker_get_connect_segment_manager (app_wrk); + segment_manager_attach_fifo (sm, &new_s->rx_fifo, new_s); + segment_manager_attach_fifo (sm, &new_s->tx_fifo, new_s); + } /* * Ask thread owning the old session to clean it up and make us the tx @@ -1547,9 +1561,6 @@ session_vpp_event_queues_allocate (session_main_t * smm) cfg->ring_cfgs = rc; smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (eqs, i, cfg); - - if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue)) - clib_warning ("eventfd returned"); } } @@ -1595,16 +1606,14 @@ session_register_transport (transport_proto_t transport_proto, vec_validate (smm->session_type_to_next, session_type); vec_validate (smm->session_tx_fns, session_type); - /* *INDENT-OFF* */ if (output_node != ~0) { - foreach_vlib_main (({ - next_index = vlib_node_add_next (this_vlib_main, - session_queue_node.index, - output_node); - })); + foreach_vlib_main () + { + next_index = vlib_node_add_next ( + this_vlib_main, session_queue_node.index, output_node); + } } - /* *INDENT-ON* */ smm->session_type_to_next[session_type] = next_index; smm->session_tx_fns[session_type] = @@ -1663,18 +1672,15 @@ void session_queue_run_on_main_thread (vlib_main_t * vm) { ASSERT (vlib_get_thread_index () == 0); - vlib_process_signal_event_mt (vm, session_queue_process_node.index, - SESSION_Q_PROCESS_RUN_ON_MAIN, 0); + vlib_node_set_interrupt_pending (vm, session_queue_node.index); } static clib_error_t * session_manager_main_enable (vlib_main_t * vm) { - segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args; session_main_t *smm = &session_main; vlib_thread_main_t *vtm = vlib_get_thread_main (); u32 num_threads, preallocated_sessions_per_worker; - uword margin = 8 << 12; session_worker_t *wrk; int i; @@ -1696,22 +1702,23 @@ session_manager_main_enable (vlib_main_t * vm) wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list); - wrk->vm = vlib_mains[i]; + wrk->vm = vlib_get_main_by_index (i); wrk->last_vlib_time = vlib_time_now (vm); wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ; vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type); if (num_threads > 1) clib_rwlock_init (&smm->wrk[i].peekers_rw_locks); + + if (!smm->no_adaptive && smm->use_private_rx_mqs) + session_wrk_enable_adaptive_mode (wrk); } /* Allocate vpp event queues segment and queue */ session_vpp_event_queues_allocate (smm); - /* Initialize fifo segment main baseva and timeout */ - sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size + margin; - sm_args->size = smm->session_va_space_size; - segment_manager_main_init (sm_args); + /* Initialize segment manager properties */ + segment_manager_main_init (); /* Preallocate sessions */ if (smm->preallocated_sessions) @@ -1760,35 +1767,42 @@ session_manager_main_disable (vlib_main_t * vm) void session_node_enable_disable (u8 is_en) { + u8 mstate = is_en ? VLIB_NODE_STATE_INTERRUPT : VLIB_NODE_STATE_DISABLED; u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED; - vlib_thread_main_t *vtm = vlib_get_thread_main (); - u8 have_workers = vtm->n_threads != 0; - - /* *INDENT-OFF* */ - foreach_vlib_main (({ - if (have_workers && ii == 0) - { - if (is_en) - { - vlib_node_set_state (this_vlib_main, - session_queue_process_node.index, state); - vlib_node_t *n = vlib_get_node (this_vlib_main, - session_queue_process_node.index); - vlib_start_process (this_vlib_main, n->runtime_index); - } - else - { - vlib_process_signal_event_mt (this_vlib_main, - session_queue_process_node.index, - SESSION_Q_PROCESS_STOP, 0); - } - if (!session_main.poll_main) - continue; - } - vlib_node_set_state (this_vlib_main, session_queue_node.index, - state); - })); - /* *INDENT-ON* */ + session_main_t *sm = &session_main; + vlib_main_t *vm; + vlib_node_t *n; + int n_vlibs, i; + + n_vlibs = vlib_get_n_threads (); + for (i = 0; i < n_vlibs; i++) + { + vm = vlib_get_main_by_index (i); + /* main thread with workers and not polling */ + if (i == 0 && n_vlibs > 1) + { + vlib_node_set_state (vm, session_queue_node.index, mstate); + if (is_en) + { + vlib_node_set_state (vm, session_queue_process_node.index, + state); + n = vlib_get_node (vm, session_queue_process_node.index); + vlib_start_process (vm, n->runtime_index); + } + else + { + vlib_process_signal_event_mt (vm, + session_queue_process_node.index, + SESSION_Q_PROCESS_STOP, 0); + } + if (!sm->poll_main) + continue; + } + vlib_node_set_state (vm, session_queue_node.index, state); + } + + if (sm->use_private_rx_mqs) + application_enable_rx_mqs_nodes (is_en); } clib_error_t * @@ -1821,6 +1835,8 @@ session_main_init (vlib_main_t * vm) smm->is_enabled = 0; smm->session_enable_asap = 0; smm->poll_main = 0; + smm->use_private_rx_mqs = 0; + smm->no_adaptive = 0; smm->session_baseva = HIGH_SEGMENT_BASEVA; #if (HIGH_SEGMENT_BASEVA > (4ULL << 30)) @@ -1831,7 +1847,7 @@ session_main_init (vlib_main_t * vm) smm->evt_qs_segment_size = 1 << 20; #endif - smm->last_transport_proto_type = TRANSPORT_PROTO_QUIC; + smm->last_transport_proto_type = TRANSPORT_PROTO_DTLS; return 0; } @@ -1940,6 +1956,10 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input) appns_sapi_enable (); else if (unformat (input, "poll-main")) smm->poll_main = 1; + else if (unformat (input, "use-private-rx-mqs")) + smm->use_private_rx_mqs = 1; + else if (unformat (input, "no-adaptive")) + smm->no_adaptive = 1; else return clib_error_return (0, "unknown input `%U'", format_unformat_error, input);