From a10cab8ce3c417028e830b6f1fa5847c928159f8 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Thu, 16 Oct 2025 13:07:56 -0400 Subject: [PATCH] session: add support for evt filtering Allow per app listener level filtering of event generation. Type: improvement Change-Id: Iae3afb77d624e1e00b27b9b97f58360da876ca71 Signed-off-by: Florin Coras --- src/vnet/session/application.c | 1 + src/vnet/session/application.h | 5 ++++- src/vnet/session/application_eventing.c | 26 +++++++++++++++++++++----- src/vnet/session/application_eventing.h | 7 +++++++ src/vnet/session/session.c | 2 ++ 5 files changed, 35 insertions(+), 6 deletions(-) diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 71977bc9b6c..55b116f7386 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -931,6 +931,7 @@ application_free (application_t * app) if (application_is_builtin (app)) application_name_table_del (app); + hash_free (app->evt_collector_session_filter); app_crypto_ctx_free (&app->crypto_ctx); vec_free (app->name); diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index 0e0daaeb45c..13ba6b61a10 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -132,7 +132,7 @@ typedef struct application_ u32 app_index; /** Flags */ - u32 flags; + app_options_flags_t flags; /** Callbacks: shoulder-taps for the server/client */ session_cb_vft_t cb_fns; @@ -170,6 +170,9 @@ typedef struct application_ /** collector index, if any */ u32 evt_collector_index; + /* collector session filter, if any */ + uword *evt_collector_session_filter; + /** app crypto state */ app_crypto_ctx_t crypto_ctx; } application_t; diff --git a/src/vnet/session/application_eventing.c b/src/vnet/session/application_eventing.c index fe1fa4bdcc9..810678e371b 100644 --- a/src/vnet/session/application_eventing.c +++ b/src/vnet/session/application_eventing.c @@ -120,7 +120,7 @@ app_evt_collector_log_session (app_evt_collector_t *c, session_t *s) if (!tc) return; - cwrk = &c->wrk[s->thread_index]; + cwrk = app_evt_collector_wrk_get (c, s->thread_index); chunk = app_evt_buffer_alloc_chunk (&cwrk->buf); msg = app_evt_buf_chunk_append_uninit (chunk, sizeof (app_evt_msg_t)); @@ -216,6 +216,10 @@ app_evt_collect_on_session_cleanup (session_t *s) app_wrk = app_worker_get (s->app_wrk_index); app = application_get (app_wrk->app_index); + /* If filtering configured, log only if listener found */ + if (app->evt_collector_session_filter && + !hash_get (app->evt_collector_session_filter, s->listener_handle)) + return; c = app_evt_collector_get (app->evt_collector_index); if (PREDICT_FALSE (!c || !c->is_ready)) return; @@ -298,7 +302,7 @@ app_evt_collector_connected_callback (u32 app_index, u32 api_context, goto check_map; } - cwrk = &c->wrk[s->thread_index]; + cwrk = app_evt_collector_wrk_get (c, s->thread_index); cwrk->session_handle = session_handle (s); s->opaque = c->collector_index << 16 | s->thread_index; s->session_state = SESSION_STATE_READY; @@ -347,7 +351,7 @@ app_evt_collector_disconnect_callback (session_t *s) c->is_ready = 0; CLIB_SPINLOCK_UNLOCK (c->session_map_lock); - cwrk = &c->wrk[s->thread_index]; + cwrk = app_evt_collector_wrk_get (c, s->thread_index); cwrk->session_handle = SESSION_INVALID_HANDLE; /* Worker session disconnected, try to reconnect */ @@ -524,7 +528,7 @@ app_evt_collector_enable_command_fn (vlib_main_t *vm, unformat_input_t *input, u8 *collector_uri = 0, is_enable = 0, is_add = 1; app_evt_main_t *alm = &app_evt_main; clib_error_t *error = 0; - u32 app_index = ~0; + u32 app_index = ~0, ls_index = ~0; u64 tmp64 = 0; if (!unformat_user (input, unformat_line_input, line_input)) @@ -551,6 +555,8 @@ app_evt_collector_enable_command_fn (vlib_main_t *vm, unformat_input_t *input, vec_add1 (collector_uri, 0); else if (unformat (line_input, "app %d", &app_index)) ; + else if (unformat (line_input, "listener %u", &ls_index)) + ; else if (unformat (line_input, "add")) ; else if (unformat (line_input, "del")) @@ -611,11 +617,20 @@ app_evt_collector_enable_command_fn (vlib_main_t *vm, unformat_input_t *input, } if (!is_add) { + if (ls_index != ~0) + { + hash_unset (app->evt_collector_session_filter, ls_index); + goto done; + } app->evt_collector_index = APP_INVALID_INDEX; app->cb_fns.app_evt_callback = 0; + hash_free (app->evt_collector_session_filter); goto done; } app->cb_fns.app_evt_callback = app_evt_collector_get_cb_fn (); + /* listeners are allocated on main thread, so it's enough to use index */ + if (ls_index != ~0) + hash_set (app->evt_collector_session_filter, ls_index, 1); } done: @@ -627,7 +642,8 @@ done: VLIB_CLI_COMMAND (app_evt_collector_command, static) = { .path = "app evt-collector", .short_help = "app evt-collector [enable] [segment-size [k|m]] " - "[fifo-size [k|m]] [add|del] [uri ] [app ] ", + "[fifo-size [k|m]] [add|del] [uri ] [app " + "[listener ]] ", .function = app_evt_collector_enable_command_fn, }; diff --git a/src/vnet/session/application_eventing.h b/src/vnet/session/application_eventing.h index dcd28b5fe43..d4d6ef5fb31 100644 --- a/src/vnet/session/application_eventing.h +++ b/src/vnet/session/application_eventing.h @@ -216,6 +216,13 @@ typedef struct app_evt_main_ u32 fifo_size; /**< fifo size */ } app_evt_main_t; +static inline app_evt_collector_wrk_t * +app_evt_collector_wrk_get (app_evt_collector_t *c, + clib_thread_index_t thread_index) +{ + return &c->wrk[thread_index]; +} + int app_evt_collector_add (app_evt_collector_cfg_t *cfg); app_evt_collector_t *app_evt_collector_get (u32 c_index); void *app_evt_collector_get_cb_fn (); diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 031d0a0e49b..71230f2eac7 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -749,6 +749,7 @@ session_stream_connect_notify (transport_connection_t * tc, s = session_alloc_for_connection (tc); session_set_state (s, SESSION_STATE_CONNECTING); s->app_wrk_index = app_wrk->wrk_index; + s->listener_handle = SESSION_INVALID_HANDLE; s->opaque = opaque; new_si = s->session_index; new_ti = s->thread_index; @@ -869,6 +870,7 @@ session_dgram_connect_notify (transport_connection_t *tc, */ new_s = session_clone_safe (tc->s_index, osh.thread_index); new_s->connection_index = tc->c_index; + new_s->listener_handle = SESSION_INVALID_HANDLE; session_set_state (new_s, SESSION_STATE_READY); new_s->flags |= SESSION_F_IS_MIGRATING; -- 2.16.6