Change libtle_udp to use dring. 10/1910/2
authorKonstantin Ananyev <konstantin.ananyev@intel.com>
Thu, 7 Jul 2016 19:24:24 +0000 (20:24 +0100)
committerKonstantin Ananyev <konstantin.ananyev@intel.com>
Thu, 7 Jul 2016 22:41:30 +0000 (23:41 +0100)
Right now didn't see any noticeable performance boost with these changes.
Though it allowed to get rid of using locks at UDP TX code-path
and simplify the code quite a lot.

Change-Id: If865abd3db9127f510df670d9a8edb168b915770
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
examples/udpfwd/main.c
lib/libtle_udp/Makefile
lib/libtle_udp/buf_cage.c [deleted file]
lib/libtle_udp/buf_cage.h [deleted file]
lib/libtle_udp/misc.h
lib/libtle_udp/osdep.h
lib/libtle_udp/tle_udp_impl.h
lib/libtle_udp/udp_ctl.c
lib/libtle_udp/udp_impl.h
lib/libtle_udp/udp_rxtx.c

index a907355..20f123e 100644 (file)
@@ -34,6 +34,9 @@
 #define RX_CSUM_OFFLOAD        (DEV_RX_OFFLOAD_IPV4_CKSUM | DEV_RX_OFFLOAD_UDP_CKSUM)
 #define TX_CSUM_OFFLOAD        (DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM)
 
+#define        OPT_SHORT_SBULK         'B'
+#define        OPT_LONG_SBULK          "sburst"
+
 #define        OPT_SHORT_PROMISC       'P'
 #define        OPT_LONG_PROMISC        "promisc"
 
@@ -62,6 +65,7 @@ static const struct option long_opt[] = {
        {OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC},
        {OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS},
        {OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS},
+       {OPT_LONG_SBULK, 1, 0, OPT_SHORT_SBULK},
        {OPT_LONG_STREAMS, 1, 0, OPT_SHORT_STREAMS},
        {NULL, 0, 0, 0}
 };
@@ -1704,14 +1708,23 @@ main(int argc, char *argv[])
                        "%s: rte_eal_init failed with error code: %d\n",
                        __func__, rc);
 
+       memset(&ctx_prm, 0, sizeof(ctx_prm));
+
        argc -= rc;
        argv += rc;
 
        optind = 0;
        optarg = NULL;
