util_test.c
vlib_test.c
counter_test.c
+ ring_buffer_test.c
+ histogram_test.c
COMPONENT
vpp-plugin-devtools
--- /dev/null
+#include <vlib/vlib.h>
+#include <vlib/stats/stats.h>
+#include <vlib/cli.h>
+#include <vlib/counter.h>
+#include <vppinfra/time.h>
+
+// Histogram test main structure
+static vlib_log2_histogram_main_t histogram_test_main;
+
+/* CLI handler: create a log2 histogram named <name> and feed it 'count'
+ * synthetic samples drawn from four interleaved value ranges, suspending
+ * 'interval_usec' microseconds between samples (runs in process context).
+ * Returns 0 on success or a parse error. */
+static clib_error_t *
+histogram_gen_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                          vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 count = 1000;
+  u32 interval_usec = 1000;
+  u32 min_exp = 0;   /* Minimum exponent (2^0 = 1) */
+  u32 num_bins = 16; /* Number of bins */
+  u32 i;
+  u32 value;
+
+  /* Parse CLI arguments: try the full 5-token form first. */
+  if (!unformat (input, "%s %u %u %u %u", &name, &count, &interval_usec,
+                 &min_exp, &num_bins))
+    {
+      /* BUG FIX: a failed partial match may already have allocated 'name';
+       * release it before the fallback parse re-allocates it (leak). */
+      vec_free (name);
+      name = 0;
+      /* Try without min_exp and num_bins (for backward compatibility) */
+      if (!unformat (input, "%s %u %u", &name, &count, &interval_usec))
+        return clib_error_return (
+          0,
+          "parse error: '%U'\nUsage: test stats histogram-gen <name> "
+          "<count> <interval-usec> [min-exp] [num-bins]",
+          format_unformat_error, input);
+    }
+
+  /* Initialize histogram. NOTE(review): 'name' is an unformat-allocated
+   * vector stored as a C string and kept alive in the static main struct;
+   * confirm unformat "%s" NUL-terminates it. */
+  histogram_test_main.name = (char *) name;
+  histogram_test_main.min_exp = min_exp;
+  vlib_validate_log2_histogram (&histogram_test_main, num_bins);
+
+  clib_warning ("DEBUG: Created histogram '%s' with min_exp=%u, num_bins=%u",
+                name, min_exp, num_bins);
+
+  /* Generate test data with different distributions. */
+  for (i = 0; i < count; ++i)
+    {
+      if (i % 4 == 0)
+        value = i % 16; /* small values (0-15) */
+      else if (i % 4 == 1)
+        value = 16 + (i % 240); /* medium values (16-255) */
+      else if (i % 4 == 2)
+        value = 256 + (i % 3840); /* large values (256-4095) */
+      else
+        value = 4096 + (i % 10000); /* very large values (4096+) */
+
+      /* Calculate bin index and increment */
+      u8 bin = vlib_log2_histogram_bin_index (&histogram_test_main, value);
+      vlib_increment_log2_histogram_bin (&histogram_test_main, 0, bin, 1);
+
+      /* Debug: print every 100th entry and the last few entries.
+       * BUG FIX: 1ULL << ... is a 64-bit value; it was printed with %u,
+       * which reads the vararg with the wrong width. Use %llu. */
+      if (i % 100 == 0 || i >= count - 5)
+        clib_warning ("DEBUG: Entry %u - value: %u, bin: %u (2^%u = %llu)", i,
+                      value, bin, min_exp + bin,
+                      (unsigned long long) (1ULL << (min_exp + bin)));
+
+      vlib_process_suspend (vm, 1e-6 * interval_usec);
+    }
+
+  vlib_cli_output (
+    vm, "Generated %u histogram entries for '%s' (min_exp=%u, bins=%u)", count,
+    name, min_exp, num_bins);
+  return 0;
+}
+
+/* CLI registration: "test stats histogram-gen" drives
+ * histogram_gen_command_fn above. */
+VLIB_CLI_COMMAND (histogram_gen_command, static) = {
+ .path = "test stats histogram-gen",
+ .short_help = "test stats histogram-gen <name> <count> <interval-usec> "
+ "[min-exp] [num-bins]",
+ .function = histogram_gen_command_fn,
+};
+
+/* CLI handler: remove the stats-segment entry for histogram <name>.
+ * Returns a parse error or a "not found" error; 0 on success. */
+static clib_error_t *
+histogram_clear_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                            vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 entry_index;
+
+  if (!unformat (input, "%s", &name))
+    return clib_error_return (
+      0, "parse error: '%U'\nUsage: test stats histogram-clear <name>",
+      format_unformat_error, input);
+
+  entry_index = vlib_stats_find_entry_index ("%s", name);
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    {
+      /* BUG FIX: build the error before freeing 'name', then release the
+       * unformat-allocated vector (it was leaked on every path). */
+      clib_error_t *err =
+        clib_error_return (0, "Histogram '%s' not found", name);
+      vec_free (name);
+      return err;
+    }
+
+  vlib_stats_remove_entry (entry_index);
+  vlib_cli_output (vm, "Cleared histogram '%s'", name);
+  vec_free (name);
+  return 0;
+}
+
+/* CLI registration: "test stats histogram-clear" drives
+ * histogram_clear_command_fn above. */
+VLIB_CLI_COMMAND (histogram_clear_command, static) = {
+ .path = "test stats histogram-clear",
+ .short_help = "test stats histogram-clear <name>",
+ .function = histogram_clear_command_fn,
+};
\ No newline at end of file
--- /dev/null
+#include <vlib/vlib.h>
+#include <vlib/stats/stats.h>
+#include <vlib/cli.h>
+#include <vppinfra/time.h>
+
+// Message struct for the ring buffer; one instance is produced per
+// iteration of the ring-buffer-gen test loop.
+typedef struct
+{
+ u64 seq; /* monotonically increasing message number (loop index) */
+ f64 timestamp; /* vlib_time_now() when the message was produced */
+} ring_test_msg_t;
+
+// Create a simple test schema string (CDDL-like format).
+// Describes the two fields of ring_test_msg_t (seq: uint64,
+// timestamp: float64); stored alongside the ring buffer so readers can
+// decode entries. Returns a pointer to a static string literal.
+static const char *
+get_ring_test_schema_string (void)
+{
+ return "ring_test_schema = {\n"
+ " name: \"ring_test\",\n"
+ " version: 1,\n"
+ " fields: [\n"
+ " { id: 0, name: \"seq\", type: \"uint64\" },\n"
+ " { id: 1, name: \"timestamp\", type: \"float64\" }\n"
+ " ]\n"
+ "}";
+}
+
+/* CLI handler: create (or reuse) a stats-segment ring buffer named <name>
+ * and produce 'count' ring_test_msg_t entries from thread 0, suspending
+ * 'interval_usec' microseconds between entries (runs in process context). */
+static clib_error_t *
+ring_buffer_gen_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                            vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 count = 1000;
+  u32 interval_usec = 1000;
+  u32 ring_size = 16; /* default ring size */
+  u32 i;
+  /* BUG FIX: zero-initialize; only some fields are assigned below and the
+   * struct is copied wholesale into the ring buffer header. */
+  vlib_stats_ring_config_t config = { 0 };
+  u32 entry_index;
+  ring_test_msg_t msg;
+
+  /* Parse CLI arguments: try the full 4-token form first. */
+  if (!unformat (input, "%s %u %u %u", &name, &count, &interval_usec,
+                 &ring_size))
+    {
+      /* BUG FIX: a failed partial match may already have allocated 'name';
+       * release it before the fallback parse re-allocates it (leak). */
+      vec_free (name);
+      name = 0;
+      /* Try without ring_size (for backward compatibility) */
+      if (!unformat (input, "%s %u %u", &name, &count, &interval_usec))
+        return clib_error_return (
+          0,
+          "parse error: '%U'\nUsage: test stats ring-buffer-gen <name> "
+          "<count> <interval-usec> [ring-size]",
+          format_unformat_error, input);
+    }
+
+  /* BUG FIX: include the terminating NUL in the stored schema so readers
+   * (e.g. "test stats ring-buffer-test-schema") see a proper C string;
+   * strlen() alone dropped the terminator and the NUL check failed. */
+  const char *schema_string = get_ring_test_schema_string ();
+  u32 schema_size = strlen (schema_string) + 1;
+
+  config.entry_size = sizeof (ring_test_msg_t);
+  config.ring_size = ring_size;
+  config.n_threads = 1;
+  config.schema_size = schema_size;
+  config.schema_version = 1;
+
+  /* Create the ring buffer, or reuse one with the same name. */
+  entry_index = vlib_stats_find_entry_index ("%s", name);
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    {
+      clib_warning ("DEBUG: Creating new ring buffer with name: %s", name);
+      entry_index =
+        vlib_stats_add_ring_buffer (&config, (u8 *) schema_string, "%s", name);
+    }
+  else
+    {
+      clib_warning (
+        "DEBUG: Found existing ring buffer with name: %s, entry_index: %u",
+        name, entry_index);
+    }
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    {
+      vec_free (name);
+      return clib_error_return (0, "Failed to create/find ring buffer");
+    }
+
+  /* Debug: dump the initial producer state (thread 0 metadata). */
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                    ring_buffer->metadata_offset);
+  clib_warning ("DEBUG: Initial ring buffer state - head: %u, sequence: %lu",
+                metadata->head, metadata->sequence);
+
+  for (i = 0; i < count; ++i)
+    {
+      msg.seq = i;
+      msg.timestamp = vlib_time_now (vm);
+      vlib_stats_ring_produce (entry_index, 0, &msg);
+
+      /* Debug: print every 10th entry and the last few entries. */
+      if (i % 10 == 0 || i >= count - 5)
+        clib_warning (
+          "DEBUG: Wrote entry %u - seq: %lu, head: %u, sequence: %lu", i,
+          msg.seq, metadata->head, metadata->sequence);
+
+      vlib_process_suspend (vm, 1e-6 * interval_usec);
+    }
+
+  clib_warning ("DEBUG: Final ring buffer state - head: %u, sequence: %lu",
+                metadata->head, metadata->sequence);
+
+  vlib_cli_output (vm,
+                   "Generated %u messages to ring buffer '%s' (ring size %u)",
+                   count, name, ring_size);
+  /* BUG FIX: release the unformat-allocated name vector (was leaked). */
+  vec_free (name);
+  return 0;
+}
+
+/* CLI registration: "test stats ring-buffer-gen" drives
+ * ring_buffer_gen_command_fn above. */
+VLIB_CLI_COMMAND (ring_buffer_gen_command, static) = {
+ .path = "test stats ring-buffer-gen",
+ .short_help =
+ "test stats ring-buffer-gen <name> <count> <interval-usec> [ring-size]",
+ .function = ring_buffer_gen_command_fn,
+};
+
+/* CLI handler: print the schema stored in ring buffer <name>.
+ * First get_schema call sizes the schema; second call copies it out. */
+static clib_error_t *
+ring_buffer_show_schema_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                    vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 entry_index;
+  u32 schema_size, schema_version;
+  clib_error_t *err = 0;
+
+  if (!unformat (input, "%s", &name))
+    return clib_error_return (
+      0, "parse error: '%U'\nUsage: test stats ring-buffer-show-schema <name>",
+      format_unformat_error, input);
+
+  entry_index = vlib_stats_find_entry_index ("%s", name);
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    {
+      err = clib_error_return (0, "Ring buffer '%s' not found", name);
+      goto done;
+    }
+
+  /* Probe for the schema size/version first (NULL data pointer). */
+  if (vlib_stats_ring_get_schema (entry_index, 0, NULL, &schema_size,
+                                  &schema_version) != 0)
+    {
+      err = clib_error_return (0, "No schema found in ring buffer '%s'", name);
+      goto done;
+    }
+
+  u8 *schema_data = clib_mem_alloc (schema_size);
+  if (!schema_data)
+    {
+      err = clib_error_return (0, "Failed to allocate memory for schema");
+      goto done;
+    }
+
+  if (vlib_stats_ring_get_schema (entry_index, 0, schema_data, &schema_size,
+                                  &schema_version) != 0)
+    {
+      clib_mem_free (schema_data);
+      err = clib_error_return (0, "Failed to retrieve schema");
+      goto done;
+    }
+
+  /* Display schema as string */
+  vlib_cli_output (vm, "Ring Buffer Schema (version %u):\n", schema_version);
+  vlib_cli_output (vm, "%.*s\n", schema_size, schema_data);
+
+  clib_mem_free (schema_data);
+done:
+  /* BUG FIX: the unformat-allocated name vector was leaked on every path. */
+  vec_free (name);
+  return err;
+}
+
+/* CLI registration: "test stats ring-buffer-show-schema" drives
+ * ring_buffer_show_schema_command_fn above. */
+VLIB_CLI_COMMAND (ring_buffer_show_schema_command, static) = {
+ .path = "test stats ring-buffer-show-schema",
+ .short_help = "test stats ring-buffer-show-schema <name>",
+ .function = ring_buffer_show_schema_command_fn,
+};
+
+// Test command to verify schema functionality: probes the schema of ring
+// buffer <name>, retrieves it, and checks NUL-termination. Verification
+// failures are reported to the CLI but do not return an error (exploratory
+// test command); only parse/alloc failures return clib errors.
+static clib_error_t *
+ring_buffer_test_schema_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                    vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 entry_index;
+  u32 schema_size, schema_version;
+  clib_error_t *err = 0;
+
+  if (!unformat (input, "%s", &name))
+    return clib_error_return (
+      0, "parse error: '%U'\nUsage: test stats ring-buffer-test-schema <name>",
+      format_unformat_error, input);
+
+  entry_index = vlib_stats_find_entry_index ("%s", name);
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    {
+      err = clib_error_return (0, "Ring buffer '%s' not found", name);
+      goto done;
+    }
+
+  /* Test schema retrieval (size/version probe). */
+  if (vlib_stats_ring_get_schema (entry_index, 0, NULL, &schema_size,
+                                  &schema_version) != 0)
+    {
+      vlib_cli_output (vm, "❌ No schema found in ring buffer '%s'\n", name);
+      goto done;
+    }
+
+  vlib_cli_output (vm, "✅ Schema found in ring buffer '%s':\n", name);
+  vlib_cli_output (vm, " - Schema size: %u bytes\n", schema_size);
+  vlib_cli_output (vm, " - Schema version: %u\n", schema_version);
+
+  /* Test schema data retrieval. */
+  u8 *schema_data = clib_mem_alloc (schema_size);
+  if (!schema_data)
+    {
+      err = clib_error_return (0, "Failed to allocate memory for schema");
+      goto done;
+    }
+
+  if (vlib_stats_ring_get_schema (entry_index, 0, schema_data, &schema_size,
+                                  &schema_version) != 0)
+    {
+      clib_mem_free (schema_data);
+      vlib_cli_output (vm, "❌ Failed to retrieve schema data\n");
+      goto done;
+    }
+
+  vlib_cli_output (vm, "✅ Schema data retrieved successfully\n");
+
+  /* Verify schema is a valid string (null-terminated). */
+  if (schema_size > 0 && schema_data[schema_size - 1] == '\0')
+    {
+      vlib_cli_output (vm, "✅ Schema is a valid null-terminated string\n");
+      vlib_cli_output (vm, "Schema content:\n%.*s\n", schema_size - 1,
+                       schema_data);
+    }
+  else
+    {
+      /* BUG FIX: this branch means the NUL-termination check FAILED; the
+       * old message claimed success with a ✅. Report it as a warning. */
+      vlib_cli_output (vm,
+                       "⚠️ Schema is not null-terminated (size: %u bytes)\n",
+                       schema_size);
+      vlib_cli_output (vm, "Schema content:\n%.*s\n", schema_size,
+                       schema_data);
+    }
+
+  clib_mem_free (schema_data);
+done:
+  /* BUG FIX: the unformat-allocated name vector was leaked on every path. */
+  vec_free (name);
+  return err;
+}
+
+/* CLI registration: "test stats ring-buffer-test-schema" drives
+ * ring_buffer_test_schema_command_fn above. */
+VLIB_CLI_COMMAND (ring_buffer_test_schema_command, static) = {
+ .path = "test stats ring-buffer-test-schema",
+ .short_help = "test stats ring-buffer-test-schema <name>",
+ .function = ring_buffer_test_schema_command_fn,
+};
#include <vlib/stats/stats.h>
void
-vlib_clear_simple_counters (vlib_simple_counter_main_t * cm)
+vlib_clear_simple_counters (vlib_simple_counter_main_t *cm)
{
counter_t *my_counters;
uword i, j;
}
void
-vlib_clear_combined_counters (vlib_combined_counter_main_t * cm)
+vlib_clear_combined_counters (vlib_combined_counter_main_t *cm)
{
vlib_counter_t *my_counters;
uword i, j;
}
void
-vlib_validate_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
+vlib_validate_simple_counter (vlib_simple_counter_main_t *cm, u32 index)
{
vlib_thread_main_t *tm = vlib_get_thread_main ();
char *name = cm->stat_segment_name ? cm->stat_segment_name : cm->name;
}
void
-vlib_free_simple_counter (vlib_simple_counter_main_t * cm)
+vlib_free_simple_counter (vlib_simple_counter_main_t *cm)
{
if (cm->stats_entry_index == ~0)
{
}
void
-vlib_validate_combined_counter (vlib_combined_counter_main_t * cm, u32 index)
+vlib_validate_combined_counter (vlib_combined_counter_main_t *cm, u32 index)
{
vlib_thread_main_t *tm = vlib_get_thread_main ();
char *name = cm->stat_segment_name ? cm->stat_segment_name : cm->name;
}
int
- vlib_validate_combined_counter_will_expand
- (vlib_combined_counter_main_t * cm, u32 index)
+vlib_validate_combined_counter_will_expand (vlib_combined_counter_main_t *cm,
+ u32 index)
{
vlib_thread_main_t *tm = vlib_get_thread_main ();
int i;
}
void
-vlib_free_combined_counter (vlib_combined_counter_main_t * cm)
+vlib_free_combined_counter (vlib_combined_counter_main_t *cm)
{
if (cm->stats_entry_index == ~0)
{
}
u32
-vlib_combined_counter_n_counters (const vlib_combined_counter_main_t * cm)
+vlib_combined_counter_n_counters (const vlib_combined_counter_main_t *cm)
{
ASSERT (cm->counters);
return (vec_len (cm->counters[0]));
}
u32
-vlib_simple_counter_n_counters (const vlib_simple_counter_main_t * cm)
+vlib_simple_counter_n_counters (const vlib_simple_counter_main_t *cm)
{
ASSERT (cm->counters);
return (vec_len (cm->counters[0]));
}
+/* Allocate/validate per-thread bin storage for a log2 histogram.
+ * Layout (both paths): per-thread vector of num_bins + 1 counters where
+ * slot 0 stores min_exp and slots [1..num_bins] hold the bin counts
+ * (vlib_increment_log2_histogram_bin writes bins[bin + 1]).
+ * With no name, storage is heap-local (test mode); otherwise it lives in
+ * the stats segment. */
+void
+vlib_validate_log2_histogram (vlib_log2_histogram_main_t *hm, u32 num_bins)
+{
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  char *name = hm->stat_segment_name ? hm->stat_segment_name : hm->name;
+
+  if (name == 0)
+    {
+      if (hm->bins == 0)
+        {
+          hm->stats_entry_index = ~0;
+          vec_validate (hm->bins, tm->n_vlib_mains - 1);
+          for (int i = 0; i < tm->n_vlib_mains; i++)
+            {
+              /* BUG FIX: the stats-segment path validates indices up to
+               * num_bins (num_bins + 1 slots, slot 0 = min_exp), but this
+               * path only allocated num_bins slots, so incrementing the
+               * last bin (writes [bin + 1]) ran one element past the end.
+               * Also record min_exp in slot 0, as the stats path does. */
+              vec_validate_aligned (hm->bins[i], num_bins,
+                                    CLIB_CACHE_LINE_BYTES);
+              hm->bins[i][0] = hm->min_exp;
+            }
+          return;
+        }
+    }
+
+  if (hm->bins == 0)
+    hm->stats_entry_index = vlib_stats_add_histogram_log2 ("%s", name);
+
+  vlib_stats_validate (hm->stats_entry_index, tm->n_vlib_mains - 1, num_bins);
+  hm->bins = vlib_stats_get_entry_data_pointer (hm->stats_entry_index);
+  for (int i = 0; i < tm->n_vlib_mains; i++)
+    {
+      counter_t *c = hm->bins[i];
+      c[0] = hm->min_exp; /* slot 0 carries min_exp for readers */
+    }
+}
+
/*
* fd.io coding-style-patch-verification: ON
*
#define included_vlib_counter_h
#include <vlib/counter_types.h>
+#include <vppinfra/clib.h>
/** \file
typedef struct
{
- counter_t **counters; /**< Per-thread u64 non-atomic counters */
- char *name; /**< The counter collection's name. */
- char *stat_segment_name; /**< Name in stat segment directory */
+ counter_t **counters; /**< Per-thread u64 non-atomic counters */
+ char *name; /**< The counter collection's name. */
+ char *stat_segment_name; /**< Name in stat segment directory */
u32 stats_entry_index;
} vlib_simple_counter_main_t;
/** The number of counters (not the number of per-thread counters) */
-u32 vlib_simple_counter_n_counters (const vlib_simple_counter_main_t * cm);
+u32 vlib_simple_counter_n_counters (const vlib_simple_counter_main_t *cm);
/** Pre-fetch a per-thread simple counter for the given object index */
always_inline void
@returns - (u64) current counter value
*/
always_inline counter_t
-vlib_get_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
+vlib_get_simple_counter (vlib_simple_counter_main_t *cm, u32 index)
{
counter_t *my_counters;
counter_t v;
@param index - (u32) index of the counter to clear
*/
always_inline void
-vlib_zero_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
+vlib_zero_simple_counter (vlib_simple_counter_main_t *cm, u32 index)
{
counter_t *my_counters;
int i;
*/
always_inline void
-vlib_counter_add (vlib_counter_t * a, vlib_counter_t * b)
+vlib_counter_add (vlib_counter_t *a, vlib_counter_t *b)
{
a->packets += b->packets;
a->bytes += b->bytes;
@param b - (vlib_counter_t *) src counter
*/
always_inline void
-vlib_counter_sub (vlib_counter_t * a, vlib_counter_t * b)
+vlib_counter_sub (vlib_counter_t *a, vlib_counter_t *b)
{
ASSERT (a->packets >= b->packets);
ASSERT (a->bytes >= b->bytes);
@param a - (vlib_counter_t *) counter to clear
*/
always_inline void
-vlib_counter_zero (vlib_counter_t * a)
+vlib_counter_zero (vlib_counter_t *a)
{
a->packets = a->bytes = 0;
}
/** A collection of combined counters */
typedef struct
{
- vlib_counter_t **counters; /**< Per-thread u64 non-atomic counter pairs */
- char *name; /**< The counter collection's name. */
- char *stat_segment_name; /**< Name in stat segment directory */
+ vlib_counter_t **counters; /**< Per-thread u64 non-atomic counter pairs */
+ char *name; /**< The counter collection's name. */
+ char *stat_segment_name; /**< Name in stat segment directory */
u32 stats_entry_index;
} vlib_combined_counter_main_t;
/** The number of counters (not the number of per-thread counters) */
-u32 vlib_combined_counter_n_counters (const vlib_combined_counter_main_t *
- cm);
+u32 vlib_combined_counter_n_counters (const vlib_combined_counter_main_t *cm);
/** Clear a collection of simple counters
@param cm - (vlib_simple_counter_main_t *) collection to clear
*/
-void vlib_clear_simple_counters (vlib_simple_counter_main_t * cm);
+void vlib_clear_simple_counters (vlib_simple_counter_main_t *cm);
/** Clear a collection of combined counters
@param cm - (vlib_combined_counter_main_t *) collection to clear
*/
-void vlib_clear_combined_counters (vlib_combined_counter_main_t * cm);
+void vlib_clear_combined_counters (vlib_combined_counter_main_t *cm);
/** Increment a combined counter
@param cm - (vlib_combined_counter_main_t *) comined counter main pointer
clib_prefetch_store (cpu_counters + index);
}
-
/** Get the value of a combined counter, never called in the speed path
Scrapes the entire set of per-thread counters. Innacurate unless
worker threads which might increment the counter are
*/
static inline void
-vlib_get_combined_counter (const vlib_combined_counter_main_t * cm,
- u32 index, vlib_counter_t * result)
+vlib_get_combined_counter (const vlib_combined_counter_main_t *cm, u32 index,
+ vlib_counter_t *result)
{
vlib_counter_t *my_counters, *counter;
int i;
@param index - (u32) index of the counter to clear
*/
always_inline void
-vlib_zero_combined_counter (vlib_combined_counter_main_t * cm, u32 index)
+vlib_zero_combined_counter (vlib_combined_counter_main_t *cm, u32 index)
{
vlib_counter_t *my_counters, *counter;
int i;
}
/** validate a simple counter
- @param cm - (vlib_simple_counter_main_t *) pointer to the counter collection
+ @param cm - (vlib_simple_counter_main_t *) pointer to the counter
+ collection
@param index - (u32) index of the counter to validate
*/
-void vlib_validate_simple_counter (vlib_simple_counter_main_t * cm,
- u32 index);
-void vlib_free_simple_counter (vlib_simple_counter_main_t * cm);
+void vlib_validate_simple_counter (vlib_simple_counter_main_t *cm, u32 index);
+void vlib_free_simple_counter (vlib_simple_counter_main_t *cm);
/** validate a combined counter
@param cm - (vlib_combined_counter_main_t *) pointer to the counter
@param index - (u32) index of the counter to validate
*/
-void vlib_validate_combined_counter (vlib_combined_counter_main_t * cm,
+void vlib_validate_combined_counter (vlib_combined_counter_main_t *cm,
u32 index);
-int vlib_validate_combined_counter_will_expand
- (vlib_combined_counter_main_t * cm, u32 index);
+int
+vlib_validate_combined_counter_will_expand (vlib_combined_counter_main_t *cm,
+ u32 index);
-void vlib_free_combined_counter (vlib_combined_counter_main_t * cm);
+void vlib_free_combined_counter (vlib_combined_counter_main_t *cm);
/** Obtain the number of simple or combined counters allocated.
A macro which reduces to to vec_len(cm->maxi), the answer in either
(vlib_combined_counter_main_t) the counter collection to interrogate
@returns vec_len(cm->maxi)
*/
-#define vlib_counter_len(cm) vec_len((cm)->maxi)
+#define vlib_counter_len(cm) vec_len ((cm)->maxi)
+
+/* A log2 histogram collection. Per-thread bin vectors reserve slot 0 for
+ * min_exp; bin counts follow at [1..]. */
+typedef struct
+{
+ counter_t **bins; /**< Per-thread u64 non-atomic histogram bins */
+ u32 min_exp; /**< log2 bin minimum exponent */
+ char *name; /**< The histogram collection's name. */
+ char *stat_segment_name; /**< Name in stat segment directory */
+ u32 stats_entry_index; /**< Stats segment entry, or ~0 if heap-local */
+} vlib_log2_histogram_main_t;
+
+/* Allocate/validate per-thread storage for num_bins bins. */
+void vlib_validate_log2_histogram (vlib_log2_histogram_main_t *hm,
+ u32 num_bins);
+
+/* Map 'value' to a histogram bin index: floor(log2(value)) - min_exp,
+ * clamped to the valid bin range (value 0 maps to bin 0). */
+static_always_inline u8
+vlib_log2_histogram_bin_index (const vlib_log2_histogram_main_t *hm, u32 value)
+{
+  if (value == 0)
+    return 0;
+  u8 log2_val = min_log2 (value);
+  int min_exp = hm->min_exp;
+  /* Slot 0 of the per-thread vector stores min_exp, so only
+   * vec_len - 1 slots hold actual bin counters. */
+  int n_data_bins = vec_len (hm->bins[0]) - 1;
+  int bin = log2_val - min_exp;
+  if (bin < 0)
+    bin = 0;
+  /* BUG FIX: clamp against the number of data bins, not the raw vector
+   * length: the increment helper writes bins[bin + 1], so a bin index of
+   * vec_len - 1 wrote one element past the end of the vector. */
+  if (bin >= n_data_bins)
+    bin = n_data_bins - 1;
+  return (u8) bin;
+}
+
+/* Add 'increment' to the given per-thread histogram bin. Slot 0 of each
+ * per-thread vector stores min_exp, so bin counters live at [bin + 1];
+ * 'bin' must come from vlib_log2_histogram_bin_index(). Non-atomic: the
+ * bin vector is per-thread. */
+always_inline void
+vlib_increment_log2_histogram_bin (vlib_log2_histogram_main_t *hm,
+ clib_thread_index_t thread_index, u8 bin,
+ u64 increment)
+{
+ uint64_t *my_bins = hm->bins[thread_index];
+ my_bins[bin + 1] += increment;
+}
#endif /* included_vlib_counter_h */
type_name = "NameVector";
break;
+ case STAT_DIR_TYPE_RING_BUFFER:
+ type_name = "RingBuffer";
+ break;
+
+ case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+ type_name = "Histogram";
+ break;
case STAT_DIR_TYPE_EMPTY:
type_name = "empty";
break;
return format (s, format_string, ep->name, type_name, 0);
}
+
+/* format() helper: detailed, multi-line dump of a single stats directory
+ * entry. Understands ring buffers (config + per-thread head/sequence) and
+ * log2 histograms (per-thread bin table, Prometheus-style cumulative
+ * counts); other types get a one-line summary. */
+static u8 *
+format_stat_dir_entry_detail (u8 *s, va_list *args)
+{
+ vlib_stats_entry_t *ep = va_arg (*args, vlib_stats_entry_t *);
+
+ if (ep->type == STAT_DIR_TYPE_RING_BUFFER)
+ {
+ vlib_stats_ring_buffer_t *rb = ep->data;
+ if (rb)
+ {
+ s = format (s, "RingBuffer: %s\n", ep->name);
+ s = format (s, " ring_size: %u\n", rb->config.ring_size);
+ s = format (s, " entry_size: %u\n", rb->config.entry_size);
+ s = format (s, " n_threads: %u\n", rb->config.n_threads);
+ for (u32 t = 0; t < rb->config.n_threads; t++)
+ {
+ /* Per-thread metadata records are laid out back-to-back after
+ metadata_offset. */
+ vlib_stats_ring_metadata_t *md =
+ (vlib_stats_ring_metadata_t
+ *) ((u8 *) rb + rb->metadata_offset +
+ t * sizeof (vlib_stats_ring_metadata_t));
+ s = format (s, " [thread %u] head:%u seq:%llu\n", t, md->head,
+ (unsigned long long) md->sequence);
+ }
+ }
+ else
+ {
+ s = format (s, "RingBuffer: %s (uninitialized)\n", ep->name);
+ }
+ }
+ else if (ep->type == STAT_DIR_TYPE_HISTOGRAM_LOG2)
+ {
+ u64 **log2_histogram_bins = ep->data;
+ if (!log2_histogram_bins)
+ {
+ s = format (s, "Histogram: %s (uninitialized)\n", ep->name);
+ return s;
+ }
+
+ s = format (s, "Histogram: %s\n", ep->name);
+ for (u32 k = 0; k < vec_len (log2_histogram_bins); k++)
+ {
+ u64 *bins = log2_histogram_bins[k];
+ int n_bins = vec_len (bins);
+ if (n_bins < 2) // Need at least min_exp + one bin
+ continue;
+ /* Slot 0 stores min_exp; bin counters start at index 1. */
+ u32 min_exp = bins[0];
+ u64 cumulative = 0;
+ u64 sum = 0;
+ s = format (s, " [thread %u]:\n", k);
+ for (int j = 1; j < n_bins; ++j)
+ {
+ cumulative += bins[j];
+ sum += bins[j] * (1ULL << (min_exp + j - 1)); // weight by bin upper bound (approximation)
+ s = format (s, " <= %llu: %llu (cumulative: %llu)\n",
+ (1ULL << (min_exp + j - 1)), bins[j], cumulative);
+ }
+ /* Prometheus-style +Inf bucket: equals the total count. */
+ s = format (s, " +Inf: %llu (total count: %llu, sum: %llu)\n",
+ cumulative, cumulative, sum);
+ }
+ }
+ else
+ {
+ s = format (s, "Entry: %s (type %d)\n", ep->name, ep->type);
+ }
+ return s;
+}
+
static clib_error_t *
show_stat_segment_command_fn (vlib_main_t *vm, unformat_input_t *input,
vlib_cli_command_t *cmd)
vlib_stats_segment_t *sm = vlib_stats_get_segment ();
vlib_stats_entry_t *show_data;
int i;
-
int verbose = 0;
+ u8 *counter_name = 0;
- if (unformat (input, "verbose"))
- verbose = 1;
+ // Parse both 'verbose' and counter name in any order
+ while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+ {
+ if (unformat (input, "verbose"))
+ verbose = 1;
+ else if (unformat (input, "%s", &counter_name))
+ ;
+ else
+ break;
+ }
+
+ if (counter_name)
+ {
+ u32 index = vlib_stats_find_entry_index ("%s", counter_name);
+ if (index != STAT_SEGMENT_INDEX_INVALID)
+ {
+ vlib_stats_entry_t *ep = sm->directory_vector + index;
+ vlib_cli_output (vm, "%U", format_stat_dir_entry_detail, ep);
+ if (verbose)
+ {
+ ASSERT (sm->heap);
+ vlib_cli_output (vm, "%U", format_clib_mem_heap, sm->heap,
+ 0 /* verbose */);
+ }
+ }
+ else
+ {
+ vlib_cli_output (vm, "Counter '%s' not found.", counter_name);
+ }
+ vec_free (counter_name);
+ return 0;
+ }
/* Lock even as reader, as this command doesn't handle epoch changes */
vlib_stats_segment_lock ();
VLIB_CLI_COMMAND (show_stat_segment_command, static) = {
.path = "show statistics segment",
- .short_help = "show statistics segment [verbose]",
+ .short_help = "show statistics segment [counter-name] [verbose]",
.function = show_stat_segment_command_fn,
};
STAT_DIR_TYPE_NAME_VECTOR,
STAT_DIR_TYPE_EMPTY,
STAT_DIR_TYPE_SYMLINK,
+ STAT_DIR_TYPE_HISTOGRAM_LOG2,
+ STAT_DIR_TYPE_RING_BUFFER,
+ STAT_DIR_TYPE_GAUGE,
} stat_directory_type_t;
typedef struct
#define VLIB_STATS_MAX_NAME_SZ 128
char name[VLIB_STATS_MAX_NAME_SZ];
} vlib_stats_entry_t;
+_Static_assert (sizeof (vlib_stats_entry_t) == 144,
+ "vlib_stats_entry_t size must be 144 bytes");
/*
* Shared header first in the shared memory segment.
break;
case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE:
+ case STAT_DIR_TYPE_HISTOGRAM_LOG2:
c = e->data;
e->data = 0;
oldheap = clib_mem_set_heap (sm->heap);
clib_mem_set_heap (oldheap);
break;
+ case STAT_DIR_TYPE_RING_BUFFER:
+ {
+ vlib_stats_ring_buffer_t *ring_buffer = e->data;
+ if (ring_buffer)
+ {
+ /* Free ring buffer memory */
+ oldheap = clib_mem_set_heap (sm->heap);
+ clib_mem_free (ring_buffer);
+ clib_mem_set_heap (oldheap);
+ }
+ e->data = 0;
+ }
+ break;
+
case STAT_DIR_TYPE_SCALAR_INDEX:
case STAT_DIR_TYPE_SYMLINK:
break;
va_start (va, fmt);
name = va_format (0, fmt, &va);
va_end (va);
- return vlib_stats_new_entry_internal (STAT_DIR_TYPE_SCALAR_INDEX, name);
+ return vlib_stats_new_entry_internal (STAT_DIR_TYPE_GAUGE, name);
}
void
name);
}
+/* Create a new stats-segment directory entry of type HISTOGRAM_LOG2 with
+ * a printf-style name; returns the new entry index (storage is validated
+ * later via vlib_stats_validate). */
+u32
+vlib_stats_add_histogram_log2 (char *fmt, ...)
+{
+ va_list va;
+ u8 *name;
+
+ va_start (va, fmt);
+ name = va_format (0, fmt, &va);
+ va_end (va);
+ return vlib_stats_new_entry_internal (STAT_DIR_TYPE_HISTOGRAM_LOG2, name);
+}
+
u32
vlib_stats_add_counter_pair_vector (char *fmt, ...)
{
int rv = 1;
oldheap = clib_mem_set_heap (sm->heap);
- if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE)
+ if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE ||
+ e->type == STAT_DIR_TYPE_HISTOGRAM_LOG2)
{
u32 idx0 = va_arg (*va, u32);
u32 idx1 = va_arg (*va, u32);
va_start (va, entry_index);
- if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE)
+ if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE ||
+ e->type == STAT_DIR_TYPE_HISTOGRAM_LOG2)
{
u32 idx0 = va_arg (va, u32);
u32 idx1 = va_arg (va, u32);
return;
}
+
+/* Create a stats-segment ring buffer entry with a printf-style name.
+ * Memory layout of the single allocation:
+ *   [header][per-thread data regions][cache-line-aligned per-thread
+ *   metadata][optional schema bytes]
+ * 'config' is copied into the header; 'schema_data' (config->schema_size
+ * bytes) is copied after the metadata when provided. Returns the entry
+ * index, or CLIB_U32_MAX on failure. */
+u32
+vlib_stats_add_ring_buffer (vlib_stats_ring_config_t *config,
+ const void *schema_data, char *fmt, ...)
+{
+ vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+ va_list va;
+ u8 *name;
+ vlib_stats_ring_buffer_t *ring_buffer;
+ u32 entry_index, total_size, metadata_size, data_size, schema_offset;
+
+ va_start (va, fmt);
+ name = va_format (0, fmt, &va);
+ va_end (va);
+
+ entry_index =
+ vlib_stats_new_entry_internal (STAT_DIR_TYPE_RING_BUFFER, name);
+ if (entry_index == CLIB_U32_MAX)
+ return CLIB_U32_MAX;
+ vlib_stats_segment_lock ();
+
+ /* Calculate sizes */
+ metadata_size = config->n_threads * sizeof (vlib_stats_ring_metadata_t);
+ data_size = config->n_threads * config->ring_size * config->entry_size;
+
+ /* Calculate total size with metadata aligned to 64 bytes for atomic
+ * operations. NOTE(review): only the FIRST metadata record is cache-line
+ * aligned; records for thread > 0 are aligned only if
+ * sizeof (vlib_stats_ring_metadata_t) is a multiple of
+ * CLIB_CACHE_LINE_BYTES — confirm, since other ring functions ASSERT
+ * per-thread metadata alignment. */
+ u32 data_offset = sizeof (vlib_stats_ring_buffer_t);
+ u32 metadata_offset = CLIB_CACHE_LINE_ROUND (data_offset + data_size);
+
+ /* Add space for schema if provided */
+ if (config->schema_size > 0)
+ {
+ schema_offset = metadata_offset + metadata_size;
+ total_size = schema_offset + config->schema_size;
+ }
+ else
+ {
+ schema_offset = 0;
+ total_size = metadata_offset + metadata_size;
+ }
+
+ /* Allocate ring buffer structure on the stats-segment heap. */
+ void *oldheap = clib_mem_set_heap (sm->heap);
+ ring_buffer = clib_mem_alloc_aligned (total_size, CLIB_CACHE_LINE_BYTES);
+ clib_memset (ring_buffer, 0, total_size);
+ clib_mem_set_heap (oldheap);
+
+ /* Set up offsets */
+ ring_buffer->config = *config;
+ ring_buffer->data_offset = data_offset;
+ ring_buffer->metadata_offset = metadata_offset;
+
+ /* Copy schema data if provided */
+ if (config->schema_size > 0 && schema_data)
+ {
+ void *schema_location = (u8 *) ring_buffer + schema_offset;
+ clib_memcpy_fast (schema_location, schema_data, config->schema_size);
+
+ /* Initialize metadata for all threads with schema info */
+ for (u32 thread_index = 0; thread_index < config->n_threads;
+ thread_index++)
+ {
+ vlib_stats_ring_metadata_t *metadata =
+ (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+ metadata_offset +
+ thread_index *
+ sizeof (
+ vlib_stats_ring_metadata_t));
+ metadata->schema_version = config->schema_version;
+ metadata->schema_offset = schema_offset;
+ metadata->schema_size = config->schema_size;
+ }
+ }
+
+ sm->directory_vector[entry_index].data = ring_buffer;
+
+ vlib_stats_segment_unlock ();
+
+ return entry_index;
+}
+
+/* Return a pointer to the current head slot of 'entry_index' for
+ * 'thread_index', or 0 if the thread index is out of range. The caller
+ * fills the slot; the head/sequence advance happens separately (see
+ * vlib_stats_ring_produce / vlib_stats_ring_commit_slot). */
+void *
+vlib_stats_ring_get_entry (u32 entry_index, u32 thread_index)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata;
+
+  /* Validate the thread index before touching per-thread state. */
+  if (thread_index >= ring_buffer->config.n_threads)
+    return 0;
+
+  metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                    ring_buffer->metadata_offset +
+                                    thread_index *
+                                      sizeof (vlib_stats_ring_metadata_t));
+
+  /* NOTE(review): holds only if sizeof (vlib_stats_ring_metadata_t) is a
+   * cache-line multiple — confirm for thread_index > 0. */
+  ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+  /* Slot address = per-thread region base + head * entry_size. The ring
+   * never reports "full": the writer overwrites the oldest entry. */
+  u32 offset = thread_index * ring_buffer->config.ring_size *
+               ring_buffer->config.entry_size;
+  offset += metadata->head * ring_buffer->config.entry_size;
+
+  return (u8 *) ring_buffer + ring_buffer->data_offset + offset;
+}
+
+/* Copy one entry ('data', config.entry_size bytes) into the ring for
+ * 'thread_index', advance head (wrapping at ring_size) and bump the
+ * sequence counter with release ordering so readers observe the data
+ * before the new sequence. Returns 0 on success, -1 on bad thread index.
+ * NOTE(review): head is stored non-atomically; confirm readers tolerate
+ * torn head reads and rely on 'sequence' for consistency. */
+int
+vlib_stats_ring_produce (u32 entry_index, u32 thread_index, void *data)
+{
+ vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+ vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+ vlib_stats_ring_buffer_t *ring_buffer = e->data;
+ vlib_stats_ring_metadata_t *metadata =
+ (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+ ring_buffer->metadata_offset +
+ thread_index *
+ sizeof (vlib_stats_ring_metadata_t));
+
+ if (thread_index >= ring_buffer->config.n_threads)
+ return -1;
+
+ ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+ /* Compute the post-commit head up front (wraps at ring_size). No actual
+ prefetch is issued here, despite what an earlier comment claimed. */
+ u32 next_head = (metadata->head + 1) % ring_buffer->config.ring_size;
+
+ /* Copy data to current head position */
+ void *entry = vlib_stats_ring_get_entry (entry_index, thread_index);
+ if (!entry)
+ return -1;
+
+ clib_memcpy_fast (entry, data, ring_buffer->config.entry_size);
+
+ /* Update metadata - always advance head and increment sequence */
+ metadata->head = next_head;
+ __atomic_store_n (&metadata->sequence, metadata->sequence + 1,
+ __ATOMIC_RELEASE);
+
+ return 0;
+}
+
+/* Testing/debug helper: report the current producer sequence number for
+ * 'thread_index' (acquire load) via 'sequence_out'. Returns 0 on success,
+ * -1 on bad thread index. NOTE(review): the 'data' parameter is never
+ * read or written — no entry payload is consumed; readers are expected
+ * to track their own position. */
+int
+vlib_stats_ring_consume (u32 entry_index, u32 thread_index, void *data,
+ u64 *sequence_out)
+{
+ vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+ vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+ vlib_stats_ring_buffer_t *ring_buffer = e->data;
+ vlib_stats_ring_metadata_t *metadata =
+ (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+ ring_buffer->metadata_offset +
+ thread_index *
+ sizeof (vlib_stats_ring_metadata_t));
+
+ if (thread_index >= ring_buffer->config.n_threads)
+ return -1;
+
+ ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+ /* Note: This function is for testing/debugging only since the reader
+ is read-only and manages its own state. The writer doesn't track
+ reader position. */
+
+ /* Return sequence number if requested */
+ if (sequence_out)
+ *sequence_out = __atomic_load_n (&metadata->sequence, __ATOMIC_ACQUIRE);
+
+ return 0;
+}
+
+/* Direct serialization APIs - avoid extra copy */
+
+/* Reserve the current head slot for in-place serialization and return a
+ * pointer to it (0 on bad thread index). Pair with
+ * vlib_stats_ring_commit_slot() to publish, or abort_slot() to discard.
+ * NOTE(review): duplicates the slot-address computation in
+ * vlib_stats_ring_get_entry(); consider delegating to it. */
+void *
+vlib_stats_ring_reserve_slot (u32 entry_index, u32 thread_index)
+{
+ vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+ vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+ vlib_stats_ring_buffer_t *ring_buffer = e->data;
+ vlib_stats_ring_metadata_t *metadata =
+ (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+ ring_buffer->metadata_offset +
+ thread_index *
+ sizeof (vlib_stats_ring_metadata_t));
+
+ if (thread_index >= ring_buffer->config.n_threads)
+ return 0;
+
+ ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+ /* Calculate entry pointer: per-thread region base + head * entry_size */
+ u32 offset = thread_index * ring_buffer->config.ring_size *
+ ring_buffer->config.entry_size;
+ offset += metadata->head * ring_buffer->config.entry_size;
+
+ return (u8 *) ring_buffer + ring_buffer->data_offset + offset;
+}
+
+int
+vlib_stats_ring_commit_slot (u32 entry_index, u32 thread_index)
+{
+ vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+ vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+ vlib_stats_ring_buffer_t *ring_buffer = e->data;
+ vlib_stats_ring_metadata_t *metadata =
+ (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+ ring_buffer->metadata_offset +
+ thread_index *
+ sizeof (vlib_stats_ring_metadata_t));
+ ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+ if (thread_index >= ring_buffer->config.n_threads)
+ return -1;
+
+ /* Update metadata - always advance head and increment sequence */
+ metadata->head = (metadata->head + 1) % ring_buffer->config.ring_size;
+ __atomic_store_n (&metadata->sequence, metadata->sequence + 1,
+ __ATOMIC_RELEASE);
+
+ return 0;
+}
+
int
vlib_stats_ring_abort_slot (u32 entry_index, u32 thread_index)
{
  /* For abort, we don't need to do anything since we haven't updated the
     metadata yet. The slot will be reused on the next reserve call.
     Both parameters are accepted (and ignored) to mirror the
     reserve/commit API shape.  Always succeeds. */
  return 0;
}
+
+u32
+vlib_stats_ring_get_slot_size (u32 entry_index)
+{
+ vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+ vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+ vlib_stats_ring_buffer_t *ring_buffer = e->data;
+
+ return ring_buffer->config.entry_size;
+}
+
u32
vlib_stats_ring_get_count (u32 entry_index, u32 thread_index)
{
  /* Note: Since the writer doesn't track reader state, we can't determine
     the actual count of unread entries. This function is kept for API
     compatibility but always returns 0.  Both parameters are ignored. */
  return 0;
}
+
+u32
+vlib_stats_ring_get_free_space (u32 entry_index, u32 thread_index)
+{
+ /* Note: Since the writer doesn't track reader state, we can't determine
+ the actual free space. This function is kept for API compatibility
+ but always returns the full ring size. */
+ vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+ vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+ vlib_stats_ring_buffer_t *ring_buffer = e->data;
+
+ return ring_buffer->config.ring_size;
+}
+
+/* Schema retrieval from ring buffers */
+
+int
+vlib_stats_ring_get_schema (u32 entry_index, u32 thread_index,
+ void *schema_data, u32 *schema_size,
+ u32 *schema_version)
+{
+ vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+ vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+ vlib_stats_ring_buffer_t *ring_buffer = e->data;
+ vlib_stats_ring_metadata_t *metadata =
+ (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+ ring_buffer->metadata_offset +
+ thread_index *
+ sizeof (vlib_stats_ring_metadata_t));
+
+ if (thread_index >= ring_buffer->config.n_threads)
+ return -1;
+
+ ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+ /* Check if schema exists */
+ if (metadata->schema_size == 0)
+ return -1;
+
+ /* Return schema information */
+ if (schema_version)
+ *schema_version = metadata->schema_version;
+ if (schema_size)
+ *schema_size = metadata->schema_size;
+ if (schema_data)
+ {
+ void *schema_location = (u8 *) ring_buffer + ring_buffer->data_offset +
+ metadata->schema_offset;
+ clib_memcpy_fast (schema_data, schema_location, metadata->schema_size);
+ }
+
+ return 0;
+}
format_function_t format_vlib_stats_symlink;
/* log2 histogram */
u32 vlib_stats_add_histogram_log2 (char *fmt, ...);
void vlib_stats_set_histogram_log2 (u32 entry_index, u32 thread_index,
				    const u64 *bin_counts);

