libtle_udp: don't allow to open stream for unsupported family
[tldk.git] / lib / libtle_udp / udp_ctl.c
1 /*
2  * Copyright (c) 2016  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <string.h>
17 #include <rte_malloc.h>
18 #include <rte_errno.h>
19 #include <rte_ethdev.h>
20 #include <rte_ip.h>
21 #include <rte_udp.h>
22
23 #include "udp_impl.h"
24 #include "misc.h"
25
26 #define LPORT_START     0x8000
27 #define LPORT_END       MAX_PORT_NUM
28
29 #define LPORT_START_BLK PORT_BLK(LPORT_START)
30 #define LPORT_END_BLK   PORT_BLK(LPORT_END)
31
32 static const struct in6_addr tle_udp6_any = IN6ADDR_ANY_INIT;
33 static const struct in6_addr tle_udp6_none = {
34         {
35                 .__u6_addr32 = {
36                         UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX
37                 },
38         },
39 };
40
41 static int
42 check_dev_prm(const struct tle_udp_dev_param *dev_prm)
43 {
44         /* no valid IPv4/IPv6 addresses provided. */
45         if (dev_prm->local_addr4.s_addr == INADDR_ANY &&
46                         memcmp(&dev_prm->local_addr6, &tle_udp6_any,
47                         sizeof(tle_udp6_any)) == 0)
48                 return -EINVAL;
49
50         /* all the ports are blocked. */
51         if (dev_prm->bl4.nb_port > UINT16_MAX ||
52                 (dev_prm->bl4.nb_port != 0 && dev_prm->bl4.port == NULL))
53                 return -EINVAL;
54
55         if (dev_prm->bl6.nb_port > UINT16_MAX ||
56                 (dev_prm->bl6.nb_port != 0 && dev_prm->bl6.port == NULL))
57                 return -EINVAL;
58
59         return 0;
60 }
61
62 static void
63 unuse_stream(struct tle_udp_stream *s)
64 {
65         s->type = TLE_UDP_VNUM;
66         rte_atomic32_set(&s->rx.use, INT32_MIN);
67         rte_atomic32_set(&s->tx.use, INT32_MIN);
68 }
69
70 /* calculate number of drbs per stream. */
71 static uint32_t
72 calc_stream_drb_num(const struct tle_udp_ctx *ctx, uint32_t obj_num)
73 {
74         uint32_t num;
75
76         num = (ctx->prm.max_stream_sbufs + obj_num - 1) / obj_num;
77         num = num + num / 2;
78         num = RTE_MAX(num, RTE_DIM(ctx->dev) + 1);
79         return num;
80 }
81
82 static uint32_t
83 drb_nb_elem(const struct tle_udp_ctx *ctx)
84 {
85         return (ctx->prm.send_bulk_size != 0) ?
86                 ctx->prm.send_bulk_size : MAX_PKT_BURST;
87 }
88
89 static int
90 init_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
91 {
92         size_t bsz, rsz, sz;
93         uint32_t i, k, n, nb;
94         struct tle_drb *drb;
95         char name[RTE_RING_NAMESIZE];
96
97         /* init RX part. */
98
99         n = RTE_MAX(ctx->prm.max_stream_rbufs, 1U);
100         n = rte_align32pow2(n);
101         sz = sizeof(*s->rx.q) + n * sizeof(s->rx.q->ring[0]);
102
103         s->rx.q = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
104                 ctx->prm.socket_id);
105         if (s->rx.q == NULL) {
106                 UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
107                         "failed with error code: %d\n",
108                         __func__, s, sz, ctx->prm.socket_id, rte_errno);
109                 return ENOMEM;
110         }
111
112         snprintf(name, sizeof(name), "%p@%zu", s, sz);
113         rte_ring_init(s->rx.q, name, n, RING_F_SP_ENQ);
114
115         /* init TX part. */
116
117         nb = drb_nb_elem(ctx);
118         k = calc_stream_drb_num(ctx, nb);
119         n = rte_align32pow2(k);
120
121         /* size of the drbs ring */
122         rsz = sizeof(*s->tx.drb.r) + n * sizeof(s->tx.drb.r->ring[0]);
123         rsz = RTE_ALIGN_CEIL(rsz, RTE_CACHE_LINE_SIZE);
124
125         /* size of the drb. */
126         bsz = tle_drb_calc_size(nb);
127
128         /* total stream drbs size. */
129         sz = rsz + bsz * k;
130
131         s->tx.drb.r = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
132                 ctx->prm.socket_id);
133         if (s->tx.drb.r == NULL) {
134                 UDP_LOG(ERR, "%s(%p): allocation of %zu bytes on socket %d "
135                         "failed with error code: %d\n",
136                         __func__, s, sz, ctx->prm.socket_id, rte_errno);
137                 return ENOMEM;
138         }
139
140         snprintf(name, sizeof(name), "%p@%zu", s, sz);
141         rte_ring_init(s->tx.drb.r, name, n, 0);
142
143         for (i = 0; i != k; i++) {
144                 drb = (struct tle_drb *)((uintptr_t)s->tx.drb.r +
145                         rsz + bsz * i);
146                 drb->udata = s;
147                 drb->size = nb;
148                 rte_ring_enqueue(s->tx.drb.r, drb);
149         }
150
151         s->tx.drb.nb_elem = nb;
152         s->tx.drb.nb_max = k;
153
154         /* mark stream as avaialble to use. */
155
156         s->ctx = ctx;
157         unuse_stream(s);
158         STAILQ_INSERT_TAIL(&ctx->streams.free, s, link);
159
160         return 0;
161 }
162
163 static void
164 fini_stream(struct tle_udp_stream *s)
165 {
166         rte_free(s->rx.q);
167         rte_free(s->tx.drb.r);
168 }
169
170 struct tle_udp_ctx *
171 tle_udp_create(const struct tle_udp_ctx_param *ctx_prm)
172 {
173         struct tle_udp_ctx *ctx;
174         size_t sz;
175         uint32_t i;
176
177         if (ctx_prm == NULL) {
178                 rte_errno = EINVAL;
179                 return NULL;
180         }
181
182         sz = sizeof(*ctx);
183         ctx = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
184                 ctx_prm->socket_id);
185         if (ctx == NULL) {
186                 UDP_LOG(ERR, "allocation of %zu bytes for new udp_ctx "
187                         "on socket %d failed\n",
188                         sz, ctx_prm->socket_id);
189                 return NULL;
190         }
191
192         ctx->prm = *ctx_prm;
193
194         sz = sizeof(*ctx->streams.buf) * ctx_prm->max_streams;
195         ctx->streams.buf = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
196                 ctx_prm->socket_id);
197         if (ctx->streams.buf == NULL) {
198                 UDP_LOG(ERR, "allocation of %zu bytes on socket %d "
199                         "for %u udp_streams failed\n",
200                         sz, ctx_prm->socket_id, ctx_prm->max_streams);
201                 tle_udp_destroy(ctx);
202                 return NULL;
203         }
204
205         STAILQ_INIT(&ctx->streams.free);
206         for (i = 0; i != ctx_prm->max_streams &&
207                         init_stream(ctx, &ctx->streams.buf[i]) == 0;
208                         i++)
209                 ;
210
211         if (i != ctx_prm->max_streams) {
212                 UDP_LOG(ERR, "initalisation of %u-th stream failed", i);
213                 tle_udp_destroy(ctx);
214                 return NULL;
215         }
216
217         for (i = 0; i != RTE_DIM(ctx->use); i++)
218                 udp_pbm_init(ctx->use + i, LPORT_START_BLK);
219
220         ctx->streams.nb_free = ctx->prm.max_streams;
221         return ctx;
222 }
223
224 void
225 tle_udp_destroy(struct tle_udp_ctx *ctx)
226 {
227         uint32_t i;
228
229         if (ctx == NULL) {
230                 rte_errno = EINVAL;
231                 return;
232         }
233
234         for (i = 0; i != RTE_DIM(ctx->dev); i++)
235                 tle_udp_del_dev(ctx->dev + i);
236
237         if (ctx->streams.buf != 0) {
238                 for (i = 0; i != ctx->prm.max_streams; i++)
239                         fini_stream(&ctx->streams.buf[i]);
240                 rte_free(ctx->streams.buf);
241         }
242
243         rte_free(ctx);
244 }
245
246 void
247 tle_udp_ctx_invalidate(struct tle_udp_ctx *ctx)
248 {
249         RTE_SET_USED(ctx);
250 }
251
252 static void
253 fill_pbm(struct udp_pbm *pbm, const struct tle_bl_port *blp)
254 {
255         uint32_t i;
256
257         for (i = 0; i != blp->nb_port; i++)
258                 udp_pbm_set(pbm, blp->port[i]);
259 }
260
261 static int
262 init_dev_proto(struct tle_udp_dev *dev, uint32_t idx, int32_t socket_id,
263         const struct tle_bl_port *blp)
264 {
265         size_t sz;
266
267         sz = sizeof(*dev->dp[idx]);
268         dev->dp[idx] = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
269                 socket_id);
270
271         if (dev->dp[idx] == NULL) {
272                 UDP_LOG(ERR, "allocation of %zu bytes on "
273                         "socket %d for %u-th device failed\n",
274                         sz, socket_id, idx);
275                 return ENOMEM;
276         }
277
278         udp_pbm_init(&dev->dp[idx]->use, LPORT_START_BLK);
279         fill_pbm(&dev->dp[idx]->use, blp);
280
281         return 0;
282 }
283
284 static struct tle_udp_dev *
285 find_free_dev(struct tle_udp_ctx *ctx)
286 {
287         uint32_t i;
288
289         if (ctx->nb_dev < RTE_DIM(ctx->dev)) {
290                 for (i = 0; i != RTE_DIM(ctx->dev); i++) {
291                         if (ctx->dev[i].ctx != ctx)
292                                 return ctx->dev + i;
293                 }
294         }
295
296         rte_errno = ENODEV;
297         return NULL;
298 }
299
300 struct tle_udp_dev *
301 tle_udp_add_dev(struct tle_udp_ctx *ctx,
302         const struct tle_udp_dev_param *dev_prm)
303 {
304         int32_t rc;
305         struct tle_udp_dev *dev;
306
307         if (ctx == NULL || dev_prm == NULL || check_dev_prm(dev_prm) != 0) {
308                 rte_errno = EINVAL;
309                 return NULL;
310         }
311
312         dev = find_free_dev(ctx);
313         if (dev == NULL)
314                 return NULL;
315         rc = 0;
316
317         /* device can handle IPv4 traffic */
318         if (dev_prm->local_addr4.s_addr != INADDR_ANY) {
319                 rc = init_dev_proto(dev, TLE_UDP_V4, ctx->prm.socket_id,
320                                 &dev_prm->bl4);
321                 if (rc == 0)
322                         fill_pbm(&ctx->use[TLE_UDP_V4], &dev_prm->bl4);
323         }
324
325         /* device can handle IPv6 traffic */
326         if (rc == 0 && memcmp(&dev_prm->local_addr6, &tle_udp6_any,
327                         sizeof(tle_udp6_any)) != 0) {
328                 rc = init_dev_proto(dev, TLE_UDP_V6, ctx->prm.socket_id,
329                                 &dev_prm->bl6);
330                 if (rc == 0)
331                         fill_pbm(&ctx->use[TLE_UDP_V6], &dev_prm->bl6);
332         }
333
334         if (rc != 0) {
335                 /* cleanup and return an error. */
336                 rte_free(dev->dp[TLE_UDP_V4]);
337                 rte_free(dev->dp[TLE_UDP_V6]);
338                 rte_errno = rc;
339                 return NULL;
340         }
341
342         /* setup RX data. */
343         if (dev_prm->local_addr4.s_addr != INADDR_ANY &&
344                         (dev_prm->rx_offload & DEV_RX_OFFLOAD_IPV4_CKSUM) == 0)
345                 dev->rx.ol_flags[TLE_UDP_V4] |= PKT_RX_IP_CKSUM_BAD;
346         if ((dev_prm->rx_offload & DEV_RX_OFFLOAD_UDP_CKSUM) == 0) {
347                 dev->rx.ol_flags[TLE_UDP_V4] |= PKT_RX_L4_CKSUM_BAD;
348                 dev->rx.ol_flags[TLE_UDP_V6] |= PKT_RX_L4_CKSUM_BAD;
349         }
350
351         /* setup TX data. */
352         tle_dring_reset(&dev->tx.dr);
353
354         if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_UDP_CKSUM) != 0) {
355                 dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_UDP_CKSUM;
356                 dev->tx.ol_flags[TLE_UDP_V6] |= PKT_TX_IPV6 | PKT_TX_UDP_CKSUM;
357         }
358         if ((dev_prm->tx_offload & DEV_TX_OFFLOAD_IPV4_CKSUM) != 0)
359                 dev->tx.ol_flags[TLE_UDP_V4] |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM;
360
361         dev->prm = *dev_prm;
362         dev->ctx = ctx;
363         ctx->nb_dev++;
364
365         return dev;
366 }
367
368 static void
369 empty_dring(struct tle_dring *dr)
370 {
371         uint32_t i, k, n;
372         struct tle_udp_stream *s;
373         struct rte_mbuf *pkt[MAX_PKT_BURST];
374         struct tle_drb *drb[MAX_PKT_BURST];
375
376         do {
377                 k = RTE_DIM(drb);
378                 n = tle_dring_sc_dequeue(dr, (const void **)(uintptr_t)pkt,
379                         RTE_DIM(pkt), drb, &k);
380
381                 /* free mbufs */
382                 for (i = 0; i != n; i++)
383                         rte_pktmbuf_free(pkt[i]);
384                 /* free drbs */
385                 for (i = 0; i != k; i++) {
386                         s = drb[i]->udata;
387                         rte_ring_enqueue(s->tx.drb.r, drb[i]);
388                 }
389         } while (n != 0);
390 }
391
392 int
393 tle_udp_del_dev(struct tle_udp_dev *dev)
394 {
395         uint32_t p;
396         struct tle_udp_ctx *ctx;
397
398         if (dev == NULL || dev->ctx == NULL)
399                 return -EINVAL;
400
401         ctx = dev->ctx;
402         p = dev - ctx->dev;
403
404         if (p >= RTE_DIM(ctx->dev) ||
405                         (dev->dp[TLE_UDP_V4] == NULL &&
406                         dev->dp[TLE_UDP_V6] == NULL))
407                 return -EINVAL;
408
409         /* emtpy TX queues. */
410         empty_dring(&dev->tx.dr);
411
412         rte_free(dev->dp[TLE_UDP_V4]);
413         rte_free(dev->dp[TLE_UDP_V6]);
414         memset(dev, 0, sizeof(*dev));
415         ctx->nb_dev--;
416         return 0;
417 }
418
419 static inline void
420 stream_down(struct tle_udp_stream *s)
421 {
422         rwl_down(&s->rx.use);
423         rwl_down(&s->tx.use);
424 }
425
426 static inline void
427 stream_up(struct tle_udp_stream *s)
428 {
429         rwl_up(&s->rx.use);
430         rwl_up(&s->tx.use);
431 }
432
433 static struct tle_udp_dev *
434 find_ipv4_dev(struct tle_udp_ctx *ctx, const struct in_addr *addr)
435 {
436         uint32_t i;
437
438         for (i = 0; i != RTE_DIM(ctx->dev); i++) {
439                 if (ctx->dev[i].prm.local_addr4.s_addr == addr->s_addr &&
440                                 ctx->dev[i].dp[TLE_UDP_V4] != NULL)
441                         return ctx->dev + i;
442         }
443
444         return NULL;
445 }
446
447 static struct tle_udp_dev *
448 find_ipv6_dev(struct tle_udp_ctx *ctx, const struct in6_addr *addr)
449 {
450         uint32_t i;
451
452         for (i = 0; i != RTE_DIM(ctx->dev); i++) {
453                 if (memcmp(&ctx->dev[i].prm.local_addr6, addr,
454                                 sizeof(*addr)) == 0 &&
455                                 ctx->dev[i].dp[TLE_UDP_V6] != NULL)
456                         return ctx->dev + i;
457         }
458
459         return NULL;
460 }
461
462 static int
463 stream_fill_dev(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
464 {
465         struct tle_udp_dev *dev;
466         struct udp_pbm *pbm;
467         struct sockaddr_in *lin4;
468         struct sockaddr_in6 *lin6;
469         uint32_t i, p, sp, t;
470
471         if (s->prm.local_addr.ss_family == AF_INET) {
472                 lin4 = (struct sockaddr_in *)&s->prm.local_addr;
473                 t = TLE_UDP_V4;
474                 p = lin4->sin_port;
475         } else if (s->prm.local_addr.ss_family == AF_INET6) {
476                 lin6 = (struct sockaddr_in6 *)&s->prm.local_addr;
477                 t = TLE_UDP_V6;
478                 p = lin6->sin6_port;
479         } else
480                 return EINVAL;
481
482         p = ntohs(p);
483
484         /* if local address is not wildcard, find device it belongs to. */
485         if (t == TLE_UDP_V4 && lin4->sin_addr.s_addr != INADDR_ANY) {
486                 dev = find_ipv4_dev(ctx, &lin4->sin_addr);
487                 if (dev == NULL)
488                         return ENODEV;
489         } else if (t == TLE_UDP_V6 && memcmp(&tle_udp6_any, &lin6->sin6_addr,
490                         sizeof(tle_udp6_any)) != 0) {
491                 dev = find_ipv6_dev(ctx, &lin6->sin6_addr);
492                 if (dev == NULL)
493                         return ENODEV;
494         } else
495                 dev = NULL;
496
497         if (dev != NULL)
498                 pbm = &dev->dp[t]->use;
499         else
500                 pbm = &ctx->use[t];
501
502         /* try to acquire local port number. */
503         if (p == 0) {
504                 p = udp_pbm_find_range(pbm, pbm->blk, LPORT_END_BLK);
505                 if (p == 0 && pbm->blk > LPORT_START_BLK)
506                         p = udp_pbm_find_range(pbm, LPORT_START_BLK, pbm->blk);
507         } else if (udp_pbm_check(pbm, p) != 0)
508                 return EEXIST;
509
510         if (p == 0)
511                 return ENFILE;
512
513         /* fill socket's dst port and type */
514         sp = htons(p);
515         s->type = t;
516         s->port.dst = sp;
517
518         /* mark port as in-use */
519         udp_pbm_set(&ctx->use[t], p);
520         if (dev != NULL) {
521                 udp_pbm_set(pbm, p);
522                 dev->dp[t]->streams[sp] = s;
523         } else {
524                 for (i = 0; i != RTE_DIM(ctx->dev); i++) {
525                         if (ctx->dev[i].dp[t] != NULL) {
526                                 udp_pbm_set(&ctx->dev[i].dp[t]->use, p);
527                                 ctx->dev[i].dp[t]->streams[sp] = s;
528                         }
529                 }
530         }
531
532         return 0;
533 }
534
535 static int
536 stream_clear_dev(struct tle_udp_ctx *ctx, struct tle_udp_stream *s)
537 {
538         struct tle_udp_dev *dev;
539         uint32_t i, p, sp, t;
540
541         t = s->type;
542         sp = s->port.dst;
543         p = ntohs(sp);
544
545         /* if local address is not wildcard, find device it belongs to. */
546         if (t == TLE_UDP_V4 && s->ipv4.addr.dst != INADDR_ANY) {
547                 dev = find_ipv4_dev(ctx, (struct in_addr *)&s->ipv4.addr.dst);
548                 if (dev == NULL)
549                         return ENODEV;
550         } else if (t == TLE_UDP_V6 && memcmp(&tle_udp6_any, &s->ipv6.addr.dst,
551                         sizeof(tle_udp6_any)) != 0) {
552                 dev = find_ipv6_dev(ctx, (struct in6_addr *)&s->ipv6.addr.dst);
553                 if (dev == NULL)
554                         return ENODEV;
555         } else
556                 dev = NULL;
557
558         udp_pbm_clear(&ctx->use[t], p);
559         if (dev != NULL) {
560                 udp_pbm_clear(&dev->dp[t]->use, p);
561                 dev->dp[t]->streams[sp] = NULL;
562         } else {
563                 for (i = 0; i != RTE_DIM(ctx->dev); i++) {
564                         if (ctx->dev[i].dp[t] != NULL) {
565                                 udp_pbm_clear(&ctx->dev[i].dp[t]->use, p);
566                                 ctx->dev[i].dp[t]->streams[sp] = NULL;
567                         }
568                 }
569         }
570
571         return 0;
572 }
573
574 static struct tle_udp_stream *
575 get_stream(struct tle_udp_ctx *ctx)
576 {
577         struct tle_udp_stream *s;
578
579         s = NULL;
580         if (ctx->streams.nb_free == 0)
581                 return s;
582
583         rte_spinlock_lock(&ctx->streams.lock);
584         if (ctx->streams.nb_free != 0) {
585                 s = STAILQ_FIRST(&ctx->streams.free);
586                 STAILQ_REMOVE_HEAD(&ctx->streams.free, link);
587                 ctx->streams.nb_free--;
588         }
589         rte_spinlock_unlock(&ctx->streams.lock);
590         return s;
591 }
592
593 static void
594 put_stream(struct tle_udp_ctx *ctx, struct tle_udp_stream *s, int32_t head)
595 {
596         s->type = TLE_UDP_VNUM;
597         rte_spinlock_lock(&ctx->streams.lock);
598         if (head != 0)
599                 STAILQ_INSERT_HEAD(&ctx->streams.free, s, link);
600         else
601                 STAILQ_INSERT_TAIL(&ctx->streams.free, s, link);
602         ctx->streams.nb_free++;
603         rte_spinlock_unlock(&ctx->streams.lock);
604 }
605
606 static void
607 fill_ipv4_am(const struct sockaddr_in *in, uint32_t *addr, uint32_t *mask)
608 {
609         *addr = in->sin_addr.s_addr;
610         *mask = (*addr == INADDR_ANY) ? INADDR_ANY : INADDR_NONE;
611 }
612
613 static void
614 fill_ipv6_am(const struct sockaddr_in6 *in, rte_xmm_t *addr, rte_xmm_t *mask)
615 {
616         const struct in6_addr *pm;
617
618         memcpy(addr, &in->sin6_addr, sizeof(*addr));
619         if (memcmp(&tle_udp6_any, addr, sizeof(*addr)) == 0)
620                 pm = &tle_udp6_any;
621         else
622                 pm = &tle_udp6_none;
623
624         memcpy(mask, pm, sizeof(*mask));
625 }
626
627 static int
628 check_stream_prm(const struct tle_udp_ctx *ctx,
629         const struct tle_udp_stream_param *prm)
630 {
631         if ((prm->local_addr.ss_family != AF_INET &&
632                         prm->local_addr.ss_family != AF_INET6) ||
633                         prm->local_addr.ss_family != prm->remote_addr.ss_family)
634                 return -EINVAL;
635
636         /* callback and event notifications mechanisms are mutually exclusive */
637         if ((prm->recv_ev != NULL && prm->recv_cb.func != NULL) ||
638                         (prm->send_ev != NULL && prm->send_cb.func != NULL))
639                 return -EINVAL;
640
641         /* check does context support desired address family. */
642         if ((prm->local_addr.ss_family == AF_INET &&
643                         ctx->prm.lookup4 == NULL) ||
644                         (prm->local_addr.ss_family == AF_INET6 &&
645                         ctx->prm.lookup6 == NULL))
646                 return -EINVAL;
647
648         return 0;
649 }
650
651 struct tle_udp_stream *
652 tle_udp_stream_open(struct tle_udp_ctx *ctx,
653         const struct tle_udp_stream_param *prm)
654 {
655         struct tle_udp_stream *s;
656         const struct sockaddr_in *rin;
657         int32_t rc;
658
659         if (ctx == NULL || prm == NULL || check_stream_prm(ctx, prm) != 0) {
660                 rte_errno = EINVAL;
661                 return NULL;
662         }
663
664         s = get_stream(ctx);
665         if (s == NULL)  {
666                 rte_errno = ENFILE;
667                 return NULL;
668
669         /* some TX still pending for that stream. */
670         } else if (UDP_STREAM_TX_PENDING(s)) {
671                 put_stream(ctx, s, 0);
672                 rte_errno = EAGAIN;
673                 return NULL;
674         }
675
676         /* copy input parameters. */
677         s->prm = *prm;
678
679         /* setup ports and port mask fields (except dst port). */
680         rin = (const struct sockaddr_in *)&prm->remote_addr;
681         s->port.src = rin->sin_port;
682         s->pmsk.src = (s->port.src == 0) ? 0 : UINT16_MAX;
683         s->pmsk.dst = UINT16_MAX;
684
685         /* setup src and dst addresses. */
686         if (prm->local_addr.ss_family == AF_INET) {
687                 fill_ipv4_am((const struct sockaddr_in *)&prm->local_addr,
688                         &s->ipv4.addr.dst, &s->ipv4.mask.dst);
689                 fill_ipv4_am((const struct sockaddr_in *)&prm->remote_addr,
690                         &s->ipv4.addr.src, &s->ipv4.mask.src);
691         } else if (prm->local_addr.ss_family == AF_INET6) {
692                 fill_ipv6_am((const struct sockaddr_in6 *)&prm->local_addr,
693                         &s->ipv6.addr.dst, &s->ipv6.mask.dst);
694                 fill_ipv6_am((const struct sockaddr_in6 *)&prm->remote_addr,
695                         &s->ipv6.addr.src, &s->ipv6.mask.src);
696         }
697
698         rte_spinlock_lock(&ctx->dev_lock);
699         rc = stream_fill_dev(ctx, s);
700         rte_spinlock_unlock(&ctx->dev_lock);
701
702         if (rc != 0) {
703                 put_stream(ctx, s, 1);
704                 s = NULL;
705                 rte_errno = rc;
706         } else {
707                 /* setup stream notification menchanism */
708                 s->rx.ev = prm->recv_ev;
709                 s->rx.cb = prm->recv_cb;
710                 s->tx.ev = prm->send_ev;
711                 s->tx.cb = prm->send_cb;
712
713                 /* mark stream as avaialbe for RX/TX */
714                 if (s->tx.ev != NULL)
715                         tle_event_raise(s->tx.ev);
716                 stream_up(s);
717         }
718
719         return s;
720 }
721
722 int
723 tle_udp_stream_close(struct tle_udp_stream *s)
724 {
725         uint32_t i, n;
726         int32_t rc;
727         struct tle_udp_ctx *ctx;
728         struct rte_mbuf *m[MAX_PKT_BURST];
729
730         static const struct tle_udp_stream_cb zcb;
731
732         if (s == NULL || s->type >= TLE_UDP_VNUM)
733                 return EINVAL;
734
735         ctx = s->ctx;
736
737         /* mark stream as unavaialbe for RX/TX. */
738         stream_down(s);
739
740         /* reset stream events if any. */
741         if (s->rx.ev != NULL) {
742                 tle_event_idle(s->rx.ev);
743                 s->rx.ev = NULL;
744         }
745         if (s->tx.ev != NULL) {
746                 tle_event_idle(s->tx.ev);
747                 s->tx.ev = NULL;
748         }
749
750         s->rx.cb = zcb;
751         s->tx.cb = zcb;
752
753         /* free stream's destination port */
754         rte_spinlock_lock(&ctx->dev_lock);
755         rc = stream_clear_dev(ctx, s);
756         rte_spinlock_unlock(&ctx->dev_lock);
757
758         /* empty stream's RX queue */
759         do {
760                 n = rte_ring_dequeue_burst(s->rx.q, (void **)m, RTE_DIM(m));
761                 for (i = 0; i != n; i++)
762                         rte_pktmbuf_free(m[i]);
763         } while (n != 0);
764
765         /*
766          * mark the stream as free again.
767          * if there still are pkts queued for TX,
768          * then put this stream to the tail of free list.
769          */
770         put_stream(ctx, s, UDP_STREAM_TX_FINISHED(s));
771         return rc;
772 }
773
774 int
775 tle_udp_stream_get_param(const struct tle_udp_stream *s,
776         struct tle_udp_stream_param *prm)
777 {
778         struct sockaddr_in *lin4;
779         struct sockaddr_in6 *lin6;
780
781         if (prm == NULL || s == NULL || s->type >= TLE_UDP_VNUM)
782                 return EINVAL;
783
784         prm[0] = s->prm;
785         if (prm->local_addr.ss_family == AF_INET) {
786                 lin4 = (struct sockaddr_in *)&prm->local_addr;
787                 lin4->sin_port = s->port.dst;
788         } else if (s->prm.local_addr.ss_family == AF_INET6) {
789                 lin6 = (struct sockaddr_in6 *)&prm->local_addr;
790                 lin6->sin6_port = s->port.dst;
791         }
792
793         return 0;
794 }