-       while ((opt = getopt_long(argc, argv, "PR:S:b:f:s:", long_opt,
+       while ((opt = getopt_long(argc, argv, "B:PR:S:b:f:s:", long_opt,
                        &opt_idx)) != EOF) {
-               if (opt == OPT_SHORT_PROMISC) {
+               if (opt == OPT_SHORT_SBULK) {
+                       rc = parse_uint_val(NULL, optarg, &v);
+                       if (rc < 0)
+                               rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
+                                       "for option: \'%c\'\n",
+                                       __func__, optarg, opt);
+                       ctx_prm.send_bulk_size = v;
+               } else if (opt == OPT_SHORT_PROMISC) {
                        becfg.promisc = 1;
                } else if (opt == OPT_SHORT_RBUFS) {
                        rc = parse_uint_val(NULL, optarg, &v);
index 100755c..a834873 100644 (file)
@@ -31,7 +31,6 @@ EXPORT_MAP := tle_udp_version.map
 LIBABIVER := 1
 
 #source files
-SRCS-y += buf_cage.c
 SRCS-y += event.c
 SRCS-y += udp_ctl.c
 SRCS-y += udp_rxtx.c
@@ -40,11 +39,7 @@ SRCS-y += udp_rxtx.c
 SYMLINK-y-include += tle_udp_impl.h
 SYMLINK-y-include += tle_event.h
 
-# this library depends on
-DEPDIRS-y += $(RTE_SDK)/lib/librte_eal
-DEPDIRS-y += $(RTE_SDK)/lib/librte_ether
-DEPDIRS-y += $(RTE_SDK)/lib/librte_mbuf
-DEPDIRS-y += $(RTE_SDK)lib/librte_net
-DEPDIRS-y += $(RTE_SDK)lib/librte_ip_frag
+# this lib dependencies
+DEPDIRS-y += lib/libtle_dring
 
 include $(RTE_SDK)/mk/rte.extlib.mk
diff --git a/lib/libtle_udp/buf_cage.c b/lib/libtle_udp/buf_cage.c
deleted file mode 100644 (file)
index 0ae21b0..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2016  Intel Corporation.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <rte_errno.h>
-#include <rte_malloc.h>
-#include <rte_log.h>
-
-#include "buf_cage.h"
-#include "osdep.h"
-
-struct bcg_store *
-bcg_create(const struct bcg_store_prm *prm)
-{
-       struct buf_cage *bc;
-       struct bcg_store *st;
-       uintptr_t end, p;
-       size_t sz, tsz;
-       uint32_t n;
-
-       if (prm == NULL || (prm->cage_align != 0 &&
-                       rte_is_power_of_2(prm->cage_align) == 0)) {
-               rte_errno = EINVAL;
-               return NULL;
-       }
-
-       /* number of cages required. */
-       n = (prm->max_bufs + prm->cage_bufs - 1) / prm->cage_bufs;
-       n = RTE_MAX(n, prm->min_cages);
-
-       /* size of each cage. */
-       sz = prm->cage_bufs * sizeof(bc->bufs[0]) + sizeof(*bc);
-       sz = RTE_ALIGN_CEIL(sz, prm->cage_align);
-
-       /* total number of bytes required. */
-       tsz = n * sz + RTE_ALIGN_CEIL(sizeof(*st), prm->cage_align);
-
-       st = rte_zmalloc_socket(NULL, tsz, RTE_CACHE_LINE_SIZE, prm->socket_id);
-       if (st == NULL) {
-               UDP_LOG(ERR, "%s: allocation of %zu bytes on "
-                       "socket %d failed\n",
-                       __func__, tsz, prm->socket_id);
-               return NULL;
-       }
-
-       st->prm = prm[0];
-       bcg_queue_reset(&st->free);
-
-       p = (uintptr_t)RTE_PTR_ALIGN_CEIL((st + 1), prm->cage_align);
-       end = p + n * sz;
-
-       for (; p != end; p += sz) {
-               bc = (struct buf_cage *)p;
-               bc->st = st;
-               bc->num = prm->cage_bufs;
-               STAILQ_INSERT_TAIL(&st->free.queue, bc, ql);
-       }
-
-       st->free.num = n;
-       st->nb_cages = n;
-       st->cage_sz = sz;
-       st->total_sz = tsz;
-       return st;
-}
-
-void
-bcg_destroy(struct bcg_store *st)
-{
-       rte_free(st);
-}
diff --git a/lib/libtle_udp/buf_cage.h b/lib/libtle_udp/buf_cage.h
deleted file mode 100644 (file)
index 3b3c429..0000000
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright (c) 2016  Intel Corporation.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef _BUF_CAGE_H_
-#define _BUF_CAGE_H_
-
-#include <rte_common.h>
-#include <rte_atomic.h>
-#include <rte_spinlock.h>
-#include <sys/queue.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-struct bcg_store;
-
-struct buf_cage {
-       struct bcg_store *st;
-       STAILQ_ENTRY(buf_cage) ql;
-       uint32_t num;
-       uint32_t rp;
-       uint32_t wp;
-       const void *bufs[0];
-};
-
-struct bcg_queue {
-       rte_spinlock_t lock;
-       uint32_t num;
-       STAILQ_HEAD(, buf_cage) queue;
-};
-
-struct bcg_store_prm {
-       void *user_data;
-       int32_t socket_id;     /* NUMA socket to allocate memory from. */
-       uint32_t max_bufs;     /* total number of bufs to cage. */
-       uint32_t min_cages;    /* min number of cages per store. */
-       uint32_t cage_bufs;    /* min number of bufs per cage. */
-       uint32_t cage_align;   /* each cage to be aligned (power of 2). */
-};
-
-struct bcg_store {
-       struct bcg_queue free;
-       uint32_t nb_cages;
-       size_t cage_sz;
-       size_t total_sz;
-       struct bcg_store_prm prm;
-} __rte_cache_aligned;
-
-struct bcg_store *bcg_create(const struct bcg_store_prm *prm);
-void bcg_destroy(struct bcg_store *st);
-
-static inline int
-bcg_store_full(const struct bcg_store *st)
-{
-       return st->nb_cages == st->free.num;
-}
-
-static inline void
-bcg_queue_reset(struct bcg_queue *bq)
-{
-       STAILQ_INIT(&bq->queue);
-       bq->num = 0;
-       rte_spinlock_init(&bq->lock);
-}
-
-static inline void
-bcg_reset(struct buf_cage *bc)
-{
-       bc->rp = 0;
-       bc->wp = 0;
-}
-
-static inline void *
-bcg_get_udata(struct buf_cage *bc)
-{
-       return bc->st->prm.user_data;
-}
-
-static inline struct buf_cage *
-__bcg_dequeue_head(struct bcg_queue *bq)
-{
-       struct buf_cage *bc;
-
-       bc = STAILQ_FIRST(&bq->queue);
-       if (bc != NULL) {
-               STAILQ_REMOVE_HEAD(&bq->queue, ql);
-               bq->num--;
-       }
-       return bc;
-}
-
-static inline struct buf_cage *
-bcg_dequeue_head(struct bcg_queue *bq)
-{
-       struct buf_cage *bc;
-
-       if (bq->num == 0)
-               return NULL;
-
-       rte_compiler_barrier();
-
-       rte_spinlock_lock(&bq->lock);
-       bc = __bcg_dequeue_head(bq);
-       rte_spinlock_unlock(&bq->lock);
-       return bc;
-}
-
-static inline uint32_t
-__bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc)
-{
-       STAILQ_INSERT_HEAD(&bq->queue, bc, ql);
-       return ++bq->num;
-}
-
-static inline uint32_t
-bcg_enqueue_head(struct bcg_queue *bq, struct buf_cage *bc)
-{
-       uint32_t n;
-
-       rte_spinlock_lock(&bq->lock);
-       n = __bcg_enqueue_head(bq, bc);
-       rte_spinlock_unlock(&bq->lock);
-       return n;
-}
-
-static inline uint32_t
-__bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc)
-{
-       STAILQ_INSERT_TAIL(&bq->queue, bc, ql);
-       return ++bq->num;
-}
-
-static inline uint32_t
-bcg_enqueue_tail(struct bcg_queue *bq, struct buf_cage *bc)
-{
-       uint32_t n;
-
-       rte_spinlock_lock(&bq->lock);
-       n = __bcg_enqueue_tail(bq, bc);
-       rte_spinlock_unlock(&bq->lock);
-       return n;
-}
-
-static inline uint32_t
-bcg_queue_append(struct bcg_queue *dst, struct bcg_queue *src)
-{
-       rte_spinlock_lock(&src->lock);
-       STAILQ_CONCAT(&dst->queue, &src->queue);
-       dst->num += src->num;
-       src->num = 0;
-       rte_spinlock_unlock(&src->lock);
-       return dst->num;
-}
-
-static inline uint32_t
-bcg_free_count(const struct buf_cage *bc)
-{
-       return bc->num - bc->wp;
-}
-
-
-static inline uint32_t
-bcg_fill_count(const struct buf_cage *bc)
-{
-       return bc->wp - bc->rp;
-}
-
-/* !!! if going to keep it - try to unroll copying stuff. !!! */
-static inline uint32_t
-bcg_get(struct buf_cage *bc, const void *bufs[], uint32_t num)
-{
-       uint32_t i, n, r;
-
-       r = bc->rp;
-       n = RTE_MIN(num, bc->wp - r);
-       for (i = 0; i != n; i++)
-               bufs[i] = bc->bufs[r + i];
-
-       bc->rp = r + n;
-       return n;
-}
-
-static inline uint32_t
-bcg_put(struct buf_cage *bc, const void *bufs[], uint32_t num)
-{
-       uint32_t i, n, w;
-
-       w = bc->wp;
-       n = RTE_MIN(num, bc->num - w);
-       for (i = 0; i != n; i++)
-               bc->bufs[w + i] = bufs[i];
-
-       bc->wp = w + n;
-       return n;
-}
-
-
-static inline struct buf_cage *
-bcg_alloc(struct bcg_store *st)
-{
-       return bcg_dequeue_head(&st->free);
-}
-
-static inline uint32_t
-bcg_free(struct buf_cage *bc)
-{
-       struct bcg_store *st;
-
-       st = bc->st;
-       bcg_reset(bc);
-       return bcg_enqueue_head(&st->free, bc);
-}
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* _BUF_CAGE_H_ */
index 3874647..359f400 100644 (file)
@@ -44,6 +44,21 @@ _mbuf_tx_offload(uint64_t il2, uint64_t il3, uint64_t il4, uint64_t tso,
        return il2 | il3 << 7 | il4 << 16 | tso << 24 | ol3 << 40 | ol2 << 49;
 }
 
+/*
+ * Given the value of mbuf's tx_offload, calculate L4 payload offset.
+ */
+static inline uint32_t
+_tx_offload_l4_offset(uint64_t ofl)
+{
+       uint32_t l2, l3, l4;
+
+       l2 = ofl & 0x7f;
+       l3 = ofl >> 7 & 0x1ff;
+       l4 = ofl >> 16 & UINT8_MAX;
+
+       return l2 + l3 + l4;
+}
+
 /*
  * Routines to calculate L3/L4 checksums in SW.
  * Pretty similar to ones from DPDK librte_net/rte_ip.h,
index 6161242..8e91964 100644 (file)
 extern "C" {
 #endif
 
+/*
+ * internal defines.
+ */
+#define        MAX_PKT_BURST   0x20
+
+#define        MAX_DRB_BURST   4
+
+/*
+ * logging related macros.
+ */
+
 #define UDP_LOG(lvl, fmt, args...)      RTE_LOG(lvl, USER1, fmt, ##args)
 
 /*
index a5d17e1..8e61ea6 100644 (file)
@@ -84,6 +84,7 @@ struct tle_udp_ctx_param {
        uint32_t max_streams;      /**< max number of streams in context. */
        uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */
        uint32_t max_stream_sbufs; /**< max send mbufs per stream. */
+       uint32_t send_bulk_size;   /**< expected # of packets per send call. */
 
        int (*lookup4)(void *opaque, const struct in_addr *addr,
                struct tle_udp_dest *res);
index 36ec8a6..55c4afd 100644 (file)
@@ -29,8 +29,6 @@
 #define        LPORT_START_BLK PORT_BLK(LPORT_START)
 #define        LPORT_END_BLK   PORT_BLK(LPORT_END)
 
-#define        MAX_BURST       0x20
-
 static const struct in6_addr tle_udp6_any = IN6ADDR_ANY_INIT;
 static const struct in6_addr tle_udp6_none = {
        {
@@ -60,12 +58,31 @@ unuse_stream(struct tle_udp_stream *s)
        rte_atomic32_set(&s->tx.use, INT32_MIN);
 }
 
+/* calculate number of drbs per stream. */
+static uint32_t
+calc_stream_drb_num(const struct tle_udp_ctx *ctx, uint32_t obj_num)
+{
+       uint32_t num;
+
+       num = (ctx->prm.max_stream_sbufs + obj_num - 1) / obj_num;
+       num = num + num / 2;
+       num = RTE_MAX(num, RTE_DIM(ctx->dev) + 1);
+       return num;
+}
+
+static uint32_t
+drb_nb_elem(const struct tle_udp_ctx *ctx)
+{
+       return (ctx->prm.send_bulk_size != 0) ?
+               ctx->prm.send_bulk_size : MAX_PKT_BURST;
+}
+
 static int
 init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
 {
-       size_t sz;
-       uint32_t n;
-       struct bcg_store_prm sp;
+       size_t bsz, rsz, sz;
+       uint32_t i, k, n, nb;
+       struct tle_drb *drb;
        char name[RTE_RING_NAMESIZE];
 
        /* init RX part. */
@@ -88,21 +105,45 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
 
        /* init TX part. */
 
-       sp.socket_id = ctx->prm.socket_id;
-       sp.max_bufs = ctx->prm.max_stream_sbufs;
-       sp.min_cages = RTE_DIM(ctx->dev) + 1;
-       sp.cage_bufs = MAX_BURST;
-       sp.cage_align = RTE_CACHE_LINE_SIZE;
-       sp.user_data = s;
-
-       s->tx.st = bcg_create(&sp);
-       if (s->tx.st == NULL) {
-               UDP_LOG(ERR,
-                       "%s(%p): bcg_create() failed with error code: %d\n",
-                       __func__, s, rte_errno);
+       nb = drb_nb_elem(ctx);
+       k = calc_stream_drb_num(ctx, nb);
+       n = rte_align32pow2(k);
+
+       /* size of the drbs ring */
+       rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]);
+       rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
+
+       /* size of the drb. */
+       bsz = tle_drb_calc_size(nb);
+
+       /* total stream drbs size. */
+       sz = rsz + bsz * k;
+
+       s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+               ctx->prm.socket_id);
+       if (s->tx.drb.r == NULL) {
+               UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+                       "failed with error code: %d\n",
+                       __func__, s, sz, ctx->prm.socket_id, rte_errno);
                return ENOMEM;
        }
 
+       snprintf(name, sizeof(name), "%p@%zu", s, sz);
+       rte_ring_init(s->tx.drb.r, name, n, 0);
+
+       for (i = 0; i != k; i++) {
+               drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
+                       rsz + bsz * i);
+               drb->udata = s;
+               drb->size = nb;
+               rte_ring_enqueue(s->tx.drb.r, drb);
+       }
+
+       s->tx.drb.nb_elem = nb;
+       s->tx.drb.nb_max = k;
+
+       /* mark stream as avaialble to use. */
+
        s->ctx = ctx;
        unuse_stream(s);
        STAILQ_INSERT_TAIL(&ctx->streams.free, s, link);
@@ -113,8 +154,8 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
 static void
 fini_stream(struct tle_udp_stream *s)
 {
-       bcg_destroy(s->tx.st);
        rte_free(s->rx.q);
+       rte_free(s->tx.drb.r);
 }
 
 struct tle_udp_ctx *
@@ -181,15 +222,15 @@ tle_udp_destroy(struct tle_udp_ctx *ctx)
                return;
        }
 
