v6: make TCP stream alloc/free to use memtank API 19/23219/7
authorKonstantin Ananyev <konstantin.ananyev@intel.com>
Mon, 4 Nov 2019 13:50:31 +0000 (13:50 +0000)
committerKonstantin Ananyev <konstantin.ananyev@intel.com>
Tue, 31 Dec 2019 11:51:05 +0000 (11:51 +0000)
Introduce two extra parameters for TCP context creation:
struct {
uint32_t min;
/**< min number of free streams (grow threshold). */
uint32_t max;
/**< max number of free streams (shrink threshold). */
} free_streams;

By default these params are equal to max_streams value
(avoid dynamic allocation and preserve current beahviour).

grow() is invoked from accept() FE call to refill streams tank for BE.
shrink() is invoked from close() FE call.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Change-Id: I7af6a76d64813ee4a535323e27ffbfd75037fc92

15 files changed:
app/nginx/auto/modules
app/nginx/src/tldk/module.c
app/nginx/src/tldk/ngx_tldk.h
app/nginx/src/tldk/parse.c
examples/l4fwd/Makefile
lib/libtle_l4p/Makefile
lib/libtle_l4p/ctx.c
lib/libtle_l4p/tcp_ctl.h
lib/libtle_l4p/tcp_rxtx.c
lib/libtle_l4p/tcp_stream.c
lib/libtle_l4p/tcp_stream.h
lib/libtle_l4p/tle_ctx.h
lib/libtle_l4p/udp_stream.c
test/gtest/Makefile
test/gtest/test_tle_tcp_stream.h

index f1791c9..cdba3a8 100644 (file)
@@ -1239,9 +1239,10 @@ if [ $USE_TLDK = YES ]; then
                      src/tldk/tldk_event.c
                      src/tldk/parse.c"
     ngx_module_libs="-L${TLDK_ROOT}/${RTE_TARGET}/lib -Wl,--whole-archive
-                     -ltle_l4p -ltle_dring -ltle_timer -Wl,--no-whole-archive
-                     -L${RTE_SDK}/${RTE_TARGET}/lib -Wl,--whole-archive -ldpdk
-                     -lm -lpcap -lnuma -Wl,--no-whole-archive"
+                     -ltle_l4p -ltle_dring -ltle_memtank -ltle_timer
+                     -Wl,--no-whole-archive -L${RTE_SDK}/${RTE_TARGET}/lib
+                     -Wl,--whole-archive -ldpdk -lm -lpcap -lnuma
+                     -Wl,--no-whole-archive"
     ngx_module_link=YES
     ngx_module_order=
 
index 67d9746..8736529 100644 (file)
@@ -95,6 +95,8 @@ init_context(struct tldk_ctx *tcx, const struct tldk_ctx_conf *cf,
        cprm.socket_id = sid;
        cprm.proto = TLE_PROTO_TCP;
        cprm.max_streams = cf->nb_stream;
+       cprm.free_streams.min = cf->free_streams.nb_min;
+       cprm.free_streams.max = cf->free_streams.nb_max;
        cprm.max_stream_rbufs = cf->nb_rbuf;
        cprm.max_stream_sbufs = cf->nb_sbuf;
        if (cf->be_in_worker != 0)
index ffd479b..ed6ae35 100644 (file)
@@ -86,6 +86,10 @@ struct tldk_ctx_conf {
        uint32_t lcore;
        uint32_t nb_mbuf;
        uint32_t nb_stream;
+       struct {
+               uint32_t nb_min;
+               uint32_t nb_max;
+       } free_streams;
        uint32_t nb_rbuf;
        uint32_t nb_sbuf;
        uint32_t nb_dev;
index 5d71d9e..6e20b1b 100644 (file)
@@ -413,6 +413,18 @@ tldk_ctx_parse(ngx_conf_t *cf, ngx_command_t *dummy, void *conf)
                                        &pvl) < 0)
                        return NGX_CONF_ERROR;
                tcx->nb_stream = pvl.u64;
