X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fsession.c;h=d27d1bbab237192628233136cd201608b062f44f;hb=61ae056bd;hp=bd69f752de4677b803d694e18f33a3d14c34846a;hpb=1a8c43778ffbeae4b9084328d07cbc440abb83b2;p=vpp.git diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index bd69f752de4..d27d1bbab23 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -18,7 +18,6 @@ */ #include -#include #include #include #include @@ -283,6 +282,20 @@ session_delete (session_t * s) session_free_w_fifos (s); } +void +session_cleanup_half_open (transport_proto_t tp, session_handle_t ho_handle) +{ + transport_cleanup_half_open (tp, session_handle_index (ho_handle)); +} + +void +session_half_open_delete_notify (transport_proto_t tp, + session_handle_t ho_handle) +{ + app_worker_t *app_wrk = app_worker_get (session_handle_data (ho_handle)); + app_worker_del_half_open (app_wrk, tp, ho_handle); +} + session_t * session_alloc_for_connection (transport_connection_t * tc) { @@ -611,6 +624,11 @@ session_enqueue_notify_inline (session_t * s) SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo)); s->flags &= ~SESSION_F_RX_EVT; + + /* Application didn't confirm accept yet */ + if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING)) + return 0; + if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX))) return -1; @@ -735,14 +753,14 @@ session_main_flush_all_enqueue_events (u8 transport_proto) return errors; } -static inline int -session_stream_connect_notify_inline (transport_connection_t * tc, u8 is_fail, - session_state_t opened_state) +int +session_stream_connect_notify (transport_connection_t * tc, + session_error_t err) { + session_handle_t ho_handle, wrk_handle; u32 opaque = 0, new_ti, new_si; app_worker_t *app_wrk; session_t *s = 0; - u64 ho_handle; /* * Find connection handle and cleanup half-open table @@ -758,14 +776,22 @@ session_stream_connect_notify_inline (transport_connection_t * tc, u8 is_fail, /* Get the app's index from the handle we stored when opening connection * and the opaque (api_context for external apps) from transport session * index */ - app_wrk = app_worker_get_if_valid (ho_handle >> 32); + app_wrk = app_worker_get_if_valid (session_handle_data (ho_handle)); if (!app_wrk) return -1; - opaque = tc->s_index; + wrk_handle = app_worker_lookup_half_open (app_wrk, tc->proto, ho_handle); + if (wrk_handle == SESSION_INVALID_HANDLE) + return -1; + + /* Make sure this is the same half-open index */ + if (session_handle_index (wrk_handle) != session_handle_index (ho_handle)) + return -1; + + opaque = session_handle_data (wrk_handle); - if (is_fail) - return app_worker_connect_notify (app_wrk, s, opaque); + if (err) + return app_worker_connect_notify (app_wrk, s, err, opaque); s = session_alloc_for_connection (tc); s->session_state = SESSION_STATE_CONNECTING; @@ -773,39 +799,53 @@ session_stream_connect_notify_inline (transport_connection_t * tc, u8 is_fail, new_si = s->session_index; new_ti = s->thread_index; - if (app_worker_init_connected (app_wrk, s)) + if ((err = app_worker_init_connected (app_wrk, s))) { session_free (s); - app_worker_connect_notify (app_wrk, 0, opaque); + app_worker_connect_notify (app_wrk, 0, err, opaque); return -1; } s = session_get (new_si, new_ti); - s->session_state = opened_state; + s->session_state = SESSION_STATE_READY; session_lookup_add_connection (tc, session_handle (s)); - if (app_worker_connect_notify (app_wrk, s, opaque)) + if (app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque)) { + session_lookup_del_connection (tc); + /* Avoid notifying app about rejected session cleanup */ s = session_get (new_si, new_ti); - session_free_w_fifos (s); + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); return -1; } return 0; } -int -session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) +static void +session_switch_pool_reply (void *arg) { - return session_stream_connect_notify_inline (tc, is_fail, - SESSION_STATE_READY); -} + u32 session_index = pointer_to_uword (arg); + segment_manager_t *sm; + app_worker_t *app_wrk; + session_t *s; -int -session_ho_stream_connect_notify (transport_connection_t * tc, u8 is_fail) -{ - return session_stream_connect_notify_inline (tc, is_fail, - SESSION_STATE_OPENED); + s = session_get_if_valid (session_index, vlib_get_thread_index ()); + if (!s) + return; + + app_wrk = app_worker_get_if_valid (s->app_wrk_index); + if (!app_wrk) + return; + + /* Attach fifos to the right session and segment slice */ + sm = app_worker_get_connect_segment_manager (app_wrk); + segment_manager_attach_fifo (sm, s->rx_fifo, s); + segment_manager_attach_fifo (sm, s->tx_fifo, s); + + /* Notify app that it has data on the new session */ + session_enqueue_notify (s); } typedef struct _session_switch_pool_args @@ -823,28 +863,38 @@ static void session_switch_pool (void *cb_args) { session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args; + session_handle_t new_sh; + segment_manager_t *sm; app_worker_t *app_wrk; session_t *s; + void *rargs; ASSERT (args->thread_index == vlib_get_thread_index ()); s = session_get (args->session_index, args->thread_index); - s->tx_fifo->master_session_index = args->new_session_index; - s->tx_fifo->master_thread_index = args->new_thread_index; + transport_cleanup (session_get_transport_proto (s), s->connection_index, s->thread_index); + new_sh = session_make_handle (args->new_session_index, + args->new_thread_index); + app_wrk = app_worker_get_if_valid (s->app_wrk_index); if (app_wrk) { - session_handle_t new_sh; - new_sh = session_make_handle (args->new_session_index, - args->new_thread_index); - app_worker_migrate_notify (app_wrk, s, new_sh); + /* Cleanup fifo segment slice state for fifos */ + sm = app_worker_get_connect_segment_manager (app_wrk); + segment_manager_detach_fifo (sm, s->rx_fifo); + segment_manager_detach_fifo (sm, s->tx_fifo); - /* Trigger app read on the new thread */ - session_enqueue_notify_thread (new_sh); + /* Notify app, using old session, about the migration event */ + app_worker_migrate_notify (app_wrk, s, new_sh); } + /* Trigger app read and fifo updates on the new thread */ + rargs = uword_to_pointer (args->new_session_index, void *); + session_send_rpc_evt_to_thread (args->new_thread_index, + session_switch_pool_reply, rargs); + session_free (s); clib_mem_free (cb_args); } @@ -864,10 +914,9 @@ session_dgram_connect_notify (transport_connection_t * tc, */ new_s = session_clone_safe (tc->s_index, old_thread_index); new_s->connection_index = tc->c_index; - new_s->rx_fifo->master_session_index = new_s->session_index; - new_s->rx_fifo->master_thread_index = new_s->thread_index; new_s->session_state = SESSION_STATE_READY; new_s->flags |= SESSION_F_IS_MIGRATING; + session_lookup_add_connection (tc, session_handle (new_s)); /* @@ -1069,7 +1118,10 @@ session_stream_accept (transport_connection_t * tc, u32 listener_index, s->session_state = SESSION_STATE_CREATED; if ((rv = app_worker_init_accepted (s))) - return rv; + { + session_free (s); + return rv; + } session_lookup_add_connection (tc, session_handle (s)); @@ -1077,9 +1129,46 @@ session_stream_accept (transport_connection_t * tc, u32 listener_index, if (notify) { app_worker_t *app_wrk = app_worker_get (s->app_wrk_index); - return app_worker_accept_notify (app_wrk, s); + if ((rv = app_worker_accept_notify (app_wrk, s))) + { + session_lookup_del_session (s); + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); + return rv; + } + } + + return 0; +} + +int +session_dgram_accept (transport_connection_t * tc, u32 listener_index, + u32 thread_index) +{ + app_worker_t *app_wrk; + session_t *s; + int rv; + + s = session_alloc_for_connection (tc); + s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index; + + if ((rv = app_worker_init_accepted (s))) + { + session_free (s); + return rv; + } + + app_wrk = app_worker_get (s->app_wrk_index); + if ((rv = app_worker_accept_notify (app_wrk, s))) + { + segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo); + session_free (s); + return rv; } + s->session_state = SESSION_STATE_READY; + session_lookup_add_connection (tc, session_handle (s)); + return 0; } @@ -1098,7 +1187,7 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) if (rv < 0) { SESSION_DBG ("Transport failed to open connection."); - return VNET_API_ERROR_SESSION_CONNECT; + return rv; } tc = transport_get_half_open (rmt->transport_proto, (u32) rv); @@ -1116,7 +1205,7 @@ session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) sh = session_handle (s); session_lookup_add_connection (tc, sh); - return app_worker_connect_notify (app_wrk, s, opaque); + return app_worker_connect_notify (app_wrk, s, SESSION_E_NONE, opaque); } int @@ -1124,7 +1213,7 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; transport_endpoint_cfg_t *tep; - u64 handle; + u64 handle, wrk_handle; int rv; tep = session_endpoint_to_transport_cfg (rmt); @@ -1132,7 +1221,7 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) if (rv < 0) { SESSION_DBG ("Transport failed to open connection."); - return VNET_API_ERROR_SESSION_CONNECT; + return rv; } tc = transport_get_half_open (rmt->transport_proto, (u32) rv); @@ -1144,15 +1233,21 @@ session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque) * is needed when the connect notify comes and we have to notify the * external app */ - handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index; + handle = session_make_handle (tc->c_index, app_wrk_index); session_lookup_add_half_open (tc, handle); - /* Store api_context (opaque) for when the reply comes. Not the nicest - * thing but better than allocating a separate half-open pool. + /* Store the half-open handle in the connection. Transport will use it + * when cleaning up @ref session_half_open_delete_notify */ - tc->s_index = opaque; - if (transport_half_open_has_fifos (rmt->transport_proto)) - return session_ho_stream_connect_notify (tc, 0 /* is_fail */ ); + tc->s_ho_handle = handle; + + /* Track the half-open connections in case we want to forcefully + * clean them up @ref session_cleanup_half_open + */ + wrk_handle = session_make_handle (tc->c_index, opaque); + app_worker_add_half_open (app_worker_get (app_wrk_index), + rmt->transport_proto, handle, wrk_handle); + return 0; } @@ -1211,7 +1306,8 @@ int session_listen (session_t * ls, session_endpoint_cfg_t * sep) { transport_endpoint_t *tep; - u32 tc_index, s_index; + int tc_index; + u32 s_index; /* Transport bind/listen */ tep = session_endpoint_to_transport (sep); @@ -1219,8 +1315,8 @@ session_listen (session_t * ls, session_endpoint_cfg_t * sep) tc_index = transport_start_listen (session_get_transport_proto (ls), s_index, tep); - if (tc_index == (u32) ~ 0) - return -1; + if (tc_index < 0) + return tc_index; /* Attach transport to session. Lookup tables are populated by the app * worker because local tables (for ct sessions) are not backed by a fib */ @@ -1242,14 +1338,17 @@ session_stop_listen (session_t * s) transport_connection_t *tc; if (s->session_state != SESSION_STATE_LISTENING) - return -1; + return SESSION_E_NOLISTEN; tc = transport_get_listener (tp, s->connection_index); + + /* If no transport, assume everything was cleaned up already */ if (!tc) - return VNET_API_ERROR_ADDRESS_NOT_IN_USE; + return SESSION_E_NONE; if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP)) session_lookup_del_connection (tc); + transport_stop_listen (tp, s->connection_index); return 0; } @@ -1469,12 +1568,6 @@ static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = { }; /* *INDENT-ON* */ -/** - * Initialize session layer for given transport proto and ip version - * - * Allocates per session type (transport proto + ip version) data structures - * and adds arc from session queue node to session type output node. - */ void session_register_transport (transport_proto_t transport_proto, const transport_proto_vft_t * vft, u8 is_ip4, @@ -1505,6 +1598,24 @@ session_register_transport (transport_proto_t transport_proto, session_tx_fns[vft->transport_options.tx_type]; } +transport_proto_t +session_add_transport_proto (void) +{ + session_main_t *smm = &session_main; + session_worker_t *wrk; + u32 thread; + + smm->last_transport_proto_type += 1; + + for (thread = 0; thread < vec_len (smm->wrk); thread++) + { + wrk = session_main_get_worker (thread); + vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type); + } + + return smm->last_transport_proto_type; +} + transport_connection_t * session_get_transport (session_t * s) { @@ -1536,11 +1647,11 @@ listen_session_get_transport (session_t * s) } void -session_flush_frames_main_thread (vlib_main_t * vm) +session_queue_run_on_main_thread (vlib_main_t * vm) { ASSERT (vlib_get_thread_index () == 0); vlib_process_signal_event_mt (vm, session_queue_process_node.index, - SESSION_Q_PROCESS_FLUSH_FRAMES, 0); + SESSION_Q_PROCESS_RUN_ON_MAIN, 0); } static clib_error_t * @@ -1568,8 +1679,9 @@ session_manager_main_enable (vlib_main_t * vm) wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->vm = vlib_mains[i]; - wrk->last_vlib_time = vlib_time_now (vlib_mains[i]); + wrk->last_vlib_time = vlib_time_now (vm); wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ; + vec_validate (wrk->session_to_enqueue, smm->last_transport_proto_type); if (num_threads > 1) clib_rwlock_init (&smm->wrk[i].peekers_rw_locks); @@ -1613,6 +1725,8 @@ session_manager_main_enable (vlib_main_t * vm) /* Enable transports */ transport_enable_disable (vm, 1); + session_debug_init (); + return 0; } @@ -1672,10 +1786,14 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en) } clib_error_t * -session_manager_main_init (vlib_main_t * vm) +session_main_init (vlib_main_t * vm) { session_main_t *smm = &session_main; + + smm->is_enabled = 0; + smm->session_enable_asap = 0; smm->session_baseva = HIGH_SEGMENT_BASEVA; + #if (HIGH_SEGMENT_BASEVA > (4ULL << 30)) smm->session_va_space_size = 128ULL << 30; smm->evt_qs_segment_size = 64 << 20; @@ -1683,13 +1801,14 @@ session_manager_main_init (vlib_main_t * vm) smm->session_va_space_size = 128 << 20; smm->evt_qs_segment_size = 1 << 20; #endif - smm->is_enabled = 0; - smm->session_enable_asap = 0; + + smm->last_transport_proto_type = TRANSPORT_PROTO_QUIC; + return 0; } static clib_error_t * -session_main_init (vlib_main_t * vm) +session_main_loop_init (vlib_main_t * vm) { session_main_t *smm = &session_main; if (smm->session_enable_asap) @@ -1701,8 +1820,8 @@ session_main_init (vlib_main_t * vm) return 0; } -VLIB_INIT_FUNCTION (session_manager_main_init); -VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_init); +VLIB_INIT_FUNCTION (session_main_init); +VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_loop_init); static clib_error_t * session_config_fn (vlib_main_t * vm, unformat_input_t * input) @@ -1785,6 +1904,8 @@ session_config_fn (vlib_main_t * vm, unformat_input_t * input) ; else if (unformat (input, "enable")) smm->session_enable_asap = 1; + else if (unformat (input, "use-app-socket-api")) + appns_sapi_enable (); else return clib_error_return (0, "unknown input `%U'", format_unformat_error, input);