Change libtle_udp to use dring.
[tldk.git] / lib / libtle_udp / udp_rxtx.c
index a8ab3fb..7136714 100644 (file)
@@ -217,13 +217,13 @@ tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
 }
 
 static inline void
-tx_cage_release(struct buf_cage *bc)
+stream_drb_release(struct tle_udp_stream *s, struct tle_drb * drb[],
+       uint32_t nb_drb)
 {
-       struct tle_udp_stream *s;
        uint32_t n;
 
-       s = bcg_get_udata(bc);
-       n = bcg_free(bc);
+       n = rte_ring_count(s->tx.drb.r);
+       rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);
 
        /* If stream is still open, then mark it as avaialble for writing. */
        if (rwl_try_acquire(&s->tx.use) > 0) {
@@ -232,7 +232,7 @@ tx_cage_release(struct buf_cage *bc)
                        tle_event_raise(s->tx.ev);
 
                /* if stream send buffer was full invoke TX callback */
-               else if (s->tx.cb.func != NULL && n == 1)
+               else if (s->tx.cb.func != NULL && n == 0)
                        s->tx.cb.func(s->tx.cb.data, s);
 
        }
@@ -240,53 +240,32 @@ tx_cage_release(struct buf_cage *bc)
        rwl_release(&s->tx.use);
 }
 
-static inline void
-tx_cage_update(struct tle_udp_dev *dev, struct buf_cage *bc)
-{
-       struct tle_udp_stream *s;
-       struct tle_udp_ctx *ctx;
-       uint32_t idx;
-
-       ctx = dev->ctx;
-       s = bcg_get_udata(bc);
-       idx = dev - ctx->dev;
-
-       /* mark cage as closed to the stream. */
-       rte_spinlock_lock(&s->tx.lock);
-       if (bc == s->tx.cg[idx])
-               s->tx.cg[idx] = NULL;
-       rte_spinlock_unlock(&s->tx.lock);
-}
-
 uint16_t
 tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
 {
-       struct buf_cage *bc;
-       uint32_t i, n;
+       uint32_t i, j, k, n;
+       struct tle_drb *drb[num];
+       struct tle_udp_stream *s;
 
-       for (i = 0; i != num; i += n) {
+       /* extract packets from device TX queue. */
 
-               bc = dev->tx.bc;
-               if (bc == NULL) {
-                       if (dev->tx.beq.num == 0)
-                               bcg_queue_append(&dev->tx.beq, &dev->tx.feq);
-                       bc = __bcg_dequeue_head(&dev->tx.beq);
-                       if (bc == NULL)
-                               break;
-                       tx_cage_update(dev, bc);
-                       dev->tx.bc = bc;
-               }
+       k = num;
+       n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
+               num, drb, &k);
 
-               n = bcg_get(bc, (const void **)(uintptr_t)&pkt[i], num - i);
+       if (n == 0)
+               return 0;
 
-               /* cage is empty, need to free it and notify related stream. */
-               if (bcg_fill_count(bc) == 0) {
-                       tx_cage_release(bc);
-                       dev->tx.bc = NULL;
-               }
+       /* free empty drbs and notify related streams. */
+
+       for (i = 0; i != k; i = j) {
+               s = drb[i]->udata;
+               for (j = i + 1; j != k && s == drb[i]->udata; j++)
+                       ;
+               stream_drb_release(s, drb + i, j - i);
        }
 
-       return i;
+       return n;
 }
 
 static int
@@ -359,30 +338,41 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
  * helper function, do the necessary pre-processing for the received packets
  * before handiing them to the strem_recv caller.
  */
