sflow: initial checkin 80/41680/18
authorPim van Pelt <[email protected]>
Sun, 6 Oct 2024 15:49:00 +0000 (17:49 +0200)
committerBenoit Ganne <[email protected]>
Mon, 20 Jan 2025 17:21:43 +0000 (17:21 +0000)
This is an sFlow dataplane plugin that can sample
1-in-N packets from device-input, copying them to
a FIFO queue and servicing that queue from a main
process which formats them as Netlink PSAMPLEs,
to be picked up by a popular sidecar agent called
host-sflow.

Type: feature
Change-Id: Ic03456472e53309678f182dc8f74d3c81fb619e6
Signed-off-by: [email protected]
Signed-off-by: [email protected]
19 files changed:
MAINTAINERS
docs/spelling_wordlist.txt
src/plugins/sflow/CMakeLists.txt [new file with mode: 0644]
src/plugins/sflow/FEATURE.yaml [new file with mode: 0644]
src/plugins/sflow/node.c [new file with mode: 0644]
src/plugins/sflow/sflow.api [new file with mode: 0644]
src/plugins/sflow/sflow.c [new file with mode: 0644]
src/plugins/sflow/sflow.h [new file with mode: 0644]
src/plugins/sflow/sflow.rst [new file with mode: 0644]
src/plugins/sflow/sflow_common.h [new file with mode: 0644]
src/plugins/sflow/sflow_psample.c [new file with mode: 0644]
src/plugins/sflow/sflow_psample.h [new file with mode: 0644]
src/plugins/sflow/sflow_psample_fields.h [new file with mode: 0644]
src/plugins/sflow/sflow_test.c [new file with mode: 0644]
src/plugins/sflow/sflow_usersock.c [new file with mode: 0644]
src/plugins/sflow/sflow_usersock.h [new file with mode: 0644]
src/plugins/sflow/sflow_vapi.c [new file with mode: 0644]
src/plugins/sflow/sflow_vapi.h [new file with mode: 0644]
test/test_sflow.py [new file with mode: 0644]

index b1065cd..4ad3109 100644 (file)
@@ -903,3 +903,9 @@ Netmap
 I:     netmap
 M:     Tom Jones <[email protected]>
 F:     src/plugins/netmap/
+
+sFlow
+I:     sflow
+M:     Pim van Pelt <[email protected]>
+M:     Neil McKee <[email protected]>
+F:     src/plugins/sflow/
index c698696..b1962cc 100644 (file)
@@ -1381,3 +1381,5 @@ zoomout
 zx
 µs
 oflags
