2 * Copyright (c) 2016-2017 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
16 #include <rte_errno.h>
17 #include <rte_ethdev.h>
19 #include <rte_ip_frag.h>
22 #include "tcp_stream.h"
23 #include "tcp_timer.h"
24 #include "stream_table.h"
25 #include "syncookie.h"
30 #include "tcp_tx_seg.h"
32 #define TCP_MAX_PKT_SEG 0x20
35  * checks that the input TCP ports and IP addresses match the given stream.
36  * returns zero on success.
39 rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
43 if (pi->tf.type == TLE_V4)
44 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45 (pi->addr4.raw & s->s.ipv4.mask.raw) !=
48 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49 ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50 &s->s.ipv6.mask.raw) != 0;
55 static inline struct tle_tcp_stream *
56 rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
59 struct tle_tcp_stream *s;
61 s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62 if (s == NULL || tcp_stream_acquire(s) < 0)
65 /* check that we have a proper stream. */
66 if (s->tcb.state != TCP_ST_LISTEN) {
67 tcp_stream_release(s);
74 static inline struct tle_tcp_stream *
75 rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76 const union pkt_info *pi, uint32_t type)
78 struct tle_tcp_stream *s;
80 s = stbl_find_data(st, pi);
82 if (pi->tf.flags == TCP_FLAG_ACK)
83 return rx_obtain_listen_stream(dev, pi, type);
87 if (tcp_stream_acquire(s) < 0)
89 /* check that we have a proper stream. */
90 else if (s->tcb.state == TCP_ST_CLOSED) {
91 tcp_stream_release(s);
99 * Consider 2 pkt_info *equal* if their:
100 * - types (IPv4/IPv6)
103 * - TCP src and dst ports
104 * - IP src and dst addresses
108 pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
114 if (pi[0].tf.type == TLE_V4) {
115 while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
118 } else if (pi[0].tf.type == TLE_V6) {
120 pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121 ymm_cmp(&pi[0].addr6->raw,
122 &pi[i].addr6->raw) == 0)
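/*
 * Illustrative note (added for clarity, not in the original source):
 * given a bulk of pkt_info entries {A, A, A, B, B}, this function is
 * expected to return 3 -- the length of the leading run equal to
 * pi[0] -- so the caller can hand all packets of flow A to the same
 * stream in one shot.
 */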
130 pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
136 if (pi[0].tf.type == TLE_V4) {
137 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138 pi[0].port.dst == pi[i].port.dst &&
139 pi[0].addr4.dst == pi[i].addr4.dst)
142 } else if (pi[0].tf.type == TLE_V6) {
143 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144 pi[0].port.dst == pi[i].port.dst &&
145 xmm_cmp(&pi[0].addr6->dst,
146 &pi[i].addr6->dst) == 0)
154 stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
157 _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
160 static inline uint32_t
161 stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
164 return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
167 static inline uint32_t
168 get_ip_pid(struct tle_dev *dev, uint32_t num, uint32_t type, uint32_t st)
173 pa = &dev->tx.packet_id[type];
176 pid = rte_atomic32_add_return(pa, num);
179 pid = rte_atomic32_read(pa);
180 rte_atomic32_set(pa, pid + num);
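/*
 * Worked example (illustrative): with the per-type counter at 100 and
 * num == 3, the counter advances to 103 and IDs 100..102 become
 * available for the outgoing IPv4 headers. The st != 0 path uses a
 * plain read/set instead of rte_atomic32_add_return and is therefore
 * only safe in single-threaded contexts.
 */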
186 fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
187 uint32_t seq, uint8_t hlen, uint8_t flags)
191 l4h->src_port = port.dst;
192 l4h->dst_port = port.src;
194 wnd = (flags & TCP_FLAG_SYN) ?
195 RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
196 tcb->rcv.wnd >> tcb->rcv.wscale;
198 /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
199 l4h->sent_seq = rte_cpu_to_be_32(seq);
200 l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
201 l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
202 l4h->tcp_flags = flags;
203 l4h->rx_win = rte_cpu_to_be_16(wnd);
207 if (flags & TCP_FLAG_SYN)
208 fill_syn_opts(l4h + 1, &tcb->so);
209 else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
210 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
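/*
 * Worked example (illustrative; assumes TCP_DATA_ALIGN == 4 and
 * TCP_DATA_OFFSET == 4): with timestamps a data segment carries a
 * 32-byte TCP header (20-byte base header + 12-byte padded TS option),
 * so data_off is encoded as 32 / 4 << 4 == 0x80, i.e. a data offset of
 * 8 32-bit words. Note also that the window is advertised unscaled in
 * SYN segments, since the scale factor applies only once negotiated.
 */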
214 tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
215 const struct tle_dest *dst, uint64_t ol_flags,
216 union l4_ports port, uint32_t seq, uint32_t flags,
217 uint32_t pid, uint32_t swcsm)
219 uint32_t l4, len, plen;
223 len = dst->l2_len + dst->l3_len;
226 if (flags & TCP_FLAG_SYN)
227 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
228 else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
229 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
233 /* adjust mbuf to put L2/L3/L4 headers into it. */
234 l2h = rte_pktmbuf_prepend(m, len + l4);
238 /* copy L2/L3 header */
239 rte_memcpy(l2h, dst->hdr, len);
241 /* setup TCP header & options */
242 l4h = (struct tcp_hdr *)(l2h + len);
243 fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
245 /* setup mbuf TX offload related fields. */
246 m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
247 m->ol_flags |= ol_flags;
249 /* update proto specific fields. */
251 if (s->s.type == TLE_V4) {
252 struct ipv4_hdr *l3h;
253 l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
254 l3h->packet_id = rte_cpu_to_be_16(pid);
255 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
257 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
258 l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
261 l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
263 if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
264 l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
266 struct ipv6_hdr *l3h;
267 l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
268 l3h->payload_len = rte_cpu_to_be_16(plen + l4);
269 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
270 l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
272 l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
279  * This function is supposed to be used only for data packets.
280  * Assumes that L2/L3/L4 headers and mbuf fields are already set up properly.
281 * - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
282 * - if no HW cksum offloads are enabled, calculates TCP checksum.
285 tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
286 uint32_t seq, uint32_t pid)
291 len = m->l2_len + m->l3_len;
292 l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
294 l4h->sent_seq = rte_cpu_to_be_32(seq);
295 l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
297 if (tcb->so.ts.raw != 0)
298 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
300 if (type == TLE_V4) {
301 struct ipv4_hdr *l3h;
302 l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
303 l3h->hdr_checksum = 0;
304 l3h->packet_id = rte_cpu_to_be_16(pid);
305 if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
306 l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
309 /* have to calculate TCP checksum in SW */
310 if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
314 if (type == TLE_V4) {
315 struct ipv4_hdr *l3h;
316 l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
318 l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
321 struct ipv6_hdr *l3h;
322 l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
324 l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
329 /* Send data packets that need to be ACK-ed by peer */
330 static inline uint32_t
331 tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
333 uint32_t bsz, i, nb, nbm;
335 struct tle_drb *drb[num];
337 	/* calculate how many drbs are needed. */
338 bsz = s->tx.drb.nb_elem;
339 nbm = (num + bsz - 1) / bsz;
341 /* allocate drbs, adjust number of packets. */
342 nb = stream_drb_alloc(s, drb, nbm);
344 /* drb ring is empty. */
353 /* enqueue pkts for TX. */
355 i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
358 /* free unused drbs. */
360 stream_drb_free(s, drb + nbm - nb, nb);
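/*
 * Illustrative arithmetic: with num == 45 packets and bsz == 16 mbufs
 * per drb, nbm == (45 + 15) / 16 == 3 drbs are requested; if the ring
 * yields only nb == 2 of them, the burst is trimmed to what the
 * allocated drbs can describe and the leftover drbs are returned
 * after the enqueue.
 */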
365 static inline uint32_t
366 tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
369 uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
372 struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
374 mss = s->tcb.snd.mss;
378 pid = get_ip_pid(dev, num, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
383 for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
386 sz = RTE_MIN(sl->len, mss);
387 plen = PKT_L4_PLEN(mb);
389 		/* fast path, no need to use indirect mbufs. */
392 /* update pkt TCP header */
393 tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
395 /* keep mbuf till ACK is received. */
396 rte_pktmbuf_refcnt_update(mb, 1);
400 		/* remaining snd.wnd is less than MSS, send nothing */
403 /* packet indirection needed */
407 if (k >= MAX_PKT_BURST) {
408 n = tx_data_pkts(s, mo, k);
416 n = tx_data_pkts(s, mo, k);
422 sz = tcp_mbuf_seq_free(mo + n, fail);
431  * gets data from the stream send buffer, updates it and
432  * queues it into the TX device queue.
433  * Note that this function is not MT safe.
435 static inline uint32_t
436 tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
438 uint32_t n, num, tn, wnd;
439 struct rte_mbuf **mi;
443 wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
444 sl.seq = s->tcb.snd.nxt;
445 sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
450 /* update send timestamp */
454 /* get group of packets */
455 mi = tcp_txq_get_nxt_objs(s, &num);
457 /* stream send buffer is empty */
461 /* queue data packets for TX */
462 n = tx_data_bulk(s, &sl, mi, num);
465 /* update consumer head */
466 tcp_txq_set_nxt_head(s, n);
469 s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
474 free_una_data(struct tle_tcp_stream *s, uint32_t len)
476 uint32_t i, n, num, plen;
477 struct rte_mbuf **mi;
483 /* get group of packets */
484 mi = tcp_txq_get_una_objs(s, &num);
489 /* free acked data */
490 for (i = 0; i != num && n != len; i++, n = plen) {
491 plen += PKT_L4_PLEN(mi[i]);
493 /* keep SND.UNA at the start of the packet */
494 len -= RTE_MIN(len, plen - len);
497 rte_pktmbuf_free(mi[i]);
500 /* update consumer tail */
501 tcp_txq_set_una_tail(s, i);
502 } while (plen < len);
504 s->tcb.snd.una += len;
507 * that could happen in case of retransmit,
508 * adjust SND.NXT with SND.UNA.
510 if (s->tcb.snd.una > s->tcb.snd.nxt) {
511 tcp_txq_rst_nxt_head(s);
512 s->tcb.snd.nxt = s->tcb.snd.una;
516 static inline uint16_t
517 calc_smss(uint16_t mss, const struct tle_dest *dst)
521 n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
522 mss = RTE_MIN(n, mss);
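/*
 * Worked example (illustrative; assumes TCP_TX_HDR_DACK covers the
 * 20-byte TCP header plus the 12-byte padded timestamp option): for
 * an IPv4 path with MTU 1500, l2_len 14 and l3_len 20,
 * n == 1500 - 14 - 20 - 32 == 1434, so a peer-advertised MSS of 1460
 * is clamped down to 1434.
 */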
528 * min (10*MSS, max (2*MSS, 14600))
530 * or using user provided initial congestion window (icw)
531 * min (10*MSS, max (2*MSS, icw))
533 static inline uint32_t
534 initial_cwnd(uint32_t smss, uint32_t icw)
536 return RTE_MIN(10 * smss, RTE_MAX(2 * smss, icw));
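/*
 * Worked example (RFC 6928): with smss == 1460 and the default
 * icw == 14600, min(10 * 1460, max(2 * 1460, 14600)) == 14600, i.e.
 * an initial window of 10 full-sized segments; an icw below 2 * smss
 * is raised to the 2 * smss floor.
 */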
540  * queue standalone packet to the particular output device
542 * - L2/L3/L4 headers should be already set.
543 * - packet fits into one segment.
546 send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
551 if (stream_drb_alloc(s, &drb, 1) == 0)
554 /* enqueue pkt for TX. */
556 n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
559 /* free unused drbs. */
561 stream_drb_free(s, &drb, 1);
563 return (n == 1) ? 0 : -ENOBUFS;
567 send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
570 const struct tle_dest *dst;
576 pid = get_ip_pid(dst->dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
578 rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
580 rc = send_pkt(s, dst->dev, m);
586 send_rst(struct tle_tcp_stream *s, uint32_t seq)
591 m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
595 rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
603 send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
609 m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
613 seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
616 rc = send_ctrl_pkt(s, m, seq, flags);
622 s->tcb.snd.ack = s->tcb.rcv.nxt;
628 sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
629 const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
633 uint32_t pid, seq, type;
637 const struct tcp_hdr *th;
641 /* get destination information. */
645 da = &pi->addr6->src;
647 rc = stream_get_dest(&s->s, da, &dst);
651 th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
652 m->l2_len + m->l3_len);
653 get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
655 s->tcb.rcv.nxt = si->seq + 1;
656 seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
657 s->s.ctx->prm.hash_alg,
658 &s->s.ctx->prm.secret_key);
659 s->tcb.so.ts.ecr = s->tcb.so.ts.val;
660 s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
661 s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
662 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
663 s->tcb.so.mss = calc_smss(dst.mtu, &dst);
665 /* reset mbuf's data contents. */
666 len = m->l2_len + m->l3_len + m->l4_len;
668 if (rte_pktmbuf_adj(m, len) == NULL)
672 pid = get_ip_pid(dev, 1, type, (s->flags & TLE_CTX_FLAG_ST) != 0);
674 rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
675 TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
677 rc = send_pkt(s, dev, m);
684 * There are four cases for the acceptability test for an incoming segment:
685  * Segment Receive Test
686  * Length Window
687  * ------- ------- -------------------------------------------
688 * 0 0 SEG.SEQ = RCV.NXT
689 * 0 >0 RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
690 * >0 0 not acceptable
691 * >0 >0 RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
692 * or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
695 check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
700 if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
701 n - tcb->rcv.nxt > tcb->rcv.wnd)
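/*
 * Illustrative check (assuming the elided n == seqn + len): with
 * RCV.NXT == 1000 and RCV.WND == 500, a segment with SEG.SEQ == 900,
 * LEN == 200 is acceptable: its start lies outside the window
 * (900 - 1000 wraps to a huge unsigned value), but its end offset
 * (1100 - 1000 == 100) is within RCV.WND, so the second clause fails
 * and the segment is not rejected.
 */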
707 static inline union tsopt
708 rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
712 const struct tcp_hdr *th;
714 if (tcb->so.ts.val != 0) {
715 opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
716 mb->l2_len + mb->l3_len + sizeof(*th));
717 ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
725 * PAWS and sequence check.
729 rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
733 /* RFC 1323 4.2.1 R2 */
734 rc = check_seqn(tcb, seq, len);
740 /* RFC 1323 4.2.1 R1 */
741 if (tcp_seq_lt(ts.val, tcb->rcv.ts))
744 /* RFC 1323 4.2.1 R3 */
745 if (tcp_seq_leq(seq, tcb->snd.ack) &&
746 tcp_seq_lt(tcb->snd.ack, seq + len))
747 tcb->rcv.ts = ts.val;
754 rx_check_ack(const struct tcb *tcb, uint32_t ack)
758 max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
760 if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
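/*
 * Illustrative range: an ACK passes when SND.UNA <= SEG.ACK <=
 * max(SND.NXT, SND.RCVR) in sequence-space arithmetic; e.g. with
 * SND.UNA == 100 and SND.NXT == 600, acks 100..600 are valid, while
 * 700 (beyond anything sent) or 50 (below the unacked edge) are not.
 */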
767 rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
768 const union tsopt ts)
772 rc = rx_check_seq(tcb, seq, len, ts);
773 rc |= rx_check_ack(tcb, ack);
778 restore_syn_opt(union seg_info *si, union tsopt *to,
779 const union pkt_info *pi, uint32_t ts, const struct rte_mbuf *mb,
780 uint32_t hash_alg, rte_xmm_t *secret_key)
784 const struct tcp_hdr *th;
786 /* check that ACK, etc fields are what we expected. */
787 rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
795 th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
796 mb->l2_len + mb->l3_len);
797 len = mb->l4_len - sizeof(*th);
798 to[0] = get_tms_opts((uintptr_t)(th + 1), len);
803 stream_term(struct tle_tcp_stream *s)
807 s->tcb.state = TCP_ST_CLOSED;
812 /* close() was already invoked, schedule final cleanup */
813 if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
815 dr = CTX_TCP_SDR(s->s.ctx);
816 STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
818 	/* notify user that the stream needs to be closed */
819 } else if (s->err.ev != NULL)
820 tle_event_raise(s->err.ev);
821 else if (s->err.cb.func != NULL)
822 s->err.cb.func(s->err.cb.data, &s->s);
826 stream_fill_dest(struct tle_tcp_stream *s)
834 da = &s->s.ipv4.addr.src;
836 da = &s->s.ipv6.addr.src;
838 rc = stream_get_dest(&s->s, da, &s->tx.dst);
839 return (rc < 0) ? rc : 0;
843 * helper function, prepares a new accept stream.
846 accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
847 struct tle_tcp_stream *cs, const union tsopt *to,
848 uint32_t tms, const union pkt_info *pi, const union seg_info *si)
853 /* some TX still pending for that stream. */
854 if (TCP_STREAM_TX_PENDING(cs))
857 /* setup L4 ports and L3 addresses fields. */
858 cs->s.port.raw = pi->port.raw;
859 cs->s.pmsk.raw = UINT32_MAX;
861 if (pi->tf.type == TLE_V4) {
862 cs->s.ipv4.addr = pi->addr4;
863 cs->s.ipv4.mask.src = INADDR_NONE;
864 cs->s.ipv4.mask.dst = INADDR_NONE;
865 } else if (pi->tf.type == TLE_V6) {
866 cs->s.ipv6.addr = *pi->addr6;
867 rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
868 sizeof(cs->s.ipv6.mask.src));
869 rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
870 sizeof(cs->s.ipv6.mask.dst));
874 sync_fill_tcb(&cs->tcb, si, to);
875 cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
879 	 * for now RTT is calculated based on the TCP TMS option;
880 	 * a real-time measurement should be added later
882 if (cs->tcb.so.ts.ecr) {
883 rtt = tms - cs->tcb.so.ts.ecr;
884 rto_estimate(&cs->tcb, rtt);
886 cs->tcb.snd.rto = TCP_RTO_DEFAULT;
888 /* copy streams type & flags. */
889 cs->s.type = ps->s.type;
890 cs->flags = ps->flags;
892 	/* retrieve and cache destination information. */
893 rc = stream_fill_dest(cs);
897 /* update snd.mss with SMSS value */
898 cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
900 /* setup congestion variables */
901 cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss, ps->tcb.snd.cwnd);
902 cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
903 cs->tcb.snd.rto_tw = ps->tcb.snd.rto_tw;
905 cs->tcb.state = TCP_ST_ESTABLISHED;
907 /* add stream to the table */
908 cs->ste = stbl_add_stream(st, pi, cs);
912 cs->tcb.uop |= TCP_OP_ACCEPT;
919 * ACK for new connection request arrived.
920 * Check that the packet meets all conditions and try to open a new stream.
922 * < 0 - invalid packet
923 * == 0 - packet is valid and new stream was opened for it.
924 * > 0 - packet is valid, but failed to open new stream.
927 rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
928 const union pkt_info *pi, union seg_info *si,
929 uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
933 struct tle_stream *ts;
934 struct tle_tcp_stream *cs;
939 if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
943 rc = restore_syn_opt(si, &to, pi, tms, mb, ctx->prm.hash_alg,
944 &ctx->prm.secret_key);
948 /* allocate new stream */
949 ts = get_stream(ctx);
954 /* prepare stream to handle new connection */
955 if (accept_prep_stream(s, st, cs, &to, tms, pi, si) == 0) {
957 /* put new stream in the accept queue */
958 if (_rte_ring_enqueue_burst(s->rx.q,
959 (void * const *)&ts, 1) == 1) {
964 /* cleanup on failure */
966 stbl_del_stream(st, cs->ste, cs, 0);
970 tcp_stream_reset(ctx, cs);
975 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
976 uint32_t *seqn, uint32_t *plen)
978 uint32_t len, n, seq;
983 rte_pktmbuf_adj(mb, hlen);
986 /* cut off the start of the packet */
987 else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
988 n = tcb->rcv.nxt - seq;
992 rte_pktmbuf_adj(mb, n);
1000 static inline uint32_t
1001 rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
1005 n = ack - (uint32_t)s->tcb.snd.una;
1007 /* some more data was acked. */
1010 /* advance SND.UNA and free related packets. */
1011 k = rte_ring_free_count(s->tx.q);
1012 free_una_data(s, n);
1014 /* mark the stream as available for writing */
1015 if (rte_ring_free_count(s->tx.q) != 0) {
1016 if (s->tx.ev != NULL)
1017 tle_event_raise(s->tx.ev);
1018 else if (k == 0 && s->tx.cb.func != NULL)
1019 s->tx.cb.func(s->tx.cb.data, &s->s);
1027 stream_timewait(struct tle_tcp_stream *s, uint32_t rto)
1030 s->tcb.state = TCP_ST_TIME_WAIT;
1031 s->tcb.snd.rto = rto;
1038 rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1043 s->tcb.rcv.nxt += 1;
1045 ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1046 state = s->tcb.state;
1048 if (state == TCP_ST_ESTABLISHED) {
1049 s->tcb.state = TCP_ST_CLOSE_WAIT;
1050 /* raise err.ev & err.cb */
1051 if (s->err.ev != NULL)
1052 tle_event_raise(s->err.ev);
1053 else if (s->err.cb.func != NULL)
1054 s->err.cb.func(s->err.cb.data, &s->s);
1055 } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1056 rsp->flags |= TCP_FLAG_ACK;
1058 stream_timewait(s, s->tcb.snd.rto_tw);
1060 s->tcb.state = TCP_ST_CLOSING;
1061 } else if (state == TCP_ST_FIN_WAIT_2) {
1062 rsp->flags |= TCP_FLAG_ACK;
1063 stream_timewait(s, s->tcb.snd.rto_tw);
1064 } else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1070 * FIN process for ESTABLISHED state
1072  * < 0 - error occurred
1073  * 0 - FIN was processed OK, and mbuf can be freed/reused.
1074  * > 0 - FIN was processed OK and mbuf can't be freed/reused.
1077 rx_fin(struct tle_tcp_stream *s, uint32_t state,
1078 const union seg_info *si, struct rte_mbuf *mb,
1079 struct resp_info *rsp)
1081 uint32_t hlen, plen, seq;
1085 hlen = PKT_L234_HLEN(mb);
1086 plen = mb->pkt_len - hlen;
1089 ts = rx_tms_opt(&s->tcb, mb);
1090 ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1094 if (state < TCP_ST_ESTABLISHED)
1099 ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1102 if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1107 	 * fast-path: all data & FIN were already sent out
1108 	 * and are now acknowledged.
1110 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1111 si->ack == (uint32_t)s->tcb.snd.nxt) {
1112 s->tcb.snd.una = s->tcb.snd.fss;
1114 	/* conventional ACK processing */
1116 rx_ackdata(s, si->ack);
1118 /* some fragments still missing */
1119 if (seq + plen != s->tcb.rcv.nxt) {
1120 s->tcb.rcv.frs.seq = seq + plen;
1121 s->tcb.rcv.frs.on = 1;
1123 rx_fin_state(s, rsp);
1129 rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1130 const union seg_info *si)
1135 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1136 * are validated by checking their SEQ-fields.
1137 * A reset is valid if its sequence number is in the window.
1138 * In the SYN-SENT state (a RST received in response to an initial SYN),
1139 * the RST is acceptable if the ACK field acknowledges the SYN.
1141 if (state == TCP_ST_SYN_SENT) {
1142 rc = ((flags & TCP_FLAG_ACK) == 0 ||
1143 si->ack != s->tcb.snd.nxt) ?
1148 rc = check_seqn(&s->tcb, si->seq, 0);
1157  * check whether we have a FIN that was received out-of-order.
1158  * if yes, try to process it now.
1161 rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1163 if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1164 rx_fin_state(s, rsp);
1168 dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1170 static const struct dack_info zero_dack;
1172 tack[0] = zero_dack;
1173 tack->ack = tcb->snd.una;
1174 tack->segs.dup = tcb->rcv.dupack;
1175 tack->wu.raw = tcb->snd.wu.raw;
1176 tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1180 ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1182 tcb->snd.wu.raw = tack->wu.raw;
1183 tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1187 ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1191 n = tack->segs.ack * tcb->snd.mss;
1193 /* slow start phase, RFC 5681 3.1 (2) */
1194 if (tcb->snd.cwnd < tcb->snd.ssthresh)
1195 tcb->snd.cwnd += RTE_MIN(acked, n);
1196 /* congestion avoidance phase, RFC 5681 3.1 (3) */
1198 tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
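/*
 * Worked example (RFC 5681 3.1): with SMSS == 1460 and one ACK for
 * 1460 new bytes (n == 1460): in slow start at cwnd == 2920 the
 * window grows by min(1460, 1460) to 4380; in congestion avoidance
 * at cwnd == 20000 it grows by max(1, 1460 * 1460 / 20000) == 106
 * bytes, i.e. roughly one MSS per RTT.
 */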
1202 rto_ssthresh_update(struct tcb *tcb)
1206 /* RFC 5681 3.1 (4) */
1207 n = (tcb->snd.nxt - tcb->snd.una) / 2;
1208 k = 2 * tcb->snd.mss;
1209 tcb->snd.ssthresh = RTE_MAX(n, k);
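/*
 * Worked example (RFC 5681 equation 4): with 14600 bytes in flight
 * (SND.NXT - SND.UNA) and SMSS == 1460, ssthresh becomes
 * max(14600 / 2, 2 * 1460) == 7300; for a small flight of 1460 bytes
 * the 2 * SMSS floor (2920) wins instead.
 */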
1213 rto_cwnd_update(struct tcb *tcb)
1216 if (tcb->snd.nb_retx == 0)
1217 rto_ssthresh_update(tcb);
1220 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1221 * no more than 1 full-sized segment.
1223 tcb->snd.cwnd = tcb->snd.mss;
1227 ack_info_update(struct dack_info *tack, const union seg_info *si,
1228 int32_t badseq, uint32_t dlen, const union tsopt ts)
1231 tack->segs.badseq++;
1235 	/* segment with incoming data */
1236 tack->segs.data += (dlen != 0);
1238 /* segment with newly acked data */
1239 if (tcp_seq_lt(tack->ack, si->ack)) {
1242 tack->ack = si->ack;
1246 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1247 * (a) the receiver of the ACK has outstanding data
1248 * (b) the incoming acknowledgment carries no data
1249 * (c) the SYN and FIN bits are both off
1250 * (d) the acknowledgment number is equal to the TCP.UNA
1251 * (e) the advertised window in the incoming acknowledgment equals the
1252 * advertised window in the last incoming acknowledgment.
1254 	 * Here we only have to check for (b), (d), (e).
1255 * (a) will be checked later for the whole bulk of packets,
1256 * (c) should never happen here.
1258 } else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1259 tack->dup3.seg = tack->segs.ack + 1;
1260 tack->dup3.ack = tack->ack;
1265 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1266 * updated. If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1267 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1268 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1270 if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1271 (si->seq == tack->wu.wl1 &&
1272 tcp_seq_leq(tack->wu.wl2, si->ack))) {
1274 tack->wu.wl1 = si->seq;
1275 tack->wu.wl2 = si->ack;
1276 tack->wnd = si->wnd;
1280 static inline uint32_t
1281 rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1282 const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1283 int32_t rc[], uint32_t num)
1285 uint32_t i, j, k, n, t;
1286 uint32_t hlen, plen, seq, tlen;
1291 for (i = 0; i != num; i = j) {
1293 hlen = PKT_L234_HLEN(mb[i]);
1294 plen = mb[i]->pkt_len - hlen;
1297 ts = rx_tms_opt(&s->tcb, mb[i]);
1298 ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1300 		/* account for segment received */
1301 ack_info_update(tack, &si[i], ret != 0, plen, ts);
1304 /* skip duplicate data, if any */
1305 ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1317 /* group sequential packets together. */
1318 for (tlen = plen; j != num; tlen += plen, j++) {
1320 hlen = PKT_L234_HLEN(mb[j]);
1321 plen = mb[j]->pkt_len - hlen;
1323 /* not consecutive packet */
1324 if (plen == 0 || seq + tlen != si[j].seq)
1328 ts = rx_tms_opt(&s->tcb, mb[j]);
1329 ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1332 /* account for segment received */
1333 ack_info_update(tack, &si[j], ret != 0, plen, ts);
1341 rte_pktmbuf_adj(mb[j], hlen);
1347 /* account for OFO data */
1348 if (seq != s->tcb.rcv.nxt)
1349 tack->segs.ofo += n;
1351 /* enqueue packets */
1352 t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1354 /* if we are out of space in stream recv buffer. */
1355 for (; t != n; t++) {
1366 start_fast_retransmit(struct tle_tcp_stream *s)
1372 /* RFC 6582 3.2.2 */
1373 tcb->snd.rcvr = tcb->snd.nxt;
1374 tcb->snd.fastack = 1;
1376 /* RFC 5681 3.2.2 */
1377 rto_ssthresh_update(tcb);
1379 /* RFC 5681 3.2.3 */
1380 tcp_txq_rst_nxt_head(s);
1381 tcb->snd.nxt = tcb->snd.una;
1382 tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1386 stop_fast_retransmit(struct tle_tcp_stream *s)
1392 n = tcb->snd.nxt - tcb->snd.una;
1393 tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1394 RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1395 tcb->snd.fastack = 0;
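/*
 * Illustrative deflation (RFC 6582 3.2, full ACK): with
 * ssthresh == 7300, SMSS == 1460 and n == 2920 bytes still in flight,
 * cwnd becomes min(7300, max(2920, 1460) + 1460) == 4380 -- enough to
 * keep the pipe filled without bursting out of fast recovery.
 */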
1399 in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1407 	/* RFC 6582 3.2.3 partial ACK */
1410 n = ack_num * tcb->snd.mss;
1412 tcb->snd.cwnd -= ack_len - n;
1414 tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1417 * For the first partial ACK that arrives
1418 * during fast recovery, also reset the
1421 if (tcb->snd.fastack == 1)
1424 tcb->snd.fastack += ack_num;
1427 /* RFC 5681 3.2.4 */
1428 } else if (dup_num > 3) {
1429 s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1437 process_ack(struct tle_tcp_stream *s, uint32_t acked,
1438 const struct dack_info *tack)
1445 if (s->tcb.snd.fastack == 0) {
1449 /* RFC 6582 3.2.2 switch to fast retransmit mode */
1450 if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1451 s->tcb.snd.una >= s->tcb.snd.rcvr) {
1453 start_fast_retransmit(s);
1454 in_fast_retransmit(s,
1455 tack->ack - tack->dup3.ack,
1456 tack->segs.ack - tack->dup3.seg - 1,
1459 /* remain in normal mode */
1460 } else if (acked != 0) {
1461 ack_cwnd_update(&s->tcb, acked, tack);
1465 /* fast retransmit mode */
1468 /* remain in fast retransmit mode */
1469 if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1471 send = in_fast_retransmit(s, acked, tack->segs.ack,
1474 		/* RFC 6582 3.2.3 full ACK */
1475 stop_fast_retransmit(s);
1478 /* if we have another series of dup ACKs */
1479 if (tack->dup3.seg != 0 &&
1480 s->tcb.snd.una != s->tcb.snd.nxt &&
1481 tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1484 /* restart fast retransmit again. */
1485 start_fast_retransmit(s);
1486 send = in_fast_retransmit(s,
1487 tack->ack - tack->dup3.ack,
1488 tack->segs.ack - tack->dup3.seg - 1,
1498 * our FIN was acked, stop rto timer, change stream state,
1499 * and possibly close the stream.
1502 rx_ackfin(struct tle_tcp_stream *s)
1506 s->tcb.snd.una = s->tcb.snd.fss;
1509 state = s->tcb.state;
1510 if (state == TCP_ST_LAST_ACK)
1512 else if (state == TCP_ST_FIN_WAIT_1) {
1514 s->tcb.state = TCP_ST_FIN_WAIT_2;
1515 } else if (state == TCP_ST_CLOSING) {
1516 stream_timewait(s, s->tcb.snd.rto_tw);
1521 rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1522 const struct dack_info *tack)
1527 s->tcb.rcv.dupack = tack->segs.dup;
1529 n = rx_ackdata(s, tack->ack);
1530 send = process_ack(s, n, tack);
1532 /* try to send more data. */
1533 if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1534 txs_enqueue(s->s.ctx, s);
1536 /* restart RTO timer. */
1537 if (s->tcb.snd.nxt != s->tcb.snd.una)
1540 	/* update RTO; if a fresh packet is here, then calculate RTT */
1541 if (tack->ts.ecr != 0)
1542 rto_estimate(&s->tcb, ts - tack->ts.ecr);
1547 * returns negative value on failure, or zero on success.
1550 rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1551 const union seg_info *si, struct rte_mbuf *mb,
1552 struct resp_info *rsp)
1557 if (state != TCP_ST_SYN_SENT)
1560 	/* invalid SEG.ACK */
1561 if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1562 rsp->flags = TCP_FLAG_RST;
1566 th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1567 mb->l2_len + mb->l3_len);
1568 get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1572 s->tcb.snd.una = s->tcb.snd.nxt;
1573 s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1574 s->tcb.snd.wnd = si->wnd << so.wscale;
1575 s->tcb.snd.wu.wl1 = si->seq;
1576 s->tcb.snd.wu.wl2 = si->ack;
1577 s->tcb.snd.wscale = so.wscale;
1579 /* setup congestion variables */
1580 s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss, s->tcb.snd.cwnd);
1581 s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1583 s->tcb.rcv.ts = so.ts.val;
1584 s->tcb.rcv.irs = si->seq;
1585 s->tcb.rcv.nxt = si->seq + 1;
1587 /* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1588 s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1589 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1590 s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1592 /* calculate initial rto */
1593 rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1595 rsp->flags |= TCP_FLAG_ACK;
1598 s->tcb.state = TCP_ST_ESTABLISHED;
1601 if (s->tx.ev != NULL)
1602 tle_event_raise(s->tx.ev);
1603 else if (s->tx.cb.func != NULL)
1604 s->tx.cb.func(s->tx.cb.data, &s->s);
1609 static inline uint32_t
1610 rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1611 const union pkt_info *pi, const union seg_info si[],
1612 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1615 uint32_t i, k, n, state;
1617 struct resp_info rsp;
1618 struct dack_info tack;
1623 state = s->tcb.state;
1626 * first check for the states/flags where we don't
1627 * expect groups of packets.
1631 if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1634 rx_rst(s, state, pi->tf.flags, &si[i]);
1639 /* RFC 793: if the ACK bit is off drop the segment and return */
1640 } else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1647 /* process <SYN,ACK> */
1648 } else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1649 for (i = 0; i != num; i++) {
1650 ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1660 } else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1662 for (i = 0; i != num; i++) {
1663 ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1673 /* normal data/ack packets */
1674 } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1676 /* process incoming data packets. */
1677 dack_info_init(&tack, &s->tcb);
1678 n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1680 /* follow up actions based on aggregated information */
1682 /* update SND.WND */
1683 ack_window_update(&s->tcb, &tack);
1686 		 * fast-path: all data & FIN were already sent out
1687 		 * and are now acknowledged.
1689 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1690 tack.ack == (uint32_t)s->tcb.snd.nxt)
1693 rx_process_ack(s, ts, &tack);
1696 * send an immediate ACK if either:
1697 * - received segment with invalid seq/ack number
1698 * - received segment with OFO data
1699 		 * - received segment with in-order (INO) data and no TX is scheduled
1702 if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1703 (tack.segs.data != 0 &&
1704 rte_atomic32_read(&s->tx.arm) == 0))
1705 rsp.flags |= TCP_FLAG_ACK;
1707 rx_ofo_fin(s, &rsp);
1712 /* unhandled state, drop all packets. */
1716 /* we have a response packet to send. */
1717 if (rsp.flags == TCP_FLAG_RST) {
1718 send_rst(s, si[i].ack);
1720 } else if (rsp.flags != 0) {
1721 send_ack(s, ts, rsp.flags);
1723 /* start the timer for FIN packet */
1724 if ((rsp.flags & TCP_FLAG_FIN) != 0)
1728 /* unprocessed packets */
1729 for (; i != num; i++, k++) {
1737 static inline uint32_t
1738 rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1739 const union pkt_info *pi, const union seg_info si[],
1740 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1745 if (tcp_stream_acquire(s) > 0) {
1746 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1747 tcp_stream_release(s);
1751 for (i = 0; i != num; i++) {
1758 static inline uint32_t
1759 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1760 const union pkt_info pi[], union seg_info si[],
1761 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1764 struct tle_tcp_stream *cs, *s;
1765 uint32_t i, k, n, state;
1768 s = rx_obtain_stream(dev, st, &pi[0], type);
1770 for (i = 0; i != num; i++) {
1778 state = s->tcb.state;
1780 if (state == TCP_ST_LISTEN) {
1782 /* one connection per flow */
1785 for (i = 0; i != num; i++) {
1787 ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1789 /* valid packet encountered */
1793 /* invalid packet, keep trying to find a proper one */
1799 /* packet is valid, but we are out of streams to serve it */
1801 for (; i != num; i++, k++) {
1805 /* new stream is accepted */
1806 } else if (ret == 0) {
1808 /* inform listen stream about new connections */
1809 if (s->rx.ev != NULL)
1810 tle_event_raise(s->rx.ev);
1811 else if (s->rx.cb.func != NULL &&
1812 rte_ring_count(s->rx.q) == 1)
1813 s->rx.cb.func(s->rx.cb.data, &s->s);
1815 /* if there is no data, drop current packet */
1816 if (PKT_L4_PLEN(mb[i]) == 0) {
1821 /* process remaining packets for that stream */
1823 n = rx_new_stream(cs, ts, pi + i, si + i,
1824 mb + i, rp + k, rc + k, num - i);
1830 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1834 tcp_stream_release(s);
1839 static inline uint32_t
1840 rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1841 const union pkt_info pi[], const union seg_info si[],
1842 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1845 struct tle_tcp_stream *s;
1849 s = rx_obtain_listen_stream(dev, &pi[0], type);
1851 for (i = 0; i != num; i++) {
1859 for (i = 0; i != num; i++) {
1861 /* check that this remote is allowed to connect */
1862 if (rx_check_stream(s, &pi[i]) != 0)
1865 		/* syncookie: reply with <SYN,ACK> */
1866 ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1875 tcp_stream_release(s);
1880 tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1881 struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1884 struct tle_ctx *ctx;
1885 uint32_t i, j, k, mt, n, t, ts;
1887 union pkt_info pi[num];
1888 union seg_info si[num];
1890 	union { uint8_t t[TLE_VNUM]; uint32_t raw; } stu; /* tracks which L3 types need stream-table locking */
1895 ts = tcp_get_tms(ctx->cycles_ms_shift);
1896 st = CTX_TCP_STLB(ctx);
1897 mt = ((ctx->prm.flags & TLE_CTX_FLAG_ST) == 0);
1901 /* extract packet info and check the L3/L4 csums */
1902 for (i = 0; i != num; i++) {
1904 get_pkt_info(pkt[i], &pi[i], &si[i]);
1907 csf = dev->rx.ol_flags[t] &
1908 (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1910 /* check csums in SW */
1911 if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1912 pi[i].tf.type, IPPROTO_TCP) != 0)
1918 if (stu.t[TLE_V4] != 0)
1919 stbl_lock(st, TLE_V4);
1920 if (stu.t[TLE_V6] != 0)
1921 stbl_lock(st, TLE_V6);
1924 for (i = 0; i != num; i += j) {
1928 		/* basic checks for incoming packet */
1929 if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1934 /* process input SYN packets */
1935 } else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1936 j = pkt_info_bulk_syneq(pi + i, num - i);
1937 n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1941 j = pkt_info_bulk_eq(pi + i, num - i);
1942 n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1948 if (stu.t[TLE_V4] != 0)
1949 stbl_unlock(st, TLE_V4);
1950 if (stu.t[TLE_V6] != 0)
1951 stbl_unlock(st, TLE_V6);
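/*
 * Typical BE usage sketch (illustrative; port, queue and dev stand in
 * for application-provided values): mbufs come straight from
 * rte_eth_rx_burst(), and whatever tle_tcp_rx_bulk() does not consume
 * is handed back through rp[]/rc[] and must be freed by the caller:
 *
 *	n = rte_eth_rx_burst(port, queue, pkt, MAX_PKT_BURST);
 *	k = tle_tcp_rx_bulk(dev, pkt, rp, rc, n);
 *	for (i = 0; i != n - k; i++)
 *		rte_pktmbuf_free(rp[i]);
 */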
1957 tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1961 struct tle_tcp_stream *s;
1964 n = _rte_ring_dequeue_burst(s->rx.q, (void **)rs, num);
1969 * if we still have packets to read,
1970 * then rearm stream RX event.
1972 if (n == num && rte_ring_count(s->rx.q) != 0) {
1973 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
1974 tle_event_raise(s->rx.ev);
1975 tcp_stream_release(s);
1982 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1984 uint32_t i, j, k, n;
1985 struct tle_drb *drb[num];
1986 struct tle_tcp_stream *s;
1988 /* extract packets from device TX queue. */
1991 n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1997 /* free empty drbs and notify related streams. */
1999 for (i = 0; i != k; i = j) {
2001 for (j = i + 1; j != k && s == drb[j]->udata; j++)
2003 stream_drb_free(s, drb + i, j - i);
2010 stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
2012 if (s->s.type == TLE_V4)
2013 pi->addr4 = s->s.ipv4.addr;
2015 pi->addr6 = &s->s.ipv6.addr;
2017 pi->port = s->s.port;
2018 pi->tf.type = s->s.type;
2022 stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
2024 const struct sockaddr_in *in4;
2025 const struct sockaddr_in6 *in6;
2026 const struct tle_dev_param *prm;
2030 s->s.pmsk.raw = UINT32_MAX;
2032 /* setup L4 src ports and src address fields. */
2033 if (s->s.type == TLE_V4) {
2034 in4 = (const struct sockaddr_in *)addr;
2035 if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2038 s->s.port.src = in4->sin_port;
2039 s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2040 s->s.ipv4.mask.src = INADDR_NONE;
2041 s->s.ipv4.mask.dst = INADDR_NONE;
2043 } else if (s->s.type == TLE_V6) {
2044 in6 = (const struct sockaddr_in6 *)addr;
2045 if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2046 sizeof(tle_ipv6_any)) == 0 ||
2047 in6->sin6_port == 0)
2050 s->s.port.src = in6->sin6_port;
2051 rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2052 sizeof(s->s.ipv6.addr.src));
2053 rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2054 sizeof(s->s.ipv6.mask.src));
2055 rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2056 sizeof(s->s.ipv6.mask.dst));
2059 /* setup the destination device. */
2060 rc = stream_fill_dest(s);
2064 	/* setup L3 dst address from device param */
2065 prm = &s->tx.dst.dev->prm;
2066 if (s->s.type == TLE_V4) {
2067 if (s->s.ipv4.addr.dst == INADDR_ANY)
2068 s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2069 } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2070 sizeof(tle_ipv6_any)) == 0)
2071 memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2072 sizeof(s->s.ipv6.addr.dst));
2078 tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2084 struct stbl_entry *se;
2086 /* fill stream address */
2087 rc = stream_fill_addr(s, addr);
2091 	/* fill pkt info to generate seq. */
2092 stream_fill_pkt_info(s, &pi);
2094 tms = tcp_get_tms(s->s.ctx->cycles_ms_shift);
2095 s->tcb.so.ts.val = tms;
2096 s->tcb.so.ts.ecr = 0;
2097 s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2098 s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2100 	/* note that rcv.nxt is 0 here for sync_gen_seq. */
2101 seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2102 s->s.ctx->prm.hash_alg,
2103 &s->s.ctx->prm.secret_key);
2104 s->tcb.snd.iss = seq;
2105 s->tcb.snd.rcvr = seq;
2106 s->tcb.snd.una = seq;
2107 s->tcb.snd.nxt = seq + 1;
2108 s->tcb.snd.rto = TCP_RTO_DEFAULT;
2109 s->tcb.snd.ts = tms;
2111 s->tcb.rcv.mss = s->tcb.so.mss;
2112 s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2113 s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2116 	/* add the stream to the stream table */
2117 st = CTX_TCP_STLB(s->s.ctx);
2118 se = stbl_add_stream_lock(st, s);
2123 /* put stream into the to-send queue */
2124 txs_enqueue(s->s.ctx, s);
2130 tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2132 struct tle_tcp_stream *s;
2136 if (ts == NULL || addr == NULL)
2141 if (type >= TLE_VNUM)
2144 if (tcp_stream_try_acquire(s) > 0) {
2145 rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2147 rc = (rc == 0) ? -EDEADLK : 0;
2152 tcp_stream_release(s);
2156 /* fill stream, prepare and transmit syn pkt */
2157 s->tcb.uop |= TCP_OP_CONNECT;
2158 rc = tx_syn(s, addr);
2159 tcp_stream_release(s);
2161 /* error happened, do a cleanup */
2163 tle_tcp_stream_close(ts);
2169 tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2172 struct tle_tcp_stream *s;
2175 n = _rte_ring_mcs_dequeue_burst(s->rx.q, (void **)pkt, num);
2180 * if we still have packets to read,
2181 * then rearm stream RX event.
2183 if (n == num && rte_ring_count(s->rx.q) != 0) {
2184 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2185 tle_event_raise(s->rx.ev);
2186 tcp_stream_release(s);
2193 tle_tcp_stream_readv(struct tle_stream *ts, const struct iovec *iov,
2199 struct tle_tcp_stream *s;
2201 struct rxq_objs mo[2];
2205 /* get group of packets */
2206 mn = tcp_rxq_get_objs(s, mo);
2212 for (i = 0; i != iovcnt; i++) {
2215 n += _mbus_to_iovec(&iv, mo[0].mb + n, mo[0].num - n);
2216 if (iv.iov_len != 0) {
2224 if (i != iovcnt && mn != 1) {
2228 n += _mbus_to_iovec(&iv, mo[1].mb + n, mo[1].num - n);
2229 if (iv.iov_len != 0) {
2233 if (i + 1 != iovcnt)
2235 } while (++i != iovcnt);
2239 tcp_rxq_consume(s, tn);
2242 * if we still have packets to read,
2243 * then rearm stream RX event.
2245 if (i == iovcnt && rte_ring_count(s->rx.q) != 0) {
2246 if (tcp_stream_try_acquire(s) > 0 && s->rx.ev != NULL)
2247 tle_event_raise(s->rx.ev);
2248 tcp_stream_release(s);
2254 static inline int32_t
2255 tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2256 struct rte_mbuf *segs[], uint32_t num)
2261 for (i = 0; i != num; i++) {
2262 /* Build L2/L3/L4 header */
2263 rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2264 0, TCP_FLAG_ACK, 0, 0);
2266 free_mbufs(segs, num);
2272 /* queue packets for further transmission. */
2273 rc = _rte_ring_enqueue_bulk(s->tx.q, (void **)segs, num);
2275 free_mbufs(segs, num);
2282 tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2284 uint32_t i, j, k, mss, n, state;
2287 struct tle_tcp_stream *s;
2288 struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2292 /* mark stream as not closable. */
2293 if (tcp_stream_acquire(s) < 0) {
2298 state = s->tcb.state;
2299 if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2300 rte_errno = ENOTCONN;
2301 tcp_stream_release(s);
2305 mss = s->tcb.snd.mss;
2306 ol_flags = s->tx.dst.ol_flags;
2311 /* prepare and check for TX */
2312 for (i = k; i != num; i++) {
2313 if (pkt[i]->pkt_len > mss ||
2314 pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2316 rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2317 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2323 /* queue packets for further transmission. */
2324 n = _rte_ring_enqueue_burst(s->tx.q,
2325 (void **)pkt + k, (i - k));
2329 * for unsent, but already modified packets:
2330 * remove pkt l2/l3 headers, restore ol_flags
2333 ol_flags = ~s->tx.dst.ol_flags;
2334 for (j = k; j != i; j++) {
2335 rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2338 pkt[j]->ol_flags &= ol_flags;
2348 /* segment large packet and enqueue for sending */
2349 } else if (i != num) {
2350 /* segment the packet. */
2351 rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2358 rc = tx_segments(s, ol_flags, segs, rc);
2360 /* free the large mbuf */
2361 rte_pktmbuf_free(pkt[i]);
2362 /* set the mbuf as consumed */
2365 /* no space left in tx queue */
2370 /* notify BE about more data to send */
2372 txs_enqueue(s->s.ctx, s);
2373 /* if possible, re-arm stream write event. */
2374 if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2375 tle_event_raise(s->tx.ev);
2377 tcp_stream_release(s);
2383 tle_tcp_stream_writev(struct tle_stream *ts, struct rte_mempool *mp,
2384 const struct iovec *iov, int iovcnt)
2387 uint32_t j, k, n, num, slen, state;
2390 struct tle_tcp_stream *s;
2392 struct rte_mbuf *mb[2 * MAX_PKT_BURST];
2396 /* mark stream as not closable. */
2397 if (tcp_stream_acquire(s) < 0) {
2402 state = s->tcb.state;
2403 if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2404 rte_errno = ENOTCONN;
2405 tcp_stream_release(s);
2409 	/* figure out how many mbufs we need */
2411 for (i = 0; i != iovcnt; i++)
2412 tsz += iov[i].iov_len;
2414 slen = rte_pktmbuf_data_room_size(mp);
2415 slen = RTE_MIN(slen, s->tcb.snd.mss);
2417 num = (tsz + slen - 1) / slen;
2418 n = rte_ring_free_count(s->tx.q);
2419 num = RTE_MIN(num, n);
2420 n = RTE_MIN(num, RTE_DIM(mb));
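/*
 * Illustrative arithmetic: for tsz == 10000 bytes of iovec data and
 * slen == 1448 (the smaller of the mempool data room and snd.mss),
 * num == (10000 + 1447) / 1448 == 7 mbufs are needed; the count is
 * then clipped by the free TX-ring slots and by the size of the
 * local mb[] array.
 */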
2422 /* allocate mbufs */
2423 if (rte_pktmbuf_alloc_bulk(mp, mb, n) != 0) {
2425 tcp_stream_release(s);
2429 /* copy data into the mbufs */
2432 for (i = 0; i != iovcnt; i++) {
2435 k += _iovec_to_mbsegs(&iv, slen, mb + k, n - k);
2436 if (iv.iov_len != 0) {
2442 /* partially filled segment */
2443 k += (k != n && mb[k]->data_len != 0);
2445 /* fill pkt headers */
2446 ol_flags = s->tx.dst.ol_flags;
2448 for (j = 0; j != k; j++) {
2449 rc = tcp_fill_mbuf(mb[j], s, &s->tx.dst, ol_flags,
2450 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2455 /* if no error encountered, then enqueue pkts for transmission */
2457 k = _rte_ring_enqueue_burst(s->tx.q, (void **)mb, j);
2463 /* free pkts that were not enqueued */
2464 free_mbufs(mb + k, j - k);
2466 /* our last segment can be partially filled */
2467 sz += slen - sz % slen;
2468 sz -= (j - k) * slen;
2470 /* report an error */
2479 /* notify BE about more data to send */
2480 txs_enqueue(s->s.ctx, s);
2482 /* if possible, re-arm stream write event. */
2483 if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2484 tle_event_raise(s->tx.ev);
2487 tcp_stream_release(s);
2491 /* send data and FIN (if needed) */
2493 tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2495 /* try to send some data */
2496 tx_nxt_data(s, tms);
2498 /* we also have to send a FIN */
2499 if (state != TCP_ST_ESTABLISHED &&
2500 state != TCP_ST_CLOSE_WAIT &&
2501 tcp_txq_nxt_cnt(s) == 0 &&
2502 s->tcb.snd.fss != s->tcb.snd.nxt) {
2503 s->tcb.snd.fss = ++s->tcb.snd.nxt;
2504 send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2509 tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2513 state = s->tcb.state;
2515 if (state == TCP_ST_SYN_SENT) {
2516 /* send the SYN, start the rto timer */
2517 send_ack(s, tms, TCP_FLAG_SYN);
2520 } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2522 tx_data_fin(s, tms, state);
2524 /* start RTO timer. */
2525 if (s->tcb.snd.nxt != s->tcb.snd.una)
2531 rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2535 state = s->tcb.state;
2537 TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2538 "retx=%u, retm=%u, "
2539 "rto=%u, snd.ts=%u, tmo=%u, "
2540 "snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2541 "snd.rcvr=%lu, snd.fastack=%u, "
2542 "wnd=%u, cwnd=%u, ssthresh=%u, "
2543 "bytes sent=%lu, pkt remain=%u;\n",
2544 __func__, s, tms, s->tcb.state,
2545 s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2546 s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2547 s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2548 s->tcb.snd.rcvr, s->tcb.snd.fastack,
2549 s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2550 s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2552 if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2554 if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2556 			/* update SND.CWND and SND.SSTHRESH */
2557 rto_cwnd_update(&s->tcb);
2559 /* RFC 6582 3.2.4 */
2560 s->tcb.snd.rcvr = s->tcb.snd.nxt;
2561 s->tcb.snd.fastack = 0;
2563 /* restart from last acked data */
2564 tcp_txq_rst_nxt_head(s);
2565 s->tcb.snd.nxt = s->tcb.snd.una;
2567 tx_data_fin(s, tms, state);
2569 } else if (state == TCP_ST_SYN_SENT) {
2571 s->tcb.so.ts.val = tms;
2573 /* According to RFC 6928 2:
2574 * To reduce the chance for spurious SYN or SYN/ACK
2575 * retransmission, it is RECOMMENDED that
2576 * implementations refrain from resetting the initial
2577 * window to 1 segment, unless there have been more
2578 * than one SYN or SYN/ACK retransmissions or true loss
2579 * detection has been made.
2581 if (s->tcb.snd.nb_retx != 0)
2582 s->tcb.snd.cwnd = s->tcb.snd.mss;
2584 send_ack(s, tms, TCP_FLAG_SYN);
2586 } else if (state == TCP_ST_TIME_WAIT) {
2590 		/* RFC 6298 5.5: back off the timer */
2591 s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2592 s->tcb.snd.nb_retx++;
2596 send_rst(s, s->tcb.snd.una);
2602 tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2606 struct tle_timer_wheel *tw;
2607 struct tle_stream *p;
2608 struct tle_tcp_stream *s, *rs[num];
2610 	/* process streams with expired RTO */
2612 tw = CTX_TCP_TMWHL(ctx);
2613 tms = tcp_get_tms(ctx->cycles_ms_shift);
2614 tle_timer_expire(tw, tms);
2616 k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2618 for (i = 0; i != k; i++) {
2621 s->timer.handle = NULL;
2622 if (tcp_stream_try_acquire(s) > 0)
2624 tcp_stream_release(s);
2627 /* process streams from to-send queue */
2629 k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2631 for (i = 0; i != k; i++) {
2634 rte_atomic32_set(&s->tx.arm, 0);
2636 if (tcp_stream_try_acquire(s) > 0)
2639 txs_enqueue(s->s.ctx, s);
2640 tcp_stream_release(s);
2643 /* collect streams to close from the death row */
2645 dr = CTX_TCP_SDR(ctx);
2646 for (k = 0, p = STAILQ_FIRST(&dr->be);
2647 k != num && p != NULL;
2648 k++, p = STAILQ_NEXT(p, link))
2649 rs[k] = TCP_STREAM(p);
2652 STAILQ_INIT(&dr->be);
2654 STAILQ_FIRST(&dr->be) = p;
2656 /* cleanup closed streams */
2657 for (i = 0; i != k; i++) {
2660 tcp_stream_reset(ctx, s);