- Introduce tle_tcp_stream_readv() and tle_tcp_stream_writev(). 13/7813/2
authorKonstantin Ananyev <konstantin.ananyev@intel.com>
Thu, 27 Jul 2017 11:00:57 +0000 (12:00 +0100)
committerKonstantin Ananyev <konstantin.ananyev@intel.com>
Thu, 27 Jul 2017 19:24:53 +0000 (20:24 +0100)
- Introduce flags for tle_ctx_param.
- Introduce TLE_CTX_FLAG_ST - indicates that given ctx will be used
  by single thread only.
- Introduce new parameters for tcp context:
    timewait - allows user to configure max timeout in TCP_TIMEWAIT state.
    icw - allows user to specify desired initial congestion window
    for new connections.
-Few optimisations:
    cache tx.ol_flags inside tle destination.
    calcualte and cache inside ctx cycles_to_ms shift value.
    reorder restoring SYN opts and filling TCB a bit.

Change-Id: Ie05087783b3b7f1e4ce99d3555bc5bd098f83fe0
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Signed-off-by: Mohammad Abdul Awal <mohammad.abdul.awal@intel.com>
22 files changed:
examples/l4fwd/main.c
examples/l4fwd/parse.c
lib/libtle_dring/tle_dring.h
lib/libtle_l4p/ctx.c
lib/libtle_l4p/ctx.h
lib/libtle_l4p/misc.h
lib/libtle_l4p/stream.h
lib/libtle_l4p/stream_table.h
lib/libtle_l4p/syncookie.h
lib/libtle_l4p/tcp_ctl.h
lib/libtle_l4p/tcp_misc.h
lib/libtle_l4p/tcp_rxq.h
lib/libtle_l4p/tcp_rxtx.c
lib/libtle_l4p/tcp_stream.c
lib/libtle_l4p/tcp_stream.h
lib/libtle_l4p/tcp_tx_seg.h
lib/libtle_l4p/tle_ctx.h
lib/libtle_l4p/tle_tcp.h
lib/libtle_misc/tle_dpdk_wrapper.h
test/dring/test_dring.c
test/gtest/test_tle_dring.cpp
test/gtest/test_tle_dring.h

index 7613a95..c43b8d7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -209,6 +209,7 @@ main(int argc, char *argv[])
                        __func__, rc);
 
        memset(&ctx_prm, 0, sizeof(ctx_prm));
+       ctx_prm.timewait = TLE_TCP_TIMEWAIT_DEFAULT;
 
        signal(SIGINT, sig_handle);
 
