stats: histogram, gauge and ring buffer types 71/43471/11
authorOle Troan <[email protected]>
Wed, 23 Jul 2025 10:46:49 +0000 (12:46 +0200)
committerDamjan Marion <[email protected]>
Fri, 26 Sep 2025 09:56:43 +0000 (09:56 +0000)
A new log2 histogram type with prometheus exporter support.
min_exp can be set in the first element to adjust the bins.

The ring buffer is intended for exporting records to a client side
reader. If the reader cannot keep up the writer will overwrite oldest
entry.

Added a new gauge type, which is like scalar index, but being
explicit allows the prometheus exported to set type corretly.

Type: improvement
Change-Id: Ibe1244f28e01eee8d61a3ca6edb6fd1801f1c942
Signed-off-by: Ole Troan <[email protected]>
Change-Id: I1a6046c6962d67db8c510a571e9414723acbbd7e
Signed-off-by: Ole Troan <[email protected]>
16 files changed:
src/plugins/unittest/CMakeLists.txt
src/plugins/unittest/histogram_test.c [new file with mode: 0644]
src/plugins/unittest/ring_buffer_test.c [new file with mode: 0644]
src/vlib/counter.c
src/vlib/counter.h
src/vlib/stats/cli.c
src/vlib/stats/shared.h
src/vlib/stats/stats.c
src/vlib/stats/stats.h
src/vpp-api/client/stat_client.c
src/vpp-api/client/stat_client.h
src/vpp-api/python/vpp_papi/vpp_stats.py
src/vpp/app/vpp_get_stats.c
src/vpp/app/vpp_prometheus_export.c
test/test_ring_buffer.py [new file with mode: 0644]
test/test_ring_buffer_simple.py [new file with mode: 0644]

index ab9168f..8ae3bcf 100644 (file)
@@ -60,6 +60,8 @@ add_vpp_plugin(unittest
   util_test.c
   vlib_test.c
   counter_test.c
+  ring_buffer_test.c
+  histogram_test.c
 
   COMPONENT
   vpp-plugin-devtools
diff --git a/src/plugins/unittest/histogram_test.c b/src/plugins/unittest/histogram_test.c
new file mode 100644 (file)
index 0000000..4032450
--- /dev/null
@@ -0,0 +1,122 @@
+#include <vlib/vlib.h>
+#include <vlib/stats/stats.h>
+#include <vlib/cli.h>
+#include <vlib/counter.h>
+#include <vppinfra/time.h>
+
+// Histogram test main structure
+static vlib_log2_histogram_main_t histogram_test_main;
+
+static clib_error_t *
+histogram_gen_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                         vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 count = 1000;
+  u32 interval_usec = 1000;
+  u32 min_exp = 0;   // Minimum exponent (2^0 = 1)
+  u32 num_bins = 16; // Number of bins
+  u32 i;
+  u32 value;
+
+  // Parse CLI arguments
+  if (!unformat (input, "%s %u %u %u %u", &name, &count, &interval_usec,
+                &min_exp, &num_bins))
+    {
+      // Try without min_exp and num_bins (for backward compatibility)
+      if (!unformat (input, "%s %u %u", &name, &count, &interval_usec))
+       return clib_error_return (
+         0,
+         "parse error: '%U'\nUsage: test stats histogram-gen <name> "
+         "<count> <interval-usec> [min-exp] [num-bins]",
+         format_unformat_error, input);
+    }
+
+  // Initialize histogram
+  histogram_test_main.name = (char *) name;
+  histogram_test_main.min_exp = min_exp;
+  vlib_validate_log2_histogram (&histogram_test_main, num_bins);
+
+  clib_warning ("DEBUG: Created histogram '%s' with min_exp=%u, num_bins=%u",
+               name, min_exp, num_bins);
+
+  // Generate test data with different distributions
+  for (i = 0; i < count; ++i)
+    {
+      // Generate different types of values for testing
+      if (i % 4 == 0)
+       {
+         // Small values (0-15)
+         value = i % 16;
+       }
+      else if (i % 4 == 1)
+       {
+         // Medium values (16-255)
+         value = 16 + (i % 240);
+       }
+      else if (i % 4 == 2)
+       {
+         // Large values (256-4095)
+         value = 256 + (i % 3840);
+       }
+      else
+       {
+         // Very large values (4096+)
+         value = 4096 + (i % 10000);
+       }
+
+      // Calculate bin index and increment
+      u8 bin = vlib_log2_histogram_bin_index (&histogram_test_main, value);
+      vlib_increment_log2_histogram_bin (&histogram_test_main, 0, bin, 1);
+
+      // Debug: Print every 100th entry or last few entries
+      if (i % 100 == 0 || i >= count - 5)
+       {
+         clib_warning ("DEBUG: Entry %u - value: %u, bin: %u (2^%u = %u)", i,
+                       value, bin, min_exp + bin, 1ULL << (min_exp + bin));
+       }
+
+      vlib_process_suspend (vm, 1e-6 * interval_usec);
+    }
+
+  vlib_cli_output (
+    vm, "Generated %u histogram entries for '%s' (min_exp=%u, bins=%u)", count,
+    name, min_exp, num_bins);
+  return 0;
+}
+
+VLIB_CLI_COMMAND (histogram_gen_command, static) = {
+  .path = "test stats histogram-gen",
+  .short_help = "test stats histogram-gen <name> <count> <interval-usec> "
+               "[min-exp] [num-bins]",
+  .function = histogram_gen_command_fn,
+};
+
+static clib_error_t *
+histogram_clear_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                           vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 entry_index;
+
+  if (!unformat (input, "%s", &name))
+    return clib_error_return (
+      0, "parse error: '%U'\nUsage: test stats histogram-clear <name>",
+      format_unformat_error, input);
+
+  entry_index = vlib_stats_find_entry_index ("%s", name);
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    {
+      return clib_error_return (0, "Histogram '%s' not found", name);
+    }
+
+  vlib_stats_remove_entry (entry_index);
+  vlib_cli_output (vm, "Cleared histogram '%s'", name);
+  return 0;
+}
+
+VLIB_CLI_COMMAND (histogram_clear_command, static) = {
+  .path = "test stats histogram-clear",
+  .short_help = "test stats histogram-clear <name>",
+  .function = histogram_clear_command_fn,
+};
\ No newline at end of file
diff --git a/src/plugins/unittest/ring_buffer_test.c b/src/plugins/unittest/ring_buffer_test.c
new file mode 100644 (file)
index 0000000..aedef9e
--- /dev/null
@@ -0,0 +1,245 @@
+#include <vlib/vlib.h>
+#include <vlib/stats/stats.h>
+#include <vlib/cli.h>
+#include <vppinfra/time.h>
+
+// Message struct for the ring buffer
+typedef struct
+{
+  u64 seq;
+  f64 timestamp;
+} ring_test_msg_t;
+
+// Create a simple test schema string (CDDL-like format)
+static const char *
+get_ring_test_schema_string (void)
+{
+  return "ring_test_schema = {\n"
+        "  name: \"ring_test\",\n"
+        "  version: 1,\n"
+        "  fields: [\n"
+        "    { id: 0, name: \"seq\", type: \"uint64\" },\n"
+        "    { id: 1, name: \"timestamp\", type: \"float64\" }\n"
+        "  ]\n"
+        "}";
+}
+
+static clib_error_t *
+ring_buffer_gen_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                           vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 count = 1000;
+  u32 interval_usec = 1000;
+  u32 ring_size = 16; // Default ring size
+  u32 i;
+  vlib_stats_ring_config_t config;
+  u32 entry_index;
+  ring_test_msg_t msg;
+
+  // Parse CLI arguments
+  if (!unformat (input, "%s %u %u %u", &name, &count, &interval_usec,
+                &ring_size))
+    {
+      // Try without ring_size (for backward compatibility)
+      if (!unformat (input, "%s %u %u", &name, &count, &interval_usec))
+       return clib_error_return (
+         0,
+         "parse error: '%U'\nUsage: test stats ring-buffer-gen <name> "
+         "<count> <interval-usec> [ring-size]",
+         format_unformat_error, input);
+    }
+
+  // Get test schema string
+  const char *schema_string = get_ring_test_schema_string ();
+  u32 schema_size = strlen (schema_string);
+
+  config.entry_size = sizeof (ring_test_msg_t);
+  config.ring_size = ring_size;
+  config.n_threads = 1;
+  config.schema_size = schema_size;
+  config.schema_version = 1;
+
+  // Create or find the ring buffer
+  entry_index = vlib_stats_find_entry_index ("%s", name);
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    {
+      clib_warning ("DEBUG: Creating new ring buffer with name: %s", name);
+      entry_index =
+       vlib_stats_add_ring_buffer (&config, (u8 *) schema_string, "%s", name);
+    }
+  else
+    {
+      clib_warning (
+       "DEBUG: Found existing ring buffer with name: %s, entry_index: %u",
+       name, entry_index);
+    }
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    return clib_error_return (0, "Failed to create/find ring buffer");
+
+  // Debug: Print initial ring buffer state
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                   ring_buffer->metadata_offset);
+  clib_warning ("DEBUG: Initial ring buffer state - head: %u, sequence: %lu",
+               metadata->head, metadata->sequence);
+
+  for (i = 0; i < count; ++i)
+    {
+      msg.seq = i;
+      msg.timestamp = vlib_time_now (vm);
+      vlib_stats_ring_produce (entry_index, 0, &msg);
+
+      // Debug: Print every 10th entry or last few entries
+      if (i % 10 == 0 || i >= count - 5)
+       {
+         clib_warning (
+           "DEBUG: Wrote entry %u - seq: %lu, head: %u, sequence: %lu", i,
+           msg.seq, metadata->head, metadata->sequence);
+       }
+
+      vlib_process_suspend (vm, 1e-6 * interval_usec);
+    }
+
+  // Debug: Print final ring buffer state
+  clib_warning ("DEBUG: Final ring buffer state - head: %u, sequence: %lu",
+               metadata->head, metadata->sequence);
+
+  vlib_cli_output (vm,
+                  "Generated %u messages to ring buffer '%s' (ring size %u)",
+                  count, name, ring_size);
+  return 0;
+}
+
+VLIB_CLI_COMMAND (ring_buffer_gen_command, static) = {
+  .path = "test stats ring-buffer-gen",
+  .short_help =
+    "test stats ring-buffer-gen <name> <count> <interval-usec> [ring-size]",
+  .function = ring_buffer_gen_command_fn,
+};
+
+static clib_error_t *
+ring_buffer_show_schema_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                   vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 entry_index;
+  u32 schema_size, schema_version;
+
+  if (!unformat (input, "%s", &name))
+    return clib_error_return (
+      0, "parse error: '%U'\nUsage: test stats ring-buffer-show-schema <name>",
+      format_unformat_error, input);
+
+  entry_index = vlib_stats_find_entry_index ("%s", name);
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    return clib_error_return (0, "Ring buffer '%s' not found", name);
+
+  if (vlib_stats_ring_get_schema (entry_index, 0, NULL, &schema_size,
+                                 &schema_version) != 0)
+    {
+      return clib_error_return (0, "No schema found in ring buffer '%s'",
+                               name);
+    }
+
+  u8 *schema_data = clib_mem_alloc (schema_size);
+  if (!schema_data)
+    {
+      return clib_error_return (0, "Failed to allocate memory for schema");
+    }
+
+  if (vlib_stats_ring_get_schema (entry_index, 0, schema_data, &schema_size,
+                                 &schema_version) != 0)
+    {
+      clib_mem_free (schema_data);
+      return clib_error_return (0, "Failed to retrieve schema");
+    }
+
+  // Display schema as string
+  vlib_cli_output (vm, "Ring Buffer Schema (version %u):\n", schema_version);
+  vlib_cli_output (vm, "%.*s\n", schema_size, schema_data);
+
+  clib_mem_free (schema_data);
+  return 0;
+}
+
+VLIB_CLI_COMMAND (ring_buffer_show_schema_command, static) = {
+  .path = "test stats ring-buffer-show-schema",
+  .short_help = "test stats ring-buffer-show-schema <name>",
+  .function = ring_buffer_show_schema_command_fn,
+};
+
+// Test command to verify schema functionality
+static clib_error_t *
+ring_buffer_test_schema_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                   vlib_cli_command_t *cmd)
+{
+  u8 *name = 0;
+  u32 entry_index;
+  u32 schema_size, schema_version;
+
+  if (!unformat (input, "%s", &name))
+    return clib_error_return (
+      0, "parse error: '%U'\nUsage: test stats ring-buffer-test-schema <name>",
+      format_unformat_error, input);
+
+  entry_index = vlib_stats_find_entry_index ("%s", name);
+  if (entry_index == STAT_SEGMENT_INDEX_INVALID)
+    return clib_error_return (0, "Ring buffer '%s' not found", name);
+
+  // Test schema retrieval
+  if (vlib_stats_ring_get_schema (entry_index, 0, NULL, &schema_size,
+                                 &schema_version) != 0)
+    {
+      vlib_cli_output (vm, "❌ No schema found in ring buffer '%s'\n", name);
+      return 0;
+    }
+
+  vlib_cli_output (vm, "✅ Schema found in ring buffer '%s':\n", name);
+  vlib_cli_output (vm, "   - Schema size: %u bytes\n", schema_size);
+  vlib_cli_output (vm, "   - Schema version: %u\n", schema_version);
+
+  // Test schema data retrieval
+  u8 *schema_data = clib_mem_alloc (schema_size);
+  if (!schema_data)
+    {
+      return clib_error_return (0, "Failed to allocate memory for schema");
+    }
+
+  if (vlib_stats_ring_get_schema (entry_index, 0, schema_data, &schema_size,
+                                 &schema_version) != 0)
+    {
+      clib_mem_free (schema_data);
+      vlib_cli_output (vm, "❌ Failed to retrieve schema data\n");
+      return 0;
+    }
+
+  vlib_cli_output (vm, "✅ Schema data retrieved successfully\n");
+
+  // Verify schema is a valid string (null-terminated)
+  if (schema_size > 0 && schema_data[schema_size - 1] == '\0')
+    {
+      vlib_cli_output (vm, "✅ Schema is a valid null-terminated string\n");
+      vlib_cli_output (vm, "Schema content:\n%.*s\n", schema_size - 1,
+                      schema_data);
+    }
+  else
+    {
+      vlib_cli_output (vm, "✅ Schema is a valid string (size: %u bytes)\n",
+                      schema_size);
+      vlib_cli_output (vm, "Schema content:\n%.*s\n", schema_size,
+                      schema_data);
+    }
+
+  clib_mem_free (schema_data);
+  return 0;
+}
+
+VLIB_CLI_COMMAND (ring_buffer_test_schema_command, static) = {
+  .path = "test stats ring-buffer-test-schema",
+  .short_help = "test stats ring-buffer-test-schema <name>",
+  .function = ring_buffer_test_schema_command_fn,
+};
index 9f14d02..ca7c347 100644 (file)
@@ -41,7 +41,7 @@
 #include <vlib/stats/stats.h>
 
 void
