snort: add support for packet injection 64/43764/5
authorMohsin Kazmi <[email protected]>
Mon, 6 Oct 2025 13:52:30 +0000 (13:52 +0000)
committerDamjan Marion <[email protected]>
Fri, 17 Oct 2025 11:26:23 +0000 (11:26 +0000)
Type: feature

Signed-off-by: Mohsin Kazmi <[email protected]>
Change-Id: Iead600732d7f613d478ebff8c059f8ac9e74344c

12 files changed:
src/plugins/snort/cli.c
src/plugins/snort/daq/daq_vpp.h
src/plugins/snort/daq/dump.c
src/plugins/snort/daq/main.c
src/plugins/snort/daq/socket.c
src/plugins/snort/daq_vpp_shared.h
src/plugins/snort/dequeue.c
src/plugins/snort/export.h
src/plugins/snort/format.c
src/plugins/snort/main.c
src/plugins/snort/snort.h
src/plugins/snort/socket.c

index ed5b98b..5ccc57c 100644 (file)
@@ -51,6 +51,7 @@ snort_create_instance_command_fn (vlib_main_t *vm, unformat_input_t *input,
   clib_error_t *err = 0;
   u8 *name = 0;
   u32 queue_size = 1024;
+  u32 empty_buf_queue_size = 64;
   u32 qpairs_per_thread = 1;
   u8 drop_on_disconnect = 1;
   int rv = 0;
@@ -61,6 +62,9 @@ snort_create_instance_command_fn (vlib_main_t *vm, unformat_input_t *input,
        ;
       else if (unformat (input, "queues-per-thread %u", &qpairs_per_thread))
        ;
+      else if (unformat (input, "empty-buf-queue-size %u",
+                        &empty_buf_queue_size))
+       ;
       else if (unformat (input, "on-disconnect drop"))
        drop_on_disconnect = 1;
       else if (unformat (input, "on-disconnect pass"))
@@ -87,15 +91,17 @@ snort_create_instance_command_fn (vlib_main_t *vm, unformat_input_t *input,
       goto done;
     }
 
-  rv = snort_instance_create (vm,
-                             &(snort_instance_create_args_t){
-                               .log2_queue_sz = min_log2 (queue_size),
-                               .drop_on_disconnect = drop_on_disconnect,
-                               .drop_bitmap = 1 << DAQ_VPP_VERDICT_BLOCK |
-                                              1 << DAQ_VPP_VERDICT_BLACKLIST,
-                               .qpairs_per_thread = qpairs_per_thread,
-                             },
-                             "%s", name);
+  rv = snort_instance_create (
+    vm,
+    &(snort_instance_create_args_t){
+      .log2_queue_sz = min_log2 (queue_size),
+      .log2_empty_buf_queue_sz = min_log2 (empty_buf_queue_size),
+      .drop_on_disconnect = drop_on_disconnect,
+      .drop_bitmap =
+       1 << DAQ_VPP_VERDICT_BLOCK | 1 << DAQ_VPP_VERDICT_BLACKLIST,
+      .qpairs_per_thread = qpairs_per_thread,
+    },
+    "%s", name);
 
   switch (rv)
     {
@@ -127,7 +133,8 @@ done:
 VLIB_CLI_COMMAND (snort_create_instance_command, static) = {
   .path = "snort create-instance",
   .short_help = "snort create-instance name <name> [queue-size <size>] "
-               "[queues-per-thread <n>] [on-disconnect drop|pass]",
+               "[queues-per-thread <n>] [empty-buf-queue-size <size>] "
+               "[on-disconnect drop|pass]",
   .function = snort_create_instance_command_fn,
 };
 
@@ -369,6 +376,13 @@ snort_show_instances_command_fn (vlib_main_t *vm, unformat_input_t *input,
            (u8 *) qp->deq_ring - (u8 *) si->shm_base, qp->deq_fd,
            qp->hdr->deq.head, qp->deq_tail);
 
+         vlib_cli_output (vm,
+                          "    empty-buf-queue: ring_offset %u ring_size %u "
+                          "head %lu tail %lu",
+                          (u8 *) qp->empty_buf_ring - (u8 *) si->shm_base,
+                          1 << qp->log2_empty_buf_queue_size,
+                          qp->hdr->deq.empty_buf_head, qp->empty_buf_tail);
+
          for (u32 i = 0; i < DAQ_VPP_MAX_DAQ_VERDICT; i++)
            if (qp->n_packets_by_verdict[i])
              {
@@ -387,16 +401,18 @@ snort_show_instances_command_fn (vlib_main_t *vm, unformat_input_t *input,
          if (verbose)
            {
              vlib_cli_output (vm, "   desc   buffer_index   next_index "
-                                  "  freelist_next\n");
+                                  "  freelist_next                   desc\n");
              vlib_cli_output (
-               vm, "  ====== ============== ============ ===============\n");
+               vm, "  ====== ============== ============ "
+                   "=============== ====================================\n");
              u32 total_desc = 1 << qp->log2_queue_size;
              for (u32 i = 0; i < total_desc; i++)
                {
                  snort_qpair_entry_t *qpe = qp->entries + i;
-                 vlib_cli_output (vm, "  %-6d  %-12u   %-12u  %-12u", i,
+                 daq_vpp_desc_t *d = qp->hdr->descs + i;
+                 vlib_cli_output (vm, "  %-6d  %-12u   %-12u  %-14u %U", i,
                                   qpe->buffer_index, qpe->next_index,
-                                  qpe->freelist_next);
+                                  qpe->freelist_next, format_snort_desc, d);
                }
            }
        }
index a4692d9..09698b8 100644 (file)
@@ -44,6 +44,7 @@ char *daq_vpp_dump_pkt_hdr (const DAQ_PktHdr_t *hdr);
 void daq_vpp_dump_msg_type (DAQ_MsgType type);
 void daq_vpp_dump_msg (DAQ_Msg_h msg);
 char *daq_vpp_dump_packet_data (const uint8_t *data, uint32_t len);
+const char *daq_vpp_inject_direction (int reverse);
 
 #define DEBUG2(fmt, ...)                                                      \
   if (daq_vpp_main.debug_msg)                                                 \
@@ -106,8 +107,10 @@ typedef struct _vpp_qpair
   daq_vpp_qpair_header_t *hdr;
   daq_vpp_desc_index_t *enq_ring;
   daq_vpp_desc_index_t *deq_ring;
+  daq_vpp_empty_buf_desc_t *empty_buf_ring;
   daq_vpp_head_tail_t tail;
   uint16_t queue_size;
+  uint16_t empty_buf_queue_size;
   int enq_fd;
   int deq_fd;
   daq_vpp_input_index_t input_index;
index 73d25a6..870effb 100644 (file)
@@ -357,6 +357,22 @@ daq_vpp_dump_pkt_hdr (const DAQ_PktHdr_t *hdr)
   return buf;
 }
 
+const char *
+daq_vpp_inject_direction (int reverse)
+{
+  switch (reverse)
+    {
+    case (DAQ_DIR_FORWARD):
+      return "Forward Injection";
+    case (DAQ_DIR_REVERSE):
+      return "Reverse Injection";
+    case (DAQ_DIR_BOTH):
+      return "Forward & Reverse Injection";
+    default:
+      return "Unknown";
+    }
+}
+
 static inline const char *
 daq_vpp_msg_type_to_str (DAQ_MsgType type)
 {
index ff38ce7..c00d6a6 100644 (file)
@@ -432,6 +432,85 @@ daq_vpp_interrupt (void *handle)
   return DAQ_SUCCESS;
 }
 
+static int
+daq_vpp_inject (void *handle, DAQ_MsgType type, const void *hdr,
+               const uint8_t *data, uint32_t data_len)
+{
+  daq_vpp_ctx_t __unused *ctx = (daq_vpp_ctx_t *) handle;
+  daq_vpp_main_t __unused *vdm = &daq_vpp_main;
+  DAQ_PktHdr_t *pkthdr = (DAQ_PktHdr_t *) hdr;
+
+  DEBUG_DUMP_MSG2 (pkthdr, type, data, data_len);
+
+  return DAQ_ERROR_NOTSUP;
+}
+
+static int
+daq_vpp_inject_relative (void *handle, DAQ_Msg_h msg, const uint8_t *data,
+                        uint32_t data_len, int reverse)
+{
+  daq_vpp_ctx_t *ctx = (daq_vpp_ctx_t *) handle;
+  daq_vpp_main_t *vdm = &daq_vpp_main;
+  daq_vpp_msg_pool_entry_t *pe = msg->priv;
+  daq_vpp_qpair_t *qp = pe->qpair;
+  daq_vpp_head_tail_t head, tail, mask = qp->empty_buf_queue_size - 1;
+  daq_vpp_qpair_header_t *h = qp->hdr;
+  uint8_t *buf_data;
+  daq_vpp_empty_buf_desc_t *empty_buf_desc;
+
+  DEBUG_DUMP_MSG (msg);
+  DEBUG2 ("%s", daq_vpp_inject_direction (reverse));
+  DEBUG_DUMP_DATA_HEX (data, data_len);
+
+  /*
+   * check the injection direction against the packet direction
+   * if direction is not the supported one, return error
+   */
+  switch (reverse)
+    {
+    case DAQ_DIR_FORWARD:
+      break;
+    case DAQ_DIR_REVERSE:
+      break;
+    case DAQ_DIR_BOTH:
+      break;
+    default:
+      return daq_vpp_err (ctx, "invalid direction %d", reverse);
+    }
+
+  head = __atomic_load_n (&qp->hdr->deq.empty_buf_head, __ATOMIC_ACQUIRE);
+  tail = __atomic_load_n (&qp->hdr->deq.empty_buf_tail, __ATOMIC_RELAXED);
+
+  if (head == tail)
+    {
+      DEBUG2 ("no empty buffer available to inject packet");
+      return daq_vpp_err (ctx, "no empty buffer available to inject packet");
+    }
+  empty_buf_desc = &qp->empty_buf_ring[tail & mask];
+  buf_data =
+    vdm->bpools[empty_buf_desc->buffer_pool].base + empty_buf_desc->offset;
+
+  if (empty_buf_desc->length < data_len)
+    return daq_vpp_err (ctx, "descriptor %u buffer too small (%u < %u)",
+                       tail & mask, empty_buf_desc->length, data_len);
+
+  memcpy (buf_data, data, data_len);
+  empty_buf_desc->length = data_len;
+  empty_buf_desc->ref_buffer_desc_index = pe->index;
+  empty_buf_desc->direction = reverse;
+
+  tail = tail + 1;
+  __atomic_store_n (&h->deq.empty_buf_tail, tail, __ATOMIC_RELEASE);
+
+  if (!__atomic_exchange_n (&qp->hdr->deq.interrupt_pending, 1,
+                           __ATOMIC_RELAXED))
+    {
+      ssize_t __unused rv;
+      rv = write (qp->deq_fd, &(uint64_t){ 1 }, sizeof (uint64_t));
+    }
+  return DAQ_SUCCESS;
+}
+
 const DAQ_Msg_t *
 daq_vpp_fill_msg (daq_vpp_ctx_t *ctx, daq_vpp_qpair_t *qp, uint32_t desc_index,
                  struct timeval tv)
@@ -659,8 +738,17 @@ daq_vpp_msg_finalize (void *handle, const DAQ_Msg_t *msg, DAQ_Verdict verdict)
 
   DEBUG_DUMP_MSG (msg);
 
+  if (ctx->msg_pool_info.available == ctx->msg_pool_info.size)
+    {
+      DEBUG2 ("all messages are already finalized");
+      return DAQ_SUCCESS;
+    }
+
   if (verdict >= MAX_DAQ_VERDICT)
-    verdict = DAQ_VERDICT_PASS;
+    {
+      DEBUG2 ("verdict %d out of range, setting to PASS", verdict);
+      verdict = DAQ_VERDICT_PASS;
+    }
   ctx->stats.verdicts[verdict]++;
 
   mask = qp->queue_size - 1;
@@ -716,8 +804,12 @@ daq_vpp_reset_stats (void *handle)
 static uint32_t
 daq_vpp_get_capabilities (void __unused *handle)
 {
-  return DAQ_CAPA_BLOCK | DAQ_CAPA_UNPRIV_START | DAQ_CAPA_INTERRUPT |
-        DAQ_CAPA_DEVICE_INDEX;
+  return DAQ_CAPA_BLOCK |       /* can block packets */
+        DAQ_CAPA_INJECT |       /* can inject packets */
+        DAQ_CAPA_UNPRIV_START | /* can start without root privileges */
+        DAQ_CAPA_INTERRUPT |    /* can be interrupted */
+        DAQ_CAPA_DEVICE_INDEX; /* can consistently fill the device index field
+                                  in DAQ_PktHdr */
 }
 
 static int
@@ -726,7 +818,7 @@ daq_vpp_get_datalink_type (void __unused *handle)
   return DLT_IPV4;
 }
 
