Change libtle_udp to use dring.
[tldk.git] / lib / libtle_udp / udp_ctl.c
index 36ec8a6..55c4afd 100644 (file)
@@ -29,8 +29,6 @@
 #define        LPORT_START_BLK PORT_BLK(LPORT_START)
 #define        LPORT_END_BLK   PORT_BLK(LPORT_END)
 
-#define        MAX_BURST       0x20
-
 static const struct in6_addr tle_udp6_any = IN6ADDR_ANY_INIT;
 static const struct in6_addr tle_udp6_none = {
        {
@@ -60,12 +58,31 @@ unuse_stream(struct tle_udp_stream *s)
        rte_atomic32_set(&s->tx.use, INT32_MIN);
 }
 
+/* calculate number of drbs per stream. */
+static uint32_t
+calc_stream_drb_num(const struct tle_udp_ctx *ctx, uint32_t obj_num)
+{
+       uint32_t num;
+
+       num = (ctx->prm.max_stream_sbufs + obj_num - 1) / obj_num;
+       num = num + num / 2;
+       num = RTE_MAX(num, RTE_DIM(ctx->dev) + 1);
+       return num;
+}
+
+static uint32_t
+drb_nb_elem(const struct tle_udp_ctx *ctx)
+{
+       return (ctx->prm.send_bulk_size != 0) ?
+               ctx->prm.send_bulk_size : MAX_PKT_BURST;
+}
+
 static int
 init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
 {
-       size_t sz;
-       uint32_t n;
-       struct bcg_store_prm sp;
+       size_t bsz, rsz, sz;
+       uint32_t i, k, n, nb;
+       struct tle_drb *drb;
        char name[RTE_RING_NAMESIZE];
 
        /* init RX part. */
@@ -88,21 +105,45 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
 
        /* init TX part. */
 
-       sp.socket_id = ctx->prm.socket_id;
-       sp.max_bufs = ctx->prm.max_stream_sbufs;
-       sp.min_cages = RTE_DIM(ctx->dev) + 1;
-       sp.cage_bufs = MAX_BURST;
-       sp.cage_align = RTE_CACHE_LINE_SIZE;
-       sp.user_data = s;
-
-       s->tx.st = bcg_create(&sp);
-       if (s->tx.st == NULL) {
-               UDP_LOG(ERR,
-                       "%s(%p): bcg_create() failed with error code: %d\n",
-                       __func__, s, rte_errno);
+       nb = drb_nb_elem(ctx);
+       k = calc_stream_drb_num(ctx, nb);
+       n = rte_align32pow2(k);
+
+       /* size of the drbs ring */
+       rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]);
+       rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
+
+       /* size of the drb. */
+       bsz = tle_drb_calc_size(nb);
+
+       /* total stream drbs size. */
+       sz = rsz + bsz * k;
+
+       s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+               ctx->prm.socket_id);
+       if (s->tx.drb.r == NULL) {
+               UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
+                       "failed with error code: %d\n",
+                       __func__, s, sz, ctx->prm.socket_id, rte_errno);
                return ENOMEM;
        }
 
+       snprintf(name, sizeof(name), "%p@%zu", s, sz);
+       rte_ring_init(s->tx.drb.r, name, n, 0);
+
+       for (i = 0; i != k; i++) {
+               drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
+                       rsz + bsz * i);
+               drb->udata = s;
+               drb->size = nb;
+               rte_ring_enqueue(s->tx.drb.r, drb);
+       }
+
+       s->tx.drb.nb_elem = nb;
+       s->tx.drb.nb_max = k;
+
+       /* mark stream as avaialble to use. */
+
        s->ctx = ctx;
        unuse_stream(s);
        STAILQ_INSERT_TAIL(&ctx->streams.free, s, link);
@@ -113,8 +154,8 @@ init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
 static void
 fini_stream(struct tle_udp_stream *s)
 {
-       bcg_destroy(s->tx.st);
        rte_free(s->rx.q);
+       rte_free(s->tx.drb.r);
 }
 
 struct tle_udp_ctx *
@@ -181,15 +222,15 @@ tle_udp_destroy(struct tle_udp_ctx *ctx)
                return;
        }
 
+       for (i = 0; i != RTE_DIM(ctx->dev); i++)
+               tle_udp_del_dev(ctx->dev + i);
+
        if (ctx->streams.buf != 0) {
                for (i = 0; i != ctx->prm.max_streams; i++)
                        fini_stream(&ctx->streams.buf[i]);
                rte_free(ctx->streams.buf);
        }
 
