snort: snort3 plugin and DAQ 77/31277/35
authorDamjan Marion <damarion@cisco.com>
Wed, 27 Jan 2021 20:17:48 +0000 (21:17 +0100)
committerDamjan Marion <dmarion@me.com>
Fri, 16 Jul 2021 11:36:32 +0000 (11:36 +0000)
Zero copy interface which exposes VPP buffers to snort instance(s).
Includes VPP DAQ which is compiled only if libdaq 3 API headers are
available.

Type: feature
Change-Id: I96611b43f94fbae091e7391589e0454ae66de88b
Signed-off-by: Damjan Marion <damarion@cisco.com>
Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
MAINTAINERS
src/plugins/snort/CMakeLists.txt [new file with mode: 0644]
src/plugins/snort/cli.c [new file with mode: 0644]
src/plugins/snort/daq_vpp.c [new file with mode: 0644]
src/plugins/snort/daq_vpp.h [new file with mode: 0644]
src/plugins/snort/dequeue.c [new file with mode: 0644]
src/plugins/snort/enqueue.c [new file with mode: 0644]
src/plugins/snort/main.c [new file with mode: 0644]
src/plugins/snort/snort.h [new file with mode: 0644]

index b89a2a2..e0183dc 100644 (file)
@@ -632,6 +632,11 @@ M: Florin Coras <fcoras@cisco.com>
 Y:     src/plugins/quic/FEATURE.yaml
 F:     src/plugins/quic/
 
+Plugin - snort plugin
+I:     snort
+M:     Damjan Marion <damarion@cisco.com>
+F:     src/plugins/snort/
+
 libmemif
 I:     libmemif
 M:     Damjan Marion <damarion@cisco.com>
