Rewrite accept() code-path and make l4fwd not to close() on FIN immediatelly. 17/5617/3
authorKonstantin Ananyev <konstantin.ananyev@intel.com>
Fri, 3 Mar 2017 18:40:23 +0000 (18:40 +0000)
committerKonstantin Ananyev <konstantin.ananyev@intel.com>
Mon, 6 Mar 2017 15:06:38 +0000 (15:06 +0000)
Changes in public API:
 - removes tle_tcp_stream_synreqs() and tle_tcp_reject()
 - adds tle_tcp_stream_update_cfg
Allocates and fills new stream when final ACK for 3-way handshake
is received.

Changes in l4fwd sample application:
prevents l4fwd to call close() on error event immediately:
first try to recv/send remaining data.

Change-Id: I8c5b9d365353084083731a4ce582197a8268688f
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
examples/l4fwd/common.h
examples/l4fwd/netbe.h
examples/l4fwd/tcp.h
lib/libtle_l4p/stream_table.h
lib/libtle_l4p/syncookie.h
lib/libtle_l4p/tcp_ctl.h
lib/libtle_l4p/tcp_rxtx.c
lib/libtle_l4p/tcp_stream.c
lib/libtle_l4p/tle_event.h
lib/libtle_l4p/tle_tcp.h

index ff8ee7a..8d757f3 100644 (file)
@@ -619,7 +619,7 @@ netbe_lcore(void)
        }
 }
 