+       for (i = 0; i != RTE_DIM(ctx->dev); i++)
+               tle_udp_del_dev(ctx->dev + i);
+
        if (ctx->streams.buf != 0) {
                for (i = 0; i != ctx->prm.max_streams; i++)
                        fini_stream(&ctx->streams.buf[i]);
                rte_free(ctx->streams.buf);
        }
 
-       for (i = 0; i != RTE_DIM(ctx->dev); i++)
-               tle_udp_del_dev(ctx->dev + i);
-
        rte_free(ctx);
 }
 
@@ -279,8 +320,7 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
        }
 
        /* setup TX data. */
-       bcg_queue_reset(&dev->tx.beq);
-       bcg_queue_reset(&dev->tx.feq);
+       tle_dring_reset(&dev->tx.dr);
 
        if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0) {
                dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM;
@@ -297,25 +337,33 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
 }
 
 static void
-empty_cage(struct buf_cage *bc)
+empty_dring(struct tle_dring *dr)
 {
-       uint32_t i, n;
-       struct rte_mbuf *pkt[MAX_BURST];
+       uint32_t i, k, n;
+       struct tle_udp_stream *s;
+       struct rte_mbuf *pkt[MAX_PKT_BURST];
+       struct tle_drb *drb[MAX_PKT_BURST];
 
        do {
-               n = bcg_get(bc, (const void **)(uintptr_t)pkt, RTE_DIM(pkt));
+               k = RTE_DIM(drb);
+               n = tle_dring_sc_dequeue(dr, (const void **)(uintptr_t)pkt,
+                       RTE_DIM(pkt), drb, &k);
+
+               /* free mbufs */
                for (i = 0; i != n; i++)
                        rte_pktmbuf_free(pkt[i]);
+               /* free drbs */
+               for (i = 0; i != k; i++) {
+                       s = drb[i]->udata;
+                       rte_ring_enqueue(s->tx.drb.r, drb[i]);
+               }
        } while (n != 0);
-
-       bcg_free(bc);
 }
 
 int
 tle_udp_del_dev(struct tle_udp_dev *dev)
 {
        uint32_t p;
-       struct buf_cage *bc;
        struct tle_udp_ctx *ctx;
 
        ctx = dev->ctx;
@@ -331,13 +379,7 @@ tle_udp_del_dev(struct tle_udp_dev *dev)
                return -EINVAL;
 
        /* emtpy TX queues. */
-       if (dev->tx.bc != NULL)
-               empty_cage(dev->tx.bc);
-
-       bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
-
-       while ((bc = __bcg_dequeue_head(&dev->tx.beq)) != NULL)
-               empty_cage(bc);
+       empty_dring(&dev->tx.dr);
 
        rte_free(dev->dp[TLE_UDP_V4]);
        rte_free(dev->dp[TLE_UDP_V6]);
@@ -591,7 +633,7 @@ tle_udp_stream_open(struct tle_udp_ctx *ctx,
                return NULL;
 
        /* some TX still pending for that stream. */
-       } else if (bcg_store_full(s->tx.st) == 0) {
+       } else if (UDP_STREAM_TX_PENDING(s)) {
                put_stream(ctx, s, 0);
                rte_errno = EAGAIN;
                return NULL;
@@ -649,7 +691,7 @@ tle_udp_stream_close(struct tle_udp_stream *s)
        uint32_t i, n;
        int32_t rc;
        struct tle_udp_ctx *ctx;
-       struct rte_mbuf *m[MAX_BURST];
+       struct rte_mbuf *m[MAX_PKT_BURST];
 
        static const struct tle_udp_stream_cb zcb;
 
@@ -661,11 +703,6 @@ tle_udp_stream_close(struct tle_udp_stream *s)
        /* mark stream as unavaialbe for RX/TX. */
        stream_down(s);
 
-       /* reset TX cages. */
-       rte_spinlock_lock(&s->tx.lock);
-       memset(s->tx.cg, 0, sizeof(s->tx.cg));
-       rte_spinlock_unlock(&s->tx.lock);
-
        /* reset stream events if any. */
        if (s->rx.ev != NULL) {
                tle_event_idle(s->rx.ev);
@@ -696,7 +733,7 @@ tle_udp_stream_close(struct tle_udp_stream *s)
         * if there still are pkts queued for TX,
         * then put this stream to the tail of free list.
         */
-       put_stream(ctx, s, bcg_store_full(s->tx.st));
+       put_stream(ctx, s, UDP_STREAM_TX_FINISHED(s));
        return rc;
 }
 
index fbdb743..af35197 100644 (file)
 
 #include <rte_spinlock.h>
 #include <rte_vect.h>
+#include <tle_dring.h>
 #include <tle_udp_impl.h>
 #include <tle_event.h>
 
-#include "buf_cage.h"
 #include "port_bitmap.h"
 #include "osdep.h"
 
@@ -104,16 +104,24 @@ struct tle_udp_stream {
 
        struct {
                rte_atomic32_t use;
-               rte_spinlock_t lock;
+               struct {
+                       uint32_t nb_elem;  /* number of obects per drb. */
+                       uint32_t nb_max;   /* number of drbs per stream. */
+                       struct rte_ring *r;
+               } drb;
                struct tle_event *ev;
                struct tle_udp_stream_cb cb;
-               struct bcg_store *st;
-               struct buf_cage *cg[RTE_MAX_ETHPORTS];
        } tx __rte_cache_aligned;
 
        struct tle_udp_stream_param prm;
 } __rte_cache_aligned;
 
+#define UDP_STREAM_TX_PENDING(s)       \
+       ((s)->tx.drb.nb_max != rte_ring_count((s)->tx.drb.r))
+
+#define UDP_STREAM_TX_FINISHED(s)      \
+       ((s)->tx.drb.nb_max == rte_ring_count((s)->tx.drb.r))
+
 struct tle_udp_dport {
        struct udp_pbm use; /* ports in use. */
        struct tle_udp_stream *streams[MAX_PORT_NUM]; /* port to stream. */
@@ -128,11 +136,9 @@ struct tle_udp_dev {
                /* used by FE. */
                uint64_t ol_flags[TLE_UDP_VNUM];
                rte_atomic32_t packet_id[TLE_UDP_VNUM];
-               struct bcg_queue feq;
 
-               /* used by BE only. */
-               struct bcg_queue beq __rte_cache_min_aligned;
-               struct buf_cage *bc;
+               /* used by FE & BE. */
+               struct tle_dring dr;
        } tx;
        struct tle_udp_dev_param prm; /* copy of device paramaters. */
        struct tle_udp_dport *dp[TLE_UDP_VNUM]; /* device udp ports */
@@ -140,7 +146,6 @@ struct tle_udp_dev {
 
 struct tle_udp_ctx {
        struct tle_udp_ctx_param prm;
-
        struct {
                rte_spinlock_t lock;
                uint32_t nb_free; /* number of free streams. */
index a8ab3fb..7136714 100644 (file)
@@ -217,13 +217,13 @@ tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
 }
 
 static inline void
-tx_cage_release(struct buf_cage *bc)
+stream_drb_release(struct tle_udp_stream *s, struct tle_drb * drb[],
+       uint32_t nb_drb)
 {
-       struct tle_udp_stream *s;
        uint32_t n;
 
-       s = bcg_get_udata(bc);
-       n = bcg_free(bc);
+       n = rte_ring_count(s->tx.drb.r);
+       rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);
 
        /* If stream is still open, then mark it as avaialble for writing. */
        if (rwl_try_acquire(&s->tx.use) > 0) {
@@ -232,7 +232,7 @@ tx_cage_release(struct buf_cage *bc)
                        tle_event_raise(s->tx.ev);
 
                /* if stream send buffer was full invoke TX callback */
-               else if (s->tx.cb.func != NULL && n == 1)
+               else if (s->tx.cb.func != NULL && n == 0)
                        s->tx.cb.func(s->tx.cb.data, s);
 
        }
@@ -240,53 +240,32 @@ tx_cage_release(struct buf_cage *bc)
        rwl_release(&s->tx.use);
 }
 
-static inline void
-tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc)
-{
-       struct tle_udp_stream *s;
-       struct tle_udp_ctx *ctx;
-       uint32_t idx;
-
-       ctx = dev->ctx;
-       s = bcg_get_udata(bc);
-       idx = dev - ctx->dev;
-
-       /* mark cage as closed to the stream. */
-       rte_spinlock_lock(&s->tx.lock);
-       if (bc == s->tx.cg[idx])
-               s->tx.cg[idx] = NULL;
-       rte_spinlock_unlock(&s->tx.lock);
-}
-
 uint16_t
 tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
 {
-       struct buf_cage *bc;
-       uint32_t i, n;
+       uint32_t i, j, k, n;
+       struct tle_drb *drb[num];
+       struct tle_udp_stream *s;
 
-       for (i = 0; i != num; i += n) {
+       /* extract packets from device TX queue. */
 
-               bc = dev->tx.bc;
-               if (bc == NULL) {
-                       if (dev->tx.beq.num == 0)
-                               bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
-                       bc = __bcg_dequeue_head(&dev->tx.beq);
-                       if (bc == NULL)
-                               break;
-                       tx_cage_update(dev, bc);
-                       dev->tx.bc = bc;
-               }
+       k = num;
+       n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
+               num, drb, &k);
 
-               n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i);
+       if (n == 0)
+               return 0;
 
-               /* cage is empty, need to free it and notify related stream. */
-               if (bcg_fill_count(bc) == 0) {
-                       tx_cage_release(bc);
-                       dev->tx.bc = NULL;
-               }
+       /* free empty drbs and notify related streams. */
+
+       for (i = 0; i != k; i = j) {
+               s = drb[i]->udata;
+               for (j = i + 1; j != k && s == drb[i]->udata; j++)
+                       ;
+               stream_drb_release(s, drb + i, j - i);
        }
 
-       return i;
+       return n;
 }
 
 static int
@@ -359,30 +338,41 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
  * helper function, do the necessary pre-processing for the received packets
  * before handiing them to the strem_recv caller.
  */
-static inline struct rte_mbuf *
-recv_pkt_process(struct rte_mbuf *m, uint32_t type)
+static inline uint32_t
+recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
 {
-       uint64_t f;
-
-       f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
-       if (f != 0) {
-               if (check_pkt_csum(m, type) == 0)
-                       m->ol_flags ^= f;
-               else {
-                       rte_pktmbuf_free(m);
-                       return NULL;
+       uint32_t i, k;
+       uint64_t f, flg[num], ofl[num];
+
+       for (i = 0; i != num; i++) {
+               flg[i] = m[i]->ol_flags;
+               ofl[i] = m[i]->tx_offload;
+       }
+
+       k = 0;
+       for (i = 0; i != num; i++) {
+
+               f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
+
+               /* drop packets with invalid cksum(s). */
+               if (f != 0 && check_pkt_csum(m[i], type) != 0) {
+                       rte_pktmbuf_free(m[i]);
+                       m[i] = NULL;
+                       k++;
                }
+
+               m[i]->ol_flags ^= f;
+               rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
        }
 
-       rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len);
-       return m;
+       return k;
 }
 
 uint16_t
 tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
        uint16_t num)
 {
-       uint32_t i, k, n;
+       uint32_t k, n;
 
        n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
        if (n == 0)
@@ -398,28 +388,7 @@ tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
                rwl_release(&s->rx.use);
        }
 
-       k = 0;
-       for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) {
-               pkt[i] = recv_pkt_process(pkt[i], s->type);
-               pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
-               pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
-               pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type);
-               k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) +
-                       (pkt[i + 2] == NULL) + (pkt[i + 3] == NULL);
-       }
-
-       switch (n % 4) {
-       case 3:
-               pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
-               k += (pkt[i + 2] == NULL);
-       case 2:
-               pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
-               k += (pkt[i + 1] == NULL);
-       case 1:
-               pkt[i] = recv_pkt_process(pkt[i], s->type);
-               k += (pkt[i] == NULL);
-       }
-
+       k = recv_pkt_process(pkt, n, s->type);
        return compress_pkt_list(pkt, n, k);
 }
 
