tle_tcp_proces: fix the issue when strem can sit in the txs queue forever.
[tldk.git] / lib / libtle_l4p / tcp_rxtx.c
1 /*
2  * Copyright (c) 2016  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <rte_errno.h>
17 #include <rte_ethdev.h>
18 #include <rte_ip.h>
19 #include <rte_ip_frag.h>
20 #include <rte_tcp.h>
21
22 #include "tcp_stream.h"
23 #include "tcp_timer.h"
24 #include "stream_table.h"
25 #include "syncookie.h"
26 #include "misc.h"
27 #include "tcp_ctl.h"
28 #include "tcp_rxq.h"
29 #include "tcp_txq.h"
30 #include "tcp_tx_seg.h"
31
32 #define TCP_MAX_PKT_SEG 0x20
33
34 /*
35  * checks if input TCP ports and IP addresses match given stream.
36  * returns zero on success.
37  */
38 static inline int
39 rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
40 {
41         int32_t rc;
42
43         if (pi->tf.type == TLE_V4)
44                 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45                         (pi->addr4.raw & s->s.ipv4.mask.raw) !=
46                         s->s.ipv4.addr.raw;
47         else
48                 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49                         ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50                         &s->s.ipv6.mask.raw) != 0;
51
52         return rc;
53 }
54
55 static inline struct tle_tcp_stream *
56 rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
57         uint32_t type)
58 {
59         struct tle_tcp_stream *s;
60
61         s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62         if (s == NULL || rwl_acquire(&s->rx.use) < 0)
63                 return NULL;
64
65         /* check that we have a proper stream. */
66         if (s->tcb.state != TCP_ST_LISTEN) {
67                 rwl_release(&s->rx.use);
68                 s = NULL;
69         }
70
71         return s;
72 }
73
74 static inline struct tle_tcp_stream *
75 rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76         const union pkt_info *pi, uint32_t type)
77 {
78         struct tle_tcp_stream *s;
79
80         s = stbl_find_data(st, pi);
81         if (s == NULL) {
82                 if (pi->tf.flags == TCP_FLAG_ACK)
83                         return rx_obtain_listen_stream(dev, pi, type);
84                 return NULL;
85         }
86
87         if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
88                 return NULL;
89         /* check that we have a proper stream. */
90         else if (s->tcb.state == TCP_ST_CLOSED) {
91                 rwl_release(&s->rx.use);
92                 s = NULL;
93         }
94
95         return s;
96 }
97
98 /*
99  * Consider 2 pkt_info *equal* if their:
100  * - types (IPv4/IPv6)
101  * - TCP flags
102  * - checksum flags
103  * - TCP src and dst ports
104  * - IP src and dst addresses
105  * are equal.
106  */
107 static inline int
108 pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
109 {
110         uint32_t i;
111
112         i = 1;
113
114         if (pi[0].tf.type == TLE_V4) {
115                 while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
116                         i++;
117
118         } else if (pi[0].tf.type == TLE_V6) {
119                 while (i != num &&
120                                 pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121                                 ymm_cmp(&pi[0].addr6->raw,
122                                 &pi[i].addr6->raw) == 0)
123                         i++;
124         }
125
126         return i;
127 }
128
129 static inline int
130 pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
131 {
132         uint32_t i;
133
134         i = 1;
135
136         if (pi[0].tf.type == TLE_V4) {
137                 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138                                 pi[0].port.dst == pi[i].port.dst &&
139                                 pi[0].addr4.dst == pi[i].addr4.dst)
140                         i++;
141
142         } else if (pi[0].tf.type == TLE_V6) {
143                 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144                                 pi[0].port.dst == pi[i].port.dst &&
145                                 xmm_cmp(&pi[0].addr6->dst,
146                                 &pi[i].addr6->dst) == 0)
147                         i++;
148         }
149
150         return i;
151 }
152
153 static inline void
154 stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
155         uint32_t nb_drb)
156 {
157         _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
158 }
159
160 static inline uint32_t
161 stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
162         uint32_t nb_drb)
163 {
164         return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
165 }
166
167 static inline void
168 fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
169         uint32_t seq, uint8_t hlen, uint8_t flags)
170 {
171         uint16_t wnd;
172
173         l4h->src_port = port.dst;
174         l4h->dst_port = port.src;
175
176         wnd = (flags & TCP_FLAG_SYN) ?
177                 RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
178                 tcb->rcv.wnd >> tcb->rcv.wscale;
179
180         /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
181         l4h->sent_seq = rte_cpu_to_be_32(seq);
182         l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
183         l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
184         l4h->tcp_flags = flags;
185         l4h->rx_win = rte_cpu_to_be_16(wnd);
186         l4h->cksum = 0;
187         l4h->tcp_urp = 0;
188
189         if (flags & TCP_FLAG_SYN)
190                 fill_syn_opts(l4h + 1, &tcb->so);
191         else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
192                 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
193 }
194
195 static inline int
196 tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
197         const struct tle_dest *dst, uint64_t ol_flags,
198         union l4_ports port, uint32_t seq, uint32_t flags,
199         uint32_t pid, uint32_t swcsm)
200 {
201         uint32_t l4, len, plen;
202         struct tcp_hdr *l4h;
203         char *l2h;
204
205         len = dst->l2_len + dst->l3_len;
206         plen = m->pkt_len;
207
208         if (flags & TCP_FLAG_SYN)
209                 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
210         else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
211                 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
212         else
213                 l4 = sizeof(*l4h);
214
215         /* adjust mbuf to put L2/L3/L4 headers into it. */
216         l2h = rte_pktmbuf_prepend(m, len + l4);
217         if (l2h == NULL)
218                 return -EINVAL;
219
220         /* copy L2/L3 header */
221         rte_memcpy(l2h, dst->hdr, len);
222
223         /* setup TCP header & options */
224         l4h = (struct tcp_hdr *)(l2h + len);
225         fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
226
227         /* setup mbuf TX offload related fields. */
228         m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
229         m->ol_flags |= ol_flags;
230
231         /* update proto specific fields. */
232
233         if (s->s.type == TLE_V4) {
234                 struct ipv4_hdr *l3h;
235                 l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
236                 l3h->packet_id = rte_cpu_to_be_16(pid);
237                 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
238
239                 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
240                         l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
241                                 ol_flags);
242                 else if (swcsm != 0)
243                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
244
245                 if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
246                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
247         } else {
248                 struct ipv6_hdr *l3h;
249                 l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
250                 l3h->payload_len = rte_cpu_to_be_16(plen + l4);
251                 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
252                         l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
253                 else if (swcsm != 0)
254                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
255         }
256
257         return 0;
258 }
259
260 /*
261  * That function supposed to be used only for data packets.
262  * Assumes that L2/L3/L4 headers and mbuf fields already setup properly.
263  *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
264  *  - if no HW cksum offloads are enabled, calculates TCP checksum.
265  */
266 static inline void
267 tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
268         uint32_t seq, uint32_t pid)
269 {
270         struct tcp_hdr *l4h;
271         uint32_t len;
272
273         len = m->l2_len + m->l3_len;
274         l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
275
276         l4h->sent_seq = rte_cpu_to_be_32(seq);
277         l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
278
279         if (tcb->so.ts.raw != 0)
280                 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
281
282         if (type == TLE_V4) {
283                 struct ipv4_hdr *l3h;
284                 l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
285                 l3h->hdr_checksum = 0;
286                 l3h->packet_id = rte_cpu_to_be_16(pid);
287                 if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
288                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
289         }
290
291         /* have to calculate TCP checksum in SW */
292         if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
293
294                 l4h->cksum = 0;
295
296                 if (type == TLE_V4) {
297                         struct ipv4_hdr *l3h;
298                         l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
299                                 m->l2_len);
300                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
301
302                 } else {
303                         struct ipv6_hdr *l3h;
304                         l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
305                                 m->l2_len);
306                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
307                 }
308         }
309 }
310
311 /* Send data packets that need to be ACK-ed by peer */
312 static inline uint32_t
313 tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
314 {
315         uint32_t bsz, i, nb, nbm;
316         struct tle_dev *dev;
317         struct tle_drb *drb[num];
318
319         /* calculate how many drbs are needed.*/
320         bsz = s->tx.drb.nb_elem;
321         nbm = (num + bsz - 1) / bsz;
322
323         /* allocate drbs, adjust number of packets. */
324         nb = stream_drb_alloc(s, drb, nbm);
325
326         /* drb ring is empty. */
327         if (nb == 0)
328                 return 0;
329
330         else if (nb != nbm)
331                 num = nb * bsz;
332
333         dev = s->tx.dst.dev;
334
335         /* enqueue pkts for TX. */
336         nbm = nb;
337         i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
338                 num, drb, &nb);
339
340         /* free unused drbs. */
341         if (nb != 0)
342                 stream_drb_free(s, drb + nbm - nb, nb);
343
344         return i;
345 }
346
347 static inline uint32_t
348 tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
349         uint32_t num)
350 {
351         uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
352         struct tle_dev *dev;
353         struct rte_mbuf *mb;
354         struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
355
356         mss = s->tcb.snd.mss;
357         type = s->s.type;
358
359         dev = s->tx.dst.dev;
360         pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
361
362         k = 0;
363         tn = 0;
364         fail = 0;
365         for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
366
367                 mb = mi[i];
368                 sz = RTE_MIN(sl->len, mss);
369                 plen = PKT_L4_PLEN(mb);
370
371                 /*fast path, no need to use indirect mbufs. */
372                 if (plen <= sz) {
373
374                         /* update pkt TCP header */
375                         tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
376
377                         /* keep mbuf till ACK is received. */
378                         rte_pktmbuf_refcnt_update(mb, 1);
379                         sl->len -= plen;
380                         sl->seq += plen;
381                         mo[k++] = mb;
382                 /* remaining snd.wnd is less them MSS, send nothing */
383                 } else if (sz < mss)
384                         break;
385                 /* packet indirection needed */
386                 else
387                         RTE_VERIFY(0);
388
389                 if (k >= MAX_PKT_BURST) {
390                         n = tx_data_pkts(s, mo, k);
391                         fail = k - n;
392                         tn += n;
393                         k = 0;
394                 }
395         }
396
397         if (k != 0) {
398                 n = tx_data_pkts(s, mo, k);
399                 fail = k - n;
400                 tn += n;
401         }
402
403         if (fail != 0) {
404                 sz = tcp_mbuf_seq_free(mo + n, fail);
405                 sl->seq -= sz;
406                 sl->len += sz;
407         }
408
409         return tn;
410 }
411
412 /*
413  * gets data from stream send buffer, updates it and
414  * queues it into TX device queue.
415  * Note that this function and is not MT safe.
416  */
417 static inline uint32_t
418 tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
419 {
420         uint32_t n, num, tn, wnd;
421         struct rte_mbuf **mi;
422         union seqlen sl;
423
424         tn = 0;
425         wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
426         sl.seq = s->tcb.snd.nxt;
427         sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
428
429         if (sl.len == 0)
430                 return tn;
431
432         /* update send timestamp */
433         s->tcb.snd.ts = tms;
434
435         do {
436                 /* get group of packets */
437                 mi = tcp_txq_get_nxt_objs(s, &num);
438
439                 /* stream send buffer is empty */
440                 if (num == 0)
441                         break;
442
443                 /* queue data packets for TX */
444                 n = tx_data_bulk(s, &sl, mi, num);
445                 tn += n;
446
447                 /* update consumer head */
448                 tcp_txq_set_nxt_head(s, n);
449         } while (n == num);
450
451         s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
452         return tn;
453 }
454
455 static inline void
456 free_una_data(struct tle_tcp_stream *s, uint32_t len)
457 {
458         uint32_t i, n, num, plen;
459         struct rte_mbuf **mi;
460
461         n = 0;
462         plen = 0;
463
464         do {
465                 /* get group of packets */
466                 mi = tcp_txq_get_una_objs(s, &num);
467
468                 if (num == 0)
469                         break;
470
471                 /* free acked data */
472                 for (i = 0; i != num && n != len; i++, n = plen) {
473                         plen += PKT_L4_PLEN(mi[i]);
474                         if (plen > len) {
475                                 /* keep SND.UNA at the start of the packet */
476                                 len -= RTE_MIN(len, plen - len);
477                                 break;
478                         }
479                         rte_pktmbuf_free(mi[i]);
480                 }
481
482                 /* update consumer tail */
483                 tcp_txq_set_una_tail(s, i);
484         } while (plen < len);
485
486         s->tcb.snd.una += len;
487
488         /*
489          * that could happen in case of retransmit,
490          * adjust SND.NXT with SND.UNA.
491          */
492         if (s->tcb.snd.una > s->tcb.snd.nxt) {
493                 tcp_txq_rst_nxt_head(s);
494                 s->tcb.snd.nxt = s->tcb.snd.una;
495         }
496 }
497
498 static inline uint16_t
499 calc_smss(uint16_t mss, const struct tle_dest *dst)
500 {
501         uint16_t n;
502
503         n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
504         mss = RTE_MIN(n, mss);
505         return mss;
506 }
507
508 /*
509  * RFC 5681 3.1
510  * If SMSS > 2190 bytes:
511  *     IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
512  *  If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
513  *     IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
514  *  if SMSS <= 1095 bytes:
515  *     IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
516  */
517 static inline uint32_t
518 initial_cwnd(uint16_t smss)
519 {
520         if (smss > 2190)
521                 return 2 * smss;
522         else if (smss > 1095)
523                 return 3 * smss;
524         return 4 * smss;
525 }
526
527 /*
528  * queue standalone packet to he particular output device
529  * It assumes that:
530  * - L2/L3/L4 headers should be already set.
531  * - packet fits into one segment.
532  */
533 static inline int
534 send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
535 {
536         uint32_t n, nb;
537         struct tle_drb *drb;
538
539         if (stream_drb_alloc(s, &drb, 1) == 0)
540                 return -ENOBUFS;
541
542         /* enqueue pkt for TX. */
543         nb = 1;
544         n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
545                 &drb, &nb);
546
547         /* free unused drbs. */
548         if (nb != 0)
549                 stream_drb_free(s, &drb, 1);
550
551         return (n == 1) ? 0 : -ENOBUFS;
552 }
553
554 static inline int
555 send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
556         uint32_t flags)
557 {
558         const struct tle_dest *dst;
559         uint32_t pid, type;
560         int32_t rc;
561
562         dst = &s->tx.dst;
563         type = s->s.type;
564         pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
565
566         rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
567         if (rc == 0)
568                 rc = send_pkt(s, dst->dev, m);
569
570         return rc;
571 }
572
573 static inline int
574 send_rst(struct tle_tcp_stream *s, uint32_t seq)
575 {
576         struct rte_mbuf *m;
577         int32_t rc;
578
579         m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
580         if (m == NULL)
581                 return -ENOMEM;
582
583         rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
584         if (rc != 0)
585                 rte_pktmbuf_free(m);
586
587         return rc;
588 }
589
590 static inline int
591 send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
592 {
593         struct rte_mbuf *m;
594         uint32_t seq;
595         int32_t rc;
596
597         m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
598         if (m == NULL)
599                 return -ENOMEM;
600
601         seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
602         s->tcb.snd.ts = tms;
603
604         rc = send_ctrl_pkt(s, m, seq, flags);
605         if (rc != 0) {
606                 rte_pktmbuf_free(m);
607                 return rc;
608         }
609
610         s->tcb.snd.ack = s->tcb.rcv.nxt;
611         return 0;
612 }
613
614
615 static int
616 sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
617         const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
618 {
619         uint16_t len;
620         int32_t rc;
621         uint32_t pid, seq, type;
622         struct tle_dev *dev;
623         const void *da;
624         struct tle_dest dst;
625         const struct tcp_hdr *th;
626
627         type = s->s.type;
628
629         /* get destination information. */
630         if (type == TLE_V4)
631                 da = &pi->addr4.src;
632         else
633                 da = &pi->addr6->src;
634
635         rc = stream_get_dest(&s->s, da, &dst);
636         if (rc < 0)
637                 return rc;
638
639         th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
640                 m->l2_len + m->l3_len);
641         get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
642
643         s->tcb.rcv.nxt = si->seq + 1;
644         seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
645                                 s->s.ctx->prm.hash_alg,
646                                 &s->s.ctx->prm.secret_key);
647         s->tcb.so.ts.ecr = s->tcb.so.ts.val;
648         s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
649         s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
650                 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
651         s->tcb.so.mss = calc_smss(dst.mtu, &dst);
652
653         /* reset mbuf's data contents. */
654         len = m->l2_len + m->l3_len + m->l4_len;
655         m->tx_offload = 0;
656         if (rte_pktmbuf_adj(m, len) == NULL)
657                 return -EINVAL;
658
659         dev = dst.dev;
660         pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
661
662         rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
663                 TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
664         if (rc == 0)
665                 rc = send_pkt(s, dev, m);
666
667         return rc;
668 }
669
670 /*
671  * RFC 793:
672  * There are four cases for the acceptability test for an incoming segment:
673  * Segment Receive  Test
674  * Length  Window
675  * ------- -------  -------------------------------------------
676  *    0       0     SEG.SEQ = RCV.NXT
677  *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
678  *   >0       0     not acceptable
679  *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
680  *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
681  */
682 static inline int
683 check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
684 {
685         uint32_t n;
686
687         n = seqn + len;
688         if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
689                         n - tcb->rcv.nxt > tcb->rcv.wnd)
690                 return -ERANGE;
691
692         return 0;
693 }
694
695 static inline union tsopt
696 rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
697 {
698         union tsopt ts;
699         uintptr_t opt;
700         const struct tcp_hdr *th;
701
702         if (tcb->so.ts.val != 0) {
703                 opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
704                         mb->l2_len + mb->l3_len + sizeof(*th));
705                 ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
706         } else
707                 ts.raw = 0;
708
709         return ts;
710 }
711
712 /*
713  * PAWS and sequence check.
714  * RFC 1323 4.2.1
715  */
716 static inline int
717 rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
718 {
719         int32_t rc;
720
721         /* RFC 1323 4.2.1 R2 */
722         rc = check_seqn(tcb, seq, len);
723         if (rc < 0)
724                 return rc;
725
726         if (ts.raw != 0) {
727
728                 /* RFC 1323 4.2.1 R1 */
729                 if (tcp_seq_lt(ts.val, tcb->rcv.ts))
730                         return -ERANGE;
731
732                 /* RFC 1323 4.2.1 R3 */
733                 if (tcp_seq_leq(seq, tcb->snd.ack) &&
734                                 tcp_seq_lt(tcb->snd.ack, seq + len))
735                         tcb->rcv.ts = ts.val;
736         }
737
738         return rc;
739 }
740
741 static inline int
742 rx_check_ack(const struct tcb *tcb, uint32_t ack)
743 {
744         uint32_t max;
745
746         max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
747
748         if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
749                 return 0;
750
751         return -ERANGE;
752 }
753
754 static inline int
755 rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
756         const union tsopt ts)
757 {
758         int32_t rc;
759
760         rc = rx_check_seq(tcb, seq, len, ts);
761         rc |= rx_check_ack(tcb, ack);
762         return rc;
763 }
764
765 static inline int
766 restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
767         const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb,
768         uint32_t hash_alg, rte_xmm_t *secret_key)
769 {
770         int32_t rc;
771         uint32_t len;
772         const struct tcp_hdr *th;
773
774         /* check that ACK, etc fields are what we expected. */
775         rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
776                                 hash_alg,
777                                 secret_key);
778         if (rc < 0)
779                 return rc;
780
781         so->mss = rc;
782
783         th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
784                 mb->l2_len + mb->l3_len);
785         len = mb->l4_len - sizeof(*th);
786         sync_get_opts(so, (uintptr_t)(th + 1), len);
787         return 0;
788 }
789
790 static inline void
791 stream_term(struct tle_tcp_stream *s)
792 {
793         struct sdr *dr;
794
795         s->tcb.state = TCP_ST_CLOSED;
796         rte_smp_wmb();
797
798         timer_stop(s);
799
800         /* close() was already invoked, schedule final cleanup */
801         if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
802
803                 dr = CTX_TCP_SDR(s->s.ctx);
804                 STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
805
806         /* notify user that stream need to be closed */
807         } else if (s->err.ev != NULL)
808                 tle_event_raise(s->err.ev);
809         else if (s->err.cb.func != NULL)
810                 s->err.cb.func(s->err.cb.data, &s->s);
811 }
812
813 static inline int
814 stream_fill_dest(struct tle_tcp_stream *s)
815 {
816         int32_t rc;
817         const void *da;
818
819         if (s->s.type == TLE_V4)
820                 da = &s->s.ipv4.addr.src;
821         else
822                 da = &s->s.ipv6.addr.src;
823
824         rc = stream_get_dest(&s->s, da, &s->tx.dst);
825         return (rc < 0) ? rc : 0;
826 }
827
828 /*
829  * helper function, prepares a new accept stream.
830  */
831 static inline int
832 accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
833         struct tle_tcp_stream *cs, const struct syn_opts *so,
834         uint32_t tms, const union pkt_info *pi, const union seg_info *si)
835 {
836         int32_t rc;
837         uint32_t rtt;
838
839         /* some TX still pending for that stream. */
840         if (TCP_STREAM_TX_PENDING(cs))
841                 return -EAGAIN;
842
843         /* setup L4 ports and L3 addresses fields. */
844         cs->s.port.raw = pi->port.raw;
845         cs->s.pmsk.raw = UINT32_MAX;
846
847         if (pi->tf.type == TLE_V4) {
848                 cs->s.ipv4.addr = pi->addr4;
849                 cs->s.ipv4.mask.src = INADDR_NONE;
850                 cs->s.ipv4.mask.dst = INADDR_NONE;
851         } else if (pi->tf.type == TLE_V6) {
852                 cs->s.ipv6.addr = *pi->addr6;
853                 rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
854                         sizeof(cs->s.ipv6.mask.src));
855                 rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
856                         sizeof(cs->s.ipv6.mask.dst));
857         }
858
859         /* setup TCB */
860         sync_fill_tcb(&cs->tcb, si, so);
861         cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
862
863         /*
864          * estimate the rto
865          * for now rtt is calculated based on the tcp TMS option,
866          * later add real-time one
867          */
868         if (cs->tcb.so.ts.ecr) {
869                 rtt = tms - cs->tcb.so.ts.ecr;
870                 rto_estimate(&cs->tcb, rtt);
871         } else
872                 cs->tcb.snd.rto = TCP_RTO_DEFAULT;
873
874         /* copy streams type. */
875         cs->s.type = ps->s.type;
876
877         /* retrive and cache destination information. */
878         rc = stream_fill_dest(cs);
879         if (rc != 0)
880                 return rc;
881
882         /* update snd.mss with SMSS value */
883         cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
884
885         /* setup congestion variables */
886         cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
887         cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
888
889         cs->tcb.state = TCP_ST_ESTABLISHED;
890
891         /* add stream to the table */
892         cs->ste = stbl_add_stream(st, pi, cs);
893         if (cs->ste == NULL)
894                 return -ENOBUFS;
895
896         cs->tcb.uop |= TCP_OP_ACCEPT;
897         tcp_stream_up(cs);
898         return 0;
899 }
900
901
902 /*
903  * ACK for new connection request arrived.
904  * Check that the packet meets all conditions and try to open a new stream.
905  * returns:
906  * < 0  - invalid packet
907  * == 0 - packet is valid and new stream was opened for it.
908  * > 0  - packet is valid, but failed to open new stream.
909  */
910 static inline int
911 rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
912         const union pkt_info *pi, const union seg_info *si,
913         uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
914 {
915         int32_t rc;
916         struct tle_ctx *ctx;
917         struct tle_stream *ts;
918         struct tle_tcp_stream *cs;
919         struct syn_opts so;
920
921         *csp = NULL;
922
923         if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
924                 return -EINVAL;
925
926         ctx = s->s.ctx;
927         rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg,
928                                 &ctx->prm.secret_key);
929         if (rc < 0)
930                 return rc;
931
932         /* allocate new stream */
933         ts = get_stream(ctx);
934         cs = TCP_STREAM(ts);
935         if (ts == NULL)
936                 return ENFILE;
937
938         /* prepare stream to handle new connection */
939         if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
940
941                 /* put new stream in the accept queue */
942                 if (_rte_ring_enqueue_burst(s->rx.q,
943                                 (void * const *)&ts, 1) == 1) {
944                         *csp = cs;
945                         return 0;
946                 }
947
948                 /* cleanup on failure */
949                 tcp_stream_down(cs);
950                 stbl_del_pkt(st, cs->ste, pi);
951                 cs->ste = NULL;
952         }
953
954         tcp_stream_reset(ctx, cs);
955         return ENOBUFS;
956 }
957
958 static inline int
959 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
960         uint32_t *seqn, uint32_t *plen)
961 {
962         uint32_t len, n, seq;
963
964         seq = *seqn;
965         len = *plen;
966
967         rte_pktmbuf_adj(mb, hlen);
968         if (len == 0)
969                 return -ENODATA;
970         /* cut off the start of the packet */
971         else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
972                 n = tcb->rcv.nxt - seq;
973                 if (n >= len)
974                         return -ENODATA;
975
976                 rte_pktmbuf_adj(mb, n);
977                 *seqn = seq + n;
978                 *plen = len - n;
979         }
980
981         return 0;
982 }
983
984 static inline uint32_t
985 rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
986 {
987         uint32_t k, n;
988
989         n = ack - (uint32_t)s->tcb.snd.una;
990
991         /* some more data was acked. */
992         if (n != 0) {
993
994                 /* advance SND.UNA and free related packets. */
995                 k = rte_ring_free_count(s->tx.q);
996                 free_una_data(s, n);
997
998                 /* mark the stream as available for writing */
999                 if (rte_ring_free_count(s->tx.q) != 0) {
1000                         if (s->tx.ev != NULL)
1001                                 tle_event_raise(s->tx.ev);
1002                         else if (k == 0 && s->tx.cb.func != NULL)
1003                                 s->tx.cb.func(s->tx.cb.data, &s->s);
1004                 }
1005         }
1006
1007         return n;
1008 }
1009
1010 static void
1011 rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1012 {
1013         uint32_t state;
1014         int32_t ackfin;
1015
1016         s->tcb.rcv.nxt += 1;
1017
1018         ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1019         state = s->tcb.state;
1020
1021         if (state == TCP_ST_ESTABLISHED) {
1022                 s->tcb.state = TCP_ST_CLOSE_WAIT;
1023                 /* raise err.ev & err.cb */
1024                 if (s->err.ev != NULL)
1025                         tle_event_raise(s->err.ev);
1026                 else if (s->err.cb.func != NULL)
1027                         s->err.cb.func(s->err.cb.data, &s->s);
1028         } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1029                 rsp->flags |= TCP_FLAG_ACK;
1030                 if (ackfin != 0) {
1031                         s->tcb.state = TCP_ST_TIME_WAIT;
1032                         s->tcb.snd.rto = TCP_RTO_2MSL;
1033                         timer_reset(s);
1034                 } else
1035                         s->tcb.state = TCP_ST_CLOSING;
1036         } else if (state == TCP_ST_FIN_WAIT_2) {
1037                 rsp->flags |= TCP_FLAG_ACK;
1038                 s->tcb.state = TCP_ST_TIME_WAIT;
1039                 s->tcb.snd.rto = TCP_RTO_2MSL;
1040                 timer_reset(s);
1041         } else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1042                 stream_term(s);
1043         }
1044 }
1045
1046 /*
1047  * FIN process for ESTABLISHED state
1048  * returns:
1049  * 0 < - error occurred
1050  * 0 - FIN was processed OK, and mbuf can be free/reused.
1051  * 0 > - FIN was processed OK and mbuf can't be free/reused.
1052  */
1053 static inline int
1054 rx_fin(struct tle_tcp_stream *s, uint32_t state,
1055         const union seg_info *si, struct rte_mbuf *mb,
1056         struct resp_info *rsp)
1057 {
1058         uint32_t hlen, plen, seq;
1059         int32_t ret;
1060         union tsopt ts;
1061
1062         hlen = PKT_L234_HLEN(mb);
1063         plen = mb->pkt_len - hlen;
1064         seq = si->seq;
1065
1066         ts = rx_tms_opt(&s->tcb, mb);
1067         ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1068         if (ret != 0)
1069                 return ret;
1070
1071         if (state < TCP_ST_ESTABLISHED)
1072                 return -EINVAL;
1073
1074         if (plen != 0) {
1075
1076                 ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1077                 if (ret != 0)
1078                         return ret;
1079                 if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1080                         return -ENOBUFS;
1081         }
1082
1083         /* process ack here */
1084         rx_ackdata(s, si->ack);
1085
1086         /* some fragments still missing */
1087         if (seq + plen != s->tcb.rcv.nxt) {
1088                 s->tcb.rcv.frs.seq = seq + plen;
1089                 s->tcb.rcv.frs.on = 1;
1090         } else
1091                 rx_fin_state(s, rsp);
1092
1093         return plen;
1094 }
1095
1096 static inline int
1097 rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1098         const union seg_info *si)
1099 {
1100         int32_t rc;
1101
1102         /*
1103          * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1104          * are validated by checking their SEQ-fields.
1105          * A reset is valid if its sequence number is in the window.
1106          * In the SYN-SENT state (a RST received in response to an initial SYN),
1107          * the RST is acceptable if the ACK field acknowledges the SYN.
1108          */
1109         if (state == TCP_ST_SYN_SENT) {
1110                 rc = ((flags & TCP_FLAG_ACK) == 0 ||
1111                                 si->ack != s->tcb.snd.nxt) ?
1112                         -ERANGE : 0;
1113         }
1114
1115         else
1116                 rc = check_seqn(&s->tcb, si->seq, 0);
1117
1118         if (rc == 0)
1119                 stream_term(s);
1120
1121         return rc;
1122 }
1123
1124 /*
1125  *  check do we have FIN  that was received out-of-order.
1126  *  if yes, try to process it now.
1127  */
1128 static inline void
1129 rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1130 {
1131         if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1132                 rx_fin_state(s, rsp);
1133 }
1134
1135 static inline void
1136 dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1137 {
1138         memset(tack, 0, sizeof(*tack));
1139         tack->ack = tcb->snd.una;
1140         tack->segs.dup = tcb->rcv.dupack;
1141         tack->wu.raw = tcb->snd.wu.raw;
1142         tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1143 }
1144
1145 static inline void
1146 ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1147 {
1148         tcb->snd.wu.raw = tack->wu.raw;
1149         tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1150 }
1151
1152 static inline void
1153 ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1154 {
1155         uint32_t n;
1156
1157         n = tack->segs.ack * tcb->snd.mss;
1158
1159         /* slow start phase, RFC 5681 3.1 (2)  */
1160         if (tcb->snd.cwnd < tcb->snd.ssthresh)
1161                 tcb->snd.cwnd += RTE_MIN(acked, n);
1162         /* congestion avoidance phase, RFC 5681 3.1 (3) */
1163         else
1164                 tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
1165 }
1166
1167 static inline void
1168 rto_ssthresh_update(struct tcb *tcb)
1169 {
1170         uint32_t k, n;
1171
1172         /* RFC 5681 3.1 (4)  */
1173         n = (tcb->snd.nxt - tcb->snd.una) / 2;
1174         k = 2 * tcb->snd.mss;
1175         tcb->snd.ssthresh = RTE_MAX(n, k);
1176 }
1177
1178 static inline void
1179 rto_cwnd_update(struct tcb *tcb)
1180 {
1181
1182         if (tcb->snd.nb_retx == 0)
1183                 rto_ssthresh_update(tcb);
1184
1185         /*
1186          * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1187          * no more than 1 full-sized segment.
1188          */
1189         tcb->snd.cwnd = tcb->snd.mss;
1190 }
1191
1192 static inline void
1193 ack_info_update(struct dack_info *tack, const union seg_info *si,
1194         int32_t badseq, uint32_t dlen, const union tsopt ts)
1195 {
1196         if (badseq != 0) {
1197                 tack->segs.badseq++;
1198                 return;
1199         }
1200
1201         /* segnt with incoming data */
1202         tack->segs.data += (dlen != 0);
1203
1204         /* segment with newly acked data */
1205         if (tcp_seq_lt(tack->ack, si->ack)) {
1206                 tack->segs.dup = 0;
1207                 tack->segs.ack++;
1208                 tack->ack = si->ack;
1209                 tack->ts = ts;
1210
1211         /*
1212          * RFC 5681: An acknowledgment is considered a "duplicate" when:
1213          * (a) the receiver of the ACK has outstanding data
1214          * (b) the incoming acknowledgment carries no data
1215          * (c) the SYN and FIN bits are both off
1216          * (d) the acknowledgment number is equal to the TCP.UNA
1217          * (e) the advertised window in the incoming acknowledgment equals the
1218          * advertised window in the last incoming acknowledgment.
1219          *
1220          * Here will have only to check only for (b),(d),(e).
1221          * (a) will be checked later for the whole bulk of packets,
1222          * (c) should never happen here.
1223          */
1224         } else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1225                 tack->dup3.seg = tack->segs.ack + 1;
1226                 tack->dup3.ack = tack->ack;
1227         }
1228
1229         /*
1230          * RFC 793:
1231          * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1232          * updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1233          * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1234          * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1235          */
1236         if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1237                         (si->seq == tack->wu.wl1 &&
1238                         tcp_seq_leq(tack->wu.wl2, si->ack))) {
1239
1240                 tack->wu.wl1 = si->seq;
1241                 tack->wu.wl2 = si->ack;
1242                 tack->wnd = si->wnd;
1243         }
1244 }
1245
1246 static inline uint32_t
1247 rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1248         const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1249         int32_t rc[], uint32_t num)
1250 {
1251         uint32_t i, j, k, n, t;
1252         uint32_t hlen, plen, seq, tlen;
1253         int32_t ret;
1254         union tsopt ts;
1255
1256         k = 0;
1257         for (i = 0; i != num; i = j) {
1258
1259                 hlen = PKT_L234_HLEN(mb[i]);
1260                 plen = mb[i]->pkt_len - hlen;
1261                 seq = si[i].seq;
1262
1263                 ts = rx_tms_opt(&s->tcb, mb[i]);
1264                 ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1265
1266                 /* account segment received */
1267                 ack_info_update(tack, &si[i], ret != 0, plen, ts);
1268
1269                 if (ret == 0) {
1270                         /* skip duplicate data, if any */
1271                         ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1272                                 &seq, &plen);
1273                 }
1274
1275                 j = i + 1;
1276                 if (ret != 0) {
1277                         rp[k] = mb[i];
1278                         rc[k] = -ret;
1279                         k++;
1280                         continue;
1281                 }
1282
1283                 /* group sequential packets together. */
1284                 for (tlen = plen; j != num; tlen += plen, j++) {
1285
1286                         hlen = PKT_L234_HLEN(mb[j]);
1287                         plen = mb[j]->pkt_len - hlen;
1288
1289                         /* not consecutive packet */
1290                         if (plen == 0 || seq + tlen != si[j].seq)
1291                                 break;
1292
1293                         /* check SEQ/ACK */
1294                         ts = rx_tms_opt(&s->tcb, mb[j]);
1295                         ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1296                                 plen, ts);
1297
1298                         /* account for segment received */
1299                         ack_info_update(tack, &si[j], ret != 0, plen, ts);
1300
1301                         if (ret != 0) {
1302                                 rp[k] = mb[j];
1303                                 rc[k] = -ret;
1304                                 k++;
1305                                 break;
1306                         }
1307                         rte_pktmbuf_adj(mb[j], hlen);
1308                 }
1309
1310                 n = j - i;
1311                 j += (ret != 0);
1312
1313                 /* account for OFO data */
1314                 if (seq != s->tcb.rcv.nxt)
1315                         tack->segs.ofo += n;
1316
1317                 /* enqueue packets */
1318                 t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1319
1320                 /* if we are out of space in stream recv buffer. */
1321                 for (; t != n; t++) {
1322                         rp[k] = mb[i + t];
1323                         rc[k] = -ENOBUFS;
1324                         k++;
1325                 }
1326         }
1327
1328         return num - k;
1329 }
1330
1331 static inline void
1332 start_fast_retransmit(struct tle_tcp_stream *s)
1333 {
1334         struct tcb *tcb;
1335
1336         tcb = &s->tcb;
1337
1338         /* RFC 6582 3.2.2 */
1339         tcb->snd.rcvr = tcb->snd.nxt;
1340         tcb->snd.fastack = 1;
1341
1342         /* RFC 5681 3.2.2 */
1343         rto_ssthresh_update(tcb);
1344
1345         /* RFC 5681 3.2.3 */
1346         tcp_txq_rst_nxt_head(s);
1347         tcb->snd.nxt = tcb->snd.una;
1348         tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1349 }
1350
1351 static inline void
1352 stop_fast_retransmit(struct tle_tcp_stream *s)
1353 {
1354         struct tcb *tcb;
1355         uint32_t n;
1356
1357         tcb = &s->tcb;
1358         n = tcb->snd.nxt - tcb->snd.una;
1359         tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1360                 RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1361         tcb->snd.fastack = 0;
1362 }
1363
1364 static inline int
1365 in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1366         uint32_t dup_num)
1367 {
1368         uint32_t n;
1369         struct tcb *tcb;
1370
1371         tcb = &s->tcb;
1372
1373         /* RFC 5682 3.2.3 partial ACK */
1374         if (ack_len != 0) {
1375
1376                 n = ack_num * tcb->snd.mss;
1377                 if (ack_len >= n)
1378                         tcb->snd.cwnd -= ack_len - n;
1379                 else
1380                         tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1381
1382                 /*
1383                  * For the first partial ACK that arrives
1384                  * during fast recovery, also reset the
1385                  * retransmit timer.
1386                  */
1387                 if (tcb->snd.fastack == 1)
1388                         timer_reset(s);
1389
1390                 tcb->snd.fastack += ack_num;
1391                 return 1;
1392
1393         /* RFC 5681 3.2.4 */
1394         } else if (dup_num > 3) {
1395                 s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1396                 return 1;
1397         }
1398
1399         return 0;
1400 }
1401
1402 static inline int
1403 process_ack(struct tle_tcp_stream *s, uint32_t acked,
1404         const struct dack_info *tack)
1405 {
1406         int32_t send;
1407
1408         send = 0;
1409
1410         /* normal mode */
1411         if (s->tcb.snd.fastack == 0) {
1412
1413                 send = 1;
1414
1415                 /* RFC 6582 3.2.2 switch to fast retransmit mode */
1416                 if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1417                                 s->tcb.snd.una >= s->tcb.snd.rcvr) {
1418
1419                         start_fast_retransmit(s);
1420                         in_fast_retransmit(s,
1421                                 tack->ack - tack->dup3.ack,
1422                                 tack->segs.ack - tack->dup3.seg - 1,
1423                                 tack->segs.dup);
1424
1425                 /* remain in normal mode */
1426                 } else if (acked != 0) {
1427                         ack_cwnd_update(&s->tcb, acked, tack);
1428                         timer_stop(s);
1429                 }
1430
1431         /* fast retransmit mode */
1432         } else {
1433
1434                 /* remain in fast retransmit mode */
1435                 if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1436
1437                         send = in_fast_retransmit(s, acked, tack->segs.ack,
1438                                 tack->segs.dup);
1439                 } else {
1440                         /* RFC 5682 3.2.3 full ACK */
1441                         stop_fast_retransmit(s);
1442                         timer_stop(s);
1443
1444                         /* if we have another series of dup ACKs */
1445                         if (tack->dup3.seg != 0 &&
1446                                         s->tcb.snd.una != s->tcb.snd.nxt &&
1447                                         tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1448                                         tack->dup3.ack)) {
1449
1450                                 /* restart fast retransmit again. */
1451                                 start_fast_retransmit(s);
1452                                 send = in_fast_retransmit(s,
1453                                         tack->ack - tack->dup3.ack,
1454                                         tack->segs.ack - tack->dup3.seg - 1,
1455                                         tack->segs.dup);
1456                         }
1457                 }
1458         }
1459
1460         return send;
1461 }
1462
1463 /*
1464  * our FIN was acked, stop rto timer, change stream state,
1465  * and possibly close the stream.
1466  */
1467 static inline void
1468 rx_ackfin(struct tle_tcp_stream *s)
1469 {
1470         uint32_t state;
1471
1472         s->tcb.snd.una = s->tcb.snd.fss;
1473         empty_mbuf_ring(s->tx.q);
1474
1475         state = s->tcb.state;
1476         if (state == TCP_ST_LAST_ACK)
1477                 stream_term(s);
1478         else if (state == TCP_ST_FIN_WAIT_1) {
1479                 timer_stop(s);
1480                 s->tcb.state = TCP_ST_FIN_WAIT_2;
1481         } else if (state == TCP_ST_CLOSING) {
1482                 s->tcb.state = TCP_ST_TIME_WAIT;
1483                 s->tcb.snd.rto = TCP_RTO_2MSL;
1484                 timer_reset(s);
1485         }
1486 }
1487
1488 static inline void
1489 rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1490         const struct dack_info *tack)
1491 {
1492         int32_t send;
1493         uint32_t n;
1494
1495         s->tcb.rcv.dupack = tack->segs.dup;
1496
1497         n = rx_ackdata(s, tack->ack);
1498         send = process_ack(s, n, tack);
1499
1500         /* try to send more data. */
1501         if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1502                 txs_enqueue(s->s.ctx, s);
1503
1504         /* restart RTO timer. */
1505         if (s->tcb.snd.nxt != s->tcb.snd.una)
1506                 timer_start(s);
1507
1508         /* update rto, if fresh packet is here then calculate rtt */
1509         if (tack->ts.ecr != 0)
1510                 rto_estimate(&s->tcb, ts - tack->ts.ecr);
1511 }
1512
1513 /*
1514  * process <SYN,ACK>
1515  * returns negative value on failure, or zero on success.
1516  */
1517 static inline int
1518 rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1519         const union seg_info *si, struct rte_mbuf *mb,
1520         struct resp_info *rsp)
1521 {
1522         struct syn_opts so;
1523         struct tcp_hdr *th;
1524
1525         if (state != TCP_ST_SYN_SENT)
1526                 return -EINVAL;
1527
1528         /* invalid SEG.SEQ */
1529         if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1530                 rsp->flags = TCP_FLAG_RST;
1531                 return 0;
1532         }
1533
1534         th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1535                 mb->l2_len + mb->l3_len);
1536         get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1537
1538         s->tcb.so = so;
1539
1540         s->tcb.snd.una = s->tcb.snd.nxt;
1541         s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1542         s->tcb.snd.wnd = si->wnd << so.wscale;
1543         s->tcb.snd.wu.wl1 = si->seq;
1544         s->tcb.snd.wu.wl2 = si->ack;
1545         s->tcb.snd.wscale = so.wscale;
1546
1547         /* setup congestion variables */
1548         s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
1549         s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1550
1551         s->tcb.rcv.ts = so.ts.val;
1552         s->tcb.rcv.irs = si->seq;
1553         s->tcb.rcv.nxt = si->seq + 1;
1554
1555         /* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1556         s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1557                 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1558         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1559
1560         /* calculate initial rto */
1561         rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1562
1563         rsp->flags |= TCP_FLAG_ACK;
1564
1565         timer_stop(s);
1566         s->tcb.state = TCP_ST_ESTABLISHED;
1567         rte_smp_wmb();
1568
1569         if (s->tx.ev != NULL)
1570                 tle_event_raise(s->tx.ev);
1571         else if (s->tx.cb.func != NULL)
1572                 s->tx.cb.func(s->tx.cb.data, &s->s);
1573
1574         return 0;
1575 }
1576
1577 static inline uint32_t
1578 rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1579         const union pkt_info *pi, const union seg_info si[],
1580         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1581         uint32_t num)
1582 {
1583         uint32_t i, k, n, state;
1584         int32_t ret;
1585         struct resp_info rsp;
1586         struct dack_info tack;
1587
1588         k = 0;
1589         rsp.flags = 0;
1590
1591         state = s->tcb.state;
1592
1593         /*
1594          * first check for the states/flags where we don't
1595          * expect groups of packets.
1596          */
1597
1598         /* process RST */
1599         if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1600                 for (i = 0;
1601                                 i != num &&
1602                                 rx_rst(s, state, pi->tf.flags, &si[i]);
1603                                 i++)
1604                         ;
1605                 i = 0;
1606
1607         /* RFC 793: if the ACK bit is off drop the segment and return */
1608         } else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1609                 i = 0;
1610         /*
1611          * first check for the states/flags where we don't
1612          * expect groups of packets.
1613          */
1614
1615         /* process <SYN,ACK> */
1616         } else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1617                 ret = 0;
1618                 for (i = 0; i != num; i++) {
1619                         ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1620                         if (ret == 0)
1621                                 break;
1622
1623                         rc[k] = -ret;
1624                         rp[k] = mb[i];
1625                         k++;
1626                 }
1627
1628         /* process FIN */
1629         } else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1630                 ret = 0;
1631                 for (i = 0; i != num; i++) {
1632                         ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1633                         if (ret >= 0)
1634                                 break;
1635
1636                         rc[k] = -ret;
1637                         rp[k] = mb[i];
1638                         k++;
1639                 }
1640                 i += (ret > 0);
1641
1642         /* normal data/ack packets */
1643         } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1644
1645                 /* process incoming data packets. */
1646                 dack_info_init(&tack, &s->tcb);
1647                 n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1648
1649                 /* follow up actions based on aggregated information */
1650
1651                 /* update SND.WND */
1652                 ack_window_update(&s->tcb, &tack);
1653
1654                 /*
1655                  * fast-path: all data & FIN was already sent out
1656                  * and now is acknowledged.
1657                  */
1658                 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1659                                 tack.ack == (uint32_t) s->tcb.snd.nxt)
1660                         rx_ackfin(s);
1661                 else
1662                         rx_process_ack(s, ts, &tack);
1663
1664                 /*
1665                  * send an immediate ACK if either:
1666                  * - received segment with invalid seq/ack number
1667                  * - received segment with OFO data
1668                  * - received segment with INO data and no TX is scheduled
1669                  *   for that stream.
1670                  */
1671                 if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1672                                 (tack.segs.data != 0 &&
1673                                 rte_atomic32_read(&s->tx.arm) == 0))
1674                         rsp.flags |= TCP_FLAG_ACK;
1675
1676                 rx_ofo_fin(s, &rsp);
1677
1678                 k += num - n;
1679                 i = num;
1680
1681         /* unhandled state, drop all packets. */
1682         } else
1683                 i = 0;
1684
1685         /* we have a response packet to send. */
1686         if (rsp.flags == TCP_FLAG_RST) {
1687                 send_rst(s, si[i].ack);
1688                 stream_term(s);
1689         } else if (rsp.flags != 0) {
1690                 send_ack(s, ts, rsp.flags);
1691
1692                 /* start the timer for FIN packet */
1693                 if ((rsp.flags & TCP_FLAG_FIN) != 0)
1694                         timer_reset(s);
1695         }
1696
1697         /* unprocessed packets */
1698         for (; i != num; i++, k++) {
1699                 rc[k] = EINVAL;
1700                 rp[k] = mb[i];
1701         }
1702
1703         return num - k;
1704 }
1705
1706 static inline uint32_t
1707 rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1708         const union pkt_info *pi, const union seg_info si[],
1709         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1710         uint32_t num)
1711 {
1712         uint32_t i;
1713
1714         if (rwl_acquire(&s->rx.use) > 0) {
1715                 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1716                 rwl_release(&s->rx.use);
1717                 return i;
1718         }
1719
1720         for (i = 0; i != num; i++) {
1721                 rc[i] = ENOENT;
1722                 rp[i] = mb[i];
1723         }
1724         return 0;
1725 }
1726
1727 static inline uint32_t
1728 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1729         const union pkt_info pi[], const union seg_info si[],
1730         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1731         uint32_t num)
1732 {
1733         struct tle_tcp_stream *cs, *s;
1734         uint32_t i, k, n, state;
1735         int32_t ret;
1736
1737         s = rx_obtain_stream(dev, st, &pi[0], type);
1738         if (s == NULL) {
1739                 for (i = 0; i != num; i++) {
1740                         rc[i] = ENOENT;
1741                         rp[i] = mb[i];
1742                 }
1743                 return 0;
1744         }
1745
1746         k = 0;
1747         state = s->tcb.state;
1748
1749         if (state == TCP_ST_LISTEN) {
1750
1751                 /* one connection per flow */
1752                 cs = NULL;
1753                 ret = -EINVAL;
1754                 for (i = 0; i != num; i++) {
1755
1756                         ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1757
1758                         /* valid packet encountered */
1759                         if (ret >= 0)
1760                                 break;
1761
1762                         /* invalid packet, keep trying to find a proper one */
1763                         rc[k] = -ret;
1764                         rp[k] = mb[i];
1765                         k++;
1766                 }
1767
1768                 /* packet is valid, but we are out of streams to serve it */
1769                 if (ret > 0) {
1770                         for (; i != num; i++, k++) {
1771                                 rc[k] = ret;
1772                                 rp[k] = mb[i];
1773                         }
1774                 /* new stream is accepted */
1775                 } else if (ret == 0) {
1776
1777                         /* inform listen stream about new connections */
1778                         if (s->rx.ev != NULL)
1779                                 tle_event_raise(s->rx.ev);
1780                         else if (s->rx.cb.func != NULL &&
1781                                         rte_ring_count(s->rx.q) == 1)
1782                                 s->rx.cb.func(s->rx.cb.data, &s->s);
1783
1784                         /* if there is no data, drop current packet */
1785                         if (PKT_L4_PLEN(mb[i]) == 0) {
1786                                 rc[k] = ENODATA;
1787                                 rp[k++] = mb[i++];
1788                         }
1789
1790                         /*  process remaining packets for that stream */
1791                         if (num != i) {
1792                                 n = rx_new_stream(cs, ts, pi + i, si + i,
1793                                         mb + i, rp + k, rc + k, num - i);
1794                                 k += num - n - i;
1795                         }
1796                 }
1797
1798         } else {
1799                 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1800                 k = num - i;
1801         }
1802
1803         rwl_release(&s->rx.use);
1804         return num - k;
1805 }
1806
1807
1808 static inline uint32_t
1809 rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1810         const union pkt_info pi[], const union seg_info si[],
1811         struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1812         uint32_t num)
1813 {
1814         struct tle_tcp_stream *s;
1815         uint32_t i, k;
1816         int32_t ret;
1817
1818         s = rx_obtain_listen_stream(dev, &pi[0], type);
1819         if (s == NULL) {
1820                 for (i = 0; i != num; i++) {
1821                         rc[i] = ENOENT;
1822                         rp[i] = mb[i];
1823                 }
1824                 return 0;
1825         }
1826
1827         k = 0;
1828         for (i = 0; i != num; i++) {
1829
1830                 /* check that this remote is allowed to connect */
1831                 if (rx_check_stream(s, &pi[i]) != 0)
1832                         ret = -ENOENT;
1833                 else
1834                         /* syncokie: reply with <SYN,ACK> */
1835                         ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1836
1837                 if (ret != 0) {
1838                         rc[k] = -ret;
1839                         rp[k] = mb[i];
1840                         k++;
1841                 }
1842         }
1843
1844         rwl_release(&s->rx.use);
1845         return num - k;
1846 }
1847
1848 uint16_t
1849 tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1850         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1851 {
1852         struct stbl *st;
1853         uint32_t i, j, k, n, t, ts;
1854         uint64_t csf;
1855         union pkt_info pi[num];
1856         union seg_info si[num];
1857         union {
1858                 uint8_t t[TLE_VNUM];
1859                 uint32_t raw;
1860         } stu;
1861
1862         ts = tcp_get_tms();
1863         st = CTX_TCP_STLB(dev->ctx);
1864
1865         stu.raw = 0;
1866
1867         /* extract packet info and check the L3/L4 csums */
1868         for (i = 0; i != num; i++) {
1869
1870                 get_pkt_info(pkt[i], &pi[i], &si[i]);
1871
1872                 t = pi[i].tf.type;
1873                 csf = dev->rx.ol_flags[t] &
1874                         (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1875
1876                 /* check csums in SW */
1877                 if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1878                                 pi[i].tf.type, IPPROTO_TCP) != 0)
1879                         pi[i].csf = csf;
1880
1881                 stu.t[t] = 1;
1882         }
1883
1884         if (stu.t[TLE_V4] != 0)
1885                 stbl_lock(st, TLE_V4);
1886         if (stu.t[TLE_V6] != 0)
1887                 stbl_lock(st, TLE_V6);
1888
1889         k = 0;
1890         for (i = 0; i != num; i += j) {
1891
1892                 t = pi[i].tf.type;
1893
1894                 /*basic checks for incoming packet */
1895                 if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1896                         rc[k] = EINVAL;
1897                         rp[k] = pkt[i];
1898                         j = 1;
1899                         k++;
1900                 /* process input SYN packets */
1901                 } else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1902                         j = pkt_info_bulk_syneq(pi + i, num - i);
1903                         n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1904                                 rp + k, rc + k, j);
1905                         k += j - n;
1906                 } else {
1907                         j = pkt_info_bulk_eq(pi + i, num - i);
1908                         n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1909                                 rp + k, rc + k, j);
1910                         k += j - n;
1911                 }
1912         }
1913
1914         if (stu.t[TLE_V4] != 0)
1915                 stbl_unlock(st, TLE_V4);
1916         if (stu.t[TLE_V6] != 0)
1917                 stbl_unlock(st, TLE_V6);
1918
1919         return num - k;
1920 }
1921
1922 uint16_t
1923 tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1924         uint32_t num)
1925 {
1926         uint32_t n;
1927         struct tle_tcp_stream *s;
1928
1929         s = TCP_STREAM(ts);
1930         n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
1931         if (n == 0)
1932                 return 0;
1933
1934         /*
1935          * if we still have packets to read,
1936          * then rearm stream RX event.
1937          */
1938         if (n == num && rte_ring_count(s->rx.q) != 0) {
1939                 if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
1940                         tle_event_raise(s->rx.ev);
1941                 rwl_release(&s->rx.use);
1942         }
1943
1944         return n;
1945 }
1946
1947 uint16_t
1948 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1949 {
1950         uint32_t i, j, k, n;
1951         struct tle_drb *drb[num];
1952         struct tle_tcp_stream *s;
1953
1954         /* extract packets from device TX queue. */
1955
1956         k = num;
1957         n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1958                 num, drb, &k);
1959
1960         if (n == 0)
1961                 return 0;
1962
1963         /* free empty drbs and notify related streams. */
1964
1965         for (i = 0; i != k; i = j) {
1966                 s = drb[i]->udata;
1967                 for (j = i + 1; j != k && s == drb[j]->udata; j++)
1968                         ;
1969                 stream_drb_free(s, drb + i, j - i);
1970         }
1971
1972         return n;
1973 }
1974
1975 static inline void
1976 stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
1977 {
1978         if (s->s.type == TLE_V4)
1979                 pi->addr4 = s->s.ipv4.addr;
1980         else
1981                 pi->addr6 = &s->s.ipv6.addr;
1982
1983         pi->port = s->s.port;
1984         pi->tf.type = s->s.type;
1985 }
1986
1987 static int
1988 stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
1989 {
1990         const struct sockaddr_in *in4;
1991         const struct sockaddr_in6 *in6;
1992         const struct tle_dev_param *prm;
1993         int32_t rc;
1994
1995         rc = 0;
1996         s->s.pmsk.raw = UINT32_MAX;
1997
1998         /* setup L4 src ports and src address fields. */
1999         if (s->s.type == TLE_V4) {
2000                 in4 = (const struct sockaddr_in *)addr;
2001                 if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2002                         return -EINVAL;
2003
2004                 s->s.port.src = in4->sin_port;
2005                 s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2006                 s->s.ipv4.mask.src = INADDR_NONE;
2007                 s->s.ipv4.mask.dst = INADDR_NONE;
2008
2009         } else if (s->s.type == TLE_V6) {
2010                 in6 = (const struct sockaddr_in6 *)addr;
2011                 if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2012                                 sizeof(tle_ipv6_any)) == 0 ||
2013                                 in6->sin6_port == 0)
2014                         return -EINVAL;
2015
2016                 s->s.port.src = in6->sin6_port;
2017                 rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2018                         sizeof(s->s.ipv6.addr.src));
2019                 rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2020                         sizeof(s->s.ipv6.mask.src));
2021                 rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2022                         sizeof(s->s.ipv6.mask.dst));
2023         }
2024
2025         /* setup the destination device. */
2026         rc = stream_fill_dest(s);
2027         if (rc != 0)
2028                 return rc;
2029
2030         /* setup L4 dst address from device param */
2031         prm = &s->tx.dst.dev->prm;
2032         if (s->s.type == TLE_V4) {
2033                 if (s->s.ipv4.addr.dst == INADDR_ANY)
2034                         s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2035         } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2036                         sizeof(tle_ipv6_any)) == 0)
2037                 memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2038                         sizeof(s->s.ipv6.addr.dst));
2039
2040         return rc;
2041 }
2042
2043 static inline int
2044 tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2045 {
2046         int32_t rc;
2047         uint32_t tms, seq;
2048         union pkt_info pi;
2049         struct stbl *st;
2050         struct stbl_entry *se;
2051
2052         /* fill stream address */
2053         rc = stream_fill_addr(s, addr);
2054         if (rc != 0)
2055                 return rc;
2056
2057         /* fill pkt info to generate seq.*/
2058         stream_fill_pkt_info(s, &pi);
2059
2060         tms = tcp_get_tms();
2061         s->tcb.so.ts.val = tms;
2062         s->tcb.so.ts.ecr = 0;
2063         s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2064         s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2065
2066         /* note that rcv.nxt is 0 here for sync_gen_seq.*/
2067         seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2068                                 s->s.ctx->prm.hash_alg,
2069                                 &s->s.ctx->prm.secret_key);
2070         s->tcb.snd.iss = seq;
2071         s->tcb.snd.rcvr = seq;
2072         s->tcb.snd.una = seq;
2073         s->tcb.snd.nxt = seq + 1;
2074         s->tcb.snd.rto = TCP_RTO_DEFAULT;
2075         s->tcb.snd.ts = tms;
2076
2077         s->tcb.rcv.mss = s->tcb.so.mss;
2078         s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2079         s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2080         s->tcb.rcv.ts = 0;
2081
2082         /* add the stream in stream table */
2083         st = CTX_TCP_STLB(s->s.ctx);
2084         se = stbl_add_stream_lock(st, s);
2085         if (se == NULL)
2086                 return -ENOBUFS;
2087         s->ste = se;
2088
2089         /* put stream into the to-send queue */
2090         txs_enqueue(s->s.ctx, s);
2091
2092         return 0;
2093 }
2094
2095 int
2096 tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2097 {
2098         struct tle_tcp_stream *s;
2099         uint32_t type;
2100         int32_t rc;
2101
2102         if (ts == NULL || addr == NULL)
2103                 return -EINVAL;
2104
2105         s = TCP_STREAM(ts);
2106         type = s->s.type;
2107         if (type >= TLE_VNUM)
2108                 return -EINVAL;
2109
2110         if (rwl_try_acquire(&s->tx.use) > 0) {
2111                 rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2112                         TCP_ST_SYN_SENT);
2113                 rc = (rc == 0) ? -EDEADLK : 0;
2114         } else
2115                 rc = -EINVAL;
2116
2117         if (rc != 0) {
2118                 rwl_release(&s->tx.use);
2119                 return rc;
2120         }
2121
2122         /* fill stream, prepare and transmit syn pkt */
2123         s->tcb.uop |= TCP_OP_CONNECT;
2124         rc = tx_syn(s, addr);
2125         rwl_release(&s->tx.use);
2126
2127         /* error happened, do a cleanup */
2128         if (rc != 0)
2129                 tle_tcp_stream_close(ts);
2130
2131         return rc;
2132 }
2133
2134 uint16_t
2135 tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2136 {
2137         uint32_t n;
2138         struct tle_tcp_stream *s;
2139
2140         s = TCP_STREAM(ts);
2141         n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
2142         if (n == 0)
2143                 return 0;
2144
2145         /*
2146          * if we still have packets to read,
2147          * then rearm stream RX event.
2148          */
2149         if (n == num && rte_ring_count(s->rx.q) != 0) {
2150                 if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
2151                         tle_event_raise(s->rx.ev);
2152                 rwl_release(&s->rx.use);
2153         }
2154
2155         return n;
2156 }
2157
2158 static inline int32_t
2159 tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2160         struct rte_mbuf *segs[], uint32_t num)
2161 {
2162         uint32_t i;
2163         int32_t rc;
2164
2165         for (i = 0; i != num; i++) {
2166                 /* Build L2/L3/L4 header */
2167                 rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2168                         0, TCP_FLAG_ACK, 0, 0);
2169                 if (rc != 0) {
2170                         free_segments(segs, num);
2171                         break;
2172                 }
2173         }
2174
2175         if (i == num) {
2176                 /* queue packets for further transmission. */
2177                 rc = _rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num);
2178                 if (rc != 0)
2179                         free_segments(segs, num);
2180         }
2181
2182         return rc;
2183 }
2184
2185 uint16_t
2186 tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2187 {
2188         uint32_t i, j, k, mss, n, state, type;
2189         int32_t rc;
2190         uint64_t ol_flags;
2191         struct tle_tcp_stream *s;
2192         struct tle_dev *dev;
2193         struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2194
2195         s = TCP_STREAM(ts);
2196
2197         /* mark stream as not closable. */
2198         if (rwl_acquire(&s->tx.use) < 0) {
2199                 rte_errno = EAGAIN;
2200                 return 0;
2201         }
2202
2203         state = s->tcb.state;
2204         if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2205                 rte_errno = ENOTCONN;
2206                 rwl_release(&s->tx.use);
2207                 return 0;
2208         }
2209
2210         mss = s->tcb.snd.mss;
2211         dev = s->tx.dst.dev;
2212         type = s->s.type;
2213         ol_flags = dev->tx.ol_flags[type];
2214
2215         k = 0;
2216         rc = 0;
2217         while (k != num) {
2218                 /* prepare and check for TX */
2219                 for (i = k; i != num; i++) {
2220                         if (pkt[i]->pkt_len > mss ||
2221                                         pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2222                                 break;
2223                         rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2224                                 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2225                         if (rc != 0)
2226                                 break;
2227                 }
2228
2229                 if (i != k) {
2230                         /* queue packets for further transmission. */
2231                         n = _rte_ring_mp_enqueue_burst(s->tx.q,
2232                                 (void **)pkt + k, (i - k));
2233                         k += n;
2234
2235                         /*
2236                          * for unsent, but already modified packets:
2237                          * remove pkt l2/l3 headers, restore ol_flags
2238                          */
2239                         if (i != k) {
2240                                 ol_flags = ~dev->tx.ol_flags[type];
2241                                 for (j = k; j != i; j++) {
2242                                         rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2243                                                 pkt[j]->l3_len +
2244                                                 pkt[j]->l4_len);
2245                                         pkt[j]->ol_flags &= ol_flags;
2246                                 }
2247                                 break;
2248                         }
2249                 }
2250
2251                 if (rc != 0) {
2252                         rte_errno = -rc;
2253                         break;
2254
2255                 /* segment large packet and enqueue for sending */
2256                 } else if (i != num) {
2257                         /* segment the packet. */
2258                         rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2259                                 &s->tx.dst, mss);
2260                         if (rc < 0) {
2261                                 rte_errno = -rc;
2262                                 break;
2263                         }
2264
2265                         rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc);
2266                         if (rc == 0) {
2267                                 /* free the large mbuf */
2268                                 rte_pktmbuf_free(pkt[i]);
2269                                 /* set the mbuf as consumed */
2270                                 k++;
2271                         } else
2272                                 /* no space left in tx queue */
2273                                 break;
2274                 }
2275         }
2276
2277         /* notify BE about more data to send */
2278         if (k != 0)
2279                 txs_enqueue(s->s.ctx, s);
2280         /* if possible, re-arm stream write event. */
2281         if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2282                 tle_event_raise(s->tx.ev);
2283
2284         rwl_release(&s->tx.use);
2285
2286         return k;
2287 }
2288
2289 /* send data and FIN (if needed) */
2290 static inline void
2291 tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2292 {
2293         /* try to send some data */
2294         tx_nxt_data(s, tms);
2295
2296         /* we also have to send a FIN */
2297         if (state != TCP_ST_ESTABLISHED &&
2298                         state != TCP_ST_CLOSE_WAIT &&
2299                         tcp_txq_nxt_cnt(s) == 0 &&
2300                         s->tcb.snd.fss != s->tcb.snd.nxt) {
2301                 s->tcb.snd.fss = ++s->tcb.snd.nxt;
2302                 send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2303         }
2304 }
2305
2306 static inline void
2307 tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2308 {
2309         uint32_t state;
2310
2311         state = s->tcb.state;
2312
2313         if (state == TCP_ST_SYN_SENT) {
2314                 /* send the SYN, start the rto timer */
2315                 send_ack(s, tms, TCP_FLAG_SYN);
2316                 timer_start(s);
2317
2318         } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2319
2320                 tx_data_fin(s, tms, state);
2321
2322                 /* start RTO timer. */
2323                 if (s->tcb.snd.nxt != s->tcb.snd.una)
2324                         timer_start(s);
2325         }
2326 }
2327
2328 static inline void
2329 rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2330 {
2331         uint32_t state;
2332
2333         state = s->tcb.state;
2334
2335         TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2336                 "retx=%u, retm=%u, "
2337                 "rto=%u, snd.ts=%u, tmo=%u, "
2338                 "snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2339                 "snd.rcvr=%lu, snd.fastack=%u, "
2340                 "wnd=%u, cwnd=%u, ssthresh=%u, "
2341                 "bytes sent=%lu, pkt remain=%u;\n",
2342                 __func__, s, tms, s->tcb.state,
2343                 s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2344                 s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2345                 s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2346                 s->tcb.snd.rcvr, s->tcb.snd.fastack,
2347                 s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2348                 s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2349
2350         if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2351
2352                 if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2353
2354                         /* update SND.CWD and SND.SSTHRESH */
2355                         rto_cwnd_update(&s->tcb);
2356
2357                         /* RFC 6582 3.2.4 */
2358                         s->tcb.snd.rcvr = s->tcb.snd.nxt;
2359                         s->tcb.snd.fastack = 0;
2360
2361                         /* restart from last acked data */
2362                         tcp_txq_rst_nxt_head(s);
2363                         s->tcb.snd.nxt = s->tcb.snd.una;
2364
2365                         tx_data_fin(s, tms, state);
2366
2367                 } else if (state == TCP_ST_SYN_SENT) {
2368                         /* resending SYN */
2369                         s->tcb.so.ts.val = tms;
2370                         send_ack(s, tms, TCP_FLAG_SYN);
2371
2372                 } else if (state == TCP_ST_TIME_WAIT) {
2373                         stream_term(s);
2374                 }
2375
2376                 /* RFC6298:5.5 back off the timer */
2377                 s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2378                 s->tcb.snd.nb_retx++;
2379                 timer_restart(s);
2380
2381         } else {
2382                 send_rst(s, s->tcb.snd.una);
2383                 stream_term(s);
2384         }
2385 }
2386
2387 int
2388 tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2389 {
2390         uint32_t i, k, tms;
2391         struct sdr *dr;
2392         struct tle_timer_wheel *tw;
2393         struct tle_stream *p;
2394         struct tle_tcp_stream *s, *rs[num];
2395
2396         /* process streams with RTO exipred */
2397
2398         tw = CTX_TCP_TMWHL(ctx);
2399         tms = tcp_get_tms();
2400         tle_timer_expire(tw, tms);
2401
2402         k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2403
2404         for (i = 0; i != k; i++) {
2405
2406                 s = rs[i];
2407                 s->timer.handle = NULL;
2408                 if (rwl_try_acquire(&s->tx.use) > 0)
2409                         rto_stream(s, tms);
2410                 rwl_release(&s->tx.use);
2411         }
2412
2413         /* process streams from to-send queue */
2414
2415         k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2416
2417         for (i = 0; i != k; i++) {
2418
2419                 s = rs[i];
2420                 rte_atomic32_set(&s->tx.arm, 0);
2421
2422                 if (rwl_try_acquire(&s->tx.use) > 0)
2423                         tx_stream(s, tms);
2424                 else
2425                         txs_enqueue(s->s.ctx, s);
2426                 rwl_release(&s->tx.use);
2427         }
2428
2429         /* collect streams to close from the death row */
2430
2431         dr = CTX_TCP_SDR(ctx);
2432         for (k = 0, p = STAILQ_FIRST(&dr->be);
2433                         k != num && p != NULL;
2434                         k++, p = STAILQ_NEXT(p, link))
2435                 rs[k] = TCP_STREAM(p);
2436
2437         if (p == NULL)
2438                 STAILQ_INIT(&dr->be);
2439         else
2440                 STAILQ_FIRST(&dr->be) = p;
2441
2442         /* cleanup closed streams */
2443         for (i = 0; i != k; i++) {
2444                 s = rs[i];
2445                 tcp_stream_down(s);
2446                 tcp_stream_reset(ctx, s);
2447         }
2448
2449         return 0;
2450 }