0e8a39fb027f59dd5fb2eb87f1913570f4288458
[tldk.git] / lib / libtle_l4p / tcp_rxtx.c
1 /*
2  * Copyright (c) 2016-2017  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <rte_errno.h>
17 #include <rte_ethdev.h>
18 #include <rte_ip.h>
19 #include <rte_ip_frag.h>
20 #include <rte_tcp.h>
21
22 #include "tcp_stream.h"
23 #include "tcp_timer.h"
24 #include "stream_table.h"
25 #include "syncookie.h"
26 #include "misc.h"
27 #include "tcp_ctl.h"
28 #include "tcp_rxq.h"
29 #include "tcp_txq.h"
30 #include "tcp_tx_seg.h"
31
32 #define TCP_MAX_PKT_SEG 0x20
33
34 /*
35  * checks if input TCP ports and IP addresses match given stream.
36  * returns zero on success.
37  */
38 static inline int
39 rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40 {
41         int32_t rc;
42
43         if (pi->tf.type == TLE_V4)
44                 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45                         (pi->addr4.raw & s->s.ipv4.mask.raw) !=
46                         s->s.ipv4.addr.raw;
47         else
48                 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49                         ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50                         &s->s.ipv6.mask.raw) != 0;
51
52         return rc;
53 }
54
55 static inline struct tle_tcp_stream *
56 rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57         uint32_t type)
58 {
59         struct tle_tcp_stream *s;
60
61         s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62         if (s == NULL || tcp_stream_acquire(s) < 0)
63                 return NULL;
64
65         /* check that we have a proper stream. */
66         if (s->tcb.state != TCP_ST_LISTEN) {
67                 tcp_stream_release(s);
68                 s = NULL;
69         }
70
71         return s;
72 }
73
74 static inline struct tle_tcp_stream *
75 rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76         const union pkt_info *pi, uint32_t type)
77 {
78         struct tle_tcp_stream *s;
79
80         s = stbl_find_data(st, pi);
81         if (s == NULL) {
82                 if (pi->tf.flags == TCP_FLAG_ACK)
83                         return rx_obtain_listen_stream(dev, pi, type);
84                 return NULL;
85         }
86
87         if (tcp_stream_acquire(s) < 0)
88                 return NULL;
89         /* check that we have a proper stream. */
90         else if (s->tcb.state == TCP_ST_CLOSED) {
91                 tcp_stream_release(s);
92                 s = NULL;
93         }
94
95         return s;
96 }
97
98 /*
99  * Consider 2 pkt_info *equal* if their:
100  * - types (IPv4/IPv6)
101  * - TCP flags
102  * - checksum flags
103  * - TCP src and dst ports
104  * - IP src and dst addresses
105  * are equal.
106  */
107 static inline int
108 pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
109 {
110         uint32_t i;
111
112         i = 1;
113
114         if (pi[0].tf.type == TLE_V4) {
115                 while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
116                         i++;
117
118         } else if (pi[0].tf.type == TLE_V6) {
119                 while (i != num &&
120                                 pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121                                 ymm_cmp(&pi[0].addr6->raw,
122                                 &pi[i].addr6->raw) == 0)
123                         i++;
124         }
125
126         return i;
127 }
128
129 static inline int
130 pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
131 {
132         uint32_t i;
133
134         i = 1;
135
136         if (pi[0].tf.type == TLE_V4) {
137                 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138                                 pi[0].port.dst == pi[i].port.dst &&
139                                 pi[0].addr4.dst == pi[i].addr4.dst)
140                         i++;
141
142         } else if (pi[0].tf.type == TLE_V6) {
143                 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144                                 pi[0].port.dst == pi[i].port.dst &&
145                                 xmm_cmp(&pi[0].addr6->dst,
146                                 &pi[i].addr6->dst) == 0)
147                         i++;
148         }
149
150         return i;
151 }
152
153 static inline void
154 stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
155         uint32_t nb_drb)
156 {
157         _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
158 }
159
160 static inline uint32_t
161 stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
162         uint32_t nb_drb)
163 {
164         return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
165 }
166
167 static inline uint32_t
168 get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
169 {
170         uint32_t pid;
171         rte_atomic32_t *pa;
172
173         pa = &dev->tx.packet_id[type];
174
175         if (st == 0) {
176                 pid = rte_atomic32_add_return(pa, num);
177                 return pid - num;
178         } else {
179                 pid = rte_atomic32_read(pa);
180                 rte_atomic32_set(pa, pid + num);
181                 return pid;
182         }
183 }
184
185 static inline void
186 fill_tcph(struct rte_tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
187         uint32_t seq, uint8_t hlen, uint8_t flags)
188 {
189         uint16_t wnd;
190
191         l4h->src_port = port.dst;
192         l4h->dst_port = port.src;
193
194         wnd = (flags & TCP_FLAG_SYN) ?
195                 RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
196                 tcb->rcv.wnd >> tcb->rcv.wscale;
197
198         /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
199         l4h->sent_seq = rte_cpu_to_be_32(seq);
200         l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
201         l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
202         l4h->tcp_flags = flags;
203         l4h->rx_win = rte_cpu_to_be_16(wnd);
204         l4h->cksum = 0;
205         l4h->tcp_urp = 0;
206
207         if (flags & TCP_FLAG_SYN)
208                 fill_syn_opts(l4h + 1, &tcb->so);
209         else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
210                 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
211 }
212
213 static inline int
214 tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
215         const struct tle_dest *dst, uint64_t ol_flags,
216         union l4_ports port, uint32_t seq, uint32_t flags,
217         uint32_t pid, uint32_t swcsm)
218 {
219         uint32_t l4, len, plen;
220         struct rte_tcp_hdr *l4h;
221         char *l2h;
222
223         len = dst->l2_len + dst->l3_len;
224         plen = m->pkt_len;
225
226         if (flags & TCP_FLAG_SYN)
227                 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
228         else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
229                 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
230         else
231                 l4 = sizeof(*l4h);
232
233         /* adjust mbuf to put L2/L3/L4 headers into it. */
234         l2h = rte_pktmbuf_prepend(m, len + l4);
235         if (l2h == NULL)
236                 return -EINVAL;
237
238         /* copy L2/L3 header */
239         rte_memcpy(l2h, dst->hdr, len);
240
241         /* setup TCP header & options */
242         l4h = (struct rte_tcp_hdr *)(l2h + len);
243         fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
244
245         /* setup mbuf TX offload related fields. */
246         m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
247         m->ol_flags |= ol_flags;
248
249         /* update proto specific fields. */
250
251         if (s->s.type == TLE_V4) {
252                 struct rte_ipv4_hdr *l3h;
253                 l3h = (struct rte_ipv4_hdr *)(l2h + dst->l2_len);
254                 l3h->packet_id = rte_cpu_to_be_16(pid);
255                 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
256
257                 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
258                         l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
259                                 ol_flags);
260                 else if (swcsm != 0)
261                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
262
263                 if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
264                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
265         } else {
266                 struct rte_ipv6_hdr *l3h;
267                 l3h = (struct rte_ipv6_hdr *)(l2h + dst->l2_len);
268                 l3h->payload_len = rte_cpu_to_be_16(plen + l4);
269                 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
270                         l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
271                 else if (swcsm != 0)
272                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
273         }
274
275         return 0;
276 }
277
278 /*
279  * That function supposed to be used only for data packets.
280  * Assumes that L2/L3/L4 headers and mbuf fields already setup properly.
281  *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
282  *  - if no HW cksum offloads are enabled, calculates TCP checksum.
283  */
284 static inline void
285 tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
286         uint32_t seq, uint32_t pid)
287 {
288         struct rte_tcp_hdr *l4h;
289         uint32_t len;
290
291         len = m->l2_len + m->l3_len;
292         l4h = rte_pktmbuf_mtod_offset(m, struct rte_tcp_hdr *, len);
293
294         l4h->sent_seq = rte_cpu_to_be_32(seq);
295         l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
296
297         if (tcb->so.ts.raw != 0)
298                 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
299
300         if (type == TLE_V4) {
301                 struct rte_ipv4_hdr *l3h;
302                 l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
303                         m->l2_len);
304                 l3h->hdr_checksum = 0;
305                 l3h->packet_id = rte_cpu_to_be_16(pid);
306                 if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
307                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
308         }
309
310         /* have to calculate TCP checksum in SW */
311         if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
312
313                 l4h->cksum = 0;
314
315                 if (type == TLE_V4) {
316                         struct rte_ipv4_hdr *l3h;
317                         l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
318                                 m->l2_len);
319                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
320
321                 } else {
322                         struct rte_ipv6_hdr *l3h;
323                         l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
324                                 m->l2_len);
325                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
326                 }
327         }
328 }
329
330 /* Send data packets that need to be ACK-ed by peer */
331 static inline uint32_t
332 tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
333 {
334         uint32_t bsz, i, nb, nbm;
335         struct tle_dev *dev;
336         struct tle_drb *drb[num];
337
338         /* calculate how many drbs are needed.*/
339         bsz = s->tx.drb.nb_elem;
340         nbm = (num + bsz - 1) / bsz;
341
342         /* allocate drbs, adjust number of packets. */
343         nb = stream_drb_alloc(s, drb, nbm);
344
345         /* drb ring is empty. */
346         if (nb == 0)
347                 return 0;
348
349         else if (nb != nbm)
350                 num = nb * bsz;
351
352         dev = s->tx.dst.dev;
353
354         /* enqueue pkts for TX. */
355         nbm = nb;
356         i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
357                 num, drb, &nb);
358
359         /* free unused drbs. */
360         if (nb != 0)
361                 stream_drb_free(s, drb + nbm - nb, nb);
362
363         return i;
364 }
365
366 static inline uint32_t
367 tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
368         uint32_t num)
369 {
370         uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
371         struct tle_dev *dev;
372         struct rte_mbuf *mb;
373         struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
374
375         mss = s->tcb.snd.mss;
376         type = s->s.type;
377
378         dev = s->tx.dst.dev;
379         pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
380
381         k = 0;
382         tn = 0;
383         fail = 0;
384         for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
385
386                 mb = mi[i];
387                 sz = RTE_MIN(sl->len, mss);
388                 plen = PKT_L4_PLEN(mb);
389
390                 /*fast path, no need to use indirect mbufs. */
391                 if (plen <= sz) {
392
393                         /* update pkt TCP header */
394                         tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
395
396                         /* keep mbuf till ACK is received. */
397                         rte_pktmbuf_refcnt_update(mb, 1);
398                         sl->len -= plen;
399                         sl->seq += plen;
400                         mo[k++] = mb;
401                 /* remaining snd.wnd is less them MSS, send nothing */
402                 } else if (sz < mss)
403                         break;
404                 /* packet indirection needed */
405                 else
406                         RTE_VERIFY(0);
407
408                 if (k >= MAX_PKT_BURST) {
409                         n = tx_data_pkts(s, mo, k);
410                         fail = k - n;
411                         tn += n;
412                         k = 0;
413                 }
414         }
415
416         if (k != 0) {
417                 n = tx_data_pkts(s, mo, k);
418                 fail = k - n;
419                 tn += n;
420         }
421
422         if (fail != 0) {
423                 sz = tcp_mbuf_seq_free(mo + n, fail);
424                 sl->seq -= sz;
425                 sl->len += sz;
426         }
427
428         return tn;
429 }
430
431 /*
432  * gets data from stream send buffer, updates it and
433  * queues it into TX device queue.
434  * Note that this function and is not MT safe.
435  */
436 static inline uint32_t
437 tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
438 {
439         uint32_t n, num, tn, wnd;
440         struct rte_mbuf **mi;
441         union seqlen sl;
442
443         tn = 0;
444         wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
445         sl.seq = s->tcb.snd.nxt;
446         sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
447
448         if (sl.len == 0)
449                 return tn;
450
451         /* update send timestamp */
452         s->tcb.snd.ts = tms;
453
454         do {
455                 /* get group of packets */
456                 mi = tcp_txq_get_nxt_objs(s, &num);
457
458                 /* stream send buffer is empty */
459                 if (num == 0)
460                         break;
461
462                 /* queue data packets for TX */
463                 n = tx_data_bulk(s, &sl, mi, num);
464                 tn += n;
465
466                 /* update consumer head */
467                 tcp_txq_set_nxt_head(s, n);
468         } while (n == num);
469
470         s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
471         return tn;
472 }
473
474 static inline void
475 free_una_data(struct tle_tcp_stream *s, uint32_t len)
476 {
477         uint32_t i, num, plen;
478         struct rte_mbuf **mi;
479
480         plen = 0;
481
482         do {
483                 /* get group of packets */
484                 mi = tcp_txq_get_una_objs(s, &num);
485
486                 if (num == 0)
487                         break;
488
489                 /* free acked data */
490                 for (i = 0; i != num && plen != len; i++) {
491                         uint32_t next_pkt_len = PKT_L4_PLEN(mi[i]);
492                         if (plen + next_pkt_len > len) {
493                                 /* keep SND.UNA at the start of the packet */
494                                 len = plen;
495                                 break;
496                         } else {
497                                 plen += next_pkt_len;
498                         }
499                         rte_pktmbuf_free(mi[i]);
500                 }
501
502                 /* update consumer tail */
503                 tcp_txq_set_una_tail(s, i);
504         } while (plen < len);
505
506         s->tcb.snd.una += len;
507
508         /*
509          * that could happen in case of retransmit,
510          * adjust SND.NXT with SND.UNA.
511          */
512         if (s->tcb.snd.una > s->tcb.snd.nxt) {
513                 tcp_txq_rst_nxt_head(s);
514                 s->tcb.snd.nxt = s->tcb.snd.una;
515         }
516 }
517
518 static inline uint16_t
519 calc_smss(uint16_t mss, const struct tle_dest *dst)
520 {
521         uint16_t n;
522
523         n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
524         mss = RTE_MIN(n, mss);
525         return mss;
526 }
527
528 /*
529  * RFC 6928 2
530  * min (10*MSS, max (2*MSS, 14600))
531  *
532  * or using user provided initial congestion window (icw)
533  * min (10*MSS, max (2*MSS, icw))
534  */
535 static inline uint32_t
536 initial_cwnd(uint32_t smss, uint32_t icw)
537 {
538         return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
539 }
540
541 /*
542  * queue standalone packet to he particular output device
543  * It assumes that:
544  * - L2/L3/L4 headers should be already set.
545  * - packet fits into one segment.
546  */
547 static inline int
548 send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
549 {
550         uint32_t n, nb;
551         struct tle_drb *drb;
552
553         if (stream_drb_alloc(s, &drb, 1) == 0)
554                 return -ENOBUFS;
555
556         /* enqueue pkt for TX. */
557         nb = 1;
558         n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
559                 &drb, &nb);
560
561         /* free unused drbs. */
562         if (nb != 0)
563                 stream_drb_free(s, &drb, 1);
564
565         return (n == 1) ? 0 : -ENOBUFS;
566 }
567
568 static inline int
569 send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
570         uint32_t flags)
571 {
572         const struct tle_dest *dst;
573         uint32_t pid, type;
574         int32_t rc;
575
576         dst = &s->tx.dst;
577         type = s->s.type;
578         pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
579
580         rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
581         if (rc == 0)
582                 rc = send_pkt(s, dst->dev, m);
583
584         return rc;
585 }
586
587 static inline int
588 send_rst(struct tle_tcp_stream *s, uint32_t seq)
589 {
590         struct rte_mbuf *m;
591         int32_t rc;
592
593         m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
594         if (m == NULL)
595                 return -ENOMEM;
596
597         rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
598         if (rc != 0)
599                 rte_pktmbuf_free(m);
600
601         return rc;
602 }
603
604 static inline int
605 send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
606 {
607         struct rte_mbuf *m;
608         uint32_t seq;
609         int32_t rc;
610
611         m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
612         if (m == NULL)
613                 return -ENOMEM;
614
615         seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
616         s->tcb.snd.ts = tms;
617
618         rc = send_ctrl_pkt(s, m, seq, flags);
619         if (rc != 0) {
620                 rte_pktmbuf_free(m);
621                 return rc;
622         }
623
624         s->tcb.snd.ack = s->tcb.rcv.nxt;
625         return 0;
626 }
627
628
629 static int
630 sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
631         const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
632 {
633         uint16_t len;
634         int32_t rc;
635         uint32_t pid, seq, type;
636         struct tle_dev *dev;
637         const void *da;
638         struct tle_dest dst;
639         const struct rte_tcp_hdr *th;
640
641         type = s->s.type;
642
643         /* get destination information. */
644         if (type == TLE_V4)
645                 da = &pi->addr4.src;
646         else
647                 da = &pi->addr6->src;
648
649         rc = stream_get_dest(&s->s, da, &dst);
650         if (rc < 0)
651                 return rc;
652
653         th = rte_pktmbuf_mtod_offset(m, const struct rte_tcp_hdr *,
654                 m->l2_len + m->l3_len);
655         get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
656
657         /* reset wscale option if timestamp is not present */
658         if (s->tcb.so.ts.val == 0)
659                 s->tcb.so.wscale = 0;
660
661         s->tcb.rcv.nxt = si->seq + 1;
662         seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
663                                 s->s.ctx->prm.hash_alg,
664                                 &s->s.ctx->prm.secret_key);
665         s->tcb.so.ts.ecr = s->tcb.so.ts.val;
666         s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
667         s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
668                 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
669         s->tcb.so.mss = calc_smss(dst.mtu, &dst);
670
671         /* reset mbuf's data contents. */
672         len = m->l2_len + m->l3_len + m->l4_len;
673         m->tx_offload = 0;
674         if (rte_pktmbuf_adj(m, len) == NULL)
675                 return -EINVAL;
676
677         dev = dst.dev;
678         pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
679
680         rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
681                 TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
682         if (rc == 0)
683                 rc = send_pkt(s, dev, m);
684
685         return rc;
686 }
687
688 /*
689  * RFC 793:
690  * There are four cases for the acceptability test for an incoming segment:
691  * Segment Receive  Test
692  * Length  Window
693  * ------- -------  -------------------------------------------
694  *    0       0     SEG.SEQ = RCV.NXT
695  *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
696  *   >0       0     not acceptable
697  *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
698  *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
699  */
700 static inline int
701 check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
702 {
703         uint32_t n;
704
705         n = seqn + len;
706         if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
707                         n - tcb->rcv.nxt > tcb->rcv.wnd)
708                 return -ERANGE;
709
710         return 0;
711 }
712
713 static inline union tle_tcp_tsopt
714 rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
715 {
716         union tle_tcp_tsopt ts;
717         uintptr_t opt;
718         const struct rte_tcp_hdr *th;
719
720         if (tcb->so.ts.val != 0) {
721                 opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
722                         mb->l2_len + mb->l3_len + sizeof(*th));
723                 ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
724         } else
725                 ts.raw = 0;
726
727         return ts;
728 }
729
730 /*
731  * PAWS and sequence check.
732  * RFC 1323 4.2.1
733  */
734 static inline int
735 rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len,
736         const union tle_tcp_tsopt ts)
737 {
738         int32_t rc;
739
740         /* RFC 1323 4.2.1 R2 */
741         rc = check_seqn(tcb, seq, len);
742         if (rc < 0)
743                 return rc;
744
745         if (ts.raw != 0) {
746
747                 /* RFC 1323 4.2.1 R1 */
748                 if (tcp_seq_lt(ts.val, tcb->rcv.ts))
749                         return -ERANGE;
750
751                 /* RFC 1323 4.2.1 R3 */
752                 if (tcp_seq_leq(seq, tcb->snd.ack) &&
753                                 tcp_seq_lt(tcb->snd.ack, seq + len))
754                         tcb->rcv.ts = ts.val;
755         }
756
757         return rc;
758 }
759
760 static inline int
761 rx_check_ack(const struct tcb *tcb, uint32_t ack)
762 {
763         uint32_t max;
764
765         max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
766
767         if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
768                 return 0;
769
770         return -ERANGE;
771 }
772
773 static inline int
774 rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
775         const union tle_tcp_tsopt ts)
776 {
777         int32_t rc;
778
779         rc = rx_check_seq(tcb, seq, len, ts);
780         rc |= rx_check_ack(tcb, ack);
781         return rc;
782 }
783
784 static inline int
785 restore_syn_opt(union seg_info *si, union tle_tcp_tsopt *to,
786         const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
787         uint32_t hash_alg, rte_xmm_t *secret_key)
788 {
789         int32_t rc;
790         uint32_t len;
791         const struct rte_tcp_hdr *th;
792
793         /* check that ACK, etc fields are what we expected. */
794         rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
795                                 hash_alg,
796                                 secret_key);
797         if (rc < 0)
798                 return rc;
799
800         si->mss = rc;
801
802         th = rte_pktmbuf_mtod_offset(mb, const struct rte_tcp_hdr *,
803                 mb->l2_len + mb->l3_len);
804         len = mb->l4_len - sizeof(*th);
805         to[0] = get_tms_opts((uintptr_t)(th + 1), len);
806         return 0;
807 }
808
809 static inline void
810 stream_term(struct tle_tcp_stream *s)
811 {
812         struct sdr *dr;
813
814         s->tcb.state = TCP_ST_CLOSED;
815         rte_smp_wmb();
816
817         timer_stop(s);
818
819         /* close() was already invoked, schedule final cleanup */
820         if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
821
822                 dr = CTX_TCP_SDR(s->s.ctx);
823                 STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
824
825         /* notify user that stream need to be closed */
826         } else if (s->err.ev != NULL)
827                 tle_event_raise(s->err.ev);
828         else if (s->err.cb.func != NULL)
829                 s->err.cb.func(s->err.cb.data, &s->s);
830 }
831
832 static inline int
833 stream_fill_dest(struct tle_tcp_stream *s)
834 {
835         int32_t rc;
836         uint32_t type;
837         const void *da;
838
839         type = s->s.type;
840         if (type == TLE_V4)
841                 da = &s->s.ipv4.addr.src;
842         else
843                 da = &s->s.ipv6.addr.src;
844
845         rc = stream_get_dest(&s->s, da, &s->tx.dst);
846         return (rc < 0) ? rc : 0;
847 }
848
849 /*
850  * estimate the rto
851  * for now rtt is calculated based on the tcp TMS option,
852  * later add real-time one
853  */
854 static inline void
855 estimate_stream_rto(struct tle_tcp_stream *s, uint32_t tms)
856 {
857         uint32_t rtt;
858
859         if (s->tcb.so.ts.ecr) {
860                 rtt = tms - s->tcb.so.ts.ecr;
861                 rto_estimate(&s->tcb, rtt);
862         } else
863                 s->tcb.snd.rto = TCP_RTO_DEFAULT;
864 }
865
866 /*
867  * helper function, prepares a new accept stream.
868  */
869 static inline int
870 accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
871         struct tle_tcp_stream *cs, const union tle_tcp_tsopt *to,
872         uint32_t tms, const union pkt_info *pi, const union seg_info *si)
873 {
874         int32_t rc;
875
876         /* some TX still pending for that stream. */
877         if (TCP_STREAM_TX_PENDING(cs))
878                 return -EAGAIN;
879
880         /* setup L4 ports and L3 addresses fields. */
881         cs->s.port.raw = pi->port.raw;
882         cs->s.pmsk.raw = UINT32_MAX;
883
884         if (pi->tf.type == TLE_V4) {
885                 cs->s.ipv4.addr = pi->addr4;
886                 cs->s.ipv4.mask.src = INADDR_NONE;
887                 cs->s.ipv4.mask.dst = INADDR_NONE;
888         } else if (pi->tf.type == TLE_V6) {
889                 cs->s.ipv6.addr = *pi->addr6;
890                 rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
891                         sizeof(cs->s.ipv6.mask.src));
892                 rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
893                         sizeof(cs->s.ipv6.mask.dst));
894         }
895
896         /* setup TCB */
897         sync_fill_tcb(&cs->tcb, si, to);
898         cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
899
900         estimate_stream_rto(cs, tms);
901
902         /* copy streams type & flags. */
903         cs->s.type = ps->s.type;
904         cs->flags = ps->flags;
905
906         /* retrive and cache destination information. */
907         rc = stream_fill_dest(cs);
908         if (rc != 0)
909                 return rc;
910
911         /* update snd.mss with SMSS value */
912         cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
913
914         /* setup congestion variables */
915         cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
916         cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
917         cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
918
919         cs->tcb.state = TCP_ST_ESTABLISHED;
920
921         /* add stream to the table */
922         cs->ste = stbl_add_stream(st, pi, cs);
923         if (cs->ste == NULL)
924                 return -ENOBUFS;
925
926         cs->tcb.uop |= TCP_OP_ACCEPT;
927         tcp_stream_up(cs);
928         return 0;
929 }
930
931
932 /*
933  * ACK for new connection request arrived.
934  * Check that the packet meets all conditions and try to open a new stream.
935  * returns:
936  * < 0  - invalid packet
937  * == 0 - packet is valid and new stream was opened for it.
938  * > 0  - packet is valid, but failed to open new stream.
939  */
940 static inline int
941 rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
942         const union pkt_info *pi, union seg_info *si,
943         uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
944 {
945         int32_t rc;
946         struct tle_ctx *ctx;
947         struct tle_stream *ts;
948         struct tle_tcp_stream *cs;
949         union tle_tcp_tsopt to;
950
951         *csp = NULL;
952
953         if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
954                 return -EINVAL;
955
956         ctx = s->s.ctx;
957         rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
958                                 &ctx->prm.secret_key);
959         if (rc < 0)
960                 return rc;
961
962         /* allocate new stream */
963         cs = tcp_stream_get(ctx, 0);
964         if (cs == NULL)
965                 return ENFILE;
966
967         /* prepare stream to handle new connection */
968         if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
969
970                 /* put new stream in the accept queue */
971                 ts = &cs->s;
972                 if (_rte_ring_enqueue_burst(s->rx.q,
973                                 (void * const *)&ts, 1) == 1) {
974                         *csp = cs;
975                         return 0;
976                 }
977
978                 /* cleanup on failure */
979                 tcp_stream_down(cs);
980                 stbl_del_stream(st, cs->ste, cs, 0);
981                 cs->ste = NULL;
982         }
983
984         tcp_stream_reset(ctx, cs);
985         return ENOBUFS;
986 }
987
988 static inline int
989 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf **mb, uint32_t hlen,
990         uint32_t *seqn, uint32_t *plen)
991 {
992         uint32_t len, n, seq;
993
994         seq = *seqn;
995         len = *plen;
996
997         rte_pktmbuf_adj(*mb, hlen);
998         if (len == 0)
999                 return -ENODATA;
1000         /* cut off the start of the packet */
1001         else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
1002                 n = tcb->rcv.nxt - seq;
1003                 if (n >= len)
1004                         return -ENODATA;
1005
1006                 *mb = _rte_pktmbuf_adj(*mb, n);
1007                 *seqn = seq + n;
1008                 *plen = len - n;
1009         }
1010
1011         return 0;
1012 }
1013
1014 static inline uint32_t
1015 rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
1016 {
1017         uint32_t k, n;
1018
1019         n = ack - (uint32_t)s->tcb.snd.una;
1020
1021         /* some more data was acked. */
1022         if (n != 0) {
1023
1024                 /* advance SND.UNA and free related packets. */
1025                 k = rte_ring_free_count(s->tx.q);
1026                 free_una_data(s, n);
1027
1028                 /* mark the stream as available for writing */
1029                 if (rte_ring_free_count(s->tx.q) != 0) {
1030                         if (s->tx.ev != NULL)
1031                                 tle_event_raise(s->tx.ev);
1032                         else if (k == 0 && s->tx.cb.func != NULL)
1033                                 s->tx.cb.func(s->tx.cb.data, &s->s);
1034                 }
1035         }
1036
1037         return n;
1038 }
1039
1040 static void
1041 stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
1042 {
1043         if (rto != 0) {
1044                 s->tcb.state = TCP_ST_TIME_WAIT;
1045                 s->tcb.snd.rto = rto;
1046                 timer_reset(s);
1047         } else
1048                 stream_term(s);
1049 }
1050
1051 static void
1052 rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1053 {
1054         uint32_t state;
1055         int32_t ackfin;
1056
1057         s->tcb.rcv.nxt += 1;
1058
1059         ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1060         state = s->tcb.state;
1061
1062         if (state == TCP_ST_ESTABLISHED) {
1063                 s->tcb.state = TCP_ST_CLOSE_WAIT;
1064                 /* raise err.ev & err.cb */
1065                 if (s->err.ev != NULL)
1066                         tle_event_raise(s->err.ev);
1067                 else if (s->err.cb.func != NULL)
1068                         s->err.cb.func(s->err.cb.data, &s->s);
1069         } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1070                 rsp->flags |= TCP_FLAG_ACK;
1071                 if (ackfin != 0)
1072                         stream_timewait(s, s->tcb.snd.rto_tw);
1073                 else
1074                         s->tcb.state = TCP_ST_CLOSING;
1075         } else if (state == TCP_ST_FIN_WAIT_2) {
1076                 rsp->flags |= TCP_FLAG_ACK;
1077                 stream_timewait(s, s->tcb.snd.rto_tw);
1078         } else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1079                 stream_term(s);
1080         }
1081 }
1082
1083 /*
1084  * FIN process for ESTABLISHED state
1085  * returns:
1086  * 0 < - error occurred
1087  * 0 - FIN was processed OK, and mbuf can be free/reused.
1088  * 0 > - FIN was processed OK and mbuf can't be free/reused.
1089  */
1090 static inline int
1091 rx_fin(struct tle_tcp_stream *s, uint32_t state,
1092         const union seg_info *si, struct rte_mbuf *mb,
1093         struct resp_info *rsp)
1094 {
1095         uint32_t hlen, plen, seq;
1096         int32_t ret;
1097         union tle_tcp_tsopt ts;
1098
1099         hlen = PKT_L234_HLEN(mb);
1100         plen = mb->pkt_len - hlen;
1101         seq = si->seq;
1102
1103         ts = rx_tms_opt(&s->tcb, mb);
1104         ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1105         if (ret != 0)
1106                 return ret;
1107
1108         if (state < TCP_ST_ESTABLISHED)
1109                 return -EINVAL;
1110
1111         if (plen != 0) {
1112
1113                 ret = data_pkt_adjust(&s->tcb, &mb, hlen, &seq, &plen);
1114                 if (ret != 0)
1115                         return ret;
1116                 if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1117                         return -ENOBUFS;
1118         }
1119
1120         /*
1121          * fast-path: all data & FIN was already sent out
1122          * and now is acknowledged.
1123          */
1124         if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1125                         si->ack == (uint32_t)s->tcb.snd.nxt) {
1126                 s->tcb.snd.una = s->tcb.snd.fss;
1127                 empty_tq(s);
1128         /* conventional ACK processiing */
1129         } else
1130                 rx_ackdata(s, si->ack);
1131
1132         /* some fragments still missing */
1133         if (seq + plen != s->tcb.rcv.nxt) {
1134                 s->tcb.rcv.frs.seq = seq + plen;
1135                 s->tcb.rcv.frs.on = 1;
1136         } else
1137                 rx_fin_state(s, rsp);
1138
1139         return plen;
1140 }
1141
1142 static inline int
1143 rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1144         const union seg_info *si)
1145 {
1146         int32_t rc;
1147
1148         /*
1149          * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1150          * are validated by checking their SEQ-fields.
1151          * A reset is valid if its sequence number is in the window.
1152          * In the SYN-SENT state (a RST received in response to an initial SYN),
1153          * the RST is acceptable if the ACK field acknowledges the SYN.
1154          */
1155         if (state == TCP_ST_SYN_SENT) {
1156                 rc = ((flags & TCP_FLAG_ACK) == 0 ||
1157                                 si->ack != s->tcb.snd.nxt) ?
1158                         -ERANGE : 0;
1159         }
1160
1161         else
1162                 rc = check_seqn(&s->tcb, si->seq, 0);
1163
1164         if (rc == 0)
1165                 stream_term(s);
1166
1167         return rc;
1168 }
1169
1170 /*
1171  *  check do we have FIN  that was received out-of-order.
1172  *  if yes, try to process it now.
1173  */
1174 static inline void
1175 rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1176 {
1177         if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1178                 rx_fin_state(s, rsp);
1179 }
1180
1181 static inline void
1182 dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1183 {
1184         static const struct dack_info zero_dack;
1185
1186         tack[0] = zero_dack;
1187         tack->ack = tcb->snd.una;
1188         tack->segs.dup = tcb->rcv.dupack;
1189         tack->wu.raw = tcb->snd.wu.raw;
1190         tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1191 }
1192
1193 static inline void
1194 ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1195 {
1196         tcb->snd.wu.raw = tack->wu.raw;
1197         tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1198 }
1199
1200 static inline void
1201 ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1202 {
1203         uint32_t n;
1204
1205         n = tack->segs.ack * tcb->snd.mss;
1206
1207         /* slow start phase, RFC 5681 3.1 (2)  */
1208         if (tcb->snd.cwnd < tcb->snd.ssthresh)
1209                 tcb->snd.cwnd += RTE_MIN(acked, n);
1210         /* congestion avoidance phase, RFC 5681 3.1 (3) */
1211         else
1212                 tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1213 }
1214
1215 static inline void
1216 rto_ssthresh_update(struct tcb *tcb)
1217 {
1218         uint32_t k, n;
1219
1220         /* RFC 5681 3.1 (4)  */
1221         n = (tcb->snd.nxt - tcb->snd.una) / 2;
1222         k = 2 * tcb->snd.mss;
1223         tcb->snd.ssthresh = RTE_MAX(n, k);
1224 }
1225
1226 static inline void
1227 rto_cwnd_update(struct tcb *tcb)
1228 {
1229
1230         if (tcb->snd.nb_retx == 0)
1231                 rto_ssthresh_update(tcb);
1232
1233         /*
1234          * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1235          * no more than 1 full-sized segment.
1236          */
1237         tcb->snd.cwnd = tcb->snd.mss;
1238 }
1239
1240 static inline void
1241 ack_info_update(struct dack_info *tack, const union seg_info *si,
1242         int32_t badseq, uint32_t dlen, const union tle_tcp_tsopt ts)
1243 {
1244         if (badseq != 0) {
1245                 tack->segs.badseq++;
1246                 return;
1247         }
1248
1249         /* segnt with incoming data */
1250         tack->segs.data += (dlen != 0);
1251
1252         /* segment with newly acked data */
1253         if (tcp_seq_lt(tack->ack, si->ack)) {
1254                 tack->segs.dup = 0;
1255                 tack->segs.ack++;
1256                 tack->ack = si->ack;
1257                 tack->ts = ts;
1258
1259         /*
1260          * RFC 5681: An acknowledgment is considered a "duplicate" when:
1261          * (a) the receiver of the ACK has outstanding data
1262          * (b) the incoming acknowledgment carries no data
1263          * (c) the SYN and FIN bits are both off
1264          * (d) the acknowledgment number is equal to the TCP.UNA
1265          * (e) the advertised window in the incoming acknowledgment equals the
1266          * advertised window in the last incoming acknowledgment.
1267          *
1268          * Here will have only to check only for (b),(d),(e).
1269          * (a) will be checked later for the whole bulk of packets,
1270          * (c) should never happen here.
1271          */
1272         } else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1273                 tack->dup3.seg = tack->segs.ack + 1;
1274                 tack->dup3.ack = tack->ack;
1275         }
1276
1277         /*
1278          * RFC 793:
1279          * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1280          * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1281          * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1282          * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1283          */
1284         if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1285                         (si->seq == tack->wu.wl1 &&
1286                         tcp_seq_leq(tack->wu.wl2, si->ack))) {
1287
1288                 tack->wu.wl1 = si->seq;
1289                 tack->wu.wl2 = si->ack;
1290                 tack->wnd = si->wnd;
1291         }
1292 }
1293
1294 static inline uint32_t
1295 rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1296         const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1297         int32_t rc[], uint32_t num)
1298 {
1299         uint32_t i, j, k, n, t;
1300         uint32_t hlen, plen, seq, tlen;
1301         int32_t ret;
1302         union tle_tcp_tsopt ts;
1303
1304         k = 0;
1305         for (i = 0; i != num; i = j) {
1306
1307                 hlen = PKT_L234_HLEN(mb[i]);
1308                 plen = mb[i]->pkt_len - hlen;
1309                 seq = si[i].seq;
1310
1311                 ts = rx_tms_opt(&s->tcb, mb[i]);
1312                 ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1313
1314                 /* account segment received */
1315                 ack_info_update(tack, &si[i], ret != 0, plen, ts);
1316
1317                 if (ret == 0) {
1318                         /* skip duplicate data, if any */
1319                         ret = data_pkt_adjust(&s->tcb, &mb[i], hlen,
1320                                 &seq, &plen);
1321                 }
1322
1323                 j = i + 1;
1324                 if (ret != 0) {
1325                         rp[k] = mb[i];
1326                         rc[k] = -ret;
1327                         k++;
1328                         continue;
1329                 }
1330
1331                 /* group sequential packets together. */
1332                 for (tlen = plen; j != num; tlen += plen, j++) {
1333
1334                         hlen = PKT_L234_HLEN(mb[j]);
1335                         plen = mb[j]->pkt_len - hlen;
1336
1337                         /* not consecutive packet */
1338                         if (plen == 0 || seq + tlen != si[j].seq)
1339                                 break;
1340
1341                         /* check SEQ/ACK */
1342                         ts = rx_tms_opt(&s->tcb, mb[j]);
1343                         ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1344                                 plen, ts);
1345
1346                         if (ret != 0)
1347                                 break;
1348
1349                         /* account for segment received */
1350                         ack_info_update(tack, &si[j], ret != 0, plen, ts);
1351
1352                         rte_pktmbuf_adj(mb[j], hlen);
1353                 }
1354
1355                 n = j - i;
1356
1357                 /* account for OFO data */
1358                 if (seq != s->tcb.rcv.nxt)
1359                         tack->segs.ofo += n;
1360
1361                 /* enqueue packets */
1362                 t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1363
1364                 /* if we are out of space in stream recv buffer. */
1365                 for (; t != n; t++) {
1366                         rp[k] = mb[i + t];
1367                         rc[k] = -ENOBUFS;
1368                         k++;
1369                 }
1370         }
1371
1372         return num - k;
1373 }
1374
1375 static inline void
1376 start_fast_retransmit(struct tle_tcp_stream *s)
1377 {
1378         struct tcb *tcb;
1379
1380         tcb = &s->tcb;
1381
1382         /* RFC 6582 3.2.2 */
1383         tcb->snd.rcvr = tcb->snd.nxt;
1384         tcb->snd.fastack = 1;
1385
1386         /* RFC 5681 3.2.2 */
1387         rto_ssthresh_update(tcb);
1388
1389         /* RFC 5681 3.2.3 */
1390         tcp_txq_rst_nxt_head(s);
1391         tcb->snd.nxt = tcb->snd.una;
1392         tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1393 }
1394
1395 static inline void
1396 stop_fast_retransmit(struct tle_tcp_stream *s)
1397 {
1398         struct tcb *tcb;
1399         uint32_t n;
1400
1401         tcb = &s->tcb;
1402         n = tcb->snd.nxt - tcb->snd.una;
1403         tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1404                 RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1405         tcb->snd.fastack = 0;
1406 }
1407
1408 static inline int
1409 in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1410         uint32_t dup_num)
1411 {
1412         uint32_t n;
1413         struct tcb *tcb;
1414
1415         tcb = &s->tcb;
1416
1417         /* RFC 5682 3.2.3 partial ACK */
1418         if (ack_len != 0) {
1419
1420                 n = ack_num * tcb->snd.mss;
1421                 if (ack_len >= n)
1422                         tcb->snd.cwnd -= ack_len - n;
1423                 else
1424                         tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1425
1426                 /*
1427                  * For the first partial ACK that arrives
1428                  * during fast recovery, also reset the
1429                  * retransmit timer.
1430                  */
1431                 if (tcb->snd.fastack == 1)
1432                         timer_reset(s);
1433
1434                 tcb->snd.fastack += ack_num;
1435                 return 1;
1436
1437         /* RFC 5681 3.2.4 */
1438         } else if (dup_num > 3) {
1439                 s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1440                 return 1;
1441         }
1442
1443         return 0;
1444 }
1445
1446 static inline int
1447 process_ack(struct tle_tcp_stream *s, uint32_t acked,
1448         const struct dack_info *tack)
1449 {
1450         int32_t send;
1451
1452         send = 0;
1453
1454         /* normal mode */
1455         if (s->tcb.snd.fastack == 0) {
1456
1457                 send = 1;
1458
1459                 /* RFC 6582 3.2.2 switch to fast retransmit mode */
1460                 if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1461                                 s->tcb.snd.una >= s->tcb.snd.rcvr) {
1462
1463                         start_fast_retransmit(s);
1464                         in_fast_retransmit(s,
1465                                 tack->ack - tack->dup3.ack,
1466                                 tack->segs.ack - tack->dup3.seg - 1,
1467                                 tack->segs.dup);
1468
1469                 /* remain in normal mode */
1470                 } else if (acked != 0) {
1471                         ack_cwnd_update(&s->tcb, acked, tack);
1472                         timer_stop(s);
1473                 }
1474
1475         /* fast retransmit mode */
1476         } else {
1477
1478                 /* remain in fast retransmit mode */
1479                 if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1480
1481                         send = in_fast_retransmit(s, acked, tack->segs.ack,
1482                                 tack->segs.dup);
1483                 } else {
1484                         /* RFC 5682 3.2.3 full ACK */
1485                         stop_fast_retransmit(s);
1486                         timer_stop(s);
1487
1488                         /* if we have another series of dup ACKs */
1489                         if (tack->dup3.seg != 0 &&
1490                                         s->tcb.snd.una != s->tcb.snd.nxt &&
1491                                         tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1492                                         tack->dup3.ack)) {
1493
1494                                 /* restart fast retransmit again. */
1495                                 start_fast_retransmit(s);
1496                                 send = in_fast_retransmit(s,
1497                                         tack->ack - tack->dup3.ack,
1498                                         tack->segs.ack - tack->dup3.seg - 1,
1499                                         tack->segs.dup);
1500                         }
1501                 }
1502         }
1503
1504         return send;
1505 }
1506
1507 /*
1508  * our FIN was acked, stop rto timer, change stream state,
1509  * and possibly close the stream.
1510  */
1511 static inline void
1512 rx_ackfin(struct tle_tcp_stream *s)
1513 {
1514         uint32_t state;
1515
1516         s->tcb.snd.una = s->tcb.snd.fss;
1517         empty_tq(s);
1518
1519         state = s->tcb.state;
1520         if (state == TCP_ST_LAST_ACK)
1521                 stream_term(s);
1522         else if (state == TCP_ST_FIN_WAIT_1) {
1523                 timer_stop(s);
1524                 s->tcb.state = TCP_ST_FIN_WAIT_2;
1525         } else if (state == TCP_ST_CLOSING) {
1526                 stream_timewait(s, s->tcb.snd.rto_tw);
1527         }
1528 }
1529
1530 static inline void
1531 rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1532         const struct dack_info *tack)
1533 {
1534         int32_t send;
1535         uint32_t n;
1536
1537         s->tcb.rcv.dupack = tack->segs.dup;
1538
1539         n = rx_ackdata(s, tack->ack);
1540         send = process_ack(s, n, tack);
1541
1542         /* try to send more data. */
1543         if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1544                 txs_enqueue(s->s.ctx, s);
1545
1546         /* restart RTO timer. */
1547         if (s->tcb.snd.nxt != s->tcb.snd.una)
1548                 timer_start(s);
1549
1550         /* update rto, if fresh packet is here then calculate rtt */
1551         if (tack->ts.ecr != 0)
1552                 rto_estimate(&s->tcb, ts - tack->ts.ecr);
1553 }
1554
1555 /*
1556  * process <SYN,ACK>
1557  * returns negative value on failure, or zero on success.
1558  */
1559 static inline int
1560 rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1561         const union seg_info *si, struct rte_mbuf *mb,
1562         struct resp_info *rsp)
1563 {
1564         struct tle_tcp_syn_opts so;
1565         struct rte_tcp_hdr *th;
1566
1567         if (state != TCP_ST_SYN_SENT)
1568                 return -EINVAL;
1569
1570         /*
1571          * RFC 793 3.9: in the SYN-SENT state
1572          * If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset
1573          * <SEQ=SEG.ACK><CTL=RST>
1574          * and discard the segment.
1575          * The connection remains in the same state.
1576          */
1577         if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1578                 send_rst(s, si->ack);
1579                 return 0;
1580         }
1581
1582         th = rte_pktmbuf_mtod_offset(mb, struct rte_tcp_hdr *,
1583                 mb->l2_len + mb->l3_len);
1584         get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1585
1586         s->tcb.so = so;
1587
1588         s->tcb.snd.una = s->tcb.snd.nxt;
1589         s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1590         s->tcb.snd.wnd = si->wnd << so.wscale;
1591         s->tcb.snd.wu.wl1 = si->seq;
1592         s->tcb.snd.wu.wl2 = si->ack;
1593         s->tcb.snd.wscale = so.wscale;
1594
1595         /* setup congestion variables */
1596         s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
1597         s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1598
1599         s->tcb.rcv.ts = so.ts.val;
1600         s->tcb.rcv.irs = si->seq;
1601         s->tcb.rcv.nxt = si->seq + 1;
1602
1603         /* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1604         s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1605                 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1606         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1607
1608         /* calculate initial rto */
1609         rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1610
1611         rsp->flags |= TCP_FLAG_ACK;
1612
1613         timer_stop(s);
1614         s->tcb.state = TCP_ST_ESTABLISHED;
1615         rte_smp_wmb();
1616
1617         if (s->tx.ev != NULL)
1618                 tle_event_raise(s->tx.ev);
1619         else if (s->tx.cb.func != NULL)
1620                 s->tx.cb.func(s->tx.cb.data, &s->s);
1621
1622         return 0;
1623 }
1624
1625 static inline uint32_t
1626 rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1627         const union pkt_info *pi, const union seg_info si[],
1628         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1629         uint32_t num)
1630 {
1631         uint32_t i, k, n, state;
1632         int32_t ret;
1633         struct resp_info rsp;
1634         struct dack_info tack;
1635
1636         k = 0;
1637         rsp.flags = 0;
1638
1639         state = s->tcb.state;
1640
1641         /*
1642          * first check for the states/flags where we don't
1643          * expect groups of packets.
1644          */
1645
1646         /* process RST */
1647         if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1648                 for (i = 0;
1649                                 i != num &&
1650                                 rx_rst(s, state, pi->tf.flags, &si[i]);
1651                                 i++)
1652                         ;
1653                 i = 0;
1654
1655         /* RFC 793: if the ACK bit is off drop the segment and return */
1656         } else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1657                 i = 0;
1658         /*
1659          * first check for the states/flags where we don't
1660          * expect groups of packets.
1661          */
1662
1663         /* process <SYN,ACK> */
1664         } else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1665                 for (i = 0; i != num; i++) {
1666                         ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1667                         if (ret == 0)
1668                                 break;
1669
1670                         rc[k] = -ret;
1671                         rp[k] = mb[i];
1672                         k++;
1673                 }
1674
1675         /* process FIN */
1676         } else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1677                 ret = 0;
1678                 for (i = 0; i != num; i++) {
1679                         ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1680                         if (ret >= 0)
1681                                 break;
1682
1683                         rc[k] = -ret;
1684                         rp[k] = mb[i];
1685                         k++;
1686                 }
1687                 i += (ret > 0);
1688
1689         /* normal data/ack packets */
1690         } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1691
1692                 /* process incoming data packets. */
1693                 dack_info_init(&tack, &s->tcb);
1694                 n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1695
1696                 /* follow up actions based on aggregated information */
1697
1698                 /* update SND.WND */
1699                 ack_window_update(&s->tcb, &tack);
1700
1701                 /*
1702                  * fast-path: all data & FIN was already sent out
1703                  * and now is acknowledged.
1704                  */
1705                 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1706                                 tack.ack == (uint32_t)s->tcb.snd.nxt)
1707                         rx_ackfin(s);
1708                 else
1709                         rx_process_ack(s, ts, &tack);
1710
1711                 /*
1712                  * send an immediate ACK if either:
1713                  * - received segment with invalid seq/ack number
1714                  * - received segment with OFO data
1715                  * - received segment with INO data and no TX is scheduled
1716                  *   for that stream.
1717                  */
1718                 if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1719                                 (tack.segs.data != 0 &&
1720                                 rte_atomic32_read(&s->tx.arm) == 0))
1721                         rsp.flags |= TCP_FLAG_ACK;
1722
1723                 rx_ofo_fin(s, &rsp);
1724
1725                 k += num - n;
1726                 i = num;
1727
1728         /* unhandled state, drop all packets. */
1729         } else
1730                 i = 0;
1731
1732         /* we have a response packet to send. */
1733         if (rsp.flags != 0) {
1734                 send_ack(s, ts, rsp.flags);
1735
1736                 /* start the timer for FIN packet */
1737                 if ((rsp.flags & TCP_FLAG_FIN) != 0)
1738                         timer_reset(s);
1739         }
1740
1741         /* unprocessed packets */
1742         for (; i != num; i++, k++) {
1743                 rc[k] = ENODATA;
1744                 rp[k] = mb[i];
1745         }
1746
1747         return num - k;
1748 }
1749
1750 static inline uint32_t
1751 rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1752         const union pkt_info *pi, const union seg_info si[],
1753         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1754         uint32_t num)
1755 {
1756         uint32_t i;
1757
1758         if (tcp_stream_acquire(s) > 0) {
1759                 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1760                 tcp_stream_release(s);
1761                 return i;
1762         }
1763
1764         for (i = 0; i != num; i++) {
1765                 rc[i] = ENOENT;
1766                 rp[i] = mb[i];
1767         }
1768         return 0;
1769 }
1770
1771 static inline uint32_t
1772 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1773         const union pkt_info pi[], union seg_info si[],
1774         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1775         uint32_t num)
1776 {
1777         struct tle_tcp_stream *cs, *s;
1778         uint32_t i, k, n, state;
1779         int32_t ret;
1780
1781         s = rx_obtain_stream(dev, st, &pi[0], type);
1782         if (s == NULL) {
1783                 for (i = 0; i != num; i++) {
1784                         rc[i] = ENOENT;
1785                         rp[i] = mb[i];
1786                 }
1787                 return 0;
1788         }
1789
1790         k = 0;
1791         state = s->tcb.state;
1792
1793         if (state == TCP_ST_LISTEN) {
1794
1795                 /* one connection per flow */
1796                 cs = NULL;
1797                 ret = -EINVAL;
1798                 for (i = 0; i != num; i++) {
1799
1800                         ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1801
1802                         /* valid packet encountered */
1803                         if (ret >= 0)
1804                                 break;
1805
1806                         /* invalid packet, keep trying to find a proper one */
1807                         rc[k] = -ret;
1808                         rp[k] = mb[i];
1809                         k++;
1810                 }
1811
1812                 /* packet is valid, but we are out of streams to serve it */
1813                 if (ret > 0) {
1814                         for (; i != num; i++, k++) {
1815                                 rc[k] = ret;
1816                                 rp[k] = mb[i];
1817                         }
1818                 /* new stream is accepted */
1819                 } else if (ret == 0) {
1820
1821                         /* inform listen stream about new connections */
1822                         if (s->rx.ev != NULL)
1823                                 tle_event_raise(s->rx.ev);
1824                         else if (s->rx.cb.func != NULL &&
1825                                         rte_ring_count(s->rx.q) == 1)
1826                                 s->rx.cb.func(s->rx.cb.data, &s->s);
1827
1828                         /* if there is no data, drop current packet */
1829                         if (PKT_L4_PLEN(mb[i]) == 0) {
1830                                 rc[k] = ENODATA;
1831                                 rp[k++] = mb[i++];
1832                         }
1833
1834                         /*  process remaining packets for that stream */
1835                         if (num != i) {
1836                                 n = rx_new_stream(cs, ts, pi + i, si + i,
1837                                         mb + i, rp + k, rc + k, num - i);
1838                                 k += num - n - i;
1839                         }
1840                 }
1841
1842         } else {
1843                 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1844                 k = num - i;
1845         }
1846
1847         tcp_stream_release(s);
1848         return num - k;
1849 }
1850
1851
1852 static inline uint32_t
1853 rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1854         const union pkt_info pi[], const union seg_info si[],
1855         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1856         uint32_t num)
1857 {
1858         struct tle_tcp_stream *s;
1859         uint32_t i, k;
1860         int32_t ret;
1861
1862         s = rx_obtain_listen_stream(dev, &pi[0], type);
1863         if (s == NULL) {
1864                 for (i = 0; i != num; i++) {
1865                         rc[i] = ENOENT;
1866                         rp[i] = mb[i];
1867                 }
1868                 return 0;
1869         }
1870
1871         k = 0;
1872         for (i = 0; i != num; i++) {
1873
1874                 /* check that this remote is allowed to connect */
1875                 if (rx_check_stream(s, &pi[i]) != 0)
1876                         ret = -ENOENT;
1877                 else
1878                         /* syncokie: reply with <SYN,ACK> */
1879                         ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1880
1881                 if (ret != 0) {
1882                         rc[k] = -ret;
1883                         rp[k] = mb[i];
1884                         k++;
1885                 }
1886         }
1887
1888         tcp_stream_release(s);
1889         return num - k;
1890 }
1891
1892 uint16_t
1893 tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1894         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1895 {
1896         struct stbl *st;
1897         struct tle_ctx *ctx;
1898         uint32_t i, j, k, mt, n, t, ts;
1899         union pkt_info pi[num];
1900         union seg_info si[num];
1901         union {
1902                 uint8_t t[TLE_VNUM];
1903                 uint32_t raw;
1904         } stu;
1905
1906         ctx = dev->ctx;
1907         ts = tcp_get_tms(ctx->cycles_ms_shift);
1908         st = CTX_TCP_STLB(ctx);
1909         mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
1910
1911         stu.raw = 0;
1912
1913         /* extract packet info and check the L3/L4 csums */
1914         for (i = 0; i != num; i++) {
1915
1916                 get_pkt_info(pkt[i], &pi[i], &si[i]);
1917
1918                 t = pi[i].tf.type;
1919                 pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, t, IPPROTO_TCP);
1920                 stu.t[t] = mt;
1921         }
1922
1923         if (stu.t[TLE_V4] != 0)
1924                 stbl_lock(st, TLE_V4);
1925         if (stu.t[TLE_V6] != 0)
1926                 stbl_lock(st, TLE_V6);
1927
1928         k = 0;
1929         for (i = 0; i != num; i += j) {
1930
1931                 t = pi[i].tf.type;
1932
1933                 /*basic checks for incoming packet */
1934                 if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1935                         rc[k] = EINVAL;
1936                         rp[k] = pkt[i];
1937                         j = 1;
1938                         k++;
1939                 /* process input SYN packets */
1940                 } else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1941                         j = pkt_info_bulk_syneq(pi + i, num - i);
1942                         n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1943                                 rp + k, rc + k, j);
1944                         k += j - n;
1945                 } else {
1946                         j = pkt_info_bulk_eq(pi + i, num - i);
1947                         n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1948                                 rp + k, rc + k, j);
1949                         k += j - n;
1950                 }
1951         }
1952
1953         if (stu.t[TLE_V4] != 0)
1954                 stbl_unlock(st, TLE_V4);
1955         if (stu.t[TLE_V6] != 0)
1956                 stbl_unlock(st, TLE_V6);
1957
1958         return num - k;
1959 }
1960
1961 uint16_t
1962 tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1963         uint32_t num)
1964 {
1965         uint32_t n;
1966         struct tle_tcp_stream *s;
1967         struct tle_memtank *mts;
1968
1969         s = TCP_STREAM(ts);
1970         n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
1971         if (n == 0)
1972                 return 0;
1973
1974         mts = CTX_TCP_MTS(ts->ctx);
1975
1976         /*
1977          * if we still have packets to read,
1978          * then rearm stream RX event.
1979          */
1980         if (n == num && rte_ring_count(s->rx.q) != 0) {
1981                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
1982                         tle_event_raise(s->rx.ev);
1983                 tcp_stream_release(s);
1984         }
1985
1986         tle_memtank_grow(mts);
1987         return n;
1988 }
1989
1990 uint16_t
1991 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1992 {
1993         uint32_t i, j, k, n;
1994         struct tle_drb *drb[num];
1995         struct tle_tcp_stream *s;
1996
1997         /* extract packets from device TX queue. */
1998
1999         k = num;
2000         n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
2001                 num, drb, &k);
2002
2003         if (n == 0)
2004                 return 0;
2005
2006         /* free empty drbs and notify related streams. */
2007
2008         for (i = 0; i != k; i = j) {
2009                 s = drb[i]->udata;
2010                 for (j = i + 1; j != k && s == drb[j]->udata; j++)
2011                         ;
2012                 stream_drb_free(s, drb + i, j - i);
2013         }
2014
2015         return n;
2016 }
2017
2018 static inline void
2019 stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
2020 {
2021         if (s->s.type == TLE_V4)
2022                 pi->addr4 = s->s.ipv4.addr;
2023         else
2024                 pi->addr6 = &s->s.ipv6.addr;
2025
2026         pi->port = s->s.port;
2027         pi->tf.type = s->s.type;
2028 }
2029
2030 static int
2031 stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
2032 {
2033         const struct sockaddr_in *in4;
2034         const struct sockaddr_in6 *in6;
2035         const struct tle_dev_param *prm;
2036         int32_t rc;
2037
2038         rc = 0;
2039         s->s.pmsk.raw = UINT32_MAX;
2040
2041         /* setup L4 src ports and src address fields. */
2042         if (s->s.type == TLE_V4) {
2043                 in4 = (const struct sockaddr_in *)addr;
2044                 if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2045                         return -EINVAL;
2046
2047                 s->s.port.src = in4->sin_port;
2048                 s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2049                 s->s.ipv4.mask.src = INADDR_NONE;
2050                 s->s.ipv4.mask.dst = INADDR_NONE;
2051
2052         } else if (s->s.type == TLE_V6) {
2053                 in6 = (const struct sockaddr_in6 *)addr;
2054                 if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2055                                 sizeof(tle_ipv6_any)) == 0 ||
2056                                 in6->sin6_port == 0)
2057                         return -EINVAL;
2058
2059                 s->s.port.src = in6->sin6_port;
2060                 rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2061                         sizeof(s->s.ipv6.addr.src));
2062                 rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2063                         sizeof(s->s.ipv6.mask.src));
2064                 rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2065                         sizeof(s->s.ipv6.mask.dst));
2066         }
2067
2068         /* setup the destination device. */
2069         rc = stream_fill_dest(s);
2070         if (rc != 0)
2071                 return rc;
2072
2073         /* setup L4 dst address from device param */
2074         prm = &s->tx.dst.dev->prm;
2075         if (s->s.type == TLE_V4) {
2076                 if (s->s.ipv4.addr.dst == INADDR_ANY)
2077                         s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2078         } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2079                         sizeof(tle_ipv6_any)) == 0)
2080                 memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2081                         sizeof(s->s.ipv6.addr.dst));
2082
2083         return rc;
2084 }
2085
2086 static inline int
2087 tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2088 {
2089         int32_t rc;
2090         uint32_t tms, seq;
2091         union pkt_info pi;
2092         struct stbl *st;
2093         struct stbl_entry *se;
2094
2095         /* fill stream address */
2096         rc = stream_fill_addr(s, addr);
2097         if (rc != 0)
2098                 return rc;
2099
2100         /* fill pkt info to generate seq.*/
2101         stream_fill_pkt_info(s, &pi);
2102
2103         tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2104         s->tcb.so.ts.val = tms;
2105         s->tcb.so.ts.ecr = 0;
2106         s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2107         s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2108
2109         /* note that rcv.nxt is 0 here for sync_gen_seq.*/
2110         seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2111                                 s->s.ctx->prm.hash_alg,
2112                                 &s->s.ctx->prm.secret_key);
2113         s->tcb.snd.iss = seq;
2114         s->tcb.snd.rcvr = seq;
2115         s->tcb.snd.una = seq;
2116         s->tcb.snd.nxt = seq + 1;
2117         s->tcb.snd.rto = TCP_RTO_DEFAULT;
2118         s->tcb.snd.ts = tms;
2119
2120         s->tcb.rcv.mss = s->tcb.so.mss;
2121         s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2122         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2123         s->tcb.rcv.ts = 0;
2124
2125         /* add the stream in stream table */
2126         st = CTX_TCP_STLB(s->s.ctx);
2127         se = stbl_add_stream_lock(st, s);
2128         if (se == NULL)
2129                 return -ENOBUFS;
2130         s->ste = se;
2131
2132         /* put stream into the to-send queue */
2133         txs_enqueue(s->s.ctx, s);
2134
2135         return 0;
2136 }
2137
2138 int
2139 tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2140 {
2141         struct tle_tcp_stream *s;
2142         uint32_t type;
2143         int32_t rc;
2144
2145         if (ts == NULL || addr == NULL)
2146                 return -EINVAL;
2147
2148         s = TCP_STREAM(ts);
2149         type = s->s.type;
2150         if (type >= TLE_VNUM)
2151                 return -EINVAL;
2152
2153         if (tcp_stream_try_acquire(s) > 0) {
2154                 rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2155                         TCP_ST_SYN_SENT);
2156                 rc = (rc == 0) ? -EDEADLK : 0;
2157         } else
2158                 rc = -EINVAL;
2159
2160         if (rc != 0) {
2161                 tcp_stream_release(s);
2162                 return rc;
2163         }
2164
2165         /* fill stream, prepare and transmit syn pkt */
2166         s->tcb.uop |= TCP_OP_CONNECT;
2167         rc = tx_syn(s, addr);
2168         tcp_stream_release(s);
2169
2170         /* error happened, do a cleanup */
2171         if (rc != 0)
2172                 tle_tcp_stream_close(ts);
2173
2174         return rc;
2175 }
2176
2177 /*
2178  * Helper function for tle_tcp_stream_establish().
2179  * updates stream's TCB.
2180  */
2181 static inline void
2182 tcb_establish(struct tle_tcp_stream *s, const struct tle_tcp_conn_info *ci)
2183 {
2184         uint32_t tms;
2185
2186         tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2187
2188         s->tcb.so = ci->so;
2189         fill_tcb_snd(&s->tcb, ci->seq, ci->ack, ci->so.mss,
2190                 ci->wnd, ci->so.wscale, &ci->so.ts);
2191         fill_tcb_rcv(&s->tcb, ci->seq, ci->so.wscale, &ci->so.ts);
2192
2193         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2194
2195         /* setup congestion variables */
2196         s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
2197         s->tcb.snd.ssthresh = s->tcb.snd.wnd;
2198
2199         estimate_stream_rto(s, tms);
2200 }
2201
2202 /*
2203  * !!! add flgs to distinguish - add or not stream into the table.
2204  */
2205 struct tle_stream *
2206 tle_tcp_stream_establish(struct tle_ctx *ctx,
2207         const struct tle_tcp_stream_param *prm,
2208         const struct tle_tcp_conn_info *ci)
2209 {
2210         int32_t rc;
2211         struct tle_tcp_stream *s;
2212         struct stbl *st;
2213
2214         if (ctx == NULL || prm == NULL || ci == NULL) {
2215                 rte_errno = -EINVAL;
2216                 return NULL;
2217         }
2218
2219         /* allocate new stream */
2220         s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
2221         if (s == NULL) {
2222                 rte_errno = ENFILE;
2223                 return NULL;
2224         }
2225
2226         do {
2227                 s->tcb.uop |= TCP_OP_ESTABLISH;
2228
2229                 /* check and use stream addresses and parameters */
2230                 rc = tcp_stream_fill_prm(s, prm);
2231                 if (rc != 0)
2232                         break;
2233
2234                 /* retrieve and cache destination information. */
2235                 rc = stream_fill_dest(s);
2236                 if (rc != 0)
2237                         break;
2238
2239                 /* add the stream to the stream table */
2240                 st = CTX_TCP_STLB(s->s.ctx);
2241                 s->ste = stbl_add_stream_lock(st, s);
2242                 if (s->ste == NULL) {
2243                         rc = -ENOBUFS;
2244                         break;
2245                 }
2246
2247                 /* fill TCB from user provided data */
2248                 tcb_establish(s, ci);
2249                 s->tcb.state = TCP_ST_ESTABLISHED;
2250                 tcp_stream_up(s);
2251
2252         } while (0);
2253
2254         /* cleanup on failure */
2255         if (rc != 0) {
2256                 tcp_stream_reset(ctx, s);
2257                 rte_errno = -rc;
2258                 s = NULL;
2259         }
2260
2261         return &s->s;
2262 }
2263
2264 uint16_t
2265 tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2266 {
2267         uint32_t n;
2268         struct tle_tcp_stream *s;
2269
2270         s = TCP_STREAM(ts);
2271         n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
2272         if (n == 0)
2273                 return 0;
2274
2275         /*
2276          * if we still have packets to read,
2277          * then rearm stream RX event.
2278          */
2279         if (n == num && rte_ring_count(s->rx.q) != 0) {
2280                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2281                         tle_event_raise(s->rx.ev);
2282                 tcp_stream_release(s);
2283         }
2284
2285         return n;
2286 }
2287
2288 ssize_t
2289 tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
2290         int iovcnt)
2291 {
2292         int32_t i;
2293         uint32_t mn, n, tn;
2294         size_t sz;
2295         struct tle_tcp_stream *s;
2296         struct iovec iv;
2297         struct rxq_objs mo[2];
2298
2299         s = TCP_STREAM(ts);
2300
2301         /* get group of packets */
2302         mn = tcp_rxq_get_objs(s, mo);
2303         if (mn == 0)
2304                 return 0;
2305
2306         sz = 0;
2307         n = 0;
2308         for (i = 0; i != iovcnt; i++) {
2309                 iv = iov[i];
2310                 sz += iv.iov_len;
2311                 n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
2312                 if (iv.iov_len != 0) {
2313                         sz -= iv.iov_len;
2314                         break;
2315                 }
2316         }
2317
2318         tn = n;
2319
2320         if (i != iovcnt && mn != 1) {
2321                 n = 0;
2322                 do {
2323                         sz += iv.iov_len;
2324                         n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
2325                         if (iv.iov_len != 0) {
2326                                 sz -= iv.iov_len;
2327                                 break;
2328                         }
2329                         if (i + 1 != iovcnt)
2330                                 iv = iov[i + 1];
2331                 } while (++i != iovcnt);
2332                 tn += n;
2333         }
2334
2335         tcp_rxq_consume(s, tn);
2336
2337         /*
2338          * if we still have packets to read,
2339          * then rearm stream RX event.
2340          */
2341         if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
2342                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2343                         tle_event_raise(s->rx.ev);
2344                 tcp_stream_release(s);
2345         }
2346
2347         return sz;
2348 }
2349
2350 static inline int32_t
2351 tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2352         struct rte_mbuf *segs[], uint32_t num)
2353 {
2354         uint32_t i;
2355         int32_t rc;
2356
2357         for (i = 0; i != num; i++) {
2358                 /* Build L2/L3/L4 header */
2359                 rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2360                         0, TCP_FLAG_ACK, 0, 0);
2361                 if (rc != 0) {
2362                         free_mbufs(segs, num);
2363                         break;
2364                 }
2365         }
2366
2367         if (i == num) {
2368                 /* queue packets for further transmission. */
2369                 rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
2370                 if (rc != 0)
2371                         free_mbufs(segs, num);
2372         }
2373
2374         return rc;
2375 }
2376
2377 uint16_t
2378 tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2379 {
2380         uint32_t i, j, k, mss, n, state;
2381         int32_t rc;
2382         uint64_t ol_flags;
2383         struct tle_tcp_stream *s;
2384         struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2385
2386         s = TCP_STREAM(ts);
2387
2388         /* mark stream as not closable. */
2389         if (tcp_stream_acquire(s) < 0) {
2390                 rte_errno = EAGAIN;
2391                 return 0;
2392         }
2393
2394         state = s->tcb.state;
2395         if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2396                 rte_errno = ENOTCONN;
2397                 tcp_stream_release(s);
2398                 return 0;
2399         }
2400
2401         mss = s->tcb.snd.mss;
2402         ol_flags = s->tx.dst.ol_flags;
2403
2404         k = 0;
2405         rc = 0;
2406         while (k != num) {
2407                 /* prepare and check for TX */
2408                 for (i = k; i != num; i++) {
2409                         if (pkt[i]->pkt_len > mss ||
2410                                         pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2411                                 break;
2412                         rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2413                                 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2414                         if (rc != 0)
2415                                 break;
2416                 }
2417
2418                 if (i != k) {
2419                         /* queue packets for further transmission. */
2420                         n = _rte_ring_enqueue_burst(s->tx.q,
2421                                 (void **)pkt + k, (i - k));
2422                         k += n;
2423
2424                         /*
2425                          * for unsent, but already modified packets:
2426                          * remove pkt l2/l3 headers, restore ol_flags
2427                          */
2428                         if (i != k) {
2429                                 ol_flags = ~s->tx.dst.ol_flags;
2430                                 for (j = k; j != i; j++) {
2431                                         rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2432                                                 pkt[j]->l3_len +
2433                                                 pkt[j]->l4_len);
2434                                         pkt[j]->ol_flags &= ol_flags;
2435                                 }
2436                                 break;
2437                         }
2438                 }
2439
2440                 if (rc != 0) {
2441                         rte_errno = -rc;
2442                         break;
2443
2444                 /* segment large packet and enqueue for sending */
2445                 } else if (i != num) {
2446                         /* segment the packet. */
2447                         rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2448                                 &s->tx.dst, mss);
2449                         if (rc < 0) {
2450                                 rte_errno = -rc;
2451                                 break;
2452                         }
2453
2454                         rc = tx_segments(s, ol_flags, segs, rc);
2455                         if (rc == 0) {
2456                                 /* free the large mbuf */
2457                                 rte_pktmbuf_free(pkt[i]);
2458                                 /* set the mbuf as consumed */
2459                                 k++;
2460                         } else
2461                                 /* no space left in tx queue */
2462                                 break;
2463                 }
2464         }
2465
2466         /* notify BE about more data to send */
2467         if (k != 0)
2468                 txs_enqueue(s->s.ctx, s);
2469         /* if possible, re-arm stream write event. */
2470         if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2471                 tle_event_raise(s->tx.ev);
2472
2473         tcp_stream_release(s);
2474
2475         return k;
2476 }
2477
2478 ssize_t
2479 tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
2480         const struct iovec *iov, int iovcnt)
2481 {
2482         int32_t i, rc;
2483         uint32_t j, k, n, num, slen, state;
2484         uint64_t ol_flags;
2485         size_t sz, tsz;
2486         struct tle_tcp_stream *s;
2487         struct iovec iv;
2488         struct rte_mbuf *mb[2 * MAX_PKT_BURST];
2489
2490         s = TCP_STREAM(ts);
2491
2492         /* mark stream as not closable. */
2493         if (tcp_stream_acquire(s) < 0) {
2494                 rte_errno = EAGAIN;
2495                 return -1;
2496         }
2497
2498         state = s->tcb.state;
2499         if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2500                 rte_errno = ENOTCONN;
2501                 tcp_stream_release(s);
2502                 return -1;
2503         }
2504
2505         /* figure out how many mbufs do we need */
2506         tsz = 0;
2507         for (i = 0; i != iovcnt; i++)
2508                 tsz += iov[i].iov_len;
2509
2510         slen = rte_pktmbuf_data_room_size(mp);
2511         slen = RTE_MIN(slen, s->tcb.snd.mss);
2512
2513         num = (tsz + slen - 1) / slen;
2514         n = rte_ring_free_count(s->tx.q);
2515         num = RTE_MIN(num, n);
2516         n = RTE_MIN(num, RTE_DIM(mb));
2517
2518         /* allocate mbufs */
2519         if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
2520                 rte_errno = ENOMEM;
2521                 tcp_stream_release(s);
2522                 return -1;
2523         }
2524
2525         /* copy data into the mbufs */
2526         k = 0;
2527         sz = 0;
2528         for (i = 0; i != iovcnt; i++) {
2529                 iv = iov[i];
2530                 sz += iv.iov_len;
2531                 k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
2532                 if (iv.iov_len != 0) {
2533                         sz -= iv.iov_len;
2534                         break;
2535                 }
2536         }
2537
2538         /* partially filled segment */
2539         k += (k != n && mb[k]->data_len != 0);
2540
2541         /* fill pkt headers */
2542         ol_flags = s->tx.dst.ol_flags;
2543
2544         for (j = 0; j != k; j++) {
2545                 rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
2546                         s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2547                 if (rc != 0)
2548                         break;
2549         }
2550
2551         /* if no error encountered, then enqueue pkts for transmission */
2552         if (k == j)
2553                 k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
2554         else
2555                 k = 0;
2556
2557         if (k != j) {
2558
2559                 /* free pkts that were not enqueued */
2560                 free_mbufs(mb + k, j - k);
2561
2562                 /* our last segment can be partially filled */
2563                 sz += slen - sz % slen;
2564                 sz -= (j - k) * slen;
2565
2566                 /* report an error */
2567                 if (rc != 0) {
2568                         rte_errno = -rc;
2569                         sz = -1;
2570                 }
2571         }
2572
2573         if (k != 0) {
2574
2575                 /* notify BE about more data to send */
2576                 txs_enqueue(s->s.ctx, s);
2577
2578                 /* if possible, re-arm stream write event. */
2579                 if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2580                         tle_event_raise(s->tx.ev);
2581         }
2582
2583         tcp_stream_release(s);
2584         return sz;
2585 }
2586
2587 /* send data and FIN (if needed) */
2588 static inline void
2589 tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2590 {
2591         /* try to send some data */
2592         tx_nxt_data(s, tms);
2593
2594         /* we also have to send a FIN */
2595         if (state != TCP_ST_ESTABLISHED &&
2596                         state != TCP_ST_CLOSE_WAIT &&
2597                         tcp_txq_nxt_cnt(s) == 0 &&
2598                         s->tcb.snd.fss != s->tcb.snd.nxt) {
2599                 s->tcb.snd.fss = ++s->tcb.snd.nxt;
2600                 send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2601         }
2602 }
2603
2604 static inline void
2605 tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2606 {
2607         uint32_t state;
2608
2609         state = s->tcb.state;
2610
2611         if (state == TCP_ST_SYN_SENT) {
2612                 /* send the SYN, start the rto timer */
2613                 send_ack(s, tms, TCP_FLAG_SYN);
2614                 timer_start(s);
2615
2616         } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2617
2618                 tx_data_fin(s, tms, state);
2619
2620                 /* start RTO timer. */
2621                 if (s->tcb.snd.nxt != s->tcb.snd.una)
2622                         timer_start(s);
2623         }
2624 }
2625
2626 static inline void
2627 rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2628 {
2629         uint32_t state;
2630
2631         state = s->tcb.state;
2632
2633         TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2634                 "retx=%u, retm=%u, "
2635                 "rto=%u, snd.ts=%u, tmo=%u, "
2636                 "snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2637                 "snd.rcvr=%lu, snd.fastack=%u, "
2638                 "wnd=%u, cwnd=%u, ssthresh=%u, "
2639                 "bytes sent=%lu, pkt remain=%u;\n",
2640                 __func__, s, tms, s->tcb.state,
2641                 s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2642                 s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2643                 s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2644                 s->tcb.snd.rcvr, s->tcb.snd.fastack,
2645                 s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2646                 s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2647
2648         if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2649
2650                 if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2651
2652                         /* update SND.CWD and SND.SSTHRESH */
2653                         rto_cwnd_update(&s->tcb);
2654
2655                         /* RFC 6582 3.2.4 */
2656                         s->tcb.snd.rcvr = s->tcb.snd.nxt;
2657                         s->tcb.snd.fastack = 0;
2658
2659                         /* restart from last acked data */
2660                         tcp_txq_rst_nxt_head(s);
2661                         s->tcb.snd.nxt = s->tcb.snd.una;
2662
2663                         tx_data_fin(s, tms, state);
2664
2665                 } else if (state == TCP_ST_SYN_SENT) {
2666                         /* resending SYN */
2667                         s->tcb.so.ts.val = tms;
2668
2669                         /* According to RFC 6928 2:
2670                          * To reduce the chance for spurious SYN or SYN/ACK
2671                          * retransmission, it is RECOMMENDED that
2672                          * implementations refrain from resetting the initial
2673                          * window to 1 segment, unless there have been more
2674                          * than one SYN or SYN/ACK retransmissions or true loss
2675                          * detection has been made.
2676                          */
2677                         if (s->tcb.snd.nb_retx != 0)
2678                                 s->tcb.snd.cwnd = s->tcb.snd.mss;
2679
2680                         send_ack(s, tms, TCP_FLAG_SYN);
2681
2682                 } else if (state == TCP_ST_TIME_WAIT) {
2683                         stream_term(s);
2684                 }
2685
2686                 /* RFC6298:5.5 back off the timer */
2687                 s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2688                 s->tcb.snd.nb_retx++;
2689                 timer_restart(s);
2690
2691         } else {
2692                 send_rst(s, s->tcb.snd.nxt);
2693                 stream_term(s);
2694         }
2695 }
2696
2697 int
2698 tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2699 {
2700         uint32_t i, k, tms;
2701         struct sdr *dr;
2702         struct tle_timer_wheel *tw;
2703         struct tle_stream *p;
2704         struct tle_tcp_stream *s, *rs[num];
2705
2706         /* process streams with RTO exipred */
2707
2708         tw = CTX_TCP_TMWHL(ctx);
2709         tms = tcp_get_tms(ctx->cycles_ms_shift);
2710         tle_timer_expire(tw, tms);
2711
2712         k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2713
2714         for (i = 0; i != k; i++) {
2715
2716                 s = rs[i];
2717                 s->timer.handle = NULL;
2718                 if (tcp_stream_try_acquire(s) > 0)
2719                         rto_stream(s, tms);
2720                 tcp_stream_release(s);
2721         }
2722
2723         /* process streams from to-send queue */
2724
2725         k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2726
2727         for (i = 0; i != k; i++) {
2728
2729                 s = rs[i];
2730                 rte_atomic32_set(&s->tx.arm, 0);
2731
2732                 if (tcp_stream_try_acquire(s) > 0)
2733                         tx_stream(s, tms);
2734                 else
2735                         txs_enqueue(s->s.ctx, s);
2736                 tcp_stream_release(s);
2737         }
2738
2739         /* collect streams to close from the death row */
2740
2741         dr = CTX_TCP_SDR(ctx);
2742         for (k = 0, p = STAILQ_FIRST(&dr->be);
2743                         k != num && p != NULL;
2744                         k++, p = STAILQ_NEXT(p, link))
2745                 rs[k] = TCP_STREAM(p);
2746
2747         if (p == NULL)
2748                 STAILQ_INIT(&dr->be);
2749         else
2750                 STAILQ_FIRST(&dr->be) = p;
2751
2752         /* cleanup closed streams */
2753         for (i = 0; i != k; i++) {
2754                 s = rs[i];
2755                 tcp_stream_down(s);
2756                 tcp_stream_reset(ctx, s);
2757         }
2758
2759         return 0;
2760 }