-static inline void
+static inline int
 netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
 {
        uint32_t k, n;
@@ -631,12 +631,12 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
        if (k == 0) {
                tle_event_idle(fes->rxev);
                fes->stat.rxev[TLE_SEV_IDLE]++;
-               return;
+               return 0;
        }
 
        n = tle_stream_recv(fes->s, fes->pbuf.pkt + n, k);
        if (n == 0)
-               return;
+               return 0;
 
        NETFE_TRACE("%s(%u): tle_%s_stream_recv(%p, %u) returns %u\n",
                __func__, lcore, proto_name[fes->proto], fes->s, k, n);
@@ -648,7 +648,7 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
        if (fes->op == RXONLY)
                fes->stat.rxb += pkt_buf_empty(&fes->pbuf);
        /* mark stream as writable */
-       else if (k ==  RTE_DIM(fes->pbuf.pkt)) {
+       else if (k == RTE_DIM(fes->pbuf.pkt)) {
                if (fes->op == RXTX) {
                        tle_event_active(fes->txev, TLE_SEV_UP);
                        fes->stat.txev[TLE_SEV_UP]++;
@@ -657,6 +657,8 @@ netfe_rx_process(__rte_unused uint32_t lcore, struct netfe_stream *fes)
                        fes->stat.txev[TLE_SEV_UP]++;
                }
        }
+
+       return n;
 }
 
 #endif /* COMMON_H_ */
index 6d25603..80d1c28 100644 (file)
@@ -195,6 +195,7 @@ struct netfe_stream {
        uint16_t proto;
        uint16_t family;
        uint16_t txlen;
+       uint16_t posterr; /* # of time error event handling was postponed */
        struct {
                uint64_t rxp;
                uint64_t rxb;
index 031ad8d..f6ca3a5 100644 (file)
@@ -23,7 +23,9 @@ netfe_stream_term_tcp(struct netfe_lcore *fe, struct netfe_stream *fes)
 {
        fes->s = NULL;
        fes->fwds = NULL;
+       fes->posterr = 0;
        memset(&fes->stat, 0, sizeof(fes->stat));
+       pkt_buf_empty(&fes->pbuf);
        netfe_put_stream(fe, &fe->free, fes);
 }
 
@@ -251,7 +253,7 @@ netfe_create_fwd_stream(struct netfe_lcore *fe, struct netfe_stream *fes,
        return fws;
 }
 
-static inline void
+static inline int
 netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
 {
        uint32_t i, k, n;
@@ -264,7 +266,7 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
        pkt = fes->pbuf.pkt;
 
        if (n == 0)
-               return;
+               return 0;
 
        fed = fes->fwds;
 
@@ -307,88 +309,73 @@ netfe_fwd_tcp(uint32_t lcore, struct netfe_stream *fes)
                tle_event_active(fes->rxev, TLE_SEV_UP);
                fes->stat.rxev[TLE_SEV_UP]++;
        }
+
+       return (fed == NULL) ? 0 : k;
 }
 
 static inline void
-netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore,
+netfe_new_conn_tcp(struct netfe_lcore *fe, uint32_t lcore,
        struct netfe_stream *fes)
 {
-       uint32_t i, k, n, rc;
-       struct tle_tcp_stream_cfg *prm;
-       struct tle_tcp_accept_param acpt_prm[MAX_PKT_BURST];
-       struct tle_stream *rs[MAX_PKT_BURST];
-       struct tle_syn_req syn_reqs[MAX_PKT_BURST];
+       uint32_t i, k, n;
        struct netfe_stream *ts;
+       struct tle_stream *rs[MAX_PKT_BURST];
        struct netfe_stream *fs[MAX_PKT_BURST];
-
-       static const struct tle_stream_cb zcb = {.func = NULL, .data = NULL};
+       struct tle_tcp_stream_cfg prm[MAX_PKT_BURST];
 
        /* check if any syn requests are waiting */
-       n = tle_tcp_stream_synreqs(fes->s, syn_reqs, RTE_DIM(syn_reqs));
+       n = tle_tcp_stream_accept(fes->s, rs, RTE_DIM(rs));
        if (n == 0)
                return;
 
-       NETFE_TRACE("%s(%u): tle_tcp_stream_synreqs(%p, %u) returns %u\n",
+       NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n",
                __func__, lcore, fes->s, MAX_PKT_BURST, n);
 
        /* get n free streams */
        k = netfe_get_streams(&fe->free, fs, n);
+       if (n != k)
+               RTE_LOG(ERR, USER1,
+                       "%s(lc=%u): not enough FE resources to handle %u new "
+                       "TCP streams;\n",
+                       __func__, lcore, n - k);
 
        /* fill accept params to accept k connection requests*/
        for (i = 0; i != k; i++) {
-               acpt_prm[i].syn = syn_reqs[i];
-               prm = &acpt_prm[i].cfg;
-               prm->nb_retries = 0;
-               prm->recv_ev = fs[i]->rxev;
-               prm->send_ev = fs[i]->txev;
-               prm->err_ev = fs[i]->erev;
-               tle_event_active(fs[i]->erev, TLE_SEV_DOWN);
-               prm->err_cb = zcb;
-               prm->recv_cb = zcb;
-               prm->send_cb = zcb;
-       }
-
-       /* accept k new connections */
-       rc = tle_tcp_stream_accept(fes->s, acpt_prm, rs, k);
-
-       NETFE_TRACE("%s(%u): tle_tcp_stream_accept(%p, %u) returns %u\n",
-               __func__, lcore, fes->s, k, rc);
-
-       if (rc != n) {
-               /* n - rc connections could not be accepted */
-               tle_tcp_reject(fes->s, syn_reqs + rc, n - rc);
-
-               /* put back k - rc streams free list */
-               netfe_put_streams(fe, &fe->free, fs + rc, k - rc);
-       }
-
-       /* update the params for accepted streams */
-       for (i = 0; i != rc; i++) {
 
                ts = fs[i];
-
                ts->s = rs[i];
                ts->op = fes->op;
                ts->proto = fes->proto;
                ts->family = fes->family;
                ts->txlen = fes->txlen;
 
-               if (fes->op == TXONLY) {
+               tle_event_active(ts->erev, TLE_SEV_DOWN);
+               if (fes->op == TXONLY || fes->op == FWD) {
                        tle_event_active(ts->txev, TLE_SEV_UP);
                        ts->stat.txev[TLE_SEV_UP]++;
-               } else {
+               }
+               if (fes->op != TXONLY) {
                        tle_event_active(ts->rxev, TLE_SEV_DOWN);
                        ts->stat.rxev[TLE_SEV_DOWN]++;
                }
 
                netfe_put_stream(fe, &fe->use, ts);
-               NETFE_TRACE("%s(%u) accept (stream=%p, s=%p)\n",
-                       __func__, lcore, ts, rs[i]);
 
-               /* create a new fwd stream if needed */
-               if (fes->op == FWD) {
-                       tle_event_active(ts->txev, TLE_SEV_DOWN);
-                       ts->stat.txev[TLE_SEV_DOWN]++;
+               memset(&prm[i], 0, sizeof(prm[i]));
+               prm[i].recv_ev = ts->rxev;
+               prm[i].send_ev = ts->txev;
+               prm[i].err_ev = ts->erev;
+       }
+
+       tle_tcp_stream_update_cfg(rs, prm, k);
+
+       tle_tcp_stream_close_bulk(rs + k, n - k);
+
+       /* for the forwarding mode, open the second one */
+       if (fes->op == FWD) {
+               for (i = 0; i != k; i++) {
+
+                       ts = fs[i];
 
                        ts->fwds = netfe_create_fwd_stream(fe, fes, lcore,
                                fes->fwdprm.bidx);
@@ -396,8 +383,9 @@ netfe_new_conn_tcp(struct netfe_lcore *fe, __rte_unused uint32_t lcore,
                                ts->fwds->fwds = ts;
                }
        }
-       fe->tcp_stat.acc += rc;
-       fe->tcp_stat.rej += n - rc;
+
+       fe->tcp_stat.acc += k;
+       fe->tcp_stat.rej += n - k;
 }
 
 static inline void
@@ -430,7 +418,7 @@ netfe_lcore_tcp_rst(void)
 {
        struct netfe_lcore *fe;
        struct netfe_stream *fwds;
-       uint32_t j, n;
+       uint32_t j, k, n;
        struct tle_stream *s[MAX_PKT_BURST];
        struct netfe_stream *fs[MAX_PKT_BURST];
        struct tle_event *rv[MAX_PKT_BURST];
@@ -449,36 +437,44 @@ netfe_lcore_tcp_rst(void)
        NETFE_TRACE("%s(%u): tle_evq_get(errevq=%p) returns %u\n",
                __func__, rte_lcore_id(), fe->ereq, n);
 
+       k = 0;
        for (j = 0; j != n; j++) {
                if (verbose > VERBOSE_NONE) {
                        struct tle_tcp_stream_addr addr;
                        tle_tcp_stream_get_addr(fs[j]->s, &addr);
                        netfe_stream_dump(fs[j], &addr.local, &addr.remote);
                }
-               s[j] = fs[j]->s;
-               rv[j] = fs[j]->rxev;
-               tv[j] = fs[j]->txev;
-               ev[j] = fs[j]->erev;
+
+               /* check do we still have something to send/recv */
+               if (fs[j]->posterr == 0 &&
+                               (tle_event_state(fs[j]->rxev) == TLE_SEV_UP ||
+                               tle_event_state(fs[j]->txev) == TLE_SEV_UP)) {
+                       fs[j]->posterr++;
+               } else {
+                       s[k] = fs[j]->s;
+                       rv[k] = fs[j]->rxev;
+                       tv[k] = fs[j]->txev;
+                       ev[k] = fs[j]->erev;
+                       fs[k] = fs[j];
+                       k++;
+               }
        }
 
-       tle_evq_idle(fe->rxeq, rv, n);
-       tle_evq_idle(fe->txeq, tv, n);
-       tle_evq_idle(fe->ereq, ev, n);
+       if (k == 0)
+               return;
 
-       tle_tcp_stream_close_bulk(s, n);
+       tle_evq_idle(fe->rxeq, rv, k);
+       tle_evq_idle(fe->txeq, tv, k);
+       tle_evq_idle(fe->ereq, ev, k);
 
-       for (j = 0; j != n; j++) {
+       tle_tcp_stream_close_bulk(s, k);
+
+       for (j = 0; j != k; j++) {
 
-               /*
-                * if forwarding mode, send unsent packets and
-                * signal peer stream to terminate too.
-                */
+               /* if forwarding mode, signal peer stream to terminate too. */
                fwds = fs[j]->fwds;
                if (fwds != NULL && fwds->s != NULL) {
 
-                       /* forward all unsent packets */
-                       netfe_fwd_tcp(rte_lcore_id(), fs[j]);
-
                        fwds->fwds = NULL;
                        tle_event_raise(fwds->erev);
                        fs[j]->fwds = NULL;
@@ -491,7 +487,7 @@ netfe_lcore_tcp_rst(void)
        }
 }
 
-static inline void
+static inline int
 netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
 {
        uint32_t i, k, n;
@@ -504,7 +500,7 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
        if (n == 0) {
                tle_event_idle(fes->txev);
                fes->stat.txev[TLE_SEV_IDLE]++;
-               return;
+               return 0;
        }
 
 
@@ -512,13 +508,13 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
 
        NETFE_TRACE("%s(%u): tle_%s_stream_send(%p, %u) returns %u\n",
                __func__, lcore, proto_name[fes->proto],
-       fes->s, n, k);
+               fes->s, n, k);
        fes->stat.txp += k;
        fes->stat.drops += n - k;
 
        /* not able to send anything. */
        if (k == 0)
-               return;
+               return 0;
 
        if (n == RTE_DIM(fes->pbuf.pkt)) {
                /* mark stream as readable */
@@ -530,19 +526,22 @@ netfe_rxtx_process_tcp(__rte_unused uint32_t lcore, struct netfe_stream *fes)
        fes->pbuf.num = n - k;
        for (i = 0; i != n - k; i++)
                pkt[i] = pkt[i + k];
+
+       return k;
 }
 
-static inline void
+static inline int
 netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes)
 {
        uint32_t i, k, n;
 
        /* refill with new mbufs. */
-       pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
+       if (fes->posterr == 0)
+               pkt_buf_fill(lcore, &fes->pbuf, fes->txlen);
 
        n = fes->pbuf.num;
        if (n == 0)
-               return;
+               return 0;
 
        /**
         * TODO: cannot use function pointers for unequal param num.
@@ -555,19 +554,22 @@ netfe_tx_process_tcp(uint32_t lcore, struct netfe_stream *fes)
        fes->stat.drops += n - k;
 
        if (k == 0)
-               return;
+               return 0;
 
        /* adjust pbuf array. */
        fes->pbuf.num = n - k;
        for (i = k; i != n; i++)
                fes->pbuf.pkt[i - k] = fes->pbuf.pkt[i];
+
+       return k;
 }
 
 static inline void
 netfe_lcore_tcp(void)
 {
-       struct netfe_lcore *fe;
+       int32_t rc;
        uint32_t j, n, lcore;
+       struct netfe_lcore *fe;
        struct netfe_stream *fs[MAX_PKT_BURST];
 
        fe = RTE_PER_LCORE(_fe);
@@ -580,25 +582,42 @@ netfe_lcore_tcp(void)
        n = tle_evq_get(fe->rxeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
 
        if (n != 0) {
+
                NETFE_TRACE("%s(%u): tle_evq_get(rxevq=%p) returns %u\n",
                        __func__, lcore, fe->rxeq, n);
-               for (j = 0; j != n; j++)
-                       netfe_rx_process(lcore, fs[j]);
+
+               for (j = 0; j != n; j++) {
+
+                       rc = netfe_rx_process(lcore, fs[j]);
+
+                       /* we are ok to close the stream */
+                       if (rc == 0 && fs[j]->posterr != 0)
+                               tle_event_raise(fs[j]->erev);
+               }
        }
 
        /* look for tx events */
        n = tle_evq_get(fe->txeq, (const void **)(uintptr_t)fs, RTE_DIM(fs));
 
        if (n != 0) {
+
                NETFE_TRACE("%s(%u): tle_evq_get(txevq=%p) returns %u\n",
                        __func__, lcore, fe->txeq, n);
+
                for (j = 0; j != n; j++) {
+
+                       rc = 0;
+
                        if (fs[j]->op == RXTX)
-                               netfe_rxtx_process_tcp(lcore, fs[j]);
+                               rc = netfe_rxtx_process_tcp(lcore, fs[j]);
                        else if (fs[j]->op == FWD)
-                               netfe_fwd_tcp(lcore, fs[j]);
+                               rc = netfe_fwd_tcp(lcore, fs[j]);
                        else if (fs[j]->op == TXONLY)
-                               netfe_tx_process_tcp(lcore, fs[j]);
+                               rc = netfe_tx_process_tcp(lcore, fs[j]);
+
+                       /* we are ok to close the stream */
+                       if (rc == 0 && fs[j]->posterr != 0)
+                               tle_event_raise(fs[j]->erev);
                }
        }
 }
index 8ad1103..29f1f63 100644 (file)
@@ -110,13 +110,13 @@ stbl_add_entry(struct stbl *st, const union pkt_info *pi)
 }
 
 static inline struct stbl_entry *
-stbl_add_pkt(struct stbl *st, const union pkt_info *pi, const void *pkt)
+stbl_add_stream(struct stbl *st, const union pkt_info *pi, const void *s)
 {
        struct stbl_entry *se;
 
        se = stbl_add_entry(st, pi);
        if (se != NULL)
-               se->data = (void *)((uintptr_t)pkt | STE_PKT);
+               se->data = (void *)(uintptr_t)s;
        return se;
 }
 
index 276d45a..ad70b7d 100644 (file)
@@ -156,13 +156,8 @@ sync_get_opts(struct syn_opts *so, uintptr_t p, uint32_t len)
 
 static inline void
 sync_fill_tcb(struct tcb *tcb, const union seg_info *si,
-       const struct rte_mbuf *mb)
+       const struct syn_opts *so)
 {
-       const struct tcp_hdr *th;
-
-       th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
-               mb->l2_len + mb->l3_len);
-
        tcb->rcv.nxt = si->seq;
        tcb->rcv.irs = si->seq - 1;
 
@@ -174,7 +169,7 @@ sync_fill_tcb(struct tcb *tcb, const union seg_info *si,
        tcb->snd.wu.wl1 = si->seq;
        tcb->snd.wu.wl2 = si->ack;
 
-       get_syn_opts(&tcb->so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
+       tcb->so = *so;
 
        tcb->snd.wscale = tcb->so.wscale;
        tcb->snd.mss = tcb->so.mss;
index dcb9c3e..95c2bbc 100644 (file)
@@ -41,6 +41,13 @@ tcp_stream_up(struct tle_tcp_stream *s)
        rwl_up(&s->tx.use);
 }
 
+/* calculate RCV.WND value based on size of stream receive buffer */
+static inline uint32_t
+calc_rx_wnd(const struct tle_tcp_stream *s, uint32_t scale)
+{
+       return  s->rx.q->prod.mask << scale;
+}
+
 /* empty stream's receive queue */
 static void
 empty_rq(struct tle_tcp_stream *s)
index 4e43730..6085814 100644 (file)
@@ -173,7 +173,7 @@ fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
        l4h->dst_port = port.src;
 
        wnd = (flags & TCP_FLAG_SYN) ?
-               RTE_MAX(TCP4_MIN_MSS, tcb->so.mss) :
+               RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
                tcb->rcv.wnd >> tcb->rcv.wscale;
 
        /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
@@ -760,68 +760,24 @@ rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
 }
 
 static inline int
-restore_syn_pkt(const union pkt_info *pi, const union seg_info *si,
-       uint32_t ts, struct rte_mbuf *mb)
+restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
+       const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb)
 {
        int32_t rc;
        uint32_t len;
-       struct tcp_hdr *th;
-       struct syn_opts so;
+       const struct tcp_hdr *th;
 
        /* check that ACK, etc fields are what we expected. */
        rc = sync_check_ack(pi, si->seq, si->ack - 1, ts);
        if (rc < 0)
                return rc;
 
-       so.mss = rc;
+       so->mss = rc;
 
-       th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
+       th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
                mb->l2_len + mb->l3_len);
        len = mb->l4_len - sizeof(*th);
-       sync_get_opts(&so, (uintptr_t)(th + 1), len);
-
-       /* reconstruct SYN options, extend header size if necessary */
-       if (len < TCP_TX_OPT_LEN_MAX) {
-               len = TCP_TX_OPT_LEN_MAX - len;
-               th->data_off = TCP_TX_OPT_LEN_MAX / TCP_DATA_ALIGN <<
-                       TCP_DATA_OFFSET;
-               mb->pkt_len += len;
-               mb->data_len += len;
-               mb->l4_len += len;
-       }
-
-       fill_syn_opts(th + 1, &so);
-       return 0;
-}
-
-static inline int
-rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
-       const union pkt_info *pi, const union seg_info *si,
-       uint32_t ts, struct rte_mbuf *mb)
-{
-       int32_t rc;
-       struct stbl_entry *se;
-
-       if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
-               return -EINVAL;
-
-       /* ACK for new connection request. */
-
-       rc = restore_syn_pkt(pi, si, ts, mb);
-       if (rc < 0)
-               return rc;
-
-       se = stbl_add_pkt(st, pi, mb);
-       if (se == NULL)
-               return -ENOBUFS;
-
-       /* put new connection requests into stream listen queue */
-       if (rte_ring_enqueue_burst(s->rx.q,
-                       (void * const *)&se, 1) != 1) {
-               stbl_del_pkt(st, se, pi);
-               return -ENOBUFS;
-       }
-
+       sync_get_opts(so, (uintptr_t)(th + 1), len);
        return 0;
 }
 
@@ -848,6 +804,151 @@ stream_term(struct tle_tcp_stream *s)
                s->err.cb.func(s->err.cb.data, &s->s);
 }
 
+static inline int
+stream_fill_dest(struct tle_tcp_stream *s)
+{
+       int32_t rc;
+       const void *da;
+
+       if (s->s.type == TLE_V4)
+               da = &s->s.ipv4.addr.src;
+       else
+               da = &s->s.ipv6.addr.src;
+
+       rc = stream_get_dest(&s->s, da, &s->tx.dst);
+       return (rc < 0) ? rc : 0;
+}
+
+/*
+ * helper function, prepares a new accept stream.
+ */
+static inline int
+accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
+       struct tle_tcp_stream *cs, const struct syn_opts *so,
+       uint32_t tms, const union pkt_info *pi, const union seg_info *si)
+{
+       int32_t rc;
+       uint32_t rtt;
+
+       /* some TX still pending for that stream. */
+       if (TCP_STREAM_TX_PENDING(cs))
+               return -EAGAIN;
+
+       /* setup L4 ports and L3 addresses fields. */
+       cs->s.port.raw = pi->port.raw;
+       cs->s.pmsk.raw = UINT32_MAX;
+
+       if (pi->tf.type == TLE_V4) {
+               cs->s.ipv4.addr = pi->addr4;
+               cs->s.ipv4.mask.src = INADDR_NONE;
+               cs->s.ipv4.mask.dst = INADDR_NONE;
+       } else if (pi->tf.type == TLE_V6) {
+               cs->s.ipv6.addr = *pi->addr6;
+               rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
+                       sizeof(cs->s.ipv6.mask.src));
+               rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
+                       sizeof(cs->s.ipv6.mask.dst));
+       }
+
+       /* setup TCB */
+       sync_fill_tcb(&cs->tcb, si, so);
+       cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
+
+       /*
+        * estimate the rto
+        * for now rtt is calculated based on the tcp TMS option,
+        * later add real-time one
+        */
+       if (cs->tcb.so.ts.ecr) {
+               rtt = tms - cs->tcb.so.ts.ecr;
+               rto_estimate(&cs->tcb, rtt);
+       } else
+               cs->tcb.snd.rto = TCP_RTO_DEFAULT;
+
+       /* copy streams type. */
+       cs->s.type = ps->s.type;
+
+       /* retrive and cache destination information. */
+       rc = stream_fill_dest(cs);
+       if (rc != 0)
+               return rc;
+
+       /* update snd.mss with SMSS value */
+       cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
+
+       /* setup congestion variables */
+       cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
+       cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
+
+       cs->tcb.state = TCP_ST_ESTABLISHED;
+
+       /* add stream to the table */
+       cs->ste = stbl_add_stream(st, pi, cs);
+       if (cs->ste == NULL)
+               return -ENOBUFS;
+
+       cs->tcb.uop |= TCP_OP_ACCEPT;
+       tcp_stream_up(cs);
+       return 0;
+}
+
+
+/*
+ * ACK for new connection request arrived.
+ * Check that the packet meets all conditions and try to open a new stream.
+ * returns:
+ * < 0  - invalid packet
+ * == 0 - packet is valid and new stream was opened for it.
+ * > 0  - packet is valid, but failed to open new stream.
+ */
+static inline int
+rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
+       const union pkt_info *pi, const union seg_info *si,
+       uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
+{
+       int32_t rc;
+       struct tle_ctx *ctx;
+       struct tle_stream *ts;
+       struct tle_tcp_stream *cs;
+       struct syn_opts so;
+
+       *csp = NULL;
+
+       if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
+               return -EINVAL;
+
+       rc = restore_syn_opt(&so, pi, si, tms, mb);
+       if (rc < 0)
+               return rc;
+
+       ctx = s->s.ctx;
+
+       /* allocate new stream */
+       ts = get_stream(ctx);
+       cs = TCP_STREAM(ts);
+       if (ts == NULL)
+               return ENFILE;
+
+       /* prepare stream to handle new connection */
+       if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
+
+               /* put new stream in the accept queue */
+               if (rte_ring_enqueue_burst(s->rx.q,
+                               (void * const *)&ts, 1) == 1) {
+                       *csp = cs;
+                       return 0;
+               }
+
+               /* cleanup on failure */
+               tcp_stream_down(cs);
+               stbl_del_pkt(st, cs->ste, pi);
+               cs->ste = NULL;
+       }
+
+       tcp_stream_reset(ctx, cs);
+       return ENOBUFS;
+}
+
 static inline int
 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
        uint32_t *seqn, uint32_t *plen)