-static char *
+static const char *
 daq_vpp_ioctl_cmd_to_str (DAQ_IoctlCmd cmd)
 {
 #define IOCTL_CMD_STR(cmd)                                                    \
@@ -764,48 +856,59 @@ daq_vpp_ioctl (void *handle, DAQ_IoctlCmd cmd, void *arg, size_t arglen)
   daq_vpp_ctx_t *ctx = (daq_vpp_ctx_t *) handle;
   DIOCTL_QueryDeviceIndex *qdi = (DIOCTL_QueryDeviceIndex *) arg;
 
-  DEBUG ("ioctl cmd %s", daq_vpp_ioctl_cmd_to_str (cmd));
+  DEBUG2 ("ioctl cmd %s", daq_vpp_ioctl_cmd_to_str (cmd));
 
-  if (cmd == DIOCTL_GET_DEVICE_INDEX)
+  switch (cmd)
     {
-      char name[DAQ_VPP_MAX_INST_NAME_LEN], *colon;
-
-      if (arglen != sizeof (DIOCTL_QueryDeviceIndex))
-       return DAQ_ERROR_NOTSUP;
+    case DIOCTL_GET_DEVICE_INDEX:
+      {
+       char name[DAQ_VPP_MAX_INST_NAME_LEN], *colon;
 
-      if (qdi->device == 0)
-       {
-         daq_vpp_err (ctx, "no device name in IOCTL_GET_DEVICE_INDEX");
-         return DAQ_ERROR_INVAL;
-       }
-      snprintf (name, sizeof (name), "%s", qdi->device);
-      colon = strchr (name, ':');
-      if (colon)
-       colon[0] = 0;
+       if (arglen != sizeof (DIOCTL_QueryDeviceIndex))
+         return DAQ_ERROR_NOTSUP;
 
-      for (daq_vpp_input_index_t ii = 0; ii < vdm->n_inputs; ii++)
-       if (strcmp (name, vdm->inputs[ii]->name) == 0)
+       if (qdi->device == 0)
          {
-           qdi->index = ii + 1;
-           return DAQ_SUCCESS;
+           daq_vpp_err (ctx, "no device name in IOCTL_GET_DEVICE_INDEX");
+           return DAQ_ERROR_INVAL;
          }
+       snprintf (name, sizeof (name), "%s", qdi->device);
+       colon = strchr (name, ':');
+       if (colon)
+         colon[0] = 0;
 
-      return DAQ_ERROR_NODEV;
-    }
-  else if (cmd == DIOCTL_GET_PRIV_DATA_LEN)
-    {
-      DIOCTL_GetPrivDataLen *gpl = (DIOCTL_GetPrivDataLen *) arg;
+       for (daq_vpp_input_index_t ii = 0; ii < vdm->n_inputs; ii++)
+         if (strcmp (name, vdm->inputs[ii]->name) == 0)
+           {
+             qdi->index = ii + 1;
+             return DAQ_SUCCESS;
+           }
 
-      if (arglen != sizeof (DIOCTL_GetPrivDataLen))
-       return DAQ_ERROR_NOTSUP;
-      if (gpl->msg->priv != NULL)
-       gpl->priv_data_len = sizeof (daq_vpp_msg_pool_entry_t);
-      else
-       gpl->priv_data_len = 0;
+       return DAQ_ERROR_NODEV;
+      }
+    case DIOCTL_GET_PRIV_DATA_LEN:
+      {
+       DIOCTL_GetPrivDataLen *gpl = (DIOCTL_GetPrivDataLen *) arg;
+
+       if (arglen != sizeof (DIOCTL_GetPrivDataLen))
+         return DAQ_ERROR_NOTSUP;
+       if (gpl->msg->priv != NULL)
+         gpl->priv_data_len = sizeof (daq_vpp_msg_pool_entry_t);
+       else
+         gpl->priv_data_len = 0;
+
+       DEBUG2 ("ioctl cmd %s %u", daq_vpp_ioctl_cmd_to_str (cmd),
+               gpl->priv_data_len);
+       return DAQ_SUCCESS;
+      }
+    case DIOCTL_DIRECT_INJECT_PAYLOAD:
+    case DIOCTL_DIRECT_INJECT_RESET:
+    case DIOCTL_SET_INJECT_DROP:
+      DEBUG2 ("%s is a no-op", daq_vpp_ioctl_cmd_to_str (cmd));
 
-      DEBUG2 ("ioctl cmd %s %u", daq_vpp_ioctl_cmd_to_str (cmd),
-             gpl->priv_data_len);
-      return DAQ_SUCCESS;
+    default:
+      /* not supported yet */
+      return DAQ_ERROR_NOTSUP;
     }
 
   return DAQ_ERROR_NOTSUP;
@@ -825,6 +928,8 @@ const DAQ_ModuleAPI_t DAQ_MODULE_DATA = {
   .instantiate = daq_vpp_instantiate,
   .destroy = daq_vpp_destroy,
   .start = daq_vpp_start,
+  .inject = daq_vpp_inject,
+  .inject_relative = daq_vpp_inject_relative,
   .interrupt = daq_vpp_interrupt,
   .ioctl = daq_vpp_ioctl,
   .get_stats = daq_vpp_get_stats,
index 09e9b8d..b1203f4 100644 (file)
@@ -156,13 +156,16 @@ daq_vpp_recvmsg_data_string (daq_vpp_msg_reply_t *reply, ssize_t sz)
                     ", attach_qpair: { qpair_id: { thread_id: %u, "
                     "queue_id: %u }, log2_queue_size: %u, "
                     "qpair_header_offset: %u, enq_ring_offset: %u, "
-                    "deq_ring_offset: %u }",
+                    "deq_ring_offset: %u, log2_empty_buf_queue_size: %u "
+                    "empty_buf_ring_offset: %u }",
                     reply->attach_qpair.qpair_id.thread_id,
                     reply->attach_qpair.qpair_id.queue_id,
                     reply->attach_qpair.log2_queue_size,
                     reply->attach_qpair.qpair_header_offset,
                     reply->attach_qpair.enq_ring_offset,
-                    reply->attach_qpair.deq_ring_offset);
+                    reply->attach_qpair.deq_ring_offset,
+                    reply->attach_qpair.log2_empty_buf_queue_size,
+                    reply->attach_qpair.empty_buf_ring_offset);
       break;
     default:
       n += snprintf (buf + n, sizeof (buf) - n, ", unknown");
