X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvnet%2Fsession%2Fapplication.c;h=3e127df3bd2eb14c5997204ee4d9fce0cbab4cfd;hb=993683150202254c6ba8dd43e087a7229edd5d4c;hp=14169fab83dfa08262f910281fbbcb2540c4abfb;hpb=58d36f02b45c5af38b7df81fb7976129cad3e05b;p=vpp.git diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 14169fab83d..3e127df3bd2 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -28,6 +28,11 @@ static application_t *app_pool; */ static uword *app_by_api_client_index; +/** + * Hash table of builtin apps by name + */ +static uword *app_by_name; + static u8 * app_get_name_from_reg_index (application_t * app) { @@ -43,6 +48,14 @@ app_get_name_from_reg_index (application_t * app) return app_name; } +static u8 * +app_get_name (application_t * app) +{ + if (!app->name) + return app_get_name_from_reg_index (app); + return app->name; +} + u32 application_session_table (application_t * app, u8 fib_proto) { @@ -104,13 +117,19 @@ application_name_from_index (u32 app_index) static void application_table_add (application_t * app) { - hash_set (app_by_api_client_index, app->api_client_index, app->index); + if (app->api_client_index != APP_INVALID_INDEX) + hash_set (app_by_api_client_index, app->api_client_index, app->index); + else if (app->name) + hash_set_mem (app_by_name, app->name, app->index); } static void application_table_del (application_t * app) { - hash_unset (app_by_api_client_index, app->api_client_index); + if (app->api_client_index != APP_INVALID_INDEX) + hash_unset (app_by_api_client_index, app->api_client_index); + else if (app->name) + hash_unset_mem (app_by_name, app->name); } application_t * @@ -124,6 +143,17 @@ application_lookup (u32 api_client_index) return 0; } +application_t * +application_lookup_name (const u8 * name) +{ + uword *p; + p = hash_get_mem (app_by_name, name); + if (p) + return application_get (p[0]); + + return 0; +} + application_t * application_new () { @@ -213,6 +243,7 @@ application_del (application_t * app) vec_free (app->tls_key); application_table_del (app); + vec_free (app->name); pool_put (app_pool, app); } @@ -257,8 +288,8 @@ application_verify_cfg (ssvm_segment_type_t st) } int -application_init (application_t * app, u32 api_client_index, u64 * options, - session_cb_vft_t * cb_fns) +application_init (application_t * app, u32 api_client_index, u8 * app_name, + u64 * options, session_cb_vft_t * cb_fns) { ssvm_segment_type_t seg_type = SSVM_SEGMENT_MEMFD; u32 first_seg_size, prealloc_fifo_pairs; @@ -281,6 +312,12 @@ application_init (application_t * app, u32 api_client_index, u64 * options, } else { + if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD) + { + clib_warning ("mq eventfds can only be used if socket transport is " + "used for api"); + return VNET_API_ERROR_APP_UNSUPPORTED_CFG; + } seg_type = SSVM_SEGMENT_PRIVATE; } @@ -305,6 +342,8 @@ application_init (application_t * app, u32 api_client_index, u64 * options, props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE]; if (options[APP_OPTIONS_EVT_QUEUE_SIZE]) props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE]; + if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD) + props->use_mq_eventfd = 1; if (options[APP_OPTIONS_TLS_ENGINE]) app->tls_engine = options[APP_OPTIONS_TLS_ENGINE]; props->segment_type = seg_type; @@ -328,6 +367,7 @@ application_init (application_t * app, u32 api_client_index, u64 * options, app->local_connects = hash_create (0, sizeof (u64)); app->proxied_transports = options[APP_OPTIONS_PROXY_TRANSPORT]; app->event_queue = segment_manager_event_queue (sm); + app->name = vec_dup (app_name); /* If no scope enabled, default to global */ if (!application_has_global_scope (app) @@ -410,16 +450,12 @@ application_start_listen (application_t * srv, session_endpoint_t * sep, session_type_t sst; sst = session_type_from_proto_and_ip (sep->transport_proto, sep->is_ip4); - s = listen_session_new (sst); + s = listen_session_new (0, sst); s->app_index = srv->index; - if (stream_session_listen (s, sep)) - goto err; - /* Allocate segment manager. All sessions derived out of a listen session * have fifos allocated by the same segment manager. */ - sm = application_alloc_segment_manager (srv); - if (sm == 0) + if (!(sm = application_alloc_segment_manager (srv))) goto err; /* Add to app's listener table. Useful to find all child listeners @@ -427,6 +463,13 @@ application_start_listen (application_t * srv, session_endpoint_t * sep, handle = listen_session_get_handle (s); hash_set (srv->listeners_table, handle, segment_manager_index (sm)); + if (stream_session_listen (s, sep)) + { + segment_manager_del (sm); + hash_unset (srv->listeners_table, handle); + goto err; + } + *res = handle; return 0; @@ -534,8 +577,7 @@ application_get_local_segment_manager_w_session (application_t * app, stream_session_t *listener; if (application_local_session_listener_has_transport (ls)) { - listener = listen_session_get (ls->listener_session_type, - ls->listener_index); + listener = listen_session_get (ls->listener_index); return application_get_listen_segment_manager (app, listener); } return segment_manager_get (app->local_segment_manager); @@ -773,6 +815,146 @@ application_get_segment_manager_properties (u32 app_index) return &app->sm_properties; } +static inline int +app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock) +{ + if (PREDICT_FALSE (svm_msg_q_is_full (mq))) + { + clib_warning ("evt q full"); + svm_msg_q_free_msg (mq, msg); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + + if (lock) + { + svm_msg_q_add_and_unlock (mq, msg); + return 0; + } + + /* Even when not locking the ring, we must wait for queue mutex */ + if (svm_msg_q_add (mq, msg, SVM_Q_WAIT)) + { + clib_warning ("msg q add returned"); + return -1; + } + return 0; +} + +static inline int +app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock) +{ + session_event_t *evt; + svm_msg_q_msg_t msg; + svm_msg_q_t *mq; + + if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY + && s->session_state != SESSION_STATE_LISTENING)) + { + /* Session is closed so app will never clean up. Flush rx fifo */ + if (s->session_state == SESSION_STATE_CLOSED) + svm_fifo_dequeue_drop_all (s->server_rx_fifo); + return 0; + } + + if (app->cb_fns.builtin_app_rx_callback) + return app->cb_fns.builtin_app_rx_callback (s); + + if (svm_fifo_has_event (s->server_rx_fifo) + || svm_fifo_is_empty (s->server_rx_fifo)) + return 0; + + mq = app->event_queue; + if (lock) + svm_msg_q_lock (mq); + + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + clib_warning ("evt q rings full"); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + ASSERT (!svm_msg_q_msg_is_invalid (&msg)); + + evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->fifo = s->server_rx_fifo; + evt->event_type = FIFO_EVENT_APP_RX; + + if (app_enqueue_evt (mq, &msg, lock)) + return -1; + (void) svm_fifo_set_event (s->server_rx_fifo); + return 0; +} + +static inline int +app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock) +{ + svm_msg_q_t *mq; + session_event_t *evt; + svm_msg_q_msg_t msg; + + if (application_is_builtin (app)) + return 0; + + mq = app->event_queue; + if (lock) + svm_msg_q_lock (mq); + + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))) + { + clib_warning ("evt q rings full"); + if (lock) + svm_msg_q_unlock (mq); + return -1; + } + + msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING); + ASSERT (!svm_msg_q_msg_is_invalid (&msg)); + + evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg); + evt->event_type = FIFO_EVENT_APP_TX; + evt->fifo = s->server_tx_fifo; + + return app_enqueue_evt (mq, &msg, lock); +} + +/* *INDENT-OFF* */ +typedef int (app_send_evt_handler_fn) (application_t *app, + stream_session_t *s, + u8 lock); +static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = { + app_send_io_evt_rx, + 0, + app_send_io_evt_tx, +}; +/* *INDENT-ON* */ + +/** + * Send event to application + * + * Logic from queue perspective is non-blocking. That is, if there's + * not enough space to enqueue a message, we return. However, if the lock + * flag is set, we do wait for queue mutex. + */ +int +application_send_event (application_t * app, stream_session_t * s, + u8 evt_type) +{ + ASSERT (app && evt_type <= FIFO_EVENT_APP_TX); + return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ ); +} + +int +application_lock_and_send_event (application_t * app, stream_session_t * s, + u8 evt_type) +{ + return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ ); +} + local_session_t * application_alloc_local_session (application_t * app) { @@ -796,6 +978,8 @@ application_free_local_session (application_t * app, local_session_t * s) local_session_t * application_get_local_session (application_t * app, u32 session_index) { + if (pool_is_free_index (app->local_sessions, session_index)) + return 0; return pool_elt_at_index (app->local_sessions, session_index); } @@ -904,6 +1088,23 @@ application_stop_local_listen (application_t * server, session_handle_t lh) return 0; } +static void +application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq) +{ + int fd; + + /* + * segment manager initializes only the producer eventds, since vpp is + * typically the producer. But for local sessions, we also pass to the + * apps the mqs they listen on for events from peer apps, so they are also + * consumer fds. + */ + fd = svm_msg_q_get_producer_eventfd (sq); + svm_msg_q_set_consumer_eventfd (sq, fd); + fd = svm_msg_q_get_producer_eventfd (cq); + svm_msg_q_set_consumer_eventfd (cq, fd); +} + int application_local_session_connect (u32 table_index, application_t * client, application_t * server, @@ -911,19 +1112,22 @@ application_local_session_connect (u32 table_index, application_t * client, { u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10; segment_manager_properties_t *props, *cprops; + u32 round_rx_fifo_sz, round_tx_fifo_sz; int rv, has_transport, seg_index; svm_fifo_segment_private_t *seg; segment_manager_t *sm; local_session_t *ls; - svm_queue_t *sq, *cq; + svm_msg_q_t *sq, *cq; ls = application_alloc_local_session (server); props = application_segment_manager_properties (server); cprops = application_segment_manager_properties (client); evt_q_elts = props->evt_q_size + cprops->evt_q_size; - evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t); - seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin; + evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts); + round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size); + round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size); + seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin; has_transport = session_has_transport ((stream_session_t *) ll); if (!has_transport) @@ -948,8 +1152,12 @@ application_local_session_connect (u32 table_index, application_t * client, return seg_index; } seg = segment_manager_get_segment_w_lock (sm, seg_index); - sq = segment_manager_alloc_queue (seg, props->evt_q_size); - cq = segment_manager_alloc_queue (seg, cprops->evt_q_size); + sq = segment_manager_alloc_queue (seg, props); + cq = segment_manager_alloc_queue (seg, cprops); + + if (props->use_mq_eventfd) + application_local_session_fix_eventds (sq, cq); + ls->server_evt_q = pointer_to_uword (sq); ls->client_evt_q = pointer_to_uword (cq); rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size, @@ -1047,46 +1255,62 @@ application_local_session_connect_notify (local_session_t * ls) } int -application_local_session_disconnect (u32 app_index, local_session_t * ls) +application_local_session_cleanup (application_t * client, + application_t * server, + local_session_t * ls) { svm_fifo_segment_private_t *seg; - application_t *client, *server; segment_manager_t *sm; uword client_key; + u8 has_transport; - client = application_get_if_valid (ls->client_index); - server = application_get (ls->app_index); - - if (ls->session_state == SESSION_STATE_CLOSED) - { - cleanup: - client_key = application_client_local_connect_key (ls); - sm = application_get_local_segment_manager_w_session (server, ls); - seg = segment_manager_get_segment (sm, ls->svm_segment_index); + has_transport = session_has_transport ((stream_session_t *) ls); + client_key = application_client_local_connect_key (ls); + if (!has_transport) + sm = application_get_local_segment_manager_w_session (server, ls); + else + sm = application_get_listen_segment_manager (server, + (stream_session_t *) ls); - if (client) - { - hash_unset (client->local_connects, client_key); - client->cb_fns.del_segment_callback (client->api_client_index, - &seg->ssvm); - } + seg = segment_manager_get_segment (sm, ls->svm_segment_index); + if (client) + hash_unset (client->local_connects, client_key); + if (!has_transport) + { server->cb_fns.del_segment_callback (server->api_client_index, &seg->ssvm); + if (client) + client->cb_fns.del_segment_callback (client->api_client_index, + &seg->ssvm); segment_manager_del_segment (sm, seg); - application_free_local_session (server, ls); - return 0; } + application_free_local_session (server, ls); + + return 0; +} + +int +application_local_session_disconnect (u32 app_index, local_session_t * ls) +{ + application_t *client, *server; + + client = application_get_if_valid (ls->client_index); + server = application_get (ls->app_index); + + if (ls->session_state == SESSION_STATE_CLOSED) + return application_local_session_cleanup (client, server, ls); + if (app_index == ls->client_index) { - send_local_session_disconnect_callback (ls->app_index, ls); + mq_send_local_session_disconnected_cb (ls->app_index, ls); } else { if (!client) { - goto cleanup; + return application_local_session_cleanup (client, server, ls); } else if (ls->session_state < SESSION_STATE_READY) { @@ -1095,11 +1319,11 @@ application_local_session_disconnect (u32 app_index, local_session_t * ls) (stream_session_t *) ls, 1 /* is_fail */ ); ls->session_state = SESSION_STATE_CLOSED; - goto cleanup; + return application_local_session_cleanup (client, server, ls); } else { - send_local_session_disconnect_callback (ls->client_index, ls); + mq_send_local_session_disconnected_cb (client->index, ls); } } @@ -1108,6 +1332,16 @@ application_local_session_disconnect (u32 app_index, local_session_t * ls) return 0; } +int +application_local_session_disconnect_w_index (u32 app_index, u32 ls_index) +{ + application_t *app; + local_session_t *ls; + app = application_get (app_index); + ls = application_get_local_session (app, ls_index); + return application_local_session_disconnect (app_index, ls); +} + void application_local_sessions_del (application_t * app) { @@ -1377,26 +1611,25 @@ format_application (u8 * s, va_list * args) { if (verbose) s = format (s, "%-10s%-20s%-15s%-15s%-15s%-15s%-15s", "Index", "Name", - "API Client", "Namespace", "Add seg size", "Rx fifo size", - "Tx fifo size"); + "API Client", "Namespace", "Add seg size", "Rx-f size", + "Tx-f size"); else - s = - format (s, "%-10s%-20s%-15s%-40s", "Index", "Name", "API Client", - "Namespace"); + s = format (s, "%-10s%-20s%-15s%-40s", "Index", "Name", "API Client", + "Namespace"); return s; } - app_name = app_get_name_from_reg_index (app); + app_name = app_get_name (app); app_ns_name = app_namespace_id_from_index (app->ns_index); props = application_segment_manager_properties (app); if (verbose) - s = - format (s, "%-10d%-20s%-15d%-15d%-15d%-15d%-15d", app->index, app_name, - app->api_client_index, app->ns_index, - props->add_segment_size, - props->rx_fifo_size, props->tx_fifo_size); + s = format (s, "%-10u%-20s%-15d%-15u%-15U%-15U%-15U", app->index, + app_name, app->api_client_index, app->ns_index, + format_memory_size, props->add_segment_size, + format_memory_size, props->rx_fifo_size, format_memory_size, + props->tx_fifo_size); else - s = format (s, "%-10d%-20s%-15d%-40s", app->index, app_name, + s = format (s, "%-10u%-20s%-15d%-40s", app->index, app_name, app->api_client_index, app_ns_name); return s; }