-vlib_clear_simple_counters (vlib_simple_counter_main_t * cm)
+vlib_clear_simple_counters (vlib_simple_counter_main_t *cm)
 {
   counter_t *my_counters;
   uword i, j;
@@ -58,7 +58,7 @@ vlib_clear_simple_counters (vlib_simple_counter_main_t * cm)
 }
 
 void
-vlib_clear_combined_counters (vlib_combined_counter_main_t * cm)
+vlib_clear_combined_counters (vlib_combined_counter_main_t *cm)
 {
   vlib_counter_t *my_counters;
   uword i, j;
@@ -76,7 +76,7 @@ vlib_clear_combined_counters (vlib_combined_counter_main_t * cm)
 }
 
 void
-vlib_validate_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
+vlib_validate_simple_counter (vlib_simple_counter_main_t *cm, u32 index)
 {
   vlib_thread_main_t *tm = vlib_get_thread_main ();
   char *name = cm->stat_segment_name ? cm->stat_segment_name : cm->name;
@@ -99,7 +99,7 @@ vlib_validate_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
 }
 
 void
-vlib_free_simple_counter (vlib_simple_counter_main_t * cm)
+vlib_free_simple_counter (vlib_simple_counter_main_t *cm)
 {
   if (cm->stats_entry_index == ~0)
     {
@@ -115,7 +115,7 @@ vlib_free_simple_counter (vlib_simple_counter_main_t * cm)
 }
 
 void
-vlib_validate_combined_counter (vlib_combined_counter_main_t * cm, u32 index)
+vlib_validate_combined_counter (vlib_combined_counter_main_t *cm, u32 index)
 {
   vlib_thread_main_t *tm = vlib_get_thread_main ();
   char *name = cm->stat_segment_name ? cm->stat_segment_name : cm->name;
@@ -138,8 +138,8 @@ vlib_validate_combined_counter (vlib_combined_counter_main_t * cm, u32 index)
 }
 
 int
-  vlib_validate_combined_counter_will_expand
-  (vlib_combined_counter_main_t * cm, u32 index)
+vlib_validate_combined_counter_will_expand (vlib_combined_counter_main_t *cm,
+                                           u32 index)
 {
   vlib_thread_main_t *tm = vlib_get_thread_main ();
   int i;
@@ -170,7 +170,7 @@ int
 }
 
 void
-vlib_free_combined_counter (vlib_combined_counter_main_t * cm)
+vlib_free_combined_counter (vlib_combined_counter_main_t *cm)
 {
   if (cm->stats_entry_index == ~0)
     {
@@ -186,19 +186,52 @@ vlib_free_combined_counter (vlib_combined_counter_main_t * cm)
 }
 
 u32
-vlib_combined_counter_n_counters (const vlib_combined_counter_main_t * cm)
+vlib_combined_counter_n_counters (const vlib_combined_counter_main_t *cm)
 {
   ASSERT (cm->counters);
   return (vec_len (cm->counters[0]));
 }
 
 u32
-vlib_simple_counter_n_counters (const vlib_simple_counter_main_t * cm)
+vlib_simple_counter_n_counters (const vlib_simple_counter_main_t *cm)
 {
   ASSERT (cm->counters);
   return (vec_len (cm->counters[0]));
 }
 
+void
+vlib_validate_log2_histogram (vlib_log2_histogram_main_t *hm, u32 num_bins)
+{
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  char *name = hm->stat_segment_name ? hm->stat_segment_name : hm->name;
+
+  if (name == 0)
+    {
+      if (hm->bins == 0)
+       {
+         hm->stats_entry_index = ~0;
+         vec_validate (hm->bins, tm->n_vlib_mains - 1);
+         for (int i = 0; i < tm->n_vlib_mains; i++)
+           {
+             vec_validate_aligned (hm->bins[i], num_bins - 1,
+                                   CLIB_CACHE_LINE_BYTES);
+           }
+         return;
+       }
+    }
+
+  if (hm->bins == 0)
+    hm->stats_entry_index = vlib_stats_add_histogram_log2 ("%s", name);
+
+  vlib_stats_validate (hm->stats_entry_index, tm->n_vlib_mains - 1, num_bins);
+  hm->bins = vlib_stats_get_entry_data_pointer (hm->stats_entry_index);
+  for (int i = 0; i < tm->n_vlib_mains; i++)
+    {
+      counter_t *c = hm->bins[i];
+      c[0] = hm->min_exp;
+    }
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index a9c2617..5c57fe7 100644 (file)
@@ -41,6 +41,7 @@
 #define included_vlib_counter_h
 
 #include <vlib/counter_types.h>
+#include <vppinfra/clib.h>
 
 /** \file
 
 
 typedef struct
 {
-  counter_t **counters;         /**< Per-thread u64 non-atomic counters */
-  char *name;                  /**< The counter collection's name. */
-  char *stat_segment_name;    /**< Name in stat segment directory */
+  counter_t **counters;           /**< Per-thread u64 non-atomic counters */
+  char *name;             /**< The counter collection's name. */
+  char *stat_segment_name; /**< Name in stat segment directory */
   u32 stats_entry_index;
 } vlib_simple_counter_main_t;
 
 /** The number of counters (not the number of per-thread counters) */
-u32 vlib_simple_counter_n_counters (const vlib_simple_counter_main_t * cm);
+u32 vlib_simple_counter_n_counters (const vlib_simple_counter_main_t *cm);
 
 /** Pre-fetch a per-thread simple counter for the given object index */
 always_inline void
@@ -143,7 +144,7 @@ vlib_set_simple_counter (vlib_simple_counter_main_t *cm,
     @returns - (u64) current counter value
 */
 always_inline counter_t
-vlib_get_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
+vlib_get_simple_counter (vlib_simple_counter_main_t *cm, u32 index)
 {
   counter_t *my_counters;
   counter_t v;
@@ -169,7 +170,7 @@ vlib_get_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
     @param index - (u32) index of the counter to clear
 */
 always_inline void
-vlib_zero_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
+vlib_zero_simple_counter (vlib_simple_counter_main_t *cm, u32 index)
 {
   counter_t *my_counters;
   int i;
@@ -189,7 +190,7 @@ vlib_zero_simple_counter (vlib_simple_counter_main_t * cm, u32 index)
 */
 
 always_inline void
-vlib_counter_add (vlib_counter_t * a, vlib_counter_t * b)
+vlib_counter_add (vlib_counter_t *a, vlib_counter_t *b)
 {
   a->packets += b->packets;
   a->bytes += b->bytes;
@@ -200,7 +201,7 @@ vlib_counter_add (vlib_counter_t * a, vlib_counter_t * b)
     @param b - (vlib_counter_t *) src counter
 */
 always_inline void
-vlib_counter_sub (vlib_counter_t * a, vlib_counter_t * b)
+vlib_counter_sub (vlib_counter_t *a, vlib_counter_t *b)
 {
   ASSERT (a->packets >= b->packets);
   ASSERT (a->bytes >= b->bytes);
@@ -212,7 +213,7 @@ vlib_counter_sub (vlib_counter_t * a, vlib_counter_t * b)
     @param a - (vlib_counter_t *) counter to clear
 */
 always_inline void
-vlib_counter_zero (vlib_counter_t * a)
+vlib_counter_zero (vlib_counter_t *a)
 {
   a->packets = a->bytes = 0;
 }
@@ -220,25 +221,24 @@ vlib_counter_zero (vlib_counter_t * a)
 /** A collection of combined counters */
 typedef struct
 {
-  vlib_counter_t **counters;   /**< Per-thread u64 non-atomic counter pairs */
-  char *name; /**< The counter collection's name. */
-  char *stat_segment_name;     /**< Name in stat segment directory */
+  vlib_counter_t **counters; /**< Per-thread u64 non-atomic counter pairs */
+  char *name;               /**< The counter collection's name. */
+  char *stat_segment_name;   /**< Name in stat segment directory */
   u32 stats_entry_index;
 } vlib_combined_counter_main_t;
 
 /** The number of counters (not the number of per-thread counters) */
-u32 vlib_combined_counter_n_counters (const vlib_combined_counter_main_t *
-                                     cm);
+u32 vlib_combined_counter_n_counters (const vlib_combined_counter_main_t *cm);
 
 /** Clear a collection of simple counters
     @param cm - (vlib_simple_counter_main_t *) collection to clear
 */
-void vlib_clear_simple_counters (vlib_simple_counter_main_t * cm);
+void vlib_clear_simple_counters (vlib_simple_counter_main_t *cm);
 
 /** Clear a collection of combined counters
     @param cm - (vlib_combined_counter_main_t *) collection to clear
 */
-void vlib_clear_combined_counters (vlib_combined_counter_main_t * cm);
+void vlib_clear_combined_counters (vlib_combined_counter_main_t *cm);
 
 /** Increment a combined counter
     @param cm - (vlib_combined_counter_main_t *) comined counter main pointer
@@ -276,7 +276,6 @@ vlib_prefetch_combined_counter (const vlib_combined_counter_main_t *cm,
   clib_prefetch_store (cpu_counters + index);
 }
 
-
 /** Get the value of a combined counter, never called in the speed path
     Scrapes the entire set of per-thread counters. Innacurate unless
     worker threads which might increment the counter are
@@ -288,8 +287,8 @@ vlib_prefetch_combined_counter (const vlib_combined_counter_main_t *cm,
 */
 
 static inline void
-vlib_get_combined_counter (const vlib_combined_counter_main_t * cm,
-                          u32 index, vlib_counter_t * result)
+vlib_get_combined_counter (const vlib_combined_counter_main_t *cm, u32 index,
+                          vlib_counter_t *result)
 {
   vlib_counter_t *my_counters, *counter;
   int i;
@@ -314,7 +313,7 @@ vlib_get_combined_counter (const vlib_combined_counter_main_t * cm,
     @param index - (u32) index of the counter to clear
 */
 always_inline void
-vlib_zero_combined_counter (vlib_combined_counter_main_t * cm, u32 index)
+vlib_zero_combined_counter (vlib_combined_counter_main_t *cm, u32 index)
 {
   vlib_counter_t *my_counters, *counter;
   int i;
@@ -330,13 +329,13 @@ vlib_zero_combined_counter (vlib_combined_counter_main_t * cm, u32 index)
 }
 
 /** validate a simple counter
-    @param cm - (vlib_simple_counter_main_t *) pointer to the counter collection
+    @param cm - (vlib_simple_counter_main_t *) pointer to the counter
+   collection
     @param index - (u32) index of the counter to validate
 */
 
-void vlib_validate_simple_counter (vlib_simple_counter_main_t * cm,
-                                  u32 index);
-void vlib_free_simple_counter (vlib_simple_counter_main_t * cm);
+void vlib_validate_simple_counter (vlib_simple_counter_main_t *cm, u32 index);
+void vlib_free_simple_counter (vlib_simple_counter_main_t *cm);
 
 /** validate a combined counter
     @param cm - (vlib_combined_counter_main_t *) pointer to the counter
@@ -344,12 +343,13 @@ void vlib_free_simple_counter (vlib_simple_counter_main_t * cm);
     @param index - (u32) index of the counter to validate
 */
 
-void vlib_validate_combined_counter (vlib_combined_counter_main_t * cm,
+void vlib_validate_combined_counter (vlib_combined_counter_main_t *cm,
                                     u32 index);
-int vlib_validate_combined_counter_will_expand
-  (vlib_combined_counter_main_t * cm, u32 index);
+int
+vlib_validate_combined_counter_will_expand (vlib_combined_counter_main_t *cm,
+                                           u32 index);
 
-void vlib_free_combined_counter (vlib_combined_counter_main_t * cm);
+void vlib_free_combined_counter (vlib_combined_counter_main_t *cm);
 
 /** Obtain the number of simple or combined counters allocated.
     A macro which reduces to to vec_len(cm->maxi), the answer in either
@@ -359,7 +359,44 @@ void vlib_free_combined_counter (vlib_combined_counter_main_t * cm);
     (vlib_combined_counter_main_t) the counter collection to interrogate
     @returns vec_len(cm->maxi)
 */
-#define vlib_counter_len(cm) vec_len((cm)->maxi)
+#define vlib_counter_len(cm) vec_len ((cm)->maxi)
+
+typedef struct
+{
+  counter_t **bins;       /**< Per-thread u64 non-atomic histogram bins */
+  u32 min_exp;            /**< log2 bin minimum exponent */
+  char *name;             /**< The histogram collection's name. */
+  char *stat_segment_name; /**< Name in stat segment directory */
+  u32 stats_entry_index;
+} vlib_log2_histogram_main_t;
+
+void vlib_validate_log2_histogram (vlib_log2_histogram_main_t *hm,
+                                  u32 num_bins);
+
+static_always_inline u8
+vlib_log2_histogram_bin_index (const vlib_log2_histogram_main_t *hm, u32 value)
+{
+  if (value == 0)
+    return 0;
+  u8 log2_val = min_log2 (value);
+  int min_exp = hm->min_exp;
+  int n_bins = vec_len (hm->bins[0]);
+  int bin = log2_val - min_exp;
+  if (bin < 0)
+    bin = 0;
+  if (bin >= n_bins)
+    bin = n_bins - 1;
+  return (u8) bin;
+}
+
+always_inline void
+vlib_increment_log2_histogram_bin (vlib_log2_histogram_main_t *hm,
+                                  clib_thread_index_t thread_index, u8 bin,
+                                  u64 increment)
+{
+  uint64_t *my_bins = hm->bins[thread_index];
+  my_bins[bin + 1] += increment;
+}
 
 #endif /* included_vlib_counter_h */
 
index 94a852a..8373a31 100644 (file)
@@ -38,6 +38,13 @@ format_stat_dir_entry (u8 *s, va_list *args)
       type_name = "NameVector";
       break;
 
+    case STAT_DIR_TYPE_RING_BUFFER:
+      type_name = "RingBuffer";
+      break;
+
+    case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+      type_name = "Histogram";
+      break;
     case STAT_DIR_TYPE_EMPTY:
       type_name = "empty";
       break;
@@ -53,6 +60,74 @@ format_stat_dir_entry (u8 *s, va_list *args)
 
   return format (s, format_string, ep->name, type_name, 0);
 }
+
+static u8 *
+format_stat_dir_entry_detail (u8 *s, va_list *args)
+{
+  vlib_stats_entry_t *ep = va_arg (*args, vlib_stats_entry_t *);
+
+  if (ep->type == STAT_DIR_TYPE_RING_BUFFER)
+    {
+      vlib_stats_ring_buffer_t *rb = ep->data;
+      if (rb)
+       {
+         s = format (s, "RingBuffer: %s\n", ep->name);
+         s = format (s, "  ring_size: %u\n", rb->config.ring_size);
+         s = format (s, "  entry_size: %u\n", rb->config.entry_size);
+         s = format (s, "  n_threads: %u\n", rb->config.n_threads);
+         for (u32 t = 0; t < rb->config.n_threads; t++)
+           {
+             vlib_stats_ring_metadata_t *md =
+               (vlib_stats_ring_metadata_t
+                  *) ((u8 *) rb + rb->metadata_offset +
+                      t * sizeof (vlib_stats_ring_metadata_t));
+             s = format (s, "  [thread %u] head:%u seq:%llu\n", t, md->head,
+                         (unsigned long long) md->sequence);
+           }
+       }
+      else
+       {
+         s = format (s, "RingBuffer: %s (uninitialized)\n", ep->name);
+       }
+    }
+  else if (ep->type == STAT_DIR_TYPE_HISTOGRAM_LOG2)
+    {
+      u64 **log2_histogram_bins = ep->data;
+      if (!log2_histogram_bins)
+       {
+         s = format (s, "Histogram: %s (uninitialized)\n", ep->name);
+         return s;
+       }
+
+      s = format (s, "Histogram: %s\n", ep->name);
+      for (u32 k = 0; k < vec_len (log2_histogram_bins); k++)
+       {
+         u64 *bins = log2_histogram_bins[k];
+         int n_bins = vec_len (bins);
+         if (n_bins < 2) // Need at least min_exp + one bin
+           continue;
+         u32 min_exp = bins[0];
+         u64 cumulative = 0;
+         u64 sum = 0;
+         s = format (s, "  [thread %u]:\n", k);
+         for (int j = 1; j < n_bins; ++j)
+           {
+             cumulative += bins[j];
+             sum += bins[j] * (1ULL << (min_exp + j - 1)); // midpoint approx
+             s = format (s, "    <= %llu: %llu (cumulative: %llu)\n",
+                         (1ULL << (min_exp + j - 1)), bins[j], cumulative);
+           }
+         s = format (s, "    +Inf: %llu (total count: %llu, sum: %llu)\n",
+                     cumulative, cumulative, sum);
+       }
+    }
+  else
+    {
+      s = format (s, "Entry: %s (type %d)\n", ep->name, ep->type);
+    }
+  return s;
+}
+
 static clib_error_t *
 show_stat_segment_command_fn (vlib_main_t *vm, unformat_input_t *input,
                              vlib_cli_command_t *cmd)
@@ -60,11 +135,41 @@ show_stat_segment_command_fn (vlib_main_t *vm, unformat_input_t *input,
   vlib_stats_segment_t *sm = vlib_stats_get_segment ();
   vlib_stats_entry_t *show_data;
   int i;
-
   int verbose = 0;
+  u8 *counter_name = 0;
 
-  if (unformat (input, "verbose"))
-    verbose = 1;
+  // Parse both 'verbose' and counter name in any order
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (input, "verbose"))
+       verbose = 1;
+      else if (unformat (input, "%s", &counter_name))
+       ;
+      else
+       break;
+    }
+
+  if (counter_name)
+    {
+      u32 index = vlib_stats_find_entry_index ("%s", counter_name);
+      if (index != STAT_SEGMENT_INDEX_INVALID)
+       {
+         vlib_stats_entry_t *ep = sm->directory_vector + index;
+         vlib_cli_output (vm, "%U", format_stat_dir_entry_detail, ep);
+         if (verbose)
+           {
+             ASSERT (sm->heap);
+             vlib_cli_output (vm, "%U", format_clib_mem_heap, sm->heap,
+                              0 /* verbose */);
+           }
+       }
+      else
+       {
+         vlib_cli_output (vm, "Counter '%s' not found.", counter_name);
+       }
+      vec_free (counter_name);
+      return 0;
+    }
 
   /* Lock even as reader, as this command doesn't handle epoch changes */
   vlib_stats_segment_lock ();
@@ -116,6 +221,6 @@ VLIB_CLI_COMMAND (show_stat_segment_hash_command, static) = {
 
 VLIB_CLI_COMMAND (show_stat_segment_command, static) = {
   .path = "show statistics segment",
-  .short_help = "show statistics segment [verbose]",
+  .short_help = "show statistics segment [counter-name] [verbose]",
   .function = show_stat_segment_command_fn,
 };
index 8e44ce3..893c8b5 100644 (file)
@@ -14,6 +14,9 @@ typedef enum
   STAT_DIR_TYPE_NAME_VECTOR,
   STAT_DIR_TYPE_EMPTY,
   STAT_DIR_TYPE_SYMLINK,
+  STAT_DIR_TYPE_HISTOGRAM_LOG2,
+  STAT_DIR_TYPE_RING_BUFFER,
+  STAT_DIR_TYPE_GAUGE,
 } stat_directory_type_t;
 
 typedef struct
@@ -34,6 +37,8 @@ typedef struct
 #define VLIB_STATS_MAX_NAME_SZ 128
   char name[VLIB_STATS_MAX_NAME_SZ];
 } vlib_stats_entry_t;
+_Static_assert (sizeof (vlib_stats_entry_t) == 144,
+               "vlib_stats_entry_t size must be 144 bytes");
 
 /*
  * Shared header first in the shared memory segment.
index b7743ec..30718bc 100644 (file)
@@ -149,6 +149,7 @@ vlib_stats_remove_entry (u32 entry_index)
       break;
 
     case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE:
+    case STAT_DIR_TYPE_HISTOGRAM_LOG2:
       c = e->data;
       e->data = 0;
       oldheap = clib_mem_set_heap (sm->heap);
@@ -168,6 +169,20 @@ vlib_stats_remove_entry (u32 entry_index)
       clib_mem_set_heap (oldheap);
       break;
 
+    case STAT_DIR_TYPE_RING_BUFFER:
+      {
+       vlib_stats_ring_buffer_t *ring_buffer = e->data;
+       if (ring_buffer)
+         {
+           /* Free ring buffer memory */
+           oldheap = clib_mem_set_heap (sm->heap);
+           clib_mem_free (ring_buffer);
+           clib_mem_set_heap (oldheap);
+         }
+       e->data = 0;
+      }
+      break;
+
     case STAT_DIR_TYPE_SCALAR_INDEX:
     case STAT_DIR_TYPE_SYMLINK:
       break;
@@ -241,7 +256,7 @@ vlib_stats_add_gauge (char *fmt, ...)
   va_start (va, fmt);
   name = va_format (0, fmt, &va);
   va_end (va);
-  return vlib_stats_new_entry_internal (STAT_DIR_TYPE_SCALAR_INDEX, name);
+  return vlib_stats_new_entry_internal (STAT_DIR_TYPE_GAUGE, name);
 }
 
 void
@@ -368,6 +383,18 @@ vlib_stats_add_counter_vector (char *fmt, ...)
                                        name);
 }
 
+u32
+vlib_stats_add_histogram_log2 (char *fmt, ...)
+{
+  va_list va;
+  u8 *name;
+
+  va_start (va, fmt);
+  name = va_format (0, fmt, &va);
+  va_end (va);
+  return vlib_stats_new_entry_internal (STAT_DIR_TYPE_HISTOGRAM_LOG2, name);
+}
+
 u32
 vlib_stats_add_counter_pair_vector (char *fmt, ...)
 {
@@ -390,7 +417,8 @@ vlib_stats_validate_will_expand_internal (u32 entry_index, va_list *va)
   int rv = 1;
 
   oldheap = clib_mem_set_heap (sm->heap);
-  if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE)
+  if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE ||
+      e->type == STAT_DIR_TYPE_HISTOGRAM_LOG2)
     {
       u32 idx0 = va_arg (*va, u32);
       u32 idx1 = va_arg (*va, u32);
@@ -459,7 +487,8 @@ vlib_stats_validate (u32 entry_index, ...)
 
   va_start (va, entry_index);
 
-  if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE)
+  if (e->type == STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE ||
+      e->type == STAT_DIR_TYPE_HISTOGRAM_LOG2)
     {
       u32 idx0 = va_arg (va, u32);
       u32 idx1 = va_arg (va, u32);
@@ -572,3 +601,308 @@ vlib_stats_register_collector_fn (vlib_stats_collector_reg_t *reg)
 
   return;
 }
+
+u32
+vlib_stats_add_ring_buffer (vlib_stats_ring_config_t *config,
+                           const void *schema_data, char *fmt, ...)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  va_list va;
+  u8 *name;
+  vlib_stats_ring_buffer_t *ring_buffer;
+  u32 entry_index, total_size, metadata_size, data_size, schema_offset;
+
+  va_start (va, fmt);
+  name = va_format (0, fmt, &va);
+  va_end (va);
+
+  entry_index =
+    vlib_stats_new_entry_internal (STAT_DIR_TYPE_RING_BUFFER, name);
+  if (entry_index == CLIB_U32_MAX)
+    return CLIB_U32_MAX;
+  vlib_stats_segment_lock ();
+
+  /* Calculate sizes */
+  metadata_size = config->n_threads * sizeof (vlib_stats_ring_metadata_t);
+  data_size = config->n_threads * config->ring_size * config->entry_size;
+
+  /* Calculate total size with metadata aligned to 64 bytes for atomic
+   * operations */
+  u32 data_offset = sizeof (vlib_stats_ring_buffer_t);
+  u32 metadata_offset = CLIB_CACHE_LINE_ROUND (data_offset + data_size);
+
+  /* Add space for schema if provided */
+  if (config->schema_size > 0)
+    {
+      schema_offset = metadata_offset + metadata_size;
+      total_size = schema_offset + config->schema_size;
+    }
+  else
+    {
+      schema_offset = 0;
+      total_size = metadata_offset + metadata_size;
+    }
+
+  /* Allocate ring buffer structure */
+  void *oldheap = clib_mem_set_heap (sm->heap);
+  ring_buffer = clib_mem_alloc_aligned (total_size, CLIB_CACHE_LINE_BYTES);
+  clib_memset (ring_buffer, 0, total_size);
+  clib_mem_set_heap (oldheap);
+
+  /* Set up offsets */
+  ring_buffer->config = *config;
+  ring_buffer->data_offset = data_offset;
+  ring_buffer->metadata_offset = metadata_offset;
+
+  /* Copy schema data if provided */
+  if (config->schema_size > 0 && schema_data)
+    {
+      void *schema_location = (u8 *) ring_buffer + schema_offset;
+      clib_memcpy_fast (schema_location, schema_data, config->schema_size);
+
+      /* Initialize metadata for all threads with schema info */
+      for (u32 thread_index = 0; thread_index < config->n_threads;
+          thread_index++)
+       {
+         vlib_stats_ring_metadata_t *metadata =
+           (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                           metadata_offset +
+                                           thread_index *
+                                             sizeof (
+                                               vlib_stats_ring_metadata_t));
+         metadata->schema_version = config->schema_version;
+         metadata->schema_offset = schema_offset;
+         metadata->schema_size = config->schema_size;
+       }
+    }
+
+  sm->directory_vector[entry_index].data = ring_buffer;
+
+  vlib_stats_segment_unlock ();
+
+  return entry_index;
+}
+
+void *
+vlib_stats_ring_get_entry (u32 entry_index, u32 thread_index)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                   ring_buffer->metadata_offset +
+                                   thread_index *
+                                     sizeof (vlib_stats_ring_metadata_t));
+
+  if (thread_index >= ring_buffer->config.n_threads)
+    return 0;
+
+  ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+  // /* Check if ring is full */
+  // if (metadata->count >= ring_buffer->config.ring_size)
+  //   return 0;
+
+  /* Calculate entry pointer */
+  u32 offset = thread_index * ring_buffer->config.ring_size *
+              ring_buffer->config.entry_size;
+  offset += metadata->head * ring_buffer->config.entry_size;
+
+  return (u8 *) ring_buffer + ring_buffer->data_offset + offset;
+}
+
+int
+vlib_stats_ring_produce (u32 entry_index, u32 thread_index, void *data)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                   ring_buffer->metadata_offset +
+                                   thread_index *
+                                     sizeof (vlib_stats_ring_metadata_t));
+
+  if (thread_index >= ring_buffer->config.n_threads)
+    return -1;
+
+  ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+  /* Prefetch next slot for better cache performance */
+  u32 next_head = (metadata->head + 1) % ring_buffer->config.ring_size;
+  /* Note: CLIB_PREFETCH would be used here for optimal cache performance */
+
+  /* Copy data to current head position */
+  void *entry = vlib_stats_ring_get_entry (entry_index, thread_index);
+  if (!entry)
+    return -1;
+
+  clib_memcpy_fast (entry, data, ring_buffer->config.entry_size);
+
+  /* Update metadata - always advance head and increment sequence */
+  metadata->head = next_head;
+  __atomic_store_n (&metadata->sequence, metadata->sequence + 1,
+                   __ATOMIC_RELEASE);
+
+  return 0;
+}
+
+int
+vlib_stats_ring_consume (u32 entry_index, u32 thread_index, void *data,
+                        u64 *sequence_out)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                   ring_buffer->metadata_offset +
+                                   thread_index *
+                                     sizeof (vlib_stats_ring_metadata_t));
+
+  if (thread_index >= ring_buffer->config.n_threads)
+    return -1;
+
+  ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+  /* Note: This function is for testing/debugging only since the reader
+     is read-only and manages its own state. The writer doesn't track
+     reader position. */
+
+  /* Return sequence number if requested */
+  if (sequence_out)
+    *sequence_out = __atomic_load_n (&metadata->sequence, __ATOMIC_ACQUIRE);
+
+  return 0;
+}
+
+/* Direct serialization APIs - avoid extra copy */
+
+void *
+vlib_stats_ring_reserve_slot (u32 entry_index, u32 thread_index)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                   ring_buffer->metadata_offset +
+                                   thread_index *
+                                     sizeof (vlib_stats_ring_metadata_t));
+
+  if (thread_index >= ring_buffer->config.n_threads)
+    return 0;
+
+  ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+  /* Calculate entry pointer */
+  u32 offset = thread_index * ring_buffer->config.ring_size *
+              ring_buffer->config.entry_size;
+  offset += metadata->head * ring_buffer->config.entry_size;
+
+  return (u8 *) ring_buffer + ring_buffer->data_offset + offset;
+}
+
+int
+vlib_stats_ring_commit_slot (u32 entry_index, u32 thread_index)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                   ring_buffer->metadata_offset +
+                                   thread_index *
+                                     sizeof (vlib_stats_ring_metadata_t));
+  ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+  if (thread_index >= ring_buffer->config.n_threads)
+    return -1;
+
+  /* Update metadata - always advance head and increment sequence */
+  metadata->head = (metadata->head + 1) % ring_buffer->config.ring_size;
+  __atomic_store_n (&metadata->sequence, metadata->sequence + 1,
+                   __ATOMIC_RELEASE);
+
+  return 0;
+}
+
+int
+vlib_stats_ring_abort_slot (u32 entry_index, u32 thread_index)
+{
+  /* For abort, we don't need to do anything since we haven't updated the
+     metadata yet. The slot will be reused on the next reserve call. */
+  return 0;
+}
+
+u32
+vlib_stats_ring_get_slot_size (u32 entry_index)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+
+  return ring_buffer->config.entry_size;
+}
+
+u32
+vlib_stats_ring_get_count (u32 entry_index, u32 thread_index)
+{
+  /* Note: Since the writer doesn't track reader state, we can't determine
+     the actual count of unread entries. This function is kept for API
+     compatibility but always returns 0. */
+  return 0;
+}
+
+u32
+vlib_stats_ring_get_free_space (u32 entry_index, u32 thread_index)
+{
+  /* Note: Since the writer doesn't track reader state, we can't determine
+     the actual free space. This function is kept for API compatibility
+     but always returns the full ring size. */
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+
+  return ring_buffer->config.ring_size;
+}
+
+/* Schema retrieval from ring buffers */
+
+int
+vlib_stats_ring_get_schema (u32 entry_index, u32 thread_index,
+                           void *schema_data, u32 *schema_size,
+                           u32 *schema_version)
+{
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  vlib_stats_entry_t *e = vlib_stats_get_entry (sm, entry_index);
+  vlib_stats_ring_buffer_t *ring_buffer = e->data;
+  vlib_stats_ring_metadata_t *metadata =
+    (vlib_stats_ring_metadata_t *) ((u8 *) ring_buffer +
+                                   ring_buffer->metadata_offset +
+                                   thread_index *
+                                     sizeof (vlib_stats_ring_metadata_t));
+
+  if (thread_index >= ring_buffer->config.n_threads)
+    return -1;
+
+  ASSERT (((uintptr_t) metadata % CLIB_CACHE_LINE_BYTES) == 0);
+
+  /* Check if schema exists */
+  if (metadata->schema_size == 0)
+    return -1;
+
+  /* Return schema information */
+  if (schema_version)
+    *schema_version = metadata->schema_version;
+  if (schema_size)
+    *schema_size = metadata->schema_size;
+  if (schema_data)
+    {
+      void *schema_location = (u8 *) ring_buffer + ring_buffer->data_offset +
+                             metadata->schema_offset;
+      clib_memcpy_fast (schema_data, schema_location, metadata->schema_size);
+    }
+
+  return 0;
+}
index ab1e282..003b986 100644 (file)
@@ -161,4 +161,60 @@ void vlib_stats_register_collector_fn (vlib_stats_collector_reg_t *r);
 
 format_function_t format_vlib_stats_symlink;
 