@@ -440,9 +443,12 @@ daq_vpp_find_or_add_input (daq_vpp_ctx_t *ctx, char *name,
 
       qp->qpair_id = g->qpair_id;
       qp->queue_size = 1 << g->log2_queue_size;
+      qp->empty_buf_queue_size = 1 << g->log2_empty_buf_queue_size;
       qp->hdr = (daq_vpp_qpair_header_t *) (base + g->qpair_header_offset);
       qp->enq_ring = (daq_vpp_desc_index_t *) (base + g->enq_ring_offset);
       qp->deq_ring = (daq_vpp_desc_index_t *) (base + g->deq_ring_offset);
+      qp->empty_buf_ring =
+       (daq_vpp_empty_buf_desc_t *) (base + g->empty_buf_ring_offset);
       qp->enq_fd = fds[0];
       qp->deq_fd = fds[1];
       qp->input_index = ii;
@@ -453,9 +459,11 @@ daq_vpp_find_or_add_input (daq_vpp_ctx_t *ctx, char *name,
                            qp->qpair_id.thread_id, qp->qpair_id.queue_id);
          goto err;
        }
-      DEBUG ("input %s qpair %u.%u: size %u, hdr %p, enq %p, deq %p", name,
-            qp->qpair_id.thread_id, qp->qpair_id.queue_id, qp->queue_size,
-            qp->hdr, qp->enq_ring, qp->deq_ring);
+      DEBUG ("input %s qpair %u.%u: size %u, hdr %p, enq %p, deq %p, "
+            "empty_buf_queue_size %u, empty_buf_queue %p",
+            name, qp->qpair_id.thread_id, qp->qpair_id.queue_id,
+            qp->queue_size, qp->hdr, qp->enq_ring, qp->deq_ring,
+            qp->empty_buf_queue_size, qp->empty_buf_ring);
     }
 
   vdm->inputs =