-       for (i = 0; i != RTE_DIM(ctx->dev); i++)
-               tle_udp_del_dev(ctx->dev + i);
-
        rte_free(ctx);
 }
 
@@ -279,8 +320,7 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
        }
 
        /* setup TX data. */
-       bcg_queue_reset(&dev->tx.beq);
-       bcg_queue_reset(&dev->tx.feq);
+       tle_dring_reset(&dev->tx.dr);
 
        if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0) {
                dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM;
@@ -297,25 +337,33 @@ tle_udp_add_dev(struct tle_udp_ctx *ctx,
 }
 
 static void
-empty_cage(struct buf_cage *bc)
+empty_dring(struct tle_dring *dr)
 {
-       uint32_t i, n;
-       struct rte_mbuf *pkt[MAX_BURST];
+       uint32_t i, k, n;
+       struct tle_udp_stream *s;
+       struct rte_mbuf *pkt[MAX_PKT_BURST];
+       struct tle_drb *drb[MAX_PKT_BURST];
 
        do {
-               n = bcg_get(bc, (const void **)(uintptr_t)pkt, RTE_DIM(pkt));
+               k = RTE_DIM(drb);
+               n = tle_dring_sc_dequeue(dr, (const void **)(uintptr_t)pkt,
+                       RTE_DIM(pkt), drb, &k);
+
+               /* free mbufs */
                for (i = 0; i != n; i++)
                        rte_pktmbuf_free(pkt[i]);
+               /* free drbs */
+               for (i = 0; i != k; i++) {
+                       s = drb[i]->udata;
+                       rte_ring_enqueue(s->tx.drb.r, drb[i]);
+               }
        } while (n != 0);
-
-       bcg_free(bc);
 }
 
 int
 tle_udp_del_dev(struct tle_udp_dev *dev)
 {
        uint32_t p;
-       struct buf_cage *bc;
        struct tle_udp_ctx *ctx;
 
        ctx = dev->ctx;
@@ -331,13 +379,7 @@ tle_udp_del_dev(struct tle_udp_dev *dev)
                return -EINVAL;
 
        /* emtpy TX queues. */
-       if (dev->tx.bc != NULL)
-               empty_cage(dev->tx.bc);
-
-       bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
-
-       while ((bc = __bcg_dequeue_head(&dev->tx.beq)) != NULL)
-               empty_cage(bc);
+       empty_dring(&dev->tx.dr);
 
        rte_free(dev->dp[TLE_UDP_V4]);
        rte_free(dev->dp[TLE_UDP_V6]);
@@ -591,7 +633,7 @@ tle_udp_stream_open(struct tle_udp_ctx *ctx,
                return NULL;
 
        /* some TX still pending for that stream. */
-       } else if (bcg_store_full(s->tx.st) == 0) {
+       } else if (UDP_STREAM_TX_PENDING(s)) {
                put_stream(ctx, s, 0);
                rte_errno = EAGAIN;
                return NULL;
@@ -649,7 +691,7 @@ tle_udp_stream_close(struct tle_udp_stream *s)
        uint32_t i, n;
        int32_t rc;
        struct tle_udp_ctx *ctx;
-       struct rte_mbuf *m[MAX_BURST];
+       struct rte_mbuf *m[MAX_PKT_BURST];
 
        static const struct tle_udp_stream_cb zcb;
 
@@ -661,11 +703,6 @@ tle_udp_stream_close(struct tle_udp_stream *s)
        /* mark stream as unavaialbe for RX/TX. */
        stream_down(s);
 
-       /* reset TX cages. */
-       rte_spinlock_lock(&s->tx.lock);
-       memset(s->tx.cg, 0, sizeof(s->tx.cg));
-       rte_spinlock_unlock(&s->tx.lock);
-
        /* reset stream events if any. */
        if (s->rx.ev != NULL) {
                tle_event_idle(s->rx.ev);
@@ -696,7 +733,7 @@ tle_udp_stream_close(struct tle_udp_stream *s)
         * if there still are pkts queued for TX,
         * then put this stream to the tail of free list.
         */
-       put_stream(ctx, s, bcg_store_full(s->tx.st));
+       put_stream(ctx, s, UDP_STREAM_TX_FINISHED(s));
        return rc;
 }