2 * Copyright (c) 2016 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
16 #include <rte_errno.h>
17 #include <rte_ethdev.h>
19 #include <rte_ip_frag.h>
22 #include "tcp_stream.h"
23 #include "tcp_timer.h"
24 #include "stream_table.h"
25 #include "syncookie.h"
30 #include "tcp_tx_seg.h"
32 #define TCP_MAX_PKT_SEG 0x20
35 * checks if the input TCP ports and IP addresses match the given stream.
36 * returns zero on success.
39 rx_check_stream(const struct tle_tcp_stream *s, const union pkt_info *pi)
43 if (pi->tf.type == TLE_V4)
44 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
45 (pi->addr4.raw & s->s.ipv4.mask.raw) !=
48 rc = (pi->port.raw & s->s.pmsk.raw) != s->s.port.raw ||
49 ymm_mask_cmp(&pi->addr6->raw, &s->s.ipv6.addr.raw,
50 &s->s.ipv6.mask.raw) != 0;
55 static inline struct tle_tcp_stream *
56 rx_obtain_listen_stream(const struct tle_dev *dev, const union pkt_info *pi,
59 struct tle_tcp_stream *s;
61 s = (struct tle_tcp_stream *)dev->dp[type]->streams[pi->port.dst];
62 if (s == NULL || rwl_acquire(&s->rx.use) < 0)
65 /* check that we have a proper stream. */
66 if (s->tcb.state != TCP_ST_LISTEN) {
67 rwl_release(&s->rx.use);
74 static inline struct tle_tcp_stream *
75 rx_obtain_stream(const struct tle_dev *dev, struct stbl *st,
76 const union pkt_info *pi, uint32_t type)
78 struct tle_tcp_stream *s;
80 s = stbl_find_data(st, pi);
82 if (pi->tf.flags == TCP_FLAG_ACK)
83 return rx_obtain_listen_stream(dev, pi, type);
87 if (stbl_data_pkt(s) || rwl_acquire(&s->rx.use) < 0)
89 /* check that we have a proper stream. */
90 else if (s->tcb.state == TCP_ST_CLOSED) {
91 rwl_release(&s->rx.use);
99 * Consider 2 pkt_info *equal* if their:
100 * - types (IPv4/IPv6)
103 * - TCP src and dst ports
104 * - IP src and dst addresses
108 pkt_info_bulk_eq(const union pkt_info pi[], uint32_t num)
114 if (pi[0].tf.type == TLE_V4) {
115 while (i != num && xmm_cmp(&pi[0].raw, &pi[i].raw) == 0)
118 } else if (pi[0].tf.type == TLE_V6) {
120 pi[0].raw.u64[0] == pi[i].raw.u64[0] &&
121 ymm_cmp(&pi[0].addr6->raw,
122 &pi[i].addr6->raw) == 0)
130 pkt_info_bulk_syneq(const union pkt_info pi[], uint32_t num)
136 if (pi[0].tf.type == TLE_V4) {
137 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
138 pi[0].port.dst == pi[i].port.dst &&
139 pi[0].addr4.dst == pi[i].addr4.dst)
142 } else if (pi[0].tf.type == TLE_V6) {
143 while (i != num && pi[0].tf.raw == pi[i].tf.raw &&
144 pi[0].port.dst == pi[i].port.dst &&
145 xmm_cmp(&pi[0].addr6->dst,
146 &pi[i].addr6->dst) == 0)
154 stream_drb_free(struct tle_tcp_stream *s, struct tle_drb *drbs[],
157 _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
160 static inline uint32_t
161 stream_drb_alloc(struct tle_tcp_stream *s, struct tle_drb *drbs[],
164 return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
168 fill_tcph(struct tcp_hdr *l4h, const struct tcb *tcb, union l4_ports port,
169 uint32_t seq, uint8_t hlen, uint8_t flags)
173 l4h->src_port = port.dst;
174 l4h->dst_port = port.src;
176 wnd = (flags & TCP_FLAG_SYN) ?
177 RTE_MIN(tcb->rcv.wnd, (uint32_t)UINT16_MAX) :
178 tcb->rcv.wnd >> tcb->rcv.wscale;
180 /* ??? use sse shuffle to hton all remaining 16 bytes at once. ??? */
181 l4h->sent_seq = rte_cpu_to_be_32(seq);
182 l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
183 l4h->data_off = hlen / TCP_DATA_ALIGN << TCP_DATA_OFFSET;
184 l4h->tcp_flags = flags;
185 l4h->rx_win = rte_cpu_to_be_16(wnd);
189 if (flags & TCP_FLAG_SYN)
190 fill_syn_opts(l4h + 1, &tcb->so);
191 else if ((flags & TCP_FLAG_RST) == 0 && tcb->so.ts.raw != 0)
192 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
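/*
 * Illustrative sketch (not part of the original file): how the advertised
 * window value computed above is chosen. Window scaling never applies to a
 * SYN segment itself, so the raw receive window is clamped to 16 bits there;
 * for every other segment the window is reported shifted right by the
 * negotiated receive scale. The helper name and parameters are hypothetical.
 */
static inline uint16_t
example_tx_window(uint32_t rcv_wnd, uint32_t wscale, int is_syn)
{
	if (is_syn)
		return (rcv_wnd > UINT16_MAX) ? UINT16_MAX : (uint16_t)rcv_wnd;
	/* assumed to fit into 16 bits once scaled down */
	return (uint16_t)(rcv_wnd >> wscale);
}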
196 tcp_fill_mbuf(struct rte_mbuf *m, const struct tle_tcp_stream *s,
197 const struct tle_dest *dst, uint64_t ol_flags,
198 union l4_ports port, uint32_t seq, uint32_t flags,
199 uint32_t pid, uint32_t swcsm)
201 uint32_t l4, len, plen;
205 len = dst->l2_len + dst->l3_len;
208 if (flags & TCP_FLAG_SYN)
209 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_MAX;
210 else if ((flags & TCP_FLAG_RST) == 0 && s->tcb.rcv.ts != 0)
211 l4 = sizeof(*l4h) + TCP_TX_OPT_LEN_TMS;
215 /* adjust mbuf to put L2/L3/L4 headers into it. */
216 l2h = rte_pktmbuf_prepend(m, len + l4);
220 /* copy L2/L3 header */
221 rte_memcpy(l2h, dst->hdr, len);
223 /* setup TCP header & options */
224 l4h = (struct tcp_hdr *)(l2h + len);
225 fill_tcph(l4h, &s->tcb, port, seq, l4, flags);
227 /* setup mbuf TX offload related fields. */
228 m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len, l4, 0, 0, 0);
229 m->ol_flags |= ol_flags;
231 /* update proto specific fields. */
233 if (s->s.type == TLE_V4) {
234 struct ipv4_hdr *l3h;
235 l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
236 l3h->packet_id = rte_cpu_to_be_16(pid);
237 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len + l4);
239 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
240 l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
243 l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
245 if ((ol_flags & PKT_TX_IP_CKSUM) == 0 && swcsm != 0)
246 l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
248 struct ipv6_hdr *l3h;
249 l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
250 l3h->payload_len = rte_cpu_to_be_16(plen + l4);
251 if ((ol_flags & PKT_TX_TCP_CKSUM) != 0)
252 l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
254 l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
261 * This function is supposed to be used only for data packets.
262 * Assumes that L2/L3/L4 headers and mbuf fields are already set up properly.
263 * - updates tcp SEG.SEQ, SEG.ACK, TS.VAL, TS.ECR.
264 * - if no HW cksum offloads are enabled, calculates TCP checksum.
267 tcp_update_mbuf(struct rte_mbuf *m, uint32_t type, const struct tcb *tcb,
268 uint32_t seq, uint32_t pid)
273 len = m->l2_len + m->l3_len;
274 l4h = rte_pktmbuf_mtod_offset(m, struct tcp_hdr *, len);
276 l4h->sent_seq = rte_cpu_to_be_32(seq);
277 l4h->recv_ack = rte_cpu_to_be_32(tcb->rcv.nxt);
279 if (tcb->so.ts.raw != 0)
280 fill_tms_opts(l4h + 1, tcb->snd.ts, tcb->rcv.ts);
282 if (type == TLE_V4) {
283 struct ipv4_hdr *l3h;
284 l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *, m->l2_len);
285 l3h->hdr_checksum = 0;
286 l3h->packet_id = rte_cpu_to_be_16(pid);
287 if ((m->ol_flags & PKT_TX_IP_CKSUM) == 0)
288 l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
291 /* have to calculate TCP checksum in SW */
292 if ((m->ol_flags & PKT_TX_TCP_CKSUM) == 0) {
296 if (type == TLE_V4) {
297 struct ipv4_hdr *l3h;
298 l3h = rte_pktmbuf_mtod_offset(m, struct ipv4_hdr *,
300 l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
303 struct ipv6_hdr *l3h;
304 l3h = rte_pktmbuf_mtod_offset(m, struct ipv6_hdr *,
306 l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
311 /* Send data packets that need to be ACK-ed by peer */
312 static inline uint32_t
313 tx_data_pkts(struct tle_tcp_stream *s, struct rte_mbuf *const m[], uint32_t num)
315 uint32_t bsz, i, nb, nbm;
317 struct tle_drb *drb[num];
319 /* calculate how many drbs are needed.*/
320 bsz = s->tx.drb.nb_elem;
321 nbm = (num + bsz - 1) / bsz;
323 /* allocate drbs, adjust number of packets. */
324 nb = stream_drb_alloc(s, drb, nbm);
326 /* drb ring is empty. */
335 /* enqueue pkts for TX. */
337 i = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)m,
340 /* free unused drbs. */
342 stream_drb_free(s, drb + nbm - nb, nb);
347 static inline uint32_t
348 tx_data_bulk(struct tle_tcp_stream *s, union seqlen *sl, struct rte_mbuf *mi[],
351 uint32_t fail, i, k, n, mss, pid, plen, sz, tn, type;
354 struct rte_mbuf *mo[MAX_PKT_BURST + TCP_MAX_PKT_SEG];
356 mss = s->tcb.snd.mss;
360 pid = rte_atomic32_add_return(&dev->tx.packet_id[type], num) - num;
365 for (i = 0; i != num && sl->len != 0 && fail == 0; i++) {
368 sz = RTE_MIN(sl->len, mss);
369 plen = PKT_L4_PLEN(mb);
371 /* fast path, no need to use indirect mbufs. */
374 /* update pkt TCP header */
375 tcp_update_mbuf(mb, type, &s->tcb, sl->seq, pid + i);
377 /* keep mbuf till ACK is received. */
378 rte_pktmbuf_refcnt_update(mb, 1);
382 /* remaining snd.wnd is less than MSS, send nothing */
385 /* packet indirection needed */
389 if (k >= MAX_PKT_BURST) {
390 n = tx_data_pkts(s, mo, k);
398 n = tx_data_pkts(s, mo, k);
404 sz = tcp_mbuf_seq_free(mo + n, fail);
413 * gets data from the stream send buffer, updates it and
414 * queues it into the TX device queue.
415 * Note that this function is not MT safe.
417 static inline uint32_t
418 tx_nxt_data(struct tle_tcp_stream *s, uint32_t tms)
420 uint32_t n, num, tn, wnd;
421 struct rte_mbuf **mi;
425 wnd = s->tcb.snd.wnd - (uint32_t)(s->tcb.snd.nxt - s->tcb.snd.una);
426 sl.seq = s->tcb.snd.nxt;
427 sl.len = RTE_MIN(wnd, s->tcb.snd.cwnd);
432 /* update send timestamp */
436 /* get group of packets */
437 mi = tcp_txq_get_nxt_objs(s, &num);
439 /* stream send buffer is empty */
443 /* queue data packets for TX */
444 n = tx_data_bulk(s, &sl, mi, num);
447 /* update consumer head */
448 tcp_txq_set_nxt_head(s, n);
451 s->tcb.snd.nxt += sl.seq - (uint32_t)s->tcb.snd.nxt;
456 free_una_data(struct tle_tcp_stream *s, uint32_t len)
458 uint32_t i, n, num, plen;
459 struct rte_mbuf **mi;
465 /* get group of packets */
466 mi = tcp_txq_get_una_objs(s, &num);
471 /* free acked data */
472 for (i = 0; i != num && n != len; i++, n = plen) {
473 plen += PKT_L4_PLEN(mi[i]);
475 /* keep SND.UNA at the start of the packet */
476 len -= RTE_MIN(len, plen - len);
479 rte_pktmbuf_free(mi[i]);
482 /* update consumer tail */
483 tcp_txq_set_una_tail(s, i);
484 } while (plen < len);
486 s->tcb.snd.una += len;
489 * that could happen in case of retransmit,
490 * adjust SND.NXT with SND.UNA.
492 if (s->tcb.snd.una > s->tcb.snd.nxt) {
493 tcp_txq_rst_nxt_head(s);
494 s->tcb.snd.nxt = s->tcb.snd.una;
498 static inline uint16_t
499 calc_smss(uint16_t mss, const struct tle_dest *dst)
503 n = dst->mtu - dst->l2_len - dst->l3_len - TCP_TX_HDR_DACK;
504 mss = RTE_MIN(n, mss);
510 * If SMSS > 2190 bytes:
511 * IW = 2 * SMSS bytes and MUST NOT be more than 2 segments
512 * If (SMSS > 1095 bytes) and (SMSS <= 2190 bytes):
513 * IW = 3 * SMSS bytes and MUST NOT be more than 3 segments
514 * if SMSS <= 1095 bytes:
515 * IW = 4 * SMSS bytes and MUST NOT be more than 4 segments
517 static inline uint32_t
518 initial_cwnd(uint16_t smss)
522 else if (smss > 1095)
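/*
 * Illustrative sketch (not part of the original file): the RFC 3390 rules
 * quoted in the comment above, written out in full. Only the comment is
 * authoritative here; the elided body of initial_cwnd() may differ in detail.
 */
static inline uint32_t
example_initial_cwnd_rfc3390(uint32_t smss)
{
	if (smss > 2190)
		return 2 * smss;	/* and no more than 2 segments */
	else if (smss > 1095)
		return 3 * smss;	/* and no more than 3 segments */
	else
		return 4 * smss;	/* and no more than 4 segments */
}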
528 * queue a standalone packet to the particular output device
530 * - L2/L3/L4 headers should be already set.
531 * - packet fits into one segment.
534 send_pkt(struct tle_tcp_stream *s, struct tle_dev *dev, struct rte_mbuf *m)
539 if (stream_drb_alloc(s, &drb, 1) == 0)
542 /* enqueue pkt for TX. */
544 n = tle_dring_mp_enqueue(&dev->tx.dr, (const void * const*)&m, 1,
547 /* free unused drbs. */
549 stream_drb_free(s, &drb, 1);
551 return (n == 1) ? 0 : -ENOBUFS;
555 send_ctrl_pkt(struct tle_tcp_stream *s, struct rte_mbuf *m, uint32_t seq,
558 const struct tle_dest *dst;
564 pid = rte_atomic32_add_return(&dst->dev->tx.packet_id[type], 1) - 1;
566 rc = tcp_fill_mbuf(m, s, dst, 0, s->s.port, seq, flags, pid, 1);
568 rc = send_pkt(s, dst->dev, m);
574 send_rst(struct tle_tcp_stream *s, uint32_t seq)
579 m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
583 rc = send_ctrl_pkt(s, m, seq, TCP_FLAG_RST);
591 send_ack(struct tle_tcp_stream *s, uint32_t tms, uint32_t flags)
597 m = rte_pktmbuf_alloc(s->tx.dst.head_mp);
601 seq = s->tcb.snd.nxt - ((flags & (TCP_FLAG_FIN | TCP_FLAG_SYN)) != 0);
604 rc = send_ctrl_pkt(s, m, seq, flags);
610 s->tcb.snd.ack = s->tcb.rcv.nxt;
616 sync_ack(struct tle_tcp_stream *s, const union pkt_info *pi,
617 const union seg_info *si, uint32_t ts, struct rte_mbuf *m)
621 uint32_t pid, seq, type;
625 const struct tcp_hdr *th;
629 /* get destination information. */
633 da = &pi->addr6->src;
635 rc = stream_get_dest(&s->s, da, &dst);
639 th = rte_pktmbuf_mtod_offset(m, const struct tcp_hdr *,
640 m->l2_len + m->l3_len);
641 get_syn_opts(&s->tcb.so, (uintptr_t)(th + 1), m->l4_len - sizeof(*th));
643 s->tcb.rcv.nxt = si->seq + 1;
644 seq = sync_gen_seq(pi, s->tcb.rcv.nxt, ts, s->tcb.so.mss,
645 s->s.ctx->prm.hash_alg,
646 &s->s.ctx->prm.secret_key);
647 s->tcb.so.ts.ecr = s->tcb.so.ts.val;
648 s->tcb.so.ts.val = sync_gen_ts(ts, s->tcb.so.wscale);
649 s->tcb.so.wscale = (s->tcb.so.wscale == TCP_WSCALE_NONE) ?
650 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
651 s->tcb.so.mss = calc_smss(dst.mtu, &dst);
653 /* reset mbuf's data contents. */
654 len = m->l2_len + m->l3_len + m->l4_len;
656 if (rte_pktmbuf_adj(m, len) == NULL)
660 pid = rte_atomic32_add_return(&dev->tx.packet_id[type], 1) - 1;
662 rc = tcp_fill_mbuf(m, s, &dst, 0, pi->port, seq,
663 TCP_FLAG_SYN | TCP_FLAG_ACK, pid, 1);
665 rc = send_pkt(s, dev, m);
672 * There are four cases for the acceptability test for an incoming segment:
673 * Segment Receive  Test
674 * Length  Window
675 * ------- -------  -------------------------------------------
676 * 0 0 SEG.SEQ = RCV.NXT
677 * 0 >0 RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
678 * >0 0 not acceptable
679 * >0 >0 RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
680 * or RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
683 check_seqn(const struct tcb *tcb, uint32_t seqn, uint32_t len)
688 if (seqn - tcb->rcv.nxt >= tcb->rcv.wnd &&
689 n - tcb->rcv.nxt > tcb->rcv.wnd)
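/*
 * Illustrative sketch (not part of the original file): the acceptability
 * test above relies on unsigned 32-bit wraparound, so "x lies inside
 * [RCV.NXT, RCV.NXT + RCV.WND)" collapses into a single subtraction and
 * compare. check_seqn() applies it to both the first and the last octet of
 * the segment. The helper below is hypothetical.
 */
static inline int
example_in_rcv_wnd(uint32_t x, uint32_t rcv_nxt, uint32_t rcv_wnd)
{
	/* (x - rcv_nxt) is the wraparound-safe distance from RCV.NXT */
	return (x - rcv_nxt) < rcv_wnd;
}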
695 static inline union tsopt
696 rx_tms_opt(const struct tcb *tcb, const struct rte_mbuf *mb)
700 const struct tcp_hdr *th;
702 if (tcb->so.ts.val != 0) {
703 opt = rte_pktmbuf_mtod_offset(mb, uintptr_t,
704 mb->l2_len + mb->l3_len + sizeof(*th));
705 ts = get_tms_opts(opt, mb->l4_len - sizeof(*th));
713 * PAWS and sequence check.
717 rx_check_seq(struct tcb *tcb, uint32_t seq, uint32_t len, const union tsopt ts)
721 /* RFC 1323 4.2.1 R2 */
722 rc = check_seqn(tcb, seq, len);
728 /* RFC 1323 4.2.1 R1 */
729 if (tcp_seq_lt(ts.val, tcb->rcv.ts))
732 /* RFC 1323 4.2.1 R3 */
733 if (tcp_seq_leq(seq, tcb->snd.ack) &&
734 tcp_seq_lt(tcb->snd.ack, seq + len))
735 tcb->rcv.ts = ts.val;
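/*
 * Illustrative sketch (not part of the original file): the PAWS rule R1
 * applied above - a segment is discarded when its timestamp value is
 * strictly older, in modulo-2^32 terms, than the timestamp most recently
 * accepted in-window (tcb->rcv.ts). The helper name is hypothetical.
 */
static inline int
example_paws_reject(uint32_t seg_tsval, uint32_t last_tsval)
{
	/* same idea as tcp_seq_lt(): signed difference modulo 2^32 */
	return (int32_t)(seg_tsval - last_tsval) < 0;
}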
742 rx_check_ack(const struct tcb *tcb, uint32_t ack)
746 max = (uint32_t)RTE_MAX(tcb->snd.nxt, tcb->snd.rcvr);
748 if (tcp_seq_leq(tcb->snd.una, ack) && tcp_seq_leq(ack, max))
755 rx_check_seqack(struct tcb *tcb, uint32_t seq, uint32_t ack, uint32_t len,
756 const union tsopt ts)
760 rc = rx_check_seq(tcb, seq, len, ts);
761 rc |= rx_check_ack(tcb, ack);
766 restore_syn_opt(struct syn_opts *so, const union pkt_info *pi,
767 const union seg_info *si, uint32_t ts, const struct rte_mbuf *mb,
768 uint32_t hash_alg, rte_xmm_t *secret_key)
772 const struct tcp_hdr *th;
774 /* check that ACK, etc fields are what we expected. */
775 rc = sync_check_ack(pi, si->seq, si->ack - 1, ts,
783 th = rte_pktmbuf_mtod_offset(mb, const struct tcp_hdr *,
784 mb->l2_len + mb->l3_len);
785 len = mb->l4_len - sizeof(*th);
786 sync_get_opts(so, (uintptr_t)(th + 1), len);
791 stream_term(struct tle_tcp_stream *s)
795 s->tcb.state = TCP_ST_CLOSED;
800 /* close() was already invoked, schedule final cleanup */
801 if ((s->tcb.uop & TCP_OP_CLOSE) != 0) {
803 dr = CTX_TCP_SDR(s->s.ctx);
804 STAILQ_INSERT_TAIL(&dr->be, &s->s, link);
806 /* notify user that stream needs to be closed */
807 } else if (s->err.ev != NULL)
808 tle_event_raise(s->err.ev);
809 else if (s->err.cb.func != NULL)
810 s->err.cb.func(s->err.cb.data, &s->s);
814 stream_fill_dest(struct tle_tcp_stream *s)
819 if (s->s.type == TLE_V4)
820 da = &s->s.ipv4.addr.src;
822 da = &s->s.ipv6.addr.src;
824 rc = stream_get_dest(&s->s, da, &s->tx.dst);
825 return (rc < 0) ? rc : 0;
829 * helper function, prepares a new accept stream.
832 accept_prep_stream(struct tle_tcp_stream *ps, struct stbl *st,
833 struct tle_tcp_stream *cs, const struct syn_opts *so,
834 uint32_t tms, const union pkt_info *pi, const union seg_info *si)
839 /* some TX still pending for that stream. */
840 if (TCP_STREAM_TX_PENDING(cs))
843 /* setup L4 ports and L3 addresses fields. */
844 cs->s.port.raw = pi->port.raw;
845 cs->s.pmsk.raw = UINT32_MAX;
847 if (pi->tf.type == TLE_V4) {
848 cs->s.ipv4.addr = pi->addr4;
849 cs->s.ipv4.mask.src = INADDR_NONE;
850 cs->s.ipv4.mask.dst = INADDR_NONE;
851 } else if (pi->tf.type == TLE_V6) {
852 cs->s.ipv6.addr = *pi->addr6;
853 rte_memcpy(&cs->s.ipv6.mask.src, &tle_ipv6_none,
854 sizeof(cs->s.ipv6.mask.src));
855 rte_memcpy(&cs->s.ipv6.mask.dst, &tle_ipv6_none,
856 sizeof(cs->s.ipv6.mask.dst));
860 sync_fill_tcb(&cs->tcb, si, so);
861 cs->tcb.rcv.wnd = calc_rx_wnd(cs, cs->tcb.rcv.wscale);
865 * for now RTT is calculated based on the TCP TMS option;
866 * a real-time measurement should be added later.
868 if (cs->tcb.so.ts.ecr) {
869 rtt = tms - cs->tcb.so.ts.ecr;
870 rto_estimate(&cs->tcb, rtt);
872 cs->tcb.snd.rto = TCP_RTO_DEFAULT;
874 /* copy stream's type. */
875 cs->s.type = ps->s.type;
877 /* retrieve and cache destination information. */
878 rc = stream_fill_dest(cs);
882 /* update snd.mss with SMSS value */
883 cs->tcb.snd.mss = calc_smss(cs->tcb.snd.mss, &cs->tx.dst);
885 /* setup congestion variables */
886 cs->tcb.snd.cwnd = initial_cwnd(cs->tcb.snd.mss);
887 cs->tcb.snd.ssthresh = cs->tcb.snd.wnd;
889 cs->tcb.state = TCP_ST_ESTABLISHED;
891 /* add stream to the table */
892 cs->ste = stbl_add_stream(st, pi, cs);
896 cs->tcb.uop |= TCP_OP_ACCEPT;
903 * ACK for new connection request arrived.
904 * Check that the packet meets all conditions and try to open a new stream.
906 * < 0 - invalid packet
907 * == 0 - packet is valid and new stream was opened for it.
908 * > 0 - packet is valid, but failed to open new stream.
911 rx_ack_listen(struct tle_tcp_stream *s, struct stbl *st,
912 const union pkt_info *pi, const union seg_info *si,
913 uint32_t tms, struct rte_mbuf *mb, struct tle_tcp_stream **csp)
917 struct tle_stream *ts;
918 struct tle_tcp_stream *cs;
923 if (pi->tf.flags != TCP_FLAG_ACK || rx_check_stream(s, pi) != 0)
927 rc = restore_syn_opt(&so, pi, si, tms, mb, ctx->prm.hash_alg,
928 &ctx->prm.secret_key);
932 /* allocate new stream */
933 ts = get_stream(ctx);
938 /* prepare stream to handle new connection */
939 if (accept_prep_stream(s, st, cs, &so, tms, pi, si) == 0) {
941 /* put new stream in the accept queue */
942 if (_rte_ring_enqueue_burst(s->rx.q,
943 (void * const *)&ts, 1) == 1) {
948 /* cleanup on failure */
950 stbl_del_pkt(st, cs->ste, pi);
954 tcp_stream_reset(ctx, cs);
959 data_pkt_adjust(const struct tcb *tcb, struct rte_mbuf *mb, uint32_t hlen,
960 uint32_t *seqn, uint32_t *plen)
962 uint32_t len, n, seq;
967 rte_pktmbuf_adj(mb, hlen);
970 /* cut off the start of the packet */
971 else if (tcp_seq_lt(seq, tcb->rcv.nxt)) {
972 n = tcb->rcv.nxt - seq;
976 rte_pktmbuf_adj(mb, n);
984 static inline uint32_t
985 rx_ackdata(struct tle_tcp_stream *s, uint32_t ack)
989 n = ack - (uint32_t)s->tcb.snd.una;
991 /* some more data was acked. */
994 /* advance SND.UNA and free related packets. */
995 k = rte_ring_free_count(s->tx.q);
998 /* mark the stream as available for writing */
999 if (rte_ring_free_count(s->tx.q) != 0) {
1000 if (s->tx.ev != NULL)
1001 tle_event_raise(s->tx.ev);
1002 else if (k == 0 && s->tx.cb.func != NULL)
1003 s->tx.cb.func(s->tx.cb.data, &s->s);
1011 rx_fin_state(struct tle_tcp_stream *s, struct resp_info *rsp)
1016 s->tcb.rcv.nxt += 1;
1018 ackfin = (s->tcb.snd.una == s->tcb.snd.fss);
1019 state = s->tcb.state;
1021 if (state == TCP_ST_ESTABLISHED) {
1022 s->tcb.state = TCP_ST_CLOSE_WAIT;
1023 /* raise err.ev & err.cb */
1024 if (s->err.ev != NULL)
1025 tle_event_raise(s->err.ev);
1026 else if (s->err.cb.func != NULL)
1027 s->err.cb.func(s->err.cb.data, &s->s);
1028 } else if (state == TCP_ST_FIN_WAIT_1 || state == TCP_ST_CLOSING) {
1029 rsp->flags |= TCP_FLAG_ACK;
1031 s->tcb.state = TCP_ST_TIME_WAIT;
1032 s->tcb.snd.rto = TCP_RTO_2MSL;
1035 s->tcb.state = TCP_ST_CLOSING;
1036 } else if (state == TCP_ST_FIN_WAIT_2) {
1037 rsp->flags |= TCP_FLAG_ACK;
1038 s->tcb.state = TCP_ST_TIME_WAIT;
1039 s->tcb.snd.rto = TCP_RTO_2MSL;
1041 } else if (state == TCP_ST_LAST_ACK && ackfin != 0) {
1047 * FIN process for ESTABLISHED state
1049 * < 0 - error occurred
1050 * 0 - FIN was processed OK and the mbuf can be freed/reused.
1051 * > 0 - FIN was processed OK, but the mbuf can't be freed/reused.
1054 rx_fin(struct tle_tcp_stream *s, uint32_t state,
1055 const union seg_info *si, struct rte_mbuf *mb,
1056 struct resp_info *rsp)
1058 uint32_t hlen, plen, seq;
1062 hlen = PKT_L234_HLEN(mb);
1063 plen = mb->pkt_len - hlen;
1066 ts = rx_tms_opt(&s->tcb, mb);
1067 ret = rx_check_seqack(&s->tcb, seq, si->ack, plen, ts);
1071 if (state < TCP_ST_ESTABLISHED)
1076 ret = data_pkt_adjust(&s->tcb, mb, hlen, &seq, &plen);
1079 if (rx_data_enqueue(s, seq, plen, &mb, 1) != 1)
1083 /* process ack here */
1084 rx_ackdata(s, si->ack);
1086 /* some fragments still missing */
1087 if (seq + plen != s->tcb.rcv.nxt) {
1088 s->tcb.rcv.frs.seq = seq + plen;
1089 s->tcb.rcv.frs.on = 1;
1091 rx_fin_state(s, rsp);
1097 rx_rst(struct tle_tcp_stream *s, uint32_t state, uint32_t flags,
1098 const union seg_info *si)
1103 * RFC 793: In all states except SYN-SENT, all reset (RST) segments
1104 * are validated by checking their SEQ-fields.
1105 * A reset is valid if its sequence number is in the window.
1106 * In the SYN-SENT state (a RST received in response to an initial SYN),
1107 * the RST is acceptable if the ACK field acknowledges the SYN.
1109 if (state == TCP_ST_SYN_SENT) {
1110 rc = ((flags & TCP_FLAG_ACK) == 0 ||
1111 si->ack != s->tcb.snd.nxt) ?
1116 rc = check_seqn(&s->tcb, si->seq, 0);
1125 * check whether we have a FIN that was received out-of-order.
1126 * if yes, try to process it now.
1129 rx_ofo_fin(struct tle_tcp_stream *s, struct resp_info *rsp)
1131 if (s->tcb.rcv.frs.on != 0 && s->tcb.rcv.nxt == s->tcb.rcv.frs.seq)
1132 rx_fin_state(s, rsp);
1136 dack_info_init(struct dack_info *tack, const struct tcb *tcb)
1138 memset(tack, 0, sizeof(*tack));
1139 tack->ack = tcb->snd.una;
1140 tack->segs.dup = tcb->rcv.dupack;
1141 tack->wu.raw = tcb->snd.wu.raw;
1142 tack->wnd = tcb->snd.wnd >> tcb->snd.wscale;
1146 ack_window_update(struct tcb *tcb, const struct dack_info *tack)
1148 tcb->snd.wu.raw = tack->wu.raw;
1149 tcb->snd.wnd = tack->wnd << tcb->snd.wscale;
1153 ack_cwnd_update(struct tcb *tcb, uint32_t acked, const struct dack_info *tack)
1157 n = tack->segs.ack * tcb->snd.mss;
1159 /* slow start phase, RFC 5681 3.1 (2) */
1160 if (tcb->snd.cwnd < tcb->snd.ssthresh)
1161 tcb->snd.cwnd += RTE_MIN(acked, n);
1162 /* congestion avoidance phase, RFC 5681 3.1 (3) */
1164 tcb->snd.cwnd += RTE_MAX(1U, n * tcb->snd.mss / tcb->snd.cwnd);
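/*
 * Illustrative sketch (not part of the original file): the two RFC 5681
 * regimes used above. In slow start cwnd grows by at most one SMSS per
 * ACKed segment (bounded by the amount of newly acked data); in congestion
 * avoidance it grows by roughly SMSS*SMSS/cwnd per ACK, i.e. about one
 * SMSS per RTT. The helper name and parameters are hypothetical.
 */
static inline uint32_t
example_cwnd_update(uint32_t cwnd, uint32_t ssthresh, uint32_t mss,
	uint32_t acked, uint32_t ack_segs)
{
	uint32_t n;

	n = ack_segs * mss;
	if (cwnd < ssthresh)	/* slow start, RFC 5681 3.1 (2) */
		return cwnd + ((acked < n) ? acked : n);

	/* congestion avoidance, RFC 5681 3.1 (3) */
	n = n * mss / cwnd;
	return cwnd + ((n > 1) ? n : 1);
}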
1168 rto_ssthresh_update(struct tcb *tcb)
1172 /* RFC 5681 3.1 (4) */
1173 n = (tcb->snd.nxt - tcb->snd.una) / 2;
1174 k = 2 * tcb->snd.mss;
1175 tcb->snd.ssthresh = RTE_MAX(n, k);
1179 rto_cwnd_update(struct tcb *tcb)
1182 if (tcb->snd.nb_retx == 0)
1183 rto_ssthresh_update(tcb);
1186 * RFC 5681 3.1: upon a timeout cwnd MUST be set to
1187 * no more than 1 full-sized segment.
1189 tcb->snd.cwnd = tcb->snd.mss;
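/*
 * Illustrative sketch (not part of the original file): the timeout reaction
 * implemented above per RFC 5681 3.1 - on the first RTO for an outstanding
 * window ssthresh becomes max(FlightSize / 2, 2 * SMSS), and cwnd collapses
 * to a single full-sized segment. Names and parameters are hypothetical.
 */
static inline void
example_rto_collapse(uint32_t flight_size, uint32_t mss,
	uint32_t *ssthresh, uint32_t *cwnd)
{
	uint32_t half, lim;

	half = flight_size / 2;
	lim = 2 * mss;
	*ssthresh = (half > lim) ? half : lim;	/* RFC 5681 3.1 (4) */
	*cwnd = mss;	/* no more than 1 full-sized segment */
}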
1193 ack_info_update(struct dack_info *tack, const union seg_info *si,
1194 int32_t badseq, uint32_t dlen, const union tsopt ts)
1197 tack->segs.badseq++;
1201 /* segment with incoming data */
1202 tack->segs.data += (dlen != 0);
1204 /* segment with newly acked data */
1205 if (tcp_seq_lt(tack->ack, si->ack)) {
1208 tack->ack = si->ack;
1212 * RFC 5681: An acknowledgment is considered a "duplicate" when:
1213 * (a) the receiver of the ACK has outstanding data
1214 * (b) the incoming acknowledgment carries no data
1215 * (c) the SYN and FIN bits are both off
1216 * (d) the acknowledgment number is equal to the TCP.UNA
1217 * (e) the advertised window in the incoming acknowledgment equals the
1218 * advertised window in the last incoming acknowledgment.
1220 * Here we only have to check for (b), (d), (e).
1221 * (a) will be checked later for the whole bulk of packets,
1222 * (c) should never happen here.
1224 } else if (dlen == 0 && si->wnd == tack->wnd && ++tack->segs.dup == 3) {
1225 tack->dup3.seg = tack->segs.ack + 1;
1226 tack->dup3.ack = tack->ack;
1231 * If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
1232 * updated. If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
1233 * SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
1234 * SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
1236 if (tcp_seq_lt(tack->wu.wl1, si->seq) ||
1237 (si->seq == tack->wu.wl1 &&
1238 tcp_seq_leq(tack->wu.wl2, si->ack))) {
1240 tack->wu.wl1 = si->seq;
1241 tack->wu.wl2 = si->ack;
1242 tack->wnd = si->wnd;
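/*
 * Illustrative sketch (not part of the original file): the RFC 793 window
 * update test quoted above. SND.WL1/SND.WL2 remember the SEG.SEQ/SEG.ACK of
 * the segment last used to update SND.WND, so the advertised window is only
 * taken from segments that are not older than that one. Hypothetical helper.
 */
static inline int
example_wnd_update_ok(uint32_t wl1, uint32_t wl2, uint32_t seg_seq,
	uint32_t seg_ack)
{
	return (int32_t)(wl1 - seg_seq) < 0 ||
		(wl1 == seg_seq && (int32_t)(wl2 - seg_ack) <= 0);
}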
1246 static inline uint32_t
1247 rx_data_ack(struct tle_tcp_stream *s, struct dack_info *tack,
1248 const union seg_info si[], struct rte_mbuf *mb[], struct rte_mbuf *rp[],
1249 int32_t rc[], uint32_t num)
1251 uint32_t i, j, k, n, t;
1252 uint32_t hlen, plen, seq, tlen;
1257 for (i = 0; i != num; i = j) {
1259 hlen = PKT_L234_HLEN(mb[i]);
1260 plen = mb[i]->pkt_len - hlen;
1263 ts = rx_tms_opt(&s->tcb, mb[i]);
1264 ret = rx_check_seqack(&s->tcb, seq, si[i].ack, plen, ts);
1266 /* account segment received */
1267 ack_info_update(tack, &si[i], ret != 0, plen, ts);
1270 /* skip duplicate data, if any */
1271 ret = data_pkt_adjust(&s->tcb, mb[i], hlen,
1283 /* group sequential packets together. */
1284 for (tlen = plen; j != num; tlen += plen, j++) {
1286 hlen = PKT_L234_HLEN(mb[j]);
1287 plen = mb[j]->pkt_len - hlen;
1289 /* not consecutive packet */
1290 if (plen == 0 || seq + tlen != si[j].seq)
1294 ts = rx_tms_opt(&s->tcb, mb[j]);
1295 ret = rx_check_seqack(&s->tcb, si[j].seq, si[j].ack,
1298 /* account for segment received */
1299 ack_info_update(tack, &si[j], ret != 0, plen, ts);
1307 rte_pktmbuf_adj(mb[j], hlen);
1313 /* account for OFO data */
1314 if (seq != s->tcb.rcv.nxt)
1315 tack->segs.ofo += n;
1317 /* enqueue packets */
1318 t = rx_data_enqueue(s, seq, tlen, mb + i, n);
1320 /* if we are out of space in stream recv buffer. */
1321 for (; t != n; t++) {
1332 start_fast_retransmit(struct tle_tcp_stream *s)
1338 /* RFC 6582 3.2.2 */
1339 tcb->snd.rcvr = tcb->snd.nxt;
1340 tcb->snd.fastack = 1;
1342 /* RFC 5681 3.2.2 */
1343 rto_ssthresh_update(tcb);
1345 /* RFC 5681 3.2.3 */
1346 tcp_txq_rst_nxt_head(s);
1347 tcb->snd.nxt = tcb->snd.una;
1348 tcb->snd.cwnd = tcb->snd.ssthresh + 3 * tcb->snd.mss;
1352 stop_fast_retransmit(struct tle_tcp_stream *s)
1358 n = tcb->snd.nxt - tcb->snd.una;
1359 tcb->snd.cwnd = RTE_MIN(tcb->snd.ssthresh,
1360 RTE_MAX(n, tcb->snd.mss) + tcb->snd.mss);
1361 tcb->snd.fastack = 0;
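/*
 * Illustrative sketch (not part of the original file): the cwnd handling
 * around fast retransmit shown above. On entry (third duplicate ACK) cwnd is
 * inflated to ssthresh + 3 * SMSS (RFC 5681 3.2); on exit (full ACK) it is
 * deflated to min(ssthresh, max(FlightSize, SMSS) + SMSS), as in
 * stop_fast_retransmit(). Helper names are hypothetical.
 */
static inline uint32_t
example_fastrx_enter_cwnd(uint32_t ssthresh, uint32_t mss)
{
	return ssthresh + 3 * mss;
}

static inline uint32_t
example_fastrx_exit_cwnd(uint32_t ssthresh, uint32_t flight, uint32_t mss)
{
	uint32_t n;

	n = ((flight > mss) ? flight : mss) + mss;
	return (ssthresh < n) ? ssthresh : n;
}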
1365 in_fast_retransmit(struct tle_tcp_stream *s, uint32_t ack_len, uint32_t ack_num,
1373 /* RFC 6582 3.2.3 partial ACK */
1376 n = ack_num * tcb->snd.mss;
1378 tcb->snd.cwnd -= ack_len - n;
1380 tcb->snd.cwnd -= ack_len % tcb->snd.mss;
1383 * For the first partial ACK that arrives
1384 * during fast recovery, also reset the
1387 if (tcb->snd.fastack == 1)
1390 tcb->snd.fastack += ack_num;
1393 /* RFC 5681 3.2.4 */
1394 } else if (dup_num > 3) {
1395 s->tcb.snd.cwnd += (dup_num - 3) * tcb->snd.mss;
1403 process_ack(struct tle_tcp_stream *s, uint32_t acked,
1404 const struct dack_info *tack)
1411 if (s->tcb.snd.fastack == 0) {
1415 /* RFC 6582 3.2.2 switch to fast retransmit mode */
1416 if (tack->dup3.seg != 0 && s->tcb.snd.una != s->tcb.snd.nxt &&
1417 s->tcb.snd.una >= s->tcb.snd.rcvr) {
1419 start_fast_retransmit(s);
1420 in_fast_retransmit(s,
1421 tack->ack - tack->dup3.ack,
1422 tack->segs.ack - tack->dup3.seg - 1,
1425 /* remain in normal mode */
1426 } else if (acked != 0) {
1427 ack_cwnd_update(&s->tcb, acked, tack);
1431 /* fast retransmit mode */
1434 /* remain in fast retransmit mode */
1435 if (s->tcb.snd.una < s->tcb.snd.rcvr) {
1437 send = in_fast_retransmit(s, acked, tack->segs.ack,
1440 /* RFC 6582 3.2.3 full ACK */
1441 stop_fast_retransmit(s);
1444 /* if we have another series of dup ACKs */
1445 if (tack->dup3.seg != 0 &&
1446 s->tcb.snd.una != s->tcb.snd.nxt &&
1447 tcp_seq_leq((uint32_t)s->tcb.snd.rcvr,
1450 /* restart fast retransmit again. */
1451 start_fast_retransmit(s);
1452 send = in_fast_retransmit(s,
1453 tack->ack - tack->dup3.ack,
1454 tack->segs.ack - tack->dup3.seg - 1,
1464 * our FIN was acked, stop rto timer, change stream state,
1465 * and possibly close the stream.
1468 rx_ackfin(struct tle_tcp_stream *s)
1472 s->tcb.snd.una = s->tcb.snd.fss;
1473 empty_mbuf_ring(s->tx.q);
1475 state = s->tcb.state;
1476 if (state == TCP_ST_LAST_ACK)
1478 else if (state == TCP_ST_FIN_WAIT_1) {
1480 s->tcb.state = TCP_ST_FIN_WAIT_2;
1481 } else if (state == TCP_ST_CLOSING) {
1482 s->tcb.state = TCP_ST_TIME_WAIT;
1483 s->tcb.snd.rto = TCP_RTO_2MSL;
1489 rx_process_ack(struct tle_tcp_stream *s, uint32_t ts,
1490 const struct dack_info *tack)
1495 s->tcb.rcv.dupack = tack->segs.dup;
1497 n = rx_ackdata(s, tack->ack);
1498 send = process_ack(s, n, tack);
1500 /* try to send more data. */
1501 if ((n != 0 || send != 0) && tcp_txq_nxt_cnt(s) != 0)
1502 txs_enqueue(s->s.ctx, s);
1504 /* restart RTO timer. */
1505 if (s->tcb.snd.nxt != s->tcb.snd.una)
1508 /* update RTO; if a fresh packet is here, then calculate RTT */
1509 if (tack->ts.ecr != 0)
1510 rto_estimate(&s->tcb, ts - tack->ts.ecr);
1515 * returns negative value on failure, or zero on success.
1518 rx_synack(struct tle_tcp_stream *s, uint32_t ts, uint32_t state,
1519 const union seg_info *si, struct rte_mbuf *mb,
1520 struct resp_info *rsp)
1525 if (state != TCP_ST_SYN_SENT)
1528 /* invalid SEG.ACK */
1529 if (si->ack != (uint32_t)s->tcb.snd.nxt) {
1530 rsp->flags = TCP_FLAG_RST;
1534 th = rte_pktmbuf_mtod_offset(mb, struct tcp_hdr *,
1535 mb->l2_len + mb->l3_len);
1536 get_syn_opts(&so, (uintptr_t)(th + 1), mb->l4_len - sizeof(*th));
1540 s->tcb.snd.una = s->tcb.snd.nxt;
1541 s->tcb.snd.mss = calc_smss(so.mss, &s->tx.dst);
1542 s->tcb.snd.wnd = si->wnd << so.wscale;
1543 s->tcb.snd.wu.wl1 = si->seq;
1544 s->tcb.snd.wu.wl2 = si->ack;
1545 s->tcb.snd.wscale = so.wscale;
1547 /* setup congestion variables */
1548 s->tcb.snd.cwnd = initial_cwnd(s->tcb.snd.mss);
1549 s->tcb.snd.ssthresh = s->tcb.snd.wnd;
1551 s->tcb.rcv.ts = so.ts.val;
1552 s->tcb.rcv.irs = si->seq;
1553 s->tcb.rcv.nxt = si->seq + 1;
1555 /* if peer doesn't support WSCALE opt, recalculate RCV.WND */
1556 s->tcb.rcv.wscale = (so.wscale == TCP_WSCALE_NONE) ?
1557 TCP_WSCALE_NONE : TCP_WSCALE_DEFAULT;
1558 s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
1560 /* calculate initial rto */
1561 rto_estimate(&s->tcb, ts - s->tcb.snd.ts);
1563 rsp->flags |= TCP_FLAG_ACK;
1566 s->tcb.state = TCP_ST_ESTABLISHED;
1569 if (s->tx.ev != NULL)
1570 tle_event_raise(s->tx.ev);
1571 else if (s->tx.cb.func != NULL)
1572 s->tx.cb.func(s->tx.cb.data, &s->s);
1577 static inline uint32_t
1578 rx_stream(struct tle_tcp_stream *s, uint32_t ts,
1579 const union pkt_info *pi, const union seg_info si[],
1580 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1583 uint32_t i, k, n, state;
1585 struct resp_info rsp;
1586 struct dack_info tack;
1591 state = s->tcb.state;
1594 * first check for the states/flags where we don't
1595 * expect groups of packets.
1599 if ((pi->tf.flags & TCP_FLAG_RST) != 0) {
1602 rx_rst(s, state, pi->tf.flags, &si[i]);
1607 /* RFC 793: if the ACK bit is off drop the segment and return */
1608 } else if ((pi->tf.flags & TCP_FLAG_ACK) == 0) {
1611 * first check for the states/flags where we don't
1612 * expect groups of packets.
1615 /* process <SYN,ACK> */
1616 } else if ((pi->tf.flags & TCP_FLAG_SYN) != 0) {
1618 for (i = 0; i != num; i++) {
1619 ret = rx_synack(s, ts, state, &si[i], mb[i], &rsp);
1629 } else if ((pi->tf.flags & TCP_FLAG_FIN) != 0) {
1631 for (i = 0; i != num; i++) {
1632 ret = rx_fin(s, state, &si[i], mb[i], &rsp);
1642 /* normal data/ack packets */
1643 } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
1645 /* process incoming data packets. */
1646 dack_info_init(&tack, &s->tcb);
1647 n = rx_data_ack(s, &tack, si, mb, rp, rc, num);
1649 /* follow up actions based on aggregated information */
1651 /* update SND.WND */
1652 ack_window_update(&s->tcb, &tack);
1655 * fast-path: all data & FIN was already sent out
1656 * and now is acknowledged.
1658 if (s->tcb.snd.fss == s->tcb.snd.nxt &&
1659 tack.ack == (uint32_t) s->tcb.snd.nxt)
1662 rx_process_ack(s, ts, &tack);
1665 * send an immediate ACK if either:
1666 * - received segment with invalid seq/ack number
1667 * - received segment with OFO data
1668 * - received segment with in-order (INO) data and no TX is scheduled
1671 if (tack.segs.badseq != 0 || tack.segs.ofo != 0 ||
1672 (tack.segs.data != 0 &&
1673 rte_atomic32_read(&s->tx.arm) == 0))
1674 rsp.flags |= TCP_FLAG_ACK;
1676 rx_ofo_fin(s, &rsp);
1681 /* unhandled state, drop all packets. */
1685 /* we have a response packet to send. */
1686 if (rsp.flags == TCP_FLAG_RST) {
1687 send_rst(s, si[i].ack);
1689 } else if (rsp.flags != 0) {
1690 send_ack(s, ts, rsp.flags);
1692 /* start the timer for FIN packet */
1693 if ((rsp.flags & TCP_FLAG_FIN) != 0)
1697 /* unprocessed packets */
1698 for (; i != num; i++, k++) {
1706 static inline uint32_t
1707 rx_new_stream(struct tle_tcp_stream *s, uint32_t ts,
1708 const union pkt_info *pi, const union seg_info si[],
1709 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1714 if (rwl_acquire(&s->rx.use) > 0) {
1715 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1716 rwl_release(&s->rx.use);
1720 for (i = 0; i != num; i++) {
1727 static inline uint32_t
1728 rx_postsyn(struct tle_dev *dev, struct stbl *st, uint32_t type, uint32_t ts,
1729 const union pkt_info pi[], const union seg_info si[],
1730 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1733 struct tle_tcp_stream *cs, *s;
1734 uint32_t i, k, n, state;
1737 s = rx_obtain_stream(dev, st, &pi[0], type);
1739 for (i = 0; i != num; i++) {
1747 state = s->tcb.state;
1749 if (state == TCP_ST_LISTEN) {
1751 /* one connection per flow */
1754 for (i = 0; i != num; i++) {
1756 ret = rx_ack_listen(s, st, pi, &si[i], ts, mb[i], &cs);
1758 /* valid packet encountered */
1762 /* invalid packet, keep trying to find a proper one */
1768 /* packet is valid, but we are out of streams to serve it */
1770 for (; i != num; i++, k++) {
1774 /* new stream is accepted */
1775 } else if (ret == 0) {
1777 /* inform listen stream about new connections */
1778 if (s->rx.ev != NULL)
1779 tle_event_raise(s->rx.ev);
1780 else if (s->rx.cb.func != NULL &&
1781 rte_ring_count(s->rx.q) == 1)
1782 s->rx.cb.func(s->rx.cb.data, &s->s);
1784 /* if there is no data, drop current packet */
1785 if (PKT_L4_PLEN(mb[i]) == 0) {
1790 /* process remaining packets for that stream */
1792 n = rx_new_stream(cs, ts, pi + i, si + i,
1793 mb + i, rp + k, rc + k, num - i);
1799 i = rx_stream(s, ts, pi, si, mb, rp, rc, num);
1803 rwl_release(&s->rx.use);
1808 static inline uint32_t
1809 rx_syn(struct tle_dev *dev, uint32_t type, uint32_t ts,
1810 const union pkt_info pi[], const union seg_info si[],
1811 struct rte_mbuf *mb[], struct rte_mbuf *rp[], int32_t rc[],
1814 struct tle_tcp_stream *s;
1818 s = rx_obtain_listen_stream(dev, &pi[0], type);
1820 for (i = 0; i != num; i++) {
1828 for (i = 0; i != num; i++) {
1830 /* check that this remote is allowed to connect */
1831 if (rx_check_stream(s, &pi[i]) != 0)
1834 /* syncookie: reply with <SYN,ACK> */
1835 ret = sync_ack(s, &pi[i], &si[i], ts, mb[i]);
1844 rwl_release(&s->rx.use);
1849 tle_tcp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
1850 struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
1853 uint32_t i, j, k, n, t, ts;
1855 union pkt_info pi[num];
1856 union seg_info si[num];
1858 uint8_t t[TLE_VNUM];
1863 st = CTX_TCP_STLB(dev->ctx);
1867 /* extract packet info and check the L3/L4 csums */
1868 for (i = 0; i != num; i++) {
1870 get_pkt_info(pkt[i], &pi[i], &si[i]);
1873 csf = dev->rx.ol_flags[t] &
1874 (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
1876 /* check csums in SW */
1877 if (pi[i].csf == 0 && csf != 0 && check_pkt_csum(pkt[i], csf,
1878 pi[i].tf.type, IPPROTO_TCP) != 0)
1884 if (stu.t[TLE_V4] != 0)
1885 stbl_lock(st, TLE_V4);
1886 if (stu.t[TLE_V6] != 0)
1887 stbl_lock(st, TLE_V6);
1890 for (i = 0; i != num; i += j) {
1894 /* basic checks for incoming packet */
1895 if (t >= TLE_VNUM || pi[i].csf != 0 || dev->dp[t] == NULL) {
1900 /* process input SYN packets */
1901 } else if (pi[i].tf.flags == TCP_FLAG_SYN) {
1902 j = pkt_info_bulk_syneq(pi + i, num - i);
1903 n = rx_syn(dev, t, ts, pi + i, si + i, pkt + i,
1907 j = pkt_info_bulk_eq(pi + i, num - i);
1908 n = rx_postsyn(dev, st, t, ts, pi + i, si + i, pkt + i,
1914 if (stu.t[TLE_V4] != 0)
1915 stbl_unlock(st, TLE_V4);
1916 if (stu.t[TLE_V6] != 0)
1917 stbl_unlock(st, TLE_V6);
1923 tle_tcp_stream_accept(struct tle_stream *ts, struct tle_stream *rs[],
1927 struct tle_tcp_stream *s;
1930 n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)rs, num);
1935 * if we still have streams to accept,
1936 * then rearm the stream RX event.
1938 if (n == num && rte_ring_count(s->rx.q) != 0) {
1939 if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
1940 tle_event_raise(s->rx.ev);
1941 rwl_release(&s->rx.use);
1948 tle_tcp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
1950 uint32_t i, j, k, n;
1951 struct tle_drb *drb[num];
1952 struct tle_tcp_stream *s;
1954 /* extract packets from device TX queue. */
1957 n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
1963 /* free empty drbs and notify related streams. */
1965 for (i = 0; i != k; i = j) {
1967 for (j = i + 1; j != k && s == drb[j]->udata; j++)
1969 stream_drb_free(s, drb + i, j - i);
1976 stream_fill_pkt_info(const struct tle_tcp_stream *s, union pkt_info *pi)
1978 if (s->s.type == TLE_V4)
1979 pi->addr4 = s->s.ipv4.addr;
1981 pi->addr6 = &s->s.ipv6.addr;
1983 pi->port = s->s.port;
1984 pi->tf.type = s->s.type;
1988 stream_fill_addr(struct tle_tcp_stream *s, const struct sockaddr *addr)
1990 const struct sockaddr_in *in4;
1991 const struct sockaddr_in6 *in6;
1992 const struct tle_dev_param *prm;
1996 s->s.pmsk.raw = UINT32_MAX;
1998 /* setup L4 src ports and src address fields. */
1999 if (s->s.type == TLE_V4) {
2000 in4 = (const struct sockaddr_in *)addr;
2001 if (in4->sin_addr.s_addr == INADDR_ANY || in4->sin_port == 0)
2004 s->s.port.src = in4->sin_port;
2005 s->s.ipv4.addr.src = in4->sin_addr.s_addr;
2006 s->s.ipv4.mask.src = INADDR_NONE;
2007 s->s.ipv4.mask.dst = INADDR_NONE;
2009 } else if (s->s.type == TLE_V6) {
2010 in6 = (const struct sockaddr_in6 *)addr;
2011 if (memcmp(&in6->sin6_addr, &tle_ipv6_any,
2012 sizeof(tle_ipv6_any)) == 0 ||
2013 in6->sin6_port == 0)
2016 s->s.port.src = in6->sin6_port;
2017 rte_memcpy(&s->s.ipv6.addr.src, &in6->sin6_addr,
2018 sizeof(s->s.ipv6.addr.src));
2019 rte_memcpy(&s->s.ipv6.mask.src, &tle_ipv6_none,
2020 sizeof(s->s.ipv6.mask.src));
2021 rte_memcpy(&s->s.ipv6.mask.dst, &tle_ipv6_none,
2022 sizeof(s->s.ipv6.mask.dst));
2025 /* setup the destination device. */
2026 rc = stream_fill_dest(s);
2030 /* setup L4 dst address from device param */
2031 prm = &s->tx.dst.dev->prm;
2032 if (s->s.type == TLE_V4) {
2033 if (s->s.ipv4.addr.dst == INADDR_ANY)
2034 s->s.ipv4.addr.dst = prm->local_addr4.s_addr;
2035 } else if (memcmp(&s->s.ipv6.addr.dst, &tle_ipv6_any,
2036 sizeof(tle_ipv6_any)) == 0)
2037 memcpy(&s->s.ipv6.addr.dst, &prm->local_addr6,
2038 sizeof(s->s.ipv6.addr.dst));
2044 tx_syn(struct tle_tcp_stream *s, const struct sockaddr *addr)
2050 struct stbl_entry *se;
2052 /* fill stream address */
2053 rc = stream_fill_addr(s, addr);
2057 /* fill pkt info to generate seq.*/
2058 stream_fill_pkt_info(s, &pi);
2060 tms = tcp_get_tms();
2061 s->tcb.so.ts.val = tms;
2062 s->tcb.so.ts.ecr = 0;
2063 s->tcb.so.wscale = TCP_WSCALE_DEFAULT;
2064 s->tcb.so.mss = calc_smss(s->tx.dst.mtu, &s->tx.dst);
2066 /* note that rcv.nxt is 0 here for sync_gen_seq.*/
2067 seq = sync_gen_seq(&pi, s->tcb.rcv.nxt, tms, s->tcb.so.mss,
2068 s->s.ctx->prm.hash_alg,
2069 &s->s.ctx->prm.secret_key);
2070 s->tcb.snd.iss = seq;
2071 s->tcb.snd.rcvr = seq;
2072 s->tcb.snd.una = seq;
2073 s->tcb.snd.nxt = seq + 1;
2074 s->tcb.snd.rto = TCP_RTO_DEFAULT;
2075 s->tcb.snd.ts = tms;
2077 s->tcb.rcv.mss = s->tcb.so.mss;
2078 s->tcb.rcv.wscale = TCP_WSCALE_DEFAULT;
2079 s->tcb.rcv.wnd = calc_rx_wnd(s, s->tcb.rcv.wscale);
2082 /* add the stream to the stream table */
2083 st = CTX_TCP_STLB(s->s.ctx);
2084 se = stbl_add_stream_lock(st, s);
2089 /* put stream into the to-send queue */
2090 txs_enqueue(s->s.ctx, s);
2096 tle_tcp_stream_connect(struct tle_stream *ts, const struct sockaddr *addr)
2098 struct tle_tcp_stream *s;
2102 if (ts == NULL || addr == NULL)
2107 if (type >= TLE_VNUM)
2110 if (rwl_try_acquire(&s->tx.use) > 0) {
2111 rc = rte_atomic16_cmpset(&s->tcb.state, TCP_ST_CLOSED,
2113 rc = (rc == 0) ? -EDEADLK : 0;
2118 rwl_release(&s->tx.use);
2122 /* fill stream, prepare and transmit syn pkt */
2123 s->tcb.uop |= TCP_OP_CONNECT;
2124 rc = tx_syn(s, addr);
2125 rwl_release(&s->tx.use);
2127 /* error happened, do a cleanup */
2129 tle_tcp_stream_close(ts);
2135 tle_tcp_stream_recv(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2138 struct tle_tcp_stream *s;
2141 n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
2146 * if we still have packets to read,
2147 * then rearm stream RX event.
2149 if (n == num && rte_ring_count(s->rx.q) != 0) {
2150 if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
2151 tle_event_raise(s->rx.ev);
2152 rwl_release(&s->rx.use);
2158 static inline int32_t
2159 tx_segments(struct tle_tcp_stream *s, uint64_t ol_flags,
2160 struct rte_mbuf *segs[], uint32_t num)
2165 for (i = 0; i != num; i++) {
2166 /* Build L2/L3/L4 header */
2167 rc = tcp_fill_mbuf(segs[i], s, &s->tx.dst, ol_flags, s->s.port,
2168 0, TCP_FLAG_ACK, 0, 0);
2170 free_segments(segs, num);
2176 /* queue packets for further transmission. */
2177 rc = _rte_ring_mp_enqueue_bulk(s->tx.q, (void **)segs, num);
2179 free_segments(segs, num);
2186 tle_tcp_stream_send(struct tle_stream *ts, struct rte_mbuf *pkt[], uint16_t num)
2188 uint32_t i, j, k, mss, n, state, type;
2191 struct tle_tcp_stream *s;
2192 struct tle_dev *dev;
2193 struct rte_mbuf *segs[TCP_MAX_PKT_SEG];
2197 /* mark stream as not closable. */
2198 if (rwl_acquire(&s->tx.use) < 0) {
2203 state = s->tcb.state;
2204 if (state != TCP_ST_ESTABLISHED && state != TCP_ST_CLOSE_WAIT) {
2205 rte_errno = ENOTCONN;
2206 rwl_release(&s->tx.use);
2210 mss = s->tcb.snd.mss;
2211 dev = s->tx.dst.dev;
2213 ol_flags = dev->tx.ol_flags[type];
2218 /* prepare and check for TX */
2219 for (i = k; i != num; i++) {
2220 if (pkt[i]->pkt_len > mss ||
2221 pkt[i]->nb_segs > TCP_MAX_PKT_SEG)
2223 rc = tcp_fill_mbuf(pkt[i], s, &s->tx.dst, ol_flags,
2224 s->s.port, 0, TCP_FLAG_ACK, 0, 0);
2230 /* queue packets for further transmission. */
2231 n = _rte_ring_mp_enqueue_burst(s->tx.q,
2232 (void **)pkt + k, (i - k));
2236 * for unsent, but already modified packets:
2237 * remove pkt l2/l3 headers, restore ol_flags
2240 ol_flags = ~dev->tx.ol_flags[type];
2241 for (j = k; j != i; j++) {
2242 rte_pktmbuf_adj(pkt[j], pkt[j]->l2_len +
2245 pkt[j]->ol_flags &= ol_flags;
2255 /* segment large packet and enqueue for sending */
2256 } else if (i != num) {
2257 /* segment the packet. */
2258 rc = tcp_segmentation(pkt[i], segs, RTE_DIM(segs),
2265 rc = tx_segments(s, dev->tx.ol_flags[type], segs, rc);
2267 /* free the large mbuf */
2268 rte_pktmbuf_free(pkt[i]);
2269 /* set the mbuf as consumed */
2272 /* no space left in tx queue */
2277 /* notify BE about more data to send */
2279 txs_enqueue(s->s.ctx, s);
2280 /* if possible, re-arm stream write event. */
2281 if (rte_ring_free_count(s->tx.q) != 0 && s->tx.ev != NULL)
2282 tle_event_raise(s->tx.ev);
2284 rwl_release(&s->tx.use);
2289 /* send data and FIN (if needed) */
2291 tx_data_fin(struct tle_tcp_stream *s, uint32_t tms, uint32_t state)
2293 /* try to send some data */
2294 tx_nxt_data(s, tms);
2296 /* we also have to send a FIN */
2297 if (state != TCP_ST_ESTABLISHED &&
2298 state != TCP_ST_CLOSE_WAIT &&
2299 tcp_txq_nxt_cnt(s) == 0 &&
2300 s->tcb.snd.fss != s->tcb.snd.nxt) {
2301 s->tcb.snd.fss = ++s->tcb.snd.nxt;
2302 send_ack(s, tms, TCP_FLAG_FIN | TCP_FLAG_ACK);
2307 tx_stream(struct tle_tcp_stream *s, uint32_t tms)
2311 state = s->tcb.state;
2313 if (state == TCP_ST_SYN_SENT) {
2314 /* send the SYN, start the rto timer */
2315 send_ack(s, tms, TCP_FLAG_SYN);
2318 } else if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2320 tx_data_fin(s, tms, state);
2322 /* start RTO timer. */
2323 if (s->tcb.snd.nxt != s->tcb.snd.una)
2329 rto_stream(struct tle_tcp_stream *s, uint32_t tms)
2333 state = s->tcb.state;
2335 TCP_LOG(DEBUG, "%s(%p, tms=%u): state=%u, "
2336 "retx=%u, retm=%u, "
2337 "rto=%u, snd.ts=%u, tmo=%u, "
2338 "snd.nxt=%lu, snd.una=%lu, flight_size=%lu, "
2339 "snd.rcvr=%lu, snd.fastack=%u, "
2340 "wnd=%u, cwnd=%u, ssthresh=%u, "
2341 "bytes sent=%lu, pkt remain=%u;\n",
2342 __func__, s, tms, s->tcb.state,
2343 s->tcb.snd.nb_retx, s->tcb.snd.nb_retm,
2344 s->tcb.snd.rto, s->tcb.snd.ts, tms - s->tcb.snd.ts,
2345 s->tcb.snd.nxt, s->tcb.snd.una, s->tcb.snd.nxt - s->tcb.snd.una,
2346 s->tcb.snd.rcvr, s->tcb.snd.fastack,
2347 s->tcb.snd.wnd, s->tcb.snd.cwnd, s->tcb.snd.ssthresh,
2348 s->tcb.snd.nxt - s->tcb.snd.iss, tcp_txq_nxt_cnt(s));
2350 if (s->tcb.snd.nb_retx < s->tcb.snd.nb_retm) {
2352 if (state >= TCP_ST_ESTABLISHED && state <= TCP_ST_LAST_ACK) {
2354 /* update SND.CWND and SND.SSTHRESH */
2355 rto_cwnd_update(&s->tcb);
2357 /* RFC 6582 3.2.4 */
2358 s->tcb.snd.rcvr = s->tcb.snd.nxt;
2359 s->tcb.snd.fastack = 0;
2361 /* restart from last acked data */
2362 tcp_txq_rst_nxt_head(s);
2363 s->tcb.snd.nxt = s->tcb.snd.una;
2365 tx_data_fin(s, tms, state);
2367 } else if (state == TCP_ST_SYN_SENT) {
2369 s->tcb.so.ts.val = tms;
2370 send_ack(s, tms, TCP_FLAG_SYN);
2372 } else if (state == TCP_ST_TIME_WAIT) {
2376 /* RFC 6298 5.5: back off the timer */
2377 s->tcb.snd.rto = rto_roundup(2 * s->tcb.snd.rto);
2378 s->tcb.snd.nb_retx++;
2382 send_rst(s, s->tcb.snd.una);
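/*
 * Illustrative sketch (not part of the original file): the RFC 6298 5.5
 * back-off applied above - each retransmission of the same data doubles the
 * RTO (subject to the implementation's rounding in rto_roundup()) until the
 * retransmit limit is reached and the connection is reset. The upper bound
 * parameter below is an assumption, not taken from this file.
 */
static inline uint32_t
example_rto_backoff(uint32_t rto, uint32_t rto_max)
{
	uint32_t n;

	n = 2 * rto;
	return (n < rto_max) ? n : rto_max;
}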
2388 tle_tcp_process(struct tle_ctx *ctx, uint32_t num)
2392 struct tle_timer_wheel *tw;
2393 struct tle_stream *p;
2394 struct tle_tcp_stream *s, *rs[num];
2396 /* process streams with RTO expired */
2398 tw = CTX_TCP_TMWHL(ctx);
2399 tms = tcp_get_tms();
2400 tle_timer_expire(tw, tms);
2402 k = tle_timer_get_expired_bulk(tw, (void **)rs, RTE_DIM(rs));
2404 for (i = 0; i != k; i++) {
2407 s->timer.handle = NULL;
2408 if (rwl_try_acquire(&s->tx.use) > 0)
2410 rwl_release(&s->tx.use);
2413 /* process streams from to-send queue */
2415 k = txs_dequeue_bulk(ctx, rs, RTE_DIM(rs));
2417 for (i = 0; i != k; i++) {
2420 if (rwl_try_acquire(&s->tx.use) > 0 &&
2421 rte_atomic32_read(&s->tx.arm) > 0) {
2422 rte_atomic32_set(&s->tx.arm, 0);
2425 rwl_release(&s->tx.use);
2428 /* collect streams to close from the death row */
2430 dr = CTX_TCP_SDR(ctx);
2431 for (k = 0, p = STAILQ_FIRST(&dr->be);
2432 k != num && p != NULL;
2433 k++, p = STAILQ_NEXT(p, link))
2434 rs[k] = TCP_STREAM(p);
2437 STAILQ_INIT(&dr->be);
2439 STAILQ_FIRST(&dr->be) = p;
2441 /* cleanup closed streams */
2442 for (i = 0; i != k; i++) {
2445 tcp_stream_reset(ctx, s);