index 158b2cb..97cf20d 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -38,6 +38,9 @@ static const struct {
 #define        OPT_SHORT_SBULK         'B'
 #define        OPT_LONG_SBULK          "sburst"
 
+#define        OPT_SHORT_CTXFLAGS      'C'
+#define        OPT_LONG_CTXFLAGS       "ctxflags"
+
 #define        OPT_SHORT_PROMISC       'P'
 #define        OPT_LONG_PROMISC        "promisc"
 
@@ -74,9 +77,16 @@ static const struct {
 #define        OPT_SHORT_VERBOSE       'v'
 #define        OPT_LONG_VERBOSE        "verbose"
 
+#define        OPT_SHORT_WINDOW        'w'
+#define        OPT_LONG_WINDOW         "initial-window"
+
+#define        OPT_SHORT_TIMEWAIT      'W'
+#define        OPT_LONG_TIMEWAIT       "timewait"
+
 static const struct option long_opt[] = {
        {OPT_LONG_ARP, 1, 0, OPT_SHORT_ARP},
        {OPT_LONG_SBULK, 1, 0, OPT_SHORT_SBULK},
+       {OPT_LONG_CTXFLAGS, 1, 0, OPT_SHORT_CTXFLAGS},
        {OPT_LONG_PROMISC, 0, 0, OPT_SHORT_PROMISC},
        {OPT_LONG_RBUFS, 1, 0, OPT_SHORT_RBUFS},
        {OPT_LONG_SBUFS, 1, 0, OPT_SHORT_SBUFS},
@@ -89,6 +99,8 @@ static const struct option long_opt[] = {
        {OPT_LONG_SEC_KEY, 1, 0, OPT_SHORT_SEC_KEY},
        {OPT_LONG_LISTEN, 0, 0, OPT_SHORT_LISTEN},
        {OPT_LONG_VERBOSE, 1, 0, OPT_SHORT_VERBOSE},
+       {OPT_LONG_WINDOW, 1, 0, OPT_SHORT_WINDOW},
+       {OPT_LONG_TIMEWAIT, 1, 0, OPT_SHORT_TIMEWAIT},
        {NULL, 0, 0, 0}
 };
 
@@ -760,7 +772,7 @@ parse_app_options(int argc, char **argv, struct netbe_cfg *cfg,
 
        optind = 0;
        optarg = NULL;
-       while ((opt = getopt_long(argc, argv, "aB:LPR:S:TUb:f:s:v:H:K:",
+       while ((opt = getopt_long(argc, argv, "aB:C:LPR:S:TUb:f:s:v:H:K:W:w:",
                        long_opt, &opt_idx)) != EOF) {
                if (opt == OPT_SHORT_ARP) {
                        cfg->arp = 1;
@@ -771,6 +783,14 @@ parse_app_options(int argc, char **argv, struct netbe_cfg *cfg,
                                        "for option: \'%c\'\n",
                                        __func__, optarg, opt);
                        ctx_prm->send_bulk_size = v;
+               } else if (opt == OPT_SHORT_CTXFLAGS) {
+                       rc = parse_uint_val(NULL, optarg, &v);
+                       if (rc < 0)
+                               rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
+                                       "for option: \'%c\'\n",
+                                       __func__, optarg, opt);
+                       ctx_prm->flags = v;
+               } else if (opt == OPT_SHORT_PROMISC) {
                } else if (opt == OPT_SHORT_PROMISC) {
                        cfg->promisc = 1;
                } else if (opt == OPT_SHORT_RBUFS) {
@@ -835,9 +855,21 @@ parse_app_options(int argc, char **argv, struct netbe_cfg *cfg,
                        }
                        memcpy(&ctx_prm->secret_key, optarg,
                                sizeof(ctx_prm->secret_key));
-               }
-
-               else {
+               } else if (opt == OPT_SHORT_WINDOW) {
+                       rc = parse_uint_val(NULL, optarg, &v);
+                       if (rc < 0)
+                               rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
+                                       "for option: \'%c\'\n",
+                                       __func__, optarg, opt);
+                       ctx_prm->icw = v;
+               } else if (opt == OPT_SHORT_TIMEWAIT) {
+                       rc = parse_uint_val(NULL, optarg, &v);
+                       if (rc < 0)
+                               rte_exit(EXIT_FAILURE, "%s: invalid value: %s "
+                                       "for option: \'%c\'\n",
+                                       __func__, optarg, opt);
+                       ctx_prm->timewait = v;
+               } else {
                        rte_exit(EXIT_FAILURE,
                                "%s: unknown option: \'%c\'\n",
                                __func__, opt);
index f589ece..9d3788a 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -21,6 +21,7 @@
 #include <rte_common.h>
 #include <rte_atomic.h>
 #include <rte_memory.h>
+#include <rte_ring.h>
 #include <rte_debug.h>
 
 #ifdef __cplusplus
@@ -68,11 +69,13 @@ struct tle_drb {
 struct tle_dring {
        uint32_t flags;
        struct  {
+               uint32_t single;                /**< true if single producer */
                volatile uint32_t head;         /**< producer head */
                volatile uint32_t tail;         /**< producer tail */
                struct tle_drb * volatile crb;  /**< block to enqueue to */
        } prod __rte_cache_aligned;
        struct  {
+               uint32_t single;                /**< true if single consumer */
                volatile uint32_t head;         /**< consumer head */
                volatile uint32_t tail;         /**< consumer tail */
                struct tle_drb * volatile crb;  /**< block to dequeue from */
@@ -259,6 +262,36 @@ tle_dring_sp_enqueue(struct tle_dring *dr, const void * const objs[],
        return nb_obj;
 }
 
+/**
+ * Enqueue several objects on the dring.
+ * Note that it is a caller responsibility to provide enough drbs
+ * to enqueue all requested objects.
+ *
+ * @param dr
+ *   A pointer to the ring structure.
+ * @param objs
+ *   An array of pointers to objects to enqueue.
+ * @param nb_obj
+ *   The number of objects to add to the dring from the objs[].
+ * @param drbs
+ *   An array of pointers to the drbs that can be used by the dring
+ *   to perform enqueue operation.
+ * @param nb_drb
+ *   at input: number of elements in the drbs[] array.
+ *   at output: number of unused by the dring elements in the drbs[] array.
+ * @return
+ *   - number of enqueued objects.
+ */
+static inline uint32_t
+tle_dring_enqueue(struct tle_dring *dr, const void * const objs[],
+       uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+       if (dr->prod.single == 0)
+               return tle_dring_mp_enqueue(dr, objs, nb_obj, drbs, nb_drb);
+       else
+               return tle_dring_sp_enqueue(dr, objs, nb_obj, drbs, nb_drb);
+}
+
 /*
  * helper routine, to dequeue objects from the ring.
  */
@@ -428,6 +461,39 @@ tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
        return num;
 }
 
+/**
+ * Dequeue several objects from the dring.
+ * Note, that it is a caller responsibility to provide drbs[] large
+ * enough to store pointers to all drbs that might become unused
+ * after that dequeue operation. It is a caller responsibility to manage
+ * unused drbs after the dequeue operation is completed
+ * (i.e mark them as free/reusable again, etc.).
+ *
+ * @param dr
+ *   A pointer to the ring structure.
+ * @param objs
+ *   An array of pointers to objects that will be dequeued.
+ * @param nb_obj
+ *   The number of objects to dequeue from the dring.
+ * @param drbs
+ *   An array of pointers to the drbs that will become unused after that
+ *   dequeue operation.
+ * @param nb_drb
+ *   at input: number of elements in the drbs[] array.
+ *   at output: number of filled entries in the drbs[] array.
+ * @return
+ *   - number of dequeued objects.
+ */
+static inline uint32_t
+tle_dring_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
+       struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+       if (dr->cons.single == 0)
+               return tle_dring_mc_dequeue(dr, objs, nb_obj, drbs, nb_drb);
+       else
+               return tle_dring_sc_dequeue(dr, objs, nb_obj, drbs, nb_drb);
+}
+
 /**
  * Reset given dring to the initial state.
  * Note, that information about all queued objects will be lost.
@@ -436,11 +502,14 @@ tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
  *   A pointer to the dring structure.
  */
 static inline void
-tle_dring_reset(struct tle_dring *dr)
+tle_dring_reset(struct tle_dring *dr, uint32_t flags)
 {
        memset(dr, 0, sizeof(*dr));
        dr->prod.crb = &dr->dummy;
        dr->cons.crb = &dr->dummy;
+       dr->prod.single = ((flags & RING_F_SP_ENQ) != 0);
+       dr->cons.single = ((flags & RING_F_SC_DEQ) != 0);
+       dr->flags = flags;
 }
 
 /**
index 6eb33eb..910fc88 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -16,6 +16,7 @@
 #include <string.h>
 #include <rte_malloc.h>
 #include <rte_errno.h>
+#include <rte_cycles.h>
 #include <rte_ethdev.h>
 #include <rte_ip.h>
 
@@ -77,6 +78,7 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm)
 {
        struct tle_ctx *ctx;
        size_t sz;
+       uint64_t ms;
        uint32_t i;
        int32_t rc;
 
@@ -95,6 +97,10 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm)
                return NULL;
        }
 
+       /* caclulate closest shift to convert from cycles to ms (approximate) */
+       ms = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S;
+       ctx->cycles_ms_shift = sizeof(ms) * CHAR_BIT - __builtin_clzll(ms) - 1;
+
        ctx->prm = *ctx_prm;
 
        rc = tle_stream_ops[ctx_prm->proto].init_streams(ctx);
@@ -195,6 +201,7 @@ struct tle_dev *
 tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm)
 {
        int32_t rc;
+       uint32_t df;
        struct tle_dev *dev;
 
        if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) {
@@ -247,7 +254,9 @@ tle_add_dev(struct tle_ctx *ctx, const struct tle_dev_param *dev_prm)
        }
 
        /* setup TX data. */
-       tle_dring_reset(&dev->tx.dr);
+       df = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+               RING_F_SP_ENQ | RING_F_SC_DEQ;
+       tle_dring_reset(&dev->tx.dr, df);
 
        if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0 &&
                        ctx->prm.proto == TLE_PROTO_UDP) {
index cc32081..389d646 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -53,6 +53,7 @@ struct tle_dev {
 
 struct tle_ctx {
        struct tle_ctx_param prm;
+       uint32_t cycles_ms_shift;  /* to convert from cycles to ms */
        struct {
                rte_spinlock_t lock;
                uint32_t nb_free; /* number of free streams. */
index 6450b67..9bff459 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -396,20 +396,103 @@ compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
        return nb_pkt;
 }
 
+static inline void
+free_mbufs(struct rte_mbuf *mb[], uint32_t num)
+{
+       uint32_t i;
+
+       for (i = 0; i != num; i++)
+               rte_pktmbuf_free(mb[i]);
+}
+
 /* empty ring and free queued mbufs */
 static inline void
 empty_mbuf_ring(struct rte_ring *r)
 {
-       uint32_t i, n;
+       uint32_t n;
        struct rte_mbuf *mb[MAX_PKT_BURST];
 
        do {
                n = _rte_ring_dequeue_burst(r, (void **)mb, RTE_DIM(mb));
-               for (i = 0; i != n; i++)
-                       rte_pktmbuf_free(mb[i]);
+               free_mbufs(mb, n);
        } while (n != 0);
 }
 
+static inline uint32_t
+_mbus_to_iovec(struct iovec *iv, struct rte_mbuf *mb[], uint32_t num)
+{
+       uint32_t i, ns;
+       uint32_t len, slen, tlen;
+       struct rte_mbuf *m, *next;
+       const void *src;
+
+       for (i = 0; i != num; i++) {
+
+               m = mb[i];
+               tlen = 0;
+               ns = 0;
+
+               do {
+                       slen = m->data_len;
+                       src = rte_pktmbuf_mtod(m, const void *);
+                       len = RTE_MIN(iv->iov_len - tlen, slen);
+                       rte_memcpy((uint8_t *)iv->iov_base + tlen, src, len);
+                       slen -= len;
+                       tlen += len;
+                       if (slen != 0)
+                               break;
+                       ns++;
+                       next = m->next;
+                       rte_pktmbuf_free_seg(m);
+                       m = next;
+                } while (m != NULL);
+
+               iv->iov_base = (uint8_t *)iv->iov_base + tlen;
+               iv->iov_len -= tlen;
+
+               /* partly consumed mbuf */
+               if (m != NULL) {
+                       m->pkt_len = mb[i]->pkt_len - tlen;
+                       m->data_len = slen;
+                       m->data_off += len;
+                       m->nb_segs = mb[i]->nb_segs - ns;
+                       mb[i] = m;
+                       break;
+               }
+       }
+
+       return i;
+}
+
+static inline uint32_t
+_iovec_to_mbsegs(struct iovec *iv, uint32_t seglen, struct rte_mbuf *mb[],
+       uint32_t num)
+{
+       uint32_t i;
+       uint32_t len, slen, tlen;
+       struct rte_mbuf *m;
+       void *dst;
+
+       tlen = 0;
+       for (i = 0; i != num; i++) {
+
+               m = mb[i];
+               slen = rte_pktmbuf_tailroom(m);
+               slen = RTE_MIN(slen, seglen - m->data_len);
+               len = RTE_MIN(iv->iov_len - tlen, slen);
+               dst = rte_pktmbuf_append(m, len);
+               rte_memcpy(dst, (uint8_t *)iv->iov_base + tlen, len);
+               tlen += len;
+               if (len != slen)
+                       break;
+       }
+
+       iv->iov_base = (uint8_t *)iv->iov_base + tlen;
+       iv->iov_len -= tlen;
+
+       return i;
+}
+
 #ifdef __cplusplus
 }
 #endif
index f3b5828..e76f126 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -147,6 +147,8 @@ stream_get_dest(struct tle_stream *s, const void *dst_addr,
                return -ENOENT;
 
        dev = dst->dev;
+       dst->ol_flags = dev->tx.ol_flags[s->type];
+
        if (s->type == TLE_V4) {
                struct ipv4_hdr *l3h;
                l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len);
index 29f1f63..033c306 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -23,9 +23,6 @@
 extern "C" {
 #endif
 
-/* current stbl entry contains packet. */
-#define        STE_PKT 1
-
 struct stbl_entry {
        void *data;
 };
@@ -138,18 +135,6 @@ stbl_find_entry(struct stbl *st, const union pkt_info *pi)
        return ht->ent + rc;
 }
 
-static inline int
-stbl_data_pkt(const void *p)
-{
-       return ((uintptr_t)p & STE_PKT);
-}
-
-static inline void *
-stbl_get_pkt(const struct stbl_entry *se)
-{
-       return (void *)((uintptr_t)se->data ^ STE_PKT);
-}
-
 static inline void *
 stbl_find_data(struct stbl *st, const union pkt_info *pi)
 {
@@ -159,35 +144,6 @@ stbl_find_data(struct stbl *st, const union pkt_info *pi)
        return (ent == NULL) ? NULL : ent->data;
 }
 
-static inline void
-stbl_del_pkt(struct stbl *st, struct stbl_entry *se, const union pkt_info *pi)
-{
-       uint32_t type;
-       struct stbl_key k;
-
-       se->data = NULL;
-
-       type = pi->tf.type;
-       stbl_pkt_fill_key(&k, pi, type);
-       rte_hash_del_key(st->ht[type].t, &k);
-}
-
-static inline void
-stbl_del_pkt_lock(struct stbl *st, struct stbl_entry *se,
-       const union pkt_info *pi)
-{
-       uint32_t type;
-       struct stbl_key k;
-
-       se->data = NULL;
-
-       type = pi->tf.type;
-       stbl_pkt_fill_key(&k, pi, type);
-       stbl_lock(st, type);
-       rte_hash_del_key(st->ht[type].t, &k);
-       stbl_unlock(st, type);
-}
-
 #include "tcp_stream.h"
 
 static inline void
@@ -235,8 +191,8 @@ stbl_add_stream_lock(struct stbl *st, const struct tle_tcp_stream *s)
 }
 
 static inline void
-stbl_del_stream_lock(struct stbl *st, struct stbl_entry *se,
-       const struct tle_tcp_stream *s)
+stbl_del_stream(struct stbl *st, struct stbl_entry *se,
+       const struct tle_tcp_stream *s, uint32_t lock)
 {
        uint32_t type;
        struct stbl_key k;
@@ -248,9 +204,11 @@ stbl_del_stream_lock(struct stbl *st, struct stbl_entry *se,
 
        type = s->s.type;
        stbl_stream_fill_key(&k, &s->s, type);
-       stbl_lock(st, type);
+       if (lock != 0)
+               stbl_lock(st, type);
        rte_hash_del_key(st->ht[type].t, &k);
-       stbl_unlock(st, type);
+       if (lock != 0)
+               stbl_unlock(st, type);
 }
 
 #ifdef __cplusplus
index da2e166..61bfce4 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -178,37 +178,40 @@ sync_check_ack(const union pkt_info *pi, uint32_t seq, uint32_t ack,
 }
 
 static inline void
-sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len)
+sync_fill_tcb(struct tcb *tcb, const union seg_info *si, const union tsopt *to)
 {
-       so->ts = get_tms_opts(p, len);
-       so->wscale = so->ts.ecr & SYNC_TMS_WSCALE_MASK;
-}
+       uint32_t ack, mss, seq, wscale;
 
-static inline void
-sync_fill_tcb(struct tcb *tcb, const union seg_info *si,
-       const struct syn_opts *so)
-{
-       tcb->rcv.nxt = si->seq;
-       tcb->rcv.irs = si->seq - 1;
+       seq = si->seq;
+
+       tcb->rcv.nxt = seq;
+       tcb->rcv.irs = seq - 1;
+       tcb->snd.wu.wl1 = seq;
+
+       ack = si->ack;
+
+       tcb->snd.nxt = ack;
+       tcb->snd.una = ack;
+       tcb->snd.iss = ack - 1;
+       tcb->snd.rcvr = ack - 1;
+       tcb->snd.wu.wl2 = ack;
 
-       tcb->snd.nxt = si->ack;
-       tcb->snd.una = si->ack;
-       tcb->snd.iss = si->ack - 1;
-       tcb->snd.rcvr = tcb->snd.iss;
+       mss = si->mss;
 
-       tcb->snd.wu.wl1 = si->seq;
-       tcb->snd.wu.wl2 = si->ack;
+       tcb->snd.mss = mss;
+       tcb->so.mss = mss;
 
-       tcb->so = *so;
+       tcb->snd.ts = to->ecr;
+       tcb->rcv.ts = to->val;
+       tcb->so.ts.raw = to->raw;
 
-       tcb->snd.wscale = tcb->so.wscale;
-       tcb->snd.mss = tcb->so.mss;
-       tcb->snd.wnd = si->wnd << tcb->snd.wscale;
+       wscale = to->ecr & SYNC_TMS_WSCALE_MASK;
 
-       tcb->snd.ts = tcb->so.ts.ecr;
-       tcb->rcv.ts = tcb->so.ts.val;
+       tcb->snd.wscale = wscale;
+       tcb->snd.wnd = si->wnd << wscale;
+       tcb->so.wscale = wscale;
 
-       tcb->rcv.wscale = (tcb->so.wscale == TCP_WSCALE_NONE) ?
+       tcb->rcv.wscale = (wscale == TCP_WSCALE_NONE) ?
                TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
 }
 
index 32faaa2..bec1e76 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -30,15 +30,63 @@ extern "C" {
 static inline void
 tcp_stream_down(struct tle_tcp_stream *s)
 {
-       rwl_down(&s->rx.use);
-       rwl_down(&s->tx.use);
+       if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+               rwl_down(&s->use);
+       else
+               rte_atomic32_set(&s->use, INT32_MIN);
 }
 
 static inline void
 tcp_stream_up(struct tle_tcp_stream *s)
 {
-       rwl_up(&s->rx.use);
-       rwl_up(&s->tx.use);
+       int32_t v;
+
+       if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+               rwl_up(&s->use);
+       else {
+               v = rte_atomic32_read(&s->use) - INT32_MIN;
+               rte_atomic32_set(&s->use, v);
+       }
+}
+
+static inline int
+tcp_stream_try_acquire(struct tle_tcp_stream *s)
+{
+       int32_t v;
+
+       if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+               return rwl_try_acquire(&s->use);
+
+       v = rte_atomic32_read(&s->use) + 1;
+       rte_atomic32_set(&s->use, v);
+       return v;
+}
+
+static inline void
+tcp_stream_release(struct tle_tcp_stream *s)
+{
+       int32_t v;
+
+       if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+               rwl_release(&s->use);
+       else {
+               v = rte_atomic32_read(&s->use) - 1;
+               rte_atomic32_set(&s->use, v);
+       }
+}
+
+static inline int
+tcp_stream_acquire(struct tle_tcp_stream *s)
+{
+       int32_t v;
+
+       if ((s->flags & TLE_CTX_FLAG_ST) == 0)
+               return rwl_acquire(&s->use);
+
+       v = rte_atomic32_read(&s->use) + 1;
+       if (v > 0)
+               rte_atomic32_set(&s->use, v);
+       return v;
 }
 
 /* calculate RCV.WND value based on size of stream receive buffer */
@@ -67,28 +115,28 @@ empty_tq(struct tle_tcp_stream *s)
 static inline void
 empty_rq(struct tle_tcp_stream *s)
 {
-       empty_mbuf_ring(s->rx.q);
+       uint32_t n;
+       struct rte_mbuf *mb[MAX_PKT_BURST];
+
+       do {
+               n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)mb,
+                       RTE_DIM(mb));
+               free_mbufs(mb, n);
+       } while (n != 0);
+
        tcp_ofo_reset(s->rx.ofo);
 }
 
 /* empty stream's listen queue */
 static inline void
-empty_lq(struct tle_tcp_stream *s, struct stbl *st)
+empty_lq(struct tle_tcp_stream *s)
 {
-       uint32_t i, n;
-       struct rte_mbuf *mb;
-       union pkt_info pi;
-       union seg_info si;
-       struct stbl_entry *se[MAX_PKT_BURST];
+       uint32_t n;
+       struct tle_stream *ts[MAX_PKT_BURST];
 
        do {
-               n = _rte_ring_dequeue_burst(s->rx.q, (void **)se, RTE_DIM(se));
-               for (i = 0; i != n; i++) {
-                       mb = stbl_get_pkt(se[i]);
-                       get_pkt_info(mb, &pi, &si);
-                       stbl_del_pkt_lock(st, se[i], &pi);
-                       rte_pktmbuf_free(mb);
-               }
+               n = _rte_ring_dequeue_burst(s->rx.q, (void **)ts, RTE_DIM(ts));
+               tle_tcp_stream_close_bulk(ts, n);
        } while (n != 0);
 }
 
@@ -114,12 +162,13 @@ tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s)
                /* free stream's destination port */
                stream_clear_ctx(ctx, &s->s);
                if (uop == TCP_OP_LISTEN)
-                       empty_lq(s, st);
+                       empty_lq(s);
        }
 
        if (s->ste != NULL) {
                /* remove entry from RX streams table */
-               stbl_del_stream_lock(st, s->ste, s);
+               stbl_del_stream(st, s->ste, s,
+                       (s->flags & TLE_CTX_FLAG_ST) == 0);
                s->ste = NULL;
                empty_rq(s);
        }
index 9f19f69..0ca5429 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -54,6 +54,10 @@ extern "C" {
 
 #define TCP6_OP_MSS    (TCP6_NOP_MSS - TCP_TX_OPT_LEN_MAX)
 
+/* Initial Window Configuration parameter, probably will be configured during
+ * the startup in future */
+#define TCP_INITIAL_CWND_MAX 14600
+
 /*
  * TCP flags
  */
@@ -93,8 +97,8 @@ union seg_info {
        struct {
                uint32_t seq;
                uint32_t ack;
-               uint16_t hole1;
                uint16_t wnd;
+               uint16_t mss; /* valid only at SYN time */
        };
 };
 
@@ -223,11 +227,10 @@ struct dack_info {
 
 /* get current timestamp in ms */
 static inline uint32_t
-tcp_get_tms(void)
+tcp_get_tms(uint32_t mshift)
 {
-       uint64_t ts, ms;
-       ms = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S;
-       ts = rte_get_tsc_cycles() / ms;
+       uint64_t ts;
+       ts = rte_get_tsc_cycles() >> mshift;
        return ts;
 }
 
@@ -248,8 +251,11 @@ static inline void
 get_seg_info(const struct tcp_hdr *th, union seg_info *si)
 {
        __m128i v;
-       const  __m128i bswap_mask = _mm_set_epi8(15, 14, 13, 12, 10, 11, 9, 8,
-                       4, 5, 6, 7, 0, 1, 2, 3);
+       const  __m128i bswap_mask =
+               _mm_set_epi8(UINT8_MAX, UINT8_MAX, UINT8_MAX, UINT8_MAX,
+                       UINT8_MAX, UINT8_MAX, 10, 11,
+                       4, 5, 6, 7,
+                       0, 1, 2, 3);
 
        v = _mm_loadu_si128((const __m128i *)&th->sent_seq);
        si->raw.x = _mm_shuffle_epi8(v, bswap_mask);
index bddc28e..01f34fa 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
 extern "C" {
 #endif
 
+struct rxq_objs {
+       struct rte_mbuf **mb;
+       uint32_t num;
+};
+
 static inline uint32_t
 rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
        struct rte_mbuf *mb[], uint32_t num)
@@ -142,6 +147,41 @@ rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len,
        return t;
 }
 
+static inline uint32_t
+tcp_rxq_get_objs(struct tle_tcp_stream *s, struct rxq_objs obj[2])
+{
+       struct rte_ring *r;
+       uint32_t n, head, sz;
+
+       r = s->rx.q;
+
+       n = _rte_ring_mcs_dequeue_start(r, UINT32_MAX);
+       if (n == 0)
+               return 0;
+
+       sz = _rte_ring_get_size(r);
+       head = (r->cons.head - n) & _rte_ring_get_mask(r);
+
+       obj[0].mb = (struct rte_mbuf **)(_rte_ring_get_data(r) + head);
+       obj[1].mb = (struct rte_mbuf **)_rte_ring_get_data(r);
+
+       if (head + n <= sz) {
+               obj[0].num = n;
+               obj[1].num = 0;
+               return 1;
+       } else {
+               obj[0].num = sz - head;
+               obj[1].num = n + head - sz;
+               return 2;
+       }
+}
+
+static inline void
+tcp_rxq_consume(struct tle_tcp_stream *s, uint32_t num)
+{
+       _rte_ring_mcs_dequeue_finish(s->rx.q, num);
+}
+
 #ifdef __cplusplus
 }
 #endif
index a1c7d09..30ed104 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -59,12 +59,12 @@ rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
        struct tle_tcp_stream *s;
 
        s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
-       if (s == NULL || rwl_acquire(&s->rx.use) < 0)
+       if (s == NULL || tcp_stream_acquire(s) < 0)
                return NULL;
 
        /* check that we have a proper stream. */
        if (s->tcb.state != TCP_ST_LISTEN) {
-               rwl_release(&s->rx.use);
+               tcp_stream_release(s);
                s = NULL;
        }
 
@@ -84,11 +84,11 @@ rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
                return NULL;
        }
 
-       if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
+       if (tcp_stream_acquire(s) < 0)
                return NULL;
        /* check that we have a proper stream. */
        else if (s->tcb.state == TCP_ST_CLOSED) {
-               rwl_release(&s->rx.use);
+               tcp_stream_release(s);
                s = NULL;
        }
 
@@ -164,6 +164,24 @@ stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
        return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
 }
 
+static inline uint32_t
+get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
+{
+       uint32_t pid;
+       rte_atomic32_t *pa;
+
+       pa = &dev->tx.packet_id[type];
+
+       if (st == 0) {
+               pid = rte_atomic32_add_return(pa, num);
+               return pid - num;
+       } else {
+               pid = rte_atomic32_read(pa);
+               rte_atomic32_set(pa, pid + num);
+               return pid;
+       }
+}
+
 static inline void
 fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
        uint32_t seq, uint8_t hlen, uint8_t flags)
@@ -357,7 +375,7 @@ tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
        type = s->s.type;
 
        dev = s->tx.dst.dev;
-       pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
+       pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
 
        k = 0;
        tn = 0;
@@ -506,22 +524,16 @@ calc_smss(uint16_t mss, const struct tle_dest *dst)
 }
 
 /*
- * RFC 5681 3.1
- * If SMSS > 2190 bytes:
- *     IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
- *  If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
- *     IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
- *  if SMSS <= 1095 bytes:
- *     IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
+ * RFC 6928 2
+ * min (10*MSS, max (2*MSS, 14600))
+ *
+ * or using user provided initial congestion window (icw)
+ * min (10*MSS, max (2*MSS, icw))
  */
 static inline uint32_t
-initial_cwnd(uint16_t smss)
+initial_cwnd(uint32_t smss, uint32_t icw)
 {
-       if (smss > 2190)
-               return 2 * smss;
-       else if (smss > 1095)
-               return 3 * smss;
-       return 4 * smss;
+       return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
 }
 
 /*
@@ -561,7 +573,7 @@ send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
 
        dst = &s->tx.dst;
        type = s->s.type;
-       pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
+       pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
 
        rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
        if (rc == 0)
@@ -657,7 +669,7 @@ sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
                return -EINVAL;
 
        dev = dst.dev;
-       pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
+       pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
 
        rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
                TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
@@ -763,8 +775,8 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
 }
 
 static inline int
-restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
-       const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb,
+restore_syn_opt(union seg_info *si, union tsopt *to,
+       const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
        uint32_t hash_alg, rte_xmm_t *secret_key)
 {
        int32_t rc;
@@ -778,12 +790,12 @@ restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
        if (rc < 0)
                return rc;
 
-       so->mss = rc;
+       si->mss = rc;
 
        th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
                mb->l2_len + mb->l3_len);
        len = mb->l4_len - sizeof(*th);
-       sync_get_opts(so, (uintptr_t)(th + 1), len);
+       to[0] = get_tms_opts((uintptr_t)(th + 1), len);
        return 0;
 }
 
@@ -814,9 +826,11 @@ static inline int
 stream_fill_dest(struct tle_tcp_stream *s)
 {
        int32_t rc;
+       uint32_t type;
        const void *da;
 
-       if (s->s.type == TLE_V4)
+        type = s->s.type;
+       if (type == TLE_V4)
                da = &s->s.ipv4.addr.src;
        else
                da = &s->s.ipv6.addr.src;
@@ -830,7 +844,7 @@ stream_fill_dest(struct tle_tcp_stream *s)
  */
 static inline int
 accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
-       struct tle_tcp_stream *cs, const struct syn_opts *so,
+       struct tle_tcp_stream *cs, const union tsopt *to,
        uint32_t tms, const union pkt_info *pi, const union seg_info *si)
 {
        int32_t rc;
@@ -857,7 +871,7 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
        }
 
        /* setup TCB */
-       sync_fill_tcb(&cs->tcb, si, so);
+       sync_fill_tcb(&cs->tcb, si, to);
        cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
 
        /*
@@ -871,8 +885,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
        } else
                cs->tcb.snd.rto = TCP_RTO_DEFAULT;
 
-       /* copy streams type. */
+       /* copy streams type & flags. */
        cs->s.type = ps->s.type;
+       cs->flags = ps->flags;
 
        /* retrive and cache destination information. */
        rc = stream_fill_dest(cs);
@@ -883,8 +898,9 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
        cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
 
        /* setup congestion variables */
-       cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
+       cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
        cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
+       cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
 
        cs->tcb.state = TCP_ST_ESTABLISHED;
 
@@ -909,14 +925,14 @@ accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
  */
 static inline int
 rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
-       const union pkt_info *pi, const union seg_info *si,
+       const union pkt_info *pi, union seg_info *si,
        uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
 {
        int32_t rc;
        struct tle_ctx *ctx;
        struct tle_stream *ts;
        struct tle_tcp_stream *cs;
-       struct syn_opts so;
+       union tsopt to;
 
        *csp = NULL;
 
@@ -924,7 +940,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
                return -EINVAL;
 
        ctx = s->s.ctx;
-       rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg,
+       rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
                                &ctx->prm.secret_key);
        if (rc < 0)
                return rc;
@@ -936,7 +952,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
                return ENFILE;
 
        /* prepare stream to handle new connection */
-       if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
+       if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
 
                /* put new stream in the accept queue */
                if (_rte_ring_enqueue_burst(s->rx.q,
@@ -947,7 +963,7 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
 
                /* cleanup on failure */
                tcp_stream_down(cs);
-               stbl_del_pkt(st, cs->ste, pi);
+               stbl_del_stream(st, cs->ste, cs, 0);
                cs->ste = NULL;
        }
 
@@ -1007,6 +1023,17 @@ rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
        return n;
 }
 
+static void
+stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
+{
+       if (rto != 0) {
+               s->tcb.state = TCP_ST_TIME_WAIT;
+               s->tcb.snd.rto = rto;
+               timer_reset(s);
+       } else
+               stream_term(s);
+}
+
 static void
 rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
 {
@@ -1027,17 +1054,13 @@ rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
                        s->err.cb.func(s->err.cb.data, &s->s);
        } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
                rsp->flags |= TCP_FLAG_ACK;
-               if (ackfin != 0) {
-                       s->tcb.state = TCP_ST_TIME_WAIT;
-                       s->tcb.snd.rto = TCP_RTO_2MSL;
-                       timer_reset(s);
-               } else
+               if (ackfin != 0)
+                       stream_timewait(s, s->tcb.snd.rto_tw);
+               else
                        s->tcb.state = TCP_ST_CLOSING;
        } else if (state == TCP_ST_FIN_WAIT_2) {
                rsp->flags |= TCP_FLAG_ACK;
-               s->tcb.state = TCP_ST_TIME_WAIT;
-               s->tcb.snd.rto = TCP_RTO_2MSL;
-               timer_reset(s);
+               stream_timewait(s, s->tcb.snd.rto_tw);
        } else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
                stream_term(s);
        }
