2 * Copyright (c) 2016-2017 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
30 static inline uint32_t
31 rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
32 struct rte_mbuf *mb[], uint32_t num)
38 i = _ofo_step(s->rx.ofo, sl, mb + n, num - n);
40 } while (i != 0 && n != num);
42 _ofo_compact(s->rx.ofo);
46 static inline uint32_t
47 rx_ofo_reduce(struct tle_tcp_stream *s)
49 uint32_t i, n, end, seq;
58 for (i = 0; i != ofo->nb_elem; i++) {
62 /* gap still present */
63 if (tcp_seq_lt(seq, db->sl.seq))
66 end = db->sl.seq + db->sl.len;
68 /* this db is fully overlapped */
69 if (tcp_seq_leq(end, seq))
72 n += _ofodb_enqueue(s->rx.q, db, &sl);
74 seq = sl.seq + sl.len;
78 _ofo_remove(ofo, 0, i);
82 static inline uint32_t
83 rx_ino_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
84 struct rte_mbuf *mb[], uint32_t num)
88 n = _rte_ring_enqueue_burst(s->rx.q, (void * const *)mb, num);
90 /* error: can'queue some packets into receive buffer. */
91 for (i = n; i != num; i++)
92 sl->len -= mb[i]->pkt_len;
94 s->tcb.rcv.nxt = sl->seq + sl->len;
98 static inline uint32_t
99 rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len,
100 struct rte_mbuf *mb[], uint32_t num)
108 r = rte_ring_count(s->rx.q);
110 /* in order packets, ready to be delivered */
111 if (seq == s->tcb.rcv.nxt) {
113 t = rx_ino_enqueue(s, &sl, mb, num);
115 /* failed to queue all input in-order packets */
118 "%s(s=%p, seq=%u, len=%u, num=%u) failed to queue "
120 __func__, s, seq, len, num, num - t);
122 /* try to consume some out-of-order packets*/
124 n = rx_ofo_reduce(s);
127 "%s(s=%p, rcv.nxt=%u) failed to queue %u "
129 __func__, s, s->tcb.rcv.nxt, n);
132 /* queue out of order packets */
134 t = rx_ofo_enqueue(s, &sl, mb, num);
137 n = rte_ring_count(s->rx.q);
140 if (s->rx.ev != NULL)
141 tle_event_raise(s->rx.ev);
142 /* if RX queue was empty invoke RX notification callback. */
143 else if (s->rx.cb.func != NULL && r == 0)
144 s->rx.cb.func(s->rx.cb.data, &s->s);
150 static inline uint32_t
151 tcp_rxq_get_objs(struct tle_tcp_stream *s, struct rxq_objs obj[2])
154 uint32_t n, head, sz;
158 n = _rte_ring_mcs_dequeue_start(r, UINT32_MAX);
162 sz = _rte_ring_get_size(r);
163 head = (r->cons.head - n) & _rte_ring_get_mask(r);
165 obj[0].mb = (struct rte_mbuf **)(_rte_ring_get_data(r) + head);
166 obj[1].mb = (struct rte_mbuf **)_rte_ring_get_data(r);
168 if (head + n <= sz) {
173 obj[0].num = sz - head;
174 obj[1].num = n + head - sz;
180 tcp_rxq_consume(struct tle_tcp_stream *s, uint32_t num)
182 _rte_ring_mcs_dequeue_finish(s->rx.q, num);
189 #endif /* _TCP_RXQ_H_ */