+/* log2 histogram */
+u32 vlib_stats_add_histogram_log2 (char *fmt, ...);
+void vlib_stats_set_histogram_log2 (u32 entry_index, u32 thread_index,
+                                   const u64 *bin_counts);
+
+/* Ring buffer configuration */
+typedef struct
+{
+  u32 entry_size;     /* Size of each entry in bytes */
+  u32 ring_size;      /* Number of entries in the ring */
+  u32 n_threads;      /* Number of threads (one ring per thread) */
+  u32 schema_size;    /* Size of schema data in bytes (0 if no schema) */
+  u32 schema_version; /* Schema version number */
+} __clib_packed vlib_stats_ring_config_t;
+
+/* Ring buffer metadata (per thread) */
+typedef struct
+{
+  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+  u32 head;          /* Producer position */
+  u32 schema_version; /* Schema version for this ring */
+  u64 sequence;              /* Sequence number for overwrite detection */
+  u32 schema_offset;  /* Offset to schema data within ring buffer */
+  u32 schema_size;    /* Size of schema data in bytes */
+  u8 _pad1[CLIB_CACHE_LINE_BYTES - 24]; /* Pad to cache line size */
+} vlib_stats_ring_metadata_t;
+
+/* Ring buffer entry in stats directory */
+typedef struct
+{
+  vlib_stats_ring_config_t config;
+  u32 metadata_offset; /* Offset to metadata array (64-byte aligned) */
+  u32 data_offset;     /* Offset to ring buffer data */
+} __clib_packed vlib_stats_ring_buffer_t;
+
+/* Ring buffer */
+u32 vlib_stats_add_ring_buffer (vlib_stats_ring_config_t *config,
+                               const void *schema_data, char *fmt, ...);
+void *vlib_stats_ring_get_entry (u32 entry_index, u32 thread_index);
+int vlib_stats_ring_produce (u32 entry_index, u32 thread_index, void *data);
+int vlib_stats_ring_consume (u32 entry_index, u32 thread_index, void *data,
+                            u64 *sequence_out);
+u32 vlib_stats_ring_get_count (u32 entry_index, u32 thread_index);
+u32 vlib_stats_ring_get_free_space (u32 entry_index, u32 thread_index);
+
+/* Direct serialization APIs - avoid extra copy */
+void *vlib_stats_ring_reserve_slot (u32 entry_index, u32 thread_index);
+int vlib_stats_ring_commit_slot (u32 entry_index, u32 thread_index);
+int vlib_stats_ring_abort_slot (u32 entry_index, u32 thread_index);
+u32 vlib_stats_ring_get_slot_size (u32 entry_index);
+
+/* Schema retrieval from ring buffers */
+int vlib_stats_ring_get_schema (u32 entry_index, u32 thread_index,
+                               void *schema_data, u32 *schema_size,
+                               u32 *schema_version);
+
 #endif