@@ -1591,14 +1692,35 @@ rx_stream(struct tle_tcp_stream *s, uint32_t ts,
        return num - k;
 }
 
+static inline uint32_t
+rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
+       const union pkt_info *pi, const union seg_info si[],
+       struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
+       uint32_t num)
+{
+       uint32_t i;
+
+       if (rwl_acquire(&s->rx.use) > 0) {
+               i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
+               rwl_release(&s->rx.use);
+               return i;
+       }
+
+       for (i = 0; i != num; i++) {
+               rc[i] = ENOENT;
+               rp[i] = mb[i];
+       }
+       return 0;
+}
+
 static inline uint32_t
 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
        const union pkt_info pi[], const union seg_info si[],
        struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
        uint32_t num)
 {
-       struct tle_tcp_stream *s;
-       uint32_t i, k, state;
+       struct tle_tcp_stream *cs, *s;
+       uint32_t i, k, n, state;
        int32_t ret;
 
        s = rx_obtain_stream(dev, st, &pi[0], type);
@@ -1616,25 +1738,51 @@ rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
        if (state == TCP_ST_LISTEN) {
 
                /* one connection per flow */
-               ret = EINVAL;
-               for (i = 0; i != num && ret != 0; i++) {
-                       ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i]);
-                       if (ret != 0) {
-                               rc[k] = -ret;
-                               rp[k] = mb[i];
-                               k++;
-                       }
-               }
-               /* duplicate SYN requests */
-               for (; i != num; i++, k++) {
-                       rc[k] = EINVAL;
+               cs = NULL;
+               ret = -EINVAL;
+               for (i = 0; i != num; i++) {
+
+                       ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
+
+                       /* valid packet encountered */
+                       if (ret >= 0)
+                               break;
+
+                       /* invalid packet, keep trying to find a proper one */
+                       rc[k] = -ret;
                        rp[k] = mb[i];
+                       k++;
                }
 
-               if (k != num && s->rx.ev != NULL)
-                       tle_event_raise(s->rx.ev);
-               else if (s->rx.cb.func != NULL && rte_ring_count(s->rx.q) == 1)
-                       s->rx.cb.func(s->rx.cb.data, &s->s);
+               /* packet is valid, but we are out of streams to serve it */
+               if (ret > 0) {
+                       for (; i != num; i++, k++) {
+                               rc[k] = ret;
+                               rp[k] = mb[i];
+                       }
+               /* new stream is accepted */
+               } else if (ret == 0) {
+
+                       /* inform listen stream about new connections */
+                       if (s->rx.ev != NULL)
+                               tle_event_raise(s->rx.ev);
+                       else if (s->rx.cb.func != NULL &&
+                                       rte_ring_count(s->rx.q) == 1)
+                               s->rx.cb.func(s->rx.cb.data, &s->s);
+
+                       /* if there is no data, drop current packet */
+                       if (PKT_L4_PLEN(mb[i]) == 0) {
+                               rc[k] = ENODATA;
+                               rp[k++] = mb[i++];
+                       }
+
+                       /*  process remaining packets for that stream */
+                       if (num != i) {
+                               n = rx_new_stream(cs, ts, pi + i, si + i,
+                                       mb + i, rp + k, rc + k, num - i);
+                               k += num - n - i;
+                       }
+               }
 
        } else {
                i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
@@ -1761,23 +1909,17 @@ tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
 }
 
 uint16_t
-tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
+tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
        uint32_t num)
 {
-       uint32_t i, n;
+       uint32_t n;
        struct tle_tcp_stream *s;
-       struct stbl_entry *se[num];
 
        s = TCP_STREAM(ts);
-       n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)se, num);
+       n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
        if (n == 0)
                return 0;
 
-       for (i = 0; i != n; i++) {
-               rq[i].pkt = stbl_get_pkt(se[i]);
-               rq[i].opaque = se[i];
-       }
-
        /*
         * if we still have packets to read,
         * then rearm stream RX event.
@@ -1791,206 +1933,6 @@ tle_tcp_stream_synreqs(struct tle_stream *ts, struct tle_syn_req rq[],
        return n;
 }
 
-static inline int
-stream_fill_dest(struct tle_tcp_stream *s)
-{
-       int32_t rc;
-       const void *da;
-
-       if (s->s.type == TLE_V4)
-               da = &s->s.ipv4.addr.src;
-       else
-               da = &s->s.ipv6.addr.src;
-
-       rc = stream_get_dest(&s->s, da, &s->tx.dst);
-       return (rc < 0) ? rc : 0;
-}
-
-/*
- * helper function, prepares an accepted stream.
- */
-static int
-accept_fill_stream(struct tle_tcp_stream *ps, struct tle_tcp_stream *cs,
-       const struct tle_tcp_accept_param *prm, uint32_t tms,
-       const union pkt_info *pi, const union seg_info *si)
-{
-       int32_t rc;
-       uint32_t rtt;
-
-       /* some TX still pending for that stream. */
-       if (TCP_STREAM_TX_PENDING(cs))
-               return -EAGAIN;
-
-       /* setup L4 ports and L3 addresses fields. */
-       cs->s.port.raw = pi->port.raw;
-       cs->s.pmsk.raw = UINT32_MAX;
-
-       if (pi->tf.type == TLE_V4) {
-               cs->s.ipv4.addr = pi->addr4;
-               cs->s.ipv4.mask.src = INADDR_NONE;
-               cs->s.ipv4.mask.dst = INADDR_NONE;
-       } else if (pi->tf.type == TLE_V6) {
-               cs->s.ipv6.addr = *pi->addr6;
-               rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
-                       sizeof(cs->s.ipv6.mask.src));
-               rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
-                       sizeof(cs->s.ipv6.mask.dst));
-       }
-
-       /* setup TCB */
-       sync_fill_tcb(&cs->tcb, si, prm->syn.pkt);
-       cs->tcb.rcv.wnd = cs->rx.q->prod.mask << cs->tcb.rcv.wscale;
-
-       /* setup stream notification menchanism */
-       cs->rx.ev = prm->cfg.recv_ev;
-       cs->rx.cb = prm->cfg.recv_cb;
-       cs->tx.ev = prm->cfg.send_ev;
-       cs->tx.cb = prm->cfg.send_cb;
-       cs->err.ev = prm->cfg.err_ev;
-       cs->err.cb = prm->cfg.err_cb;
-
-       /* store other params */
-       cs->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
-               TLE_TCP_DEFAULT_RETRIES;
-
-       /*
-        * estimate the rto
-        * for now rtt is calculated based on the tcp TMS option,
-        * later add real-time one
-        */
-       if (cs->tcb.so.ts.ecr) {
-               rtt = tms - cs->tcb.so.ts.ecr;
-               rto_estimate(&cs->tcb, rtt);
-       } else
-               cs->tcb.snd.rto = TCP_RTO_DEFAULT;
-
-       tcp_stream_up(cs);
-
-       /* copy streams type. */
-       cs->s.type = ps->s.type;
-
-       /* retrive and cache destination information. */
-       rc = stream_fill_dest(cs);
-       if (rc != 0)
-               return rc;
-
-       /* update snd.mss with SMSS value */
-       cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
-
-       /* setup congestion variables */
-       cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
-       cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
-
-       cs->tcb.state = TCP_ST_ESTABLISHED;
-       cs->tcb.uop |= TCP_OP_ACCEPT;
-
-       /* add stream to the table */
-       cs->ste = prm->syn.opaque;
-       rte_smp_wmb();
-       cs->ste->data = cs;
-       return 0;
-}
-
-/*
- * !!!
- * Right now new stream rcv.wnd is set to zero.
- * That simplifies handling of new connection establishment
- * (as no data segments could be received),
- * but has to be addressed.
- * possible ways:
- *  - send ack after accept creates new stream with new rcv.wnd value.
- *    the problem with that approach that single ack is not delivered
- *    reliably (could be lost), plus might slowdown connection establishment
- *    (extra packet per connection, that client has to wait for).
- *  - allocate new stream at ACK recieve stage.
- *    As a drawback - whole new stream allocation/connection establishment
- *    will be done in BE.
- * !!!
- */
-int
-tle_tcp_stream_accept(struct tle_stream *ts,
-       const struct tle_tcp_accept_param prm[], struct tle_stream *rs[],
-       uint32_t num)
-{
-       struct tle_tcp_stream *cs, *s;
-       struct tle_ctx *ctx;
-       uint32_t i, j, n, tms;
-       int32_t rc;
-       union pkt_info pi[num];
-       union seg_info si[num];
-
-       tms = tcp_get_tms();
-       s = TCP_STREAM(ts);
-
-       for (i = 0; i != num; i++)
-               get_pkt_info(prm[i].syn.pkt, &pi[i], &si[i]);
-
-       /* mark stream as not closable */
-       if (rwl_acquire(&s->rx.use) < 0)
-               return -EINVAL;
-
-       ctx = s->s.ctx;
-       n = get_streams(ctx, rs, num);
-
-       rc = 0;
-       for (i = 0; i != n; i++) {
-
-               /* prepare new stream */
-               cs = TCP_STREAM(rs[i]);
-               rc = accept_fill_stream(s, cs, prm + i, tms, pi + i, si + i);
-               if (rc != 0)
-                       break;
-       }
-
-       rwl_release(&s->rx.use);
-
-       /* free 'SYN' mbufs. */
-       for (j = 0; j != i; j++)
-               rte_pktmbuf_free(prm[j].syn.pkt);
-
-       /* close failed stream, put unused streams back to the free list. */
-       if (rc != 0) {
-               tle_tcp_stream_close(rs[i]);
-               for (j = i + 1; j != n; j++) {
-                       cs = TCP_STREAM(rs[j]);
-                       put_stream(ctx, rs[j], TCP_STREAM_TX_PENDING(cs));
-               }
-               rte_errno = -rc;
-
-       /* not enough streams are available */
-       } else if (n != num)
-               rte_errno = ENFILE;
-
-       return i;
-}
-
-/*
- * !!! implement a proper one, or delete !!!
- * need to make sure no race conditions with add/lookup stream table.
- */
-void
-tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
-       uint32_t num)
-{
-       uint32_t i;
-       struct rte_mbuf *mb;
-       struct stbl *st;
-       union pkt_info pi;
-       union seg_info si;
-
-       st = CTX_TCP_STLB(s->ctx);
-
-       for (i = 0; i != num; i++) {
-               mb = rq[i].pkt;
-               get_pkt_info(mb, &pi, &si);
-               if (pi.tf.type < TLE_VNUM)
-                       stbl_del_pkt_lock(st, rq[i].opaque, &pi);
-
-               /* !!! send RST pkt to the peer !!! */
-               rte_pktmbuf_free(mb);
-       }
-}
-
 uint16_t
 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
 {
@@ -2121,7 +2063,7 @@ tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
 
        s->tcb.rcv.mss = s->tcb.so.mss;
        s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
-       s->tcb.rcv.wnd = s->rx.q->prod.mask << s->tcb.rcv.wscale;
+       s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
        s->tcb.rcv.ts = 0;
 
        /* add the stream in stream table */
index 67ed66b..f06b2ed 100644 (file)
@@ -511,6 +511,7 @@ tle_tcp_stream_listen(struct tle_stream *ts)
                                TCP_ST_LISTEN);
                if (rc != 0) {
                        s->tcb.uop |= TCP_OP_LISTEN;
+                       s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
                        rc = 0;
                } else
                        rc = -EDEADLK;