/* Ring buffer configuration */
/* NOTE: out-of-process readers (e.g. python vpp_stats) parse this struct
   at fixed byte offsets (fields at bytes 0..19, then metadata_offset at
   20 and data_offset at 24 in vlib_stats_ring_buffer_t); keep the field
   order and the __clib_packed attribute stable. */
typedef struct
{
  u32 entry_size;     /* Size of each entry in bytes */
  u32 ring_size;      /* Number of entries in the ring */
  u32 n_threads;      /* Number of threads (one ring per thread) */
  u32 schema_size;    /* Size of schema data in bytes (0 if no schema) */
  u32 schema_version; /* Schema version number */
} __clib_packed vlib_stats_ring_config_t;

/* Ring buffer metadata (per thread) */
typedef struct
{
  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
  u32 head;	      /* Producer position */
  u32 schema_version; /* Schema version for this ring */
  u64 sequence;	      /* Sequence number for overwrite detection */
  u32 schema_offset;  /* Offset to schema data within ring buffer */
  u32 schema_size;    /* Size of schema data in bytes */
  /* NOTE(review): the pad assumes the fields above occupy exactly 24
     bytes and that one record fits in a single cache line; the python
     reader hardcodes a 64-byte stride -- confirm behavior on platforms
     where CLIB_CACHE_LINE_BYTES != 64. */
  u8 _pad1[CLIB_CACHE_LINE_BYTES - 24]; /* Pad to cache line size */
} vlib_stats_ring_metadata_t;