-static inline struct rte_mbuf *
-recv_pkt_process(struct rte_mbuf *m, uint32_t type)
+static inline uint32_t
+recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
 {
-       uint64_t f;
-
-       f = m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
-       if (f != 0) {
-               if (check_pkt_csum(m, type) == 0)
-                       m->ol_flags ^= f;
-               else {
-                       rte_pktmbuf_free(m);
-                       return NULL;
+       uint32_t i, k;
+       uint64_t f, flg[num], ofl[num];
+
+       for (i = 0; i != num; i++) {
+               flg[i] = m[i]->ol_flags;
+               ofl[i] = m[i]->tx_offload;
+       }
+
+       k = 0;
+       for (i = 0; i != num; i++) {
+
+               f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
+
+               /* drop packets with invalid cksum(s). */
+               if (f != 0 && check_pkt_csum(m[i], type) != 0) {
+                       rte_pktmbuf_free(m[i]);
+                       m[i] = NULL;
+                       k++;
                }
+
+               m[i]->ol_flags ^= f;
+               rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
        }
 
-       rte_pktmbuf_adj(m, m->l2_len + m->l3_len + m->l4_len);
-       return m;
+       return k;
 }
 
 uint16_t
 tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
        uint16_t num)
 {
-       uint32_t i, k, n;
+       uint32_t k, n;
 
        n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
        if (n == 0)
@@ -398,28 +388,7 @@ tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
                rwl_release(&s->rx.use);
        }
 
-       k = 0;
-       for (i = 0; i != RTE_ALIGN_FLOOR(n, 4); i += 4) {
-               pkt[i] = recv_pkt_process(pkt[i], s->type);
-               pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
-               pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
-               pkt[i + 3] = recv_pkt_process(pkt[i + 3], s->type);
-               k += (pkt[i] == NULL) + (pkt[i + 1] == NULL) +
-                       (pkt[i + 2] == NULL) + (pkt[i + 3] == NULL);
-       }
-
-       switch (n % 4) {
-       case 3:
-               pkt[i + 2] = recv_pkt_process(pkt[i + 2], s->type);
-               k += (pkt[i + 2] == NULL);
-       case 2:
-               pkt[i + 1] = recv_pkt_process(pkt[i + 1], s->type);
-               k += (pkt[i + 1] == NULL);
-       case 1:
-               pkt[i] = recv_pkt_process(pkt[i], s->type);
-               k += (pkt[i] == NULL);
-       }
-
+       k = recv_pkt_process(pkt, n, s->type);
        return compress_pkt_list(pkt, n, k);
 }
 
@@ -586,65 +555,59 @@ fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
        return frag_num;
 }
 
+static inline void
+stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
+       uint32_t nb_drb)
+{
+       rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
+static inline uint32_t
+stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
+       uint32_t nb_drb)
+{
+       return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+}
+
 /* enqueue up to num packets to the destination device queue. */
 static inline uint16_t
-queue_pkt_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
-       const void *pkt[], uint16_t num)
+queue_pkt_out(struct tle_udp_stream *s, struct tle_udp_dev *dev,
+               const void *pkt[], uint16_t nb_pkt,
+               struct tle_drb *drbs[], uint32_t *nb_drb)
 {
-       struct buf_cage *bc;
-       uint32_t i, n;
+       uint32_t bsz, i, n, nb, nbc, nbm;
 
-       rte_spinlock_lock(&s->tx.lock);
-       bc = s->tx.cg[di];
+       bsz = s->tx.drb.nb_elem;
 
-       for (i = 0; i != num; i += n) {
-               if (bc == NULL) {
-                       bc = bcg_alloc(s->tx.st);
-                       if (bc == NULL)
-                               break;
-                       n = bcg_put(bc, pkt + i, num - i);
-                       bcg_enqueue_tail(bq, bc);
-               } else
-                       n = bcg_put(bc, pkt + i, num - i);
+       /* calulate how many drbs are needed.*/
+       nbc = *nb_drb;
+       nbm = (nb_pkt + bsz - 1) / bsz;
+       nb = RTE_MAX(nbm, nbc) - nbc;
 
-               if (n != num - i)
-                       bc = NULL;
-       }
+       /* allocate required drbs */
+       if (nb != 0)
+               nb = stream_drb_alloc(s, drbs + nbc, nb);
 
-       s->tx.cg[di] = bc;
-       rte_spinlock_unlock(&s->tx.lock);
-       return i;
-}
+       nb += nbc;
 