index 359813f..5614390 100644 (file)
@@ -241,6 +241,7 @@ copy_data (vlib_stats_entry_t *ep, u32 index2, char *name,
   switch (ep->type)
     {
     case STAT_DIR_TYPE_SCALAR_INDEX:
+    case STAT_DIR_TYPE_GAUGE:
       result.scalar_value = ep->value;
       break;
 
@@ -294,6 +295,19 @@ copy_data (vlib_stats_entry_t *ep, u32 index2, char *name,
       }
 
     case STAT_DIR_TYPE_EMPTY:
+    case STAT_DIR_TYPE_RING_BUFFER:
+      break;
+
+    case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+      {
+       uint64_t **bins = stat_segment_adjust (sm, ep->data);
+       result.log2_histogram_bins = stat_vec_dup (sm, bins);
+       for (i = 0; i < vec_len (bins); i++)
+         {
+           uint64_t *thread_bins = stat_segment_adjust (sm, bins[i]);
+           result.log2_histogram_bins[i] = stat_vec_dup (sm, thread_bins);
+         }
+      }
       break;
 
     default:
@@ -326,7 +340,14 @@ stat_segment_data_free (stat_segment_data_t * res)
          vec_free (res[i].name_vector);
          break;
        case STAT_DIR_TYPE_SCALAR_INDEX:
+       case STAT_DIR_TYPE_GAUGE:
        case STAT_DIR_TYPE_EMPTY:
+       case STAT_DIR_TYPE_RING_BUFFER:
+         break;
+       case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+         for (j = 0; j < vec_len (res[i].log2_histogram_bins); j++)
+           vec_free (res[i].log2_histogram_bins[j]);
+         vec_free (res[i].log2_histogram_bins);
          break;
        default:
          assert (0);
index d9671c6..722ba6e 100644 (file)
@@ -44,6 +44,7 @@ typedef struct
     counter_t **simple_counter_vec;
     vlib_counter_t **combined_counter_vec;
     uint8_t **name_vector;
+    uint64_t **log2_histogram_bins; // [thread][bin] for histogram
   };
 } stat_segment_data_t;
 
index aa9ff85..2929bba 100755 (executable)
@@ -49,6 +49,8 @@ from struct import Struct
 import time
 import unittest
 import re
+import asyncio
+import sys
 
 
 def recv_fd(sock):
@@ -289,6 +291,47 @@ class VPPStats:
             result[cnt] = self.__getitem__(cnt, blocking)
         return result
 
+    def get_ring_buffer(self, name, blocking=True):
+        """Get a ring buffer by name"""
+        if not self.connected:
+            self.connect()
+
+        while True:
+            try:
+                if self.last_epoch != self.epoch:
+                    self.refresh(blocking)
+                with self.lock:
+                    entry = self.directory[name]
+                    if entry.type == 8:  # STAT_DIR_TYPE_RING_BUFFER
+                        return entry.get_counter(self)
+                    else:
+                        raise ValueError(f"'{name}' is not a ring buffer")
+            except IOError:
+                if not blocking:
+                    raise
+
+    def poll_ring_buffer(self, name, thread_index=0, timeout=None, callback=None):
+        """Convenience method to poll a ring buffer by name"""
+        ring_buffer = self.get_ring_buffer(name)
+        return ring_buffer.poll_for_data(thread_index, timeout, callback)
+
+    async def poll_ring_buffer_async(
+        self, name, thread_index=0, timeout=None, callback=None
+    ):
+        """Async convenience method to poll a ring buffer by name"""
+        ring_buffer = self.get_ring_buffer(name)
+        return await ring_buffer.poll_for_data_async(thread_index, timeout, callback)
+
+    def get_ring_buffer_schema(self, name, thread_index=0):
+        """Get schema from a ring buffer by name"""
+        ring_buffer = self.get_ring_buffer(name)
+        return ring_buffer.get_schema(thread_index)
+
+    def get_ring_buffer_schema_string(self, name, thread_index=0):
+        """Get schema as string from a ring buffer by name"""
+        ring_buffer = self.get_ring_buffer(name)
+        return ring_buffer.get_schema_string(thread_index)
+
 
 class StatsLock:
     """Stat segment optimistic locking"""
@@ -393,6 +436,28 @@ class SimpleList(list):
         return sum(self)
 
 
+# Add a helper class for histogram log2
+class StatsHistogramLog2:
+    def __init__(self, bins, min_exp):
+        self.bins = bins  # list of lists: [thread][bin]
+        self.min_exp = min_exp
+
+    def sum(self):
+        return sum(sum(thread_bins) for thread_bins in self.bins)
+
+    def thread_count(self):
+        return len(self.bins)
+
+    def bin_count(self):
+        return max((len(b) for b in self.bins), default=0)
+
+    def __getitem__(self, idx):
+        return self.bins[idx]
+
+    def __repr__(self):
+        return f"StatsHistogramLog2(min_exp={self.min_exp}, bins={self.bins})"
+
+
 class StatsEntry:
     """An individual stats entry"""
 
@@ -412,6 +477,12 @@ class StatsEntry:
             self.function = self.name
         elif stattype == 6:
             self.function = self.symlink
+        elif stattype == 7:  # STAT_DIR_TYPE_HISTOGRAM_LOG2
+            self.function = self.histogram_log2
+        elif stattype == 8:  # STAT_DIR_TYPE_RING_BUFFER
+            self.function = self.ring_buffer
+        elif stattype == 9:  # STAT_DIR_TYPE_GAUGE
+            self.function = self.scalar
         else:
             self.function = self.illegal
 
@@ -457,12 +528,480 @@ class StatsEntry:
         name = stats.directory_by_idx[index1]
         return stats[name][:, index2]
 
+    def ring_buffer(self, stats):
+        """Ring buffer counter"""
+        return StatsRingBuffer(stats, self.value)
+
+    def histogram_log2(self, stats):
+        """Histogram log2 counter (STAT_DIR_TYPE_HISTOGRAM_LOG2)"""
+        # The value is a pointer to a vector of pointers (per-thread), each pointing to a vector of uint64_t bins
+        threads_ptr = self.value
+        thread_vec = StatsVector(stats, threads_ptr, "P")
+        all_bins = []
+        min_exp = 0
+        for thread_ptr_tuple in thread_vec:
+            bins_ptr = thread_ptr_tuple[0]
+            if bins_ptr:
+                bins_vec = StatsVector(stats, bins_ptr, "Q")
+                bins = [v[0] for v in bins_vec]
+                if bins:
+                    min_exp = bins[0]
+                    all_bins.append(bins[1:])
+                else:
+                    all_bins.append([])
+            else:
+                all_bins.append([])
+        return StatsHistogramLog2(all_bins, min_exp)
+
     def get_counter(self, stats):
         """Return a list of counters"""
         if stats:
             return self.function(stats)
 
 
