Revert "l4p/tcp: introduce tle_tcp_stream_establish() API"
[tldk.git] / lib / libtle_l4p / udp_rxtx.c
1 /*
2  * Copyright (c) 2016  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <rte_malloc.h>
17 #include <rte_errno.h>
18 #include <rte_ethdev.h>
19 #include <rte_ip.h>
20 #include <rte_ip_frag.h>
21 #include <rte_udp.h>
22
23 #include "udp_stream.h"
24 #include "misc.h"
25
26 static inline struct tle_udp_stream *
27 rx_stream_obtain(struct tle_dev *dev, uint32_t type, uint32_t port)
28 {
29         struct tle_udp_stream *s;
30
31         if (type >= TLE_VNUM || dev->dp[type] == NULL)
32                 return NULL;
33
34         s = (struct tle_udp_stream *)dev->dp[type]->streams[port];
35         if (s == NULL)
36                 return NULL;
37
38         if (rwl_acquire(&s->rx.use) < 0)
39                 return NULL;
40
41         return s;
42 }
43
44 static inline uint16_t
45 get_pkt_type(const struct rte_mbuf *m)
46 {
47         uint32_t v;
48
49         v = m->packet_type &
50                 (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
51         if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
52                 return TLE_V4;
53         else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
54                 return TLE_V6;
55         else
56                 return TLE_VNUM;
57 }
58
59 static inline union l4_ports
60 pkt_info(struct rte_mbuf *m, union l4_ports *ports, union ipv4_addrs *addr4,
61         union ipv6_addrs **addr6)
62 {
63         uint32_t len;
64         union l4_ports ret, *up;
65         union ipv4_addrs *pa4;
66
67         ret.src = get_pkt_type(m);
68
69         len = m->l2_len;
70         if (ret.src == TLE_V4) {
71                 pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
72                         len + offsetof(struct rte_ipv4_hdr, src_addr));
73                 addr4->raw = pa4->raw;
74         } else if (ret.src == TLE_V6) {
75                 *addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
76                         len + offsetof(struct rte_ipv6_hdr, src_addr));
77         }
78
79         len += m->l3_len;
80         up = rte_pktmbuf_mtod_offset(m, union l4_ports *,
81                 len + offsetof(struct rte_udp_hdr, src_port));
82         ports->raw = up->raw;
83         ret.dst = ports->dst;
84         return ret;
85 }
86
87 /*
88  * Helper routine, enqueues packets to the stream and calls RX
89  * notification callback, if needed.
90  */
91 static inline uint16_t
92 rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
93         int32_t rc[], uint32_t num)
94 {
95         uint32_t i, k, r;
96
97         r = _rte_ring_enqueue_burst(s->rx.q, mb, num);
98
99         /* if RX queue was empty invoke user RX notification callback. */
100         if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
101                 s->rx.cb.func(s->rx.cb.data, &s->s);
102
103         for (i = r, k = 0; i != num; i++, k++) {
104                 rc[k] = ENOBUFS;
105                 rp[k] = mb[i];
106         }
107
108         return r;
109 }
110
111 static inline uint16_t
112 rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
113         union ipv6_addrs *addr[], union l4_ports port[],
114         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
115 {
116         uint32_t i, k, n;
117         void *mb[num];
118
119         k = 0;
120         n = 0;
121
122         for (i = 0; i != num; i++) {
123
124                 if ((port[i].raw & s->s.pmsk.raw) != s->s.port.raw ||
125                                 ymm_mask_cmp(&addr[i]->raw, &s->s.ipv6.addr.raw,
126                                 &s->s.ipv6.mask.raw) != 0) {
127                         rc[k] = ENOENT;
128                         rp[k] = pkt[i];
129                         k++;
130                 } else {
131                         mb[n] = pkt[i];
132                         n++;
133                 }
134         }
135
136         return rx_stream(s, mb, rp + k, rc + k, n);
137 }
138
139 static inline uint16_t
140 rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
141         union ipv4_addrs addr[], union l4_ports port[],
142         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
143 {
144         uint32_t i, k, n;
145         void *mb[num];
146
147         k = 0;
148         n = 0;
149
150         for (i = 0; i != num; i++) {
151
152                 if ((addr[i].raw & s->s.ipv4.mask.raw) != s->s.ipv4.addr.raw ||
153                                 (port[i].raw & s->s.pmsk.raw) !=
154                                 s->s.port.raw) {
155                         rc[k] = ENOENT;
156                         rp[k] = pkt[i];
157                         k++;
158                 } else {
159                         mb[n] = pkt[i];
160                         n++;
161                 }
162         }
163
164         return rx_stream(s, mb, rp + k, rc + k, n);
165 }
166
167 uint16_t
168 tle_udp_rx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[],
169         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
170 {
171         struct tle_udp_stream *s;
172         uint32_t i, j, k, n, p, t;
173         union l4_ports tp[num], port[num];
174         union ipv4_addrs a4[num];
175         union ipv6_addrs *pa6[num];
176
177         for (i = 0; i != num; i++)
178                 tp[i] = pkt_info(pkt[i], &port[i], &a4[i], &pa6[i]);
179
180         k = 0;
181         for (i = 0; i != num; i = j) {
182
183                 for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
184                         ;
185
186                 t = tp[i].src;
187                 p = tp[i].dst;
188                 s = rx_stream_obtain(dev, t, p);
189                 if (s != NULL) {
190
191                         if (t == TLE_V4)
192                                 n = rx_stream4(s, pkt + i, a4 + i,
193                                         port + i, rp + k, rc + k, j - i);
194                         else
195                                 n = rx_stream6(s, pkt + i, pa6 + i, port + i,
196                                         rp + k, rc + k, j - i);
197
198                         k += j - i - n;
199
200                         if (s->rx.ev != NULL)
201                                 tle_event_raise(s->rx.ev);
202                         rwl_release(&s->rx.use);
203
204                 } else {
205                         for (; i != j; i++) {
206                                 rc[k] = ENOENT;
207                                 rp[k] = pkt[i];
208                                 k++;
209                         }
210                 }
211         }
212
213         return num - k;
214 }
215
216 static inline void
217 stream_drb_release(struct tle_udp_stream *s, struct tle_drb *drb[],
218         uint32_t nb_drb)
219 {
220         uint32_t n;
221
222         n = rte_ring_count(s->tx.drb.r);
223         _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);
224
225         /* If stream is still open, then mark it as avaialble for writing. */
226         if (rwl_try_acquire(&s->tx.use) > 0) {
227
228                 if (s->tx.ev != NULL)
229                         tle_event_raise(s->tx.ev);
230
231                 /* if stream send buffer was full invoke TX callback */
232                 else if (s->tx.cb.func != NULL && n == 0)
233                         s->tx.cb.func(s->tx.cb.data, &s->s);
234
235         }
236
237         rwl_release(&s->tx.use);
238 }
239
240 uint16_t
241 tle_udp_tx_bulk(struct tle_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
242 {
243         uint32_t i, j, k, n;
244         struct tle_drb *drb[num];
245         struct tle_udp_stream *s;
246
247         /* extract packets from device TX queue. */
248
249         k = num;
250         n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
251                 num, drb, &k);
252
253         if (n == 0)
254                 return 0;
255
256         /* free empty drbs and notify related streams. */
257
258         for (i = 0; i != k; i = j) {
259                 s = drb[i]->udata;
260                 for (j = i + 1; j != k && s == drb[j]->udata; j++)
261                         ;
262                 stream_drb_release(s, drb + i, j - i);
263         }
264
265         return n;
266 }
267
268 /*
269  * helper function, do the necessary pre-processing for the received packets
270  * before handiing them to the strem_recv caller.
271  */
272 static inline uint32_t
273 recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
274 {
275         uint32_t i, k;
276         uint64_t flg[num], ofl[num];
277
278         for (i = 0; i != num; i++) {
279                 flg[i] = m[i]->ol_flags;
280                 ofl[i] = m[i]->tx_offload;
281         }
282
283         k = 0;
284         for (i = 0; i != num; i++) {
285
286                 /* drop packets with invalid cksum(s). */
287                 if (check_pkt_csum(m[i], flg[i], type, IPPROTO_UDP) != 0) {
288                         rte_pktmbuf_free(m[i]);
289                         m[i] = NULL;
290                         k++;
291                 } else
292                         rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
293         }
294
295         return k;
296 }
297
298 uint16_t
299 tle_udp_stream_recv(struct tle_stream *us, struct rte_mbuf *pkt[], uint16_t num)
300 {
301         uint32_t k, n;
302         struct tle_udp_stream *s;
303
304         s = UDP_STREAM(us);
305         n = _rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
306         if (n == 0)
307                 return 0;
308
309         /*
310          * if we still have packets to read,
311          * then rearm stream RX event.
312          */
313         if (n == num && rte_ring_count(s->rx.q) != 0) {
314                 if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
315                         tle_event_raise(s->rx.ev);
316                 rwl_release(&s->rx.use);
317         }
318
319         k = recv_pkt_process(pkt, n, s->s.type);
320         return compress_pkt_list(pkt, n, k);
321 }
322
323 static inline int
324 udp_fill_mbuf(struct rte_mbuf *m,
325         uint32_t type, uint64_t ol_flags, uint32_t pid,
326         union udph udph, const struct tle_dest *dst)
327 {
328         uint32_t len, plen;
329         char *l2h;
330         union udph *l4h;
331
332         len = dst->l2_len + dst->l3_len;
333         plen = m->pkt_len;
334
335         /* copy to mbuf L2/L3 header template. */
336
337         l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));
338         if (l2h == NULL)
339                 return -ENOBUFS;
340
341         /* copy L2/L3 header */
342         rte_memcpy(l2h, dst->hdr, len);
343
344         /* copy UDP header */
345         l4h = (union udph *)(l2h + len);
346         l4h->raw = udph.raw;
347
348         /* setup mbuf TX offload related fields. */
349         m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
350                 sizeof(*l4h), 0, 0, 0);
351         m->ol_flags |= ol_flags;
352
353         l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));
354
355         /* update proto specific fields. */
356
357         if (type == TLE_V4) {
358                 struct rte_ipv4_hdr *l3h;
359                 l3h = (struct rte_ipv4_hdr *)(l2h + dst->l2_len);
360                 l3h->packet_id = rte_cpu_to_be_16(pid);
361                 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +
362                         sizeof(*l4h));
363
364                 if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
365                         l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
366                                 ol_flags);
367                 else
368                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
369
370                 if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
371                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
372         } else {
373                 struct rte_ipv6_hdr *l3h;
374                 l3h = (struct rte_ipv6_hdr *)(l2h + dst->l2_len);
375                 l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
376                 if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
377                         l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
378                 else
379                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
380         }
381
382         return 0;
383 }
384
385 /* ???
386  * probably this function should be there -
387  * rte_ipv[4,6]_fragment_packet should do that.
388  */
389 static inline void
390 frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
391 {
392         struct rte_ipv4_hdr *l3h;
393
394         mf->ol_flags = ms->ol_flags;
395         mf->tx_offload = ms->tx_offload;
396
397         if (type == TLE_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
398                 l3h = rte_pktmbuf_mtod(mf, struct rte_ipv4_hdr *);
399                 l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
400         }
401 }
402
403 /*
404  * Returns negative for failure to fragment or actual number of fragments.
405  */
406 static inline int
407 fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
408         uint32_t type, const struct tle_dest *dst)
409 {
410         int32_t frag_num, i;
411         uint16_t mtu;
412         void *eth_hdr;
413
414         /* Remove the Ethernet header from the input packet */
415         rte_pktmbuf_adj(pkt, dst->l2_len);
416         mtu = dst->mtu - dst->l2_len;
417
418         /* fragment packet */
419         if (type == TLE_V4)
420                 frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
421                         dst->head_mp, dst->head_mp);
422         else
423                 frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
424                         dst->head_mp, dst->head_mp);
425
426         if (frag_num > 0) {
427                 for (i = 0; i != frag_num; i++) {
428
429                         frag_fixup(pkt, frag[i], type);
430
431                         /* Move data_off to include l2 header first */
432                         eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);
433
434                         /* copy l2 header into fragment */
435                         rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
436                 }
437         }
438
439         return frag_num;
440 }
441
442 static inline void
443 stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
444         uint32_t nb_drb)
445 {
446         _rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
447 }
448
449 static inline uint32_t
450 stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
451         uint32_t nb_drb)
452 {
453         return _rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
454 }
455
456 /* enqueue up to num packets to the destination device queue. */
457 static inline uint16_t
458 queue_pkt_out(struct tle_udp_stream *s, struct tle_dev *dev,
459                 const void *pkt[], uint16_t nb_pkt,
460                 struct tle_drb *drbs[], uint32_t *nb_drb, uint8_t all_or_nothing)
461 {
462         uint32_t bsz, i, n, nb, nbc, nbm;
463
464         bsz = s->tx.drb.nb_elem;
465
466         /* calulate how many drbs are needed.*/
467         nbc = *nb_drb;
468         nbm = (nb_pkt + bsz - 1) / bsz;
469         nb = RTE_MAX(nbm, nbc) - nbc;
470
471         /* allocate required drbs */
472         if (nb != 0)
473                 nb = stream_drb_alloc(s, drbs + nbc, nb);
474
475         nb += nbc;
476
477         /* no free drbs, can't send anything */
478         if (nb == 0)
479                 return 0;
480
481         /* not enough free drbs, reduce number of packets to send. */
482         else if (nb != nbm) {
483                 if (all_or_nothing)
484                         return 0;
485                 nb_pkt = nb * bsz;
486         }
487
488         /* enqueue packets to the destination device. */
489         nbc = nb;
490         n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);
491
492         /* if not all available drbs were consumed, move them to the start. */
493         nbc -= nb;
494         for (i = 0; i != nb; i++)
495                 drbs[i] = drbs[nbc + i];
496
497         *nb_drb = nb;
498         return n;
499 }
500
501 uint16_t
502 tle_udp_stream_send(struct tle_stream *us, struct rte_mbuf *pkt[],
503         uint16_t num, const struct sockaddr *dst_addr)
504 {
505         int32_t di, frg, rc;
506         uint64_t ol_flags;
507         uint32_t i, k, n, nb;
508         uint32_t mtu, pid, type;
509         const struct sockaddr_in *d4;
510         const struct sockaddr_in6 *d6;
511         struct tle_udp_stream *s;
512         const void *da;
513         union udph udph;
514         struct tle_dest dst;
515         struct tle_drb *drb[num];
516
517         s = UDP_STREAM(us);
518         type = s->s.type;
519
520         /* start filling UDP header. */
521         udph.raw = 0;
522         udph.ports.src = s->s.port.dst;
523
524         /* figure out what destination addr/port to use. */
525         if (dst_addr != NULL) {
526                 if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
527                         rte_errno = EINVAL;
528                         return 0;
529                 }
530                 if (type == TLE_V4) {
531                         d4 = (const struct sockaddr_in *)dst_addr;
532                         da = &d4->sin_addr;
533                         udph.ports.dst = d4->sin_port;
534                 } else {
535                         d6 = (const struct sockaddr_in6 *)dst_addr;
536                         da = &d6->sin6_addr;
537                         udph.ports.dst = d6->sin6_port;
538                 }
539         } else {
540                 udph.ports.dst = s->s.port.src;
541                 if (type == TLE_V4)
542                         da = &s->s.ipv4.addr.src;
543                 else
544                         da = &s->s.ipv6.addr.src;
545         }
546
547         di = stream_get_dest(&s->s, da, &dst);
548         if (di < 0) {
549                 rte_errno = -di;
550                 return 0;
551         }
552
553         pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
554         mtu = dst.mtu - dst.l2_len - dst.l3_len;
555
556         /* mark stream as not closable. */
557         if (rwl_acquire(&s->tx.use) < 0) {
558                 rte_errno = EAGAIN;
559                 return 0;
560         }
561
562         nb = 0;
563         for (i = 0, k = 0; k != num; k = i) {
564
565                 /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
566
567                 frg = 0;
568                 ol_flags = dst.dev->tx.ol_flags[type];
569
570                 while (i != num && frg == 0) {
571                         frg = pkt[i]->pkt_len > mtu;
572                         if (frg != 0)
573                                 ol_flags &= ~PKT_TX_UDP_CKSUM;
574                         rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
575                                 udph, &dst);
576                         if (rc != 0) {
577                                 rte_errno = -rc;
578                                 goto out;
579                         }
580                         i += (frg == 0);
581                 }
582
583                 /* enqueue non-fragment packets to the destination device. */
584                 if (k != i) {
585                         k += queue_pkt_out(s, dst.dev,
586                                 (const void **)(uintptr_t)&pkt[k], i - k,
587                                 drb, &nb, 0);
588
589                         /* stream TX queue is full. */
590                         if (k != i) {
591                                 rte_errno = EAGAIN;
592                                 break;
593                         }
594                 }
595
596                 /* enqueue packet that need to be fragmented */
597                 if (i != num) {
598
599                         struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];
600
601                         /* fragment the packet. */
602                         rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);
603                         if (rc < 0) {
604                                 rte_errno = -rc;
605                                 break;
606                         }
607
608                         n = queue_pkt_out(s, dst.dev,
609                                 (const void **)(uintptr_t)frag, rc, drb, &nb, 1);
610                         if (n == 0) {
611                                 while (rc-- != 0)
612                                         rte_pktmbuf_free(frag[rc]);
613                                 rte_errno = EAGAIN;
614                                 break;
615                         }
616
617                         /* all fragments enqueued, free the original packet. */
618                         rte_pktmbuf_free(pkt[i]);
619                         i++;
620                 }
621         }
622
623         /* if possible, rearm socket write event. */
624         if (k == num && s->tx.ev != NULL)
625                 tle_event_raise(s->tx.ev);
626
627 out:
628         /* free unused drbs. */
629         if (nb != 0)
630                 stream_drb_free(s, drb, nb);
631
632         /* stream can be closed. */
633         rwl_release(&s->tx.use);
634
635         /*
636          * remove pkt l2/l3 headers, restore ol_flags for unsent, but
637          * already modified packets.
638          */
639         ol_flags = ~dst.dev->tx.ol_flags[type];
640         for (n = k; n != i; n++) {
641                 rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
642                 pkt[n]->ol_flags &= ol_flags;
643         }
644
645         return k;
646 }