@@ -1144,7 +1167,9 @@ rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
 static inline void
 dack_info_init(struct dack_info *tack, const struct tcb *tcb)
 {
-       memset(tack, 0, sizeof(*tack));
+       static const struct dack_info zero_dack;
+
+       tack[0] = zero_dack;
        tack->ack = tcb->snd.una;
        tack->segs.dup = tcb->rcv.dupack;
        tack->wu.raw = tcb->snd.wu.raw;
@@ -1488,9 +1513,7 @@ rx_ackfin(struct tle_tcp_stream *s)
                timer_stop(s);
                s->tcb.state = TCP_ST_FIN_WAIT_2;
        } else if (state == TCP_ST_CLOSING) {
-               s->tcb.state = TCP_ST_TIME_WAIT;
-               s->tcb.snd.rto = TCP_RTO_2MSL;
-               timer_reset(s);
+               stream_timewait(s, s->tcb.snd.rto_tw);
        }
 }
 
@@ -1554,7 +1577,7 @@ rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
        s->tcb.snd.wscale = so.wscale;
 
        /* setup congestion variables */
-       s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
+       s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
        s->tcb.snd.ssthresh = s->tcb.snd.wnd;
 
        s->tcb.rcv.ts = so.ts.val;
@@ -1720,9 +1743,9 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
 {
        uint32_t i;
 
-       if (rwl_acquire(&s->rx.use) > 0) {
+       if (tcp_stream_acquire(s) > 0) {
                i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
-               rwl_release(&s->rx.use);
+               tcp_stream_release(s);
                return i;
        }
 
@@ -1735,7 +1758,7 @@ rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
 
 static inline uint32_t
 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
-       const union pkt_info pi[], const union seg_info si[],
+       const union pkt_info pi[], union seg_info si[],
        struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
        uint32_t num)
 {
@@ -1809,7 +1832,7 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
                k = num - i;
        }
 
-       rwl_release(&s->rx.use);
+       tcp_stream_release(s);
        return num - k;
 }
 