+class StatsRingBuffer:
+    """Ring buffer for high-performance data streaming"""
+
+    def __init__(self, stats, ptr):
+        self.stats = stats
+        self.ring_buffer_ptr = ptr
+        self.config = self._get_config()
+        self.metadata_ptr = self._get_metadata_ptr()
+        self.data_ptr = self._get_data_ptr()
+        # Track local tail and last sequence for each thread
+        # Note: Since writer doesn't track reader state, we initialize local_tails to 0
+        self.local_tails = [0] * self.config["n_threads"]
+        self.last_sequences = [None] * self.config["n_threads"]
+
+    def _get_config(self):
+        """Get ring buffer configuration from shared memory"""
+        config_offset = self.ring_buffer_ptr - self.stats.base
+        # Read the full config structure: entry_size, ring_size, n_threads, schema_size, schema_version
+        config_data = self.stats.statseg[config_offset : config_offset + 20]
+        entry_size, ring_size, n_threads, schema_size, schema_version = Struct(
+            "=IIIII"
+        ).unpack(config_data)
+        return {
+            "entry_size": entry_size,
+            "ring_size": ring_size,
+            "n_threads": n_threads,
+            "schema_size": schema_size,
+            "schema_version": schema_version,
+        }
+
+    def _get_metadata_ptr(self):
+        """Get pointer to metadata array using offset"""
+        config_offset = self.ring_buffer_ptr - self.stats.base
+        # Read metadata_offset from the structure (at offset 20)
+        metadata_offset_data = self.stats.statseg[
+            config_offset + 20 : config_offset + 24
+        ]
+        metadata_offset = Struct("=I").unpack(metadata_offset_data)[0]
+        return config_offset + metadata_offset
+
+    def _get_data_ptr(self):
+        """Get pointer to ring buffer data using offset"""
+        config_offset = self.ring_buffer_ptr - self.stats.base
+        # Read data_offset from the structure (at offset 24)
+        data_offset_data = self.stats.statseg[config_offset + 24 : config_offset + 28]
+        data_offset = Struct("=I").unpack(data_offset_data)[0]
+        return config_offset + data_offset
+
+    def _get_thread_metadata(self, thread_index):
+        """Get metadata for a specific thread, including sequence number and schema info"""
+        if thread_index >= self.config["n_threads"]:
+            raise IndexError(f"Thread index {thread_index} out of range")
+
+        # Metadata struct: head, schema_version, sequence, schema_offset, schema_size, padding
+        metadata_offset = self.metadata_ptr + (
+            thread_index * 64  # CLIB_CACHE_LINE_BYTES, typically 64
+        )
+        metadatafmt_struct = Struct(
+            "=IIQII"
+        )  # head, schema_version, sequence, schema_offset, schema_size
+        metadata_data = self.stats.statseg[
+            metadata_offset : metadata_offset + metadatafmt_struct.size
+        ]
+        head, schema_version, sequence, schema_offset, schema_size = (
+            metadatafmt_struct.unpack(metadata_data)
+        )
+        return {
+            "head": head,
+            "schema_version": schema_version,
+            "sequence": sequence,
+            "schema_offset": schema_offset,
+            "schema_size": schema_size,
+        }
+
+    def get_schema(self, thread_index=0):
+        """Get schema data from ring buffer for a specific thread"""
+        metadata = self._get_thread_metadata(thread_index)
+
+        # Check if schema exists
+        if metadata["schema_size"] == 0:
+            return None, 0, 0
+
+        # Calculate schema location
+        config_offset = self.ring_buffer_ptr - self.stats.base
+        schema_location = config_offset + metadata["schema_offset"]
+
+        # Read schema data
+        schema_data = self.stats.statseg[
+            schema_location : schema_location + metadata["schema_size"]
+        ]
+
+        return schema_data, metadata["schema_size"], metadata["schema_version"]
+
+    def get_schema_string(self, thread_index=0):
+        """Get schema as a string (for text-based schemas like CDDL)"""
+        schema_data, schema_size, schema_version = self.get_schema(thread_index)
+
+        if schema_data is None:
+            return None, 0, 0
+
+        try:
+            # Try to decode as UTF-8 string
+            schema_string = schema_data.decode("utf-8")
+            return schema_string, schema_size, schema_version
+        except UnicodeDecodeError:
+            # If it's not a valid UTF-8 string, return as bytes
+            return schema_data, schema_size, schema_version
+
+    def get_count(self, thread_index=0):
+        """Get current count of entries in ring buffer for a thread"""
+        # Note: Since the writer doesn't track reader state, we can't determine
+        # the actual count. This method is kept for API compatibility.
+        return 0
+
+    def is_empty(self, thread_index=0):
+        """Check if ring buffer is empty for a thread"""
+        # Note: Since the writer doesn't track reader state, we can't determine
+        # if the ring is empty. This method is kept for API compatibility.
+        return True
+
+    def is_full(self, thread_index=0):
+        """Check if ring buffer is full for a thread"""
+        # Note: Since the writer doesn't track reader state, we can't determine
+        # if the ring is full. This method is kept for API compatibility.
+        return False
+
+    def consume_data(self, thread_index=0, max_entries=None):
+        """Consume data from ring buffer for a thread (read-only), with sequence check"""
+        # Read metadata atomically to get consistent snapshot
+        metadata = self._get_thread_metadata(thread_index)
+        local_tail = self.local_tails[thread_index]
+        last_sequence = self.last_sequences[thread_index]
+        sequence = metadata["sequence"]
+        ring_size = self.config["ring_size"]
+
+        # Overwrite detection: did the producer lap us?
+        if last_sequence is not None:
+            delta = (sequence - last_sequence) % (1 << 64)
+            if delta > ring_size:
+                print(
+                    f"[WARN] Ring buffer overwrite detected on thread {thread_index}: "
+                    f"sequence jumped from {last_sequence} to {sequence} (delta={delta}, ring_size={ring_size})"
+                )
+                # Resync local_tail to a reasonable position
+                local_tail = (metadata["head"] - ring_size) % ring_size
+
+        # If the sequence hasn't changed, nothing new to read
+        if last_sequence == sequence:
+            return []
+
+        # Calculate how many new entries are available
+        if last_sequence is None:
+            # First time reading - calculate how many entries are available
+            available = min(sequence, ring_size)
+            # Calculate starting position: (head - available) % ring_size
+            # This gives us the oldest entry that's still available
+            local_tail = (metadata["head"] - available) % ring_size
+        else:
+            available = (sequence - last_sequence) % (1 << 64)
+            if available > ring_size:
+                available = ring_size  # Cap at ring size
+
+        if available == 0:
+            self.last_sequences[thread_index] = sequence
+            return []
+
+        if max_entries is None:
+            max_entries = available
+        else:
+            max_entries = min(max_entries, available)
+
+        consumed_data = []
+        entry_size = self.config["entry_size"]
+
+        # Calculate data offset for this thread
+        thread_data_offset = self.data_ptr + (
+            thread_index * self.config["ring_size"] * entry_size
+        )
+
+        # Read data with retry logic for potential contention
+        max_retries = 3
+        for retry in range(max_retries):
+            try:
+                for i in range(max_entries):
+                    entry_offset = thread_data_offset + (local_tail * entry_size)
+                    entry_data = self.stats.statseg[
+                        entry_offset : entry_offset + entry_size
+                    ]
+                    consumed_data.append(entry_data)
+                    local_tail = (local_tail + 1) % self.config["ring_size"]
+
+                # Verify sequence number hasn't changed during our read
+                current_metadata = self._get_thread_metadata(thread_index)
+                if current_metadata["sequence"] == sequence:
+                    # Success - update local state
+                    self.local_tails[thread_index] = local_tail
+                    # Update last_sequence based on how many entries we read
+                    if last_sequence is None:
+                        # First time reading - update to the sequence number of the last entry we read
+                        self.last_sequences[thread_index] = (
+                            sequence - available + len(consumed_data)
+                        )
+                    else:
+                        # Subsequent reading - update by the number of entries we read
+                        self.last_sequences[thread_index] = last_sequence + len(
+                            consumed_data
+                        )
+                    return consumed_data
+                else:
+                    # Sequence changed during read, retry
+                    if retry < max_retries - 1:
+                        # Re-read metadata and recalculate
+                        metadata = current_metadata
+                        sequence = metadata["sequence"]
+                        if last_sequence is None:
+                            available = min(sequence, ring_size)
+                            # Calculate starting position: (head - available) % ring_size
+                            local_tail = (metadata["head"] - available) % ring_size
+                        else:
+                            available = (sequence - last_sequence) % (1 << 64)
+                            if available > ring_size:
+                                available = ring_size
+                        if available == 0:
+                            self.last_sequences[thread_index] = sequence
+                            return []
+                        max_entries = min(max_entries, available)
+                        consumed_data = []
+                        local_tail = self.local_tails[thread_index]
+                        continue
+                    else:
+                        # Max retries reached, return what we have
+                        print(f"[WARN] Max retries reached, returning partial data")
+                        self.local_tails[thread_index] = local_tail
+                        self.last_sequences[thread_index] = sequence
+                        return consumed_data
+
+            except Exception as e:
+                print(f"[ERROR] Exception during data read: {e}")
+                if retry < max_retries - 1:
+                    continue
+                else:
+                    return consumed_data
+
+        return consumed_data
+
+    def consume_data_batch(self, thread_index=0, max_entries=None, prefetch=True):
+        """Consume data from ring buffer in batches for better performance"""
+        # Read metadata atomically to get consistent snapshot
+        metadata = self._get_thread_metadata(thread_index)
+        local_tail = self.local_tails[thread_index]
+        last_sequence = self.last_sequences[thread_index]
+        sequence = metadata["sequence"]
+        ring_size = self.config["ring_size"]
+
+        # Overwrite detection: did the producer lap us?
+        if last_sequence is not None:
+            delta = (sequence - last_sequence) % (1 << 64)
+            if delta > ring_size:
+                print(
+                    f"[WARN] Ring buffer overwrite detected on thread {thread_index}: "
+                    f"sequence jumped from {last_sequence} to {sequence} (delta={delta}, ring_size={ring_size})"
+                )
+                # Resync local_tail to a reasonable position
+                local_tail = (metadata["head"] - ring_size) % ring_size
+
+        # If the sequence hasn't changed, nothing new to read
+        if last_sequence == sequence:
+            return []
+
+        # Calculate how many new entries are available
+        if last_sequence is None:
+            # First time reading - calculate how many entries are available
+            available = min(sequence, ring_size)
+            # Calculate starting position: (head - available) % ring_size
+            # This gives us the oldest entry that's still available
+            local_tail = (metadata["head"] - available) % ring_size
+        else:
+            available = (sequence - last_sequence) % (1 << 64)
+            if available > ring_size:
+                available = ring_size  # Cap at ring size
+
+        if available == 0:
+            self.last_sequences[thread_index] = sequence
+            return []
+
+        if max_entries is None:
+            max_entries = available
+        else:
+            max_entries = min(max_entries, available)
+
+        consumed_data = []
+        entry_size = self.config["entry_size"]
+
+        # Calculate data offset for this thread
+        thread_data_offset = self.data_ptr + (
+            thread_index * self.config["ring_size"] * entry_size
+        )
+
+        # Prefetch next few entries for better cache performance
+        if prefetch and max_entries > 1:
+            next_tail = (local_tail + 1) % ring_size
+            next_offset = thread_data_offset + (next_tail * entry_size)
+            # Note: Python doesn't have direct prefetch, but we can optimize memory access patterns
+            # by reading data in larger chunks when possible
+
+        # Read data with retry logic for potential contention
+        max_retries = 3
+        for retry in range(max_retries):
+            try:
+                # Read data in larger chunks when possible for better performance
+                chunk_size = min(max_entries, 16)  # Read up to 16 entries at once
+                for chunk_start in range(0, max_entries, chunk_size):
+                    chunk_end = min(chunk_start + chunk_size, max_entries)
+
+                    for i in range(chunk_start, chunk_end):
+                        entry_offset = thread_data_offset + (local_tail * entry_size)
+                        entry_data = self.stats.statseg[
+                            entry_offset : entry_offset + entry_size
+                        ]
+                        consumed_data.append(entry_data)
+                        local_tail = (local_tail + 1) % self.config["ring_size"]
+
+                # Verify sequence number hasn't changed during our read
+                current_metadata = self._get_thread_metadata(thread_index)
+                if current_metadata["sequence"] == sequence:
+                    # Success - update local state
+                    self.local_tails[thread_index] = local_tail
+                    # Update last_sequence based on how many entries we read
+                    if last_sequence is None:
+                        # First time reading - update to the sequence number of the last entry we read
+                        self.last_sequences[thread_index] = (
+                            sequence - available + len(consumed_data)
+                        )
+                    else:
+                        # Subsequent reading - update by the number of entries we read
+                        self.last_sequences[thread_index] = last_sequence + len(
+                            consumed_data
+                        )
+                    return consumed_data
+                else:
+                    # Sequence changed during read, retry
+                    if retry < max_retries - 1:
+                        # Re-read metadata and recalculate
+                        metadata = current_metadata
+                        sequence = metadata["sequence"]
+                        if last_sequence is None:
+                            available = min(sequence, ring_size)
+                            local_tail = (metadata["head"] - available) % ring_size
+                        else:
+                            available = (sequence - last_sequence) % (1 << 64)
+                            if available > ring_size:
+                                available = ring_size
+                        if available == 0:
+                            self.last_sequences[thread_index] = sequence
+                            return []
+                        max_entries = min(max_entries, available)
+                        consumed_data = []
+                        local_tail = self.local_tails[thread_index]
+                        continue
+                    else:
+                        # Max retries reached, return what we have
+                        print(f"[WARN] Max retries reached, returning partial data")
+                        self.local_tails[thread_index] = local_tail
+                        self.last_sequences[thread_index] = sequence
+                        return consumed_data
+
+            except Exception as e:
+                print(f"[ERROR] Exception during data read: {e}")
+                if retry < max_retries - 1:
+                    continue
+                else:
+                    return consumed_data
+
+        return consumed_data
+
+    def poll_for_data(self, thread_index=0, timeout=None, callback=None):
+        """Poll for new data in ring buffer"""
+        start_time = time.time()
+
+        while True:
+            data = self.consume_data(thread_index)
+            if data:
+                if callback:
+                    for entry_data in data:
+                        try:
+                            callback(entry_data)
+                        except Exception as e:
+                            print(f"Callback error: {e}")
+                else:
+                    return data
+
+            if timeout and (time.time() - start_time) > timeout:
+                return []
+            time.sleep(0.000001)  # 1μs polling interval
+
+    async def poll_for_data_async(self, thread_index=0, timeout=None, callback=None):
+        """Async version of poll_for_data"""
+
+        start_time = asyncio.get_event_loop().time()
+
+        while True:
+            metadata = self._get_thread_metadata(thread_index)
+            local_tail = self.local_tails[thread_index]
+
+            # Check if new data is available using local tail
+            if metadata["head"] != local_tail:
+                # Calculate how many new entries
+                if metadata["head"] > local_tail:
+                    new_entries = metadata["head"] - local_tail
+                else:
+                    new_entries = (
+                        self.config["ring_size"] - local_tail + metadata["head"]
+                    )
+
+                data = self.consume_data(thread_index, new_entries)
+
+                if callback:
+                    for entry_data in data:
+                        await callback(entry_data)
+                else:
+                    return data
+
+            if timeout and (asyncio.get_event_loop().time() - start_time) > timeout:
+                return []
+
+            # Yield control to asyncio
+            await asyncio.sleep(0.000001)  # 1μs polling interval
+
+    def get_config(self):
+        """Get ring buffer configuration"""
+        return self.config.copy()
+
+    def __repr__(self):
+        schema_info = ""
+        if self.config["schema_size"] > 0:
+            schema_info = f", schema_size={self.config['schema_size']}, schema_version={self.config['schema_version']}"
+
+        return (
+            f"StatsRingBuffer(entry_size={self.config['entry_size']}, "
+            f"ring_size={self.config['ring_size']}, n_threads={self.config['n_threads']}{schema_info})"
+        )
+
+
 class TestStats(unittest.TestCase):
     """Basic statseg tests"""
 
@@ -557,8 +1096,556 @@ class TestStats(unittest.TestCase):
         print("/sys/nodes/unix-epoll-input", self.stat["/nodes/unix-epoll-input/calls"])
 
 
