1 /* SPDX-License-Identifier: Apache-2.0
2 * Copyright(c) 2021 Cisco Systems, Inc.
10 #include <sys/socket.h>
14 #include <sys/epoll.h>
16 #include <vppinfra/cache.h>
18 #include <daq_module_api.h>
/* Module version reported as DAQ_MODULE_DATA.module_version. */
22 #define DAQ_VPP_VERSION 1
/* CPU spin-wait hint: x86 `pause` builtin, ARM `yield`, or an empty
 * expansion in the fallback branch.  Presumably used inside the qpair
 * spinlock's busy-wait loop - confirm against vpp_daq_qpair_lock. */
25 #define VPP_DAQ_PAUSE() __builtin_ia32_pause ()
26 #elif defined(__aarch64__) || defined(__arm__)
27 #define VPP_DAQ_PAUSE() __asm__("yield")
29 #define VPP_DAQ_PAUSE()
/* Module variables advertised to the DAQ framework; handed out by
 * vpp_daq_get_variable_descs.
 * NOTE(review): vpp_daq_instantiate also parses "input_mode" and
 * "socket_name" - confirm they are described in this table too. */
32 static DAQ_VariableDesc_t vpp_variable_descriptions[] = {
33 { "debug", "Enable debugging output to stdout",
34 DAQ_VAR_DESC_FORBIDS_ARGUMENT },
/* Copy of the base API received in vpp_daq_module_load; zeroed again by
 * vpp_daq_module_unload. */
37 static DAQ_BaseAPI_t daq_base_api;
/* Record a printf-style error message on the given module instance through
 * the saved base API. */
39 #define SET_ERROR(modinst, ...) daq_base_api.set_errbuf (modinst, __VA_ARGS__)
/* Message-pool bookkeeping; vpp_daq_get_msg_pool_info copies `info` out. */
41 typedef struct _vpp_msg_pool
43 DAQ_MsgPoolInfo_t info;
/* Per-descriptor DAQ state (packet header, DAQ message, descriptor index
 * and owning qpair index).  vpp_daq_msg_finalize reads msg->priv back as a
 * pointer to this struct. */
46 typedef struct _vpp_desc_data
/* A buffer pool shared by VPP: fd received over the control socket and a
 * read-only mapping of `size` bytes at `base`. */
54 typedef struct _vpp_bpool
/* One VPP queue pair.  The descriptor table, rings and head indices point
 * into the shared-memory region mapped in vpp_daq_instantiate. */
61 typedef struct _vpp_qpair
63 CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
65 daq_vpp_desc_t *descs;
/* Ring head indices living in shared memory: enq_head is read with an
 * acquire load in vpp_daq_msg_receive_one; deq_head is published with a
 * release store in vpp_daq_msg_finalize. */
68 volatile uint32_t *enq_head;
69 volatile uint32_t *deq_head;
/* Module-local state, one entry per descriptor (queue_size entries). */
73 VPPDescData *desc_data;
/* How vpp_daq_msg_receive waits when nothing is pending.  INTERRUPT is 0,
 * so a calloc'd context starts in that mode: it blocks in epoll_wait on the
 * queue pairs' enqueue fds.  POLLING returns right after scanning the queue
 * pairs. */
79 DAQ_VPP_INPUT_MODE_INTERRUPT = 0,
80 DAQ_VPP_INPUT_MODE_POLLING,
81 } daq_vpp_input_mode_t;
/* Module context; a single instance is published through global_vpp_ctx. */
83 typedef struct _vpp_context
90 DAQ_ModuleInstance_h modinst;
/* num_qpairs queue pairs; epoll_events has one slot per queue pair. */
103 VPPQueuePair *qpairs;
107 struct epoll_event *epoll_events;
112 VPPBufferPool *bpools;
114 daq_vpp_input_mode_t input_mode;
/* Points at the configuration value string (not a copy) or at
 * DAQ_VPP_DEFAULT_SOCKET_PATH. */