+       } else if (ngx_strcmp(v[0].data, "min_free_streams") == 0) {
+               if (cf->args->nelts != 2 ||
+                               parse_uint_val((const char *)v[1].data,
+                                       &pvl) < 0)
+                       return NGX_CONF_ERROR;
+               tcx->free_streams.nb_min = pvl.u64;
+       } else if (ngx_strcmp(v[0].data, "max_free_streams") == 0) {
+               if (cf->args->nelts != 2 ||
+                               parse_uint_val((const char *)v[1].data,
+                                       &pvl) < 0)
+                       return NGX_CONF_ERROR;
+               tcx->free_streams.nb_max = pvl.u64;
        } else if (ngx_strcmp(v[0].data, "rbufs") == 0) {
                if (cf->args->nelts != 2 ||
                                parse_uint_val((const char *)v[1].data,
index f18b622..a6e0de3 100644 (file)
@@ -38,6 +38,7 @@ CFLAGS += -I$(RTE_OUTPUT)/include
 
 LDLIBS += -L$(RTE_OUTPUT)/lib
 LDLIBS += -ltle_l4p
+LDLIBS += -ltle_memtank
 LDLIBS += -ltle_timer
 
 EXTRA_CFLAGS += -O3
index e1357d1..5c8407e 100644 (file)
@@ -48,6 +48,7 @@ SYMLINK-y-include += tle_udp.h
 
 # this lib dependencies
 DEPDIRS-y += lib/libtle_misc
+DEPDIRS-y += lib/libtle_memtank
 DEPDIRS-y += lib/libtle_dring
 DEPDIRS-y += lib/libtle_timer
 
index b8067f0..b810983 100644 (file)
@@ -116,8 +116,6 @@ tle_ctx_create(const struct tle_ctx_param *ctx_prm)
        for (i = 0; i != RTE_DIM(ctx->use); i++)
                tle_pbm_init(ctx->use + i, LPORT_START_BLK);
 
-       ctx->streams.nb_free = ctx->prm.max_streams;
-
        /* Initialization of siphash state is done here to speed up the
         * fastpath processing.
         */
index bec1e76..7dde8ff 100644 (file)
@@ -143,10 +143,10 @@ empty_lq(struct tle_tcp_stream *s)
 static inline void
 tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s)
 {
-       struct stbl *st;
        uint16_t uop;
+       struct tcp_streams *ts;
 
-       st = CTX_TCP_STLB(ctx);
+       ts = CTX_TCP_STREAMS(ctx);
 
        /* reset TX armed */
        rte_atomic32_set(&s->tx.arm, 0);
@@ -167,7 +167,7 @@ tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s)
 
        if (s->ste != NULL) {
                /* remove entry from RX streams table */
-               stbl_del_stream(st, s->ste, s,
+               stbl_del_stream(&ts->st, s->ste, s,
                        (s->flags & TLE_CTX_FLAG_ST) == 0);
                s->ste = NULL;
                empty_rq(s);
@@ -181,7 +181,36 @@ tcp_stream_reset(struct tle_ctx *ctx, struct tle_tcp_stream *s)
         * if there still are pkts queued for TX,
         * then put this stream to the tail of free list.
         */
-       put_stream(ctx, &s->s, TCP_STREAM_TX_FINISHED(s));
+       if (TCP_STREAM_TX_PENDING(s)) 
+               put_stream(ctx, &s->s, 0);
+       else {
+               s->s.type = TLE_VNUM;
+               tle_memtank_free(ts->mts, (void **)&s, 1, 0);
+       }
+}
+
+static inline struct tle_tcp_stream *
+tcp_stream_get(struct tle_ctx *ctx, uint32_t flag)
+{
+       struct tle_stream *s;
+       struct tle_tcp_stream *cs;
+       struct tcp_streams *ts;
+
+       ts = CTX_TCP_STREAMS(ctx);
+       
+       /* check TX pending list */
+       s = get_stream(ctx);
+       cs = TCP_STREAM(s);
+       if (s != NULL) {
+               if (TCP_STREAM_TX_FINISHED(cs))
+                       return cs;
+               put_stream(ctx, &cs->s, 0);
+       }
+
+       if (tle_memtank_alloc(ts->mts, (void **)&cs, 1, flag) != 1)
+               return NULL;
+
+       return cs;
 }
 
 #ifdef __cplusplus
index a519645..b71a565 100644 (file)
@@ -947,15 +947,15 @@ rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
                return rc;
 
        /* allocate new stream */
-       ts = get_stream(ctx);
-       cs = TCP_STREAM(ts);
-       if (ts == NULL)
+       cs = tcp_stream_get(ctx, 0);
+       if (cs == NULL)
                return ENFILE;
 
        /* prepare stream to handle new connection */
        if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
 
                /* put new stream in the accept queue */
+               ts = &cs->s;
                if (_rte_ring_enqueue_burst(s->rx.q,
                                (void * const *)&ts, 1) == 1) {
                        *csp = cs;
@@ -1951,12 +1951,15 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
 {
        uint32_t n;
        struct tle_tcp_stream *s;
+       struct tle_memtank *mts;
 
        s = TCP_STREAM(ts);
        n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
        if (n == 0)
                return 0;
 
+       mts = CTX_TCP_MTS(ts->ctx);
+
        /*
         * if we still have packets to read,
         * then rearm stream RX event.
@@ -1967,6 +1970,7 @@ tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
                tcp_stream_release(s);
        }
 
+       tle_memtank_grow(mts);
        return n;
 }
 
index a212405..fce3b9a 100644 (file)
@@ -28,6 +28,8 @@
 #include "tcp_ofo.h"
 #include "tcp_txq.h"
 
+#define MAX_STREAM_BURST       0x40
+
 static void
 unuse_stream(struct tle_tcp_stream *s)
 {
@@ -42,11 +44,13 @@ tcp_fini_streams(struct tle_ctx *ctx)
 
        ts = CTX_TCP_STREAMS(ctx);
        if (ts != NULL) {
-               stbl_fini(&ts->st);
 
-               /* free the timer wheel */
+               stbl_fini(&ts->st);
                tle_timer_free(ts->tmr);
                rte_free(ts->tsq);
+               tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
+               tle_memtank_sanity_check(ts->mts, 0);
+               tle_memtank_destroy(ts->mts);
 
                STAILQ_INIT(&ts->dr.fe);
                STAILQ_INIT(&ts->dr.be);
@@ -122,7 +126,7 @@ calc_stream_szofs(struct tle_ctx *ctx, struct stream_szofs *szofs)
        szofs->size = sz;
 }
 
-static int
+static void
 init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s,
        const struct stream_szofs *szofs)
 {
@@ -163,9 +167,6 @@ init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s,
 
        s->s.ctx = ctx;
        unuse_stream(s);
-       STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);
-
-       return 0;
 }
 
 static void
@@ -178,38 +179,107 @@ tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
 }
 
 static struct tle_timer_wheel *
-alloc_timers(uint32_t num, uint32_t mshift, int32_t socket)
+alloc_timers(const struct tle_ctx *ctx)
 {
+       struct tle_timer_wheel *twl;
        struct tle_timer_wheel_args twprm;
 
        twprm.tick_size = TCP_RTO_GRANULARITY;
-       twprm.max_timer = num;
-       twprm.socket_id = socket;
-       return tle_timer_create(&twprm, tcp_get_tms(mshift));
+       twprm.max_timer = ctx->prm.max_streams;
+       twprm.socket_id = ctx->prm.socket_id;
+
+       twl = tle_timer_create(&twprm, tcp_get_tms(ctx->cycles_ms_shift));
+       if (twl == NULL)
+               TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
+                       ctx, rte_errno);
+       return twl;
+}
+
+static void *
+mts_alloc(size_t sz, void *udata)
+{
+       struct tle_ctx *ctx;
+
+       ctx = udata;
+       return rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+               ctx->prm.socket_id);
+}
+
+static void
+mts_free(void *p, void *udata)
+{
+       RTE_SET_USED(udata);
+       rte_free(p);
+}
+
+static void
+mts_init(void *pa[], uint32_t num, void *udata)
+{
+       uint32_t i;
+       struct tle_ctx *ctx;
+       struct tcp_streams *ts;
+
+       ctx = udata;
+       ts = CTX_TCP_STREAMS(ctx);
+
+       for (i = 0; i != num; i++)
+               init_stream(ctx, pa[i], &ts->szofs);
+}
+
+static struct tle_memtank *
+alloc_mts(struct tle_ctx *ctx, uint32_t stream_size)
+{
+       struct tle_memtank *mts;
+       struct tle_memtank_prm prm;
+
+       static const struct tle_memtank_prm cprm = {
+               .obj_align = RTE_CACHE_LINE_SIZE,
+               .flags = TLE_MTANK_OBJ_DBG,
+               .alloc = mts_alloc,
+               .free = mts_free,
+               .init = mts_init,
+       };
+
+       prm = cprm;
+       prm.udata = ctx;
+
+       prm.obj_size = stream_size;
+
+       prm.min_free = (ctx->prm.free_streams.min != 0) ?
+               ctx->prm.free_streams.min : ctx->prm.max_streams;
+       prm.max_free = (ctx->prm.free_streams.max > prm.min_free) ?
+               ctx->prm.free_streams.max : prm.min_free;
+
+       prm.nb_obj_chunk = MAX_STREAM_BURST;
+       prm.max_obj = ctx->prm.max_streams;
+
+       mts = tle_memtank_create(&prm);
+       if (mts == NULL)
+               TCP_LOG(ERR, "%s(ctx=%p) failed with error=%d\n",
+                       __func__, ctx, rte_errno);
+       else
+               tle_memtank_grow(mts);
+
+       return mts;
 }
 
 static int
 tcp_init_streams(struct tle_ctx *ctx)
 {
-       size_t sz;
-       uint32_t f, i;
+       uint32_t f;
        int32_t rc;
        struct tcp_streams *ts;
-       struct tle_tcp_stream *ps;
        struct stream_szofs szofs;
 
        f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
                (RING_F_SP_ENQ |  RING_F_SC_DEQ);
 
        calc_stream_szofs(ctx, &szofs);
+       TCP_LOG(NOTICE, "ctx:%p, caluclated stream size: %u\n",
+               ctx, szofs.size);
 
-       sz = sizeof(*ts) + szofs.size * ctx->prm.max_streams;
-       ts = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
+       ts = rte_zmalloc_socket(NULL, sizeof(*ts), RTE_CACHE_LINE_SIZE,
                ctx->prm.socket_id);
-
-       TCP_LOG(NOTICE, "allocation of %zu bytes on socket %d "
-                       "for %u tcp_streams returns %p\n",
-                       sz, ctx->prm.socket_id, ctx->prm.max_streams, ts);
        if (ts == NULL)
                return -ENOMEM;
 
@@ -221,29 +291,22 @@ tcp_init_streams(struct tle_ctx *ctx)
        ctx->streams.buf = ts;
        STAILQ_INIT(&ctx->streams.free);
 
-       ts->tmr = alloc_timers(ctx->prm.max_streams, ctx->cycles_ms_shift,
-               ctx->prm.socket_id);
-       if (ts->tmr == NULL) {
-               TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
-                       ctx, rte_errno);
-               rc = -ENOMEM;
-       } else {
-               ts->tsq = alloc_ring(ctx->prm.max_streams,
-                       f | RING_F_SC_DEQ, ctx->prm.socket_id);
-               if (ts->tsq == NULL)
+       rc = stbl_init(&ts->st, ctx->prm.max_streams, ctx->prm.socket_id);
+
+       if (rc == 0) {
+               ts->tsq = alloc_ring(ctx->prm.max_streams, f | RING_F_SC_DEQ,
+                       ctx->prm.socket_id);
+               ts->tmr = alloc_timers(ctx);
+               ts->mts = alloc_mts(ctx, szofs.size);
+       
+               if (ts->tsq == NULL || ts->tmr == NULL || ts->mts == NULL)
                        rc = -ENOMEM;
-               else
-                       rc = stbl_init(&ts->st, ctx->prm.max_streams,
-                               ctx->prm.socket_id);
-       }
 
-       for (i = 0; rc == 0 && i != ctx->prm.max_streams; i++) {
-               ps = (void *)((uintptr_t)ts->s + i * ts->szofs.size);
-               rc = init_stream(ctx, ps, &ts->szofs);
+               tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
        }
 
        if (rc != 0) {
-               TCP_LOG(ERR, "initalisation of %u-th stream failed", i);
+               TCP_LOG(ERR, "initalisation of tcp streams failed");
                tcp_fini_streams(ctx);
        }
 
@@ -302,6 +365,7 @@ struct tle_stream *
 tle_tcp_stream_open(struct tle_ctx *ctx,
        const struct tle_tcp_stream_param *prm)
 {
+       struct tcp_streams *ts;
        struct tle_tcp_stream *s;
        int32_t rc;
 
@@ -310,15 +374,11 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
                return NULL;
        }
 
-       s = (struct tle_tcp_stream *)get_stream(ctx);
-       if (s == NULL)  {
-               rte_errno = ENFILE;
-               return NULL;
+       ts = CTX_TCP_STREAMS(ctx);
 
-       /* some TX still pending for that stream. */
-       } else if (TCP_STREAM_TX_PENDING(s)) {
-               put_stream(ctx, &s->s, 0);
-               rte_errno = EAGAIN;
+       s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
+       if (s == NULL) {
+               rte_errno = ENFILE;
                return NULL;
        }
 
@@ -328,7 +388,8 @@ tle_tcp_stream_open(struct tle_ctx *ctx,
                (const struct sockaddr *)&prm->addr.remote);
 
        if (rc != 0) {
-               put_stream(ctx, &s->s, 1);
+               tle_memtank_free(ts->mts, (void **)&s, 1,
+                       TLE_MTANK_FREE_SHRINK);
                rte_errno = rc;
                return NULL;
        }
@@ -426,18 +487,17 @@ tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
 
        rc = 0;
 
-       for (i = 0; i != num; i++) {
+       for (i = 0; i != num && rc == 0; i++) {
 
                s = TCP_STREAM(ts[i]);
-               if (ts[i] == NULL || s->s.type >= TLE_VNUM) {
+               if (ts[i] == NULL || s->s.type >= TLE_VNUM)
                        rc = EINVAL;
-                       break;
-               }
 
-               ctx = s->s.ctx;
-               rc = stream_close(ctx, s);
-               if (rc != 0)
-                       break;
+               else {
+                       ctx = s->s.ctx;
+                       rc = stream_close(ctx, s);
+                       tle_memtank_shrink(CTX_TCP_MTS(ctx));
+               }
        }
 
        if (rc != 0)
@@ -448,6 +508,7 @@ tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
 int
 tle_tcp_stream_close(struct tle_stream *ts)
 {
+       int32_t rc;
        struct tle_ctx *ctx;
        struct tle_tcp_stream *s;
 
@@ -456,7 +517,9 @@ tle_tcp_stream_close(struct tle_stream *ts)
                return -EINVAL;
 
        ctx = s->s.ctx;
-       return stream_close(ctx, s);
+       rc = stream_close(ctx, s);
+       tle_memtank_shrink(CTX_TCP_MTS(ctx));
+       return rc;
 }
 
 int
index 1bb2a42..a3d00dc 100644 (file)
@@ -18,6 +18,7 @@
 
 #include <rte_vect.h>
 #include <tle_dring.h>
+#include <tle_memtank.h>
 #include <tle_tcp.h>
 #include <tle_event.h>
 
@@ -176,9 +177,9 @@ struct tcp_streams {
        struct stbl st;
        struct tle_timer_wheel *tmr; /* timer wheel */
        struct rte_ring *tsq;        /* to-send streams queue */
+       struct tle_memtank *mts;     /* memtank to allocate streams from */
        struct sdr dr;               /* death row for zombie streams */
        struct stream_szofs szofs;   /* size and offsets for stream data */
-       struct tle_tcp_stream s[];   /* array of allocated streams. */
 };
 
 #define CTX_TCP_STREAMS(ctx)   ((struct tcp_streams *)(ctx)->streams.buf)
@@ -186,6 +187,7 @@ struct tcp_streams {
 #define CTX_TCP_TMWHL(ctx)     (CTX_TCP_STREAMS(ctx)->tmr)
 #define CTX_TCP_TSQ(ctx)       (CTX_TCP_STREAMS(ctx)->tsq)
 #define CTX_TCP_SDR(ctx)       (&CTX_TCP_STREAMS(ctx)->dr)
+#define CTX_TCP_MTS(ctx)       (CTX_TCP_STREAMS(ctx)->mts)
 
 #ifdef __cplusplus
 }
index de78a6b..391cfe3 100644 (file)
@@ -112,6 +112,12 @@ struct tle_ctx_param {
        int32_t socket_id;         /**< socket ID to allocate memory for. */
        uint32_t proto;            /**< L4 proto to handle. */
        uint32_t max_streams;      /**< max number of streams in context. */
+       struct {
+               uint32_t min;
+               /**< min number of free streams (grow threshold). */
+               uint32_t max;
+               /**< max number of free streams (shrink threshold). */
+       } free_streams;
        uint32_t max_stream_rbufs; /**< max recv mbufs per stream. */
        uint32_t max_stream_sbufs; /**< max send mbufs per stream. */
        uint32_t send_bulk_size;   /**< expected # of packets per send call. */
index 29f5a40..a50c420 100644 (file)
@@ -170,6 +170,7 @@ udp_init_streams(struct tle_ctx *ctx)
                }
        }
 
+       ctx->streams.nb_free = ctx->prm.max_streams;
        return 0;
 }
 
index e980c23..3858306 100644 (file)
@@ -125,7 +125,7 @@ LDLIBS += -lstdc++
 LDLIBS += -L$(GMOCK_DIR) -L$(GMOCK_DIR)/../lib -lgmock
 LDLIBS += -L$(GMOCK_DIR)/gtest -L$(GMOCK_DIR)/../lib -lgtest
 LDLIBS += -L$(RTE_OUTPUT)/lib
-LDLIBS += -whole-archive -ltle_l4p -ltle_dring -ltle_timer
+LDLIBS += -whole-archive -ltle_l4p -ltle_dring -ltle_memtank -ltle_timer
 
 include $(TLDK_ROOT)/mk/tle.app.mk
 endif
index 2caf2b5..cb2946e 100644 (file)
@@ -41,6 +41,10 @@ static struct tle_ctx_param ctx_prm_tmpl = {
        .socket_id = SOCKET_ID_ANY,
        .proto = TLE_PROTO_TCP,
        .max_streams = MAX_STREAMS,
+       . free_streams = {
+               .min = 0,
+               .max = 0,
+       },
        .max_stream_rbufs = MAX_STREAM_RBUFS,
        .max_stream_sbufs = MAX_STREAM_SBUFS,
 };