@@ -520,3 +521,87 @@ tle_tcp_stream_listen(struct tle_stream *ts)
        rwl_release(&s->rx.use);
        return rc;
 }
+
+/*
+ * helper function, updates stream config
+ */
+static inline int
+stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
+{
+       int32_t rc1, rc2;
+       struct tle_tcp_stream *s;
+
+       s = TCP_STREAM(ts);
+
+       rc1 = rwl_try_acquire(&s->rx.use);
+       rc2 = rwl_try_acquire(&s->tx.use);
+
+       if (rc1 < 0 || rc2 < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
+               rwl_release(&s->tx.use);
+               rwl_release(&s->rx.use);
+               return -EINVAL;
+       }
+
+       /* setup stream notification menchanism */
+       s->rx.ev = prm->recv_ev;
+       s->tx.ev = prm->send_ev;
+       s->err.ev = prm->err_ev;
+
+       s->rx.cb.data = prm->recv_cb.data;
+       s->tx.cb.data = prm->send_cb.data;
+       s->err.cb.data = prm->err_cb.data;
+
+       rte_smp_wmb();
+
+       s->rx.cb.func = prm->recv_cb.func;
+       s->tx.cb.func = prm->send_cb.func;
+       s->err.cb.func = prm->err_cb.func;
+
+       /* store other params */
+       s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
+               TLE_TCP_DEFAULT_RETRIES;
+
+       /* invoke async notifications, if any */
+       if (rte_ring_count(s->rx.q) != 0) {
+               if (s->rx.ev != NULL)
+                       tle_event_raise(s->rx.ev);
+               else if (s->rx.cb.func != NULL)
+                       s->rx.cb.func(s->rx.cb.data, &s->s);
+       }
+       if (rte_ring_free_count(s->tx.q) != 0) {
+               if (s->tx.ev != NULL)
+                       tle_event_raise(s->tx.ev);
+               else if (s->tx.cb.func != NULL)
+                       s->tx.cb.func(s->tx.cb.data, &s->s);
+       }
+       if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
+                       s->tcb.state ==  TCP_ST_CLOSED) {
+               if (s->err.ev != NULL)
+                       tle_event_raise(s->err.ev);
+               else if (s->err.cb.func != NULL)
+                       s->err.cb.func(s->err.cb.data, &s->s);
+       }
+
+       rwl_release(&s->tx.use);
+       rwl_release(&s->rx.use);
+
+       return 0;
+}
+
+uint32_t
+tle_tcp_stream_update_cfg(struct tle_stream *ts[],
+       struct tle_tcp_stream_cfg prm[], uint32_t num)
+{
+       int32_t rc;
+       uint32_t i;
+
+       for (i = 0; i != num; i++) {
+               rc = stream_update_cfg(ts[i], &prm[i]);
+               if (rc != 0) {
+                       rte_errno = -rc;
+                       break;
+               }
+       }
+
+       return i;
+}
index b19954a..d730345 100644 (file)
@@ -106,6 +106,11 @@ struct tle_event *tle_event_alloc(struct tle_evq *evq, const void *data);
  */
 void tle_event_free(struct tle_event *ev);
 