/* Ring buffer entry in stats directory */
typedef struct
{
  vlib_stats_ring_config_t config;
  u32 metadata_offset; /* Offset to metadata array (64-byte aligned) */
  u32 data_offset;     /* Offset to ring buffer data */
} __clib_packed vlib_stats_ring_buffer_t;

/* Ring buffer */
u32 vlib_stats_add_ring_buffer (vlib_stats_ring_config_t *config,
				const void *schema_data, char *fmt, ...);
void *vlib_stats_ring_get_entry (u32 entry_index, u32 thread_index);
int vlib_stats_ring_produce (u32 entry_index, u32 thread_index, void *data);
int vlib_stats_ring_consume (u32 entry_index, u32 thread_index, void *data,
			     u64 *sequence_out);
u32 vlib_stats_ring_get_count (u32 entry_index, u32 thread_index);
u32 vlib_stats_ring_get_free_space (u32 entry_index, u32 thread_index);

/* Direct serialization APIs - avoid extra copy */
void *vlib_stats_ring_reserve_slot (u32 entry_index, u32 thread_index);
int vlib_stats_ring_commit_slot (u32 entry_index, u32 thread_index);
int vlib_stats_ring_abort_slot (u32 entry_index, u32 thread_index);
u32 vlib_stats_ring_get_slot_size (u32 entry_index);

