- Introduce tle_tcp_stream_readv() and tle_tcp_stream_writev().
[tldk.git] / lib / libtle_l4p / tcp_rxq.h
1 /*
2  * Copyright (c) 2016-2017  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #ifndef _TCP_RXQ_H_
17 #define _TCP_RXQ_H_
18
19 #include "tcp_ofo.h"
20
21 #ifdef __cplusplus
22 extern "C" {
23 #endif
24
25 struct rxq_objs {
26         struct rte_mbuf **mb;
27         uint32_t num;
28 };
29
30 static inline uint32_t
31 rx_ofo_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
32         struct rte_mbuf *mb[], uint32_t num)
33 {
34         uint32_t i, n;
35
36         n = 0;
37         do {
38                 i = _ofo_step(s->rx.ofo, sl, mb + n, num - n);
39                 n += i;
40         } while (i != 0 && n != num);
41
42         _ofo_compact(s->rx.ofo);
43         return n;
44 }
45
46 static inline uint32_t
47 rx_ofo_reduce(struct tle_tcp_stream *s)
48 {
49         uint32_t i, n, end, seq;
50         struct ofo *ofo;
51         struct ofodb *db;
52         union seqlen sl;
53
54         seq = s->tcb.rcv.nxt;
55         ofo = s->rx.ofo;
56
57         n = 0;
58         for (i = 0; i != ofo->nb_elem; i++) {
59
60                 db = ofo->db + i;
61
62                 /* gap still present */
63                 if (tcp_seq_lt(seq, db->sl.seq))
64                         break;
65
66                 end = db->sl.seq + db->sl.len;
67
68                 /* this db is fully overlapped */
69                 if (tcp_seq_leq(end, seq))
70                         _ofodb_free(db);
71                 else
72                         n += _ofodb_enqueue(s->rx.q, db, &sl);
73
74                 seq = sl.seq + sl.len;
75         }
76
77         s->tcb.rcv.nxt = seq;
78         _ofo_remove(ofo, 0, i);
79         return n;
80 }
81
82 static inline uint32_t
83 rx_ino_enqueue(struct tle_tcp_stream *s, union seqlen *sl,
84         struct rte_mbuf *mb[], uint32_t num)
85 {
86         uint32_t i, n;
87
88         n = _rte_ring_enqueue_burst(s->rx.q, (void * const *)mb, num);
89
90         /* error: can'queue some packets into receive buffer. */
91         for (i = n; i != num; i++)
92                 sl->len -= mb[i]->pkt_len;
93
94         s->tcb.rcv.nxt = sl->seq + sl->len;
95         return n;
96 }
97
98 static inline uint32_t
99 rx_data_enqueue(struct tle_tcp_stream *s, uint32_t seq, uint32_t len,
100         struct rte_mbuf *mb[], uint32_t num)
101 {
102         uint32_t n, r, t;
103         union seqlen sl;
104
105         sl.seq = seq;
106         sl.len = len;
107
108         r = rte_ring_count(s->rx.q);
109
110         /* in order packets, ready to be delivered */
111         if (seq == s->tcb.rcv.nxt) {
112
113                 t = rx_ino_enqueue(s, &sl, mb, num);
114
115                 /* failed to queue all input in-order packets */
116                 if (t != num)
117                         TCP_LOG(DEBUG,
118                         "%s(s=%p, seq=%u, len=%u, num=%u) failed to queue "
119                         "%u packets;\n",
120                         __func__, s, seq, len, num, num - t);
121
122                 /* try to consume some out-of-order packets*/
123                 else {
124                         n = rx_ofo_reduce(s);
125                         if (n != 0)
126                                 TCP_LOG(DEBUG,
127                                 "%s(s=%p, rcv.nxt=%u) failed to queue %u "
128                                 "OFO packets;\n",
129                                 __func__, s, s->tcb.rcv.nxt, n);
130                 }
131
132         /* queue out of order packets */
133         } else {
134                 t = rx_ofo_enqueue(s, &sl, mb, num);
135         }
136
137         n = rte_ring_count(s->rx.q);
138         if (r != n) {
139                 /* raise RX event */
140                 if (s->rx.ev != NULL)
141                         tle_event_raise(s->rx.ev);
142                 /* if RX queue was empty invoke RX notification callback. */
143                 else if (s->rx.cb.func != NULL && r == 0)
144                         s->rx.cb.func(s->rx.cb.data, &s->s);
145         }
146
147         return t;
148 }
149
150 static inline uint32_t
151 tcp_rxq_get_objs(struct tle_tcp_stream *s, struct rxq_objs obj[2])
152 {
153         struct rte_ring *r;
154         uint32_t n, head, sz;
155
156         r = s->rx.q;
157
158         n = _rte_ring_mcs_dequeue_start(r, UINT32_MAX);
159         if (n == 0)
160                 return 0;
161
162         sz = _rte_ring_get_size(r);
163         head = (r->cons.head - n) & _rte_ring_get_mask(r);
164
165         obj[0].mb = (struct rte_mbuf **)(_rte_ring_get_data(r) + head);
166         obj[1].mb = (struct rte_mbuf **)_rte_ring_get_data(r);
167
168         if (head + n <= sz) {
169                 obj[0].num = n;
170                 obj[1].num = 0;
171                 return 1;
172         } else {
173                 obj[0].num = sz - head;
174                 obj[1].num = n + head - sz;
175                 return 2;
176         }
177 }
178
179 static inline void
180 tcp_rxq_consume(struct tle_tcp_stream *s, uint32_t num)
181 {
182         _rte_ring_mcs_dequeue_finish(s->rx.q, num);
183 }
184
185 #ifdef __cplusplus
186 }
187 #endif
188
189 #endif /* _TCP_RXQ_H_ */