+static inline enum tle_ev_state
+tle_event_state(const struct tle_event *ev)
+{
+       return ev->state;
+}
 
 /**
  * move event from DOWN to UP state.
index e6eb336..ec89746 100644 (file)
@@ -148,40 +148,19 @@ int tle_tcp_stream_connect(struct tle_stream *s, const struct sockaddr *addr);
  * <stream open happens here>
  * tle_tcp_stream_listen(stream_to_listen);
  * <wait for read event/callback on that stream>
- * n = tle_tcp_synreqs(stream_to_listen, syn_reqs, sizeof(syn_reqs));
- * for (i = 0, k = 0; i != n; i++) {
- *     rc = <decide should connection from that endpoint be allowed>;
- *     if (rc == 0) {
- *             //proceed with connection establishment
- *             k++;
- *             accept_param[k].syn = syn_reqs[i];
- *             <fill rest of accept_param fields for k-th connection>
- *     } else {
- *             //reject connection requests from that endpoint
- *             rej_reqs[i - k] = syn_reqs[i];
- *     }
+ * n = tle_tcp_accept(stream_to_listen, accepted_streams,
+ *     sizeof(accepted_streams));
+ * for (i = 0, i != n; i++) {
+ *     //prepare tle_tcp_stream_cfg for newly accepted streams
+ *     ...
+ * }
+ * k = tle_tcp_stream_update_cfg(rs, prm, n);
+ * if (n != k) {
+ *     //handle error
+ *     ...
  * }
- *
- *     //reject n - k connection requests
- *     tle_tcp_reject(stream_to_listen, rej_reqs, n - k);
- *
- *     //accept k new connections
- *     rc = tle_tcp_accept(stream_to_listen, accept_param, new_con_streams, k);
- *     <handle errors>
  */
 