/* Schema retrieval from ring buffers */
int vlib_stats_ring_get_schema (u32 entry_index, u32 thread_index,
				void *schema_data, u32 *schema_size,
				u32 *schema_version);

#endif
switch (ep->type)
{
case STAT_DIR_TYPE_SCALAR_INDEX:
+ case STAT_DIR_TYPE_GAUGE:
result.scalar_value = ep->value;
break;
}
case STAT_DIR_TYPE_EMPTY:
+ case STAT_DIR_TYPE_RING_BUFFER:
+ break;
+
+ case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+ {
+ uint64_t **bins = stat_segment_adjust (sm, ep->data);
+ result.log2_histogram_bins = stat_vec_dup (sm, bins);
+ for (i = 0; i < vec_len (bins); i++)
+ {
+ uint64_t *thread_bins = stat_segment_adjust (sm, bins[i]);
+ result.log2_histogram_bins[i] = stat_vec_dup (sm, thread_bins);
+ }
+ }
break;
default:
vec_free (res[i].name_vector);
break;
case STAT_DIR_TYPE_SCALAR_INDEX:
+ case STAT_DIR_TYPE_GAUGE:
case STAT_DIR_TYPE_EMPTY:
+ case STAT_DIR_TYPE_RING_BUFFER:
+ break;
+ case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+ for (j = 0; j < vec_len (res[i].log2_histogram_bins); j++)
+ vec_free (res[i].log2_histogram_bins[j]);
+ vec_free (res[i].log2_histogram_bins);
break;
default:
assert (0);
counter_t **simple_counter_vec;
vlib_counter_t **combined_counter_vec;
uint8_t **name_vector;
+ uint64_t **log2_histogram_bins; // [thread][bin] for histogram
};
} stat_segment_data_t;
import time
import unittest
import re
+import asyncio
+import sys
def recv_fd(sock):
result[cnt] = self.__getitem__(cnt, blocking)
return result
+ def get_ring_buffer(self, name, blocking=True):
+ """Get a ring buffer by name"""
+ if not self.connected:
+ self.connect()
+
+ while True:
+ try:
+ if self.last_epoch != self.epoch:
+ self.refresh(blocking)
+ with self.lock:
+ entry = self.directory[name]
+ if entry.type == 8: # STAT_DIR_TYPE_RING_BUFFER
+ return entry.get_counter(self)
+ else:
+ raise ValueError(f"'{name}' is not a ring buffer")
+ except IOError:
+ if not blocking:
+ raise
+
+ def poll_ring_buffer(self, name, thread_index=0, timeout=None, callback=None):
+ """Convenience method to poll a ring buffer by name"""
+ ring_buffer = self.get_ring_buffer(name)
+ return ring_buffer.poll_for_data(thread_index, timeout, callback)
+
+ async def poll_ring_buffer_async(
+ self, name, thread_index=0, timeout=None, callback=None
+ ):
+ """Async convenience method to poll a ring buffer by name"""
+ ring_buffer = self.get_ring_buffer(name)
+ return await ring_buffer.poll_for_data_async(thread_index, timeout, callback)
+
+ def get_ring_buffer_schema(self, name, thread_index=0):
+ """Get schema from a ring buffer by name"""
+ ring_buffer = self.get_ring_buffer(name)
+ return ring_buffer.get_schema(thread_index)
+
+ def get_ring_buffer_schema_string(self, name, thread_index=0):
+ """Get schema as string from a ring buffer by name"""
+ ring_buffer = self.get_ring_buffer(name)
+ return ring_buffer.get_schema_string(thread_index)
+
class StatsLock:
"""Stat segment optimistic locking"""
return sum(self)
+# Add a helper class for histogram log2
class StatsHistogramLog2:
    """Per-thread log2 histogram counters.

    bins is a list of per-thread bin lists (bins[thread][bin]); min_exp is
    the exponent of the first bin as reported by the stats segment.
    """

    def __init__(self, bins, min_exp):
        self.bins = bins  # list of lists: [thread][bin]
        self.min_exp = min_exp

    def sum(self):
        """Total count across every thread and every bin."""
        total = 0
        for per_thread in self.bins:
            total += sum(per_thread)
        return total

    def thread_count(self):
        """Number of per-thread bin vectors."""
        return len(self.bins)

    def bin_count(self):
        """Width of the widest per-thread bin vector (0 if none)."""
        widths = [len(per_thread) for per_thread in self.bins]
        return max(widths) if widths else 0

    def __getitem__(self, idx):
        # Index by thread; returns that thread's bin list.
        return self.bins[idx]

    def __repr__(self):
        return f"StatsHistogramLog2(min_exp={self.min_exp}, bins={self.bins})"