@@ -1850,7 +1873,7 @@ rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
                }
        }
 
-       rwl_release(&s->rx.use);
+       tcp_stream_release(s);
        return num - k;
 }
 
@@ -1859,7 +1882,8 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
        struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
 {
        struct stbl *st;
-       uint32_t i, j, k, n, t, ts;
+       struct tle_ctx *ctx;
+       uint32_t i, j, k, mt, n, t, ts;
        uint64_t csf;
        union pkt_info pi[num];
        union seg_info si[num];
@@ -1868,8 +1892,10 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
                uint32_t raw;
        } stu;
 
-       ts = tcp_get_tms();
-       st = CTX_TCP_STLB(dev->ctx);
+       ctx = dev->ctx;
+       ts = tcp_get_tms(ctx->cycles_ms_shift);
+       st = CTX_TCP_STLB(ctx);
+       mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
 
        stu.raw = 0;
 
@@ -1887,7 +1913,7 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
                                pi[i].tf.type, IPPROTO_TCP) != 0)
                        pi[i].csf = csf;
 
-               stu.t[t] = 1;
+               stu.t[t] = mt;
        }
 
        if (stu.t[TLE_V4] != 0)
@@ -1936,7 +1962,7 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
        struct tle_tcp_stream *s;
 
        s = TCP_STREAM(ts);
-       n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
+       n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
        if (n == 0)
                return 0;
 
@@ -1945,9 +1971,9 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
         * then rearm stream RX event.
         */
        if (n == num && rte_ring_count(s->rx.q) != 0) {
-               if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+               if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
                        tle_event_raise(s->rx.ev);
-               rwl_release(&s->rx.use);
+               tcp_stream_release(s);
        }
 
        return n;
@@ -2066,7 +2092,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
        /* fill pkt info to generate seq.*/
        stream_fill_pkt_info(s, &pi);
 
-       tms = tcp_get_tms();
+       tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
        s->tcb.so.ts.val = tms;
        s->tcb.so.ts.ecr = 0;
        s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
@@ -2116,7 +2142,7 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
        if (type >= TLE_VNUM)
                return -EINVAL;
 
-       if (rwl_try_acquire(&s->tx.use) > 0) {
+       if (tcp_stream_try_acquire(s) > 0) {
                rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
                        TCP_ST_SYN_SENT);
                rc = (rc == 0) ? -EDEADLK : 0;
@@ -2124,14 +2150,14 @@ tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
                rc = -EINVAL;
 
        if (rc != 0) {
-               rwl_release(&s->tx.use);
+               tcp_stream_release(s);
                return rc;
        }
 
        /* fill stream, prepare and transmit syn pkt */
        s->tcb.uop |= TCP_OP_CONNECT;
        rc = tx_syn(s, addr);
-       rwl_release(&s->tx.use);
+       tcp_stream_release(s);
 
        /* error happened, do a cleanup */
        if (rc != 0)
@@ -2147,7 +2173,7 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
        struct tle_tcp_stream *s;
 
        s = TCP_STREAM(ts);
-       n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
+       n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
        if (n == 0)
                return 0;
 
@@ -2156,14 +2182,76 @@ tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
         * then rearm stream RX event.
         */
        if (n == num && rte_ring_count(s->rx.q) != 0) {
-               if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
+               if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
                        tle_event_raise(s->rx.ev);
-               rwl_release(&s->rx.use);
+               tcp_stream_release(s);
        }
 
        return n;
 }
 
+ssize_t
+tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
+       int iovcnt)
+{
+       int32_t i;
+       uint32_t mn, n, tn;
+       size_t sz;
+       struct tle_tcp_stream *s;
+       struct iovec iv;
+       struct rxq_objs mo[2];
+
+       s = TCP_STREAM(ts);
+
+       /* get group of packets */
+       mn = tcp_rxq_get_objs(s, mo);
+       if (mn == 0)
+               return 0;
+
+       sz = 0;
+       n = 0;
+       for (i = 0; i != iovcnt; i++) {
+               iv = iov[i];
+               sz += iv.iov_len;
+               n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
+               if (iv.iov_len != 0) {
+                       sz -= iv.iov_len;
+                       break;
+               }
+       }
+
+       tn = n;
+
+       if (i != iovcnt && mn != 1) {
+               n = 0;
+               do {
+                       sz += iv.iov_len;
+                       n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
+                       if (iv.iov_len != 0) {
+                               sz -= iv.iov_len;
+                               break;
+                       }
+                       if (i + 1 != iovcnt)
+                               iv = iov[i + 1];
+               } while (++i != iovcnt);
+               tn += n;
+       }
+
+       tcp_rxq_consume(s, tn);
+
+       /*
+        * if we still have packets to read,
+        * then rearm stream RX event.
+        */
+       if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
+               if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
+                       tle_event_raise(s->rx.ev);
+               tcp_stream_release(s);
+       }
+
+       return sz;
+}
+
 static inline int32_t
 tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
        struct rte_mbuf *segs[], uint32_t num)
@@ -2176,16 +2264,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
                rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
                        0, TCP_FLAG_ACK, 0, 0);
                if (rc != 0) {
-                       free_segments(segs, num);
+                       free_mbufs(segs, num);
                        break;
                }
        }
 
        if (i == num) {
                /* queue packets for further transmission. */
-               rc = _rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num);
+               rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
                if (rc != 0)
-                       free_segments(segs, num);
+                       free_mbufs(segs, num);
        }
 
        return rc;
