Add l4fwd RXTX mode 25/6925/6
authorRemy Horton <remy.horton@intel.com>
Tue, 30 May 2017 09:53:25 +0000 (10:53 +0100)
committerRemy Horton <remy.horton@intel.com>
Fri, 30 Jun 2017 10:09:47 +0000 (11:09 +0100)
This mode allows for transactions where the request and response
are of different payload sizes

Change-Id: I0744159f0618c9241e576a4af1c02765bbf1dd9f
Signed-off-by: Remy Horton <remy.horton@intel.com>
examples/l4fwd/README
examples/l4fwd/common.h
examples/l4fwd/netbe.h
examples/l4fwd/parse.c
examples/l4fwd/tcp.h
examples/l4fwd/udp.h
lib/libtle_l4p/tcp_stream.c
lib/libtle_l4p/tle_tcp.h

index a232537..a7ae56a 100644 (file)
 
    FE configuration record format:
 
-   lcore=<uint>,op=<"rx|tx|echo|fwd">,\
+   lcore=<uint>,op=<"rx|tx|echo|fwd|rxtx">,\
    laddr=<ip>,lport=<uint16>,raddr=<ip>,rport=<uint16>,\
    [txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16>,\
-   belcore=<uint>]
+   belcore=<uint>,rxlen=<uint>]
 
    lcore -   EAL lcore to manage that stream(s) in the FE. It is an mandatory
              option.
              "echo" - mimic recvfrom(..., &addr);sendto(..., &addr);
              on that stream.
              "fwd" - forward packets between streams.
+             "rxtx" - Receive/reply transactions on stream.
              It is an mandatory option.
    laddr -   local address for the stream to open. It is an mandatory option.
    lport -   local port for the stream to open. It is an mandatory option.
    raddr -   remote address for the stream to open. It is an mandatory option.
    rport -   remote port for the stream to open. It is an mandatory option.
-   txlen -   data length sending in each packet (mandatory for "tx" mode only).
+   txlen -   data length sending in each packet.
+             (mandatory for "tx" & "rxtx" modes only).
+   rxlen -   Expected response length (mandatory for "rxtx" mode only).
    fwladdr - local address for the forwarding stream(s) to open
              (mandatory for "fwd" mode only).
    fwlport - local port for the forwarding stream(s) to open
index 8d757f3..ae4f266 100644 (file)
@@ -620,9 +620,78 @@ netbe_lcore(void)
 }
 
 static inline int
-netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
+netfe_rxtx_get_mss(struct netfe_stream *fes)
+{
+       switch (fes->proto) {
+       case TLE_PROTO_TCP:
+               return tle_tcp_stream_get_mss(fes->s);
+
+       case TLE_PROTO_UDP:
+               /* The UDP code doesn't have MSS discovery, so have to
+                * assume arbitary MTU. Going to use default mbuf
+                * data space as TLDK uses this internally as a
+                * maximum segment size.
+                */
+               return RTE_MBUF_DEFAULT_DATAROOM - TLE_DST_MAX_HDR;
+       default:
+               NETFE_TRACE("%s(%u): Unhandled MSS query (family=%i)\n",
+                       __func__, lcore, fes->proto, fes->family);
+               return -EINVAL;
+       }
+}
+
+static inline int
+netfe_rxtx_dispatch_reply(uint32_t lcore, struct netfe_stream *fes)
+
+{
+       struct pkt_buf *pb;
+       int32_t sid;
+       int32_t cnt_mtu_pkts;
+       int32_t cnt_all_pkts;
+       int32_t idx_pkt;
+       int32_t len_tail;
+       int32_t mtu;
+
+       pb = &fes->pbuf;
+       sid = rte_lcore_to_socket_id(lcore) + 1;
+       mtu = netfe_rxtx_get_mss(fes);
+
+       cnt_mtu_pkts = (fes->txlen / mtu);
+       cnt_all_pkts = cnt_mtu_pkts;
+       len_tail = fes->txlen - (mtu * cnt_mtu_pkts);
+
+       if (len_tail > 0)
+               cnt_all_pkts++;
+
+       if (pb->num + cnt_all_pkts >= RTE_DIM(pb->pkt)) {
+               NETFE_TRACE("%s(%u): Insufficent space for outbound burst\n",
+                       __func__, lcore);
+               return -ENOMEM;
+       }
+       if (rte_pktmbuf_alloc_bulk(mpool[sid], &pb->pkt[pb->num], cnt_all_pkts)
+                       != 0) {
+               NETFE_TRACE("%s(%u): rte_pktmbuf_alloc_bulk() failed\n",
+                       __func__, lcore);
+               return -ENOMEM;
+       }
+
+       /* Full MTU packets */
+       for (idx_pkt = 0; idx_pkt < cnt_mtu_pkts; idx_pkt++) {
+               rte_pktmbuf_append(pb->pkt[pb->num++], mtu);
+       }
+
+       /* Last non-MTU packet, if any */
+       if (len_tail > 0)
+               rte_pktmbuf_append(pb->pkt[pb->num++], len_tail);
+
+       return 0;
+}
+
+static inline int
+netfe_rx_process(uint32_t lcore, struct netfe_stream *fes)
 {
        uint32_t k, n;
+       uint64_t count_bytes;
 
        n = fes->pbuf.num;
        k = RTE_DIM(fes->pbuf.pkt) - n;
@@ -647,9 +716,32 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
        /* free all received mbufs. */
        if (fes->op == RXONLY)
                fes->stat.rxb += pkt_buf_empty(&fes->pbuf);
+       else if (fes->op == RXTX) {
+               /* RXTX mode. Count incoming bytes then discard.
+                * If receive threshold (rxlen) exceeded, send out a packet.
+                */
+               count_bytes = pkt_buf_empty(&fes->pbuf);
+               fes->stat.rxb += count_bytes;
+               fes->rx_run_len += count_bytes;
+               if (fes->rx_run_len >= fes->rxlen) {
+                       /* Idle Rx as buffer needed for Tx */
+                       tle_event_idle(fes->rxev);
+                       fes->stat.rxev[TLE_SEV_IDLE]++;
+
+                       /* Discard surplus bytes. For now pipelining of
+                        * requests is not supported.
+                        */
+                       fes->rx_run_len = 0;
+                       netfe_rxtx_dispatch_reply(lcore, fes);
+
+                       /* Kick off a Tx event */
+                       tle_event_active(fes->txev, TLE_SEV_UP);
+                       fes->stat.txev[TLE_SEV_UP]++;
+               }
+       }
        /* mark stream as writable */
        else if (k == RTE_DIM(fes->pbuf.pkt)) {
-               if (fes->op == RXTX) {
+               if (fes->op == ECHO) {
                        tle_event_active(fes->txev, TLE_SEV_UP);
                        fes->stat.txev[TLE_SEV_UP]++;
                } else if (fes->op == FWD) {
index 80d1c28..134ce3d 100644 (file)
@@ -44,6 +44,8 @@
 #include <tle_udp.h>
 #include <tle_event.h>
 
+#define TLE_DEFAULT_MSS 536
+
 #define        MAX_PKT_BURST   0x20
 
 /* Used to allocate the memory for hash key. */
@@ -161,6 +163,7 @@ enum {
        RXONLY,
        TXONLY,
        RXTX,
+       ECHO,
        FWD,
 };
 
@@ -175,7 +178,8 @@ struct netfe_stream_prm {
        uint32_t belcore;
        uint16_t line;
        uint16_t op;
-       uint16_t txlen; /* valid/used only for TXONLY op. */
+       uint32_t txlen; /* valid/used only for TXONLY op. */
+       uint32_t rxlen; /* Used by RXTX */
        struct netfe_sprm sprm;
        struct netfe_sprm fprm;  /* valid/used only for FWD op. */
 };
@@ -194,7 +198,10 @@ struct netfe_stream {
        uint16_t op;
        uint16_t proto;
        uint16_t family;
-       uint16_t txlen;
+       uint32_t txlen;
+       uint32_t rxlen;
+       uint16_t reply_count;
+       uint32_t rx_run_len;
        uint16_t posterr; /* # of time error event handling was postponed */
        struct {
                uint64_t rxp;
index 4850312..158b2cb 100644 (file)
@@ -27,7 +27,8 @@ static const struct {
 } name2feop[] = {
        { .name = "rx", .op = RXONLY,},
        { .name = "tx", .op = TXONLY,},
-       { .name = "echo", .op = RXTX,},
+       { .name = "echo", .op = ECHO,},
+       { .name = "rxtx", .op = RXTX,},
        { .name = "fwd", .op = FWD,},
 };
 
@@ -520,6 +521,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
                "fwraddr",
                "fwrport",
                "belcore",
+               "rxlen",
        };
 
        static const arg_handler_t hndl[] = {
@@ -535,6 +537,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
                parse_ip_val,
                parse_uint_val,
                parse_uint_val,
+               parse_uint_val,
        };
 
        union parse_val val[RTE_DIM(hndl)];
@@ -553,6 +556,7 @@ parse_netfe_arg(struct netfe_stream_prm *sp, const char *arg)
        pv2saddr(&sp->fprm.local_addr, val + 7, val + 8);
        pv2saddr(&sp->fprm.remote_addr, val + 9, val + 10);
        sp->belcore = val[11].u64;
+       sp->rxlen = val[12].u64;
 
        return 0;
 }
@@ -631,6 +635,18 @@ check_netfe_arg(const struct netfe_stream_prm *sp)
                                format_feop(sp->op));
                        return -EINVAL;
                }
+       } else if (sp->op == RXTX) {
+               /* RXTX: Check tx pkt size */
+               if (sp->txlen == 0) {
+                       RTE_LOG(ERR, USER1, "invalid arg at line %u: "
+                               "txlen cannot be zero.\n", sp->line);
+                       return -EINVAL;
+               }
+               if (sp->rxlen == 0) {
+                       RTE_LOG(ERR, USER1, "invalid arg at line %u: "
+                               "rxlen cannot be zero.\n", sp->line);
+                       return -EINVAL;
+               }
        }
 
        return 0;
index e4aadb5..701de9b 100644 (file)
@@ -199,6 +199,9 @@ netfe_lcore_init_tcp(const struct netfe_lcore_prm *prm)
                } else if (prm->stream[i].op == TXONLY) {
                        fes->txlen = prm->stream[i].txlen;
                        fes->raddr = prm->stream[i].sprm.remote_addr;
+               } else if (prm->stream[i].op == RXTX) {
+                       fes->txlen = prm->stream[i].txlen;
+                       fes->rxlen = prm->stream[i].rxlen;
                }
 
                if (becfg.server == 1) {
@@ -348,6 +351,7 @@ netfe_new_conn_tcp(struct netfe_lcore *fe, uint32_t lcore,
                ts->proto = fes->proto;
                ts->family = fes->family;
                ts->txlen = fes->txlen;
+               ts->rxlen = fes->rxlen;
 
                tle_event_active(ts->erev, TLE_SEV_DOWN);
                if (fes->op == TXONLY || fes->op == FWD) {
@@ -516,7 +520,12 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
        if (k == 0)
                return 0;
 
-       if (n == RTE_DIM(fes->pbuf.pkt)) {
+       /* Mark stream for reading if:
+        * ECHO: Buffer full
+        * RXTX: All outbound packets successfully dispatched
+        */
+       if ((fes->op == ECHO && n == RTE_DIM(fes->pbuf.pkt)) ||
+                       (fes->op == RXTX && n - k == 0)) {
                /* mark stream as readable */
                tle_event_active(fes->rxev, TLE_SEV_UP);
                fes->stat.rxev[TLE_SEV_UP]++;
@@ -608,12 +617,14 @@ netfe_lcore_tcp(void)
 
                        rc = 0;
 
-                       if (fs[j]->op == RXTX)
+                       if (fs[j]->op == ECHO)
                                rc = netfe_rxtx_process_tcp(lcore, fs[j]);
                        else if (fs[j]->op == FWD)
                                rc = netfe_fwd_tcp(lcore, fs[j]);
                        else if (fs[j]->op == TXONLY)
                                rc = netfe_tx_process_tcp(lcore, fs[j]);
+                       else if (fs[j]->op == RXTX)
+                               rc = netfe_rxtx_process_tcp(lcore, fs[j]);
 
                        /* we are ok to close the stream */
                        if (rc == 0 && fs[j]->posterr != 0)
index cdec6a5..c079e9c 100644 (file)
@@ -510,7 +510,7 @@ netfe_lcore_udp(void)
                NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
                        __func__, lcore, fe->txeq, n);
                for (j = 0; j != n; j++) {
-                       if (fs[j]->op == RXTX)
+                       if (fs[j]->op == ECHO)
                                netfe_rxtx_process_udp(lcore, fs[j]);
                        else if (fs[j]->op == FWD)
                                netfe_fwd_udp(lcore, fs[j]);
index af65967..99791d0 100644 (file)
@@ -604,3 +604,15 @@ tle_tcp_stream_update_cfg(struct tle_stream *ts[],
 
        return i;
 }
+
+int
+tle_tcp_stream_get_mss(const struct tle_stream * stream)
+{
+       struct tle_tcp_stream *tcp;
+
+       if (stream == NULL)
+               return -EINVAL;
+
+       tcp = TCP_STREAM(stream);
+       return tcp->tcb.snd.mss;
+}
index ec89746..9086658 100644 (file)
@@ -362,6 +362,16 @@ uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
  */
 int tle_tcp_process(struct tle_ctx *ctx, uint32_t num);
 
+/**
+ * Get current TCP maximum segment size
+ * @param stream
+ *   Stream to get MSS from.
+ * @return
+ *   Maximum segment size in bytes, if successful.
+ *   Negative on failure.
+ */
+int tle_tcp_stream_get_mss(const struct tle_stream * const stream);
+
 #ifdef __cplusplus
 }
 #endif