+
+
class StatsEntry:
"""An individual stats entry"""
self.function = self.name
elif stattype == 6:
self.function = self.symlink
+ elif stattype == 7: # STAT_DIR_TYPE_HISTOGRAM_LOG2
+ self.function = self.histogram_log2
+ elif stattype == 8: # STAT_DIR_TYPE_RING_BUFFER
+ self.function = self.ring_buffer
+ elif stattype == 9: # STAT_DIR_TYPE_GAUGE
+ self.function = self.scalar
else:
self.function = self.illegal
name = stats.directory_by_idx[index1]
return stats[name][:, index2]
+ def ring_buffer(self, stats):
+ """Ring buffer counter"""
+ return StatsRingBuffer(stats, self.value)
+
    def histogram_log2(self, stats):
        """Histogram log2 counter (STAT_DIR_TYPE_HISTOGRAM_LOG2)"""
        # The value is a pointer to a vector of pointers (per-thread), each pointing to a vector of uint64_t bins
        threads_ptr = self.value
        thread_vec = StatsVector(stats, threads_ptr, "P")
        all_bins = []
        min_exp = 0
        for thread_ptr_tuple in thread_vec:
            # Each element of the pointer vector is a 1-tuple holding the
            # per-thread bins pointer (may be NULL).
            bins_ptr = thread_ptr_tuple[0]
            if bins_ptr:
                bins_vec = StatsVector(stats, bins_ptr, "Q")
                bins = [v[0] for v in bins_vec]
                if bins:
                    # The first u64 of each per-thread vector carries
                    # min_exp; the remaining entries are the bin counts.
                    # NOTE(review): min_exp is overwritten each iteration,
                    # so the last populated thread wins -- assumes all
                    # threads share the same min_exp; confirm on writer side.
                    min_exp = bins[0]
                    all_bins.append(bins[1:])
                else:
                    all_bins.append([])
            else:
                # NULL per-thread pointer: report an empty bin list.
                all_bins.append([])
        return StatsHistogramLog2(all_bins, min_exp)
+
def get_counter(self, stats):
"""Return a list of counters"""
if stats:
return self.function(stats)
class StatsRingBuffer:
    """Read-only client for a per-thread stats-segment ring buffer.

    Shared-memory layout (mirrors vlib_stats_ring_buffer_t on the C side):
      bytes  0..19  packed config: entry_size, ring_size, n_threads,
                    schema_size, schema_version
      bytes 20..23  metadata_offset (relative to ring buffer start)
      bytes 24..27  data_offset (relative to ring buffer start)
      metadata      one record per thread, strided by one cache line
      data          n_threads back-to-back rings of ring_size * entry_size

    The writer publishes only `head` and a monotonically increasing
    `sequence`; it never tracks readers.  Each instance therefore keeps
    its own per-thread tail and last-seen sequence, and detects producer
    overwrites by watching for sequence jumps larger than the ring size.
    """

    # Stride of the per-thread metadata array.  Matches
    # CLIB_CACHE_LINE_BYTES on the writer side -- assumed 64 here;
    # TODO confirm on platforms with a different cache-line size.
    _CACHE_LINE = 64
    # entry_size, ring_size, n_threads, schema_size, schema_version
    _CONFIG = Struct("=IIIII")
    # metadata_offset, data_offset
    _OFFSETS = Struct("=II")
    # head, schema_version, sequence, schema_offset, schema_size
    _METADATA = Struct("=IIQII")

    def __init__(self, stats, ptr):
        self.stats = stats
        self.ring_buffer_ptr = ptr
        self.config = self._get_config()
        self.metadata_ptr = self._get_metadata_ptr()
        self.data_ptr = self._get_data_ptr()
        # Per-thread reader state (the writer does not track readers, so
        # tails start at 0 and sequences at "never read").
        self.local_tails = [0] * self.config["n_threads"]
        self.last_sequences = [None] * self.config["n_threads"]

    def _base(self):
        """Offset of the ring buffer structure within the stats segment."""
        return self.ring_buffer_ptr - self.stats.base

    def _get_config(self):
        """Read the packed configuration struct from shared memory."""
        base = self._base()
        raw = self.stats.statseg[base : base + self._CONFIG.size]
        keys = ("entry_size", "ring_size", "n_threads", "schema_size", "schema_version")
        return dict(zip(keys, self._CONFIG.unpack(raw)))

    def _get_metadata_ptr(self):
        """Segment offset of the per-thread metadata array."""
        off = self._base() + self._CONFIG.size
        metadata_offset, _ = self._OFFSETS.unpack(
            self.stats.statseg[off : off + self._OFFSETS.size]
        )
        return self._base() + metadata_offset

    def _get_data_ptr(self):
        """Segment offset of the ring data area."""
        off = self._base() + self._CONFIG.size
        _, data_offset = self._OFFSETS.unpack(
            self.stats.statseg[off : off + self._OFFSETS.size]
        )
        return self._base() + data_offset

    def _get_thread_metadata(self, thread_index):
        """Read one thread's metadata record (head, sequence, schema info)."""
        if thread_index >= self.config["n_threads"]:
            raise IndexError(f"Thread index {thread_index} out of range")
        off = self.metadata_ptr + thread_index * self._CACHE_LINE
        head, schema_version, sequence, schema_offset, schema_size = (
            self._METADATA.unpack(
                self.stats.statseg[off : off + self._METADATA.size]
            )
        )
        return {
            "head": head,
            "schema_version": schema_version,
            "sequence": sequence,
            "schema_offset": schema_offset,
            "schema_size": schema_size,
        }

    def get_schema(self, thread_index=0):
        """Return (schema_bytes, size, version), or (None, 0, 0) if absent."""
        metadata = self._get_thread_metadata(thread_index)
        if metadata["schema_size"] == 0:
            return None, 0, 0
        # NOTE(review): schema_offset is applied relative to the ring
        # buffer start here, while the C reader adds it to data_offset --
        # confirm which base the writer actually uses.
        loc = self._base() + metadata["schema_offset"]
        data = self.stats.statseg[loc : loc + metadata["schema_size"]]
        return data, metadata["schema_size"], metadata["schema_version"]

    def get_schema_string(self, thread_index=0):
        """Like get_schema, but decode text schemas (e.g. CDDL) as UTF-8."""
        schema_data, schema_size, schema_version = self.get_schema(thread_index)
        if schema_data is None:
            return None, 0, 0
        try:
            return schema_data.decode("utf-8"), schema_size, schema_version
        except UnicodeDecodeError:
            # Not valid UTF-8; hand back the raw bytes unchanged.
            return schema_data, schema_size, schema_version

    def get_count(self, thread_index=0):
        """Always 0: the writer keeps no reader state, so the number of
        unread entries is unknowable.  Kept for API compatibility."""
        return 0

    def is_empty(self, thread_index=0):
        """Always True; see get_count.  Kept for API compatibility."""
        return True

    def is_full(self, thread_index=0):
        """Always False; see get_count.  Kept for API compatibility."""
        return False

    def _consume(self, thread_index, max_entries):
        """Shared engine behind consume_data / consume_data_batch.

        Snapshots the producer metadata, computes how many entries are
        unread, copies them out, then re-checks the sequence to detect a
        concurrent overwrite (retrying up to 3 times).
        """
        metadata = self._get_thread_metadata(thread_index)
        ring_size = self.config["ring_size"]
        entry_size = self.config["entry_size"]
        local_tail = self.local_tails[thread_index]
        last_sequence = self.last_sequences[thread_index]
        sequence = metadata["sequence"]

        # Overwrite detection: did the producer lap us?
        if last_sequence is not None:
            delta = (sequence - last_sequence) % (1 << 64)
            if delta > ring_size:
                print(
                    f"[WARN] Ring buffer overwrite detected on thread {thread_index}: "
                    f"sequence jumped from {last_sequence} to {sequence} (delta={delta}, ring_size={ring_size})"
                )
                # Resync to the oldest entry still present in the ring.
                local_tail = (metadata["head"] - ring_size) % ring_size

        # Nothing new since our last read.
        if last_sequence == sequence:
            return []

        if last_sequence is None:
            # First read: at most ring_size entries exist; start at the
            # oldest one still available, (head - available) mod ring_size.
            available = min(sequence, ring_size)
            local_tail = (metadata["head"] - available) % ring_size
        else:
            available = min((sequence - last_sequence) % (1 << 64), ring_size)

        if available == 0:
            self.last_sequences[thread_index] = sequence
            return []

        want = available if max_entries is None else min(max_entries, available)
        thread_base = self.data_ptr + thread_index * ring_size * entry_size

        consumed = []
        for attempt in range(3):
            try:
                consumed = []
                tail = local_tail
                for _ in range(want):
                    off = thread_base + tail * entry_size
                    consumed.append(self.stats.statseg[off : off + entry_size])
                    tail = (tail + 1) % ring_size

                # Re-read the sequence: unchanged means our copies were a
                # consistent snapshot.
                check = self._get_thread_metadata(thread_index)
                if check["sequence"] == sequence:
                    self.local_tails[thread_index] = tail
                    if last_sequence is None:
                        # Sequence number of the last entry we consumed.
                        self.last_sequences[thread_index] = (
                            sequence - available + len(consumed)
                        )
                    else:
                        self.last_sequences[thread_index] = last_sequence + len(
                            consumed
                        )
                    return consumed

                if attempt == 2:
                    # Give up; hand back a best-effort partial snapshot.
                    print("[WARN] Max retries reached, returning partial data")
                    self.local_tails[thread_index] = tail
                    self.last_sequences[thread_index] = sequence
                    return consumed

                # Producer advanced mid-read: recompute the window from the
                # fresh metadata and retry.  (Fixes the old retry path,
                # which recomputed the first-read tail and then immediately
                # clobbered it with the stale stored value.)
                metadata = check
                sequence = metadata["sequence"]
                if last_sequence is None:
                    available = min(sequence, ring_size)
                    local_tail = (metadata["head"] - available) % ring_size
                else:
                    available = min((sequence - last_sequence) % (1 << 64), ring_size)
                    local_tail = self.local_tails[thread_index]
                if available == 0:
                    self.last_sequences[thread_index] = sequence
                    return []
                want = min(want, available)
            except Exception as e:
                print(f"[ERROR] Exception during data read: {e}")
                if attempt == 2:
                    return consumed
        return consumed

    def consume_data(self, thread_index=0, max_entries=None):
        """Consume unread entries for a thread; returns a list of raw
        byte blobs of entry_size each (oldest first)."""
        return self._consume(thread_index, max_entries)

    def consume_data_batch(self, thread_index=0, max_entries=None, prefetch=True):
        """Batch-oriented alias of consume_data, kept for API compatibility.

        The old chunked implementation read exactly the same entries in
        the same order, and Python has no real prefetch, so both paths now
        share one engine.  `prefetch` is accepted and ignored.
        """
        return self._consume(thread_index, max_entries)

    def poll_for_data(self, thread_index=0, timeout=None, callback=None):
        """Busy-poll until data arrives (returned as a list), or forever in
        callback mode, or until `timeout` seconds elapse (returns [])."""
        start_time = time.time()
        while True:
            data = self.consume_data(thread_index)
            if data:
                if callback:
                    for entry_data in data:
                        try:
                            callback(entry_data)
                        except Exception as e:
                            print(f"Callback error: {e}")
                else:
                    return data
            if timeout and (time.time() - start_time) > timeout:
                return []
            time.sleep(0.000001)  # 1us polling interval

    async def poll_for_data_async(self, thread_index=0, timeout=None, callback=None):
        """Async version of poll_for_data.

        Availability detection now delegates to consume_data's
        sequence-based tracking; the old head-vs-local_tail comparison
        disagreed with consume_data's bookkeeping and could miss data when
        the producer wrapped the ring an exact number of times.
        """
        loop = asyncio.get_event_loop()
        start_time = loop.time()
        while True:
            data = self.consume_data(thread_index)
            if data:
                if callback:
                    for entry_data in data:
                        await callback(entry_data)
                else:
                    return data
            if timeout and (loop.time() - start_time) > timeout:
                return []
            # Yield control to asyncio.
            await asyncio.sleep(0.000001)  # 1us polling interval

    def get_config(self):
        """Return a copy of the ring buffer configuration dict."""
        return self.config.copy()

    def __repr__(self):
        schema_info = ""
        if self.config["schema_size"] > 0:
            schema_info = f", schema_size={self.config['schema_size']}, schema_version={self.config['schema_version']}"

        return (
            f"StatsRingBuffer(entry_size={self.config['entry_size']}, "
            f"ring_size={self.config['ring_size']}, n_threads={self.config['n_threads']}{schema_info})"
        )