@@ -2194,17 +2282,16 @@ tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
 uint16_t
 tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
 {
-       uint32_t i, j, k, mss, n, state, type;
+       uint32_t i, j, k, mss, n, state;
        int32_t rc;
        uint64_t ol_flags;
        struct tle_tcp_stream *s;
-       struct tle_dev *dev;
        struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
 
        s = TCP_STREAM(ts);
 
        /* mark stream as not closable. */
-       if (rwl_acquire(&s->tx.use) < 0) {
+       if (tcp_stream_acquire(s) < 0) {
                rte_errno = EAGAIN;
                return 0;
        }
@@ -2212,14 +2299,12 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
        state = s->tcb.state;
        if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
                rte_errno = ENOTCONN;
-               rwl_release(&s->tx.use);
+               tcp_stream_release(s);
                return 0;
        }
 
        mss = s->tcb.snd.mss;
-       dev = s->tx.dst.dev;
-       type = s->s.type;
-       ol_flags = dev->tx.ol_flags[type];
+       ol_flags = s->tx.dst.ol_flags;
 
        k = 0;
        rc = 0;
@@ -2237,7 +2322,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
 
                if (i != k) {
                        /* queue packets for further transmission. */
-                       n = _rte_ring_mp_enqueue_burst(s->tx.q,
+                       n = _rte_ring_enqueue_burst(s->tx.q,
                                (void **)pkt + k, (i - k));
                        k += n;
 
@@ -2246,7 +2331,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
                         * remove pkt l2/l3 headers, restore ol_flags
                         */
                        if (i != k) {
-                               ol_flags = ~dev->tx.ol_flags[type];
+                               ol_flags = ~s->tx.dst.ol_flags;
                                for (j = k; j != i; j++) {
                                        rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
                                                pkt[j]->l3_len +
@@ -2271,7 +2356,7 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
                                break;
                        }
 
-                       rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc);
+                       rc = tx_segments(s, ol_flags, segs, rc);
                        if (rc == 0) {
                                /* free the large mbuf */
                                rte_pktmbuf_free(pkt[i]);
@@ -2290,11 +2375,120 @@ tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
        if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
                tle_event_raise(s->tx.ev);
 
-       rwl_release(&s->tx.use);
+       tcp_stream_release(s);
 
        return k;
 }
 
+ssize_t
+tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
+       const struct iovec *iov, int iovcnt)
+{
+       int32_t i, rc;
+       uint32_t j, k, n, num, slen, state;
+       uint64_t ol_flags;
+       size_t sz, tsz;
+       struct tle_tcp_stream *s;
+       struct iovec iv;
+       struct rte_mbuf *mb[2 * MAX_PKT_BURST];
+
+       s = TCP_STREAM(ts);
+
+       /* mark stream as not closable. */
+       if (tcp_stream_acquire(s) < 0) {
+               rte_errno = EAGAIN;
+               return -1;
+       }
+
+       state = s->tcb.state;
+       if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
+               rte_errno = ENOTCONN;
+               tcp_stream_release(s);
+               return -1;
+       }
+
+       /* figure out how many mbufs do we need */
+       tsz = 0;
+       for (i = 0; i != iovcnt; i++)
+               tsz += iov[i].iov_len;
+
+       slen = rte_pktmbuf_data_room_size(mp);
+       slen = RTE_MIN(slen, s->tcb.snd.mss);
+
+       num = (tsz + slen - 1) / slen;
+       n = rte_ring_free_count(s->tx.q);
+       num = RTE_MIN(num, n);
+       n = RTE_MIN(num, RTE_DIM(mb));
+
+       /* allocate mbufs */
+       if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
+               rte_errno = ENOMEM;
+               tcp_stream_release(s);
+               return -1;
+       }
+
+       /* copy data into the mbufs */
+       k = 0;
+       sz = 0;
+       for (i = 0; i != iovcnt; i++) {
+               iv = iov[i];
+               sz += iv.iov_len;
+               k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
+               if (iv.iov_len != 0) {
+                       sz -= iv.iov_len;
+                       break;
+               }
+       }
+
+       /* partially filled segment */
+       k += (k != n && mb[k]->data_len != 0);
+
+       /* fill pkt headers */
+       ol_flags = s->tx.dst.ol_flags;
+
+       for (j = 0; j != k; j++) {
+               rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
+                       s->s.port, 0, TCP_FLAG_ACK, 0, 0);
+               if (rc != 0)
+                       break;
+       }
+
+       /* if no error encountered, then enqueue pkts for transmission */
+       if (k == j)
+               k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
+       else
+               k = 0;
+
+       if (k != j) {
+
+               /* free pkts that were not enqueued */
+               free_mbufs(mb + k, j - k);
+
+               /* our last segment can be partially filled */
+               sz += slen - sz % slen;
+               sz -= (j - k) * slen;
+
+               /* report an error */
+               if (rc != 0) {
+                       rte_errno = -rc;
+                       sz = -1;
+               }
+       }
+
+        if (k != 0) {
+
+               /* notify BE about more data to send */
+               txs_enqueue(s->s.ctx, s);
+
+               /* if possible, re-arm stream write event. */
+               if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
+                       tle_event_raise(s->tx.ev);
+       }
+
+       tcp_stream_release(s);
+       return sz;
+}
+
 /* send data and FIN (if needed) */
 static inline void
 tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
@@ -2376,6 +2570,18 @@ rto_stream(struct tle_tcp_stream *s, uint32_t tms)
                } else if (state == TCP_ST_SYN_SENT) {
                        /* resending SYN */
                        s->tcb.so.ts.val = tms;
+
+                       /* According to RFC 6928 2:
+                        * To reduce the chance for spurious SYN or SYN/ACK
+                        * retransmission, it is RECOMMENDED that
+                        * implementations refrain from resetting the initial
+                        * window to 1 segment, unless there have been more
+                        * than one SYN or SYN/ACK retransmissions or true loss
+                        * detection has been made.
+                        */
+                       if (s->tcb.snd.nb_retx != 0)
+                               s->tcb.snd.cwnd = s->tcb.snd.mss;
+
                        send_ack(s, tms, TCP_FLAG_SYN);
 
                } else if (state == TCP_ST_TIME_WAIT) {
@@ -2405,7 +2611,7 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
        /* process streams with RTO exipred */
 
        tw = CTX_TCP_TMWHL(ctx);
-       tms = tcp_get_tms();
+       tms = tcp_get_tms(ctx->cycles_ms_shift);
        tle_timer_expire(tw, tms);
 
        k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
@@ -2414,9 +2620,9 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
 
                s = rs[i];
                s->timer.handle = NULL;
-               if (rwl_try_acquire(&s->tx.use) > 0)
+               if (tcp_stream_try_acquire(s) > 0)
                        rto_stream(s, tms);
-               rwl_release(&s->tx.use);
+               tcp_stream_release(s);
        }
 
        /* process streams from to-send queue */
@@ -2428,11 +2634,11 @@ tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
                s = rs[i];
                rte_atomic32_set(&s->tx.arm, 0);
 
-               if (rwl_try_acquire(&s->tx.use) > 0)
+               if (tcp_stream_try_acquire(s) > 0)
                        tx_stream(s, tms);
                else
                        txs_enqueue(s->s.ctx, s);
-               rwl_release(&s->tx.use);
+               tcp_stream_release(s);
        }
 
        /* collect streams to close from the death row */
index 99791d0..4e9ddb7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -32,8 +32,7 @@ static void
 unuse_stream(struct tle_tcp_stream *s)
 {
        s->s.type = TLE_VNUM;
-       rte_atomic32_set(&s->rx.use, INT32_MIN);
-       rte_atomic32_set(&s->tx.use, INT32_MIN);
+       rte_atomic32_set(&s->use, INT32_MIN);
 }
 
 static void
@@ -99,14 +98,17 @@ static int
 init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
 {
        size_t bsz, rsz, sz;
-       uint32_t i, k, n, nb;
+       uint32_t f, i, k, n, nb;
        struct tle_drb *drb;
        char name[RTE_RING_NAMESIZE];
 
+       f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+               (RING_F_SP_ENQ |  RING_F_SC_DEQ);
+
        /* init RX part. */
 
        n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
-       s->rx.q = alloc_ring(n, RING_F_SP_ENQ, ctx->prm.socket_id);
+       s->rx.q = alloc_ring(n, f | RING_F_SP_ENQ, ctx->prm.socket_id);
        if (s->rx.q == NULL)
                return -ENOMEM;
 
@@ -117,7 +119,7 @@ init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
        /* init TX part. */
 
        n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
-       s->tx.q = alloc_ring(n, RING_F_SC_DEQ, ctx->prm.socket_id);
+       s->tx.q = alloc_ring(n, f | RING_F_SC_DEQ, ctx->prm.socket_id);
        if (s->tx.q == NULL)
                return -ENOMEM;
 
@@ -145,7 +147,7 @@ init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s)
        }
 
        snprintf(name, sizeof(name), "%p@%zu", s, sz);
-       rte_ring_init(s->tx.drb.r, name, n, 0);
+       rte_ring_init(s->tx.drb.r, name, n, f);
 
        for (i = 0; i != k; i++) {
                drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
@@ -177,24 +179,27 @@ tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
 }
 
 static struct tle_timer_wheel *
-alloc_timers(uint32_t num, int32_t socket)
+alloc_timers(uint32_t num, uint32_t mshift, int32_t socket)
 {
        struct tle_timer_wheel_args twprm;
 
        twprm.tick_size = TCP_RTO_GRANULARITY;
        twprm.max_timer = num;
        twprm.socket_id = socket;
-       return tle_timer_create(&twprm, tcp_get_tms());
+       return tle_timer_create(&twprm, tcp_get_tms(mshift));
 }
 
 static int
 tcp_init_streams(struct tle_ctx *ctx)
 {
        size_t sz;
-       uint32_t i;
+       uint32_t f, i;
        int32_t rc;
        struct tcp_streams *ts;
 
+       f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
+               (RING_F_SP_ENQ |  RING_F_SC_DEQ);
+
        sz = sizeof(*ts) + sizeof(ts->s[0]) * ctx->prm.max_streams;
        ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
                ctx->prm.socket_id);
