session: add session eventing infra for apps 48/42848/32
authorFlorin Coras <[email protected]>
Wed, 23 Apr 2025 04:49:14 +0000 (00:49 -0400)
committerFlorin Coras <[email protected]>
Mon, 5 May 2025 21:00:39 +0000 (17:00 -0400)
Basic framework/supporting infrastructure for now.

Start evt app with default collector with:
 app evt-collector enable [uri <uri>]

Add/del collectors (only one supported for now) with:
 app evt-collector add <uri>

External applications can request eventing to default collector. Builtin
applications can also provide custom eventing functions.

Type: feature

Change-Id: I3547bfc9b258b33a4e8c60c161de75c21533b7f1
Signed-off-by: Florin Coras <[email protected]>
src/vnet/CMakeLists.txt
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_eventing.c [new file with mode: 0644]
src/vnet/session/application_eventing.h [new file with mode: 0644]
src/vnet/session/application_interface.h
src/vnet/session/session_input.c

index 5c9c5cc..14c9110 100644 (file)
@@ -974,6 +974,7 @@ list(APPEND VNET_SOURCES
   session/application.c
   session/application_worker.c
   session/session_cli.c
+  session/application_eventing.c
   session/application_interface.c
   session/application_local.c
   session/application_namespace.c
@@ -992,6 +993,7 @@ list(APPEND VNET_HEADERS
   session/transport.h
   session/transport_types.h
   session/application_interface.h
+  session/application_eventing.h
   session/application_local.h
   session/application_namespace.h
   session/session_debug.h
index 1a2509e..1bfce79 100644 (file)
@@ -17,6 +17,7 @@
 #include <vnet/session/application_interface.h>
 #include <vnet/session/application_namespace.h>
 #include <vnet/session/application_local.h>
+#include <vnet/session/application_eventing.h>
 #include <vnet/session/session.h>
 #include <vnet/session/segment_manager.h>
 
@@ -855,6 +856,9 @@ application_alloc_and_init (app_init_args_t *a)
     props->pct_first_alloc = opts[APP_OPTIONS_PCT_FIRST_ALLOC];
   props->segment_type = seg_type;
 
+  if (opts[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_LOG_COLLECTOR)
+    app->cb_fns.app_evt_callback = app_evt_collector_get_cb_fn ();
+
   /* Add app to lookup by api_client_index table */
   if (!application_is_builtin (app))
     application_api_table_add (app->app_index, a->api_client_index);
index d748eae..2d605c3 100644 (file)
@@ -166,6 +166,9 @@ typedef struct application_
    * linked list maintained by the app sublayer for each worker
    */
   app_rx_mq_elt_t *rx_mqs;
+
+  /** collector index, if any */
+  u32 evt_collector_index;
 } application_t;
 
 typedef struct app_rx_mq_handle_
diff --git a/src/vnet/session/application_eventing.c b/src/vnet/session/application_eventing.c
new file mode 100644 (file)
index 0000000..e698d1c
--- /dev/null
@@ -0,0 +1,677 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) 2025 Cisco Systems, Inc.
+ */
+
+#include <vnet/session/application_eventing.h>
+#include <vnet/session/application_local.h>
+#include <vnet/session/session.h>
+#include <vnet/udp/udp.h>
+
+app_evt_main_t app_evt_main = { .app_index = APP_INVALID_INDEX };
+
+void
+app_evt_buffer_append_chunk (app_evt_buffer_t *buf,
+                            app_evt_buffer_chunk_t *chunk)
+{
+  app_evt_buffer_chunk_t *tail;
+  buf->len += chunk->len;
+
+  if (buf->tail_chunk == ~0)
+    {
+      buf->head_chunk = chunk->chunk_index;
+      buf->tail_chunk = chunk->chunk_index;
+      return;
+    }
+  tail = app_evt_buffer_get_chunk (buf, buf->tail_chunk);
+  tail->next_index = chunk->chunk_index;
+  buf->tail_chunk = chunk->chunk_index;
+}
+
+void
+app_evt_collector_wrk_send (app_evt_collector_wrk_t *cwrk)
+{
+  u32 max_enq, to_send = 0, next_c;
+  app_evt_buffer_chunk_t *c;
+  svm_fifo_seg_t *seg;
+  session_t *cs;
+  int wrote;
+
+  cs = session_get_from_handle_if_valid (cwrk->session_handle);
+  if (!cs)
+    {
+      clib_warning ("session not found");
+      return;
+    }
+  max_enq = svm_fifo_max_enqueue_prod (cs->tx_fifo);
+
+  if (!max_enq)
+    {
+      svm_fifo_add_want_deq_ntf (cs->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+      return;
+    }
+
+  c = app_evt_buffer_get_chunk (&cwrk->buf, cwrk->buf.head_chunk);
+  while (c)
+    {
+      if (c->len + to_send > max_enq)
+       break;
+
+      to_send += c->len;
+      vec_add2 (cwrk->segs, seg, 1);
+      seg->data = c->data;
+      seg->len = c->len;
+
+      c = app_evt_buffer_get_chunk (&cwrk->buf, c->next_index);
+    }
+
+  if (session_has_transport (cs))
+    {
+      wrote = svm_fifo_enqueue_segments (
+       cs->tx_fifo, cwrk->segs, vec_len (cwrk->segs), 0 /* allow partial*/);
+    }
+  else
+    {
+      /* Special handling of client cut-throughs */
+      ct_connection_t *cct;
+
+      cct = (ct_connection_t *) session_get_transport (cs);
+      wrote =
+       svm_fifo_enqueue_segments (cct->client_tx_fifo, cwrk->segs,
+                                  vec_len (cwrk->segs), 0 /* allow partial*/);
+    }
+
+  if (wrote > 0 && svm_fifo_set_event (cs->tx_fifo))
+    session_program_tx_io_evt (cs->handle, SESSION_IO_EVT_TX);
+
+  cwrk->buf.len -= wrote > 0 ? wrote : 0;
+
+  next_c = cwrk->buf.head_chunk;
+  while (wrote > 0)
+    {
+      c = app_evt_buffer_get_chunk (&cwrk->buf, next_c);
+      next_c = c->next_index;
+      ASSERT (wrote >= c->len);
+      wrote -= c->len;
+      app_evt_buffer_free_chunk (&cwrk->buf, c);
+    }
+  ASSERT (wrote == 0);
+  cwrk->buf.head_chunk = next_c;
+  if (cwrk->buf.head_chunk == ~0)
+    cwrk->buf.tail_chunk = ~0;
+
+  vec_reset_length (cwrk->segs);
+  if (cwrk->buf.len)
+    svm_fifo_add_want_deq_ntf (cs->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
+
+  return;
+}
+
+static void
+app_evt_collector_log_session (app_evt_collector_t *c, session_t *s)
+{
+  app_evt_msg_data_session_stats_t *sess_stats;
+  app_evt_collector_wrk_t *cwrk;
+  app_evt_buffer_chunk_t *chunk;
+  app_evt_msg_data_t *data_msg;
+  transport_connection_t *tc;
+  app_evt_msg_t *msg;
+
+  tc = session_get_transport (s);
+  if (!tc)
+    return;
+
+  cwrk = &c->wrk[s->thread_index];
+  chunk = app_evt_buffer_alloc_chunk (&cwrk->buf);
+
+  msg = app_evt_buf_chunk_append_uninit (chunk, sizeof (app_evt_msg_t));
+  msg->msg_type = APP_EVT_MSG_DATA;
+
+  data_msg =
+    app_evt_buf_chunk_append_uninit (chunk, sizeof (app_evt_msg_data_t));
+  data_msg->data_type = APP_EVT_MSG_DATA_SESSION_STATS;
+
+  sess_stats = app_evt_buf_chunk_append_uninit (
+    chunk, sizeof (app_evt_msg_data_session_stats_t));
+  sess_stats->transport_proto_type = tc->proto;
+
+  switch (tc->proto)
+    {
+    case TRANSPORT_PROTO_TCP:
+      {
+       tcp_connection_t *tcp_conn = (tcp_connection_t *) tc;
+       tcp_session_stats_t *tcp_stats = app_evt_buf_chunk_append_uninit (
+         chunk, sizeof (tcp_session_stats_t));
+       sess_stats->msg_len = sizeof (app_evt_msg_data_session_stats_t) +
+                             sizeof (tcp_session_stats_t);
+       clib_memcpy_fast (tcp_stats->conn_id, tc->opaque_conn_id,
+                         sizeof (tc->opaque_conn_id));
+       tcp_stats->end_ts = transport_time_now (s->thread_index);
+
+#define _(type, name) tcp_stats->name = tcp_conn->name;
+       foreach_tcp_transport_stat
+#undef _
+      }
+      break;
+    case TRANSPORT_PROTO_UDP:
+      {
+       udp_connection_t *udp_conn = (udp_connection_t *) tc;
+       udp_session_stats_t *udp_stats = app_evt_buf_chunk_append_uninit (
+         chunk, sizeof (udp_session_stats_t));
+       sess_stats->msg_len = sizeof (app_evt_msg_data_session_stats_t) +
+                             sizeof (udp_session_stats_t);
+       clib_memcpy_fast (udp_stats->conn_id, tc->opaque_conn_id,
+                         sizeof (tc->opaque_conn_id));
+       udp_stats->end_ts = transport_time_now (s->thread_index);
+
+#define _(type, name) udp_stats->name = udp_conn->name;
+       foreach_udp_transport_stat
+#undef _
+      }
+      break;
+    case TRANSPORT_PROTO_CT:
+      {
+       ct_connection_t *ct_conn = (ct_connection_t *) tc;
+       ct_session_stats_t *ct_stats =
+         app_evt_buf_chunk_append_uninit (chunk, sizeof (ct_session_stats_t));
+       sess_stats->msg_len = sizeof (app_evt_msg_data_session_stats_t) +
+                             sizeof (ct_session_stats_t);
+       clib_memcpy_fast (ct_stats->conn_id, tc->opaque_conn_id,
+                         sizeof (tc->opaque_conn_id));
+       ct_stats->actual_proto = ct_conn->actual_tp;
+       ct_stats->end_ts = transport_time_now (s->thread_index);
+      }
+      break;
+    default:
+      break;
+    };
+
+  data_msg->msg_len = sizeof (app_evt_msg_data_t) + sess_stats->msg_len;
+  msg->msg_len = sizeof (app_evt_msg_t) + data_msg->msg_len;
+
+  app_evt_buffer_append_chunk (&cwrk->buf, chunk);
+  app_evt_collector_wrk_send (cwrk);
+}
+
+app_evt_collector_t *
+app_evt_collector_get (u32 c_index)
+{
+  app_evt_main_t *alm = &app_evt_main;
+  if (pool_is_free_index (alm->collectors, c_index))
+    return 0;
+  return pool_elt_at_index (alm->collectors, c_index);
+}
+
+static void
+app_evt_collect_on_session_cleanup (session_t *s)
+{
+  app_evt_collector_t *c;
+  app_worker_t *app_wrk;
+  application_t *app;
+
+  app_wrk = app_worker_get (s->app_wrk_index);
+  app = application_get (app_wrk->app_index);
+  c = app_evt_collector_get (app->evt_collector_index);
+  if (PREDICT_FALSE (!c || !c->is_ready))
+    return;
+  app_evt_collector_log_session (c, s);
+}
+
+void *
+app_evt_collector_get_cb_fn (void)
+{
+  app_evt_main_t *alm = &app_evt_main;
+
+  if (alm->app_index == APP_INVALID_INDEX)
+    return 0;
+
+  return app_evt_collect_on_session_cleanup;
+}
+
+static void
+alc_more_connects_cb_fn (void *arg)
+{
+  app_evt_main_t *alm = &app_evt_main;
+  vnet_connect_args_t _a = {}, *a = &_a;
+  u32 c_index = pointer_to_uword (arg);
+  app_evt_collector_t *c;
+  int rv;
+
+  c = app_evt_collector_get (c_index);
+  a->sep_ext = c->cfg.sep;
+  a->app_index = alm->app_index;
+  a->api_context = c->collector_index;
+
+  if ((rv = vnet_connect (a)))
+    {
+      clib_warning ("could not connect session for collector %u: %U", c_index,
+                   format_session_error, rv);
+      return;
+    }
+}
+
+static void
+app_evt_collector_program_connect (u32 c_index)
+{
+  u32 connects_thread = transport_cl_thread ();
+
+  session_send_rpc_evt_to_thread_force (connects_thread,
+                                       alc_more_connects_cb_fn,
+                                       uword_to_pointer (c_index, void *));
+}
+
+static int
+app_evt_collector_connected_callback (u32 app_index, u32 api_context,
+                                     session_t *s, session_error_t err)
+{
+  app_evt_main_t *alm = &app_evt_main;
+  app_evt_collector_wrk_t *cwrk;
+  u32 session_map, num_workers;
+  app_evt_collector_t *c;
+
+  c = app_evt_collector_get (api_context);
+  if (!c)
+    {
+      clib_warning ("app_evt_collector_connected_callback: "
+                   "invalid collector index %u",
+                   api_context);
+      return -1;
+    }
+
+  CLIB_SPINLOCK_LOCK (c->session_map_lock);
+  session_map = c->session_map;
+  CLIB_SPINLOCK_UNLOCK (c->session_map_lock);
+
+  if (err)
+    goto check_map;
+
+  /* Already have a session */
+  if (session_map & (1 << s->thread_index))
+    {
+      vnet_disconnect_args_t a = { session_handle (s), alm->app_index };
+      vnet_disconnect_session (&a);
+      goto check_map;
+    }
+
+  cwrk = &c->wrk[s->thread_index];
+  cwrk->session_handle = session_handle (s);
+  s->opaque = c->collector_index << 16 | s->thread_index;
+  s->session_state = SESSION_STATE_READY;
+
+  CLIB_SPINLOCK_LOCK (c->session_map_lock);
+  c->session_map |= 1 << s->thread_index;
+  session_map = c->session_map;
+  CLIB_SPINLOCK_UNLOCK (c->session_map_lock);
+
+check_map:
+
+  num_workers = vlib_num_workers ();
+
+  /* If no workers and we have a session, accept it */
+  if (!num_workers && (session_map != 0))
+    return 0;
+
+  /* If not all threads apart from 0 (main) are set
+   * then we need to connect more sessions */
+  if (session_map != (1 << (num_workers + 1)) - 2)
+    app_evt_collector_program_connect (c->collector_index);
+  else
+    c->is_ready = 1;
+
+  return 0;
+}
+
+static int
+app_evt_collector_accept_callback (session_t *s)
+{
+  clib_warning ("not implemented");
+  return -1;
+}
+
+static void
+app_evt_collector_disconnect_callback (session_t *s)
+{
+  app_evt_collector_t *c = app_evt_collector_get (s->opaque >> 16);
+  vnet_disconnect_args_t a = { session_handle (s), app_evt_main.app_index };
+  app_evt_collector_wrk_t *cwrk;
+
+  vnet_disconnect_session (&a);
+
+  CLIB_SPINLOCK_LOCK (c->session_map_lock);
+  c->session_map &= ~(1 << s->thread_index);
+  c->is_ready = 0;
+  CLIB_SPINLOCK_UNLOCK (c->session_map_lock);
+
+  cwrk = &c->wrk[s->thread_index];
+  cwrk->session_handle = SESSION_INVALID_HANDLE;
+
+  /* Worker session disconnected, try to reconnect */
+  app_evt_collector_program_connect (c->collector_index);
+}
+
+static void
+app_evt_collector_reset_callback (session_t *s)
+{
+  app_evt_collector_disconnect_callback (s);
+}
+
+static int
+app_evt_collector_rx_callback (session_t *s)
+{
+  /* TODO */
+  return 0;
+}
+
+static int
+app_evt_collector_tx_callback (session_t *s)
+{
+  app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
+  application_t *app = application_get (app_wrk->app_index);
+  app_evt_collector_t *c = app_evt_collector_get (app->evt_collector_index);
+  app_evt_collector_wrk_t *cwrk = &c->wrk[s->thread_index];
+
+  /* If we have data buffered, try to send it now */
+  if (cwrk->buf.len)
+    app_evt_collector_wrk_send (cwrk);
+
+  return 0;
+}
+
+static int
+app_evt_collector_add_segment_cb (u32 client_index, u64 segment_handle)
+{
+  return 0;
+}
+
+static int
+app_evt_collector_del_segment_cb (u32 app_wrk_index, u64 segment_handle)
+{
+  return 0;
+}
+
+static session_cb_vft_t app_evtger_cb_vft = {
+  .session_accept_callback = app_evt_collector_accept_callback,
+  .session_connected_callback = app_evt_collector_connected_callback,
+  .session_disconnect_callback = app_evt_collector_disconnect_callback,
+  .session_reset_callback = app_evt_collector_reset_callback,
+  .builtin_app_rx_callback = app_evt_collector_rx_callback,
+  .builtin_app_tx_callback = app_evt_collector_tx_callback,
+  .add_segment_callback = app_evt_collector_add_segment_cb,
+  .del_segment_callback = app_evt_collector_del_segment_cb,
+};
+
+static int
+app_evt_collector_connect (app_evt_collector_t *c)
+{
+  app_evt_main_t *alm = &app_evt_main;
+  u32 num_threads;
+  int i, rv;
+
+  num_threads = vlib_num_workers ();
+  num_threads = num_threads == 0 ? 1 : num_threads;
+
+  vnet_connect_args_t cargs = {
+    .sep_ext = c->cfg.sep,
+    .app_index = alm->app_index,
+    .api_context = c->collector_index,
+  };
+
+  for (i = 0; i < num_threads; i++)
+    {
+      rv = vnet_connect (&cargs);
+      if (rv)
+       {
+         clib_warning ("could not connect %U", format_session_error, rv);
+         return -1;
+       }
+    }
+
+  return 0;
+}
+
+int
+app_evt_collector_add (app_evt_collector_cfg_t *cfg)
+{
+  app_evt_main_t *alm = &app_evt_main;
+  app_evt_collector_t *c;
+
+  pool_get_zero (alm->collectors, c);
+  c->cfg = *cfg;
+
+  vec_validate (c->wrk, vlib_num_workers ());
+  for (int i = 0; i < vec_len (c->wrk); i++)
+    {
+      c->wrk[i].session_handle = SESSION_INVALID_HANDLE;
+      c->wrk[i].buf.head_chunk = ~0;
+      c->wrk[i].buf.tail_chunk = ~0;
+    }
+
+  return app_evt_collector_connect (c);
+}
+
+static int
+app_evt_collector_del (app_evt_collector_cfg_t *cfg)
+{
+  app_evt_collector_wrk_t *cwrk;
+  app_evt_collector_t *c;
+
+  pool_foreach (c, app_evt_main.collectors)
+    {
+      if (c->cfg.sep.is_ip4 == cfg->sep.is_ip4 &&
+         c->cfg.sep.port == cfg->sep.port &&
+         ip46_address_cmp (&c->cfg.sep.ip, &cfg->sep.ip) == 0)
+       {
+         pool_put (app_evt_main.collectors, c);
+         vec_foreach (cwrk, c->wrk)
+           {
+             if (cwrk->session_handle != SESSION_INVALID_HANDLE)
+               {
+                 vnet_disconnect_args_t a = { cwrk->session_handle,
+                                              app_evt_main.app_index };
+                 vnet_disconnect_session (&a);
+               }
+           }
+         return 0;
+       }
+    }
+  return -1;
+}
+
+static int
+app_evt_collector_attach (void)
+{
+  app_evt_main_t *alm = &app_evt_main;
+  vnet_app_attach_args_t _a = {}, *a = &_a;
+  u64 options[APP_OPTIONS_N_OPTIONS];
+  int rv;
+
+  clib_memset (options, 0, sizeof (options));
+
+  a->name = format (0, "app-evt-collector");
+  a->api_client_index = ~0;
+  a->session_cb_vft = &app_evtger_cb_vft;
+  a->options = options;
+  a->options[APP_OPTIONS_SEGMENT_SIZE] = alm->segment_size;
+  a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = alm->segment_size;
+  a->options[APP_OPTIONS_RX_FIFO_SIZE] = alm->fifo_size;
+  a->options[APP_OPTIONS_TX_FIFO_SIZE] = alm->fifo_size;
+  a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN |
+                                 APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE |
+                                 APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
+
+  if ((rv = vnet_application_attach (a)))
+    {
+      clib_warning ("app session evt-collector attach failed: %U",
+                   format_session_error, rv);
+      return rv;
+    }
+
+  alm->app_index = a->app_index;
+
+  return 0;
+}
+
+static clib_error_t *
+app_evt_collector_enable_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                    vlib_cli_command_t *cmd)
+{
+  unformat_input_t _line_input, *line_input = &_line_input;
+  u8 *collector_uri = 0, is_enable = 0, is_add = 1;
+  app_evt_main_t *alm = &app_evt_main;
+  clib_error_t *error = 0;
+  u64 tmp64 = 0;
+
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  if (alm->app_index != APP_INVALID_INDEX)
+    {
+      /* Default configs  */
+      alm->fifo_size = 4 << 20;
+      alm->segment_size = 32 << 20;
+    }
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (line_input, "enable"))
+       is_enable = 1;
+      else if (unformat (line_input, "fifo-size %U", unformat_memory_size,
+                        &alm->fifo_size))
+       ;
+      else if (unformat (line_input, "segment-size %U", unformat_memory_size,
+                        &tmp64))
+       alm->segment_size = tmp64;
+      else if (unformat (line_input, "uri %s", &collector_uri))
+       vec_add1 (collector_uri, 0);
+      else if (unformat (line_input, "add"))
+       ;
+      else if (unformat (line_input, "del"))
+       is_add = 0;
+      else
+       {
+         error = clib_error_return (0, "unknown input `%U'",
+                                    format_unformat_error, line_input);
+         goto done;
+       }
+    }
+
+  if (is_enable)
+    {
+      if (alm->app_index != APP_INVALID_INDEX)
+       {
+         error = clib_error_return (0, "app evt-collector already enabled");
+         goto done;
+       }
+      app_evt_collector_attach ();
+    }
+
+  if (collector_uri)
+    {
+      app_evt_collector_cfg_t cfg = { .is_server = 1 };
+
+      if (alm->app_index == APP_INVALID_INDEX)
+       {
+         error = clib_error_return (0, "app evt-collector not enabled");
+         goto done;
+       }
+
+      if (parse_uri ((char *) collector_uri, &cfg.sep))
+       {
+         error =
+           clib_error_return (0, "Invalid collector uri [%v]", collector_uri);
+         goto done;
+       }
+      if (is_add && app_evt_collector_add (&cfg))
+       {
+         error = clib_error_return (0, "Failed to add collector");
+         goto done;
+       }
+      if (!is_add && app_evt_collector_del (&cfg))
+       {
+         error = clib_error_return (0, "Failed to remove collector");
+         goto done;
+       }
+    }
+
+done:
+  unformat_free (line_input);
+  vec_free (collector_uri);
+  return error;
+}
+
+VLIB_CLI_COMMAND (app_evt_collector_command, static) = {
+  .path = "app evt-collector",
+  .short_help = "app evt-collector [enable] [segment-size <nn>[k|m]] "
+               "[fifo-size <nn>[k|m]] [add|del] uri <uri>",
+  .function = app_evt_collector_enable_command_fn,
+};
+
+static u8 *
+format_app_evt_collector (u8 *s, va_list *args)
+{
+  app_evt_collector_t *c = va_arg (*args, app_evt_collector_t *);
+  u32 i, indent;
+
+  s = format (s, "[%u] ", c->collector_index);
+  indent = format_get_indent (s);
+  s = format (s, "remote %U:%u is server %d\n", format_ip46_address,
+             &c->cfg.sep.ip, c->cfg.sep.is_ip4, c->cfg.sep.port,
+             c->cfg.is_server);
+  s = format (s, "%Uis ready: %u session map: 0x%x\n", format_white_space,
+             indent, c->is_ready, c->session_map);
+  s = format (s, "%Usessions:\n", format_white_space, indent);
+  for (i = vlib_num_workers () ? 1 : 0; i < vec_len (c->wrk); i++)
+    {
+      if (c->wrk[i].session_handle != SESSION_INVALID_HANDLE)
+       {
+         session_t *cs = session_get_from_handle (c->wrk[i].session_handle);
+         transport_endpoint_t tep;
+         session_get_endpoint (cs, &tep, 1 /* is_lcl */);
+         s = format (s, "%U [%u:%u] %U:%u\n", format_white_space, indent,
+                     cs->thread_index, cs->session_index, format_ip46_address,
+                     &tep.ip, tep.is_ip4, tep.port);
+       }
+      else
+       s = format (s, "%U <not-connected>\n", format_white_space, indent,
+                   format_session, c->wrk[i].session_handle);
+    }
+
+  return s;
+}
+
+static clib_error_t *
+show_app_evt_collector_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                  vlib_cli_command_t *cmd)
+{
+  unformat_input_t _line_input, *line_input = &_line_input;
+  app_evt_main_t *alm = &app_evt_main;
+  clib_error_t *error = 0;
+  app_evt_collector_t *c;
+
+  if (unformat_user (input, unformat_line_input, line_input))
+    {
+      error = clib_error_return (0, "unknown input `%U'",
+                                format_unformat_error, line_input);
+      goto done;
+    }
+
+  if (alm->app_index == APP_INVALID_INDEX)
+    {
+      error = clib_error_return (0, "app evt-collector not enabled");
+      goto done;
+    }
+
+  vlib_cli_output (vm, "app evt-collector app-index: %u", alm->app_index);
+  vlib_cli_output (vm, " fifo size %U segment size %U", format_memory_size,
+                  alm->fifo_size, format_memory_size, alm->segment_size);
+  pool_foreach (c, alm->collectors)
+    vlib_cli_output (vm, " %U", format_app_evt_collector, c, 0);
+
+done:
+  return error;
+}
+
+VLIB_CLI_COMMAND (show_app_evt_collector_command, static) = {
+  .path = "show app evt-collector",
+  .short_help = "show app evt-collector",
+  .function = show_app_evt_collector_command_fn,
+};
diff --git a/src/vnet/session/application_eventing.h b/src/vnet/session/application_eventing.h
new file mode 100644 (file)
index 0000000..a42061b
--- /dev/null
@@ -0,0 +1,214 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) 2025 Cisco Systems, Inc.
+ */
+
+#ifndef SRC_VNET_SESSION_APP_EVENTING_H_
+#define SRC_VNET_SESSION_APP_EVENTING_H_
+
+#include <vnet/session/session_types.h>
+#include <vnet/session/application.h>
+#include <vnet/tcp/tcp_types.h>
+
+typedef enum app_evt_msg_type_
+{
+  APP_EVT_MSG_CTRL,
+  APP_EVT_MSG_DATA
+} __clib_packed app_evt_msg_type_t;
+
+typedef enum app_evt_msg_ctrl_type_
+{
+  APP_EVT_MSG_CTRL_INIT,
+  APP_EVT_MSG_CTRL_REPLY_INIT,
+} __clib_packed app_evt_msg_ctrl_type_t;
+
+typedef struct app_evt_msg_ctrl_
+{
+  app_evt_msg_ctrl_type_t ctrl_type;
+  u32 msg_len;
+  u8 data[0];
+} __clib_packed app_evt_msg_ctrl_t;
+
+typedef enum app_evt_msg_data_msg_type_
+{
+  APP_EVT_MSG_DATA_SESSION_STATS,
+} __clib_packed app_evt_msg_data_msg_type_t;
+
+typedef struct app_evt_msg_data_
+{
+  app_evt_msg_data_msg_type_t data_type;
+  u32 msg_len;
+  u8 data[0];
+} __clib_packed app_evt_msg_data_t;
+
+typedef struct app_evt_msg_data_session_stats_
+{
+  u8 transport_proto_type; /**< vpp transport proto */
+  u32 msg_len;
+  u8 data[0];
+} __clib_packed app_evt_msg_data_session_stats_t;
+
+typedef struct app_evt_msg_
+{
+  app_evt_msg_type_t msg_type;
+  u32 msg_len;
+  u8 data[0];
+} __clib_packed app_evt_msg_t;
+
+#define foreach_tcp_transport_stat                                            \
+  _ (u64, segs_in)                                                            \
+  _ (u64, bytes_in)                                                           \
+  _ (u64, segs_out)                                                           \
+  _ (u64, bytes_out)                                                          \
+  _ (u64, data_segs_in)                                                       \
+  _ (u64, data_segs_out)                                                      \
+  _ (u32, snd_mss)                                                            \
+  _ (u32, dupacks_in)                                                         \
+  _ (u32, dupacks_out)                                                        \
+  _ (u32, fr_occurences)                                                      \
+  _ (u32, tr_occurences)                                                      \
+  _ (u64, bytes_retrans)                                                      \
+  _ (u64, segs_retrans)                                                       \
+  _ (u32, srtt)                                                               \
+  _ (u32, rttvar)                                                             \
+  _ (f64, mrtt_us)                                                            \
+  _ (tcp_errors_t, errors)                                                    \
+  _ (f64, start_ts)
+
+typedef struct tcp_transport_stats_
+{
+  u8 conn_id[TRANSPORT_CONN_ID_LEN];
+#define _(type, name) type name;
+  foreach_tcp_transport_stat
+#undef _
+    f64 end_ts;
+} __clib_packed tcp_session_stats_t;
+
+#define foreach_udp_transport_stat                                            \
+  _ (u64, bytes_in)                                                           \
+  _ (u64, dgrams_in)                                                          \
+  _ (u64, bytes_out)                                                          \
+  _ (u64, dgrams_out)                                                         \
+  _ (u32, errors_in)                                                          \
+  _ (u16, mss)
+
+typedef struct udp_transport_stats_
+{
+  u8 conn_id[TRANSPORT_CONN_ID_LEN];
+#define _(type, name) type name;
+  foreach_udp_transport_stat
+#undef _
+    f64 end_ts;
+} __clib_packed udp_session_stats_t;
+
+typedef struct ct_transport_stats_
+{
+  u8 conn_id[TRANSPORT_CONN_ID_LEN];
+  transport_proto_t actual_proto;
+  f64 end_ts;
+} __clib_packed ct_session_stats_t;
+
+typedef struct app_evt_collector_cfg_
+{
+  session_endpoint_cfg_t sep; /**< collector endpoint */
+  u8 is_server : 1;          /**< collector is server */
+} app_evt_collector_cfg_t;
+
+typedef struct app_evt_buffer_chunk_
+{
+  u32 chunk_index; /**< index in pool  */
+  u32 next_index;  /**< next in linked list */
+  u32 len;        /**< evt data length */
+  u8 data[512];           /**< evt data */
+} __clib_packed app_evt_buffer_chunk_t;
+
+static inline void
+app_evt_buf_chunk_append (app_evt_buffer_chunk_t *chunk, const void *data,
+                         u32 len)
+{
+  clib_memcpy (chunk->data + chunk->len, data, len);
+  chunk->len += len;
+  ASSERT (chunk->len <= sizeof (chunk->data));
+}
+
+static inline void *
+app_evt_buf_chunk_append_uninit (app_evt_buffer_chunk_t *chunk, u32 len)
+{
+  u8 *start = chunk->data + chunk->len;
+  chunk->len += len;
+  ASSERT (chunk->len <= sizeof (chunk->data));
+  return start;
+}
+
+typedef struct app_evt_buffer_
+{
+  app_evt_buffer_chunk_t *chunks; /**< pool of chunks */
+  u32 head_chunk;                /**< head of linked list */
+  u32 tail_chunk;                /**< tail of linked list  */
+  u32 len;                       /**< evt data length */
+} app_evt_buffer_t;
+
+static inline app_evt_buffer_chunk_t *
+app_evt_buffer_alloc_chunk (app_evt_buffer_t *buf)
+{
+  app_evt_buffer_chunk_t *chunk;
+
+  pool_get_zero (buf->chunks, chunk);
+  chunk->chunk_index = chunk - buf->chunks;
+  chunk->next_index = ~0;
+
+  return chunk;
+}
+
+static inline app_evt_buffer_chunk_t *
+app_evt_buffer_get_chunk (app_evt_buffer_t *buf, u32 chunk_index)
+{
+  if (pool_is_free_index (buf->chunks, chunk_index))
+    return 0;
+  return pool_elt_at_index (buf->chunks, chunk_index);
+}
+
+static inline void
+app_evt_buffer_free_chunk (app_evt_buffer_t *buf,
+                          app_evt_buffer_chunk_t *chunk)
+{
+  pool_put (buf->chunks, chunk);
+}
+
+void app_evt_buffer_append_chunk (app_evt_buffer_t *buf,
+                                 app_evt_buffer_chunk_t *chunk);
+
+typedef struct app_evt_collector_wrk_
+{
+  session_handle_t session_handle; /**< per-worker session handle */
+  app_evt_buffer_t buf;                   /**< per-worker evt buffer */
+  svm_fifo_seg_t *segs;
+} app_evt_collector_wrk_t;
+
+typedef struct app_evt_collector_
+{
+  app_evt_collector_wrk_t *wrk; /**< per-thread context */
+  u8 is_ready : 1;             /**< collector initialized */
+  u32 collector_index;         /**< collector index */
+  u32 session_map;             /**< map of connected sessions */
+  u32 session_map_lock;                /**< lock for session map */
+  app_evt_collector_cfg_t cfg; /**< collector config */
+} app_evt_collector_t;
+
+typedef struct app_evt_main_
+{
+  app_evt_collector_t *collectors; /**< pool of collectors */
+  u32 app_index;                  /**< evt collector app index */
+
+  /*
+   * application config
+   */
+  u32 segment_size; /**< segment size */
+  u32 fifo_size;    /**< fifo size */
+} app_evt_main_t;
+
+int app_evt_collector_add (app_evt_collector_cfg_t *cfg);
+app_evt_collector_t *app_evt_collector_get (u32 c_index);
+void *app_evt_collector_get_cb_fn ();
+void app_evt_collector_wrk_send (app_evt_collector_wrk_t *cwrk);
+
+#endif /* SRC_VNET_SESSION_APP_EVENTING_H_ */
\ No newline at end of file
index 33b6118..a8ca422 100644 (file)
@@ -84,6 +84,8 @@ typedef struct session_cb_vft_
   /** Custom fifo allocation for proxy */
   int (*proxy_alloc_session_fifos) (session_t *s);
 
+  /** Collect and export session logs */
+  int (*app_evt_callback) (session_t *s);
 } session_cb_vft_t;
 
 #define foreach_app_init_args                  \
@@ -243,7 +245,8 @@ typedef enum
   _ (EVT_MQ_USE_EVENTFD, "Use eventfds for signaling")                        \
   _ (MEMFD_FOR_BUILTIN, "Use memfd for builtin app segs")                     \
   _ (USE_HUGE_PAGE, "Use huge page for FIFO")                                 \
-  _ (GET_ORIGINAL_DST, "Get original dst enabled")
+  _ (GET_ORIGINAL_DST, "Get original dst enabled")                            \
+  _ (LOG_COLLECTOR, "App requests log collector")
 
 typedef enum _app_options
 {
index ffd2780..ca12ae5 100644 (file)
@@ -251,6 +251,8 @@ app_worker_flush_events_inline (app_worker_t *app_wrk,
            }
          if (evt->as_u64[0] >> 32 == SESSION_CLEANUP_TRANSPORT)
            {
+             if (app->cb_fns.app_evt_callback)
+               app->cb_fns.app_evt_callback (s);
              /* postponed cleanup requested */
              if (evt->as_u64[1])
                transport_cleanup_cb ((void *) evt->as_u64[1],