FE configuration record format:
- lcore=<uint>,op=<"rx|tx|echo|fwd">,\
+ lcore=<uint>,op=<"rx|tx|echo|fwd|rxtx">,\
laddr=<ip>,lport=<uint16>,raddr=<ip>,rport=<uint16>,\
[txlen=<uint>,fwladdr=<ip>,fwlport=<uint16>,fwraddr=<ip>,fwrport=<uint16>,\
- belcore=<uint>]
+ belcore=<uint>,rxlen=<uint>]
lcore - EAL lcore to manage that stream(s) in the FE. It is a mandatory
option.
"echo" - mimic recvfrom(..., &addr);sendto(..., &addr);
on that stream.
"fwd" - forward packets between streams.
+ "rxtx" - Receive/reply transactions on stream.
It is a mandatory option.
laddr - local address for the stream to open. It is a mandatory option.
lport - local port for the stream to open. It is a mandatory option.
raddr - remote address for the stream to open. It is a mandatory option.
rport - remote port for the stream to open. It is a mandatory option.
- txlen - data length sending in each packet (mandatory for "tx" mode only).
+ txlen - data length sending in each packet.
+ (mandatory for "tx" & "rxtx" modes only).
+ rxlen - Expected response length (mandatory for "rxtx" mode only).
fwladdr - local address for the forwarding stream(s) to open
(mandatory for "fwd" mode only).
fwlport - local port for the forwarding stream(s) to open
}
static inline int
-netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
+netfe_rxtx_get_mss(struct netfe_stream *fes)
+{
+ switch (fes->proto) {
+ case TLE_PROTO_TCP:
+ return tle_tcp_stream_get_mss(fes->s);
+
+ case TLE_PROTO_UDP:
+ /* The UDP code doesn't have MSS discovery, so have to
+ * assume arbitary MTU. Going to use default mbuf
+ * data space as TLDK uses this internally as a
+ * maximum segment size.
+ */
+ return RTE_MBUF_DEFAULT_DATAROOM - TLE_DST_MAX_HDR;
+ default:
+ NETFE_TRACE("%s(%u): Unhandled MSS query (family=%i)\n",
+ __func__, lcore, fes->proto, fes->family);
+ return -EINVAL;
+ }
+}
+
+static inline int
+netfe_rxtx_dispatch_reply(uint32_t lcore, struct netfe_stream *fes)
+
+{
+ struct pkt_buf *pb;
+ int32_t sid;
+ int32_t cnt_mtu_pkts;
+ int32_t cnt_all_pkts;
+ int32_t idx_pkt;
+ int32_t len_tail;
+ int32_t mtu;
+
+ pb = &fes->pbuf;
+ sid = rte_lcore_to_socket_id(lcore) + 1;
+ mtu = netfe_rxtx_get_mss(fes);
+
+ cnt_mtu_pkts = (fes->txlen / mtu);
+ cnt_all_pkts = cnt_mtu_pkts;
+ len_tail = fes->txlen - (mtu * cnt_mtu_pkts);
+
+ if (len_tail > 0)
+ cnt_all_pkts++;
+
+ if (pb->num + cnt_all_pkts >= RTE_DIM(pb->pkt)) {
+ NETFE_TRACE("%s(%u): Insufficent space for outbound burst\n",
+ __func__, lcore);
+ return -ENOMEM;
+ }
+ if (rte_pktmbuf_alloc_bulk(mpool[sid], &pb->pkt[pb->num], cnt_all_pkts)
+ != 0) {
+ NETFE_TRACE("%s(%u): rte_pktmbuf_alloc_bulk() failed\n",
+ __func__, lcore);
+ return -ENOMEM;
+ }
+
+ /* Full MTU packets */
+ for (idx_pkt = 0; idx_pkt < cnt_mtu_pkts; idx_pkt++) {
+ rte_pktmbuf_append(pb->pkt[pb->num++], mtu);
+ }
+
+ /* Last non-MTU packet, if any */
+ if (len_tail > 0)
+ rte_pktmbuf_append(pb->pkt[pb->num++], len_tail);
+
+ return 0;
+}
+
+static inline int
+netfe_rx_process(uint32_t lcore, struct netfe_stream *fes)
{
uint32_t k, n;
+ uint64_t count_bytes;
n = fes->pbuf.num;
k = RTE_DIM(fes->pbuf.pkt) - n;
/* free all received mbufs. */
if (fes->op == RXONLY)
fes->stat.rxb += pkt_buf_empty(&fes->pbuf);
+ else if (fes->op == RXTX) {
+ /* RXTX mode. Count incoming bytes then discard.
+ * If receive threshold (rxlen) exceeded, send out a packet.
+ */
+ count_bytes = pkt_buf_empty(&fes->pbuf);
+ fes->stat.rxb += count_bytes;
+ fes->rx_run_len += count_bytes;
+ if (fes->rx_run_len >= fes->rxlen) {
+ /* Idle Rx as buffer needed for Tx */
+ tle_event_idle(fes->rxev);
+ fes->stat.rxev[TLE_SEV_IDLE]++;
+
+ /* Discard surplus bytes. For now pipelining of
+ * requests is not supported.
+ */
+ fes->rx_run_len = 0;
+ netfe_rxtx_dispatch_reply(lcore, fes);
+
+ /* Kick off a Tx event */
+ tle_event_active(fes->txev, TLE_SEV_UP);
+ fes->stat.txev[TLE_SEV_UP]++;
+ }
+ }
/* mark stream as writable */
else if (k == RTE_DIM(fes->pbuf.pkt)) {
- if (fes->op == RXTX) {
+ if (fes->op == ECHO) {
tle_event_active(fes->txev, TLE_SEV_UP);
fes->stat.txev[TLE_SEV_UP]++;
} else if (fes->op == FWD) {
#include <tle_udp.h>
#include <tle_event.h>
+#define TLE_DEFAULT_MSS 536
+
#define MAX_PKT_BURST 0x20
/* Used to allocate the memory for hash key. */
RXONLY,
TXONLY,
RXTX,
+ ECHO,
FWD,
};
uint32_t belcore;
uint16_t line;
uint16_t op;
- uint16_t txlen; /* valid/used only for TXONLY op. */
+ uint32_t txlen; /* valid/used only for TXONLY op. */
+ uint32_t rxlen; /* Used by RXTX */
struct netfe_sprm sprm;
struct netfe_sprm fprm; /* valid/used only for FWD op. */
};
uint16_t op;
uint16_t proto;
uint16_t family;
- uint16_t txlen;
+ uint32_t txlen;
+ uint32_t rxlen;
+ uint16_t reply_count;
+ uint32_t rx_run_len;
uint16_t posterr; /* # of time error event handling was postponed */
struct {
uint64_t rxp;
} name2feop[] = {
{ .name = "rx", .op = RXONLY,},
{ .name = "tx", .op = TXONLY,},
- { .name = "echo", .op = RXTX,},
+ { .name = "echo", .op = ECHO,},
+ { .name = "rxtx", .op = RXTX,},
{ .name = "fwd", .op = FWD,},
};
"fwraddr",
"fwrport",
"belcore",
+ "rxlen",
};
static const arg_handler_t hndl[] = {
parse_ip_val,
parse_uint_val,
parse_uint_val,
+ parse_uint_val,
};
union parse_val val[RTE_DIM(hndl)];
pv2saddr(&sp->fprm.local_addr, val + 7, val + 8);
pv2saddr(&sp->fprm.remote_addr, val + 9, val + 10);
sp->belcore = val[11].u64;
+ sp->rxlen = val[12].u64;
return 0;
}
format_feop(sp->op));
return -EINVAL;
}
+ } else if (sp->op == RXTX) {
+ /* RXTX: Check tx pkt size */
+ if (sp->txlen == 0) {
+ RTE_LOG(ERR, USER1, "invalid arg at line %u: "
+ "txlen cannot be zero.\n", sp->line);
+ return -EINVAL;
+ }
+ if (sp->rxlen == 0) {
+ RTE_LOG(ERR, USER1, "invalid arg at line %u: "
+ "rxlen cannot be zero.\n", sp->line);
+ return -EINVAL;
+ }
}
return 0;
} else if (prm->stream[i].op == TXONLY) {
fes->txlen = prm->stream[i].txlen;
fes->raddr = prm->stream[i].sprm.remote_addr;
+ } else if (prm->stream[i].op == RXTX) {
+ fes->txlen = prm->stream[i].txlen;
+ fes->rxlen = prm->stream[i].rxlen;
}
if (becfg.server == 1) {
ts->proto = fes->proto;
ts->family = fes->family;
ts->txlen = fes->txlen;
+ ts->rxlen = fes->rxlen;
tle_event_active(ts->erev, TLE_SEV_DOWN);
if (fes->op == TXONLY || fes->op == FWD) {
if (k == 0)
return 0;
- if (n == RTE_DIM(fes->pbuf.pkt)) {
+ /* Mark stream for reading if:
+ * ECHO: Buffer full
+ * RXTX: All outbound packets successfully dispatched
+ */
+ if ((fes->op == ECHO && n == RTE_DIM(fes->pbuf.pkt)) ||
+ (fes->op == RXTX && n - k == 0)) {
/* mark stream as readable */
tle_event_active(fes->rxev, TLE_SEV_UP);
fes->stat.rxev[TLE_SEV_UP]++;
rc = 0;
- if (fs[j]->op == RXTX)
+ if (fs[j]->op == ECHO)
rc = netfe_rxtx_process_tcp(lcore, fs[j]);
else if (fs[j]->op == FWD)
rc = netfe_fwd_tcp(lcore, fs[j]);
else if (fs[j]->op == TXONLY)
rc = netfe_tx_process_tcp(lcore, fs[j]);
+ else if (fs[j]->op == RXTX)
+ rc = netfe_rxtx_process_tcp(lcore, fs[j]);
/* we are ok to close the stream */
if (rc == 0 && fs[j]->posterr != 0)
NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
__func__, lcore, fe->txeq, n);
for (j = 0; j != n; j++) {
- if (fs[j]->op == RXTX)
+ if (fs[j]->op == ECHO)
netfe_rxtx_process_udp(lcore, fs[j]);
else if (fs[j]->op == FWD)
netfe_fwd_udp(lcore, fs[j]);
return i;
}
+
+int
+tle_tcp_stream_get_mss(const struct tle_stream * stream)
+{
+ struct tle_tcp_stream *tcp;
+
+ if (stream == NULL)
+ return -EINVAL;
+
+ tcp = TCP_STREAM(stream);
+ return tcp->tcb.snd.mss;
+}
*/
int tle_tcp_process(struct tle_ctx *ctx, uint32_t num);
+/**
+ * Get current TCP maximum segment size
+ * @param stream
+ * Stream to get MSS from.
+ * @return
+ * Maximum segment size in bytes, if successful.
+ * Negative on failure.
+ */
+int tle_tcp_stream_get_mss(const struct tle_stream * const stream);
+
#ifdef __cplusplus
}
#endif