+class TestRingBuffer(unittest.TestCase):
+    """Ring buffer specific tests"""
+
+    def setUp(self):
+        """Connect to statseg and find test ring buffer"""
+        self.stat = VPPStats()
+        self.stat.connect()
+
+        # Look for test ring buffer created by VPP CLI command
+        try:
+            self.ring_buffer = self.stat.get_ring_buffer("/test/ring-buffer")
+            self.ring_buffer_available = True
+        except (KeyError, ValueError):
+            print(
+                "Test ring buffer not found. Run 'test stats ring-buffer-gen test_ring 100 1000 16' in VPP first."
+            )
+            self.ring_buffer_available = False
+
+    def tearDown(self):
+        """Disconnect from statseg"""
+        self.stat.disconnect()
+
+    def test_ring_buffer_config(self):
+        """Test ring buffer configuration access"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        config = self.ring_buffer.get_config()
+        self.assertIsInstance(config, dict)
+        self.assertIn("entry_size", config)
+        self.assertIn("ring_size", config)
+        self.assertIn("n_threads", config)
+
+        print(f"Ring buffer config: {config}")
+
+        # Verify reasonable values
+        self.assertGreater(config["entry_size"], 0)
+        self.assertGreater(config["ring_size"], 0)
+        self.assertGreater(config["n_threads"], 0)
+
+    def test_ring_buffer_metadata_access(self):
+        """Test ring buffer metadata access"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Test metadata access for thread 0
+        metadata = self.ring_buffer._get_thread_metadata(0)
+        self.assertIsInstance(metadata, dict)
+        self.assertIn("head", metadata)
+        self.assertIn("sequence", metadata)
+
+        print(f"Thread 0 metadata: {metadata}")
+
+        # Verify metadata values are reasonable
+        self.assertIsInstance(metadata["head"], int)
+        self.assertIsInstance(metadata["sequence"], int)
+        self.assertGreaterEqual(metadata["head"], 0)
+        self.assertGreaterEqual(metadata["sequence"], 0)
+
+    def test_ring_buffer_consume_empty(self):
+        """Test consuming from empty ring buffer"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Consume from empty buffer
+        data = self.ring_buffer.consume_data(thread_index=0)
+        self.assertEqual(data, [])
+
+        # Test with max_entries
+        data = self.ring_buffer.consume_data(thread_index=0, max_entries=10)
+        self.assertEqual(data, [])
+
+    def test_ring_buffer_consume_batch_empty(self):
+        """Test batch consuming from empty ring buffer"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Consume from empty buffer
+        data = self.ring_buffer.consume_data_batch(thread_index=0)
+        self.assertEqual(data, [])
+
+        # Test with max_entries
+        data = self.ring_buffer.consume_data_batch(thread_index=0, max_entries=10)
+        self.assertEqual(data, [])
+
+    def test_ring_buffer_poll_empty(self):
+        """Test polling empty ring buffer"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Poll with short timeout
+        data = self.ring_buffer.poll_for_data(thread_index=0, timeout=0.1)
+        self.assertEqual(data, [])
+
+    def test_ring_buffer_poll_with_callback(self):
+        """Test polling with callback function"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        collected_data = []
+
+        def callback(data):
+            collected_data.append(data)
+
+        # Poll with callback and short timeout
+        result = self.ring_buffer.poll_for_data(
+            thread_index=0, timeout=0.1, callback=callback
+        )
+        self.assertEqual(result, [])
+        self.assertEqual(collected_data, [])
+
+    def test_ring_buffer_invalid_thread(self):
+        """Test ring buffer with invalid thread index"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        config = self.ring_buffer.get_config()
+        invalid_thread = config["n_threads"] + 1
+
+        # Test metadata access with invalid thread
+        with self.assertRaises(IndexError):
+            self.ring_buffer._get_thread_metadata(invalid_thread)
+
+        # Test consume with invalid thread
+        data = self.ring_buffer.consume_data(thread_index=invalid_thread)
+        self.assertEqual(data, [])
+
+    def test_ring_buffer_api_compatibility(self):
+        """Test API compatibility methods"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Test compatibility methods (these return simplified values)
+        count = self.ring_buffer.get_count(thread_index=0)
+        self.assertEqual(count, 0)
+
+        is_empty = self.ring_buffer.is_empty(thread_index=0)
+        self.assertTrue(is_empty)
+
+        is_full = self.ring_buffer.is_full(thread_index=0)
+        self.assertFalse(is_full)
+
+    def test_ring_buffer_repr(self):
+        """Test ring buffer string representation"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        repr_str = repr(self.ring_buffer)
+        self.assertIsInstance(repr_str, str)
+        self.assertIn("StatsRingBuffer", repr_str)
+        self.assertIn("entry_size", repr_str)
+        self.assertIn("ring_size", repr_str)
+        self.assertIn("n_threads", repr_str)
+
+        print(f"Ring buffer repr: {repr_str}")
+
+    def test_ring_buffer_multiple_threads(self):
+        """Test ring buffer access across multiple threads"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        config = self.ring_buffer.get_config()
+
+        # Test all available threads
+        for thread_index in range(config["n_threads"]):
+            metadata = self.ring_buffer._get_thread_metadata(thread_index)
+            self.assertIsInstance(metadata, dict)
+            self.assertIn("head", metadata)
+            self.assertIn("sequence", metadata)
+
+            data = self.ring_buffer.consume_data(thread_index=thread_index)
+            self.assertIsInstance(data, list)
+
+    def test_ring_buffer_sequence_consistency(self):
+        """Test sequence number consistency across reads"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Read metadata multiple times to check consistency
+        metadata1 = self.ring_buffer._get_thread_metadata(0)
+        metadata2 = self.ring_buffer._get_thread_metadata(0)
+
+        # Sequence numbers should be consistent (same or increasing)
+        self.assertGreaterEqual(metadata2["sequence"], metadata1["sequence"])
+
+    def test_ring_buffer_error_handling(self):
+        """Test ring buffer error handling"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Test with invalid parameters
+        data = self.ring_buffer.consume_data(thread_index=0, max_entries=0)
+        self.assertEqual(data, [])
+
+        data = self.ring_buffer.consume_data(thread_index=0, max_entries=-1)
+        self.assertEqual(data, [])
+
+    def test_ring_buffer_batch_vs_individual(self):
+        """Test that batch and individual consume return same results"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Reset local state to ensure fair comparison
+        self.ring_buffer.local_tails[0] = 0
+        self.ring_buffer.last_sequences[0] = None
+
+        # Consume with individual method
+        individual_data = self.ring_buffer.consume_data(thread_index=0, max_entries=5)
+
+        # Reset local state
+        self.ring_buffer.local_tails[0] = 0
+        self.ring_buffer.last_sequences[0] = None
+
+        # Consume with batch method
+        batch_data = self.ring_buffer.consume_data_batch(thread_index=0, max_entries=5)
+
+        # Results should be the same
+        self.assertEqual(individual_data, batch_data)
+
+    def test_ring_buffer_prefetch_parameter(self):
+        """Test prefetch parameter in batch consume"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Test with prefetch enabled
+        data1 = self.ring_buffer.consume_data_batch(thread_index=0, prefetch=True)
+
+        # Test with prefetch disabled
+        data2 = self.ring_buffer.consume_data_batch(thread_index=0, prefetch=False)
+
+        # Results should be the same regardless of prefetch setting
+        self.assertEqual(data1, data2)
+
+    def test_ring_buffer_schema_access(self):
+        """Test ring buffer schema access"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Test schema access
+        schema_data, schema_size, schema_version = self.ring_buffer.get_schema(
+            thread_index=0
+        )
+
+        # Should return schema information (may be None if no schema)
+        self.assertIsInstance(schema_size, int)
+        self.assertIsInstance(schema_version, int)
+        self.assertGreaterEqual(schema_size, 0)
+        self.assertGreaterEqual(schema_version, 0)
+
+        # Test schema string access
+        schema_string, schema_size_str, schema_version_str = (
+            self.ring_buffer.get_schema_string(thread_index=0)
+        )
+
+        # Should return consistent information
+        self.assertEqual(schema_size, schema_size_str)
+        self.assertEqual(schema_version, schema_version_str)
+
+        # If schema exists, it should be readable
+        if schema_size > 0:
+            self.assertIsNotNone(schema_data)
+            self.assertIsInstance(schema_data, bytes)
+            self.assertEqual(len(schema_data), schema_size)
+
+            # If it's a string schema, it should be decodable
+            if schema_string is not None:
+                self.assertIsInstance(schema_string, str)
+                self.assertGreater(len(schema_string), 0)
+
+        print(f"Schema info: size={schema_size}, version={schema_version}")
+        if schema_string:
+            print(f"Schema content: {schema_string}")
+
+    def test_ring_buffer_schema_metadata(self):
+        """Test that schema information is included in metadata"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Get metadata for thread 0
+        metadata = self.ring_buffer._get_thread_metadata(0)
+
+        # Should include schema information
+        self.assertIn("schema_version", metadata)
+        self.assertIn("schema_offset", metadata)
+        self.assertIn("schema_size", metadata)
+
+        # Values should be consistent with config
+        self.assertEqual(
+            metadata["schema_version"], self.ring_buffer.config["schema_version"]
+        )
+        self.assertEqual(
+            metadata["schema_size"], self.ring_buffer.config["schema_size"]
+        )
+
+    def test_ring_buffer_schema_config(self):
+        """Test that schema information is included in config"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        config = self.ring_buffer.get_config()
+
+        # Should include schema information
+        self.assertIn("schema_size", config)
+        self.assertIn("schema_version", config)
+
+        # Values should be reasonable
+        self.assertIsInstance(config["schema_size"], int)
+        self.assertIsInstance(config["schema_version"], int)
+        self.assertGreaterEqual(config["schema_size"], 0)
+        self.assertGreaterEqual(config["schema_version"], 0)
+
+        print(
+            f"Config schema info: size={config['schema_size']}, version={config['schema_version']}"
+        )
+
+    def test_ring_buffer_schema_convenience_methods(self):
+        """Test convenience methods for schema access"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Test convenience methods from VPPStats
+        schema_data, schema_size, schema_version = self.stat.get_ring_buffer_schema(
+            "/test/ring-buffer"
+        )
+        schema_string, schema_size_str, schema_version_str = (
+            self.stat.get_ring_buffer_schema_string("/test/ring-buffer")
+        )
+
+        # Should return consistent information
+        self.assertEqual(schema_size, schema_size_str)
+        self.assertEqual(schema_version, schema_version_str)
+
+        # Should match direct access
+        direct_schema_data, direct_schema_size, direct_schema_version = (
+            self.ring_buffer.get_schema()
+        )
+        self.assertEqual(schema_size, direct_schema_size)
+        self.assertEqual(schema_version, direct_schema_version)
+        if schema_data is not None:
+            self.assertEqual(schema_data, direct_schema_data)
+
+    def test_ring_buffer_schema_content(self):
+        """Test that schema content matches expected CDDL format"""
+        if not self.ring_buffer_available:
+            self.skipTest("Ring buffer not available")
+
+        # Get schema string
+        schema_string, schema_size, schema_version = self.ring_buffer.get_schema_string(
+            thread_index=0
+        )
+
+        # If schema exists, verify it has expected content
+        if schema_size > 0 and schema_string:
+            # Should be a string (not bytes)
+            self.assertIsInstance(schema_string, str)
+
+            # Should contain expected CDDL-like content
+            self.assertIn("ring_test_schema", schema_string)
+            self.assertIn("name:", schema_string)
+            self.assertIn("version:", schema_string)
+            self.assertIn("fields:", schema_string)
+            self.assertIn("seq", schema_string)
+            self.assertIn("timestamp", schema_string)
+
+            print(f"✓ Schema content verified: {schema_string[:100]}...")
+        else:
+            print("ℹ No schema found in ring buffer")
+
+
+class TestRingBufferIntegration(unittest.TestCase):
+    """Integration tests that coordinate with VPP writer tests"""
+
+    def setUp(self):
+        """Connect to statseg and find integration test ring buffer"""
+        self.stat = VPPStats()
+        self.stat.connect()
+
+        # Look for integration test ring buffer
+        try:
+            self.ring_buffer = self.stat.get_ring_buffer("/integration/test")
+            self.ring_buffer_available = True
+        except (KeyError, ValueError):
+            print(
+                "Integration test ring buffer not found. Run integration test setup first."
+            )
+            self.ring_buffer_available = False
+
+    def tearDown(self):
+        """Disconnect from statseg"""
+        self.stat.disconnect()
+
+    def test_integration_data_flow(self):
+        """Test complete data flow from writer to reader"""
+        if not self.ring_buffer_available:
+            self.skipTest("Integration ring buffer not available")
+
+        # This test would coordinate with a VPP writer test
+        # For now, we'll test the basic flow
+
+        # Reset reader state
+        self.ring_buffer.local_tails[0] = 0
+        self.ring_buffer.last_sequences[0] = None
+
+        # Try to consume data
+        data = self.ring_buffer.consume_data(thread_index=0)
+
+        # Should get list (empty or with data)
+        self.assertIsInstance(data, list)
+
+        # If we got data, verify it's the right format
+        for entry in data:
+            self.assertIsInstance(entry, bytes)
+            self.assertGreater(len(entry), 0)
+
+    def test_integration_overwrite_detection(self):
+        """Test overwrite detection in integration scenario"""
+        if not self.ring_buffer_available:
+            self.skipTest("Integration ring buffer not available")
+
+        # This test would coordinate with a writer that intentionally overwrites
+        # For now, we'll test the detection mechanism
+
+        # Simulate overwrite by manipulating sequence numbers
+        original_sequence = self.ring_buffer.last_sequences[0]
+
+        # This would normally happen when writer laps reader
+        # For testing, we'll just verify the detection logic exists
+        metadata = self.ring_buffer._get_thread_metadata(0)
+        self.assertIn("sequence", metadata)
+
+    def test_integration_performance(self):
+        """Test performance characteristics"""
+        if not self.ring_buffer_available:
+            self.skipTest("Integration ring buffer not available")
+
+        import time
+
+        # Test individual consume performance
+        start_time = time.time()
+        for _ in range(100):
+            self.ring_buffer.consume_data(thread_index=0, max_entries=1)
+        individual_time = time.time() - start_time
+
+        # Test batch consume performance
+        start_time = time.time()
+        for _ in range(10):
+            self.ring_buffer.consume_data_batch(thread_index=0, max_entries=10)
+        batch_time = time.time() - start_time
+
+        print(f"Individual consume time: {individual_time:.6f}s")
+        print(f"Batch consume time: {batch_time:.6f}s")
+
+        # Batch should be faster (though with empty data, difference might be minimal)
+        self.assertIsInstance(individual_time, float)
+        self.assertIsInstance(batch_time, float)
+
+
+def run_ring_buffer_tests():
+    """Run ring buffer tests with proper setup"""
+    print("Running Ring Buffer Tests...")
+    print("=" * 50)
+
+    # Create test suite
+    suite = unittest.TestSuite()
+
+    # Add ring buffer tests
+    suite.addTest(unittest.makeSuite(TestRingBuffer))
+    suite.addTest(unittest.makeSuite(TestRingBufferIntegration))
+
+    # Run tests
+    runner = unittest.TextTestRunner(verbosity=2)
+    result = runner.run(suite)
+
+    print("=" * 50)
+    print(f"Tests run: {result.testsRun}")
+    print(f"Failures: {len(result.failures)}")
+    print(f"Errors: {len(result.errors)}")
+
+    return result.wasSuccessful()
+
+
+def demo_ring_buffer_schema():
+    """Demo function showing how to use ring buffer schema functionality"""
+    print("Ring Buffer Schema Demo")
+    print("=" * 30)
+
+    try:
+        # Connect to VPP stats
+        stats = VPPStats()
+        stats.connect()
+
+        # Look for test ring buffer
+        try:
+            ring_buffer = stats.get_ring_buffer("/test/ring-buffer")
+            print("✓ Found test ring buffer")
+
+            # Get configuration
+            config = ring_buffer.get_config()
+            print(f"Ring buffer config: {config}")
+
+            # Get schema information
+            schema_data, schema_size, schema_version = ring_buffer.get_schema()
+            print(f"Schema info: size={schema_size}, version={schema_version}")
+
+            # Get schema as string
+            schema_string, _, _ = ring_buffer.get_schema_string()
+            if schema_string:
+                print(f"Schema content:\n{schema_string}")
+            else:
+                print("No schema found")
+
+            # Use convenience methods
+            print("\nUsing convenience methods:")
+            conv_schema_string, conv_size, conv_version = (
+                stats.get_ring_buffer_schema_string("/test/ring-buffer")
+            )
+            print(
+                f"Convenience method result: size={conv_size}, version={conv_version}"
+            )
+            if conv_schema_string:
+                print(f"Schema: {conv_schema_string[:100]}...")
+
+        except (KeyError, ValueError) as e:
+            print(f"Test ring buffer not found: {e}")
+            print(
+                "Run 'test stats ring-buffer-gen /test/ring-buffer 100 1000 16' in VPP first"
+            )
+
+        stats.disconnect()
+
+    except Exception as e:
+        print(f"Error: {e}")
+        print("Make sure VPP is running and stats socket is available")
+
+    print("=" * 30)
+
+
 if __name__ == "__main__":
     import cProfile
     from pstats import Stats
 
