2 * Copyright (c) 2016-2017 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
17 #include <rte_malloc.h>
18 #include <rte_errno.h>
19 #include <rte_ethdev.h>
23 #include "tcp_stream.h"
24 #include "tcp_timer.h"
25 #include "stream_table.h"
31 #define MAX_STREAM_BURST 0x40
/*
 * Mark a stream as unavailable: drive its 'use' counter to INT32_MIN so
 * that any concurrent try-acquire sees a negative value and backs off.
 */
34 unuse_stream(struct tle_tcp_stream *s)
37 rte_atomic32_set(&s->use, INT32_MIN);
/*
 * Tear down all per-context TCP stream resources: the timer wheel, the
 * stream memtank (stats dump and sanity check first), the death-row
 * queues, and the context's stream bookkeeping fields.
 */
41 tcp_fini_streams(struct tle_ctx *ctx)
43 struct tcp_streams *ts;
45 ts = CTX_TCP_STREAMS(ctx)
49 tle_timer_free(ts->tmr);
/* dump memtank statistics before verifying and destroying it */
51 tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
52 tle_memtank_sanity_check(ts->mts, 0);
53 tle_memtank_destroy(ts->mts);
/* reset front-end/back-end death-row lists */
55 STAILQ_INIT(&ts->dr.fe);
56 STAILQ_INIT(&ts->dr.be);
/* detach stream storage from the context */
60 ctx->streams.buf = NULL;
61 STAILQ_INIT(&ctx->streams.free);
/*
 * Allocate and initialise an rte_ring of at least n entries (rounded up
 * to a power of two) on the given NUMA socket.  On allocation failure
 * the error path logs rte_errno.  The ring name is derived from the
 * ring's own address, which keeps it unique within the process.
 */
64 static struct rte_ring *
65 alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
69 char name[RTE_RING_NAMESIZE];
71 n = rte_align32pow2(n);
72 sz = rte_ring_get_memsize(n);
74 r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket);
76 TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
77 "failed with error code: %d\n",
78 __func__, sz, socket, rte_errno);
/* name the ring by its own address to guarantee uniqueness */
82 snprintf(name, sizeof(name), "%p@%zu", r, sz);
83 rte_ring_init(r, name, n, flags);
/*
 * Compute the total per-stream allocation size and the offsets of each
 * sub-object packed into it: the stream struct itself, the out-of-order
 * (OFO) buffer, the RX and TX rings, and the drb ring plus its blocks.
 * Each sub-object is cache-line aligned within the single allocation.
 */
88 calc_stream_szofs(struct tle_ctx *ctx, struct stream_szofs *szofs)
90 uint32_t n, na, sz, tsz;
92 sz = sizeof(struct tle_tcp_stream);
/* OFO buffer sized from the RX buffer count (at least 1) */
94 n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
95 tcp_ofo_calc_elems(n, &szofs->ofo.nb_obj, &szofs->ofo.nb_max, &tsz);
99 sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
/* RX ring: power-of-two sized, as rte_ring requires */
101 na = rte_align32pow2(n);
103 szofs->rxq.nb_obj = na;
105 sz += rte_ring_get_memsize(na);
106 sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
/* TX ring sized from the send buffer count (at least 1) */
108 n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
109 na = rte_align32pow2(n);
111 szofs->txq.nb_obj = na;
113 sz += rte_ring_get_memsize(na);
114 sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
/* drb ring and its backing blocks */
116 szofs->drb.nb_obj = drb_nb_elem(ctx);
117 szofs->drb.nb_max = calc_stream_drb_num(ctx, szofs->drb.nb_obj);
118 szofs->drb.nb_rng = rte_align32pow2(szofs->drb.nb_max);
119 szofs->drb.rng_sz = rte_ring_get_memsize(szofs->drb.nb_rng);
120 szofs->drb.blk_sz = tle_drb_calc_size(szofs->drb.nb_obj);
123 sz += szofs->drb.rng_sz + szofs->drb.blk_sz * szofs->drb.nb_max;
124 sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
/*
 * Initialise one stream object inside its single memory allocation,
 * placing each sub-object (OFO buffer, RX/TX rings, drb ring + blocks)
 * at the offsets precomputed by calc_stream_szofs().  For a
 * single-threaded context (TLE_CTX_FLAG_ST) the rings are created
 * single-producer/single-consumer.
 */
