2 * Copyright (c) 2016-2017 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
16 #include <rte_errno.h>
17 #include <rte_ethdev.h>
19 #include <rte_ip_frag.h>
22 #include "tcp_stream.h"
23 #include "tcp_timer.h"
24 #include "stream_table.h"
25 #include "syncookie.h"
30 #include "tcp_tx_seg.h"
32 #define TCP_MAX_PKT_SEG 0x20
35 * checks if input TCP ports and IP addresses match given stream.
36 * returns zero on success.
39 rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
43 if (pi->tf.type == TLE_V4)
44 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45 (pi->addr4.raw & s->s.ipv4.mask.raw) !=
48 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49 ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50 &s->s.ipv6.mask.raw) != 0;
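/*
 * Note on the masked compare above: for a fully established stream the
 * port/address masks are all ones, so an exact match is required, while
 * a listen stream typically keeps zero mask bits for the not yet known
 * remote side, so any remote port/address passes the check.
 */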
55 static inline struct tle_tcp_stream *
56 rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
59 struct tle_tcp_stream *s;
61 s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62 if (s == NULL || tcp_stream_acquire(s) < 0)
65 /* check that we have a proper stream. */
66 if (s->tcb.state != TLE_TCP_ST_LISTEN) {
67 tcp_stream_release(s);
74 static inline struct tle_tcp_stream *
75 rx_acquire_stream(struct tle_stream *ts)
77 struct tle_tcp_stream *s;
80 if (tcp_stream_acquire(s) < 0)
83 else if (s->tcb.state == TLE_TCP_ST_CLOSED) {
84 tcp_stream_release(s);
91 static inline struct tle_tcp_stream *
92 rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
93 const union pkt_info *pi, uint32_t type)
95 struct tle_tcp_stream *s;
97 s = stbl_find_data(st, pi);
99 if (pi->tf.flags == TCP_FLAG_ACK)
100 return rx_obtain_listen_stream(dev, pi, type);
104 if (tcp_stream_acquire(s) < 0)
106 /* check that we have a proper stream. */
107 else if (s->tcb.state == TLE_TCP_ST_CLOSED) {
108 tcp_stream_release(s);
116 * Consider 2 pkt_info entries *equal* if all of the following match:
117 * - types (IPv4/IPv6)
120 * - TCP src and dst ports
121 * - IP src and dst addresses
125 pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
131 if (pi[0].tf.type == TLE_V4) {
132 while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
135 } else if (pi[0].tf.type == TLE_V6) {
137 pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
138 ymm_cmp(&pi[0].addr6->raw,
139 &pi[i].addr6->raw) == 0)
147 pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
153 if (pi[0].tf.type == TLE_V4) {
154 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
155 pi[0].port.dst == pi[i].port.dst &&
156 pi[0].addr4.dst == pi[i].addr4.dst)
159 } else if (pi[0].tf.type == TLE_V6) {
160 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
161 pi[0].port.dst == pi[i].port.dst &&
162 xmm_cmp(&pi[0].addr6->dst,
163 &pi[i].addr6->dst) == 0)
171 stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
174 _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
177 static inline uint32_t
178 stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
181 return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
184 static inline uint32_t
185 get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
190 pa = &dev->tx.packet_id[type];
193 pid = rte_atomic32_add_return(pa, num);
196 pid = rte_atomic32_read(pa);
197 rte_atomic32_set(pa, pid + num);
202 static inline uint32_t
203 tcp_stream_adjust_tms(const struct tle_tcp_stream *s, uint32_t tms)
205 return tms - s->ts_offset;
209 fill_tcph(struct rte_tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
210 uint32_t seq, uint8_t hlen, uint8_t flags)
214 l4h->src_port = port.dst;
215 l4h->dst_port = port.src;
217 wnd = (flags & TCP_FLAG_SYN) ?
218 RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
219 tcb->rcv.wnd >> tcb->rcv.wscale;
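/*
 * Per RFC 7323 the window scale factor never applies to a SYN segment,
 * hence the raw (capped at UINT16_MAX) value is advertised there, while
 * all other segments carry RCV.WND scaled down by rcv.wscale.
 */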
221 /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
222 l4h->sent_seq = rte_cpu_to_be_32(seq);
223 l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
224 l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
225 l4h->tcp_flags = flags;
226 l4h->rx_win = rte_cpu_to_be_16(wnd);
230 if (flags & TCP_FLAG_SYN)
231 fill_syn_opts(l4h + 1, &tcb->so);
232 else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
233 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
237 tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
238 const struct tle_dest *dst, uint64_t ol_flags,
239 union l4_ports port, uint32_t seq, uint32_t flags,
240 uint32_t pid, uint32_t swcsm)
242 uint32_t l4, len, plen;
243 struct rte_tcp_hdr *l4h;
246 len = dst->l2_len + dst->l3_len;
249 if (flags & TCP_FLAG_SYN)
250 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
251 else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
252 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
256 /* adjust mbuf to put L2/L3/L4 headers into it. */
257 l2h = rte_pktmbuf_prepend(m, len + l4);
261 /* copy L2/L3 header */
262 rte_memcpy(l2h, dst->hdr, len);
264 /* setup TCP header & options */
265 l4h = (struct rte_tcp_hdr *)(l2h + len);
266 fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
268 /* setup mbuf TX offload related fields. */
269 m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
270 m->ol_flags |= ol_flags;
272 /* update proto specific fields. */
274 if (s->s.type == TLE_V4) {
275 struct rte_ipv4_hdr *l3h;
276 l3h = (struct rte_ipv4_hdr *)(l2h + dst->l2_len);
277 l3h->packet_id = rte_cpu_to_be_16(pid);
278 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
280 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
281 l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
284 l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
286 if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
287 l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
289 struct rte_ipv6_hdr *l3h;
290 l3h = (struct rte_ipv6_hdr *)(l2h + dst->l2_len);
291 l3h->payload_len = rte_cpu_to_be_16(plen + l4);
292 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
293 l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
295 l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
302 * This function is supposed to be used only for data packets.
303 * Assumes that L2/L3/L4 headers and mbuf fields already setup properly.
304 * - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
305 * - if no HW cksum offloads are enabled, calculates TCP checksum.
308 tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
309 uint32_t seq, uint32_t pid)
311 struct rte_tcp_hdr *l4h;
314 len = m->l2_len + m->l3_len;
315 l4h = rte_pktmbuf_mtod_offset(m, struct rte_tcp_hdr *, len);
317 l4h->sent_seq = rte_cpu_to_be_32(seq);
318 l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
320 if (tcb->so.ts.raw != 0)
321 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
323 if (type == TLE_V4) {
324 struct rte_ipv4_hdr *l3h;
325 l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
327 l3h->hdr_checksum = 0;
328 l3h->packet_id = rte_cpu_to_be_16(pid);
329 if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
330 l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
333 /* have to calculate TCP checksum in SW */
334 if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
338 if (type == TLE_V4) {
339 struct rte_ipv4_hdr *l3h;
340 l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
342 l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
345 struct rte_ipv6_hdr *l3h;
346 l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
348 l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
353 /* Send data packets that need to be ACK-ed by peer */
354 static inline uint32_t
355 tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
357 uint32_t bsz, i, nb, nbm;
359 struct tle_drb *drb[num];
361 /* calculate how many drbs are needed.*/
362 bsz = s->tx.drb.nb_elem;
363 nbm = (num + bsz - 1) / bsz;
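/*
 * e.g. (illustrative numbers): queueing 45 packets with
 * s->tx.drb.nb_elem == 16 gives nbm = (45 + 15) / 16 = 3 drbs.
 */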
365 /* allocate drbs, adjust number of packets. */
366 nb = stream_drb_alloc(s, drb, nbm);
368 /* drb ring is empty. */
377 /* enqueue pkts for TX. */
379 i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
382 /* free unused drbs. */
384 stream_drb_free(s, drb + nbm - nb, nb);
389 static inline uint32_t
390 tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
393 uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
396 struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
398 mss = s->tcb.snd.mss;
402 pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
407 for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
410 sz = RTE_MIN(sl->len, mss);
411 plen = PKT_L4_PLEN(mb);
413 /* fast path, no need to use indirect mbufs. */
416 /* update pkt TCP header */
417 tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
419 /* keep mbuf till ACK is received. */
420 rte_pktmbuf_refcnt_update(mb, 1);
424 /* remaining snd.wnd is less than MSS, send nothing */
427 /* packet indirection needed */
431 if (k >= MAX_PKT_BURST) {
432 n = tx_data_pkts(s, mo, k);
440 n = tx_data_pkts(s, mo, k);
446 sz = tcp_mbuf_seq_free(mo + n, fail);
455 * gets data from stream send buffer, updates it and
456 * queues it into TX device queue.
457 * Note that this function is not MT safe.
459 static inline uint32_t
460 tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
462 uint32_t n, num, tn, wnd;
463 struct rte_mbuf **mi;
467 wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
468 sl.seq = s->tcb.snd.nxt;
469 sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
474 /* update send timestamp */
478 /* get group of packets */
479 mi = tcp_txq_get_nxt_objs(s, &num);
481 /* stream send buffer is empty */
485 /* queue data packets for TX */
486 n = tx_data_bulk(s, &sl, mi, num);
489 /* update consumer head */
490 tcp_txq_set_nxt_head(s, n);
493 s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
498 free_una_data(struct tle_tcp_stream *s, uint32_t len)
500 uint32_t i, num, plen;
501 struct rte_mbuf **mi;
506 /* get group of packets */
507 mi = tcp_txq_get_una_objs(s, &num);
512 /* free acked data */
513 for (i = 0; i != num && plen != len; i++) {
514 uint32_t next_pkt_len = PKT_L4_PLEN(mi[i]);
515 if (plen + next_pkt_len > len) {
516 /* keep SND.UNA at the start of the packet */
520 plen += next_pkt_len;
522 rte_pktmbuf_free(mi[i]);
525 /* update consumer tail */
526 tcp_txq_set_una_tail(s, i);
527 } while (plen < len);
529 s->tcb.snd.una += len;
532 * SND.UNA > SND.NXT could happen in case of retransmit;
533 * if so, adjust SND.NXT to SND.UNA.
535 if (s->tcb.snd.una > s->tcb.snd.nxt) {
536 tcp_txq_rst_nxt_head(s);
537 s->tcb.snd.nxt = s->tcb.snd.una;
541 static inline uint16_t
542 calc_smss(uint16_t mss, const struct tle_dest *dst)
546 n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
547 mss = RTE_MIN(n, mss);
553 * min (10*MSS, max (2*MSS, 14600))
555 * or using user provided initial congestion window (icw)
556 * min (10*MSS, max (2*MSS, icw))
558 static inline uint32_t
559 initial_cwnd(uint32_t smss, uint32_t icw)
561 return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
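/*
 * Worked example (illustrative values): with SMSS = 1460 and the
 * RFC 6928 default of 14600 bytes,
 * min(10 * 1460, max(2 * 1460, 14600)) = 14600, i.e. 10 segments;
 * with SMSS = 536 the result is capped by 10 * 536 = 5360 bytes.
 */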
565 * queue a standalone packet to the particular output device
567 * - L2/L3/L4 headers should be already set.
568 * - packet fits into one segment.
571 send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
576 if (stream_drb_alloc(s, &drb, 1) == 0)
579 /* enqueue pkt for TX. */
581 n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
584 /* free unused drbs. */
586 stream_drb_free(s, &drb, 1);
588 return (n == 1) ? 0 : -ENOBUFS;
592 send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
595 const struct tle_dest *dst;
601 pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
603 rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
605 rc = send_pkt(s, dst->dev, m);
611 send_rst(struct tle_tcp_stream *s, uint32_t seq)
616 m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
620 rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
628 send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
634 m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
638 seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
641 rc = send_ctrl_pkt(s, m, seq, flags);
647 s->tcb.snd.ack = s->tcb.rcv.nxt;
653 sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
654 const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
658 uint32_t pid, seq, type;
662 const struct rte_tcp_hdr *th;
666 /* get destination information. */
670 da = &pi->addr6->src;
672 rc = stream_get_dest(&s->s, da, &dst);
676 th = rte_pktmbuf_mtod_offset(m, const struct rte_tcp_hdr *,
677 m->l2_len + m->l3_len);
678 get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
680 /* reset wscale option if timestamp is not present */
681 if (s->tcb.so.ts.val == 0)
682 s->tcb.so.wscale = 0;
684 s->tcb.rcv.nxt = si->seq + 1;
685 seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
686 s->s.ctx->prm.hash_alg,
687 &s->s.ctx->prm.secret_key);
688 s->tcb.so.ts.ecr = s->tcb.so.ts.val;
689 s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
690 s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
691 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
692 s->tcb.so.mss = calc_smss(dst.mtu, &dst);
694 /* reset mbuf's data contents. */
695 len = m->l2_len + m->l3_len + m->l4_len;
697 if (rte_pktmbuf_adj(m, len) == NULL)
701 pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
703 rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
704 TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
706 rc = send_pkt(s, dev, m);
713 * There are four cases for the acceptability test for an incoming segment:
714 * Segment Receive  Test
715 * Length  Window
716 * ------- -------  -------------------------------------------
717 * 0 0 SEG.SEQ = RCV.NXT
718 * 0 >0 RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
719 * >0 0 not acceptable
720 * >0 >0 RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
721 * or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
724 check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
729 if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
730 n - tcb->rcv.nxt > tcb->rcv.wnd)
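/*
 * With unsigned 32-bit arithmetic "SEG.SEQ - RCV.NXT < RCV.WND" is
 * equivalent to "RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND" and keeps
 * working across sequence number wrap-around.
 * Illustrative values: RCV.NXT = 0xfffffff0, RCV.WND = 0x1000,
 * SEG.SEQ = 0x10: 0x10 - 0xfffffff0 == 0x20 < 0x1000, so acceptable.
 */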
736 static inline union tle_tcp_tsopt
737 rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
739 union tle_tcp_tsopt ts;
741 const struct rte_tcp_hdr *th;
743 if (tcb->so.ts.val != 0) {
744 opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
745 mb->l2_len + mb->l3_len + sizeof(*th));
746 ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
754 * PAWS and sequence check.
758 rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len,
759 const union tle_tcp_tsopt ts)
763 /* RFC 1323 4.2.1 R2 */
764 rc = check_seqn(tcb, seq, len);
770 /* RFC 1323 4.2.1 R1 */
771 if (tcp_seq_lt(ts.val, tcb->rcv.ts))
774 /* RFC 1323 4.2.1 R3 */
775 if (tcp_seq_leq(seq, tcb->snd.ack) &&
776 tcp_seq_lt(tcb->snd.ack, seq + len))
777 tcb->rcv.ts = ts.val;
784 rx_check_ack(const struct tcb *tcb, uint32_t ack)
788 max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
790 if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
797 rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
798 const union tle_tcp_tsopt ts)
802 rc = rx_check_seq(tcb, seq, len, ts);
803 rc |= rx_check_ack(tcb, ack);
808 restore_syn_opt(union seg_info *si, union tle_tcp_tsopt *to,
809 const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
810 uint32_t hash_alg, rte_xmm_t *secret_key)
814 const struct rte_tcp_hdr *th;
816 /* check that ACK, etc fields are what we expected. */
817 rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
825 th = rte_pktmbuf_mtod_offset(mb, const struct rte_tcp_hdr *,
826 mb->l2_len + mb->l3_len);
827 len = mb->l4_len - sizeof(*th);
828 to[0] = get_tms_opts((uintptr_t)(th + 1), len);
833 stream_term(struct tle_tcp_stream *s)
837 s->tcb.state = TLE_TCP_ST_CLOSED;
842 /* close() was already invoked, schedule final cleanup */
843 if ((s->tcb.uop & TLE_TCP_OP_CLOSE) != 0) {
845 dr = CTX_TCP_SDR(s->s.ctx);
846 STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
848 /* notify user that stream needs to be closed */
849 } else if (s->err.ev != NULL)
850 tle_event_raise(s->err.ev);
851 else if (s->err.cb.func != NULL)
852 s->err.cb.func(s->err.cb.data, &s->s);
856 stream_fill_dest(struct tle_tcp_stream *s)
864 da = &s->s.ipv4.addr.src;
866 da = &s->s.ipv6.addr.src;
868 rc = stream_get_dest(&s->s, da, &s->tx.dst);
869 return (rc < 0) ? rc : 0;
874 * for now RTT is calculated based on the TCP TMS option;
875 * a real-time based estimate can be added later.
878 estimate_stream_rto(struct tle_tcp_stream *s, uint32_t tms)
882 if (s->tcb.so.ts.ecr) {
883 rtt = tms - s->tcb.so.ts.ecr;
884 rto_estimate(&s->tcb, rtt);
886 s->tcb.snd.rto = TCP_RTO_DEFAULT;
890 * helper function, prepares a new accept stream.
893 accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
894 struct tle_tcp_stream *cs, const union tle_tcp_tsopt *to,
895 uint32_t tms, const union pkt_info *pi, const union seg_info *si)
899 /* some TX still pending for that stream. */
900 if (TCP_STREAM_TX_PENDING(cs))
903 /* setup L4 ports and L3 addresses fields. */
904 cs->s.port.raw = pi->port.raw;
905 cs->s.pmsk.raw = UINT32_MAX;
907 if (pi->tf.type == TLE_V4) {
908 cs->s.ipv4.addr = pi->addr4;
909 cs->s.ipv4.mask.src = INADDR_NONE;
910 cs->s.ipv4.mask.dst = INADDR_NONE;
911 } else if (pi->tf.type == TLE_V6) {
912 cs->s.ipv6.addr = *pi->addr6;
913 rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
914 sizeof(cs->s.ipv6.mask.src));
915 rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
916 sizeof(cs->s.ipv6.mask.dst));
920 sync_fill_tcb(&cs->tcb, si, to);
921 cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
923 estimate_stream_rto(cs, tms);
925 /* copy streams type & flags. */
926 cs->s.type = ps->s.type;
927 cs->flags = ps->flags;
929 /* retrieve and cache destination information. */
930 rc = stream_fill_dest(cs);
934 /* update snd.mss with SMSS value */
935 cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
937 /* setup congestion variables */
938 cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
939 cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
940 cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
942 cs->tcb.state = TLE_TCP_ST_ESTABLISHED;
944 /* add stream to the table */
945 cs->ste = stbl_add_stream(st, pi, cs);
949 cs->tcb.uop |= TLE_TCP_OP_ACCEPT;
956 * ACK for new connection request arrived.
957 * Check that the packet meets all conditions and try to open a new stream.
959 * < 0 - invalid packet
960 * == 0 - packet is valid and new stream was opened for it.
961 * > 0 - packet is valid, but failed to open new stream.
964 rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
965 const union pkt_info *pi, union seg_info *si,
966 uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
970 struct tle_stream *ts;
971 struct tle_tcp_stream *cs;
972 union tle_tcp_tsopt to;
976 if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
980 rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
981 &ctx->prm.secret_key);
985 /* allocate new stream */
986 cs = tcp_stream_get(ctx, 0);
990 /* prepare stream to handle new connection */
991 if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
993 /* put new stream in the accept queue */
995 if (_rte_ring_enqueue_burst(s->rx.q,
996 (void * const *)&ts, 1) == 1) {
1001 /* cleanup on failure */
1002 tcp_stream_down(cs);
1003 stbl_del_stream(st, cs->ste, cs, 0);
1007 tcp_stream_reset(ctx, cs);
1012 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf **mb, uint32_t hlen,
1013 uint32_t *seqn, uint32_t *plen)
1015 uint32_t len, n, seq;
1020 rte_pktmbuf_adj(*mb, hlen);
1023 /* cut off the start of the packet */
1024 else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
1025 n = tcb->rcv.nxt - seq;
1029 *mb = _rte_pktmbuf_adj(*mb, n);
1037 static inline uint32_t
1038 rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
1042 n = ack - (uint32_t)s->tcb.snd.una;
1044 /* some more data was acked. */
1047 /* advance SND.UNA and free related packets. */
1048 k = rte_ring_free_count(s->tx.q);
1049 free_una_data(s, n);
1051 /* mark the stream as available for writing */
1052 if (rte_ring_free_count(s->tx.q) != 0) {
1053 if (s->tx.ev != NULL)
1054 tle_event_raise(s->tx.ev);
1055 else if (k == 0 && s->tx.cb.func != NULL)
1056 s->tx.cb.func(s->tx.cb.data, &s->s);
1064 stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
1067 s->tcb.state = TLE_TCP_ST_TIME_WAIT;
1068 s->tcb.snd.rto = rto;
1075 rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1080 s->tcb.rcv.nxt += 1;
1081 s->err.rev |= TLE_TCP_REV_FIN;
1083 ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1084 state = s->tcb.state;
1086 if (state == TLE_TCP_ST_ESTABLISHED) {
1087 s->tcb.state = TLE_TCP_ST_CLOSE_WAIT;
1088 /* raise err.ev & err.cb */
1089 if (s->err.ev != NULL)
1090 tle_event_raise(s->err.ev);
1091 else if (s->err.cb.func != NULL)
1092 s->err.cb.func(s->err.cb.data, &s->s);
1093 } else if (state == TLE_TCP_ST_FIN_WAIT_1 ||
1094 state == TLE_TCP_ST_CLOSING) {
1095 rsp->flags |= TCP_FLAG_ACK;
1097 stream_timewait(s, s->tcb.snd.rto_tw);
1099 s->tcb.state = TLE_TCP_ST_CLOSING;
1100 } else if (state == TLE_TCP_ST_FIN_WAIT_2) {
1101 rsp->flags |= TCP_FLAG_ACK;
1102 stream_timewait(s, s->tcb.snd.rto_tw);
1103 } else if (state == TLE_TCP_ST_LAST_ACK && ackfin != 0) {
1109 * FIN process for ESTABLISHED state
1111 * < 0 - error occurred
1112 * 0 - FIN was processed OK, and mbuf can be freed/reused.
1113 * > 0 - FIN was processed OK, and mbuf can't be freed/reused.
1116 rx_fin(struct tle_tcp_stream *s, uint32_t state,
1117 const union seg_info *si, struct rte_mbuf *mb,
1118 struct resp_info *rsp)
1120 uint32_t hlen, plen, seq;
1122 union tle_tcp_tsopt ts;
1124 hlen = PKT_L234_HLEN(mb);
1125 plen = mb->pkt_len - hlen;
1128 ts = rx_tms_opt(&s->tcb, mb);
1129 ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1133 if (state < TLE_TCP_ST_ESTABLISHED)
1138 ret = data_pkt_adjust(&s->tcb, &mb, hlen, &seq, &plen);
1141 if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1146 * fast-path: all data & FIN were already sent out
1147 * and are now acknowledged.
1149 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1150 si->ack == (uint32_t)s->tcb.snd.nxt) {
1151 s->tcb.snd.una = s->tcb.snd.fss;
1153 /* conventional ACK processing */
1155 rx_ackdata(s, si->ack);
1157 /* some fragments still missing */
1158 if (seq + plen != s->tcb.rcv.nxt) {
1159 s->tcb.rcv.frs.seq = seq + plen;
1160 s->tcb.rcv.frs.on = 1;
1162 rx_fin_state(s, rsp);
1168 rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1169 const union seg_info *si)
1174 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1175 * are validated by checking their SEQ-fields.
1176 * A reset is valid if its sequence number is in the window.
1177 * In the SYN-SENT state (a RST received in response to an initial SYN),
1178 * the RST is acceptable if the ACK field acknowledges the SYN.
1180 if (state == TLE_TCP_ST_SYN_SENT) {
1181 rc = ((flags & TCP_FLAG_ACK) == 0 ||
1182 si->ack != s->tcb.snd.nxt) ?
1187 rc = check_seqn(&s->tcb, si->seq, 0);
1190 s->err.rev |= TLE_TCP_REV_RST;
1198 * check if we have a FIN that was received out-of-order;
1199 * if so, try to process it now.
1202 rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1204 if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1205 rx_fin_state(s, rsp);
1209 dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1211 static const struct dack_info zero_dack;
1213 tack[0] = zero_dack;
1214 tack->ack = tcb->snd.una;
1215 tack->segs.dup = tcb->rcv.dupack;
1216 tack->wu.raw = tcb->snd.wu.raw;
1217 tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1221 ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1223 tcb->snd.wu.raw = tack->wu.raw;
1224 tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1228 ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1232 n = tack->segs.ack * tcb->snd.mss;
1234 /* slow start phase, RFC 5681 3.1 (2) */
1235 if (tcb->snd.cwnd < tcb->snd.ssthresh)
1236 tcb->snd.cwnd += RTE_MIN(acked, n);
1237 /* congestion avoidance phase, RFC 5681 3.1 (3) */
1239 tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
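/*
 * Illustrative numbers: with SMSS = 1460 and an ACK covering two new
 * segments (n = 2920), slow start grows cwnd by min(acked, 2920) bytes,
 * while congestion avoidance with cwnd = 14600 grows it by
 * max(1, 2920 * 1460 / 14600) = 292 bytes, i.e. roughly one MSS per RTT.
 */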
1243 rto_ssthresh_update(struct tcb *tcb)
1247 /* RFC 5681 3.1 (4) */
1248 n = (tcb->snd.nxt - tcb->snd.una) / 2;
1249 k = 2 * tcb->snd.mss;
1250 tcb->snd.ssthresh = RTE_MAX(n, k);
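/*
 * e.g. (illustrative): FlightSize = SND.NXT - SND.UNA = 11680 bytes and
 * SMSS = 1460 give ssthresh = max(11680 / 2, 2 * 1460) = 5840 bytes.
 */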
1254 rto_cwnd_update(struct tcb *tcb)
1257 if (tcb->snd.nb_retx == 0)
1258 rto_ssthresh_update(tcb);
1261 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1262 * no more than 1 full-sized segment.
1264 tcb->snd.cwnd = tcb->snd.mss;
1268 ack_info_update(struct dack_info *tack, const union seg_info *si,
1269 int32_t badseq, uint32_t dlen, const union tle_tcp_tsopt ts)
1272 tack->segs.badseq++;
1276 /* segment with incoming data */
1277 tack->segs.data += (dlen != 0);
1279 /* segment with newly acked data */
1280 if (tcp_seq_lt(tack->ack, si->ack)) {
1283 tack->ack = si->ack;
1287 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1288 * (a) the receiver of the ACK has outstanding data
1289 * (b) the incoming acknowledgment carries no data
1290 * (c) the SYN and FIN bits are both off
1291 * (d) the acknowledgment number is equal to the TCP.UNA
1292 * (e) the advertised window in the incoming acknowledgment equals the
1293 * advertised window in the last incoming acknowledgment.
1295 * Here we only have to check for (b), (d) and (e).
1296 * (a) will be checked later for the whole bulk of packets,
1297 * (c) should never happen here.
1299 } else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1300 tack->dup3.seg = tack->segs.ack + 1;
1301 tack->dup3.ack = tack->ack;
1306 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1307 * updated. If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1308 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1309 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1311 if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1312 (si->seq == tack->wu.wl1 &&
1313 tcp_seq_leq(tack->wu.wl2, si->ack))) {
1315 tack->wu.wl1 = si->seq;
1316 tack->wu.wl2 = si->ack;
1317 tack->wnd = si->wnd;
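/*
 * SND.WL1/SND.WL2 record the SEQ/ACK numbers of the segment used for
 * the last window update, so only a segment that is not older than that
 * one is allowed to change the cached SND.WND value.
 */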
1321 static inline uint32_t
1322 rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1323 const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1324 int32_t rc[], uint32_t num)
1326 uint32_t i, j, k, n, t;
1327 uint32_t hlen, plen, seq, tlen;
1329 union tle_tcp_tsopt ts;
1332 for (i = 0; i != num; i = j) {
1334 hlen = PKT_L234_HLEN(mb[i]);
1335 plen = mb[i]->pkt_len - hlen;
1338 ts = rx_tms_opt(&s->tcb, mb[i]);
1339 ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1341 /* account segment received */
1342 ack_info_update(tack, &si[i], ret != 0, plen, ts);
1345 /* skip duplicate data, if any */
1346 ret = data_pkt_adjust(&s->tcb, &mb[i], hlen,
1358 /* group sequential packets together. */
1359 for (tlen = plen; j != num; tlen += plen, j++) {
1361 hlen = PKT_L234_HLEN(mb[j]);
1362 plen = mb[j]->pkt_len - hlen;
1364 /* not consecutive packet */
1365 if (plen == 0 || seq + tlen != si[j].seq)
1369 ts = rx_tms_opt(&s->tcb, mb[j]);
1370 ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1376 /* account for segment received */
1377 ack_info_update(tack, &si[j], ret != 0, plen, ts);
1379 rte_pktmbuf_adj(mb[j], hlen);
1384 /* account for OFO data */
1385 if (seq != s->tcb.rcv.nxt)
1386 tack->segs.ofo += n;
1388 /* enqueue packets */
1389 t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1391 /* if we are out of space in stream recv buffer. */
1392 for (; t != n; t++) {
1403 start_fast_retransmit(struct tle_tcp_stream *s)
1409 /* RFC 6582 3.2.2 */
1410 tcb->snd.rcvr = tcb->snd.nxt;
1411 tcb->snd.fastack = 1;
1413 /* RFC 5681 3.2.2 */
1414 rto_ssthresh_update(tcb);
1416 /* RFC 5681 3.2.3 */
1417 tcp_txq_rst_nxt_head(s);
1418 tcb->snd.nxt = tcb->snd.una;
1419 tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
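/*
 * Illustrative numbers: with ssthresh just recomputed to 5840 and
 * SMSS = 1460, cwnd becomes 5840 + 3 * 1460 = 10220 bytes; the extra
 * three segments account for the packets that triggered the duplicate
 * ACKs and have already left the network.
 */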
1423 stop_fast_retransmit(struct tle_tcp_stream *s)
1429 n = tcb->snd.nxt - tcb->snd.una;
1430 tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1431 RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1432 tcb->snd.fastack = 0;
1436 in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1444 /* RFC 6582 3.2 partial ACK */
1447 n = ack_num * tcb->snd.mss;
1449 tcb->snd.cwnd -= ack_len - n;
1451 tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1454 * For the first partial ACK that arrives
1455 * during fast recovery, also reset the retransmit timer.
1458 if (tcb->snd.fastack == 1)
1461 tcb->snd.fastack += ack_num;
1464 /* RFC 5681 3.2.4 */
1465 } else if (dup_num > 3) {
1466 s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1474 process_ack(struct tle_tcp_stream *s, uint32_t acked,
1475 const struct dack_info *tack)
1482 if (s->tcb.snd.fastack == 0) {
1486 /* RFC 6582 3.2.2 switch to fast retransmit mode */
1487 if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1488 s->tcb.snd.una >= s->tcb.snd.rcvr) {
1490 start_fast_retransmit(s);
1491 in_fast_retransmit(s,
1492 tack->ack - tack->dup3.ack,
1493 tack->segs.ack - tack->dup3.seg - 1,
1496 /* remain in normal mode */
1497 } else if (acked != 0) {
1498 ack_cwnd_update(&s->tcb, acked, tack);
1502 /* fast retransmit mode */
1505 /* remain in fast retransmit mode */
1506 if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1508 send = in_fast_retransmit(s, acked, tack->segs.ack,
1511 /* RFC 6582 3.2 full ACK */
1512 stop_fast_retransmit(s);
1515 /* if we have another series of dup ACKs */
1516 if (tack->dup3.seg != 0 &&
1517 s->tcb.snd.una != s->tcb.snd.nxt &&
1518 tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1521 /* restart fast retransmit again. */
1522 start_fast_retransmit(s);
1523 send = in_fast_retransmit(s,
1524 tack->ack - tack->dup3.ack,
1525 tack->segs.ack - tack->dup3.seg - 1,
1535 * our FIN was acked, stop rto timer, change stream state,
1536 * and possibly close the stream.
1539 rx_ackfin(struct tle_tcp_stream *s)
1543 s->tcb.snd.una = s->tcb.snd.fss;
1546 state = s->tcb.state;
1547 if (state == TLE_TCP_ST_LAST_ACK)
1549 else if (state == TLE_TCP_ST_FIN_WAIT_1) {
1551 s->tcb.state = TLE_TCP_ST_FIN_WAIT_2;
1552 } else if (state == TLE_TCP_ST_CLOSING) {
1553 stream_timewait(s, s->tcb.snd.rto_tw);
1558 rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1559 const struct dack_info *tack)
1564 s->tcb.rcv.dupack = tack->segs.dup;
1566 n = rx_ackdata(s, tack->ack);
1567 send = process_ack(s, n, tack);
1569 /* try to send more data. */
1570 if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1571 txs_enqueue(s->s.ctx, s);
1573 /* restart RTO timer. */
1574 if (s->tcb.snd.nxt != s->tcb.snd.una)
1577 /* update rto; if a fresh timestamp echo is present, calculate rtt */
1578 if (tack->ts.ecr != 0)
1579 rto_estimate(&s->tcb, ts - tack->ts.ecr);
1584 * returns negative value on failure, or zero on success.
1587 rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1588 const union seg_info *si, struct rte_mbuf *mb,
1589 struct resp_info *rsp)
1591 struct tle_tcp_syn_opts so;
1592 struct rte_tcp_hdr *th;
1594 if (state != TLE_TCP_ST_SYN_SENT)
1598 * RFC 793 3.9: in the SYN-SENT state
1599 * If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset
1600 * <SEQ=SEG.ACK><CTL=RST>
1601 * and discard the segment.
1602 * The connection remains in the same state.
1604 if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1605 send_rst(s, si->ack);
1609 th = rte_pktmbuf_mtod_offset(mb, struct rte_tcp_hdr *,
1610 mb->l2_len + mb->l3_len);
1611 get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1615 s->tcb.snd.una = s->tcb.snd.nxt;
1616 s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1617 s->tcb.snd.wnd = si->wnd << so.wscale;
1618 s->tcb.snd.wu.wl1 = si->seq;
1619 s->tcb.snd.wu.wl2 = si->ack;
1620 s->tcb.snd.wscale = so.wscale;
1622 /* setup congestion variables */
1623 s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
1624 s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1626 s->tcb.rcv.ts = so.ts.val;
1627 s->tcb.rcv.irs = si->seq;
1628 s->tcb.rcv.nxt = si->seq + 1;
1630 /* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1631 s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1632 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1633 s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1635 /* calculate initial rto */
1636 rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1638 rsp->flags |= TCP_FLAG_ACK;
1641 s->tcb.state = TLE_TCP_ST_ESTABLISHED;
1644 if (s->tx.ev != NULL)
1645 tle_event_raise(s->tx.ev);
1646 else if (s->tx.cb.func != NULL)
1647 s->tx.cb.func(s->tx.cb.data, &s->s);
1652 static inline uint32_t
1653 rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1654 const union pkt_info *pi, const union seg_info si[],
1655 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1658 uint32_t i, k, n, state;
1660 struct resp_info rsp;
1661 struct dack_info tack;
1666 state = s->tcb.state;
1669 * first check for the states/flags where we don't
1670 * expect groups of packets.
1674 if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1677 rx_rst(s, state, pi->tf.flags, &si[i]);
1682 /* RFC 793: if the ACK bit is off drop the segment and return */
1683 } else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1690 /* process <SYN,ACK> */
1691 } else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1692 for (i = 0; i != num; i++) {
1693 ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1703 } else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1705 for (i = 0; i != num; i++) {
1706 ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1716 /* normal data/ack packets */
1717 } else if (state >= TLE_TCP_ST_ESTABLISHED &&
1718 state <= TLE_TCP_ST_LAST_ACK) {
1720 /* process incoming data packets. */
1721 dack_info_init(&tack, &s->tcb);
1722 n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1724 /* follow up actions based on aggregated information */
1726 /* update SND.WND */
1727 ack_window_update(&s->tcb, &tack);
1730 * fast-path: all data & FIN were already sent out
1731 * and are now acknowledged.
1733 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1734 tack.ack == (uint32_t)s->tcb.snd.nxt)
1737 rx_process_ack(s, ts, &tack);
1740 * send an immediate ACK if either:
1741 * - received segment with invalid seq/ack number
1742 * - received segment with OFO data
1743 * - received segment with INO data and no TX is scheduled
1746 if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1747 (tack.segs.data != 0 &&
1748 rte_atomic32_read(&s->tx.arm) == 0))
1749 rsp.flags |= TCP_FLAG_ACK;
1751 rx_ofo_fin(s, &rsp);
1756 /* unhandled state, drop all packets. */
1760 /* we have a response packet to send. */
1761 if (rsp.flags != 0) {
1762 send_ack(s, ts, rsp.flags);
1764 /* start the timer for FIN packet */
1765 if ((rsp.flags & TCP_FLAG_FIN) != 0)
1769 /* unprocessed packets */
1770 for (; i != num; i++, k++) {
1778 static inline uint32_t
1779 rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1780 const union pkt_info *pi, const union seg_info si[],
1781 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1786 if (tcp_stream_acquire(s) > 0) {
1787 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1788 tcp_stream_release(s);
1792 for (i = 0; i != num; i++) {
1799 static inline uint32_t
1800 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1801 const union pkt_info pi[], union seg_info si[],
1802 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1805 struct tle_tcp_stream *cs, *s;
1806 uint32_t i, k, n, state;
1809 s = rx_obtain_stream(dev, st, &pi[0], type);
1811 for (i = 0; i != num; i++) {
1819 state = s->tcb.state;
1821 if (state == TLE_TCP_ST_LISTEN) {
1823 /* one connection per flow */
1826 for (i = 0; i != num; i++) {
1828 ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1830 /* valid packet encountered */
1834 /* invalid packet, keep trying to find a proper one */
1840 /* packet is valid, but we are out of streams to serve it */
1842 for (; i != num; i++, k++) {
1846 /* new stream is accepted */
1847 } else if (ret == 0) {
1849 /* inform listen stream about new connections */
1850 if (s->rx.ev != NULL)
1851 tle_event_raise(s->rx.ev);
1852 else if (s->rx.cb.func != NULL &&
1853 rte_ring_count(s->rx.q) == 1)
1854 s->rx.cb.func(s->rx.cb.data, &s->s);
1856 /* if there is no data, drop current packet */
1857 if (PKT_L4_PLEN(mb[i]) == 0) {
1862 /* process remaining packets for that stream */
1864 n = rx_new_stream(cs, ts, pi + i, si + i,
1865 mb + i, rp + k, rc + k, num - i);
1871 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1875 tcp_stream_release(s);
1880 static inline uint32_t
1881 rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1882 const union pkt_info pi[], const union seg_info si[],
1883 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1886 struct tle_tcp_stream *s;
1890 s = rx_obtain_listen_stream(dev, &pi[0], type);
1892 for (i = 0; i != num; i++) {
1900 for (i = 0; i != num; i++) {
1902 /* check that this remote is allowed to connect */
1903 if (rx_check_stream(s, &pi[i]) != 0)
1906 /* syncookie: reply with <SYN,ACK> */
1907 ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1916 tcp_stream_release(s);
1921 tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1922 struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1925 struct tle_ctx *ctx;
1926 uint32_t i, j, k, mt, n, t, ts;
1927 union pkt_info pi[num];
1928 union seg_info si[num];
1930 uint8_t t[TLE_VNUM];
1935 ts = tcp_get_tms(ctx->cycles_ms_shift);
1936 st = CTX_TCP_STLB(ctx);
1937 mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
1941 /* extract packet info and check the L3/L4 csums */
1942 for (i = 0; i != num; i++) {
1944 get_pkt_info(pkt[i], &pi[i], &si[i]);
1947 pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, t, IPPROTO_TCP);
1951 if (stu.t[TLE_V4] != 0)
1952 stbl_lock(st, TLE_V4);
1953 if (stu.t[TLE_V6] != 0)
1954 stbl_lock(st, TLE_V6);
1957 for (i = 0; i != num; i += j) {
1961 /* basic checks for incoming packet */
1962 if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1967 /* process input SYN packets */
1968 } else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1969 j = pkt_info_bulk_syneq(pi + i, num - i);
1970 n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1974 j = pkt_info_bulk_eq(pi + i, num - i);
1975 n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1981 if (stu.t[TLE_V4] != 0)
1982 stbl_unlock(st, TLE_V4);
1983 if (stu.t[TLE_V6] != 0)
1984 stbl_unlock(st, TLE_V6);
1990 tle_tcp_stream_rx_bulk(struct tle_stream *ts, struct rte_mbuf *pkt[],
1991 struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1993 struct tle_ctx *ctx;
1994 struct tle_tcp_stream *s;
1995 uint32_t i, j, k, n, t, tms;
1996 union pkt_info pi[num];
1997 union seg_info si[num];
2000 tms = tcp_get_tms(ctx->cycles_ms_shift);
2002 s = rx_acquire_stream(ts);
2004 for (i = 0; i != num; i++) {
2011 tms = tcp_stream_adjust_tms(s, tms);
2013 /* extract packet info and check the L3/L4 csums */
2014 for (i = 0; i != num; i++) {
2015 get_pkt_info(pkt[i], &pi[i], &si[i]);
2016 pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, pi[i].tf.type,
2021 for (i = 0; i != num; i += j) {
2026 /* basic checks for incoming packet */
2027 if (t != ts->type || pi[i].csf != 0 ||
2028 rx_check_stream(s, pi + i) != 0) {
2035 j = pkt_info_bulk_eq(pi + i, num - i);
2036 n = rx_stream(s, tms, pi + i, si + i, pkt + i,
2041 tcp_stream_release(s);
2046 tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
2050 struct tle_tcp_stream *s;
2051 struct tle_memtank *mts;
2054 n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
2058 mts = CTX_TCP_MTS(ts->ctx);
2061 * if we still have packets to read,
2062 * then rearm stream RX event.
2064 if (n == num && rte_ring_count(s->rx.q) != 0) {
2065 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2066 tle_event_raise(s->rx.ev);
2067 tcp_stream_release(s);
2070 tle_memtank_grow(mts);
2075 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
2077 uint32_t i, j, k, n;
2078 struct tle_drb *drb[num];
2079 struct tle_tcp_stream *s;
2081 /* extract packets from device TX queue. */
2084 n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
2090 /* free empty drbs and notify related streams. */
2092 for (i = 0; i != k; i = j) {
2094 for (j = i + 1; j != k && s == drb[j]->udata; j++)
2096 stream_drb_free(s, drb + i, j - i);
2103 stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
2105 if (s->s.type == TLE_V4)
2106 pi->addr4 = s->s.ipv4.addr;
2108 pi->addr6 = &s->s.ipv6.addr;
2110 pi->port = s->s.port;
2111 pi->tf.type = s->s.type;
2115 stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
2117 const struct sockaddr_in *in4;
2118 const struct sockaddr_in6 *in6;
2119 const struct tle_dev_param *prm;
2123 s->s.pmsk.raw = UINT32_MAX;
2125 /* setup L4 src ports and src address fields. */
2126 if (s->s.type == TLE_V4) {
2127 in4 = (const struct sockaddr_in *)addr;
2128 if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2131 s->s.port.src = in4->sin_port;
2132 s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2133 s->s.ipv4.mask.src = INADDR_NONE;
2134 s->s.ipv4.mask.dst = INADDR_NONE;
2136 } else if (s->s.type == TLE_V6) {
2137 in6 = (const struct sockaddr_in6 *)addr;
2138 if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2139 sizeof(tle_ipv6_any)) == 0 ||
2140 in6->sin6_port == 0)
2143 s->s.port.src = in6->sin6_port;
2144 rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2145 sizeof(s->s.ipv6.addr.src));
2146 rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2147 sizeof(s->s.ipv6.mask.src));
2148 rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2149 sizeof(s->s.ipv6.mask.dst));
2152 /* setup the destination device. */
2153 rc = stream_fill_dest(s);
2157 /* setup L3 dst address from device param */
2158 prm = &s->tx.dst.dev->prm;
2159 if (s->s.type == TLE_V4) {
2160 if (s->s.ipv4.addr.dst == INADDR_ANY)
2161 s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2162 } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2163 sizeof(tle_ipv6_any)) == 0)
2164 memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2165 sizeof(s->s.ipv6.addr.dst));
2171 tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2177 struct stbl_entry *se;
2179 /* fill stream address */
2180 rc = stream_fill_addr(s, addr);
2184 /* fill pkt info to generate seq.*/
2185 stream_fill_pkt_info(s, &pi);
2187 tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2188 s->tcb.so.ts.val = tms;
2189 s->tcb.so.ts.ecr = 0;
2190 s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2191 s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2193 /* note that rcv.nxt is 0 here for sync_gen_seq.*/
2194 seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2195 s->s.ctx->prm.hash_alg,
2196 &s->s.ctx->prm.secret_key);
2197 s->tcb.snd.iss = seq;
2198 s->tcb.snd.rcvr = seq;
2199 s->tcb.snd.una = seq;
2200 s->tcb.snd.nxt = seq + 1;
2201 s->tcb.snd.rto = TCP_RTO_DEFAULT;
2202 s->tcb.snd.ts = tms;
2204 s->tcb.rcv.mss = s->tcb.so.mss;
2205 s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2206 s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2209 /* add the stream to the stream table */
2210 st = CTX_TCP_STLB(s->s.ctx);
2211 se = stbl_add_stream_lock(st, s);
2216 /* put stream into the to-send queue */
2217 txs_enqueue(s->s.ctx, s);
2223 tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2225 struct tle_tcp_stream *s;
2229 if (ts == NULL || addr == NULL)
2234 if (type >= TLE_VNUM)
2237 if (tcp_stream_try_acquire(s) > 0) {
2238 rc = rte_atomic16_cmpset(&s->tcb.state, TLE_TCP_ST_CLOSED,
2239 TLE_TCP_ST_SYN_SENT);
2240 rc = (rc == 0) ? -EDEADLK : 0;
2245 tcp_stream_release(s);
2249 /* fill stream, prepare and transmit syn pkt */
2250 s->tcb.uop |= TLE_TCP_OP_CONNECT;
2251 rc = tx_syn(s, addr);
2252 tcp_stream_release(s);
2254 /* error happened, do a cleanup */
2256 tle_tcp_stream_close(ts);
2262 * Helper function for tle_tcp_stream_establish().
2263 * updates stream's TCB.
2266 tcb_establish(struct tle_tcp_stream *s, const struct tle_tcp_conn_info *ci)
2270 tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2271 mss = calc_smss(ci->so.mss, &s->tx.dst);
2274 fill_tcb_snd(&s->tcb, ci->ack, ci->seq, mss,
2275 ci->wnd, ci->so.wscale, &ci->so.ts);
2276 fill_tcb_rcv(&s->tcb, ci->ack, ci->so.wscale, &ci->so.ts);
2278 s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2280 /* setup congestion variables */
2281 s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
2282 s->tcb.snd.ssthresh = s->tcb.snd.wnd;
2284 /* calculate and store real timestamp offset */
2285 if (ci->so.ts.raw != 0) {
2286 s->ts_offset = tms - ci->so.ts.ecr;
2287 tms -= s->ts_offset;
2290 estimate_stream_rto(s, tms);
2294 tle_tcp_stream_establish(struct tle_ctx *ctx,
2295 const struct tle_tcp_stream_param *prm,
2296 const struct tle_tcp_conn_info *ci, uint32_t flags)
2299 struct tle_tcp_stream *s;
2302 if (ctx == NULL || prm == NULL || ci == NULL) {
2303 rte_errno = EINVAL;
2307 /* allocate new stream */
2308 s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
2315 s->tcb.uop |= TLE_TCP_OP_ESTABLISH;
2317 /* check and use stream addresses and parameters */
2318 rc = tcp_stream_fill_prm(s, prm);
2322 /* retrieve and cache destination information. */
2323 rc = stream_fill_dest(s);
2327 /* add the stream to the stream table */
2328 if ((flags & TLE_TCP_STREAM_F_PRIVATE) == 0) {
2329 st = CTX_TCP_STLB(s->s.ctx);
2330 s->ste = stbl_add_stream_lock(st, s);
2331 if (s->ste == NULL) {
2337 /* fill TCB from user provided data */
2338 tcb_establish(s, ci);
2339 s->tcb.state = TLE_TCP_ST_ESTABLISHED;
2344 /* cleanup on failure */
2346 tcp_stream_reset(ctx, s);
2355 tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2358 struct tle_tcp_stream *s;
2361 n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
2366 * if we still have packets to read,
2367 * then rearm stream RX event.
2369 if (n == num && rte_ring_count(s->rx.q) != 0) {
2370 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2371 tle_event_raise(s->rx.ev);
2372 tcp_stream_release(s);
2379 tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
2385 struct tle_tcp_stream *s;
2387 struct rxq_objs mo[2];
2391 /* get group of packets */
2392 mn = tcp_rxq_get_objs(s, mo);
2398 for (i = 0; i != iovcnt; i++) {
2401 n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
2402 if (iv.iov_len != 0) {
2410 if (i != iovcnt && mn != 1) {
2414 n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
2415 if (iv.iov_len != 0) {
2419 if (i + 1 != iovcnt)
2421 } while (++i != iovcnt);
2425 tcp_rxq_consume(s, tn);
2428 * if we still have packets to read,
2429 * then rearm stream RX event.
2431 if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
2432 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2433 tle_event_raise(s->rx.ev);
2434 tcp_stream_release(s);
2440 static inline int32_t
2441 tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2442 struct rte_mbuf *segs[], uint32_t num)
2447 for (i = 0; i != num; i++) {
2448 /* Build L2/L3/L4 header */
2449 rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2450 0, TCP_FLAG_ACK, 0, 0);
2452 free_mbufs(segs, num);
2458 /* queue packets for further transmission. */
2459 rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
2461 free_mbufs(segs, num);
2468 tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2470 uint32_t i, j, k, mss, n, state;
2473 struct tle_tcp_stream *s;
2474 struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2478 /* mark stream as not closable. */
2479 if (tcp_stream_acquire(s) < 0) {
2484 state = s->tcb.state;
2485 if (state != TLE_TCP_ST_ESTABLISHED && state != TLE_TCP_ST_CLOSE_WAIT) {
2486 rte_errno = ENOTCONN;
2487 tcp_stream_release(s);
2491 mss = s->tcb.snd.mss;
2492 ol_flags = s->tx.dst.ol_flags;
2497 /* prepare and check for TX */
2498 for (i = k; i != num; i++) {
2499 if (pkt[i]->pkt_len > mss ||
2500 pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2502 rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2503 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2509 /* queue packets for further transmission. */
2510 n = _rte_ring_enqueue_burst(s->tx.q,
2511 (void **)pkt + k, (i - k));
2515 * for unsent, but already modified packets:
2516 * remove pkt l2/l3 headers, restore ol_flags
2519 ol_flags = ~s->tx.dst.ol_flags;
2520 for (j = k; j != i; j++) {
2521 rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2524 pkt[j]->ol_flags &= ol_flags;
2534 /* segment large packet and enqueue for sending */
2535 } else if (i != num) {
2536 /* segment the packet. */
2537 rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2544 rc = tx_segments(s, ol_flags, segs, rc);
2546 /* free the large mbuf */
2547 rte_pktmbuf_free(pkt[i]);
2548 /* set the mbuf as consumed */
2551 /* no space left in tx queue */
2556 /* notify BE about more data to send */
2558 txs_enqueue(s->s.ctx, s);
2559 /* if possible, re-arm stream write event. */
2560 if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2561 tle_event_raise(s->tx.ev);
2563 tcp_stream_release(s);
2569 tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
2570 const struct iovec *iov, int iovcnt)
2573 uint32_t j, k, n, num, slen, state;
2576 struct tle_tcp_stream *s;
2578 struct rte_mbuf *mb[2 * MAX_PKT_BURST];
2582 /* mark stream as not closable. */
2583 if (tcp_stream_acquire(s) < 0) {
2588 state = s->tcb.state;
2589 if (state != TLE_TCP_ST_ESTABLISHED && state != TLE_TCP_ST_CLOSE_WAIT) {
2590 rte_errno = ENOTCONN;
2591 tcp_stream_release(s);
2595 /* figure out how many mbufs we need */
2597 for (i = 0; i != iovcnt; i++)
2598 tsz += iov[i].iov_len;
2600 slen = rte_pktmbuf_data_room_size(mp);
2601 slen = RTE_MIN(slen, s->tcb.snd.mss);
2603 num = (tsz + slen - 1) / slen;
2604 n = rte_ring_free_count(s->tx.q);
2605 num = RTE_MIN(num, n);
2606 n = RTE_MIN(num, RTE_DIM(mb));
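/*
 * e.g. (illustrative): writing tsz = 10000 bytes with
 * slen = min(mbuf data room, SND.MSS) = 1460 needs
 * num = (10000 + 1459) / 1460 = 7 mbufs, further limited by the free
 * space in the stream TX ring and by RTE_DIM(mb).
 */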
2608 /* allocate mbufs */
2609 if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
2611 tcp_stream_release(s);
2615 /* copy data into the mbufs */
2618 for (i = 0; i != iovcnt; i++) {
2621 k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
2622 if (iv.iov_len != 0) {
2628 /* partially filled segment */
2629 k += (k != n && mb[k]->data_len != 0);
2631 /* fill pkt headers */
2632 ol_flags = s->tx.dst.ol_flags;
2634 for (j = 0; j != k; j++) {
2635 rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
2636 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2641 /* if no error encountered, then enqueue pkts for transmission */
2643 k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
2649 /* free pkts that were not enqueued */
2650 free_mbufs(mb + k, j - k);
2652 /* our last segment can be partially filled */
2653 sz += slen - sz % slen;
2654 sz -= (j - k) * slen;
2656 /* report an error */
2665 /* notify BE about more data to send */
2666 txs_enqueue(s->s.ctx, s);
2668 /* if possible, re-arm stream write event. */
2669 if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2670 tle_event_raise(s->tx.ev);
2673 tcp_stream_release(s);
2677 /* send data and FIN (if needed) */
2679 tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2681 /* try to send some data */
2682 tx_nxt_data(s, tms);
2684 /* we also have to send a FIN */
2685 if (state != TLE_TCP_ST_ESTABLISHED &&
2686 state != TLE_TCP_ST_CLOSE_WAIT &&
2687 tcp_txq_nxt_cnt(s) == 0 &&
2688 s->tcb.snd.fss != s->tcb.snd.nxt) {
2689 s->tcb.snd.fss = ++s->tcb.snd.nxt;
2690 send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2695 tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2699 state = s->tcb.state;
2701 if (state == TLE_TCP_ST_SYN_SENT) {
2702 /* send the SYN, start the rto timer */
2703 send_ack(s, tms, TCP_FLAG_SYN);
2706 } else if (state >= TLE_TCP_ST_ESTABLISHED &&
2707 state <= TLE_TCP_ST_LAST_ACK) {
2709 tx_data_fin(s, tms, state);
2711 /* start RTO timer. */
2712 if (s->tcb.snd.nxt != s->tcb.snd.una)
2714 } else if (state == TLE_TCP_ST_CLOSED) {
2715 if ((s->tcb.snd.close_flags & TCP_FLAG_RST) != 0)
2716 send_rst(s, s->tcb.snd.nxt);
2722 rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2726 state = s->tcb.state;
2728 TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2729 "retx=%u, retm=%u, "
2730 "rto=%u, snd.ts=%u, tmo=%u, "
2731 "snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2732 "snd.rcvr=%lu, snd.fastack=%u, "
2733 "wnd=%u, cwnd=%u, ssthresh=%u, "
2734 "bytes sent=%lu, pkt remain=%u;\n",
2735 __func__, s, tms, s->tcb.state,
2736 s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2737 s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2738 s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2739 s->tcb.snd.rcvr, s->tcb.snd.fastack,
2740 s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2741 s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2743 if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2745 if (state >= TLE_TCP_ST_ESTABLISHED &&
2746 state <= TLE_TCP_ST_LAST_ACK) {
2748 /* update SND.CWND and SND.SSTHRESH */
2749 rto_cwnd_update(&s->tcb);
2751 /* RFC 6582 3.2.4 */
2752 s->tcb.snd.rcvr = s->tcb.snd.nxt;
2753 s->tcb.snd.fastack = 0;
2755 /* restart from last acked data */
2756 tcp_txq_rst_nxt_head(s);
2757 s->tcb.snd.nxt = s->tcb.snd.una;
2759 tx_data_fin(s, tms, state);
2761 } else if (state == TLE_TCP_ST_SYN_SENT) {
2763 s->tcb.so.ts.val = tms;
2765 /* According to RFC 6928 2:
2766 * To reduce the chance for spurious SYN or SYN/ACK
2767 * retransmission, it is RECOMMENDED that
2768 * implementations refrain from resetting the initial
2769 * window to 1 segment, unless there have been more
2770 * than one SYN or SYN/ACK retransmissions or true loss
2771 * detection has been made.
2773 if (s->tcb.snd.nb_retx != 0)
2774 s->tcb.snd.cwnd = s->tcb.snd.mss;
2776 send_ack(s, tms, TCP_FLAG_SYN);
2778 } else if (state == TLE_TCP_ST_TIME_WAIT) {
2779 s->err.rev |= TLE_TCP_REV_RTO;
2783 /* RFC 6298 5.5: back off the timer */
2784 s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2785 s->tcb.snd.nb_retx++;
2789 s->err.rev |= TLE_TCP_REV_RTO;
2790 send_rst(s, s->tcb.snd.nxt);
2796 tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2800 struct tle_timer_wheel *tw;
2801 struct tle_stream *p;
2802 struct tle_tcp_stream *s, *rs[num];
2804 /* process streams with RTO expired */
2806 tw = CTX_TCP_TMWHL(ctx);
2807 tms = tcp_get_tms(ctx->cycles_ms_shift);
2808 tle_timer_expire(tw, tms);
2810 k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2812 for (i = 0; i != k; i++) {
2815 s->timer.handle = NULL;
2816 if (tcp_stream_try_acquire(s) > 0)
2817 rto_stream(s, tcp_stream_adjust_tms(s, tms));
2818 tcp_stream_release(s);
2821 /* process streams from to-send queue */
2823 k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2825 for (i = 0; i != k; i++) {
2828 rte_atomic32_set(&s->tx.arm, 0);
2830 if (tcp_stream_try_acquire(s) > 0)
2831 tx_stream(s, tcp_stream_adjust_tms(s, tms));
2833 txs_enqueue(s->s.ctx, s);
2834 tcp_stream_release(s);
2837 /* collect streams to close from the death row */
2839 dr = CTX_TCP_SDR(ctx);
2840 for (k = 0, p = STAILQ_FIRST(&dr->be);
2841 k != num && p != NULL;
2842 k++, p = STAILQ_NEXT(p, link))
2843 rs[k] = TCP_STREAM(p);
2846 STAILQ_INIT(&dr->be);
2848 STAILQ_FIRST(&dr->be) = p;
2850 /* cleanup closed streams */
2851 for (i = 0; i != k; i++) {
2854 tcp_stream_reset(ctx, s);