l4p/tcp: introduce tle_tcp_stream_abort() API
[tldk.git] / lib / libtle_l4p / tcp_stream.c
1 /*
2  * Copyright (c) 2016-2017  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <string.h>
17 #include <rte_malloc.h>
18 #include <rte_errno.h>
19 #include <rte_ethdev.h>
20 #include <rte_ip.h>
21 #include <rte_tcp.h>
22
23 #include "tcp_stream.h"
24 #include "tcp_timer.h"
25 #include "stream_table.h"
26 #include "misc.h"
27 #include "tcp_ctl.h"
28 #include "tcp_ofo.h"
29 #include "tcp_txq.h"
30
31 #define MAX_STREAM_BURST        0x40
32
33 static void
34 unuse_stream(struct tle_tcp_stream *s)
35 {
36         s->s.type = TLE_VNUM;
37         rte_atomic32_set(&s->use, INT32_MIN);
38 }
39
40 static void
41 tcp_fini_streams(struct tle_ctx *ctx)
42 {
43         struct tcp_streams *ts;
44
45         ts = CTX_TCP_STREAMS(ctx);
46         if (ts != NULL) {
47
48                 stbl_fini(&ts->st);
49                 tle_timer_free(ts->tmr);
50                 rte_free(ts->tsq);
51                 tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
52                 tle_memtank_sanity_check(ts->mts, 0);
53                 tle_memtank_destroy(ts->mts);
54
55                 STAILQ_INIT(&ts->dr.fe);
56                 STAILQ_INIT(&ts->dr.be);
57         }
58
59         rte_free(ts);
60         ctx->streams.buf = NULL;
61         STAILQ_INIT(&ctx->streams.free);
62 }
63
64 static struct rte_ring *
65 alloc_ring(uint32_t n, uint32_t flags, int32_t socket)
66 {
67         struct rte_ring *r;
68         size_t sz;
69         char name[RTE_RING_NAMESIZE];
70
71         n = rte_align32pow2(n);
72         sz =  rte_ring_get_memsize(n);
73
74         r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE, socket);
75         if (r == NULL) {
76                 TCP_LOG(ERR, "%s: allocation of %zu bytes on socket %d "
77                         "failed with error code: %d\n",
78                         __func__, sz, socket, rte_errno);
79                 return NULL;
80         }
81
82         snprintf(name, sizeof(name), "%p@%zu", r, sz);
83         rte_ring_init(r, name, n, flags);
84         return r;
85 }
86
87 static void
88 calc_stream_szofs(struct tle_ctx *ctx, struct stream_szofs *szofs)
89 {
90         uint32_t n, na, sz, tsz;
91
92         sz = sizeof(struct tle_tcp_stream);
93
94         n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
95         tcp_ofo_calc_elems(n, &szofs->ofo.nb_obj, &szofs->ofo.nb_max, &tsz);
96         szofs->ofo.ofs = sz;
97
98         sz += tsz;
99         sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
100
101         na = rte_align32pow2(n);
102         szofs->rxq.ofs = sz;
103         szofs->rxq.nb_obj = na;
104
105         sz += rte_ring_get_memsize(na);
106         sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
107
108         n = RTE_MAX(ctx->prm.max_stream_sbufs, 1U);
109         na = rte_align32pow2(n);
110         szofs->txq.ofs = sz;
111         szofs->txq.nb_obj = na;
112
113         sz += rte_ring_get_memsize(na);
114         sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
115
116         szofs->drb.nb_obj = drb_nb_elem(ctx);
117         szofs->drb.nb_max = calc_stream_drb_num(ctx, szofs->drb.nb_obj);
118         szofs->drb.nb_rng = rte_align32pow2(szofs->drb.nb_max);
119         szofs->drb.rng_sz = rte_ring_get_memsize(szofs->drb.nb_rng);
120         szofs->drb.blk_sz = tle_drb_calc_size(szofs->drb.nb_obj);
121         szofs->drb.ofs = sz;
122
123         sz += szofs->drb.rng_sz + szofs->drb.blk_sz * szofs->drb.nb_max;
124         sz = RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
125
126         szofs->size = sz;
127 }
128
129 static void
130 init_stream(struct tle_ctx *ctx, struct tle_tcp_stream *s,
131         const struct stream_szofs *szofs)
132 {
133         uint32_t f, i;
134         struct tle_drb *drb;
135
136         f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
137                 (RING_F_SP_ENQ |  RING_F_SC_DEQ);
138
139         /* init RX part. */
140
141         s->rx.ofo = (void *)((uintptr_t)s + szofs->ofo.ofs);
142         tcp_ofo_init(s->rx.ofo, szofs->ofo.nb_obj, szofs->ofo.nb_max);
143
144         s->rx.q = (void *)((uintptr_t)s + szofs->rxq.ofs);
145         rte_ring_init(s->rx.q, __func__, szofs->rxq.nb_obj, f | RING_F_SP_ENQ);
146
147         /* init TX part. */
148
149         s->tx.q = (void *)((uintptr_t)s + szofs->txq.ofs);
150         rte_ring_init(s->tx.q, __func__, szofs->txq.nb_obj, f | RING_F_SC_DEQ);
151
152         s->tx.drb.r = (void *)((uintptr_t)s + szofs->drb.ofs);
153         rte_ring_init(s->tx.drb.r, __func__, szofs->drb.nb_rng, f);
154
155         for (i = 0; i != szofs->drb.nb_max; i++) {
156                 drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
157                         szofs->drb.rng_sz + szofs->drb.blk_sz * i);
158                 drb->udata = s;
159                 drb->size = szofs->drb.nb_obj;
160                 rte_ring_enqueue(s->tx.drb.r, drb);
161         }
162
163         s->tx.drb.nb_elem = szofs->drb.nb_obj;
164         s->tx.drb.nb_max = szofs->drb.nb_max;
165
166         /* mark stream as avaialble to use. */
167
168         s->s.ctx = ctx;
169         unuse_stream(s);
170 }
171
172 static void
173 tcp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
174 {
175         struct tle_tcp_stream *us;
176
177         us = (struct tle_tcp_stream *)s;
178         _rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
179 }
180
181 static struct tle_timer_wheel *
182 alloc_timers(const struct tle_ctx *ctx)
183 {
184         struct tle_timer_wheel *twl;
185         struct tle_timer_wheel_args twprm;
186
187         twprm.tick_size = TCP_RTO_GRANULARITY;
188         twprm.max_timer = ctx->prm.max_streams;
189         twprm.socket_id = ctx->prm.socket_id;
190
191         twl = tle_timer_create(&twprm, tcp_get_tms(ctx->cycles_ms_shift));
192         if (twl == NULL)
193                 TCP_LOG(ERR, "alloc_timers(ctx=%p) failed with error=%d\n",
194                         ctx, rte_errno);
195         return twl;
196 }
197
198 static void *
199 mts_alloc(size_t sz, void *udata)
200 {
201         struct tle_ctx *ctx;
202
203         ctx = udata;
204         return rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
205                 ctx->prm.socket_id);
206 }
207
208 static void
209 mts_free(void *p, void *udata)
210 {
211         RTE_SET_USED(udata);
212         rte_free(p);
213 }
214
215 static void
216 mts_init(void *pa[], uint32_t num, void *udata)
217 {
218         uint32_t i;
219         struct tle_ctx *ctx;
220         struct tcp_streams *ts;
221
222         ctx = udata;
223         ts = CTX_TCP_STREAMS(ctx);
224
225         for (i = 0; i != num; i++)
226                 init_stream(ctx, pa[i], &ts->szofs);
227 }
228
229 static struct tle_memtank *
230 alloc_mts(struct tle_ctx *ctx, uint32_t stream_size)
231 {
232         struct tle_memtank *mts;
233         struct tle_memtank_prm prm;
234
235         static const struct tle_memtank_prm cprm = {
236                 .obj_align = RTE_CACHE_LINE_SIZE,
237                 .flags = TLE_MTANK_OBJ_DBG,
238                 .alloc = mts_alloc,
239                 .free = mts_free,
240                 .init = mts_init,
241         };
242
243         prm = cprm;
244         prm.udata = ctx;
245
246         prm.obj_size = stream_size;
247
248         prm.min_free = (ctx->prm.free_streams.min != 0) ?
249                 ctx->prm.free_streams.min : ctx->prm.max_streams;
250         prm.max_free = (ctx->prm.free_streams.max > prm.min_free) ?
251                 ctx->prm.free_streams.max : prm.min_free;
252
253         prm.nb_obj_chunk = MAX_STREAM_BURST;
254         prm.max_obj = ctx->prm.max_streams;
255
256         mts = tle_memtank_create(&prm);
257         if (mts == NULL)
258                 TCP_LOG(ERR, "%s(ctx=%p) failed with error=%d\n",
259                         __func__, ctx, rte_errno);
260         else
261                 tle_memtank_grow(mts);
262
263         return mts;
264 }
265
266 static int
267 tcp_init_streams(struct tle_ctx *ctx)
268 {
269         uint32_t f;
270         int32_t rc;
271         struct tcp_streams *ts;
272         struct stream_szofs szofs;
273
274         f = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0) ? 0 :
275                 (RING_F_SP_ENQ |  RING_F_SC_DEQ);
276
277         calc_stream_szofs(ctx, &szofs);
278         TCP_LOG(NOTICE, "ctx:%p, caluclated stream size: %u\n",
279                 ctx, szofs.size);
280
281         ts = rte_zmalloc_socket(NULL, sizeof(*ts), RTE_CACHE_LINE_SIZE,
282                 ctx->prm.socket_id);
283         if (ts == NULL)
284                 return -ENOMEM;
285
286         ts->szofs = szofs;
287
288         STAILQ_INIT(&ts->dr.fe);
289         STAILQ_INIT(&ts->dr.be);
290
291         ctx->streams.buf = ts;
292         STAILQ_INIT(&ctx->streams.free);
293
294         rc = stbl_init(&ts->st, ctx->prm.max_streams, ctx->prm.socket_id);
295
296         if (rc == 0) {
297                 ts->tsq = alloc_ring(ctx->prm.max_streams, f | RING_F_SC_DEQ,
298                         ctx->prm.socket_id);
299                 ts->tmr = alloc_timers(ctx);
300                 ts->mts = alloc_mts(ctx, szofs.size);
301         
302                 if (ts->tsq == NULL || ts->tmr == NULL || ts->mts == NULL)
303                         rc = -ENOMEM;
304
305                 tle_memtank_dump(stdout, ts->mts, TLE_MTANK_DUMP_STAT);
306         }
307
308         if (rc != 0) {
309                 TCP_LOG(ERR, "initalisation of tcp streams failed");
310                 tcp_fini_streams(ctx);
311         }
312
313         return rc;
314 }
315
316 static void __attribute__((constructor))
317 tcp_stream_setup(void)
318 {
319         static const struct stream_ops tcp_ops = {
320                 .init_streams = tcp_init_streams,
321                 .fini_streams = tcp_fini_streams,
322                 .free_drbs = tcp_free_drbs,
323         };
324
325         tle_stream_ops[TLE_PROTO_TCP] = tcp_ops;
326 }
327
328 /*
329  * Helper routine, check that input event and callback are mutually exclusive.
330  */
331 static int
332 check_cbev(const struct tle_event *ev, const struct tle_stream_cb *cb)
333 {
334         if (ev != NULL && cb->func != NULL)
335                 return -EINVAL;
336         return 0;
337 }
338
339 static int
340 check_stream_prm(const struct tle_ctx *ctx,
341         const struct tle_tcp_stream_param *prm)
342 {
343         if ((prm->addr.local.ss_family != AF_INET &&
344                         prm->addr.local.ss_family != AF_INET6) ||
345                         prm->addr.local.ss_family != prm->addr.remote.ss_family)
346                 return -EINVAL;
347
348         /* callback and event notifications mechanisms are mutually exclusive */
349         if (check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
350                         check_cbev(prm->cfg.recv_ev, &prm->cfg.recv_cb) != 0 ||
351                         check_cbev(prm->cfg.err_ev, &prm->cfg.err_cb) != 0)
352                 return -EINVAL;
353
354         /* check does context support desired address family. */
355         if ((prm->addr.local.ss_family == AF_INET &&
356                         ctx->prm.lookup4 == NULL) ||
357                         (prm->addr.local.ss_family == AF_INET6 &&
358                         ctx->prm.lookup6 == NULL))
359                 return -EINVAL;
360
361         return 0;
362 }
363
364 static void
365 tcp_stream_fill_cfg(struct tle_tcp_stream *s, const struct tle_ctx_param *cprm,
366         const struct tle_tcp_stream_cfg *scfg)
367 {
368         /* setup stream notification menchanism */
369         s->rx.ev = scfg->recv_ev;
370         s->rx.cb = scfg->recv_cb;
371         s->tx.ev = scfg->send_ev;
372         s->tx.cb = scfg->send_cb;
373         s->err.ev = scfg->err_ev;
374         s->err.cb = scfg->err_cb;
375
376         /* store other params */
377         s->flags = cprm->flags;
378         s->tcb.snd.nb_retm = (scfg->nb_retries != 0) ? scfg->nb_retries :
379                 TLE_TCP_DEFAULT_RETRIES;
380         s->tcb.snd.cwnd = (cprm->icw == 0) ? TCP_INITIAL_CWND_MAX :
381                                 cprm->icw;
382         s->tcb.snd.rto_tw = (cprm->timewait == TLE_TCP_TIMEWAIT_DEFAULT) ?
383                                 TCP_RTO_2MSL : cprm->timewait;
384
385         s->s.udata = scfg->udata;
386 }
387
388 static int
389 stream_fill_type_addrs_type(struct tle_stream *s, const struct sockaddr *laddr,
390         const struct sockaddr *raddr)
391 {
392         const struct sockaddr_in *lin4, *rin4;
393         const struct sockaddr_in6 *lin6, *rin6;
394
395         const size_t sz = sizeof(tle_ipv6_any);
396
397         lin4 = (const struct sockaddr_in *)laddr;
398         lin6 = (const struct sockaddr_in6 *)laddr;
399
400         rin4 = (const struct sockaddr_in *)raddr;
401         rin6 = (const struct sockaddr_in6 *)raddr;
402
403         if (laddr->sa_family == AF_INET) {
404
405                 if (lin4->sin_addr.s_addr == INADDR_ANY ||
406                                 rin4->sin_addr.s_addr == INADDR_ANY ||
407                                 lin4->sin_port == 0 || rin4->sin_port == 0)
408                         return -EINVAL;
409
410                 s->port.src = rin4->sin_port;
411                 s->port.dst = lin4->sin_port;
412
413                 s->ipv4.addr.src = rin4->sin_addr.s_addr;
414                 s->ipv4.addr.dst = lin4->sin_addr.s_addr;
415
416                 s->ipv4.mask.src = INADDR_NONE;
417                 s->ipv4.mask.dst = INADDR_NONE;
418
419                 s->type = TLE_V4;
420
421         } else if (laddr->sa_family == AF_INET6) {
422
423                 if (memcmp(&lin6->sin6_addr, &tle_ipv6_any, sz) == 0 ||
424                                 memcmp(&rin6->sin6_addr, &tle_ipv6_any,
425                                 sz) == 0 ||
426                                 lin6->sin6_port == 0 || rin6->sin6_port == 0)
427                         return -EINVAL;
428
429                 s->port.src = rin6->sin6_port;
430                 s->port.dst = lin6->sin6_port;
431
432                 memcpy(&s->ipv6.addr.src, &rin6->sin6_addr, sz);
433                 memcpy(&s->ipv6.addr.dst, &lin6->sin6_addr, sz);
434
435                 memcpy(&s->ipv6.mask.src, &tle_ipv6_none, sz);
436                 memcpy(&s->ipv6.mask.dst, &tle_ipv6_none, sz);
437
438                 s->type = TLE_V6;
439
440         } else
441                 return -EINVAL;
442
443         s->pmsk.raw = UINT32_MAX;
444         return 0;
445 }
446
447 int
448 tcp_stream_fill_prm(struct tle_tcp_stream *s,
449         const struct tle_tcp_stream_param *prm)
450 {
451         int32_t rc;
452         struct tle_ctx *ctx;
453
454         ctx = s->s.ctx;
455         if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0)
456                 return -EINVAL;
457
458         rc = stream_fill_type_addrs_type(&s->s,
459                 (const struct sockaddr *)&prm->addr.local,
460                 (const struct sockaddr *)&prm->addr.remote);
461         if (rc == 0)
462                 tcp_stream_fill_cfg(s, &ctx->prm, &prm->cfg);
463         return rc;
464 }
465
466 struct tle_stream *
467 tle_tcp_stream_open(struct tle_ctx *ctx,
468         const struct tle_tcp_stream_param *prm)
469 {
470         struct tcp_streams *ts;
471         struct tle_tcp_stream *s;
472         int32_t rc;
473
474         if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
475                 rte_errno = EINVAL;
476                 return NULL;
477         }
478
479         ts = CTX_TCP_STREAMS(ctx);
480
481         s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
482         if (s == NULL) {
483                 rte_errno = ENFILE;
484                 return NULL;
485         }
486
487         /* setup L4 ports and L3 addresses fields. */
488         rc = stream_fill_ctx(ctx, &s->s,
489                 (const struct sockaddr *)&prm->addr.local,
490                 (const struct sockaddr *)&prm->addr.remote);
491
492         if (rc != 0) {
493                 tle_memtank_free(ts->mts, (void **)&s, 1,
494                         TLE_MTANK_FREE_SHRINK);
495                 rte_errno = rc;
496                 return NULL;
497         }
498
499         tcp_stream_fill_cfg(s, &ctx->prm, &prm->cfg);
500
501         tcp_stream_up(s);
502         return &s->s;
503 }
504
505 /*
506  * Helper function, used by close API.
507  */
508 static inline int
509 stream_close(struct tle_ctx *ctx, struct tle_tcp_stream *s)
510 {
511         int32_t rc;
512
513         rc = stream_close_prolog(ctx, s, TLE_TCP_OP_CLOSE);
514         if (rc <= 0)
515                 return rc;
516
517         /* generate FIN and proceed with normal connection termination */
518         if (rc == TLE_TCP_ST_ESTABLISHED || rc == TLE_TCP_ST_CLOSE_WAIT) {
519
520                 /* change state */
521                 s->tcb.state = (rc == TLE_TCP_ST_ESTABLISHED) ?
522                         TLE_TCP_ST_FIN_WAIT_1 : TLE_TCP_ST_LAST_ACK;
523
524                 /* mark stream as writable/readable again */
525                 tcp_stream_up(s);
526
527                 /* queue stream into to-send queue */
528                 txs_enqueue(ctx, s);
529                 return 0;
530         }
531
532         /*
533          * accroding to the state, close() was already invoked,
534          * should never that point.
535          */
536         RTE_ASSERT(0);
537         return -EINVAL;
538 }
539
540 uint32_t
541 tle_tcp_stream_close_bulk(struct tle_stream *ts[], uint32_t num)
542 {
543         int32_t rc;
544         uint32_t i;
545         struct tle_ctx *ctx;
546         struct tle_tcp_stream *s;
547
548         rc = 0;
549
550         for (i = 0; i != num && rc == 0; i++) {
551
552                 s = TCP_STREAM(ts[i]);
553                 if (ts[i] == NULL || s->s.type >= TLE_VNUM)
554                         rc = EINVAL;
555
556                 else {
557                         ctx = s->s.ctx;
558                         rc = stream_close(ctx, s);
559                         tle_memtank_shrink(CTX_TCP_MTS(ctx));
560                 }
561         }
562
563         if (rc != 0)
564                 rte_errno = -rc;
565         return i;
566 }
567
568 int
569 tle_tcp_stream_close(struct tle_stream *ts)
570 {
571         int32_t rc;
572         struct tle_ctx *ctx;
573         struct tle_tcp_stream *s;
574
575         s = TCP_STREAM(ts);
576         if (ts == NULL || s->s.type >= TLE_VNUM)
577                 return -EINVAL;
578
579         ctx = s->s.ctx;
580         rc = stream_close(ctx, s);
581         tle_memtank_shrink(CTX_TCP_MTS(ctx));
582         return rc;
583 }
584
585 int
586 tle_tcp_stream_abort(struct tle_stream *ts)
587 {
588         int32_t rc;
589         struct tle_ctx *ctx;
590         struct tle_tcp_stream *s;
591
592         s = TCP_STREAM(ts);
593         if (ts == NULL || s->s.type >= TLE_VNUM)
594                 return -EINVAL;
595
596         ctx = s->s.ctx;
597         rc = stream_close_prolog(ctx, s, TLE_TCP_OP_CLOSE_ABORT);
598         if (rc > 0) {
599
600                 /*
601                  * RFC 793, On ABORT call, for states:
602                  *   SYN-RECEIVED STATE
603                  *   ESTABLISHED STATE
604                  *   FIN-WAIT-1 STATE
605                  *   FIN-WAIT-2 STATE
606                  *   CLOSE-WAIT STATE
607                  * Send a reset segment: <SEQ=SND.NXT><CTL=RST>
608                  * ...; all segments queued for transmission (except for the
609                  * RST formed above) or retransmission should be flushed,
610                  * delete the TCB, enter CLOSED state, and return.
611                 */
612
613                 if (rc >= TLE_TCP_ST_ESTABLISHED && rc <= TLE_TCP_ST_CLOSE_WAIT)
614                         s->tcb.snd.close_flags |= TCP_FLAG_RST;
615
616                 /*
617                  * set state to CLOSED, mark stream as writable/readable again
618                  * and enqueue stream into to-send queue.
619                  * That will cause later RST generation and stream termination.
620                  */
621                 s->tcb.state = TLE_TCP_ST_CLOSED;
622                 tcp_stream_up(s);
623                 txs_enqueue(ctx, s);
624                 rc = 0;
625         }
626
627         tle_memtank_shrink(CTX_TCP_MTS(ctx));
628         return rc;
629 }
630
631 int
632 tle_tcp_stream_get_addr(const struct tle_stream *ts,
633         struct tle_tcp_stream_addr *addr)
634 {
635         struct sockaddr_in *lin4, *rin4;
636         struct sockaddr_in6 *lin6, *rin6;
637         struct tle_tcp_stream *s;
638
639         s = TCP_STREAM(ts);
640         if (addr == NULL || ts == NULL || s->s.type >= TLE_VNUM)
641                 return -EINVAL;
642
643         if (s->s.type == TLE_V4) {
644
645                 lin4 = (struct sockaddr_in *)&addr->local;
646                 rin4 = (struct sockaddr_in *)&addr->remote;
647
648                 addr->local.ss_family = AF_INET;
649                 addr->remote.ss_family = AF_INET;
650
651                 lin4->sin_port = s->s.port.dst;
652                 rin4->sin_port = s->s.port.src;
653                 lin4->sin_addr.s_addr = s->s.ipv4.addr.dst;
654                 rin4->sin_addr.s_addr = s->s.ipv4.addr.src;
655
656         } else if (s->s.type == TLE_V6) {
657
658                 lin6 = (struct sockaddr_in6 *)&addr->local;
659                 rin6 = (struct sockaddr_in6 *)&addr->remote;
660
661                 addr->local.ss_family = AF_INET6;
662                 addr->remote.ss_family = AF_INET6;
663
664                 lin6->sin6_port = s->s.port.dst;
665                 rin6->sin6_port = s->s.port.src;
666                 memcpy(&lin6->sin6_addr, &s->s.ipv6.addr.dst,
667                         sizeof(lin6->sin6_addr));
668                 memcpy(&rin6->sin6_addr, &s->s.ipv6.addr.src,
669                         sizeof(rin6->sin6_addr));
670         }
671
672         return 0;
673 }
674
675 int
676 tle_tcp_stream_listen(struct tle_stream *ts)
677 {
678         struct tle_tcp_stream *s;
679         int32_t rc;
680
681         s = TCP_STREAM(ts);
682         if (ts == NULL || s->s.type >= TLE_VNUM)
683                 return -EINVAL;
684
685         /* app may listen for multiple times to change backlog,
686          * we will just return success for such cases.
687          */
688         if (s->tcb.state == TLE_TCP_ST_LISTEN)
689                 return 0;
690
691         /* mark stream as not closable. */
692         if (tcp_stream_try_acquire(s) > 0) {
693                 rc = rte_atomic16_cmpset(&s->tcb.state, TLE_TCP_ST_CLOSED,
694                                 TLE_TCP_ST_LISTEN);
695                 if (rc != 0) {
696                         s->tcb.uop |= TLE_TCP_OP_LISTEN;
697                         s->tcb.rcv.wnd = calc_rx_wnd(s, TCP_WSCALE_DEFAULT);
698                         rc = 0;
699                 } else
700                         rc = -EDEADLK;
701         } else
702                 rc = -EINVAL;
703
704         tcp_stream_release(s);
705         return rc;
706 }
707
708 /*
709  * helper function, updates stream config
710  */
711 static inline int
712 stream_update_cfg(struct tle_stream *ts,struct tle_tcp_stream_cfg *prm)
713 {
714         struct tle_tcp_stream *s;
715
716         s = TCP_STREAM(ts);
717
718         if (tcp_stream_try_acquire(s) < 0 ||
719                         (s->tcb.uop & TLE_TCP_OP_CLOSE) != 0) {
720                 tcp_stream_release(s);
721                 return -EINVAL;
722         }
723
724         /* setup stream notification menchanism */
725         s->rx.ev = prm->recv_ev;
726         s->tx.ev = prm->send_ev;
727         s->err.ev = prm->err_ev;
728
729         s->rx.cb.data = prm->recv_cb.data;
730         s->tx.cb.data = prm->send_cb.data;
731         s->err.cb.data = prm->err_cb.data;
732
733         rte_smp_wmb();
734
735         s->rx.cb.func = prm->recv_cb.func;
736         s->tx.cb.func = prm->send_cb.func;
737         s->err.cb.func = prm->err_cb.func;
738
739         /* store other params */
740         s->tcb.snd.nb_retm = (prm->nb_retries != 0) ? prm->nb_retries :
741                 TLE_TCP_DEFAULT_RETRIES;
742         s->s.udata = prm->udata;
743
744         /* invoke async notifications, if any */
745         if (rte_ring_count(s->rx.q) != 0) {
746                 if (s->rx.ev != NULL)
747                         tle_event_raise(s->rx.ev);
748                 else if (s->rx.cb.func != NULL)
749                         s->rx.cb.func(s->rx.cb.data, &s->s);
750         }
751         if (rte_ring_free_count(s->tx.q) != 0) {
752                 if (s->tx.ev != NULL)
753                         tle_event_raise(s->tx.ev);
754                 else if (s->tx.cb.func != NULL)
755                         s->tx.cb.func(s->tx.cb.data, &s->s);
756         }
757         if (s->tcb.state == TLE_TCP_ST_CLOSE_WAIT ||
758                         s->tcb.state ==  TLE_TCP_ST_CLOSED) {
759                 if (s->err.ev != NULL)
760                         tle_event_raise(s->err.ev);
761                 else if (s->err.cb.func != NULL)
762                         s->err.cb.func(s->err.cb.data, &s->s);
763         }
764
765         tcp_stream_release(s);
766         return 0;
767 }
768
769 uint32_t
770 tle_tcp_stream_update_cfg(struct tle_stream *ts[],
771         struct tle_tcp_stream_cfg prm[], uint32_t num)
772 {
773         int32_t rc;
774         uint32_t i;
775
776         for (i = 0; i != num; i++) {
777                 rc = stream_update_cfg(ts[i], &prm[i]);
778                 if (rc != 0) {
779                         rte_errno = -rc;
780                         break;
781                 }
782         }
783
784         return i;
785 }
786
787 int
788 tle_tcp_stream_get_mss(const struct tle_stream * ts)
789 {
790         struct tle_tcp_stream *s;
791
792         s = TCP_STREAM(ts);
793         if (ts == NULL || s->s.type >= TLE_VNUM)
794                 return -EINVAL;
795
796         return s->tcb.snd.mss;
797 }
798
799 int
800 tle_tcp_stream_get_state(const struct tle_stream * ts,
801         struct tle_tcp_stream_state *st)
802 {
803         struct tle_tcp_stream *s;
804
805         s = TCP_STREAM(ts);
806         if (ts == NULL || s->s.type >= TLE_VNUM)
807                 return -EINVAL;
808
809         st->state = s->tcb.state;
810         st->uop = s->tcb.uop;
811         st->rev = s->err.rev;
812
813         return 0;
814 }