20a26650a79a2428a3c7b71f9cba86d4e2f69ea5
[tldk.git] / lib / libtle_l4p / tcp_rxtx.c
1 /*
2  * Copyright (c) 2016-2017  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <rte_errno.h>
17 #include <rte_ethdev.h>
18 #include <rte_ip.h>
19 #include <rte_ip_frag.h>
20 #include <rte_tcp.h>
21
22 #include "tcp_stream.h"
23 #include "tcp_timer.h"
24 #include "stream_table.h"
25 #include "syncookie.h"
26 #include "misc.h"
27 #include "tcp_ctl.h"
28 #include "tcp_rxq.h"
29 #include "tcp_txq.h"
30 #include "tcp_tx_seg.h"
31
32 #define TCP_MAX_PKT_SEG 0x20
33
34 /*
35  * checks if input TCP ports and IP addresses match given stream.
36  * returns zero on success.
37  */
38 static inline int
39 rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40 {
41         int32_t rc;
42
43         if (pi->tf.type == TLE_V4)
44                 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45                         (pi->addr4.raw & s->s.ipv4.mask.raw) !=
46                         s->s.ipv4.addr.raw;
47         else
48                 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49                         ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50                         &s->s.ipv6.mask.raw) != 0;
51
52         return rc;
53 }
54
55 static inline struct tle_tcp_stream *
56 rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57         uint32_t type)
58 {
59         struct tle_tcp_stream *s;
60
61         s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62         if (s == NULL || tcp_stream_acquire(s) < 0)
63                 return NULL;
64
65         /* check that we have a proper stream. */
66         if (s->tcb.state != TLE_TCP_ST_LISTEN) {
67                 tcp_stream_release(s);
68                 s = NULL;
69         }
70
71         return s;
72 }
73
74 static inline struct tle_tcp_stream *
75 rx_acquire_stream(struct tle_stream *ts)
76 {
77         struct tle_tcp_stream *s;
78
79         s = TCP_STREAM(ts);
80         if (tcp_stream_acquire(s) < 0)
81                 return NULL;
82
83         else if (s->tcb.state == TLE_TCP_ST_CLOSED) {
84                 tcp_stream_release(s);
85                 return NULL;
86         }
87
88         return s;
89 }
90
91 static inline struct tle_tcp_stream *
92 rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
93         const union pkt_info *pi, uint32_t type)
94 {
95         struct tle_tcp_stream *s;
96
97         s = stbl_find_data(st, pi);
98         if (s == NULL) {
99                 if (pi->tf.flags == TCP_FLAG_ACK)
100                         return rx_obtain_listen_stream(dev, pi, type);
101                 return NULL;
102         }
103
104         if (tcp_stream_acquire(s) < 0)
105                 return NULL;
106         /* check that we have a proper stream. */
107         else if (s->tcb.state == TLE_TCP_ST_CLOSED) {
108                 tcp_stream_release(s);
109                 s = NULL;
110         }
111
112         return s;
113 }
114
115 /*
116  * Consider 2 pkt_info *equal* if their:
117  * - types (IPv4/IPv6)
118  * - TCP flags
119  * - checksum flags
120  * - TCP src and dst ports
121  * - IP src and dst addresses
122  * are equal.
123  */
124 static inline int
125 pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
126 {
127         uint32_t i;
128
129         i = 1;
130
131         if (pi[0].tf.type == TLE_V4) {
132                 while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
133                         i++;
134
135         } else if (pi[0].tf.type == TLE_V6) {
136                 while (i != num &&
137                                 pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
138                                 ymm_cmp(&pi[0].addr6->raw,
139                                 &pi[i].addr6->raw) == 0)
140                         i++;
141         }
142
143         return i;
144 }
145
146 static inline int
147 pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
148 {
149         uint32_t i;
150
151         i = 1;
152
153         if (pi[0].tf.type == TLE_V4) {
154                 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
155                                 pi[0].port.dst == pi[i].port.dst &&
156                                 pi[0].addr4.dst == pi[i].addr4.dst)
157                         i++;
158
159         } else if (pi[0].tf.type == TLE_V6) {
160                 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
161                                 pi[0].port.dst == pi[i].port.dst &&
162                                 xmm_cmp(&pi[0].addr6->dst,
163                                 &pi[i].addr6->dst) == 0)
164                         i++;
165         }
166
167         return i;
168 }
169
170 static inline void
171 stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
172         uint32_t nb_drb)
173 {
174         _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
175 }
176
177 static inline uint32_t
178 stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
179         uint32_t nb_drb)
180 {
181         return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
182 }
183
184 static inline uint32_t
185 get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
186 {
187         uint32_t pid;
188         rte_atomic32_t *pa;
189
190         pa = &dev->tx.packet_id[type];
191
192         if (st == 0) {
193                 pid = rte_atomic32_add_return(pa, num);
194                 return pid - num;
195         } else {
196                 pid = rte_atomic32_read(pa);
197                 rte_atomic32_set(pa, pid + num);
198                 return pid;
199         }
200 }
201
202 static inline uint32_t
203 tcp_stream_adjust_tms(const struct tle_tcp_stream *s, uint32_t tms)
204 {
205         return tms - s->ts_offset;
206 }
207
208 static inline void
209 fill_tcph(struct rte_tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
210         uint32_t seq, uint8_t hlen, uint8_t flags)
211 {
212         uint16_t wnd;
213
214         l4h->src_port = port.dst;
215         l4h->dst_port = port.src;
216
217         wnd = (flags & TCP_FLAG_SYN) ?
218                 RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
219                 tcb->rcv.wnd >> tcb->rcv.wscale;
220
221         /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
222         l4h->sent_seq = rte_cpu_to_be_32(seq);
223         l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
224         l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
225         l4h->tcp_flags = flags;
226         l4h->rx_win = rte_cpu_to_be_16(wnd);
227         l4h->cksum = 0;
228         l4h->tcp_urp = 0;
229
230         if (flags & TCP_FLAG_SYN)
231                 fill_syn_opts(l4h + 1, &tcb->so);
232         else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
233                 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
234 }
235
236 static inline int
237 tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
238         const struct tle_dest *dst, uint64_t ol_flags,
239         union l4_ports port, uint32_t seq, uint32_t flags,
240         uint32_t pid, uint32_t swcsm)
241 {
242         uint32_t l4, len, plen;
243         struct rte_tcp_hdr *l4h;
244         char *l2h;
245
246         len = dst->l2_len + dst->l3_len;
247         plen = m->pkt_len;
248
249         if (flags & TCP_FLAG_SYN)
250                 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
251         else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
252                 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
253         else
254                 l4 = sizeof(*l4h);
255
256         /* adjust mbuf to put L2/L3/L4 headers into it. */
257         l2h = rte_pktmbuf_prepend(m, len + l4);
258         if (l2h == NULL)
259                 return -EINVAL;
260
261         /* copy L2/L3 header */
262         rte_memcpy(l2h, dst->hdr, len);
263
264         /* setup TCP header & options */
265         l4h = (struct rte_tcp_hdr *)(l2h + len);
266         fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
267
268         /* setup mbuf TX offload related fields. */
269         m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
270         m->ol_flags |= ol_flags;
271
272         /* update proto specific fields. */
273
274         if (s->s.type == TLE_V4) {
275                 struct rte_ipv4_hdr *l3h;
276                 l3h = (struct rte_ipv4_hdr *)(l2h + dst->l2_len);
277                 l3h->packet_id = rte_cpu_to_be_16(pid);
278                 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
279
280                 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
281                         l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
282                                 ol_flags);
283                 else if (swcsm != 0)
284                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
285
286                 if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
287                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
288         } else {
289                 struct rte_ipv6_hdr *l3h;
290                 l3h = (struct rte_ipv6_hdr *)(l2h + dst->l2_len);
291                 l3h->payload_len = rte_cpu_to_be_16(plen + l4);
292                 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
293                         l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
294                 else if (swcsm != 0)
295                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
296         }
297
298         return 0;
299 }
300
301 /*
302  * That function supposed to be used only for data packets.
303  * Assumes that L2/L3/L4 headers and mbuf fields already setup properly.
304  *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
305  *  - if no HW cksum offloads are enabled, calculates TCP checksum.
306  */
307 static inline void
308 tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
309         uint32_t seq, uint32_t pid)
310 {
311         struct rte_tcp_hdr *l4h;
312         uint32_t len;
313
314         len = m->l2_len + m->l3_len;
315         l4h = rte_pktmbuf_mtod_offset(m, struct rte_tcp_hdr *, len);
316
317         l4h->sent_seq = rte_cpu_to_be_32(seq);
318         l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
319
320         if (tcb->so.ts.raw != 0)
321                 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
322
323         if (type == TLE_V4) {
324                 struct rte_ipv4_hdr *l3h;
325                 l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
326                         m->l2_len);
327                 l3h->hdr_checksum = 0;
328                 l3h->packet_id = rte_cpu_to_be_16(pid);
329                 if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
330                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
331         }
332
333         /* have to calculate TCP checksum in SW */
334         if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
335
336                 l4h->cksum = 0;
337
338                 if (type == TLE_V4) {
339                         struct rte_ipv4_hdr *l3h;
340                         l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
341                                 m->l2_len);
342                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
343
344                 } else {
345                         struct rte_ipv6_hdr *l3h;
346                         l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
347                                 m->l2_len);
348                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
349                 }
350         }
351 }
352
353 /* Send data packets that need to be ACK-ed by peer */
354 static inline uint32_t
355 tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
356 {
357         uint32_t bsz, i, nb, nbm;
358         struct tle_dev *dev;
359         struct tle_drb *drb[num];
360
361         /* calculate how many drbs are needed.*/
362         bsz = s->tx.drb.nb_elem;
363         nbm = (num + bsz - 1) / bsz;
364
365         /* allocate drbs, adjust number of packets. */
366         nb = stream_drb_alloc(s, drb, nbm);
367
368         /* drb ring is empty. */
369         if (nb == 0)
370                 return 0;
371
372         else if (nb != nbm)
373                 num = nb * bsz;
374
375         dev = s->tx.dst.dev;
376
377         /* enqueue pkts for TX. */
378         nbm = nb;
379         i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
380                 num, drb, &nb);
381
382         /* free unused drbs. */
383         if (nb != 0)
384                 stream_drb_free(s, drb + nbm - nb, nb);
385
386         return i;
387 }
388
389 static inline uint32_t
390 tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
391         uint32_t num)
392 {
393         uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
394         struct tle_dev *dev;
395         struct rte_mbuf *mb;
396         struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
397
398         mss = s->tcb.snd.mss;
399         type = s->s.type;
400
401         dev = s->tx.dst.dev;
402         pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
403
404         k = 0;
405         tn = 0;
406         fail = 0;
407         for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
408
409                 mb = mi[i];
410                 sz = RTE_MIN(sl->len, mss);
411                 plen = PKT_L4_PLEN(mb);
412
413                 /*fast path, no need to use indirect mbufs. */
414                 if (plen <= sz) {
415
416                         /* update pkt TCP header */
417                         tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
418
419                         /* keep mbuf till ACK is received. */
420                         rte_pktmbuf_refcnt_update(mb, 1);
421                         sl->len -= plen;
422                         sl->seq += plen;
423                         mo[k++] = mb;
424                 /* remaining snd.wnd is less them MSS, send nothing */
425                 } else if (sz < mss)
426                         break;
427                 /* packet indirection needed */
428                 else
429                         RTE_VERIFY(0);
430
431                 if (k >= MAX_PKT_BURST) {
432                         n = tx_data_pkts(s, mo, k);
433                         fail = k - n;
434                         tn += n;
435                         k = 0;
436                 }
437         }
438
439         if (k != 0) {
440                 n = tx_data_pkts(s, mo, k);
441                 fail = k - n;
442                 tn += n;
443         }
444
445         if (fail != 0) {
446                 sz = tcp_mbuf_seq_free(mo + n, fail);
447                 sl->seq -= sz;
448                 sl->len += sz;
449         }
450
451         return tn;
452 }
453
454 /*
455  * gets data from stream send buffer, updates it and
456  * queues it into TX device queue.
457  * Note that this function and is not MT safe.
458  */
459 static inline uint32_t
460 tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
461 {
462         uint32_t n, num, tn, wnd;
463         struct rte_mbuf **mi;
464         union seqlen sl;
465
466         tn = 0;
467         wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
468         sl.seq = s->tcb.snd.nxt;
469         sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
470
471         if (sl.len == 0)
472                 return tn;
473
474         /* update send timestamp */
475         s->tcb.snd.ts = tms;
476
477         do {
478                 /* get group of packets */
479                 mi = tcp_txq_get_nxt_objs(s, &num);
480
481                 /* stream send buffer is empty */
482                 if (num == 0)
483                         break;
484
485                 /* queue data packets for TX */
486                 n = tx_data_bulk(s, &sl, mi, num);
487                 tn += n;
488
489                 /* update consumer head */
490                 tcp_txq_set_nxt_head(s, n);
491         } while (n == num);
492
493         s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
494         return tn;
495 }
496
497 static inline void
498 free_una_data(struct tle_tcp_stream *s, uint32_t len)
499 {
500         uint32_t i, num, plen;
501         struct rte_mbuf **mi;
502
503         plen = 0;
504
505         do {
506                 /* get group of packets */
507                 mi = tcp_txq_get_una_objs(s, &num);
508
509                 if (num == 0)
510                         break;
511
512                 /* free acked data */
513                 for (i = 0; i != num && plen != len; i++) {
514                         uint32_t next_pkt_len = PKT_L4_PLEN(mi[i]);
515                         if (plen + next_pkt_len > len) {
516                                 /* keep SND.UNA at the start of the packet */
517                                 len = plen;
518                                 break;
519                         } else {
520                                 plen += next_pkt_len;
521                         }
522                         rte_pktmbuf_free(mi[i]);
523                 }
524
525                 /* update consumer tail */
526                 tcp_txq_set_una_tail(s, i);
527         } while (plen < len);
528
529         s->tcb.snd.una += len;
530
531         /*
532          * that could happen in case of retransmit,
533          * adjust SND.NXT with SND.UNA.
534          */
535         if (s->tcb.snd.una > s->tcb.snd.nxt) {
536                 tcp_txq_rst_nxt_head(s);
537                 s->tcb.snd.nxt = s->tcb.snd.una;
538         }
539 }
540
541 static inline uint16_t
542 calc_smss(uint16_t mss, const struct tle_dest *dst)
543 {
544         uint16_t n;
545
546         n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
547         mss = RTE_MIN(n, mss);
548         return mss;
549 }
550
551 /*
552  * RFC 6928 2
553  * min (10*MSS, max (2*MSS, 14600))
554  *
555  * or using user provided initial congestion window (icw)
556  * min (10*MSS, max (2*MSS, icw))
557  */
558 static inline uint32_t
559 initial_cwnd(uint32_t smss, uint32_t icw)
560 {
561         return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
562 }
563
564 /*
565  * queue standalone packet to he particular output device
566  * It assumes that:
567  * - L2/L3/L4 headers should be already set.
568  * - packet fits into one segment.
569  */
570 static inline int
571 send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
572 {
573         uint32_t n, nb;
574         struct tle_drb *drb;
575
576         if (stream_drb_alloc(s, &drb, 1) == 0)
577                 return -ENOBUFS;
578
579         /* enqueue pkt for TX. */
580         nb = 1;
581         n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
582                 &drb, &nb);
583
584         /* free unused drbs. */
585         if (nb != 0)
586                 stream_drb_free(s, &drb, 1);
587
588         return (n == 1) ? 0 : -ENOBUFS;
589 }
590
591 static inline int
592 send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
593         uint32_t flags)
594 {
595         const struct tle_dest *dst;
596         uint32_t pid, type;
597         int32_t rc;
598
599         dst = &s->tx.dst;
600         type = s->s.type;
601         pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
602
603         rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
604         if (rc == 0)
605                 rc = send_pkt(s, dst->dev, m);
606
607         return rc;
608 }
609
610 static inline int
611 send_rst(struct tle_tcp_stream *s, uint32_t seq)
612 {
613         struct rte_mbuf *m;
614         int32_t rc;
615
616         m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
617         if (m == NULL)
618                 return -ENOMEM;
619
620         rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
621         if (rc != 0)
622                 rte_pktmbuf_free(m);
623
624         return rc;
625 }
626
627 static inline int
628 send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
629 {
630         struct rte_mbuf *m;
631         uint32_t seq;
632         int32_t rc;
633
634         m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
635         if (m == NULL)
636                 return -ENOMEM;
637
638         seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
639         s->tcb.snd.ts = tms;
640
641         rc = send_ctrl_pkt(s, m, seq, flags);
642         if (rc != 0) {
643                 rte_pktmbuf_free(m);
644                 return rc;
645         }
646
647         s->tcb.snd.ack = s->tcb.rcv.nxt;
648         return 0;
649 }
650
651
652 static int
653 sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
654         const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
655 {
656         uint16_t len;
657         int32_t rc;
658         uint32_t pid, seq, type;
659         struct tle_dev *dev;
660         const void *da;
661         struct tle_dest dst;
662         const struct rte_tcp_hdr *th;
663
664         type = s->s.type;
665
666         /* get destination information. */
667         if (type == TLE_V4)
668                 da = &pi->addr4.src;
669         else
670                 da = &pi->addr6->src;
671
672         rc = stream_get_dest(&s->s, da, &dst);
673         if (rc < 0)
674                 return rc;
675
676         th = rte_pktmbuf_mtod_offset(m, const struct rte_tcp_hdr *,
677                 m->l2_len + m->l3_len);
678         get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
679
680         /* reset wscale option if timestamp is not present */
681         if (s->tcb.so.ts.val == 0)
682                 s->tcb.so.wscale = 0;
683
684         s->tcb.rcv.nxt = si->seq + 1;
685         seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
686                                 s->s.ctx->prm.hash_alg,
687                                 &s->s.ctx->prm.secret_key);
688         s->tcb.so.ts.ecr = s->tcb.so.ts.val;
689         s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
690         s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
691                 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
692         s->tcb.so.mss = calc_smss(dst.mtu, &dst);
693
694         /* reset mbuf's data contents. */
695         len = m->l2_len + m->l3_len + m->l4_len;
696         m->tx_offload = 0;
697         if (rte_pktmbuf_adj(m, len) == NULL)
698                 return -EINVAL;
699
700         dev = dst.dev;
701         pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
702
703         rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
704                 TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
705         if (rc == 0)
706                 rc = send_pkt(s, dev, m);
707
708         return rc;
709 }
710
711 /*
712  * RFC 793:
713  * There are four cases for the acceptability test for an incoming segment:
714  * Segment Receive  Test
715  * Length  Window
716  * ------- -------  -------------------------------------------
717  *    0       0     SEG.SEQ = RCV.NXT
718  *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
719  *   >0       0     not acceptable
720  *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
721  *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
722  */
723 static inline int
724 check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
725 {
726         uint32_t n;
727
728         n = seqn + len;
729         if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
730                         n - tcb->rcv.nxt > tcb->rcv.wnd)
731                 return -ERANGE;
732
733         return 0;
734 }
735
736 static inline union tle_tcp_tsopt
737 rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
738 {
739         union tle_tcp_tsopt ts;
740         uintptr_t opt;
741         const struct rte_tcp_hdr *th;
742
743         if (tcb->so.ts.val != 0) {
744                 opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
745                         mb->l2_len + mb->l3_len + sizeof(*th));
746                 ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
747         } else
748                 ts.raw = 0;
749
750         return ts;
751 }
752
753 /*
754  * PAWS and sequence check.
755  * RFC 1323 4.2.1
756  */
757 static inline int
758 rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len,
759         const union tle_tcp_tsopt ts)
760 {
761         int32_t rc;
762
763         /* RFC 1323 4.2.1 R2 */
764         rc = check_seqn(tcb, seq, len);
765         if (rc < 0)
766                 return rc;
767
768         if (ts.raw != 0) {
769
770                 /* RFC 1323 4.2.1 R1 */
771                 if (tcp_seq_lt(ts.val, tcb->rcv.ts))
772                         return -ERANGE;
773
774                 /* RFC 1323 4.2.1 R3 */
775                 if (tcp_seq_leq(seq, tcb->snd.ack) &&
776                                 tcp_seq_lt(tcb->snd.ack, seq + len))
777                         tcb->rcv.ts = ts.val;
778         }
779
780         return rc;
781 }
782
783 static inline int
784 rx_check_ack(const struct tcb *tcb, uint32_t ack)
785 {
786         uint32_t max;
787
788         max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
789
790         if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
791                 return 0;
792
793         return -ERANGE;
794 }
795
796 static inline int
797 rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
798         const union tle_tcp_tsopt ts)
799 {
800         int32_t rc;
801
802         rc = rx_check_seq(tcb, seq, len, ts);
803         rc |= rx_check_ack(tcb, ack);
804         return rc;
805 }
806
807 static inline int
808 restore_syn_opt(union seg_info *si, union tle_tcp_tsopt *to,
809         const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
810         uint32_t hash_alg, rte_xmm_t *secret_key)
811 {
812         int32_t rc;
813         uint32_t len;
814         const struct rte_tcp_hdr *th;
815
816         /* check that ACK, etc fields are what we expected. */
817         rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
818                                 hash_alg,
819                                 secret_key);
820         if (rc < 0)
821                 return rc;
822
823         si->mss = rc;
824
825         th = rte_pktmbuf_mtod_offset(mb, const struct rte_tcp_hdr *,
826                 mb->l2_len + mb->l3_len);
827         len = mb->l4_len - sizeof(*th);
828         to[0] = get_tms_opts((uintptr_t)(th + 1), len);
829         return 0;
830 }
831
832 static inline void
833 stream_term(struct tle_tcp_stream *s)
834 {
835         struct sdr *dr;
836
837         s->tcb.state = TLE_TCP_ST_CLOSED;
838         rte_smp_wmb();
839
840         timer_stop(s);
841
842         /* close() was already invoked, schedule final cleanup */
843         if ((s->tcb.uop & TLE_TCP_OP_CLOSE) != 0) {
844
845                 dr = CTX_TCP_SDR(s->s.ctx);
846                 STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
847
848         /* notify user that stream need to be closed */
849         } else if (s->err.ev != NULL)
850                 tle_event_raise(s->err.ev);
851         else if (s->err.cb.func != NULL)
852                 s->err.cb.func(s->err.cb.data, &s->s);
853 }
854
855 static inline int
856 stream_fill_dest(struct tle_tcp_stream *s)
857 {
858         int32_t rc;
859         uint32_t type;
860         const void *da;
861
862         type = s->s.type;
863         if (type == TLE_V4)
864                 da = &s->s.ipv4.addr.src;
865         else
866                 da = &s->s.ipv6.addr.src;
867
868         rc = stream_get_dest(&s->s, da, &s->tx.dst);
869         return (rc < 0) ? rc : 0;
870 }
871
872 /*
873  * estimate the rto
874  * for now rtt is calculated based on the tcp TMS option,
875  * later add real-time one
876  */
877 static inline void
878 estimate_stream_rto(struct tle_tcp_stream *s, uint32_t tms)
879 {
880         uint32_t rtt;
881
882         if (s->tcb.so.ts.ecr) {
883                 rtt = tms - s->tcb.so.ts.ecr;
884                 rto_estimate(&s->tcb, rtt);
885         } else
886                 s->tcb.snd.rto = TCP_RTO_DEFAULT;
887 }
888
889 /*
890  * helper function, prepares a new accept stream.
891  */
892 static inline int
893 accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
894         struct tle_tcp_stream *cs, const union tle_tcp_tsopt *to,
895         uint32_t tms, const union pkt_info *pi, const union seg_info *si)
896 {
897         int32_t rc;
898
899         /* some TX still pending for that stream. */
900         if (TCP_STREAM_TX_PENDING(cs))
901                 return -EAGAIN;
902
903         /* setup L4 ports and L3 addresses fields. */
904         cs->s.port.raw = pi->port.raw;
905         cs->s.pmsk.raw = UINT32_MAX;
906
907         if (pi->tf.type == TLE_V4) {
908                 cs->s.ipv4.addr = pi->addr4;
909                 cs->s.ipv4.mask.src = INADDR_NONE;
910                 cs->s.ipv4.mask.dst = INADDR_NONE;
911         } else if (pi->tf.type == TLE_V6) {
912                 cs->s.ipv6.addr = *pi->addr6;
913                 rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
914                         sizeof(cs->s.ipv6.mask.src));
915                 rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
916                         sizeof(cs->s.ipv6.mask.dst));
917         }
918
919         /* setup TCB */
920         sync_fill_tcb(&cs->tcb, si, to);
921         cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
922
923         estimate_stream_rto(cs, tms);
924
925         /* copy streams type & flags. */
926         cs->s.type = ps->s.type;
927         cs->flags = ps->flags;
928
929         /* retrive and cache destination information. */
930         rc = stream_fill_dest(cs);
931         if (rc != 0)
932                 return rc;
933
934         /* update snd.mss with SMSS value */
935         cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
936
937         /* setup congestion variables */
938         cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
939         cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
940         cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
941
942         cs->tcb.state = TLE_TCP_ST_ESTABLISHED;
943
944         /* add stream to the table */
945         cs->ste = stbl_add_stream(st, pi, cs);
946         if (cs->ste == NULL)
947                 return -ENOBUFS;
948
949         cs->tcb.uop |= TLE_TCP_OP_ACCEPT;
950         tcp_stream_up(cs);
951         return 0;
952 }
953
954
955 /*
956  * ACK for new connection request arrived.
957  * Check that the packet meets all conditions and try to open a new stream.
958  * returns:
959  * < 0  - invalid packet
960  * == 0 - packet is valid and new stream was opened for it.
961  * > 0  - packet is valid, but failed to open new stream.
962  */
963 static inline int
964 rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
965         const union pkt_info *pi, union seg_info *si,
966         uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
967 {
968         int32_t rc;
969         struct tle_ctx *ctx;
970         struct tle_stream *ts;
971         struct tle_tcp_stream *cs;
972         union tle_tcp_tsopt to;
973
974         *csp = NULL;
975
976         if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
977                 return -EINVAL;
978
979         ctx = s->s.ctx;
980         rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
981                                 &ctx->prm.secret_key);
982         if (rc < 0)
983                 return rc;
984
985         /* allocate new stream */
986         cs = tcp_stream_get(ctx, 0);
987         if (cs == NULL)
988                 return ENFILE;
989
990         /* prepare stream to handle new connection */
991         if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
992
993                 /* put new stream in the accept queue */
994                 ts = &cs->s;
995                 if (_rte_ring_enqueue_burst(s->rx.q,
996                                 (void * const *)&ts, 1) == 1) {
997                         *csp = cs;
998                         return 0;
999                 }
1000
1001                 /* cleanup on failure */
1002                 tcp_stream_down(cs);
1003                 stbl_del_stream(st, cs->ste, cs, 0);
1004                 cs->ste = NULL;
1005         }
1006
1007         tcp_stream_reset(ctx, cs);
1008         return ENOBUFS;
1009 }
1010
1011 static inline int
1012 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf **mb, uint32_t hlen,
1013         uint32_t *seqn, uint32_t *plen)
1014 {
1015         uint32_t len, n, seq;
1016
1017         seq = *seqn;
1018         len = *plen;
1019
1020         rte_pktmbuf_adj(*mb, hlen);
1021         if (len == 0)
1022                 return -ENODATA;
1023         /* cut off the start of the packet */
1024         else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
1025                 n = tcb->rcv.nxt - seq;
1026                 if (n >= len)
1027                         return -ENODATA;
1028
1029                 *mb = _rte_pktmbuf_adj(*mb, n);
1030                 *seqn = seq + n;
1031                 *plen = len - n;
1032         }
1033
1034         return 0;
1035 }
1036
1037 static inline uint32_t
1038 rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
1039 {
1040         uint32_t k, n;
1041
1042         n = ack - (uint32_t)s->tcb.snd.una;
1043
1044         /* some more data was acked. */
1045         if (n != 0) {
1046
1047                 /* advance SND.UNA and free related packets. */
1048                 k = rte_ring_free_count(s->tx.q);
1049                 free_una_data(s, n);
1050
1051                 /* mark the stream as available for writing */
1052                 if (rte_ring_free_count(s->tx.q) != 0) {
1053                         if (s->tx.ev != NULL)
1054                                 tle_event_raise(s->tx.ev);
1055                         else if (k == 0 && s->tx.cb.func != NULL)
1056                                 s->tx.cb.func(s->tx.cb.data, &s->s);
1057                 }
1058         }
1059
1060         return n;
1061 }
1062
1063 static void
1064 stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
1065 {
1066         if (rto != 0) {
1067                 s->tcb.state = TLE_TCP_ST_TIME_WAIT;
1068                 s->tcb.snd.rto = rto;
1069                 timer_reset(s);
1070         } else
1071                 stream_term(s);
1072 }
1073
1074 static void
1075 rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1076 {
1077         uint32_t state;
1078         int32_t ackfin;
1079
1080         s->tcb.rcv.nxt += 1;
1081         s->err.rev |= TLE_TCP_REV_FIN;
1082
1083         ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1084         state = s->tcb.state;
1085
1086         if (state == TLE_TCP_ST_ESTABLISHED) {
1087                 s->tcb.state = TLE_TCP_ST_CLOSE_WAIT;
1088                 /* raise err.ev & err.cb */
1089                 if (s->err.ev != NULL)
1090                         tle_event_raise(s->err.ev);
1091                 else if (s->err.cb.func != NULL)
1092                         s->err.cb.func(s->err.cb.data, &s->s);
1093         } else if (state == TLE_TCP_ST_FIN_WAIT_1 ||
1094                         state == TLE_TCP_ST_CLOSING) {
1095                 rsp->flags |= TCP_FLAG_ACK;
1096                 if (ackfin != 0)
1097                         stream_timewait(s, s->tcb.snd.rto_tw);
1098                 else
1099                         s->tcb.state = TLE_TCP_ST_CLOSING;
1100         } else if (state == TLE_TCP_ST_FIN_WAIT_2) {
1101                 rsp->flags |= TCP_FLAG_ACK;
1102                 stream_timewait(s, s->tcb.snd.rto_tw);
1103         } else if (state == TLE_TCP_ST_LAST_ACK && ackfin != 0) {
1104                 stream_term(s);
1105         }
1106 }
1107
1108 /*
1109  * FIN process for ESTABLISHED state
1110  * returns:
1111  * 0 < - error occurred
1112  * 0 - FIN was processed OK, and mbuf can be free/reused.
1113  * 0 > - FIN was processed OK and mbuf can't be free/reused.
1114  */
1115 static inline int
1116 rx_fin(struct tle_tcp_stream *s, uint32_t state,
1117         const union seg_info *si, struct rte_mbuf *mb,
1118         struct resp_info *rsp)
1119 {
1120         uint32_t hlen, plen, seq;
1121         int32_t ret;
1122         union tle_tcp_tsopt ts;
1123
1124         hlen = PKT_L234_HLEN(mb);
1125         plen = mb->pkt_len - hlen;
1126         seq = si->seq;
1127
1128         ts = rx_tms_opt(&s->tcb, mb);
1129         ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1130         if (ret != 0)
1131                 return ret;
1132
1133         if (state < TLE_TCP_ST_ESTABLISHED)
1134                 return -EINVAL;
1135
1136         if (plen != 0) {
1137
1138                 ret = data_pkt_adjust(&s->tcb, &mb, hlen, &seq, &plen);
1139                 if (ret != 0)
1140                         return ret;
1141                 if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1142                         return -ENOBUFS;
1143         }
1144
1145         /*
1146          * fast-path: all data & FIN was already sent out
1147          * and now is acknowledged.
1148          */
1149         if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1150                         si->ack == (uint32_t)s->tcb.snd.nxt) {
1151                 s->tcb.snd.una = s->tcb.snd.fss;
1152                 empty_tq(s);
1153         /* conventional ACK processiing */
1154         } else
1155                 rx_ackdata(s, si->ack);
1156
1157         /* some fragments still missing */
1158         if (seq + plen != s->tcb.rcv.nxt) {
1159                 s->tcb.rcv.frs.seq = seq + plen;
1160                 s->tcb.rcv.frs.on = 1;
1161         } else
1162                 rx_fin_state(s, rsp);
1163
1164         return plen;
1165 }
1166
1167 static inline int
1168 rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1169         const union seg_info *si)
1170 {
1171         int32_t rc;
1172
1173         /*
1174          * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1175          * are validated by checking their SEQ-fields.
1176          * A reset is valid if its sequence number is in the window.
1177          * In the SYN-SENT state (a RST received in response to an initial SYN),
1178          * the RST is acceptable if the ACK field acknowledges the SYN.
1179          */
1180         if (state == TLE_TCP_ST_SYN_SENT) {
1181                 rc = ((flags & TCP_FLAG_ACK) == 0 ||
1182                                 si->ack != s->tcb.snd.nxt) ?
1183                         -ERANGE : 0;
1184         }
1185
1186         else
1187                 rc = check_seqn(&s->tcb, si->seq, 0);
1188
1189         if (rc == 0) {
1190                 s->err.rev |= TLE_TCP_REV_RST;
1191                 stream_term(s);
1192         }
1193
1194         return rc;
1195 }
1196
1197 /*
1198  *  check do we have FIN  that was received out-of-order.
1199  *  if yes, try to process it now.
1200  */
1201 static inline void
1202 rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1203 {
1204         if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1205                 rx_fin_state(s, rsp);
1206 }
1207
1208 static inline void
1209 dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1210 {
1211         static const struct dack_info zero_dack;
1212
1213         tack[0] = zero_dack;
1214         tack->ack = tcb->snd.una;
1215         tack->segs.dup = tcb->rcv.dupack;
1216         tack->wu.raw = tcb->snd.wu.raw;
1217         tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1218 }
1219
1220 static inline void
1221 ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1222 {
1223         tcb->snd.wu.raw = tack->wu.raw;
1224         tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1225 }
1226
1227 static inline void
1228 ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1229 {
1230         uint32_t n;
1231
1232         n = tack->segs.ack * tcb->snd.mss;
1233
1234         /* slow start phase, RFC 5681 3.1 (2)  */
1235         if (tcb->snd.cwnd < tcb->snd.ssthresh)
1236                 tcb->snd.cwnd += RTE_MIN(acked, n);
1237         /* congestion avoidance phase, RFC 5681 3.1 (3) */
1238         else
1239                 tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1240 }
1241
1242 static inline void
1243 rto_ssthresh_update(struct tcb *tcb)
1244 {
1245         uint32_t k, n;
1246
1247         /* RFC 5681 3.1 (4)  */
1248         n = (tcb->snd.nxt - tcb->snd.una) / 2;
1249         k = 2 * tcb->snd.mss;
1250         tcb->snd.ssthresh = RTE_MAX(n, k);
1251 }
1252
1253 static inline void
1254 rto_cwnd_update(struct tcb *tcb)
1255 {
1256
1257         if (tcb->snd.nb_retx == 0)
1258                 rto_ssthresh_update(tcb);
1259
1260         /*
1261          * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1262          * no more than 1 full-sized segment.
1263          */
1264         tcb->snd.cwnd = tcb->snd.mss;
1265 }
1266
1267 static inline void
1268 ack_info_update(struct dack_info *tack, const union seg_info *si,
1269         int32_t badseq, uint32_t dlen, const union tle_tcp_tsopt ts)
1270 {
1271         if (badseq != 0) {
1272                 tack->segs.badseq++;
1273                 return;
1274         }
1275
1276         /* segnt with incoming data */
1277         tack->segs.data += (dlen != 0);
1278
1279         /* segment with newly acked data */
1280         if (tcp_seq_lt(tack->ack, si->ack)) {
1281                 tack->segs.dup = 0;
1282                 tack->segs.ack++;
1283                 tack->ack = si->ack;
1284                 tack->ts = ts;
1285
1286         /*
1287          * RFC 5681: An acknowledgment is considered a "duplicate" when:
1288          * (a) the receiver of the ACK has outstanding data
1289          * (b) the incoming acknowledgment carries no data
1290          * (c) the SYN and FIN bits are both off
1291          * (d) the acknowledgment number is equal to the TCP.UNA
1292          * (e) the advertised window in the incoming acknowledgment equals the
1293          * advertised window in the last incoming acknowledgment.
1294          *
1295          * Here will have only to check only for (b),(d),(e).
1296          * (a) will be checked later for the whole bulk of packets,
1297          * (c) should never happen here.
1298          */
1299         } else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1300                 tack->dup3.seg = tack->segs.ack + 1;
1301                 tack->dup3.ack = tack->ack;
1302         }
1303
1304         /*
1305          * RFC 793:
1306          * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1307          * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1308          * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1309          * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1310          */
1311         if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1312                         (si->seq == tack->wu.wl1 &&
1313                         tcp_seq_leq(tack->wu.wl2, si->ack))) {
1314
1315                 tack->wu.wl1 = si->seq;
1316                 tack->wu.wl2 = si->ack;
1317                 tack->wnd = si->wnd;
1318         }
1319 }
1320
1321 static inline uint32_t
1322 rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1323         const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1324         int32_t rc[], uint32_t num)
1325 {
1326         uint32_t i, j, k, n, t;
1327         uint32_t hlen, plen, seq, tlen;
1328         int32_t ret;
1329         union tle_tcp_tsopt ts;
1330
1331         k = 0;
1332         for (i = 0; i != num; i = j) {
1333
1334                 hlen = PKT_L234_HLEN(mb[i]);
1335                 plen = mb[i]->pkt_len - hlen;
1336                 seq = si[i].seq;
1337
1338                 ts = rx_tms_opt(&s->tcb, mb[i]);
1339                 ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1340
1341                 /* account segment received */
1342                 ack_info_update(tack, &si[i], ret != 0, plen, ts);
1343
1344                 if (ret == 0) {
1345                         /* skip duplicate data, if any */
1346                         ret = data_pkt_adjust(&s->tcb, &mb[i], hlen,
1347                                 &seq, &plen);
1348                 }
1349
1350                 j = i + 1;
1351                 if (ret != 0) {
1352                         rp[k] = mb[i];
1353                         rc[k] = -ret;
1354                         k++;
1355                         continue;
1356                 }
1357
1358                 /* group sequential packets together. */
1359                 for (tlen = plen; j != num; tlen += plen, j++) {
1360
1361                         hlen = PKT_L234_HLEN(mb[j]);
1362                         plen = mb[j]->pkt_len - hlen;
1363
1364                         /* not consecutive packet */
1365                         if (plen == 0 || seq + tlen != si[j].seq)
1366                                 break;
1367
1368                         /* check SEQ/ACK */
1369                         ts = rx_tms_opt(&s->tcb, mb[j]);
1370                         ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1371                                 plen, ts);
1372
1373                         if (ret != 0)
1374                                 break;
1375
1376                         /* account for segment received */
1377                         ack_info_update(tack, &si[j], ret != 0, plen, ts);
1378
1379                         rte_pktmbuf_adj(mb[j], hlen);
1380                 }
1381
1382                 n = j - i;
1383
1384                 /* account for OFO data */
1385                 if (seq != s->tcb.rcv.nxt)
1386                         tack->segs.ofo += n;
1387
1388                 /* enqueue packets */
1389                 t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1390
1391                 /* if we are out of space in stream recv buffer. */
1392                 for (; t != n; t++) {
1393                         rp[k] = mb[i + t];
1394                         rc[k] = -ENOBUFS;
1395                         k++;
1396                 }
1397         }
1398
1399         return num - k;
1400 }
1401
1402 static inline void
1403 start_fast_retransmit(struct tle_tcp_stream *s)
1404 {
1405         struct tcb *tcb;
1406
1407         tcb = &s->tcb;
1408
1409         /* RFC 6582 3.2.2 */
1410         tcb->snd.rcvr = tcb->snd.nxt;
1411         tcb->snd.fastack = 1;
1412
1413         /* RFC 5681 3.2.2 */
1414         rto_ssthresh_update(tcb);
1415
1416         /* RFC 5681 3.2.3 */
1417         tcp_txq_rst_nxt_head(s);
1418         tcb->snd.nxt = tcb->snd.una;
1419         tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1420 }
1421
1422 static inline void
1423 stop_fast_retransmit(struct tle_tcp_stream *s)
1424 {
1425         struct tcb *tcb;
1426         uint32_t n;
1427
1428         tcb = &s->tcb;
1429         n = tcb->snd.nxt - tcb->snd.una;
1430         tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1431                 RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1432         tcb->snd.fastack = 0;
1433 }
1434
1435 static inline int
1436 in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1437         uint32_t dup_num)
1438 {
1439         uint32_t n;
1440         struct tcb *tcb;
1441
1442         tcb = &s->tcb;
1443
1444         /* RFC 5682 3.2.3 partial ACK */
1445         if (ack_len != 0) {
1446
1447                 n = ack_num * tcb->snd.mss;
1448                 if (ack_len >= n)
1449                         tcb->snd.cwnd -= ack_len - n;
1450                 else
1451                         tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1452
1453                 /*
1454                  * For the first partial ACK that arrives
1455                  * during fast recovery, also reset the
1456                  * retransmit timer.
1457                  */
1458                 if (tcb->snd.fastack == 1)
1459                         timer_reset(s);
1460
1461                 tcb->snd.fastack += ack_num;
1462                 return 1;
1463
1464         /* RFC 5681 3.2.4 */
1465         } else if (dup_num > 3) {
1466                 s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1467                 return 1;
1468         }
1469
1470         return 0;
1471 }
1472
1473 static inline int
1474 process_ack(struct tle_tcp_stream *s, uint32_t acked,
1475         const struct dack_info *tack)
1476 {
1477         int32_t send;
1478
1479         send = 0;
1480
1481         /* normal mode */
1482         if (s->tcb.snd.fastack == 0) {
1483
1484                 send = 1;
1485
1486                 /* RFC 6582 3.2.2 switch to fast retransmit mode */
1487                 if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1488                                 s->tcb.snd.una >= s->tcb.snd.rcvr) {
1489
1490                         start_fast_retransmit(s);
1491                         in_fast_retransmit(s,
1492                                 tack->ack - tack->dup3.ack,
1493                                 tack->segs.ack - tack->dup3.seg - 1,
1494                                 tack->segs.dup);
1495
1496                 /* remain in normal mode */
1497                 } else if (acked != 0) {
1498                         ack_cwnd_update(&s->tcb, acked, tack);
1499                         timer_stop(s);
1500                 }
1501
1502         /* fast retransmit mode */
1503         } else {
1504
1505                 /* remain in fast retransmit mode */
1506                 if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1507
1508                         send = in_fast_retransmit(s, acked, tack->segs.ack,
1509                                 tack->segs.dup);
1510                 } else {
1511                         /* RFC 5682 3.2.3 full ACK */
1512                         stop_fast_retransmit(s);
1513                         timer_stop(s);
1514
1515                         /* if we have another series of dup ACKs */
1516                         if (tack->dup3.seg != 0 &&
1517                                         s->tcb.snd.una != s->tcb.snd.nxt &&
1518                                         tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1519                                         tack->dup3.ack)) {
1520
1521                                 /* restart fast retransmit again. */
1522                                 start_fast_retransmit(s);
1523                                 send = in_fast_retransmit(s,
1524                                         tack->ack - tack->dup3.ack,
1525                                         tack->segs.ack - tack->dup3.seg - 1,
1526                                         tack->segs.dup);
1527                         }
1528                 }
1529         }
1530
1531         return send;
1532 }
1533
1534 /*
1535  * our FIN was acked, stop rto timer, change stream state,
1536  * and possibly close the stream.
1537  */
1538 static inline void
1539 rx_ackfin(struct tle_tcp_stream *s)
1540 {
1541         uint32_t state;
1542
1543         s->tcb.snd.una = s->tcb.snd.fss;
1544         empty_tq(s);
1545
1546         state = s->tcb.state;
1547         if (state == TLE_TCP_ST_LAST_ACK)
1548                 stream_term(s);
1549         else if (state == TLE_TCP_ST_FIN_WAIT_1) {
1550                 timer_stop(s);
1551                 s->tcb.state = TLE_TCP_ST_FIN_WAIT_2;
1552         } else if (state == TLE_TCP_ST_CLOSING) {
1553                 stream_timewait(s, s->tcb.snd.rto_tw);
1554         }
1555 }
1556
1557 static inline void
1558 rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1559         const struct dack_info *tack)
1560 {
1561         int32_t send;
1562         uint32_t n;
1563
1564         s->tcb.rcv.dupack = tack->segs.dup;
1565
1566         n = rx_ackdata(s, tack->ack);
1567         send = process_ack(s, n, tack);
1568
1569         /* try to send more data. */
1570         if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1571                 txs_enqueue(s->s.ctx, s);
1572
1573         /* restart RTO timer. */
1574         if (s->tcb.snd.nxt != s->tcb.snd.una)
1575                 timer_start(s);
1576
1577         /* update rto, if fresh packet is here then calculate rtt */
1578         if (tack->ts.ecr != 0)
1579                 rto_estimate(&s->tcb, ts - tack->ts.ecr);
1580 }
1581
1582 /*
1583  * process <SYN,ACK>
1584  * returns negative value on failure, or zero on success.
1585  */
1586 static inline int
1587 rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1588         const union seg_info *si, struct rte_mbuf *mb,
1589         struct resp_info *rsp)
1590 {
1591         struct tle_tcp_syn_opts so;
1592         struct rte_tcp_hdr *th;
1593
1594         if (state != TLE_TCP_ST_SYN_SENT)
1595                 return -EINVAL;
1596
1597         /*
1598          * RFC 793 3.9: in the SYN-SENT state
1599          * If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset
1600          * <SEQ=SEG.ACK><CTL=RST>
1601          * and discard the segment.
1602          * The connection remains in the same state.
1603          */
1604         if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1605                 send_rst(s, si->ack);
1606                 return 0;
1607         }
1608
1609         th = rte_pktmbuf_mtod_offset(mb, struct rte_tcp_hdr *,
1610                 mb->l2_len + mb->l3_len);
1611         get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1612
1613         s->tcb.so = so;
1614
1615         s->tcb.snd.una = s->tcb.snd.nxt;
1616         s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1617         s->tcb.snd.wnd = si->wnd << so.wscale;
1618         s->tcb.snd.wu.wl1 = si->seq;
1619         s->tcb.snd.wu.wl2 = si->ack;
1620         s->tcb.snd.wscale = so.wscale;
1621
1622         /* setup congestion variables */
1623         s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
1624         s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1625
1626         s->tcb.rcv.ts = so.ts.val;
1627         s->tcb.rcv.irs = si->seq;
1628         s->tcb.rcv.nxt = si->seq + 1;
1629
1630         /* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1631         s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1632                 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1633         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1634
1635         /* calculate initial rto */
1636         rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1637
1638         rsp->flags |= TCP_FLAG_ACK;
1639
1640         timer_stop(s);
1641         s->tcb.state = TLE_TCP_ST_ESTABLISHED;
1642         rte_smp_wmb();
1643
1644         if (s->tx.ev != NULL)
1645                 tle_event_raise(s->tx.ev);
1646         else if (s->tx.cb.func != NULL)
1647                 s->tx.cb.func(s->tx.cb.data, &s->s);
1648
1649         return 0;
1650 }
1651
1652 static inline uint32_t
1653 rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1654         const union pkt_info *pi, const union seg_info si[],
1655         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1656         uint32_t num)
1657 {
1658         uint32_t i, k, n, state;
1659         int32_t ret;
1660         struct resp_info rsp;
1661         struct dack_info tack;
1662
1663         k = 0;
1664         rsp.flags = 0;
1665
1666         state = s->tcb.state;
1667
1668         /*
1669          * first check for the states/flags where we don't
1670          * expect groups of packets.
1671          */
1672
1673         /* process RST */
1674         if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1675                 for (i = 0;
1676                                 i != num &&
1677                                 rx_rst(s, state, pi->tf.flags, &si[i]);
1678                                 i++)
1679                         ;
1680                 i = 0;
1681
1682         /* RFC 793: if the ACK bit is off drop the segment and return */
1683         } else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1684                 i = 0;
1685         /*
1686          * first check for the states/flags where we don't
1687          * expect groups of packets.
1688          */
1689
1690         /* process <SYN,ACK> */
1691         } else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1692                 for (i = 0; i != num; i++) {
1693                         ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1694                         if (ret == 0)
1695                                 break;
1696
1697                         rc[k] = -ret;
1698                         rp[k] = mb[i];
1699                         k++;
1700                 }
1701
1702         /* process FIN */
1703         } else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1704                 ret = 0;
1705                 for (i = 0; i != num; i++) {
1706                         ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1707                         if (ret >= 0)
1708                                 break;
1709
1710                         rc[k] = -ret;
1711                         rp[k] = mb[i];
1712                         k++;
1713                 }
1714                 i += (ret > 0);
1715
1716         /* normal data/ack packets */
1717         } else if (state >= TLE_TCP_ST_ESTABLISHED &&
1718                         state <= TLE_TCP_ST_LAST_ACK) {
1719
1720                 /* process incoming data packets. */
1721                 dack_info_init(&tack, &s->tcb);
1722                 n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1723
1724                 /* follow up actions based on aggregated information */
1725
1726                 /* update SND.WND */
1727                 ack_window_update(&s->tcb, &tack);
1728
1729                 /*
1730                  * fast-path: all data & FIN was already sent out
1731                  * and now is acknowledged.
1732                  */
1733                 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1734                                 tack.ack == (uint32_t)s->tcb.snd.nxt)
1735                         rx_ackfin(s);
1736                 else
1737                         rx_process_ack(s, ts, &tack);
1738
1739                 /*
1740                  * send an immediate ACK if either:
1741                  * - received segment with invalid seq/ack number
1742                  * - received segment with OFO data
1743                  * - received segment with INO data and no TX is scheduled
1744                  *   for that stream.
1745                  */
1746                 if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1747                                 (tack.segs.data != 0 &&
1748                                 rte_atomic32_read(&s->tx.arm) == 0))
1749                         rsp.flags |= TCP_FLAG_ACK;
1750
1751                 rx_ofo_fin(s, &rsp);
1752
1753                 k += num - n;
1754                 i = num;
1755
1756         /* unhandled state, drop all packets. */
1757         } else
1758                 i = 0;
1759
1760         /* we have a response packet to send. */
1761         if (rsp.flags != 0) {
1762                 send_ack(s, ts, rsp.flags);
1763
1764                 /* start the timer for FIN packet */
1765                 if ((rsp.flags & TCP_FLAG_FIN) != 0)
1766                         timer_reset(s);
1767         }
1768
1769         /* unprocessed packets */
1770         for (; i != num; i++, k++) {
1771                 rc[k] = ENODATA;
1772                 rp[k] = mb[i];
1773         }
1774
1775         return num - k;
1776 }
1777
1778 static inline uint32_t
1779 rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1780         const union pkt_info *pi, const union seg_info si[],
1781         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1782         uint32_t num)
1783 {
1784         uint32_t i;
1785
1786         if (tcp_stream_acquire(s) > 0) {
1787                 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1788                 tcp_stream_release(s);
1789                 return i;
1790         }
1791
1792         for (i = 0; i != num; i++) {
1793                 rc[i] = ENOENT;
1794                 rp[i] = mb[i];
1795         }
1796         return 0;
1797 }
1798
1799 static inline uint32_t
1800 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1801         const union pkt_info pi[], union seg_info si[],
1802         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1803         uint32_t num)
1804 {
1805         struct tle_tcp_stream *cs, *s;
1806         uint32_t i, k, n, state;
1807         int32_t ret;
1808
1809         s = rx_obtain_stream(dev, st, &pi[0], type);
1810         if (s == NULL) {
1811                 for (i = 0; i != num; i++) {
1812                         rc[i] = ENOENT;
1813                         rp[i] = mb[i];
1814                 }
1815                 return 0;
1816         }
1817
1818         k = 0;
1819         state = s->tcb.state;
1820
1821         if (state == TLE_TCP_ST_LISTEN) {
1822
1823                 /* one connection per flow */
1824                 cs = NULL;
1825                 ret = -EINVAL;
1826                 for (i = 0; i != num; i++) {
1827
1828                         ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1829
1830                         /* valid packet encountered */
1831                         if (ret >= 0)
1832                                 break;
1833
1834                         /* invalid packet, keep trying to find a proper one */
1835                         rc[k] = -ret;
1836                         rp[k] = mb[i];
1837                         k++;
1838                 }
1839
1840                 /* packet is valid, but we are out of streams to serve it */
1841                 if (ret > 0) {
1842                         for (; i != num; i++, k++) {
1843                                 rc[k] = ret;
1844                                 rp[k] = mb[i];
1845                         }
1846                 /* new stream is accepted */
1847                 } else if (ret == 0) {
1848
1849                         /* inform listen stream about new connections */
1850                         if (s->rx.ev != NULL)
1851                                 tle_event_raise(s->rx.ev);
1852                         else if (s->rx.cb.func != NULL &&
1853                                         rte_ring_count(s->rx.q) == 1)
1854                                 s->rx.cb.func(s->rx.cb.data, &s->s);
1855
1856                         /* if there is no data, drop current packet */
1857                         if (PKT_L4_PLEN(mb[i]) == 0) {
1858                                 rc[k] = ENODATA;
1859                                 rp[k++] = mb[i++];
1860                         }
1861
1862                         /*  process remaining packets for that stream */
1863                         if (num != i) {
1864                                 n = rx_new_stream(cs, ts, pi + i, si + i,
1865                                         mb + i, rp + k, rc + k, num - i);
1866                                 k += num - n - i;
1867                         }
1868                 }
1869
1870         } else {
1871                 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1872                 k = num - i;
1873         }
1874
1875         tcp_stream_release(s);
1876         return num - k;
1877 }
1878
1879
1880 static inline uint32_t
1881 rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1882         const union pkt_info pi[], const union seg_info si[],
1883         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1884         uint32_t num)
1885 {
1886         struct tle_tcp_stream *s;
1887         uint32_t i, k;
1888         int32_t ret;
1889
1890         s = rx_obtain_listen_stream(dev, &pi[0], type);
1891         if (s == NULL) {
1892                 for (i = 0; i != num; i++) {
1893                         rc[i] = ENOENT;
1894                         rp[i] = mb[i];
1895                 }
1896                 return 0;
1897         }
1898
1899         k = 0;
1900         for (i = 0; i != num; i++) {
1901
1902                 /* check that this remote is allowed to connect */
1903                 if (rx_check_stream(s, &pi[i]) != 0)
1904                         ret = -ENOENT;
1905                 else
1906                         /* syncokie: reply with <SYN,ACK> */
1907                         ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1908
1909                 if (ret != 0) {
1910                         rc[k] = -ret;
1911                         rp[k] = mb[i];
1912                         k++;
1913                 }
1914         }
1915
1916         tcp_stream_release(s);
1917         return num - k;
1918 }
1919
1920 uint16_t
1921 tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1922         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1923 {
1924         struct stbl *st;
1925         struct tle_ctx *ctx;
1926         uint32_t i, j, k, mt, n, t, ts;
1927         union pkt_info pi[num];
1928         union seg_info si[num];
1929         union {
1930                 uint8_t t[TLE_VNUM];
1931                 uint32_t raw;
1932         } stu;
1933
1934         ctx = dev->ctx;
1935         ts = tcp_get_tms(ctx->cycles_ms_shift);
1936         st = CTX_TCP_STLB(ctx);
1937         mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
1938
1939         stu.raw = 0;
1940
1941         /* extract packet info and check the L3/L4 csums */
1942         for (i = 0; i != num; i++) {
1943
1944                 get_pkt_info(pkt[i], &pi[i], &si[i]);
1945
1946                 t = pi[i].tf.type;
1947                 pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, t, IPPROTO_TCP);
1948                 stu.t[t] = mt;
1949         }
1950
1951         if (stu.t[TLE_V4] != 0)
1952                 stbl_lock(st, TLE_V4);
1953         if (stu.t[TLE_V6] != 0)
1954                 stbl_lock(st, TLE_V6);
1955
1956         k = 0;
1957         for (i = 0; i != num; i += j) {
1958
1959                 t = pi[i].tf.type;
1960
1961                 /*basic checks for incoming packet */
1962                 if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1963                         rc[k] = EINVAL;
1964                         rp[k] = pkt[i];
1965                         j = 1;
1966                         k++;
1967                 /* process input SYN packets */
1968                 } else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1969                         j = pkt_info_bulk_syneq(pi + i, num - i);
1970                         n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1971                                 rp + k, rc + k, j);
1972                         k += j - n;
1973                 } else {
1974                         j = pkt_info_bulk_eq(pi + i, num - i);
1975                         n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1976                                 rp + k, rc + k, j);
1977                         k += j - n;
1978                 }
1979         }
1980
1981         if (stu.t[TLE_V4] != 0)
1982                 stbl_unlock(st, TLE_V4);
1983         if (stu.t[TLE_V6] != 0)
1984                 stbl_unlock(st, TLE_V6);
1985
1986         return num - k;
1987 }
1988
1989 uint16_t
1990 tle_tcp_stream_rx_bulk(struct tle_stream *ts, struct rte_mbuf *pkt[],
1991         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1992 {
1993         struct tle_ctx *ctx;
1994         struct tle_tcp_stream *s;
1995         uint32_t i, j, k, n, t, tms;
1996         union pkt_info pi[num];
1997         union seg_info si[num];
1998
1999         ctx = ts->ctx;
2000         tms = tcp_get_tms(ctx->cycles_ms_shift);
2001
2002         s = rx_acquire_stream(ts);
2003         if (s == NULL) {
2004                 for (i = 0; i != num; i++) {
2005                         rc[i] = ENOENT;
2006                         rp[i] = pkt[i];
2007                 }
2008                 return 0;
2009         }
2010
2011         tms = tcp_stream_adjust_tms(s, tms);
2012
2013         /* extract packet info and check the L3/L4 csums */
2014         for (i = 0; i != num; i++) {
2015                 get_pkt_info(pkt[i], &pi[i], &si[i]);
2016                 pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, pi[i].tf.type,
2017                         IPPROTO_TCP);
2018         }
2019
2020         k = 0;
2021         for (i = 0; i != num; i += j) {
2022
2023                 t = pi[i].tf.type;
2024                 j = 1;
2025
2026                 /*basic checks for incoming packet */
2027                 if (t != ts->type || pi[i].csf != 0 ||
2028                                  rx_check_stream(s, pi + i) != 0) {
2029                         rc[k] = EINVAL;
2030                         rp[k] = pkt[i];
2031                         k++;
2032                         continue;
2033                 }
2034
2035                 j = pkt_info_bulk_eq(pi + i, num - i);
2036                 n = rx_stream(s, tms, pi + i, si + i, pkt + i,
2037                         rp + k, rc + k, j);
2038                 k += j - n;
2039         }
2040
2041         tcp_stream_release(s);
2042         return num - k;
2043 }
2044
2045 uint16_t
2046 tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
2047         uint32_t num)
2048 {
2049         uint32_t n;
2050         struct tle_tcp_stream *s;
2051         struct tle_memtank *mts;
2052
2053         s = TCP_STREAM(ts);
2054         n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
2055         if (n == 0)
2056                 return 0;
2057
2058         mts = CTX_TCP_MTS(ts->ctx);
2059
2060         /*
2061          * if we still have packets to read,
2062          * then rearm stream RX event.
2063          */
2064         if (n == num && rte_ring_count(s->rx.q) != 0) {
2065                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2066                         tle_event_raise(s->rx.ev);
2067                 tcp_stream_release(s);
2068         }
2069
2070         tle_memtank_grow(mts);
2071         return n;
2072 }
2073
2074 uint16_t
2075 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
2076 {
2077         uint32_t i, j, k, n;
2078         struct tle_drb *drb[num];
2079         struct tle_tcp_stream *s;
2080
2081         /* extract packets from device TX queue. */
2082
2083         k = num;
2084         n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
2085                 num, drb, &k);
2086
2087         if (n == 0)
2088                 return 0;
2089
2090         /* free empty drbs and notify related streams. */
2091
2092         for (i = 0; i != k; i = j) {
2093                 s = drb[i]->udata;
2094                 for (j = i + 1; j != k && s == drb[j]->udata; j++)
2095                         ;
2096                 stream_drb_free(s, drb + i, j - i);
2097         }
2098
2099         return n;
2100 }
2101
2102 static inline void
2103 stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
2104 {
2105         if (s->s.type == TLE_V4)
2106                 pi->addr4 = s->s.ipv4.addr;
2107         else
2108                 pi->addr6 = &s->s.ipv6.addr;
2109
2110         pi->port = s->s.port;
2111         pi->tf.type = s->s.type;
2112 }
2113
2114 static int
2115 stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
2116 {
2117         const struct sockaddr_in *in4;
2118         const struct sockaddr_in6 *in6;
2119         const struct tle_dev_param *prm;
2120         int32_t rc;
2121
2122         rc = 0;
2123         s->s.pmsk.raw = UINT32_MAX;
2124
2125         /* setup L4 src ports and src address fields. */
2126         if (s->s.type == TLE_V4) {
2127                 in4 = (const struct sockaddr_in *)addr;
2128                 if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2129                         return -EINVAL;
2130
2131                 s->s.port.src = in4->sin_port;
2132                 s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2133                 s->s.ipv4.mask.src = INADDR_NONE;
2134                 s->s.ipv4.mask.dst = INADDR_NONE;
2135
2136         } else if (s->s.type == TLE_V6) {
2137                 in6 = (const struct sockaddr_in6 *)addr;
2138                 if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2139                                 sizeof(tle_ipv6_any)) == 0 ||
2140                                 in6->sin6_port == 0)
2141                         return -EINVAL;
2142
2143                 s->s.port.src = in6->sin6_port;
2144                 rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2145                         sizeof(s->s.ipv6.addr.src));
2146                 rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2147                         sizeof(s->s.ipv6.mask.src));
2148                 rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2149                         sizeof(s->s.ipv6.mask.dst));
2150         }
2151
2152         /* setup the destination device. */
2153         rc = stream_fill_dest(s);
2154         if (rc != 0)
2155                 return rc;
2156
2157         /* setup L4 dst address from device param */
2158         prm = &s->tx.dst.dev->prm;
2159         if (s->s.type == TLE_V4) {
2160                 if (s->s.ipv4.addr.dst == INADDR_ANY)
2161                         s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2162         } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2163                         sizeof(tle_ipv6_any)) == 0)
2164                 memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2165                         sizeof(s->s.ipv6.addr.dst));
2166
2167         return rc;
2168 }
2169
2170 static inline int
2171 tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2172 {
2173         int32_t rc;
2174         uint32_t tms, seq;
2175         union pkt_info pi;
2176         struct stbl *st;
2177         struct stbl_entry *se;
2178
2179         /* fill stream address */
2180         rc = stream_fill_addr(s, addr);
2181         if (rc != 0)
2182                 return rc;
2183
2184         /* fill pkt info to generate seq.*/
2185         stream_fill_pkt_info(s, &pi);
2186
2187         tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2188         s->tcb.so.ts.val = tms;
2189         s->tcb.so.ts.ecr = 0;
2190         s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2191         s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2192
2193         /* note that rcv.nxt is 0 here for sync_gen_seq.*/
2194         seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2195                                 s->s.ctx->prm.hash_alg,
2196                                 &s->s.ctx->prm.secret_key);
2197         s->tcb.snd.iss = seq;
2198         s->tcb.snd.rcvr = seq;
2199         s->tcb.snd.una = seq;
2200         s->tcb.snd.nxt = seq + 1;
2201         s->tcb.snd.rto = TCP_RTO_DEFAULT;
2202         s->tcb.snd.ts = tms;
2203
2204         s->tcb.rcv.mss = s->tcb.so.mss;
2205         s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2206         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2207         s->tcb.rcv.ts = 0;
2208
2209         /* add the stream in stream table */
2210         st = CTX_TCP_STLB(s->s.ctx);
2211         se = stbl_add_stream_lock(st, s);
2212         if (se == NULL)
2213                 return -ENOBUFS;
2214         s->ste = se;
2215
2216         /* put stream into the to-send queue */
2217         txs_enqueue(s->s.ctx, s);
2218
2219         return 0;
2220 }
2221
2222 int
2223 tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2224 {
2225         struct tle_tcp_stream *s;
2226         uint32_t type;
2227         int32_t rc;
2228
2229         if (ts == NULL || addr == NULL)
2230                 return -EINVAL;
2231
2232         s = TCP_STREAM(ts);
2233         type = s->s.type;
2234         if (type >= TLE_VNUM)
2235                 return -EINVAL;
2236
2237         if (tcp_stream_try_acquire(s) > 0) {
2238                 rc = rte_atomic16_cmpset(&s->tcb.state, TLE_TCP_ST_CLOSED,
2239                         TLE_TCP_ST_SYN_SENT);
2240                 rc = (rc == 0) ? -EDEADLK : 0;
2241         } else
2242                 rc = -EINVAL;
2243
2244         if (rc != 0) {
2245                 tcp_stream_release(s);
2246                 return rc;
2247         }
2248
2249         /* fill stream, prepare and transmit syn pkt */
2250         s->tcb.uop |= TLE_TCP_OP_CONNECT;
2251         rc = tx_syn(s, addr);
2252         tcp_stream_release(s);
2253
2254         /* error happened, do a cleanup */
2255         if (rc != 0)
2256                 tle_tcp_stream_close(ts);
2257
2258         return rc;
2259 }
2260
2261 /*
2262  * Helper function for tle_tcp_stream_establish().
2263  * updates stream's TCB.
2264  */
2265 static inline void
2266 tcb_establish(struct tle_tcp_stream *s, const struct tle_tcp_conn_info *ci)
2267 {
2268         uint32_t mss, tms;
2269
2270         tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2271         mss = calc_smss(ci->so.mss, &s->tx.dst);
2272
2273         s->tcb.so = ci->so;
2274         fill_tcb_snd(&s->tcb, ci->ack, ci->seq, mss,
2275                 ci->wnd, ci->so.wscale, &ci->so.ts);
2276         fill_tcb_rcv(&s->tcb, ci->ack, ci->so.wscale, &ci->so.ts);
2277
2278         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2279
2280         /* setup congestion variables */
2281         s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
2282         s->tcb.snd.ssthresh = s->tcb.snd.wnd;
2283
2284         /* calculate and store real timestamp offset */
2285         if (ci->so.ts.raw != 0) {
2286                 s->ts_offset = tms - ci->so.ts.ecr;
2287                 tms -= s->ts_offset;
2288         }
2289
2290         estimate_stream_rto(s, tms);
2291 }
2292
2293 struct tle_stream *
2294 tle_tcp_stream_establish(struct tle_ctx *ctx,
2295         const struct tle_tcp_stream_param *prm,
2296         const struct tle_tcp_conn_info *ci, uint32_t flags)
2297 {
2298         int32_t rc;
2299         struct tle_tcp_stream *s;
2300         struct stbl *st;
2301
2302         if (ctx == NULL || prm == NULL || ci == NULL) {
2303                 rte_errno = -EINVAL;
2304                 return NULL;
2305         }
2306
2307         /* allocate new stream */
2308         s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
2309         if (s == NULL) {
2310                 rte_errno = ENFILE;
2311                 return NULL;
2312         }
2313
2314         do {
2315                 s->tcb.uop |= TLE_TCP_OP_ESTABLISH;
2316
2317                 /* check and use stream addresses and parameters */
2318                 rc = tcp_stream_fill_prm(s, prm);
2319                 if (rc != 0)
2320                         break;
2321
2322                 /* retrieve and cache destination information. */
2323                 rc = stream_fill_dest(s);
2324                 if (rc != 0)
2325                         break;
2326
2327                 /* add the stream to the stream table */
2328                 if ((flags & TLE_TCP_STREAM_F_PRIVATE) == 0) {
2329                         st = CTX_TCP_STLB(s->s.ctx);
2330                         s->ste = stbl_add_stream_lock(st, s);
2331                         if (s->ste == NULL) {
2332                                 rc = -ENOBUFS;
2333                                 break;
2334                         }
2335                 }
2336
2337                 /* fill TCB from user provided data */
2338                 tcb_establish(s, ci);
2339                 s->tcb.state = TLE_TCP_ST_ESTABLISHED;
2340                 tcp_stream_up(s);
2341
2342         } while (0);
2343
2344         /* cleanup on failure */
2345         if (rc != 0) {
2346                 tcp_stream_reset(ctx, s);
2347                 rte_errno = -rc;
2348                 s = NULL;
2349         }
2350
2351         return &s->s;
2352 }
2353
2354 uint16_t
2355 tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2356 {
2357         uint32_t n;
2358         struct tle_tcp_stream *s;
2359
2360         s = TCP_STREAM(ts);
2361         n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
2362         if (n == 0)
2363                 return 0;
2364
2365         /*
2366          * if we still have packets to read,
2367          * then rearm stream RX event.
2368          */
2369         if (n == num && rte_ring_count(s->rx.q) != 0) {
2370                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2371                         tle_event_raise(s->rx.ev);
2372                 tcp_stream_release(s);
2373         }
2374
2375         return n;
2376 }
2377
2378 ssize_t
2379 tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
2380         int iovcnt)
2381 {
2382         int32_t i;
2383         uint32_t mn, n, tn;
2384         size_t sz;
2385         struct tle_tcp_stream *s;
2386         struct iovec iv;
2387         struct rxq_objs mo[2];
2388
2389         s = TCP_STREAM(ts);
2390
2391         /* get group of packets */
2392         mn = tcp_rxq_get_objs(s, mo);
2393         if (mn == 0)
2394                 return 0;
2395
2396         sz = 0;
2397         n = 0;
2398         for (i = 0; i != iovcnt; i++) {
2399                 iv = iov[i];
2400                 sz += iv.iov_len;
2401                 n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
2402                 if (iv.iov_len != 0) {
2403                         sz -= iv.iov_len;
2404                         break;
2405                 }
2406         }
2407
2408         tn = n;
2409
2410         if (i != iovcnt && mn != 1) {
2411                 n = 0;
2412                 do {
2413                         sz += iv.iov_len;
2414                         n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
2415                         if (iv.iov_len != 0) {
2416                                 sz -= iv.iov_len;
2417                                 break;
2418                         }
2419                         if (i + 1 != iovcnt)
2420                                 iv = iov[i + 1];
2421                 } while (++i != iovcnt);
2422                 tn += n;
2423         }
2424
2425         tcp_rxq_consume(s, tn);
2426
2427         /*
2428          * if we still have packets to read,
2429          * then rearm stream RX event.
2430          */
2431         if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
2432                 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2433                         tle_event_raise(s->rx.ev);
2434                 tcp_stream_release(s);
2435         }
2436
2437         return sz;
2438 }
2439
2440 static inline int32_t
2441 tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2442         struct rte_mbuf *segs[], uint32_t num)
2443 {
2444         uint32_t i;
2445         int32_t rc;
2446
2447         for (i = 0; i != num; i++) {
2448                 /* Build L2/L3/L4 header */
2449                 rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2450                         0, TCP_FLAG_ACK, 0, 0);
2451                 if (rc != 0) {
2452                         free_mbufs(segs, num);
2453                         break;
2454                 }
2455         }
2456
2457         if (i == num) {
2458                 /* queue packets for further transmission. */
2459                 rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
2460                 if (rc != 0)
2461                         free_mbufs(segs, num);
2462         }
2463
2464         return rc;
2465 }
2466
2467 uint16_t
2468 tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2469 {
2470         uint32_t i, j, k, mss, n, state;
2471         int32_t rc;
2472         uint64_t ol_flags;
2473         struct tle_tcp_stream *s;
2474         struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2475
2476         s = TCP_STREAM(ts);
2477
2478         /* mark stream as not closable. */
2479         if (tcp_stream_acquire(s) < 0) {
2480                 rte_errno = EAGAIN;
2481                 return 0;
2482         }
2483
2484         state = s->tcb.state;
2485         if (state != TLE_TCP_ST_ESTABLISHED && state != TLE_TCP_ST_CLOSE_WAIT) {
2486                 rte_errno = ENOTCONN;
2487                 tcp_stream_release(s);
2488                 return 0;
2489         }
2490
2491         mss = s->tcb.snd.mss;
2492         ol_flags = s->tx.dst.ol_flags;
2493
2494         k = 0;
2495         rc = 0;
2496         while (k != num) {
2497                 /* prepare and check for TX */
2498                 for (i = k; i != num; i++) {
2499                         if (pkt[i]->pkt_len > mss ||
2500                                         pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2501                                 break;
2502                         rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2503                                 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2504                         if (rc != 0)
2505                                 break;
2506                 }
2507
2508                 if (i != k) {
2509                         /* queue packets for further transmission. */
2510                         n = _rte_ring_enqueue_burst(s->tx.q,
2511                                 (void **)pkt + k, (i - k));
2512                         k += n;
2513
2514                         /*
2515                          * for unsent, but already modified packets:
2516                          * remove pkt l2/l3 headers, restore ol_flags
2517                          */
2518                         if (i != k) {
2519                                 ol_flags = ~s->tx.dst.ol_flags;
2520                                 for (j = k; j != i; j++) {
2521                                         rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2522                                                 pkt[j]->l3_len +
2523                                                 pkt[j]->l4_len);
2524                                         pkt[j]->ol_flags &= ol_flags;
2525                                 }
2526                                 break;
2527                         }
2528                 }
2529
2530                 if (rc != 0) {
2531                         rte_errno = -rc;
2532                         break;
2533
2534                 /* segment large packet and enqueue for sending */
2535                 } else if (i != num) {
2536                         /* segment the packet. */
2537                         rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2538                                 &s->tx.dst, mss);
2539                         if (rc < 0) {
2540                                 rte_errno = -rc;
2541                                 break;
2542                         }
2543
2544                         rc = tx_segments(s, ol_flags, segs, rc);
2545                         if (rc == 0) {
2546                                 /* free the large mbuf */
2547                                 rte_pktmbuf_free(pkt[i]);
2548                                 /* set the mbuf as consumed */
2549                                 k++;
2550                         } else
2551                                 /* no space left in tx queue */
2552                                 break;
2553                 }
2554         }
2555
2556         /* notify BE about more data to send */
2557         if (k != 0)
2558                 txs_enqueue(s->s.ctx, s);
2559         /* if possible, re-arm stream write event. */
2560         if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2561                 tle_event_raise(s->tx.ev);
2562
2563         tcp_stream_release(s);
2564
2565         return k;
2566 }
2567
2568 ssize_t
2569 tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
2570         const struct iovec *iov, int iovcnt)
2571 {
2572         int32_t i, rc;
2573         uint32_t j, k, n, num, slen, state;
2574         uint64_t ol_flags;
2575         size_t sz, tsz;
2576         struct tle_tcp_stream *s;
2577         struct iovec iv;
2578         struct rte_mbuf *mb[2 * MAX_PKT_BURST];
2579
2580         s = TCP_STREAM(ts);
2581
2582         /* mark stream as not closable. */
2583         if (tcp_stream_acquire(s) < 0) {
2584                 rte_errno = EAGAIN;
2585                 return -1;
2586         }
2587
2588         state = s->tcb.state;
2589         if (state != TLE_TCP_ST_ESTABLISHED && state != TLE_TCP_ST_CLOSE_WAIT) {
2590                 rte_errno = ENOTCONN;
2591                 tcp_stream_release(s);
2592                 return -1;
2593         }
2594
2595         /* figure out how many mbufs do we need */
2596         tsz = 0;
2597         for (i = 0; i != iovcnt; i++)
2598                 tsz += iov[i].iov_len;
2599
2600         slen = rte_pktmbuf_data_room_size(mp);
2601         slen = RTE_MIN(slen, s->tcb.snd.mss);
2602
2603         num = (tsz + slen - 1) / slen;
2604         n = rte_ring_free_count(s->tx.q);
2605         num = RTE_MIN(num, n);
2606         n = RTE_MIN(num, RTE_DIM(mb));
2607
2608         /* allocate mbufs */
2609         if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
2610                 rte_errno = ENOMEM;
2611                 tcp_stream_release(s);
2612                 return -1;
2613         }
2614
2615         /* copy data into the mbufs */
2616         k = 0;
2617         sz = 0;
2618         for (i = 0; i != iovcnt; i++) {
2619                 iv = iov[i];
2620                 sz += iv.iov_len;
2621                 k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
2622                 if (iv.iov_len != 0) {
2623                         sz -= iv.iov_len;
2624                         break;
2625                 }
2626         }
2627
2628         /* partially filled segment */
2629         k += (k != n && mb[k]->data_len != 0);
2630
2631         /* fill pkt headers */
2632         ol_flags = s->tx.dst.ol_flags;
2633
2634         for (j = 0; j != k; j++) {
2635                 rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
2636                         s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2637                 if (rc != 0)
2638                         break;
2639         }
2640
2641         /* if no error encountered, then enqueue pkts for transmission */
2642         if (k == j)
2643                 k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
2644         else
2645                 k = 0;
2646
2647         if (k != j) {
2648
2649                 /* free pkts that were not enqueued */
2650                 free_mbufs(mb + k, j - k);
2651
2652                 /* our last segment can be partially filled */
2653                 sz += slen - sz % slen;
2654                 sz -= (j - k) * slen;
2655
2656                 /* report an error */
2657                 if (rc != 0) {
2658                         rte_errno = -rc;
2659                         sz = -1;
2660                 }
2661         }
2662
2663         if (k != 0) {
2664
2665                 /* notify BE about more data to send */
2666                 txs_enqueue(s->s.ctx, s);
2667
2668                 /* if possible, re-arm stream write event. */
2669                 if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2670                         tle_event_raise(s->tx.ev);
2671         }
2672
2673         tcp_stream_release(s);
2674         return sz;
2675 }
2676
2677 /* send data and FIN (if needed) */
2678 static inline void
2679 tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2680 {
2681         /* try to send some data */
2682         tx_nxt_data(s, tms);
2683
2684         /* we also have to send a FIN */
2685         if (state != TLE_TCP_ST_ESTABLISHED &&
2686                         state != TLE_TCP_ST_CLOSE_WAIT &&
2687                         tcp_txq_nxt_cnt(s) == 0 &&
2688                         s->tcb.snd.fss != s->tcb.snd.nxt) {
2689                 s->tcb.snd.fss = ++s->tcb.snd.nxt;
2690                 send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2691         }
2692 }
2693
2694 static inline void
2695 tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2696 {
2697         uint32_t state;
2698
2699         state = s->tcb.state;
2700
2701         if (state == TLE_TCP_ST_SYN_SENT) {
2702                 /* send the SYN, start the rto timer */
2703                 send_ack(s, tms, TCP_FLAG_SYN);
2704                 timer_start(s);
2705
2706         } else if (state >= TLE_TCP_ST_ESTABLISHED &&
2707                         state <= TLE_TCP_ST_LAST_ACK) {
2708
2709                 tx_data_fin(s, tms, state);
2710
2711                 /* start RTO timer. */
2712                 if (s->tcb.snd.nxt != s->tcb.snd.una)
2713                         timer_start(s);
2714         } else if (state == TLE_TCP_ST_CLOSED) {
2715                 if ((s->tcb.snd.close_flags & TCP_FLAG_RST) != 0)
2716                         send_rst(s, s->tcb.snd.nxt);
2717                 stream_term(s);
2718         }
2719 }
2720
2721 static inline void
2722 rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2723 {
2724         uint32_t state;
2725
2726         state = s->tcb.state;
2727
2728         TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2729                 "retx=%u, retm=%u, "
2730                 "rto=%u, snd.ts=%u, tmo=%u, "
2731                 "snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2732                 "snd.rcvr=%lu, snd.fastack=%u, "
2733                 "wnd=%u, cwnd=%u, ssthresh=%u, "
2734                 "bytes sent=%lu, pkt remain=%u;\n",
2735                 __func__, s, tms, s->tcb.state,
2736                 s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2737                 s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2738                 s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2739                 s->tcb.snd.rcvr, s->tcb.snd.fastack,
2740                 s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2741                 s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2742
2743         if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2744
2745                 if (state >= TLE_TCP_ST_ESTABLISHED &&
2746                                 state <= TLE_TCP_ST_LAST_ACK) {
2747
2748                         /* update SND.CWD and SND.SSTHRESH */
2749                         rto_cwnd_update(&s->tcb);
2750
2751                         /* RFC 6582 3.2.4 */
2752                         s->tcb.snd.rcvr = s->tcb.snd.nxt;
2753                         s->tcb.snd.fastack = 0;
2754
2755                         /* restart from last acked data */
2756                         tcp_txq_rst_nxt_head(s);
2757                         s->tcb.snd.nxt = s->tcb.snd.una;
2758
2759                         tx_data_fin(s, tms, state);
2760
2761                 } else if (state == TLE_TCP_ST_SYN_SENT) {
2762                         /* resending SYN */
2763                         s->tcb.so.ts.val = tms;
2764
2765                         /* According to RFC 6928 2:
2766                          * To reduce the chance for spurious SYN or SYN/ACK
2767                          * retransmission, it is RECOMMENDED that
2768                          * implementations refrain from resetting the initial
2769                          * window to 1 segment, unless there have been more
2770                          * than one SYN or SYN/ACK retransmissions or true loss
2771                          * detection has been made.
2772                          */
2773                         if (s->tcb.snd.nb_retx != 0)
2774                                 s->tcb.snd.cwnd = s->tcb.snd.mss;
2775
2776                         send_ack(s, tms, TCP_FLAG_SYN);
2777
2778                 } else if (state == TLE_TCP_ST_TIME_WAIT) {
2779                         s->err.rev |= TLE_TCP_REV_RTO;
2780                         stream_term(s);
2781                 }
2782
2783                 /* RFC6298:5.5 back off the timer */
2784                 s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2785                 s->tcb.snd.nb_retx++;
2786                 timer_restart(s);
2787
2788         } else {
2789                 s->err.rev |= TLE_TCP_REV_RTO;
2790                 send_rst(s, s->tcb.snd.nxt);
2791                 stream_term(s);
2792         }
2793 }
2794
2795 int
2796 tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2797 {
2798         uint32_t i, k, tms;
2799         struct sdr *dr;
2800         struct tle_timer_wheel *tw;
2801         struct tle_stream *p;
2802         struct tle_tcp_stream *s, *rs[num];
2803
2804         /* process streams with RTO exipred */
2805
2806         tw = CTX_TCP_TMWHL(ctx);
2807         tms = tcp_get_tms(ctx->cycles_ms_shift);
2808         tle_timer_expire(tw, tms);
2809
2810         k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2811
2812         for (i = 0; i != k; i++) {
2813
2814                 s = rs[i];
2815                 s->timer.handle = NULL;
2816                 if (tcp_stream_try_acquire(s) > 0)
2817                         rto_stream(s, tcp_stream_adjust_tms(s, tms));
2818                 tcp_stream_release(s);
2819         }
2820
2821         /* process streams from to-send queue */
2822
2823         k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2824
2825         for (i = 0; i != k; i++) {
2826
2827                 s = rs[i];
2828                 rte_atomic32_set(&s->tx.arm, 0);
2829
2830                 if (tcp_stream_try_acquire(s) > 0)
2831                         tx_stream(s, tcp_stream_adjust_tms(s, tms));
2832                 else
2833                         txs_enqueue(s->s.ctx, s);
2834                 tcp_stream_release(s);
2835         }
2836
2837         /* collect streams to close from the death row */
2838
2839         dr = CTX_TCP_SDR(ctx);
2840         for (k = 0, p = STAILQ_FIRST(&dr->be);
2841                         k != num && p != NULL;
2842                         k++, p = STAILQ_NEXT(p, link))
2843                 rs[k] = TCP_STREAM(p);
2844
2845         if (p == NULL)
2846                 STAILQ_INIT(&dr->be);
2847         else
2848                 STAILQ_FIRST(&dr->be) = p;
2849
2850         /* cleanup closed streams */
2851         for (i = 0; i != k; i++) {
2852                 s = rs[i];
2853                 tcp_stream_down(s);
2854                 tcp_stream_reset(ctx, s);
2855         }
2856
2857         return 0;
2858 }