130 init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s,
131 const struct stream_szofs *szofs)
136 f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
137 (RING_F_SP_ENQ | RING_F_SC_DEQ);
/* out-of-order segment buffer */
141 s->rx.ofo = (void *)((uintptr_t)s + szofs->ofo.ofs);
142 tcp_ofo_init(s->rx.ofo, szofs->ofo.nb_obj, szofs->ofo.nb_max);
/* RX queue: protocol side is the single producer */
144 s->rx.q = (void *)((uintptr_t)s + szofs->rxq.ofs);
145 rte_ring_init(s->rx.q, __func__, szofs->rxq.nb_obj, f | RING_F_SP_ENQ);
/* TX queue: protocol side is the single consumer */
149 s->tx.q = (void *)((uintptr_t)s + szofs->txq.ofs);
150 rte_ring_init(s->tx.q, __func__, szofs->txq.nb_obj, f | RING_F_SC_DEQ);
152 s->tx.drb.r = (void *)((uintptr_t)s + szofs->drb.ofs);
153 rte_ring_init(s->tx.drb.r, __func__, szofs->drb.nb_rng, f);
/* carve drb blocks out of the tail of the allocation and enqueue them */
155 for (i = 0; i != szofs->drb.nb_max; i++) {
156 drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
157 szofs->drb.rng_sz + szofs->drb.blk_sz * i);
159 drb->size = szofs->drb.nb_obj;
160 rte_ring_enqueue(s->tx.drb.r, drb);
163 s->tx.drb.nb_elem = szofs->drb.nb_obj;
164 s->tx.drb.nb_max = szofs->drb.nb_max;
166 /* mark stream as available to use. */
/*
 * Return nb_drb TX drbs back to the stream's free-drb ring.
 */
173 tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
175 struct tle_tcp_stream *us;
177 us = (struct tle_tcp_stream *)s;
178 _rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
/*
 * Create the per-context timer wheel: one possible timer per stream,
 * ticking at TCP_RTO_GRANULARITY, allocated on the context's socket.
 * Failure is logged together with rte_errno on the (elided) error path.
 */
181 static struct tle_timer_wheel *
182 alloc_timers(const struct tle_ctx *ctx)
184 struct tle_timer_wheel *twl;
185 struct tle_timer_wheel_args twprm;
187 twprm.tick_size = TCP_RTO_GRANULARITY;
188 twprm.max_timer = ctx->prm.max_streams;
189 twprm.socket_id = ctx->prm.socket_id;
/* seed the wheel with the current time in ms resolution */
191 twl = tle_timer_create(&twprm, tcp_get_tms(ctx->cycles_ms_shift));
193 TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
/*
 * Memtank alloc callback: zeroed, cache-line aligned, NUMA-aware
 * allocation.  NOTE(review): the socket argument is on an elided line;
 * presumably taken from udata (the tle_ctx) — confirm against full file.
 */
199 mts_alloc(size_t sz, void *udata)
204 return rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
/*
 * Memtank free callback; body elided in this view — presumably just
 * rte_free(p), with udata unused.  TODO(review): confirm.
 */
209 mts_free(void *p, void *udata)
/*
 * Memtank object-init callback: run init_stream() over each freshly
 * allocated stream object, using the context's precomputed size/offsets.
 */
216 mts_init(void *pa[], uint32_t num, void *udata)
220 struct tcp_streams *ts;
223 ts = CTX_TCP_STREAMS(ctx);
225 for (i = 0; i != num; i++)
226 init_stream(ctx, pa[i], &ts->szofs);
/*
 * Create the stream memtank: a grow/shrink-able pool of fixed-size
 * stream objects.  min_free/max_free come from the context parameters,
 * defaulting to max_streams; objects are fetched/released in chunks of
 * MAX_STREAM_BURST.  An initial grow is requested right after creation.
 */