@@ -211,14 +216,15 @@ tcp_init_streams(struct tle_ctx *ctx)
        ctx->streams.buf = ts;
        STAILQ_INIT(&ctx->streams.free);
 
-       ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->prm.socket_id);
+       ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->cycles_ms_shift,
+               ctx->prm.socket_id);
        if (ts->tmr == NULL) {
                TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
                        ctx, rte_errno);
                rc = -ENOMEM;
        } else {
                ts->tsq = alloc_ring(ctx->prm.max_streams,
-                       RING_F_SC_DEQ, ctx->prm.socket_id);
+                       f | RING_F_SC_DEQ, ctx->prm.socket_id);
                if (ts->tsq == NULL)
                        rc = -ENOMEM;
                else
@@ -329,8 +335,13 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
        s->err.cb = prm->cfg.err_cb;
 
        /* store other params */
+       s->flags = ctx->prm.flags;
        s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
                TLE_TCP_DEFAULT_RETRIES;
+       s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX :
+                               ctx->prm.icw;
+       s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
+                               TCP_RTO_2MSL : ctx->prm.timewait;
 
        tcp_stream_up(s);
        return &s->s;
@@ -438,15 +449,6 @@ tle_tcp_stream_close(struct tle_stream *ts)
                return -EINVAL;
 
        ctx = s->s.ctx;
-
-       /* reset stream events if any. */
-       if (s->rx.ev != NULL)
-               tle_event_idle(s->rx.ev);
-       if (s->tx.ev != NULL)
-               tle_event_idle(s->tx.ev);
-       if (s->err.ev != NULL)
-               tle_event_idle(s->err.ev);
-
        return stream_close(ctx, s);
 }
 
@@ -505,7 +507,7 @@ tle_tcp_stream_listen(struct tle_stream *ts)
                return -EINVAL;
 
        /* mark stream as not closable. */
-       if (rwl_try_acquire(&s->rx.use) > 0) {
+       if (tcp_stream_try_acquire(s) > 0) {
                rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
                                TCP_ST_LISTEN);
                if (rc != 0) {
@@ -517,7 +519,7 @@ tle_tcp_stream_listen(struct tle_stream *ts)
        } else
                rc = -EINVAL;
 
-       rwl_release(&s->rx.use);
+       tcp_stream_release(s);
        return rc;
 }
 
@@ -527,17 +529,12 @@ tle_tcp_stream_listen(struct tle_stream *ts)
 static inline int
 stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
 {
-       int32_t rc1, rc2;
        struct tle_tcp_stream *s;
 
        s = TCP_STREAM(ts);
 
-       rc1 = rwl_try_acquire(&s->rx.use);
-       rc2 = rwl_try_acquire(&s->tx.use);
-
-       if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
-               rwl_release(&s->tx.use);
-               rwl_release(&s->rx.use);
+       if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
+               tcp_stream_release(s);
                return -EINVAL;
        }
 
@@ -581,9 +578,7 @@ stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
                        s->err.cb.func(s->err.cb.data, &s->s);
        }
 
-       rwl_release(&s->tx.use);
-       rwl_release(&s->rx.use);
-
+       tcp_stream_release(s);
        return 0;
 }
 
@@ -606,13 +601,13 @@ tle_tcp_stream_update_cfg(struct tle_stream *ts[],
 }
 
 int
-tle_tcp_stream_get_mss(const struct tle_stream * stream)
+tle_tcp_stream_get_mss(const struct tle_stream * ts)
 {
-       struct tle_tcp_stream *tcp;
+       struct tle_tcp_stream *s;
 
-       if (stream == NULL)
+       if (ts == NULL)
                return -EINVAL;
 
-       tcp = TCP_STREAM(stream);
-       return tcp->tcb.snd.mss;
+       s = TCP_STREAM(ts);
+       return s->tcb.snd.mss;
 }
index 04c2f88..4629fe6 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -82,6 +82,7 @@ struct tcb {
                uint32_t cwnd;     /* congestion window */
                uint32_t ssthresh; /* slow start threshold */
                uint32_t rto;      /* retransmission timeout */
+               uint32_t rto_tw;   /* TIME_WAIT retransmission timeout */
                uint32_t iss;      /* initial send sequence */
                uint16_t mss;
                uint8_t  wscale;
@@ -91,11 +92,13 @@ struct tcb {
        struct syn_opts so; /* initial syn options. */
 };
 
-
 struct tle_tcp_stream {
 
        struct tle_stream s;
 
+       uint32_t flags;
+       rte_atomic32_t use;
+
        struct stbl_entry *ste;     /* entry in streams table. */
        struct tcb tcb;
 
@@ -109,7 +112,6 @@ struct tle_tcp_stream {
        } err;
 
        struct {
-               rte_atomic32_t use;
                struct rte_ring *q;     /* listen (syn) queue */
                struct ofo *ofo;
                struct tle_event *ev;    /* user provided recv event. */
@@ -117,7 +119,6 @@ struct tle_tcp_stream {
        } rx __rte_cache_aligned;
 
        struct {
-               rte_atomic32_t use;
                rte_atomic32_t arm;  /* when > 0 stream is in to-send queue */
                struct {
                        uint32_t nb_elem;  /* number of objects per drb. */
index 3a80fdd..a8d2425 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
 extern "C" {
 #endif
 
-static inline void
-free_segments(struct rte_mbuf *mb[], uint32_t num)
-{
-       uint32_t i;
-
-       for (i = 0; i != num; i++)
-               rte_pktmbuf_free(mb[i]);
-}
-
 static inline int32_t
 tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
        const struct tle_dest *dst, uint16_t mss)
@@ -53,7 +44,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
                /* Allocate direct buffer */
                out_pkt = rte_pktmbuf_alloc(dst->head_mp);
                if (out_pkt == NULL) {
-                       free_segments(mbout, nbseg);
+                       free_mbufs(mbout, nbseg);
                        return -ENOMEM;
                }
 
@@ -67,7 +58,7 @@ tcp_segmentation(struct rte_mbuf *mbin, struct rte_mbuf *mbout[], uint16_t num,
                        out_seg = rte_pktmbuf_alloc(dst->head_mp);
                        if (out_seg == NULL) {
                                rte_pktmbuf_free(out_pkt);
-                               free_segments(mbout, nbseg);
+                               free_mbufs(mbout, nbseg);
                                return -ENOMEM;
                        }
                        out_seg_prev->next = out_seg;
index 144dbe7..0ea4668 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -80,8 +80,9 @@ struct tle_dev_param {
 struct tle_dest {
        struct rte_mempool *head_mp;
        /**< MP for fragment headers and control packets. */
-       struct tle_dev *dev;     /**< device to send packets through. */
-       uint16_t mtu;                /**< MTU for given destination. */
+       struct tle_dev *dev;    /**< device to send packets through. */
+       uint64_t ol_flags;      /**< tx ofload flags. */
+       uint16_t mtu;           /**< MTU for given destination. */
        uint8_t l2_len;  /**< L2 header length. */
        uint8_t l3_len;  /**< L3 header length. */
        uint8_t hdr[TLE_DST_MAX_HDR]; /**< L2/L3 headers. */
@@ -103,6 +104,10 @@ enum {
        TLE_HASH_NUM
 };
 
+enum {
+       TLE_CTX_FLAG_ST = 1,  /**< ctx will be used by single thread */
+};
+
 struct tle_ctx_param {
        int32_t socket_id;         /**< socket ID to allocate memory for. */
        uint32_t proto;            /**< L4 proto to handle. */
@@ -110,6 +115,7 @@ struct tle_ctx_param {
        uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */
        uint32_t max_stream_sbufs; /**< max send mbufs per stream. */
        uint32_t send_bulk_size;   /**< expected # of packets per send call. */
+       uint32_t flags;            /**< specific flags */
 
        int (*lookup4)(void *opaque, const struct in_addr *addr,
                struct tle_dest *res);
@@ -127,8 +133,18 @@ struct tle_ctx_param {
        /**< hash algorithm to be used to generate sequence number. */
        rte_xmm_t secret_key;
        /**< secret key to be used to calculate the hash. */
+
+       uint32_t icw; /**< initial congestion window, default is 2*MSS if 0. */
+       uint32_t timewait;
+       /**< TCP TIME_WAIT state timeout duration in milliseconds,
+        * default 2MSL, if UINT32_MAX */
 };
 
+/**
+ * use default TIMEWAIT timeout value.
+ */
+#define        TLE_TCP_TIMEWAIT_DEFAULT        UINT32_MAX
+
 /**
  * create L4 processing context.
  * @param ctx_prm
index 9086658..b0cbda6 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -119,6 +119,16 @@ int
 tle_tcp_stream_get_addr(const struct tle_stream *s,
        struct tle_tcp_stream_addr *addr);
 
+/**
+ * Get current TCP maximum segment size
+ * @param ts
+ *   Stream to retrieve MSS information from.
+ * @return
+ *   Maximum segment size in bytes, if successful.
+ *   Negative on failure.
+ */
+int tle_tcp_stream_get_mss(const struct tle_stream *ts);
+
 /**
  * Client mode connect API.
  */
@@ -257,6 +267,28 @@ uint32_t tle_tcp_stream_update_cfg(struct tle_stream *ts[],
 uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[],
        uint16_t num);
 
+/**
+ * Reads iovcnt buffers from the for given TCP stream.
+ * Note that the stream has to be in connected state.
+ * Data ordering is preserved.
+ * Buffers are processed in array order.
+ * This means that the function will comppletely fill iov[0]
+ * before proceeding to iov[1], and so on.
+ * If there is insufficient data, then not all buffers pointed to by iov
+ * may be filled.
+ * @param ts
+ *   TCP stream to receive data from.
+ * @param iov
+ *   Points to an array of iovec structures.
+ * @param iovcnt
+ *   Number of elements in the *iov* array.
+ * @return
+ *   On success, number of bytes read in the stream receive buffer.
+ *   In case of error, returns -1 and error code will be set in rte_errno.
+ */
+ssize_t tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
+       int iovcnt);
+
 /**
  * Consume and queue up to *num* packets, that will be sent eventually
  * by tle_tcp_tx_bulk().
@@ -281,13 +313,39 @@ uint16_t tle_tcp_stream_recv(struct tle_stream *s, struct rte_mbuf *pkt[],
  *   number of packets successfully queued in the stream send buffer.
  *   In case of error, error code can be set in rte_errno.
  *   Possible rte_errno errors include:
- *   - EAGAIN - operation can be perfomed right now
+ *   - EAGAIN - operation can't be perfomed right now
  *              (most likely close() was perfomed on that stream allready).
  *   - ENOTCONN - the stream is not connected.
  */
 uint16_t tle_tcp_stream_send(struct tle_stream *s, struct rte_mbuf *pkt[],
        uint16_t num);
 
+/**
+ * Writes iovcnt buffers of data described by iov to the for given TCP stream.
+ * Note that the stream has to be in connected state.
+ * Data ordering is preserved.
+ * Buffers are processed in array order.
+ * This means that the function will write out the entire contents of iov[0]
+ * before proceeding to iov[1], and so on.
+ * If there is insufficient space in stream send buffer,
+ * then not all buffers pointed to by iov may be written out.
+ * @param ts
+ *   TCP stream to send data to.
+ * @param iov
+ *   Points to an array of iovec structures.
+ * @param iovcnt
+ *   Number of elements in the *iov* array.
+ * @return
+ *   On success, number of bytes written to the stream send buffer.
+ *   In case of error, returns -1 and error code will be set in rte_errno.
+ *   - EAGAIN - operation can't be perfomed right now
+ *              (most likely close() was perfomed on that stream allready).
+ *   - ENOTCONN - the stream is not connected.
+ *   - ENOMEM - not enough internal buffer (mbuf) to store user provided data.
+ */
+ssize_t tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
+       const struct iovec *iov, int iovcnt);
+
 /**
  * Back End (BE) API.
  * BE API functions are not multi-thread safe.
@@ -362,16 +420,6 @@ uint16_t tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
  */
 int tle_tcp_process(struct tle_ctx *ctx, uint32_t num);
 
-/**
- * Get current TCP maximum segment size
- * @param stream
- *   Stream to get MSS from.
- * @return
- *   Maximum segment size in bytes, if successful.
- *   Negative on failure.
- */
-int tle_tcp_stream_get_mss(const struct tle_stream * const stream);
-
 #ifdef __cplusplus
 }
 #endif
index 6757663..3736964 100644 (file)
@@ -56,6 +56,18 @@ _rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, uint32_t n)
        return rte_ring_enqueue_burst(r, (void * const *)obj_table, n, NULL);
 }
 
+static inline uint32_t
+_rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, uint32_t n)
+{
+       uint32_t rc;
+
+       rc = rte_ring_enqueue_bulk(r, (void * const *)obj_table, n, NULL);
+       if (rc == n)
+               return 0;
+       else
+               return -ENOSPC;
+}
+
 static inline uint32_t
 _rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t n)
 {
@@ -80,6 +92,17 @@ _rte_ring_get_data(struct rte_ring *r)
        return (void **)(&r[1]);
 }
 
+static inline void
+_rte_ring_dequeue_ptrs(struct rte_ring *r, void **obj_table, uint32_t num)
+{
+       uint32_t tail;
+       void **data;
+
+       tail = r->cons.tail;
+       data = _rte_ring_get_data(r);
+       DEQUEUE_PTRS(r, data, tail, obj_table, num, void *);
+}
+
 #else
 
 static inline uint32_t
@@ -108,6 +131,13 @@ _rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, uint32_t n)
        return rte_ring_enqueue_burst(r, (void * const *)obj_table, n);
 }
 
+static inline uint32_t
+_rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+       uint32_t n)
+{
+       return rte_ring_enqueue_bulk(r, (void * const *)obj_table, n);
+}
+
 static inline uint32_t
 _rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t n)
 {
@@ -132,10 +162,91 @@ _rte_ring_get_data(struct rte_ring *r)
        return (void **)r->ring;
 }
 
