session: add support for evt filtering 00/43900/3
authorFlorin Coras <[email protected]>
Thu, 16 Oct 2025 17:07:56 +0000 (13:07 -0400)
committerFlorin Coras <[email protected]>
Thu, 16 Oct 2025 21:20:03 +0000 (17:20 -0400)
Allow per app listener level filtering of event generation.

Type: improvement

Change-Id: Iae3afb77d624e1e00b27b9b97f58360da876ca71
Signed-off-by: Florin Coras <[email protected]>
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_eventing.c
src/vnet/session/application_eventing.h
src/vnet/session/session.c

index 71977bc..55b116f 100644 (file)
@@ -931,6 +931,7 @@ application_free (application_t * app)
   if (application_is_builtin (app))
     application_name_table_del (app);
 
+  hash_free (app->evt_collector_session_filter);
   app_crypto_ctx_free (&app->crypto_ctx);
 
   vec_free (app->name);
index 0e0daae..13ba6b6 100644 (file)
@@ -132,7 +132,7 @@ typedef struct application_
   u32 app_index;
 
   /** Flags */
-  u32 flags;
+  app_options_flags_t flags;
 
   /** Callbacks: shoulder-taps for the server/client */
   session_cb_vft_t cb_fns;
@@ -170,6 +170,9 @@ typedef struct application_
   /** collector index, if any */
   u32 evt_collector_index;
 
+  /* collector session filter, if any */
+  uword *evt_collector_session_filter;
+
   /** app crypto state */
   app_crypto_ctx_t crypto_ctx;
 } application_t;
index fe1fa4b..810678e 100644 (file)
@@ -120,7 +120,7 @@ app_evt_collector_log_session (app_evt_collector_t *c, session_t *s)
   if (!tc)
     return;
 
-  cwrk = &c->wrk[s->thread_index];
+  cwrk = app_evt_collector_wrk_get (c, s->thread_index);
   chunk = app_evt_buffer_alloc_chunk (&cwrk->buf);
 
   msg = app_evt_buf_chunk_append_uninit (chunk, sizeof (app_evt_msg_t));
@@ -216,6 +216,10 @@ app_evt_collect_on_session_cleanup (session_t *s)
 
   app_wrk = app_worker_get (s->app_wrk_index);
   app = application_get (app_wrk->app_index);
+  /* If filtering configured, log only if listener found */
+  if (app->evt_collector_session_filter &&
+      !hash_get (app->evt_collector_session_filter, s->listener_handle))
+    return;
   c = app_evt_collector_get (app->evt_collector_index);
   if (PREDICT_FALSE (!c || !c->is_ready))
     return;
@@ -298,7 +302,7 @@ app_evt_collector_connected_callback (u32 app_index, u32 api_context,
       goto check_map;
     }
 
-  cwrk = &c->wrk[s->thread_index];
+  cwrk = app_evt_collector_wrk_get (c, s->thread_index);
   cwrk->session_handle = session_handle (s);
   s->opaque = c->collector_index << 16 | s->thread_index;
   s->session_state = SESSION_STATE_READY;
@@ -347,7 +351,7 @@ app_evt_collector_disconnect_callback (session_t *s)
   c->is_ready = 0;
   CLIB_SPINLOCK_UNLOCK (c->session_map_lock);
 
-  cwrk = &c->wrk[s->thread_index];
+  cwrk = app_evt_collector_wrk_get (c, s->thread_index);
   cwrk->session_handle = SESSION_INVALID_HANDLE;
 
   /* Worker session disconnected, try to reconnect */
@@ -524,7 +528,7 @@ app_evt_collector_enable_command_fn (vlib_main_t *vm, unformat_input_t *input,
   u8 *collector_uri = 0, is_enable = 0, is_add = 1;
   app_evt_main_t *alm = &app_evt_main;
   clib_error_t *error = 0;
-  u32 app_index = ~0;
+  u32 app_index = ~0, ls_index = ~0;
   u64 tmp64 = 0;
 
   if (!unformat_user (input, unformat_line_input, line_input))
@@ -551,6 +555,8 @@ app_evt_collector_enable_command_fn (vlib_main_t *vm, unformat_input_t *input,
        vec_add1 (collector_uri, 0);
       else if (unformat (line_input, "app %d", &app_index))
        ;
+      else if (unformat (line_input, "listener %u", &ls_index))
+       ;
       else if (unformat (line_input, "add"))
        ;
       else if (unformat (line_input, "del"))
@@ -611,11 +617,20 @@ app_evt_collector_enable_command_fn (vlib_main_t *vm, unformat_input_t *input,
        }
       if (!is_add)
        {
+         if (ls_index != ~0)
+           {
+             hash_unset (app->evt_collector_session_filter, ls_index);
+             goto done;
+           }
          app->evt_collector_index = APP_INVALID_INDEX;
          app->cb_fns.app_evt_callback = 0;
+         hash_free (app->evt_collector_session_filter);
          goto done;
        }
       app->cb_fns.app_evt_callback = app_evt_collector_get_cb_fn ();
+      /* listeners are allocated on main thread, so it's enough to use index */
+      if (ls_index != ~0)
+       hash_set (app->evt_collector_session_filter, ls_index, 1);
     }
 
 done:
@@ -627,7 +642,8 @@ done:
 VLIB_CLI_COMMAND (app_evt_collector_command, static) = {
   .path = "app evt-collector",
   .short_help = "app evt-collector [enable] [segment-size <nn>[k|m]] "
-               "[fifo-size <nn>[k|m]] [add|del] [uri <uri>] [app <index>] ",
+               "[fifo-size <nn>[k|m]] [add|del] [uri <uri>] [app <index> "
+               "[listener <index>]] ",
   .function = app_evt_collector_enable_command_fn,
 };
 
index dcd28b5..d4d6ef5 100644 (file)
@@ -216,6 +216,13 @@ typedef struct app_evt_main_
   u32 fifo_size;    /**< fifo size */
 } app_evt_main_t;
 
+static inline app_evt_collector_wrk_t *
+app_evt_collector_wrk_get (app_evt_collector_t *c,
+                          clib_thread_index_t thread_index)
+{
+  return &c->wrk[thread_index];
+}
+
 int app_evt_collector_add (app_evt_collector_cfg_t *cfg);
 app_evt_collector_t *app_evt_collector_get (u32 c_index);
 void *app_evt_collector_get_cb_fn ();
index 031d0a0..71230f2 100644 (file)
@@ -749,6 +749,7 @@ session_stream_connect_notify (transport_connection_t * tc,
   s = session_alloc_for_connection (tc);
   session_set_state (s, SESSION_STATE_CONNECTING);
   s->app_wrk_index = app_wrk->wrk_index;
+  s->listener_handle = SESSION_INVALID_HANDLE;
   s->opaque = opaque;
   new_si = s->session_index;
   new_ti = s->thread_index;
@@ -869,6 +870,7 @@ session_dgram_connect_notify (transport_connection_t *tc,
    */
   new_s = session_clone_safe (tc->s_index, osh.thread_index);
   new_s->connection_index = tc->c_index;
+  new_s->listener_handle = SESSION_INVALID_HANDLE;
   session_set_state (new_s, SESSION_STATE_READY);
   new_s->flags |= SESSION_F_IS_MIGRATING;