+
+
class TestStats(unittest.TestCase):
"""Basic statseg tests"""
print("/sys/nodes/unix-epoll-input", self.stat["/nodes/unix-epoll-input/calls"])
+class TestRingBuffer(unittest.TestCase):
+ """Ring buffer specific tests"""
+
+ def setUp(self):
+ """Connect to statseg and find test ring buffer"""
+ self.stat = VPPStats()
+ self.stat.connect()
+
+ # Look for test ring buffer created by VPP CLI command
+ try:
+ self.ring_buffer = self.stat.get_ring_buffer("/test/ring-buffer")
+ self.ring_buffer_available = True
+ except (KeyError, ValueError):
+ print(
+ "Test ring buffer not found. Run 'test stats ring-buffer-gen test_ring 100 1000 16' in VPP first."
+ )
+ self.ring_buffer_available = False
+
+    def tearDown(self):
+        """Disconnect from statseg"""
+        # Always release the shared-memory client, even if setUp found no ring buffer.
+        self.stat.disconnect()
+
+ def test_ring_buffer_config(self):
+ """Test ring buffer configuration access"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ config = self.ring_buffer.get_config()
+ self.assertIsInstance(config, dict)
+ self.assertIn("entry_size", config)
+ self.assertIn("ring_size", config)
+ self.assertIn("n_threads", config)
+
+ print(f"Ring buffer config: {config}")
+
+ # Verify reasonable values
+ self.assertGreater(config["entry_size"], 0)
+ self.assertGreater(config["ring_size"], 0)
+ self.assertGreater(config["n_threads"], 0)
+
+ def test_ring_buffer_metadata_access(self):
+ """Test ring buffer metadata access"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Test metadata access for thread 0
+ metadata = self.ring_buffer._get_thread_metadata(0)
+ self.assertIsInstance(metadata, dict)
+ self.assertIn("head", metadata)
+ self.assertIn("sequence", metadata)
+
+ print(f"Thread 0 metadata: {metadata}")
+
+ # Verify metadata values are reasonable
+ self.assertIsInstance(metadata["head"], int)
+ self.assertIsInstance(metadata["sequence"], int)
+ self.assertGreaterEqual(metadata["head"], 0)
+ self.assertGreaterEqual(metadata["sequence"], 0)
+
+ def test_ring_buffer_consume_empty(self):
+ """Test consuming from empty ring buffer"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Consume from empty buffer
+ data = self.ring_buffer.consume_data(thread_index=0)
+ self.assertEqual(data, [])
+
+ # Test with max_entries
+ data = self.ring_buffer.consume_data(thread_index=0, max_entries=10)
+ self.assertEqual(data, [])
+
+ def test_ring_buffer_consume_batch_empty(self):
+ """Test batch consuming from empty ring buffer"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Consume from empty buffer
+ data = self.ring_buffer.consume_data_batch(thread_index=0)
+ self.assertEqual(data, [])
+
+ # Test with max_entries
+ data = self.ring_buffer.consume_data_batch(thread_index=0, max_entries=10)
+ self.assertEqual(data, [])
+
+ def test_ring_buffer_poll_empty(self):
+ """Test polling empty ring buffer"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Poll with short timeout
+ data = self.ring_buffer.poll_for_data(thread_index=0, timeout=0.1)
+ self.assertEqual(data, [])
+
+ def test_ring_buffer_poll_with_callback(self):
+ """Test polling with callback function"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ collected_data = []
+
+ def callback(data):
+ collected_data.append(data)
+
+ # Poll with callback and short timeout
+ result = self.ring_buffer.poll_for_data(
+ thread_index=0, timeout=0.1, callback=callback
+ )
+ self.assertEqual(result, [])
+ self.assertEqual(collected_data, [])
+
+ def test_ring_buffer_invalid_thread(self):
+ """Test ring buffer with invalid thread index"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ config = self.ring_buffer.get_config()
+ invalid_thread = config["n_threads"] + 1
+
+ # Test metadata access with invalid thread
+ with self.assertRaises(IndexError):
+ self.ring_buffer._get_thread_metadata(invalid_thread)
+
+ # Test consume with invalid thread
+ data = self.ring_buffer.consume_data(thread_index=invalid_thread)
+ self.assertEqual(data, [])
+
+ def test_ring_buffer_api_compatibility(self):
+ """Test API compatibility methods"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Test compatibility methods (these return simplified values)
+ count = self.ring_buffer.get_count(thread_index=0)
+ self.assertEqual(count, 0)
+
+ is_empty = self.ring_buffer.is_empty(thread_index=0)
+ self.assertTrue(is_empty)
+
+ is_full = self.ring_buffer.is_full(thread_index=0)
+ self.assertFalse(is_full)
+
+ def test_ring_buffer_repr(self):
+ """Test ring buffer string representation"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ repr_str = repr(self.ring_buffer)
+ self.assertIsInstance(repr_str, str)
+ self.assertIn("StatsRingBuffer", repr_str)
+ self.assertIn("entry_size", repr_str)
+ self.assertIn("ring_size", repr_str)
+ self.assertIn("n_threads", repr_str)
+
+ print(f"Ring buffer repr: {repr_str}")
+
+ def test_ring_buffer_multiple_threads(self):
+ """Test ring buffer access across multiple threads"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ config = self.ring_buffer.get_config()
+
+ # Test all available threads
+ for thread_index in range(config["n_threads"]):
+ metadata = self.ring_buffer._get_thread_metadata(thread_index)
+ self.assertIsInstance(metadata, dict)
+ self.assertIn("head", metadata)
+ self.assertIn("sequence", metadata)
+
+ data = self.ring_buffer.consume_data(thread_index=thread_index)
+ self.assertIsInstance(data, list)
+
+ def test_ring_buffer_sequence_consistency(self):
+ """Test sequence number consistency across reads"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Read metadata multiple times to check consistency
+ metadata1 = self.ring_buffer._get_thread_metadata(0)
+ metadata2 = self.ring_buffer._get_thread_metadata(0)
+
+ # Sequence numbers should be consistent (same or increasing)
+ self.assertGreaterEqual(metadata2["sequence"], metadata1["sequence"])
+
+ def test_ring_buffer_error_handling(self):
+ """Test ring buffer error handling"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Test with invalid parameters
+ data = self.ring_buffer.consume_data(thread_index=0, max_entries=0)
+ self.assertEqual(data, [])
+
+ data = self.ring_buffer.consume_data(thread_index=0, max_entries=-1)
+ self.assertEqual(data, [])
+
+ def test_ring_buffer_batch_vs_individual(self):
+ """Test that batch and individual consume return same results"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Reset local state to ensure fair comparison
+ self.ring_buffer.local_tails[0] = 0
+ self.ring_buffer.last_sequences[0] = None
+
+ # Consume with individual method
+ individual_data = self.ring_buffer.consume_data(thread_index=0, max_entries=5)
+
+ # Reset local state
+ self.ring_buffer.local_tails[0] = 0
+ self.ring_buffer.last_sequences[0] = None
+
+ # Consume with batch method
+ batch_data = self.ring_buffer.consume_data_batch(thread_index=0, max_entries=5)
+
+ # Results should be the same
+ self.assertEqual(individual_data, batch_data)
+
+ def test_ring_buffer_prefetch_parameter(self):
+ """Test prefetch parameter in batch consume"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Test with prefetch enabled
+ data1 = self.ring_buffer.consume_data_batch(thread_index=0, prefetch=True)
+
+ # Test with prefetch disabled
+ data2 = self.ring_buffer.consume_data_batch(thread_index=0, prefetch=False)
+
+ # Results should be the same regardless of prefetch setting
+ self.assertEqual(data1, data2)
+
+ def test_ring_buffer_schema_access(self):
+ """Test ring buffer schema access"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Test schema access
+ schema_data, schema_size, schema_version = self.ring_buffer.get_schema(
+ thread_index=0
+ )
+
+ # Should return schema information (may be None if no schema)
+ self.assertIsInstance(schema_size, int)
+ self.assertIsInstance(schema_version, int)
+ self.assertGreaterEqual(schema_size, 0)
+ self.assertGreaterEqual(schema_version, 0)
+
+ # Test schema string access
+ schema_string, schema_size_str, schema_version_str = (
+ self.ring_buffer.get_schema_string(thread_index=0)
+ )
+
+ # Should return consistent information
+ self.assertEqual(schema_size, schema_size_str)
+ self.assertEqual(schema_version, schema_version_str)
+
+ # If schema exists, it should be readable
+ if schema_size > 0:
+ self.assertIsNotNone(schema_data)
+ self.assertIsInstance(schema_data, bytes)
+ self.assertEqual(len(schema_data), schema_size)
+
+ # If it's a string schema, it should be decodable
+ if schema_string is not None:
+ self.assertIsInstance(schema_string, str)
+ self.assertGreater(len(schema_string), 0)
+
+ print(f"Schema info: size={schema_size}, version={schema_version}")
+ if schema_string:
+ print(f"Schema content: {schema_string}")
+
+ def test_ring_buffer_schema_metadata(self):
+ """Test that schema information is included in metadata"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Get metadata for thread 0
+ metadata = self.ring_buffer._get_thread_metadata(0)
+
+ # Should include schema information
+ self.assertIn("schema_version", metadata)
+ self.assertIn("schema_offset", metadata)
+ self.assertIn("schema_size", metadata)
+
+ # Values should be consistent with config
+ self.assertEqual(
+ metadata["schema_version"], self.ring_buffer.config["schema_version"]
+ )
+ self.assertEqual(
+ metadata["schema_size"], self.ring_buffer.config["schema_size"]
+ )
+
+ def test_ring_buffer_schema_config(self):
+ """Test that schema information is included in config"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ config = self.ring_buffer.get_config()
+
+ # Should include schema information
+ self.assertIn("schema_size", config)
+ self.assertIn("schema_version", config)
+
+ # Values should be reasonable
+ self.assertIsInstance(config["schema_size"], int)
+ self.assertIsInstance(config["schema_version"], int)
+ self.assertGreaterEqual(config["schema_size"], 0)
+ self.assertGreaterEqual(config["schema_version"], 0)
+
+ print(
+ f"Config schema info: size={config['schema_size']}, version={config['schema_version']}"
+ )
+
+ def test_ring_buffer_schema_convenience_methods(self):
+ """Test convenience methods for schema access"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Test convenience methods from VPPStats
+ schema_data, schema_size, schema_version = self.stat.get_ring_buffer_schema(
+ "/test/ring-buffer"
+ )
+ schema_string, schema_size_str, schema_version_str = (
+ self.stat.get_ring_buffer_schema_string("/test/ring-buffer")
+ )
+
+ # Should return consistent information
+ self.assertEqual(schema_size, schema_size_str)
+ self.assertEqual(schema_version, schema_version_str)
+
+ # Should match direct access
+ direct_schema_data, direct_schema_size, direct_schema_version = (
+ self.ring_buffer.get_schema()
+ )
+ self.assertEqual(schema_size, direct_schema_size)
+ self.assertEqual(schema_version, direct_schema_version)
+ if schema_data is not None:
+ self.assertEqual(schema_data, direct_schema_data)
+
+ def test_ring_buffer_schema_content(self):
+ """Test that schema content matches expected CDDL format"""
+ if not self.ring_buffer_available:
+ self.skipTest("Ring buffer not available")
+
+ # Get schema string
+ schema_string, schema_size, schema_version = self.ring_buffer.get_schema_string(
+ thread_index=0
+ )
+
+ # If schema exists, verify it has expected content
+ if schema_size > 0 and schema_string:
+ # Should be a string (not bytes)
+ self.assertIsInstance(schema_string, str)
+
+ # Should contain expected CDDL-like content
+ self.assertIn("ring_test_schema", schema_string)
+ self.assertIn("name:", schema_string)
+ self.assertIn("version:", schema_string)
+ self.assertIn("fields:", schema_string)
+ self.assertIn("seq", schema_string)
+ self.assertIn("timestamp", schema_string)
+
+ print(f"✓ Schema content verified: {schema_string[:100]}...")
+ else:
+ print("ℹ No schema found in ring buffer")
+
+
+class TestRingBufferIntegration(unittest.TestCase):
+ """Integration tests that coordinate with VPP writer tests"""
+
+ def setUp(self):
+ """Connect to statseg and find integration test ring buffer"""
+ self.stat = VPPStats()
+ self.stat.connect()
+
+ # Look for integration test ring buffer
+ try:
+ self.ring_buffer = self.stat.get_ring_buffer("/integration/test")
+ self.ring_buffer_available = True
+ except (KeyError, ValueError):
+ print(
+ "Integration test ring buffer not found. Run integration test setup first."
+ )
+ self.ring_buffer_available = False
+
+ def tearDown(self):
+ """Disconnect from statseg"""
+ self.stat.disconnect()
+
+ def test_integration_data_flow(self):
+ """Test complete data flow from writer to reader"""
+ if not self.ring_buffer_available:
+ self.skipTest("Integration ring buffer not available")
+
+ # This test would coordinate with a VPP writer test
+ # For now, we'll test the basic flow
+
+ # Reset reader state
+ self.ring_buffer.local_tails[0] = 0
+ self.ring_buffer.last_sequences[0] = None
+
+ # Try to consume data
+ data = self.ring_buffer.consume_data(thread_index=0)
+
+ # Should get list (empty or with data)
+ self.assertIsInstance(data, list)
+
+ # If we got data, verify it's the right format
+ for entry in data:
+ self.assertIsInstance(entry, bytes)
+ self.assertGreater(len(entry), 0)
+
+ def test_integration_overwrite_detection(self):
+ """Test overwrite detection in integration scenario"""
+ if not self.ring_buffer_available:
+ self.skipTest("Integration ring buffer not available")
+
+ # This test would coordinate with a writer that intentionally overwrites
+ # For now, we'll test the detection mechanism
+
+ # Simulate overwrite by manipulating sequence numbers
+ original_sequence = self.ring_buffer.last_sequences[0]
+
+ # This would normally happen when writer laps reader
+ # For testing, we'll just verify the detection logic exists
+ metadata = self.ring_buffer._get_thread_metadata(0)
+ self.assertIn("sequence", metadata)
+
+ def test_integration_performance(self):
+ """Test performance characteristics"""
+ if not self.ring_buffer_available:
+ self.skipTest("Integration ring buffer not available")
+
+ import time
+
+ # Test individual consume performance
+ start_time = time.time()
+ for _ in range(100):
+ self.ring_buffer.consume_data(thread_index=0, max_entries=1)
+ individual_time = time.time() - start_time
+
+ # Test batch consume performance
+ start_time = time.time()
+ for _ in range(10):
+ self.ring_buffer.consume_data_batch(thread_index=0, max_entries=10)
+ batch_time = time.time() - start_time
+
+ print(f"Individual consume time: {individual_time:.6f}s")
+ print(f"Batch consume time: {batch_time:.6f}s")
+
+ # Batch should be faster (though with empty data, difference might be minimal)
+ self.assertIsInstance(individual_time, float)
+ self.assertIsInstance(batch_time, float)
+
+
+def run_ring_buffer_tests():
+ """Run ring buffer tests with proper setup"""
+ print("Running Ring Buffer Tests...")
+ print("=" * 50)
+
+ # Create test suite
+ suite = unittest.TestSuite()
+
+ # Add ring buffer tests
+ suite.addTest(unittest.makeSuite(TestRingBuffer))
+ suite.addTest(unittest.makeSuite(TestRingBufferIntegration))
+
+ # Run tests
+ runner = unittest.TextTestRunner(verbosity=2)
+ result = runner.run(suite)
+
+ print("=" * 50)
+ print(f"Tests run: {result.testsRun}")
+ print(f"Failures: {len(result.failures)}")
+ print(f"Errors: {len(result.errors)}")
+
+ return result.wasSuccessful()
+
+
+def demo_ring_buffer_schema():
+ """Demo function showing how to use ring buffer schema functionality"""
+ print("Ring Buffer Schema Demo")
+ print("=" * 30)
+
+ try:
+ # Connect to VPP stats
+ stats = VPPStats()
+ stats.connect()
+
+ # Look for test ring buffer
+ try:
+ ring_buffer = stats.get_ring_buffer("/test/ring-buffer")
+ print("✓ Found test ring buffer")
+
+ # Get configuration
+ config = ring_buffer.get_config()
+ print(f"Ring buffer config: {config}")
+
+ # Get schema information
+ schema_data, schema_size, schema_version = ring_buffer.get_schema()
+ print(f"Schema info: size={schema_size}, version={schema_version}")
+
+ # Get schema as string
+ schema_string, _, _ = ring_buffer.get_schema_string()
+ if schema_string:
+ print(f"Schema content:\n{schema_string}")
+ else:
+ print("No schema found")
+
+ # Use convenience methods
+ print("\nUsing convenience methods:")
+ conv_schema_string, conv_size, conv_version = (
+ stats.get_ring_buffer_schema_string("/test/ring-buffer")
+ )
+ print(
+ f"Convenience method result: size={conv_size}, version={conv_version}"
+ )
+ if conv_schema_string:
+ print(f"Schema: {conv_schema_string[:100]}...")
+
+ except (KeyError, ValueError) as e:
+ print(f"Test ring buffer not found: {e}")
+ print(
+ "Run 'test stats ring-buffer-gen /test/ring-buffer 100 1000 16' in VPP first"
+ )
+
+ stats.disconnect()
+
+ except Exception as e:
+ print(f"Error: {e}")
+ print("Make sure VPP is running and stats socket is available")
+
+ print("=" * 30)
+
+
if __name__ == "__main__":
import cProfile
from pstats import Stats
+ # Run ring buffer tests if available
+ if "--ring-buffer-tests" in sys.argv:
+ success = run_ring_buffer_tests()
+ sys.exit(0 if success else 1)
+
+ # Run schema demo if requested
+ if "--schema-demo" in sys.argv:
+ demo_ring_buffer_schema()
+ sys.exit(0)
+
+ # Run original tests
unittest.main()
break;
case STAT_DIR_TYPE_SCALAR_INDEX:
+ case STAT_DIR_TYPE_GAUGE:
fformat (stdout, "%.2f %s\n", res[i].scalar_value, res[i].name);
break;
+ case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+ for (k = 0; k < vec_len (res[i].log2_histogram_bins); k++)
+ {
+ u64 *bins = res[i].log2_histogram_bins[k];
+ int n_bins = vec_len (bins);
+ if (n_bins < 2) // Need at least min_exp + one bin
+ continue;
+ u32 min_exp = bins[0];
+ u64 cumulative = 0;
+ u64 sum = 0;
+ fformat (stdout, "Histogram %s (thread %d):\n", res[i].name,
+ k);
+ for (int j = 1; j < n_bins; ++j)
+ {
+ cumulative += bins[j];
+ sum += bins[j] *
+ (1ULL << (min_exp + j - 1)); // midpoint approx
+ fformat (stdout, " <= %llu: %llu (cumulative: %llu)\n",
+ (1ULL << (min_exp + j - 1)), bins[j],
+ cumulative);
+ }
+ fformat (stdout,
+ " +Inf: %llu (total count: %llu, sum: %llu)\n",
+ cumulative, cumulative, sum);
+ }
+ break;
+
case STAT_DIR_TYPE_EMPTY:
break;
break;
case STAT_DIR_TYPE_SCALAR_INDEX:
+ case STAT_DIR_TYPE_GAUGE:
fformat (stdout, "%.2f %s\n", res[i].scalar_value, res[i].name);
break;
+ case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+ for (k = 0; k < vec_len (res[i].log2_histogram_bins); k++)
+ {
+ u64 *bins = res[i].log2_histogram_bins[k];
+ int n_bins = vec_len (bins);
+ if (n_bins < 2) // Need at least min_exp + one bin
+ continue;
+ u32 min_exp = bins[0];
+ u64 cumulative = 0;
+ u64 sum = 0;
+ fformat (stdout, "Histogram %s (thread %d):\n", res[i].name,
+ k);
+ for (int j = 1; j < n_bins; ++j)
+ {
+ cumulative += bins[j];
+ sum += bins[j] *
+ (1ULL << (min_exp + j - 1)); // midpoint approx
+ fformat (stdout, " <= %llu: %llu (cumulative: %llu)\n",
+ (1ULL << (min_exp + j - 1)), bins[j],
+ cumulative);
+ }
+ fformat (stdout,
+ " +Inf: %llu (total count: %llu, sum: %llu)\n",
+ cumulative, cumulative, sum);
+ }
+ break;
+
case STAT_DIR_TYPE_NAME_VECTOR:
if (res[i].name_vector == 0)
continue;
/*
*------------------------------------------------------------------
- * vpp_get_stats.c
+ * vpp_prometheus_export.c
*
* Copyright (c) 2018 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
#include <sys/socket.h>
#include <vpp-api/client/stat_client.h>
#include <vlib/vlib.h>
+#include <vlib/stats/shared.h>
#include <ctype.h>
/* https://github.com/prometheus/prometheus/wiki/Default-port-allocations */
return s;
}
+// For STAT_DIR_TYPE_HISTOGRAM_LOG2, the data is in res->log2_histogram_bins.
+// Emits one Prometheus histogram per thread: cumulative `le` buckets, a
+// +Inf bucket, and the derived _count/_sum series. bins[0] holds the
+// minimum exponent; bins[1..] hold per-bucket counts.
+static void
+print_log2_histogram_metric (FILE *stream, stat_segment_data_t *res)
+{
+  int n_threads = vec_len (res->log2_histogram_bins);
+  char sanitized_name[VLIB_STATS_MAX_NAME_SZ];
+  /* Bounded copy; strncpy does not guarantee NUL termination, hence the
+     explicit terminator on the next line. */
+  strncpy (sanitized_name, res->name, VLIB_STATS_MAX_NAME_SZ - 1);
+  sanitized_name[VLIB_STATS_MAX_NAME_SZ - 1] = '\0';
+  /* NOTE(review): return value discarded — this assumes prom_string()
+     sanitizes its argument in place and returns it; confirm, otherwise
+     the name below is emitted unsanitized. */
+  prom_string (sanitized_name);
+
+  for (int thread = 0; thread < n_threads; ++thread)
+    {
+      u64 *bins = res->log2_histogram_bins[thread];
+      int n_bins = vec_len (bins);
+      if (n_bins < 2) // Need at least min_exp + one bin
+	continue;
+      u32 min_exp = bins[0]; /* bins[0] carries the smallest exponent */
+      u64 cumulative = 0;
+      u64 sum = 0;
+      fformat (stream, "# TYPE %s histogram\n", sanitized_name);
+      for (int i = 1; i < n_bins; ++i)
+	{
+	  cumulative += bins[i];
+	  /* approximates each sample by its bucket bound 2^(min_exp+i-1) */
+	  sum += bins[i] * (1ULL << (min_exp + i - 1)); // midpoint approx
+	  fformat (stream, "%s{le=\"%llu\",thread=\"%d\"} %llu\n",
+		   sanitized_name, (1ULL << (min_exp + i - 1)), thread,
+		   cumulative);
+	}
+      // +Inf bucket
+      fformat (stream, "%s{le=\"+Inf\",thread=\"%d\"} %llu\n", sanitized_name,
+	       thread, cumulative);
+      // _count and _sum
+      fformat (stream, "%s_count{thread=\"%d\"} %llu\n", sanitized_name,
+	       thread, cumulative);
+      fformat (stream, "%s_sum{thread=\"%d\"} %llu\n", sanitized_name, thread,
+	       sum);
+    }
+}
+
static void
print_metric_v1 (FILE *stream, stat_segment_data_t *res)
{
switch (res->type)
{
+ case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+ print_log2_histogram_metric (stream, res);
+ break;
case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE:
fformat (stream, "# TYPE %s counter\n", prom_string (res->name));
for (k = 0; k < vec_len (res->simple_counter_vec); k++)
fformat (stream, "%s %.2f\n", prom_string (res->name),
res->scalar_value);
break;
-
+ case STAT_DIR_TYPE_GAUGE:
+ fformat (stream, "# TYPE %s gauge\n", prom_string (res->name));
+ fformat (stream, "%s %.2f\n", prom_string (res->name),
+ res->scalar_value);
+ break;
case STAT_DIR_TYPE_NAME_VECTOR:
fformat (stream, "# TYPE %s_info gauge\n", prom_string (res->name));
for (k = 0; k < vec_len (res->name_vector); k++)
num_tokens = tokenize (res->name, tokens, lengths, MAX_TOKENS);
switch (res->type)
{
+ case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+ print_log2_histogram_metric (stream, res);
+ break;
case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE:
if (res->simple_counter_vec == 0)
return;
+
for (k = 0; k < vec_len (res->simple_counter_vec); k++)
for (j = 0; j < vec_len (res->simple_counter_vec[k]); j++)
{
break;
case STAT_DIR_TYPE_SCALAR_INDEX:
+ case STAT_DIR_TYPE_GAUGE:
if ((num_tokens == 4) &&
!strncmp (tokens[1], "buffer-pools", lengths[1]))
{
--- /dev/null
+import time
+import unittest
+import struct
+from framework import VppTestCase
+
+RING_NAME = "/test/ring_buffer_pytest"
+RING_SIZE = 1024
+COUNT = 100
+INTERVAL_USEC = 1000 # 1ms
+
+
+class TestRingBuffer(VppTestCase):
+ """Ring Buffer Test Case"""
+
+ maxDiff = None
+
+    @classmethod
+    def setUpClass(cls):
+        # Framework defaults suffice; hooks kept for future fixtures.
+        super(TestRingBuffer, cls).setUpClass()
+
+    def setUp(self):
+        super(TestRingBuffer, self).setUp()
+
+    def tearDown(self):
+        super(TestRingBuffer, self).tearDown()
+
+ def test_ring_buffer_generation(self):
+ """Test ring buffer generation"""
+
+ # Step 1: Generate messages using the VPPApiClient CLI
+ cli_cmd = f"test stats ring-buffer-gen {RING_NAME} {COUNT} {INTERVAL_USEC} {RING_SIZE}"
+ result = self.vapi.cli(cli_cmd)
+ assert "Generated" in result, f"CLI failed: {result}"
+
+ # Step 2: Connect to stats segment and get the ring buffer
+ ring_buffer = self.statistics.get_ring_buffer(f"{RING_NAME}")
+
+ # Step 3: Poll for all messages
+ received = []
+ start = time.time()
+ while len(received) < COUNT and (time.time() - start) < 10:
+ data = ring_buffer.poll_for_data(thread_index=0, timeout=0.5)
+ for entry in data:
+ # Unpack the struct: u64 seq, f64 timestamp
+ seq, ts = struct.unpack("<Qd", entry)
+ received.append((seq, ts))
+ time.sleep(0.01)
+
+ # Step 4: Validate
+ assert len(received) == COUNT, f"Expected {COUNT} messages, got {len(received)}"
+ for i, (seq, ts) in enumerate(received):
+ assert seq == i, f"Sequence mismatch at {i}: got {seq}"
+ assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+ # print(f"Received {len(received)} messages from ring buffer '{RING_NAME}'")
+
+    def test_ring_buffer_overwrite(self):
+        """Test ring buffer overwrite behavior"""
+        # Write 3x the ring capacity so the writer laps the reader; only the
+        # newest `ring_size` messages should survive.
+        ring_size = 16
+        total_messages = 3 * ring_size
+        interval_usec = 100  # Fast, to fill quickly
+        ring_name = "/test/ring_buffer_overwrite"
+
+        # Step 1: Generate more messages than the ring size
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {total_messages} {interval_usec} {ring_size}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Ensure all messages are written before polling
+        time.sleep(0.1)
+
+        # Step 2: Connect to stats segment and get the ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Poll for all messages (should only get the last 'ring_size' messages)
+        # Keyed by sequence number to de-duplicate repeated reads.
+        received = {}
+        start = time.time()
+        while len(received) < ring_size and (time.time() - start) < 10:
+            data = ring_buffer.poll_for_data(thread_index=0, timeout=0.5)
+            for entry in data:
+                seq, ts = struct.unpack("<Qd", entry)
+                if seq not in received:
+                    received[seq] = ts
+            time.sleep(0.01)
+
+        # Debug: print the received sequence numbers
+        print(f"Received sequence numbers: {sorted(received.keys())}")
+
+        # Step 4: Validate — exactly the trailing window
+        # [total_messages - ring_size, total_messages) must remain.
+        assert (
+            len(received) == ring_size
+        ), f"Expected {ring_size} messages, got {len(received)}"
+        expected_start = total_messages - ring_size
+        for i, seq in enumerate(sorted(received)):
+            ts = received[seq]
+            assert (
+                seq == expected_start + i
+            ), f"Sequence mismatch at {i}: got {seq}, expected {expected_start + i}"
+            assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+ def test_ring_buffer_batch_operations(self):
+ """Test ring buffer batch operations"""
+ ring_name = "/test/ring_buffer_batch"
+ batch_size = 10
+ total_messages = 50
+
+ # Step 1: Generate messages using VPP CLI
+ cli_cmd = f"test stats ring-buffer-gen {ring_name} {total_messages} {INTERVAL_USEC} {RING_SIZE}"
+ result = self.vapi.cli(cli_cmd)
+ assert "Generated" in result, f"CLI failed: {result}"
+
+ # Step 2: Get ring buffer
+ ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+ # Step 3: Test individual consume
+ individual_data = []
+ start = time.time()
+ while len(individual_data) < total_messages and (time.time() - start) < 10:
+ data = ring_buffer.consume_data(thread_index=0, max_entries=batch_size)
+ individual_data.extend(data)
+ time.sleep(0.01)
+
+ # Step 4: Reset and test batch consume
+ ring_buffer.local_tails[0] = 0
+ ring_buffer.last_sequences[0] = None
+
+ batch_data = []
+ start = time.time()
+ while len(batch_data) < total_messages and (time.time() - start) < 10:
+ data = ring_buffer.consume_data_batch(
+ thread_index=0, max_entries=batch_size
+ )
+ batch_data.extend(data)
+ time.sleep(0.01)
+
+ # Step 5: Validate results are the same
+ assert len(individual_data) == len(
+ batch_data
+ ), "Individual and batch should return same number of entries"
+ assert (
+ individual_data == batch_data
+ ), "Individual and batch should return same data"
+
+ # Step 6: Validate data format
+ for entry in individual_data:
+ seq, ts = struct.unpack("<Qd", entry)
+ assert seq >= 0, f"Sequence should be non-negative, got {seq}"
+ assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+    def test_ring_buffer_configuration(self):
+        """Test ring buffer configuration access"""
+        ring_name = "/test/ring_buffer_config"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer and check configuration
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+        config = ring_buffer.get_config()
+
+        # Step 3: Validate configuration — core keys present with positive values
+        assert "entry_size" in config, "Config should contain entry_size"
+        assert "ring_size" in config, "Config should contain ring_size"
+        assert "n_threads" in config, "Config should contain n_threads"
+        assert config["entry_size"] > 0, "Entry size should be positive"
+        assert config["ring_size"] > 0, "Ring size should be positive"
+        assert config["n_threads"] > 0, "Number of threads should be positive"
+
+        # Step 4: Check metadata access — per-thread cursors are reachable
+        # and well-typed.
+        metadata = ring_buffer._get_thread_metadata(0)
+        assert "head" in metadata, "Metadata should contain head"
+        assert "sequence" in metadata, "Metadata should contain sequence"
+        assert isinstance(metadata["head"], int), "Head should be integer"
+        assert isinstance(metadata["sequence"], int), "Sequence should be integer"
+
+    def test_ring_buffer_error_handling(self):
+        """Test ring buffer error handling"""
+        ring_name = "/test/ring_buffer_error"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test invalid thread index — raw metadata access must raise,
+        # unlike consume_data which degrades gracefully (see Step 4).
+        config = ring_buffer.get_config()
+        invalid_thread = config["n_threads"] + 1
+
+        try:
+            ring_buffer._get_thread_metadata(invalid_thread)
+            assert False, "Should have raised IndexError for invalid thread"
+        except IndexError:
+            pass  # Expected
+
+        # Step 4: Test invalid parameters — non-positive max_entries yields
+        # no data rather than raising.
+        data = ring_buffer.consume_data(thread_index=0, max_entries=0)
+        assert data == [], "Zero max_entries should return empty list"
+
+        data = ring_buffer.consume_data(thread_index=0, max_entries=-1)
+        assert data == [], "Negative max_entries should return empty list"
+
+    def test_ring_buffer_api_compatibility(self):
+        """Test ring buffer API compatibility methods"""
+        ring_name = "/test/ring_buffer_compat"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test compatibility methods
+        count = ring_buffer.get_count(thread_index=0)
+        is_empty = ring_buffer.is_empty(thread_index=0)
+        is_full = ring_buffer.is_full(thread_index=0)
+
+        # These methods return simplified values since writer doesn't track reader state
+        assert count == 0, "Count should be 0 (writer doesn't track reader)"
+        assert is_empty == True, "Is empty should be True (writer doesn't track reader)"
+        assert is_full == False, "Is full should be False (writer doesn't track reader)"
+
+        # Step 4: Test string representation — must identify the class and
+        # the key geometry fields.
+        repr_str = repr(ring_buffer)
+        assert (
+            "StatsRingBuffer" in repr_str
+        ), "String representation should contain StatsRingBuffer"
+        assert (
+            "entry_size" in repr_str
+        ), "String representation should contain entry_size"
+        assert "ring_size" in repr_str, "String representation should contain ring_size"
+        assert "n_threads" in repr_str, "String representation should contain n_threads"
+
+    def test_ring_buffer_performance(self):
+        """Test ring buffer performance characteristics"""
+        ring_name = "/test/ring_buffer_perf"
+        test_entries = 100
+
+        # Step 1: Create ring buffer
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {test_entries} {INTERVAL_USEC} {RING_SIZE}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test individual consume performance
+        start_time = time.time()
+        individual_count = 0
+        for _ in range(100):  # 100 operations
+            data = ring_buffer.consume_data(thread_index=0, max_entries=1)
+            individual_count += len(data)
+        individual_time = time.time() - start_time
+
+        # Step 4: Reset reader cursors and test batch consume performance
+        # (same total entry budget: 10 x 10 vs 100 x 1).
+        ring_buffer.local_tails[0] = 0
+        ring_buffer.last_sequences[0] = None
+
+        start_time = time.time()
+        batch_count = 0
+        for _ in range(10):  # 10 batch operations
+            data = ring_buffer.consume_data_batch(thread_index=0, max_entries=10)
+            batch_count += len(data)
+        batch_time = time.time() - start_time
+
+        # Step 5: Calculate performance metrics (guard against div-by-zero
+        # on very fast runs)
+        individual_throughput = 100 / individual_time if individual_time > 0 else 0
+        batch_throughput = 100 / batch_time if batch_time > 0 else 0
+
+        print(f"Individual consume: {individual_throughput:.0f} ops/sec")
+        print(f"Batch consume: {batch_throughput:.0f} ops/sec")
+
+        # Step 6: Validate performance (basic sanity checks)
+        assert individual_time > 0, "Individual operations should take some time"
+        assert batch_time > 0, "Batch operations should take some time"
+        assert individual_count >= 0, "Individual count should be non-negative"
+        assert batch_count >= 0, "Batch count should be non-negative"
+
+    def test_ring_buffer_multiple_threads(self):
+        """Test per-thread ring buffer access (CLI generator is single-threaded)."""
+        ring_name = "/test/ring_buffer_multi"
+        # NOTE: the CLI generator only writes from thread 0, so n_threads == 1
+
+        # Step 1: Create ring buffer using CLI (single-threaded)
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Verify the single-threaded layout
+        config = ring_buffer.get_config()
+        assert (
+            config["n_threads"] == 1
+        ), f"CLI creates single-threaded ring buffers, got {config['n_threads']}"
+
+        # Test metadata access for thread 0
+        metadata = ring_buffer._get_thread_metadata(0)
+        assert "head" in metadata, "Thread 0 metadata should contain head"
+        assert "sequence" in metadata, "Thread 0 metadata should contain sequence"
+
+        # Test data consumption from thread 0
+        data = ring_buffer.consume_data(thread_index=0)
+        assert isinstance(data, list), "Thread 0 should return list"
+
+    def test_ring_buffer_sequence_consistency(self):
+        """Test sequence number consistency across repeated metadata reads."""
+        ring_name = "/test/ring_buffer_seq"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Read metadata twice with a short delay between reads
+        metadata1 = ring_buffer._get_thread_metadata(0)
+        time.sleep(0.01)  # small delay so the writer may (or may not) advance
+        metadata2 = ring_buffer._get_thread_metadata(0)
+
+        # Sequence numbers must never decrease between reads
+        assert (
+            metadata2["sequence"] >= metadata1["sequence"]
+        ), "Sequence should be monotonically increasing"
+
+    def test_ring_buffer_polling_with_callback(self):
+        """Test ring buffer polling with a callback function."""
+        ring_name = "/test/ring_buffer_callback"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 5 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Poll with a callback that records every batch it receives
+        collected_data = []
+
+        def callback(data):
+            collected_data.append(data)
+
+        # Poll with callback and a short timeout so the test stays fast
+        result_data = ring_buffer.poll_for_data(
+            thread_index=0, timeout=1.0, callback=callback
+        )
+
+        # Both the callback path and the return value should produce lists
+        assert isinstance(result_data, list), "Poll should return list"
+        assert isinstance(collected_data, list), "Callback should collect data in list"
+
+    def test_ring_buffer_empty_operations(self):
+        """Test ring buffer operations on an empty buffer."""
+        ring_name = "/test/ring_buffer_empty"
+
+        # Step 1: Create ring buffer but don't generate data (count = 0)
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 0 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Test empty consume
+        data = ring_buffer.consume_data(thread_index=0)
+        assert data == [], "Empty buffer should return empty list"
+
+        # Step 4: Test empty batch consume
+        data = ring_buffer.consume_data_batch(thread_index=0)
+        assert data == [], "Empty buffer should return empty list for batch"
+
+        # Step 5: Test empty poll (short timeout so the test stays fast)
+        data = ring_buffer.poll_for_data(thread_index=0, timeout=0.1)
+        assert data == [], "Empty buffer should return empty list for poll"
+
+    def test_ring_buffer_prefetch_parameter(self):
+        """Test that the prefetch parameter does not change batch-consume results."""
+        ring_name = "/test/ring_buffer_prefetch"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Consume everything with prefetch enabled
+        data1 = ring_buffer.consume_data_batch(thread_index=0, prefetch=True)
+
+        # Step 4: Rewind the reader state, then consume again without prefetch
+        ring_buffer.local_tails[0] = 0
+        ring_buffer.last_sequences[0] = None
+
+        data2 = ring_buffer.consume_data_batch(thread_index=0, prefetch=False)
+
+        # Step 5: Prefetch is a performance hint only; payloads must match
+        assert data1 == data2, "Prefetch parameter should not affect results"
--- /dev/null
+#!/usr/bin/env python3
+"""
+Simple Ring Buffer Test Runner
+
+This script provides a simple way to test ring buffer functionality
+using the existing VPP make test infrastructure.
+
+Usage:
+ python3 test/test_ring_buffer_simple.py
+
+This script can be run as part of the VPP test suite or standalone.
+"""
+
+import sys
+import time
+import struct
+from framework import VppTestCase
+
+
+class TestRingBufferSimple(VppTestCase):
+ """Simple Ring Buffer Test Case for Make Test Infrastructure"""
+
+    def test_ring_buffer_basic_functionality(self):
+        """Test basic ring buffer functionality end-to-end (generate, consume, verify)."""
+        # time and struct are already imported at module scope;
+        # the previous function-local re-imports were redundant.
+
+        # Use a unique name for each test run to avoid reading old data
+        ring_name = f"/test/ring_buffer_simple_{int(time.time())}"
+        count = 50
+        interval_usec = 1000
+        ring_size = 64
+
+        # Step 1: Generate messages using VPP CLI
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer from stats segment
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Consume all messages (bounded by a 10 s deadline)
+        received = []
+        start = time.time()
+
+        # Debug: Check initial state
+        metadata = ring_buffer._get_thread_metadata(0)
+        print(
+            f"DEBUG: Initial metadata - head: {metadata['head']}, sequence: {metadata['sequence']}"
+        )
+        print(f"DEBUG: Ring size: {ring_buffer.get_config()['ring_size']}")
+
+        while len(received) < count and (time.time() - start) < 10:
+            data = ring_buffer.consume_data(thread_index=0, max_entries=10)
+            print(f"DEBUG: Consumed {len(data)} entries")
+            for entry in data:
+                seq, ts = struct.unpack("<Qd", entry)
+                received.append((seq, ts))
+                if len(received) <= 5:  # Print first few entries
+                    print(f"DEBUG: Entry {len(received)-1}: seq={seq}, ts={ts}")
+            time.sleep(0.01)
+
+        # Step 4: Validate results
+        assert len(received) == count, f"Expected {count} messages, got {len(received)}"
+
+        # Check sequence numbers are consecutive and timestamps are sane
+        for i, (seq, ts) in enumerate(received):
+            assert seq == i, f"Sequence mismatch at {i}: got {seq}, expected {i}"
+            assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+        print(f"✓ Successfully received {len(received)} messages from ring buffer")
+
+    def test_ring_buffer_batch_operations(self):
+        """Test ring buffer batch operations"""
+        ring_name = "/test/ring_buffer_batch_simple"
+        count = 30
+        interval_usec = 1000
+        ring_size = 64  # larger than count, so entries should not be overwritten before reading
+
+        # Step 1: Generate messages
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Test batch consume (a single call should drain all entries)
+        data = ring_buffer.consume_data_batch(thread_index=0, max_entries=count)
+
+        # Step 4: Validate results
+        assert len(data) == count, f"Expected {count} entries, got {len(data)}"
+
+        # Check data format: each entry is little-endian <u64 sequence, f64 timestamp>
+        for i, entry in enumerate(data):
+            seq, ts = struct.unpack("<Qd", entry)
+            assert seq == i, f"Sequence mismatch at {i}: got {seq}, expected {i}"
+            assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+        print(f"✓ Successfully received {len(data)} messages using batch operations")
+
+    def test_ring_buffer_configuration(self):
+        """Test ring buffer configuration access"""
+        ring_name = "/test/ring_buffer_config_simple"
+        count = 10
+        interval_usec = 1000
+        ring_size = 32
+
+        # Step 1: Create ring buffer
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer and check configuration
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+        config = ring_buffer.get_config()
+
+        # Step 3: Validate configuration (entry = u64 seq + f64 ts = 16 bytes, per "<Qd")
+        assert (
+            config["entry_size"] == 16
+        ), f"Expected entry_size 16, got {config['entry_size']}"
+        assert (
+            config["ring_size"] == ring_size
+        ), f"Expected ring_size {ring_size}, got {config['ring_size']}"
+        assert (
+            config["n_threads"] == 1
+        ), f"Expected n_threads 1, got {config['n_threads']}"
+
+        # Step 4: Check metadata (writer sequence should have reached count)
+        metadata = ring_buffer._get_thread_metadata(0)
+        assert "head" in metadata, "Metadata should contain head"
+        assert "sequence" in metadata, "Metadata should contain sequence"
+        assert (
+            metadata["sequence"] >= count
+        ), f"Sequence should be >= {count}, got {metadata['sequence']}"
+
+        print(f"✓ Ring buffer configuration validated: {config}")
+
+    def test_ring_buffer_empty_operations(self):
+        """Test ring buffer operations on an empty buffer."""
+        ring_name = "/test/ring_buffer_empty_simple"
+
+        # Step 1: Create ring buffer without generating data (count = 0)
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} 0 1000 32"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Every consume variant must return an empty list
+        data = ring_buffer.consume_data(thread_index=0)
+        assert data == [], "Empty buffer should return empty list"
+
+        data = ring_buffer.consume_data_batch(thread_index=0)
+        assert data == [], "Empty buffer should return empty list for batch"
+
+        data = ring_buffer.poll_for_data(thread_index=0, timeout=0.1)
+        assert data == [], "Empty buffer should return empty list for poll"
+
+        # Step 4: Check API compatibility methods
+        count = ring_buffer.get_count(thread_index=0)
+        is_empty = ring_buffer.is_empty(thread_index=0)
+        is_full = ring_buffer.is_full(thread_index=0)
+
+        assert count == 0, "Count should be 0 for empty buffer"
+        assert is_empty, "Is empty should be True for empty buffer"
+        assert not is_full, "Is full should be False for empty buffer"
+
+        print("✓ Empty buffer operations validated")
+
+    def test_ring_buffer_error_handling(self):
+        """Test ring buffer error handling for invalid thread index and parameters."""
+        ring_name = "/test/ring_buffer_error_simple"
+
+        # Step 1: Create ring buffer
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} 5 1000 32"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: An invalid thread index must raise IndexError
+        try:
+            ring_buffer._get_thread_metadata(999)  # Invalid thread
+            # self.fail instead of `assert False`: asserts are stripped under -O
+            self.fail("Should have raised IndexError for invalid thread")
+        except IndexError:
+            pass  # Expected
+
+        # Step 4: Non-positive max_entries must yield no data
+        data = ring_buffer.consume_data(thread_index=0, max_entries=0)
+        assert data == [], "Zero max_entries should return empty list"
+
+        data = ring_buffer.consume_data(thread_index=0, max_entries=-1)
+        assert data == [], "Negative max_entries should return empty list"
+        print("✓ Error handling validated")
+
+
+def main():
+    """Run the test case standalone; returns a process exit code (0 = success)."""
+    import unittest
+
+    # Create test suite (unittest.makeSuite was deprecated and removed in Python 3.13)
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestRingBufferSimple))
+
+    # Run tests
+    runner = unittest.TextTestRunner(verbosity=2)
+    result = runner.run(suite)
+
+    # Return appropriate exit code
+    return 0 if result.wasSuccessful() else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())  # propagate test success/failure as the process exit code