diff --git a/src/plugins/snort/CMakeLists.txt b/src/plugins/snort/CMakeLists.txt
new file mode 100644 (file)
index 0000000..bd9dcdc
--- /dev/null
@@ -0,0 +1,52 @@
+# SPDX-License-Identifier: Apache-2.0
+# Copyright(c) 2021 Cisco Systems, Inc.
+
+add_vpp_plugin(snort
+  SOURCES
+  enqueue.c
+  dequeue.c
+  main.c
+  cli.c
+
+  MULTIARCH_SOURCES
+  enqueue.c
+  dequeue.c
+
+  COMPONENT
+  vpp-plugin-snort
+)
+
+# DAQ
+
+find_path(LIBDAQ_INCLUDE_DIR NAMES daq_module_api.h daq_dlt.h daq_version.h)
+
+if (NOT LIBDAQ_INCLUDE_DIR)
+  message(WARNING "-- libdaq headers not found - snort3 DAQ disabled")
+  return()
+endif()
+
+file(STRINGS ${LIBDAQ_INCLUDE_DIR}/daq_version.h daq_version)
+foreach(l ${daq_version})
+  if (l MATCHES "^#define[\t ]*DAQ_")
+    STRING(REGEX REPLACE "^#define[\t ]*([A-Z1-9_]+)[\t ]*(.+)" "\\1;\\2" v "${l}")
+    list(GET v 0 name)
+    list(GET v 1 value)
+    set(${name} ${value})
+  endif()
+endforeach()
+
+set(DAQ_VER "${DAQ_VERSION_MAJOR}.${DAQ_VERSION_MINOR}.${DAQ_VERSION_PATCH}")
+message(STATUS "libdaq ${DAQ_VER} include files found at ${LIBDAQ_INCLUDE_DIR}")
+
+if (NOT DAQ_VERSION_MAJOR MATCHES 3)
+  message(WARNING "-- libdaq version not supported - snort3 DAQ disabled")
+  return()
+endif()
+
+add_library(daq_vpp SHARED daq_vpp.c)
+set_target_properties(daq_vpp PROPERTIES SOVERSION ${VPP_LIB_VERSION})
+target_compile_options (daq_vpp PRIVATE "-fvisibility=hidden")
+target_compile_options (daq_vpp PRIVATE "-DHAVE_VISIBILITY")
+target_compile_options (daq_vpp PRIVATE "-I${LIBDAQ_INCLUDE_DIR}")
+install(TARGETS daq_vpp DESTINATION ${VPP_LIBRARY_DIR}/daq COMPONENT vpp-plugin-snort)
+
diff --git a/src/plugins/snort/cli.c b/src/plugins/snort/cli.c
new file mode 100644 (file)
index 0000000..cbb33c7
--- /dev/null
@@ -0,0 +1,282 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2021 Cisco Systems, Inc.
+ */
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <snort/snort.h>
+
+static u8 *
+format_snort_instance (u8 *s, va_list *args)
+{
+  snort_instance_t *i = va_arg (*args, snort_instance_t *);
+  s = format (s, "%s [idx:%d sz:%d fd:%d]", i->name, i->index, i->shm_size,
+             i->shm_fd);
+
+  return s;
+}
+
+static clib_error_t *
+snort_create_instance_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                 vlib_cli_command_t *cmd)
+{
+  unformat_input_t _line_input, *line_input = &_line_input;
+  clib_error_t *err = 0;
+  u8 *name = 0;
+  u32 queue_size = 1024;
+  u8 drop_on_diconnect = 1;
+
+  /* Get a line of input. */
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (line_input, "queue-size %u", &queue_size))
+       ;
+      else if (unformat (line_input, "on-disconnect drop"))
+       drop_on_diconnect = 1;
+      else if (unformat (line_input, "on-disconnect pass"))
+       drop_on_diconnect = 0;
+      else if (unformat (line_input, "name %s", &name))
+       ;
+      else
+       {
+         err = clib_error_return (0, "unknown input `%U'",
+                                  format_unformat_error, input);
+         goto done;
+       }
+    }
+
+  if (!is_pow2 (queue_size))
+    {
+      err = clib_error_return (0, "Queue size must be a power of two");
+      goto done;
+    }
+
+  if (!name)
+    {
+      err = clib_error_return (0, "please specify instance name");
+      goto done;
+    }
+
+  err = snort_instance_create (vm, (char *) name, min_log2 (queue_size),
+                              drop_on_diconnect);
+
+done:
+  vec_free (name);
+  unformat_free (line_input);
+  return err;
+}
+
+VLIB_CLI_COMMAND (snort_create_instance_command, static) = {
+  .path = "snort create-instance",
+  .short_help = "snort create-instaince name <name> [queue-size <size>] "
+               "[on-disconnect drop|pass]",
+  .function = snort_create_instance_command_fn,
+};
+
+static clib_error_t *
+snort_attach_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                        vlib_cli_command_t *cmd)
+{
+  unformat_input_t _line_input, *line_input = &_line_input;
+  vnet_main_t *vnm = vnet_get_main ();
+  clib_error_t *err = 0;
+  u8 *name = 0;
+  u32 sw_if_index = ~0;
+
+  /* Get a line of input. */
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (line_input, "interface %U", unformat_vnet_sw_interface,
+                   vnm, &sw_if_index))
+       ;
+      else if (unformat (line_input, "instance %s", &name))
+       ;
+      else
+       {
+         err = clib_error_return (0, "unknown input `%U'",
+                                  format_unformat_error, input);
+         goto done;
+       }
+    }
+
+  if (sw_if_index == ~0)
+    {
+      err = clib_error_return (0, "please specify interface");
+      goto done;
+    }
+
+  if (!name)
+    {
+      err = clib_error_return (0, "please specify instance name");
+      goto done;
+    }
+
+  err = snort_interface_enable_disable (vm, (char *) name, sw_if_index, 1);
+
+done:
+  vec_free (name);
+  unformat_free (line_input);
+  return err;
+}
+
+VLIB_CLI_COMMAND (snort_attach_command, static) = {
+  .path = "snort attach",
+  .short_help = "snort attach instance <name> interface <if-name>",
+  .function = snort_attach_command_fn,
+};
+
+static clib_error_t *
+snort_detach_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                        vlib_cli_command_t *cmd)
+{
+  unformat_input_t _line_input, *line_input = &_line_input;
+  vnet_main_t *vnm = vnet_get_main ();
+  clib_error_t *err = 0;
+  u32 sw_if_index = ~0;
+
+  /* Get a line of input. */
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (line_input, "interface %U", unformat_vnet_sw_interface,
+                   vnm, &sw_if_index))
+       ;
+      else
+       {
+         err = clib_error_return (0, "unknown input `%U'",
+                                  format_unformat_error, input);
+         goto done;
+       }
+    }
+
+  if (sw_if_index == ~0)
+    {
+      err = clib_error_return (0, "please specify interface");
+      goto done;
+    }
+
+  err = snort_interface_enable_disable (vm, 0, sw_if_index, 0);
+
+done:
+  unformat_free (line_input);
+  return err;
+}
+
+VLIB_CLI_COMMAND (snort_detach_command, static) = {
+  .path = "snort detach",
+  .short_help = "snort detach interface <if-name>",
+  .function = snort_detach_command_fn,
+};
+
+static clib_error_t *
+snort_show_instances_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                vlib_cli_command_t *cmd)
+{
+  snort_main_t *sm = &snort_main;
+  snort_instance_t *si;
+
+  pool_foreach (si, sm->instances)
+    vlib_cli_output (vm, "%U", format_snort_instance, si);
+
+  return 0;
+}
+
+VLIB_CLI_COMMAND (snort_show_instances_command, static) = {
+  .path = "show snort instances",
+  .short_help = "show snort instances",
+  .function = snort_show_instances_command_fn,
+};
+
+static clib_error_t *
+snort_show_interfaces_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                 vlib_cli_command_t *cmd)
+{
+  snort_main_t *sm = &snort_main;
+  vnet_main_t *vnm = vnet_get_main ();
+  snort_instance_t *si;
+  u32 *index;
+
+  vlib_cli_output (vm, "interface\tsnort instance");
+  vec_foreach (index, sm->instance_by_sw_if_index)
+    {
+      if (index[0] != ~0)
+       {
+         si = vec_elt_at_index (sm->instances, index[0]);
+         vlib_cli_output (vm, "%U:\t%s", format_vnet_sw_if_index_name, vnm,
+                          index - sm->instance_by_sw_if_index, si->name);
+       }
+    }
+  return 0;
+}
+
+VLIB_CLI_COMMAND (snort_show_interfaces_command, static) = {
+  .path = "show snort interfaces",
+  .short_help = "show snort interfaces",
+  .function = snort_show_interfaces_command_fn,
+};
+
+static clib_error_t *
+snort_show_clients_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                              vlib_cli_command_t *cmd)
+{
+  snort_main_t *sm = &snort_main;
+  vlib_cli_output (vm, "number of clients: %d", pool_elts (sm->clients));
+  return 0;
+}
+
+VLIB_CLI_COMMAND (snort_show_clients_command, static) = {
+  .path = "show snort clients",
+  .short_help = "show snort clients",
+  .function = snort_show_clients_command_fn,
+};
+
+static clib_error_t *
+snort_mode_polling_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                              vlib_cli_command_t *cmd)
+{
+  return snort_set_node_mode (vm, VLIB_NODE_STATE_POLLING);
+}
+
+static clib_error_t *
+snort_mode_interrupt_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                vlib_cli_command_t *cmd)
+{
+  return snort_set_node_mode (vm, VLIB_NODE_STATE_INTERRUPT);
+}
+
+VLIB_CLI_COMMAND (snort_mode_polling_command, static) = {
+  .path = "snort mode polling",
+  .short_help = "snort mode polling|interrupt",
+  .function = snort_mode_polling_command_fn,
+};
+
+VLIB_CLI_COMMAND (snort_mode_interrupt_command, static) = {
+  .path = "snort mode interrupt",
+  .short_help = "snort mode polling|interrupt",
+  .function = snort_mode_interrupt_command_fn,
+};
+
+static clib_error_t *
+snort_show_mode_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                           vlib_cli_command_t *cmd)
+{
+  snort_main_t *sm = &snort_main;
+  char *mode =
+    sm->input_mode == VLIB_NODE_STATE_POLLING ? "polling" : "interrupt";
+  vlib_cli_output (vm, "input mode: %s", mode);
+  return 0;
+}
+
+VLIB_CLI_COMMAND (snort_show_mode_command, static) = {
+  .path = "show snort mode",
+  .short_help = "show snort mode",
+  .function = snort_show_mode_command_fn,
+};
diff --git a/src/plugins/snort/daq_vpp.c b/src/plugins/snort/daq_vpp.c
new file mode 100644 (file)
index 0000000..090b28a
--- /dev/null
@@ -0,0 +1,693 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2021 Cisco Systems, Inc.
+ */
+
+#define _GNU_SOURCE
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/mman.h>
+#include <errno.h>
+#include <sys/epoll.h>
+
+#include <vppinfra/cache.h>
+#include <daq_dlt.h>
+#include <daq_module_api.h>
+
+#include "daq_vpp.h"
+
+#define DAQ_VPP_VERSION 1
+
+#if __x86_64__
+#define VPP_DAQ_PAUSE() __builtin_ia32_pause ()
+#elif defined(__aarch64__) || defined(__arm__)
+#define VPP_DAQ_PAUSE() __asm__("yield")
+#else
+#define VPP_DAQ_PAUSE()
+#endif
+
+static DAQ_VariableDesc_t vpp_variable_descriptions[] = {
+  { "debug", "Enable debugging output to stdout",
+    DAQ_VAR_DESC_FORBIDS_ARGUMENT },
+};
+
+static DAQ_BaseAPI_t daq_base_api;
+
+#define SET_ERROR(modinst, ...) daq_base_api.set_errbuf (modinst, __VA_ARGS__)
+
+typedef struct _vpp_msg_pool
+{
+  DAQ_MsgPoolInfo_t info;
+} VPPMsgPool;
+
+typedef struct _vpp_desc_data
+{
+  uint32_t index;
+  uint32_t qpair_index;
+  DAQ_Msg_t msg;
+  DAQ_PktHdr_t pkthdr;
+} VPPDescData;
+
+typedef struct _vpp_bpool
+{
+  int fd;
+  uint32_t size;
+  void *base;
+} VPPBufferPool;
+
+typedef struct _vpp_qpair
+{
+  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+  uint32_t queue_size;
+  daq_vpp_desc_t *descs;
+  uint32_t *enq_ring;
+  uint32_t *deq_ring;
+  volatile uint32_t *enq_head;
+  volatile uint32_t *deq_head;
+  uint32_t next_desc;
+  int enq_fd;
+  int deq_fd;
+  VPPDescData *desc_data;
+  volatile int lock;
+} VPPQueuePair;
+
+typedef enum
+{
+  DAQ_VPP_INPUT_MODE_INTERRUPT = 0,
+  DAQ_VPP_INPUT_MODE_POLLING,
+} daq_vpp_input_mode_t;
+
+typedef struct _vpp_context
+{
+  /* config */
+  bool debug;
+
+  /* state */
+  uint32_t intf_count;
+  DAQ_ModuleInstance_h modinst;
+  VPPMsgPool pool;
+
+  /* socket */
+  int sock_fd;
+
+  /* shared memory */
+  uint32_t shm_size;
+  void *shm_base;
+  int shm_fd;
+
+  /* queue pairs */
+  uint8_t num_qpairs;
+  VPPQueuePair *qpairs;
+  uint32_t next_qpair;
+
+  /* epoll */
+  struct epoll_event *epoll_events;
+  int epoll_fd;
+
+  /* buffer pools */
+  uint8_t num_bpools;
+  VPPBufferPool *bpools;
+
+  daq_vpp_input_mode_t input_mode;
+  const char *socket_name;
+} VPP_Context_t;
+
+static VPP_Context_t *global_vpp_ctx = 0;
+
+static int
+vpp_daq_qpair_lock (VPPQueuePair *p)
+{
+  int free = 0;
+  while (!__atomic_compare_exchange_n (&p->lock, &free, 1, 0, __ATOMIC_ACQUIRE,
+                                      __ATOMIC_RELAXED))
+    {
+      while (__atomic_load_n (&p->lock, __ATOMIC_RELAXED))
+       VPP_DAQ_PAUSE ();
+      free = 0;
+    }
+  return 0;
+}
+
+static void
+vpp_daq_qpair_unlock (VPPQueuePair *p)
+{
+  __atomic_store_n (&p->lock, 0, __ATOMIC_RELEASE);
+}
+
+static int
+vpp_daq_module_load (const DAQ_BaseAPI_t *base_api)
+{
+  if (base_api->api_version != DAQ_BASE_API_VERSION ||
+      base_api->api_size != sizeof (DAQ_BaseAPI_t))
+    return DAQ_ERROR;
+
+  daq_base_api = *base_api;
+
+  return DAQ_SUCCESS;
+}
+
+static int
+vpp_daq_module_unload (void)
+{
+  memset (&daq_base_api, 0, sizeof (daq_base_api));
+  return DAQ_SUCCESS;
+}
+
+static int
+vpp_daq_get_variable_descs (const DAQ_VariableDesc_t **var_desc_table)
+{
+  *var_desc_table = vpp_variable_descriptions;
+
+  return sizeof (vpp_variable_descriptions) / sizeof (DAQ_VariableDesc_t);
+}
+
+static int
+vpp_daq_recvmsg (int fd, daq_vpp_msg_t *msg, int n_fds, int *fds)
+{
+  const int ctl_sz =
+    CMSG_SPACE (sizeof (int) * n_fds) + CMSG_SPACE (sizeof (struct ucred));
+  char ctl[ctl_sz];
+  struct msghdr mh = {};
+  struct iovec iov[1];
+  struct cmsghdr *cmsg;
+
+  iov[0].iov_base = (void *) msg;
+  iov[0].iov_len = sizeof (daq_vpp_msg_t);
+  mh.msg_iov = iov;
+  mh.msg_iovlen = 1;
+  mh.msg_control = ctl;
+  mh.msg_controllen = ctl_sz;
+
+  memset (ctl, 0, ctl_sz);
+
+  int rv;
+  if ((rv = recvmsg (fd, &mh, 0)) != sizeof (daq_vpp_msg_t))
+    return DAQ_ERROR_NODEV;
+
+  cmsg = CMSG_FIRSTHDR (&mh);
+  while (cmsg)
+    {
+      if (cmsg->cmsg_level == SOL_SOCKET)
+       {
+         if (cmsg->cmsg_type == SCM_CREDENTIALS)
+           {
+             /* Do nothing */;
+           }
+         else if (cmsg->cmsg_type == SCM_RIGHTS)
+           {
+             memcpy (fds, CMSG_DATA (cmsg), n_fds * sizeof (int));
+           }
+       }
+      cmsg = CMSG_NXTHDR (&mh, cmsg);
+    }
+
+  return DAQ_SUCCESS;
+}
+
+static void
+vpp_daq_destroy (void *handle)
+{
+  VPP_Context_t *vc = (VPP_Context_t *) handle;
+
+  if (vc->shm_base != MAP_FAILED)
+    munmap (vc->shm_base, vc->shm_size);
+
+  if (vc->shm_fd != -1)
+    close (vc->shm_fd);
+
+  if (vc->bpools)
+    {
+      for (int i = 0; i < vc->num_bpools; i++)
+       {
+         VPPBufferPool *bp = vc->bpools + i;
+         if (bp->fd != -1)
+           close (bp->fd);
+         if (bp->base && bp->base != MAP_FAILED)
+           munmap (bp->base, bp->size);
+       }
+      free (vc->bpools);
+    }
+
+  if (vc->qpairs)
+    {
+      for (int i = 0; i < vc->num_qpairs; i++)
+       {
+         VPPQueuePair *qp = vc->qpairs + i;
+         if (qp->enq_fd != -1)
+           close (qp->enq_fd);
+         if (qp->deq_fd != -1)
+           close (qp->deq_fd);
+         if (qp->desc_data)
+           free (qp->desc_data);
+       }
+      free (vc->qpairs);
+    }
+
+  free (vc->epoll_events);
+  close (vc->sock_fd);
+  if (vc->epoll_fd != -1)
+    close (vc->epoll_fd);
+  free (vc);
+}
+
+#define ERR(rv, ...)                                                          \
+  {                                                                           \
+    SET_ERROR (modinst, __VA_ARGS__);                                         \
+    rval = rv;                                                                \
+    goto err;                                                                 \
+  }
+
+static int
+vpp_daq_instantiate (const DAQ_ModuleConfig_h modcfg,
+                    DAQ_ModuleInstance_h modinst, void **ctxt_ptr)
+{
+  VPP_Context_t *vc = 0;
+  int rval = DAQ_ERROR;
+  daq_vpp_msg_t msg;
+  struct sockaddr_un sun = { .sun_family = AF_UNIX };
+  int i, fd = -1, shm_fd = -1;
+  const char *input;
+  uint8_t *base;
+
+  if (global_vpp_ctx)
+    {
+      *ctxt_ptr = global_vpp_ctx;
+      return DAQ_SUCCESS;
+    }
+
+  vc = calloc (1, sizeof (VPP_Context_t));
+
+  if (!vc)
+    ERR (DAQ_ERROR_NOMEM,
+        "%s: Couldn't allocate memory for the new VPP context!", __func__);
+
+  const char *varKey, *varValue;
+  daq_base_api.config_first_variable (modcfg, &varKey, &varValue);
+  while (varKey)
+    {
+      if (!strcmp (varKey, "debug"))
+       vc->debug = true;
+      else if (!strcmp (varKey, "input_mode"))
+       {
+         if (!strcmp (varValue, "interrupt"))
+           vc->input_mode = DAQ_VPP_INPUT_MODE_INTERRUPT;
+         else if (!strcmp (varValue, "polling"))
+           vc->input_mode = DAQ_VPP_INPUT_MODE_POLLING;
+       }
+      else if (!strcmp (varKey, "socket_name"))
+       {
+         vc->socket_name = varValue;
+       }
+      daq_base_api.config_next_variable (modcfg, &varKey, &varValue);
+    }
+
+  input = daq_base_api.config_get_input (modcfg);
+
+  if (!vc->socket_name)
+    /* try to use default socket path */
+    vc->socket_name = DAQ_VPP_DEFAULT_SOCKET_PATH;
+
+  if ((fd = socket (AF_UNIX, SOCK_SEQPACKET, 0)) < 0)
+    ERR (DAQ_ERROR_NODEV, "%s: Couldn't create socket!", __func__);
+
+  strncpy (sun.sun_path, vc->socket_name, sizeof (sun.sun_path) - 1);
+
+  if (connect (fd, (struct sockaddr *) &sun, sizeof (struct sockaddr_un)) != 0)
+    ERR (DAQ_ERROR_NODEV, "%s: Couldn't connect to socket! '%s'", __func__,
+        vc->socket_name);
+
+  /* craft and send connect message */
+  msg.type = DAQ_VPP_MSG_TYPE_HELLO;
+  snprintf ((char *) &msg.hello.inst_name, DAQ_VPP_INST_NAME_LEN - 1, "%s",
+           input);
+
+  if (send (fd, &msg, sizeof (msg), 0) != sizeof (msg))
+    ERR (DAQ_ERROR_NODEV, "%s: Couldn't send connect message!", __func__);
+
+  /* receive config message */
+  rval = vpp_daq_recvmsg (fd, &msg, 1, &shm_fd);
+
+  if (rval != DAQ_SUCCESS || msg.type != DAQ_VPP_MSG_TYPE_CONFIG ||
+      shm_fd == -1)
+    ERR (DAQ_ERROR_NODEV, "%s: Couldn't receive config message!", __func__);
+
+  vc->modinst = modinst;
+  vc->sock_fd = fd;
+  vc->epoll_fd = -1;
+  vc->intf_count = 1;
+  vc->num_bpools = msg.config.num_bpools;
+  vc->num_qpairs = msg.config.num_qpairs;
+  vc->shm_size = msg.config.shm_size;
+  vc->shm_fd = shm_fd;
+
+  vc->bpools = calloc (vc->num_bpools, sizeof (VPPBufferPool));
+  vc->qpairs = calloc (vc->num_qpairs, sizeof (VPPQueuePair));
+  vc->epoll_events = calloc (vc->num_qpairs, sizeof (struct epoll_event));
+
+  if (vc->bpools == 0 || vc->qpairs == 0)
+    ERR (DAQ_ERROR_NOMEM,
+        "%s: Couldn't allocate memory for the new VPP context!", __func__);
+
+  for (i = 0; i < vc->num_bpools; i++)
+    vc->bpools[i].fd = -1;
+
+  base =
+    mmap (0, vc->shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, vc->shm_fd, 0);
+
+  if (base == MAP_FAILED)
+    ERR (DAQ_ERROR_NOMEM,
+        "%s: Couldn't map shared memory for the new VPP context!", __func__);
+
+  vc->shm_base = base;
+
+  if (vc->debug)
+    {
+      printf ("[%s]\n", input);
+      printf ("  Shared memory size: %u\n", vc->shm_size);
+      printf ("  Number of buffer pools: %u\n", vc->num_bpools);
+      printf ("  Number of queue pairs: %u\n", vc->num_qpairs);
+    }
+
+  /* receive buffer pools */
+  for (int i = 0; i < vc->num_bpools; i++)
+    {
+      VPPBufferPool *bp = vc->bpools + i;
+      rval = vpp_daq_recvmsg (fd, &msg, 1, &bp->fd);
+      if (rval != DAQ_SUCCESS || msg.type != DAQ_VPP_MSG_TYPE_BPOOL ||
+         bp->fd == -1)
+       ERR (DAQ_ERROR_NODEV,
+            "%s: Failed to receive buffer pool message for the new "
+            "VPP context!",
+            __func__);
+      bp->size = msg.bpool.size;
+      bp->base = mmap (0, bp->size, PROT_READ, MAP_SHARED, bp->fd, 0);
+
+      if (bp->base == MAP_FAILED)
+       ERR (DAQ_ERROR_NOMEM,
+            "%s: Couldn't map shared memory for the new VPP context!",
+            __func__);
+      if (vc->debug)
+       printf ("  Buffer pool %u size: %u\n", i, bp->size);
+    }
+
+  if ((vc->epoll_fd = epoll_create (1)) == -1)
+    ERR (DAQ_ERROR_NODEV,
+        "%s: Couldn't create epoll fd for the new VPP context!", __func__);
+
+  /* receive queue pairs */
+  for (int i = 0; i < vc->num_qpairs; i++)
+    {
+      struct epoll_event ev = { .events = EPOLLIN };
+      int fds[2] = { -1, -1 };
+      uint32_t qsz;
+      VPPQueuePair *qp = vc->qpairs + i;
+      rval = vpp_daq_recvmsg (fd, &msg, 2, fds);
+      if (rval != DAQ_SUCCESS || msg.type != DAQ_VPP_MSG_TYPE_QPAIR ||
+         fds[0] == -1 || fds[1] == -1)
+       ERR (DAQ_ERROR_NODEV,
+            "%s: Failed to receive queu pair message for the new "
+            "VPP context!",
+            __func__);
+      qp->queue_size = 1 << msg.qpair.log2_queue_size;
+      qp->descs = (daq_vpp_desc_t *) (base + msg.qpair.desc_table_offset);
+      qp->enq_ring = (uint32_t *) (base + msg.qpair.enq_ring_offset);
+      qp->deq_ring = (uint32_t *) (base + msg.qpair.deq_ring_offset);
+      qp->enq_head = (uint32_t *) (base + msg.qpair.enq_head_offset);
+      qp->deq_head = (uint32_t *) (base + msg.qpair.deq_head_offset);
+      qp->enq_fd = fds[0];
+      qp->deq_fd = fds[1];
+      ev.data.u32 = i;
+
+      if (epoll_ctl (vc->epoll_fd, EPOLL_CTL_ADD, qp->enq_fd, &ev) == -1)
+       ERR (DAQ_ERROR_NODEV,
+            "%s: Failed to dequeue fd to epoll instance for the new "
+            "VPP context!",
+            __func__);
+
+      qsz = qp->queue_size;
+
+      qp->desc_data = calloc (qsz, sizeof (VPPDescData));
+      if (!qp->desc_data)
+       ERR (DAQ_ERROR_NOMEM,
+            "%s: Couldn't allocate memory for the new VPP context!",
+            __func__);
+
+      for (int j = 0; j < qsz; j++)
+       {
+         VPPDescData *dd = qp->desc_data + j;
+         DAQ_PktHdr_t *pkthdr = &dd->pkthdr;
+         DAQ_Msg_t *msg = &dd->msg;
+
+         dd->index = j;
+         dd->qpair_index = i;
+
+         pkthdr->ingress_group = DAQ_PKTHDR_UNKNOWN;
+         pkthdr->egress_group = DAQ_PKTHDR_UNKNOWN;
+
+         msg->type = DAQ_MSG_TYPE_PACKET;
+         msg->hdr_len = sizeof (DAQ_PktHdr_t);
+         msg->hdr = pkthdr;
+         msg->owner = vc->modinst;
+         msg->priv = dd;
+       }
+
+      if (vc->debug)
+       {
+         printf ("  Queue pair %u:\n", i);
+         printf ("    Size: %u\n", qp->queue_size);
+         printf ("    Enqueue fd: %u\n", qp->enq_fd);
+         printf ("    Dequeue fd: %u\n", qp->deq_fd);
+       }
+    }
+
+  *ctxt_ptr = global_vpp_ctx = vc;
+  return DAQ_SUCCESS;
+err:
+  if (vc)
+    vpp_daq_destroy (vc);
+  else if (fd != -1)
+    close (fd);
+  return rval;
+}
+
+static int
+vpp_daq_start (void *handle)
+{
+  return DAQ_SUCCESS;
+}
+
+static int
+vpp_daq_get_stats (void *handle, DAQ_Stats_t *stats)
+{
+  memset (stats, 0, sizeof (DAQ_Stats_t));
+  return DAQ_SUCCESS;
+}
+
+static void
+vpp_daq_reset_stats (void *handle)
+{
+}
+
+static uint32_t
+vpp_daq_get_capabilities (void *handle)
+{
+  uint32_t capabilities = DAQ_CAPA_BLOCK | DAQ_CAPA_UNPRIV_START;
+  return capabilities;
+}
+
+static int
+vpp_daq_get_datalink_type (void *handle)
+{
+  return DLT_IPV4;
+}
+
+static inline uint32_t
+vpp_daq_msg_receive_one (VPP_Context_t *vc, VPPQueuePair *qp,
+                        const DAQ_Msg_t *msgs[], unsigned max_recv)
+{
+  uint32_t n_recv, n_left;
+  uint32_t head, next, mask = qp->queue_size - 1;
+
+  if (max_recv == 0)
+    return 0;
+
+  vpp_daq_qpair_lock (qp);
+  next = qp->next_desc;
+  head = __atomic_load_n (qp->enq_head, __ATOMIC_ACQUIRE);
+  n_recv = n_left = head - next;
+
+  if (n_left > max_recv)
+    {
+      n_left = n_recv = max_recv;
+    }
+
+  while (n_left--)
+    {
+      uint32_t desc_index = qp->enq_ring[next & mask];
+      daq_vpp_desc_t *d = qp->descs + desc_index;
+      VPPDescData *dd = qp->desc_data + desc_index;
+      dd->pkthdr.pktlen = d->length;
+      dd->pkthdr.address_space_id = d->address_space_id;
+      dd->msg.data = vc->bpools[d->buffer_pool].base + d->offset;
+      next = next + 1;
+
+      msgs[0] = &dd->msg;
+      msgs++;
+    }
+
+  qp->next_desc = next;
+  vpp_daq_qpair_unlock (qp);
+
+  return n_recv;
+}
+
+static unsigned
+vpp_daq_msg_receive (void *handle, const unsigned max_recv,
+                    const DAQ_Msg_t *msgs[], DAQ_RecvStatus *rstat)
+{
+  VPP_Context_t *vc = (VPP_Context_t *) handle;
+  uint32_t n_qpairs_left = vc->num_qpairs;
+  uint32_t n, n_events, n_recv = 0;
+
+  /* first, we visit all qpairs. If we find any work there then we can give
+   * it back immediatelly. To avoid bias towards qpair 0 we remeber what
+   * next qpair */
+  while (n_qpairs_left)
+    {
+      VPPQueuePair *qp = vc->qpairs + vc->next_qpair;
+
+      if ((n = vpp_daq_msg_receive_one (vc, qp, msgs, max_recv - n_recv)))
+       {
+         msgs += n;
+         n_recv += n;
+       }
+
+      /* next */
+      vc->next_qpair++;
+      if (vc->next_qpair == vc->num_qpairs)
+       vc->next_qpair = 0;
+      n_qpairs_left--;
+    }
+
+  if (vc->input_mode == DAQ_VPP_INPUT_MODE_POLLING)
+    {
+      *rstat = DAQ_RSTAT_OK;
+      return n_recv;
+    }
+
+  if (n_recv)
+    {
+      *rstat = DAQ_RSTAT_OK;
+      return n_recv;
+    }
+
+  n_events = epoll_wait (vc->epoll_fd, vc->epoll_events, vc->num_qpairs, 1000);
+
+  if (n_events < 1)
+    {
+      *rstat = n_events == -1 ? DAQ_RSTAT_ERROR : DAQ_RSTAT_TIMEOUT;
+      return 0;
+    }
+
+  for (int i = 0; i < n_events; i++)
+    {
+      uint64_t ctr;
+      VPPQueuePair *qp = vc->qpairs + vc->epoll_events[i].data.u32;
+
+      if ((n = vpp_daq_msg_receive_one (vc, qp, msgs, max_recv - n_recv)))
+       {
+         msgs += n;
+         n_recv += n;
+       }
+
+      (void) read (qp->enq_fd, &ctr, sizeof (ctr));
+    }
+
+  *rstat = DAQ_RSTAT_OK;
+  return n_recv;
+}
+
+static int
+vpp_daq_msg_finalize (void *handle, const DAQ_Msg_t *msg, DAQ_Verdict verdict)
+{
+  VPP_Context_t *vc = (VPP_Context_t *) handle;
+  VPPDescData *dd = msg->priv;
+  VPPQueuePair *qp = vc->qpairs + dd->qpair_index;
+  daq_vpp_desc_t *d;
+  uint32_t mask, head;
+  uint64_t counter_increment = 1;
+  int rv, retv = DAQ_SUCCESS;
+
+  vpp_daq_qpair_lock (qp);
+  mask = qp->queue_size - 1;
+  head = *qp->deq_head;
+  d = qp->descs + dd->index;
+  if (verdict == DAQ_VERDICT_PASS)
+    d->action = DAQ_VPP_ACTION_FORWARD;
+  else
+    d->action = DAQ_VPP_ACTION_DROP;
+
+  qp->deq_ring[head & mask] = dd->index;
+  head = head + 1;
+  __atomic_store_n (qp->deq_head, head, __ATOMIC_RELEASE);
+
+  if (vc->input_mode == DAQ_VPP_INPUT_MODE_INTERRUPT)
+    {
+      rv = write (qp->deq_fd, &counter_increment, sizeof (counter_increment));
+
+      if (rv != sizeof (counter_increment))
+       retv = DAQ_ERROR;
+    }
+
+  vpp_daq_qpair_unlock (qp);
+  return retv;
+}
+
+static int
+vpp_daq_get_msg_pool_info (void *handle, DAQ_MsgPoolInfo_t *info)
+{
+  VPP_Context_t *vc = (VPP_Context_t *) handle;
+
+  vc->pool.info.available = 128;
+  vc->pool.info.size = 256;
+
+  *info = vc->pool.info;
+
+  return DAQ_SUCCESS;
+}
+
+DAQ_SO_PUBLIC
+const DAQ_ModuleAPI_t DAQ_MODULE_DATA = {
+  /* .api_version = */ DAQ_MODULE_API_VERSION,
+  /* .api_size = */ sizeof (DAQ_ModuleAPI_t),
+  /* .module_version = */ DAQ_VPP_VERSION,
+  /* .name = */ "vpp",
+  /* .type = */ DAQ_TYPE_INTF_CAPABLE | DAQ_TYPE_INLINE_CAPABLE |
+    DAQ_TYPE_MULTI_INSTANCE,
+  /* .load = */ vpp_daq_module_load,
+  /* .unload = */ vpp_daq_module_unload,
+  /* .get_variable_descs = */ vpp_daq_get_variable_descs,
+  /* .instantiate = */ vpp_daq_instantiate,
+  /* .destroy = */ vpp_daq_destroy,
+  /* .set_filter = */ NULL,
+  /* .start = */ vpp_daq_start,
+  /* .inject = */ NULL,
+  /* .inject_relative = */ NULL,
+  /* .interrupt = */ NULL,
+  /* .stop = */ NULL,
+  /* .ioctl = */ NULL,
+  /* .get_stats = */ vpp_daq_get_stats,
+  /* .reset_stats = */ vpp_daq_reset_stats,
+  /* .get_snaplen = */ NULL,
+  /* .get_capabilities = */ vpp_daq_get_capabilities,
+  /* .get_datalink_type = */ vpp_daq_get_datalink_type,
+  /* .config_load = */ NULL,
+  /* .config_swap = */ NULL,
+  /* .config_free = */ NULL,
+  /* .msg_receive = */ vpp_daq_msg_receive,
+  /* .msg_finalize = */ vpp_daq_msg_finalize,
+  /* .get_msg_pool_info = */ vpp_daq_get_msg_pool_info,
+};
diff --git a/src/plugins/snort/daq_vpp.h b/src/plugins/snort/daq_vpp.h
new file mode 100644 (file)
index 0000000..3b875aa
--- /dev/null
@@ -0,0 +1,77 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2021 Cisco Systems, Inc.
+ */
+
+#ifndef __DAQ_VPP_H__
+#define __DAQ_VPP_H__
+
+#include <stdint.h>
+
+#define DAQ_VPP_DEFAULT_SOCKET_FILE "snort.sock"
+#define DAQ_VPP_DEFAULT_SOCKET_PATH "/run/vpp/" DAQ_VPP_DEFAULT_SOCKET_FILE
+#define DAQ_VPP_INST_NAME_LEN      32
+
+typedef enum memif_msg_type
+{
+  DAQ_VPP_MSG_TYPE_NONE = 0,
+  DAQ_VPP_MSG_TYPE_HELLO = 1,
+  DAQ_VPP_MSG_TYPE_CONFIG = 2,
+  DAQ_VPP_MSG_TYPE_BPOOL = 3,
+  DAQ_VPP_MSG_TYPE_QPAIR = 4,
+} daq_vpp_msg_type_t;
+
+typedef struct
+{
+  char inst_name[DAQ_VPP_INST_NAME_LEN];
+} daq_vpp_msg_hello_t;
+
+typedef struct
+{
+  uint32_t shm_size;
+  uint16_t num_bpools;
+  uint16_t num_qpairs;
+} daq_vpp_msg_config_t;
+
+typedef struct
+{
+  uint32_t size;
+} daq_vpp_msg_bpool_t;
+
+typedef struct
+{
+  uint8_t log2_queue_size;
+  uint32_t desc_table_offset;
+  uint32_t enq_head_offset;
+  uint32_t deq_head_offset;
+  uint32_t enq_ring_offset;
+  uint32_t deq_ring_offset;
+} daq_vpp_msg_qpair_t;
+
+typedef struct
+{
+  daq_vpp_msg_type_t type : 8;
+  union
+  {
+    daq_vpp_msg_hello_t hello;
+    daq_vpp_msg_config_t config;
+    daq_vpp_msg_bpool_t bpool;
+    daq_vpp_msg_qpair_t qpair;
+  };
+} daq_vpp_msg_t;
+
+typedef enum
+{
+  DAQ_VPP_ACTION_DROP,
+  DAQ_VPP_ACTION_FORWARD,
+} daq_vpp_action_t;
+
+typedef struct
+{
+  uint32_t offset;
+  uint16_t length;
+  uint16_t address_space_id;
+  uint8_t buffer_pool;
+  daq_vpp_action_t action : 8;
+} daq_vpp_desc_t;
+
+#endif /* __DAQ_VPP_H__ */
diff --git a/src/plugins/snort/dequeue.c b/src/plugins/snort/dequeue.c
new file mode 100644 (file)
index 0000000..d597b88
--- /dev/null
@@ -0,0 +1,366 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2021 Cisco Systems, Inc.
+ */
+
+#include <vlib/vlib.h>
+#include <vnet/feature/feature.h>
+#include <snort/snort.h>
+
+typedef struct
+{
+  u32 next_index;
+  u32 sw_if_index;
+} snort_deq_trace_t;
+
+static u8 *
+format_snort_deq_trace (u8 *s, va_list *args)
+{
+  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
+  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
+  snort_deq_trace_t *t = va_arg (*args, snort_deq_trace_t *);
+
+  s = format (s, "snort-deq: sw_if_index %d, next index %d\n", t->sw_if_index,
+             t->next_index);
+
+  return s;
+}
+
+#define foreach_snort_deq_error                                               \
+  _ (BAD_DESC, "bad descriptor")                                              \
+  _ (BAD_DESC_INDEX, "bad descriptor index")
+
+typedef enum
+{
+#define _(sym, str) SNORT_DEQ_ERROR_##sym,
+  foreach_snort_deq_error
+#undef _
+    SNORT_DEQ_N_ERROR,
+} snort_deq_error_t;
+
+static char *snort_deq_error_strings[] = {
+#define _(sym, string) string,
+  foreach_snort_deq_error
+#undef _
+};
+
+static_always_inline uword
+snort_deq_instance (vlib_main_t *vm, u32 instance_index, snort_qpair_t *qp,
+                   u32 *buffer_indices, u16 *nexts, u32 max_recv)
+{
+  snort_main_t *sm = &snort_main;
+  snort_per_thread_data_t *ptd =
+    vec_elt_at_index (sm->per_thread_data, vm->thread_index);
+  u32 mask = pow2_mask (qp->log2_queue_size);
+  u32 head, next, n_recv = 0, n_left;
+
+  head = __atomic_load_n (qp->deq_head, __ATOMIC_ACQUIRE);
+  next = qp->next_desc;
+
+  n_left = head - next;
+
+  if (n_left == 0)
+    return 0;
+
+  if (n_left > max_recv)
+    {
+      n_left = max_recv;
+      clib_interrupt_set (ptd->interrupts, instance_index);
+      vlib_node_set_interrupt_pending (vm, snort_deq_node.index);
+    }
+
+  while (n_left)
+    {
+      u32 desc_index, bi;
+      daq_vpp_desc_t *d;
+
+      /* check if descriptor index taken from dequqe ring is valid */
+      if ((desc_index = qp->deq_ring[next & mask]) & ~mask)
+       {
+         vlib_node_increment_counter (vm, snort_deq_node.index,
+                                      SNORT_DEQ_ERROR_BAD_DESC_INDEX, 1);
+         goto next;
+       }
+
+      /* check if descriptor index taken from dequeue ring points to enqueued
+       * buffer */
+      if ((bi = qp->buffer_indices[desc_index]) == ~0)
+       {
+         vlib_node_increment_counter (vm, snort_deq_node.index,
+                                      SNORT_DEQ_ERROR_BAD_DESC, 1);
+         goto next;
+       }
+
+      /* put descriptor back to freelist */
+      vec_add1 (qp->freelist, desc_index);
+      d = qp->descriptors + desc_index;
+      buffer_indices++[0] = bi;
+      if (d->action == DAQ_VPP_ACTION_FORWARD)
+       nexts[0] = qp->next_indices[desc_index];
+      else
+       nexts[0] = SNORT_ENQ_NEXT_DROP;
+      qp->buffer_indices[desc_index] = ~0;
+      nexts++;
+      n_recv++;
+
+      /* next */
+    next:
+      next = next + 1;
+      n_left--;
+    }
+
+  qp->next_desc = next;
+
+  return n_recv;
+}
+
+static_always_inline u32
+snort_process_all_buffer_indices (snort_qpair_t *qp, u32 *b, u16 *nexts,
+                                 u32 max_recv, u8 drop_on_disconnect)
+{
+  u32 *bi, n_processed = 0;
+  u32 desc_index = 0;
+
+  vec_foreach (bi, qp->buffer_indices)
+    {
+      if (n_processed >= max_recv)
+       break;
+
+      if (bi[0] == ~0)
+       continue;
+
+      desc_index = bi - qp->buffer_indices;
+
+      b[0] = bi[0];
+      if (drop_on_disconnect)
+       nexts[0] = SNORT_ENQ_NEXT_DROP;
+      else
+       nexts[0] = qp->next_indices[desc_index];
+      qp->buffer_indices[desc_index] = ~0;
+
+      nexts += 1;
+      b += 1;
+      n_processed += 1;
+    }
+  return n_processed;
+}
+
+static_always_inline uword
+snort_deq_instance_all_interrupt (vlib_main_t *vm, u32 instance_index,
+                                 snort_qpair_t *qp, u32 *buffer_indices,
+                                 u16 *nexts, u32 max_recv,
+                                 u8 drop_on_disconnect)
+{
+  snort_main_t *sm = &snort_main;
+  snort_per_thread_data_t *ptd =
+    vec_elt_at_index (sm->per_thread_data, vm->thread_index);
+  u32 n_processed;
+
+  n_processed = snort_process_all_buffer_indices (
+    qp, buffer_indices, nexts, max_recv, drop_on_disconnect);
+
+  if (n_processed == max_recv)
+    {
+      clib_interrupt_set (ptd->interrupts, instance_index);
+      vlib_node_set_interrupt_pending (vm, snort_deq_node.index);
+    }
+  else
+    {
+      *qp->enq_head = *qp->deq_head = qp->next_desc = 0;
+      snort_freelist_init (qp->freelist);
+      __atomic_store_n (&qp->ready, 1, __ATOMIC_RELEASE);
+    }
+
+  return n_processed;
+}
+
+static u32
+snort_deq_node_interrupt (vlib_main_t *vm, vlib_node_runtime_t *node,
+                         vlib_frame_t *frame)
+{
+  snort_main_t *sm = &snort_main;
+  snort_per_thread_data_t *ptd =
+    vec_elt_at_index (sm->per_thread_data, vm->thread_index);
+  u32 buffer_indices[VLIB_FRAME_SIZE], *bi = buffer_indices;
+  u16 next_indices[VLIB_FRAME_SIZE], *nexts = next_indices;
+  u32 n_left = VLIB_FRAME_SIZE, n;
+  snort_qpair_t *qp;
+  snort_instance_t *si;
+  int inst = -1;
+
+  while ((inst = clib_interrupt_get_next (ptd->interrupts, inst)) != -1)
+    {
+      clib_interrupt_clear (ptd->interrupts, inst);
+      si = vec_elt_at_index (sm->instances, inst);
+      qp = vec_elt_at_index (si->qpairs, vm->thread_index);
+      u32 ready = __atomic_load_n (&qp->ready, __ATOMIC_ACQUIRE);
+      if (!ready)
+       n = snort_deq_instance_all_interrupt (vm, inst, qp, bi, nexts, n_left,
+                                             si->drop_on_disconnect);
+      else
+       n = snort_deq_instance (vm, inst, qp, bi, nexts, n_left);
+
+      n_left -= n;
+      bi += n;
+      nexts += n;
+
+      if (n_left == 0)
+       goto enq;
+    }
+
+  if (n_left == VLIB_FRAME_SIZE)
+    return 0;
+
+enq:
+  n = VLIB_FRAME_SIZE - n_left;
+  vlib_buffer_enqueue_to_next (vm, node, buffer_indices, next_indices, n);
+  return n;
+}
+
+static_always_inline uword
+snort_deq_instance_poll (vlib_main_t *vm, snort_qpair_t *qp,
+                        u32 *buffer_indices, u16 *nexts, u32 max_recv)
+{
+  u32 mask = pow2_mask (qp->log2_queue_size);
+  u32 head, next, n_recv = 0, n_left;
+
+  head = __atomic_load_n (qp->deq_head, __ATOMIC_ACQUIRE);
+  next = qp->next_desc;
+
+  n_left = head - next;
+
+  if (n_left == 0)
+    return 0;
+
+  if (n_left > max_recv)
+    n_left = max_recv;
+
+  while (n_left)
+    {
+      u32 desc_index, bi;
+      daq_vpp_desc_t *d;
+
+      /* check if descriptor index taken from dequqe ring is valid */
+      if ((desc_index = qp->deq_ring[next & mask]) & ~mask)
+       {
+         vlib_node_increment_counter (vm, snort_deq_node.index,
+                                      SNORT_DEQ_ERROR_BAD_DESC_INDEX, 1);
+         goto next;
+       }
+
+      /* check if descriptor index taken from dequeue ring points to enqueued
+       * buffer */
+      if ((bi = qp->buffer_indices[desc_index]) == ~0)
+       {
+         vlib_node_increment_counter (vm, snort_deq_node.index,
+                                      SNORT_DEQ_ERROR_BAD_DESC, 1);
+         goto next;
+       }
+
+      /* put descriptor back to freelist */
+      vec_add1 (qp->freelist, desc_index);
+      d = qp->descriptors + desc_index;
+      buffer_indices++[0] = bi;
+      if (d->action == DAQ_VPP_ACTION_FORWARD)
+       nexts[0] = qp->next_indices[desc_index];
+      else
+       nexts[0] = SNORT_ENQ_NEXT_DROP;
+      qp->buffer_indices[desc_index] = ~0;
+      nexts++;
+      n_recv++;
+
+      /* next */
+    next:
+      next = next + 1;
+      n_left--;
+    }
+
+  qp->next_desc = next;
+
+  return n_recv;
+}
+
+static_always_inline uword
+snort_deq_instance_all_poll (vlib_main_t *vm, snort_qpair_t *qp,
+                            u32 *buffer_indices, u16 *nexts, u32 max_recv,
+                            u8 drop_on_disconnect)
+{
+  u32 n_processed = snort_process_all_buffer_indices (
+    qp, buffer_indices, nexts, max_recv, drop_on_disconnect);
+  if (n_processed < max_recv)
+    {
+      *qp->enq_head = *qp->deq_head = qp->next_desc = 0;
+      snort_freelist_init (qp->freelist);
+      __atomic_store_n (&qp->ready, 1, __ATOMIC_RELEASE);
+    }
+
+  return n_processed;
+}
+
+static u32
+snort_deq_node_polling (vlib_main_t *vm, vlib_node_runtime_t *node,
+                       vlib_frame_t *frame)
+{
+  snort_main_t *sm = &snort_main;
+  u32 buffer_indices[VLIB_FRAME_SIZE], *bi = buffer_indices;
+  u16 next_indices[VLIB_FRAME_SIZE], *nexts = next_indices;
+  u32 n_left = VLIB_FRAME_SIZE, n, n_total = 0;
+  snort_qpair_t *qp;
+  snort_instance_t *si;
+
+  vec_foreach (si, sm->instances)
+    {
+      qp = vec_elt_at_index (si->qpairs, vm->thread_index);
+      u32 ready = __atomic_load_n (&qp->ready, __ATOMIC_ACQUIRE);
+      if (!ready)
+       n = snort_deq_instance_all_poll (vm, qp, bi, nexts, n_left,
+                                        si->drop_on_disconnect);
+      else
+       n = snort_deq_instance_poll (vm, qp, bi, nexts, n_left);
+
+      n_left -= n;
+      bi += n;
+      nexts += n;
+
+      if (n_left == 0)
+       {
+         n = VLIB_FRAME_SIZE - n_left;
+         vlib_buffer_enqueue_to_next (vm, node, buffer_indices, next_indices,
+                                      n);
+         n_left = VLIB_FRAME_SIZE;
+         bi = buffer_indices;
+         nexts = next_indices;
+         n_total += n;
+       }
+    }
+
+  if (n_left < VLIB_FRAME_SIZE)
+    {
+      n = VLIB_FRAME_SIZE - n_left;
+      vlib_buffer_enqueue_to_next (vm, node, buffer_indices, next_indices, n);
+      n_total += n;
+    }
+  return n_total;
+}
+
+VLIB_NODE_FN (snort_deq_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+  snort_main_t *sm = &snort_main;
+  if (sm->input_mode == VLIB_NODE_STATE_POLLING)
+    return snort_deq_node_polling (vm, node, frame);
+  return snort_deq_node_interrupt (vm, node, frame);
+}
+
+VLIB_REGISTER_NODE (snort_deq_node) = {
+  .name = "snort-deq",
+  .vector_size = sizeof (u32),
+  .format_trace = format_snort_deq_trace,
+  .type = VLIB_NODE_TYPE_INPUT,
+  .state = VLIB_NODE_STATE_DISABLED,
+  .sibling_of = "snort-enq",
+
+  .n_errors = ARRAY_LEN (snort_deq_error_strings),
+  .error_strings = snort_deq_error_strings,
+
+  .n_next_nodes = 0,
+};
diff --git a/src/plugins/snort/enqueue.c b/src/plugins/snort/enqueue.c
new file mode 100644 (file)
index 0000000..3f44e80
--- /dev/null
@@ -0,0 +1,223 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2021 Cisco Systems, Inc.
+ */
+
+#include <vlib/vlib.h>
+#include <vnet/feature/feature.h>
+#include <snort/snort.h>
+
+typedef struct
+{
+  u32 next_index;
+  u32 sw_if_index;
+  u16 instance;
+  u16 qpair;
+  u32 enq_slot;
+  u32 desc_index;
+  daq_vpp_desc_t desc;
+} snort_enq_trace_t;
+
+static u8 *
+format_snort_enq_trace (u8 *s, va_list *args)
+{
+  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
+  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
+  snort_enq_trace_t *t = va_arg (*args, snort_enq_trace_t *);
+  u32 indent = format_get_indent (s);
+
+  s = format (s,
+             "sw-if-index %u next-index %u\n"
+             "%Uinstance %u qpair %u desc-index %u slot %u\n"
+             "%Udesc: buffer-pool %u offset %u len %u address-space-id %u\n",
+             t->sw_if_index, t->next_index, format_white_space, indent,
+             t->instance, t->qpair, t->desc_index, t->enq_slot,
+             format_white_space, indent, t->desc.buffer_pool, t->desc.offset,
+             t->desc.length, t->desc.address_space_id);
+
+  return s;
+}
+
+#define foreach_snort_enq_error                                               \
+  _ (SOCKET_ERROR, "write socket error")                                      \
+  _ (NO_INSTANCE, "no snort instance")                                        \
+  _ (NO_ENQ_SLOTS, "no enqueue slots (packet dropped)")
+
+typedef enum
+{
+#define _(sym, str) SNORT_ENQ_ERROR_##sym,
+  foreach_snort_enq_error
+#undef _
+    SNORT_ENQ_N_ERROR,
+} snort_enq_error_t;
+
+static char *snort_enq_error_strings[] = {
+#define _(sym, string) string,
+  foreach_snort_enq_error
+#undef _
+};
+
+static_always_inline uword
+snort_enq_node_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
+                      vlib_frame_t *frame, int with_trace)
+{
+  snort_main_t *sm = &snort_main;
+  snort_instance_t *si = 0;
+  snort_qpair_t *qp = 0;
+  u32 thread_index = vm->thread_index;
+  u32 n_left = frame->n_vectors;
+  u32 n_trace = 0;
+  u32 total_enq = 0, n_processed = 0;
+  u32 *from = vlib_frame_vector_args (frame);
+  vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b = bufs;
+  u16 nexts[VLIB_FRAME_SIZE], *next = nexts;
+
+  vlib_get_buffers (vm, from, bufs, n_left);
+
+  while (n_left)
+    {
+      u32 instance_index, next_index, n;
+      instance_index =
+       *(u32 *) vnet_feature_next_with_data (&next_index, b[0], sizeof (u32));
+      si = vec_elt_at_index (sm->instances, instance_index);
+
+      /* if client isn't connected skip enqueue and take default action */
+      if (PREDICT_FALSE (si->client_index == ~0))
+       {
+         if (si->drop_on_disconnect)
+           next[0] = SNORT_ENQ_NEXT_DROP;
+         else
+           next[0] = next_index;
+         next++;
+         n_processed++;
+       }
+      else
+       {
+         qp = vec_elt_at_index (si->qpairs, thread_index);
+         n = qp->n_pending++;
+         daq_vpp_desc_t *d = qp->pending_descs + n;
+
+         qp->pending_nexts[n] = next_index;
+         qp->pending_buffers[n] = from[0];
+
+         vlib_buffer_chain_linearize (vm, b[0]);
+
+         /* If this pkt is traced, snapshoot the data */
+         if (with_trace && b[0]->flags & VLIB_BUFFER_IS_TRACED)
+           n_trace++;
+
+         /* fill descriptor */
+         d->buffer_pool = b[0]->buffer_pool_index;
+         d->length = b[0]->current_length;
+         d->offset = (u8 *) b[0]->data + b[0]->current_data -
+                     sm->buffer_pool_base_addrs[d->buffer_pool];
+         d->address_space_id = vnet_buffer (b[0])->sw_if_index[VLIB_RX];
+       }
+
+      n_left--;
+      from++;
+      b++;
+    }
+
+  if (n_processed)
+    {
+      vlib_node_increment_counter (vm, snort_enq_node.index,
+                                  SNORT_ENQ_ERROR_NO_INSTANCE, n_processed);
+      vlib_buffer_enqueue_to_next (vm, node, vlib_frame_vector_args (frame),
+                                  nexts, n_processed);
+    }
+
+  vec_foreach (si, sm->instances)
+    {
+      u32 head, freelist_len, n_pending, n_enq, mask;
+      u64 ctr = 1;
+      qp = vec_elt_at_index (si->qpairs, thread_index);
+      mask = pow2_mask (qp->log2_queue_size);
+      n_pending = qp->n_pending;
+      qp->n_pending = 0;
+
+      if (n_pending == 0)
+       continue;
+
+      freelist_len = vec_len (qp->freelist);
+
+      if (freelist_len < n_pending)
+       {
+         n_enq = freelist_len;
+         vlib_buffer_free (vm, qp->pending_buffers + n_enq,
+                           n_pending - n_enq);
+         vlib_node_increment_counter (vm, snort_enq_node.index,
+                                      SNORT_ENQ_ERROR_NO_ENQ_SLOTS,
+                                      n_pending - n_enq);
+       }
+      else
+       n_enq = n_pending;
+
+      if (n_enq == 0)
+       continue;
+
+      total_enq += n_enq;
+      head = *qp->enq_head;
+
+      for (u32 i = 0; i < n_enq; i++)
+       {
+         u32 desc_index = qp->freelist[--freelist_len];
+         qp->next_indices[desc_index] = qp->pending_nexts[i];
+         ASSERT (qp->buffer_indices[desc_index] == ~0);
+         qp->buffer_indices[desc_index] = qp->pending_buffers[i];
+         clib_memcpy_fast (qp->descriptors + desc_index,
+                           qp->pending_descs + i, sizeof (daq_vpp_desc_t));
+         qp->enq_ring[head & mask] = desc_index;
+
+         /* trace */
+         if (with_trace && n_trace)
+           {
+             vlib_buffer_t *tb = vlib_get_buffer (vm, qp->pending_buffers[i]);
+             if (tb->flags & VLIB_BUFFER_IS_TRACED)
+               {
+                 snort_enq_trace_t *t =
+                   vlib_add_trace (vm, node, tb, sizeof (*t));
+                 t->sw_if_index = vnet_buffer (tb)->sw_if_index[VLIB_RX];
+                 t->next_index = qp->pending_nexts[i];
+                 t->instance = si->index;
+                 t->qpair = qp - si->qpairs;
+                 t->enq_slot = head & mask;
+                 t->desc_index = desc_index;
+                 clib_memcpy_fast (&t->desc, qp->pending_descs + i,
+                                   sizeof (daq_vpp_desc_t));
+               }
+           }
+         head = head + 1;
+       }
+
+      __atomic_store_n (qp->enq_head, head, __ATOMIC_RELEASE);
+      _vec_len (qp->freelist) = freelist_len;
+      if (sm->input_mode == VLIB_NODE_STATE_INTERRUPT)
+       {
+         if (write (qp->enq_fd, &ctr, sizeof (ctr)) < 0)
+           vlib_node_increment_counter (vm, snort_enq_node.index,
+                                        SNORT_ENQ_ERROR_SOCKET_ERROR, 1);
+       }
+    }
+
+  return total_enq;
+}
+
+VLIB_NODE_FN (snort_enq_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+  if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE))
+    return snort_enq_node_inline (vm, node, frame, 1 /* is_trace*/);
+  else
+    return snort_enq_node_inline (vm, node, frame, 0 /* is_trace*/);
+}
+
+VLIB_REGISTER_NODE (snort_enq_node) = {
+  .name = "snort-enq",
+  .vector_size = sizeof (u32),
+  .format_trace = format_snort_enq_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+  .n_next_nodes = SNORT_ENQ_N_NEXT_NODES,
+  .next_nodes = SNORT_ENQ_NEXT_NODES,
+  .n_errors = ARRAY_LEN (snort_enq_error_strings),
+  .error_strings = snort_enq_error_strings,
+};
diff --git a/src/plugins/snort/main.c b/src/plugins/snort/main.c
new file mode 100644 (file)
index 0000000..37b5172
--- /dev/null
@@ -0,0 +1,520 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2021 Cisco Systems, Inc.
+ */
+
+#include <vlib/vlib.h>
+#include <vnet/plugin/plugin.h>
+#include <vpp/app/version.h>
+#include <snort/snort.h>
+
+#include <sys/eventfd.h>
+
+snort_main_t snort_main;
+
+VLIB_REGISTER_LOG_CLASS (snort_log, static) = {
+  .class_name = "snort",
+  .default_syslog_level = VLIB_LOG_LEVEL_DEBUG,
+};
+
+#define log_debug(fmt, ...) vlib_log_debug (snort_log.class, fmt, __VA_ARGS__)
+#define log_err(fmt, ...)   vlib_log_err (snort_log.class, fmt, __VA_ARGS__)
+
+static void
+snort_client_disconnect (clib_file_t *uf)
+{
+  vlib_main_t *vm = vlib_get_main ();
+  snort_qpair_t *qp;
+  snort_main_t *sm = &snort_main;
+  snort_client_t *c = pool_elt_at_index (sm->clients, uf->private_data);
+
+  if (c->instance_index != ~0)
+    {
+      snort_per_thread_data_t *ptd =
+       vec_elt_at_index (sm->per_thread_data, vm->thread_index);
+      snort_instance_t *si =
+       pool_elt_at_index (sm->instances, c->instance_index);
+      vec_foreach (qp, si->qpairs)
+       __atomic_store_n (&qp->ready, 1, __ATOMIC_RELEASE);
+
+      si->client_index = ~0;
+      clib_interrupt_set (ptd->interrupts, uf->private_data);
+      vlib_node_set_interrupt_pending (vm, snort_deq_node.index);
+    }
+
+  clib_file_del (&file_main, uf);
+  clib_socket_close (&c->socket);
+  pool_put (sm->clients, c);
+}
+
+static snort_instance_t *
+snort_get_instance_by_name (char *name)
+{
+  snort_main_t *sm = &snort_main;
+  uword *p;
+  if ((p = hash_get_mem (sm->instance_by_name, name)) == 0)
+    return 0;
+
+  return vec_elt_at_index (sm->instances, p[0]);
+  ;
+}
+
+static clib_error_t *
+snort_conn_fd_read_ready (clib_file_t *uf)
+{
+  vlib_main_t *vm = vlib_get_main ();
+  snort_main_t *sm = &snort_main;
+  snort_client_t *c = pool_elt_at_index (sm->clients, uf->private_data);
+  vlib_buffer_pool_t *bp;
+  snort_instance_t *si;
+  snort_qpair_t *qp;
+  snort_client_msg_queue_elt *e;
+  clib_error_t *err;
+  daq_vpp_msg_t msg;
+  char *name;
+  u8 *base;
+
+  log_debug ("fd_read_ready: client %u", uf->private_data);
+
+  if ((err = clib_socket_recvmsg (&c->socket, &msg, sizeof (msg), 0, 0)))
+    {
+      log_err ("client recvmsg error: %U", format_clib_error, err);
+      snort_client_disconnect (uf);
+      clib_error_free (err);
+      return 0;
+    }
+
+  if (msg.type != DAQ_VPP_MSG_TYPE_HELLO)
+    {
+      log_err ("unexpeced message recieved from client", 0);
+      snort_client_disconnect (uf);
+      return 0;
+    }
+
+  msg.hello.inst_name[DAQ_VPP_INST_NAME_LEN - 1] = 0;
+  name = msg.hello.inst_name;
+
+  log_debug ("fd_read_ready: connect instance %s", name);
+
+  if ((si = snort_get_instance_by_name (name)) == 0)
+    {
+      log_err ("unknown instance '%s' requested by client", name);
+      snort_client_disconnect (uf);
+      return 0;
+    }
+
+  vec_foreach (qp, si->qpairs)
+    {
+      u32 ready = __atomic_load_n (&qp->ready, __ATOMIC_ACQUIRE);
+      if (!ready)
+       {
+         log_err ("instance '%s' is not ready to accept connections", name);
+         snort_client_disconnect (uf);
+         return 0;
+       }
+    }
+
+  base = (u8 *) si->shm_base;
+
+  if (si->client_index != ~0)
+    {
+      log_err ("client already connected to instance '%s'", name);
+      snort_client_disconnect (uf);
+      return 0;
+    }
+  si->client_index = uf->private_data;
+  c->instance_index = si->index;
+
+  log_debug ("fd_read_ready: connect instance index %u", si->index);
+
+  clib_fifo_add2 (c->msg_queue, e);
+  e->msg.type = DAQ_VPP_MSG_TYPE_CONFIG;
+  e->msg.config.num_bpools = vec_len (vm->buffer_main->buffer_pools);
+  e->msg.config.num_qpairs = vec_len (si->qpairs);
+  e->msg.config.shm_size = si->shm_size;
+  e->fds[0] = si->shm_fd;
+  e->n_fds = 1;
+
+  vec_foreach (bp, vm->buffer_main->buffer_pools)
+    {
+      vlib_physmem_map_t *pm;
+      pm = vlib_physmem_get_map (vm, bp->physmem_map_index);
+      clib_fifo_add2 (c->msg_queue, e);
+      e->msg.type = DAQ_VPP_MSG_TYPE_BPOOL;
+      e->msg.bpool.size = pm->n_pages << pm->log2_page_size;
+      e->fds[0] = pm->fd;
+      e->n_fds = 1;
+    }
+
+  vec_foreach (qp, si->qpairs)
+    {
+      clib_fifo_add2 (c->msg_queue, e);
+      e->msg.type = DAQ_VPP_MSG_TYPE_QPAIR;
+      e->msg.qpair.log2_queue_size = qp->log2_queue_size;
+      e->msg.qpair.desc_table_offset = (u8 *) qp->descriptors - base;
+      e->msg.qpair.enq_ring_offset = (u8 *) qp->enq_ring - base;
+      e->msg.qpair.deq_ring_offset = (u8 *) qp->deq_ring - base;
+      e->msg.qpair.enq_head_offset = (u8 *) qp->enq_head - base;
+      e->msg.qpair.deq_head_offset = (u8 *) qp->deq_head - base;
+      e->fds[0] = qp->enq_fd;
+      e->fds[1] = qp->deq_fd;
+      e->n_fds = 2;
+    }
+
+  clib_file_set_data_available_to_write (&file_main, c->file_index, 1);
+  return 0;
+}
+
+static clib_error_t *
+snort_conn_fd_write_ready (clib_file_t *uf)
+{
+  snort_main_t *sm = &snort_main;
+  snort_client_t *c = pool_elt_at_index (sm->clients, uf->private_data);
+  snort_client_msg_queue_elt *e;
+
+  log_debug ("fd_write_ready: client %u", uf->private_data);
+  clib_fifo_sub2 (c->msg_queue, e);
+
+  if (clib_fifo_elts (c->msg_queue) == 0)
+    clib_file_set_data_available_to_write (&file_main, c->file_index, 0);
+
+  return clib_socket_sendmsg (&c->socket, &e->msg, sizeof (*e), e->fds,
+                             e->n_fds);
+}
+
+clib_error_t *
+snort_conn_fd_error (clib_file_t *uf)
+{
+  log_debug ("fd_error: client %u", uf->private_data);
+  return 0;
+}
+
+static clib_error_t *
+snort_deq_ready (clib_file_t *uf)
+{
+  vlib_main_t *vm = vlib_get_main ();
+  snort_main_t *sm = &snort_main;
+  snort_per_thread_data_t *ptd =
+    vec_elt_at_index (sm->per_thread_data, vm->thread_index);
+  u64 counter;
+
+  if (read (uf->file_descriptor, &counter, sizeof (counter)) < 0)
+    return clib_error_return (0, "client closed socket");
+
+  clib_interrupt_set (ptd->interrupts, uf->private_data);
+  vlib_node_set_interrupt_pending (vm, snort_deq_node.index);
+  return 0;
+}
+
+static clib_error_t *
+snort_conn_fd_accept_ready (clib_file_t *uf)
+{
+  snort_main_t *sm = &snort_main;
+  snort_client_t *c;
+  clib_socket_t *s;
+  clib_error_t *err = 0;
+  clib_file_t t = { 0 };
+
+  pool_get_zero (sm->clients, c);
+  c->instance_index = ~0;
+  s = &c->socket;
+
+  if ((err = clib_socket_accept (sm->listener, s)))
+    {
+      log_err ("%U", format_clib_error, err);
+      pool_put (sm->clients, c);
+      return err;
+    }
+
+  t.read_function = snort_conn_fd_read_ready;
+  t.write_function = snort_conn_fd_write_ready;
+  t.error_function = snort_conn_fd_error;
+  t.file_descriptor = s->fd;
+  t.private_data = c - sm->clients;
+  t.description = format (0, "snort client");
+  c->file_index = clib_file_add (&file_main, &t);
+
+  log_debug ("snort_conn_fd_accept_ready: client %u", t.private_data);
+  return 0;
+}
+
+static clib_error_t *
+snort_listener_init (vlib_main_t *vm)
+{
+  snort_main_t *sm = &snort_main;
+  clib_error_t *err;
+  clib_file_t t = { 0 };
+  clib_socket_t *s;
+
+  if (sm->listener)
+    return 0;
+
+  s = clib_mem_alloc (sizeof (clib_socket_t));
+  clib_memset (s, 0, sizeof (clib_socket_t));
+  s->config = (char *) sm->socket_name;
+  s->flags = CLIB_SOCKET_F_IS_SERVER | CLIB_SOCKET_F_ALLOW_GROUP_WRITE |
+            CLIB_SOCKET_F_SEQPACKET | CLIB_SOCKET_F_PASSCRED;
+
+  if ((err = clib_socket_init (s)))
+    {
+      clib_mem_free (s);
+      return err;
+    }
+
+  t.read_function = snort_conn_fd_accept_ready;
+  t.file_descriptor = s->fd;
+  t.description = format (0, "snort listener %s", s->config);
+  log_debug ("%v", t.description);
+  clib_file_add (&file_main, &t);
+
+  sm->listener = s;
+
+  return 0;
+}
+
+clib_error_t *
+snort_instance_create (vlib_main_t *vm, char *name, u8 log2_queue_sz,
+                      u8 drop_on_disconnect)
+{
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  snort_main_t *sm = &snort_main;
+  snort_instance_t *si;
+  clib_error_t *err = 0;
+  u32 index, i;
+  u8 *base = CLIB_MEM_VM_MAP_FAILED;
+  u32 size;
+  int fd = -1;
+  u32 qpair_mem_sz = 0;
+  u32 qsz = 1 << log2_queue_sz;
+  u8 align = CLIB_CACHE_LINE_BYTES;
+
+  if (snort_get_instance_by_name (name))
+    return clib_error_return (0, "instance already exists");
+
+  /* descriptor table */
+  qpair_mem_sz += round_pow2 (qsz * sizeof (daq_vpp_desc_t), align);
+
+  /* enq and deq ring */
+  qpair_mem_sz += 2 * round_pow2 (qsz * sizeof (u32), align);
+
+  /* enq and deq head pointer */
+  qpair_mem_sz += 2 * round_pow2 (sizeof (u32), align);
+
+  size =
+    round_pow2 (tm->n_vlib_mains * qpair_mem_sz, clib_mem_get_page_size ());
+  fd = clib_mem_vm_create_fd (CLIB_MEM_PAGE_SZ_DEFAULT, "snort instance %s",
+                             name);
+
+  if (fd == -1)
+    {
+      err = clib_error_return (0, "memory fd failure: %U", format_clib_error,
+                              clib_mem_get_last_error ());
+      goto done;
+    }
+
+  if ((ftruncate (fd, size)) == -1)
+    {
+      err = clib_error_return (0, "ftruncate failure");
+      goto done;
+    }
+
+  base = clib_mem_vm_map_shared (0, size, fd, 0, "snort instance %s", name);
+
+  if (base == CLIB_MEM_VM_MAP_FAILED)
+    {
+      err = clib_error_return (0, "mmap failure");
+      goto done;
+    }
+
+  pool_get_zero (sm->instances, si);
+  si->index = si - sm->instances;
+  si->client_index = ~0;
+  si->shm_base = base;
+  si->shm_fd = fd;
+  si->shm_size = size;
+  si->name = format (0, "%s%c", name, 0);
+  si->drop_on_disconnect = drop_on_disconnect;
+  index = si - sm->instances;
+  hash_set_mem (sm->instance_by_name, si->name, index);
+
+  log_debug ("instnce '%s' createed with fd %d at %p, len %u", name, fd, base,
+            size);
+
+  vec_validate_aligned (sm->per_thread_data, tm->n_vlib_mains - 1,
+                       CLIB_CACHE_LINE_BYTES);
+  vec_validate_aligned (si->qpairs, tm->n_vlib_mains - 1,
+                       CLIB_CACHE_LINE_BYTES);
+
+  for (int i = 0; i < tm->n_vlib_mains; i++)
+    {
+      snort_qpair_t *qp = vec_elt_at_index (si->qpairs, i);
+      snort_per_thread_data_t *ptd = vec_elt_at_index (sm->per_thread_data, i);
+      clib_file_t t = { 0 };
+
+      qp->log2_queue_size = log2_queue_sz;
+      qp->descriptors = (void *) base;
+      base += round_pow2 (qsz * sizeof (daq_vpp_desc_t), align);
+      qp->enq_ring = (void *) base;
+      base += round_pow2 (qsz * sizeof (u32), align);
+      qp->deq_ring = (void *) base;
+      base += round_pow2 (qsz * sizeof (u32), align);
+      qp->enq_head = (void *) base;
+      base += round_pow2 (sizeof (u32), align);
+      qp->deq_head = (void *) base;
+      base += round_pow2 (sizeof (u32), align);
+      qp->enq_fd = eventfd (0, EFD_NONBLOCK);
+      qp->deq_fd = eventfd (0, EFD_NONBLOCK);
+      vec_validate_aligned (qp->buffer_indices, qsz - 1,
+                           CLIB_CACHE_LINE_BYTES);
+      vec_validate_aligned (qp->next_indices, qsz - 1, CLIB_CACHE_LINE_BYTES);
+      clib_memset_u32 (qp->buffer_indices, ~0, qsz);
+
+      /* pre-populate freelist */
+      vec_validate_aligned (qp->freelist, qsz - 1, CLIB_CACHE_LINE_BYTES);
+      snort_freelist_init (qp->freelist);
+
+      /* listen on dequeue events */
+      t.read_function = snort_deq_ready;
+      t.file_descriptor = qp->deq_fd;
+      t.private_data = si->index;
+      t.description =
+       format (0, "snort dequeue for instance '%s' qpair %u", si->name, i);
+      qp->deq_fd_file_index = clib_file_add (&file_main, &t);
+      qp->ready = 1;
+      clib_file_set_polling_thread (&file_main, qp->deq_fd_file_index, i);
+      clib_interrupt_resize (&ptd->interrupts, vec_len (sm->instances));
+    }
+
+  for (i = 0; i < vlib_get_n_threads (); i++)
+    vlib_node_set_state (vlib_get_main_by_index (i), snort_deq_node.index,
+                        VLIB_NODE_STATE_INTERRUPT);
+
+done:
+  if (err)
+    {
+      if (base != CLIB_MEM_VM_MAP_FAILED)
+       clib_mem_vm_unmap (base);
+      if (fd != -1)
+       close (fd);
+    }
+  return err;
+}
+
+clib_error_t *
+snort_interface_enable_disable (vlib_main_t *vm, char *instance_name,
+                               u32 sw_if_index, int is_enable)
+{
+  snort_main_t *sm = &snort_main;
+  vnet_main_t *vnm = vnet_get_main ();
+  snort_instance_t *si;
+  clib_error_t *err = 0;
+  u32 index;
+
+  if (is_enable)
+    {
+      if ((si = snort_get_instance_by_name (instance_name)) == 0)
+       {
+         err = clib_error_return (0, "unknown instance '%s'", instance_name);
+         goto done;
+       }
+
+      vec_validate_init_empty (sm->instance_by_sw_if_index, sw_if_index, ~0);
+
+      index = sm->instance_by_sw_if_index[sw_if_index];
+      if (index != ~0)
+       {
+         si = vec_elt_at_index (sm->instances, index);
+         err = clib_error_return (0,
+                                  "interface %U already assgined to "
+                                  "instance '%s'",
+                                  format_vnet_sw_if_index_name, vnm,
+                                  sw_if_index, si->name);
+         goto done;
+       }
+
+      index = sm->instance_by_sw_if_index[sw_if_index] = si->index;
+      vnet_feature_enable_disable ("ip4-unicast", "snort-enq", sw_if_index, 1,
+                                  &index, sizeof (index));
+    }
+  else
+    {
+      if (sw_if_index >= vec_len (sm->instance_by_sw_if_index) ||
+         sm->instance_by_sw_if_index[sw_if_index] == ~0)
+       {
+         err =
+           clib_error_return (0,
+                              "interface %U is not assigned to snort "
+                              "instance!",
+                              format_vnet_sw_if_index_name, vnm, sw_if_index);
+         goto done;
+       }
+      index = sm->instance_by_sw_if_index[sw_if_index];
+      si = vec_elt_at_index (sm->instances, index);
+
+      sm->instance_by_sw_if_index[sw_if_index] = ~0;
+      vnet_feature_enable_disable ("ip4-unicast", "snort-enq", sw_if_index, 0,
+                                  &index, sizeof (index));
+    }
+
+done:
+  if (err)
+    log_err ("%U", format_clib_error, err);
+  return 0;
+}
+
+clib_error_t *
+snort_set_node_mode (vlib_main_t *vm, u32 mode)
+{
+  int i;
+  snort_main.input_mode = mode;
+  for (i = 0; i < vlib_get_n_threads (); i++)
+    vlib_node_set_state (vlib_get_main_by_index (i), snort_deq_node.index,
+                        mode);
+  return 0;
+}
+
+static void
+snort_set_default_socket (snort_main_t *sm, u8 *socket_name)
+{
+  if (sm->socket_name)
+    return;
+
+  if (!socket_name)
+    socket_name = (u8 *) DAQ_VPP_DEFAULT_SOCKET_FILE;
+
+  sm->socket_name =
+    format (0, "%s/%s", vlib_unix_get_runtime_dir (), socket_name);
+  vec_terminate_c_string (sm->socket_name);
+}
+
+static clib_error_t *
+snort_init (vlib_main_t *vm)
+{
+  snort_main_t *sm = &snort_main;
+  sm->instance_by_name = hash_create_string (0, sizeof (uword));
+  vlib_buffer_pool_t *bp;
+
+  vec_foreach (bp, vm->buffer_main->buffer_pools)
+    {
+      vlib_physmem_map_t *pm =
+       vlib_physmem_get_map (vm, bp->physmem_map_index);
+      vec_add1 (sm->buffer_pool_base_addrs, pm->base);
+    }
+
+  if (!sm->socket_name)
+    snort_set_default_socket (sm, 0);
+
+  return snort_listener_init (vm);
+}
+
+VLIB_INIT_FUNCTION (snort_init);
+
+VLIB_PLUGIN_REGISTER () = {
+  .version = VPP_BUILD_VER,
+  .description = "Snort",
+};
+
+VNET_FEATURE_INIT (snort_enq, static) = {
+  .arc_name = "ip4-unicast",
+  .node_name = "snort-enq",
+  .runs_before = VNET_FEATURES ("ip4-lookup"),
+};
diff --git a/src/plugins/snort/snort.h b/src/plugins/snort/snort.h
new file mode 100644 (file)
index 0000000..d069fa0
--- /dev/null
@@ -0,0 +1,113 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2021 Cisco Systems, Inc.
+ */
+
+#ifndef __snort_snort_h__
+#define __snort_snort_h__
+
+#include <vppinfra/error.h>
+#include <vppinfra/socket.h>
+#include <vlib/vlib.h>
+#include <snort/daq_vpp.h>
+
+typedef struct
+{
+  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+  u8 log2_queue_size;
+  daq_vpp_desc_t *descriptors;
+  volatile u32 *enq_head;
+  volatile u32 *deq_head;
+  volatile u32 *enq_ring;
+  volatile u32 *deq_ring;
+  u32 next_desc;
+  int enq_fd, deq_fd;
+  u32 deq_fd_file_index;
+  u32 *buffer_indices;
+  u16 *next_indices;
+  u32 *freelist;
+  u32 ready;
+
+  /* temporary storeage used by enqueue node */
+  u32 n_pending;
+  u16 pending_nexts[VLIB_FRAME_SIZE];
+  u32 pending_buffers[VLIB_FRAME_SIZE];
+  daq_vpp_desc_t pending_descs[VLIB_FRAME_SIZE];
+} snort_qpair_t;
+
+typedef struct
+{
+  u32 index;
+  u32 client_index;
+  void *shm_base;
+  u32 shm_size;
+  int shm_fd;
+  snort_qpair_t *qpairs;
+  u8 *name;
+  u8 drop_on_disconnect;
+} snort_instance_t;
+
+typedef struct
+{
+  daq_vpp_msg_t msg;
+  int fds[2];
+  int n_fds;
+} snort_client_msg_queue_elt;
+
+typedef struct
+{
+  clib_socket_t socket;
+  u32 instance_index;
+  u32 file_index;
+  snort_client_msg_queue_elt *msg_queue;
+} snort_client_t;
+
+typedef struct
+{
+  /* per-instance dequeue interrupts */
+  void *interrupts;
+} snort_per_thread_data_t;
+
+typedef struct
+{
+  clib_socket_t *listener;
+  snort_client_t *clients;
+  snort_instance_t *instances;
+  uword *instance_by_name;
+  u32 *instance_by_sw_if_index;
+  u8 **buffer_pool_base_addrs;
+  snort_per_thread_data_t *per_thread_data;
+  u32 input_mode;
+  u8 *socket_name;
+} snort_main_t;
+
+extern snort_main_t snort_main;
+extern vlib_node_registration_t snort_enq_node;
+extern vlib_node_registration_t snort_deq_node;
+
+typedef enum
+{
+  SNORT_ENQ_NEXT_DROP,
+  SNORT_ENQ_N_NEXT_NODES,
+} snort_enq_next_t;
+
+#define SNORT_ENQ_NEXT_NODES                                                  \
+  {                                                                           \
+    [SNORT_ENQ_NEXT_DROP] = "error-drop",                                     \
+  }
+
+/* functions */
+clib_error_t *snort_instance_create (vlib_main_t *vm, char *name,
+                                    u8 log2_queue_sz, u8 drop_on_disconnect);
+clib_error_t *snort_interface_enable_disable (vlib_main_t *vm,
+                                             char *instance_name,
+                                             u32 sw_if_index, int is_enable);
+clib_error_t *snort_set_node_mode (vlib_main_t *vm, u32 mode);
+
+always_inline void
+snort_freelist_init (u32 *fl)
+{
+  for (int j = 0; j < vec_len (fl); j++)
+    fl[j] = j;
+}
+
+#endif /* __snort_snort_h__ */