l4p/tcp_ofo: fix handling out-of-order packets
[tldk.git] / lib / libtle_l4p / udp_rxtx.c
index 01d3520..84a13ea 100644 (file)
@@ -57,8 +57,7 @@ get_pkt_type(const struct rte_mbuf *m)
 }
 
 static inline union l4_ports
-pkt_info(const struct tle_dev *dev, struct rte_mbuf *m,
-       union l4_ports *ports, union ipv4_addrs *addr4,
+pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4,
        union ipv6_addrs **addr6)
 {
        uint32_t len;
@@ -72,11 +71,9 @@ pkt_info(const struct tle_dev *dev, struct rte_mbuf *m,
                pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
                        len + offsetof(struct ipv4_hdr, src_addr));
                addr4->raw = pa4->raw;
-               m->ol_flags |= dev->rx.ol_flags[TLE_V4];
        } else if (ret.src == TLE_V6) {
                *addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
                        len + offsetof(struct ipv6_hdr, src_addr));
-               m->ol_flags |= dev->rx.ol_flags[TLE_V6];
        }
 
        len += m->l3_len;
@@ -97,7 +94,7 @@ rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
 {
        uint32_t i, k, r;
 
-       r = rte_ring_enqueue_burst(s->rx.q, mb, num);
+       r = _rte_ring_enqueue_burst(s->rx.q, mb, num);
 
        /* if RX queue was empty invoke user RX notification callback. */
        if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
@@ -178,7 +175,7 @@ tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
        union ipv6_addrs *pa6[num];
 
        for (i = 0; i != num; i++)
-               tp[i] = pkt_info(dev, pkt[i], &port[i], &a4[i], &pa6[i]);
+               tp[i] = pkt_info(pkt[i], &port[i], &a4[i], &pa6[i]);
 
        k = 0;
        for (i = 0; i != num; i = j) {
@@ -223,7 +220,7 @@ stream_drb_release(struct tle_udp_stream *s, struct tle_drb *drb[],
        uint32_t n;
 
        n = rte_ring_count(s->tx.drb.r);
-       rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);
+       _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);
 
        /* If stream is still open, then mark it as avaialble for writing. */
        if (rwl_try_acquire(&s->tx.use) > 0) {
@@ -276,7 +273,7 @@ static inline uint32_t
 recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
 {
        uint32_t i, k;
-       uint64_t f, flg[num], ofl[num];
+       uint64_t flg[num], ofl[num];
 
        for (i = 0; i != num; i++) {
                flg[i] = m[i]->ol_flags;
@@ -286,18 +283,13 @@ recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
        k = 0;
        for (i = 0; i != num; i++) {
 
-               f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
-
                /* drop packets with invalid cksum(s). */
-               if (f != 0 && check_pkt_csum(m[i], m[i]->ol_flags, type,
-                               IPPROTO_UDP) != 0) {
+               if (check_pkt_csum(m[i], flg[i], type, IPPROTO_UDP) != 0) {
                        rte_pktmbuf_free(m[i]);
                        m[i] = NULL;
                        k++;
-               } else {
-                       m[i]->ol_flags ^= f;
+               } else
                        rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
-               }
        }
 
        return k;
@@ -310,7 +302,7 @@ tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
        struct tle_udp_stream *s;
 
        s = UDP_STREAM(us);
-       n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
+       n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
        if (n == 0)
                return 0;
 
@@ -451,21 +443,21 @@ static inline void
 stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
        uint32_t nb_drb)
 {
-       rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+       _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
 }
 
 static inline uint32_t
 stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
        uint32_t nb_drb)
 {
-       return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
+       return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
 }
 
 /* enqueue up to num packets to the destination device queue. */
 static inline uint16_t
 queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev,
                const void *pkt[], uint16_t nb_pkt,
-               struct tle_drb *drbs[], uint32_t *nb_drb)
+               struct tle_drb *drbs[], uint32_t *nb_drb, uint8_t all_or_nothing)
 {
        uint32_t bsz, i, n, nb, nbc, nbm;
 
@@ -487,8 +479,11 @@ queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev,
                return 0;
 
        /* not enough free drbs, reduce number of packets to send. */
-       else if (nb != nbm)
+       else if (nb != nbm) {
+               if (all_or_nothing)
+                       return 0;
                nb_pkt = nb * bsz;
+       }
 
        /* enqueue packets to the destination device. */
        nbc = nb;
@@ -559,8 +554,10 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
        mtu = dst.mtu - dst.l2_len - dst.l3_len;
 
        /* mark stream as not closable. */
-       if (rwl_acquire(&s->tx.use) < 0)
+       if (rwl_acquire(&s->tx.use) < 0) {
+               rte_errno = EAGAIN;
                return 0;
+       }
 
        nb = 0;
        for (i = 0, k = 0; k != num; k = i) {
@@ -587,11 +584,13 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
                if (k != i) {
                        k += queue_pkt_out(s, dst.dev,
                                (const void **)(uintptr_t)&pkt[k], i - k,
-                               drb, &nb);
+                               drb, &nb, 0);
 
                        /* stream TX queue is full. */
-                       if (k != i)
+                       if (k != i) {
+                               rte_errno = EAGAIN;
                                break;
+                       }
                }
 
                /* enqueue packet that need to be fragmented */
@@ -607,10 +606,11 @@ tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
                        }
 
                        n = queue_pkt_out(s, dst.dev,
-                               (const void **)(uintptr_t)frag, rc, drb, &nb);
+                               (const void **)(uintptr_t)frag, rc, drb, &nb, 1);
                        if (n == 0) {
                                while (rc-- != 0)
                                        rte_pktmbuf_free(frag[rc]);
+                               rte_errno = EAGAIN;
                                break;
                        }