X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fapplication.c;h=9020d1c286444346baa22f5116998302a13fc677;hb=f8f516a8b0ccab2f5d9796f90419bf2661c750af;hp=71fc93f960f09f6aa966f67715227c9dca33ec41;hpb=7758bf68a03a32f17c07154172157f5bdf30e684;p=vpp.git diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 71fc93f960f..9020d1c2864 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -28,11 +28,6 @@ static application_t *app_pool; */ static uword *app_by_api_client_index; -/** - * Default application event queue size - */ -static u32 default_app_evt_queue_size = 128; - static u8 * app_get_name_from_reg_index (application_t * app) { @@ -138,6 +133,7 @@ application_new () app->index = application_get_index (app); app->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; app->first_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; + app->local_segment_manager = APP_INVALID_SEGMENT_MANAGER_INDEX; if (CLIB_DEBUG > 1) clib_warning ("[%d] New app (%d)", getpid (), app->index); return app; @@ -147,8 +143,8 @@ void application_del (application_t * app) { vnet_unbind_args_t _a, *a = &_a; - segment_manager_t *sm; u64 handle, *handles = 0; + segment_manager_t *sm; u32 index; int i; @@ -207,6 +203,12 @@ application_del (application_t * app) segment_manager_del (sm); } } + + /* + * Local connections cleanup + */ + application_local_sessions_del (app); + application_table_del (app); pool_put (app_pool, app); } @@ -255,8 +257,8 @@ int application_init (application_t * app, u32 api_client_index, u64 * options, session_cb_vft_t * cb_fns) { - u32 app_evt_queue_size, first_seg_size, prealloc_fifo_pairs; ssvm_segment_type_t seg_type = SSVM_SEGMENT_MEMFD; + u32 first_seg_size, prealloc_fifo_pairs; segment_manager_properties_t *props; vl_api_registration_t *reg; segment_manager_t *sm; @@ -298,15 +300,14 @@ application_init (application_t * app, u32 api_client_index, u64 * options, props->rx_fifo_size = options[APP_OPTIONS_RX_FIFO_SIZE]; if (options[APP_OPTIONS_TX_FIFO_SIZE]) props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE]; + if (options[APP_OPTIONS_EVT_QUEUE_SIZE]) + props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE]; props->segment_type = seg_type; - app_evt_queue_size = options[APP_OPTIONS_EVT_QUEUE_SIZE] > 0 ? - options[APP_OPTIONS_EVT_QUEUE_SIZE] : default_app_evt_queue_size; first_seg_size = options[APP_OPTIONS_SEGMENT_SIZE]; prealloc_fifo_pairs = options[APP_OPTIONS_PREALLOC_FIFO_PAIRS]; - if ((rv = segment_manager_init (sm, first_seg_size, app_evt_queue_size, - prealloc_fifo_pairs))) + if ((rv = segment_manager_init (sm, first_seg_size, prealloc_fifo_pairs))) return rv; sm->first_is_protected = 1; @@ -319,6 +320,7 @@ application_init (application_t * app, u32 api_client_index, u64 * options, app->cb_fns = *cb_fns; app->ns_index = options[APP_OPTIONS_NAMESPACE]; app->listeners_table = hash_create (0, sizeof (u64)); + app->local_connects = hash_create (0, sizeof (u64)); app->proxied_transports = options[APP_OPTIONS_PROXY_TRANSPORT]; app->event_queue = segment_manager_event_queue (sm); @@ -333,6 +335,13 @@ application_init (application_t * app, u32 api_client_index, u64 * options, /* Add app to lookup by api_client_index table */ application_table_add (app); + /* + * Segment manager for local sessions + */ + sm = segment_manager_new (); + sm->app_index = app->index; + app->local_segment_manager = segment_manager_index (sm); + return 0; } @@ -388,11 +397,11 @@ application_alloc_segment_manager (application_t * app) */ int application_start_listen (application_t * srv, session_endpoint_t * sep, - u64 * res) + session_handle_t * res) { segment_manager_t *sm; stream_session_t *s; - u64 handle; + session_handle_t handle; session_type_t sst; sst = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4); @@ -425,7 +434,7 @@ err: * Stop listening on session associated to handle */ int -application_stop_listen (application_t * srv, u64 handle) +application_stop_listen (application_t * srv, session_handle_t handle) { stream_session_t *listener; uword *indexp; @@ -499,6 +508,26 @@ application_get_listen_segment_manager (application_t * app, return segment_manager_get (*smp); } +segment_manager_t * +application_get_local_segment_manager (application_t * app) +{ + return segment_manager_get (app->local_segment_manager); +} + +segment_manager_t * +application_get_local_segment_manager_w_session (application_t * app, + local_session_t * ls) +{ + stream_session_t *listener; + if (application_local_session_listener_has_transport (ls)) + { + listener = listen_session_get (ls->listener_session_type, + ls->listener_index); + return application_get_listen_segment_manager (app, listener); + } + return segment_manager_get (app->local_segment_manager); +} + int application_is_proxy (application_t * app) { @@ -731,12 +760,407 @@ application_get_segment_manager_properties (u32 app_index) return &app->sm_properties; } +local_session_t * +application_alloc_local_session (application_t * app) +{ + local_session_t *s; + pool_get (app->local_sessions, s); + memset (s, 0, sizeof (*s)); + s->app_index = app->index; + s->session_index = s - app->local_sessions; + s->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); + return s; +} + +void +application_free_local_session (application_t * app, local_session_t * s) +{ + pool_put (app->local_sessions, s); + if (CLIB_DEBUG) + memset (s, 0xfc, sizeof (*s)); +} + +local_session_t * +application_get_local_session (application_t * app, u32 session_index) +{ + return pool_elt_at_index (app->local_sessions, session_index); +} + +local_session_t * +application_get_local_session_from_handle (session_handle_t handle) +{ + application_t *server; + u32 session_index, server_index; + local_session_parse_handle (handle, &server_index, &session_index); + server = application_get (server_index); + return application_get_local_session (server, session_index); +} + +always_inline void +application_local_listener_session_endpoint (local_session_t * ll, + session_endpoint_t * sep) +{ + sep->transport_proto = + session_type_transport_proto (ll->listener_session_type); + sep->port = ll->port; + sep->is_ip4 = ll->listener_session_type & 1; +} + +int +application_start_local_listen (application_t * server, + session_endpoint_t * sep, + session_handle_t * handle) +{ + session_handle_t lh; + local_session_t *ll; + u32 table_index; + + table_index = application_local_session_table (server); + + /* An exact sep match, as opposed to session_lookup_local_listener */ + lh = session_lookup_endpoint_listener (table_index, sep, 1); + if (lh != SESSION_INVALID_HANDLE) + return VNET_API_ERROR_ADDRESS_IN_USE; + + pool_get (server->local_listen_sessions, ll); + memset (ll, 0, sizeof (*ll)); + ll->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0); + ll->app_index = server->index; + ll->session_index = ll - server->local_listen_sessions; + ll->port = sep->port; + /* Store the original session type for the unbind */ + ll->listener_session_type = + session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4); + + *handle = application_local_session_handle (ll); + session_lookup_add_session_endpoint (table_index, sep, *handle); + + return 0; +} + +/** + * Clean up local session table. If we have a listener session use it to + * find the port and proto. If not, the handle must be a local table handle + * so parse it. + */ +int +application_stop_local_listen (application_t * server, session_handle_t lh) +{ + session_endpoint_t sep = SESSION_ENDPOINT_NULL; + u32 table_index, ll_index, server_index; + stream_session_t *sl = 0; + local_session_t *ll, *ls; + + table_index = application_local_session_table (server); + + /* We have both local and global table binds. Figure from global what + * the sep we should be cleaning up is. + */ + if (!session_handle_is_local (lh)) + { + sl = listen_session_get_from_handle (lh); + if (!sl || listen_session_get_local_session_endpoint (sl, &sep)) + { + clib_warning ("broken listener"); + return -1; + } + lh = session_lookup_endpoint_listener (table_index, &sep, 0); + if (lh == SESSION_INVALID_HANDLE) + return -1; + } + + local_session_parse_handle (lh, &server_index, &ll_index); + ASSERT (server->index == server_index); + if (!(ll = application_get_local_listen_session (server, ll_index))) + { + clib_warning ("no local listener"); + return -1; + } + application_local_listener_session_endpoint (ll, &sep); + session_lookup_del_session_endpoint (table_index, &sep); + + /* *INDENT-OFF* */ + pool_foreach (ls, server->local_sessions, ({ + if (ls->listener_index == ll->session_index) + application_local_session_disconnect (server->index, ls); + })); + /* *INDENT-ON* */ + pool_put_index (server->local_listen_sessions, ll->session_index); + + return 0; +} + +int +application_local_session_connect (u32 table_index, application_t * client, + application_t * server, + local_session_t * ll, u32 opaque) +{ + u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10; + segment_manager_properties_t *props, *cprops; + int rv, has_transport, seg_index; + svm_fifo_segment_private_t *seg; + segment_manager_t *sm; + local_session_t *ls; + svm_queue_t *sq, *cq; + + ls = application_alloc_local_session (server); + + props = application_segment_manager_properties (server); + cprops = application_segment_manager_properties (client); + evt_q_elts = props->evt_q_size + cprops->evt_q_size; + evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t); + seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin; + + has_transport = session_has_transport ((stream_session_t *) ll); + if (!has_transport) + { + /* Local sessions don't have backing transport */ + ls->port = ll->port; + sm = application_get_local_segment_manager (server); + } + else + { + stream_session_t *sl = (stream_session_t *) ll; + transport_connection_t *tc; + tc = listen_session_get_transport (sl); + ls->port = tc->lcl_port; + sm = application_get_listen_segment_manager (server, sl); + } + + seg_index = segment_manager_add_segment (sm, seg_size); + if (seg_index < 0) + { + clib_warning ("failed to add new cut-through segment"); + return seg_index; + } + seg = segment_manager_get_segment_w_lock (sm, seg_index); + sq = segment_manager_alloc_queue (seg, props->evt_q_size); + cq = segment_manager_alloc_queue (seg, cprops->evt_q_size); + ls->server_evt_q = pointer_to_uword (sq); + ls->client_evt_q = pointer_to_uword (cq); + rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size, + props->tx_fifo_size, + &ls->server_rx_fifo, + &ls->server_tx_fifo); + if (rv) + { + clib_warning ("failed to add fifos in cut-through segment"); + segment_manager_segment_reader_unlock (sm); + goto failed; + } + ls->server_rx_fifo->master_session_index = ls->session_index; + ls->server_tx_fifo->master_session_index = ls->session_index; + ls->server_rx_fifo->master_thread_index = ~0; + ls->server_tx_fifo->master_thread_index = ~0; + ls->svm_segment_index = seg_index; + ls->listener_index = ll->session_index; + ls->client_index = client->index; + ls->client_opaque = opaque; + ls->listener_session_type = ll->session_type; + + if ((rv = server->cb_fns.add_segment_callback (server->api_client_index, + &seg->ssvm))) + { + clib_warning ("failed to notify server of new segment"); + segment_manager_segment_reader_unlock (sm); + goto failed; + } + segment_manager_segment_reader_unlock (sm); + if ((rv = server->cb_fns.session_accept_callback ((stream_session_t *) ls))) + { + clib_warning ("failed to send accept cut-through notify to server"); + goto failed; + } + if (server->flags & APP_OPTIONS_FLAGS_IS_BUILTIN) + application_local_session_connect_notify (ls); + + return 0; + +failed: + if (!has_transport) + segment_manager_del_segment (sm, seg); + return rv; +} + +static uword +application_client_local_connect_key (local_session_t * ls) +{ + return ((uword) ls->app_index << 32 | (uword) ls->session_index); +} + +static void +application_client_local_connect_key_parse (uword key, u32 * app_index, + u32 * session_index) +{ + *app_index = key >> 32; + *session_index = key & 0xFFFFFFFF; +} + +int +application_local_session_connect_notify (local_session_t * ls) +{ + svm_fifo_segment_private_t *seg; + application_t *client, *server; + segment_manager_t *sm; + int rv, is_fail = 0; + uword client_key; + + client = application_get (ls->client_index); + server = application_get (ls->app_index); + sm = application_get_local_segment_manager_w_session (server, ls); + seg = segment_manager_get_segment_w_lock (sm, ls->svm_segment_index); + if ((rv = client->cb_fns.add_segment_callback (client->api_client_index, + &seg->ssvm))) + { + clib_warning ("failed to notify client %u of new segment", + ls->client_index); + segment_manager_segment_reader_unlock (sm); + application_local_session_disconnect (ls->client_index, ls); + is_fail = 1; + } + else + { + segment_manager_segment_reader_unlock (sm); + } + + client->cb_fns.session_connected_callback (client->index, ls->client_opaque, + (stream_session_t *) ls, + is_fail); + + client_key = application_client_local_connect_key (ls); + hash_set (client->local_connects, client_key, client_key); + return 0; +} + +int +application_local_session_disconnect (u32 app_index, local_session_t * ls) +{ + svm_fifo_segment_private_t *seg; + application_t *client, *server; + segment_manager_t *sm; + uword client_key; + + client = application_get_if_valid (ls->client_index); + server = application_get (ls->app_index); + + if (ls->session_state == SESSION_STATE_CLOSED) + { + cleanup: + client_key = application_client_local_connect_key (ls); + sm = application_get_local_segment_manager_w_session (server, ls); + seg = segment_manager_get_segment (sm, ls->svm_segment_index); + + if (client) + { + hash_unset (client->local_connects, client_key); + client->cb_fns.del_segment_callback (client->api_client_index, + &seg->ssvm); + } + + server->cb_fns.del_segment_callback (server->api_client_index, + &seg->ssvm); + segment_manager_del_segment (sm, seg); + application_free_local_session (server, ls); + return 0; + } + + if (app_index == ls->client_index) + { + send_local_session_disconnect_callback (ls->app_index, ls); + } + else + { + if (!client) + { + goto cleanup; + } + else if (ls->session_state < SESSION_STATE_READY) + { + client->cb_fns.session_connected_callback (client->index, + ls->client_opaque, + (stream_session_t *) ls, + 1 /* is_fail */ ); + ls->session_state = SESSION_STATE_CLOSED; + goto cleanup; + } + else + { + send_local_session_disconnect_callback (ls->client_index, ls); + } + } + + ls->session_state = SESSION_STATE_CLOSED; + + return 0; +} + +void +application_local_sessions_del (application_t * app) +{ + u32 index, server_index, session_index, table_index; + segment_manager_t *sm; + u64 handle, *handles = 0; + local_session_t *ls, *ll; + application_t *server; + session_endpoint_t sep; + int i; + + /* + * Local listens. Don't bother with local sessions, we clean them lower + */ + table_index = application_local_session_table (app); + /* *INDENT-OFF* */ + pool_foreach (ll, app->local_listen_sessions, ({ + application_local_listener_session_endpoint (ll, &sep); + session_lookup_del_session_endpoint (table_index, &sep); + })); + /* *INDENT-ON* */ + + /* + * Local sessions + */ + if (app->local_sessions) + { + /* *INDENT-OFF* */ + pool_foreach (ls, app->local_sessions, ({ + application_local_session_disconnect (app->index, ls); + })); + /* *INDENT-ON* */ + } + + /* + * Local connects + */ + vec_reset_length (handles); + /* *INDENT-OFF* */ + hash_foreach (handle, index, app->local_connects, ({ + vec_add1 (handles, handle); + })); + /* *INDENT-ON* */ + + for (i = 0; i < vec_len (handles); i++) + { + application_client_local_connect_key_parse (handles[i], &server_index, + &session_index); + server = application_get_if_valid (server_index); + if (server) + { + ls = application_get_local_session (server, session_index); + application_local_session_disconnect (app->index, ls); + } + } + + sm = segment_manager_get (app->local_segment_manager); + sm->app_index = SEGMENT_MANAGER_INVALID_APP_INDEX; + segment_manager_del (sm); +} + u8 * format_application_listener (u8 * s, va_list * args) { application_t *app = va_arg (*args, application_t *); u64 handle = va_arg (*args, u64); - u32 index = va_arg (*args, u32); + u32 sm_index = va_arg (*args, u32); int verbose = va_arg (*args, int); stream_session_t *listener; u8 *app_name, *str; @@ -759,7 +1183,7 @@ format_application_listener (u8 * s, va_list * args) if (verbose) { s = format (s, "%-40s%-20s%-15u%-15u%-10u", str, app_name, - app->api_client_index, handle, index); + app->api_client_index, handle, sm_index); } else s = format (s, "%-40s%-20s", str, app_name); @@ -832,6 +1256,76 @@ application_format_connects (application_t * app, int verbose) vec_free (app_name); } +void +application_format_local_sessions (application_t * app, int verbose) +{ + vlib_main_t *vm = vlib_get_main (); + local_session_t *ls; + transport_proto_t tp; + u8 *conn = 0; + + /* Header */ + if (app == 0) + { + vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "ServerApp", + "ClientApp"); + return; + } + + /* *INDENT-OFF* */ + pool_foreach (ls, app->local_listen_sessions, ({ + tp = session_type_transport_proto(ls->listener_session_type); + conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, + ls->port); + vlib_cli_output (vm, "%-40v%-15u%-20s", conn, ls->app_index, "*"); + vec_reset_length (conn); + })); + pool_foreach (ls, app->local_sessions, ({ + tp = session_type_transport_proto(ls->listener_session_type); + conn = format (0, "[L][%U] *:%u", format_transport_proto_short, tp, + ls->port); + vlib_cli_output (vm, "%-40v%-15u%-20u", conn, ls->app_index, + ls->client_index); + vec_reset_length (conn); + })); + /* *INDENT-ON* */ + + vec_free (conn); +} + +void +application_format_local_connects (application_t * app, int verbose) +{ + vlib_main_t *vm = vlib_get_main (); + u32 app_index, session_index; + application_t *server; + local_session_t *ls; + uword client_key; + u64 value; + + /* Header */ + if (app == 0) + { + if (verbose) + vlib_cli_output (vm, "%-40s%-15s%-20s%-10s", "Connection", "App", + "Peer App", "SegManager"); + else + vlib_cli_output (vm, "%-40s%-15s%-20s", "Connection", "App", + "Peer App"); + return; + } + + /* *INDENT-OFF* */ + hash_foreach (client_key, value, app->local_connects, ({ + application_client_local_connect_key_parse (client_key, &app_index, + &session_index); + server = application_get (app_index); + ls = application_get_local_session (server, session_index); + vlib_cli_output (vm, "%-40s%-15s%-20s", "TODO", ls->app_index, ls->client_index); + })); + /* *INDENT-ON* */ +} + u8 * format_application (u8 * s, va_list * args) { @@ -869,13 +1363,92 @@ format_application (u8 * s, va_list * args) return s; } + +void +application_format_all_listeners (vlib_main_t * vm, int do_local, int verbose) +{ + application_t *app; + u32 sm_index; + u64 handle; + + if (!pool_elts (app_pool)) + { + vlib_cli_output (vm, "No active server bindings"); + return; + } + + if (do_local) + { + application_format_local_sessions (0, verbose); + /* *INDENT-OFF* */ + pool_foreach (app, app_pool, ({ + if (!pool_elts (app->local_sessions) + && !pool_elts(app->local_connects)) + continue; + application_format_local_sessions (app, verbose); + })); + /* *INDENT-ON* */ + } + else + { + vlib_cli_output (vm, "%U", format_application_listener, 0 /* header */ , + 0, 0, verbose); + + /* *INDENT-OFF* */ + pool_foreach (app, app_pool, ({ + if (hash_elts (app->listeners_table) == 0) + continue; + hash_foreach (handle, sm_index, app->listeners_table, ({ + vlib_cli_output (vm, "%U", format_application_listener, app, + handle, sm_index, verbose); + })); + })); + /* *INDENT-ON* */ + } +} + +void +application_format_all_clients (vlib_main_t * vm, int do_local, int verbose) +{ + application_t *app; + + if (!pool_elts (app_pool)) + { + vlib_cli_output (vm, "No active apps"); + return; + } + + if (do_local) + { + application_format_local_connects (0, verbose); + + /* *INDENT-OFF* */ + pool_foreach (app, app_pool, ({ + if (app->local_connects) + application_format_local_connects (app, verbose); + })); + /* *INDENT-ON* */ + } + else + { + application_format_connects (0, verbose); + + /* *INDENT-OFF* */ + pool_foreach (app, app_pool, ({ + if (app->connects_seg_manager == (u32)~0) + continue; + application_format_connects (app, verbose); + })); + /* *INDENT-ON* */ + } +} + static clib_error_t * show_app_command_fn (vlib_main_t * vm, unformat_input_t * input, vlib_cli_command_t * cmd) { + int do_server = 0, do_client = 0, do_local = 0; application_t *app; - int do_server = 0; - int do_client = 0; int verbose = 0; session_cli_return_if_not_enabled (); @@ -886,6 +1459,8 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input, do_server = 1; else if (unformat (input, "client")) do_client = 1; + else if (unformat (input, "local")) + do_local = 1; else if (unformat (input, "verbose")) verbose = 1; else @@ -893,49 +1468,10 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input, } if (do_server) - { - u64 handle; - u32 index; - if (pool_elts (app_pool)) - { - vlib_cli_output (vm, "%U", format_application_listener, - 0 /* header */ , 0, 0, verbose); - - /* *INDENT-OFF* */ - pool_foreach (app, app_pool, ({ - /* App's listener sessions */ - if (hash_elts (app->listeners_table) == 0) - continue; - hash_foreach (handle, index, app->listeners_table, ({ - vlib_cli_output (vm, "%U", format_application_listener, app, - handle, index, verbose); - })); - })); - /* *INDENT-ON* */ - - } - else - vlib_cli_output (vm, "No active server bindings"); - } + application_format_all_listeners (vm, do_local, verbose); if (do_client) - { - if (pool_elts (app_pool)) - { - application_format_connects (0, verbose); - - /* *INDENT-OFF* */ - pool_foreach (app, app_pool, - ({ - if (app->connects_seg_manager == (u32)~0) - continue; - application_format_connects (app, verbose); - })); - /* *INDENT-ON* */ - } - else - vlib_cli_output (vm, "No active client bindings"); - } + application_format_all_clients (vm, do_local, verbose); /* Print app related info */ if (!do_server && !do_client)