clib_error_t *err = 0;
u8 *name = 0;
u32 queue_size = 1024;
+ u32 empty_buf_queue_size = 64;
u32 qpairs_per_thread = 1;
u8 drop_on_disconnect = 1;
int rv = 0;
;
else if (unformat (input, "queues-per-thread %u", &qpairs_per_thread))
;
+ else if (unformat (input, "empty-buf-queue-size %u",
+ &empty_buf_queue_size))
+ ;
else if (unformat (input, "on-disconnect drop"))
drop_on_disconnect = 1;
else if (unformat (input, "on-disconnect pass"))
goto done;
}
- rv = snort_instance_create (vm,
- &(snort_instance_create_args_t){
- .log2_queue_sz = min_log2 (queue_size),
- .drop_on_disconnect = drop_on_disconnect,
- .drop_bitmap = 1 << DAQ_VPP_VERDICT_BLOCK |
- 1 << DAQ_VPP_VERDICT_BLACKLIST,
- .qpairs_per_thread = qpairs_per_thread,
- },
- "%s", name);
+ rv = snort_instance_create (
+ vm,
+ &(snort_instance_create_args_t){
+ .log2_queue_sz = min_log2 (queue_size),
+ .log2_empty_buf_queue_sz = min_log2 (empty_buf_queue_size),
+ .drop_on_disconnect = drop_on_disconnect,
+ .drop_bitmap =
+ 1 << DAQ_VPP_VERDICT_BLOCK | 1 << DAQ_VPP_VERDICT_BLACKLIST,
+ .qpairs_per_thread = qpairs_per_thread,
+ },
+ "%s", name);
switch (rv)
{
VLIB_CLI_COMMAND (snort_create_instance_command, static) = {
.path = "snort create-instance",
.short_help = "snort create-instance name <name> [queue-size <size>] "
- "[queues-per-thread <n>] [on-disconnect drop|pass]",
+ "[queues-per-thread <n>] [empty-buf-queue-size <size>] "
+ "[on-disconnect drop|pass]",
.function = snort_create_instance_command_fn,
};
(u8 *) qp->deq_ring - (u8 *) si->shm_base, qp->deq_fd,
qp->hdr->deq.head, qp->deq_tail);
+ vlib_cli_output (vm,
+ " empty-buf-queue: ring_offset %u ring_size %u "
+ "head %lu tail %lu",
+ (u8 *) qp->empty_buf_ring - (u8 *) si->shm_base,
+ 1 << qp->log2_empty_buf_queue_size,
+ qp->hdr->deq.empty_buf_head, qp->empty_buf_tail);
+
for (u32 i = 0; i < DAQ_VPP_MAX_DAQ_VERDICT; i++)
if (qp->n_packets_by_verdict[i])
{
if (verbose)
{
vlib_cli_output (vm, " desc buffer_index next_index "
- " freelist_next\n");
+ " freelist_next desc\n");
vlib_cli_output (
- vm, " ====== ============== ============ ===============\n");
+ vm, " ====== ============== ============ "
+ "=============== ====================================\n");
u32 total_desc = 1 << qp->log2_queue_size;
for (u32 i = 0; i < total_desc; i++)
{
snort_qpair_entry_t *qpe = qp->entries + i;
- vlib_cli_output (vm, " %-6d %-12u %-12u %-12u", i,
+ daq_vpp_desc_t *d = qp->hdr->descs + i;
+ vlib_cli_output (vm, " %-6d %-12u %-12u %-14u %U", i,
qpe->buffer_index, qpe->next_index,
- qpe->freelist_next);
+ qpe->freelist_next, format_snort_desc, d);
}
}
}
void daq_vpp_dump_msg_type (DAQ_MsgType type);
void daq_vpp_dump_msg (DAQ_Msg_h msg);
char *daq_vpp_dump_packet_data (const uint8_t *data, uint32_t len);
+const char *daq_vpp_inject_direction (int reverse);
#define DEBUG2(fmt, ...) \
if (daq_vpp_main.debug_msg) \
daq_vpp_qpair_header_t *hdr;
daq_vpp_desc_index_t *enq_ring;
daq_vpp_desc_index_t *deq_ring;
+ daq_vpp_empty_buf_desc_t *empty_buf_ring;
daq_vpp_head_tail_t tail;
uint16_t queue_size;
+ uint16_t empty_buf_queue_size;
int enq_fd;
int deq_fd;
daq_vpp_input_index_t input_index;
return buf;
}
+const char *
+daq_vpp_inject_direction (int reverse)
+{
+ switch (reverse)
+ {
+ case DAQ_DIR_FORWARD:
+ return "Forward Injection";
+ case DAQ_DIR_REVERSE:
+ return "Reverse Injection";
+ case DAQ_DIR_BOTH:
+ return "Forward & Reverse Injection";
+ default:
+ return "Unknown";
+ }
+}
+
static inline const char *
daq_vpp_msg_type_to_str (DAQ_MsgType type)
{
return DAQ_SUCCESS;
}
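+/*
+ * Absolute packet injection (not tied to a received message) is not
+ * implemented by this DAQ module; only relative injection below is supported.
+ */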
+static int
+daq_vpp_inject (void *handle, DAQ_MsgType type, const void *hdr,
+ const uint8_t *data, uint32_t data_len)
+{
+ daq_vpp_ctx_t __unused *ctx = (daq_vpp_ctx_t *) handle;
+ daq_vpp_main_t __unused *vdm = &daq_vpp_main;
+ DAQ_PktHdr_t *pkthdr = (DAQ_PktHdr_t *) hdr;
+
+ DEBUG_DUMP_MSG2 (pkthdr, type, data, data_len);
+
+ return DAQ_ERROR_NOTSUP;
+}
+
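+/*
+ * Inject a packet relative to an existing message. This is the consumer side
+ * of the per-qpair empty buffer ring: VPP publishes descriptors for freshly
+ * allocated buffers at head, we take the descriptor at tail, copy the payload
+ * into its buffer, record the descriptor index of the referenced message and
+ * the injection direction, advance tail and kick the dequeue eventfd.
+ */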
+static int
+daq_vpp_inject_relative (void *handle, DAQ_Msg_h msg, const uint8_t *data,
+ uint32_t data_len, int reverse)
+{
+ daq_vpp_ctx_t *ctx = (daq_vpp_ctx_t *) handle;
+ daq_vpp_main_t *vdm = &daq_vpp_main;
+ daq_vpp_msg_pool_entry_t *pe = msg->priv;
+ daq_vpp_qpair_t *qp = pe->qpair;
+ daq_vpp_head_tail_t head, tail, mask = qp->empty_buf_queue_size - 1;
+ daq_vpp_qpair_header_t *h = qp->hdr;
+ uint8_t *buf_data;
+ daq_vpp_empty_buf_desc_t *empty_buf_desc;
+
+ DEBUG_DUMP_MSG (msg);
+ DEBUG2 ("%s", daq_vpp_inject_direction (reverse));
+ DEBUG_DUMP_DATA_HEX (data, data_len);
+
+ /*
+ * validate the requested injection direction;
+ * return an error for unknown values
+ */
+ switch (reverse)
+ {
+ case DAQ_DIR_FORWARD:
+ break;
+ case DAQ_DIR_REVERSE:
+ break;
+ case DAQ_DIR_BOTH:
+ break;
+ default:
+ return daq_vpp_err (ctx, "invalid direction %d", reverse);
+ }
+
+ head = __atomic_load_n (&qp->hdr->deq.empty_buf_head, __ATOMIC_ACQUIRE);
+ tail = __atomic_load_n (&qp->hdr->deq.empty_buf_tail, __ATOMIC_RELAXED);
+
+ if (head == tail)
+ {
+ DEBUG2 ("no empty buffer available to inject packet");
+ return daq_vpp_err (ctx, "no empty buffer available to inject packet");
+ }
+ empty_buf_desc = &qp->empty_buf_ring[tail & mask];
+ buf_data =
+ vdm->bpools[empty_buf_desc->buffer_pool].base + empty_buf_desc->offset;
+
+ if (empty_buf_desc->length < data_len)
+ return daq_vpp_err (ctx, "descriptor %u buffer too small (%u < %u)",
+ tail & mask, empty_buf_desc->length, data_len);
+
+ memcpy (buf_data, data, data_len);
+ empty_buf_desc->length = data_len;
+ empty_buf_desc->ref_buffer_desc_index = pe->index;
+ empty_buf_desc->direction = reverse;
+
+ tail = tail + 1;
+ __atomic_store_n (&h->deq.empty_buf_tail, tail, __ATOMIC_RELEASE);
+
+ if (!__atomic_exchange_n (&qp->hdr->deq.interrupt_pending, 1,
+ __ATOMIC_RELAXED))
+ {
+ ssize_t __unused rv;
+ rv = write (qp->deq_fd, &(uint64_t){ 1 }, sizeof (uint64_t));
+ }
+ return DAQ_SUCCESS;
+}
+
const DAQ_Msg_t *
daq_vpp_fill_msg (daq_vpp_ctx_t *ctx, daq_vpp_qpair_t *qp, uint32_t desc_index,
struct timeval tv)
DEBUG_DUMP_MSG (msg);
+ if (ctx->msg_pool_info.available == ctx->msg_pool_info.size)
+ {
+ DEBUG2 ("all messages are already finalized");
+ return DAQ_SUCCESS;
+ }
+
if (verdict >= MAX_DAQ_VERDICT)
- verdict = DAQ_VERDICT_PASS;
+ {
+ DEBUG2 ("verdict %d out of range, setting to PASS", verdict);
+ verdict = DAQ_VERDICT_PASS;
+ }
ctx->stats.verdicts[verdict]++;
mask = qp->queue_size - 1;
static uint32_t
daq_vpp_get_capabilities (void __unused *handle)
{
- return DAQ_CAPA_BLOCK | DAQ_CAPA_UNPRIV_START | DAQ_CAPA_INTERRUPT |
- DAQ_CAPA_DEVICE_INDEX;
+ return DAQ_CAPA_BLOCK | /* can block packets */
+ DAQ_CAPA_INJECT | /* can inject packets */
+ DAQ_CAPA_UNPRIV_START | /* can start without root privileges */
+ DAQ_CAPA_INTERRUPT | /* can be interrupted */
+ DAQ_CAPA_DEVICE_INDEX; /* can consistently fill the device index field
+ in DAQ_PktHdr */
}
static int
return DLT_IPV4;
}
-static char *
+static const char *
daq_vpp_ioctl_cmd_to_str (DAQ_IoctlCmd cmd)
{
#define IOCTL_CMD_STR(cmd) \
daq_vpp_ctx_t *ctx = (daq_vpp_ctx_t *) handle;
DIOCTL_QueryDeviceIndex *qdi = (DIOCTL_QueryDeviceIndex *) arg;
- DEBUG ("ioctl cmd %s", daq_vpp_ioctl_cmd_to_str (cmd));
+ DEBUG2 ("ioctl cmd %s", daq_vpp_ioctl_cmd_to_str (cmd));
- if (cmd == DIOCTL_GET_DEVICE_INDEX)
+ switch (cmd)
{
- char name[DAQ_VPP_MAX_INST_NAME_LEN], *colon;
-
- if (arglen != sizeof (DIOCTL_QueryDeviceIndex))
- return DAQ_ERROR_NOTSUP;
+ case DIOCTL_GET_DEVICE_INDEX:
+ {
+ char name[DAQ_VPP_MAX_INST_NAME_LEN], *colon;
- if (qdi->device == 0)
- {
- daq_vpp_err (ctx, "no device name in IOCTL_GET_DEVICE_INDEX");
- return DAQ_ERROR_INVAL;
- }
- snprintf (name, sizeof (name), "%s", qdi->device);
- colon = strchr (name, ':');
- if (colon)
- colon[0] = 0;
+ if (arglen != sizeof (DIOCTL_QueryDeviceIndex))
+ return DAQ_ERROR_NOTSUP;
- for (daq_vpp_input_index_t ii = 0; ii < vdm->n_inputs; ii++)
- if (strcmp (name, vdm->inputs[ii]->name) == 0)
+ if (qdi->device == 0)
{
- qdi->index = ii + 1;
- return DAQ_SUCCESS;
+ daq_vpp_err (ctx, "no device name in IOCTL_GET_DEVICE_INDEX");
+ return DAQ_ERROR_INVAL;
}
+ snprintf (name, sizeof (name), "%s", qdi->device);
+ colon = strchr (name, ':');
+ if (colon)
+ colon[0] = 0;
- return DAQ_ERROR_NODEV;
- }
- else if (cmd == DIOCTL_GET_PRIV_DATA_LEN)
- {
- DIOCTL_GetPrivDataLen *gpl = (DIOCTL_GetPrivDataLen *) arg;
+ for (daq_vpp_input_index_t ii = 0; ii < vdm->n_inputs; ii++)
+ if (strcmp (name, vdm->inputs[ii]->name) == 0)
+ {
+ qdi->index = ii + 1;
+ return DAQ_SUCCESS;
+ }
- if (arglen != sizeof (DIOCTL_GetPrivDataLen))
- return DAQ_ERROR_NOTSUP;
- if (gpl->msg->priv != NULL)
- gpl->priv_data_len = sizeof (daq_vpp_msg_pool_entry_t);
- else
- gpl->priv_data_len = 0;
+ return DAQ_ERROR_NODEV;
+ }
+ case DIOCTL_GET_PRIV_DATA_LEN:
+ {
+ DIOCTL_GetPrivDataLen *gpl = (DIOCTL_GetPrivDataLen *) arg;
+
+ if (arglen != sizeof (DIOCTL_GetPrivDataLen))
+ return DAQ_ERROR_NOTSUP;
+ if (gpl->msg->priv != NULL)
+ gpl->priv_data_len = sizeof (daq_vpp_msg_pool_entry_t);
+ else
+ gpl->priv_data_len = 0;
+
+ DEBUG2 ("ioctl cmd %s %u", daq_vpp_ioctl_cmd_to_str (cmd),
+ gpl->priv_data_len);
+ return DAQ_SUCCESS;
+ }
+ case DIOCTL_DIRECT_INJECT_PAYLOAD:
+ case DIOCTL_DIRECT_INJECT_RESET:
+ case DIOCTL_SET_INJECT_DROP:
+ DEBUG2 ("%s is a no-op", daq_vpp_ioctl_cmd_to_str (cmd));
- DEBUG2 ("ioctl cmd %s %u", daq_vpp_ioctl_cmd_to_str (cmd),
- gpl->priv_data_len);
- return DAQ_SUCCESS;
+ default:
+ /* not supported yet */
+ return DAQ_ERROR_NOTSUP;
}
return DAQ_ERROR_NOTSUP;
.instantiate = daq_vpp_instantiate,
.destroy = daq_vpp_destroy,
.start = daq_vpp_start,
+ .inject = daq_vpp_inject,
+ .inject_relative = daq_vpp_inject_relative,
.interrupt = daq_vpp_interrupt,
.ioctl = daq_vpp_ioctl,
.get_stats = daq_vpp_get_stats,
", attach_qpair: { qpair_id: { thread_id: %u, "
"queue_id: %u }, log2_queue_size: %u, "
"qpair_header_offset: %u, enq_ring_offset: %u, "
- "deq_ring_offset: %u }",
+ "deq_ring_offset: %u, log2_empty_buf_queue_size: %u "
+ "empty_buf_ring_offset: %u }",
reply->attach_qpair.qpair_id.thread_id,
reply->attach_qpair.qpair_id.queue_id,
reply->attach_qpair.log2_queue_size,
reply->attach_qpair.qpair_header_offset,
reply->attach_qpair.enq_ring_offset,
- reply->attach_qpair.deq_ring_offset);
+ reply->attach_qpair.deq_ring_offset,
+ reply->attach_qpair.log2_empty_buf_queue_size,
+ reply->attach_qpair.empty_buf_ring_offset);
break;
default:
n += snprintf (buf + n, sizeof (buf) - n, ", unknown");
qp->qpair_id = g->qpair_id;
qp->queue_size = 1 << g->log2_queue_size;
+ qp->empty_buf_queue_size = 1 << g->log2_empty_buf_queue_size;
qp->hdr = (daq_vpp_qpair_header_t *) (base + g->qpair_header_offset);
qp->enq_ring = (daq_vpp_desc_index_t *) (base + g->enq_ring_offset);
qp->deq_ring = (daq_vpp_desc_index_t *) (base + g->deq_ring_offset);
+ qp->empty_buf_ring =
+ (daq_vpp_empty_buf_desc_t *) (base + g->empty_buf_ring_offset);
qp->enq_fd = fds[0];
qp->deq_fd = fds[1];
qp->input_index = ii;
qp->qpair_id.thread_id, qp->qpair_id.queue_id);
goto err;
}
- DEBUG ("input %s qpair %u.%u: size %u, hdr %p, enq %p, deq %p", name,
- qp->qpair_id.thread_id, qp->qpair_id.queue_id, qp->queue_size,
- qp->hdr, qp->enq_ring, qp->deq_ring);
+ DEBUG ("input %s qpair %u.%u: size %u, hdr %p, enq %p, deq %p, "
+ "empty_buf_queue_size %u, empty_buf_queue %p",
+ name, qp->qpair_id.thread_id, qp->qpair_id.queue_id,
+ qp->queue_size, qp->hdr, qp->enq_ring, qp->deq_ring,
+ qp->empty_buf_queue_size, qp->empty_buf_ring);
}
vdm->inputs =
#ifndef __DAQ_VPP_SHARED_H__
#define __DAQ_VPP_SHARED_H__
-#define DAQ_VPP_VERSION 2
+#define DAQ_VPP_VERSION 3
#define DAQ_VPP_DEFAULT_SOCKET_FILE "snort.sock"
#define DAQ_VPP_DEFAULT_SOCKET_PATH "/run/vpp/" DAQ_VPP_DEFAULT_SOCKET_FILE
#define DAQ_VPP_MAX_INST_NAME_LEN 32
daq_vpp_input_index_t input_index;
daq_vpp_qpair_index_t qpair_index;
uint8_t log2_queue_size;
+ uint8_t log2_empty_buf_queue_size;
daq_vpp_offset_t qpair_header_offset;
daq_vpp_offset_t enq_ring_offset;
daq_vpp_offset_t deq_ring_offset;
+ daq_vpp_offset_t empty_buf_ring_offset;
} daq_vpp_msg_reply_attach_qpair_t;
typedef struct
daq_vpp_pkt_metadata_t metadata;
} daq_vpp_desc_t;
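+/*
+ * Descriptor for one slot of the per-qpair empty buffer ring used for packet
+ * injection. VPP fills slots at head with offsets into freshly allocated
+ * buffers; the DAQ module consumes the slot at tail, writes the injected
+ * payload, sets the referenced descriptor index and direction, and advances
+ * tail.
+ */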
+typedef struct
+{
+ daq_vpp_offset_t offset;
+ uint16_t length;
+ uint8_t buffer_pool;
+ uint32_t ref_buffer_desc_index;
+ uint8_t direction;
+} daq_vpp_empty_buf_desc_t;
+
typedef struct
{
/* enqueue */
struct
{
daq_vpp_head_tail_t head;
+ /* empty buffer ring head/tail */
+ daq_vpp_head_tail_t empty_buf_head;
+ daq_vpp_head_tail_t empty_buf_tail;
uint8_t interrupt_pending;
} __attribute__ ((__aligned__ (64))) deq;
} daq_vpp_qpair_header_t;
_Static_assert (sizeof (daq_vpp_qpair_header_t) == 128,
- "size must be two achelines");
+ "size must be two cachelines");
#endif /* __DAQ_VPP_SHARED_H__ */
#include <vnet/feature/feature.h>
#include <snort/snort.h>
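+/*
+ * Drain packets injected by snort from the empty buffer ring and hand them
+ * to the next node recorded for the referenced (original) packet. Buffer
+ * metadata, current_config_index and the feature arc are copied from the
+ * referenced buffer, then the ring is replenished with fresh buffers.
+ */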
+static_always_inline void
+snort_deq_node_inject (vlib_main_t *vm, vlib_node_runtime_t *node,
+ snort_qpair_t *qp)
+{
+ daq_vpp_desc_index_t mask = pow2_mask (qp->log2_empty_buf_queue_size);
+ daq_vpp_head_tail_t tail, last_tail = qp->empty_buf_tail;
+ u32 from[VLIB_FRAME_SIZE];
+ u16 nexts[VLIB_FRAME_SIZE];
+ u32 n_recv;
+
+ /* recv injected buffers from empty_buf ring */
+ tail = __atomic_load_n (&qp->hdr->deq.empty_buf_tail, __ATOMIC_ACQUIRE);
+
+ n_recv = tail - last_tail;
+
+ /* from[] and nexts[] hold at most one frame; any remainder is picked up
+ on the next invocation since empty_buf_tail only advances by n_recv */
+ if (n_recv > VLIB_FRAME_SIZE)
+ n_recv = VLIB_FRAME_SIZE;
+
+ for (u32 i = 0; i < n_recv; i++)
+ {
+ daq_vpp_empty_buf_desc_t *desc = &qp->empty_buf_ring[last_tail & mask];
+ u32 ref_desc_index = desc->ref_buffer_desc_index;
+ snort_qpair_entry_t *ref_qpe = qp->entries + ref_desc_index;
+ daq_vpp_desc_t *ref_d = qp->hdr->descs + ref_desc_index;
+ vlib_buffer_t *b, *ref_b;
+ u32 bi, ref_bi;
+
+ bi = qp->empty_buffers[last_tail & mask];
+ qp->empty_buffers[last_tail & mask] = VLIB_BUFFER_INVALID_INDEX;
+
+ b = vlib_get_buffer (vm, bi);
+ b->current_length = desc->length;
+
+ /* set the buffer metadata */
+ ref_bi = ref_qpe->buffer_index;
+ ref_b = vlib_get_buffer (vm, ref_bi);
+
+ *snort_get_buffer_metadata (b) = ref_d->metadata;
+
+ b->current_config_index = ref_b->current_config_index;
+ vnet_buffer (b)->feature_arc_index =
+ vnet_buffer (ref_b)->feature_arc_index;
+
+ from[i] = bi;
+ nexts[i] = ref_qpe->next_index;
+ last_tail++;
+ }
+
+ if (n_recv)
+ {
+ vlib_buffer_enqueue_to_next (vm, node, from, nexts, n_recv);
+ qp->empty_buf_tail = last_tail;
+ }
+
+ /* allocate more empty_bufs if needed */
+ snort_qpair_empty_buf_alloc_buffers (vm, qp);
+}
+
static u32
snort_deq_node_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
snort_instance_t *si, snort_qpair_t *qp)
vm, node->node_index, SNORT_DEQ_ERROR_NO_CLIENT_FREE, n_total);
}
+ snort_qpair_empty_buf_free_buffers (vm, qp);
snort_qpair_init (qp);
+ snort_qpair_empty_buf_alloc_buffers (vm, qp);
__atomic_store_n (&qp->cleanup_needed, 0, __ATOMIC_RELEASE);
return 0;
}
u32 bi;
u8 verdict;
- /* check if descriptor index taken from dequqe ring is valid */
+ /* check if descriptor index taken from dequeue ring is valid */
if (desc_index & ~mask)
{
error = 1;
vlib_node_get_runtime_data (vm, node->node_index);
snort_instance_t *si = pool_elt_at_index (sm->instances, rt->instance_index);
u32 qpairs_per_thread = si->qpairs_per_thread;
- snort_qpair_t **qp = snort_get_qpairs (si, vm->thread_index);
+ snort_qpair_t **qp_vec = snort_get_qpairs (si, vm->thread_index);
uword rv = 0;
while (qpairs_per_thread--)
- rv += snort_deq_node_inline (vm, node, si, qp++[0]);
+ {
+ snort_qpair_t *qp = qp_vec++[0];
+ snort_deq_node_inject (vm, node, qp);
+ rv += snort_deq_node_inline (vm, node, si, qp);
+ }
return rv;
}
typedef struct
{
u8 log2_queue_sz;
+ u8 log2_empty_buf_queue_sz; /* log2 size of per-qpair empty buffer ring */
u8 drop_on_disconnect;
u8 qpairs_per_thread;
u8 drop_bitmap; /* bits indexed by verdict, 0 = pass, 1 = drop */
return format (s, "%s", strings[v]);
}
+
+u8 *
+format_snort_desc (u8 *s, va_list *args)
+{
+ daq_vpp_desc_t *d = va_arg (*args, daq_vpp_desc_t *);
+
+ s = format (s, "desc: buffer-pool %u offset %u length %u", d->buffer_pool,
+ d->offset, d->length);
+ return s;
+}
u8 *base = CLIB_MEM_VM_MAP_FAILED, *name;
int rv = 0, fd = -1;
u32 qsz = 1 << args->log2_queue_sz;
+ u32 empty_buf_qsz = 1 << args->log2_empty_buf_queue_sz;
u32 qpairs_per_thread, total_qpairs, n_threads = tm->n_vlib_mains;
u8 align = CLIB_CACHE_LINE_BYTES;
/* enq and deq ring */
qpair_mem_sz += 2 * round_pow2 (qsz * sizeof (daq_vpp_desc_index_t), align);
+ /* empty buffers ring */
+ qpair_mem_sz +=
+ round_pow2 (empty_buf_qsz * sizeof (daq_vpp_empty_buf_desc_t), align);
+
/* total size of shared memory */
size = round_pow2 ((uword) total_qpairs * qpair_mem_sz,
clib_mem_get_page_size ());
.client_index = SNORT_INVALID_CLIENT_INDEX,
.dequeue_node_index = si->dequeue_node_index,
.log2_queue_size = args->log2_queue_sz,
+ .log2_empty_buf_queue_size = args->log2_empty_buf_queue_sz,
.qpair_id.thread_id = thread_id,
.qpair_id.queue_id = queue_id,
.enq_fd = eventfd (0, EFD_NONBLOCK),
base += round_pow2 (qsz * sizeof (daq_vpp_desc_index_t), align);
qp->deq_ring = (void *) base;
base += round_pow2 (qsz * sizeof (daq_vpp_desc_index_t), align);
-
- for (u32 i = 0; i < qsz; i++)
- qp->entries[i].buffer_index = VLIB_BUFFER_INVALID_INDEX;
+ qp->empty_buf_ring = (void *) base;
+ base += round_pow2 (empty_buf_qsz * sizeof (daq_vpp_empty_buf_desc_t),
+ align);
qp->hdr->enq.cookie = DAQ_VPP_COOKIE;
snort_qpair_init (qp);
+ snort_qpair_empty_buf_alloc_buffers (vm, qp);
/* listen on dequeue events */
qp->deq_fd_file_index = clib_file_add (
vec_foreach_pointer (qp, si->qpairs)
{
+ snort_qpair_empty_buf_free_buffers (vm, qp);
clib_file_del_by_index (&file_main, qp->deq_fd_file_index);
close (qp->enq_fd);
close (qp->deq_fd);
daq_vpp_qpair_header_t *hdr;
daq_vpp_desc_index_t *enq_ring;
daq_vpp_desc_index_t *deq_ring;
+ daq_vpp_empty_buf_desc_t *empty_buf_ring;
int enq_fd, deq_fd;
u32 client_index;
daq_vpp_desc_index_t next_free_desc;
u16 n_free_descs;
daq_vpp_head_tail_t deq_tail;
+ daq_vpp_head_tail_t empty_buf_tail;
u8 log2_queue_size;
+ u8 log2_empty_buf_queue_size;
+ u32 *empty_buffers; /* vlib buffer index for each empty_buf_ring slot */
u8 cleanup_needed;
daq_vpp_qpair_id_t qpair_id;
u32 deq_fd_file_index;
format_function_t format_snort_daq_version;
format_function_t format_snort_verdict;
format_function_t format_snort_mode;
+format_function_t format_snort_desc;
/* enqueue.c */
typedef struct
#define foreach_snort_deq_error \
_ (BAD_DESC, "bad descriptor") \
_ (BAD_DESC_INDEX, "bad descriptor index") \
- _ (NO_CLIENT_FREE, "packets freed on client dissapear")
+ _ (NO_CLIENT_FREE, "packets freed on client disappear") \
+ _ (NO_FREE_BUFFERS, "no free buffers to allocate to snort")
typedef enum
{
snort_qpair_init (snort_qpair_t *qp)
{
u16 qsz = 1 << qp->log2_queue_size;
- for (int j = 0; j < qsz - 1; j++)
+ u16 empty_buf_qsz = 1 << qp->log2_empty_buf_queue_size;
+ u16 mask = qsz - 1;
+ for (int j = 0; j < qsz; j++)
{
- qp->entries[j].freelist_next = j + 1;
+ qp->entries[j].freelist_next = (j + 1) & mask;
qp->entries[j].buffer_index = VLIB_BUFFER_INVALID_INDEX;
}
- qp->entries[qsz - 1].freelist_next = ~0;
qp->next_free_desc = 0;
qp->hdr->enq.head = qp->hdr->deq.head = 0;
qp->hdr->enq.interrupt_pending = qp->hdr->deq.interrupt_pending = 0;
qp->deq_tail = 0;
+ qp->hdr->deq.empty_buf_head = 0;
+ qp->hdr->deq.empty_buf_tail = 0;
+ qp->empty_buf_tail = 0;
qp->n_free_descs = qsz;
+ vec_validate_aligned (qp->empty_buffers, empty_buf_qsz - 1,
+ CLIB_CACHE_LINE_BYTES);
+
+ for (int i = 0; i < empty_buf_qsz; i++)
+ qp->empty_buffers[i] = VLIB_BUFFER_INVALID_INDEX;
+}
+
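+/*
+ * Producer side of the empty buffer ring: allocate vlib buffers and publish
+ * descriptors for them at head so snort can use them for injection. The ring
+ * is only topped up once at least a quarter of it is free, to batch buffer
+ * allocations; wrap-around is handled with two vlib_buffer_alloc calls.
+ */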
+static_always_inline void
+snort_qpair_empty_buf_alloc_buffers (vlib_main_t *vm, snort_qpair_t *qp)
+{
+ snort_main_t *sm = &snort_main;
+ u32 i = 0;
+ daq_vpp_desc_index_t mask = pow2_mask (qp->log2_empty_buf_queue_size);
+ u32 empty_buf_size = 1 << qp->log2_empty_buf_queue_size;
+ daq_vpp_head_tail_t head, tail = qp->empty_buf_tail;
+ u32 n_alloc, n_req = 0;
+
+ /* allocate new buffers for empty_buf ring */
+ head = __atomic_load_n (&qp->hdr->deq.empty_buf_head, __ATOMIC_ACQUIRE);
+
+ n_req = empty_buf_size - (head - tail);
+ if (n_req == 0 || n_req < empty_buf_size / 4)
+ return;
+
+ if ((head & mask) + n_req <= empty_buf_size)
+ n_alloc = vlib_buffer_alloc (vm, &qp->empty_buffers[head & mask], n_req);
+ else
+ {
+ u32 n_first = empty_buf_size - (head & mask);
+ u32 n_second = n_req - n_first;
+
+ n_alloc =
+ vlib_buffer_alloc (vm, &qp->empty_buffers[head & mask], n_first);
+ if (n_alloc == n_first && n_second)
+ n_alloc += vlib_buffer_alloc (vm, &qp->empty_buffers[0], n_second);
+ }
+
+ while (i < n_alloc)
+ {
+ daq_vpp_empty_buf_desc_t *empty_buf_desc =
+ &qp->empty_buf_ring[head & mask];
+ vlib_buffer_t *b = vlib_get_buffer (vm, qp->empty_buffers[head & mask]);
+ b->current_data = 0;
+ *empty_buf_desc = (daq_vpp_empty_buf_desc_t){
+ .buffer_pool = b->buffer_pool_index,
+ .length = vlib_buffer_get_default_data_size (vm),
+ .offset = (u8 *) b->data -
+ sm->buffer_pool_base_addrs[b->buffer_pool_index],
+ };
+ head++;
+ i++;
+ }
+ __atomic_store_n (&qp->hdr->deq.empty_buf_head, head, __ATOMIC_RELEASE);
+}
+
+static_always_inline u32
+snort_qpair_empty_buf_free_buffers (vlib_main_t *vm, snort_qpair_t *qp)
+{
+ u32 empty_buf_size = 1 << qp->log2_empty_buf_queue_size;
+ u32 n_total = 0;
+ u32 n_free = 0;
+ u32 buffer_indices[VLIB_FRAME_SIZE];
+
+ for (u32 i = 0; i < empty_buf_size; i++)
+ {
+ u32 bi = qp->empty_buffers[i];
+ if (bi != VLIB_BUFFER_INVALID_INDEX)
+ {
+ buffer_indices[n_free] = bi;
+ qp->empty_buffers[i] = VLIB_BUFFER_INVALID_INDEX;
+ n_free++;
+ }
+ if (n_free == VLIB_FRAME_SIZE)
+ {
+ vlib_buffer_free (vm, buffer_indices, VLIB_FRAME_SIZE);
+ n_total += VLIB_FRAME_SIZE;
+ n_free = 0;
+ }
+ }
+
+ if (n_free)
+ vlib_buffer_free (vm, buffer_indices, n_free);
+ n_total += n_free;
+
+ vec_free (qp->empty_buffers);
+ return n_total;
}
static_always_inline snort_qpair_t **
r->qpair_id = qp->qpair_id;
r->input_index = si->index;
r->log2_queue_size = qp->log2_queue_size;
+ r->log2_empty_buf_queue_size = qp->log2_empty_buf_queue_size;
r->qpair_header_offset = (u8 *) qp->hdr - base;
r->enq_ring_offset = (u8 *) qp->enq_ring - base;
r->deq_ring_offset = (u8 *) qp->deq_ring - base;
+ r->empty_buf_ring_offset = (u8 *) qp->empty_buf_ring - base;
e->fds[0] = qp->enq_fd;
e->fds[1] = qp->deq_fd;
e->n_fds = 2;
log_debug (
"qpair_id %u.%u input_index %u log2_queue_size %u qpair_header_offset %u "
- "enq_ring_offset %u deq_ring_offset %u enq_fd %d deq_fd %d",
+ "enq_ring_offset %u deq_ring_offset %u enq_fd %d deq_fd %d "
+ "log2_empty_buf_queue_size %u empty_buf_ring_offset %u",
r->qpair_id.thread_id, r->qpair_id.queue_id, r->input_index,
r->log2_queue_size, r->qpair_header_offset, r->enq_ring_offset,
- r->deq_ring_offset, e->fds[0], e->fds[1]);
+ r->deq_ring_offset, e->fds[0], e->fds[1], r->log2_empty_buf_queue_size,
+ r->empty_buf_ring_offset);
}
static clib_error_t *