index 04f6f05..f9ad0ee 100644 (file)
@@ -5,7 +5,7 @@
 #ifndef __DAQ_VPP_SHARED_H__
 #define __DAQ_VPP_SHARED_H__
 
-#define DAQ_VPP_VERSION                    2
+#define DAQ_VPP_VERSION                     3
 #define DAQ_VPP_DEFAULT_SOCKET_FILE "snort.sock"
 #define DAQ_VPP_DEFAULT_SOCKET_PATH "/run/vpp/" DAQ_VPP_DEFAULT_SOCKET_FILE
 #define DAQ_VPP_MAX_INST_NAME_LEN   32
@@ -135,9 +135,11 @@ typedef struct
   daq_vpp_input_index_t input_index;
   daq_vpp_qpair_index_t qpair_index;
   uint8_t log2_queue_size;
+  uint8_t log2_empty_buf_queue_size;
   daq_vpp_offset_t qpair_header_offset;
   daq_vpp_offset_t enq_ring_offset;
   daq_vpp_offset_t deq_ring_offset;
+  daq_vpp_offset_t empty_buf_ring_offset;
 } daq_vpp_msg_reply_attach_qpair_t;
 
 typedef struct
@@ -195,6 +197,15 @@ typedef struct
   daq_vpp_pkt_metadata_t metadata;
 } daq_vpp_desc_t;
 
