/*
 * Copyright (c) 2016-2017 Intel Corporation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *	http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_ip_frag.h>

#include "tcp_stream.h"
#include "tcp_timer.h"
#include "stream_table.h"
#include "syncookie.h"
#include "tcp_tx_seg.h"

#define	TCP_MAX_PKT_SEG	0x20

/*
 * checks if input TCP ports and IP addresses match the given stream.
 * returns zero on success.
 */
static inline int
rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
{
	int32_t rc;

	if (pi->tf.type == TLE_V4)
		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
			(pi->addr4.raw & s->s.ipv4.mask.raw) !=
			s->s.ipv4.addr.raw;
	else
		rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
			ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
			&s->s.ipv6.mask.raw) != 0;

	return rc;
}

static inline struct tle_tcp_stream *
rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
	uint32_t type)
{
	struct tle_tcp_stream *s;

	s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
	if (s == NULL || tcp_stream_acquire(s) < 0)
		return NULL;

	/* check that we have a proper stream. */
	if (s->tcb.state != TCP_ST_LISTEN) {
		tcp_stream_release(s);
		s = NULL;
	}

	return s;
}

static inline struct tle_tcp_stream *
rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
	const union pkt_info *pi, uint32_t type)
{
	struct tle_tcp_stream *s;

	s = stbl_find_data(st, pi);
	if (s == NULL) {
		if (pi->tf.flags == TCP_FLAG_ACK)
			return rx_obtain_listen_stream(dev, pi, type);
		return NULL;
	}

	if (tcp_stream_acquire(s) < 0)
		return NULL;
	/* check that we have a proper stream. */
	else if (s->tcb.state == TCP_ST_CLOSED) {
		tcp_stream_release(s);
		s = NULL;
	}

	return s;
}

/*
 * Consider 2 pkt_info *equal* if their:
 * - types (IPv4/IPv6)
 * - TCP flags
 * - checksum flags
 * - TCP src and dst ports
 * - IP src and dst addresses
 * are equal.
 */
static inline int
pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
{
	uint32_t i;

	i = 1;

	if (pi[0].tf.type == TLE_V4) {
		while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
			i++;
	} else if (pi[0].tf.type == TLE_V6) {
		while (i != num &&
				pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
				ymm_cmp(&pi[0].addr6->raw,
				&pi[i].addr6->raw) == 0)
			i++;
	}

	return i;
}

static inline int
pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
{
	uint32_t i;

	i = 1;

	if (pi[0].tf.type == TLE_V4) {
		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
				pi[0].port.dst == pi[i].port.dst &&
				pi[0].addr4.dst == pi[i].addr4.dst)
			i++;
	} else if (pi[0].tf.type == TLE_V6) {
		while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
				pi[0].port.dst == pi[i].port.dst &&
				xmm_cmp(&pi[0].addr6->dst,
				&pi[i].addr6->dst) == 0)
			i++;
	}

	return i;
}

static inline void
stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
	uint32_t nb_drb)
{
	_rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}

static inline uint32_t
stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
	uint32_t nb_drb)
{
	return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
}

static inline uint32_t
get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
{
	uint32_t pid;
	rte_atomic32_t *pa;

	pa = &dev->tx.packet_id[type];

	if (st == 0) {
		pid = rte_atomic32_add_return(pa, num);
		return pid - num;
	} else {
		pid = rte_atomic32_read(pa);
		rte_atomic32_set(pa, pid + num);
		return pid;
	}
}

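/*
 * Illustrative usage sketch, not part of the original sources: a sender
 * that needs IP IDs for `num' consecutive IPv4 data packets reserves the
 * whole range up front and stamps pid, pid + 1, ..., pid + num - 1 into
 * the headers:
 *
 *	pid = get_ip_pid(dev, num, TLE_V4, 0);
 *	for (i = 0; i != num; i++)
 *		tcp_update_mbuf(mb[i], TLE_V4, tcb, seq[i], pid + i);
 *
 * With st == 0 the range is reserved with one atomic add (several threads
 * may transmit concurrently); with st != 0 a plain read/write pair avoids
 * the atomic cost in single-threaded setups.
 */
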
static inline void
fill_tcph(struct rte_tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
	uint32_t seq, uint8_t hlen, uint8_t flags)
{
	uint16_t wnd;

	l4h->src_port = port.dst;
	l4h->dst_port = port.src;

	wnd = (flags & TCP_FLAG_SYN) ?
		RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
		tcb->rcv.wnd >> tcb->rcv.wscale;

	/* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
	l4h->sent_seq = rte_cpu_to_be_32(seq);
	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
	l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
	l4h->tcp_flags = flags;
	l4h->rx_win = rte_cpu_to_be_16(wnd);
	l4h->cksum = 0;
	l4h->tcp_urp = 0;

	if (flags & TCP_FLAG_SYN)
		fill_syn_opts(l4h + 1, &tcb->so);
	else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
}

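/*
 * Worked example for the data_off computation above, assuming
 * TCP_DATA_ALIGN == 4 and TCP_DATA_OFFSET == 4 (data offset counted in
 * 32-bit words, stored in the high four bits of the field): a header of
 * hlen == 32 bytes (20 bytes of fixed header plus 12 bytes of options)
 * gives 32 / 4 << 4 == 0x80.
 */
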
static inline int
tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
	const struct tle_dest *dst, uint64_t ol_flags,
	union l4_ports port, uint32_t seq, uint32_t flags,
	uint32_t pid, uint32_t swcsm)
{
	uint32_t l4, len, plen;
	struct rte_tcp_hdr *l4h;
	char *l2h;

	len = dst->l2_len + dst->l3_len;
	plen = m->pkt_len;

	if (flags & TCP_FLAG_SYN)
		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
	else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
		l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
	else
		l4 = sizeof(*l4h);

	/* adjust mbuf to put L2/L3/L4 headers into it. */
	l2h = rte_pktmbuf_prepend(m, len + l4);
	if (l2h == NULL)
		return -EINVAL;

	/* copy L2/L3 header */
	rte_memcpy(l2h, dst->hdr, len);

	/* setup TCP header & options */
	l4h = (struct rte_tcp_hdr *)(l2h + len);
	fill_tcph(l4h, &s->tcb, port, seq, l4, flags);

	/* setup mbuf TX offload related fields. */
	m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
	m->ol_flags |= ol_flags;

	/* update proto specific fields. */

	if (s->s.type == TLE_V4) {
		struct rte_ipv4_hdr *l3h;
		l3h = (struct rte_ipv4_hdr *)(l2h + dst->l2_len);
		l3h->packet_id = rte_cpu_to_be_16(pid);
		l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);

		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
			l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
				ol_flags);
		else if (swcsm != 0)
			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);

		if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
	} else {
		struct rte_ipv6_hdr *l3h;
		l3h = (struct rte_ipv6_hdr *)(l2h + dst->l2_len);
		l3h->payload_len = rte_cpu_to_be_16(plen + l4);
		if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
			l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
		else if (swcsm != 0)
			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
	}

	return 0;
}

/*
 * This function is supposed to be used only for data packets.
 * Assumes that L2/L3/L4 headers and mbuf fields are already set up properly:
 *  - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
 *  - if no HW cksum offloads are enabled, calculates TCP checksum.
 */
static inline void
tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
	uint32_t seq, uint32_t pid)
{
	struct rte_tcp_hdr *l4h;
	uint32_t len;

	len = m->l2_len + m->l3_len;
	l4h = rte_pktmbuf_mtod_offset(m, struct rte_tcp_hdr *, len);

	l4h->sent_seq = rte_cpu_to_be_32(seq);
	l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);

	if (tcb->so.ts.raw != 0)
		fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);

	if (type == TLE_V4) {
		struct rte_ipv4_hdr *l3h;
		l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
			m->l2_len);
		l3h->hdr_checksum = 0;
		l3h->packet_id = rte_cpu_to_be_16(pid);
		if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
			l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
	}

	/* have to calculate TCP checksum in SW */
	if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {

		l4h->cksum = 0;

		if (type == TLE_V4) {
			struct rte_ipv4_hdr *l3h;
			l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *,
				m->l2_len);
			l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);

		} else {
			struct rte_ipv6_hdr *l3h;
			l3h = rte_pktmbuf_mtod_offset(m, struct rte_ipv6_hdr *,
				m->l2_len);
			l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
		}
	}
}

/* Send data packets that need to be ACK-ed by peer */
static inline uint32_t
tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
{
	uint32_t bsz, i, nb, nbm;
	struct tle_dev *dev;
	struct tle_drb *drb[num];

	/* calculate how many drbs are needed.*/
	bsz = s->tx.drb.nb_elem;
	nbm = (num + bsz - 1) / bsz;

	/* allocate drbs, adjust number of packets. */
	nb = stream_drb_alloc(s, drb, nbm);

	/* drb ring is empty. */
	if (nb == 0)
		return 0;
	/* not enough drbs, adjust number of packets. */
	else if (nb != nbm) {
		num = nb * bsz;
		nbm = nb;
	}

	dev = s->tx.dst.dev;

	/* enqueue pkts for TX. */
	i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
		num, drb, &nb);

	/* free unused drbs. */
	if (nb != 0)
		stream_drb_free(s, drb + nbm - nb, nb);

	return i;
}

static inline uint32_t
tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
	uint32_t num)
{
	uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
	struct tle_dev *dev;
	struct rte_mbuf *mb;
	struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];

	mss = s->tcb.snd.mss;
	type = s->s.type;

	dev = s->tx.dst.dev;
	pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);

	k = 0;
	tn = 0;
	fail = 0;
	for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {

		mb = mi[i];
		sz = RTE_MIN(sl->len, mss);
		plen = PKT_L4_PLEN(mb);

		/* fast path, no need to use indirect mbufs. */
		if (plen <= sz) {

			/* update pkt TCP header */
			tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);

			/* keep mbuf till ACK is received. */
			rte_pktmbuf_refcnt_update(mb, 1);
			sl->len -= plen;
			sl->seq += plen;
			mo[k++] = mb;

		/* remaining snd.wnd is less than MSS, send nothing */
		} else if (sz < mss)
			break;

		/* packet indirection needed */

		if (k >= MAX_PKT_BURST) {
			n = tx_data_pkts(s, mo, k);
			fail = k - n;
			tn += n;
			k = 0;
		}
	}

	if (k != 0) {
		n = tx_data_pkts(s, mo, k);
		fail = k - n;
		tn += n;
	}

	if (fail != 0) {
		sz = tcp_mbuf_seq_free(mo + n, fail);
		sl->seq -= sz;
		sl->len += sz;
	}

	return tn;
}

/*
 * gets data from stream send buffer, updates it and
 * queues it into TX device queue.
 * Note that this function is not MT safe.
 */
static inline uint32_t
tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
{
	uint32_t n, num, tn, wnd;
	struct rte_mbuf **mi;
	union seqlen sl;

	tn = 0;
	wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
	sl.seq = s->tcb.snd.nxt;
	sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);

	if (sl.len == 0)
		return tn;

	/* update send timestamp */
	s->tcb.snd.ts = tms;

	do {
		/* get group of packets */
		mi = tcp_txq_get_nxt_objs(s, &num);

		/* stream send buffer is empty */
		if (num == 0)
			break;

		/* queue data packets for TX */
		n = tx_data_bulk(s, &sl, mi, num);
		tn += n;

		/* update consumer head */
		tcp_txq_set_nxt_head(s, n);
	} while (n == num);

	s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
	return tn;
}

static inline void
free_una_data(struct tle_tcp_stream *s, uint32_t len)
{
	uint32_t i, num, plen;
	struct rte_mbuf **mi;

	plen = 0;

	do {
		/* get group of packets */
		mi = tcp_txq_get_una_objs(s, &num);
		if (num == 0)
			break;

		/* free acked data */
		for (i = 0; i != num && plen != len; i++) {
			uint32_t next_pkt_len = PKT_L4_PLEN(mi[i]);
			if (plen + next_pkt_len > len) {
				/* keep SND.UNA at the start of the packet */
				len = plen;
				break;
			}
			plen += next_pkt_len;

			rte_pktmbuf_free(mi[i]);
		}

		/* update consumer tail */
		tcp_txq_set_una_tail(s, i);
	} while (plen < len);

	s->tcb.snd.una += len;

	/*
	 * that could happen in case of retransmit,
	 * adjust SND.NXT with SND.UNA.
	 */
	if (s->tcb.snd.una > s->tcb.snd.nxt) {
		tcp_txq_rst_nxt_head(s);
		s->tcb.snd.nxt = s->tcb.snd.una;
	}
}

static inline uint16_t
calc_smss(uint16_t mss, const struct tle_dest *dst)
{
	uint16_t n;

	n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
	mss = RTE_MIN(n, mss);
	return mss;
}

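/*
 * Worked example (illustrative; it assumes TCP_TX_HDR_DACK covers the
 * 20-byte TCP header plus 12 bytes of timestamp option): with mtu == 1500,
 * l2_len == 14 and l3_len == 20 (IPv4), n == 1500 - 14 - 20 - 32 == 1434,
 * so a peer-advertised MSS of 1460 would be clamped down to 1434.
 */
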
/*
 * RFC 6928 2
 * min (10*MSS, max (2*MSS, 14600))
 *
 * or using user provided initial congestion window (icw)
 * min (10*MSS, max (2*MSS, icw))
 */
static inline uint32_t
initial_cwnd(uint32_t smss, uint32_t icw)
{
	return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
}

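/*
 * Worked example (illustrative): with smss == 1460 and the RFC 6928
 * default icw of 14600 bytes the result is
 * min(14600, max(2920, 14600)) == 14600, i.e. ten full-sized segments;
 * a user-provided icw of 1000 would instead be raised to the
 * 2 * smss == 2920 floor.
 */
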
/*
 * queue standalone packet to the particular output device
 * It assumes that:
 * - L2/L3/L4 headers should be already set.
 * - packet fits into one segment.
 */
static inline int
send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
{
	uint32_t n, nb;
	struct tle_drb *drb;

	if (stream_drb_alloc(s, &drb, 1) == 0)
		return -ENOBUFS;

	/* enqueue pkt for TX. */
	nb = 1;
	n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
		&drb, &nb);

	/* free unused drbs. */
	if (nb != 0)
		stream_drb_free(s, &drb, 1);

	return (n == 1) ? 0 : -ENOBUFS;
}

static inline int
send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
	uint32_t flags)
{
	const struct tle_dest *dst;
	uint32_t pid, type;
	int32_t rc;

	dst = &s->tx.dst;
	type = s->s.type;
	pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);

	rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
	if (rc == 0)
		rc = send_pkt(s, dst->dev, m);

	return rc;
}

static inline int
send_rst(struct tle_tcp_stream *s, uint32_t seq)
{
	struct rte_mbuf *m;
	int32_t rc;

	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
	if (m == NULL)
		return -ENOMEM;

	rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
	if (rc != 0)
		rte_pktmbuf_free(m);

	return rc;
}

static inline int
send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
{
	uint32_t seq;
	struct rte_mbuf *m;
	int32_t rc;

	m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
	if (m == NULL)
		return -ENOMEM;

	seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);

	rc = send_ctrl_pkt(s, m, seq, flags);
	if (rc != 0) {
		rte_pktmbuf_free(m);
		return rc;
	}

	s->tcb.snd.ack = s->tcb.rcv.nxt;
	return 0;
}

static inline int
sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
	const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
{
	uint16_t len;
	int32_t rc;
	uint32_t pid, seq, type;
	struct tle_dev *dev;
	const void *da;
	struct tle_dest dst;
	const struct rte_tcp_hdr *th;

	type = s->s.type;

	/* get destination information. */
	if (type == TLE_V4)
		da = &pi->addr4.src;
	else
		da = &pi->addr6->src;

	rc = stream_get_dest(&s->s, da, &dst);
	if (rc < 0)
		return rc;

	th = rte_pktmbuf_mtod_offset(m, const struct rte_tcp_hdr *,
		m->l2_len + m->l3_len);
	get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));

	/* reset wscale option if timestamp is not present */
	if (s->tcb.so.ts.val == 0)
		s->tcb.so.wscale = 0;

	s->tcb.rcv.nxt = si->seq + 1;
	seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
		s->s.ctx->prm.hash_alg,
		&s->s.ctx->prm.secret_key);
	s->tcb.so.ts.ecr = s->tcb.so.ts.val;
	s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
	s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
	s->tcb.so.mss = calc_smss(dst.mtu, &dst);

	/* reset mbuf's data contents. */
	len = m->l2_len + m->l3_len + m->l4_len;

	if (rte_pktmbuf_adj(m, len) == NULL)
		return -EINVAL;

	dev = dst.dev;
	pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);

	rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
		TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
	if (rc == 0)
		rc = send_pkt(s, dev, m);

	return rc;
}

/*
 * There are four cases for the acceptability test for an incoming segment:
 * Segment Receive  Test
 * Length  Window
 * ------- -------  -------------------------------------------
 *    0       0     SEG.SEQ = RCV.NXT
 *    0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
 *   >0       0     not acceptable
 *   >0      >0     RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
 *                  or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
 */
static inline int
check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
{
	uint32_t n;

	n = seqn + len;
	if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
			n - tcb->rcv.nxt > tcb->rcv.wnd)
		return -ERANGE;

	return 0;
}

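/*
 * Illustrative note (an assumption about intent, not from the original
 * sources): the unsigned subtractions above implement the RFC 793 window
 * test in 32-bit modular arithmetic, so it stays correct across sequence
 * number wrap-around. E.g. with rcv.nxt == 0xfffffff0 and rcv.wnd == 0x100,
 * an incoming seqn == 0x10 gives seqn - rcv.nxt == 0x20 < rcv.wnd,
 * i.e. the segment is in-window even though seqn is numerically smaller
 * than rcv.nxt.
 */
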
static inline union tle_tcp_tsopt
rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
{
	union tle_tcp_tsopt ts;
	uintptr_t opt;
	const struct rte_tcp_hdr *th;

	if (tcb->so.ts.val != 0) {
		opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
			mb->l2_len + mb->l3_len + sizeof(*th));
		ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
	} else
		ts.raw = 0;

	return ts;
}

/*
 * PAWS and sequence check.
 */
static inline int
rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len,
	const union tle_tcp_tsopt ts)
{
	int32_t rc;

	/* RFC 1323 4.2.1 R2 */
	rc = check_seqn(tcb, seq, len);
	if (rc < 0)
		return rc;

	if (ts.raw != 0) {

		/* RFC 1323 4.2.1 R1 */
		if (tcp_seq_lt(ts.val, tcb->rcv.ts))
			return -ERANGE;

		/* RFC 1323 4.2.1 R3 */
		if (tcp_seq_leq(seq, tcb->snd.ack) &&
				tcp_seq_lt(tcb->snd.ack, seq + len))
			tcb->rcv.ts = ts.val;
	}

	return rc;
}

static inline int
rx_check_ack(const struct tcb *tcb, uint32_t ack)
{
	uint32_t max;

	max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);

	if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
		return 0;

	return -ERANGE;
}

static inline int
rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
	const union tle_tcp_tsopt ts)
{
	int32_t rc;

	rc = rx_check_seq(tcb, seq, len, ts);
	rc |= rx_check_ack(tcb, ack);
	return rc;
}

static inline int
restore_syn_opt(union seg_info *si, union tle_tcp_tsopt *to,
	const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
	uint32_t hash_alg, rte_xmm_t *secret_key)
{
	int32_t rc;
	uint32_t len;
	const struct rte_tcp_hdr *th;

	/* check that ACK, etc fields are what we expected. */
	rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
		hash_alg, secret_key);
	if (rc < 0)
		return rc;

	si->mss = rc;

	th = rte_pktmbuf_mtod_offset(mb, const struct rte_tcp_hdr *,
		mb->l2_len + mb->l3_len);
	len = mb->l4_len - sizeof(*th);
	to[0] = get_tms_opts((uintptr_t)(th + 1), len);
	return 0;
}

static inline void
stream_term(struct tle_tcp_stream *s)
{
	struct sdr *dr;

	s->tcb.state = TCP_ST_CLOSED;
	rte_smp_wmb();

	timer_stop(s);

	/* close() was already invoked, schedule final cleanup */
	if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {

		dr = CTX_TCP_SDR(s->s.ctx);
		STAILQ_INSERT_TAIL(&dr->be, &s->s, link);

	/* notify user that stream needs to be closed */
	} else if (s->err.ev != NULL)
		tle_event_raise(s->err.ev);
	else if (s->err.cb.func != NULL)
		s->err.cb.func(s->err.cb.data, &s->s);
}

static inline int
stream_fill_dest(struct tle_tcp_stream *s)
{
	int32_t rc;
	const void *da;

	if (s->s.type == TLE_V4)
		da = &s->s.ipv4.addr.src;
	else
		da = &s->s.ipv6.addr.src;

	rc = stream_get_dest(&s->s, da, &s->tx.dst);
	return (rc < 0) ? rc : 0;
}

/*
 * for now rtt is calculated based on the tcp TMS option,
 * later add a real-time one
 */
static inline void
estimate_stream_rto(struct tle_tcp_stream *s, uint32_t tms)
{
	uint32_t rtt;

	if (s->tcb.so.ts.ecr) {
		rtt = tms - s->tcb.so.ts.ecr;
		rto_estimate(&s->tcb, rtt);
	} else
		s->tcb.snd.rto = TCP_RTO_DEFAULT;
}

/*
 * helper function, prepares a new accept stream.
 */
static inline int
accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
	struct tle_tcp_stream *cs, const union tle_tcp_tsopt *to,
	uint32_t tms, const union pkt_info *pi, const union seg_info *si)
{
	int32_t rc;

	/* some TX still pending for that stream. */
	if (TCP_STREAM_TX_PENDING(cs))
		return -EAGAIN;

	/* setup L4 ports and L3 addresses fields. */
	cs->s.port.raw = pi->port.raw;
	cs->s.pmsk.raw = UINT32_MAX;

	if (pi->tf.type == TLE_V4) {
		cs->s.ipv4.addr = pi->addr4;
		cs->s.ipv4.mask.src = INADDR_NONE;
		cs->s.ipv4.mask.dst = INADDR_NONE;
	} else if (pi->tf.type == TLE_V6) {
		cs->s.ipv6.addr = *pi->addr6;
		rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
			sizeof(cs->s.ipv6.mask.src));
		rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
			sizeof(cs->s.ipv6.mask.dst));
	}

	/* setup TCB */
	sync_fill_tcb(&cs->tcb, si, to);
	cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);

	estimate_stream_rto(cs, tms);

	/* copy stream type & flags. */
	cs->s.type = ps->s.type;
	cs->flags = ps->flags;

	/* retrieve and cache destination information. */
	rc = stream_fill_dest(cs);
	if (rc != 0)
		return rc;

	/* update snd.mss with SMSS value */
	cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);

	/* setup congestion variables */
	cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
	cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
	cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;

	cs->tcb.state = TCP_ST_ESTABLISHED;

	/* add stream to the table */
	cs->ste = stbl_add_stream(st, pi, cs);
	if (cs->ste == NULL)
		return -ENOBUFS;

	cs->tcb.uop |= TCP_OP_ACCEPT;
	return 0;
}

/*
 * ACK for new connection request arrived.
 * Check that the packet meets all conditions and try to open a new stream.
 * returns:
 * < 0  - invalid packet
 * == 0 - packet is valid and new stream was opened for it.
 * > 0  - packet is valid, but failed to open new stream.
 */
static inline int
rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
	const union pkt_info *pi, union seg_info *si,
	uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
{
	int32_t rc;
	struct tle_ctx *ctx;
	struct tle_stream *ts;
	struct tle_tcp_stream *cs;
	union tle_tcp_tsopt to;

	*csp = NULL;

	if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
		return -EINVAL;

	ctx = s->s.ctx;
	rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
		&ctx->prm.secret_key);
	if (rc < 0)
		return rc;

	/* allocate new stream */
	cs = tcp_stream_get(ctx, 0);
	if (cs == NULL)
		return ENFILE;

	/* prepare stream to handle new connection */
	if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {

		/* put new stream in the accept queue */
		ts = &cs->s;
		if (_rte_ring_enqueue_burst(s->rx.q,
				(void * const *)&ts, 1) == 1) {
			*csp = cs;
			return 0;
		}

		/* cleanup on failure */
		stbl_del_stream(st, cs->ste, cs, 0);
		cs->ste = NULL;
	}

	tcp_stream_reset(ctx, cs);
	return ENOBUFS;
}

static inline int
data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf **mb, uint32_t hlen,
	uint32_t *seqn, uint32_t *plen)
{
	uint32_t len, n, seq;

	seq = *seqn;
	len = *plen;

	rte_pktmbuf_adj(*mb, hlen);
	if (len == 0)
		return -ENODATA;
	/* cut off the start of the packet */
	else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
		n = tcb->rcv.nxt - seq;
		if (n >= len)
			return -ENODATA;

		*mb = _rte_pktmbuf_adj(*mb, n);
		*seqn = seq + n;
		*plen = len - n;
	}

	return 0;
}

static inline uint32_t
rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
{
	uint32_t k, n;

	n = ack - (uint32_t)s->tcb.snd.una;

	/* some more data was acked. */
	if (n != 0) {

		/* advance SND.UNA and free related packets. */
		k = rte_ring_free_count(s->tx.q);
		free_una_data(s, n);

		/* mark the stream as available for writing */
		if (rte_ring_free_count(s->tx.q) != 0) {
			if (s->tx.ev != NULL)
				tle_event_raise(s->tx.ev);
			else if (k == 0 && s->tx.cb.func != NULL)
				s->tx.cb.func(s->tx.cb.data, &s->s);
		}
	}

	return n;
}

static inline void
stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
{
	if (rto != 0) {
		s->tcb.state = TCP_ST_TIME_WAIT;
		s->tcb.snd.rto = rto;
		timer_reset(s);
	} else
		stream_term(s);
}

static inline void
rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
{
	uint32_t state;
	int32_t ackfin;

	s->tcb.rcv.nxt += 1;

	ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
	state = s->tcb.state;

	if (state == TCP_ST_ESTABLISHED) {
		s->tcb.state = TCP_ST_CLOSE_WAIT;
		/* raise err.ev & err.cb */
		if (s->err.ev != NULL)
			tle_event_raise(s->err.ev);
		else if (s->err.cb.func != NULL)
			s->err.cb.func(s->err.cb.data, &s->s);
	} else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
		rsp->flags |= TCP_FLAG_ACK;
		if (ackfin != 0)
			stream_timewait(s, s->tcb.snd.rto_tw);
		else
			s->tcb.state = TCP_ST_CLOSING;
	} else if (state == TCP_ST_FIN_WAIT_2) {
		rsp->flags |= TCP_FLAG_ACK;
		stream_timewait(s, s->tcb.snd.rto_tw);
	} else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
		stream_term(s);
	}
}

/*
 * FIN process for ESTABLISHED state
 * returns:
 * < 0 - error occurred
 * 0   - FIN was processed OK, and mbuf can be freed/reused.
 * > 0 - FIN was processed OK and mbuf can't be freed/reused.
 */
static inline int
rx_fin(struct tle_tcp_stream *s, uint32_t state,
	const union seg_info *si, struct rte_mbuf *mb,
	struct resp_info *rsp)
{
	uint32_t hlen, plen, seq;
	int32_t ret;
	union tle_tcp_tsopt ts;

	hlen = PKT_L234_HLEN(mb);
	plen = mb->pkt_len - hlen;
	seq = si->seq;

	ts = rx_tms_opt(&s->tcb, mb);
	ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
	if (ret != 0)
		return ret;

	if (state < TCP_ST_ESTABLISHED)
		return -EINVAL;

	if (plen != 0) {

		ret = data_pkt_adjust(&s->tcb, &mb, hlen, &seq, &plen);
		if (ret != 0)
			return ret;
		if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
			return -ENOBUFS;
	}

	/*
	 * fast-path: all data & FIN was already sent out
	 * and now is acknowledged.
	 */
	if (s->tcb.snd.fss == s->tcb.snd.nxt &&
			si->ack == (uint32_t)s->tcb.snd.nxt) {
		s->tcb.snd.una = s->tcb.snd.fss;

	/* conventional ACK processing */
	} else
		rx_ackdata(s, si->ack);

	/* some fragments still missing */
	if (seq + plen != s->tcb.rcv.nxt) {
		s->tcb.rcv.frs.seq = seq + plen;
		s->tcb.rcv.frs.on = 1;
	} else
		rx_fin_state(s, rsp);

	return plen;
}

static inline void
rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
	const union seg_info *si)
{
	int32_t rc;

	/*
	 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
	 * are validated by checking their SEQ-fields.
	 * A reset is valid if its sequence number is in the window.
	 * In the SYN-SENT state (a RST received in response to an initial SYN),
	 * the RST is acceptable if the ACK field acknowledges the SYN.
	 */
	if (state == TCP_ST_SYN_SENT) {
		rc = ((flags & TCP_FLAG_ACK) == 0 ||
				si->ack != s->tcb.snd.nxt) ?
			-ERANGE : 0;
	} else
		rc = check_seqn(&s->tcb, si->seq, 0);

	if (rc == 0)
		stream_term(s);
}

/*
 * check whether we have a FIN that was received out-of-order;
 * if yes, try to process it now.
 */
static inline void
rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
{
	if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
		rx_fin_state(s, rsp);
}

static inline void
dack_info_init(struct dack_info *tack, const struct tcb *tcb)
{
	static const struct dack_info zero_dack;

	tack[0] = zero_dack;
	tack->ack = tcb->snd.una;
	tack->segs.dup = tcb->rcv.dupack;
	tack->wu.raw = tcb->snd.wu.raw;
	tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
}

static inline void
ack_window_update(struct tcb *tcb, const struct dack_info *tack)
{
	tcb->snd.wu.raw = tack->wu.raw;
	tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
}

static inline void
ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
{
	uint32_t n;

	n = tack->segs.ack * tcb->snd.mss;

	/* slow start phase, RFC 5681 3.1 (2) */
	if (tcb->snd.cwnd < tcb->snd.ssthresh)
		tcb->snd.cwnd += RTE_MIN(acked, n);
	/* congestion avoidance phase, RFC 5681 3.1 (3) */
	else
		tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
}

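/*
 * Worked example (illustrative): with mss == 1460 and one ACK covering
 * acked == 2920 bytes (segs.ack == 2), slow start grows cwnd by
 * min(2920, 2 * 1460) == 2920 bytes, while congestion avoidance with
 * cwnd == 29200 grows it by max(1, 2 * 1460 * 1460 / 29200) == 146 bytes,
 * i.e. roughly mss * mss / cwnd per full-sized ACKed segment.
 */
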
static inline void
rto_ssthresh_update(struct tcb *tcb)
{
	uint32_t k, n;

	/* RFC 5681 3.1 (4) */
	n = (tcb->snd.nxt - tcb->snd.una) / 2;
	k = 2 * tcb->snd.mss;
	tcb->snd.ssthresh = RTE_MAX(n, k);
}

static inline void
rto_cwnd_update(struct tcb *tcb)
{
	if (tcb->snd.nb_retx == 0)
		rto_ssthresh_update(tcb);

	/*
	 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
	 * no more than 1 full-sized segment.
	 */
	tcb->snd.cwnd = tcb->snd.mss;
}

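/*
 * Worked example (illustrative): with 10 unacknowledged segments of
 * mss == 1460 in flight (flight size 14600), the first RTO sets
 * ssthresh = max(14600 / 2, 2 * 1460) == 7300 and collapses cwnd to a
 * single segment (1460), per RFC 5681 3.1 (4) and the rule above.
 */
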
static inline void
ack_info_update(struct dack_info *tack, const union seg_info *si,
	int32_t badseq, uint32_t dlen, const union tle_tcp_tsopt ts)
{
	if (badseq != 0) {
		tack->segs.badseq++;
		return;
	}

	/* segment with incoming data */
	tack->segs.data += (dlen != 0);

	/* segment with newly acked data */
	if (tcp_seq_lt(tack->ack, si->ack)) {
		tack->segs.dup = 0;
		tack->segs.ack++;
		tack->ack = si->ack;
		tack->ts = ts;

	/*
	 * RFC 5681: An acknowledgment is considered a "duplicate" when:
	 * (a) the receiver of the ACK has outstanding data
	 * (b) the incoming acknowledgment carries no data
	 * (c) the SYN and FIN bits are both off
	 * (d) the acknowledgment number is equal to the TCP.UNA
	 * (e) the advertised window in the incoming acknowledgment equals the
	 * advertised window in the last incoming acknowledgment.
	 *
	 * Here we only have to check for (b), (d) and (e).
	 * (a) will be checked later for the whole bulk of packets,
	 * (c) should never happen here.
	 */
	} else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
		tack->dup3.seg = tack->segs.ack + 1;
		tack->dup3.ack = tack->ack;
	}

	/*
	 * RFC 793:
	 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
	 * updated. If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
	 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
	 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
	 */
	if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
			(si->seq == tack->wu.wl1 &&
			tcp_seq_leq(tack->wu.wl2, si->ack))) {

		tack->wu.wl1 = si->seq;
		tack->wu.wl2 = si->ack;
		tack->wnd = si->wnd;
	}
}

static inline uint32_t
rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
	const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
	int32_t rc[], uint32_t num)
{
	uint32_t i, j, k, n, t;
	uint32_t hlen, plen, seq, tlen;
	int32_t ret;
	union tle_tcp_tsopt ts;

	k = 0;
	for (i = 0; i != num; i = j) {

		hlen = PKT_L234_HLEN(mb[i]);
		plen = mb[i]->pkt_len - hlen;
		seq = si[i].seq;

		ts = rx_tms_opt(&s->tcb, mb[i]);
		ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);

		/* account segment received */
		ack_info_update(tack, &si[i], ret != 0, plen, ts);

		if (ret == 0) {

			/* skip duplicate data, if any */
			ret = data_pkt_adjust(&s->tcb, &mb[i], hlen,
				&seq, &plen);
		}

		j = i + 1;
		if (ret != 0) {
			rp[k] = mb[i];
			rc[k] = -ret;
			k++;
			continue;
		}

		/* group sequential packets together. */
		for (tlen = plen; j != num; tlen += plen, j++) {

			hlen = PKT_L234_HLEN(mb[j]);
			plen = mb[j]->pkt_len - hlen;

			/* not consecutive packet */
			if (plen == 0 || seq + tlen != si[j].seq)
				break;

			/* check SEQ/ACK */
			ts = rx_tms_opt(&s->tcb, mb[j]);
			ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
				plen, ts);
			if (ret != 0)
				break;

			/* account for segment received */
			ack_info_update(tack, &si[j], ret != 0, plen, ts);

			rte_pktmbuf_adj(mb[j], hlen);
		}

		n = j - i;

		/* account for OFO data */
		if (seq != s->tcb.rcv.nxt)
			tack->segs.ofo += n;

		/* enqueue packets */
		t = rx_data_enqueue(s, seq, tlen, mb + i, n);

		/* if we are out of space in stream recv buffer. */
		for (; t != n; t++) {
			rp[k] = mb[i + t];
			rc[k] = ENOBUFS;
			k++;
		}
	}

	return num - k;
}

static inline void
start_fast_retransmit(struct tle_tcp_stream *s)
{
	struct tcb *tcb;

	tcb = &s->tcb;

	/* RFC 6582 3.2.2 */
	tcb->snd.rcvr = tcb->snd.nxt;
	tcb->snd.fastack = 1;

	/* RFC 5681 3.2.2 */
	rto_ssthresh_update(tcb);

	/* RFC 5681 3.2.3 */
	tcp_txq_rst_nxt_head(s);
	tcb->snd.nxt = tcb->snd.una;
	tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
}

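/*
 * Worked example (illustrative): entering fast retransmit with
 * mss == 1460 and 14600 bytes in flight sets
 * ssthresh = max(14600 / 2, 2 * 1460) == 7300 and
 * cwnd = 7300 + 3 * 1460 == 11680; the three extra segments account for
 * the three duplicate ACKs that triggered recovery (RFC 5681 3.2).
 */
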
static inline void
stop_fast_retransmit(struct tle_tcp_stream *s)
{
	struct tcb *tcb;
	uint32_t n;

	tcb = &s->tcb;
	n = tcb->snd.nxt - tcb->snd.una;
	tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
		RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
	tcb->snd.fastack = 0;
}

static inline uint32_t
in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
	uint32_t dup_num)
{
	uint32_t n;
	struct tcb *tcb;

	tcb = &s->tcb;

	/* RFC 5682 3.2.3 partial ACK */
	if (ack_len != 0) {

		n = ack_num * tcb->snd.mss;
		if (ack_len >= n)
			tcb->snd.cwnd -= ack_len - n;
		else
			tcb->snd.cwnd -= ack_len % tcb->snd.mss;

		/*
		 * For the first partial ACK that arrives
		 * during fast recovery, also reset the
		 * retransmit timer.
		 */
		if (tcb->snd.fastack == 1)
			timer_reset(s);

		tcb->snd.fastack += ack_num;
		return 1;

	/* RFC 5681 3.2.4 */
	} else if (dup_num > 3) {
		s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
		return 1;
	}

	return 0;
}

static inline uint32_t
process_ack(struct tle_tcp_stream *s, uint32_t acked,
	const struct dack_info *tack)
{
	uint32_t send;

	send = 0;

	/* normal mode */
	if (s->tcb.snd.fastack == 0) {

		send = 1;

		/* RFC 6582 3.2.2 switch to fast retransmit mode */
		if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
				s->tcb.snd.una >= s->tcb.snd.rcvr) {

			start_fast_retransmit(s);
			in_fast_retransmit(s,
				tack->ack - tack->dup3.ack,
				tack->segs.ack - tack->dup3.seg - 1,
				tack->segs.dup);

		/* remain in normal mode */
		} else if (acked != 0) {
			ack_cwnd_update(&s->tcb, acked, tack);
			timer_stop(s);
		}

	/* fast retransmit mode */
	} else {

		/* remain in fast retransmit mode */
		if (s->tcb.snd.una < s->tcb.snd.rcvr) {

			send = in_fast_retransmit(s, acked, tack->segs.ack,
				tack->segs.dup);
		} else {
			/* RFC 5682 3.2.3 full ACK */
			stop_fast_retransmit(s);
			timer_stop(s);

			/* if we have another series of dup ACKs */
			if (tack->dup3.seg != 0 &&
					s->tcb.snd.una != s->tcb.snd.nxt &&
					tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
					tack->dup3.ack)) {

				/* restart fast retransmit again. */
				start_fast_retransmit(s);
				send = in_fast_retransmit(s,
					tack->ack - tack->dup3.ack,
					tack->segs.ack - tack->dup3.seg - 1,
					tack->segs.dup);
			}
		}
	}

	return send;
}

/*
 * our FIN was acked, stop rto timer, change stream state,
 * and possibly close the stream.
 */
static inline void
rx_ackfin(struct tle_tcp_stream *s)
{
	uint32_t state;

	s->tcb.snd.una = s->tcb.snd.fss;

	state = s->tcb.state;
	if (state == TCP_ST_LAST_ACK)
		stream_term(s);
	else if (state == TCP_ST_FIN_WAIT_1) {
		timer_stop(s);
		s->tcb.state = TCP_ST_FIN_WAIT_2;
	} else if (state == TCP_ST_CLOSING) {
		stream_timewait(s, s->tcb.snd.rto_tw);
	}
}

static inline void
rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
	const struct dack_info *tack)
{
	uint32_t n, send;

	s->tcb.rcv.dupack = tack->segs.dup;

	n = rx_ackdata(s, tack->ack);
	send = process_ack(s, n, tack);

	/* try to send more data. */
	if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
		txs_enqueue(s->s.ctx, s);

	/* restart RTO timer. */
	if (s->tcb.snd.nxt != s->tcb.snd.una)
		timer_start(s);

	/* update rto, if fresh packet is here then calculate rtt */
	if (tack->ts.ecr != 0)
		rto_estimate(&s->tcb, ts - tack->ts.ecr);
}

/*
 * returns negative value on failure, or zero on success.
 */
static inline int
rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
	const union seg_info *si, struct rte_mbuf *mb,
	struct resp_info *rsp)
{
	struct tle_tcp_syn_opts so;
	struct rte_tcp_hdr *th;

	if (state != TCP_ST_SYN_SENT)
		return -EINVAL;

	/*
	 * RFC 793 3.9: in the SYN-SENT state
	 * If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset
	 * <SEQ=SEG.ACK><CTL=RST>
	 * and discard the segment.
	 * The connection remains in the same state.
	 */
	if (si->ack != (uint32_t)s->tcb.snd.nxt) {
		send_rst(s, si->ack);
		return 0;
	}

	th = rte_pktmbuf_mtod_offset(mb, struct rte_tcp_hdr *,
		mb->l2_len + mb->l3_len);
	get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));

	s->tcb.so = so;

	s->tcb.snd.una = s->tcb.snd.nxt;
	s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
	s->tcb.snd.wnd = si->wnd << so.wscale;
	s->tcb.snd.wu.wl1 = si->seq;
	s->tcb.snd.wu.wl2 = si->ack;
	s->tcb.snd.wscale = so.wscale;

	/* setup congestion variables */
	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
	s->tcb.snd.ssthresh = s->tcb.snd.wnd;

	s->tcb.rcv.ts = so.ts.val;
	s->tcb.rcv.irs = si->seq;
	s->tcb.rcv.nxt = si->seq + 1;

	/* if peer doesn't support WSCALE opt, recalculate RCV.WND */
	s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
		TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);

	/* calculate initial rto */
	rto_estimate(&s->tcb, ts - s->tcb.snd.ts);

	rsp->flags |= TCP_FLAG_ACK;

	timer_stop(s);
	s->tcb.state = TCP_ST_ESTABLISHED;
	rte_smp_wmb();

	if (s->tx.ev != NULL)
		tle_event_raise(s->tx.ev);
	else if (s->tx.cb.func != NULL)
		s->tx.cb.func(s->tx.cb.data, &s->s);

	return 0;
}

static inline uint32_t
rx_stream(struct tle_tcp_stream *s, uint32_t ts,
	const union pkt_info *pi, const union seg_info si[],
	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
	uint32_t num)
{
	uint32_t i, k, n, state;
	int32_t ret;
	struct resp_info rsp;
	struct dack_info tack;

	k = 0;
	rsp.flags = 0;

	state = s->tcb.state;

	/*
	 * first check for the states/flags where we don't
	 * expect groups of packets.
	 */

	/* process RST */
	if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
		for (i = 0; i != num; i++)
			rx_rst(s, state, pi->tf.flags, &si[i]);
		i = 0;

	/* RFC 793: if the ACK bit is off drop the segment and return */
	} else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
		i = 0;

	/* process <SYN,ACK> */
	} else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
		for (i = 0; i != num; i++) {
			ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
			if (ret == 0)
				break;

			rc[k] = -ret;
			rp[k] = mb[i];
			k++;
		}

	/* process FIN */
	} else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
		for (i = 0; i != num; i++) {
			ret = rx_fin(s, state, &si[i], mb[i], &rsp);
			if (ret >= 0)
				break;

			rc[k] = -ret;
			rp[k] = mb[i];
			k++;
		}

	/* normal data/ack packets */
	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {

		/* process incoming data packets. */
		dack_info_init(&tack, &s->tcb);
		n = rx_data_ack(s, &tack, si, mb, rp, rc, num);

		/* follow up actions based on aggregated information */

		/* update SND.WND */
		ack_window_update(&s->tcb, &tack);

		/*
		 * fast-path: all data & FIN was already sent out
		 * and now is acknowledged.
		 */
		if (s->tcb.snd.fss == s->tcb.snd.nxt &&
				tack.ack == (uint32_t)s->tcb.snd.nxt)
			rx_ackfin(s);
		else
			rx_process_ack(s, ts, &tack);

		/*
		 * send an immediate ACK if either:
		 * - received segment with invalid seq/ack number
		 * - received segment with OFO data
		 * - received segment with INO data and no TX is scheduled
		 *   for that stream.
		 */
		if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
				(tack.segs.data != 0 &&
				rte_atomic32_read(&s->tx.arm) == 0))
			rsp.flags |= TCP_FLAG_ACK;

		rx_ofo_fin(s, &rsp);

		k += num - n;
		i = num;

	/* unhandled state, drop all packets. */
	} else
		i = 0;

	/* we have a response packet to send. */
	if (rsp.flags != 0) {
		send_ack(s, ts, rsp.flags);

		/* start the timer for FIN packet */
		if ((rsp.flags & TCP_FLAG_FIN) != 0)
			timer_reset(s);
	}

	/* unprocessed packets */
	for (; i != num; i++, k++) {
		rc[k] = ENODATA;
		rp[k] = mb[i];
	}

	return num - k;
}

static inline uint32_t
rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
	const union pkt_info *pi, const union seg_info si[],
	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
	uint32_t num)
{
	uint32_t i;

	if (tcp_stream_acquire(s) > 0) {
		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
		tcp_stream_release(s);
		return i;
	}

	for (i = 0; i != num; i++) {
		rc[i] = ENOENT;
		rp[i] = mb[i];
	}

	return 0;
}

static inline uint32_t
rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
	const union pkt_info pi[], union seg_info si[],
	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
	uint32_t num)
{
	struct tle_tcp_stream *cs, *s;
	uint32_t i, k, n, state;
	int32_t ret;

	s = rx_obtain_stream(dev, st, &pi[0], type);
	if (s == NULL) {
		for (i = 0; i != num; i++) {
			rc[i] = ENOENT;
			rp[i] = mb[i];
		}
		return 0;
	}

	k = 0;
	state = s->tcb.state;

	if (state == TCP_ST_LISTEN) {

		/* one connection per flow */
		cs = NULL;
		ret = -EINVAL;
		for (i = 0; i != num; i++) {

			ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);

			/* valid packet encountered */
			if (ret >= 0)
				break;

			/* invalid packet, keep trying to find a proper one */
			rc[k] = -ret;
			rp[k] = mb[i];
			k++;
		}

		/* packet is valid, but we are out of streams to serve it */
		if (ret > 0) {
			for (; i != num; i++, k++) {
				rc[k] = ret;
				rp[k] = mb[i];
			}
		/* new stream is accepted */
		} else if (ret == 0) {

			/* inform listen stream about new connections */
			if (s->rx.ev != NULL)
				tle_event_raise(s->rx.ev);
			else if (s->rx.cb.func != NULL &&
					rte_ring_count(s->rx.q) == 1)
				s->rx.cb.func(s->rx.cb.data, &s->s);

			/* if there is no data, drop current packet */
			if (PKT_L4_PLEN(mb[i]) == 0) {
				rc[k] = ENODATA;
				rp[k++] = mb[i];
				i++;
			}

			/* process remaining packets for that stream */
			if (num != i) {
				n = rx_new_stream(cs, ts, pi + i, si + i,
					mb + i, rp + k, rc + k, num - i);
				k += num - i - n;
			}
		}

	} else {
		i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
		k = num - i;
	}

	tcp_stream_release(s);
	return num - k;
}

static inline uint32_t
rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
	const union pkt_info pi[], const union seg_info si[],
	struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
	uint32_t num)
{
	struct tle_tcp_stream *s;
	uint32_t i, k;
	int32_t ret;

	s = rx_obtain_listen_stream(dev, &pi[0], type);
	if (s == NULL) {
		for (i = 0; i != num; i++) {
			rc[i] = ENOENT;
			rp[i] = mb[i];
		}
		return 0;
	}

	k = 0;
	for (i = 0; i != num; i++) {

		/* check that this remote is allowed to connect */
		if (rx_check_stream(s, &pi[i]) != 0)
			ret = -ENOENT;
		else
			/* syncookie: reply with <SYN,ACK> */
			ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);

		if (ret != 0) {
			rc[k] = -ret;
			rp[k] = mb[i];
			k++;
		}
	}

	tcp_stream_release(s);
	return num - k;
}

uint16_t
tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
	struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
{
	struct stbl *st;
	struct tle_ctx *ctx;
	uint32_t i, j, k, mt, n, t, ts;
	union pkt_info pi[num];
	union seg_info si[num];
	union {
		uint8_t t[TLE_VNUM];
		uint32_t raw;
	} stu;

	ctx = dev->ctx;
	ts = tcp_get_tms(ctx->cycles_ms_shift);
	st = CTX_TCP_STLB(ctx);
	mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);

	stu.raw = 0;

	/* extract packet info and check the L3/L4 csums */
	for (i = 0; i != num; i++) {

		get_pkt_info(pkt[i], &pi[i], &si[i]);

		t = pi[i].tf.type;
		pi[i].csf = check_pkt_csum(pkt[i], pi[i].csf, t, IPPROTO_TCP);
		stu.t[t] = mt;
	}

	if (stu.t[TLE_V4] != 0)
		stbl_lock(st, TLE_V4);
	if (stu.t[TLE_V6] != 0)
		stbl_lock(st, TLE_V6);

	k = 0;
	for (i = 0; i != num; i += j) {

		t = pi[i].tf.type;

		/* basic checks for incoming packet */
		if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
			rc[k] = EINVAL;
			rp[k] = pkt[i];
			j = 1;
			k++;
		/* process input SYN packets */
		} else if (pi[i].tf.flags == TCP_FLAG_SYN) {
			j = pkt_info_bulk_syneq(pi + i, num - i);
			n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
				rp + k, rc + k, j);
			k += j - n;
		} else {
			j = pkt_info_bulk_eq(pi + i, num - i);
			n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
				rp + k, rc + k, j);
			k += j - n;
		}
	}

	if (stu.t[TLE_V4] != 0)
		stbl_unlock(st, TLE_V4);
	if (stu.t[TLE_V6] != 0)
		stbl_unlock(st, TLE_V6);

	return num - k;
}

uint16_t
tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
	uint32_t num)
{
	uint32_t n;
	struct tle_tcp_stream *s;
	struct tle_memtank *mts;

	s = TCP_STREAM(ts);
	n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
	if (n == 0)
		return 0;

	mts = CTX_TCP_MTS(ts->ctx);

	/*
	 * if we still have packets to read,
	 * then rearm stream RX event.
	 */
	if (n == num && rte_ring_count(s->rx.q) != 0) {
		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		tcp_stream_release(s);
	}

	tle_memtank_grow(mts);
	return n;
}

uint16_t
tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t i, j, k, n;
	struct tle_drb *drb[num];
	struct tle_tcp_stream *s;

	/* extract packets from device TX queue. */

	k = num;
	n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
		num, drb, &k);

	if (n == 0)
		return 0;

	/* free empty drbs and notify related streams. */

	for (i = 0; i != k; i = j) {
		s = drb[i]->udata;
		for (j = i + 1; j != k && s == drb[j]->udata; j++)
			;
		stream_drb_free(s, drb + i, j - i);
	}

	return n;
}

static inline void
stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
{
	if (s->s.type == TLE_V4)
		pi->addr4 = s->s.ipv4.addr;
	else
		pi->addr6 = &s->s.ipv6.addr;

	pi->port = s->s.port;
	pi->tf.type = s->s.type;
}

static int
stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
{
	const struct sockaddr_in *in4;
	const struct sockaddr_in6 *in6;
	const struct tle_dev_param *prm;
	int32_t rc;

	rc = 0;
	s->s.pmsk.raw = UINT32_MAX;

	/* setup L4 src ports and src address fields. */
	if (s->s.type == TLE_V4) {
		in4 = (const struct sockaddr_in *)addr;
		if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
			return -EINVAL;

		s->s.port.src = in4->sin_port;
		s->s.ipv4.addr.src = in4->sin_addr.s_addr;
		s->s.ipv4.mask.src = INADDR_NONE;
		s->s.ipv4.mask.dst = INADDR_NONE;

	} else if (s->s.type == TLE_V6) {
		in6 = (const struct sockaddr_in6 *)addr;
		if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
				sizeof(tle_ipv6_any)) == 0 ||
				in6->sin6_port == 0)
			return -EINVAL;

		s->s.port.src = in6->sin6_port;
		rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
			sizeof(s->s.ipv6.addr.src));
		rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
			sizeof(s->s.ipv6.mask.src));
		rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
			sizeof(s->s.ipv6.mask.dst));
	}

	/* setup the destination device. */
	rc = stream_fill_dest(s);
	if (rc != 0)
		return rc;

	/* setup L4 dst address from device param */
	prm = &s->tx.dst.dev->prm;
	if (s->s.type == TLE_V4) {
		if (s->s.ipv4.addr.dst == INADDR_ANY)
			s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
	} else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
			sizeof(tle_ipv6_any)) == 0)
		memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
			sizeof(s->s.ipv6.addr.dst));

	return rc;
}

static inline int
tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
{
	int32_t rc;
	uint32_t tms, seq;
	union pkt_info pi;
	struct stbl *st;
	struct stbl_entry *se;

	/* fill stream address */
	rc = stream_fill_addr(s, addr);
	if (rc != 0)
		return rc;

	/* fill pkt info to generate seq.*/
	stream_fill_pkt_info(s, &pi);

	tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
	s->tcb.so.ts.val = tms;
	s->tcb.so.ts.ecr = 0;
	s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
	s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);

	/* note that rcv.nxt is 0 here for sync_gen_seq.*/
	seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
		s->s.ctx->prm.hash_alg,
		&s->s.ctx->prm.secret_key);
	s->tcb.snd.iss = seq;
	s->tcb.snd.rcvr = seq;
	s->tcb.snd.una = seq;
	s->tcb.snd.nxt = seq + 1;
	s->tcb.snd.rto = TCP_RTO_DEFAULT;
	s->tcb.snd.ts = tms;

	s->tcb.rcv.mss = s->tcb.so.mss;
	s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);

	/* add the stream in stream table */
	st = CTX_TCP_STLB(s->s.ctx);
	se = stbl_add_stream_lock(st, s);
	if (se == NULL)
		return -ENOBUFS;
	s->ste = se;

	/* put stream into the to-send queue */
	txs_enqueue(s->s.ctx, s);

	return 0;
}

int
tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
{
	struct tle_tcp_stream *s;
	uint32_t type;
	int32_t rc;

	if (ts == NULL || addr == NULL)
		return -EINVAL;

	s = TCP_STREAM(ts);
	type = s->s.type;
	if (type >= TLE_VNUM)
		return -EINVAL;

	if (tcp_stream_try_acquire(s) > 0) {
		rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
			TCP_ST_SYN_SENT);
		rc = (rc == 0) ? -EDEADLK : 0;
	} else
		rc = -EINVAL;

	if (rc != 0) {
		tcp_stream_release(s);
		return rc;
	}

	/* fill stream, prepare and transmit syn pkt */
	s->tcb.uop |= TCP_OP_CONNECT;
	rc = tx_syn(s, addr);
	tcp_stream_release(s);

	/* error happened, do a cleanup */
	if (rc != 0)
		tle_tcp_stream_close(ts);

	return rc;
}

/*
 * Helper function for tle_tcp_stream_establish().
 * updates stream's TCB.
 */
static inline void
tcb_establish(struct tle_tcp_stream *s, const struct tle_tcp_conn_info *ci)
{
	uint32_t tms;

	tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);

	fill_tcb_snd(&s->tcb, ci->seq, ci->ack, ci->so.mss,
		ci->wnd, ci->so.wscale, &ci->so.ts);
	fill_tcb_rcv(&s->tcb, ci->seq, ci->so.wscale, &ci->so.ts);

	s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);

	/* setup congestion variables */
	s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
	s->tcb.snd.ssthresh = s->tcb.snd.wnd;

	estimate_stream_rto(s, tms);
}

/*
 * !!! add flags to distinguish whether or not to add the stream
 * into the table !!!
 */
struct tle_stream *
tle_tcp_stream_establish(struct tle_ctx *ctx,
	const struct tle_tcp_stream_param *prm,
	const struct tle_tcp_conn_info *ci)
{
	int32_t rc;
	struct tle_tcp_stream *s;
	struct stbl *st;

	if (ctx == NULL || prm == NULL || ci == NULL) {
		rte_errno = EINVAL;
		return NULL;
	}

	/* allocate new stream */
	s = tcp_stream_get(ctx, TLE_MTANK_ALLOC_CHUNK | TLE_MTANK_ALLOC_GROW);
	if (s == NULL) {
		rte_errno = ENFILE;
		return NULL;
	}

	s->tcb.uop |= TCP_OP_ESTABLISH;

	/* check and use stream addresses and parameters */
	rc = tcp_stream_fill_prm(s, prm);
	if (rc != 0)
		goto error;

	/* retrieve and cache destination information. */
	rc = stream_fill_dest(s);
	if (rc != 0)
		goto error;

	/* add the stream to the stream table */
	st = CTX_TCP_STLB(s->s.ctx);
	s->ste = stbl_add_stream_lock(st, s);
	if (s->ste == NULL) {
		rc = -ENOBUFS;
		goto error;
	}

	/* fill TCB from user provided data */
	tcb_establish(s, ci);
	s->tcb.state = TCP_ST_ESTABLISHED;

	return &s->s;

error:
	/* cleanup on failure */
	tcp_stream_reset(ctx, s);
	rte_errno = -rc;
	return NULL;
}

uint16_t
tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t n;
	struct tle_tcp_stream *s;

	s = TCP_STREAM(ts);
	n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
	if (n == 0)
		return 0;

	/*
	 * if we still have packets to read,
	 * then rearm stream RX event.
	 */
	if (n == num && rte_ring_count(s->rx.q) != 0) {
		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		tcp_stream_release(s);
	}

	return n;
}

ssize_t
tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
	int iovcnt)
{
	int32_t i;
	uint32_t mn, n, tn;
	size_t sz;
	struct tle_tcp_stream *s;
	struct iovec iv;
	struct rxq_objs mo[2];

	s = TCP_STREAM(ts);

	/* get group of packets */
	mn = tcp_rxq_get_objs(s, mo);
	if (mn == 0)
		return 0;

	sz = 0;
	n = 0;
	for (i = 0; i != iovcnt; i++) {
		iv = iov[i];
		sz += iv.iov_len;
		n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
		if (iv.iov_len != 0) {
			sz -= iv.iov_len;
			break;
		}
	}

	tn = n;

	if (i != iovcnt && mn != 1) {
		n = 0;
		do {
			n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
			if (iv.iov_len != 0) {
				sz -= iv.iov_len;
				break;
			}
			if (i + 1 != iovcnt)
				iv = iov[i + 1];
		} while (++i != iovcnt);
		tn += n;
	}

	tcp_rxq_consume(s, tn);

	/*
	 * if we still have packets to read,
	 * then rearm stream RX event.
	 */
	if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
		if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
			tle_event_raise(s->rx.ev);
		tcp_stream_release(s);
	}

	return sz;
}

static inline int32_t
tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
	struct rte_mbuf *segs[], uint32_t num)
{
	uint32_t i;
	int32_t rc;

	for (i = 0; i != num; i++) {
		/* Build L2/L3/L4 header */
		rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
			0, TCP_FLAG_ACK, 0, 0);
		if (rc != 0) {
			free_mbufs(segs, num);
			return rc;
		}
	}

	/* queue packets for further transmission. */
	rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
	if (rc != 0)
		free_mbufs(segs, num);

	return rc;
}

uint16_t
tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
{
	uint32_t i, j, k, mss, n, state;
	int32_t rc;
	uint64_t ol_flags;
	struct tle_tcp_stream *s;
	struct rte_mbuf *segs[TCP_MAX_PKT_SEG];

	s = TCP_STREAM(ts);

	/* mark stream as not closable. */
	if (tcp_stream_acquire(s) < 0) {
		rte_errno = EAGAIN;
		return 0;
	}

	state = s->tcb.state;
	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
		rte_errno = ENOTCONN;
		tcp_stream_release(s);
		return 0;
	}

	mss = s->tcb.snd.mss;
	ol_flags = s->tx.dst.ol_flags;

	k = 0;
	rc = 0;
	while (k != num) {
		/* prepare and check for TX */
		for (i = k; i != num; i++) {
			if (pkt[i]->pkt_len > mss ||
					pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
				break;
			rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
				s->s.port, 0, TCP_FLAG_ACK, 0, 0);
			if (rc != 0)
				break;
		}

		if (i != k) {
			/* queue packets for further transmission. */
			n = _rte_ring_enqueue_burst(s->tx.q,
				(void **)pkt + k, (i - k));
			k += n;

			/*
			 * for unsent, but already modified packets:
			 * remove pkt l2/l3 headers, restore ol_flags
			 */
			if (i != k) {
				ol_flags = ~s->tx.dst.ol_flags;
				for (j = k; j != i; j++) {
					rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
						pkt[j]->l3_len +
						pkt[j]->l4_len);
					pkt[j]->ol_flags &= ol_flags;
				}
				break;
			}
		}

		if (rc != 0) {
			rte_errno = -rc;
			break;

		/* segment large packet and enqueue for sending */
		} else if (i != num) {
			/* segment the packet. */
			rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
				&s->tx.dst, mss);
			if (rc < 0) {
				rte_errno = -rc;
				break;
			}

			rc = tx_segments(s, ol_flags, segs, rc);
			if (rc == 0) {
				/* free the large mbuf */
				rte_pktmbuf_free(pkt[i]);
				/* set the mbuf as consumed */
				k++;
			} else
				/* no space left in tx queue */
				break;
		}
	}

	/* notify BE about more data to send */
	if (k != 0)
		txs_enqueue(s->s.ctx, s);
	/* if possible, re-arm stream write event. */
	if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
		tle_event_raise(s->tx.ev);

	tcp_stream_release(s);
	return k;
}

ssize_t
tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
	const struct iovec *iov, int iovcnt)
{
	int32_t i, rc;
	uint32_t j, k, n, num, slen, state;
	uint64_t ol_flags;
	ssize_t sz;
	size_t tsz;
	struct tle_tcp_stream *s;
	struct iovec iv;
	struct rte_mbuf *mb[2 * MAX_PKT_BURST];

	s = TCP_STREAM(ts);

	/* mark stream as not closable. */
	if (tcp_stream_acquire(s) < 0) {
		rte_errno = EAGAIN;
		return -1;
	}

	state = s->tcb.state;
	if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
		rte_errno = ENOTCONN;
		tcp_stream_release(s);
		return -1;
	}

	/* figure out how many mbufs do we need */
	tsz = 0;
	for (i = 0; i != iovcnt; i++)
		tsz += iov[i].iov_len;

	slen = rte_pktmbuf_data_room_size(mp);
	slen = RTE_MIN(slen, s->tcb.snd.mss);

	num = (tsz + slen - 1) / slen;
	n = rte_ring_free_count(s->tx.q);
	num = RTE_MIN(num, n);
	n = RTE_MIN(num, RTE_DIM(mb));

	/* allocate mbufs */
	if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
		rte_errno = ENOMEM;
		tcp_stream_release(s);
		return -1;
	}

	/* copy data into the mbufs */
	k = 0;
	sz = 0;
	for (i = 0; i != iovcnt; i++) {
		iv = iov[i];
		sz += iv.iov_len;
		k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
		if (iv.iov_len != 0) {
			sz -= iv.iov_len;
			break;
		}
	}

	/* partially filled segment */
	k += (k != n && mb[k]->data_len != 0);

	/* fill pkt headers */
	ol_flags = s->tx.dst.ol_flags;

	rc = 0;
	for (j = 0; j != k; j++) {
		rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
			s->s.port, 0, TCP_FLAG_ACK, 0, 0);
		if (rc != 0)
			break;
	}

	/* if no error encountered, then enqueue pkts for transmission */
	if (rc == 0)
		k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
	else
		k = 0;

	if (k != j) {

		/* free pkts that were not enqueued */
		free_mbufs(mb + k, j - k);

		/* our last segment can be partially filled */
		sz += slen - sz % slen;
		sz -= (j - k) * slen;

		/* report an error */
		if (rc != 0) {
			rte_errno = -rc;
			sz = -1;
		}
	}

	if (k != 0) {

		/* notify BE about more data to send */
		txs_enqueue(s->s.ctx, s);

		/* if possible, re-arm stream write event. */
		if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
			tle_event_raise(s->tx.ev);
	}

	tcp_stream_release(s);
	return sz;
}

/* send data and FIN (if needed) */
static inline void
tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
{
	/* try to send some data */
	tx_nxt_data(s, tms);

	/* we also have to send a FIN */
	if (state != TCP_ST_ESTABLISHED &&
			state != TCP_ST_CLOSE_WAIT &&
			tcp_txq_nxt_cnt(s) == 0 &&
			s->tcb.snd.fss != s->tcb.snd.nxt) {
		s->tcb.snd.fss = ++s->tcb.snd.nxt;
		send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
	}
}

static inline void
tx_stream(struct tle_tcp_stream *s, uint32_t tms)
{
	uint32_t state;

	state = s->tcb.state;

	if (state == TCP_ST_SYN_SENT) {
		/* send the SYN, start the rto timer */
		send_ack(s, tms, TCP_FLAG_SYN);
		timer_start(s);

	} else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {

		tx_data_fin(s, tms, state);

		/* start RTO timer. */
		if (s->tcb.snd.nxt != s->tcb.snd.una)
			timer_start(s);
	}
}

static inline void
rto_stream(struct tle_tcp_stream *s, uint32_t tms)
{
	uint32_t state;

	state = s->tcb.state;

	TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
		"retx=%u, retm=%u, "
		"rto=%u, snd.ts=%u, tmo=%u, "
		"snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
		"snd.rcvr=%lu, snd.fastack=%u, "
		"wnd=%u, cwnd=%u, ssthresh=%u, "
		"bytes sent=%lu, pkt remain=%u;\n",
		__func__, s, tms, s->tcb.state,
		s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
		s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
		s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
		s->tcb.snd.rcvr, s->tcb.snd.fastack,
		s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
		s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));

	if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {

		if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {

			/* update SND.CWND and SND.SSTHRESH */
			rto_cwnd_update(&s->tcb);

			/* RFC 6582 3.2.4 */
			s->tcb.snd.rcvr = s->tcb.snd.nxt;
			s->tcb.snd.fastack = 0;

			/* restart from last acked data */
			tcp_txq_rst_nxt_head(s);
			s->tcb.snd.nxt = s->tcb.snd.una;

			tx_data_fin(s, tms, state);

		} else if (state == TCP_ST_SYN_SENT) {
			/* resending SYN */
			s->tcb.so.ts.val = tms;

			/* According to RFC 6928 2:
			 * To reduce the chance for spurious SYN or SYN/ACK
			 * retransmission, it is RECOMMENDED that
			 * implementations refrain from resetting the initial
			 * window to 1 segment, unless there have been more
			 * than one SYN or SYN/ACK retransmissions or true loss
			 * detection has been made.
			 */
			if (s->tcb.snd.nb_retx != 0)
				s->tcb.snd.cwnd = s->tcb.snd.mss;

			send_ack(s, tms, TCP_FLAG_SYN);

		} else if (state == TCP_ST_TIME_WAIT) {
			stream_term(s);
		}

		/* RFC6298:5.5 back off the timer */
		s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
		s->tcb.snd.nb_retx++;
		timer_restart(s);

	} else {
		send_rst(s, s->tcb.snd.nxt);
		stream_term(s);
	}
}

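/*
 * Illustrative note: with an initial rto of e.g. 1000 ms, successive
 * timeouts back the timer off to 2000, 4000, 8000 ms and so on (subject
 * to whatever rounding/clamping rto_roundup() applies), until snd.nb_retx
 * reaches snd.nb_retm and the stream is reset with a RST.
 */
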
int
tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
{
	uint32_t i, k, tms;
	struct sdr *dr;
	struct tle_timer_wheel *tw;
	struct tle_stream *p;
	struct tle_tcp_stream *s, *rs[num];

	/* process streams with RTO expired */

	tw = CTX_TCP_TMWHL(ctx);
	tms = tcp_get_tms(ctx->cycles_ms_shift);
	tle_timer_expire(tw, tms);

	k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));

	for (i = 0; i != k; i++) {

		s = rs[i];
		s->timer.handle = NULL;
		if (tcp_stream_try_acquire(s) > 0)
			rto_stream(s, tms);
		tcp_stream_release(s);
	}

	/* process streams from to-send queue */

	k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));

	for (i = 0; i != k; i++) {

		s = rs[i];
		rte_atomic32_set(&s->tx.arm, 0);

		if (tcp_stream_try_acquire(s) > 0)
			tx_stream(s, tms);
		else
			txs_enqueue(s->s.ctx, s);
		tcp_stream_release(s);
	}

	/* collect streams to close from the death row */

	dr = CTX_TCP_SDR(ctx);
	for (k = 0, p = STAILQ_FIRST(&dr->be);
			k != num && p != NULL;
			k++, p = STAILQ_NEXT(p, link))
		rs[k] = TCP_STREAM(p);

	if (p == NULL)
		STAILQ_INIT(&dr->be);
	else
		STAILQ_FIRST(&dr->be) = p;

	/* cleanup closed streams */
	for (i = 0; i != k; i++) {
		s = rs[i];

		tcp_stream_reset(ctx, s);
	}

	return 0;
}