@@ -586,65 +555,59 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
        return frag_num;
 }
 
+static inline void
+stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
+       uint32_t nb_drb)
+{
+       rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
+static inline uint32_t
+stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
+       uint32_t nb_drb)
+{
+       return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
 /* enqueue up to num packets to the destination device queue. */
 static inline uint16_t
-queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
-       const void *pkt[], uint16_t num)
+queue_pkt_out(struct tle_udp_stream *s, struct tle_udp_dev *dev,
+               const void *pkt[], uint16_t nb_pkt,
+               struct tle_drb *drbs[], uint32_t *nb_drb)
 {
-       struct buf_cage *bc;
-       uint32_t i, n;
+       uint32_t bsz, i, n, nb, nbc, nbm;
 
-       rte_spinlock_lock(&s->tx.lock);
-       bc = s->tx.cg[di];
+       bsz = s->tx.drb.nb_elem;
 
-       for (i = 0; i != num; i += n) {
-               if (bc == NULL) {
-                       bc = bcg_alloc(s->tx.st);
-                       if (bc == NULL)
-                               break;
-                       n = bcg_put(bc, pkt + i, num - i);
-                       bcg_enqueue_tail(bq, bc);
-               } else
-                       n = bcg_put(bc, pkt + i, num - i);
+       /* calulate how many drbs are needed.*/
+       nbc = *nb_drb;
+       nbm = (nb_pkt + bsz - 1) / bsz;
+       nb = RTE_MAX(nbm, nbc) - nbc;
 
-               if (n != num - i)
-                       bc = NULL;
-       }
+       /* allocate required drbs */
+       if (nb != 0)
+               nb = stream_drb_alloc(s, drbs + nbc, nb);
 
-       s->tx.cg[di] = bc;
-       rte_spinlock_unlock(&s->tx.lock);
-       return i;
-}
+       nb += nbc;
 