115 const char *socket_name;
/* Set by vpp_daq_interrupt, consumed and cleared by vpp_daq_msg_receive.
 * NOTE(review): volatile does not synchronize threads; if interrupt may run
 * on another thread, an atomic flag would be the safer choice. */
116 volatile bool interrupted;
/* Shared context: vpp_daq_instantiate hands it back when already set. */
119 static VPP_Context_t *global_vpp_ctx = 0;
/* Acquire the per-queue-pair spinlock: compare-and-swap p->lock from 0 to 1
 * with acquire ordering; while the swap fails, wait on relaxed loads until
 * the lock reads as free. */
122 vpp_daq_qpair_lock (VPPQueuePair *p)
125 while (!__atomic_compare_exchange_n (&p->lock, &free, 1, 0, __ATOMIC_ACQUIRE,
128 while (__atomic_load_n (&p->lock, __ATOMIC_RELAXED))
/* Release the per-queue-pair spinlock with a release store of 0, making all
 * writes done under the lock visible to the next holder. */
136 vpp_daq_qpair_unlock (VPPQueuePair *p)
138 __atomic_store_n (&p->lock, 0, __ATOMIC_RELEASE);
/* Module load hook: refuse a base API whose version or structure size does
 * not match the headers this module was built against; otherwise keep a
 * copy of it for later SET_ERROR / config calls. */
142 vpp_daq_module_load (const DAQ_BaseAPI_t *base_api)
144 if (base_api->api_version != DAQ_BASE_API_VERSION ||
145 base_api->api_size != sizeof (DAQ_BaseAPI_t))
148 daq_base_api = *base_api;
/* Module unload hook: forget the saved base API. */
154 vpp_daq_module_unload (void)
156 memset (&daq_base_api, 0, sizeof (daq_base_api));
/* Hand out the module's variable descriptor table; returns its entry
 * count. */
161 vpp_daq_get_variable_descs (const DAQ_VariableDesc_t **var_desc_table)
163 *var_desc_table = vpp_variable_descriptions;
165 return sizeof (vpp_variable_descriptions) / sizeof (DAQ_VariableDesc_t);
/* Receive one fixed-size control message from the VPP socket, together with
 * any file descriptors passed alongside it (SCM_RIGHTS).
 *
 * fd    - connected AF_UNIX socket.
 * msg   - receives exactly sizeof (daq_vpp_msg_t) bytes.
 * n_fds - maximum number of descriptors the caller expects.
 * fds   - array of n_fds ints; only the entries actually delivered are
 *         overwritten, so callers that pre-set entries to -1 can detect a
 *         missing descriptor.
 *
 * Returns DAQ_ERROR_NODEV when recvmsg does not yield a full message;
 * callers compare the result against DAQ_SUCCESS. */
169 vpp_daq_recvmsg (int fd, daq_vpp_msg_t *msg, int n_fds, int *fds)
172 CMSG_SPACE (sizeof (int) * n_fds) + CMSG_SPACE (sizeof (struct ucred));
174 struct msghdr mh = {};
176 struct cmsghdr *cmsg;
/* Single iovec covering the whole message structure. */
178 iov[0].iov_base = (void *) msg;
179 iov[0].iov_len = sizeof (daq_vpp_msg_t);
182 mh.msg_control = ctl;
183 mh.msg_controllen = ctl_sz;
/* Zero the control buffer so stale bytes are never parsed as ancillary
 * data. */
185 memset (ctl, 0, ctl_sz);
188 if ((rv = recvmsg (fd, &mh, 0)) != sizeof (daq_vpp_msg_t))
189 return DAQ_ERROR_NODEV;
/* Walk the ancillary data: sender credentials and/or passed fds. */
191 cmsg = CMSG_FIRSTHDR (&mh);
194 if (cmsg->cmsg_level == SOL_SOCKET)
196 if (cmsg->cmsg_type == SCM_CREDENTIALS)
200 else if (cmsg->cmsg_type == SCM_RIGHTS)
/* Copy only the descriptors the kernel actually delivered, never more than
 * n_fds.  The control buffer was zero-filled, so always copying n_fds
 * entries would turn a missing descriptor into fd 0 and defeat the
 * callers' "== -1" checks. */
202 memcpy (fds, CMSG_DATA (cmsg),
        cmsg->cmsg_len - CMSG_LEN (0) < n_fds * sizeof (int)
          ? cmsg->cmsg_len - CMSG_LEN (0)
          : n_fds * sizeof (int));
205 cmsg = CMSG_NXTHDR (&mh, cmsg);
/* Release everything a (possibly partially built) context owns: the
 * shared-memory mapping and fd, buffer-pool mappings, queue-pair fds and
 * per-descriptor state, and the epoll resources.  vpp_daq_instantiate also
 * calls this to tear down a context it was building.
 *
 * NOTE(review): fd fields are compared against -1 and shm_base against
 * MAP_FAILED, but the context is allocated with calloc (all zero).  Confirm
 * each field is set to its "unused" sentinel before any failure can reach
 * this function; otherwise fd 0 could be closed or munmap called on NULL. */
212 vpp_daq_destroy (void *handle)
214 VPP_Context_t *vc = (VPP_Context_t *) handle;
216 if (vc->shm_base != MAP_FAILED)
217 munmap (vc->shm_base, vc->shm_size);
219 if (vc->shm_fd != -1)
/* Buffer pools: only unmap mappings that were actually established. */
224 for (int i = 0; i < vc->num_bpools; i++)
226 VPPBufferPool *bp = vc->bpools + i;
229 if (bp->base && bp->base != MAP_FAILED)
230 munmap (bp->base, bp->size);
/* Queue pairs: release their enqueue/dequeue fds and descriptor state. */
237 for (int i = 0; i < vc->num_qpairs; i++)
239 VPPQueuePair *qp = vc->qpairs + i;
240 if (qp->enq_fd != -1)
242 if (qp->deq_fd != -1)
245 free (qp->desc_data);
/* epoll event array and epoll instance. */
250 free (vc->epoll_events);
252 if (vc->epoll_fd != -1)
253 close (vc->epoll_fd);
/* Failure helper for vpp_daq_instantiate: records the message on `modinst`
 * (taken from the caller's scope) via SET_ERROR.  Presumably it also stores
 * rv as the return code and jumps to the cleanup path - confirm. */
257 #define ERR(rv, ...) \
259 SET_ERROR (modinst, __VA_ARGS__); \
265 vpp_daq_instantiate (const DAQ_ModuleConfig_h modcfg,
266 DAQ_ModuleInstance_h modinst, void **ctxt_ptr)
268 VPP_Context_t *vc = 0;
269 int rval = DAQ_ERROR;
271 struct sockaddr_un sun = { .sun_family = AF_UNIX };
272 int i, fd = -1, shm_fd = -1;
278 *ctxt_ptr = global_vpp_ctx;
282 vc = calloc (1, sizeof (VPP_Context_t));
285 ERR (DAQ_ERROR_NOMEM,
286 "%s: Couldn't allocate memory for the new VPP context!", __func__);
288 const char *varKey, *varValue;
289 daq_base_api.config_first_variable (modcfg, &varKey, &varValue);
292 if (!strcmp (varKey, "debug"))
294 else if (!strcmp (varKey, "input_mode"))
296 if (!strcmp (varValue, "interrupt"))
297 vc->input_mode = DAQ_VPP_INPUT_MODE_INTERRUPT;
298 else if (!strcmp (varValue, "polling"))
299 vc->input_mode = DAQ_VPP_INPUT_MODE_POLLING;
301 else if (!strcmp (varKey, "socket_name"))
303 vc->socket_name = varValue;
305 daq_base_api.config_next_variable (modcfg, &varKey, &varValue);
308 input = daq_base_api.config_get_input (modcfg);
310 if (!vc->socket_name)
311 /* try to use default socket path */
312 vc->socket_name = DAQ_VPP_DEFAULT_SOCKET_PATH;
314 if ((fd = socket (AF_UNIX, SOCK_SEQPACKET, 0)) < 0)
315 ERR (DAQ_ERROR_NODEV, "%s: Couldn't create socket!", __func__);
317 strncpy (sun.sun_path, vc->socket_name, sizeof (sun.sun_path) - 1);
319 if (connect (fd, (struct sockaddr *) &sun, sizeof (struct sockaddr_un)) != 0)
320 ERR (DAQ_ERROR_NODEV, "%s: Couldn't connect to socket! '%s'", __func__,
323 /* craft and send connect message */
324 msg.type = DAQ_VPP_MSG_TYPE_HELLO;
325 snprintf ((char *) &msg.hello.inst_name, DAQ_VPP_INST_NAME_LEN - 1, "%s",
328 if (send (fd, &msg, sizeof (msg), 0) != sizeof (msg))
329 ERR (DAQ_ERROR_NODEV, "%s: Couldn't send connect message!", __func__);
331 /* receive config message */
332 rval = vpp_daq_recvmsg (fd, &msg, 1, &shm_fd);
334 if (rval != DAQ_SUCCESS || msg.type != DAQ_VPP_MSG_TYPE_CONFIG ||
336 ERR (DAQ_ERROR_NODEV, "%s: Couldn't receive config message!", __func__);
338 vc->modinst = modinst;
342 vc->num_bpools = msg.config.num_bpools;
343 vc->num_qpairs = msg.config.num_qpairs;
344 vc->shm_size = msg.config.shm_size;
347 vc->bpools = calloc (vc->num_bpools, sizeof (VPPBufferPool));
348 vc->qpairs = calloc (vc->num_qpairs, sizeof (VPPQueuePair));
349 vc->epoll_events = calloc (vc->num_qpairs, sizeof (struct epoll_event));
351 if (vc->bpools == 0 || vc->qpairs == 0)
352 ERR (DAQ_ERROR_NOMEM,
353 "%s: Couldn't allocate memory for the new VPP context!", __func__);
355 for (i = 0; i < vc->num_bpools; i++)
356 vc->bpools[i].fd = -1;
359 mmap (0, vc->shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, vc->shm_fd, 0);
361 if (base == MAP_FAILED)
362 ERR (DAQ_ERROR_NOMEM,
363 "%s: Couldn't map shared memory for the new VPP context!", __func__);
369 printf ("[%s]\n", input);
370 printf (" Shared memory size: %u\n", vc->shm_size);
371 printf (" Number of buffer pools: %u\n", vc->num_bpools);
372 printf (" Number of queue pairs: %u\n", vc->num_qpairs);
375 /* receive buffer pools */
376 for (int i = 0; i < vc->num_bpools; i++)
378 VPPBufferPool *bp = vc->bpools + i;
379 rval = vpp_daq_recvmsg (fd, &msg, 1, &bp->fd);
380 if (rval != DAQ_SUCCESS || msg.type != DAQ_VPP_MSG_TYPE_BPOOL ||
382 ERR (DAQ_ERROR_NODEV,
383 "%s: Failed to receive buffer pool message for the new "
386 bp->size = msg.bpool.size;
387 bp->base = mmap (0, bp->size, PROT_READ, MAP_SHARED, bp->fd, 0);
389 if (bp->base == MAP_FAILED)
390 ERR (DAQ_ERROR_NOMEM,
391 "%s: Couldn't map shared memory for the new VPP context!",
394 printf (" Buffer pool %u size: %u\n", i, bp->size);
397 if ((vc->epoll_fd = epoll_create (1)) == -1)
398 ERR (DAQ_ERROR_NODEV,
399 "%s: Couldn't create epoll fd for the new VPP context!", __func__);
401 /* receive queue pairs */
402 for (int i = 0; i < vc->num_qpairs; i++)
404 struct epoll_event ev = { .events = EPOLLIN };
405 int fds[2] = { -1, -1 };
407 VPPQueuePair *qp = vc->qpairs + i;
408 rval = vpp_daq_recvmsg (fd, &msg, 2, fds);
409 if (rval != DAQ_SUCCESS || msg.type != DAQ_VPP_MSG_TYPE_QPAIR ||
410 fds[0] == -1 || fds[1] == -1)
411 ERR (DAQ_ERROR_NODEV,
412 "%s: Failed to receive queu pair message for the new "
415 qp->queue_size = 1 << msg.qpair.log2_queue_size;
416 qp->descs = (daq_vpp_desc_t *) (base + msg.qpair.desc_table_offset);
417 qp->enq_ring = (uint32_t *) (base + msg.qpair.enq_ring_offset);
418 qp->deq_ring = (uint32_t *) (base + msg.qpair.deq_ring_offset);
419 qp->enq_head = (uint32_t *) (base + msg.qpair.enq_head_offset);
420 qp->deq_head = (uint32_t *) (base + msg.qpair.deq_head_offset);
425 if (epoll_ctl (vc->epoll_fd, EPOLL_CTL_ADD, qp->enq_fd, &ev) == -1)
426 ERR (DAQ_ERROR_NODEV,
427 "%s: Failed to dequeue fd to epoll instance for the new "
431 qsz = qp->queue_size;
433 qp->desc_data = calloc (qsz, sizeof (VPPDescData));
435 ERR (DAQ_ERROR_NOMEM,
436 "%s: Couldn't allocate memory for the new VPP context!",
439 for (int j = 0; j < qsz; j++)
441 VPPDescData *dd = qp->desc_data + j;
442 DAQ_PktHdr_t *pkthdr = &dd->pkthdr;
443 DAQ_Msg_t *msg = &dd->msg;
448 pkthdr->ingress_group = DAQ_PKTHDR_UNKNOWN;
449 pkthdr->egress_group = DAQ_PKTHDR_UNKNOWN;
451 msg->type = DAQ_MSG_TYPE_PACKET;
452 msg->hdr_len = sizeof (DAQ_PktHdr_t);
454 msg->owner = vc->modinst;
460 printf (" Queue pair %u:\n", i);
461 printf (" Size: %u\n", qp->queue_size);
462 printf (" Enqueue fd: %u\n", qp->enq_fd);
463 printf (" Dequeue fd: %u\n", qp->deq_fd);
467 *ctxt_ptr = global_vpp_ctx = vc;
471 vpp_daq_destroy (vc);
/* DAQ start hook. */
478 vpp_daq_start (void *handle)
/* DAQ interrupt hook: flag the context so that vpp_daq_msg_receive reports
 * DAQ_RSTAT_INTERRUPTED (and clears the flag) on its next check. */
484 vpp_daq_interrupt (void *handle)
486 VPP_Context_t *vc = (VPP_Context_t *) handle;
488 vc->interrupted = true;
/* DAQ get_stats hook: zeroes the caller's statistics structure. */
494 vpp_daq_get_stats (void *handle, DAQ_Stats_t *stats)
496 memset (stats, 0, sizeof (DAQ_Stats_t));
/* DAQ reset_stats hook. */
501 vpp_daq_reset_stats (void *handle)
/* DAQ get_capabilities hook: advertises at least blocking receive and
 * unprivileged start. */
506 vpp_daq_get_capabilities (void *handle)
508 uint32_t capabilities = DAQ_CAPA_BLOCK | DAQ_CAPA_UNPRIV_START;
/* DAQ get_datalink_type hook. */
513 vpp_daq_get_datalink_type (void *handle)
/* Pull up to max_recv newly enqueued packets from one queue pair into msgs.
 *
 * Under the qpair lock it reads VPP's enqueue head (acquire), computes how
 * many entries lie between the local cursor qp->next_desc and that head,
 * caps the count at max_recv, fills in the prebuilt DAQ message of each
 * referenced descriptor, and stores the advanced cursor back.  The index
 * arithmetic relies on queue_size being a power of two (it is set to
 * 1 << log2_queue_size in vpp_daq_instantiate), so `& mask` wraps. */
518 static inline uint32_t
519 vpp_daq_msg_receive_one (VPP_Context_t *vc, VPPQueuePair *qp,
520 const DAQ_Msg_t *msgs[], unsigned max_recv)
522 uint32_t n_recv, n_left;
523 uint32_t head, next, mask = qp->queue_size - 1;
528 vpp_daq_qpair_lock (qp);
529 next = qp->next_desc;
/* Acquire load: ring entries written before the head update are visible. */
530 head = __atomic_load_n (qp->enq_head, __ATOMIC_ACQUIRE);
/* Unsigned subtraction stays correct across 32-bit index wrap-around. */
531 n_recv = n_left = head - next;
533 if (n_left > max_recv)
535 n_left = n_recv = max_recv;
/* Translate the ring entry into the descriptor's DAQ packet message.
 * NOTE(review): desc_index, buffer_pool and offset are read from shared
 * memory and used without bounds checks against queue_size / num_bpools /
 * the pool size. */
540 uint32_t desc_index = qp->enq_ring[next & mask];
541 daq_vpp_desc_t *d = qp->descs + desc_index;
542 VPPDescData *dd = qp->desc_data + desc_index;
543 dd->pkthdr.pktlen = d->length;
544 dd->pkthdr.address_space_id = d->address_space_id;
545 dd->msg.data = vc->bpools[d->buffer_pool].base + d->offset;
/* Persist the consumer cursor before releasing the lock. */
552 qp->next_desc = next;
553 vpp_daq_qpair_unlock (qp);
559 vpp_daq_msg_receive (void *handle, const unsigned max_recv,
560 const DAQ_Msg_t *msgs[], DAQ_RecvStatus *rstat)
562 VPP_Context_t *vc = (VPP_Context_t *) handle;
563 uint32_t n_qpairs_left = vc->num_qpairs;
564 uint32_t n, n_events, n_recv = 0;
566 /* If the receive has been interrupted, break out of loop and return. */
569 vc->interrupted = false;
570 *rstat = DAQ_RSTAT_INTERRUPTED;
574 /* first, we visit all qpairs. If we find any work there then we can give
575 * it back immediatelly. To avoid bias towards qpair 0 we remeber what
577 while (n_qpairs_left)
579 VPPQueuePair *qp = vc->qpairs + vc->next_qpair;
581 if ((n = vpp_daq_msg_receive_one (vc, qp, msgs, max_recv - n_recv)))
589 if (vc->next_qpair == vc->num_qpairs)
594 if (vc->input_mode == DAQ_VPP_INPUT_MODE_POLLING)
596 *rstat = DAQ_RSTAT_OK;
602 *rstat = DAQ_RSTAT_OK;
606 n_events = epoll_wait (vc->epoll_fd, vc->epoll_events, vc->num_qpairs, 1000);
610 *rstat = n_events == -1 ? DAQ_RSTAT_ERROR : DAQ_RSTAT_TIMEOUT;
614 for (int i = 0; i < n_events; i++)
617 VPPQueuePair *qp = vc->qpairs + vc->epoll_events[i].data.u32;
619 if ((n = vpp_daq_msg_receive_one (vc, qp, msgs, max_recv - n_recv)))
625 (void) read (qp->enq_fd, &ctr, sizeof (ctr));
628 *rstat = DAQ_RSTAT_OK;
/* DAQ msg_finalize hook: hand a packet back to VPP with its verdict.
 *
 * Under the qpair lock it stores the action in the shared descriptor and
 * appends the descriptor index to the dequeue ring.  It then publishes
 * deq_head with a release store.  In interrupt mode it also writes a 64-bit
 * increment to the dequeue fd to wake VPP; a short write is detected via
 * rv. */
633 vpp_daq_msg_finalize (void *handle, const DAQ_Msg_t *msg, DAQ_Verdict verdict)
635 VPP_Context_t *vc = (VPP_Context_t *) handle;
636 VPPDescData *dd = msg->priv;
637 VPPQueuePair *qp = vc->qpairs + dd->qpair_index;
640 uint64_t counter_increment = 1;
641 int rv, retv = DAQ_SUCCESS;
643 vpp_daq_qpair_lock (qp);
644 mask = qp->queue_size - 1;
/* Plain read under the lock; this module is the side that advances
 * deq_head. */
645 head = *qp->deq_head;
646 d = qp->descs + dd->index;
/* Only DAQ_VERDICT_PASS forwards; the alternative branch marks the packet
 * DROP.  NOTE(review): this appears to drop WHITELIST / REPLACE / IGNORE
 * verdicts as well - confirm that is intended. */
647 if (verdict == DAQ_VERDICT_PASS)
648 d->action = DAQ_VPP_ACTION_FORWARD;
650 d->action = DAQ_VPP_ACTION_DROP;
/* Write the ring slot first, then publish the head with release ordering so
 * VPP never observes the new head before the slot contents. */
652 qp->deq_ring[head & mask] = dd->index;
654 __atomic_store_n (qp->deq_head, head, __ATOMIC_RELEASE);
/* Interrupt mode: signal VPP through the dequeue fd. */
656 if (vc->input_mode == DAQ_VPP_INPUT_MODE_INTERRUPT)
658 rv = write (qp->deq_fd, &counter_increment, sizeof (counter_increment));
660 if (rv != sizeof (counter_increment))
664 vpp_daq_qpair_unlock (qp);
/* DAQ get_msg_pool_info hook: report message-pool capacity to the caller.
 * NOTE(review): size and available are fixed constants (256 / 128) written
 * into vc->pool.info on every call.  They appear unrelated to the queue
 * sizes or to the number of outstanding messages - confirm this is
 * intentional. */
669 vpp_daq_get_msg_pool_info (void *handle, DAQ_MsgPoolInfo_t *info)
671 VPP_Context_t *vc = (VPP_Context_t *) handle;
673 vc->pool.info.available = 128;
674 vc->pool.info.size = 256;
676 *info = vc->pool.info;
682 const DAQ_ModuleAPI_t DAQ_MODULE_DATA = {
683 /* .api_version = */ DAQ_MODULE_API_VERSION,
684 /* .api_size = */ sizeof (DAQ_ModuleAPI_t),
685 /* .module_version = */ DAQ_VPP_VERSION,
687 /* .type = */ DAQ_TYPE_INTF_CAPABLE | DAQ_TYPE_INLINE_CAPABLE |
688 DAQ_TYPE_MULTI_INSTANCE,
689 /* .load = */ vpp_daq_module_load,
690 /* .unload = */ vpp_daq_module_unload,
691 /* .get_variable_descs = */ vpp_daq_get_variable_descs,
692 /* .instantiate = */ vpp_daq_instantiate,
693 /* .destroy = */ vpp_daq_destroy,
694 /* .set_filter = */ NULL,
695 /* .start = */ vpp_daq_start,
696 /* .inject = */ NULL,
697 /* .inject_relative = */ NULL,
698 /* .interrupt = */ vpp_daq_interrupt,
701 /* .get_stats = */ vpp_daq_get_stats,
702 /* .reset_stats = */ vpp_daq_reset_stats,
703 /* .get_snaplen = */ NULL,
704 /* .get_capabilities = */ vpp_daq_get_capabilities,
705 /* .get_datalink_type = */ vpp_daq_get_datalink_type,
706 /* .config_load = */ NULL,
707 /* .config_swap = */ NULL,
708 /* .config_free = */ NULL,
709 /* .msg_receive = */ vpp_daq_msg_receive,
710 /* .msg_finalize = */ vpp_daq_msg_finalize,
711 /* .get_msg_pool_info = */ vpp_daq_get_msg_pool_info,