+typedef struct
+{
+  daq_vpp_offset_t offset;
+  uint16_t length;
+  uint8_t buffer_pool;
+  uint32_t ref_buffer_desc_index;
+  uint8_t direction;
+} daq_vpp_empty_buf_desc_t;
+
 typedef struct
 {
   /* enqueue */
@@ -209,6 +220,9 @@ typedef struct
   struct
   {
     daq_vpp_head_tail_t head;
+    /* empty buffer ring head/tail */
+    daq_vpp_head_tail_t empty_buf_head;
+    daq_vpp_head_tail_t empty_buf_tail;
     uint8_t interrupt_pending;
   } __attribute__ ((__aligned__ (64))) deq;
 
@@ -217,6 +231,6 @@ typedef struct
 } daq_vpp_qpair_header_t;
 
 _Static_assert (sizeof (daq_vpp_qpair_header_t) == 128,
-               "size must be two achelines");
+               "size must be two cachelines");
 
 #endif /* __DAQ_VPP_SHARED_H__ */
index b567701..89e1639 100644 (file)
@@ -6,6 +6,61 @@
 #include <vnet/feature/feature.h>
 #include <snort/snort.h>
 
+static_always_inline void
+snort_deq_node_inject (vlib_main_t *vm, vlib_node_runtime_t *node,
+                      snort_qpair_t *qp)
+{
+  daq_vpp_desc_index_t mask = pow2_mask (qp->log2_empty_buf_queue_size);
+  daq_vpp_head_tail_t tail, last_tail = qp->empty_buf_tail;
+  u32 from[VLIB_FRAME_SIZE];
+  u16 nexts[VLIB_FRAME_SIZE];
+  u32 n_recv;
+
+  /* recv injected buffers from empty_buf ring */
+  tail = __atomic_load_n (&qp->hdr->deq.empty_buf_tail, __ATOMIC_ACQUIRE);
+
+  n_recv = tail - last_tail;
+
+  for (u32 i = 0; i < n_recv; i++)
+    {
+      daq_vpp_empty_buf_desc_t *desc = &qp->empty_buf_ring[last_tail & mask];
+      u32 ref_desc_index = desc->ref_buffer_desc_index;
+      snort_qpair_entry_t *ref_qpe = qp->entries + ref_desc_index;
+      daq_vpp_desc_t *ref_d = qp->hdr->descs + ref_desc_index;
+      vlib_buffer_t *b, *ref_b;
+      u32 bi, ref_bi;
+
+      bi = qp->empty_buffers[last_tail & mask];
+      qp->empty_buffers[last_tail & mask] = VLIB_BUFFER_INVALID_INDEX;
+
+      b = vlib_get_buffer (vm, bi);
+      b->current_length = desc->length;
+
+      /* set the buffer metadata */
+      ref_bi = ref_qpe->buffer_index;
+      ref_b = vlib_get_buffer (vm, ref_bi);
+
+      *snort_get_buffer_metadata (b) = ref_d->metadata;
+
+      b->current_config_index = ref_b->current_config_index;
+      vnet_buffer (b)->feature_arc_index =
+       vnet_buffer (ref_b)->feature_arc_index;
+
+      from[i] = bi;
+      nexts[i] = ref_qpe->next_index;
+      last_tail++;
+    }
+
+  if (n_recv)
+    {
+      vlib_buffer_enqueue_to_next (vm, node, from, nexts, n_recv);
+      qp->empty_buf_tail = last_tail;
+    }
+
+  /* allocate more empty_bufs if needed */
+  snort_qpair_empty_buf_alloc_buffers (vm, qp);
+}
+
 static u32
 snort_deq_node_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
                       snort_instance_t *si, snort_qpair_t *qp)