-/*
- * etiher enqueue all num packets or none.
- * assumes that all number of input packets not exceed size of buf_cage.
- */
-static inline uint16_t
-queue_frg_out(struct tle_udp_stream *s, struct bcg_queue *bq, uint32_t di,
-       const void *pkt[], uint16_t num)
-{
-       struct buf_cage *bc, *bcp;
-       uint32_t n;
+       /* no free drbs, can't send anything */
+       if (nb == 0)
+               return 0;
 
-       rte_spinlock_lock(&s->tx.lock);
-       bc = s->tx.cg[di];
+       /* not enough free drbs, reduce number of packets to send. */
+       else if (nb != nbm)
+               nb_pkt = nb * bsz;
 
-       n = 0;
-       if (bc == NULL || bcg_free_count(bc) < num) {
-               bcp = bc;
-               bc = bcg_alloc(s->tx.st);
-               if (bc != NULL) {
-                       if (bcp != NULL)
-                               n = bcg_put(bcp, pkt, num);
-                       n += bcg_put(bc, pkt, num - n);
-                       bcg_enqueue_tail(bq, bc);
-               }
-       } else
-               n = bcg_put(bc, pkt, num);
+       /* enqueue packets to the destination device. */
+       nbc = nb;
+       n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);
 
-       s->tx.cg[di] = bc;
-       rte_spinlock_unlock(&s->tx.lock);
+       /* if not all available drbs were consumed, move them to the start. */
+       nbc -= nb;
+       for (i = 0; i != nb; i++)
+               drbs[i] = drbs[nbc + i];
+
+       *nb_drb = nb;
        return n;
 }
 
@@ -654,13 +617,14 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
 {
        int32_t di, frg, rc;
        uint64_t ol_flags;
-       uint32_t i, k, n;
+       uint32_t i, k, n, nb;
        uint32_t mtu, pid, type;
        const struct sockaddr_in *d4;
        const struct sockaddr_in6 *d6;
        const void *da;
        union udph udph;
        struct tle_udp_dest dst;
+       struct tle_drb *drb[num];
 
        type = s->type;
 
@@ -704,6 +668,7 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
        if (rwl_acquire(&s->tx.use) < 0)
                return 0;
 
+       nb = 0;
        for (i = 0, k = 0; k != num; k = i) {
 
                /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
@@ -726,8 +691,9 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
 
                /* enqueue non-fragment packets to the destination device. */
                if (k != i) {
-                       k += queue_pkt_out(s, &dst.dev->tx.feq, di,
-                               (const void **)(uintptr_t)&pkt[k], i - k);
+                       k += queue_pkt_out(s, dst.dev,
+                               (const void **)(uintptr_t)&pkt[k], i - k,
+                               drb, &nb);
 
                        /* stream TX queue is full. */
                        if (k != i)
@@ -746,8 +712,8 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
                                break;
                        }
 
-                       n = queue_frg_out(s, &dst.dev->tx.feq, di,
-                               (const void **)(uintptr_t)frag, rc);
+                       n = queue_pkt_out(s, dst.dev,
+                               (const void **)(uintptr_t)frag, rc, drb, &nb);
                        if (n == 0) {
                                while (rc-- != 0)
                                        rte_pktmbuf_free(frag[rc]);
@@ -765,6 +731,11 @@ tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
                tle_event_raise(s->tx.ev);
 
 out:
+       /* free unused drbs. */
+       if (nb != 0)
+               stream_drb_free(s, drb, nb);
+
+       /* stream can be closed. */
        rwl_release(&s->tx.use);
 
        /*