Revert "l4p/tcp: introduce tle_tcp_stream_establish() API"
[tldk.git] / lib / libtle_l4p / udp_stream.c
1 /*
2  * Copyright (c) 2016  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <string.h>
17 #include <rte_malloc.h>
18 #include <rte_errno.h>
19 #include <rte_ethdev.h>
20 #include <rte_ip.h>
21 #include <rte_udp.h>
22
23 #include "udp_stream.h"
24 #include "misc.h"
25
26 static void
27 unuse_stream(struct tle_udp_stream *s)
28 {
29         s->s.type = TLE_VNUM;
30         rte_atomic32_set(&s->rx.use, INT32_MIN);
31         rte_atomic32_set(&s->tx.use, INT32_MIN);
32 }
33
34 static void
35 fini_stream(struct tle_udp_stream *s)
36 {
37         if (s != NULL) {
38                 rte_free(s->rx.q);
39                 rte_free(s->tx.drb.r);
40         }
41 }
42
43 static void
44 udp_fini_streams(struct tle_ctx *ctx)
45 {
46         uint32_t i;
47         struct tle_udp_stream *s;
48
49         s = ctx->streams.buf;
50         if (s != NULL) {
51                 for (i = 0; i != ctx->prm.max_streams; i++)
52                         fini_stream(s + i);
53         }
54
55         rte_free(s);
56         ctx->streams.buf = NULL;
57         STAILQ_INIT(&ctx->streams.free);
58 }
59
60 static int
61 init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s)
62 {
63         size_t bsz, rsz, sz;
64         uint32_t i, k, n, nb;
65         struct tle_drb *drb;
66         char name[RTE_RING_NAMESIZE];
67
68         /* init RX part. */
69
70         n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
71         n = rte_align32pow2(n);
72         sz = rte_ring_get_memsize(n);
73
74         s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
75                 ctx->prm.socket_id);
76         if (s->rx.q == NULL) {
77                 UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
78                         "failed with error code: %d\n",
79                         __func__, s, sz, ctx->prm.socket_id, rte_errno);
80                 return -ENOMEM;
81         }
82
83         snprintf(name, sizeof(name), "%p@%zu", s, sz);
84         rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);
85
86         /* init TX part. */
87
88         nb = drb_nb_elem(ctx);
89         k = calc_stream_drb_num(ctx, nb);
90         n = rte_align32pow2(k);
91
92         /* size of the drbs ring */
93         rsz = rte_ring_get_memsize(n);
94         rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
95
96         /* size of the drb. */
97         bsz = tle_drb_calc_size(nb);
98
99         /* total stream drbs size. */
100         sz = rsz + bsz * k;
101
102         s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
103                 ctx->prm.socket_id);
104         if (s->tx.drb.r == NULL) {
105                 UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
106                         "failed with error code: %d\n",
107                         __func__, s, sz, ctx->prm.socket_id, rte_errno);
108                 return -ENOMEM;
109         }
110
111         snprintf(name, sizeof(name), "%p@%zu", s, sz);
112         rte_ring_init(s->tx.drb.r, name, n, 0);
113
114         for (i = 0; i != k; i++) {
115                 drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
116                         rsz + bsz * i);
117                 drb->udata = s;
118                 drb->size = nb;
119                 rte_ring_enqueue(s->tx.drb.r, drb);
120         }
121
122         s->tx.drb.nb_elem = nb;
123         s->tx.drb.nb_max = k;
124
125         /* mark stream as avaialble to use. */
126
127         s->s.ctx = ctx;
128         unuse_stream(s);
129         STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);
130
131         return 0;
132 }
133
134 static void
135 udp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
136 {
137         struct tle_udp_stream *us;
138
139         us = (struct tle_udp_stream *)s;
140         _rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
141 }
142
143 static int
144 udp_init_streams(struct tle_ctx *ctx)
145 {
146         size_t sz;
147         uint32_t i;
148         int32_t rc;
149         struct tle_udp_stream *s;
150
151         sz = sizeof(*s) * ctx->prm.max_streams;
152         s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
153                 ctx->prm.socket_id);
154         if (s == NULL) {
155                 UDP_LOG(ERR, "allocation of %zu bytes on socket %d "
156                         "for %u udp_streams failed\n",
157                         sz, ctx->prm.socket_id, ctx->prm.max_streams);
158                 return -ENOMEM;
159         }
160
161         ctx->streams.buf = s;
162         STAILQ_INIT(&ctx->streams.free);
163
164         for (i = 0; i != ctx->prm.max_streams; i++) {
165                 rc = init_stream(ctx, s + i);
166                 if (rc != 0) {
167                         UDP_LOG(ERR, "initalisation of %u-th stream failed", i);
168                         udp_fini_streams(ctx);
169                         return rc;
170                 }
171         }
172
173         ctx->streams.nb_free = ctx->prm.max_streams;
174         return 0;
175 }
176
177 static void __attribute__((constructor))
178 udp_stream_setup(void)
179 {
180         static const struct stream_ops udp_ops = {
181                 .init_streams = udp_init_streams,
182                 .fini_streams = udp_fini_streams,
183                 .free_drbs = udp_free_drbs,
184         };
185
186         tle_stream_ops[TLE_PROTO_UDP] = udp_ops;
187 }
188
189 static inline void
190 stream_down(struct tle_udp_stream *s)
191 {
192         rwl_down(&s->rx.use);
193         rwl_down(&s->tx.use);
194 }
195
196 static inline void
197 stream_up(struct tle_udp_stream *s)
198 {
199         rwl_up(&s->rx.use);
200         rwl_up(&s->tx.use);
201 }
202
203 static int
204 check_stream_prm(const struct tle_ctx *ctx,
205         const struct tle_udp_stream_param *prm)
206 {
207         if ((prm->local_addr.ss_family != AF_INET &&
208                         prm->local_addr.ss_family != AF_INET6) ||
209                         prm->local_addr.ss_family != prm->remote_addr.ss_family)
210                 return -EINVAL;
211
212         /* callback and event notifications mechanisms are mutually exclusive */
213         if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) ||
214                         (prm->send_ev != NULL && prm->send_cb.func != NULL))
215                 return -EINVAL;
216
217         /* check does context support desired address family. */
218         if ((prm->local_addr.ss_family == AF_INET &&
219                         ctx->prm.lookup4 == NULL) ||
220                         (prm->local_addr.ss_family == AF_INET6 &&
221                         ctx->prm.lookup6 == NULL))
222                 return -EINVAL;
223
224         return 0;
225 }
226
227 struct tle_stream *
228 tle_udp_stream_open(struct tle_ctx *ctx,
229         const struct tle_udp_stream_param *prm)
230 {
231         struct tle_udp_stream *s;
232         int32_t rc;
233
234         if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
235                 rte_errno = EINVAL;
236                 return NULL;
237         }
238
239         s = (struct tle_udp_stream *)get_stream(ctx);
240         if (s == NULL)  {
241                 rte_errno = ENFILE;
242                 return NULL;
243
244         /* some TX still pending for that stream. */
245         } else if (UDP_STREAM_TX_PENDING(s)) {
246                 put_stream(ctx, &s->s, 0);
247                 rte_errno = EAGAIN;
248                 return NULL;
249         }
250
251         /* copy input parameters. */
252         s->prm = *prm;
253
254         /* setup L4 ports and L3 addresses fields. */
255         rc = stream_fill_ctx(ctx, &s->s,
256                 (const struct sockaddr *)&prm->local_addr,
257                 (const struct sockaddr *)&prm->remote_addr);
258
259         if (rc != 0) {
260                 put_stream(ctx, &s->s, 1);
261                 s = NULL;
262                 rte_errno = rc;
263         } else {
264                 /* setup stream notification menchanism */
265                 s->rx.ev = prm->recv_ev;
266                 s->rx.cb = prm->recv_cb;
267                 s->tx.ev = prm->send_ev;
268                 s->tx.cb = prm->send_cb;
269
270                 /* mark stream as avaialbe for RX/TX */
271                 if (s->tx.ev != NULL)
272                         tle_event_raise(s->tx.ev);
273                 stream_up(s);
274         }
275
276         return &s->s;
277 }
278
279 int
280 tle_udp_stream_close(struct tle_stream *us)
281 {
282         int32_t rc;
283         struct tle_ctx *ctx;
284         struct tle_udp_stream *s;
285
286         static const struct tle_stream_cb zcb;
287
288         s = UDP_STREAM(us);
289         if (us == NULL || s->s.type >= TLE_VNUM)
290                 return -EINVAL;
291
292         ctx = s->s.ctx;
293
294         /* mark stream as unavaialbe for RX/TX. */
295         stream_down(s);
296
297         /* reset stream events if any. */
298         if (s->rx.ev != NULL) {
299                 tle_event_idle(s->rx.ev);
300                 s->rx.ev = NULL;
301         }
302         if (s->tx.ev != NULL) {
303                 tle_event_idle(s->tx.ev);
304                 s->tx.ev = NULL;
305         }
306
307         s->rx.cb = zcb;
308         s->tx.cb = zcb;
309
310         /* free stream's destination port */
311         rc = stream_clear_ctx(ctx, &s->s);
312
313         /* empty stream's RX queue */
314         empty_mbuf_ring(s->rx.q);
315
316         /*
317          * mark the stream as free again.
318          * if there still are pkts queued for TX,
319          * then put this stream to the tail of free list.
320          */
321         put_stream(ctx, &s->s, UDP_STREAM_TX_FINISHED(s));
322         return rc;
323 }
324
325 int
326 tle_udp_stream_get_param(const struct tle_stream *us,
327         struct tle_udp_stream_param *prm)
328 {
329         struct sockaddr_in *lin4;
330         struct sockaddr_in6 *lin6;
331         const struct tle_udp_stream *s;
332
333         s = UDP_STREAM(us);
334         if (prm == NULL || us == NULL || s->s.type >= TLE_VNUM)
335                 return -EINVAL;
336
337         prm[0] = s->prm;
338         if (prm->local_addr.ss_family == AF_INET) {
339                 lin4 = (struct sockaddr_in *)&prm->local_addr;
340                 lin4->sin_port = s->s.port.dst;
341         } else if (s->prm.local_addr.ss_family == AF_INET6) {
342                 lin6 = (struct sockaddr_in6 *)&prm->local_addr;
343                 lin6->sin6_port = s->s.port.dst;
344         }
345
346         return 0;
347 }