229 static struct tle_memtank *
230 alloc_mts(struct tle_ctx *ctx, uint32_t stream_size)
232 struct tle_memtank *mts;
233 struct tle_memtank_prm prm;
/* fixed part of the memtank configuration */
235 static const struct tle_memtank_prm cprm = {
236 .obj_align = RTE_CACHE_LINE_SIZE,
237 .flags = TLE_MTANK_OBJ_DBG,
246 prm.obj_size = stream_size;
/* free-list low/high watermarks, defaulting to max_streams */
248 prm.min_free = (ctx->prm.free_streams.min != 0) ?
249 ctx->prm.free_streams.min : ctx->prm.max_streams;
250 prm.max_free = (ctx->prm.free_streams.max > prm.min_free) ?
251 ctx->prm.free_streams.max : prm.min_free;
253 prm.nb_obj_chunk = MAX_STREAM_BURST;
254 prm.max_obj = ctx->prm.max_streams;
256 mts = tle_memtank_create(&prm);
258 TCP_LOG(ERR, "%s(ctx=%p) failed with error=%d\n",
259 __func__, ctx, rte_errno);
/* pre-populate the tank so first stream_open doesn't pay for growth */
261 tle_memtank_grow(mts);
/*
 * Initialise per-context TCP stream machinery: the tcp_streams control
 * block, death-row queues, stream table, to-send queue ring, timer
 * wheel and the stream memtank.  On any failure everything allocated so
 * far is released via tcp_fini_streams().
 * NOTE(review): log strings contain typos ("caluclated",
 * "initalisation"); left untouched here as they are runtime strings.
 */
267 tcp_init_streams(struct tle_ctx *ctx)
271 struct tcp_streams *ts;
272 struct stream_szofs szofs;
/* single-threaded context -> SP/SC rings */
274 f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
275 (RING_F_SP_ENQ | RING_F_SC_DEQ);
277 calc_stream_szofs(ctx, &szofs);
278 TCP_LOG(NOTICE, "ctx:%p, caluclated stream size: %u\n",
281 ts = rte_zmalloc_socket(NULL, sizeof(*ts), RTE_CACHE_LINE_SIZE,
288 STAILQ_INIT(&ts->dr.fe);
289 STAILQ_INIT(&ts->dr.be);
291 ctx->streams.buf = ts;
292 STAILQ_INIT(&ctx->streams.free);
/* stream table, to-send queue, timers and memtank */
294 rc = stbl_init(&ts->st, ctx->prm.max_streams, ctx->prm.socket_id);
297 ts->tsq = alloc_ring(ctx->prm.max_streams, f | RING_F_SC_DEQ,
299 ts->tmr = alloc_timers(ctx);
300 ts->mts = alloc_mts(ctx, szofs.size);
302 if (ts->tsq == NULL || ts->tmr == NULL || ts->mts == NULL)
305 tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
/* common error path: log and undo partial initialisation */
309 TCP_LOG(ERR, "initalisation of tcp streams failed");
310 tcp_fini_streams(ctx);
/*
 * Library constructor: register the TCP implementation's init/fini/drb
 * callbacks in the global per-protocol stream-ops table before main().
 */
316 static void __attribute__((constructor))
317 tcp_stream_setup(void)
319 static const struct stream_ops tcp_ops = {
320 .init_streams = tcp_init_streams,
321 .fini_streams = tcp_fini_streams,
322 .free_drbs = tcp_free_drbs,
325 tle_stream_ops[TLE_PROTO_TCP] = tcp_ops;
329 * Helper routine, check that input event and callback are mutually exclusive.
332 check_cbev(const struct tle_event *ev, const struct tle_stream_cb *cb)
/* having both an event and a callback configured is invalid */
334 if (ev != NULL && cb->func != NULL)
340 check_stream_prm(const struct tle_ctx *ctx,
341 const struct tle_tcp_stream_param *prm)
343 if ((prm->addr.local.ss_family != AF_INET &&
344 prm->addr.local.ss_family != AF_INET6) ||
345 prm->addr.local.ss_family != prm->addr.remote.ss_family)
348 /* callback and event notifications mechanisms are mutually exclusive */
349 if (check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
350 check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
351 check_cbev(prm->cfg.err_ev, &prm->cfg.err_cb) != 0)
354 /* check does context support desired address family. */
355 if ((prm->addr.local.ss_family == AF_INET &&
356 ctx->prm.lookup4 == NULL) ||
357 (prm->addr.local.ss_family == AF_INET6 &&
358 ctx->prm.lookup6 == NULL))
/*
 * Public API: open a new TCP stream.  Validates parameters, grabs a
 * stream object from the memtank (growing it if needed), fills in the
 * L3/L4 addressing via stream_fill_ctx(), then wires up the
 * notification events/callbacks and per-stream TCP parameters.
 */
365 tle_tcp_stream_open(struct tle_ctx *ctx,
366 const struct tle_tcp_stream_param *prm)
368 struct tcp_streams *ts;
369 struct tle_tcp_stream *s;
372 if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
377 ts = CTX_TCP_STREAMS(ctx);
/* allocate a stream object, allowing the memtank to grow */
379 s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
385 /* setup L4 ports and L3 addresses fields. */
386 rc = stream_fill_ctx(ctx, &s->s,
387 (const struct sockaddr *)&prm->addr.local,
388 (const struct sockaddr *)&prm->addr.remote);
/* on failure return the object to the memtank */
391 tle_memtank_free(ts->mts, (void **)&s, 1,
392 TLE_MTANK_FREE_SHRINK);
397 /* setup stream notification mechanism */
398 s->rx.ev = prm->cfg.recv_ev;
399 s->rx.cb = prm->cfg.recv_cb;
400 s->tx.ev = prm->cfg.send_ev;
401 s->tx.cb = prm->cfg.send_cb;
402 s->err.ev = prm->cfg.err_ev;
403 s->err.cb = prm->cfg.err_cb;
405 /* store other params */
406 s->flags = ctx->prm.flags;
/* retransmission count, initial cwnd and TIME_WAIT timeout, with defaults */
407 s->tcb.snd.nb_retm = (prm->cfg.nb_retries != 0) ? prm->cfg.nb_retries :
408 TLE_TCP_DEFAULT_RETRIES;
409 s->tcb.snd.cwnd = (ctx->prm.icw == 0) ? TCP_INITIAL_CWND_MAX :
411 s->tcb.snd.rto_tw = (ctx->prm.timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
412 TCP_RTO_2MSL : ctx->prm.timewait;
419 * Helper functions, used by close API.
422 stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
426 static const struct tle_stream_cb zcb;
428 /* check was close() already invoked */
430 if ((uop & TCP_OP_CLOSE) != 0)
433 /* record that close() was already invoked */
434 if (rte_atomic16_cmpset(&s->tcb.uop, uop, uop | TCP_OP_CLOSE) == 0)
437 /* mark stream as unavailable for RX/TX. */
440 /* reset events/callbacks */
449 state = s->tcb.state;
451 /* CLOSED, LISTEN, SYN_SENT - we can close the stream straight away */
452 if (state <= TCP_ST_SYN_SENT) {
453 tcp_stream_reset(ctx, s);
457 /* generate FIN and proceed with normal connection termination */
458 if (state == TCP_ST_ESTABLISHED || state == TCP_ST_CLOSE_WAIT) {
461 s->tcb.state = (state == TCP_ST_ESTABLISHED) ?
462 TCP_ST_FIN_WAIT_1 : TCP_ST_LAST_ACK;
464 /* mark stream as writable/readable again */
467 /* queue stream into to-send queue */
473 * according to the state, close() was already invoked,
474 * should never reach that point.
/*
 * Public API: close multiple streams; stops at the first failure
 * (loop condition requires rc == 0) and shrinks the memtank afterwards.
 */
481 tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
486 struct tle_tcp_stream *s;
490 for (i = 0; i != num && rc == 0; i++) {
492 s = TCP_STREAM(ts[i]);
/* reject NULL entries and non-TCP/invalid stream types */
493 if (ts[i] == NULL || s->s.type >= TLE_VNUM)
498 rc = stream_close(ctx, s);
499 tle_memtank_shrink(CTX_TCP_MTS(ctx));
/*
 * Public API: close a single stream, then give the memtank a chance to
 * release surplus memory.  NOTE(review): the assignment of 's' (and
 * 'ctx') happens on lines elided from this view.
 */
509 tle_tcp_stream_close(struct tle_stream *ts)
513 struct tle_tcp_stream *s;
516 if (ts == NULL || s->s.type >= TLE_VNUM)
520 rc = stream_close(ctx, s);
521 tle_memtank_shrink(CTX_TCP_MTS(ctx));
/*
 * Public API: report the stream's local/remote addresses.  Note the
 * deliberate swap: the stream stores addresses from the wire viewpoint,
 * so 'local' is filled from .dst and 'remote' from .src.
 */
526 tle_tcp_stream_get_addr(const struct tle_stream *ts,
527 struct tle_tcp_stream_addr *addr)
529 struct sockaddr_in *lin4, *rin4;
530 struct sockaddr_in6 *lin6, *rin6;
531 struct tle_tcp_stream *s;
534 if (addr == NULL || ts == NULL || s->s.type >= TLE_VNUM)
537 if (s->s.type == TLE_V4) {
539 lin4 = (struct sockaddr_in *)&addr->local;
540 rin4 = (struct sockaddr_in *)&addr->remote;
542 addr->local.ss_family = AF_INET;
543 addr->remote.ss_family = AF_INET;
/* local <- dst, remote <- src (wire viewpoint) */
545 lin4->sin_port = s->s.port.dst;
546 rin4->sin_port = s->s.port.src;
547 lin4->sin_addr.s_addr = s->s.ipv4.addr.dst;
548 rin4->sin_addr.s_addr = s->s.ipv4.addr.src;
550 } else if (s->s.type == TLE_V6) {
552 lin6 = (struct sockaddr_in6 *)&addr->local;
553 rin6 = (struct sockaddr_in6 *)&addr->remote;
555 addr->local.ss_family = AF_INET6;
556 addr->remote.ss_family = AF_INET6;
558 lin6->sin6_port = s->s.port.dst;
559 rin6->sin6_port = s->s.port.src;
560 memcpy(&lin6->sin6_addr, &s->s.ipv6.addr.dst,
561 sizeof(lin6->sin6_addr));
562 memcpy(&rin6->sin6_addr, &s->s.ipv6.addr.src,
563 sizeof(rin6->sin6_addr));
/*
 * Public API: move a CLOSED stream into LISTEN state.  Acquires the
 * stream to block a concurrent close, then CASes the state from
 * TCP_ST_CLOSED; on success records the LISTEN op and sets the initial
 * receive window with the default window scale.
 */
570 tle_tcp_stream_listen(struct tle_stream *ts)
572 struct tle_tcp_stream *s;
576 if (ts == NULL || s->s.type >= TLE_VNUM)
579 /* app may listen for multiple times to change backlog,
580 * we will just return success for such cases.
582 if (s->tcb.state == TCP_ST_LISTEN)
585 /* mark stream as not closable. */
586 if (tcp_stream_try_acquire(s) > 0) {
587 rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
590 s->tcb.uop |= TCP_OP_LISTEN;
591 s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
598 tcp_stream_release(s);
603 * helper function, updates stream config
606 stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
608 struct tle_tcp_stream *s;
/* bail out if the stream can't be acquired or is being closed */
612 if (tcp_stream_try_acquire(s) < 0 || (s->tcb.uop & TCP_OP_CLOSE) != 0) {
613 tcp_stream_release(s);
617 /* setup stream notification mechanism */
618 s->rx.ev = prm->recv_ev;
619 s->tx.ev = prm->send_ev;
620 s->err.ev = prm->err_ev;
622 s->rx.cb.data = prm->recv_cb.data;
623 s->tx.cb.data = prm->send_cb.data;
624 s->err.cb.data = prm->err_cb.data;
628 s->rx.cb.func = prm->recv_cb.func;
629 s->tx.cb.func = prm->send_cb.func;
630 s->err.cb.func = prm->err_cb.func;
632 /* store other params */
633 s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
634 TLE_TCP_DEFAULT_RETRIES;
636 /* invoke async notifications, if any */
/* pending RX data -> raise/recv-notify immediately */
637 if (rte_ring_count(s->rx.q) != 0) {
638 if (s->rx.ev != NULL)
639 tle_event_raise(s->rx.ev);
640 else if (s->rx.cb.func != NULL)
641 s->rx.cb.func(s->rx.cb.data, &s->s);
/* TX ring has free space -> send-notify */
643 if (rte_ring_free_count(s->tx.q) != 0) {
644 if (s->tx.ev != NULL)
645 tle_event_raise(s->tx.ev);
646 else if (s->tx.cb.func != NULL)
647 s->tx.cb.func(s->tx.cb.data, &s->s);
/* stream already half-closed/closed -> error-notify */
649 if (s->tcb.state == TCP_ST_CLOSE_WAIT ||
650 s->tcb.state == TCP_ST_CLOSED) {
651 if (s->err.ev != NULL)
652 tle_event_raise(s->err.ev);
653 else if (s->err.cb.func != NULL)
654 s->err.cb.func(s->err.cb.data, &s->s);
657 tcp_stream_release(s);
/*
 * Public API: apply stream_update_cfg() to each of num streams,
 * pairing ts[i] with prm[i].
 */
662 tle_tcp_stream_update_cfg(struct tle_stream *ts[],
663 struct tle_tcp_stream_cfg prm[], uint32_t num)
668 for (i = 0; i != num; i++) {
669 rc = stream_update_cfg(ts[i], &prm[i]);
/*
 * Public API: return the stream's current send MSS.
 * NOTE(review): the assignment/validation of 's' is on elided lines.
 */
680 tle_tcp_stream_get_mss(const struct tle_stream * ts)
682 struct tle_tcp_stream *s;
688 return s->tcb.snd.mss;