tle_tcp: return ENODATA for unprocessed/unused packets that belong to existing stream.
[tldk.git] / lib / libtle_l4p / tcp_rxtx.c
1 /*
2  * Copyright (c) 2016-2017  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <rte_errno.h>
17 #include <rte_ethdev.h>
18 #include <rte_ip.h>
19 #include <rte_ip_frag.h>
20 #include <rte_tcp.h>
21
22 #include "tcp_stream.h"
23 #include "tcp_timer.h"
24 #include "stream_table.h"
25 #include "syncookie.h"
26 #include "misc.h"
27 #include "tcp_ctl.h"
28 #include "tcp_rxq.h"
29 #include "tcp_txq.h"
30 #include "tcp_tx_seg.h"
31
32 #define TCP_MAX_PKT_SEG 0x20
33
34 /*
35  * checks if input TCP ports and IP addresses match given stream.
36  * returns zero on success.
37  */
38 static inline int
39 rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40 {
41         int32_t rc;
42
43         if (pi->tf.type == TLE_V4)
44                 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45                         (pi->addr4.raw & s->s.ipv4.mask.raw) !=
46                         s->s.ipv4.addr.raw;
47         else
48                 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49                         ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50                         &s->s.ipv6.mask.raw) != 0;
51
52         return rc;
53 }
54
55 static inline struct tle_tcp_stream *
56 rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57         uint32_t type)
58 {
59         struct tle_tcp_stream *s;
60
61         s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62         if (s == NULL || tcp_stream_acquire(s) < 0)
63                 return NULL;
64
65         /* check that we have a proper stream. */
66         if (s->tcb.state != TCP_ST_LISTEN) {
67                 tcp_stream_release(s);
68                 s = NULL;
69         }
70
71         return s;
72 }
73
74 static inline struct tle_tcp_stream *
75 rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76         const union pkt_info *pi, uint32_t type)
77 {
78         struct tle_tcp_stream *s;
79
80         s = stbl_find_data(st, pi);
81         if (s == NULL) {
82                 if (pi->tf.flags == TCP_FLAG_ACK)
83                         return rx_obtain_listen_stream(dev, pi, type);
84                 return NULL;
85         }
86
87         if (tcp_stream_acquire(s) < 0)
88                 return NULL;
89         /* check that we have a proper stream. */
90         else if (s->tcb.state == TCP_ST_CLOSED) {
91                 tcp_stream_release(s);
92                 s = NULL;
93         }
94
95         return s;
96 }
97
98 /*
99  * Consider 2 pkt_info *equal* if their:
100  * - types (IPv4/IPv6)
101  * - TCP flags
102  * - checksum flags
103  * - TCP src and dst ports
104  * - IP src and dst addresses
105  * are equal.
106  */
107 static inline int
108 pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
109 {
110         uint32_t i;
111
112         i = 1;
113
114         if (pi[0].tf.type == TLE_V4) {
115                 while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
116                         i++;
117
118         } else if (pi[0].tf.type == TLE_V6) {
119                 while (i != num &&
120                                 pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121                                 ymm_cmp(&pi[0].addr6->raw,
122                                 &pi[i].addr6->raw) == 0)
123                         i++;
124         }
125
126         return i;
127 }
128
129 static inline int
130 pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
131 {
132         uint32_t i;
133
134         i = 1;
135
136         if (pi[0].tf.type == TLE_V4) {
137                 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138                                 pi[0].port.dst == pi[i].port.dst &&
139                                 pi[0].addr4.dst == pi[i].addr4.dst)
140                         i++;
141
142         } else if (pi[0].tf.type == TLE_V6) {
143                 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144                                 pi[0].port.dst == pi[i].port.dst &&
145                                 xmm_cmp(&pi[0].addr6->dst,
146                                 &pi[i].addr6->dst) == 0)
147                         i++;
148         }
149
150         return i;
151 }
152
153 static inline void
154 stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
155         uint32_t nb_drb)
156 {
157         _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
158 }
159
160 static inline uint32_t
161 stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
162         uint32_t nb_drb)
163 {
164         return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
165 }
166
167 static inline uint32_t
168 get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
169 {
170         uint32_t pid;
171         rte_atomic32_t *pa;
172
173         pa = &dev->tx.packet_id[type];
174
175         if (st == 0) {
176                 pid = rte_atomic32_add_return(pa, num);
177                 return pid - num;
178         } else {
179                 pid = rte_atomic32_read(pa);
180                 rte_atomic32_set(pa, pid + num);
181                 return pid;
182         }
183 }
184
185 static inline void
186 fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
187         uint32_t seq, uint8_t hlen, uint8_t flags)
188 {
189         uint16_t wnd;
190
191         l4h->src_port = port.dst;
192         l4h->dst_port = port.src;
193
194         wnd = (flags & TCP_FLAG_SYN) ?
195                 RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
196                 tcb->rcv.wnd >> tcb->rcv.wscale;
197
198         /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
199         l4h->sent_seq = rte_cpu_to_be_32(seq);
200         l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
201         l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
202         l4h->tcp_flags = flags;
203         l4h->rx_win = rte_cpu_to_be_16(wnd);
204         l4h->cksum = 0;
205         l4h->tcp_urp = 0;
206
207         if (flags & TCP_FLAG_SYN)
208                 fill_syn_opts(l4h + 1, &tcb->so);
209         else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
210                 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
211 }
212
213 static inline int
214 tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
215         const struct tle_dest *dst, uint64_t ol_flags,
216         union l4_ports port, uint32_t seq, uint32_t flags,
217         uint32_t pid, uint32_t swcsm)
218 {
219         uint32_t l4, len, plen;
220         struct tcp_hdr *l4h;
221         char *l2h;
222
223         len = dst->l2_len + dst->l3_len;
224         plen = m->pkt_len;
225
226         if (flags & TCP_FLAG_SYN)
227                 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
228         else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
229                 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
230         else
231                 l4 = sizeof(*l4h);
232
233         /* adjust mbuf to put L2/L3/L4 headers into it. */
234         l2h = rte_pktmbuf_prepend(m, len + l4);
235         if (l2h == NULL)
236                 return -EINVAL;
237
238         /* copy L2/L3 header */
239         rte_memcpy(l2h, dst->hdr, len);
240
241         /* setup TCP header & options */
242         l4h = (struct tcp_hdr *)(l2h + len);
243         fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
244
245         /* setup mbuf TX offload related fields. */
246         m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
247         m->ol_flags |= ol_flags;
248
249         /* update proto specific fields. */
250
251         if (s->s.type == TLE_V4) {
252                 struct ipv4_hdr *l3h;
253                 l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
254                 l3h->packet_id = rte_cpu_to_be_16(pid);
255                 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
256
257                 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
258                         l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
259                                 ol_flags);
260                 else if (swcsm != 0)
261                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
262
263                 if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
264                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
265         } else {
266                 struct ipv6_hdr *l3h;
267                 l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
268                 l3h->payload_len = rte_cpu_to_be_16(plen + l4);
269                 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
270                         l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
271                 else if (swcsm != 0)
272                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
273         }
274
275         return 0;
276 }
277
278 /*
279  * That function supposed to be used only for data packets.
280  * Assumes that L2/L3/L4 headers and mbuf fields already setup properly.
281  *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
282  *  - if no HW cksum offloads are enabled, calculates TCP checksum.
283  */
284 static inline void
285 tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
286         uint32_t seq, uint32_t pid)
287 {
288         struct tcp_hdr *l4h;
289         uint32_t len;
290
291         len = m->l2_len + m->l3_len;
292         l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
293
294         l4h->sent_seq = rte_cpu_to_be_32(seq);
295         l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
296
297         if (tcb->so.ts.raw != 0)
298                 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
299
300         if (type == TLE_V4) {
301                 struct ipv4_hdr *l3h;
302                 l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
303                 l3h->hdr_checksum = 0;
304                 l3h->packet_id = rte_cpu_to_be_16(pid);
305                 if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
306                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
307         }
308
309         /* have to calculate TCP checksum in SW */
310         if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
311
312                 l4h->cksum = 0;
313
314                 if (type == TLE_V4) {
315                         struct ipv4_hdr *l3h;
316                         l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
317                                 m->l2_len);
318                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
319
320                 } else {
321                         struct ipv6_hdr *l3h;
322                         l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
323                                 m->l2_len);
324                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
325                 }
326         }
327 }
328
329 /* Send data packets that need to be ACK-ed by peer */
330 static inline uint32_t
331 tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
332 {
333         uint32_t bsz, i, nb, nbm;
334         struct tle_dev *dev;
335         struct tle_drb *drb[num];
336
337         /* calculate how many drbs are needed.*/
338         bsz = s->tx.drb.nb_elem;
339         nbm = (num + bsz - 1) / bsz;
340
341         /* allocate drbs, adjust number of packets. */
342         nb = stream_drb_alloc(s, drb, nbm);
343
344         /* drb ring is empty. */
345         if (nb == 0)
346                 return 0;
347
348         else if (nb != nbm)
349                 num = nb * bsz;
350
351         dev = s->tx.dst.dev;
352
353         /* enqueue pkts for TX. */
354         nbm = nb;
355         i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
356                 num, drb, &nb);
357
358         /* free unused drbs. */
359         if (nb != 0)
360                 stream_drb_free(s, drb + nbm - nb, nb);
361
362         return i;
363 }
364
365 static inline uint32_t
366 tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
367         uint32_t num)
368 {
369         uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
370         struct tle_dev *dev;
371         struct rte_mbuf *mb;
372         struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
373
374         mss = s->tcb.snd.mss;
375         type = s->s.type;
376
377         dev = s->tx.dst.dev;
378         pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
379
380         k = 0;
381         tn = 0;
382         fail = 0;
383         for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
384
385                 mb = mi[i];
386                 sz = RTE_MIN(sl->len, mss);
387                 plen = PKT_L4_PLEN(mb);
388
389                 /*fast path, no need to use indirect mbufs. */
390                 if (plen <= sz) {
391
392                         /* update pkt TCP header */
393                         tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
394
395                         /* keep mbuf till ACK is received. */
396                         rte_pktmbuf_refcnt_update(mb, 1);
397                         sl->len -= plen;
398                         sl->seq += plen;
399                         mo[k++] = mb;
400                 /* remaining snd.wnd is less them MSS, send nothing */
401                 } else if (sz < mss)
402                         break;
403                 /* packet indirection needed */
404                 else
405                         RTE_VERIFY(0);
406
407                 if (k >= MAX_PKT_BURST) {
408                         n = tx_data_pkts(s, mo, k);
409                         fail = k - n;
410                         tn += n;
411                         k = 0;
412                 }
413         }
414
415         if (k != 0) {
416                 n = tx_data_pkts(s, mo, k);
417                 fail = k - n;
418                 tn += n;
419         }
420
421         if (fail != 0) {
422                 sz = tcp_mbuf_seq_free(mo + n, fail);
423                 sl->seq -= sz;
424                 sl->len += sz;
425         }
426
427         return tn;
428 }
429
430 /*
431  * gets data from stream send buffer, updates it and
432  * queues it into TX device queue.
433  * Note that this function and is not MT safe.
434  */
435 static inline uint32_t
436 tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
437 {
438         uint32_t n, num, tn, wnd;
439         struct rte_mbuf **mi;
440         union seqlen sl;
441
442         tn = 0;
443         wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
444         sl.seq = s->tcb.snd.nxt;
445         sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
446
447         if (sl.len == 0)
448                 return tn;
449
450         /* update send timestamp */
451         s->tcb.snd.ts = tms;
452
453         do {
454                 /* get group of packets */
455                 mi = tcp_txq_get_nxt_objs(s, &num);
456
457                 /* stream send buffer is empty */
458                 if (num == 0)
459                         break;
460
461                 /* queue data packets for TX */
462                 n = tx_data_bulk(s, &sl, mi, num);
463                 tn += n;
464
465                 /* update consumer head */
466                 tcp_txq_set_nxt_head(s, n);
467         } while (n == num);
468
469         s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
470         return tn;
471 }
472
473 static inline void
474 free_una_data(struct tle_tcp_stream *s, uint32_t len)
475 {
476         uint32_t i, n, num, plen;
477         struct rte_mbuf **mi;
478
479         n = 0;
480         plen = 0;
481
482         do {
483                 /* get group of packets */
484                 mi = tcp_txq_get_una_objs(s, &num);
485
486                 if (num == 0)
487                         break;
488
489                 /* free acked data */
490                 for (i = 0; i != num && n != len; i++, n = plen) {
491                         plen += PKT_L4_PLEN(mi[i]);
492                         if (plen > len) {
493                                 /* keep SND.UNA at the start of the packet */
494                                 len -= RTE_MIN(len, plen - len);
495                                 break;
496                         }
497                         rte_pktmbuf_free(mi[i]);
498                 }
499
500                 /* update consumer tail */
501                 tcp_txq_set_una_tail(s, i);
502         } while (plen < len);
503
504         s->tcb.snd.una += len;
505
506         /*
507          * that could happen in case of retransmit,
508          * adjust SND.NXT with SND.UNA.
509          */
510         if (s->tcb.snd.una > s->tcb.snd.nxt) {
511                 tcp_txq_rst_nxt_head(s);
512                 s->tcb.snd.nxt = s->tcb.snd.una;
513         }
514 }
515
516 static inline uint16_t
517 calc_smss(uint16_t mss, const struct tle_dest *dst)
518 {
519         uint16_t n;
520
521         n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
522         mss = RTE_MIN(n, mss);
523         return mss;
524 }
525
526 /*
527  * RFC 6928 2
528  * min (10*MSS, max (2*MSS, 14600))
529  *
530  * or using user provided initial congestion window (icw)
531  * min (10*MSS, max (2*MSS, icw))
532  */
533 static inline uint32_t
534 initial_cwnd(uint32_t smss, uint32_t icw)
535 {
536         return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
537 }
538
539 /*
540  * queue standalone packet to he particular output device
541  * It assumes that:
542  * - L2/L3/L4 headers should be already set.
543  * - packet fits into one segment.
544  */
545 static inline int
546 send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
547 {
548         uint32_t n, nb;
549         struct tle_drb *drb;
550
551         if (stream_drb_alloc(s, &drb, 1) == 0)
552                 return -ENOBUFS;
553
554         /* enqueue pkt for TX. */
555         nb = 1;
556         n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
557                 &drb, &nb);
558
559         /* free unused drbs. */
560         if (nb != 0)
561                 stream_drb_free(s, &drb, 1);
562
563         return (n == 1) ? 0 : -ENOBUFS;
564 }
565
566 static inline int
567 send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
568         uint32_t flags)
569 {
570         const struct tle_dest *dst;
571         uint32_t pid, type;
572         int32_t rc;
573
574         dst = &s->tx.dst;
575         type = s->s.type;
576         pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
577
578         rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
579         if (rc == 0)
580                 rc = send_pkt(s, dst->dev, m);
581
582         return rc;
583 }
584
585 static inline int
586 send_rst(struct tle_tcp_stream *s, uint32_t seq)
587 {
588         struct rte_mbuf *m;
589         int32_t rc;
590
591         m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
592         if (m == NULL)
593                 return -ENOMEM;
594
595         rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
596         if (rc != 0)
597                 rte_pktmbuf_free(m);
598
599         return rc;
600 }
601
602 static inline int
603 send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
604 {
605         struct rte_mbuf *m;
606         uint32_t seq;
607         int32_t rc;
608
609         m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
610         if (m == NULL)
611                 return -ENOMEM;
612
613         seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
614         s->tcb.snd.ts = tms;
615
616         rc = send_ctrl_pkt(s, m, seq, flags);
617         if (rc != 0) {
618                 rte_pktmbuf_free(m);
619                 return rc;
620         }
621
622         s->tcb.snd.ack = s->tcb.rcv.nxt;
623         return 0;
624 }
625
626
627 static int
628 sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
629         const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
630 {
631         uint16_t len;
632         int32_t rc;
633         uint32_t pid, seq, type;
634         struct tle_dev *dev;
635         const void *da;
636         struct tle_dest dst;
637         const struct tcp_hdr *th;
638
639         type = s->s.type;
640
641         /* get destination information. */
642         if (type == TLE_V4)
643                 da = &pi->addr4.src;
644         else
645                 da = &pi->addr6->src;
646
647         rc = stream_get_dest(&s->s, da, &dst);
648         if (rc < 0)
649                 return rc;
650
651         th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
652                 m->l2_len + m->l3_len);
653         get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
654
655         s->tcb.rcv.nxt = si->seq + 1;
656         seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
657                                 s->s.ctx->prm.hash_alg,
658                                 &s->s.ctx->prm.secret_key);
659         s->tcb.so.ts.ecr = s->tcb.so.ts.val;
660         s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
661         s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
662                 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
663         s->tcb.so.mss = calc_smss(dst.mtu, &dst);
664
665         /* reset mbuf's data contents. */
666         len = m->l2_len + m->l3_len + m->l4_len;
667         m->tx_offload = 0;
668         if (rte_pktmbuf_adj(m, len) == NULL)
669                 return -EINVAL;
670
671         dev = dst.dev;
672         pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
673
674         rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
675                 TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
676         if (rc == 0)
677                 rc = send_pkt(s, dev, m);
678
679         return rc;
680 }
681
682 /*
683  * RFC 793:
684  * There are four cases for the acceptability test for an incoming segment:
685  * Segment Receive  Test
686  * Length  Window
687  * ------- -------  -------------------------------------------
688  *    0       0     SEG.SEQ = RCV.NXT
689  *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
690  *   >0       0     not acceptable
691  *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
692  *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
693  */
694 static inline int
695 check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
696 {
697         uint32_t n;
698
699         n = seqn + len;
700         if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
701                         n - tcb->rcv.nxt > tcb->rcv.wnd)
702                 return -ERANGE;
703
704         return 0;
705 }
706
707 static inline union tsopt
708 rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
709 {
710         union tsopt ts;
711         uintptr_t opt;
712         const struct tcp_hdr *th;
713
714         if (tcb->so.ts.val != 0) {
715                 opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
716                         mb->l2_len + mb->l3_len + sizeof(*th));
717                 ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
718         } else
719                 ts.raw = 0;
720
721         return ts;
722 }
723
724 /*
725  * PAWS and sequence check.
726  * RFC 1323 4.2.1
727  */
728 static inline int
729 rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
730 {
731         int32_t rc;
732
733         /* RFC 1323 4.2.1 R2 */
734         rc = check_seqn(tcb, seq, len);
735         if (rc < 0)
736                 return rc;
737
738         if (ts.raw != 0) {
739
740                 /* RFC 1323 4.2.1 R1 */
741                 if (tcp_seq_lt(ts.val, tcb->rcv.ts))
742                         return -ERANGE;
743
744                 /* RFC 1323 4.2.1 R3 */
745                 if (tcp_seq_leq(seq, tcb->snd.ack) &&
746                                 tcp_seq_lt(tcb->snd.ack, seq + len))
747                         tcb->rcv.ts = ts.val;
748         }
749
750         return rc;
751 }
752
753 static inline int
754 rx_check_ack(const struct tcb *tcb, uint32_t ack)
755 {
756         uint32_t max;
757
758         max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
759
760         if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
761                 return 0;
762
763         return -ERANGE;
764 }
765
766 static inline int
767 rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
768         const union tsopt ts)
769 {
770         int32_t rc;
771
772         rc = rx_check_seq(tcb, seq, len, ts);
773         rc |= rx_check_ack(tcb, ack);
774         return rc;
775 }
776
777 static inline int
778 restore_syn_opt(union seg_info *si, union tsopt *to,
779         const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
780         uint32_t hash_alg, rte_xmm_t *secret_key)
781 {
782         int32_t rc;
783         uint32_t len;
784         const struct tcp_hdr *th;
785
786         /* check that ACK, etc fields are what we expected. */
787         rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
788                                 hash_alg,
789                                 secret_key);
790         if (rc < 0)
791                 return rc;
792
793         si->mss = rc;
794
795         th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
796                 mb->l2_len + mb->l3_len);
797         len = mb->l4_len - sizeof(*th);
798         to[0] = get_tms_opts((uintptr_t)(th + 1), len);
799         return 0;
800 }
801
802 static inline void
803 stream_term(struct tle_tcp_stream *s)
804 {
805         struct sdr *dr;
806
807         s->tcb.state = TCP_ST_CLOSED;
808         rte_smp_wmb();
809
810         timer_stop(s);
811
812         /* close() was already invoked, schedule final cleanup */
813         if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
814
815                 dr = CTX_TCP_SDR(s->s.ctx);
816                 STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
817
818         /* notify user that stream need to be closed */
819         } else if (s->err.ev != NULL)
820                 tle_event_raise(s->err.ev);
821         else if (s->err.cb.func != NULL)
822                 s->err.cb.func(s->err.cb.data, &s->s);
823 }
824
825 static inline int
826 stream_fill_dest(struct tle_tcp_stream *s)
827 {
828         int32_t rc;
829         uint32_t type;
830         const void *da;
831
832         type = s->s.type;
833         if (type == TLE_V4)
834                 da = &s->s.ipv4.addr.src;
835         else
836                 da = &s->s.ipv6.addr.src;
837
838         rc = stream_get_dest(&s->s, da, &s->tx.dst);
839         return (rc < 0) ? rc : 0;
840 }
841
842 /*
843  * helper function, prepares a new accept stream.
844  */
845 static inline int
846 accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
847         struct tle_tcp_stream *cs, const union tsopt *to,
848         uint32_t tms, const union pkt_info *pi, const union seg_info *si)
849 {
850         int32_t rc;
851         uint32_t rtt;
852
853         /* some TX still pending for that stream. */
854         if (TCP_STREAM_TX_PENDING(cs))
855                 return -EAGAIN;
856
857         /* setup L4 ports and L3 addresses fields. */
858         cs->s.port.raw = pi->port.raw;
859         cs->s.pmsk.raw = UINT32_MAX;
860
861         if (pi->tf.type == TLE_V4) {
862                 cs->s.ipv4.addr = pi->addr4;
863                 cs->s.ipv4.mask.src = INADDR_NONE;
864                 cs->s.ipv4.mask.dst = INADDR_NONE;
865         } else if (pi->tf.type == TLE_V6) {
866                 cs->s.ipv6.addr = *pi->addr6;
867                 rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
868                         sizeof(cs->s.ipv6.mask.src));
869                 rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
870                         sizeof(cs->s.ipv6.mask.dst));
871         }
872
873         /* setup TCB */
874         sync_fill_tcb(&cs->tcb, si, to);
875         cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
876
877         /*
878          * estimate the rto
879          * for now rtt is calculated based on the tcp TMS option,
880          * later add real-time one
881          */
882         if (cs->tcb.so.ts.ecr) {
883                 rtt = tms - cs->tcb.so.ts.ecr;
884                 rto_estimate(&cs->tcb, rtt);
885         } else
886                 cs->tcb.snd.rto = TCP_RTO_DEFAULT;
887
888         /* copy streams type & flags. */
889         cs->s.type = ps->s.type;
890         cs->flags = ps->flags;
891
892         /* retrive and cache destination information. */
893         rc = stream_fill_dest(cs);
894         if (rc != 0)
895                 return rc;
896
897         /* update snd.mss with SMSS value */
898         cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
899
900         /* setup congestion variables */
901         cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
902         cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
903         cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
904
905         cs->tcb.state = TCP_ST_ESTABLISHED;
906
907         /* add stream to the table */
908         cs->ste = stbl_add_stream(st, pi, cs);
909         if (cs->ste == NULL)
910                 return -ENOBUFS;
911
912         cs->tcb.uop |= TCP_OP_ACCEPT;
913         tcp_stream_up(cs);
914         return 0;
915 }
916
917
918 /*
919  * ACK for new connection request arrived.
920  * Check that the packet meets all conditions and try to open a new stream.
921  * returns:
922  * < 0  - invalid packet
923  * == 0 - packet is valid and new stream was opened for it.
924  * > 0  - packet is valid, but failed to open new stream.
925  */
926 static inline int
927 rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
928         const union pkt_info *pi, union seg_info *si,
929         uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
930 {
931         int32_t rc;
932         struct tle_ctx *ctx;
933         struct tle_stream *ts;
934         struct tle_tcp_stream *cs;
935         union tsopt to;
936
937         *csp = NULL;
938
939         if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
940                 return -EINVAL;
941
942         ctx = s->s.ctx;
943         rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
944                                 &ctx->prm.secret_key);
945         if (rc < 0)
946                 return rc;
947
948         /* allocate new stream */
949         ts = get_stream(ctx);
950         cs = TCP_STREAM(ts);
951         if (ts == NULL)
952                 return ENFILE;
953
954         /* prepare stream to handle new connection */
955         if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
956
957                 /* put new stream in the accept queue */
958                 if (_rte_ring_enqueue_burst(s->rx.q,
959                                 (void * const *)&ts, 1) == 1) {
960                         *csp = cs;
961                         return 0;
962                 }
963
964                 /* cleanup on failure */
965                 tcp_stream_down(cs);
966                 stbl_del_stream(st, cs->ste, cs, 0);
967                 cs->ste = NULL;
968         }
969
970         tcp_stream_reset(ctx, cs);
971         return ENOBUFS;
972 }
973
974 static inline int
975 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
976         uint32_t *seqn, uint32_t *plen)
977 {
978         uint32_t len, n, seq;
979
980         seq = *seqn;
981         len = *plen;
982
983         rte_pktmbuf_adj(mb, hlen);
984         if (len == 0)
985                 return -ENODATA;
986         /* cut off the start of the packet */
987         else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
988                 n = tcb->rcv.nxt - seq;
989                 if (n >= len)
990                         return -ENODATA;
991
992                 rte_pktmbuf_adj(mb, n);
993                 *seqn = seq + n;
994                 *plen = len - n;
995         }
996
997         return 0;
998 }
999
1000 static inline uint32_t
1001 rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
1002 {
1003         uint32_t k, n;
1004
1005         n = ack - (uint32_t)s->tcb.snd.una;
1006
1007         /* some more data was acked. */
1008         if (n != 0) {
1009
1010                 /* advance SND.UNA and free related packets. */
1011                 k = rte_ring_free_count(s->tx.q);
1012                 free_una_data(s, n);
1013
1014                 /* mark the stream as available for writing */
1015                 if (rte_ring_free_count(s->tx.q) != 0) {
1016                         if (s->tx.ev != NULL)
1017                                 tle_event_raise(s->tx.ev);
1018                         else if (k == 0 && s->tx.cb.func != NULL)
1019                                 s->tx.cb.func(s->tx.cb.data, &s->s);
1020                 }
1021         }
1022
1023         return n;
1024 }
1025
1026 static void
1027 stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
1028 {
1029         if (rto != 0) {
1030                 s->tcb.state = TCP_ST_TIME_WAIT;
1031                 s->tcb.snd.rto = rto;
1032                 timer_reset(s);
1033         } else
1034                 stream_term(s);
1035 }
1036
1037 static void
1038 rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1039 {
1040         uint32_t state;
1041         int32_t ackfin;
1042
1043         s->tcb.rcv.nxt += 1;
1044
1045         ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1046         state = s->tcb.state;
1047
1048         if (state == TCP_ST_ESTABLISHED) {
1049                 s->tcb.state = TCP_ST_CLOSE_WAIT;
1050                 /* raise err.ev & err.cb */
1051                 if (s->err.ev != NULL)
1052                         tle_event_raise(s->err.ev);
1053                 else if (s->err.cb.func != NULL)
1054                         s->err.cb.func(s->err.cb.data, &s->s);
1055         } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1056                 rsp->flags |= TCP_FLAG_ACK;
1057                 if (ackfin != 0)
1058                         stream_timewait(s, s->tcb.snd.rto_tw);
1059                 else
1060                         s->tcb.state = TCP_ST_CLOSING;
1061         } else if (state == TCP_ST_FIN_WAIT_2) {
1062                 rsp->flags |= TCP_FLAG_ACK;
1063                 stream_timewait(s, s->tcb.snd.rto_tw);
1064         } else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1065                 stream_term(s);
1066         }
1067 }
1068
1069 /*
1070  * FIN process for ESTABLISHED state
1071  * returns:
1072  * 0 < - error occurred
1073  * 0 - FIN was processed OK, and mbuf can be free/reused.
1074  * 0 > - FIN was processed OK and mbuf can't be free/reused.
1075  */
1076 static inline int
1077 rx_fin(struct tle_tcp_stream *s, uint32_t state,
1078         const union seg_info *si, struct rte_mbuf *mb,
1079         struct resp_info *rsp)
1080 {
1081         uint32_t hlen, plen, seq;
1082         int32_t ret;
1083         union tsopt ts;
1084
1085         hlen = PKT_L234_HLEN(mb);
1086         plen = mb->pkt_len - hlen;
1087         seq = si->seq;
1088
1089         ts = rx_tms_opt(&s->tcb, mb);
1090         ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1091         if (ret != 0)
1092                 return ret;
1093
1094         if (state < TCP_ST_ESTABLISHED)
1095                 return -EINVAL;
1096
1097         if (plen != 0) {
1098
1099                 ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1100                 if (ret != 0)
1101                         return ret;
1102                 if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1103                         return -ENOBUFS;
1104         }
1105
1106         /*
1107          * fast-path: all data & FIN was already sent out
1108          * and now is acknowledged.
1109          */
1110         if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1111                         si->ack == (uint32_t)s->tcb.snd.nxt) {
1112                 s->tcb.snd.una = s->tcb.snd.fss;
1113                 empty_tq(s);
1114         /* conventional ACK processiing */
1115         } else
1116                 rx_ackdata(s, si->ack);
1117
1118         /* some fragments still missing */
1119         if (seq + plen != s->tcb.rcv.nxt) {
1120                 s->tcb.rcv.frs.seq = seq + plen;
1121                 s->tcb.rcv.frs.on = 1;
1122         } else
1123                 rx_fin_state(s, rsp);
1124
1125         return plen;
1126 }
1127
1128 static inline int
1129 rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1130         const union seg_info *si)
1131 {
1132         int32_t rc;
1133
1134         /*
1135          * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1136          * are validated by checking their SEQ-fields.
1137          * A reset is valid if its sequence number is in the window.
1138          * In the SYN-SENT state (a RST received in response to an initial SYN),
1139          * the RST is acceptable if the ACK field acknowledges the SYN.
1140          */
1141         if (state == TCP_ST_SYN_SENT) {
1142                 rc = ((flags & TCP_FLAG_ACK) == 0 ||
1143                                 si->ack != s->tcb.snd.nxt) ?
1144                         -ERANGE : 0;
1145         }
1146
1147         else
1148                 rc = check_seqn(&s->tcb, si->seq, 0);
1149
1150         if (rc == 0)
1151                 stream_term(s);
1152
1153         return rc;
1154 }
1155
1156 /*
1157  *  check do we have FIN  that was received out-of-order.
1158  *  if yes, try to process it now.
1159  */
1160 static inline void
1161 rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1162 {
1163         if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1164                 rx_fin_state(s, rsp);
1165 }
1166
1167 static inline void
1168 dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1169 {
1170         static const struct dack_info zero_dack;
1171
1172         tack[0] = zero_dack;
1173         tack->ack = tcb->snd.una;
1174         tack->segs.dup = tcb->rcv.dupack;
1175         tack->wu.raw = tcb->snd.wu.raw;
1176         tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1177 }
1178
1179 static inline void
1180 ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1181 {
1182         tcb->snd.wu.raw = tack->wu.raw;
1183         tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1184 }
1185
1186 static inline void
1187 ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1188 {
1189         uint32_t n;
1190
1191         n = tack->segs.ack * tcb->snd.mss;
1192
1193         /* slow start phase, RFC 5681 3.1 (2)  */
1194         if (tcb->snd.cwnd < tcb->snd.ssthresh)
1195                 tcb->snd.cwnd += RTE_MIN(acked, n);
1196         /* congestion avoidance phase, RFC 5681 3.1 (3) */
1197         else
1198                 tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1199 }
1200
1201 static inline void
1202 rto_ssthresh_update(struct tcb *tcb)
1203 {
1204         uint32_t k, n;
1205
1206         /* RFC 5681 3.1 (4)  */
1207         n = (tcb->snd.nxt - tcb->snd.una) / 2;
1208         k = 2 * tcb->snd.mss;
1209         tcb->snd.ssthresh = RTE_MAX(n, k);
1210 }
1211
1212 static inline void
1213 rto_cwnd_update(struct tcb *tcb)
1214 {
1215
1216         if (tcb->snd.nb_retx == 0)
1217                 rto_ssthresh_update(tcb);
1218
1219         /*
1220          * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1221          * no more than 1 full-sized segment.
1222          */
1223         tcb->snd.cwnd = tcb->snd.mss;
1224 }
1225
1226 static inline void
1227 ack_info_update(struct dack_info *tack, const union seg_info *si,
1228         int32_t badseq, uint32_t dlen, const union tsopt ts)
1229 {
1230         if (badseq != 0) {
1231                 tack->segs.badseq++;
1232                 return;
1233         }
1234
1235         /* segnt with incoming data */
1236         tack->segs.data += (dlen != 0);
1237
1238         /* segment with newly acked data */
1239         if (tcp_seq_lt(tack->ack, si->ack)) {
1240                 tack->segs.dup = 0;
1241                 tack->segs.ack++;
1242                 tack->ack = si->ack;
1243                 tack->ts = ts;
1244
1245         /*
1246          * RFC 5681: An acknowledgment is considered a "duplicate" when:
1247          * (a) the receiver of the ACK has outstanding data
1248          * (b) the incoming acknowledgment carries no data
1249          * (c) the SYN and FIN bits are both off
1250          * (d) the acknowledgment number is equal to the TCP.UNA
1251          * (e) the advertised window in the incoming acknowledgment equals the
1252          * advertised window in the last incoming acknowledgment.
1253          *
1254          * Here will have only to check only for (b),(d),(e).
1255          * (a) will be checked later for the whole bulk of packets,
1256          * (c) should never happen here.
1257          */
1258         } else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1259                 tack->dup3.seg = tack->segs.ack + 1;
1260                 tack->dup3.ack = tack->ack;
1261         }
1262
1263         /*
1264          * RFC 793:
1265          * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1266          * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1267          * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1268          * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1269          */
1270         if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1271                         (si->seq == tack->wu.wl1 &&
1272                         tcp_seq_leq(tack->wu.wl2, si->ack))) {
1273
1274                 tack->wu.wl1 = si->seq;
1275                 tack->wu.wl2 = si->ack;
1276                 tack->wnd = si->wnd;
1277         }
1278 }
1279
1280 static inline uint32_t
1281 rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1282         const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1283         int32_t rc[], uint32_t num)
1284 {
1285         uint32_t i, j, k, n, t;
1286         uint32_t hlen, plen, seq, tlen;
1287         int32_t ret;
1288         union tsopt ts;
1289
1290         k = 0;
1291         for (i = 0; i != num; i = j) {
1292
1293                 hlen = PKT_L234_HLEN(mb[i]);
1294                 plen = mb[i]->pkt_len - hlen;
1295                 seq = si[i].seq;
1296
1297                 ts = rx_tms_opt(&s->tcb, mb[i]);
1298                 ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1299
1300                 /* account segment received */
1301                 ack_info_update(tack, &si[i], ret != 0, plen, ts);
1302
1303                 if (ret == 0) {
1304                         /* skip duplicate data, if any */
1305                         ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1306                                 &seq, &plen);
1307                 }
1308
1309                 j = i + 1;
1310                 if (ret != 0) {
1311                         rp[k] = mb[i];
1312                         rc[k] = -ret;
1313                         k++;
1314                         continue;
1315                 }
1316
1317                 /* group sequential packets together. */
1318                 for (tlen = plen; j != num; tlen += plen, j++) {
1319
1320                         hlen = PKT_L234_HLEN(mb[j]);
1321                         plen = mb[j]->pkt_len - hlen;
1322
1323                         /* not consecutive packet */
1324                         if (plen == 0 || seq + tlen != si[j].seq)
1325                                 break;
1326
1327                         /* check SEQ/ACK */
1328                         ts = rx_tms_opt(&s->tcb, mb[j]);
1329                         ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1330                                 plen, ts);
1331
1332                         /* account for segment received */
1333                         ack_info_update(tack, &si[j], ret != 0, plen, ts);
1334
1335                         if (ret != 0) {
1336                                 rp[k] = mb[j];
1337                                 rc[k] = -ret;
1338                                 k++;
1339                                 break;
1340                         }
1341                         rte_pktmbuf_adj(mb[j], hlen);
1342                 }
1343
1344                 n = j - i;
1345                 j += (ret != 0);
1346
1347                 /* account for OFO data */
1348                 if (seq != s->tcb.rcv.nxt)
1349                         tack->segs.ofo += n;
1350
1351                 /* enqueue packets */
1352                 t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1353
1354                 /* if we are out of space in stream recv buffer. */
1355                 for (; t != n; t++) {
1356                         rp[k] = mb[i + t];
1357                         rc[k] = -ENOBUFS;
1358                         k++;
1359                 }
1360         }
1361
1362         return num - k;
1363 }
1364
1365 static inline void
1366 start_fast_retransmit(struct tle_tcp_stream *s)
1367 {
1368         struct tcb *tcb;
1369
1370         tcb = &s->tcb;
1371
1372         /* RFC 6582 3.2.2 */
1373         tcb->snd.rcvr = tcb->snd.nxt;
1374         tcb->snd.fastack = 1;
1375
1376         /* RFC 5681 3.2.2 */
1377         rto_ssthresh_update(tcb);
1378
1379         /* RFC 5681 3.2.3 */
1380         tcp_txq_rst_nxt_head(s);
1381         tcb->snd.nxt = tcb->snd.una;
1382         tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1383 }
1384
1385 static inline void
1386 stop_fast_retransmit(struct tle_tcp_stream *s)
1387 {
1388         struct tcb *tcb;
1389         uint32_t n;
1390
1391         tcb = &s->tcb;
1392         n = tcb->snd.nxt - tcb->snd.una;
1393         tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1394                 RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1395         tcb->snd.fastack = 0;
1396 }
1397
1398 static inline int
1399 in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1400         uint32_t dup_num)
1401 {
1402         uint32_t n;
1403         struct tcb *tcb;
1404
1405         tcb = &s->tcb;
1406
1407         /* RFC 5682 3.2.3 partial ACK */
1408         if (ack_len != 0) {
1409
1410                 n = ack_num * tcb->snd.mss;
1411                 if (ack_len >= n)
1412                         tcb->snd.cwnd -= ack_len - n;
1413                 else
1414                         tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1415
1416                 /*
1417                  * For the first partial ACK that arrives
1418                  * during fast recovery, also reset the
1419                  * retransmit timer.
1420                  */
1421                 if (tcb->snd.fastack == 1)
1422                         timer_reset(s);
1423
1424                 tcb->snd.fastack += ack_num;
1425                 return 1;
1426
1427         /* RFC 5681 3.2.4 */
1428         } else if (dup_num > 3) {
1429                 s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1430                 return 1;
1431         }
1432
1433         return 0;
1434 }
1435
1436 static inline int
1437 process_ack(struct tle_tcp_stream *s, uint32_t acked,
1438         const struct dack_info *tack)
1439 {
1440         int32_t send;
1441
1442         send = 0;
1443
1444         /* normal mode */
1445         if (s->tcb.snd.fastack == 0) {
1446
1447                 send = 1;
1448
1449                 /* RFC 6582 3.2.2 switch to fast retransmit mode */
1450                 if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1451                                 s->tcb.snd.una >= s->tcb.snd.rcvr) {
1452
1453                         start_fast_retransmit(s);
1454                         in_fast_retransmit(s,
1455                                 tack->ack - tack->dup3.ack,
1456                                 tack->segs.ack - tack->dup3.seg - 1,
1457                                 tack->segs.dup);
1458
1459                 /* remain in normal mode */
1460                 } else if (acked != 0) {
1461                         ack_cwnd_update(&s->tcb, acked, tack);
1462                         timer_stop(s);
1463                 }
1464
1465         /* fast retransmit mode */
1466         } else {
1467
1468                 /* remain in fast retransmit mode */
1469                 if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1470
1471                         send = in_fast_retransmit(s, acked, tack->segs.ack,
1472                                 tack->segs.dup);
1473                 } else {
1474                         /* RFC 5682 3.2.3 full ACK */
1475                         stop_fast_retransmit(s);
1476                         timer_stop(s);
1477
1478                         /* if we have another series of dup ACKs */
1479                         if (tack->dup3.seg != 0 &&
1480                                         s->tcb.snd.una != s->tcb.snd.nxt &&
1481                                         tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1482                                         tack->dup3.ack)) {
1483
1484                                 /* restart fast retransmit again. */
1485                                 start_fast_retransmit(s);
1486                                 send = in_fast_retransmit(s,
1487                                         tack->ack - tack->dup3.ack,
1488                                         tack->segs.ack - tack->dup3.seg - 1,
1489                                         tack->segs.dup);
1490                         }
1491                 }
1492         }
1493
1494         return send;
1495 }
1496
1497 /*
1498  * our FIN was acked, stop rto timer, change stream state,
1499  * and possibly close the stream.
1500  */
1501 static inline void
1502 rx_ackfin(struct tle_tcp_stream *s)
1503 {
1504         uint32_t state;
1505
1506         s->tcb.snd.una = s->tcb.snd.fss;
1507         empty_tq(s);
1508
1509         state = s->tcb.state;
1510         if (state == TCP_ST_LAST_ACK)
1511                 stream_term(s);
1512         else if (state == TCP_ST_FIN_WAIT_1) {
1513                 timer_stop(s);
1514                 s->tcb.state = TCP_ST_FIN_WAIT_2;
1515         } else if (state == TCP_ST_CLOSING) {
1516                 stream_timewait(s, s->tcb.snd.rto_tw);
1517         }
1518 }
1519
1520 static inline void
1521 rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1522         const struct dack_info *tack)
1523 {
1524         int32_t send;
1525         uint32_t n;
1526
1527         s->tcb.rcv.dupack = tack->segs.dup;
1528
1529         n = rx_ackdata(s, tack->ack);
1530         send = process_ack(s, n, tack);
1531
1532         /* try to send more data. */
1533         if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1534                 txs_enqueue(s->s.ctx, s);
1535
1536         /* restart RTO timer. */
1537         if (s->tcb.snd.nxt != s->tcb.snd.una)
1538                 timer_start(s);
1539
1540         /* update rto, if fresh packet is here then calculate rtt */
1541         if (tack->ts.ecr != 0)
1542                 rto_estimate(&s->tcb, ts - tack->ts.ecr);
1543 }
1544
1545 /*
1546  * process <SYN,ACK>
1547  * returns negative value on failure, or zero on success.
1548  */
1549 static inline int
1550 rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1551         const union seg_info *si, struct rte_mbuf *mb,
1552         struct resp_info *rsp)
1553 {
1554         struct syn_opts so;
1555         struct tcp_hdr *th;
1556
1557         if (state != TCP_ST_SYN_SENT)
1558                 return -EINVAL;
1559
1560         /* invalid SEG.SEQ */
1561         if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1562                 rsp->flags = TCP_FLAG_RST;
1563                 return 0;
1564         }
1565
1566         th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1567                 mb->l2_len + mb->l3_len);
1568         get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1569
1570         s->tcb.so = so;
1571
1572         s->tcb.snd.una = s->tcb.snd.nxt;
1573         s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1574         s->tcb.snd.wnd = si->wnd << so.wscale;
1575         s->tcb.snd.wu.wl1 = si->seq;
1576         s->tcb.snd.wu.wl2 = si->ack;
1577         s->tcb.snd.wscale = so.wscale;
1578
1579         /* setup congestion variables */
1580         s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
1581         s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1582
1583         s->tcb.rcv.ts = so.ts.val;
1584         s->tcb.rcv.irs = si->seq;
1585         s->tcb.rcv.nxt = si->seq + 1;
1586
1587         /* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1588         s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1589                 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1590         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1591
1592         /* calculate initial rto */
1593         rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1594
1595         rsp->flags |= TCP_FLAG_ACK;
1596
1597         timer_stop(s);
1598         s->tcb.state = TCP_ST_ESTABLISHED;
1599         rte_smp_wmb();
1600
1601         if (s->tx.ev != NULL)
1602                 tle_event_raise(s->tx.ev);
1603         else if (s->tx.cb.func != NULL)
1604                 s->tx.cb.func(s->tx.cb.data, &s->s);
1605
1606         return 0;
1607 }
1608
1609 static inline uint32_t
1610 rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1611         const union pkt_info *pi, const union seg_info si[],
1612         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1613         uint32_t num)
1614 {
1615         uint32_t i, k, n, state;
1616         int32_t ret;
1617         struct resp_info rsp;
1618         struct dack_info tack;
1619
1620         k = 0;
1621         rsp.flags = 0;
1622
1623         state = s->tcb.state;
1624
1625         /*
1626          * first check for the states/flags where we don't
1627          * expect groups of packets.
1628          */
1629
1630         /* process RST */
1631         if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1632                 for (i = 0;
1633                                 i != num &&
1634                                 rx_rst(s, state, pi->tf.flags, &si[i]);
1635                                 i++)
1636                         ;
1637                 i = 0;
1638
1639         /* RFC 793: if the ACK bit is off drop the segment and return */
1640         } else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1641                 i = 0;
1642         /*
1643          * first check for the states/flags where we don't
1644          * expect groups of packets.
1645          */
1646
1647         /* process <SYN,ACK> */
1648         } else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1649                 for (i = 0; i != num; i++) {
1650                         ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1651                         if (ret == 0)
1652                                 break;
1653
1654                         rc[k] = -ret;
1655                         rp[k] = mb[i];
1656                         k++;
1657                 }
1658
1659         /* process FIN */
1660         } else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1661                 ret = 0;
1662                 for (i = 0; i != num; i++) {
1663                         ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1664                         if (ret >= 0)
1665                                 break;
1666
1667                         rc[k] = -ret;
1668                         rp[k] = mb[i];
1669                         k++;
1670                 }
1671                 i += (ret > 0);
1672
1673         /* normal data/ack packets */
1674         } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1675
1676                 /* process incoming data packets. */
1677                 dack_info_init(&tack, &s->tcb);
1678                 n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1679
1680                 /* follow up actions based on aggregated information */
1681
1682                 /* update SND.WND */
1683                 ack_window_update(&s->tcb, &tack);
1684
1685                 /*
1686                  * fast-path: all data & FIN was already sent out
1687                  * and now is acknowledged.
1688                  */
1689                 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1690                                 tack.ack == (uint32_t)s->tcb.snd.nxt)
1691                         rx_ackfin(s);
1692                 else
1693                         rx_process_ack(s, ts, &tack);
1694
1695                 /*
1696                  * send an immediate ACK if either:
1697                  * - received segment with invalid seq/ack number
1698                  * - received segment with OFO data
1699                  * - received segment with INO data and no TX is scheduled
1700                  *   for that stream.
1701                  */
1702                 if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1703                                 (tack.segs.data != 0 &&
1704                                 rte_atomic32_read(&s->tx.arm) == 0))
1705                         rsp.flags |= TCP_FLAG_ACK;
1706
1707                 rx_ofo_fin(s, &rsp);
1708
1709                 k += num - n;
1710                 i = num;
1711
1712         /* unhandled state, drop all packets. */
1713         } else
1714                 i = 0;
1715
1716         /* we have a response packet to send. */
1717         if (rsp.flags == TCP_FLAG_RST) {
1718                 send_rst(s, si[i].ack);
1719                 stream_term(s);
1720         } else if (rsp.flags != 0) {
1721                 send_ack(s, ts, rsp.flags);
1722
1723                 /* start the timer for FIN packet */
1724                 if ((rsp.flags & TCP_FLAG_FIN) != 0)
1725                         timer_reset(s);
1726         }
1727
1728         /* unprocessed packets */
1729         for (; i != num; i++, k++) {
1730                 rc[k] = ENODATA;
1731                 rp[k] = mb[i];
1732         }
1733
1734         return num - k;
1735 }
1736
1737 static inline uint32_t
1738 rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1739         const union pkt_info *pi, const union seg_info si[],
1740         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1741         uint32_t num)
1742 {
1743         uint32_t i;
1744
1745         if (tcp_stream_acquire(s) > 0) {
1746                 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1747                 tcp_stream_release(s);
1748                 return i;
1749         }
1750
1751         for (i = 0; i != num; i++) {
1752                 rc[i] = ENOENT;
1753                 rp[i] = mb[i];
1754         }
1755         return 0;
1756 }
1757
1758 static inline uint32_t
1759 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1760         const union pkt_info pi[], union seg_info si[],
1761         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1762         uint32_t num)
1763 {
1764         struct tle_tcp_stream *cs, *s;
1765         uint32_t i, k, n, state;
1766         int32_t ret;
1767
1768         s = rx_obtain_stream(dev, st, &pi[0], type);
1769         if (s == NULL) {
1770                 for (i = 0; i != num; i++) {
1771                         rc[i] = ENOENT;
1772                         rp[i] = mb[i];
1773                 }
1774                 return 0;
1775         }
1776
1777         k = 0;
1778         state = s->tcb.state;
1779
1780         if (state == TCP_ST_LISTEN) {
1781
1782                 /* one connection per flow */
1783                 cs = NULL;
1784                 ret = -EINVAL;
1785                 for (i = 0; i != num; i++) {
1786
1787                         ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1788
1789                         /* valid packet encountered */
1790                         if (ret >= 0)
1791                                 break;
1792
1793                         /* invalid packet, keep trying to find a proper one */
1794                         rc[k] = -ret;
1795                         rp[k] = mb[i];
1796                         k++;
1797                 }
1798
1799                 /* packet is valid, but we are out of streams to serve it */
1800                 if (ret > 0) {
1801                         for (; i != num; i++, k++) {
1802                                 rc[k] = ret;
1803                                 rp[k] = mb[i];
1804                         }
1805                 /* new stream is accepted */
1806                 } else if (ret == 0) {
1807
1808                         /* inform listen stream about new connections */
1809                         if (s->rx.ev != NULL)
1810                                 tle_event_raise(s->rx.ev);
1811                         else if (s->rx.cb.func != NULL &&
1812                                         rte_ring_count(s->rx.q) == 1)
1813                                 s->rx.cb.func(s->rx.cb.data, &s->s);
1814
1815                         /* if there is no data, drop current packet */
1816                         if (PKT_L4_PLEN(mb[i]) == 0) {
1817                                 rc[k] = ENODATA;
1818                                 rp[k++] = mb[i++];
1819                         }
1820
1821                         /*  process remaining packets for that stream */
1822                         if (num != i) {
1823                                 n = rx_new_stream(cs, ts, pi + i, si + i,
1824                                         mb + i, rp + k, rc + k, num - i);
1825                                 k += num - n - i;
1826                         }
1827                 }
1828
1829         } else {
1830                 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1831                 k = num - i;
1832         }
1833
1834         tcp_stream_release(s);
1835         return num - k;
1836 }
1837
1838
1839 static inline uint32_t
1840 rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1841         const union pkt_info pi[], const union seg_info si[],
1842         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1843         uint32_t num)
1844 {
1845         struct tle_tcp_stream *s;
1846         uint32_t i, k;
1847         int32_t ret;
1848
1849         s = rx_obtain_listen_stream(dev, &pi[0], type);
1850         if (s == NULL) {
1851                 for (i = 0; i != num; i++) {
1852                         rc[i] = ENOENT;
1853                         rp[i] = mb[i];
1854                 }
1855                 return 0;
1856         }
1857
1858         k = 0;
1859         for (i = 0; i != num; i++) {
1860
1861                 /* check that this remote is allowed to connect */
1862                 if (rx_check_stream(s, &pi[i]) != 0)
1863                         ret = -ENOENT;
1864                 else
1865                         /* syncokie: reply with <SYN,ACK> */
1866                         ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1867
1868                 if (ret != 0) {
1869                         rc[k] = -ret;
1870                         rp[k] = mb[i];
1871                         k++;
1872                 }
1873         }
1874
1875         tcp_stream_release(s);
1876         return num - k;
1877 }
1878
1879 uint16_t
1880 tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1881         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1882 {
1883         struct stbl *st;
1884         struct tle_ctx *ctx;
1885         uint32_t i, j, k, mt, n, t, ts;
1886         uint64_t csf;
1887         union pkt_info pi[num];
1888         union seg_info si[num];
1889         union {
1890                 uint8_t t[TLE_VNUM];
1891                 uint32_t raw;
1892         } stu;
1893
1894         ctx = dev->ctx;
1895         ts = tcp_get_tms(ctx->cycles_ms_shift);
1896         st = CTX_TCP_STLB(ctx);
1897         mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
1898
1899         stu.raw = 0;
1900
1901         /* extract packet info and check the L3/L4 csums */
1902         for (i = 0; i != num; i++) {
1903
1904                 get_pkt_info(pkt[i], &pi[i], &si[i]);
1905
1906                 t = pi[i].tf.type;
1907                 csf = dev->rx.ol_flags[t] &
1908                         (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1909
1910                 /* check csums in SW */
1911                 if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1912                                 pi[i].tf.type, IPPROTO_TCP) != 0)
1913                         pi[i].csf = csf;
1914
1915                 stu.t[t] = mt;
1916         }
1917
1918         if (stu.t[TLE_V4] != 0)
1919                 stbl_lock(st, TLE_V4);
1920         if (stu.t[TLE_V6] != 0)
1921                 stbl_lock(st, TLE_V6);
1922
1923         k = 0;
1924         for (i = 0; i != num; i += j) {
1925
1926                 t = pi[i].tf.type;
1927
1928                 /*basic checks for incoming packet */
1929                 if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1930                         rc[k] = EINVAL;
1931                         rp[k] = pkt[i];
1932                         j = 1;
1933                         k++;
1934                 /* process input SYN packets */
1935                 } else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1936                         j = pkt_info_bulk_syneq(pi + i, num - i);
1937                         n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1938                                 rp + k, rc + k, j);
1939                         k += j - n;
1940                 } else {
1941                         j = pkt_info_bulk_eq(pi + i, num - i);
1942                         n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1943                                 rp + k, rc + k, j);
1944                         k += j - n;
1945                 }
1946         }
1947
1948         if (stu.t[TLE_V4] != 0)
1949                 stbl_unlock(st, TLE_V4);
1950         if (stu.t[TLE_V6] != 0)
1951                 stbl_unlock(st, TLE_V6);
1952
1953         return num - k;
1954 }
1955
1956 uint16_t
1957 tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1958         uint32_t num)
1959 {
1960         uint32_t n;
1961         struct tle_tcp_stream *s;
1962
1963         s = TCP_STREAM(ts);
1964         n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
1965         if (n == 0)
1966                 return 0;
1967
1968         /*
1969          * if we still have packets to read,
1970          * then rearm stream RX event.
1971          */
1972         if (n == num && rte_ring_count(s->rx.q) != 0) {
1973                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
1974                         tle_event_raise(s->rx.ev);
1975                 tcp_stream_release(s);
1976         }
1977
1978         return n;
1979 }
1980
1981 uint16_t
1982 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1983 {
1984         uint32_t i, j, k, n;
1985         struct tle_drb *drb[num];
1986         struct tle_tcp_stream *s;
1987
1988         /* extract packets from device TX queue. */
1989
1990         k = num;
1991         n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1992                 num, drb, &k);
1993
1994         if (n == 0)
1995                 return 0;
1996
1997         /* free empty drbs and notify related streams. */
1998
1999         for (i = 0; i != k; i = j) {
2000                 s = drb[i]->udata;
2001                 for (j = i + 1; j != k && s == drb[j]->udata; j++)
2002                         ;
2003                 stream_drb_free(s, drb + i, j - i);
2004         }
2005
2006         return n;
2007 }
2008
2009 static inline void
2010 stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
2011 {
2012         if (s->s.type == TLE_V4)
2013                 pi->addr4 = s->s.ipv4.addr;
2014         else
2015                 pi->addr6 = &s->s.ipv6.addr;
2016
2017         pi->port = s->s.port;
2018         pi->tf.type = s->s.type;
2019 }
2020
2021 static int
2022 stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
2023 {
2024         const struct sockaddr_in *in4;
2025         const struct sockaddr_in6 *in6;
2026         const struct tle_dev_param *prm;
2027         int32_t rc;
2028
2029         rc = 0;
2030         s->s.pmsk.raw = UINT32_MAX;
2031
2032         /* setup L4 src ports and src address fields. */
2033         if (s->s.type == TLE_V4) {
2034                 in4 = (const struct sockaddr_in *)addr;
2035                 if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2036                         return -EINVAL;
2037
2038                 s->s.port.src = in4->sin_port;
2039                 s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2040                 s->s.ipv4.mask.src = INADDR_NONE;
2041                 s->s.ipv4.mask.dst = INADDR_NONE;
2042
2043         } else if (s->s.type == TLE_V6) {
2044                 in6 = (const struct sockaddr_in6 *)addr;
2045                 if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2046                                 sizeof(tle_ipv6_any)) == 0 ||
2047                                 in6->sin6_port == 0)
2048                         return -EINVAL;
2049
2050                 s->s.port.src = in6->sin6_port;
2051                 rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2052                         sizeof(s->s.ipv6.addr.src));
2053                 rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2054                         sizeof(s->s.ipv6.mask.src));
2055                 rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2056                         sizeof(s->s.ipv6.mask.dst));
2057         }
2058
2059         /* setup the destination device. */
2060         rc = stream_fill_dest(s);
2061         if (rc != 0)
2062                 return rc;
2063
2064         /* setup L4 dst address from device param */
2065         prm = &s->tx.dst.dev->prm;
2066         if (s->s.type == TLE_V4) {
2067                 if (s->s.ipv4.addr.dst == INADDR_ANY)
2068                         s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2069         } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2070                         sizeof(tle_ipv6_any)) == 0)
2071                 memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2072                         sizeof(s->s.ipv6.addr.dst));
2073
2074         return rc;
2075 }
2076
2077 static inline int
2078 tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2079 {
2080         int32_t rc;
2081         uint32_t tms, seq;
2082         union pkt_info pi;
2083         struct stbl *st;
2084         struct stbl_entry *se;
2085
2086         /* fill stream address */
2087         rc = stream_fill_addr(s, addr);
2088         if (rc != 0)
2089                 return rc;
2090
2091         /* fill pkt info to generate seq.*/
2092         stream_fill_pkt_info(s, &pi);
2093
2094         tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2095         s->tcb.so.ts.val = tms;
2096         s->tcb.so.ts.ecr = 0;
2097         s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2098         s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2099
2100         /* note that rcv.nxt is 0 here for sync_gen_seq.*/
2101         seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2102                                 s->s.ctx->prm.hash_alg,
2103                                 &s->s.ctx->prm.secret_key);
2104         s->tcb.snd.iss = seq;
2105         s->tcb.snd.rcvr = seq;
2106         s->tcb.snd.una = seq;
2107         s->tcb.snd.nxt = seq + 1;
2108         s->tcb.snd.rto = TCP_RTO_DEFAULT;
2109         s->tcb.snd.ts = tms;
2110
2111         s->tcb.rcv.mss = s->tcb.so.mss;
2112         s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2113         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2114         s->tcb.rcv.ts = 0;
2115
2116         /* add the stream in stream table */
2117         st = CTX_TCP_STLB(s->s.ctx);
2118         se = stbl_add_stream_lock(st, s);
2119         if (se == NULL)
2120                 return -ENOBUFS;
2121         s->ste = se;
2122
2123         /* put stream into the to-send queue */
2124         txs_enqueue(s->s.ctx, s);
2125
2126         return 0;
2127 }
2128
2129 int
2130 tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2131 {
2132         struct tle_tcp_stream *s;
2133         uint32_t type;
2134         int32_t rc;
2135
2136         if (ts == NULL || addr == NULL)
2137                 return -EINVAL;
2138
2139         s = TCP_STREAM(ts);
2140         type = s->s.type;
2141         if (type >= TLE_VNUM)
2142                 return -EINVAL;
2143
2144         if (tcp_stream_try_acquire(s) > 0) {
2145                 rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2146                         TCP_ST_SYN_SENT);
2147                 rc = (rc == 0) ? -EDEADLK : 0;
2148         } else
2149                 rc = -EINVAL;
2150
2151         if (rc != 0) {
2152                 tcp_stream_release(s);
2153                 return rc;
2154         }
2155
2156         /* fill stream, prepare and transmit syn pkt */
2157         s->tcb.uop |= TCP_OP_CONNECT;
2158         rc = tx_syn(s, addr);
2159         tcp_stream_release(s);
2160
2161         /* error happened, do a cleanup */
2162         if (rc != 0)
2163                 tle_tcp_stream_close(ts);
2164
2165         return rc;
2166 }
2167
2168 uint16_t
2169 tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2170 {
2171         uint32_t n;
2172         struct tle_tcp_stream *s;
2173
2174         s = TCP_STREAM(ts);
2175         n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
2176         if (n == 0)
2177                 return 0;
2178
2179         /*
2180          * if we still have packets to read,
2181          * then rearm stream RX event.
2182          */
2183         if (n == num && rte_ring_count(s->rx.q) != 0) {
2184                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2185                         tle_event_raise(s->rx.ev);
2186                 tcp_stream_release(s);
2187         }
2188
2189         return n;
2190 }
2191
2192 ssize_t
2193 tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
2194         int iovcnt)
2195 {
2196         int32_t i;
2197         uint32_t mn, n, tn;
2198         size_t sz;
2199         struct tle_tcp_stream *s;
2200         struct iovec iv;
2201         struct rxq_objs mo[2];
2202
2203         s = TCP_STREAM(ts);
2204
2205         /* get group of packets */
2206         mn = tcp_rxq_get_objs(s, mo);
2207         if (mn == 0)
2208                 return 0;
2209
2210         sz = 0;
2211         n = 0;
2212         for (i = 0; i != iovcnt; i++) {
2213                 iv = iov[i];
2214                 sz += iv.iov_len;
2215                 n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
2216                 if (iv.iov_len != 0) {
2217                         sz -= iv.iov_len;
2218                         break;
2219                 }
2220         }
2221
2222         tn = n;
2223
2224         if (i != iovcnt && mn != 1) {
2225                 n = 0;
2226                 do {
2227                         sz += iv.iov_len;
2228                         n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
2229                         if (iv.iov_len != 0) {
2230                                 sz -= iv.iov_len;
2231                                 break;
2232                         }
2233                         if (i + 1 != iovcnt)
2234                                 iv = iov[i + 1];
2235                 } while (++i != iovcnt);
2236                 tn += n;
2237         }
2238
2239         tcp_rxq_consume(s, tn);
2240
2241         /*
2242          * if we still have packets to read,
2243          * then rearm stream RX event.
2244          */
2245         if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
2246                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2247                         tle_event_raise(s->rx.ev);
2248                 tcp_stream_release(s);
2249         }
2250
2251         return sz;
2252 }
2253
2254 static inline int32_t
2255 tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2256         struct rte_mbuf *segs[], uint32_t num)
2257 {
2258         uint32_t i;
2259         int32_t rc;
2260
2261         for (i = 0; i != num; i++) {
2262                 /* Build L2/L3/L4 header */
2263                 rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2264                         0, TCP_FLAG_ACK, 0, 0);
2265                 if (rc != 0) {
2266                         free_mbufs(segs, num);
2267                         break;
2268                 }
2269         }
2270
2271         if (i == num) {
2272                 /* queue packets for further transmission. */
2273                 rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
2274                 if (rc != 0)
2275                         free_mbufs(segs, num);
2276         }
2277
2278         return rc;
2279 }
2280
2281 uint16_t
2282 tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2283 {
2284         uint32_t i, j, k, mss, n, state;
2285         int32_t rc;
2286         uint64_t ol_flags;
2287         struct tle_tcp_stream *s;
2288         struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2289
2290         s = TCP_STREAM(ts);
2291
2292         /* mark stream as not closable. */
2293         if (tcp_stream_acquire(s) < 0) {
2294                 rte_errno = EAGAIN;
2295                 return 0;
2296         }
2297
2298         state = s->tcb.state;
2299         if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2300                 rte_errno = ENOTCONN;
2301                 tcp_stream_release(s);
2302                 return 0;
2303         }
2304
2305         mss = s->tcb.snd.mss;
2306         ol_flags = s->tx.dst.ol_flags;
2307
2308         k = 0;
2309         rc = 0;
2310         while (k != num) {
2311                 /* prepare and check for TX */
2312                 for (i = k; i != num; i++) {
2313                         if (pkt[i]->pkt_len > mss ||
2314                                         pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2315                                 break;
2316                         rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2317                                 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2318                         if (rc != 0)
2319                                 break;
2320                 }
2321
2322                 if (i != k) {
2323                         /* queue packets for further transmission. */
2324                         n = _rte_ring_enqueue_burst(s->tx.q,
2325                                 (void **)pkt + k, (i - k));
2326                         k += n;
2327
2328                         /*
2329                          * for unsent, but already modified packets:
2330                          * remove pkt l2/l3 headers, restore ol_flags
2331                          */
2332                         if (i != k) {
2333                                 ol_flags = ~s->tx.dst.ol_flags;
2334                                 for (j = k; j != i; j++) {
2335                                         rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2336                                                 pkt[j]->l3_len +
2337                                                 pkt[j]->l4_len);
2338                                         pkt[j]->ol_flags &= ol_flags;
2339                                 }
2340                                 break;
2341                         }
2342                 }
2343
2344                 if (rc != 0) {
2345                         rte_errno = -rc;
2346                         break;
2347
2348                 /* segment large packet and enqueue for sending */
2349                 } else if (i != num) {
2350                         /* segment the packet. */
2351                         rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2352                                 &s->tx.dst, mss);
2353                         if (rc < 0) {
2354                                 rte_errno = -rc;
2355                                 break;
2356                         }
2357
2358                         rc = tx_segments(s, ol_flags, segs, rc);
2359                         if (rc == 0) {
2360                                 /* free the large mbuf */
2361                                 rte_pktmbuf_free(pkt[i]);
2362                                 /* set the mbuf as consumed */
2363                                 k++;
2364                         } else
2365                                 /* no space left in tx queue */
2366                                 break;
2367                 }
2368         }
2369
2370         /* notify BE about more data to send */
2371         if (k != 0)
2372                 txs_enqueue(s->s.ctx, s);
2373         /* if possible, re-arm stream write event. */
2374         if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2375                 tle_event_raise(s->tx.ev);
2376
2377         tcp_stream_release(s);
2378
2379         return k;
2380 }
2381
2382 ssize_t
2383 tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
2384         const struct iovec *iov, int iovcnt)
2385 {
2386         int32_t i, rc;
2387         uint32_t j, k, n, num, slen, state;
2388         uint64_t ol_flags;
2389         size_t sz, tsz;
2390         struct tle_tcp_stream *s;
2391         struct iovec iv;
2392         struct rte_mbuf *mb[2 * MAX_PKT_BURST];
2393
2394         s = TCP_STREAM(ts);
2395
2396         /* mark stream as not closable. */
2397         if (tcp_stream_acquire(s) < 0) {
2398                 rte_errno = EAGAIN;
2399                 return -1;
2400         }
2401
2402         state = s->tcb.state;
2403         if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2404                 rte_errno = ENOTCONN;
2405                 tcp_stream_release(s);
2406                 return -1;
2407         }
2408
2409         /* figure out how many mbufs do we need */
2410         tsz = 0;
2411         for (i = 0; i != iovcnt; i++)
2412                 tsz += iov[i].iov_len;
2413
2414         slen = rte_pktmbuf_data_room_size(mp);
2415         slen = RTE_MIN(slen, s->tcb.snd.mss);
2416
2417         num = (tsz + slen - 1) / slen;
2418         n = rte_ring_free_count(s->tx.q);
2419         num = RTE_MIN(num, n);
2420         n = RTE_MIN(num, RTE_DIM(mb));
2421
2422         /* allocate mbufs */
2423         if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
2424                 rte_errno = ENOMEM;
2425                 tcp_stream_release(s);
2426                 return -1;
2427         }
2428
2429         /* copy data into the mbufs */
2430         k = 0;
2431         sz = 0;
2432         for (i = 0; i != iovcnt; i++) {
2433                 iv = iov[i];
2434                 sz += iv.iov_len;
2435                 k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
2436                 if (iv.iov_len != 0) {
2437                         sz -= iv.iov_len;
2438                         break;
2439                 }
2440         }
2441
2442         /* partially filled segment */
2443         k += (k != n && mb[k]->data_len != 0);
2444
2445         /* fill pkt headers */
2446         ol_flags = s->tx.dst.ol_flags;
2447
2448         for (j = 0; j != k; j++) {
2449                 rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
2450                         s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2451                 if (rc != 0)
2452                         break;
2453         }
2454
2455         /* if no error encountered, then enqueue pkts for transmission */
2456         if (k == j)
2457                 k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
2458         else
2459                 k = 0;
2460
2461         if (k != j) {
2462
2463                 /* free pkts that were not enqueued */
2464                 free_mbufs(mb + k, j - k);
2465
2466                 /* our last segment can be partially filled */
2467                 sz += slen - sz % slen;
2468                 sz -= (j - k) * slen;
2469
2470                 /* report an error */
2471                 if (rc != 0) {
2472                         rte_errno = -rc;
2473                         sz = -1;
2474                 }
2475         }
2476
2477         if (k != 0) {
2478
2479                 /* notify BE about more data to send */
2480                 txs_enqueue(s->s.ctx, s);
2481
2482                 /* if possible, re-arm stream write event. */
2483                 if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2484                         tle_event_raise(s->tx.ev);
2485         }
2486
2487         tcp_stream_release(s);
2488         return sz;
2489 }
2490
2491 /* send data and FIN (if needed) */
2492 static inline void
2493 tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2494 {
2495         /* try to send some data */
2496         tx_nxt_data(s, tms);
2497
2498         /* we also have to send a FIN */
2499         if (state != TCP_ST_ESTABLISHED &&
2500                         state != TCP_ST_CLOSE_WAIT &&
2501                         tcp_txq_nxt_cnt(s) == 0 &&
2502                         s->tcb.snd.fss != s->tcb.snd.nxt) {
2503                 s->tcb.snd.fss = ++s->tcb.snd.nxt;
2504                 send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2505         }
2506 }
2507
2508 static inline void
2509 tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2510 {
2511         uint32_t state;
2512
2513         state = s->tcb.state;
2514
2515         if (state == TCP_ST_SYN_SENT) {
2516                 /* send the SYN, start the rto timer */
2517                 send_ack(s, tms, TCP_FLAG_SYN);
2518                 timer_start(s);
2519
2520         } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2521
2522                 tx_data_fin(s, tms, state);
2523
2524                 /* start RTO timer. */
2525                 if (s->tcb.snd.nxt != s->tcb.snd.una)
2526                         timer_start(s);
2527         }
2528 }
2529
2530 static inline void
2531 rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2532 {
2533         uint32_t state;
2534
2535         state = s->tcb.state;
2536
2537         TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2538                 "retx=%u, retm=%u, "
2539                 "rto=%u, snd.ts=%u, tmo=%u, "
2540                 "snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2541                 "snd.rcvr=%lu, snd.fastack=%u, "
2542                 "wnd=%u, cwnd=%u, ssthresh=%u, "
2543                 "bytes sent=%lu, pkt remain=%u;\n",
2544                 __func__, s, tms, s->tcb.state,
2545                 s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2546                 s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2547                 s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2548                 s->tcb.snd.rcvr, s->tcb.snd.fastack,
2549                 s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2550                 s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2551
2552         if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2553
2554                 if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2555
2556                         /* update SND.CWD and SND.SSTHRESH */
2557                         rto_cwnd_update(&s->tcb);
2558
2559                         /* RFC 6582 3.2.4 */
2560                         s->tcb.snd.rcvr = s->tcb.snd.nxt;
2561                         s->tcb.snd.fastack = 0;
2562
2563                         /* restart from last acked data */
2564                         tcp_txq_rst_nxt_head(s);
2565                         s->tcb.snd.nxt = s->tcb.snd.una;
2566
2567                         tx_data_fin(s, tms, state);
2568
2569                 } else if (state == TCP_ST_SYN_SENT) {
2570                         /* resending SYN */
2571                         s->tcb.so.ts.val = tms;
2572
2573                         /* According to RFC 6928 2:
2574                          * To reduce the chance for spurious SYN or SYN/ACK
2575                          * retransmission, it is RECOMMENDED that
2576                          * implementations refrain from resetting the initial
2577                          * window to 1 segment, unless there have been more
2578                          * than one SYN or SYN/ACK retransmissions or true loss
2579                          * detection has been made.
2580                          */
2581                         if (s->tcb.snd.nb_retx != 0)
2582                                 s->tcb.snd.cwnd = s->tcb.snd.mss;
2583
2584                         send_ack(s, tms, TCP_FLAG_SYN);
2585
2586                 } else if (state == TCP_ST_TIME_WAIT) {
2587                         stream_term(s);
2588                 }
2589
2590                 /* RFC6298:5.5 back off the timer */
2591                 s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2592                 s->tcb.snd.nb_retx++;
2593                 timer_restart(s);
2594
2595         } else {
2596                 send_rst(s, s->tcb.snd.una);
2597                 stream_term(s);
2598         }
2599 }
2600
2601 int
2602 tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2603 {
2604         uint32_t i, k, tms;
2605         struct sdr *dr;
2606         struct tle_timer_wheel *tw;
2607         struct tle_stream *p;
2608         struct tle_tcp_stream *s, *rs[num];
2609
2610         /* process streams with RTO exipred */
2611
2612         tw = CTX_TCP_TMWHL(ctx);
2613         tms = tcp_get_tms(ctx->cycles_ms_shift);
2614         tle_timer_expire(tw, tms);
2615
2616         k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2617
2618         for (i = 0; i != k; i++) {
2619
2620                 s = rs[i];
2621                 s->timer.handle = NULL;
2622                 if (tcp_stream_try_acquire(s) > 0)
2623                         rto_stream(s, tms);
2624                 tcp_stream_release(s);
2625         }
2626
2627         /* process streams from to-send queue */
2628
2629         k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2630
2631         for (i = 0; i != k; i++) {
2632
2633                 s = rs[i];
2634                 rte_atomic32_set(&s->tx.arm, 0);
2635
2636                 if (tcp_stream_try_acquire(s) > 0)
2637                         tx_stream(s, tms);
2638                 else
2639                         txs_enqueue(s->s.ctx, s);
2640                 tcp_stream_release(s);
2641         }
2642
2643         /* collect streams to close from the death row */
2644
2645         dr = CTX_TCP_SDR(ctx);
2646         for (k = 0, p = STAILQ_FIRST(&dr->be);
2647                         k != num && p != NULL;
2648                         k++, p = STAILQ_NEXT(p, link))
2649                 rs[k] = TCP_STREAM(p);
2650
2651         if (p == NULL)
2652                 STAILQ_INIT(&dr->be);
2653         else
2654                 STAILQ_FIRST(&dr->be) = p;
2655
2656         /* cleanup closed streams */
2657         for (i = 0; i != k; i++) {
2658                 s = rs[i];
2659                 tcp_stream_down(s);
2660                 tcp_stream_reset(ctx, s);
2661         }
2662
2663         return 0;
2664 }