-struct tle_syn_req {
-       struct rte_mbuf *pkt;
-       /*< mbuf with incoming connection request. */
-       void *opaque;    /*< tldk related opaque pointer. */
-};
-
-struct tle_tcp_accept_param {
-       struct tle_syn_req syn;        /*< mbuf with incoming SYN request. */
-       struct tle_tcp_stream_cfg cfg; /*< stream configure options. */
-};
-
-
 /**
  * Set stream into the listen state (passive opener), i.e. make stream ready
  * to accept new connections.
@@ -198,28 +177,41 @@ struct tle_tcp_accept_param {
 int tle_tcp_stream_listen(struct tle_stream *s);
 
 /**
- * return up to *num* mbufs with SYN requests that were received
+ * return up to *num* streams from the queue of pending connections
  * for given TCP endpoint.
- * Note that the stream has to be in listen state.
- * For each returned mbuf:
- * data_off set to the start of the packet
- * l2_len, l3_len, l4_len are setup properly
- * (so user can still extract L2/L3/L4 header info if needed)
- * packet_type RTE_PTYPE_L2/L3/L4 bits are setup properly.
- * L3/L4 checksum is verified.
  * @param s
- *   TCP stream to receive packets from.
- * @param rq
- *   An array of tle_syn_req structures that contains
- *   at least *num* elements in it.
+ *   TCP stream in listen state.
+ * @param rs
+ *   An array of pointers to the newily accepted streams.
+ *   Each such new stream represents a new connection to the given TCP endpoint.
+ *   Newly accepted stream should be in connected state and ready to use
+ *   by other FE API routines (send/recv/close/etc.).
  * @param num
- *   Number of elements in the *pkt* array.
+ *   Number of elements in the *rs* array.
  * @return
- *   number of of entries filled inside *pkt* array.
+ *   number of entries filled inside *rs* array.
  */
-uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[],
+uint16_t tle_tcp_stream_accept(struct tle_stream *s, struct tle_stream *rs[],
        uint32_t num);
 
+/**
+ * updates configuration (associated events, callbacks, stream parameters)
+ * for the given streams.
+ * @param ts
+ *   An array of pointers to the streams to update.
+ * @param prm
+ *   An array of parameters to update for the given streams.
+ * @param num
+ *   Number of elements in the *ts* and *prm* arrays.
+ * @return
+ *   number of streams successfully updated.
+ *   In case of error, error code set in rte_errno.
+ *   Possible rte_errno errors include:
+ *   - EINVAL - invalid parameter passed to function
+ */
+uint32_t tle_tcp_stream_update_cfg(struct tle_stream *ts[],
+       struct tle_tcp_stream_cfg prm[], uint32_t num);
+
 /**
  * Accept connection requests for the given stream.
  * Note that the stream has to be in listen state.
@@ -241,27 +233,9 @@ uint16_t tle_tcp_stream_synreqs(struct tle_stream *s, struct tle_syn_req rq[],
  *   - EINVAL - invalid parameter passed to function
  *   - ENFILE - no more streams are avaialble to open.
  */
-int tle_tcp_stream_accept(struct tle_stream *s,
-       const struct tle_tcp_accept_param prm[], struct tle_stream *rs[],
-       uint32_t num);
-
-/**
- * Reject connection requests for the given stream.
- * Note that the stream has to be in listen state.
- * For each new connection a new stream will be open.
- * @param s
- *   TCP listen stream.
- * @param rq
- *   An array of tle_syn_req structures that contains
- *   at least *num* elements in it.
- * @param num
- *   Number of elements in the *pkt* array.
- */
-void tle_tcp_reject(struct tle_stream *s, const struct tle_syn_req rq[],
-       uint32_t num);
 
 /**
- * return up to *num* mbufs that was received for given TCP stream.
+ * Return up to *num* mbufs that was received for given TCP stream.
  * Note that the stream has to be in connected state.
  * Data ordering is preserved.
  * For each returned mbuf: