From 9ed4013fda78e950f0aba3c6709c3a6b47e04295 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Wed, 23 Apr 2025 00:49:14 -0400 Subject: [PATCH] session: add session eventing infra for apps Basic framework/supporting infrastructure for now. Start evt app with default collector with: app evt-collector enable [uri ] Add/del collectors (only one supported for now) with: app evt-collector add External applications can request eventing to default collector. Builtin applications can also provide custom eventing functions. Type: feature Change-Id: I3547bfc9b258b33a4e8c60c161de75c21533b7f1 Signed-off-by: Florin Coras --- src/vnet/CMakeLists.txt | 2 + src/vnet/session/application.c | 4 + src/vnet/session/application.h | 3 + src/vnet/session/application_eventing.c | 677 +++++++++++++++++++++++++++++++ src/vnet/session/application_eventing.h | 214 ++++++++++ src/vnet/session/application_interface.h | 5 +- src/vnet/session/session_input.c | 2 + 7 files changed, 906 insertions(+), 1 deletion(-) create mode 100644 src/vnet/session/application_eventing.c create mode 100644 src/vnet/session/application_eventing.h diff --git a/src/vnet/CMakeLists.txt b/src/vnet/CMakeLists.txt index 5c9c5cc0dc5..14c91100eb6 100644 --- a/src/vnet/CMakeLists.txt +++ b/src/vnet/CMakeLists.txt @@ -974,6 +974,7 @@ list(APPEND VNET_SOURCES session/application.c session/application_worker.c session/session_cli.c + session/application_eventing.c session/application_interface.c session/application_local.c session/application_namespace.c @@ -992,6 +993,7 @@ list(APPEND VNET_HEADERS session/transport.h session/transport_types.h session/application_interface.h + session/application_eventing.h session/application_local.h session/application_namespace.h session/session_debug.h diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 1a2509e6356..1bfce79e4b4 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -855,6 +856,9 @@ application_alloc_and_init (app_init_args_t *a) props->pct_first_alloc = opts[APP_OPTIONS_PCT_FIRST_ALLOC]; props->segment_type = seg_type; + if (opts[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_LOG_COLLECTOR) + app->cb_fns.app_evt_callback = app_evt_collector_get_cb_fn (); + /* Add app to lookup by api_client_index table */ if (!application_is_builtin (app)) application_api_table_add (app->app_index, a->api_client_index); diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h index d748eae9cd5..2d605c3af82 100644 --- a/src/vnet/session/application.h +++ b/src/vnet/session/application.h @@ -166,6 +166,9 @@ typedef struct application_ * linked list maintained by the app sublayer for each worker */ app_rx_mq_elt_t *rx_mqs; + + /** collector index, if any */ + u32 evt_collector_index; } application_t; typedef struct app_rx_mq_handle_ diff --git a/src/vnet/session/application_eventing.c b/src/vnet/session/application_eventing.c new file mode 100644 index 00000000000..e698d1ce910 --- /dev/null +++ b/src/vnet/session/application_eventing.c @@ -0,0 +1,677 @@ +/* SPDX-License-Identifier: Apache-2.0 + * Copyright (c) 2025 Cisco Systems, Inc. + */ + +#include +#include +#include +#include + +app_evt_main_t app_evt_main = { .app_index = APP_INVALID_INDEX }; + +void +app_evt_buffer_append_chunk (app_evt_buffer_t *buf, + app_evt_buffer_chunk_t *chunk) +{ + app_evt_buffer_chunk_t *tail; + buf->len += chunk->len; + + if (buf->tail_chunk == ~0) + { + buf->head_chunk = chunk->chunk_index; + buf->tail_chunk = chunk->chunk_index; + return; + } + tail = app_evt_buffer_get_chunk (buf, buf->tail_chunk); + tail->next_index = chunk->chunk_index; + buf->tail_chunk = chunk->chunk_index; +} + +void +app_evt_collector_wrk_send (app_evt_collector_wrk_t *cwrk) +{ + u32 max_enq, to_send = 0, next_c; + app_evt_buffer_chunk_t *c; + svm_fifo_seg_t *seg; + session_t *cs; + int wrote; + + cs = session_get_from_handle_if_valid (cwrk->session_handle); + if (!cs) + { + clib_warning ("session not found"); + return; + } + max_enq = svm_fifo_max_enqueue_prod (cs->tx_fifo); + + if (!max_enq) + { + svm_fifo_add_want_deq_ntf (cs->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + return; + } + + c = app_evt_buffer_get_chunk (&cwrk->buf, cwrk->buf.head_chunk); + while (c) + { + if (c->len + to_send > max_enq) + break; + + to_send += c->len; + vec_add2 (cwrk->segs, seg, 1); + seg->data = c->data; + seg->len = c->len; + + c = app_evt_buffer_get_chunk (&cwrk->buf, c->next_index); + } + + if (session_has_transport (cs)) + { + wrote = svm_fifo_enqueue_segments ( + cs->tx_fifo, cwrk->segs, vec_len (cwrk->segs), 0 /* allow partial*/); + } + else + { + /* Special handling of client cut-throughs */ + ct_connection_t *cct; + + cct = (ct_connection_t *) session_get_transport (cs); + wrote = + svm_fifo_enqueue_segments (cct->client_tx_fifo, cwrk->segs, + vec_len (cwrk->segs), 0 /* allow partial*/); + } + + if (wrote > 0 && svm_fifo_set_event (cs->tx_fifo)) + session_program_tx_io_evt (cs->handle, SESSION_IO_EVT_TX); + + cwrk->buf.len -= wrote > 0 ? wrote : 0; + + next_c = cwrk->buf.head_chunk; + while (wrote > 0) + { + c = app_evt_buffer_get_chunk (&cwrk->buf, next_c); + next_c = c->next_index; + ASSERT (wrote >= c->len); + wrote -= c->len; + app_evt_buffer_free_chunk (&cwrk->buf, c); + } + ASSERT (wrote == 0); + cwrk->buf.head_chunk = next_c; + if (cwrk->buf.head_chunk == ~0) + cwrk->buf.tail_chunk = ~0; + + vec_reset_length (cwrk->segs); + if (cwrk->buf.len) + svm_fifo_add_want_deq_ntf (cs->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF); + + return; +} + +static void +app_evt_collector_log_session (app_evt_collector_t *c, session_t *s) +{ + app_evt_msg_data_session_stats_t *sess_stats; + app_evt_collector_wrk_t *cwrk; + app_evt_buffer_chunk_t *chunk; + app_evt_msg_data_t *data_msg; + transport_connection_t *tc; + app_evt_msg_t *msg; + + tc = session_get_transport (s); + if (!tc) + return; + + cwrk = &c->wrk[s->thread_index]; + chunk = app_evt_buffer_alloc_chunk (&cwrk->buf); + + msg = app_evt_buf_chunk_append_uninit (chunk, sizeof (app_evt_msg_t)); + msg->msg_type = APP_EVT_MSG_DATA; + + data_msg = + app_evt_buf_chunk_append_uninit (chunk, sizeof (app_evt_msg_data_t)); + data_msg->data_type = APP_EVT_MSG_DATA_SESSION_STATS; + + sess_stats = app_evt_buf_chunk_append_uninit ( + chunk, sizeof (app_evt_msg_data_session_stats_t)); + sess_stats->transport_proto_type = tc->proto; + + switch (tc->proto) + { + case TRANSPORT_PROTO_TCP: + { + tcp_connection_t *tcp_conn = (tcp_connection_t *) tc; + tcp_session_stats_t *tcp_stats = app_evt_buf_chunk_append_uninit ( + chunk, sizeof (tcp_session_stats_t)); + sess_stats->msg_len = sizeof (app_evt_msg_data_session_stats_t) + + sizeof (tcp_session_stats_t); + clib_memcpy_fast (tcp_stats->conn_id, tc->opaque_conn_id, + sizeof (tc->opaque_conn_id)); + tcp_stats->end_ts = transport_time_now (s->thread_index); + +#define _(type, name) tcp_stats->name = tcp_conn->name; + foreach_tcp_transport_stat +#undef _ + } + break; + case TRANSPORT_PROTO_UDP: + { + udp_connection_t *udp_conn = (udp_connection_t *) tc; + udp_session_stats_t *udp_stats = app_evt_buf_chunk_append_uninit ( + chunk, sizeof (udp_session_stats_t)); + sess_stats->msg_len = sizeof (app_evt_msg_data_session_stats_t) + + sizeof (udp_session_stats_t); + clib_memcpy_fast (udp_stats->conn_id, tc->opaque_conn_id, + sizeof (tc->opaque_conn_id)); + udp_stats->end_ts = transport_time_now (s->thread_index); + +#define _(type, name) udp_stats->name = udp_conn->name; + foreach_udp_transport_stat +#undef _ + } + break; + case TRANSPORT_PROTO_CT: + { + ct_connection_t *ct_conn = (ct_connection_t *) tc; + ct_session_stats_t *ct_stats = + app_evt_buf_chunk_append_uninit (chunk, sizeof (ct_session_stats_t)); + sess_stats->msg_len = sizeof (app_evt_msg_data_session_stats_t) + + sizeof (ct_session_stats_t); + clib_memcpy_fast (ct_stats->conn_id, tc->opaque_conn_id, + sizeof (tc->opaque_conn_id)); + ct_stats->actual_proto = ct_conn->actual_tp; + ct_stats->end_ts = transport_time_now (s->thread_index); + } + break; + default: + break; + }; + + data_msg->msg_len = sizeof (app_evt_msg_data_t) + sess_stats->msg_len; + msg->msg_len = sizeof (app_evt_msg_t) + data_msg->msg_len; + + app_evt_buffer_append_chunk (&cwrk->buf, chunk); + app_evt_collector_wrk_send (cwrk); +} + +app_evt_collector_t * +app_evt_collector_get (u32 c_index) +{ + app_evt_main_t *alm = &app_evt_main; + if (pool_is_free_index (alm->collectors, c_index)) + return 0; + return pool_elt_at_index (alm->collectors, c_index); +} + +static void +app_evt_collect_on_session_cleanup (session_t *s) +{ + app_evt_collector_t *c; + app_worker_t *app_wrk; + application_t *app; + + app_wrk = app_worker_get (s->app_wrk_index); + app = application_get (app_wrk->app_index); + c = app_evt_collector_get (app->evt_collector_index); + if (PREDICT_FALSE (!c || !c->is_ready)) + return; + app_evt_collector_log_session (c, s); +} + +void * +app_evt_collector_get_cb_fn (void) +{ + app_evt_main_t *alm = &app_evt_main; + + if (alm->app_index == APP_INVALID_INDEX) + return 0; + + return app_evt_collect_on_session_cleanup; +} + +static void +alc_more_connects_cb_fn (void *arg) +{ + app_evt_main_t *alm = &app_evt_main; + vnet_connect_args_t _a = {}, *a = &_a; + u32 c_index = pointer_to_uword (arg); + app_evt_collector_t *c; + int rv; + + c = app_evt_collector_get (c_index); + a->sep_ext = c->cfg.sep; + a->app_index = alm->app_index; + a->api_context = c->collector_index; + + if ((rv = vnet_connect (a))) + { + clib_warning ("could not connect session for collector %u: %U", c_index, + format_session_error, rv); + return; + } +} + +static void +app_evt_collector_program_connect (u32 c_index) +{ + u32 connects_thread = transport_cl_thread (); + + session_send_rpc_evt_to_thread_force (connects_thread, + alc_more_connects_cb_fn, + uword_to_pointer (c_index, void *)); +} + +static int +app_evt_collector_connected_callback (u32 app_index, u32 api_context, + session_t *s, session_error_t err) +{ + app_evt_main_t *alm = &app_evt_main; + app_evt_collector_wrk_t *cwrk; + u32 session_map, num_workers; + app_evt_collector_t *c; + + c = app_evt_collector_get (api_context); + if (!c) + { + clib_warning ("app_evt_collector_connected_callback: " + "invalid collector index %u", + api_context); + return -1; + } + + CLIB_SPINLOCK_LOCK (c->session_map_lock); + session_map = c->session_map; + CLIB_SPINLOCK_UNLOCK (c->session_map_lock); + + if (err) + goto check_map; + + /* Already have a session */ + if (session_map & (1 << s->thread_index)) + { + vnet_disconnect_args_t a = { session_handle (s), alm->app_index }; + vnet_disconnect_session (&a); + goto check_map; + } + + cwrk = &c->wrk[s->thread_index]; + cwrk->session_handle = session_handle (s); + s->opaque = c->collector_index << 16 | s->thread_index; + s->session_state = SESSION_STATE_READY; + + CLIB_SPINLOCK_LOCK (c->session_map_lock); + c->session_map |= 1 << s->thread_index; + session_map = c->session_map; + CLIB_SPINLOCK_UNLOCK (c->session_map_lock); + +check_map: + + num_workers = vlib_num_workers (); + + /* If no workers and we have a session, accept it */ + if (!num_workers && (session_map != 0)) + return 0; + + /* If not all threads apart from 0 (main) are set + * then we need to connect more sessions */ + if (session_map != (1 << (num_workers + 1)) - 2) + app_evt_collector_program_connect (c->collector_index); + else + c->is_ready = 1; + + return 0; +} + +static int +app_evt_collector_accept_callback (session_t *s) +{ + clib_warning ("not implemented"); + return -1; +} + +static void +app_evt_collector_disconnect_callback (session_t *s) +{ + app_evt_collector_t *c = app_evt_collector_get (s->opaque >> 16); + vnet_disconnect_args_t a = { session_handle (s), app_evt_main.app_index }; + app_evt_collector_wrk_t *cwrk; + + vnet_disconnect_session (&a); + + CLIB_SPINLOCK_LOCK (c->session_map_lock); + c->session_map &= ~(1 << s->thread_index); + c->is_ready = 0; + CLIB_SPINLOCK_UNLOCK (c->session_map_lock); + + cwrk = &c->wrk[s->thread_index]; + cwrk->session_handle = SESSION_INVALID_HANDLE; + + /* Worker session disconnected, try to reconnect */ + app_evt_collector_program_connect (c->collector_index); +} + +static void +app_evt_collector_reset_callback (session_t *s) +{ + app_evt_collector_disconnect_callback (s); +} + +static int +app_evt_collector_rx_callback (session_t *s) +{ + /* TODO */ + return 0; +} + +static int +app_evt_collector_tx_callback (session_t *s) +{ + app_worker_t *app_wrk = app_worker_get (s->app_wrk_index); + application_t *app = application_get (app_wrk->app_index); + app_evt_collector_t *c = app_evt_collector_get (app->evt_collector_index); + app_evt_collector_wrk_t *cwrk = &c->wrk[s->thread_index]; + + /* If we have data buffered, try to send it now */ + if (cwrk->buf.len) + app_evt_collector_wrk_send (cwrk); + + return 0; +} + +static int +app_evt_collector_add_segment_cb (u32 client_index, u64 segment_handle) +{ + return 0; +} + +static int +app_evt_collector_del_segment_cb (u32 app_wrk_index, u64 segment_handle) +{ + return 0; +} + +static session_cb_vft_t app_evtger_cb_vft = { + .session_accept_callback = app_evt_collector_accept_callback, + .session_connected_callback = app_evt_collector_connected_callback, + .session_disconnect_callback = app_evt_collector_disconnect_callback, + .session_reset_callback = app_evt_collector_reset_callback, + .builtin_app_rx_callback = app_evt_collector_rx_callback, + .builtin_app_tx_callback = app_evt_collector_tx_callback, + .add_segment_callback = app_evt_collector_add_segment_cb, + .del_segment_callback = app_evt_collector_del_segment_cb, +}; + +static int +app_evt_collector_connect (app_evt_collector_t *c) +{ + app_evt_main_t *alm = &app_evt_main; + u32 num_threads; + int i, rv; + + num_threads = vlib_num_workers (); + num_threads = num_threads == 0 ? 1 : num_threads; + + vnet_connect_args_t cargs = { + .sep_ext = c->cfg.sep, + .app_index = alm->app_index, + .api_context = c->collector_index, + }; + + for (i = 0; i < num_threads; i++) + { + rv = vnet_connect (&cargs); + if (rv) + { + clib_warning ("could not connect %U", format_session_error, rv); + return -1; + } + } + + return 0; +} + +int +app_evt_collector_add (app_evt_collector_cfg_t *cfg) +{ + app_evt_main_t *alm = &app_evt_main; + app_evt_collector_t *c; + + pool_get_zero (alm->collectors, c); + c->cfg = *cfg; + + vec_validate (c->wrk, vlib_num_workers ()); + for (int i = 0; i < vec_len (c->wrk); i++) + { + c->wrk[i].session_handle = SESSION_INVALID_HANDLE; + c->wrk[i].buf.head_chunk = ~0; + c->wrk[i].buf.tail_chunk = ~0; + } + + return app_evt_collector_connect (c); +} + +static int +app_evt_collector_del (app_evt_collector_cfg_t *cfg) +{ + app_evt_collector_wrk_t *cwrk; + app_evt_collector_t *c; + + pool_foreach (c, app_evt_main.collectors) + { + if (c->cfg.sep.is_ip4 == cfg->sep.is_ip4 && + c->cfg.sep.port == cfg->sep.port && + ip46_address_cmp (&c->cfg.sep.ip, &cfg->sep.ip) == 0) + { + pool_put (app_evt_main.collectors, c); + vec_foreach (cwrk, c->wrk) + { + if (cwrk->session_handle != SESSION_INVALID_HANDLE) + { + vnet_disconnect_args_t a = { cwrk->session_handle, + app_evt_main.app_index }; + vnet_disconnect_session (&a); + } + } + return 0; + } + } + return -1; +} + +static int +app_evt_collector_attach (void) +{ + app_evt_main_t *alm = &app_evt_main; + vnet_app_attach_args_t _a = {}, *a = &_a; + u64 options[APP_OPTIONS_N_OPTIONS]; + int rv; + + clib_memset (options, 0, sizeof (options)); + + a->name = format (0, "app-evt-collector"); + a->api_client_index = ~0; + a->session_cb_vft = &app_evtger_cb_vft; + a->options = options; + a->options[APP_OPTIONS_SEGMENT_SIZE] = alm->segment_size; + a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = alm->segment_size; + a->options[APP_OPTIONS_RX_FIFO_SIZE] = alm->fifo_size; + a->options[APP_OPTIONS_TX_FIFO_SIZE] = alm->fifo_size; + a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN | + APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE | + APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE; + + if ((rv = vnet_application_attach (a))) + { + clib_warning ("app session evt-collector attach failed: %U", + format_session_error, rv); + return rv; + } + + alm->app_index = a->app_index; + + return 0; +} + +static clib_error_t * +app_evt_collector_enable_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + unformat_input_t _line_input, *line_input = &_line_input; + u8 *collector_uri = 0, is_enable = 0, is_add = 1; + app_evt_main_t *alm = &app_evt_main; + clib_error_t *error = 0; + u64 tmp64 = 0; + + if (!unformat_user (input, unformat_line_input, line_input)) + return 0; + + if (alm->app_index != APP_INVALID_INDEX) + { + /* Default configs */ + alm->fifo_size = 4 << 20; + alm->segment_size = 32 << 20; + } + + while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (line_input, "enable")) + is_enable = 1; + else if (unformat (line_input, "fifo-size %U", unformat_memory_size, + &alm->fifo_size)) + ; + else if (unformat (line_input, "segment-size %U", unformat_memory_size, + &tmp64)) + alm->segment_size = tmp64; + else if (unformat (line_input, "uri %s", &collector_uri)) + vec_add1 (collector_uri, 0); + else if (unformat (line_input, "add")) + ; + else if (unformat (line_input, "del")) + is_add = 0; + else + { + error = clib_error_return (0, "unknown input `%U'", + format_unformat_error, line_input); + goto done; + } + } + + if (is_enable) + { + if (alm->app_index != APP_INVALID_INDEX) + { + error = clib_error_return (0, "app evt-collector already enabled"); + goto done; + } + app_evt_collector_attach (); + } + + if (collector_uri) + { + app_evt_collector_cfg_t cfg = { .is_server = 1 }; + + if (alm->app_index == APP_INVALID_INDEX) + { + error = clib_error_return (0, "app evt-collector not enabled"); + goto done; + } + + if (parse_uri ((char *) collector_uri, &cfg.sep)) + { + error = + clib_error_return (0, "Invalid collector uri [%v]", collector_uri); + goto done; + } + if (is_add && app_evt_collector_add (&cfg)) + { + error = clib_error_return (0, "Failed to add collector"); + goto done; + } + if (!is_add && app_evt_collector_del (&cfg)) + { + error = clib_error_return (0, "Failed to remove collector"); + goto done; + } + } + +done: + unformat_free (line_input); + vec_free (collector_uri); + return error; +} + +VLIB_CLI_COMMAND (app_evt_collector_command, static) = { + .path = "app evt-collector", + .short_help = "app evt-collector [enable] [segment-size [k|m]] " + "[fifo-size [k|m]] [add|del] uri ", + .function = app_evt_collector_enable_command_fn, +}; + +static u8 * +format_app_evt_collector (u8 *s, va_list *args) +{ + app_evt_collector_t *c = va_arg (*args, app_evt_collector_t *); + u32 i, indent; + + s = format (s, "[%u] ", c->collector_index); + indent = format_get_indent (s); + s = format (s, "remote %U:%u is server %d\n", format_ip46_address, + &c->cfg.sep.ip, c->cfg.sep.is_ip4, c->cfg.sep.port, + c->cfg.is_server); + s = format (s, "%Uis ready: %u session map: 0x%x\n", format_white_space, + indent, c->is_ready, c->session_map); + s = format (s, "%Usessions:\n", format_white_space, indent); + for (i = vlib_num_workers () ? 1 : 0; i < vec_len (c->wrk); i++) + { + if (c->wrk[i].session_handle != SESSION_INVALID_HANDLE) + { + session_t *cs = session_get_from_handle (c->wrk[i].session_handle); + transport_endpoint_t tep; + session_get_endpoint (cs, &tep, 1 /* is_lcl */); + s = format (s, "%U [%u:%u] %U:%u\n", format_white_space, indent, + cs->thread_index, cs->session_index, format_ip46_address, + &tep.ip, tep.is_ip4, tep.port); + } + else + s = format (s, "%U \n", format_white_space, indent, + format_session, c->wrk[i].session_handle); + } + + return s; +} + +static clib_error_t * +show_app_evt_collector_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + unformat_input_t _line_input, *line_input = &_line_input; + app_evt_main_t *alm = &app_evt_main; + clib_error_t *error = 0; + app_evt_collector_t *c; + + if (unformat_user (input, unformat_line_input, line_input)) + { + error = clib_error_return (0, "unknown input `%U'", + format_unformat_error, line_input); + goto done; + } + + if (alm->app_index == APP_INVALID_INDEX) + { + error = clib_error_return (0, "app evt-collector not enabled"); + goto done; + } + + vlib_cli_output (vm, "app evt-collector app-index: %u", alm->app_index); + vlib_cli_output (vm, " fifo size %U segment size %U", format_memory_size, + alm->fifo_size, format_memory_size, alm->segment_size); + pool_foreach (c, alm->collectors) + vlib_cli_output (vm, " %U", format_app_evt_collector, c, 0); + +done: + return error; +} + +VLIB_CLI_COMMAND (show_app_evt_collector_command, static) = { + .path = "show app evt-collector", + .short_help = "show app evt-collector", + .function = show_app_evt_collector_command_fn, +}; diff --git a/src/vnet/session/application_eventing.h b/src/vnet/session/application_eventing.h new file mode 100644 index 00000000000..a42061b02cd --- /dev/null +++ b/src/vnet/session/application_eventing.h @@ -0,0 +1,214 @@ +/* SPDX-License-Identifier: Apache-2.0 + * Copyright (c) 2025 Cisco Systems, Inc. + */ + +#ifndef SRC_VNET_SESSION_APP_EVENTING_H_ +#define SRC_VNET_SESSION_APP_EVENTING_H_ + +#include +#include +#include + +typedef enum app_evt_msg_type_ +{ + APP_EVT_MSG_CTRL, + APP_EVT_MSG_DATA +} __clib_packed app_evt_msg_type_t; + +typedef enum app_evt_msg_ctrl_type_ +{ + APP_EVT_MSG_CTRL_INIT, + APP_EVT_MSG_CTRL_REPLY_INIT, +} __clib_packed app_evt_msg_ctrl_type_t; + +typedef struct app_evt_msg_ctrl_ +{ + app_evt_msg_ctrl_type_t ctrl_type; + u32 msg_len; + u8 data[0]; +} __clib_packed app_evt_msg_ctrl_t; + +typedef enum app_evt_msg_data_msg_type_ +{ + APP_EVT_MSG_DATA_SESSION_STATS, +} __clib_packed app_evt_msg_data_msg_type_t; + +typedef struct app_evt_msg_data_ +{ + app_evt_msg_data_msg_type_t data_type; + u32 msg_len; + u8 data[0]; +} __clib_packed app_evt_msg_data_t; + +typedef struct app_evt_msg_data_session_stats_ +{ + u8 transport_proto_type; /**< vpp transport proto */ + u32 msg_len; + u8 data[0]; +} __clib_packed app_evt_msg_data_session_stats_t; + +typedef struct app_evt_msg_ +{ + app_evt_msg_type_t msg_type; + u32 msg_len; + u8 data[0]; +} __clib_packed app_evt_msg_t; + +#define foreach_tcp_transport_stat \ + _ (u64, segs_in) \ + _ (u64, bytes_in) \ + _ (u64, segs_out) \ + _ (u64, bytes_out) \ + _ (u64, data_segs_in) \ + _ (u64, data_segs_out) \ + _ (u32, snd_mss) \ + _ (u32, dupacks_in) \ + _ (u32, dupacks_out) \ + _ (u32, fr_occurences) \ + _ (u32, tr_occurences) \ + _ (u64, bytes_retrans) \ + _ (u64, segs_retrans) \ + _ (u32, srtt) \ + _ (u32, rttvar) \ + _ (f64, mrtt_us) \ + _ (tcp_errors_t, errors) \ + _ (f64, start_ts) + +typedef struct tcp_transport_stats_ +{ + u8 conn_id[TRANSPORT_CONN_ID_LEN]; +#define _(type, name) type name; + foreach_tcp_transport_stat +#undef _ + f64 end_ts; +} __clib_packed tcp_session_stats_t; + +#define foreach_udp_transport_stat \ + _ (u64, bytes_in) \ + _ (u64, dgrams_in) \ + _ (u64, bytes_out) \ + _ (u64, dgrams_out) \ + _ (u32, errors_in) \ + _ (u16, mss) + +typedef struct udp_transport_stats_ +{ + u8 conn_id[TRANSPORT_CONN_ID_LEN]; +#define _(type, name) type name; + foreach_udp_transport_stat +#undef _ + f64 end_ts; +} __clib_packed udp_session_stats_t; + +typedef struct ct_transport_stats_ +{ + u8 conn_id[TRANSPORT_CONN_ID_LEN]; + transport_proto_t actual_proto; + f64 end_ts; +} __clib_packed ct_session_stats_t; + +typedef struct app_evt_collector_cfg_ +{ + session_endpoint_cfg_t sep; /**< collector endpoint */ + u8 is_server : 1; /**< collector is server */ +} app_evt_collector_cfg_t; + +typedef struct app_evt_buffer_chunk_ +{ + u32 chunk_index; /**< index in pool */ + u32 next_index; /**< next in linked list */ + u32 len; /**< evt data length */ + u8 data[512]; /**< evt data */ +} __clib_packed app_evt_buffer_chunk_t; + +static inline void +app_evt_buf_chunk_append (app_evt_buffer_chunk_t *chunk, const void *data, + u32 len) +{ + clib_memcpy (chunk->data + chunk->len, data, len); + chunk->len += len; + ASSERT (chunk->len <= sizeof (chunk->data)); +} + +static inline void * +app_evt_buf_chunk_append_uninit (app_evt_buffer_chunk_t *chunk, u32 len) +{ + u8 *start = chunk->data + chunk->len; + chunk->len += len; + ASSERT (chunk->len <= sizeof (chunk->data)); + return start; +} + +typedef struct app_evt_buffer_ +{ + app_evt_buffer_chunk_t *chunks; /**< pool of chunks */ + u32 head_chunk; /**< head of linked list */ + u32 tail_chunk; /**< tail of linked list */ + u32 len; /**< evt data length */ +} app_evt_buffer_t; + +static inline app_evt_buffer_chunk_t * +app_evt_buffer_alloc_chunk (app_evt_buffer_t *buf) +{ + app_evt_buffer_chunk_t *chunk; + + pool_get_zero (buf->chunks, chunk); + chunk->chunk_index = chunk - buf->chunks; + chunk->next_index = ~0; + + return chunk; +} + +static inline app_evt_buffer_chunk_t * +app_evt_buffer_get_chunk (app_evt_buffer_t *buf, u32 chunk_index) +{ + if (pool_is_free_index (buf->chunks, chunk_index)) + return 0; + return pool_elt_at_index (buf->chunks, chunk_index); +} + +static inline void +app_evt_buffer_free_chunk (app_evt_buffer_t *buf, + app_evt_buffer_chunk_t *chunk) +{ + pool_put (buf->chunks, chunk); +} + +void app_evt_buffer_append_chunk (app_evt_buffer_t *buf, + app_evt_buffer_chunk_t *chunk); + +typedef struct app_evt_collector_wrk_ +{ + session_handle_t session_handle; /**< per-worker session handle */ + app_evt_buffer_t buf; /**< per-worker evt buffer */ + svm_fifo_seg_t *segs; +} app_evt_collector_wrk_t; + +typedef struct app_evt_collector_ +{ + app_evt_collector_wrk_t *wrk; /**< per-thread context */ + u8 is_ready : 1; /**< collector initialized */ + u32 collector_index; /**< collector index */ + u32 session_map; /**< map of connected sessions */ + u32 session_map_lock; /**< lock for session map */ + app_evt_collector_cfg_t cfg; /**< collector config */ +} app_evt_collector_t; + +typedef struct app_evt_main_ +{ + app_evt_collector_t *collectors; /**< pool of collectors */ + u32 app_index; /**< evt collector app index */ + + /* + * application config + */ + u32 segment_size; /**< segment size */ + u32 fifo_size; /**< fifo size */ +} app_evt_main_t; + +int app_evt_collector_add (app_evt_collector_cfg_t *cfg); +app_evt_collector_t *app_evt_collector_get (u32 c_index); +void *app_evt_collector_get_cb_fn (); +void app_evt_collector_wrk_send (app_evt_collector_wrk_t *cwrk); + +#endif /* SRC_VNET_SESSION_APP_EVENTING_H_ */ \ No newline at end of file diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 33b61187fe3..a8ca422afac 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -84,6 +84,8 @@ typedef struct session_cb_vft_ /** Custom fifo allocation for proxy */ int (*proxy_alloc_session_fifos) (session_t *s); + /** Collect and export session logs */ + int (*app_evt_callback) (session_t *s); } session_cb_vft_t; #define foreach_app_init_args \ @@ -243,7 +245,8 @@ typedef enum _ (EVT_MQ_USE_EVENTFD, "Use eventfds for signaling") \ _ (MEMFD_FOR_BUILTIN, "Use memfd for builtin app segs") \ _ (USE_HUGE_PAGE, "Use huge page for FIFO") \ - _ (GET_ORIGINAL_DST, "Get original dst enabled") + _ (GET_ORIGINAL_DST, "Get original dst enabled") \ + _ (LOG_COLLECTOR, "App requests log collector") typedef enum _app_options { diff --git a/src/vnet/session/session_input.c b/src/vnet/session/session_input.c index ffd27802ab2..ca12ae55376 100644 --- a/src/vnet/session/session_input.c +++ b/src/vnet/session/session_input.c @@ -251,6 +251,8 @@ app_worker_flush_events_inline (app_worker_t *app_wrk, } if (evt->as_u64[0] >> 32 == SESSION_CLEANUP_TRANSPORT) { + if (app->cb_fns.app_evt_callback) + app->cb_fns.app_evt_callback (s); /* postponed cleanup requested */ if (evt->as_u64[1]) transport_cleanup_cb ((void *) evt->as_u64[1], -- 2.16.6