+    # Run ring buffer tests if available
+    if "--ring-buffer-tests" in sys.argv:
+        success = run_ring_buffer_tests()
+        sys.exit(0 if success else 1)
+
+    # Run schema demo if requested
+    if "--schema-demo" in sys.argv:
+        demo_ring_buffer_schema()
+        sys.exit(0)
+
+    # Run original tests
     unittest.main()
index 1c3b9d9..699b5e9 100644 (file)
@@ -81,9 +81,37 @@ stat_poll_loop (u8 ** patterns)
              break;
 
            case STAT_DIR_TYPE_SCALAR_INDEX:
+           case STAT_DIR_TYPE_GAUGE:
              fformat (stdout, "%.2f %s\n", res[i].scalar_value, res[i].name);
              break;
 
+           case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+             for (k = 0; k < vec_len (res[i].log2_histogram_bins); k++)
+               {
+                 u64 *bins = res[i].log2_histogram_bins[k];
+                 int n_bins = vec_len (bins);
+                 if (n_bins < 2) // Need at least min_exp + one bin
+                   continue;
+                 u32 min_exp = bins[0];
+                 u64 cumulative = 0;
+                 u64 sum = 0;
+                 fformat (stdout, "Histogram %s (thread %d):\n", res[i].name,
+                          k);
+                 for (int j = 1; j < n_bins; ++j)
+                   {
+                     cumulative += bins[j];
+                     sum += bins[j] *
+                            (1ULL << (min_exp + j - 1)); // midpoint approx
+                     fformat (stdout, "  <= %llu: %llu (cumulative: %llu)\n",
+                              (1ULL << (min_exp + j - 1)), bins[j],
+                              cumulative);
+                   }
+                 fformat (stdout,
+                          "  +Inf: %llu (total count: %llu, sum: %llu)\n",
+                          cumulative, cumulative, sum);
+               }
+             break;
+
            case STAT_DIR_TYPE_EMPTY:
              break;
 
@@ -224,9 +252,37 @@ reconnect:
              break;
 
            case STAT_DIR_TYPE_SCALAR_INDEX:
+           case STAT_DIR_TYPE_GAUGE:
              fformat (stdout, "%.2f %s\n", res[i].scalar_value, res[i].name);
              break;
 
+           case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+             for (k = 0; k < vec_len (res[i].log2_histogram_bins); k++)
+               {
+                 u64 *bins = res[i].log2_histogram_bins[k];
+                 int n_bins = vec_len (bins);
+                 if (n_bins < 2) // Need at least min_exp + one bin
+                   continue;
+                 u32 min_exp = bins[0];
+                 u64 cumulative = 0;
+                 u64 sum = 0;
+                 fformat (stdout, "Histogram %s (thread %d):\n", res[i].name,
+                          k);
+                 for (int j = 1; j < n_bins; ++j)
+                   {
+                     cumulative += bins[j];
+                     sum += bins[j] *
+                            (1ULL << (min_exp + j - 1)); // midpoint approx
+                     fformat (stdout, "  <= %llu: %llu (cumulative: %llu)\n",
+                              (1ULL << (min_exp + j - 1)), bins[j],
+                              cumulative);
+                   }
+                 fformat (stdout,
+                          "  +Inf: %llu (total count: %llu, sum: %llu)\n",
+                          cumulative, cumulative, sum);
+               }
+             break;
+
            case STAT_DIR_TYPE_NAME_VECTOR:
              if (res[i].name_vector == 0)
                continue;
index c6000a8..e40212d 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *------------------------------------------------------------------
- * vpp_get_stats.c
+ * vpp_prometheus_export.c
  *
  * Copyright (c) 2018 Cisco and/or its affiliates.
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -35,6 +35,7 @@
 #include <sys/socket.h>
 #include <vpp-api/client/stat_client.h>
 #include <vlib/vlib.h>
+#include <vlib/stats/shared.h>
 #include <ctype.h>
 
 /* https://github.com/prometheus/prometheus/wiki/Default-port-allocations */
@@ -55,6 +56,45 @@ prom_string (char *s)
   return s;
 }
 
+// For STAT_DIR_TYPE_HISTOGRAM_LOG2, the data is in res->log2_histogram_bins
+static void
+print_log2_histogram_metric (FILE *stream, stat_segment_data_t *res)
+{
+  int n_threads = vec_len (res->log2_histogram_bins);
+  char sanitized_name[VLIB_STATS_MAX_NAME_SZ];
+  strncpy (sanitized_name, res->name, VLIB_STATS_MAX_NAME_SZ - 1);
+  sanitized_name[VLIB_STATS_MAX_NAME_SZ - 1] = '\0';
+  prom_string (sanitized_name);
+
+  for (int thread = 0; thread < n_threads; ++thread)
+    {
+      u64 *bins = res->log2_histogram_bins[thread];
+      int n_bins = vec_len (bins);
+      if (n_bins < 2) // Need at least min_exp + one bin
+       continue;
+      u32 min_exp = bins[0];
+      u64 cumulative = 0;
+      u64 sum = 0;
+      fformat (stream, "# TYPE %s histogram\n", sanitized_name);
+      for (int i = 1; i < n_bins; ++i)
+       {
+         cumulative += bins[i];
+         sum += bins[i] * (1ULL << (min_exp + i - 1)); // midpoint approx
+         fformat (stream, "%s{le=\"%llu\",thread=\"%d\"} %llu\n",
+                  sanitized_name, (1ULL << (min_exp + i - 1)), thread,
+                  cumulative);
+       }
+      // +Inf bucket
+      fformat (stream, "%s{le=\"+Inf\",thread=\"%d\"} %llu\n", sanitized_name,
+              thread, cumulative);
+      // _count and _sum
+      fformat (stream, "%s_count{thread=\"%d\"} %llu\n", sanitized_name,
+              thread, cumulative);
+      fformat (stream, "%s_sum{thread=\"%d\"} %llu\n", sanitized_name, thread,
+              sum);
+    }
+}
+
 static void
 print_metric_v1 (FILE *stream, stat_segment_data_t *res)
 {
@@ -62,6 +102,9 @@ print_metric_v1 (FILE *stream, stat_segment_data_t *res)
 
   switch (res->type)
     {
+    case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+      print_log2_histogram_metric (stream, res);
+      break;
     case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE:
       fformat (stream, "# TYPE %s counter\n", prom_string (res->name));
       for (k = 0; k < vec_len (res->simple_counter_vec); k++)
@@ -91,7 +134,11 @@ print_metric_v1 (FILE *stream, stat_segment_data_t *res)
       fformat (stream, "%s %.2f\n", prom_string (res->name),
               res->scalar_value);
       break;
-
+    case STAT_DIR_TYPE_GAUGE:
+      fformat (stream, "# TYPE %s gauge\n", prom_string (res->name));
+      fformat (stream, "%s %.2f\n", prom_string (res->name),
+              res->scalar_value);
+      break;
     case STAT_DIR_TYPE_NAME_VECTOR:
       fformat (stream, "# TYPE %s_info gauge\n", prom_string (res->name));
       for (k = 0; k < vec_len (res->name_vector); k++)
@@ -159,9 +206,13 @@ print_metric_v2 (FILE *stream, stat_segment_data_t *res)
   num_tokens = tokenize (res->name, tokens, lengths, MAX_TOKENS);
   switch (res->type)
     {
+    case STAT_DIR_TYPE_HISTOGRAM_LOG2:
+      print_log2_histogram_metric (stream, res);
+      break;
     case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE:
       if (res->simple_counter_vec == 0)
        return;
+
       for (k = 0; k < vec_len (res->simple_counter_vec); k++)
        for (j = 0; j < vec_len (res->simple_counter_vec[k]); j++)
          {
@@ -260,6 +311,7 @@ print_metric_v2 (FILE *stream, stat_segment_data_t *res)
       break;
 
     case STAT_DIR_TYPE_SCALAR_INDEX:
+    case STAT_DIR_TYPE_GAUGE:
       if ((num_tokens == 4) &&
          !strncmp (tokens[1], "buffer-pools", lengths[1]))
        {
diff --git a/test/test_ring_buffer.py b/test/test_ring_buffer.py
new file mode 100644 (file)
index 0000000..25b6b3b
--- /dev/null
@@ -0,0 +1,424 @@
+import time
+import unittest
+import struct
+from framework import VppTestCase
+
+RING_NAME = "/test/ring_buffer_pytest"
+RING_SIZE = 1024
+COUNT = 100
+INTERVAL_USEC = 1000  # 1ms
+
+
+class TestRingBuffer(VppTestCase):
+    """Ring Buffer Test Case"""
+
+    maxDiff = None
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestRingBuffer, cls).setUpClass()
+
+    def setUp(self):
+        super(TestRingBuffer, self).setUp()
+
+    def tearDown(self):
+        super(TestRingBuffer, self).tearDown()
+
+    def test_ring_buffer_generation(self):
+        """Test ring buffer generation"""
+
+        # Step 1: Generate messages using the VPPApiClient CLI
+        cli_cmd = f"test stats ring-buffer-gen {RING_NAME} {COUNT} {INTERVAL_USEC} {RING_SIZE}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Connect to stats segment and get the ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{RING_NAME}")
+
+        # Step 3: Poll for all messages
+        received = []
+        start = time.time()
+        while len(received) < COUNT and (time.time() - start) < 10:
+            data = ring_buffer.poll_for_data(thread_index=0, timeout=0.5)
+            for entry in data:
+                # Unpack the struct: u64 seq, f64 timestamp
+                seq, ts = struct.unpack("<Qd", entry)
+                received.append((seq, ts))
+            time.sleep(0.01)
+
+        # Step 4: Validate
+        assert len(received) == COUNT, f"Expected {COUNT} messages, got {len(received)}"
+        for i, (seq, ts) in enumerate(received):
+            assert seq == i, f"Sequence mismatch at {i}: got {seq}"
+            assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+        # print(f"Received {len(received)} messages from ring buffer '{RING_NAME}'")
+
+    def test_ring_buffer_overwrite(self):
+        """Test ring buffer overwrite behavior"""
+        ring_size = 16
+        total_messages = 3 * ring_size
+        interval_usec = 100  # Fast, to fill quickly
+        ring_name = "/test/ring_buffer_overwrite"
+
+        # Step 1: Generate more messages than the ring size
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {total_messages} {interval_usec} {ring_size}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Ensure all messages are written before polling
+        time.sleep(0.1)
+
+        # Step 2: Connect to stats segment and get the ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Poll for all messages (should only get the last 'ring_size' messages)
+        received = {}
+        start = time.time()
+        while len(received) < ring_size and (time.time() - start) < 10:
+            data = ring_buffer.poll_for_data(thread_index=0, timeout=0.5)
+            for entry in data:
+                seq, ts = struct.unpack("<Qd", entry)
+                if seq not in received:
+                    received[seq] = ts
+            time.sleep(0.01)
+
+        # Debug: print the received sequence numbers
+        print(f"Received sequence numbers: {sorted(received.keys())}")
+
+        # Step 4: Validate
+        assert (
+            len(received) == ring_size
+        ), f"Expected {ring_size} messages, got {len(received)}"
+        expected_start = total_messages - ring_size
+        for i, seq in enumerate(sorted(received)):
+            ts = received[seq]
+            assert (
+                seq == expected_start + i
+            ), f"Sequence mismatch at {i}: got {seq}, expected {expected_start + i}"
+            assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+    def test_ring_buffer_batch_operations(self):
+        """Test ring buffer batch operations"""
+        ring_name = "/test/ring_buffer_batch"
+        batch_size = 10
+        total_messages = 50
+
+        # Step 1: Generate messages using VPP CLI
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {total_messages} {INTERVAL_USEC} {RING_SIZE}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test individual consume
+        individual_data = []
+        start = time.time()
+        while len(individual_data) < total_messages and (time.time() - start) < 10:
+            data = ring_buffer.consume_data(thread_index=0, max_entries=batch_size)
+            individual_data.extend(data)
+            time.sleep(0.01)
+
+        # Step 4: Reset and test batch consume
+        ring_buffer.local_tails[0] = 0
+        ring_buffer.last_sequences[0] = None
+
+        batch_data = []
+        start = time.time()
+        while len(batch_data) < total_messages and (time.time() - start) < 10:
+            data = ring_buffer.consume_data_batch(
+                thread_index=0, max_entries=batch_size
+            )
+            batch_data.extend(data)
+            time.sleep(0.01)
+
+        # Step 5: Validate results are the same
+        assert len(individual_data) == len(
+            batch_data
+        ), "Individual and batch should return same number of entries"
+        assert (
+            individual_data == batch_data
+        ), "Individual and batch should return same data"
+
+        # Step 6: Validate data format
+        for entry in individual_data:
+            seq, ts = struct.unpack("<Qd", entry)
+            assert seq >= 0, f"Sequence should be non-negative, got {seq}"
+            assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+    def test_ring_buffer_configuration(self):
+        """Test ring buffer configuration access"""
+        ring_name = "/test/ring_buffer_config"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer and check configuration
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+        config = ring_buffer.get_config()
+
+        # Step 3: Validate configuration
+        assert "entry_size" in config, "Config should contain entry_size"
+        assert "ring_size" in config, "Config should contain ring_size"
+        assert "n_threads" in config, "Config should contain n_threads"
+        assert config["entry_size"] > 0, "Entry size should be positive"
+        assert config["ring_size"] > 0, "Ring size should be positive"
+        assert config["n_threads"] > 0, "Number of threads should be positive"
+
+        # Step 4: Check metadata access
+        metadata = ring_buffer._get_thread_metadata(0)
+        assert "head" in metadata, "Metadata should contain head"
+        assert "sequence" in metadata, "Metadata should contain sequence"
+        assert isinstance(metadata["head"], int), "Head should be integer"
+        assert isinstance(metadata["sequence"], int), "Sequence should be integer"
+
+    def test_ring_buffer_error_handling(self):
+        """Test ring buffer error handling"""
+        ring_name = "/test/ring_buffer_error"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test invalid thread index
+        config = ring_buffer.get_config()
+        invalid_thread = config["n_threads"] + 1
+
+        try:
+            ring_buffer._get_thread_metadata(invalid_thread)
+            assert False, "Should have raised IndexError for invalid thread"
+        except IndexError:
+            pass  # Expected
+
+        # Step 4: Test invalid parameters
+        data = ring_buffer.consume_data(thread_index=0, max_entries=0)
+        assert data == [], "Zero max_entries should return empty list"
+
+        data = ring_buffer.consume_data(thread_index=0, max_entries=-1)
+        assert data == [], "Negative max_entries should return empty list"
+
+    def test_ring_buffer_api_compatibility(self):
+        """Test ring buffer API compatibility methods"""
+        ring_name = "/test/ring_buffer_compat"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test compatibility methods
+        count = ring_buffer.get_count(thread_index=0)
+        is_empty = ring_buffer.is_empty(thread_index=0)
+        is_full = ring_buffer.is_full(thread_index=0)
+
+        # These methods return simplified values since writer doesn't track reader state
+        assert count == 0, "Count should be 0 (writer doesn't track reader)"
+        assert is_empty == True, "Is empty should be True (writer doesn't track reader)"
+        assert is_full == False, "Is full should be False (writer doesn't track reader)"
+
+        # Step 4: Test string representation
+        repr_str = repr(ring_buffer)
+        assert (
+            "StatsRingBuffer" in repr_str
+        ), "String representation should contain StatsRingBuffer"
+        assert (
+            "entry_size" in repr_str
+        ), "String representation should contain entry_size"
+        assert "ring_size" in repr_str, "String representation should contain ring_size"
+        assert "n_threads" in repr_str, "String representation should contain n_threads"
+
+    def test_ring_buffer_performance(self):
+        """Test ring buffer performance characteristics"""
+        ring_name = "/test/ring_buffer_perf"
+        test_entries = 100
+
+        # Step 1: Create ring buffer
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {test_entries} {INTERVAL_USEC} {RING_SIZE}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test individual consume performance
+        start_time = time.time()
+        individual_count = 0
+        for _ in range(100):  # 100 operations
+            data = ring_buffer.consume_data(thread_index=0, max_entries=1)
+            individual_count += len(data)
+        individual_time = time.time() - start_time
+
+        # Step 4: Reset and test batch consume performance
+        ring_buffer.local_tails[0] = 0
+        ring_buffer.last_sequences[0] = None
+
+        start_time = time.time()
+        batch_count = 0
+        for _ in range(10):  # 10 batch operations
+            data = ring_buffer.consume_data_batch(thread_index=0, max_entries=10)
+            batch_count += len(data)
+        batch_time = time.time() - start_time
+
+        # Step 5: Calculate performance metrics
+        individual_throughput = 100 / individual_time if individual_time > 0 else 0
+        batch_throughput = 100 / batch_time if batch_time > 0 else 0
+
+        print(f"Individual consume: {individual_throughput:.0f} ops/sec")
+        print(f"Batch consume: {batch_throughput:.0f} ops/sec")
+
+        # Step 6: Validate performance (basic sanity checks)
+        assert individual_time > 0, "Individual operations should take some time"
+        assert batch_time > 0, "Batch operations should take some time"
+        assert individual_count >= 0, "Individual count should be non-negative"
+        assert batch_count >= 0, "Batch count should be non-negative"
+
+    def test_ring_buffer_multiple_threads(self):
+        """Test ring buffer access across multiple threads"""
+        ring_name = "/test/ring_buffer_multi"
+        threads = 2
+
+        # Step 1: Create ring buffer with multiple threads using CLI (single-threaded)
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test single thread (since CLI only supports single-threaded)
+        config = ring_buffer.get_config()
+        assert (
+            config["n_threads"] == 1
+        ), f"CLI creates single-threaded ring buffers, got {config['n_threads']}"
+
+        # Test metadata access
+        metadata = ring_buffer._get_thread_metadata(0)
+        assert "head" in metadata, "Thread 0 metadata should contain head"
+        assert "sequence" in metadata, "Thread 0 metadata should contain sequence"
+
+        # Test data consumption
+        data = ring_buffer.consume_data(thread_index=0)
+        assert isinstance(data, list), "Thread 0 should return list"
+
+    def test_ring_buffer_sequence_consistency(self):
+        """Test sequence number consistency across reads"""
+        ring_name = "/test/ring_buffer_seq"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Read metadata multiple times to check consistency
+        metadata1 = ring_buffer._get_thread_metadata(0)
+        time.sleep(0.01)  # Small delay
+        metadata2 = ring_buffer._get_thread_metadata(0)
+
+        # Sequence numbers should be consistent (same or increasing)
+        assert (
+            metadata2["sequence"] >= metadata1["sequence"]
+        ), "Sequence should be monotonically increasing"
+
+    def test_ring_buffer_polling_with_callback(self):
+        """Test ring buffer polling with callback function"""
+        ring_name = "/test/ring_buffer_callback"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 5 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test polling with callback
+        collected_data = []
+
+        def callback(data):
+            collected_data.append(data)
+
+        # Poll with callback and short timeout
+        result_data = ring_buffer.poll_for_data(
+            thread_index=0, timeout=1.0, callback=callback
+        )
+
+        # Both callback and return value should work
+        assert isinstance(result_data, list), "Poll should return list"
+        assert isinstance(collected_data, list), "Callback should collect data in list"
+
+    def test_ring_buffer_empty_operations(self):
+        """Test ring buffer operations on empty buffer"""
+        ring_name = "/test/ring_buffer_empty"
+
+        # Step 1: Create ring buffer but don't generate data
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 0 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test empty consume
+        data = ring_buffer.consume_data(thread_index=0)
+        assert data == [], "Empty buffer should return empty list"
+
+        # Step 4: Test empty batch consume
+        data = ring_buffer.consume_data_batch(thread_index=0)
+        assert data == [], "Empty buffer should return empty list for batch"
+
+        # Step 5: Test empty poll
+        data = ring_buffer.poll_for_data(thread_index=0, timeout=0.1)
+        assert data == [], "Empty buffer should return empty list for poll"
+
+    def test_ring_buffer_prefetch_parameter(self):
+        """Test prefetch parameter in batch consume"""
+        ring_name = "/test/ring_buffer_prefetch"
+
+        # Step 1: Create ring buffer
+        cli_cmd = (
+            f"test stats ring-buffer-gen {ring_name} 10 {INTERVAL_USEC} {RING_SIZE}"
+        )
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(f"{ring_name}")
+
+        # Step 3: Test with prefetch enabled
+        data1 = ring_buffer.consume_data_batch(thread_index=0, prefetch=True)
+
+        # Step 4: Reset and test with prefetch disabled
+        ring_buffer.local_tails[0] = 0
+        ring_buffer.last_sequences[0] = None
+
+        data2 = ring_buffer.consume_data_batch(thread_index=0, prefetch=False)
+
+        # Step 5: Results should be the same regardless of prefetch setting
+        assert data1 == data2, "Prefetch parameter should not affect results"
diff --git a/test/test_ring_buffer_simple.py b/test/test_ring_buffer_simple.py
new file mode 100644 (file)
index 0000000..086d654
--- /dev/null
@@ -0,0 +1,218 @@
+#!/usr/bin/env python3
+"""
+Simple Ring Buffer Test Runner
+
+This script provides a simple way to test ring buffer functionality
+using the existing VPP make test infrastructure.
+
+Usage:
+    python3 test/test_ring_buffer_simple.py
+
+This script can be run as part of the VPP test suite or standalone.
+"""
+
+import sys
+import time
+import struct
+from framework import VppTestCase
+
+
+class TestRingBufferSimple(VppTestCase):
+    """Simple Ring Buffer Test Case for Make Test Infrastructure"""
+
+    def test_ring_buffer_basic_functionality(self):
+        """Test basic ring buffer functionality"""
+        import time
+        import struct
+
+        # Use a unique name for each test run to avoid reading old data
+        ring_name = f"/test/ring_buffer_simple_{int(time.time())}"
+        count = 50
+        interval_usec = 1000
+        ring_size = 64
+
+        # Step 1: Generate messages using VPP CLI
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer from stats segment
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Consume all messages
+        received = []
+        start = time.time()
+
+        # Debug: Check initial state
+        metadata = ring_buffer._get_thread_metadata(0)
+        print(
+            f"DEBUG: Initial metadata - head: {metadata['head']}, sequence: {metadata['sequence']}"
+        )
+        print(f"DEBUG: Ring size: {ring_buffer.get_config()['ring_size']}")
+
+        while len(received) < count and (time.time() - start) < 10:
+            data = ring_buffer.consume_data(thread_index=0, max_entries=10)
+            print(f"DEBUG: Consumed {len(data)} entries")
+            for entry in data:
+                seq, ts = struct.unpack("<Qd", entry)
+                received.append((seq, ts))
+                if len(received) <= 5:  # Print first few entries
+                    print(f"DEBUG: Entry {len(received)-1}: seq={seq}, ts={ts}")
+            time.sleep(0.01)
+
+        # Step 4: Validate results
+        assert len(received) == count, f"Expected {count} messages, got {len(received)}"
+
+        # Check sequence numbers are consecutive
+        for i, (seq, ts) in enumerate(received):
+            assert seq == i, f"Sequence mismatch at {i}: got {seq}, expected {i}"
+            assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+        print(f"✓ Successfully received {len(received)} messages from ring buffer")
+
+    def test_ring_buffer_batch_operations(self):
+        """Test ring buffer batch operations"""
+        ring_name = "/test/ring_buffer_batch_simple"
+        count = 30
+        interval_usec = 1000
+        ring_size = 64
+
+        # Step 1: Generate messages
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Test batch consume
+        data = ring_buffer.consume_data_batch(thread_index=0, max_entries=count)
+
+        # Step 4: Validate results
+        assert len(data) == count, f"Expected {count} entries, got {len(data)}"
+
+        # Check data format
+        for i, entry in enumerate(data):
+            seq, ts = struct.unpack("<Qd", entry)
+            assert seq == i, f"Sequence mismatch at {i}: got {seq}, expected {i}"
+            assert ts > 0, f"Timestamp should be positive, got {ts}"
+
+        print(f"✓ Successfully received {len(data)} messages using batch operations")
+
+    def test_ring_buffer_configuration(self):
+        """Test ring buffer configuration access"""
+        ring_name = "/test/ring_buffer_config_simple"
+        count = 10
+        interval_usec = 1000
+        ring_size = 32
+
+        # Step 1: Create ring buffer
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} {count} {interval_usec} {ring_size}"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer and check configuration
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+        config = ring_buffer.get_config()
+
+        # Step 3: Validate configuration
+        assert (
+            config["entry_size"] == 16
+        ), f"Expected entry_size 16, got {config['entry_size']}"
+        assert (
+            config["ring_size"] == ring_size
+        ), f"Expected ring_size {ring_size}, got {config['ring_size']}"
+        assert (
+            config["n_threads"] == 1
+        ), f"Expected n_threads 1, got {config['n_threads']}"
+
+        # Step 4: Check metadata
+        metadata = ring_buffer._get_thread_metadata(0)
+        assert "head" in metadata, "Metadata should contain head"
+        assert "sequence" in metadata, "Metadata should contain sequence"
+        assert (
+            metadata["sequence"] >= count
+        ), f"Sequence should be >= {count}, got {metadata['sequence']}"
+
+        print(f"✓ Ring buffer configuration validated: {config}")
+
+    def test_ring_buffer_empty_operations(self):
+        """Test ring buffer operations on empty buffer"""
+        ring_name = "/test/ring_buffer_empty_simple"
+
+        # Step 1: Create ring buffer without generating data
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} 0 1000 32"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Test various operations on empty buffer
+        data = ring_buffer.consume_data(thread_index=0)
+        assert data == [], "Empty buffer should return empty list"
+
+        data = ring_buffer.consume_data_batch(thread_index=0)
+        assert data == [], "Empty buffer should return empty list for batch"
+
+        data = ring_buffer.poll_for_data(thread_index=0, timeout=0.1)
+        assert data == [], "Empty buffer should return empty list for poll"
+
+        # Step 4: Check API compatibility methods
+        count = ring_buffer.get_count(thread_index=0)
+        is_empty = ring_buffer.is_empty(thread_index=0)
+        is_full = ring_buffer.is_full(thread_index=0)
+
+        assert count == 0, "Count should be 0 for empty buffer"
+        assert is_empty == True, "Is empty should be True for empty buffer"
+        assert is_full == False, "Is full should be False for empty buffer"
+
+        print("✓ Empty buffer operations validated")
+
+    def test_ring_buffer_error_handling(self):
+        """Test ring buffer error handling"""
+        ring_name = "/test/ring_buffer_error_simple"
+
+        # Step 1: Create ring buffer
+        cli_cmd = f"test stats ring-buffer-gen {ring_name} 5 1000 32"
+        result = self.vapi.cli(cli_cmd)
+        assert "Generated" in result, f"CLI failed: {result}"
+
+        # Step 2: Get ring buffer
+        ring_buffer = self.statistics.get_ring_buffer(ring_name)
+
+        # Step 3: Test invalid thread index
+        try:
+            ring_buffer._get_thread_metadata(999)  # Invalid thread
+            assert False, "Should have raised IndexError for invalid thread"
+        except IndexError:
+            pass  # Expected
+
+        # Step 4: Test invalid parameters
+        data = ring_buffer.consume_data(thread_index=0, max_entries=0)
+        assert data == [], "Zero max_entries should return empty list"
+
+        data = ring_buffer.consume_data(thread_index=0, max_entries=-1)
+        assert data == [], "Negative max_entries should return empty list"
+
+        print("✓ Error handling validated")
+
+
+def main():
+    """Main function for standalone execution"""
+    import unittest
+
+    # Create test suite
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(TestRingBufferSimple))
+
+    # Run tests
+    runner = unittest.TextTestRunner(verbosity=2)
+    result = runner.run(suite)
+
+    # Return appropriate exit code
+    return 0 if result.wasSuccessful() else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())