2 * Copyright (c) 2016 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
17 #include <rte_malloc.h>
18 #include <rte_errno.h>
19 #include <rte_ethdev.h>
23 #include "udp_stream.h"
27 unuse_stream(struct tle_udp_stream *s)
30 rte_atomic32_set(&s->rx.use, INT32_MIN);
31 rte_atomic32_set(&s->tx.use, INT32_MIN);
35 fini_stream(struct tle_udp_stream *s)
39 rte_free(s->tx.drb.r);
44 udp_fini_streams(struct tle_ctx *ctx)
47 struct tle_udp_stream *s;
51 for (i = 0; i != ctx->prm.max_streams; i++)
56 ctx->streams.buf = NULL;
57 STAILQ_INIT(&ctx->streams.free);
61 init_stream(struct tle_ctx *ctx, struct tle_udp_stream *s)
66 char name[RTE_RING_NAMESIZE];
70 n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
71 n = rte_align32pow2(n);
72 sz = rte_ring_get_memsize(n);
74 s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
76 if (s->rx.q == NULL) {
77 UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
78 "failed with error code: %d\n",
79 __func__, s, sz, ctx->prm.socket_id, rte_errno);
83 snprintf(name, sizeof(name), "%p@%zu", s, sz);
84 rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);
88 nb = drb_nb_elem(ctx);
89 k = calc_stream_drb_num(ctx, nb);
90 n = rte_align32pow2(k);
92 /* size of the drbs ring */
93 rsz = rte_ring_get_memsize(n);
94 rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
96 /* size of the drb. */
97 bsz = tle_drb_calc_size(nb);
99 /* total stream drbs size. */
102 s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
104 if (s->tx.drb.r == NULL) {
105 UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
106 "failed with error code: %d\n",
107 __func__, s, sz, ctx->prm.socket_id, rte_errno);
111 snprintf(name, sizeof(name), "%p@%zu", s, sz);
112 rte_ring_init(s->tx.drb.r, name, n, 0);
114 for (i = 0; i != k; i++) {
115 drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
119 rte_ring_enqueue(s->tx.drb.r, drb);
122 s->tx.drb.nb_elem = nb;
123 s->tx.drb.nb_max = k;
125 /* mark stream as avaialble to use. */
129 STAILQ_INSERT_TAIL(&ctx->streams.free, &s->s, link);
135 udp_free_drbs(struct tle_stream *s, struct tle_drb *drb[], uint32_t nb_drb)
137 struct tle_udp_stream *us;
139 us = (struct tle_udp_stream *)s;
140 _rte_ring_enqueue_burst(us->tx.drb.r, (void **)drb, nb_drb);
144 udp_init_streams(struct tle_ctx *ctx)
149 struct tle_udp_stream *s;
151 sz = sizeof(*s) * ctx->prm.max_streams;
152 s = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
155 UDP_LOG(ERR, "allocation of %zu bytes on socket %d "
156 "for %u udp_streams failed\n",
157 sz, ctx->prm.socket_id, ctx->prm.max_streams);
161 ctx->streams.buf = s;
162 STAILQ_INIT(&ctx->streams.free);
164 for (i = 0; i != ctx->prm.max_streams; i++) {
165 rc = init_stream(ctx, s + i);
167 UDP_LOG(ERR, "initalisation of %u-th stream failed", i);
168 udp_fini_streams(ctx);
173 ctx->streams.nb_free = ctx->prm.max_streams;
177 static void __attribute__((constructor))
178 udp_stream_setup(void)
180 static const struct stream_ops udp_ops = {
181 .init_streams = udp_init_streams,
182 .fini_streams = udp_fini_streams,
183 .free_drbs = udp_free_drbs,
186 tle_stream_ops[TLE_PROTO_UDP] = udp_ops;
190 stream_down(struct tle_udp_stream *s)
192 rwl_down(&s->rx.use);
193 rwl_down(&s->tx.use);
197 stream_up(struct tle_udp_stream *s)
204 check_stream_prm(const struct tle_ctx *ctx,
205 const struct tle_udp_stream_param *prm)
207 if ((prm->local_addr.ss_family != AF_INET &&
208 prm->local_addr.ss_family != AF_INET6) ||
209 prm->local_addr.ss_family != prm->remote_addr.ss_family)
212 /* callback and event notifications mechanisms are mutually exclusive */
213 if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) ||
214 (prm->send_ev != NULL && prm->send_cb.func != NULL))
217 /* check does context support desired address family. */
218 if ((prm->local_addr.ss_family == AF_INET &&
219 ctx->prm.lookup4 == NULL) ||
220 (prm->local_addr.ss_family == AF_INET6 &&
221 ctx->prm.lookup6 == NULL))
228 tle_udp_stream_open(struct tle_ctx *ctx,
229 const struct tle_udp_stream_param *prm)
231 struct tle_udp_stream *s;
234 if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
239 s = (struct tle_udp_stream *)get_stream(ctx);
244 /* some TX still pending for that stream. */
245 } else if (UDP_STREAM_TX_PENDING(s)) {
246 put_stream(ctx, &s->s, 0);
251 /* copy input parameters. */
254 /* setup L4 ports and L3 addresses fields. */
255 rc = stream_fill_ctx(ctx, &s->s,
256 (const struct sockaddr *)&prm->local_addr,
257 (const struct sockaddr *)&prm->remote_addr);
260 put_stream(ctx, &s->s, 1);
264 /* setup stream notification menchanism */
265 s->rx.ev = prm->recv_ev;
266 s->rx.cb = prm->recv_cb;
267 s->tx.ev = prm->send_ev;
268 s->tx.cb = prm->send_cb;
270 /* mark stream as avaialbe for RX/TX */
271 if (s->tx.ev != NULL)
272 tle_event_raise(s->tx.ev);
280 tle_udp_stream_close(struct tle_stream *us)
284 struct tle_udp_stream *s;
286 static const struct tle_stream_cb zcb;
289 if (us == NULL || s->s.type >= TLE_VNUM)
294 /* mark stream as unavaialbe for RX/TX. */
297 /* reset stream events if any. */
298 if (s->rx.ev != NULL) {
299 tle_event_idle(s->rx.ev);
302 if (s->tx.ev != NULL) {
303 tle_event_idle(s->tx.ev);
310 /* free stream's destination port */
311 rc = stream_clear_ctx(ctx, &s->s);
313 /* empty stream's RX queue */
314 empty_mbuf_ring(s->rx.q);
317 * mark the stream as free again.
318 * if there still are pkts queued for TX,
319 * then put this stream to the tail of free list.
321 put_stream(ctx, &s->s, UDP_STREAM_TX_FINISHED(s));
326 tle_udp_stream_get_param(const struct tle_stream *us,
327 struct tle_udp_stream_param *prm)
329 struct sockaddr_in *lin4;
330 struct sockaddr_in6 *lin6;
331 const struct tle_udp_stream *s;
334 if (prm == NULL || us == NULL || s->s.type >= TLE_VNUM)
338 if (prm->local_addr.ss_family == AF_INET) {
339 lin4 = (struct sockaddr_in *)&prm->local_addr;
340 lin4->sin_port = s->s.port.dst;
341 } else if (s->prm.local_addr.ss_family == AF_INET6) {
342 lin6 = (struct sockaddr_in6 *)&prm->local_addr;
343 lin6->sin6_port = s->s.port.dst;