+sflow
+sFlow
diff --git a/src/plugins/sflow/CMakeLists.txt b/src/plugins/sflow/CMakeLists.txt
new file mode 100644 (file)
index 0000000..fc20f0f
--- /dev/null
@@ -0,0 +1,42 @@
+
+# Copyright (c) 2024 InMon Corp.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+include_directories(${CMAKE_SOURCE_DIR}/vpp-api ${CMAKE_CURRENT_BINARY_DIR}/../../vpp-api)
+add_vpp_plugin(sflow
+  SOURCES
+  sflow.c
+  node.c
+  sflow_common.h
+  sflow.h
+  sflow_psample.c
+  sflow_psample.h
+  sflow_psample_fields.h
+  sflow_usersock.c
+  sflow_usersock.h
+  sflow_vapi.c
+  sflow_vapi.h
+
+  MULTIARCH_SOURCES
+  node.c
+
+  API_FILES
+  sflow.api
+
+  API_TEST_SOURCES
+  sflow_test.c
+
+  LINK_LIBRARIES
+  vppapiclient
+  vapiclient
+)
diff --git a/src/plugins/sflow/FEATURE.yaml b/src/plugins/sflow/FEATURE.yaml
new file mode 100644 (file)
index 0000000..612db61
--- /dev/null
@@ -0,0 +1,16 @@
+---
+name: sFlow
+maintainer: Neil McKee <[email protected]>
+
+description: |-
+       This plugin implements the random packet-sampling and interface
+       telemetry streaming required to support standard sFlow export
+       on Linux platforms. The overhead incurred by this monitoring is
+       minimal, so that detailed, real-time traffic analysis can be
+       achieved even under high load conditions, with visibility into
+       any fields that appear in the packet headers. If the linux-cp
+       plugin is running then interfaces will be mapped to their
+       equivalent Linux tap ports.
+
+state: experimental
+properties: [CLI, MULTITHREAD]
diff --git a/src/plugins/sflow/node.c b/src/plugins/sflow/node.c
new file mode 100644 (file)
index 0000000..5182643
--- /dev/null
@@ -0,0 +1,356 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vlib/vlib.h>
+#include <vlibmemory/api.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+typedef struct
+{
+  u32 next_index;
+  u32 sw_if_index;
+  u8 new_src_mac[6];
+  u8 new_dst_mac[6];
+} sflow_trace_t;
+
+#ifndef CLIB_MARCH_VARIANT
+static u8 *
+my_format_mac_address (u8 *s, va_list *args)
+{
+  u8 *a = va_arg (*args, u8 *);
+  return format (s, "%02x:%02x:%02x:%02x:%02x:%02x", a[0], a[1], a[2], a[3],
+                a[4], a[5]);
+}
+
+/* packet trace format function */
+static u8 *
+format_sflow_trace (u8 *s, va_list *args)
+{
+  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
+  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
+  sflow_trace_t *t = va_arg (*args, sflow_trace_t *);
+
+  s = format (s, "SFLOW: sw_if_index %d, next index %d\n", t->sw_if_index,
+             t->next_index);
+  s = format (s, "  src %U -> dst %U", my_format_mac_address, t->new_src_mac,
+             my_format_mac_address, t->new_dst_mac);
+  return s;
+}
+
+vlib_node_registration_t sflow_node;
+
+#endif /* CLIB_MARCH_VARIANT */
+
+#ifndef CLIB_MARCH_VARIANT
+static char *sflow_error_strings[] = {
+#define _(sym, string) string,
+  foreach_sflow_error
+#undef _
+};
+#endif /* CLIB_MARCH_VARIANT */
+
+typedef enum
+{
+  SFLOW_NEXT_ETHERNET_INPUT,
+  SFLOW_N_NEXT,
+} sflow_next_t;
+
+VLIB_NODE_FN (sflow_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+  u32 n_left_from, *from, *to_next;
+  sflow_next_t next_index;
+
+  sflow_main_t *smp = &sflow_main;
+  from = vlib_frame_vector_args (frame);
+  n_left_from = frame->n_vectors;
+
+  uword thread_index = os_get_thread_index ();
+  sflow_per_thread_data_t *sfwk =
+    vec_elt_at_index (smp->per_thread_data, thread_index);
+
+  /* note that sfwk->skip==1 means "take the next packet",
+     so we never see sfwk->skip==0. */
+
+  u32 pkts = n_left_from;
+  if (PREDICT_TRUE (sfwk->skip > pkts))
+    {
+      /* skip the whole frame-vector */
+      sfwk->skip -= pkts;
+      sfwk->pool += pkts;
+    }
+  else
+    {
+      while (pkts >= sfwk->skip)
+       {
+         /* reach in to get the one we want. */
+         vlib_buffer_t *bN = vlib_get_buffer (vm, from[sfwk->skip - 1]);
+
+         /* Sample this packet header. */
+         u32 hdr = bN->current_length;
+         if (hdr > smp->headerB)
+           hdr = smp->headerB;
+
+         ethernet_header_t *en = vlib_buffer_get_current (bN);
+         u32 if_index = vnet_buffer (bN)->sw_if_index[VLIB_RX];
+         vnet_hw_interface_t *hw =
+           vnet_get_sup_hw_interface (smp->vnet_main, if_index);
+         if (hw)
+           if_index = hw->hw_if_index;
+         else
+           {
+             // TODO: can we get interfaces that have no hw interface?
+             // If so,  should we ignore the sample?
+           }
+
+         sflow_sample_t sample = {
+           .samplingN = sfwk->smpN,
+           .input_if_index = if_index,
+           .sampled_packet_size =
+             bN->current_length + bN->total_length_not_including_first_buffer,
+           .header_bytes = hdr
+         };
+
+         // TODO: what bit in the buffer can we set right here to indicate
+         // that this packet was sampled (and perhaps another bit to say if it
+         // was dropped or successfully enqueued)? That way we can check it
+         // below if the packet is traced, and indicate that in the trace
+         // output.
+
+         // TODO: we end up copying the header twice here. Consider allowing
+         // the enqueue to be just a little more complex.  Like this:
+         // if(!sflow_fifo_enqueue(&sfwk->fifo, &sample, en, hdr).
+         // With headerB==128 that would be memcpy(,,24) plus memcpy(,,128)
+         // instead of the memcpy(,,128) plus memcpy(,,24+256) that we do
+         // here. (We also know that it could be done as a multiple of 8
+         // (aligned) bytes because the sflow_sample_t fields are (6xu32) and
+         // the headerB setting is quantized to the nearest 32 bytes, so there
+         // may be ways to make it even easier for the compiler.)
+         sfwk->smpl++;
+         memcpy (sample.header, en, hdr);
+         if (PREDICT_FALSE (!sflow_fifo_enqueue (&sfwk->fifo, &sample)))
+           sfwk->drop++;
+
+         pkts -= sfwk->skip;
+         sfwk->pool += sfwk->skip;
+         sfwk->skip = sflow_next_random_skip (sfwk);
+       }
+      /* We took a sample (or several) from this frame-vector, but now we are
+        skipping the rest. */
+      sfwk->skip -= pkts;
+      sfwk->pool += pkts;
+    }
+
+  /* the rest of this is boilerplate code just to make sure
+   * that packets are passed on the same way as they would
+   * have been if this node were not enabled.
+   * TODO: If there is ever a way to do this in one step
+   * (i.e. pass on the whole frame-vector unchanged) then it
+   * might help performance.
+   */
+
+  next_index = node->cached_next_index;
+
+  while (n_left_from > 0)
+    {
+      u32 n_left_to_next;
+
+      vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
+
+      while (n_left_from >= 8 && n_left_to_next >= 4)
+       {
+         u32 next0 = SFLOW_NEXT_ETHERNET_INPUT;
+         u32 next1 = SFLOW_NEXT_ETHERNET_INPUT;
+         u32 next2 = SFLOW_NEXT_ETHERNET_INPUT;
+         u32 next3 = SFLOW_NEXT_ETHERNET_INPUT;
+         ethernet_header_t *en0, *en1, *en2, *en3;
+         u32 bi0, bi1, bi2, bi3;
+         vlib_buffer_t *b0, *b1, *b2, *b3;
+
+         /* Prefetch next iteration. */
+         {
+           vlib_buffer_t *p4, *p5, *p6, *p7;
+
+           p4 = vlib_get_buffer (vm, from[4]);
+           p5 = vlib_get_buffer (vm, from[5]);
+           p6 = vlib_get_buffer (vm, from[6]);
+           p7 = vlib_get_buffer (vm, from[7]);
+
+           vlib_prefetch_buffer_header (p4, LOAD);
+           vlib_prefetch_buffer_header (p5, LOAD);
+           vlib_prefetch_buffer_header (p6, LOAD);
+           vlib_prefetch_buffer_header (p7, LOAD);
+
+           CLIB_PREFETCH (p4->data, CLIB_CACHE_LINE_BYTES, STORE);
+           CLIB_PREFETCH (p5->data, CLIB_CACHE_LINE_BYTES, STORE);
+           CLIB_PREFETCH (p6->data, CLIB_CACHE_LINE_BYTES, STORE);
+           CLIB_PREFETCH (p7->data, CLIB_CACHE_LINE_BYTES, STORE);
+         }
+
+         /* speculatively enqueue b0-b3 to the current next frame */
+         to_next[0] = bi0 = from[0];
+         to_next[1] = bi1 = from[1];
+         to_next[2] = bi2 = from[2];
+         to_next[3] = bi3 = from[3];
+         from += 4;
+         to_next += 4;
+         n_left_from -= 4;
+         n_left_to_next -= 4;
+
+         b0 = vlib_get_buffer (vm, bi0);
+         b1 = vlib_get_buffer (vm, bi1);
+         b2 = vlib_get_buffer (vm, bi2);
+         b3 = vlib_get_buffer (vm, bi3);
+
+         /* do this to always pass on to the next node on feature arc */
+         vnet_feature_next (&next0, b0);
+         vnet_feature_next (&next1, b1);
+         vnet_feature_next (&next2, b2);
+         vnet_feature_next (&next3, b3);
+
+         ASSERT (b0->current_data == 0);
+         ASSERT (b1->current_data == 0);
+         ASSERT (b2->current_data == 0);
+         ASSERT (b3->current_data == 0);
+
+         en0 = vlib_buffer_get_current (b0);
+         en1 = vlib_buffer_get_current (b1);
+         en2 = vlib_buffer_get_current (b2);
+         en3 = vlib_buffer_get_current (b3);
+
+         if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
+           {
+             sflow_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
+             t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+             t->next_index = next0;
+             clib_memcpy (t->new_src_mac, en0->src_address,
+                          sizeof (t->new_src_mac));
+             clib_memcpy (t->new_dst_mac, en0->dst_address,
+                          sizeof (t->new_dst_mac));
+           }
+
+         if (PREDICT_FALSE (b1->flags & VLIB_BUFFER_IS_TRACED))
+           {
+             sflow_trace_t *t = vlib_add_trace (vm, node, b1, sizeof (*t));
+             t->sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_RX];
+             t->next_index = next1;
+             clib_memcpy (t->new_src_mac, en1->src_address,
+                          sizeof (t->new_src_mac));
+             clib_memcpy (t->new_dst_mac, en1->dst_address,
+                          sizeof (t->new_dst_mac));
+           }
+
+         if (PREDICT_FALSE (b2->flags & VLIB_BUFFER_IS_TRACED))
+           {
+             sflow_trace_t *t = vlib_add_trace (vm, node, b2, sizeof (*t));
+             t->sw_if_index = vnet_buffer (b2)->sw_if_index[VLIB_RX];
+             t->next_index = next2;
+             clib_memcpy (t->new_src_mac, en2->src_address,
+                          sizeof (t->new_src_mac));
+             clib_memcpy (t->new_dst_mac, en2->dst_address,
+                          sizeof (t->new_dst_mac));
+           }
+
+         if (PREDICT_FALSE (b3->flags & VLIB_BUFFER_IS_TRACED))
+           {
+             sflow_trace_t *t = vlib_add_trace (vm, node, b3, sizeof (*t));
+             t->sw_if_index = vnet_buffer (b3)->sw_if_index[VLIB_RX];
+             t->next_index = next3;
+             clib_memcpy (t->new_src_mac, en3->src_address,
+                          sizeof (t->new_src_mac));
+             clib_memcpy (t->new_dst_mac, en3->dst_address,
+                          sizeof (t->new_dst_mac));
+           }
+
+         /* verify speculative enqueues, maybe switch current next frame */
+         vlib_validate_buffer_enqueue_x4 (vm, node, next_index, to_next,
+                                          n_left_to_next, bi0, bi1, bi2, bi3,
+                                          next0, next1, next2, next3);
+       }
+
+      while (n_left_from > 0 && n_left_to_next > 0)
+       {
+         u32 bi0;
+         vlib_buffer_t *b0;
+         u32 next0 = SFLOW_NEXT_ETHERNET_INPUT;
+         ethernet_header_t *en0;
+
+         /* speculatively enqueue b0 to the current next frame */
+         bi0 = from[0];
+         to_next[0] = bi0;
+         from += 1;
+         to_next += 1;
+         n_left_from -= 1;
+         n_left_to_next -= 1;
+
+         b0 = vlib_get_buffer (vm, bi0);
+
+         /* do this to always pass on to the next node on feature arc */
+         vnet_feature_next (&next0, b0);
+
+         /*
+          * Direct from the driver, we should be at offset 0
+          * aka at &b0->data[0]
+          */
+         ASSERT (b0->current_data == 0);
+
+         en0 = vlib_buffer_get_current (b0);
+
+         if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
+           {
+             sflow_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
+             t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+             t->next_index = next0;
+             clib_memcpy (t->new_src_mac, en0->src_address,
+                          sizeof (t->new_src_mac));
+             clib_memcpy (t->new_dst_mac, en0->dst_address,
+                          sizeof (t->new_dst_mac));
+           }
+
+         /* verify speculative enqueue, maybe switch current next frame */
+         vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
+                                          n_left_to_next, bi0, next0);
+       }
+
+      vlib_put_next_frame (vm, node, next_index, n_left_to_next);
+    }
+  return frame->n_vectors;
+}
+
+#ifndef CLIB_MARCH_VARIANT
+VLIB_REGISTER_NODE (sflow_node) =
+{
+  .name = "sflow",
+  .vector_size = sizeof (u32),
+  .format_trace = format_sflow_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+  .n_errors = ARRAY_LEN(sflow_error_strings),
+  .error_strings = sflow_error_strings,
+  .n_next_nodes = SFLOW_N_NEXT,
+  /* edit / add dispositions here */
+  .next_nodes = {
+    [SFLOW_NEXT_ETHERNET_INPUT] = "ethernet-input",
+  },
+};
+#endif /* CLIB_MARCH_VARIANT */
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow.api b/src/plugins/sflow/sflow.api
new file mode 100644 (file)
index 0000000..e5f3300
--- /dev/null
@@ -0,0 +1,198 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file sflow.api
+ * @brief VPP control-plane API messages.
+ *
+ * This file defines VPP control-plane binary API messages which are generally
+ * called through a shared memory interface.
+ */
+
+/* Version and type recitations */
+
+option version = "0.1.0";
+import "vnet/interface_types.api";
+
+
+/** @brief API to enable / disable sflow
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param enable_disable - 1 to enable, 0 to disable the feature
+    @param hw_if_index - hardware interface handle
+*/
+
+autoreply define sflow_enable_disable {
+    /* Client identifier, set from api_main.my_client_index */
+    u32 client_index;
+
+    /* Arbitrary context, so client can match reply to request */
+    u32 context;
+
+    /* Enable / disable the feature */
+    bool enable_disable;
+
+    /* Interface handle */
+    vl_api_interface_index_t hw_if_index;
+};
+
+/** @brief API to get sflow sampling-rate
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+*/
+
+define sflow_sampling_rate_get {
+    /* Client identifier, set from api_main.my_client_index */
+    u32 client_index;
+
+    /* Arbitrary context, so client can match reply to request */
+    u32 context;
+};
+
+/** \brief API to get the sflow sampling-rate
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param sampling_N - the current 1-in-N sampling rate
+*/
+
+define sflow_sampling_rate_get_reply
+{
+  u32 context;
+  u32 sampling_N;
+  option in_progress;
+};
+
+/** @brief API to set sflow sampling-rate
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param sampling_N - 1-in-N random sampling rate
+*/
+
+autoreply define sflow_sampling_rate_set {
+    /* Client identifier, set from api_main.my_client_index */
+    u32 client_index;
+
+    /* Arbitrary context, so client can match reply to request */
+    u32 context;
+
+    /* Sampling_N */
+    u32 sampling_N [default=10000];
+};
+
+/** @brief API to set sflow polling-interval
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param polling_S - polling interval in seconds
+*/
+
+autoreply define sflow_polling_interval_set {
+    /* Client identifier, set from api_main.my_client_index */
+    u32 client_index;
+
+    /* Arbitrary context, so client can match reply to request */
+    u32 context;
+
+    /* Polling_S */
+    u32 polling_S [default=20];
+};
+
+/** @brief API to get sflow polling-interval
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+*/
+
+define sflow_polling_interval_get {
+    /* Client identifier, set from api_main.my_client_index */
+    u32 client_index;
+
+    /* Arbitrary context, so client can match reply to request */
+    u32 context;
+};
+
+/** \brief API to get the sflow polling-interval
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param polling_S - current polling interval in seconds
+*/
+
+define sflow_polling_interval_get_reply
+{
+  u32 context;
+  u32 polling_S;
+  option in_progress;
+};
+
+/** @brief API to set sflow header-bytes
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param header_B - max header length in bytes
+*/
+
+autoreply define sflow_header_bytes_set {
+    /* Client identifier, set from api_main.my_client_index */
+    u32 client_index;
+
+    /* Arbitrary context, so client can match reply to request */
+    u32 context;
+
+    /* header_B */
+    u32 header_B [default=128];
+};
+
+/** @brief API to get sflow header-bytes
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+*/
+
+define sflow_header_bytes_get {
+    /* Client identifier, set from api_main.my_client_index */
+    u32 client_index;
+
+    /* Arbitrary context, so client can match reply to request */
+    u32 context;
+};
+
+/** \brief API to get the sflow header-bytes
+    @param client_index - opaque cookie to identify the sender
+    @param context - sender context, to match reply w/ request
+    @param header_B - current maximum header length in bytes
+*/
+
+define sflow_header_bytes_get_reply
+{
+  u32 context;
+  u32 header_B;
+  option in_progress;
+};
+
+/** \brief Dump sflow enabled interface(s)
+    @param client_index - opaque cookie to identify the sender
+    @param hw_if_index - hw_if_index of a specific interface, or -1 (default)
+                         to return all sflow enabled interfaces
+*/
+define sflow_interface_dump
+{
+  u32 client_index;
+  u32 context;
+  vl_api_interface_index_t hw_if_index [default=0xffffffff];
+};
+
+/** \brief sflow enabled interface details
+*/
+define sflow_interface_details
+{
+  u32 context;
+  vl_api_interface_index_t hw_if_index;
+};
diff --git a/src/plugins/sflow/sflow.c b/src/plugins/sflow/sflow.c
new file mode 100644 (file)
index 0000000..ad8cf7c
--- /dev/null
@@ -0,0 +1,1050 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vnet/vnet.h>
+#include <vnet/plugin/plugin.h>
+#include <sflow/sflow.h>
+
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vpp/app/version.h>
+#include <stdbool.h>
+
+#include <sflow/sflow.api_enum.h>
+#include <sflow/sflow.api_types.h>
+#include <sflow/sflow_psample.h>
+
+#include <vpp-api/client/stat_client.h>
+#include <vlib/stats/stats.h>
+
+#define REPLY_MSG_ID_BASE smp->msg_id_base
+#include <vlibapi/api_helper_macros.h>
+
+sflow_main_t sflow_main;
+vlib_log_class_t sflow_logger;
+
+static void
+sflow_stat_segment_client_init (void)
+{
+  stat_client_main_t *scm = &stat_client_main;
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  uword size;
+
+  size = sm->memory_size ? sm->memory_size : STAT_SEGMENT_DEFAULT_SIZE;
+  scm->memory_size = size;
+  scm->shared_header = sm->shared_header;
+  scm->directory_vector =
+    stat_segment_adjust (scm, (void *) scm->shared_header->directory_vector);
+}
+
+static void
+update_counter_vector_simple (stat_segment_data_t *res,
+                             sflow_counters_t *ifCtrs, u32 hw_if_index)
+{
+  for (int th = 0; th < vec_len (res->simple_counter_vec); th++)
+    {
+      for (int intf = 0; intf < vec_len (res->simple_counter_vec[th]); intf++)
+       {
+         if (intf == hw_if_index)
+           {
+             u64 count = res->simple_counter_vec[th][intf];
+             if (count)
+               {
+                 if (strcmp (res->name, "/if/rx-error") == 0)
+                   ifCtrs->rx.errs += count;
+                 else if (strcmp (res->name, "/if/tx-error") == 0)
+                   ifCtrs->tx.errs += count;
+                 else if (strcmp (res->name, "/if/drops") == 0)
+                   ifCtrs->tx.drps += count;
+                 else if (strcmp (res->name, "/if/rx-miss") == 0 ||
+                          strcmp (res->name, "/if/rx-no-buf") == 0)
+                   ifCtrs->rx.drps += count;
+               }
+           }
+       }
+    }
+}
+
+static void
+update_counter_vector_combined (stat_segment_data_t *res,
+                               sflow_counters_t *ifCtrs, u32 hw_if_index)
+{
+  /* Accumulate per-thread combined (packets + bytes) counters for one
+   * interface into ifCtrs. Fix: the outer loop must be bounded by
+   * combined_counter_vec -- the original bounded it with
+   * simple_counter_vec, which can read out of bounds (or skip threads)
+   * whenever the two vectors differ in length. */
+  for (int th = 0; th < vec_len (res->combined_counter_vec); th++)
+    {
+      for (int intf = 0; intf < vec_len (res->combined_counter_vec[th]);
+          intf++)
+       {
+         if (intf == hw_if_index)
+           {
+             u64 pkts = res->combined_counter_vec[th][intf].packets;
+             u64 byts = res->combined_counter_vec[th][intf].bytes;
+             if (pkts || byts)
+               {
+                 if (strcmp (res->name, "/if/rx") == 0)
+                   {
+                     ifCtrs->rx.pkts += pkts;
+                     ifCtrs->rx.byts += byts;
+                   }
+                 else if (strcmp (res->name, "/if/tx") == 0)
+                   {
+                     ifCtrs->tx.byts += byts;
+                     ifCtrs->tx.pkts += pkts;
+                   }
+                 // TODO: do multicasts include broadcasts, or are they
+                 // counted separately? (test with traffic)
+                 else if (strcmp (res->name, "/if/rx-multicast") == 0)
+                   ifCtrs->rx.m_pkts += pkts;
+                 else if (strcmp (res->name, "/if/tx-multicast") == 0)
+                   ifCtrs->tx.m_pkts += pkts;
+                 else if (strcmp (res->name, "/if/rx-broadcast") == 0)
+                   ifCtrs->rx.b_pkts += pkts;
+                 else if (strcmp (res->name, "/if/tx-broadcast") == 0)
+                   ifCtrs->tx.b_pkts += pkts;
+               }
+           }
+       }
+    }
+}
+
+static int
+startsWith (u8 *str, char *prefix)
+{
+  if (str && prefix)
+    {
+      int len1 = vec_len (str);
+      int len2 = strlen (prefix);
+      if (len1 >= len2)
+       {
+         return (memcmp (str, prefix, len2) == 0);
+       }
+    }
+  return false;
+}
+
+static void
+update_counters (sflow_main_t *smp, sflow_per_interface_data_t *sfif)
+{
+  vnet_sw_interface_t *sw =
+    vnet_get_sw_interface (smp->vnet_main, sfif->sw_if_index);
+  vnet_hw_interface_t *hw =
+    vnet_get_hw_interface (smp->vnet_main, sfif->hw_if_index);
+  // This gives us a list of stat integers
+  u32 *stats = stat_segment_ls (NULL);
+  stat_segment_data_t *res = NULL;
+  // read vector of stat_segment_data_t objects
+retry:
+  res = stat_segment_dump (stats);
+  if (res == NULL)
+    {
+      /* Memory layout has changed */
+      if (stats)
+       vec_free (stats);
+      stats = stat_segment_ls (NULL);
+      goto retry;
+    }
+  sflow_counters_t ifCtrs = {};
+  // and accumulate the (per-thread) entries for this interface
+  for (int ii = 0; ii < vec_len (res); ii++)
+    {
+      switch (res[ii].type)
+       {
+       case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE:
+         update_counter_vector_simple (&res[ii], &ifCtrs, sfif->hw_if_index);
+         break;
+       case STAT_DIR_TYPE_COUNTER_VECTOR_COMBINED:
+         update_counter_vector_combined (&res[ii], &ifCtrs,
+                                         sfif->hw_if_index);
+         break;
+       case STAT_DIR_TYPE_SCALAR_INDEX:
+       case STAT_DIR_TYPE_NAME_VECTOR:
+       case STAT_DIR_TYPE_EMPTY:
+       default:
+         break;
+       }
+    }
+  stat_segment_data_free (res);
+  vec_free (stats);
+  // send the structure via netlink
+  SFLOWUSSpec spec = {};
+  SFLOWUSSpec_setMsgType (&spec, SFLOW_VPP_MSG_IF_COUNTERS);
+  SFLOWUSSpec_setAttr (&spec, SFLOW_VPP_ATTR_PORTNAME, hw->name,
+                      vec_len (hw->name));
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFINDEX, sfif->hw_if_index);
+  if (sfif->linux_if_index)
+    {
+      // We know the corresponding Linux ifIndex for this interface, so include
+      // that here.
+      SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_OSINDEX,
+                             sfif->linux_if_index);
+    }
+
+  // Report consistent with vpp-snmp-agent
+  u64 ifSpeed = (hw->link_speed == ~0) ? 0 : (hw->link_speed * 1000);
+  if (startsWith (hw->name, "loop") || startsWith (hw->name, "tap"))
+    ifSpeed = 1e9;
+
+  u32 ifType = startsWith (hw->name, "loop") ? 24 // softwareLoopback
+                                              :
+                                              6; // ethernetCsmacd
+
+  u32 ifDirection = (hw->flags & VNET_HW_INTERFACE_FLAG_HALF_DUPLEX) ?
+                     2 // half-duplex
+                     :
+                     1; // full-duplex
+
+  u32 operUp = (hw->flags & VNET_HW_INTERFACE_FLAG_LINK_UP) ? 1 : 0;
+  u32 adminUp = (sw->flags & VNET_SW_INTERFACE_FLAG_ADMIN_UP) ? 1 : 0;
+
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFSPEED, ifSpeed);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFTYPE, ifType);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFDIRECTION, ifDirection);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_OPER_UP, operUp);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_ADMIN_UP, adminUp);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_OCTETS, ifCtrs.rx.byts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_OCTETS, ifCtrs.tx.byts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_PKTS, ifCtrs.rx.pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_PKTS, ifCtrs.tx.pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_MCASTS, ifCtrs.rx.m_pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_MCASTS, ifCtrs.tx.m_pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_BCASTS, ifCtrs.rx.b_pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_BCASTS, ifCtrs.tx.b_pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_ERRORS, ifCtrs.rx.errs);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_ERRORS, ifCtrs.tx.errs);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_DISCARDS, ifCtrs.rx.drps);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_DISCARDS, ifCtrs.tx.drps);
+  SFLOWUSSpec_setAttr (&spec, SFLOW_VPP_ATTR_HW_ADDRESS, hw->hw_address,
+                      vec_len (hw->hw_address));
+  smp->unixsock_seq++;
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_SEQ, smp->unixsock_seq);
+  if (SFLOWUSSpec_send (&smp->sflow_usersock, &spec) < 0)
+    smp->csample_send_drops++;
+  smp->csample_send++;
+}
+
+static u32
+total_drops (sflow_main_t *smp)
+{
+  /* Aggregate the main-thread sendmsg drops with each worker's
+   * FIFO-full drops to give one pipeline-wide drop total. */
+  u32 drops = smp->psample_send_drops;
+  for (u32 tt = 0; tt < smp->total_threads; tt++)
+    drops += vec_elt_at_index (smp->per_thread_data, tt)->drop;
+  return drops;
+}
+
+static void
+send_sampling_status_info (sflow_main_t *smp)
+{
+  /* Emit the periodic status message on the USERSOCK channel: uptime
+   * (seconds) and total pipeline drops, tagged with a sequence number
+   * so the receiver (hsflowd) can detect lost updates. */
+  SFLOWUSSpec spec = {};
+  SFLOWUSSpec_setMsgType (&spec, SFLOW_VPP_MSG_STATUS);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_UPTIME_S, smp->now_mono_S);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_DROPS, total_drops (smp));
+  smp->unixsock_seq++;
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_SEQ, smp->unixsock_seq);
+  SFLOWUSSpec_send (&smp->sflow_usersock, &spec);
+}
+
+static int
+counter_polling_check (sflow_main_t *smp)
+{
+  // See if we should poll one or more interfaces this second.
+  // Interfaces are spread across the polling interval by hashing
+  // hw_if_index modulo pollingS, so they don't all fire in the same
+  // second.
+  // NOTE(review): assumes smp->pollingS != 0 (used as a modulus below);
+  // confirm callers can never leave it 0 while sampling is running.
+  int polled = 0;
+  for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++)
+    {
+      sflow_per_interface_data_t *sfif =
+       vec_elt_at_index (smp->per_interface_data, ii);
+      if (sfif && sfif->sflow_enabled &&
+         (sfif->polled == 0 // always send the first time
+          || (smp->now_mono_S % smp->pollingS) ==
+               (sfif->hw_if_index % smp->pollingS)))
+       {
+         update_counters (smp, sfif);
+         sfif->polled++;
+         polled++;
+       }
+    }
+  return polled;
+}
+
+/* Drain up to SFLOW_READ_BATCH samples per worker FIFO and forward each
+ * one as a PSAMPLE netlink message.  Returns the number of round-robin
+ * passes actually made. */
+static u32
+read_worker_fifos (sflow_main_t *smp)
+{
+  // Our maximum samples/sec is approximately:
+  // (SFLOW_READ_BATCH * smp->total_threads) / SFLOW_POLL_WAIT_S
+  // but it may also be affected by SFLOW_FIFO_DEPTH
+  // and whether vlib_process_wait_for_event_or_clock() really waits for
+  // SFLOW_POLL_WAIT_S every time.
+  // If there are too many samples then dropping them as early as possible
+  // (and as randomly as possible) is preferred, so SFLOW_FIFO_DEPTH should not
+  // be any bigger than it strictly needs to be. If there is a system
+  // bottleneck it could be in the PSAMPLE netlink channel, the hsflowd
+  // encoder, the UDP stack, the network path, the collector, or a faraway
+  // application. Any kind of "clipping" will result in systematic bias so we
+  // try to make this fair even when it's running hot. For example, we'll
+  // round-robin the thread FIFO dequeues here to make sure we give them equal
+  // access to the PSAMPLE channel. Another factor in sizing SFLOW_FIFO_DEPTH
+  // is to ensure that we can absorb a short-term line-rate burst without
+  // dropping samples. This implies a deeper FIFO. In fact it looks like this
+  // requirement ends up being the dominant one. A value of SFLOW_FIFO_DEPTH
+  // that will absorb an n-second line-rate burst may well result in the max
+  // sustainable samples/sec being higher than we really need. But it's not a
+  // serious problem because the samples are packed into UDP datagrams and the
+  // network or collector can drop those anywhere they need to. The protocol is
+  // designed to be tolerant to random packet-loss in transit. For example, 1%
+  // loss should just make it look like the sampling-rate setting was 1:10100
+  // instead of 1:10000.
+  u32 batch = 0;
+  for (; batch < SFLOW_READ_BATCH; batch++)
+    {
+      u32 psample_send = 0, psample_send_fail = 0;
+      // round-robin: take at most one sample from each worker per pass
+      for (u32 thread_index = 0; thread_index < smp->total_threads;
+          thread_index++)
+       {
+         sflow_per_thread_data_t *sfwk =
+           vec_elt_at_index (smp->per_thread_data, thread_index);
+         sflow_sample_t sample;
+         if (sflow_fifo_dequeue (&sfwk->fifo, &sample))
+           {
+             if (sample.header_bytes > smp->headerB)
+               {
+                 // We get here if header-bytes setting is reduced dynamically
+                 // and a sample that was in the FIFO appears with a larger
+                 // header.
+                 continue;
+               }
+             SFLOWPSSpec spec = {};
+             u32 ps_group = SFLOW_VPP_PSAMPLE_GROUP_INGRESS;
+             u32 seqNo = ++smp->psample_seq_ingress;
+             // TODO: is it always ethernet? (affects ifType counter as well)
+             u16 header_protocol = 1; /* ethernet */
+             SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_SAMPLE_GROUP,
+                                     ps_group);
+             SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_IIFINDEX,
+                                     sample.input_if_index);
+             SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_OIFINDEX,
+                                     sample.output_if_index);
+             SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_ORIGSIZE,
+                                     sample.sampled_packet_size);
+             SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_GROUP_SEQ,
+                                     seqNo);
+             SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_SAMPLE_RATE,
+                                     sample.samplingN);
+             SFLOWPSSpec_setAttr (&spec, SFLOWPS_PSAMPLE_ATTR_DATA,
+                                  sample.header, sample.header_bytes);
+             SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_PROTO,
+                                     header_protocol);
+             // counted as "sent" even if the send below fails; the failure
+             // is tracked separately in psample_send_fail
+             psample_send++;
+             if (SFLOWPSSpec_send (&smp->sflow_psample, &spec) < 0)
+               psample_send_fail++;
+           }
+       }
+      if (psample_send == 0)
+       {
+         // nothing found on FIFOs this time through, so terminate batch early
+         break;
+       }
+      else
+       {
+         vlib_node_increment_counter (smp->vlib_main, sflow_node.index,
+                                      SFLOW_ERROR_PSAMPLE_SEND, psample_send);
+         if (psample_send_fail > 0)
+           {
+             vlib_node_increment_counter (smp->vlib_main, sflow_node.index,
+                                          SFLOW_ERROR_PSAMPLE_SEND_FAIL,
+                                          psample_send_fail);
+             smp->psample_send_drops += psample_send_fail;
+           }
+       }
+    }
+  return batch;
+}
+
+static void
+read_node_counters (sflow_main_t *smp, sflow_err_ctrs_t *ctrs)
+{
+  /* Snapshot the per-worker counters into one aggregate structure. */
+  *ctrs = (sflow_err_ctrs_t){ 0 };
+  for (u32 tt = 0; tt < smp->total_threads; tt++)
+    {
+      sflow_per_thread_data_t *sfwk =
+       vec_elt_at_index (smp->per_thread_data, tt);
+      ctrs->counters[SFLOW_ERROR_PROCESSED] += sfwk->pool;
+      ctrs->counters[SFLOW_ERROR_SAMPLED] += sfwk->smpl;
+      ctrs->counters[SFLOW_ERROR_DROPPED] += sfwk->drop;
+    }
+}
+
+static void
+update_node_cntr (sflow_main_t *smp, sflow_err_ctrs_t *prev,
+                 sflow_err_ctrs_t *latest, sflow_error_t ee)
+{
+  /* Push only the increment since the previous snapshot into the
+   * graph-node counter (u32 subtraction handles wraparound). */
+  vlib_node_increment_counter (smp->vlib_main, sflow_node.index, ee,
+                              latest->counters[ee] - prev->counters[ee]);
+}
+
+static void
+update_node_counters (sflow_main_t *smp, sflow_err_ctrs_t *prev,
+                     sflow_err_ctrs_t *latest)
+{
+  /* Sync the three pipeline counters, then latch the new snapshot so
+   * the next call only reports the delta. */
+  static const sflow_error_t synced[] = { SFLOW_ERROR_PROCESSED,
+                                         SFLOW_ERROR_SAMPLED,
+                                         SFLOW_ERROR_DROPPED };
+  for (u32 i = 0; i < 3; i++)
+    update_node_cntr (smp, prev, latest, synced[i]);
+  *prev = *latest; // latch for next time
+}
+
+/* Main-thread process node body.  Wakes periodically to: advance the
+ * PSAMPLE channel state machine, once per second send status + poll due
+ * interface counters (and refresh linux if_index mappings via VAPI),
+ * drain the worker sample FIFOs, and sync graph-node counters.
+ * Never returns. */
+static uword
+sflow_process_samples (vlib_main_t *vm, vlib_node_runtime_t *node,
+                      vlib_frame_t *frame)
+{
+  sflow_main_t *smp = &sflow_main;
+  clib_time_t ctm;
+  clib_time_init (&ctm);
+
+  sflow_err_ctrs_t prev = {};
+  read_node_counters (smp, &prev);
+
+  while (1)
+    {
+
+      // We don't have anything for the main loop to edge-trigger on, so
+      // we are just asking to be called back regularly.  More regularly
+      // if sFlow is actually enabled...
+      f64 poll_wait_S = smp->running ? SFLOW_POLL_WAIT_S : 1.0;
+      vlib_process_wait_for_event_or_clock (vm, poll_wait_S);
+      if (!smp->running)
+       {
+         // Nothing to do. Just yield again.
+         continue;
+       }
+
+#ifdef SFLOW_USE_VAPI
+#ifdef SFLOW_TEST_HAMMER_VAPI
+      // stress-test hook: exercise the VAPI round-trip on every wakeup
+      sflow_vapi_check_for_linux_if_index_results (&smp->vac,
+                                                  smp->per_interface_data);
+      sflow_vapi_read_linux_if_index_numbers (&smp->vac,
+                                             smp->per_interface_data);
+#endif
+#endif
+
+      // PSAMPLE channel may need extra step (e.g. to learn family_id)
+      // before it is ready to send
+      EnumSFLOWPSState psState = SFLOWPS_state (&smp->sflow_psample);
+      if (psState != SFLOWPS_STATE_READY)
+       {
+         SFLOWPS_open_step (&smp->sflow_psample);
+       }
+
+      // What we want is a monotonic, per-second clock. This seems to do it
+      // because it is based on the CPU clock.
+      f64 tnow = clib_time_now (&ctm);
+      u32 tnow_S = (u32) tnow;
+      if (tnow_S != smp->now_mono_S)
+       {
+         // second rollover
+         smp->now_mono_S = tnow_S;
+#ifdef SFLOW_USE_VAPI
+         if (!smp->vac.vapi_unavailable)
+           {
+             // look up linux if_index numbers
+             sflow_vapi_check_for_linux_if_index_results (
+               &smp->vac, smp->per_interface_data);
+             // always ask once at startup, then every
+             // SFLOW_VAPI_POLL_INTERVAL seconds
+             if (smp->vapi_requests == 0 ||
+                 (tnow_S % SFLOW_VAPI_POLL_INTERVAL) == 0)
+               {
+                 if (sflow_vapi_read_linux_if_index_numbers (
+                       &smp->vac, smp->per_interface_data))
+                   {
+                     smp->vapi_requests++;
+                   }
+               }
+           }
+#endif
+         // send status info
+         send_sampling_status_info (smp);
+         // poll counters for interfaces that are due
+         counter_polling_check (smp);
+       }
+      // process samples from workers
+      read_worker_fifos (smp);
+
+      // and sync the global counters
+      sflow_err_ctrs_t latest = {};
+      read_node_counters (smp, &latest);
+      update_node_counters (smp, &prev, &latest);
+    }
+  return 0; // not reached
+}
+
+/* Register the sample-draining loop as a VPP process node (cooperative
+ * task on the main thread, not a graph node in the packet path). */
+VLIB_REGISTER_NODE (sflow_process_samples_node, static) = {
+  .function = sflow_process_samples,
+  .name = "sflow-process-samples",
+  .type = VLIB_NODE_TYPE_PROCESS,
+  .process_log2_n_stack_bytes = 17, // 128KB stack
+};
+
+/* Set up (or reset) the sampling context for each thread: the 1-in-N
+ * rate, a per-thread RNG seed and the first random skip.  Contexts that
+ * already have the current rate are left untouched. */
+static void
+sflow_set_worker_sampling_state (sflow_main_t *smp)
+{
+  vlib_thread_main_t *tm = &vlib_thread_main;
+  smp->total_threads = 1 + tm->n_threads;
+  /* vec_validate() takes the highest index to make valid, not a count:
+   * passing total_threads allocated one extra, unused per-thread slot
+   * (including its large embedded FIFO). */
+  vec_validate (smp->per_thread_data, smp->total_threads - 1);
+  for (u32 thread_index = 0; thread_index < smp->total_threads; thread_index++)
+    {
+      sflow_per_thread_data_t *sfwk =
+       vec_elt_at_index (smp->per_thread_data, thread_index);
+      if (sfwk->smpN != smp->samplingN)
+       {
+         sfwk->smpN = smp->samplingN;
+         /* per-thread seed keeps the random skip streams independent */
+         sfwk->seed = thread_index;
+         sfwk->skip = sflow_next_random_skip (sfwk);
+         SFLOW_DBG (
+           "sflow_set_worker_sampling_state: samplingN=%u thread=%u skip=%u",
+           smp->samplingN, thread_index, sfwk->skip);
+       }
+    }
+}
+
+/* Bring sampling up: reset state/sequence numbers and open both
+ * netlink channels.  Called only on a not-running -> running edge. */
+static void
+sflow_sampling_start (sflow_main_t *smp)
+{
+  SFLOW_INFO ("sflow_sampling_start");
+
+  smp->running = 1;
+  // Reset this clock so that the per-second netlink status updates
+  // will communicate a restart to hsflowd.  This helps to distinguish:
+  // (1) vpp restarted with sFlow off => no status updates (went quiet)
+  // (2) vpp restarted with default sFlow => status updates (starting again
+  // from 0)
+  smp->now_mono_S = 0;
+
+  // reset sequence numbers to indicate discontinuity
+  smp->psample_seq_ingress = 0;
+  smp->psample_seq_egress = 0;
+  smp->psample_send_drops = 0;
+
+  // reset vapi request count so that we make a request the first time
+  smp->vapi_requests = 0;
+
+  /* open PSAMPLE netlink channel for writing packet samples */
+  SFLOWPS_open (&smp->sflow_psample);
+  /* open USERSOCK netlink channel for writing counters */
+  SFLOWUS_open (&smp->sflow_usersock);
+  smp->sflow_usersock.group_id = SFLOW_NETLINK_USERSOCK_MULTICAST;
+  /* set up (or reset) sampling context for each thread */
+  sflow_set_worker_sampling_state (smp);
+}
+
+/* Tear sampling down: clear the running flag (workers stop enqueueing,
+ * the process loop goes idle) and close both netlink channels. */
+static void
+sflow_sampling_stop (sflow_main_t *smp)
+{
+  SFLOW_INFO ("sflow_sampling_stop");
+  smp->running = 0;
+  SFLOWPS_close (&smp->sflow_psample);
+  SFLOWUS_close (&smp->sflow_usersock);
+}
+
+static void
+sflow_sampling_start_stop (sflow_main_t *smp)
+{
+  /* Sampling should run iff a non-zero rate is configured and at least
+   * one interface is enabled; reconcile current state with that. */
+  int should_run = (smp->samplingN != 0 && smp->interfacesEnabled != 0);
+  if (should_run == smp->running)
+    return; // already in the desired state
+  if (should_run)
+    sflow_sampling_start (smp);
+  else
+    sflow_sampling_stop (smp);
+}
+
+int
+sflow_sampling_rate (sflow_main_t *smp, u32 samplingN)
+{
+  // TODO: this might be the right place to enforce the
+  // "2 significant" figures constraint so that per-interface
+  // sampling-rate settings can use HCF+sub-sampling efficiently.
+
+  /* A change between two non-zero rates while running is applied to the
+   * workers directly; anything else may be an on/off transition. */
+  int dynamic_change = (smp->running && smp->samplingN && samplingN);
+  smp->samplingN = samplingN;
+  if (dynamic_change)
+    sflow_set_worker_sampling_state (smp);
+  else
+    sflow_sampling_start_stop (smp);
+  return 0;
+}
+
+/* Set the counter-polling interval in seconds.
+ * Returns 0 on success, VNET_API_ERROR_INVALID_VALUE for interval 0:
+ * pollingS is used as a modulus in counter_polling_check(), so
+ * accepting 0 would cause a divide-by-zero on the main thread. */
+int
+sflow_polling_interval (sflow_main_t *smp, u32 pollingS)
+{
+  if (pollingS == 0)
+    return VNET_API_ERROR_INVALID_VALUE;
+  smp->pollingS = pollingS;
+  return 0;
+}
+
+int
+sflow_header_bytes (sflow_main_t *smp, u32 headerB)
+{
+  /* Round the requested header length up to a multiple of
+   * SFLOW_HEADER_BYTES_STEP (keeps the worker memcpy fast), then clamp
+   * it into [SFLOW_MIN_HEADER_BYTES, SFLOW_MAX_HEADER_BYTES].
+   * Always succeeds; warns if the value had to be adjusted. */
+  u32 rounded = ((headerB + SFLOW_HEADER_BYTES_STEP - 1) /
+                SFLOW_HEADER_BYTES_STEP) *
+               SFLOW_HEADER_BYTES_STEP;
+  if (rounded < SFLOW_MIN_HEADER_BYTES)
+    rounded = SFLOW_MIN_HEADER_BYTES;
+  else if (rounded > SFLOW_MAX_HEADER_BYTES)
+    rounded = SFLOW_MAX_HEADER_BYTES;
+  if (rounded != headerB)
+    SFLOW_WARN ("header_bytes rounded from %u to %u\n", headerB, rounded);
+  smp->headerB = rounded;
+  return 0;
+}
+
+/* Enable or disable sFlow sampling on one hardware interface.
+ * Returns 0, VNET_API_ERROR_INVALID_SW_IF_INDEX for a bad or
+ * non-physical interface, or VNET_API_ERROR_VALUE_EXIST for a
+ * redundant enable/disable. */
+int
+sflow_enable_disable (sflow_main_t *smp, u32 sw_if_index, int enable_disable)
+{
+  vnet_sw_interface_t *sw;
+
+  /* Utterly wrong? */
+  if (pool_is_free_index (smp->vnet_main->interface_main.sw_interfaces,
+                         sw_if_index))
+    return VNET_API_ERROR_INVALID_SW_IF_INDEX;
+
+  /* Not a physical port? */
+  sw = vnet_get_sw_interface (smp->vnet_main, sw_if_index);
+  if (sw->type != VNET_SW_INTERFACE_TYPE_HARDWARE)
+    return VNET_API_ERROR_INVALID_SW_IF_INDEX;
+
+  // note: vnet_interface_main_t has "fast lookup table" called
+  // hw_if_index_by_sw_if_index.
+  SFLOW_DBG ("sw_if_index=%u, sup_sw_if_index=%u, hw_if_index=%u\n",
+            sw->sw_if_index, sw->sup_sw_if_index, sw->hw_if_index);
+
+  // note: vnet_hw_interface_t has uword *bond_info
+  // (where 0=>none, ~0 => slave, other=>ptr to bitmap of slaves)
+
+  // per-interface state is indexed by hw_if_index
+  vec_validate (smp->per_interface_data, sw->hw_if_index);
+  sflow_per_interface_data_t *sfif =
+    vec_elt_at_index (smp->per_interface_data, sw->hw_if_index);
+  if (enable_disable == sfif->sflow_enabled)
+    {
+      // redundant enable or disable
+      return VNET_API_ERROR_VALUE_EXIST;
+    }
+  else
+    {
+      // OK, turn it on/off
+      sfif->sw_if_index = sw_if_index;
+      sfif->hw_if_index = sw->hw_if_index;
+      sfif->polled = 0; // force a counter poll on next second rollover
+      sfif->sflow_enabled = enable_disable;
+      vnet_feature_enable_disable ("device-input", "sflow", sw_if_index,
+                                  enable_disable, 0, 0);
+      smp->interfacesEnabled += (enable_disable) ? 1 : -1;
+    }
+
+  // may be an on/off transition for the whole plugin
+  sflow_sampling_start_stop (smp);
+  return 0;
+}
+
+/* CLI: "sflow sampling-rate <N>" -- set the global 1-in-N rate. */
+static clib_error_t *
+sflow_sampling_rate_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                               vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  u32 sampling_N = ~0;
+
+  int rv;
+
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (input, "%u", &sampling_N))
+       ;
+      else
+       break;
+    }
+
+  if (sampling_N == ~0)
+    return clib_error_return (0, "Please specify a sampling rate...");
+
+  rv = sflow_sampling_rate (smp, sampling_N);
+
+  switch (rv)
+    {
+    case 0:
+      break;
+    default:
+      /* fixed copy-paste: report the function that was actually called */
+      return clib_error_return (0, "sflow_sampling_rate returned %d", rv);
+    }
+  return 0;
+}
+
+/* CLI: "sflow polling-interval <S>" -- set the counter-poll period. */
+static clib_error_t *
+sflow_polling_interval_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                  vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  u32 polling_S = ~0;
+
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (!unformat (input, "%u", &polling_S))
+       break;
+    }
+
+  if (polling_S == ~0)
+    return clib_error_return (0, "Please specify a polling interval...");
+
+  int rv = sflow_polling_interval (smp, polling_S);
+  if (rv != 0)
+    return clib_error_return (0, "sflow_polling_interval returned %d", rv);
+  return 0;
+}
+
+/* CLI: "sflow header-bytes <B>" -- set the sampled-header length. */
+static clib_error_t *
+sflow_header_bytes_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                              vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  u32 header_B = ~0;
+
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (!unformat (input, "%u", &header_B))
+       break;
+    }
+
+  if (header_B == ~0)
+    return clib_error_return (0, "Please specify a header bytes limit...");
+
+  int rv = sflow_header_bytes (smp, header_B);
+  if (rv != 0)
+    return clib_error_return (0, "sflow_header_bytes returned %d", rv);
+  return 0;
+}
+
+/* CLI: "sflow enable-disable <interface-name> [disable]" -- toggle
+ * sampling on one physical interface. */
+static clib_error_t *
+sflow_enable_disable_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  u32 sw_if_index = ~0;
+  int enable_disable = 1; // enable unless "disable" keyword is present
+
+  int rv;
+
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (input, "disable"))
+       enable_disable = 0;
+      else if (unformat (input, "%U", unformat_vnet_sw_interface,
+                        smp->vnet_main, &sw_if_index))
+       ;
+      else
+       break;
+    }
+
+  if (sw_if_index == ~0)
+    return clib_error_return (0, "Please specify an interface...");
+
+  rv = sflow_enable_disable (smp, sw_if_index, enable_disable);
+
+  switch (rv)
+    {
+    case 0:
+      break;
+
+    case VNET_API_ERROR_INVALID_SW_IF_INDEX:
+      return clib_error_return (
+       0, "Invalid interface, only works on physical ports");
+      break;
+
+    case VNET_API_ERROR_UNIMPLEMENTED:
+      return clib_error_return (0,
+                               "Device driver doesn't support redirection");
+      break;
+
+    default:
+      return clib_error_return (0, "sflow_enable_disable returned %d", rv);
+    }
+  return 0;
+}
+
+/* CLI: "show sflow" -- print the current configuration (in replayable
+ * config form) followed by a status summary. */
+static clib_error_t *
+show_sflow_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                      vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  clib_error_t *error = NULL;
+  vlib_cli_output (vm, "sflow sampling-rate %u\n", smp->samplingN);
+  vlib_cli_output (vm, "sflow sampling-direction ingress\n");
+  vlib_cli_output (vm, "sflow polling-interval %u\n", smp->pollingS);
+  vlib_cli_output (vm, "sflow header-bytes %u\n", smp->headerB);
+  u32 itfs_enabled = 0;
+  for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++)
+    {
+      sflow_per_interface_data_t *sfif =
+       vec_elt_at_index (smp->per_interface_data, ii);
+      if (sfif && sfif->sflow_enabled)
+       {
+         itfs_enabled++;
+         vnet_hw_interface_t *hw =
+           vnet_get_hw_interface (smp->vnet_main, sfif->hw_if_index);
+         vlib_cli_output (vm, "sflow enable %s\n", (char *) hw->name);
+       }
+    }
+  vlib_cli_output (vm, "Status\n");
+  vlib_cli_output (vm, "  interfaces enabled: %u\n", itfs_enabled);
+  vlib_cli_output (vm, "  packet samples sent: %u\n",
+                  smp->psample_seq_ingress + smp->psample_seq_egress);
+  vlib_cli_output (vm, "  packet samples dropped: %u\n", total_drops (smp));
+  vlib_cli_output (vm, "  counter samples sent: %u\n", smp->csample_send);
+  vlib_cli_output (vm, "  counter samples dropped: %u\n",
+                  smp->csample_send_drops);
+  return error;
+}
+
+/* CLI command registrations for the sflow plugin. */
+VLIB_CLI_COMMAND (sflow_enable_disable_command, static) = {
+  .path = "sflow enable-disable",
+  .short_help = "sflow enable-disable <interface-name> [disable]",
+  .function = sflow_enable_disable_command_fn,
+};
+
+VLIB_CLI_COMMAND (sflow_sampling_rate_command, static) = {
+  .path = "sflow sampling-rate",
+  .short_help = "sflow sampling-rate <N>",
+  .function = sflow_sampling_rate_command_fn,
+};
+
+VLIB_CLI_COMMAND (sflow_polling_interval_command, static) = {
+  .path = "sflow polling-interval",
+  .short_help = "sflow polling-interval <S>",
+  .function = sflow_polling_interval_command_fn,
+};
+
+VLIB_CLI_COMMAND (sflow_header_bytes_command, static) = {
+  .path = "sflow header-bytes",
+  .short_help = "sflow header-bytes <B>",
+  .function = sflow_header_bytes_command_fn,
+};
+
+VLIB_CLI_COMMAND (show_sflow_command, static) = {
+  .path = "show sflow",
+  .short_help = "show sflow",
+  .function = show_sflow_command_fn,
+};
+
+/* API message handler */
+/* Binary API handler: enable/disable sampling on one hw interface. */
+static void
+vl_api_sflow_enable_disable_t_handler (vl_api_sflow_enable_disable_t *mp)
+{
+  vl_api_sflow_enable_disable_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+  int rv;
+
+  rv = sflow_enable_disable (smp, ntohl (mp->hw_if_index),
+                            (int) (mp->enable_disable));
+
+  REPLY_MACRO (VL_API_SFLOW_ENABLE_DISABLE_REPLY);
+}
+
+/* Binary API handler: set the global 1-in-N sampling rate. */
+static void
+vl_api_sflow_sampling_rate_set_t_handler (vl_api_sflow_sampling_rate_set_t *mp)
+{
+  vl_api_sflow_sampling_rate_set_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+  int rv;
+
+  rv = sflow_sampling_rate (smp, ntohl (mp->sampling_N));
+
+  REPLY_MACRO (VL_API_SFLOW_SAMPLING_RATE_SET_REPLY);
+}
+
+/* Binary API handler: report the current sampling rate. */
+static void
+vl_api_sflow_sampling_rate_get_t_handler (vl_api_sflow_sampling_rate_get_t *mp)
+{
+  vl_api_sflow_sampling_rate_get_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+
+  /* NOTE(review): ntohl applied to a host-order value; result is the
+   * same as htonl on all platforms, but htonl would read better. */
+  REPLY_MACRO_DETAILS2 (VL_API_SFLOW_SAMPLING_RATE_GET_REPLY,
+                       ({ rmp->sampling_N = ntohl (smp->samplingN); }));
+}
+
+/* Binary API handler: set the counter-polling interval (seconds). */
+static void
+vl_api_sflow_polling_interval_set_t_handler (
+  vl_api_sflow_polling_interval_set_t *mp)
+{
+  vl_api_sflow_polling_interval_set_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+  int rv;
+
+  rv = sflow_polling_interval (smp, ntohl (mp->polling_S));
+
+  REPLY_MACRO (VL_API_SFLOW_POLLING_INTERVAL_SET_REPLY);
+}
+
+/* Binary API handler: report the counter-polling interval. */
+static void
+vl_api_sflow_polling_interval_get_t_handler (
+  vl_api_sflow_polling_interval_get_t *mp)
+{
+  vl_api_sflow_polling_interval_get_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+
+  REPLY_MACRO_DETAILS2 (VL_API_SFLOW_POLLING_INTERVAL_GET_REPLY,
+                       ({ rmp->polling_S = ntohl (smp->pollingS); }));
+}
+
+/* Binary API handler: set the sampled-header length (bytes). */
+static void
+vl_api_sflow_header_bytes_set_t_handler (vl_api_sflow_header_bytes_set_t *mp)
+{
+  vl_api_sflow_header_bytes_set_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+  int rv;
+
+  rv = sflow_header_bytes (smp, ntohl (mp->header_B));
+
+  REPLY_MACRO (VL_API_SFLOW_HEADER_BYTES_SET_REPLY);
+}
+
+/* Binary API handler: report the sampled-header length. */
+static void
+vl_api_sflow_header_bytes_get_t_handler (vl_api_sflow_header_bytes_get_t *mp)
+{
+  vl_api_sflow_header_bytes_get_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+
+  REPLY_MACRO_DETAILS2 (VL_API_SFLOW_HEADER_BYTES_GET_REPLY,
+                       ({ rmp->header_B = ntohl (smp->headerB); }));
+}
+
+/* Send one "sflow_interface_details" message to the API client.
+ * Note: the am parameter is currently unused. */
+static void
+send_sflow_interface_details (vpe_api_main_t *am, vl_api_registration_t *reg,
+                             u32 context, const u32 hw_if_index)
+{
+  vl_api_sflow_interface_details_t *mp;
+  sflow_main_t *smp = &sflow_main;
+
+  mp = vl_msg_api_alloc_zero (sizeof (*mp));
+  mp->_vl_msg_id = ntohs (REPLY_MSG_ID_BASE + VL_API_SFLOW_INTERFACE_DETAILS);
+  mp->context = context; // echo client's context for correlation
+
+  mp->hw_if_index = htonl (hw_if_index);
+  vl_api_send_msg (reg, (u8 *) mp);
+}
+
+/* Binary API handler: dump sFlow-enabled interfaces.  hw_if_index ~0
+ * means "all"; otherwise only the matching interface is reported. */
+static void
+vl_api_sflow_interface_dump_t_handler (vl_api_sflow_interface_dump_t *mp)
+{
+  vpe_api_main_t *am = &vpe_api_main;
+  sflow_main_t *smp = &sflow_main;
+  vl_api_registration_t *reg;
+  u32 hw_if_index = ~0;
+
+  reg = vl_api_client_index_to_registration (mp->client_index);
+  if (!reg)
+    return; // client disappeared
+  hw_if_index = ntohl (mp->hw_if_index);
+
+  for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++)
+    {
+      sflow_per_interface_data_t *sfif =
+       vec_elt_at_index (smp->per_interface_data, ii);
+      if (sfif && sfif->sflow_enabled)
+       {
+         if (hw_if_index == ~0 || hw_if_index == sfif->hw_if_index)
+           {
+             send_sflow_interface_details (am, reg, mp->context,
+                                           sfif->hw_if_index);
+           }
+       }
+    }
+}
+
+/* API definitions */
+#include <sflow/sflow.api.c>
+
+/* Plugin init: register the log class, set configuration defaults,
+ * register API messages and connect to the stats segment. */
+static clib_error_t *
+sflow_init (vlib_main_t *vm)
+{
+  sflow_logger = vlib_log_register_class ("sflow", "all");
+
+  sflow_main_t *smp = &sflow_main;
+  clib_error_t *error = 0;
+
+  smp->vlib_main = vm;
+  smp->vnet_main = vnet_get_main ();
+
+  /* set default sampling-rate and polling-interval so that "enable" is all
+   * that is necessary */
+  smp->samplingN = SFLOW_DEFAULT_SAMPLING_N;
+  smp->pollingS = SFLOW_DEFAULT_POLLING_S;
+  smp->headerB = SFLOW_DEFAULT_HEADER_BYTES;
+
+  /* Add our API messages to the global name_crc hash table */
+  smp->msg_id_base = setup_message_id_table ();
+
+  /* access to counters - TODO: should this only happen on sflow enable? */
+  sflow_stat_segment_client_init ();
+  return error;
+}
+
+VLIB_INIT_FUNCTION (sflow_init);
+
+/* Insert the sflow sampling node into the device-input feature arc,
+ * ahead of ethernet-input so every received packet is seen. */
+VNET_FEATURE_INIT (sflow, static) = {
+  .arc_name = "device-input",
+  .node_name = "sflow",
+  .runs_before = VNET_FEATURES ("ethernet-input"),
+};
+
+VLIB_PLUGIN_REGISTER () = {
+  .version = VPP_BUILD_VER,
+  .description = "sFlow random packet sampling",
+};
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow.h b/src/plugins/sflow/sflow.h
new file mode 100644 (file)
index 0000000..18e2dab
--- /dev/null
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __included_sflow_h__
+#define __included_sflow_h__
+
+#include <vnet/vnet.h>
+#include <vnet/ip/ip.h>
+#include <vnet/ethernet/ethernet.h>
+
+#include <vppinfra/hash.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow_common.h>
+#include <sflow/sflow_vapi.h>
+#include <sflow/sflow_psample.h>
+#include <sflow/sflow_usersock.h>
+
+#define SFLOW_DEFAULT_SAMPLING_N   10000
+#define SFLOW_DEFAULT_POLLING_S           20
+#define SFLOW_DEFAULT_HEADER_BYTES 128
+#define SFLOW_MAX_HEADER_BYTES    256
+#define SFLOW_MIN_HEADER_BYTES    64
+#define SFLOW_HEADER_BYTES_STEP           32
+
+#define SFLOW_USE_VAPI
+#define SFLOW_FIFO_DEPTH  2048 // must be power of 2
+#define SFLOW_POLL_WAIT_S 0.001
+#define SFLOW_READ_BATCH  100
+
+// use PSAMPLE group number to distinguish VPP samples from others
+// (so that hsflowd will know to remap the ifIndex numbers if necessary)
+#define SFLOW_VPP_PSAMPLE_GROUP_INGRESS 3
+#define SFLOW_VPP_PSAMPLE_GROUP_EGRESS 4
+
+#define foreach_sflow_error                                                   \
+  _ (PROCESSED, "sflow packets processed")                                    \
+  _ (SAMPLED, "sflow packets sampled")                                        \
+  _ (DROPPED, "sflow packets dropped")                                        \
+  _ (PSAMPLE_SEND, "sflow PSAMPLE sent")                                      \
+  _ (PSAMPLE_SEND_FAIL, "sflow PSAMPLE send failed")
+
+/* Error/counter ids generated from foreach_sflow_error. */
+typedef enum
+{
+#define _(sym, str) SFLOW_ERROR_##sym,
+  foreach_sflow_error
+#undef _
+    SFLOW_N_ERROR,
+} sflow_error_t;
+
+/* Snapshot of all node counters, used to compute per-interval deltas. */
+typedef struct
+{
+  u32 counters[SFLOW_N_ERROR];
+} sflow_err_ctrs_t;
+
+/* packet sample, as copied from a worker into its FIFO */
+typedef struct
+{
+  u32 samplingN;           // sampling rate in force when sample was taken
+  u32 input_if_index;
+  u32 output_if_index;
+  u32 header_protocol;
+  u32 sampled_packet_size; // original packet length on the wire
+  u32 header_bytes;        // number of valid bytes in header[]
+  u8 header[SFLOW_MAX_HEADER_BYTES];
+} sflow_sample_t;
+
+// Define SPSC FIFO for sending samples worker-to-main.
+// (I did try to use VPP svm FIFO, but couldn't
+// understand why it was sometimes going wrong).
+typedef struct
+{
+  volatile u32 tx; // last slot written; can change under consumer's feet
+  volatile u32 rx; // last slot consumed; can change under producer's feet
+  sflow_sample_t samples[SFLOW_FIFO_DEPTH];
+} sflow_fifo_t;
+
+#define SFLOW_FIFO_NEXT(slot) ((slot + 1) & (SFLOW_FIFO_DEPTH - 1))
+/* Single-producer enqueue (worker thread).  Convention: tx is the last
+ * slot written and rx the last slot consumed; a sample is stored in
+ * samples[NEXT(tx)] and then published by advancing tx with release
+ * semantics.  Returns false (drop) when the FIFO is full. */
+static inline int
+sflow_fifo_enqueue (sflow_fifo_t *fifo, sflow_sample_t *sample)
+{
+  u32 curr_rx = clib_atomic_load_acq_n (&fifo->rx);
+  u32 curr_tx = fifo->tx; // only the producer ever advances tx
+  u32 next_tx = SFLOW_FIFO_NEXT (curr_tx);
+  if (next_tx == curr_rx)
+    return false; // full (one slot sacrificed to tell full from empty)
+  memcpy (&fifo->samples[next_tx], sample, sizeof (*sample));
+  clib_atomic_store_rel_n (&fifo->tx, next_tx);
+  return true;
+}
+
+/* Single-consumer dequeue (main thread).  rx is the last slot already
+ * consumed and tx the last slot the producer published, so the next
+ * sample to read lives at samples[NEXT(rx)].
+ * Bug fix: the previous code copied from samples[curr_rx] -- the slot
+ * that had already been consumed -- so the first dequeue returned an
+ * uninitialized slot and every subsequent read lagged the producer by
+ * one sample. */
+static inline int
+sflow_fifo_dequeue (sflow_fifo_t *fifo, sflow_sample_t *sample)
+{
+  u32 curr_rx = fifo->rx; // only the consumer ever advances rx
+  u32 curr_tx = clib_atomic_load_acq_n (&fifo->tx);
+  if (curr_rx == curr_tx)
+    return false; // empty
+  u32 next_rx = SFLOW_FIFO_NEXT (curr_rx);
+  memcpy (sample, &fifo->samples[next_rx], sizeof (*sample));
+  clib_atomic_store_rel_n (&fifo->rx, next_rx);
+  return true;
+}
+
+/* private to worker */
+typedef struct
+{
+  u32 smpN; // 1-in-N sampling rate this worker is applying
+  u32 skip; // packets remaining until the next sample is taken
+  u32 pool; // packets processed
+  u32 seed; // per-thread RNG state for the random skip
+  u32 smpl; // packets sampled
+  u32 drop; // samples dropped because the FIFO was full
+  // keep the (large) FIFO on its own cache line, away from the counters
+  CLIB_CACHE_LINE_ALIGN_MARK (_fifo);
+  sflow_fifo_t fifo;
+} sflow_per_thread_data_t;
+
+/* Global plugin state (single instance: sflow_main). */
+typedef struct
+{
+  /* API message ID base */
+  u16 msg_id_base;
+
+  /* convenience */
+  vlib_main_t *vlib_main;
+  vnet_main_t *vnet_main;
+  ethernet_main_t *ethernet_main;
+
+  /* sampling state */
+  u32 samplingN; // global 1-in-N rate (0 => sampling off)
+  u32 pollingS;  // counter-poll interval, seconds
+  u32 headerB;   // sampled header length, bytes
+  u32 total_threads;
+  sflow_per_interface_data_t *per_interface_data; // vec, by hw_if_index
+  sflow_per_thread_data_t *per_thread_data;      // vec, by thread index
+
+  /* psample channel (packet samples) */
+  SFLOWPS sflow_psample;
+  /* usersock channel (periodic counters) */
+  SFLOWUS sflow_usersock;
+#define SFLOW_NETLINK_USERSOCK_MULTICAST 29
+  /* dropmon channel (packet drops) */
+  // SFLOWDM sflow_dropmon;
+
+  /* sample-processing */
+  u32 now_mono_S; // monotonic seconds; reset to 0 on sampling (re)start
+
+  /* running control */
+  int running;
+  u32 interfacesEnabled;
+
+  /* main-thread counters */
+  u32 psample_seq_ingress;
+  u32 psample_seq_egress;
+  u32 psample_send_drops;
+  u32 csample_send;
+  u32 csample_send_drops;
+  u32 unixsock_seq;
+
+  /* vapi query helper thread (transient) */
+  CLIB_CACHE_LINE_ALIGN_MARK (_vapi);
+  sflow_vapi_client_t vac;
+  int vapi_requests;
+
+} sflow_main_t;
+
+extern sflow_main_t sflow_main;
+
+extern vlib_node_registration_t sflow_node;
+
+/* Draw the number of packets to skip before taking the next sample:
+ * uniform over [1, 2N-1], so the mean is N and the long-run rate is
+ * 1-in-N without the periodicity of fixed-interval sampling. */
+static inline u32
+sflow_next_random_skip (sflow_per_thread_data_t *sfwk)
+{
+  /* skip==1 means "take the next packet" so this
+     fn must never return 0 */
+  if (sfwk->smpN <= 1)
+    return 1;
+  u32 lim = (2 * sfwk->smpN) - 1;
+  // modulo bias is negligible since lim << 2^32
+  return (random_u32 (&sfwk->seed) % lim) + 1;
+}
+
+#endif /* __included_sflow_h__ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow.rst b/src/plugins/sflow/sflow.rst
new file mode 100644 (file)
index 0000000..f9c1848
--- /dev/null
@@ -0,0 +1,61 @@
+.. _Sflow_agent:
+
+.. toctree::
+
+SFlow Monitoring Agent
+======================
+
+Overview
+________
+
+This plugin implements the random packet-sampling and interface
+telemetry streaming required to support standard sFlow export
+on Linux platforms. The overhead incurred by this monitoring is
+minimal, so that detailed, real-time traffic analysis can be
+achieved even under high load conditions, with visibility into
+any fields that appear in the packet headers. If the VPP linux-cp
+plugin is running then interfaces will be mapped to their
+equivalent Linux tap ports.
+
+Example Configuration
+_____________________
+
+::
+    sflow sampling-rate 10000
+    sflow polling-interval 20
+    sflow header-bytes 128
+    sflow enable GigabitEthernet0/8/0
+    sflow enable GigabitEthernet0/9/0
+    sflow enable GigabitEthernet0/a/0
+    ...
+    sflow enable GigabitEthernet0/a/0 disable
+
+Detailed notes
+______________
+
+Each VPP worker that has at least one enabled interface will create a
+FIFO and enqueue samples to it from the interfaces it is servicing.
+There is a process running in the main thread that will
+dequeue the FIFOs periodically. If the FIFO is full, the worker will
+drop samples, which helps ensure that (a) the main thread is not
+overloaded with samples and (b) that individual workers and interfaces,
+even when under high load, can't crowd out other interfaces and workers.
+
+You can change the sampling-rate at runtime, but keep in mind that
+it is a global variable that applies to workers, not interfaces.
+This means that (1) all workers will sample at the same rate, and (2)
+if there are multiple interfaces assigned to a worker, they'll share
+the sampling rate which will undershoot, and similarly (3) if there
+are multiple RX queues assigned to more than one worker, the effective
+sampling rate will overshoot.
+
+External Dependencies
+_____________________
+
+This plugin writes packet samples to the standard Linux netlink PSAMPLE
+channel, so the kernel psample module must be loaded with modprobe or
+insmod. As such, this plugin only works for Linux environments.
+
+It also shares periodic interface counter samples via netlink USERSOCK.
+The host-sflow daemon, hsflowd, at https://sflow.net is one example of
+a tool that will consume this feed and emit standard sFlow v5.
diff --git a/src/plugins/sflow/sflow_common.h b/src/plugins/sflow/sflow_common.h
new file mode 100644 (file)
index 0000000..1d5b908
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __included_sflow_common_h__
+#define __included_sflow_common_h__
+
+#define SFLOW_USE_VAPI
+
+extern vlib_log_class_t sflow_logger;
+#define SFLOW_DBG(...)   vlib_log_debug (sflow_logger, __VA_ARGS__);
+#define SFLOW_INFO(...)          vlib_log_info (sflow_logger, __VA_ARGS__);
+#define SFLOW_NOTICE(...) vlib_log_notice (sflow_logger, __VA_ARGS__);
+#define SFLOW_WARN(...)          vlib_log_warn (sflow_logger, __VA_ARGS__);
+#define SFLOW_ERR(...)   vlib_log_err (sflow_logger, __VA_ARGS__);
+
+typedef struct
+{
+  u32 sw_if_index;
+  u32 hw_if_index;
+  u32 linux_if_index;
+  u32 polled;
+  int sflow_enabled;
+} sflow_per_interface_data_t;
+
+#endif /* __included_sflow_common_h__ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow_psample.c b/src/plugins/sflow/sflow_psample.c
new file mode 100644 (file)
index 0000000..0e4fcfb
--- /dev/null
@@ -0,0 +1,523 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined(__cplusplus)
+extern "C"
+{
+#endif
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+#include <fcntl.h>
+#include <asm/types.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include <linux/genetlink.h>
+#include <linux/psample.h>
+#include <signal.h>
+#include <ctype.h>
+
+#include <sflow/sflow_psample.h>
+
+  /*_________________---------------------------__________________
+    _________________       fcntl utils         __________________
+    -----------------___________________________------------------
+  */
+
+  static void
+  setNonBlocking (int fd)
+  {
+    /* Put the socket in non-blocking mode.  Check the F_GETFL result
+     * first: on failure it returns -1, and OR-ing O_NONBLOCK into -1
+     * would write garbage flags back to the descriptor.
+     */
+    int fdFlags = fcntl (fd, F_GETFL);
+    if (fdFlags < 0)
+      {
+       SFLOW_ERR ("fcntl(F_GETFL) failed: %s\n", strerror (errno));
+       return;
+      }
+    fdFlags |= O_NONBLOCK;
+    if (fcntl (fd, F_SETFL, fdFlags) < 0)
+      {
+       SFLOW_ERR ("fcntl(O_NONBLOCK) failed: %s\n", strerror (errno));
+      }
+  }
+
+  static void
+  setCloseOnExec (int fd)
+  {
+    /* Set FD_CLOEXEC so the fd is not inherited, e.g. when we fork a
+     * script.  Check the F_GETFD result first: on failure it returns
+     * -1, and OR-ing FD_CLOEXEC into -1 would write garbage flags.
+     */
+    int fdFlags = fcntl (fd, F_GETFD);
+    if (fdFlags < 0)
+      {
+       SFLOW_ERR ("fcntl(F_GETFD) failed: %s\n", strerror (errno));
+       return;
+      }
+    fdFlags |= FD_CLOEXEC;
+    if (fcntl (fd, F_SETFD, fdFlags) < 0)
+      {
+       SFLOW_ERR ("fcntl(F_SETFD=FD_CLOEXEC) failed: %s\n", strerror (errno));
+      }
+  }
+
+  static int
+  setSendBuffer (int fd, int requested)
+  {
+    /* Grow the socket send buffer to at least "requested" bytes (never
+     * shrink it) and return the size actually in effect.
+     */
+    int txbuf = 0;
+    socklen_t txbufsiz = sizeof (txbuf);
+    if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, &txbufsiz) < 0)
+      {
+       SFLOW_ERR ("getsockopt(SO_SNDBUF) failed: %s", strerror (errno));
+      }
+    if (txbuf < requested)
+      {
+       txbuf = requested;
+       /* (message fixed: the option is SO_SNDBUF, not "SO_TXBUF") */
+       if (setsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, sizeof (txbuf)) < 0)
+         {
+           SFLOW_WARN ("setsockopt(SO_SNDBUF=%d) failed: %s", requested,
+                       strerror (errno));
+         }
+       // see what we actually got
+       txbufsiz = sizeof (txbuf);
+       if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, &txbufsiz) < 0)
+         {
+           SFLOW_ERR ("getsockopt(SO_SNDBUF) failed: %s", strerror (errno));
+         }
+      }
+    return txbuf;
+  }
+
+  /*_________________---------------------------__________________
+    _________________        generic_pid        __________________
+    -----------------___________________________------------------
+    choose a 32-bit id that is likely to be unique even if more
+    than one module in this process wants to bind a netlink socket
+  */
+
+  static u32
+  generic_pid (u32 mod_id)
+  {
+    /* Combine the module id with the process id so several modules in
+     * this process can each bind their own netlink socket.
+     * NOTE(review): assumes getpid() fits in 16 bits; on systems with a
+     * larger pid_max the pid bits overlap the mod_id bits -- confirm
+     * this collision risk is acceptable.
+     */
+    return (mod_id << 16) | getpid ();
+  }
+
+  /*_________________---------------------------__________________
+    _________________        generic_open       __________________
+    -----------------___________________________------------------
+  */
+
+  static int
+  generic_open (u32 mod_id)
+  {
+    /* Open a NETLINK_GENERIC socket, bind it to a (hopefully) unique
+     * 32-bit id, and make it non-blocking and close-on-exec.  Returns
+     * the fd, or -1 if the socket could not be created.
+     */
+    int nl_sock = socket (AF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
+    if (nl_sock < 0)
+      {
+       SFLOW_ERR ("nl_sock open failed: %s\n", strerror (errno));
+       return -1;
+      }
+    // bind to a suitable id
+    struct sockaddr_nl sa = { .nl_family = AF_NETLINK,
+                             .nl_pid = generic_pid (mod_id) };
+    /* NOTE(review): a bind() failure is only logged and the fd is still
+     * returned -- presumably the kernel auto-binds on first send;
+     * confirm that fallback is intended.
+     */
+    if (bind (nl_sock, (struct sockaddr *) &sa, sizeof (sa)) < 0)
+      SFLOW_ERR ("generic_open: bind failed: %s\n", strerror (errno));
+    setNonBlocking (nl_sock);
+    setCloseOnExec (nl_sock);
+    return nl_sock;
+  }
+
+  /*_________________---------------------------__________________
+    _________________       generic_send        __________________
+    -----------------___________________________------------------
+  */
+
+  static int
+  generic_send (int sockfd, u32 mod_id, int type, int cmd, int req_type,
+               void *req, int req_len, int req_footprint, u32 seqNo)
+  {
+    /* Send one generic-netlink request carrying a single attribute.
+     * req_len is the attribute payload length; req_footprint is that
+     * length rounded up to netlink alignment (the number of payload
+     * bytes actually placed on the wire).  Returns sendmsg()'s result.
+     */
+    struct nlmsghdr nlh = {};
+    struct genlmsghdr ge = {};
+    struct nlattr attr = {};
+
+    attr.nla_len = sizeof (attr) + req_len;
+    attr.nla_type = req_type;
+
+    ge.cmd = cmd;
+    ge.version = 1;
+
+    nlh.nlmsg_len = NLMSG_LENGTH (req_footprint + sizeof (attr) + sizeof (ge));
+    nlh.nlmsg_flags = NLM_F_REQUEST | NLM_F_ACK;
+    nlh.nlmsg_type = type;
+    nlh.nlmsg_seq = seqNo;
+    nlh.nlmsg_pid = generic_pid (mod_id);
+
+    /* gather: netlink header, genetlink header, attr header, payload */
+    struct iovec iov[4] = { { .iov_base = &nlh, .iov_len = sizeof (nlh) },
+                           { .iov_base = &ge, .iov_len = sizeof (ge) },
+                           { .iov_base = &attr, .iov_len = sizeof (attr) },
+                           { .iov_base = req, .iov_len = req_footprint } };
+
+    struct sockaddr_nl sa = { .nl_family = AF_NETLINK };
+    struct msghdr msg = { .msg_name = &sa,
+                         .msg_namelen = sizeof (sa),
+                         .msg_iov = iov,
+                         .msg_iovlen = 4 };
+    return sendmsg (sockfd, &msg, 0);
+  }
+
+  /*_________________---------------------------__________________
+    _________________    getFamily_PSAMPLE      __________________
+    -----------------___________________________------------------
+  */
+
+  static void
+  getFamily_PSAMPLE (SFLOWPS *pst)
+  {
+    /* Ask the genetlink controller to resolve the "psample" family name
+     * to its numeric family id (the reply is parsed by
+     * processNetlink_GENERIC), then move to WAIT_FAMILY state.
+     */
+#define SFLOWPS_FAM_LEN              sizeof (PSAMPLE_GENL_NAME)
+#define SFLOWPS_FAM_FOOTPRINT NLMSG_ALIGN (SFLOWPS_FAM_LEN)
+    char fam_name[SFLOWPS_FAM_FOOTPRINT] = {};
+    memcpy (fam_name, PSAMPLE_GENL_NAME, SFLOWPS_FAM_LEN);
+    generic_send (pst->nl_sock, pst->id, GENL_ID_CTRL, CTRL_CMD_GETFAMILY,
+                 CTRL_ATTR_FAMILY_NAME, fam_name, SFLOWPS_FAM_LEN,
+                 SFLOWPS_FAM_FOOTPRINT, ++pst->nl_seq);
+    pst->state = SFLOWPS_STATE_WAIT_FAMILY;
+  }
+
+  /*_________________---------------------------__________________
+    _________________  processNetlink_GENERIC   __________________
+    -----------------___________________________------------------
+  */
+
+  static void
+  processNetlink_GENERIC (SFLOWPS *pst, struct nlmsghdr *nlh)
+  {
+    /* Parse the CTRL_CMD_GETFAMILY reply: walk the top-level attributes
+     * to learn the psample family id, and walk the nested
+     * CTRL_ATTR_MCAST_GROUPS attribute to learn the id of the
+     * PSAMPLE_NL_MCGRP_SAMPLE_NAME multicast group.  Once both are
+     * known the channel is READY.
+     */
+    char *msg = (char *) NLMSG_DATA (nlh);
+    int msglen = nlh->nlmsg_len - NLMSG_HDRLEN;
+    struct genlmsghdr *genl = (struct genlmsghdr *) msg;
+    SFLOW_DBG ("generic netlink CMD = %u\n", genl->cmd);
+
+    /* level 1: top-level attributes of the reply */
+    for (int offset = GENL_HDRLEN; offset < msglen;)
+      {
+       struct nlattr *attr = (struct nlattr *) (msg + offset);
+       if (attr->nla_len == 0 || (attr->nla_len + offset) > msglen)
+         {
+           SFLOW_ERR ("processNetlink_GENERIC attr parse error\n");
+           break; // attr parse error
+         }
+       char *attr_datap = (char *) attr + NLA_HDRLEN;
+       switch (attr->nla_type)
+         {
+         case CTRL_ATTR_VERSION:
+           pst->genetlink_version = *(u32 *) attr_datap;
+           break;
+         case CTRL_ATTR_FAMILY_ID:
+           pst->family_id = *(u16 *) attr_datap;
+           SFLOW_DBG ("generic family id: %u\n", pst->family_id);
+           break;
+         case CTRL_ATTR_FAMILY_NAME:
+           SFLOW_DBG ("generic family name: %s\n", attr_datap);
+           break;
+         case CTRL_ATTR_MCAST_GROUPS:
+           /* level 2: one nested attribute per multicast group */
+           for (int grp_offset = NLA_HDRLEN; grp_offset < attr->nla_len;)
+             {
+               struct nlattr *grp_attr =
+                 (struct nlattr *) (msg + offset + grp_offset);
+               if (grp_attr->nla_len == 0 ||
+                   (grp_attr->nla_len + grp_offset) > attr->nla_len)
+                 {
+                   SFLOW_ERR (
+                     "processNetlink_GENERIC grp_attr parse error\n");
+                   break;
+                 }
+               char *grp_name = NULL;
+               u32 grp_id = 0;
+               /* level 3: the group's name and id fields */
+               for (int gf_offset = NLA_HDRLEN;
+                    gf_offset < grp_attr->nla_len;)
+                 {
+                   struct nlattr *gf_attr =
+                     (struct nlattr *) (msg + offset + grp_offset +
+                                        gf_offset);
+                   if (gf_attr->nla_len == 0 ||
+                       (gf_attr->nla_len + gf_offset) > grp_attr->nla_len)
+                     {
+                       SFLOW_ERR (
+                         "processNetlink_GENERIC gf_attr parse error\n");
+                       break;
+                     }
+                   char *grp_attr_datap = (char *) gf_attr + NLA_HDRLEN;
+                   switch (gf_attr->nla_type)
+                     {
+                     case CTRL_ATTR_MCAST_GRP_NAME:
+                       grp_name = grp_attr_datap;
+                       SFLOW_DBG ("psample multicast group: %s\n", grp_name);
+                       break;
+                     case CTRL_ATTR_MCAST_GRP_ID:
+                       grp_id = *(u32 *) grp_attr_datap;
+                       SFLOW_DBG ("psample multicast group id: %u\n", grp_id);
+                       break;
+                     }
+                   gf_offset += NLMSG_ALIGN (gf_attr->nla_len);
+                 }
+               /* latch the first match; group_id==0 means "not found yet" */
+               if (pst->group_id == 0 && grp_name && grp_id &&
+                   !strcmp (grp_name, PSAMPLE_NL_MCGRP_SAMPLE_NAME))
+                 {
+                   SFLOW_DBG ("psample found group %s=%u\n", grp_name,
+                              grp_id);
+                   pst->group_id = grp_id;
+                   // We don't need to join the group if we are only sending
+                   // to it.
+                 }
+
+               grp_offset += NLMSG_ALIGN (grp_attr->nla_len);
+             }
+           break;
+         default:
+           SFLOW_DBG ("psample attr type: %u (nested=%u) len: %u\n",
+                      attr->nla_type, attr->nla_type & NLA_F_NESTED,
+                      attr->nla_len);
+           break;
+         }
+       offset += NLMSG_ALIGN (attr->nla_len);
+      }
+    if (pst->family_id && pst->group_id)
+      {
+       SFLOW_DBG ("psample state->READY\n");
+       pst->state = SFLOWPS_STATE_READY;
+      }
+  }
+
+  // TODO: we can take out the fns for reading PSAMPLE here
+
+  /*_________________---------------------------__________________
+    _________________      processNetlink       __________________
+    -----------------___________________________------------------
+  */
+
+  static void
+  processNetlink (SFLOWPS *pst, struct nlmsghdr *nlh)
+  {
+    /* Dispatch one received netlink message.  Replies from the
+     * genetlink controller carry message type GENL_ID_CTRL; the
+     * previous test against NETLINK_GENERIC (a protocol number, not a
+     * message type) only worked because both constants are 16.
+     */
+    if (nlh->nlmsg_type == GENL_ID_CTRL)
+      {
+       processNetlink_GENERIC (pst, nlh);
+      }
+    else if (nlh->nlmsg_type == pst->family_id)
+      {
+       // We are write-only, don't need to read these.
+      }
+  }
+
+  /*_________________---------------------------__________________
+    _________________   readNetlink_PSAMPLE     __________________
+    -----------------___________________________------------------
+  */
+
+  static void
+  readNetlink_PSAMPLE (SFLOWPS *pst, int fd)
+  {
+    /* Drain one recv() of netlink replies: log ACKs and errors, and
+     * hand the rest to processNetlink() (which learns the psample
+     * family and multicast-group ids).
+     */
+    uint8_t recv_buf[SFLOWPS_PSAMPLE_READNL_RCV_BUF];
+    int numbytes = recv (fd, recv_buf, sizeof (recv_buf), 0);
+    if (numbytes <= 0)
+      {
+       SFLOW_ERR ("readNetlink_PSAMPLE returned %d : %s\n", numbytes,
+                  strerror (errno));
+       return;
+      }
+    struct nlmsghdr *nlh = (struct nlmsghdr *) recv_buf;
+    while (NLMSG_OK (nlh, numbytes))
+      {
+       if (nlh->nlmsg_type == NLMSG_DONE)
+         break;
+       if (nlh->nlmsg_type == NLMSG_ERROR)
+         {
+           struct nlmsgerr *err_msg = (struct nlmsgerr *) NLMSG_DATA (nlh);
+           if (err_msg->error == 0)
+             {
+               /* error == 0 is how netlink signals a plain ACK */
+               SFLOW_DBG ("received Netlink ACK\n");
+             }
+           else
+             {
+               /* kernel reports errors as negative errno values */
+               SFLOW_ERR ("error in netlink message: %d : %s\n",
+                          err_msg->error, strerror (-err_msg->error));
+             }
+           return;
+         }
+       processNetlink (pst, nlh);
+       nlh = NLMSG_NEXT (nlh, numbytes);
+      }
+  }
+
+  /*_________________---------------------------__________________
+    _________________       SFLOWPS_open        __________________
+    -----------------___________________________------------------
+  */
+
+  bool
+  SFLOWPS_open (SFLOWPS *pst)
+  {
+    /* Open the netlink socket (idempotent -- a no-op if already open)
+     * and kick off discovery of the psample family.  Returns true if
+     * the socket is open.
+     */
+    if (pst->nl_sock == 0)
+      {
+       pst->nl_sock = generic_open (pst->id);
+       if (pst->nl_sock > 0)
+         {
+           pst->state = SFLOWPS_STATE_OPEN;
+           setSendBuffer (pst->nl_sock, SFLOWPS_PSAMPLE_READNL_SND_BUF);
+           getFamily_PSAMPLE (pst);
+         }
+      }
+    return (pst->nl_sock > 0);
+  }
+
+  /*_________________---------------------------__________________
+    _________________       SFLOWPS_close       __________________
+    -----------------___________________________------------------
+  */
+
+  bool
+  SFLOWPS_close (SFLOWPS *pst)
+  {
+    /* Close the netlink socket.  Returns true on success; on failure
+     * (or if the socket was never open) the fd is left untouched and
+     * false is returned.
+     */
+    if (pst->nl_sock <= 0)
+      return false;
+    int err = close (pst->nl_sock);
+    if (err != 0)
+      {
+       SFLOW_ERR ("SFLOWPS_close: returned %d : %s\n", err,
+                  strerror (errno));
+       return false;
+      }
+    pst->nl_sock = 0;
+    return true;
+  }
+
+  /*_________________---------------------------__________________
+    _________________       SFLOWPS_state       __________________
+    -----------------___________________________------------------
+  */
+
+  EnumSFLOWPSState
+  SFLOWPS_state (SFLOWPS *pst)
+  {
+    /* Report the current state of the PSAMPLE channel. */
+    return pst->state;
+  }
+
+  /*_________________---------------------------__________________
+    _________________    SFLOWPS_open_step      __________________
+    -----------------___________________________------------------
+  */
+
+  EnumSFLOWPSState
+  SFLOWPS_open_step (SFLOWPS *pst)
+  {
+    /* Drive the non-blocking open/discovery state machine one step:
+     * INIT -> OPEN (socket created) -> WAIT_FAMILY (GETFAMILY request
+     * sent) -> READY (family id and group id learned).  Call repeatedly
+     * until it returns SFLOWPS_STATE_READY.
+     */
+    switch (pst->state)
+      {
+      case SFLOWPS_STATE_INIT:
+       SFLOWPS_open (pst);
+       break;
+      case SFLOWPS_STATE_OPEN:
+       /* socket is open but no request in flight: (re)send GETFAMILY */
+       getFamily_PSAMPLE (pst);
+       break;
+      case SFLOWPS_STATE_WAIT_FAMILY:
+       /* poll for the controller's reply */
+       readNetlink_PSAMPLE (pst, pst->nl_sock);
+       break;
+      case SFLOWPS_STATE_READY:
+       break;
+      }
+    return pst->state;
+  }
+
+  /*_________________---------------------------__________________
+    _________________    SFLOWPSSpec_setAttr    __________________
+    -----------------___________________________------------------
+  */
+
+  bool
+  SFLOWPSSpec_setAttr (SFLOWPSSpec *spec, EnumSFLOWPSAttributes field,
+                      void *val, int len)
+  {
+    /* Stage one PSAMPLE attribute (netlink attr header + pointer to the
+     * caller's value) in the spec.  Returns false if the attribute was
+     * already staged or has the wrong length for its type.  The value
+     * is NOT copied: val must remain valid until SFLOWPSSpec_send().
+     */
+    SFLOWPSAttr *psa = &spec->attr[field];
+    if (psa->included)
+      return false;
+    int expected_len = SFLOWPS_Fields[field].len;
+    if (expected_len && expected_len != len)
+      {
+       SFLOW_ERR ("SFLOWPSSpec_setAttr(%s) length=%u != expected: %u\n",
+                  SFLOWPS_Fields[field].descr, len, expected_len);
+       return false;
+      }
+    /* Mark as included only after validation succeeds; setting it
+     * before the length check left a rejected attribute flagged as
+     * included without bumping n_attrs, which would trip the
+     * ASSERT (nn == spec->n_attrs) in SFLOWPSSpec_send().
+     */
+    psa->included = true;
+    psa->attr.nla_type = field;
+    psa->attr.nla_len = sizeof (psa->attr) + len;
+    int len_w_pad = NLMSG_ALIGN (len);
+    psa->val.iov_len = len_w_pad;
+    psa->val.iov_base = val;
+    spec->n_attrs++;
+    spec->attrs_len += sizeof (psa->attr);
+    spec->attrs_len += len_w_pad;
+    return true;
+  }
+
+  /*_________________---------------------------__________________
+    _________________    SFLOWPSSpec_send       __________________
+    -----------------___________________________------------------
+  */
+
+  int
+  SFLOWPSSpec_send (SFLOWPS *pst, SFLOWPSSpec *spec)
+  {
+    /* Assemble and send one PSAMPLE_CMD_SAMPLE message to the psample
+     * multicast group, gathering the netlink header, genetlink header
+     * and each staged attribute (header + value) with scatter/gather
+     * I/O so nothing is copied.  Returns 0 on success, -1 on sendmsg()
+     * failure.
+     */
+    spec->nlh.nlmsg_len = NLMSG_LENGTH (sizeof (spec->ge) + spec->attrs_len);
+    spec->nlh.nlmsg_flags = 0;
+    spec->nlh.nlmsg_type = pst->family_id;
+    spec->nlh.nlmsg_seq = ++pst->nl_seq;
+    spec->nlh.nlmsg_pid = generic_pid (pst->id);
+
+    spec->ge.cmd = PSAMPLE_CMD_SAMPLE;
+    spec->ge.version = PSAMPLE_GENL_VERSION;
+
+/* two fragments (attr header + value) per attribute, plus nlh and ge.
+   Fully parenthesized so the macro is safe in any expression context. */
+#define MAX_IOV_FRAGMENTS ((2 * __SFLOWPS_PSAMPLE_ATTR_MAX) + 2)
+
+    struct iovec iov[MAX_IOV_FRAGMENTS];
+    u32 frag = 0;
+    iov[frag].iov_base = &spec->nlh;
+    iov[frag].iov_len = sizeof (spec->nlh);
+    frag++;
+    iov[frag].iov_base = &spec->ge;
+    iov[frag].iov_len = sizeof (spec->ge);
+    frag++;
+    int nn = 0;
+    for (u32 ii = 0; ii < __SFLOWPS_PSAMPLE_ATTR_MAX; ii++)
+      {
+       SFLOWPSAttr *psa = &spec->attr[ii];
+       if (psa->included)
+         {
+           nn++;
+           iov[frag].iov_base = &psa->attr;
+           iov[frag].iov_len = sizeof (psa->attr);
+           frag++;
+           iov[frag] = psa->val; // struct copy
+           frag++;
+         }
+      }
+    ASSERT (nn == spec->n_attrs);
+
+    /* NOTE(review): nl_groups is a 32-bit bitmap, so this assumes
+     * group_id <= 32 -- confirm the psample group id always fits. */
+    struct sockaddr_nl da = { .nl_family = AF_NETLINK,
+                             .nl_groups = (1 << (pst->group_id - 1)) };
+
+    struct msghdr msg = { .msg_name = &da,
+                         .msg_namelen = sizeof (da),
+                         .msg_iov = iov,
+                         .msg_iovlen = frag };
+
+    int status = sendmsg (pst->nl_sock, &msg, 0);
+    if (status <= 0)
+      {
+       SFLOW_ERR ("strerror(errno) = %s; errno = %d\n", strerror (errno),
+                  errno);
+       return -1;
+      }
+    return 0;
+  }
diff --git a/src/plugins/sflow/sflow_psample.h b/src/plugins/sflow/sflow_psample.h
new file mode 100644 (file)
index 0000000..5d49442
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __included_sflow_psample_h__
+#define __included_sflow_psample_h__
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+#include <asm/types.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include <linux/genetlink.h>
+#include <linux/psample.h>
+#include <signal.h>
+#include <ctype.h>
+
+// #define SFLOWPS_DEBUG
+
+#define SFLOWPS_PSAMPLE_READNL_RCV_BUF 8192
+#define SFLOWPS_PSAMPLE_READNL_SND_BUF 1000000
+
+/* Shadow the attributes in linux/psample.h so
+ * we can easily compile/test fields that are not
+ * defined on the kernel we are compiling on.
+ */
+typedef enum
+{
+#define SFLOWPS_FIELDDATA(field, len, descr) field,
+#include "sflow/sflow_psample_fields.h"
+#undef SFLOWPS_FIELDDATA
+  __SFLOWPS_PSAMPLE_ATTR_MAX
+} EnumSFLOWPSAttributes;
+
+typedef struct _SFLOWPS_field_t
+{
+  EnumSFLOWPSAttributes field;
+  int len;
+  char *descr;
+} SFLOWPS_field_t;
+
+static const SFLOWPS_field_t SFLOWPS_Fields[] = {
+#define SFLOWPS_FIELDDATA(field, len, descr) { field, len, descr },
+#include "sflow/sflow_psample_fields.h"
+#undef SFLOWPS_FIELDDATA
+};
+
+typedef enum
+{
+  SFLOWPS_STATE_INIT,
+  SFLOWPS_STATE_OPEN,
+  SFLOWPS_STATE_WAIT_FAMILY,
+  SFLOWPS_STATE_READY
+} EnumSFLOWPSState;
+
+typedef struct _SFLOWPS
+{
+  EnumSFLOWPSState state;
+  u32 id;
+  int nl_sock;
+  u32 nl_seq;
+  u32 genetlink_version;
+  u16 family_id;
+  u32 group_id;
+} SFLOWPS;
+
+typedef struct _SFLOWPSAttr
+{
+  bool included : 1;
+  struct nlattr attr;
+  struct iovec val;
+} SFLOWPSAttr;
+
+typedef struct _SFLOWPSSpec
+{
+  struct nlmsghdr nlh;
+  struct genlmsghdr ge;
+  SFLOWPSAttr attr[__SFLOWPS_PSAMPLE_ATTR_MAX];
+  int n_attrs;
+  int attrs_len;
+} SFLOWPSSpec;
+
+bool SFLOWPS_open (SFLOWPS *pst);
+bool SFLOWPS_close (SFLOWPS *pst);
+EnumSFLOWPSState SFLOWPS_state (SFLOWPS *pst);
+EnumSFLOWPSState SFLOWPS_open_step (SFLOWPS *pst);
+
+bool SFLOWPSSpec_setAttr (SFLOWPSSpec *spec, EnumSFLOWPSAttributes field,
+                         void *buf, int len);
+#define SFLOWPSSpec_setAttrInt(spec, field, val)                              \
+  SFLOWPSSpec_setAttr ((spec), (field), &(val), sizeof (val))
+
+int SFLOWPSSpec_send (SFLOWPS *pst, SFLOWPSSpec *spec);
+
+#endif /* __included_sflow_psample_h__ */
diff --git a/src/plugins/sflow/sflow_psample_fields.h b/src/plugins/sflow/sflow_psample_fields.h
new file mode 100644 (file)
index 0000000..72d484c
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_IIFINDEX, 4, "input if_index")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OIFINDEX, 4, "output if_index")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_ORIGSIZE, 4, "original packet size")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_SAMPLE_GROUP, 4, "group number")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_GROUP_SEQ, 4, "group sequence number")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_SAMPLE_RATE, 4, "sampling N")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_DATA, 0, "sampled header")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_TUNNEL, 0, "tunnel header")
+
+/* commands attributes */
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_GROUP_REFCOUNT, 0,
+                  "group reference count")
+
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_PAD, 0, "pad bytes")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OUT_TC, 2, "egress queue number")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OUT_TC_OCC, 8,
+                  "egress queue depth in bytes")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_LATENCY, 8,
+                  "transit latency in nanoseconds")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_TIMESTAMP, 8, "timestamp")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_PROTO, 2, "header protocol")
diff --git a/src/plugins/sflow/sflow_test.c b/src/plugins/sflow/sflow_test.c
new file mode 100644 (file)
index 0000000..5548066
--- /dev/null
@@ -0,0 +1,298 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vat/vat.h>
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vppinfra/error.h>
+#include <stdbool.h>
+
+#define __plugin_msg_base sflow_test_main.msg_id_base
+#include <vlibapi/vat_helper_macros.h>
+
+uword unformat_sw_if_index (unformat_input_t *input, va_list *args);
+
+/* Declare message IDs */
+#include <sflow/sflow.api_enum.h>
+#include <sflow/sflow.api_types.h>
+
+typedef struct
+{
+  /* API message ID base */
+  u16 msg_id_base;
+  vat_main_t *vat_main;
+} sflow_test_main_t;
+
+sflow_test_main_t sflow_test_main;
+
+static int
+api_sflow_enable_disable (vat_main_t *vam)
+{
+  /* VAT handler for "sflow_enable_disable <interface> [disable]":
+   * parse an interface (or explicit index) and an optional "disable"
+   * keyword, send SFLOW_ENABLE_DISABLE, and wait for the reply.
+   * Returns the API retval, or -99 on parse error.
+   * NOTE(review): the value parsed by unformat_sw_if_index is stored
+   * into hw_if_index -- confirm sw vs hw index semantics match the API.
+   */
+  unformat_input_t *i = vam->input;
+  int enable_disable = 1; /* default: enable */
+  u32 hw_if_index = ~0;
+  vl_api_sflow_enable_disable_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "%U", unformat_sw_if_index, vam, &hw_if_index))
+       ;
+      else if (unformat (i, "disable"))
+       enable_disable = 0;
+      else
+       break;
+    }
+
+  if (hw_if_index == ~0)
+    {
+      errmsg ("missing interface name / explicit hw_if_index number \n");
+      return -99;
+    }
+
+  /* Construct the API message */
+  M (SFLOW_ENABLE_DISABLE, mp);
+  mp->hw_if_index = ntohl (hw_if_index);
+  mp->enable_disable = enable_disable;
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static void
+vl_api_sflow_sampling_rate_get_reply_t_handler (
+  vl_api_sflow_sampling_rate_get_reply_t *mp)
+{
+  vat_main_t *vam = sflow_test_main.vat_main;
+  clib_warning ("sflow sampling_N: %d", ntohl (mp->sampling_N));
+  vam->result_ready = 1;
+}
+
+static int
+api_sflow_sampling_rate_get (vat_main_t *vam)
+{
+  vl_api_sflow_sampling_rate_get_t *mp;
+  int ret;
+
+  /* Construct the API message */
+  M (SFLOW_SAMPLING_RATE_GET, mp);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static int
+api_sflow_sampling_rate_set (vat_main_t *vam)
+{
+  unformat_input_t *i = vam->input;
+  u32 sampling_N = ~0;
+  vl_api_sflow_sampling_rate_set_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "sampling_N %d", &sampling_N))
+       ;
+      else
+       break;
+    }
+
+  if (sampling_N == ~0)
+    {
+      errmsg ("missing sampling_N number \n");
+      return -99;
+    }
+
+  /* Construct the API message */
+  M (SFLOW_SAMPLING_RATE_SET, mp);
+  mp->sampling_N = ntohl (sampling_N);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static void
+vl_api_sflow_polling_interval_get_reply_t_handler (
+  vl_api_sflow_polling_interval_get_reply_t *mp)
+{
+  vat_main_t *vam = sflow_test_main.vat_main;
+  clib_warning ("sflow polling-interval: %d", ntohl (mp->polling_S));
+  vam->result_ready = 1;
+}
+
+static int
+api_sflow_polling_interval_get (vat_main_t *vam)
+{
+  vl_api_sflow_polling_interval_get_t *mp;
+  int ret;
+
+  /* Construct the API message */
+  M (SFLOW_POLLING_INTERVAL_GET, mp);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static int
+api_sflow_polling_interval_set (vat_main_t *vam)
+{
+  unformat_input_t *i = vam->input;
+  u32 polling_S = ~0;
+  vl_api_sflow_polling_interval_set_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "polling_S %d", &polling_S))
+       ;
+      else
+       break;
+    }
+
+  if (polling_S == ~0)
+    {
+      errmsg ("missing polling_S number \n");
+      return -99;
+    }
+
+  /* Construct the API message */
+  M (SFLOW_POLLING_INTERVAL_SET, mp);
+  mp->polling_S = ntohl (polling_S);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static void
+vl_api_sflow_header_bytes_get_reply_t_handler (
+  vl_api_sflow_header_bytes_get_reply_t *mp)
+{
+  vat_main_t *vam = sflow_test_main.vat_main;
+  clib_warning ("sflow header-bytes: %d", ntohl (mp->header_B));
+  vam->result_ready = 1;
+}
+
+static int
+api_sflow_header_bytes_get (vat_main_t *vam)
+{
+  vl_api_sflow_header_bytes_get_t *mp;
+  int ret;
+
+  /* Construct the API message */
+  M (SFLOW_HEADER_BYTES_GET, mp);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static int
+api_sflow_header_bytes_set (vat_main_t *vam)
+{
+  unformat_input_t *i = vam->input;
+  u32 header_B = ~0;
+  vl_api_sflow_header_bytes_set_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "header_B %d", &header_B))
+       ;
+      else
+       break;
+    }
+
+  if (header_B == ~0)
+    {
+      errmsg ("missing header_B number \n");
+      return -99;
+    }
+
+  /* Construct the API message */
+  M (SFLOW_HEADER_BYTES_SET, mp);
+  mp->header_B = ntohl (header_B);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static void
+vl_api_sflow_interface_details_t_handler (vl_api_sflow_interface_details_t *mp)
+{
+  vat_main_t *vam = sflow_test_main.vat_main;
+  clib_warning ("sflow enable: %d", ntohl (mp->hw_if_index));
+  vam->result_ready = 1;
+}
+
+static int
+api_sflow_interface_dump (vat_main_t *vam)
+{
+  vl_api_sflow_interface_dump_t *mp;
+  int ret;
+
+  /* Construct the API message */
+  M (SFLOW_INTERFACE_DUMP, mp);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+/*
+ * List of messages that the sflow test plugin sends,
+ * and that the data plane plugin processes
+ */
+#include <sflow/sflow.api_test.c>
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow_usersock.c b/src/plugins/sflow/sflow_usersock.c
new file mode 100644 (file)
index 0000000..0ccb947
--- /dev/null
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined(__cplusplus)
+extern "C"
+{
+#endif
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+#include <fcntl.h>
+#include <asm/types.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include <signal.h>
+#include <ctype.h>
+
+#include <sflow/sflow_usersock.h>
+
+  /*_________________---------------------------__________________
+    _________________       fcntl utils         __________________
+    -----------------___________________________------------------
+  */
+
+  static void
+  setNonBlocking (int fd)
+  {
+    // set the socket to non-blocking
+    int fdFlags = fcntl (fd, F_GETFL);
+    fdFlags |= O_NONBLOCK;
+    if (fcntl (fd, F_SETFL, fdFlags) < 0)
+      {
+       SFLOW_ERR ("fcntl(O_NONBLOCK) failed: %s\n", strerror (errno));
+      }
+  }
+
+  static void
+  setCloseOnExec (int fd)
+  {
+    // make sure it doesn't get inherited, e.g. when we fork a script
+    int fdFlags = fcntl (fd, F_GETFD);
+    fdFlags |= FD_CLOEXEC;
+    if (fcntl (fd, F_SETFD, fdFlags) < 0)
+      {
+       SFLOW_ERR ("fcntl(F_SETFD=FD_CLOEXEC) failed: %s\n", strerror (errno));
+      }
+  }
+
+  /*_________________---------------------------__________________
+    _________________       usersock_open       __________________
+    -----------------___________________________------------------
+  */
+
+  static int
+  usersock_open (void)
+  {
+    int nl_sock = socket (AF_NETLINK, SOCK_RAW, NETLINK_USERSOCK);
+    if (nl_sock < 0)
+      {
+       SFLOW_ERR ("nl_sock open failed: %s\n", strerror (errno));
+       return -1;
+      }
+    setNonBlocking (nl_sock);
+    setCloseOnExec (nl_sock);
+    return nl_sock;
+  }
+
+  /*_________________---------------------------__________________
+    _________________       SFLOWUS_open        __________________
+    -----------------___________________________------------------
+  */
+
+  bool
+  SFLOWUS_open (SFLOWUS *ust)
+  {
+    // <= 0 so that a previous failed open (-1) is retried here
+    if (ust->nl_sock <= 0)
+      {
+       ust->nl_sock = usersock_open ();
+      }
+    return (ust->nl_sock > 0);
+  }
+
+  /*_________________---------------------------__________________
+    _________________       SFLOWUS_close       __________________
+    -----------------___________________________------------------
+  */
+
+  bool
+  SFLOWUS_close (SFLOWUS *ust)
+  {
+    if (ust->nl_sock != 0)
+      {
+       int err = close (ust->nl_sock);
+       if (err == 0)
+         {
+           ust->nl_sock = 0;
+           return true;
+         }
+       else
+         {
+           SFLOW_WARN ("SFLOWUS_close: returned %d : %s\n", err,
+                       strerror (errno));
+         }
+      }
+    return false;
+  }
+
+  /*_________________---------------------------__________________
+    _________________  SFLOWUSSpec_setMsgType   __________________
+    -----------------___________________________------------------
+  */
+
+  bool
+  SFLOWUSSpec_setMsgType (SFLOWUSSpec *spec, EnumSFlowVppMsgType msgType)
+  {
+    spec->nlh.nlmsg_type = msgType;
+    return true;
+  }
+
+  /*_________________---------------------------__________________
+    _________________    SFLOWUSSpec_setAttr    __________________
+    -----------------___________________________------------------
+  */
+
+  bool
+  SFLOWUSSpec_setAttr (SFLOWUSSpec *spec, EnumSFlowVppAttributes field,
+                      void *val, int len)
+  {
+    SFLOWUSAttr *usa = &spec->attr[field];
+    if (usa->included)
+      return false;
+    usa->included = true;
+    usa->attr.nla_type = field;
+    usa->attr.nla_len = sizeof (usa->attr) + len;
+    int len_w_pad = NLMSG_ALIGN (len);
+    usa->val.iov_len = len_w_pad;
+    usa->val.iov_base = val;
+    spec->n_attrs++;
+    spec->attrs_len += sizeof (usa->attr);
+    spec->attrs_len += len_w_pad;
+    return true;
+  }
+
+  /*_________________---------------------------__________________
+    _________________    SFLOWUSSpec_send       __________________
+    -----------------___________________________------------------
+  */
+
+  int
+  SFLOWUSSpec_send (SFLOWUS *ust, SFLOWUSSpec *spec)
+  {
+    spec->nlh.nlmsg_len = NLMSG_LENGTH (spec->attrs_len);
+    spec->nlh.nlmsg_flags = 0;
+    spec->nlh.nlmsg_seq = ++ust->nl_seq;
+    spec->nlh.nlmsg_pid = getpid ();
+
+#define MAX_IOV_FRAGMENTS ((2 * __SFLOW_VPP_ATTR_MAX) + 2)
+
+    struct iovec iov[MAX_IOV_FRAGMENTS];
+    u32 frag = 0;
+    iov[frag].iov_base = &spec->nlh;
+    iov[frag].iov_len = sizeof (spec->nlh);
+    frag++;
+    int nn = 0;
+    for (u32 ii = 0; ii < __SFLOW_VPP_ATTR_MAX; ii++)
+      {
+       SFLOWUSAttr *usa = &spec->attr[ii];
+       if (usa->included)
+         {
+           nn++;
+           iov[frag].iov_base = &usa->attr;
+           iov[frag].iov_len = sizeof (usa->attr);
+           frag++;
+           iov[frag] = usa->val; // struct copy
+           frag++;
+         }
+      }
+    ASSERT (nn == spec->n_attrs);
+
+    struct sockaddr_nl da = {
+      .nl_family = AF_NETLINK,
+      .nl_groups = (1 << (ust->group_id - 1)) // for multicast to the group
+      // .nl_pid = 1e9+6343 // for unicast to receiver bound to netlink socket
+      // with that "pid"
+    };
+
+    struct msghdr msg = { .msg_name = &da,
+                         .msg_namelen = sizeof (da),
+                         .msg_iov = iov,
+                         .msg_iovlen = frag };
+
+    int status = sendmsg (ust->nl_sock, &msg, 0);
+    if (status <= 0)
+      {
+       // Linux replies with ECONNREFUSED when
+       // a multicast is sent via NETLINK_USERSOCK, but
+       // it's not an error so we can just ignore it here.
+       if (errno != ECONNREFUSED)
+         {
+           SFLOW_DBG ("USERSOCK strerror(errno) = %s\n", strerror (errno));
+           return -1;
+         }
+      }
+    return 0;
+  }
diff --git a/src/plugins/sflow/sflow_usersock.h b/src/plugins/sflow/sflow_usersock.h
new file mode 100644 (file)
index 0000000..d663899
--- /dev/null
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __included_sflow_usersock_h__
+#define __included_sflow_usersock_h__
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+#include <asm/types.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include <signal.h>
+#include <ctype.h>
+
+// ==================== shared with hsflowd mod_vpp =========================
+// See https://github.com/sflow/host-sflow
+
+#define SFLOW_VPP_NETLINK_USERSOCK_MULTICAST 29
+
+typedef enum
+{
+  SFLOW_VPP_MSG_STATUS = 1,
+  SFLOW_VPP_MSG_IF_COUNTERS
+} EnumSFlowVppMsgType;
+
+typedef enum
+{
+  SFLOW_VPP_ATTR_PORTNAME,    /* string */
+  SFLOW_VPP_ATTR_IFINDEX,     /* u32 */
+  SFLOW_VPP_ATTR_IFTYPE,      /* u32 */
+  SFLOW_VPP_ATTR_IFSPEED,     /* u64 */
+  SFLOW_VPP_ATTR_IFDIRECTION, /* u32 */
+  SFLOW_VPP_ATTR_OPER_UP,     /* u32 */
+  SFLOW_VPP_ATTR_ADMIN_UP,    /* u32 */
+  SFLOW_VPP_ATTR_RX_OCTETS,   /* u64 */
+  SFLOW_VPP_ATTR_TX_OCTETS,   /* u64 */
+  SFLOW_VPP_ATTR_RX_PKTS,     /* u64 */
+  SFLOW_VPP_ATTR_TX_PKTS,     /* u64 */
+  SFLOW_VPP_ATTR_RX_BCASTS,   /* u64 */
+  SFLOW_VPP_ATTR_TX_BCASTS,   /* u64 */
+  SFLOW_VPP_ATTR_RX_MCASTS,   /* u64 */
+  SFLOW_VPP_ATTR_TX_MCASTS,   /* u64 */
+  SFLOW_VPP_ATTR_RX_DISCARDS, /* u64 */
+  SFLOW_VPP_ATTR_TX_DISCARDS, /* u64 */
+  SFLOW_VPP_ATTR_RX_ERRORS,   /* u64 */
+  SFLOW_VPP_ATTR_TX_ERRORS,   /* u64 */
+  SFLOW_VPP_ATTR_HW_ADDRESS,  /* binary */
+  SFLOW_VPP_ATTR_UPTIME_S,    /* u32 */
+  SFLOW_VPP_ATTR_OSINDEX,     /* u32 Linux ifIndex number, where applicable */
+  SFLOW_VPP_ATTR_DROPS,              /* u32 all FIFO and netlink sendmsg drops */
+  SFLOW_VPP_ATTR_SEQ,        /* u32 send seq no */
+  /* enum shared with hsflowd, so only add here */
+  __SFLOW_VPP_ATTR_MAX
+} EnumSFlowVppAttributes;
+
+#define SFLOW_VPP_PSAMPLE_GROUP_INGRESS 3
+#define SFLOW_VPP_PSAMPLE_GROUP_EGRESS 4
+
+// =========================================================================
+typedef struct
+{
+  u64 byts;
+  u64 pkts;
+  u64 m_pkts;
+  u64 b_pkts;
+  u64 errs;
+  u64 drps;
+} sflow_ctrs_t;
+
+typedef struct
+{
+  sflow_ctrs_t tx;
+  sflow_ctrs_t rx;
+} sflow_counters_t;
+
+typedef struct _SFLOWUS_field_t
+{
+  EnumSFlowVppAttributes field;
+  int len;
+} SFLOWUS_field_t;
+
+typedef struct _SFLOWUS
+{
+  u32 id;
+  int nl_sock;
+  u32 nl_seq;
+  u32 group_id;
+} SFLOWUS;
+
+typedef struct _SFLOWUSAttr
+{
+  bool included : 1;
+  struct nlattr attr;
+  struct iovec val;
+} SFLOWUSAttr;
+
+typedef struct _SFLOWUSSpec
+{
+  struct nlmsghdr nlh;
+  SFLOWUSAttr attr[__SFLOW_VPP_ATTR_MAX];
+  int n_attrs;
+  int attrs_len;
+} SFLOWUSSpec;
+
+bool SFLOWUS_open (SFLOWUS *ust);
+bool SFLOWUS_close (SFLOWUS *ust);
+
+bool SFLOWUSSpec_setMsgType (SFLOWUSSpec *spec, EnumSFlowVppMsgType type);
+bool SFLOWUSSpec_setAttr (SFLOWUSSpec *spec, EnumSFlowVppAttributes field,
+                         void *buf, int len);
+#define SFLOWUSSpec_setAttrInt(spec, field, val)                              \
+  SFLOWUSSpec_setAttr ((spec), (field), &(val), sizeof (val))
+
+int SFLOWUSSpec_send (SFLOWUS *ust, SFLOWUSSpec *spec);
+
+#endif /* __included_sflow_usersock_h__ */
diff --git a/src/plugins/sflow/sflow_vapi.c b/src/plugins/sflow/sflow_vapi.c
new file mode 100644 (file)
index 0000000..66e9eba
--- /dev/null
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sflow/sflow_vapi.h>
+
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vpp/app/version.h>
+#include <stdbool.h>
+
+#include <vapi/vapi.h>
+#include <vapi/memclnt.api.vapi.h>
+#include <vapi/vlib.api.vapi.h>
+
+#ifdef included_interface_types_api_types_h
+#define defined_vapi_enum_if_status_flags
+#define defined_vapi_enum_mtu_proto
+#define defined_vapi_enum_link_duplex
+#define defined_vapi_enum_sub_if_flags
+#define defined_vapi_enum_rx_mode
+#define defined_vapi_enum_if_type
+#define defined_vapi_enum_direction
+#endif
+#include <vapi/lcp.api.vapi.h>
+
+DEFINE_VAPI_MSG_IDS_LCP_API_JSON;
+
+static vapi_error_e
+my_pair_get_cb (struct vapi_ctx_s *ctx, void *callback_ctx, vapi_error_e rv,
+               bool is_last, vapi_payload_lcp_itf_pair_get_v2_reply *reply)
+{
+  // Intentionally a no-op, but its presence is still required: for
+  // example, it is invoked when the pair lookup does not find anything.
+  return VAPI_OK;
+}
+
+static vapi_error_e
+my_pair_details_cb (struct vapi_ctx_s *ctx, void *callback_ctx,
+                   vapi_error_e rv, bool is_last,
+                   vapi_payload_lcp_itf_pair_details *details)
+{
+  sflow_per_interface_data_t *sfif =
+    (sflow_per_interface_data_t *) callback_ctx;
+  // Setting this here will mean it is sent to hsflowd with the interface
+  // counters.
+  sfif->linux_if_index = details->vif_index;
+  return VAPI_OK;
+}
+
+static vapi_error_e
+sflow_vapi_connect (sflow_vapi_client_t *vac)
+{
+  vapi_error_e rv = VAPI_OK;
+  vapi_ctx_t ctx = vac->vapi_ctx;
+  if (ctx == NULL)
+    {
+      // first time - open and connect.
+      if ((rv = vapi_ctx_alloc (&ctx)) != VAPI_OK)
+       {
+         SFLOW_ERR ("vap_ctx_alloc() returned %d", rv);
+       }
+      else
+       {
+         vac->vapi_ctx = ctx;
+         if ((rv = vapi_connect_from_vpp (
+                ctx, "api_from_sflow_plugin", SFLOW_VAPI_MAX_REQUEST_Q,
+                SFLOW_VAPI_MAX_RESPONSE_Q, VAPI_MODE_BLOCKING, true)) !=
+             VAPI_OK)
+           {
+             SFLOW_ERR ("vapi_connect_from_vpp() returned %d", rv);
+           }
+         else
+           {
+             // Connected - but is there a handler for the request we want to
+             // send?
+             if (!vapi_is_msg_available (ctx,
+                                         vapi_msg_id_lcp_itf_pair_add_del_v2))
+               {
+                 SFLOW_WARN ("vapi_is_msg_available() returned false => "
+                             "linux-cp plugin not loaded");
+                 rv = VAPI_EUSER;
+               }
+           }
+       }
+    }
+  return rv;
+}
+
+// in forked thread
+static void *
+get_lcp_itf_pairs (void *magic)
+{
+  sflow_vapi_client_t *vac = magic;
+  vapi_error_e rv = VAPI_OK;
+
+  sflow_per_interface_data_t *intfs = vac->vapi_itfs;
+  vlib_set_thread_name (SFLOW_VAPI_THREAD_NAME);
+  if ((rv = sflow_vapi_connect (vac)) != VAPI_OK)
+    {
+      vac->vapi_unavailable = true;
+    }
+  else
+    {
+      vapi_ctx_t ctx = vac->vapi_ctx;
+
+      for (int ii = 1; ii < vec_len (intfs); ii++)
+       {
+         sflow_per_interface_data_t *sfif = vec_elt_at_index (intfs, ii);
+         if (sfif && sfif->sflow_enabled)
+           {
+             // TODO: if we try non-blocking we might not be able to just pour
+             // all the requests in here. Might be better to do them one at a
+             // time - e.g. when we poll for counters.
+             vapi_msg_lcp_itf_pair_get_v2 *msg =
+               vapi_alloc_lcp_itf_pair_get_v2 (ctx);
+             if (msg)
+               {
+                 msg->payload.sw_if_index = sfif->sw_if_index;
+                 if ((rv = vapi_lcp_itf_pair_get_v2 (ctx, msg, my_pair_get_cb,
+                                                     sfif, my_pair_details_cb,
+                                                     sfif)) != VAPI_OK)
+                   {
+                     SFLOW_ERR ("vapi_lcp_itf_pair_get_v2 returned %d", rv);
+                     // vapi.h: "message must be freed by vapi_msg_free if not
+                     // consumed by vapi_send"
+                     vapi_msg_free (ctx, msg);
+                   }
+               }
+           }
+       }
+      // We no longer disconnect or free the client structures
+      // vapi_disconnect_from_vpp (ctx);
+      // vapi_ctx_free (ctx);
+    }
+  // indicate that we are done - more portable that using pthread_tryjoin_np()
+  vac->vapi_request_status = (int) rv;
+  clib_atomic_store_rel_n (&vac->vapi_request_active, false);
+  // TODO: how to tell if heap-allocated data is stored separately per thread?
+  // And if so, how to tell the allocator to GC all data for the thread when it
+  // exits?
+  return (void *) rv;
+}
+
+int
+sflow_vapi_read_linux_if_index_numbers (sflow_vapi_client_t *vac,
+                                       sflow_per_interface_data_t *itfs)
+{
+
+#ifdef SFLOW_VAPI_TEST_PLUGIN_SYMBOL
+  // don't even fork the query thread if the symbol is not there
+  if (!vlib_get_plugin_symbol ("linux_cp_plugin.so", "lcp_itf_pair_get"))
+    {
+      return false;
+    }
+#endif
+  // previous query is done and results extracted?
+  int req_active = clib_atomic_load_acq_n (&vac->vapi_request_active);
+  if (req_active == false && vac->vapi_itfs == NULL)
+    {
+      // make a copy of the current interfaces vector for the lookup thread to
+      // write into
+      vac->vapi_itfs = vec_dup (itfs);
+      pthread_attr_t attr;
+      pthread_attr_init (&attr);
+      pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+      pthread_attr_setstacksize (&attr, VLIB_THREAD_STACK_SIZE);
+      vac->vapi_request_active = true;
+      pthread_create (&vac->vapi_thread, &attr, get_lcp_itf_pairs, vac);
+      pthread_attr_destroy (&attr);
+      return true;
+    }
+  return false;
+}
+
+int
+sflow_vapi_check_for_linux_if_index_results (sflow_vapi_client_t *vac,
+                                            sflow_per_interface_data_t *itfs)
+{
+  // request completed?
+  // TODO: if we use non-blocking mode do we have to call something here to
+  // receive results?
+  int req_active = clib_atomic_load_acq_n (&vac->vapi_request_active);
+  if (req_active == false && vac->vapi_itfs != NULL)
+    {
+      // yes, extract what we learned
+      // TODO: would not have to do this if vector were array of pointers
+      // to sflow_per_interface_data_t rather than an actual array, but
+      // it does mean we have very clear separation between the threads.
+      for (int ii = 1; ii < vec_len (vac->vapi_itfs); ii++)
+       {
+         sflow_per_interface_data_t *sfif1 =
+           vec_elt_at_index (vac->vapi_itfs, ii);
+         sflow_per_interface_data_t *sfif2 = vec_elt_at_index (itfs, ii);
+         if (sfif1 && sfif2 && sfif1->sflow_enabled && sfif2->sflow_enabled)
+           sfif2->linux_if_index = sfif1->linux_if_index;
+       }
+      vec_free (vac->vapi_itfs);
+      vac->vapi_itfs = NULL;
+      return true;
+    }
+  return false;
+}
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow_vapi.h b/src/plugins/sflow/sflow_vapi.h
new file mode 100644 (file)
index 0000000..baf4304
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __included_sflow_vapi_h__
+#define __included_sflow_vapi_h__
+
+#include <vnet/vnet.h>
+#include <sflow/sflow_common.h>
+
+#define SFLOW_VAPI_POLL_INTERVAL  5
+#define SFLOW_VAPI_MAX_REQUEST_Q  8
+#define SFLOW_VAPI_MAX_RESPONSE_Q 16
+#define SFLOW_VAPI_THREAD_NAME   "sflow_vapi" // must be <= 15 characters
+
+// #define SFLOW_VAPI_TEST_PLUGIN_SYMBOL
+
+typedef struct
+{
+  volatile int vapi_request_active; // to sync main <-> vapi_thread
+  pthread_t vapi_thread;
+  sflow_per_interface_data_t *vapi_itfs;
+  int vapi_unavailable;
+  int vapi_request_status; // written by vapi_thread
+  void *vapi_ctx;
+} sflow_vapi_client_t;
+
+int sflow_vapi_read_linux_if_index_numbers (sflow_vapi_client_t *vac,
+                                           sflow_per_interface_data_t *itfs);
+int
+sflow_vapi_check_for_linux_if_index_results (sflow_vapi_client_t *vac,
+                                            sflow_per_interface_data_t *itfs);
+
+#endif /* __included_sflow_vapi_h__ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/test/test_sflow.py b/test/test_sflow.py
new file mode 100644 (file)
index 0000000..d16c0e6
--- /dev/null
@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+
+import unittest
+from framework import VppTestCase
+from asfframework import VppTestRunner
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.layers.inet import IP, UDP
+from random import randint
+import re  # for finding counters in "sh errors" output
+
+
+class SFlowTestCase(VppTestCase):
+    """sFlow test case"""
+
+    @classmethod
+    def setUpClass(self):
+        super(SFlowTestCase, self).setUpClass()
+
+    @classmethod
+    def teadDownClass(cls):
+        super(SFlowTestCase, cls).tearDownClass()
+
+    def setUp(self):
+        self.create_pg_interfaces(range(2))  #  create pg0 and pg1
+        for i in self.pg_interfaces:
+            i.admin_up()  # put the interface up
+            i.config_ip4()  # configure IPv4 address on the interface
+            i.resolve_arp()  # resolve ARP, so that we know VPP MAC
+
+    def tearDown(self):
+        for i in self.pg_interfaces:
+            i.admin_down()
+            i.unconfig()
+            i.set_table_ip4(0)
+            i.set_table_ip6(0)
+
+    def is_hw_interface_in_dump(self, dump, hw_if_index):
+        for i in dump:
+            if i.hw_if_index == hw_if_index:
+                return True
+        else:
+            return False
+
+    def enable_sflow_via_api(self):
+        ## TEST: Enable one interface
+        ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True)
+        self.assertEqual(ret.retval, 0)
+
+        ## TEST: interface dump all
+        ret = self.vapi.sflow_interface_dump()
+        self.assertTrue(self.is_hw_interface_in_dump(ret, 1))
+
+        ## TEST: Disable one interface
+        ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=False)
+        self.assertEqual(ret.retval, 0)
+
+        ## TEST: interface dump all after enable + disable
+        ret = self.vapi.sflow_interface_dump()
+        self.assertEqual(len(ret), 0)
+
+        ## TEST: Enable both interfaces
+        ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True)
+        self.assertEqual(ret.retval, 0)
+        ret = self.vapi.sflow_enable_disable(hw_if_index=2, enable_disable=True)
+        self.assertEqual(ret.retval, 0)
+
+        ## TEST: interface dump all
+        ret = self.vapi.sflow_interface_dump()
+        self.assertTrue(self.is_hw_interface_in_dump(ret, 1))
+        self.assertTrue(self.is_hw_interface_in_dump(ret, 2))
+
+        ## TEST: the default sampling rate
+        ret = self.vapi.sflow_sampling_rate_get()
+        self.assert_equal(ret.sampling_N, 10000)
+
+        ## TEST: sflow_sampling_rate_set()
+        self.vapi.sflow_sampling_rate_set(sampling_N=1)
+        ret = self.vapi.sflow_sampling_rate_get()
+        self.assert_equal(ret.sampling_N, 1)
+
+        ## TEST: the default polling interval
+        ret = self.vapi.sflow_polling_interval_get()
+        self.assert_equal(ret.polling_S, 20)
+
+        ## TEST: sflow_polling_interval_set()
+        self.vapi.sflow_polling_interval_set(polling_S=10)
+        ret = self.vapi.sflow_polling_interval_get()
+        self.assert_equal(ret.polling_S, 10)
+
+        ## TEST: the default header bytes
+        ret = self.vapi.sflow_header_bytes_get()
+        self.assert_equal(ret.header_B, 128)
+
+        ## TEST: sflow_header_bytes_set()
+        self.vapi.sflow_header_bytes_set(header_B=96)
+        ret = self.vapi.sflow_header_bytes_get()
+        self.assert_equal(ret.header_B, 96)
+
+    def create_stream(self, src_if, dst_if, count):
+        packets = []
+        for i in range(count):
+            # create packet info stored in the test case instance
+            info = self.create_packet_info(src_if, dst_if)
+            # convert the info into packet payload
+            payload = self.info_to_payload(info)
+            # create the packet itself
+            p = (
+                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                / UDP(sport=randint(1000, 2000), dport=5678)
+                / Raw(payload)
+            )
+            # store a copy of the packet in the packet info
+            info.data = p.copy()
+            # append the packet to the list
+            packets.append(p)
+            # return the created packet list
+        return packets
+
+    def verify_capture(self, src_if, dst_if, capture):
+        packet_info = None
+        for packet in capture:
+            try:
+                ip = packet[IP]
+                udp = packet[UDP]
+                # convert the payload to packet info object
+                payload_info = self.payload_to_info(packet[Raw])
+                # make sure the indexes match
+                self.assert_equal(
+                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
+                )
+                self.assert_equal(
+                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+                )
+                packet_info = self.get_next_packet_info_for_interface2(
+                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
+                )
+                # make sure we didn't run out of saved packets
+                self.assertIsNotNone(packet_info)
+                self.assert_equal(
+                    payload_info.index, packet_info.index, "packet info index"
+                )
+                saved_packet = packet_info.data  # fetch the saved packet
+                # assert the values match
+                self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
+                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
+            except:
+                self.logger.error("Unexpected or invalid packet:", packet)
+                raise
+        remaining_packet = self.get_next_packet_info_for_interface2(
+            src_if.sw_if_index, dst_if.sw_if_index, packet_info
+        )
+        self.assertIsNone(
+            remaining_packet,
+            "Interface %s: Packet expected from interface "
+            "%s didn't arrive" % (dst_if.name, src_if.name),
+        )
+
+    def get_sflow_counter(self, counter):
+        counters = self.vapi.cli("sh errors").split("\n")
+        for i in range(1, len(counters) - 1):
+            results = counters[i].split()
+            if results[1] == "sflow":
+                if re.search(counter, counters[i]) is not None:
+                    return int(results[0])
+        return None
+
+    def verify_sflow(self, count):
+        ctr_processed = "sflow packets processed"
+        ctr_sampled = "sflow packets sampled"
+        ctr_dropped = "sflow packets dropped"
+        ctr_ps_sent = "sflow PSAMPLE sent"
+        ctr_ps_fail = "sflow PSAMPLE send failed"
+        processed = self.get_sflow_counter(ctr_processed)
+        sampled = self.get_sflow_counter(ctr_sampled)
+        dropped = self.get_sflow_counter(ctr_dropped)
+        ps_sent = self.get_sflow_counter(ctr_ps_sent)
+        ps_fail = self.get_sflow_counter(ctr_ps_fail)
+        self.assert_equal(processed, count, ctr_processed)
+        self.assert_equal(sampled, count, ctr_sampled)
+        self.assert_equal(dropped, None, ctr_dropped)
+        # TODO decide how to warn if PSAMPLE is not working
+        # It requires a prior "sudo modprobe psample", but
+        # that should probably be done at system boot time
+        # or maybe in a systemctl startup script, so we
+        # should only warn here.
+        self.logger.info(ctr_ps_sent + "=" + str(ps_sent))
+        self.logger.info(ctr_ps_fail + "=" + str(ps_fail))
+
+    def test_basic(self):
+        self.enable_sflow_via_api()
+        count = 7
+        # create the packet stream
+        packets = self.create_stream(self.pg0, self.pg1, count)
+        # add the stream to the source interface
+        self.pg0.add_stream(packets)
+        # enable capture on both interfaces
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        # start the packet generator
+        self.pg_start()
+        # get capture - the proper count of packets was saved by
+        # create_packet_info() based on dst_if parameter
+        capture = self.pg1.get_capture()
+        # assert nothing captured on pg0 (always do this last, so that
+        # some time has already passed since pg_start())
+        self.pg0.assert_nothing_captured()
+        # verify capture
+        self.verify_capture(self.pg0, self.pg1, capture)
+        # verify sflow counters
+        self.verify_sflow(count)