@@ -50,7 +105,9 @@ snort_deq_node_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
              vm, node->node_index, SNORT_DEQ_ERROR_NO_CLIENT_FREE, n_total);
        }
 
+      snort_qpair_empty_buf_free_buffers (vm, qp);
       snort_qpair_init (qp);
+      snort_qpair_empty_buf_alloc_buffers (vm, qp);
       __atomic_store_n (&qp->cleanup_needed, 0, __ATOMIC_RELEASE);
       return 0;
     }
@@ -77,7 +134,7 @@ more:
       u32 bi;
       u8 verdict;
 
-      /* check if descriptor index taken from dequqe ring is valid */
+      /* check if descriptor index taken from dequeue ring is valid */
       if (desc_index & ~mask)
        {
          error = 1;
@@ -161,11 +218,15 @@ VLIB_NODE_FN (snort_deq_node)
     vlib_node_get_runtime_data (vm, node->node_index);
   snort_instance_t *si = pool_elt_at_index (sm->instances, rt->instance_index);
   u32 qpairs_per_thread = si->qpairs_per_thread;
-  snort_qpair_t **qp = snort_get_qpairs (si, vm->thread_index);
+  snort_qpair_t **qp_vec = snort_get_qpairs (si, vm->thread_index);
   uword rv = 0;
 
   while (qpairs_per_thread--)
-    rv += snort_deq_node_inline (vm, node, si, qp++[0]);
+    {
+      snort_qpair_t *qp = qp_vec++[0];
+      snort_deq_node_inject (vm, node, qp);
+      rv += snort_deq_node_inline (vm, node, si, qp);
+    }
 
   return rv;
 }
index e1f3edb..f0700d1 100644 (file)
@@ -14,6 +14,7 @@
 typedef struct
 {
   u8 log2_queue_sz;
+  u8 log2_empty_buf_queue_sz;
   u8 drop_on_disconnect;
   u8 qpairs_per_thread;
   u8 drop_bitmap; /* bits indexed by verdict, 0 = pass, 1 = drop */
index ae1e2c4..0273ecf 100644 (file)
@@ -112,3 +112,13 @@ format_snort_mode (u8 *s, va_list *args)
 
   return format (s, "%s", strings[v]);
 }