-/*
- * etiher enqueue all num packets or none.
- * assumes that all number of input packets not exceed size of buf_cage.
- */
-static inline uint16_t
-queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
-       const void *pkt[], uint16_t num)
-{
-       struct buf_cage *bc, *bcp;
-       uint32_t n;
+       /* no free drbs, can't send anything */
+       if (nb == 0)
+               return 0;
 
-       rte_spinlock_lock(&s->tx.lock);
-       bc = s->tx.cg[di];
+       /* not enough free drbs, reduce number of packets to send. */
+       else if (nb != nbm)
+               nb_pkt = nb * bsz;
 
-       n = 0;
-       if (bc == NULL || bcg_free_count(bc) < num) {
-               bcp = bc;
-               bc = bcg_alloc(s->tx.st);
-               if (bc != NULL) {
-                       if (bcp != NULL)
-                               n = bcg_put(bcp, pkt, num);
-                       n += bcg_put(bc, pkt, num - n);
-                       bcg_enqueue_tail(bq, bc);
-               }
-       } else
-               n = bcg_put(bc, pkt, num);
+       /* enqueue packets to the destination device. */
+       nbc = nb;
+       n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);
 
-       s->tx.cg[di] = bc;
-       rte_spinlock_unlock(&s->tx.lock);
+       /* if not all available drbs were consumed, move them to the start. */
+       nbc -= nb;
+       for (i = 0; i != nb; i++)
+               drbs[i] = drbs[nbc + i];
+
+       *nb_drb = nb;
        return n;
 }
 
