From: Ole Troan Date: Wed, 23 Jul 2025 10:46:49 +0000 (+0200) Subject: stats: histogram, gauge and ring buffer types X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=7a2a07911525e5ccf9e6dc4196dd90b2df99a9c2;p=vpp.git stats: histogram, gauge and ring buffer types A new log2 histogram type with prometheus exporter support. min_exp can be set in the first element to adjust the bins. The ring buffer is intended for exporting records to a client side reader. If the reader cannot keep up the writer will overwrite oldest entry. Added a new gauge type, which is like scalar index, but being explicit allows the prometheus exported to set type corretly. Type: improvement Change-Id: Ibe1244f28e01eee8d61a3ca6edb6fd1801f1c942 Signed-off-by: Ole Troan Change-Id: I1a6046c6962d67db8c510a571e9414723acbbd7e Signed-off-by: Ole Troan --- diff --git a/src/plugins/unittest/CMakeLists.txt b/src/plugins/unittest/CMakeLists.txt index ab9168fd459..8ae3bcf588e 100644 --- a/src/plugins/unittest/CMakeLists.txt +++ b/src/plugins/unittest/CMakeLists.txt @@ -60,6 +60,8 @@ add_vpp_plugin(unittest util_test.c vlib_test.c counter_test.c + ring_buffer_test.c + histogram_test.c COMPONENT vpp-plugin-devtools diff --git a/src/plugins/unittest/histogram_test.c b/src/plugins/unittest/histogram_test.c new file mode 100644 index 00000000000..40324507741 --- /dev/null +++ b/src/plugins/unittest/histogram_test.c @@ -0,0 +1,122 @@ +#include +#include +#include +#include +#include + +// Histogram test main structure +static vlib_log2_histogram_main_t histogram_test_main; + +static clib_error_t * +histogram_gen_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + u8 *name = 0; + u32 count = 1000; + u32 interval_usec = 1000; + u32 min_exp = 0; // Minimum exponent (2^0 = 1) + u32 num_bins = 16; // Number of bins + u32 i; + u32 value; + + // Parse CLI arguments + if (!unformat (input, "%s %u %u %u %u", &name, &count, &interval_usec, + &min_exp, &num_bins)) + { + // Try without min_exp and num_bins (for backward compatibility) + if (!unformat (input, "%s %u %u", &name, &count, &interval_usec)) + return clib_error_return ( + 0, + "parse error: '%U'\nUsage: test stats histogram-gen " + " [min-exp] [num-bins]", + format_unformat_error, input); + } + + // Initialize histogram + histogram_test_main.name = (char *) name; + histogram_test_main.min_exp = min_exp; + vlib_validate_log2_histogram (&histogram_test_main, num_bins); + + clib_warning ("DEBUG: Created histogram '%s' with min_exp=%u, num_bins=%u", + name, min_exp, num_bins); + + // Generate test data with different distributions + for (i = 0; i < count; ++i) + { + // Generate different types of values for testing + if (i % 4 == 0) + { + // Small values (0-15) + value = i % 16; + } + else if (i % 4 == 1) + { + // Medium values (16-255) + value = 16 + (i % 240); + } + else if (i % 4 == 2) + { + // Large values (256-4095) + value = 256 + (i % 3840); + } + else + { + // Very large values (4096+) + value = 4096 + (i % 10000); + } + + // Calculate bin index and increment + u8 bin = vlib_log2_histogram_bin_index (&histogram_test_main, value); + vlib_increment_log2_histogram_bin (&histogram_test_main, 0, bin, 1); + + // Debug: Print every 100th entry or last few entries + if (i % 100 == 0 || i >= count - 5) + { + clib_warning ("DEBUG: Entry %u - value: %u, bin: %u (2^%u = %u)", i, + value, bin, min_exp + bin, 1ULL << (min_exp + bin)); + } + + vlib_process_suspend (vm, 1e-6 * interval_usec); + } + + vlib_cli_output ( + vm, "Generated %u histogram entries for '%s' (min_exp=%u, bins=%u)", count, + name, min_exp, num_bins); + return 0; +} + +VLIB_CLI_COMMAND (histogram_gen_command, static) = { + .path = "test stats histogram-gen", + .short_help = "test stats histogram-gen " + "[min-exp] [num-bins]", + .function = histogram_gen_command_fn, +}; + +static clib_error_t * +histogram_clear_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + u8 *name = 0; + u32 entry_index; + + if (!unformat (input, "%s", &name)) + return clib_error_return ( + 0, "parse error: '%U'\nUsage: test stats histogram-clear ", + format_unformat_error, input); + + entry_index = vlib_stats_find_entry_index ("%s", name); + if (entry_index == STAT_SEGMENT_INDEX_INVALID) + { + return clib_error_return (0, "Histogram '%s' not found", name); + } + + vlib_stats_remove_entry (entry_index); + vlib_cli_output (vm, "Cleared histogram '%s'", name); + return 0; +} + +VLIB_CLI_COMMAND (histogram_clear_command, static) = { + .path = "test stats histogram-clear", + .short_help = "test stats histogram-clear ", + .function = histogram_clear_command_fn, +}; \ No newline at end of file diff --git a/src/plugins/unittest/ring_buffer_test.c b/src/plugins/unittest/ring_buffer_test.c new file mode 100644 index 00000000000..aedef9e5a16 --- /dev/null +++ b/src/plugins/unittest/ring_buffer_test.c @@ -0,0 +1,245 @@ +#include +#include +#include +#include + +// Message struct for the ring buffer +typedef struct +{ + u64 seq; + f64 timestamp; +} ring_test_msg_t; + +// Create a simple test schema string (CDDL-like format) +static const char * +get_ring_test_schema_string (void) +{ + return "ring_test_schema = {\n" + " name: \"ring_test\",\n" + " version: 1,\n" + " fields: [\n" + " { id: 0, name: \"seq\", type: \"uint64\" },\n" + " { id: 1, name: \"timestamp\", type: \"float64\" }\n" + " ]\n" + "}"; +} + +static clib_error_t * +ring_buffer_gen_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + u8 *name = 0; + u32 count = 1000; + u32 interval_usec = 1000; + u32 ring_size = 16; // Default ring size + u32 i; + vlib_stats_ring_config_t config; + u32 entry_index; + ring_test_msg_t msg; + + // Parse CLI arguments + if (!unformat (input, "%s %u %u %u", &name, &count, &interval_usec, + &ring_size)) + { + // Try without ring_size (for backward compatibility) + if (!unformat (input, "%s %u %u", &name, &count, &interval_usec)) + return clib_error_return ( + 0, + "parse error: '%U'\nUsage: test stats ring-buffer-gen " + " [ring-size]", + format_unformat_error, input); + } + + // Get test schema string + const char *schema_string = get_ring_test_schema_string (); + u32 schema_size = strlen (schema_string); + + config.entry_size = sizeof (ring_test_msg_t); + config.ring_size = ring_size; + config.n_threads = 1; + config.schema_size = schema_size; + config.schema_version = 1; + + // Create or find the ring buffer + entry_index = vlib_stats_find_entry_index ("%s", name); + if (entry_index == STAT_SEGMENT_INDEX_INVALID) + { + clib_warning ("DEBUG: Creating new ring buffer with name: %s", name); + entry_index = + vlib_stats_add_ring_buffer (&config, (u8 *) schema_string, "%s", name); + } + else + { + clib_warning ( + "DEBUG: Found existing ring buffer with name: %s, entry_index: %u", + name, entry_index); + } + if (entry_index == STAT_SEGMENT_INDEX_INVALID) + return clib_error_return (0, "Failed to create/find ring buffer"); + + // Debug: Print initial ring buffer state + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + vlib_stats_ring_metadata_t *metadata = + (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer + + ring_buffer->metadata_offset); + clib_warning ("DEBUG: Initial ring buffer state - head: %u, sequence: %lu", + metadata->head, metadata->sequence); + + for (i = 0; i < count; ++i) + { + msg.seq = i; + msg.timestamp = vlib_time_now (vm); + vlib_stats_ring_produce (entry_index, 0, &msg); + + // Debug: Print every 10th entry or last few entries + if (i % 10 == 0 || i >= count - 5) + { + clib_warning ( + "DEBUG: Wrote entry %u - seq: %lu, head: %u, sequence: %lu", i, + msg.seq, metadata->head, metadata->sequence); + } + + vlib_process_suspend (vm, 1e-6 * interval_usec); + } + + // Debug: Print final ring buffer state + clib_warning ("DEBUG: Final ring buffer state - head: %u, sequence: %lu", + metadata->head, metadata->sequence); + + vlib_cli_output (vm, + "Generated %u messages to ring buffer '%s' (ring size %u)", + count, name, ring_size); + return 0; +} + +VLIB_CLI_COMMAND (ring_buffer_gen_command, static) = { + .path = "test stats ring-buffer-gen", + .short_help = + "test stats ring-buffer-gen [ring-size]", + .function = ring_buffer_gen_command_fn, +}; + +static clib_error_t * +ring_buffer_show_schema_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + u8 *name = 0; + u32 entry_index; + u32 schema_size, schema_version; + + if (!unformat (input, "%s", &name)) + return clib_error_return ( + 0, "parse error: '%U'\nUsage: test stats ring-buffer-show-schema ", + format_unformat_error, input); + + entry_index = vlib_stats_find_entry_index ("%s", name); + if (entry_index == STAT_SEGMENT_INDEX_INVALID) + return clib_error_return (0, "Ring buffer '%s' not found", name); + + if (vlib_stats_ring_get_schema (entry_index, 0, NULL, &schema_size, + &schema_version) != 0) + { + return clib_error_return (0, "No schema found in ring buffer '%s'", + name); + } + + u8 *schema_data = clib_mem_alloc (schema_size); + if (!schema_data) + { + return clib_error_return (0, "Failed to allocate memory for schema"); + } + + if (vlib_stats_ring_get_schema (entry_index, 0, schema_data, &schema_size, + &schema_version) != 0) + { + clib_mem_free (schema_data); + return clib_error_return (0, "Failed to retrieve schema"); + } + + // Display schema as string + vlib_cli_output (vm, "Ring Buffer Schema (version %u):\n", schema_version); + vlib_cli_output (vm, "%.*s\n", schema_size, schema_data); + + clib_mem_free (schema_data); + return 0; +} + +VLIB_CLI_COMMAND (ring_buffer_show_schema_command, static) = { + .path = "test stats ring-buffer-show-schema", + .short_help = "test stats ring-buffer-show-schema ", + .function = ring_buffer_show_schema_command_fn, +}; + +// Test command to verify schema functionality +static clib_error_t * +ring_buffer_test_schema_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + u8 *name = 0; + u32 entry_index; + u32 schema_size, schema_version; + + if (!unformat (input, "%s", &name)) + return clib_error_return ( + 0, "parse error: '%U'\nUsage: test stats ring-buffer-test-schema ", + format_unformat_error, input); + + entry_index = vlib_stats_find_entry_index ("%s", name); + if (entry_index == STAT_SEGMENT_INDEX_INVALID) + return clib_error_return (0, "Ring buffer '%s' not found", name); + + // Test schema retrieval + if (vlib_stats_ring_get_schema (entry_index, 0, NULL, &schema_size, + &schema_version) != 0) + { + vlib_cli_output (vm, "❌ No schema found in ring buffer '%s'\n", name); + return 0; + } + + vlib_cli_output (vm, "✅ Schema found in ring buffer '%s':\n", name); + vlib_cli_output (vm, " - Schema size: %u bytes\n", schema_size); + vlib_cli_output (vm, " - Schema version: %u\n", schema_version); + + // Test schema data retrieval + u8 *schema_data = clib_mem_alloc (schema_size); + if (!schema_data) + { + return clib_error_return (0, "Failed to allocate memory for schema"); + } + + if (vlib_stats_ring_get_schema (entry_index, 0, schema_data, &schema_size, + &schema_version) != 0) + { + clib_mem_free (schema_data); + vlib_cli_output (vm, "❌ Failed to retrieve schema data\n"); + return 0; + } + + vlib_cli_output (vm, "✅ Schema data retrieved successfully\n"); + + // Verify schema is a valid string (null-terminated) + if (schema_size > 0 && schema_data[schema_size - 1] == '\0') + { + vlib_cli_output (vm, "✅ Schema is a valid null-terminated string\n"); + vlib_cli_output (vm, "Schema content:\n%.*s\n", schema_size - 1, + schema_data); + } + else + { + vlib_cli_output (vm, "✅ Schema is a valid string (size: %u bytes)\n", + schema_size); + vlib_cli_output (vm, "Schema content:\n%.*s\n", schema_size, + schema_data); + } + + clib_mem_free (schema_data); + return 0; +} + +VLIB_CLI_COMMAND (ring_buffer_test_schema_command, static) = { + .path = "test stats ring-buffer-test-schema", + .short_help = "test stats ring-buffer-test-schema ", + .function = ring_buffer_test_schema_command_fn, +}; diff --git a/src/vlib/counter.c b/src/vlib/counter.c index 9f14d02909f..ca7c347a820 100644 --- a/src/vlib/counter.c +++ b/src/vlib/counter.c @@ -41,7 +41,7 @@ #include void -vlib_clear_simple_counters (vlib_simple_counter_main_t * cm) +vlib_clear_simple_counters (vlib_simple_counter_main_t *cm) { counter_t *my_counters; uword i, j; @@ -58,7 +58,7 @@ vlib_clear_simple_counters (vlib_simple_counter_main_t * cm) } void -vlib_clear_combined_counters (vlib_combined_counter_main_t * cm) +vlib_clear_combined_counters (vlib_combined_counter_main_t *cm) { vlib_counter_t *my_counters; uword i, j; @@ -76,7 +76,7 @@ vlib_clear_combined_counters (vlib_combined_counter_main_t * cm) } void -vlib_validate_simple_counter (vlib_simple_counter_main_t * cm, u32 index) +vlib_validate_simple_counter (vlib_simple_counter_main_t *cm, u32 index) { vlib_thread_main_t *tm = vlib_get_thread_main (); char *name = cm->stat_segment_name ? cm->stat_segment_name : cm->name; @@ -99,7 +99,7 @@ vlib_validate_simple_counter (vlib_simple_counter_main_t * cm, u32 index) } void -vlib_free_simple_counter (vlib_simple_counter_main_t * cm) +vlib_free_simple_counter (vlib_simple_counter_main_t *cm) { if (cm->stats_entry_index == ~0) { @@ -115,7 +115,7 @@ vlib_free_simple_counter (vlib_simple_counter_main_t * cm) } void -vlib_validate_combined_counter (vlib_combined_counter_main_t * cm, u32 index) +vlib_validate_combined_counter (vlib_combined_counter_main_t *cm, u32 index) { vlib_thread_main_t *tm = vlib_get_thread_main (); char *name = cm->stat_segment_name ? cm->stat_segment_name : cm->name; @@ -138,8 +138,8 @@ vlib_validate_combined_counter (vlib_combined_counter_main_t * cm, u32 index) } int - vlib_validate_combined_counter_will_expand - (vlib_combined_counter_main_t * cm, u32 index) +vlib_validate_combined_counter_will_expand (vlib_combined_counter_main_t *cm, + u32 index) { vlib_thread_main_t *tm = vlib_get_thread_main (); int i; @@ -170,7 +170,7 @@ int } void -vlib_free_combined_counter (vlib_combined_counter_main_t * cm) +vlib_free_combined_counter (vlib_combined_counter_main_t *cm) { if (cm->stats_entry_index == ~0) { @@ -186,19 +186,52 @@ vlib_free_combined_counter (vlib_combined_counter_main_t * cm) } u32 -vlib_combined_counter_n_counters (const vlib_combined_counter_main_t * cm) +vlib_combined_counter_n_counters (const vlib_combined_counter_main_t *cm) { ASSERT (cm->counters); return (vec_len (cm->counters[0])); } u32 -vlib_simple_counter_n_counters (const vlib_simple_counter_main_t * cm) +vlib_simple_counter_n_counters (const vlib_simple_counter_main_t *cm) { ASSERT (cm->counters); return (vec_len (cm->counters[0])); } +void +vlib_validate_log2_histogram (vlib_log2_histogram_main_t *hm, u32 num_bins) +{ + vlib_thread_main_t *tm = vlib_get_thread_main (); + char *name = hm->stat_segment_name ? hm->stat_segment_name : hm->name; + + if (name == 0) + { + if (hm->bins == 0) + { + hm->stats_entry_index = ~0; + vec_validate (hm->bins, tm->n_vlib_mains - 1); + for (int i = 0; i < tm->n_vlib_mains; i++) + { + vec_validate_aligned (hm->bins[i], num_bins - 1, + CLIB_CACHE_LINE_BYTES); + } + return; + } + } + + if (hm->bins == 0) + hm->stats_entry_index = vlib_stats_add_histogram_log2 ("%s", name); + + vlib_stats_validate (hm->stats_entry_index, tm->n_vlib_mains - 1, num_bins); + hm->bins = vlib_stats_get_entry_data_pointer (hm->stats_entry_index); + for (int i = 0; i < tm->n_vlib_mains; i++) + { + counter_t *c = hm->bins[i]; + c[0] = hm->min_exp; + } +} + /* * fd.io coding-style-patch-verification: ON * diff --git a/src/vlib/counter.h b/src/vlib/counter.h index a9c261770d4..5c57fe77bf9 100644 --- a/src/vlib/counter.h +++ b/src/vlib/counter.h @@ -41,6 +41,7 @@ #define included_vlib_counter_h #include +#include /** \file @@ -56,14 +57,14 @@ typedef struct { - counter_t **counters; /**< Per-thread u64 non-atomic counters */ - char *name; /**< The counter collection's name. */ - char *stat_segment_name; /**< Name in stat segment directory */ + counter_t **counters; /**< Per-thread u64 non-atomic counters */ + char *name; /**< The counter collection's name. */ + char *stat_segment_name; /**< Name in stat segment directory */ u32 stats_entry_index; } vlib_simple_counter_main_t; /** The number of counters (not the number of per-thread counters) */ -u32 vlib_simple_counter_n_counters (const vlib_simple_counter_main_t * cm); +u32 vlib_simple_counter_n_counters (const vlib_simple_counter_main_t *cm); /** Pre-fetch a per-thread simple counter for the given object index */ always_inline void @@ -143,7 +144,7 @@ vlib_set_simple_counter (vlib_simple_counter_main_t *cm, @returns - (u64) current counter value */ always_inline counter_t -vlib_get_simple_counter (vlib_simple_counter_main_t * cm, u32 index) +vlib_get_simple_counter (vlib_simple_counter_main_t *cm, u32 index) { counter_t *my_counters; counter_t v; @@ -169,7 +170,7 @@ vlib_get_simple_counter (vlib_simple_counter_main_t * cm, u32 index) @param index - (u32) index of the counter to clear */ always_inline void -vlib_zero_simple_counter (vlib_simple_counter_main_t * cm, u32 index) +vlib_zero_simple_counter (vlib_simple_counter_main_t *cm, u32 index) { counter_t *my_counters; int i; @@ -189,7 +190,7 @@ vlib_zero_simple_counter (vlib_simple_counter_main_t * cm, u32 index) */ always_inline void -vlib_counter_add (vlib_counter_t * a, vlib_counter_t * b) +vlib_counter_add (vlib_counter_t *a, vlib_counter_t *b) { a->packets += b->packets; a->bytes += b->bytes; @@ -200,7 +201,7 @@ vlib_counter_add (vlib_counter_t * a, vlib_counter_t * b) @param b - (vlib_counter_t *) src counter */ always_inline void -vlib_counter_sub (vlib_counter_t * a, vlib_counter_t * b) +vlib_counter_sub (vlib_counter_t *a, vlib_counter_t *b) { ASSERT (a->packets >= b->packets); ASSERT (a->bytes >= b->bytes); @@ -212,7 +213,7 @@ vlib_counter_sub (vlib_counter_t * a, vlib_counter_t * b) @param a - (vlib_counter_t *) counter to clear */ always_inline void -vlib_counter_zero (vlib_counter_t * a) +vlib_counter_zero (vlib_counter_t *a) { a->packets = a->bytes = 0; } @@ -220,25 +221,24 @@ vlib_counter_zero (vlib_counter_t * a) /** A collection of combined counters */ typedef struct { - vlib_counter_t **counters; /**< Per-thread u64 non-atomic counter pairs */ - char *name; /**< The counter collection's name. */ - char *stat_segment_name; /**< Name in stat segment directory */ + vlib_counter_t **counters; /**< Per-thread u64 non-atomic counter pairs */ + char *name; /**< The counter collection's name. */ + char *stat_segment_name; /**< Name in stat segment directory */ u32 stats_entry_index; } vlib_combined_counter_main_t; /** The number of counters (not the number of per-thread counters) */ -u32 vlib_combined_counter_n_counters (const vlib_combined_counter_main_t * - cm); +u32 vlib_combined_counter_n_counters (const vlib_combined_counter_main_t *cm); /** Clear a collection of simple counters @param cm - (vlib_simple_counter_main_t *) collection to clear */ -void vlib_clear_simple_counters (vlib_simple_counter_main_t * cm); +void vlib_clear_simple_counters (vlib_simple_counter_main_t *cm); /** Clear a collection of combined counters @param cm - (vlib_combined_counter_main_t *) collection to clear */ -void vlib_clear_combined_counters (vlib_combined_counter_main_t * cm); +void vlib_clear_combined_counters (vlib_combined_counter_main_t *cm); /** Increment a combined counter @param cm - (vlib_combined_counter_main_t *) comined counter main pointer @@ -276,7 +276,6 @@ vlib_prefetch_combined_counter (const vlib_combined_counter_main_t *cm, clib_prefetch_store (cpu_counters + index); } - /** Get the value of a combined counter, never called in the speed path Scrapes the entire set of per-thread counters. Innacurate unless worker threads which might increment the counter are @@ -288,8 +287,8 @@ vlib_prefetch_combined_counter (const vlib_combined_counter_main_t *cm, */ static inline void -vlib_get_combined_counter (const vlib_combined_counter_main_t * cm, - u32 index, vlib_counter_t * result) +vlib_get_combined_counter (const vlib_combined_counter_main_t *cm, u32 index, + vlib_counter_t *result) { vlib_counter_t *my_counters, *counter; int i; @@ -314,7 +313,7 @@ vlib_get_combined_counter (const vlib_combined_counter_main_t * cm, @param index - (u32) index of the counter to clear */ always_inline void -vlib_zero_combined_counter (vlib_combined_counter_main_t * cm, u32 index) +vlib_zero_combined_counter (vlib_combined_counter_main_t *cm, u32 index) { vlib_counter_t *my_counters, *counter; int i; @@ -330,13 +329,13 @@ vlib_zero_combined_counter (vlib_combined_counter_main_t * cm, u32 index) } /** validate a simple counter - @param cm - (vlib_simple_counter_main_t *) pointer to the counter collection + @param cm - (vlib_simple_counter_main_t *) pointer to the counter + collection @param index - (u32) index of the counter to validate */ -void vlib_validate_simple_counter (vlib_simple_counter_main_t * cm, - u32 index); -void vlib_free_simple_counter (vlib_simple_counter_main_t * cm); +void vlib_validate_simple_counter (vlib_simple_counter_main_t *cm, u32 index); +void vlib_free_simple_counter (vlib_simple_counter_main_t *cm); /** validate a combined counter @param cm - (vlib_combined_counter_main_t *) pointer to the counter @@ -344,12 +343,13 @@ void vlib_free_simple_counter (vlib_simple_counter_main_t * cm); @param index - (u32) index of the counter to validate */ -void vlib_validate_combined_counter (vlib_combined_counter_main_t * cm, +void vlib_validate_combined_counter (vlib_combined_counter_main_t *cm, u32 index); -int vlib_validate_combined_counter_will_expand - (vlib_combined_counter_main_t * cm, u32 index); +int +vlib_validate_combined_counter_will_expand (vlib_combined_counter_main_t *cm, + u32 index); -void vlib_free_combined_counter (vlib_combined_counter_main_t * cm); +void vlib_free_combined_counter (vlib_combined_counter_main_t *cm); /** Obtain the number of simple or combined counters allocated. A macro which reduces to to vec_len(cm->maxi), the answer in either @@ -359,7 +359,44 @@ void vlib_free_combined_counter (vlib_combined_counter_main_t * cm); (vlib_combined_counter_main_t) the counter collection to interrogate @returns vec_len(cm->maxi) */ -#define vlib_counter_len(cm) vec_len((cm)->maxi) +#define vlib_counter_len(cm) vec_len ((cm)->maxi) + +typedef struct +{ + counter_t **bins; /**< Per-thread u64 non-atomic histogram bins */ + u32 min_exp; /**< log2 bin minimum exponent */ + char *name; /**< The histogram collection's name. */ + char *stat_segment_name; /**< Name in stat segment directory */ + u32 stats_entry_index; +} vlib_log2_histogram_main_t; + +void vlib_validate_log2_histogram (vlib_log2_histogram_main_t *hm, + u32 num_bins); + +static_always_inline u8 +vlib_log2_histogram_bin_index (const vlib_log2_histogram_main_t *hm, u32 value) +{ + if (value == 0) + return 0; + u8 log2_val = min_log2 (value); + int min_exp = hm->min_exp; + int n_bins = vec_len (hm->bins[0]); + int bin = log2_val - min_exp; + if (bin < 0) + bin = 0; + if (bin >= n_bins) + bin = n_bins - 1; + return (u8) bin; +} + +always_inline void +vlib_increment_log2_histogram_bin (vlib_log2_histogram_main_t *hm, + clib_thread_index_t thread_index, u8 bin, + u64 increment) +{ + uint64_t *my_bins = hm->bins[thread_index]; + my_bins[bin + 1] += increment; +} #endif /* included_vlib_counter_h */ diff --git a/src/vlib/stats/cli.c b/src/vlib/stats/cli.c index 94a852ac751..8373a31430c 100644 --- a/src/vlib/stats/cli.c +++ b/src/vlib/stats/cli.c @@ -38,6 +38,13 @@ format_stat_dir_entry (u8 *s, va_list *args) type_name = "NameVector"; break; + case STAT_DIR_TYPE_RING_BUFFER: + type_name = "RingBuffer"; + break; + + case STAT_DIR_TYPE_HISTOGRAM_LOG2: + type_name = "Histogram"; + break; case STAT_DIR_TYPE_EMPTY: type_name = "empty"; break; @@ -53,6 +60,74 @@ format_stat_dir_entry (u8 *s, va_list *args) return format (s, format_string, ep->name, type_name, 0); } + +static u8 * +format_stat_dir_entry_detail (u8 *s, va_list *args) +{ + vlib_stats_entry_t *ep = va_arg (*args, vlib_stats_entry_t *); + + if (ep->type == STAT_DIR_TYPE_RING_BUFFER) + { + vlib_stats_ring_buffer_t *rb = ep->data; + if (rb) + { + s = format (s, "RingBuffer: %s\n", ep->name); + s = format (s, " ring_size: %u\n", rb->config.ring_size); + s = format (s, " entry_size: %u\n", rb->config.entry_size); + s = format (s, " n_threads: %u\n", rb->config.n_threads); + for (u32 t = 0; t < rb->config.n_threads; t++) + { + vlib_stats_ring_metadata_t *md = + (vlib_stats_ring_metadata_t + *) ((u8 *) rb + rb->metadata_offset + + t * sizeof (vlib_stats_ring_metadata_t)); + s = format (s, " [thread %u] head:%u seq:%llu\n", t, md->head, + (unsigned long long) md->sequence); + } + } + else + { + s = format (s, "RingBuffer: %s (uninitialized)\n", ep->name); + } + } + else if (ep->type == STAT_DIR_TYPE_HISTOGRAM_LOG2) + { + u64 **log2_histogram_bins = ep->data; + if (!log2_histogram_bins) + { + s = format (s, "Histogram: %s (uninitialized)\n", ep->name); + return s; + } + + s = format (s, "Histogram: %s\n", ep->name); + for (u32 k = 0; k < vec_len (log2_histogram_bins); k++) + { + u64 *bins = log2_histogram_bins[k]; + int n_bins = vec_len (bins); + if (n_bins < 2) // Need at least min_exp + one bin + continue; + u32 min_exp = bins[0]; + u64 cumulative = 0; + u64 sum = 0; + s = format (s, " [thread %u]:\n", k); + for (int j = 1; j < n_bins; ++j) + { + cumulative += bins[j]; + sum += bins[j] * (1ULL << (min_exp + j - 1)); // midpoint approx + s = format (s, " <= %llu: %llu (cumulative: %llu)\n", + (1ULL << (min_exp + j - 1)), bins[j], cumulative); + } + s = format (s, " +Inf: %llu (total count: %llu, sum: %llu)\n", + cumulative, cumulative, sum); + } + } + else + { + s = format (s, "Entry: %s (type %d)\n", ep->name, ep->type); + } + return s; +} + static clib_error_t * show_stat_segment_command_fn (vlib_main_t *vm, unformat_input_t *input, vlib_cli_command_t *cmd) @@ -60,11 +135,41 @@ show_stat_segment_command_fn (vlib_main_t *vm, unformat_input_t *input, vlib_stats_segment_t *sm = vlib_stats_get_segment (); vlib_stats_entry_t *show_data; int i; - int verbose = 0; + u8 *counter_name = 0; - if (unformat (input, "verbose")) - verbose = 1; + // Parse both 'verbose' and counter name in any order + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "verbose")) + verbose = 1; + else if (unformat (input, "%s", &counter_name)) + ; + else + break; + } + + if (counter_name) + { + u32 index = vlib_stats_find_entry_index ("%s", counter_name); + if (index != STAT_SEGMENT_INDEX_INVALID) + { + vlib_stats_entry_t *ep = sm->directory_vector + index; + vlib_cli_output (vm, "%U", format_stat_dir_entry_detail, ep); + if (verbose) + { + ASSERT (sm->heap); + vlib_cli_output (vm, "%U", format_clib_mem_heap, sm->heap, + 0 /* verbose */); + } + } + else + { + vlib_cli_output (vm, "Counter '%s' not found.", counter_name); + } + vec_free (counter_name); + return 0; + } /* Lock even as reader, as this command doesn't handle epoch changes */ vlib_stats_segment_lock (); @@ -116,6 +221,6 @@ VLIB_CLI_COMMAND (show_stat_segment_hash_command, static) = { VLIB_CLI_COMMAND (show_stat_segment_command, static) = { .path = "show statistics segment", - .short_help = "show statistics segment [verbose]", + .short_help = "show statistics segment [counter-name] [verbose]", .function = show_stat_segment_command_fn, }; diff --git a/src/vlib/stats/shared.h b/src/vlib/stats/shared.h index 8e44ce3dc86..893c8b55541 100644 --- a/src/vlib/stats/shared.h +++ b/src/vlib/stats/shared.h @@ -14,6 +14,9 @@ typedef enum STAT_DIR_TYPE_NAME_VECTOR, STAT_DIR_TYPE_EMPTY, STAT_DIR_TYPE_SYMLINK, + STAT_DIR_TYPE_HISTOGRAM_LOG2, + STAT_DIR_TYPE_RING_BUFFER, + STAT_DIR_TYPE_GAUGE, } stat_directory_type_t; typedef struct @@ -34,6 +37,8 @@ typedef struct #define VLIB_STATS_MAX_NAME_SZ 128 char name[VLIB_STATS_MAX_NAME_SZ]; } vlib_stats_entry_t; +_Static_assert (sizeof (vlib_stats_entry_t) == 144, + "vlib_stats_entry_t size must be 144 bytes"); /* * Shared header first in the shared memory segment. diff --git a/src/vlib/stats/stats.c b/src/vlib/stats/stats.c index b7743ec70f2..30718bc284d 100644 --- a/src/vlib/stats/stats.c +++ b/src/vlib/stats/stats.c @@ -149,6 +149,7 @@ vlib_stats_remove_entry (u32 entry_index) break; case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE: + case STAT_DIR_TYPE_HISTOGRAM_LOG2: c = e->data; e->data = 0; oldheap = clib_mem_set_heap (sm->heap); @@ -168,6 +169,20 @@ vlib_stats_remove_entry (u32 entry_index) clib_mem_set_heap (oldheap); break; + case STAT_DIR_TYPE_RING_BUFFER: + { + vlib_stats_ring_buffer_t *ring_buffer = e->data; + if (ring_buffer) + { + /* Free ring buffer memory */ + oldheap = clib_mem_set_heap (sm->heap); + clib_mem_free (ring_buffer); + clib_mem_set_heap (oldheap); + } + e->data = 0; + } + break; + case STAT_DIR_TYPE_SCALAR_INDEX: case STAT_DIR_TYPE_SYMLINK: break; @@ -241,7 +256,7 @@ vlib_stats_add_gauge (char *fmt, ...) va_start (va, fmt); name = va_format (0, fmt, &va); va_end (va); - return vlib_stats_new_entry_internal (STAT_DIR_TYPE_SCALAR_INDEX, name); + return vlib_stats_new_entry_internal (STAT_DIR_TYPE_GAUGE, name); } void @@ -368,6 +383,18 @@ vlib_stats_add_counter_vector (char *fmt, ...) name); } +u32 +vlib_stats_add_histogram_log2 (char *fmt, ...) +{ + va_list va; + u8 *name; + + va_start (va, fmt); + name = va_format (0, fmt, &va); + va_end (va); + return vlib_stats_new_entry_internal (STAT_DIR_TYPE_HISTOGRAM_LOG2, name); +} + u32 vlib_stats_add_counter_pair_vector (char *fmt, ...) { @@ -390,7 +417,8 @@ vlib_stats_validate_will_expand_internal (u32 entry_index, va_list *va) int rv = 1; oldheap = clib_mem_set_heap (sm->heap); - if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE) + if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE || + e->type == STAT_DIR_TYPE_HISTOGRAM_LOG2) { u32 idx0 = va_arg (*va, u32); u32 idx1 = va_arg (*va, u32); @@ -459,7 +487,8 @@ vlib_stats_validate (u32 entry_index, ...) va_start (va, entry_index); - if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE) + if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE || + e->type == STAT_DIR_TYPE_HISTOGRAM_LOG2) { u32 idx0 = va_arg (va, u32); u32 idx1 = va_arg (va, u32); @@ -572,3 +601,308 @@ vlib_stats_register_collector_fn (vlib_stats_collector_reg_t *reg) return; } + +u32 +vlib_stats_add_ring_buffer (vlib_stats_ring_config_t *config, + const void *schema_data, char *fmt, ...) +{ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + va_list va; + u8 *name; + vlib_stats_ring_buffer_t *ring_buffer; + u32 entry_index, total_size, metadata_size, data_size, schema_offset; + + va_start (va, fmt); + name = va_format (0, fmt, &va); + va_end (va); + + entry_index = + vlib_stats_new_entry_internal (STAT_DIR_TYPE_RING_BUFFER, name); + if (entry_index == CLIB_U32_MAX) + return CLIB_U32_MAX; + vlib_stats_segment_lock (); + + /* Calculate sizes */ + metadata_size = config->n_threads * sizeof (vlib_stats_ring_metadata_t); + data_size = config->n_threads * config->ring_size * config->entry_size; + + /* Calculate total size with metadata aligned to 64 bytes for atomic + * operations */ + u32 data_offset = sizeof (vlib_stats_ring_buffer_t); + u32 metadata_offset = CLIB_CACHE_LINE_ROUND (data_offset + data_size); + + /* Add space for schema if provided */ + if (config->schema_size > 0) + { + schema_offset = metadata_offset + metadata_size; + total_size = schema_offset + config->schema_size; + } + else + { + schema_offset = 0; + total_size = metadata_offset + metadata_size; + } + + /* Allocate ring buffer structure */ + void *oldheap = clib_mem_set_heap (sm->heap); + ring_buffer = clib_mem_alloc_aligned (total_size, CLIB_CACHE_LINE_BYTES); + clib_memset (ring_buffer, 0, total_size); + clib_mem_set_heap (oldheap); + + /* Set up offsets */ + ring_buffer->config = *config; + ring_buffer->data_offset = data_offset; + ring_buffer->metadata_offset = metadata_offset; + + /* Copy schema data if provided */ + if (config->schema_size > 0 && schema_data) + { + void *schema_location = (u8 *) ring_buffer + schema_offset; + clib_memcpy_fast (schema_location, schema_data, config->schema_size); + + /* Initialize metadata for all threads with schema info */ + for (u32 thread_index = 0; thread_index < config->n_threads; + thread_index++) + { + vlib_stats_ring_metadata_t *metadata = + (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer + + metadata_offset + + thread_index * + sizeof ( + vlib_stats_ring_metadata_t)); + metadata->schema_version = config->schema_version; + metadata->schema_offset = schema_offset; + metadata->schema_size = config->schema_size; + } + } + + sm->directory_vector[entry_index].data = ring_buffer; + + vlib_stats_segment_unlock (); + + return entry_index; +} + +void * +vlib_stats_ring_get_entry (u32 entry_index, u32 thread_index) +{ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + vlib_stats_ring_metadata_t *metadata = + (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer + + ring_buffer->metadata_offset + + thread_index * + sizeof (vlib_stats_ring_metadata_t)); + + if (thread_index >= ring_buffer->config.n_threads) + return 0; + + ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0); + + // /* Check if ring is full */ + // if (metadata->count >= ring_buffer->config.ring_size) + // return 0; + + /* Calculate entry pointer */ + u32 offset = thread_index * ring_buffer->config.ring_size * + ring_buffer->config.entry_size; + offset += metadata->head * ring_buffer->config.entry_size; + + return (u8 *) ring_buffer + ring_buffer->data_offset + offset; +} + +int +vlib_stats_ring_produce (u32 entry_index, u32 thread_index, void *data) +{ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + vlib_stats_ring_metadata_t *metadata = + (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer + + ring_buffer->metadata_offset + + thread_index * + sizeof (vlib_stats_ring_metadata_t)); + + if (thread_index >= ring_buffer->config.n_threads) + return -1; + + ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0); + + /* Prefetch next slot for better cache performance */ + u32 next_head = (metadata->head + 1) % ring_buffer->config.ring_size; + /* Note: CLIB_PREFETCH would be used here for optimal cache performance */ + + /* Copy data to current head position */ + void *entry = vlib_stats_ring_get_entry (entry_index, thread_index); + if (!entry) + return -1; + + clib_memcpy_fast (entry, data, ring_buffer->config.entry_size); + + /* Update metadata - always advance head and increment sequence */ + metadata->head = next_head; + __atomic_store_n (&metadata->sequence, metadata->sequence + 1, + __ATOMIC_RELEASE); + + return 0; +} + +int +vlib_stats_ring_consume (u32 entry_index, u32 thread_index, void *data, + u64 *sequence_out) +{ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + vlib_stats_ring_metadata_t *metadata = + (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer + + ring_buffer->metadata_offset + + thread_index * + sizeof (vlib_stats_ring_metadata_t)); + + if (thread_index >= ring_buffer->config.n_threads) + return -1; + + ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0); + + /* Note: This function is for testing/debugging only since the reader + is read-only and manages its own state. The writer doesn't track + reader position. */ + + /* Return sequence number if requested */ + if (sequence_out) + *sequence_out = __atomic_load_n (&metadata->sequence, __ATOMIC_ACQUIRE); + + return 0; +} + +/* Direct serialization APIs - avoid extra copy */ + +void * +vlib_stats_ring_reserve_slot (u32 entry_index, u32 thread_index) +{ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + vlib_stats_ring_metadata_t *metadata = + (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer + + ring_buffer->metadata_offset + + thread_index * + sizeof (vlib_stats_ring_metadata_t)); + + if (thread_index >= ring_buffer->config.n_threads) + return 0; + + ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0); + + /* Calculate entry pointer */ + u32 offset = thread_index * ring_buffer->config.ring_size * + ring_buffer->config.entry_size; + offset += metadata->head * ring_buffer->config.entry_size; + + return (u8 *) ring_buffer + ring_buffer->data_offset + offset; +} + +int +vlib_stats_ring_commit_slot (u32 entry_index, u32 thread_index) +{ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + vlib_stats_ring_metadata_t *metadata = + (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer + + ring_buffer->metadata_offset + + thread_index * + sizeof (vlib_stats_ring_metadata_t)); + ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0); + if (thread_index >= ring_buffer->config.n_threads) + return -1; + + /* Update metadata - always advance head and increment sequence */ + metadata->head = (metadata->head + 1) % ring_buffer->config.ring_size; + __atomic_store_n (&metadata->sequence, metadata->sequence + 1, + __ATOMIC_RELEASE); + + return 0; +} + +int +vlib_stats_ring_abort_slot (u32 entry_index, u32 thread_index) +{ + /* For abort, we don't need to do anything since we haven't updated the + metadata yet. The slot will be reused on the next reserve call. */ + return 0; +} + +u32 +vlib_stats_ring_get_slot_size (u32 entry_index) +{ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + + return ring_buffer->config.entry_size; +} + +u32 +vlib_stats_ring_get_count (u32 entry_index, u32 thread_index) +{ + /* Note: Since the writer doesn't track reader state, we can't determine + the actual count of unread entries. This function is kept for API + compatibility but always returns 0. */ + return 0; +} + +u32 +vlib_stats_ring_get_free_space (u32 entry_index, u32 thread_index) +{ + /* Note: Since the writer doesn't track reader state, we can't determine + the actual free space. This function is kept for API compatibility + but always returns the full ring size. */ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + + return ring_buffer->config.ring_size; +} + +/* Schema retrieval from ring buffers */ + +int +vlib_stats_ring_get_schema (u32 entry_index, u32 thread_index, + void *schema_data, u32 *schema_size, + u32 *schema_version) +{ + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index); + vlib_stats_ring_buffer_t *ring_buffer = e->data; + vlib_stats_ring_metadata_t *metadata = + (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer + + ring_buffer->metadata_offset + + thread_index * + sizeof (vlib_stats_ring_metadata_t)); + + if (thread_index >= ring_buffer->config.n_threads) + return -1; + + ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0); + + /* Check if schema exists */ + if (metadata->schema_size == 0) + return -1; + + /* Return schema information */ + if (schema_version) + *schema_version = metadata->schema_version; + if (schema_size) + *schema_size = metadata->schema_size; + if (schema_data) + { + void *schema_location = (u8 *) ring_buffer + ring_buffer->data_offset + + metadata->schema_offset; + clib_memcpy_fast (schema_data, schema_location, metadata->schema_size); + } + + return 0; +} diff --git a/src/vlib/stats/stats.h b/src/vlib/stats/stats.h index ab1e2828c5a..003b986d157 100644 --- a/src/vlib/stats/stats.h +++ b/src/vlib/stats/stats.h @@ -161,4 +161,60 @@ void vlib_stats_register_collector_fn (vlib_stats_collector_reg_t *r); format_function_t format_vlib_stats_symlink; +/* log2 histogram */ +u32 vlib_stats_add_histogram_log2 (char *fmt, ...); +void vlib_stats_set_histogram_log2 (u32 entry_index, u32 thread_index, + const u64 *bin_counts); + +/* Ring buffer configuration */ +typedef struct +{ + u32 entry_size; /* Size of each entry in bytes */ + u32 ring_size; /* Number of entries in the ring */ + u32 n_threads; /* Number of threads (one ring per thread) */ + u32 schema_size; /* Size of schema data in bytes (0 if no schema) */ + u32 schema_version; /* Schema version number */ +} __clib_packed vlib_stats_ring_config_t; + +/* Ring buffer metadata (per thread) */ +typedef struct +{ + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + u32 head; /* Producer position */ + u32 schema_version; /* Schema version for this ring */ + u64 sequence; /* Sequence number for overwrite detection */ + u32 schema_offset; /* Offset to schema data within ring buffer */ + u32 schema_size; /* Size of schema data in bytes */ + u8 _pad1[CLIB_CACHE_LINE_BYTES - 24]; /* Pad to cache line size */ +} vlib_stats_ring_metadata_t; + +/* Ring buffer entry in stats directory */ +typedef struct +{ + vlib_stats_ring_config_t config; + u32 metadata_offset; /* Offset to metadata array (64-byte aligned) */ + u32 data_offset; /* Offset to ring buffer data */ +} __clib_packed vlib_stats_ring_buffer_t; + +/* Ring buffer */ +u32 vlib_stats_add_ring_buffer (vlib_stats_ring_config_t *config, + const void *schema_data, char *fmt, ...); +void *vlib_stats_ring_get_entry (u32 entry_index, u32 thread_index); +int vlib_stats_ring_produce (u32 entry_index, u32 thread_index, void *data); +int vlib_stats_ring_consume (u32 entry_index, u32 thread_index, void *data, + u64 *sequence_out); +u32 vlib_stats_ring_get_count (u32 entry_index, u32 thread_index); +u32 vlib_stats_ring_get_free_space (u32 entry_index, u32 thread_index); + +/* Direct serialization APIs - avoid extra copy */ +void *vlib_stats_ring_reserve_slot (u32 entry_index, u32 thread_index); +int vlib_stats_ring_commit_slot (u32 entry_index, u32 thread_index); +int vlib_stats_ring_abort_slot (u32 entry_index, u32 thread_index); +u32 vlib_stats_ring_get_slot_size (u32 entry_index); + +/* Schema retrieval from ring buffers */ +int vlib_stats_ring_get_schema (u32 entry_index, u32 thread_index, + void *schema_data, u32 *schema_size, + u32 *schema_version); + #endif diff --git a/src/vpp-api/client/stat_client.c b/src/vpp-api/client/stat_client.c index 359813f8d57..5614390e55b 100644 --- a/src/vpp-api/client/stat_client.c +++ b/src/vpp-api/client/stat_client.c @@ -241,6 +241,7 @@ copy_data (vlib_stats_entry_t *ep, u32 index2, char *name, switch (ep->type) { case STAT_DIR_TYPE_SCALAR_INDEX: + case STAT_DIR_TYPE_GAUGE: result.scalar_value = ep->value; break; @@ -294,6 +295,19 @@ copy_data (vlib_stats_entry_t *ep, u32 index2, char *name, } case STAT_DIR_TYPE_EMPTY: + case STAT_DIR_TYPE_RING_BUFFER: + break; + + case STAT_DIR_TYPE_HISTOGRAM_LOG2: + { + uint64_t **bins = stat_segment_adjust (sm, ep->data); + result.log2_histogram_bins = stat_vec_dup (sm, bins); + for (i = 0; i < vec_len (bins); i++) + { + uint64_t *thread_bins = stat_segment_adjust (sm, bins[i]); + result.log2_histogram_bins[i] = stat_vec_dup (sm, thread_bins); + } + } break; default: @@ -326,7 +340,14 @@ stat_segment_data_free (stat_segment_data_t * res) vec_free (res[i].name_vector); break; case STAT_DIR_TYPE_SCALAR_INDEX: + case STAT_DIR_TYPE_GAUGE: case STAT_DIR_TYPE_EMPTY: + case STAT_DIR_TYPE_RING_BUFFER: + break; + case STAT_DIR_TYPE_HISTOGRAM_LOG2: + for (j = 0; j < vec_len (res[i].log2_histogram_bins); j++) + vec_free (res[i].log2_histogram_bins[j]); + vec_free (res[i].log2_histogram_bins); break; default: assert (0); diff --git a/src/vpp-api/client/stat_client.h b/src/vpp-api/client/stat_client.h index d9671c69ff2..722ba6eeff5 100644 --- a/src/vpp-api/client/stat_client.h +++ b/src/vpp-api/client/stat_client.h @@ -44,6 +44,7 @@ typedef struct counter_t **simple_counter_vec; vlib_counter_t **combined_counter_vec; uint8_t **name_vector; + uint64_t **log2_histogram_bins; // [thread][bin] for histogram }; } stat_segment_data_t; diff --git a/src/vpp-api/python/vpp_papi/vpp_stats.py b/src/vpp-api/python/vpp_papi/vpp_stats.py index aa9ff85b3c7..2929bbae30d 100755 --- a/src/vpp-api/python/vpp_papi/vpp_stats.py +++ b/src/vpp-api/python/vpp_papi/vpp_stats.py @@ -49,6 +49,8 @@ from struct import Struct import time import unittest import re +import asyncio +import sys def recv_fd(sock): @@ -289,6 +291,47 @@ class VPPStats: result[cnt] = self.__getitem__(cnt, blocking) return result + def get_ring_buffer(self, name, blocking=True): + """Get a ring buffer by name""" + if not self.connected: + self.connect() + + while True: + try: + if self.last_epoch != self.epoch: + self.refresh(blocking) + with self.lock: + entry = self.directory[name] + if entry.type == 8: # STAT_DIR_TYPE_RING_BUFFER + return entry.get_counter(self) + else: + raise ValueError(f"'{name}' is not a ring buffer") + except IOError: + if not blocking: + raise + + def poll_ring_buffer(self, name, thread_index=0, timeout=None, callback=None): + """Convenience method to poll a ring buffer by name""" + ring_buffer = self.get_ring_buffer(name) + return ring_buffer.poll_for_data(thread_index, timeout, callback) + + async def poll_ring_buffer_async( + self, name, thread_index=0, timeout=None, callback=None + ): + """Async convenience method to poll a ring buffer by name""" + ring_buffer = self.get_ring_buffer(name) + return await ring_buffer.poll_for_data_async(thread_index, timeout, callback) + + def get_ring_buffer_schema(self, name, thread_index=0): + """Get schema from a ring buffer by name""" + ring_buffer = self.get_ring_buffer(name) + return ring_buffer.get_schema(thread_index) + + def get_ring_buffer_schema_string(self, name, thread_index=0): + """Get schema as string from a ring buffer by name""" + ring_buffer = self.get_ring_buffer(name) + return ring_buffer.get_schema_string(thread_index) + class StatsLock: """Stat segment optimistic locking""" @@ -393,6 +436,28 @@ class SimpleList(list): return sum(self) +# Add a helper class for histogram log2 +class StatsHistogramLog2: + def __init__(self, bins, min_exp): + self.bins = bins # list of lists: [thread][bin] + self.min_exp = min_exp + + def sum(self): + return sum(sum(thread_bins) for thread_bins in self.bins) + + def thread_count(self): + return len(self.bins) + + def bin_count(self): + return max((len(b) for b in self.bins), default=0) + + def __getitem__(self, idx): + return self.bins[idx] + + def __repr__(self): + return f"StatsHistogramLog2(min_exp={self.min_exp}, bins={self.bins})" + + class StatsEntry: """An individual stats entry""" @@ -412,6 +477,12 @@ class StatsEntry: self.function = self.name elif stattype == 6: self.function = self.symlink + elif stattype == 7: # STAT_DIR_TYPE_HISTOGRAM_LOG2 + self.function = self.histogram_log2 + elif stattype == 8: # STAT_DIR_TYPE_RING_BUFFER + self.function = self.ring_buffer + elif stattype == 9: # STAT_DIR_TYPE_GAUGE + self.function = self.scalar else: self.function = self.illegal @@ -457,12 +528,480 @@ class StatsEntry: name = stats.directory_by_idx[index1] return stats[name][:, index2] + def ring_buffer(self, stats): + """Ring buffer counter""" + return StatsRingBuffer(stats, self.value) + + def histogram_log2(self, stats): + """Histogram log2 counter (STAT_DIR_TYPE_HISTOGRAM_LOG2)""" + # The value is a pointer to a vector of pointers (per-thread), each pointing to a vector of uint64_t bins + threads_ptr = self.value + thread_vec = StatsVector(stats, threads_ptr, "P") + all_bins = [] + min_exp = 0 + for thread_ptr_tuple in thread_vec: + bins_ptr = thread_ptr_tuple[0] + if bins_ptr: + bins_vec = StatsVector(stats, bins_ptr, "Q") + bins = [v[0] for v in bins_vec] + if bins: + min_exp = bins[0] + all_bins.append(bins[1:]) + else: + all_bins.append([]) + else: + all_bins.append([]) + return StatsHistogramLog2(all_bins, min_exp) + def get_counter(self, stats): """Return a list of counters""" if stats: return self.function(stats) +class StatsRingBuffer: + """Ring buffer for high-performance data streaming""" + + def __init__(self, stats, ptr): + self.stats = stats + self.ring_buffer_ptr = ptr + self.config = self._get_config() + self.metadata_ptr = self._get_metadata_ptr() + self.data_ptr = self._get_data_ptr() + # Track local tail and last sequence for each thread + # Note: Since writer doesn't track reader state, we initialize local_tails to 0 + self.local_tails = [0] * self.config["n_threads"] + self.last_sequences = [None] * self.config["n_threads"] + + def _get_config(self): + """Get ring buffer configuration from shared memory""" + config_offset = self.ring_buffer_ptr - self.stats.base + # Read the full config structure: entry_size, ring_size, n_threads, schema_size, schema_version + config_data = self.stats.statseg[config_offset : config_offset + 20] + entry_size, ring_size, n_threads, schema_size, schema_version = Struct( + "=IIIII" + ).unpack(config_data) + return { + "entry_size": entry_size, + "ring_size": ring_size, + "n_threads": n_threads, + "schema_size": schema_size, + "schema_version": schema_version, + } + + def _get_metadata_ptr(self): + """Get pointer to metadata array using offset""" + config_offset = self.ring_buffer_ptr - self.stats.base + # Read metadata_offset from the structure (at offset 20) + metadata_offset_data = self.stats.statseg[ + config_offset + 20 : config_offset + 24 + ] + metadata_offset = Struct("=I").unpack(metadata_offset_data)[0] + return config_offset + metadata_offset + + def _get_data_ptr(self): + """Get pointer to ring buffer data using offset""" + config_offset = self.ring_buffer_ptr - self.stats.base + # Read data_offset from the structure (at offset 24) + data_offset_data = self.stats.statseg[config_offset + 24 : config_offset + 28] + data_offset = Struct("=I").unpack(data_offset_data)[0] + return config_offset + data_offset + + def _get_thread_metadata(self, thread_index): + """Get metadata for a specific thread, including sequence number and schema info""" + if thread_index >= self.config["n_threads"]: + raise IndexError(f"Thread index {thread_index} out of range") + + # Metadata struct: head, schema_version, sequence, schema_offset, schema_size, padding + metadata_offset = self.metadata_ptr + ( + thread_index * 64 # CLIB_CACHE_LINE_BYTES, typically 64 + ) + metadatafmt_struct = Struct( + "=IIQII" + ) # head, schema_version, sequence, schema_offset, schema_size + metadata_data = self.stats.statseg[ + metadata_offset : metadata_offset + metadatafmt_struct.size + ] + head, schema_version, sequence, schema_offset, schema_size = ( + metadatafmt_struct.unpack(metadata_data) + ) + return { + "head": head, + "schema_version": schema_version, + "sequence": sequence, + "schema_offset": schema_offset, + "schema_size": schema_size, + } + + def get_schema(self, thread_index=0): + """Get schema data from ring buffer for a specific thread""" + metadata = self._get_thread_metadata(thread_index) + + # Check if schema exists + if metadata["schema_size"] == 0: + return None, 0, 0 + + # Calculate schema location + config_offset = self.ring_buffer_ptr - self.stats.base + schema_location = config_offset + metadata["schema_offset"] + + # Read schema data + schema_data = self.stats.statseg[ + schema_location : schema_location + metadata["schema_size"] + ] + + return schema_data, metadata["schema_size"], metadata["schema_version"] + + def get_schema_string(self, thread_index=0): + """Get schema as a string (for text-based schemas like CDDL)""" + schema_data, schema_size, schema_version = self.get_schema(thread_index) + + if schema_data is None: + return None, 0, 0 + + try: + # Try to decode as UTF-8 string + schema_string = schema_data.decode("utf-8") + return schema_string, schema_size, schema_version + except UnicodeDecodeError: + # If it's not a valid UTF-8 string, return as bytes + return schema_data, schema_size, schema_version + + def get_count(self, thread_index=0): + """Get current count of entries in ring buffer for a thread""" + # Note: Since the writer doesn't track reader state, we can't determine + # the actual count. This method is kept for API compatibility. + return 0 + + def is_empty(self, thread_index=0): + """Check if ring buffer is empty for a thread""" + # Note: Since the writer doesn't track reader state, we can't determine + # if the ring is empty. This method is kept for API compatibility. + return True + + def is_full(self, thread_index=0): + """Check if ring buffer is full for a thread""" + # Note: Since the writer doesn't track reader state, we can't determine + # if the ring is full. This method is kept for API compatibility. + return False + + def consume_data(self, thread_index=0, max_entries=None): + """Consume data from ring buffer for a thread (read-only), with sequence check""" + # Read metadata atomically to get consistent snapshot + metadata = self._get_thread_metadata(thread_index) + local_tail = self.local_tails[thread_index] + last_sequence = self.last_sequences[thread_index] + sequence = metadata["sequence"] + ring_size = self.config["ring_size"] + + # Overwrite detection: did the producer lap us? + if last_sequence is not None: + delta = (sequence - last_sequence) % (1 << 64) + if delta > ring_size: + print( + f"[WARN] Ring buffer overwrite detected on thread {thread_index}: " + f"sequence jumped from {last_sequence} to {sequence} (delta={delta}, ring_size={ring_size})" + ) + # Resync local_tail to a reasonable position + local_tail = (metadata["head"] - ring_size) % ring_size + + # If the sequence hasn't changed, nothing new to read + if last_sequence == sequence: + return [] + + # Calculate how many new entries are available + if last_sequence is None: + # First time reading - calculate how many entries are available + available = min(sequence, ring_size) + # Calculate starting position: (head - available) % ring_size + # This gives us the oldest entry that's still available + local_tail = (metadata["head"] - available) % ring_size + else: + available = (sequence - last_sequence) % (1 << 64) + if available > ring_size: + available = ring_size # Cap at ring size + + if available == 0: + self.last_sequences[thread_index] = sequence + return [] + + if max_entries is None: + max_entries = available + else: + max_entries = min(max_entries, available) + + consumed_data = [] + entry_size = self.config["entry_size"] + + # Calculate data offset for this thread + thread_data_offset = self.data_ptr + ( + thread_index * self.config["ring_size"] * entry_size + ) + + # Read data with retry logic for potential contention + max_retries = 3 + for retry in range(max_retries): + try: + for i in range(max_entries): + entry_offset = thread_data_offset + (local_tail * entry_size) + entry_data = self.stats.statseg[ + entry_offset : entry_offset + entry_size + ] + consumed_data.append(entry_data) + local_tail = (local_tail + 1) % self.config["ring_size"] + + # Verify sequence number hasn't changed during our read + current_metadata = self._get_thread_metadata(thread_index) + if current_metadata["sequence"] == sequence: + # Success - update local state + self.local_tails[thread_index] = local_tail + # Update last_sequence based on how many entries we read + if last_sequence is None: + # First time reading - update to the sequence number of the last entry we read + self.last_sequences[thread_index] = ( + sequence - available + len(consumed_data) + ) + else: + # Subsequent reading - update by the number of entries we read + self.last_sequences[thread_index] = last_sequence + len( + consumed_data + ) + return consumed_data + else: + # Sequence changed during read, retry + if retry < max_retries - 1: + # Re-read metadata and recalculate + metadata = current_metadata + sequence = metadata["sequence"] + if last_sequence is None: + available = min(sequence, ring_size) + # Calculate starting position: (head - available) % ring_size + local_tail = (metadata["head"] - available) % ring_size + else: + available = (sequence - last_sequence) % (1 << 64) + if available > ring_size: + available = ring_size + if available == 0: + self.last_sequences[thread_index] = sequence + return [] + max_entries = min(max_entries, available) + consumed_data = [] + local_tail = self.local_tails[thread_index] + continue + else: + # Max retries reached, return what we have + print(f"[WARN] Max retries reached, returning partial data") + self.local_tails[thread_index] = local_tail + self.last_sequences[thread_index] = sequence + return consumed_data + + except Exception as e: + print(f"[ERROR] Exception during data read: {e}") + if retry < max_retries - 1: + continue + else: + return consumed_data + + return consumed_data + + def consume_data_batch(self, thread_index=0, max_entries=None, prefetch=True): + """Consume data from ring buffer in batches for better performance""" + # Read metadata atomically to get consistent snapshot + metadata = self._get_thread_metadata(thread_index) + local_tail = self.local_tails[thread_index] + last_sequence = self.last_sequences[thread_index] + sequence = metadata["sequence"] + ring_size = self.config["ring_size"] + + # Overwrite detection: did the producer lap us? + if last_sequence is not None: + delta = (sequence - last_sequence) % (1 << 64) + if delta > ring_size: + print( + f"[WARN] Ring buffer overwrite detected on thread {thread_index}: " + f"sequence jumped from {last_sequence} to {sequence} (delta={delta}, ring_size={ring_size})" + ) + # Resync local_tail to a reasonable position + local_tail = (metadata["head"] - ring_size) % ring_size + + # If the sequence hasn't changed, nothing new to read + if last_sequence == sequence: + return [] + + # Calculate how many new entries are available + if last_sequence is None: + # First time reading - calculate how many entries are available + available = min(sequence, ring_size) + # Calculate starting position: (head - available) % ring_size + # This gives us the oldest entry that's still available + local_tail = (metadata["head"] - available) % ring_size + else: + available = (sequence - last_sequence) % (1 << 64) + if available > ring_size: + available = ring_size # Cap at ring size + + if available == 0: + self.last_sequences[thread_index] = sequence + return [] + + if max_entries is None: + max_entries = available + else: + max_entries = min(max_entries, available) + + consumed_data = [] + entry_size = self.config["entry_size"] + + # Calculate data offset for this thread + thread_data_offset = self.data_ptr + ( + thread_index * self.config["ring_size"] * entry_size + ) + + # Prefetch next few entries for better cache performance + if prefetch and max_entries > 1: + next_tail = (local_tail + 1) % ring_size + next_offset = thread_data_offset + (next_tail * entry_size) + # Note: Python doesn't have direct prefetch, but we can optimize memory access patterns + # by reading data in larger chunks when possible + + # Read data with retry logic for potential contention + max_retries = 3 + for retry in range(max_retries): + try: + # Read data in larger chunks when possible for better performance + chunk_size = min(max_entries, 16) # Read up to 16 entries at once + for chunk_start in range(0, max_entries, chunk_size): + chunk_end = min(chunk_start + chunk_size, max_entries) + + for i in range(chunk_start, chunk_end): + entry_offset = thread_data_offset + (local_tail * entry_size) + entry_data = self.stats.statseg[ + entry_offset : entry_offset + entry_size + ] + consumed_data.append(entry_data) + local_tail = (local_tail + 1) % self.config["ring_size"] + + # Verify sequence number hasn't changed during our read + current_metadata = self._get_thread_metadata(thread_index) + if current_metadata["sequence"] == sequence: + # Success - update local state + self.local_tails[thread_index] = local_tail + # Update last_sequence based on how many entries we read + if last_sequence is None: + # First time reading - update to the sequence number of the last entry we read + self.last_sequences[thread_index] = ( + sequence - available + len(consumed_data) + ) + else: + # Subsequent reading - update by the number of entries we read + self.last_sequences[thread_index] = last_sequence + len( + consumed_data + ) + return consumed_data + else: + # Sequence changed during read, retry + if retry < max_retries - 1: + # Re-read metadata and recalculate + metadata = current_metadata + sequence = metadata["sequence"] + if last_sequence is None: + available = min(sequence, ring_size) + local_tail = (metadata["head"] - available) % ring_size + else: + available = (sequence - last_sequence) % (1 << 64) + if available > ring_size: + available = ring_size + if available == 0: + self.last_sequences[thread_index] = sequence + return [] + max_entries = min(max_entries, available) + consumed_data = [] + local_tail = self.local_tails[thread_index] + continue + else: + # Max retries reached, return what we have + print(f"[WARN] Max retries reached, returning partial data") + self.local_tails[thread_index] = local_tail + self.last_sequences[thread_index] = sequence + return consumed_data + + except Exception as e: + print(f"[ERROR] Exception during data read: {e}") + if retry < max_retries - 1: + continue + else: + return consumed_data + + return consumed_data + + def poll_for_data(self, thread_index=0, timeout=None, callback=None): + """Poll for new data in ring buffer""" + start_time = time.time() + + while True: + data = self.consume_data(thread_index) + if data: + if callback: + for entry_data in data: + try: + callback(entry_data) + except Exception as e: + print(f"Callback error: {e}") + else: + return data + + if timeout and (time.time() - start_time) > timeout: + return [] + time.sleep(0.000001) # 1μs polling interval + + async def poll_for_data_async(self, thread_index=0, timeout=None, callback=None): + """Async version of poll_for_data""" + + start_time = asyncio.get_event_loop().time() + + while True: + metadata = self._get_thread_metadata(thread_index) + local_tail = self.local_tails[thread_index] + + # Check if new data is available using local tail + if metadata["head"] != local_tail: + # Calculate how many new entries + if metadata["head"] > local_tail: + new_entries = metadata["head"] - local_tail + else: + new_entries = ( + self.config["ring_size"] - local_tail + metadata["head"] + ) + + data = self.consume_data(thread_index, new_entries) + + if callback: + for entry_data in data: + await callback(entry_data) + else: + return data + + if timeout and (asyncio.get_event_loop().time() - start_time) > timeout: + return [] + + # Yield control to asyncio + await asyncio.sleep(0.000001) # 1μs polling interval + + def get_config(self): + """Get ring buffer configuration""" + return self.config.copy() + + def __repr__(self): + schema_info = "" + if self.config["schema_size"] > 0: + schema_info = f", schema_size={self.config['schema_size']}, schema_version={self.config['schema_version']}" + + return ( + f"StatsRingBuffer(entry_size={self.config['entry_size']}, " + f"ring_size={self.config['ring_size']}, n_threads={self.config['n_threads']}{schema_info})" + ) + + class TestStats(unittest.TestCase): """Basic statseg tests""" @@ -557,8 +1096,556 @@ class TestStats(unittest.TestCase): print("/sys/nodes/unix-epoll-input", self.stat["/nodes/unix-epoll-input/calls"]) +class TestRingBuffer(unittest.TestCase): + """Ring buffer specific tests""" + + def setUp(self): + """Connect to statseg and find test ring buffer""" + self.stat = VPPStats() + self.stat.connect() + + # Look for test ring buffer created by VPP CLI command + try: + self.ring_buffer = self.stat.get_ring_buffer("/test/ring-buffer") + self.ring_buffer_available = True + except (KeyError, ValueError): + print( + "Test ring buffer not found. Run 'test stats ring-buffer-gen test_ring 100 1000 16' in VPP first." + ) + self.ring_buffer_available = False + + def tearDown(self): + """Disconnect from statseg""" + self.stat.disconnect() + + def test_ring_buffer_config(self): + """Test ring buffer configuration access""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + config = self.ring_buffer.get_config() + self.assertIsInstance(config, dict) + self.assertIn("entry_size", config) + self.assertIn("ring_size", config) + self.assertIn("n_threads", config) + + print(f"Ring buffer config: {config}") + + # Verify reasonable values + self.assertGreater(config["entry_size"], 0) + self.assertGreater(config["ring_size"], 0) + self.assertGreater(config["n_threads"], 0) + + def test_ring_buffer_metadata_access(self): + """Test ring buffer metadata access""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Test metadata access for thread 0 + metadata = self.ring_buffer._get_thread_metadata(0) + self.assertIsInstance(metadata, dict) + self.assertIn("head", metadata) + self.assertIn("sequence", metadata) + + print(f"Thread 0 metadata: {metadata}") + + # Verify metadata values are reasonable + self.assertIsInstance(metadata["head"], int) + self.assertIsInstance(metadata["sequence"], int) + self.assertGreaterEqual(metadata["head"], 0) + self.assertGreaterEqual(metadata["sequence"], 0) + + def test_ring_buffer_consume_empty(self): + """Test consuming from empty ring buffer""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Consume from empty buffer + data = self.ring_buffer.consume_data(thread_index=0) + self.assertEqual(data, []) + + # Test with max_entries + data = self.ring_buffer.consume_data(thread_index=0, max_entries=10) + self.assertEqual(data, []) + + def test_ring_buffer_consume_batch_empty(self): + """Test batch consuming from empty ring buffer""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Consume from empty buffer + data = self.ring_buffer.consume_data_batch(thread_index=0) + self.assertEqual(data, []) + + # Test with max_entries + data = self.ring_buffer.consume_data_batch(thread_index=0, max_entries=10) + self.assertEqual(data, []) + + def test_ring_buffer_poll_empty(self): + """Test polling empty ring buffer""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Poll with short timeout + data = self.ring_buffer.poll_for_data(thread_index=0, timeout=0.1) + self.assertEqual(data, []) + + def test_ring_buffer_poll_with_callback(self): + """Test polling with callback function""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + collected_data = [] + + def callback(data): + collected_data.append(data) + + # Poll with callback and short timeout + result = self.ring_buffer.poll_for_data( + thread_index=0, timeout=0.1, callback=callback + ) + self.assertEqual(result, []) + self.assertEqual(collected_data, []) + + def test_ring_buffer_invalid_thread(self): + """Test ring buffer with invalid thread index""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + config = self.ring_buffer.get_config() + invalid_thread = config["n_threads"] + 1 + + # Test metadata access with invalid thread + with self.assertRaises(IndexError): + self.ring_buffer._get_thread_metadata(invalid_thread) + + # Test consume with invalid thread + data = self.ring_buffer.consume_data(thread_index=invalid_thread) + self.assertEqual(data, []) + + def test_ring_buffer_api_compatibility(self): + """Test API compatibility methods""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Test compatibility methods (these return simplified values) + count = self.ring_buffer.get_count(thread_index=0) + self.assertEqual(count, 0) + + is_empty = self.ring_buffer.is_empty(thread_index=0) + self.assertTrue(is_empty) + + is_full = self.ring_buffer.is_full(thread_index=0) + self.assertFalse(is_full) + + def test_ring_buffer_repr(self): + """Test ring buffer string representation""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + repr_str = repr(self.ring_buffer) + self.assertIsInstance(repr_str, str) + self.assertIn("StatsRingBuffer", repr_str) + self.assertIn("entry_size", repr_str) + self.assertIn("ring_size", repr_str) + self.assertIn("n_threads", repr_str) + + print(f"Ring buffer repr: {repr_str}") + + def test_ring_buffer_multiple_threads(self): + """Test ring buffer access across multiple threads""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + config = self.ring_buffer.get_config() + + # Test all available threads + for thread_index in range(config["n_threads"]): + metadata = self.ring_buffer._get_thread_metadata(thread_index) + self.assertIsInstance(metadata, dict) + self.assertIn("head", metadata) + self.assertIn("sequence", metadata) + + data = self.ring_buffer.consume_data(thread_index=thread_index) + self.assertIsInstance(data, list) + + def test_ring_buffer_sequence_consistency(self): + """Test sequence number consistency across reads""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Read metadata multiple times to check consistency + metadata1 = self.ring_buffer._get_thread_metadata(0) + metadata2 = self.ring_buffer._get_thread_metadata(0) + + # Sequence numbers should be consistent (same or increasing) + self.assertGreaterEqual(metadata2["sequence"], metadata1["sequence"]) + + def test_ring_buffer_error_handling(self): + """Test ring buffer error handling""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Test with invalid parameters + data = self.ring_buffer.consume_data(thread_index=0, max_entries=0) + self.assertEqual(data, []) + + data = self.ring_buffer.consume_data(thread_index=0, max_entries=-1) + self.assertEqual(data, []) + + def test_ring_buffer_batch_vs_individual(self): + """Test that batch and individual consume return same results""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Reset local state to ensure fair comparison + self.ring_buffer.local_tails[0] = 0 + self.ring_buffer.last_sequences[0] = None + + # Consume with individual method + individual_data = self.ring_buffer.consume_data(thread_index=0, max_entries=5) + + # Reset local state + self.ring_buffer.local_tails[0] = 0 + self.ring_buffer.last_sequences[0] = None + + # Consume with batch method + batch_data = self.ring_buffer.consume_data_batch(thread_index=0, max_entries=5) + + # Results should be the same + self.assertEqual(individual_data, batch_data) + + def test_ring_buffer_prefetch_parameter(self): + """Test prefetch parameter in batch consume""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Test with prefetch enabled + data1 = self.ring_buffer.consume_data_batch(thread_index=0, prefetch=True) + + # Test with prefetch disabled + data2 = self.ring_buffer.consume_data_batch(thread_index=0, prefetch=False) + + # Results should be the same regardless of prefetch setting + self.assertEqual(data1, data2) + + def test_ring_buffer_schema_access(self): + """Test ring buffer schema access""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Test schema access + schema_data, schema_size, schema_version = self.ring_buffer.get_schema( + thread_index=0 + ) + + # Should return schema information (may be None if no schema) + self.assertIsInstance(schema_size, int) + self.assertIsInstance(schema_version, int) + self.assertGreaterEqual(schema_size, 0) + self.assertGreaterEqual(schema_version, 0) + + # Test schema string access + schema_string, schema_size_str, schema_version_str = ( + self.ring_buffer.get_schema_string(thread_index=0) + ) + + # Should return consistent information + self.assertEqual(schema_size, schema_size_str) + self.assertEqual(schema_version, schema_version_str) + + # If schema exists, it should be readable + if schema_size > 0: + self.assertIsNotNone(schema_data) + self.assertIsInstance(schema_data, bytes) + self.assertEqual(len(schema_data), schema_size) + + # If it's a string schema, it should be decodable + if schema_string is not None: + self.assertIsInstance(schema_string, str) + self.assertGreater(len(schema_string), 0) + + print(f"Schema info: size={schema_size}, version={schema_version}") + if schema_string: + print(f"Schema content: {schema_string}") + + def test_ring_buffer_schema_metadata(self): + """Test that schema information is included in metadata""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Get metadata for thread 0 + metadata = self.ring_buffer._get_thread_metadata(0) + + # Should include schema information + self.assertIn("schema_version", metadata) + self.assertIn("schema_offset", metadata) + self.assertIn("schema_size", metadata) + + # Values should be consistent with config + self.assertEqual( + metadata["schema_version"], self.ring_buffer.config["schema_version"] + ) + self.assertEqual( + metadata["schema_size"], self.ring_buffer.config["schema_size"] + ) + + def test_ring_buffer_schema_config(self): + """Test that schema information is included in config""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + config = self.ring_buffer.get_config() + + # Should include schema information + self.assertIn("schema_size", config) + self.assertIn("schema_version", config) + + # Values should be reasonable + self.assertIsInstance(config["schema_size"], int) + self.assertIsInstance(config["schema_version"], int) + self.assertGreaterEqual(config["schema_size"], 0) + self.assertGreaterEqual(config["schema_version"], 0) + + print( + f"Config schema info: size={config['schema_size']}, version={config['schema_version']}" + ) + + def test_ring_buffer_schema_convenience_methods(self): + """Test convenience methods for schema access""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Test convenience methods from VPPStats + schema_data, schema_size, schema_version = self.stat.get_ring_buffer_schema( + "/test/ring-buffer" + ) + schema_string, schema_size_str, schema_version_str = ( + self.stat.get_ring_buffer_schema_string("/test/ring-buffer") + ) + + # Should return consistent information + self.assertEqual(schema_size, schema_size_str) + self.assertEqual(schema_version, schema_version_str) + + # Should match direct access + direct_schema_data, direct_schema_size, direct_schema_version = ( + self.ring_buffer.get_schema() + ) + self.assertEqual(schema_size, direct_schema_size) + self.assertEqual(schema_version, direct_schema_version) + if schema_data is not None: + self.assertEqual(schema_data, direct_schema_data) + + def test_ring_buffer_schema_content(self): + """Test that schema content matches expected CDDL format""" + if not self.ring_buffer_available: + self.skipTest("Ring buffer not available") + + # Get schema string + schema_string, schema_size, schema_version = self.ring_buffer.get_schema_string( + thread_index=0 + ) + + # If schema exists, verify it has expected content + if schema_size > 0 and schema_string: + # Should be a string (not bytes) + self.assertIsInstance(schema_string, str) + + # Should contain expected CDDL-like content + self.assertIn("ring_test_schema", schema_string) + self.assertIn("name:", schema_string) + self.assertIn("version:", schema_string) + self.assertIn("fields:", schema_string) + self.assertIn("seq", schema_string) + self.assertIn("timestamp", schema_string) + + print(f"✓ Schema content verified: {schema_string[:100]}...") + else: + print("ℹ No schema found in ring buffer") + + +class TestRingBufferIntegration(unittest.TestCase): + """Integration tests that coordinate with VPP writer tests""" + + def setUp(self): + """Connect to statseg and find integration test ring buffer""" + self.stat = VPPStats() + self.stat.connect() + + # Look for integration test ring buffer + try: + self.ring_buffer = self.stat.get_ring_buffer("/integration/test") + self.ring_buffer_available = True + except (KeyError, ValueError): + print( + "Integration test ring buffer not found. Run integration test setup first." + ) + self.ring_buffer_available = False + + def tearDown(self): + """Disconnect from statseg""" + self.stat.disconnect() + + def test_integration_data_flow(self): + """Test complete data flow from writer to reader""" + if not self.ring_buffer_available: + self.skipTest("Integration ring buffer not available") + + # This test would coordinate with a VPP writer test + # For now, we'll test the basic flow + + # Reset reader state + self.ring_buffer.local_tails[0] = 0 + self.ring_buffer.last_sequences[0] = None + + # Try to consume data + data = self.ring_buffer.consume_data(thread_index=0) + + # Should get list (empty or with data) + self.assertIsInstance(data, list) + + # If we got data, verify it's the right format + for entry in data: + self.assertIsInstance(entry, bytes) + self.assertGreater(len(entry), 0) + + def test_integration_overwrite_detection(self): + """Test overwrite detection in integration scenario""" + if not self.ring_buffer_available: + self.skipTest("Integration ring buffer not available") + + # This test would coordinate with a writer that intentionally overwrites + # For now, we'll test the detection mechanism + + # Simulate overwrite by manipulating sequence numbers + original_sequence = self.ring_buffer.last_sequences[0] + + # This would normally happen when writer laps reader + # For testing, we'll just verify the detection logic exists + metadata = self.ring_buffer._get_thread_metadata(0) + self.assertIn("sequence", metadata) + + def test_integration_performance(self): + """Test performance characteristics""" + if not self.ring_buffer_available: + self.skipTest("Integration ring buffer not available") + + import time + + # Test individual consume performance + start_time = time.time() + for _ in range(100): + self.ring_buffer.consume_data(thread_index=0, max_entries=1) + individual_time = time.time() - start_time + + # Test batch consume performance + start_time = time.time() + for _ in range(10): + self.ring_buffer.consume_data_batch(thread_index=0, max_entries=10) + batch_time = time.time() - start_time + + print(f"Individual consume time: {individual_time:.6f}s") + print(f"Batch consume time: {batch_time:.6f}s") + + # Batch should be faster (though with empty data, difference might be minimal) + self.assertIsInstance(individual_time, float) + self.assertIsInstance(batch_time, float) + + +def run_ring_buffer_tests(): + """Run ring buffer tests with proper setup""" + print("Running Ring Buffer Tests...") + print("=" * 50) + + # Create test suite + suite = unittest.TestSuite() + + # Add ring buffer tests + suite.addTest(unittest.makeSuite(TestRingBuffer)) + suite.addTest(unittest.makeSuite(TestRingBufferIntegration)) + + # Run tests + runner = unittest.TextTestRunner(verbosity=2) + result = runner.run(suite) + + print("=" * 50) + print(f"Tests run: {result.testsRun}") + print(f"Failures: {len(result.failures)}") + print(f"Errors: {len(result.errors)}") + + return result.wasSuccessful() + + +def demo_ring_buffer_schema(): + """Demo function showing how to use ring buffer schema functionality""" + print("Ring Buffer Schema Demo") + print("=" * 30) + + try: + # Connect to VPP stats + stats = VPPStats() + stats.connect() + + # Look for test ring buffer + try: + ring_buffer = stats.get_ring_buffer("/test/ring-buffer") + print("✓ Found test ring buffer") + + # Get configuration + config = ring_buffer.get_config() + print(f"Ring buffer config: {config}") + + # Get schema information + schema_data, schema_size, schema_version = ring_buffer.get_schema() + print(f"Schema info: size={schema_size}, version={schema_version}") + + # Get schema as string + schema_string, _, _ = ring_buffer.get_schema_string() + if schema_string: + print(f"Schema content:\n{schema_string}") + else: + print("No schema found") + + # Use convenience methods + print("\nUsing convenience methods:") + conv_schema_string, conv_size, conv_version = ( + stats.get_ring_buffer_schema_string("/test/ring-buffer") + ) + print( + f"Convenience method result: size={conv_size}, version={conv_version}" + ) + if conv_schema_string: + print(f"Schema: {conv_schema_string[:100]}...") + + except (KeyError, ValueError) as e: + print(f"Test ring buffer not found: {e}") + print( + "Run 'test stats ring-buffer-gen /test/ring-buffer 100 1000 16' in VPP first" + ) + + stats.disconnect() + + except Exception as e: + print(f"Error: {e}") + print("Make sure VPP is running and stats socket is available") + + print("=" * 30) + + if __name__ == "__main__": import cProfile from pstats import Stats + # Run ring buffer tests if available + if "--ring-buffer-tests" in sys.argv: + success = run_ring_buffer_tests() + sys.exit(0 if success else 1) + + # Run schema demo if requested + if "--schema-demo" in sys.argv: + demo_ring_buffer_schema() + sys.exit(0) + + # Run original tests unittest.main() diff --git a/src/vpp/app/vpp_get_stats.c b/src/vpp/app/vpp_get_stats.c index 1c3b9d9538d..699b5e95fae 100644 --- a/src/vpp/app/vpp_get_stats.c +++ b/src/vpp/app/vpp_get_stats.c @@ -81,9 +81,37 @@ stat_poll_loop (u8 ** patterns) break; case STAT_DIR_TYPE_SCALAR_INDEX: + case STAT_DIR_TYPE_GAUGE: fformat (stdout, "%.2f %s\n", res[i].scalar_value, res[i].name); break; + case STAT_DIR_TYPE_HISTOGRAM_LOG2: + for (k = 0; k < vec_len (res[i].log2_histogram_bins); k++) + { + u64 *bins = res[i].log2_histogram_bins[k]; + int n_bins = vec_len (bins); + if (n_bins < 2) // Need at least min_exp + one bin + continue; + u32 min_exp = bins[0]; + u64 cumulative = 0; + u64 sum = 0; + fformat (stdout, "Histogram %s (thread %d):\n", res[i].name, + k); + for (int j = 1; j < n_bins; ++j) + { + cumulative += bins[j]; + sum += bins[j] * + (1ULL << (min_exp + j - 1)); // midpoint approx + fformat (stdout, " <= %llu: %llu (cumulative: %llu)\n", + (1ULL << (min_exp + j - 1)), bins[j], + cumulative); + } + fformat (stdout, + " +Inf: %llu (total count: %llu, sum: %llu)\n", + cumulative, cumulative, sum); + } + break; + case STAT_DIR_TYPE_EMPTY: break; @@ -224,9 +252,37 @@ reconnect: break; case STAT_DIR_TYPE_SCALAR_INDEX: + case STAT_DIR_TYPE_GAUGE: fformat (stdout, "%.2f %s\n", res[i].scalar_value, res[i].name); break; + case STAT_DIR_TYPE_HISTOGRAM_LOG2: + for (k = 0; k < vec_len (res[i].log2_histogram_bins); k++) + { + u64 *bins = res[i].log2_histogram_bins[k]; + int n_bins = vec_len (bins); + if (n_bins < 2) // Need at least min_exp + one bin + continue; + u32 min_exp = bins[0]; + u64 cumulative = 0; + u64 sum = 0; + fformat (stdout, "Histogram %s (thread %d):\n", res[i].name, + k); + for (int j = 1; j < n_bins; ++j) + { + cumulative += bins[j]; + sum += bins[j] * + (1ULL << (min_exp + j - 1)); // midpoint approx + fformat (stdout, " <= %llu: %llu (cumulative: %llu)\n", + (1ULL << (min_exp + j - 1)), bins[j], + cumulative); + } + fformat (stdout, + " +Inf: %llu (total count: %llu, sum: %llu)\n", + cumulative, cumulative, sum); + } + break; + case STAT_DIR_TYPE_NAME_VECTOR: if (res[i].name_vector == 0) continue; diff --git a/src/vpp/app/vpp_prometheus_export.c b/src/vpp/app/vpp_prometheus_export.c index c6000a8a008..e40212def16 100644 --- a/src/vpp/app/vpp_prometheus_export.c +++ b/src/vpp/app/vpp_prometheus_export.c @@ -1,6 +1,6 @@ /* *------------------------------------------------------------------ - * vpp_get_stats.c + * vpp_prometheus_export.c * * Copyright (c) 2018 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); @@ -35,6 +35,7 @@ #include #include #include +#include #include /* https://github.com/prometheus/prometheus/wiki/Default-port-allocations */ @@ -55,6 +56,45 @@ prom_string (char *s) return s; } +// For STAT_DIR_TYPE_HISTOGRAM_LOG2, the data is in res->log2_histogram_bins +static void +print_log2_histogram_metric (FILE *stream, stat_segment_data_t *res) +{ + int n_threads = vec_len (res->log2_histogram_bins); + char sanitized_name[VLIB_STATS_MAX_NAME_SZ]; + strncpy (sanitized_name, res->name, VLIB_STATS_MAX_NAME_SZ - 1); + sanitized_name[VLIB_STATS_MAX_NAME_SZ - 1] = '\0'; + prom_string (sanitized_name); + + for (int thread = 0; thread < n_threads; ++thread) + { + u64 *bins = res->log2_histogram_bins[thread]; + int n_bins = vec_len (bins); + if (n_bins < 2) // Need at least min_exp + one bin + continue; + u32 min_exp = bins[0]; + u64 cumulative = 0; + u64 sum = 0; + fformat (stream, "# TYPE %s histogram\n", sanitized_name); + for (int i = 1; i < n_bins; ++i) + { + cumulative += bins[i]; + sum += bins[i] * (1ULL << (min_exp + i - 1)); // midpoint approx + fformat (stream, "%s{le=\"%llu\",thread=\"%d\"} %llu\n", + sanitized_name, (1ULL << (min_exp + i - 1)), thread, + cumulative); + } + // +Inf bucket + fformat (stream, "%s{le=\"+Inf\",thread=\"%d\"} %llu\n", sanitized_name, + thread, cumulative); + // _count and _sum + fformat (stream, "%s_count{thread=\"%d\"} %llu\n", sanitized_name, + thread, cumulative); + fformat (stream, "%s_sum{thread=\"%d\"} %llu\n", sanitized_name, thread, + sum); + } +} + static void print_metric_v1 (FILE *stream, stat_segment_data_t *res) { @@ -62,6 +102,9 @@ print_metric_v1 (FILE *stream, stat_segment_data_t *res) switch (res->type) { + case STAT_DIR_TYPE_HISTOGRAM_LOG2: + print_log2_histogram_metric (stream, res); + break; case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE: fformat (stream, "# TYPE %s counter\n", prom_string (res->name)); for (k = 0; k < vec_len (res->simple_counter_vec); k++) @@ -91,7 +134,11 @@ print_metric_v1 (FILE *stream, stat_segment_data_t *res) fformat (stream, "%s %.2f\n", prom_string (res->name), res->scalar_value); break; - + case STAT_DIR_TYPE_GAUGE: + fformat (stream, "# TYPE %s gauge\n", prom_string (res->name)); + fformat (stream, "%s %.2f\n", prom_string (res->name), + res->scalar_value); + break; case STAT_DIR_TYPE_NAME_VECTOR: fformat (stream, "# TYPE %s_info gauge\n", prom_string (res->name)); for (k = 0; k < vec_len (res->name_vector); k++) @@ -159,9 +206,13 @@ print_metric_v2 (FILE *stream, stat_segment_data_t *res) num_tokens = tokenize (res->name, tokens, lengths, MAX_TOKENS); switch (res->type) { + case STAT_DIR_TYPE_HISTOGRAM_LOG2: + print_log2_histogram_metric (stream, res); + break; case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE: if (res->simple_counter_vec == 0) return; + for (k = 0; k < vec_len (res->simple_counter_vec); k++) for (j = 0; j < vec_len (res->simple_counter_vec[k]); j++) { @@ -260,6 +311,7 @@ print_metric_v2 (FILE *stream, stat_segment_data_t *res) break; case STAT_DIR_TYPE_SCALAR_INDEX: + case STAT_DIR_TYPE_GAUGE: if ((num_tokens == 4) && !strncmp (tokens[1], "buffer-pools", lengths[1])) { diff --git a/test/test_ring_buffer.py b/test/test_ring_buffer.py new file mode 100644 index 00000000000..25b6b3b6f78 --- /dev/null +++ b/test/test_ring_buffer.py @@ -0,0 +1,424 @@ +import time +import unittest +import struct +from framework import VppTestCase + +RING_NAME = "/test/ring_buffer_pytest" +RING_SIZE = 1024 +COUNT = 100 +INTERVAL_USEC = 1000 # 1ms + + +class TestRingBuffer(VppTestCase): + """Ring Buffer Test Case""" + + maxDiff = None + + @classmethod + def setUpClass(cls): + super(TestRingBuffer, cls).setUpClass() + + def setUp(self): + super(TestRingBuffer, self).setUp() + + def tearDown(self): + super(TestRingBuffer, self).tearDown() + + def test_ring_buffer_generation(self): + """Test ring buffer generation""" + + # Step 1: Generate messages using the VPPApiClient CLI + cli_cmd = f"test stats ring-buffer-gen {RING_NAME} {COUNT} {INTERVAL_USEC} {RING_SIZE}" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Connect to stats segment and get the ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{RING_NAME}") + + # Step 3: Poll for all messages + received = [] + start = time.time() + while len(received) < COUNT and (time.time() - start) < 10: + data = ring_buffer.poll_for_data(thread_index=0, timeout=0.5) + for entry in data: + # Unpack the struct: u64 seq, f64 timestamp + seq, ts = struct.unpack(" 0, f"Timestamp should be positive, got {ts}" + + # print(f"Received {len(received)} messages from ring buffer '{RING_NAME}'") + + def test_ring_buffer_overwrite(self): + """Test ring buffer overwrite behavior""" + ring_size = 16 + total_messages = 3 * ring_size + interval_usec = 100 # Fast, to fill quickly + ring_name = "/test/ring_buffer_overwrite" + + # Step 1: Generate more messages than the ring size + cli_cmd = f"test stats ring-buffer-gen {ring_name} {total_messages} {interval_usec} {ring_size}" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Ensure all messages are written before polling + time.sleep(0.1) + + # Step 2: Connect to stats segment and get the ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Poll for all messages (should only get the last 'ring_size' messages) + received = {} + start = time.time() + while len(received) < ring_size and (time.time() - start) < 10: + data = ring_buffer.poll_for_data(thread_index=0, timeout=0.5) + for entry in data: + seq, ts = struct.unpack(" 0, f"Timestamp should be positive, got {ts}" + + def test_ring_buffer_batch_operations(self): + """Test ring buffer batch operations""" + ring_name = "/test/ring_buffer_batch" + batch_size = 10 + total_messages = 50 + + # Step 1: Generate messages using VPP CLI + cli_cmd = f"test stats ring-buffer-gen {ring_name} {total_messages} {INTERVAL_USEC} {RING_SIZE}" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Test individual consume + individual_data = [] + start = time.time() + while len(individual_data) < total_messages and (time.time() - start) < 10: + data = ring_buffer.consume_data(thread_index=0, max_entries=batch_size) + individual_data.extend(data) + time.sleep(0.01) + + # Step 4: Reset and test batch consume + ring_buffer.local_tails[0] = 0 + ring_buffer.last_sequences[0] = None + + batch_data = [] + start = time.time() + while len(batch_data) < total_messages and (time.time() - start) < 10: + data = ring_buffer.consume_data_batch( + thread_index=0, max_entries=batch_size + ) + batch_data.extend(data) + time.sleep(0.01) + + # Step 5: Validate results are the same + assert len(individual_data) == len( + batch_data + ), "Individual and batch should return same number of entries" + assert ( + individual_data == batch_data + ), "Individual and batch should return same data" + + # Step 6: Validate data format + for entry in individual_data: + seq, ts = struct.unpack("= 0, f"Sequence should be non-negative, got {seq}" + assert ts > 0, f"Timestamp should be positive, got {ts}" + + def test_ring_buffer_configuration(self): + """Test ring buffer configuration access""" + ring_name = "/test/ring_buffer_config" + + # Step 1: Create ring buffer + cli_cmd = ( + f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}" + ) + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer and check configuration + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + config = ring_buffer.get_config() + + # Step 3: Validate configuration + assert "entry_size" in config, "Config should contain entry_size" + assert "ring_size" in config, "Config should contain ring_size" + assert "n_threads" in config, "Config should contain n_threads" + assert config["entry_size"] > 0, "Entry size should be positive" + assert config["ring_size"] > 0, "Ring size should be positive" + assert config["n_threads"] > 0, "Number of threads should be positive" + + # Step 4: Check metadata access + metadata = ring_buffer._get_thread_metadata(0) + assert "head" in metadata, "Metadata should contain head" + assert "sequence" in metadata, "Metadata should contain sequence" + assert isinstance(metadata["head"], int), "Head should be integer" + assert isinstance(metadata["sequence"], int), "Sequence should be integer" + + def test_ring_buffer_error_handling(self): + """Test ring buffer error handling""" + ring_name = "/test/ring_buffer_error" + + # Step 1: Create ring buffer + cli_cmd = ( + f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}" + ) + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Test invalid thread index + config = ring_buffer.get_config() + invalid_thread = config["n_threads"] + 1 + + try: + ring_buffer._get_thread_metadata(invalid_thread) + assert False, "Should have raised IndexError for invalid thread" + except IndexError: + pass # Expected + + # Step 4: Test invalid parameters + data = ring_buffer.consume_data(thread_index=0, max_entries=0) + assert data == [], "Zero max_entries should return empty list" + + data = ring_buffer.consume_data(thread_index=0, max_entries=-1) + assert data == [], "Negative max_entries should return empty list" + + def test_ring_buffer_api_compatibility(self): + """Test ring buffer API compatibility methods""" + ring_name = "/test/ring_buffer_compat" + + # Step 1: Create ring buffer + cli_cmd = ( + f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}" + ) + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Test compatibility methods + count = ring_buffer.get_count(thread_index=0) + is_empty = ring_buffer.is_empty(thread_index=0) + is_full = ring_buffer.is_full(thread_index=0) + + # These methods return simplified values since writer doesn't track reader state + assert count == 0, "Count should be 0 (writer doesn't track reader)" + assert is_empty == True, "Is empty should be True (writer doesn't track reader)" + assert is_full == False, "Is full should be False (writer doesn't track reader)" + + # Step 4: Test string representation + repr_str = repr(ring_buffer) + assert ( + "StatsRingBuffer" in repr_str + ), "String representation should contain StatsRingBuffer" + assert ( + "entry_size" in repr_str + ), "String representation should contain entry_size" + assert "ring_size" in repr_str, "String representation should contain ring_size" + assert "n_threads" in repr_str, "String representation should contain n_threads" + + def test_ring_buffer_performance(self): + """Test ring buffer performance characteristics""" + ring_name = "/test/ring_buffer_perf" + test_entries = 100 + + # Step 1: Create ring buffer + cli_cmd = f"test stats ring-buffer-gen {ring_name} {test_entries} {INTERVAL_USEC} {RING_SIZE}" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Test individual consume performance + start_time = time.time() + individual_count = 0 + for _ in range(100): # 100 operations + data = ring_buffer.consume_data(thread_index=0, max_entries=1) + individual_count += len(data) + individual_time = time.time() - start_time + + # Step 4: Reset and test batch consume performance + ring_buffer.local_tails[0] = 0 + ring_buffer.last_sequences[0] = None + + start_time = time.time() + batch_count = 0 + for _ in range(10): # 10 batch operations + data = ring_buffer.consume_data_batch(thread_index=0, max_entries=10) + batch_count += len(data) + batch_time = time.time() - start_time + + # Step 5: Calculate performance metrics + individual_throughput = 100 / individual_time if individual_time > 0 else 0 + batch_throughput = 100 / batch_time if batch_time > 0 else 0 + + print(f"Individual consume: {individual_throughput:.0f} ops/sec") + print(f"Batch consume: {batch_throughput:.0f} ops/sec") + + # Step 6: Validate performance (basic sanity checks) + assert individual_time > 0, "Individual operations should take some time" + assert batch_time > 0, "Batch operations should take some time" + assert individual_count >= 0, "Individual count should be non-negative" + assert batch_count >= 0, "Batch count should be non-negative" + + def test_ring_buffer_multiple_threads(self): + """Test ring buffer access across multiple threads""" + ring_name = "/test/ring_buffer_multi" + threads = 2 + + # Step 1: Create ring buffer with multiple threads using CLI (single-threaded) + cli_cmd = ( + f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}" + ) + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Test single thread (since CLI only supports single-threaded) + config = ring_buffer.get_config() + assert ( + config["n_threads"] == 1 + ), f"CLI creates single-threaded ring buffers, got {config['n_threads']}" + + # Test metadata access + metadata = ring_buffer._get_thread_metadata(0) + assert "head" in metadata, "Thread 0 metadata should contain head" + assert "sequence" in metadata, "Thread 0 metadata should contain sequence" + + # Test data consumption + data = ring_buffer.consume_data(thread_index=0) + assert isinstance(data, list), "Thread 0 should return list" + + def test_ring_buffer_sequence_consistency(self): + """Test sequence number consistency across reads""" + ring_name = "/test/ring_buffer_seq" + + # Step 1: Create ring buffer + cli_cmd = ( + f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}" + ) + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Read metadata multiple times to check consistency + metadata1 = ring_buffer._get_thread_metadata(0) + time.sleep(0.01) # Small delay + metadata2 = ring_buffer._get_thread_metadata(0) + + # Sequence numbers should be consistent (same or increasing) + assert ( + metadata2["sequence"] >= metadata1["sequence"] + ), "Sequence should be monotonically increasing" + + def test_ring_buffer_polling_with_callback(self): + """Test ring buffer polling with callback function""" + ring_name = "/test/ring_buffer_callback" + + # Step 1: Create ring buffer + cli_cmd = ( + f"test stats ring-buffer-gen {ring_name} 5 {INTERVAL_USEC} {RING_SIZE}" + ) + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Test polling with callback + collected_data = [] + + def callback(data): + collected_data.append(data) + + # Poll with callback and short timeout + result_data = ring_buffer.poll_for_data( + thread_index=0, timeout=1.0, callback=callback + ) + + # Both callback and return value should work + assert isinstance(result_data, list), "Poll should return list" + assert isinstance(collected_data, list), "Callback should collect data in list" + + def test_ring_buffer_empty_operations(self): + """Test ring buffer operations on empty buffer""" + ring_name = "/test/ring_buffer_empty" + + # Step 1: Create ring buffer but don't generate data + cli_cmd = ( + f"test stats ring-buffer-gen {ring_name} 0 {INTERVAL_USEC} {RING_SIZE}" + ) + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Test empty consume + data = ring_buffer.consume_data(thread_index=0) + assert data == [], "Empty buffer should return empty list" + + # Step 4: Test empty batch consume + data = ring_buffer.consume_data_batch(thread_index=0) + assert data == [], "Empty buffer should return empty list for batch" + + # Step 5: Test empty poll + data = ring_buffer.poll_for_data(thread_index=0, timeout=0.1) + assert data == [], "Empty buffer should return empty list for poll" + + def test_ring_buffer_prefetch_parameter(self): + """Test prefetch parameter in batch consume""" + ring_name = "/test/ring_buffer_prefetch" + + # Step 1: Create ring buffer + cli_cmd = ( + f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}" + ) + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}") + + # Step 3: Test with prefetch enabled + data1 = ring_buffer.consume_data_batch(thread_index=0, prefetch=True) + + # Step 4: Reset and test with prefetch disabled + ring_buffer.local_tails[0] = 0 + ring_buffer.last_sequences[0] = None + + data2 = ring_buffer.consume_data_batch(thread_index=0, prefetch=False) + + # Step 5: Results should be the same regardless of prefetch setting + assert data1 == data2, "Prefetch parameter should not affect results" diff --git a/test/test_ring_buffer_simple.py b/test/test_ring_buffer_simple.py new file mode 100644 index 00000000000..086d654a773 --- /dev/null +++ b/test/test_ring_buffer_simple.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +""" +Simple Ring Buffer Test Runner + +This script provides a simple way to test ring buffer functionality +using the existing VPP make test infrastructure. + +Usage: + python3 test/test_ring_buffer_simple.py + +This script can be run as part of the VPP test suite or standalone. +""" + +import sys +import time +import struct +from framework import VppTestCase + + +class TestRingBufferSimple(VppTestCase): + """Simple Ring Buffer Test Case for Make Test Infrastructure""" + + def test_ring_buffer_basic_functionality(self): + """Test basic ring buffer functionality""" + import time + import struct + + # Use a unique name for each test run to avoid reading old data + ring_name = f"/test/ring_buffer_simple_{int(time.time())}" + count = 50 + interval_usec = 1000 + ring_size = 64 + + # Step 1: Generate messages using VPP CLI + cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer from stats segment + ring_buffer = self.statistics.get_ring_buffer(ring_name) + + # Step 3: Consume all messages + received = [] + start = time.time() + + # Debug: Check initial state + metadata = ring_buffer._get_thread_metadata(0) + print( + f"DEBUG: Initial metadata - head: {metadata['head']}, sequence: {metadata['sequence']}" + ) + print(f"DEBUG: Ring size: {ring_buffer.get_config()['ring_size']}") + + while len(received) < count and (time.time() - start) < 10: + data = ring_buffer.consume_data(thread_index=0, max_entries=10) + print(f"DEBUG: Consumed {len(data)} entries") + for entry in data: + seq, ts = struct.unpack(" 0, f"Timestamp should be positive, got {ts}" + + print(f"✓ Successfully received {len(received)} messages from ring buffer") + + def test_ring_buffer_batch_operations(self): + """Test ring buffer batch operations""" + ring_name = "/test/ring_buffer_batch_simple" + count = 30 + interval_usec = 1000 + ring_size = 64 + + # Step 1: Generate messages + cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(ring_name) + + # Step 3: Test batch consume + data = ring_buffer.consume_data_batch(thread_index=0, max_entries=count) + + # Step 4: Validate results + assert len(data) == count, f"Expected {count} entries, got {len(data)}" + + # Check data format + for i, entry in enumerate(data): + seq, ts = struct.unpack(" 0, f"Timestamp should be positive, got {ts}" + + print(f"✓ Successfully received {len(data)} messages using batch operations") + + def test_ring_buffer_configuration(self): + """Test ring buffer configuration access""" + ring_name = "/test/ring_buffer_config_simple" + count = 10 + interval_usec = 1000 + ring_size = 32 + + # Step 1: Create ring buffer + cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer and check configuration + ring_buffer = self.statistics.get_ring_buffer(ring_name) + config = ring_buffer.get_config() + + # Step 3: Validate configuration + assert ( + config["entry_size"] == 16 + ), f"Expected entry_size 16, got {config['entry_size']}" + assert ( + config["ring_size"] == ring_size + ), f"Expected ring_size {ring_size}, got {config['ring_size']}" + assert ( + config["n_threads"] == 1 + ), f"Expected n_threads 1, got {config['n_threads']}" + + # Step 4: Check metadata + metadata = ring_buffer._get_thread_metadata(0) + assert "head" in metadata, "Metadata should contain head" + assert "sequence" in metadata, "Metadata should contain sequence" + assert ( + metadata["sequence"] >= count + ), f"Sequence should be >= {count}, got {metadata['sequence']}" + + print(f"✓ Ring buffer configuration validated: {config}") + + def test_ring_buffer_empty_operations(self): + """Test ring buffer operations on empty buffer""" + ring_name = "/test/ring_buffer_empty_simple" + + # Step 1: Create ring buffer without generating data + cli_cmd = f"test stats ring-buffer-gen {ring_name} 0 1000 32" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(ring_name) + + # Step 3: Test various operations on empty buffer + data = ring_buffer.consume_data(thread_index=0) + assert data == [], "Empty buffer should return empty list" + + data = ring_buffer.consume_data_batch(thread_index=0) + assert data == [], "Empty buffer should return empty list for batch" + + data = ring_buffer.poll_for_data(thread_index=0, timeout=0.1) + assert data == [], "Empty buffer should return empty list for poll" + + # Step 4: Check API compatibility methods + count = ring_buffer.get_count(thread_index=0) + is_empty = ring_buffer.is_empty(thread_index=0) + is_full = ring_buffer.is_full(thread_index=0) + + assert count == 0, "Count should be 0 for empty buffer" + assert is_empty == True, "Is empty should be True for empty buffer" + assert is_full == False, "Is full should be False for empty buffer" + + print("✓ Empty buffer operations validated") + + def test_ring_buffer_error_handling(self): + """Test ring buffer error handling""" + ring_name = "/test/ring_buffer_error_simple" + + # Step 1: Create ring buffer + cli_cmd = f"test stats ring-buffer-gen {ring_name} 5 1000 32" + result = self.vapi.cli(cli_cmd) + assert "Generated" in result, f"CLI failed: {result}" + + # Step 2: Get ring buffer + ring_buffer = self.statistics.get_ring_buffer(ring_name) + + # Step 3: Test invalid thread index + try: + ring_buffer._get_thread_metadata(999) # Invalid thread + assert False, "Should have raised IndexError for invalid thread" + except IndexError: + pass # Expected + + # Step 4: Test invalid parameters + data = ring_buffer.consume_data(thread_index=0, max_entries=0) + assert data == [], "Zero max_entries should return empty list" + + data = ring_buffer.consume_data(thread_index=0, max_entries=-1) + assert data == [], "Negative max_entries should return empty list" + + print("✓ Error handling validated") + + +def main(): + """Main function for standalone execution""" + import unittest + + # Create test suite + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestRingBufferSimple)) + + # Run tests + runner = unittest.TextTestRunner(verbosity=2) + result = runner.run(suite) + + # Return appropriate exit code + return 0 if result.wasSuccessful() else 1 + + +if __name__ == "__main__": + sys.exit(main())