+
+u8 *
+format_snort_desc (u8 *s, va_list *args)
+{
+  daq_vpp_desc_t *d = va_arg (*args, daq_vpp_desc_t *);
+
+  s = format (s, "desc: buffer-pool %u offset %u length %u", d->buffer_pool,
+             d->offset, d->length);
+  return s;
+}
index 583c555..d681731 100644 (file)
@@ -67,6 +67,7 @@ snort_instance_create (vlib_main_t *vm, snort_instance_create_args_t *args,
   u8 *base = CLIB_MEM_VM_MAP_FAILED, *name;
   int rv = 0, fd = -1;
   u32 qsz = 1 << args->log2_queue_sz;
+  u32 empty_buf_qsz = 1 << args->log2_empty_buf_queue_sz;
   u32 qpairs_per_thread, total_qpairs, n_threads = tm->n_vlib_mains;
   u8 align = CLIB_CACHE_LINE_BYTES;
 
@@ -100,6 +101,10 @@ snort_instance_create (vlib_main_t *vm, snort_instance_create_args_t *args,
   /* enq and deq ring */
   qpair_mem_sz += 2 * round_pow2 (qsz * sizeof (daq_vpp_desc_index_t), align);
 
+  /* empty buffers ring */
+  qpair_mem_sz +=
+    round_pow2 (empty_buf_qsz * sizeof (daq_vpp_empty_buf_desc_t), align);
+
   /* total size of shared memory */
   size = round_pow2 ((uword) total_qpairs * qpair_mem_sz,
                     clib_mem_get_page_size ());
@@ -212,6 +217,7 @@ snort_instance_create (vlib_main_t *vm, snort_instance_create_args_t *args,
          .client_index = SNORT_INVALID_CLIENT_INDEX,
          .dequeue_node_index = si->dequeue_node_index,
          .log2_queue_size = args->log2_queue_sz,
+         .log2_empty_buf_queue_size = args->log2_empty_buf_queue_sz,
          .qpair_id.thread_id = thread_id,
          .qpair_id.queue_id = queue_id,
          .enq_fd = eventfd (0, EFD_NONBLOCK),
@@ -226,12 +232,13 @@ snort_instance_create (vlib_main_t *vm, snort_instance_create_args_t *args,
        base += round_pow2 (qsz * sizeof (daq_vpp_desc_index_t), align);
        qp->deq_ring = (void *) base;
        base += round_pow2 (qsz * sizeof (daq_vpp_desc_index_t), align);
-
-       for (u32 i = 0; i < qsz; i++)
-         qp->entries[i].buffer_index = VLIB_BUFFER_INVALID_INDEX;
+       qp->empty_buf_ring = (void *) base;
+       base += round_pow2 (empty_buf_qsz * sizeof (daq_vpp_empty_buf_desc_t),
+                           align);
 
        qp->hdr->enq.cookie = DAQ_VPP_COOKIE;
        snort_qpair_init (qp);
+       snort_qpair_empty_buf_alloc_buffers (vm, qp);
 
        /* listen on dequeue events */
        qp->deq_fd_file_index = clib_file_add (
@@ -305,6 +312,7 @@ snort_instance_delete (vlib_main_t *vm, snort_instance_index_t instance_index)
 
   vec_foreach_pointer (qp, si->qpairs)
     {
+      snort_qpair_empty_buf_free_buffers (vm, qp);
       clib_file_del_by_index (&file_main, qp->deq_fd_file_index);
       close (qp->enq_fd);
       close (qp->deq_fd);
index f7ba66d..b2bdd8e 100644 (file)
@@ -31,12 +31,16 @@ typedef struct
   daq_vpp_qpair_header_t *hdr;
   daq_vpp_desc_index_t *enq_ring;
   daq_vpp_desc_index_t *deq_ring;
+  daq_vpp_empty_buf_desc_t *empty_buf_ring;
   int enq_fd, deq_fd;
   u32 client_index;
   daq_vpp_desc_index_t next_free_desc;
   u16 n_free_descs;
   daq_vpp_head_tail_t deq_tail;
+  daq_vpp_head_tail_t empty_buf_tail;
   u8 log2_queue_size;
+  u8 log2_empty_buf_queue_size;
+  u32 *empty_buffers;
   u8 cleanup_needed;
   daq_vpp_qpair_id_t qpair_id;
   u32 deq_fd_file_index;
@@ -148,6 +152,7 @@ format_function_t format_snort_deq_trace;
 format_function_t format_snort_daq_version;
 format_function_t format_snort_verdict;
 format_function_t format_snort_mode;
+format_function_t format_snort_desc;
 
 /* enqueue.c */
 typedef struct
@@ -196,7 +201,8 @@ typedef struct
 #define foreach_snort_deq_error                                               \
   _ (BAD_DESC, "bad descriptor")                                              \
   _ (BAD_DESC_INDEX, "bad descriptor index")                                  \
-  _ (NO_CLIENT_FREE, "packets freed on client dissapear")
+  _ (NO_CLIENT_FREE, "packets freed on client dissapear")                     \
+  _ (NO_FREE_BUFFERS, "no free buffers to allocate to snort")
 
 typedef enum
 {
@@ -211,17 +217,107 @@ always_inline void
 snort_qpair_init (snort_qpair_t *qp)
 {
   u16 qsz = 1 << qp->log2_queue_size;
-  for (int j = 0; j < qsz - 1; j++)
+  u16 empty_buf_qsz = 1 << qp->log2_empty_buf_queue_size;
+  u16 mask = qsz - 1;
+  for (int j = 0; j < qsz; j++)
     {
-      qp->entries[j].freelist_next = j + 1;
+      qp->entries[j].freelist_next = (j + 1) & mask;
       qp->entries[j].buffer_index = VLIB_BUFFER_INVALID_INDEX;
     }
-  qp->entries[qsz - 1].freelist_next = ~0;
   qp->next_free_desc = 0;
   qp->hdr->enq.head = qp->hdr->deq.head = 0;
   qp->hdr->enq.interrupt_pending = qp->hdr->deq.interrupt_pending = 0;
   qp->deq_tail = 0;
+  qp->hdr->deq.empty_buf_head = 0;
+  qp->hdr->deq.empty_buf_tail = 0;
+  qp->empty_buf_tail = 0;
   qp->n_free_descs = qsz;
+  vec_validate_aligned (qp->empty_buffers, empty_buf_qsz - 1,
+                       CLIB_CACHE_LINE_BYTES);
+
+  for (int i = 0; i < empty_buf_qsz; i++)
+    qp->empty_buffers[i] = VLIB_BUFFER_INVALID_INDEX;
+}
+
+static_always_inline void
+snort_qpair_empty_buf_alloc_buffers (vlib_main_t *vm, snort_qpair_t *qp)
+{
+  snort_main_t *sm = &snort_main;
+  u32 i = 0;
+  daq_vpp_desc_index_t mask = pow2_mask (qp->log2_empty_buf_queue_size);
+  u32 empty_buf_size = 1 << qp->log2_empty_buf_queue_size;
+  daq_vpp_head_tail_t head, tail = qp->empty_buf_tail;
+  u32 n_alloc, n_req = 0;
+
+  /* allocate new buffers for empty_buf ring */
+  head = __atomic_load_n (&qp->hdr->deq.empty_buf_head, __ATOMIC_ACQUIRE);
+
+  n_req = empty_buf_size - (head - tail);
+  if (n_req == 0 || n_req < empty_buf_size / 4)
+    return;
+
+  if ((head & mask) + n_req <= empty_buf_size)
+    n_alloc = vlib_buffer_alloc (vm, &qp->empty_buffers[head & mask], n_req);
+  else
+    {
+      u32 n_first = empty_buf_size - (head & mask);
+      u32 n_second = n_req - n_first;
+
+      n_alloc =
+       vlib_buffer_alloc (vm, &qp->empty_buffers[head & mask], n_first);
+      if (n_alloc == n_first && n_second)
+       n_alloc += vlib_buffer_alloc (vm, &qp->empty_buffers[0], n_second);
+    }
+
+  while (i < n_alloc)
+    {
+      daq_vpp_empty_buf_desc_t *empty_buf_desc =
+       &qp->empty_buf_ring[head & mask];
+      vlib_buffer_t *b = vlib_get_buffer (vm, qp->empty_buffers[head & mask]);
+      b->current_data = 0;
+      *empty_buf_desc = (daq_vpp_empty_buf_desc_t){
+       .buffer_pool = b->buffer_pool_index,
+       .length = vlib_buffer_get_default_data_size (vm),
+       .offset = (u8 *) b->data -
+                 sm->buffer_pool_base_addrs[empty_buf_desc->buffer_pool],
+      };
+      head++;
+      i++;
+    }
+  __atomic_store_n (&qp->hdr->deq.empty_buf_head, head, __ATOMIC_RELEASE);
+}
+
+static_always_inline u32
+snort_qpair_empty_buf_free_buffers (vlib_main_t *vm, snort_qpair_t *qp)
+{
+  u32 empty_buf_size = 1 << qp->log2_empty_buf_queue_size;
+  u32 n_total = 0;
+  u32 n_free = 0;
+  u32 buffer_indices[VLIB_FRAME_SIZE];
+
+  for (u32 i = 0; i < empty_buf_size; i++)
+    {
+      u32 bi = qp->empty_buffers[i];
+      if (bi != VLIB_BUFFER_INVALID_INDEX)
+       {
+         buffer_indices[n_free] = bi;
+         qp->empty_buffers[i] = VLIB_BUFFER_INVALID_INDEX;
+         n_free++;
+       }
+      if (n_free == VLIB_FRAME_SIZE)
+       {
+         vlib_buffer_free (vm, buffer_indices, VLIB_FRAME_SIZE);
+         n_total += VLIB_FRAME_SIZE;
+         n_free = 0;
+       }
+    }
+
+  if (n_free)
+    vlib_buffer_free (vm, buffer_indices, n_free);
+  n_total += n_free;
+
+  vec_free (qp->empty_buffers);
+  return n_total;
 }
 
 static_always_inline snort_qpair_t **
index 119435c..f0ba77f 100644 (file)
@@ -164,9 +164,11 @@ snort_msg_attach_qpair (snort_client_t *c, daq_vpp_msg_req_t *req,
   r->qpair_id = qp->qpair_id;
   r->input_index = si->index;
   r->log2_queue_size = qp->log2_queue_size;
+  r->log2_empty_buf_queue_size = qp->log2_empty_buf_queue_size;
   r->qpair_header_offset = (u8 *) qp->hdr - base;
   r->enq_ring_offset = (u8 *) qp->enq_ring - base;
   r->deq_ring_offset = (u8 *) qp->deq_ring - base;
+  r->empty_buf_ring_offset = (u8 *) qp->empty_buf_ring - base;
   e->fds[0] = qp->enq_fd;
   e->fds[1] = qp->deq_fd;
   e->n_fds = 2;
@@ -175,10 +177,12 @@ snort_msg_attach_qpair (snort_client_t *c, daq_vpp_msg_req_t *req,
 
   log_debug (
     "qpair_id %u.%u input_index %u log2_queue_size %u qpair_header_offset %u "
-    "enq_ring_offset %u deq_ring_offset %u enq_fd %d deq_fd %d",
+    "enq_ring_offset %u deq_ring_offset %u enq_fd %d deq_fd %d "
+    "log2_empty_buf_queue_size %u empty_buf_ring_offset %u",
     r->qpair_id.thread_id, r->qpair_id.queue_id, r->input_index,
     r->log2_queue_size, r->qpair_header_offset, r->enq_ring_offset,
-    r->deq_ring_offset, e->fds[0], e->fds[1]);
+    r->deq_ring_offset, e->fds[0], e->fds[1], r->log2_empty_buf_queue_size,
+    r->empty_buf_ring_offset);
 }
 
 static clib_error_t *