-#endif
+static inline void
+_rte_ring_dequeue_ptrs(struct rte_ring *r, void **obj_table, uint32_t num)
+{
+       uint32_t i, n;
+       uint32_t mask, cons_head;
+
+       n = num;
+       cons_head = r->cons.tail;
+       mask = _rte_ring_get_mask(r);
+
+       DEQUEUE_PTRS();
+}
+
+#endif /* RTE_VERSION >= RTE_VERSION_NUM(17, 5, 0, 0) */
+
+/*
+ * Serialized variation of DPDK rte_ring dequeue mechanism.
+ * At any given moment, only one consumer is allowed to dequeue
+ * objects from the ring.
+ */
+
+static inline __attribute__((always_inline)) uint32_t
+_rte_ring_mcs_dequeue_start(struct rte_ring *r, uint32_t num)
+{
+       uint32_t n, end, head, tail;
+       int32_t rc;
+
+       rc = 0;
+       do {
+               head = r->cons.head;
+               tail = r->cons.tail;
+               end = r->prod.tail;
+
+               if (head != tail) {
+                       rte_pause();
+                       continue;
+               }
+
+               n = end - head;
+               n = RTE_MIN(num, n);
+               if (n == 0)
+                       return 0;
+
+               rc = rte_atomic32_cmpset(&r->cons.head, head, head + n);
+       } while (rc == 0);
+
+       return n;
+}
+
+static inline __attribute__((always_inline)) void
+_rte_ring_mcs_dequeue_finish(struct rte_ring *r, uint32_t num)
+{
+       uint32_t n, head, tail;
+
+       head = r->cons.head;
+       rte_smp_rmb();
+       tail = r->cons.tail;
+       n = head - tail;
+       RTE_ASSERT(n >= num);
+       RTE_SET_USED(n);
+       head = tail + num;
+       r->cons.head = head;
+       r->cons.tail = head;
+}
+
+static inline __attribute__((always_inline)) void
+_rte_ring_mcs_dequeue_abort(struct rte_ring *r)
+{
+       r->cons.head = r->cons.tail;
+}
+
+static inline uint32_t
+_rte_ring_mcs_dequeue_burst(struct rte_ring *r, void **obj_table, uint32_t num)
+{
+       uint32_t n;
+
+       n = _rte_ring_mcs_dequeue_start(r, num);
+       _rte_ring_dequeue_ptrs(r, obj_table, n);
+       _rte_ring_mcs_dequeue_finish(r, n);
+       return n;
+}
 
 #ifdef __cplusplus
 }
 #endif
 
+
 #endif /* TLE_DPDK_WRAPPER_H_ */
index ce3e454..692fd44 100644 (file)
@@ -349,7 +349,7 @@ test_dring_st(void)
 
        printf("%s started;\n", __func__);
 
-       tle_dring_reset(&dr);
+       tle_dring_reset(&dr, 0);
        r = init_drb_ring(OBJ_NUM);
        if (r == NULL)
                return -ENOMEM;
@@ -397,7 +397,7 @@ test_dring_mt(int32_t master_enq_type, int32_t master_deq_type,
        struct tle_dring dr;
        struct dring_arg arg[RTE_MAX_LCORE];
 
-       tle_dring_reset(&dr);
+       tle_dring_reset(&dr, 0);
        r = init_drb_ring(OBJ_NUM);
        if (r == NULL)
                return -ENOMEM;
index 6ab4905..5109ac2 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -19,7 +19,7 @@ TEST_F(dring, test_dring_st)
 {
        printf("%s started;\n", __func__);
 
-       tle_dring_reset(&dr);
+       tle_dring_reset(&dr, 0);
        r = init_drb_ring(OBJ_NUM);
 
        ASSERT_NE(r, (void *) NULL) << "Out of memory";
index 32a223e..fdb2c47 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016  Intel Corporation.
+ * Copyright (c) 2016-2017  Intel Corporation.
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at:
@@ -376,7 +376,7 @@ test_dring_mt(int32_t master_enq_type, int32_t master_deq_type,
        struct tle_dring dr;
        struct dring_arg arg[RTE_MAX_LCORE];
 
-       tle_dring_reset(&dr);
+       tle_dring_reset(&dr, 0);
        r = init_drb_ring(OBJ_NUM);
        if (r == NULL)
                return -ENOMEM;