@@ -654,13 +617,14 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
 {
        int32_t di, frg, rc;
        uint64_t ol_flags;
-       uint32_t i, k, n;
+       uint32_t i, k, n, nb;
        uint32_t mtu, pid, type;
        const struct sockaddr_in *d4;
        const struct sockaddr_in6 *d6;
        const void *da;
        union udph udph;
        struct tle_udp_dest dst;
+       struct tle_drb *drb[num];
 
        type = s->type;
 
@@ -704,6 +668,7 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
        if (rwl_acquire(&s->tx.use) < 0)
                return 0;
 
+       nb = 0;
        for (i = 0, k = 0; k != num; k = i) {
 
                /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
@@ -726,8 +691,9 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
 
                /* enqueue non-fragment packets to the destination device. */
                if (k != i) {
-                       k += queue_pkt_out(s, &dst.dev->tx.feq, di,
-                               (const void **)(uintptr_t)&pkt[k], i - k);
+                       k += queue_pkt_out(s, dst.dev,
+                               (const void **)(uintptr_t)&pkt[k], i - k,
+                               drb, &nb);
 
                        /* stream TX queue is full. */
                        if (k != i)
@@ -746,8 +712,8 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
                                break;
                        }
 
-                       n = queue_frg_out(s, &dst.dev->tx.feq, di,
-                               (const void **)(uintptr_t)frag, rc);
+                       n = queue_pkt_out(s, dst.dev,
+                               (const void **)(uintptr_t)frag, rc, drb, &nb);
                        if (n == 0) {
                                while (rc-- != 0)
                                        rte_pktmbuf_free(frag[rc]);
@@ -765,6 +731,11 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
                tle_event_raise(s->tx.ev);
 
 out:
+       /* free unused drbs. */
+       if (nb != 0)
+               stream_drb_free(s, drb, nb);
+
+       /* stream can be closed. */
        rwl_release(&s->tx.use);
 
        /*