2 * Copyright (c) 2016 Intel Corporation.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
16 #include <rte_malloc.h>
17 #include <rte_errno.h>
18 #include <rte_ethdev.h>
20 #include <rte_ip_frag.h>
/*
 * Find the stream listening on (type, port) for the given device and
 * take an RX reference on it, so the stream cannot be closed while
 * packets are being delivered into it.
 * NOTE(review): failure paths presumably return NULL — confirm.
 */
static inline struct tle_udp_stream *
rx_stream_obtain(struct tle_udp_dev *dev, uint32_t type, uint32_t port)
        struct tle_udp_stream *s;

        /* unknown address family, or no streams of that family on the dev */
        if (type >= TLE_UDP_VNUM || dev->dp[type] == NULL)

        s = dev->dp[type]->streams[port];

        /* mark stream RX side as in-use; fails if the stream is closing */
        if (rwl_acquire(&s->rx.use) < 0)
/*
 * Classify an mbuf by its parsed packet_type.
 * Judging by the callers, returns TLE_UDP_V4/TLE_UDP_V6 for IPv4/IPv6
 * UDP packets — TODO confirm the exact return values.
 */
static inline uint16_t
get_pkt_type(const struct rte_mbuf *m)
        /* keep only the L3 family and L4 protocol bits of packet_type */
        (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
        if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
        else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
/*
 * Collect addressing info from an incoming packet:
 * - *ports receives the UDP src/dst ports (read through 'up'),
 * - *addr4 / *addr6 receive the IPv4/IPv6 addresses depending on L3 type,
 * - the device's RX offload flags for the detected family are ORed
 *   into the mbuf.
 * The returned union udp_ports carries the packet type from
 * get_pkt_type() in its 'src' field.
 */
static inline union udp_ports
pkt_info(const struct tle_udp_dev *dev, struct rte_mbuf *m,
        union udp_ports *ports, union ipv4_addrs *addr4,
        union ipv6_addrs **addr6)
        union udp_ports ret, *up;
        union ipv4_addrs *pa4;

        /* stash the detected L3/L4 packet type into the return value */
        ret.src = get_pkt_type(m);

        if (ret.src == TLE_UDP_V4) {
                /* 'len' holds the L2 header offset — TODO confirm */
                pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
                        len + offsetof(struct ipv4_hdr, src_addr));
                addr4->raw = pa4->raw;
                m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V4];
        } else if (ret.src == TLE_UDP_V6) {
                *addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
                        len + offsetof(struct ipv6_hdr, src_addr));
                m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V6];

        /* point at the UDP header so both ports are read in one go */
        up = rte_pktmbuf_mtod_offset(m, union udp_ports *,
                len + offsetof(struct udp_hdr, src_port));
/*
 * Helper routine, enqueues packets to the stream and calls RX
 * notification callback, if needed.
 * Packets that do not fit into the stream RX ring are reported back
 * through rp[]/rc[].
 */
static inline uint16_t
rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
        int32_t rc[], uint32_t num)
        /* r = number of mbufs actually enqueued onto the stream RX ring */
        r = rte_ring_enqueue_burst(s->rx.q, mb, num);

        /* if RX queue was empty invoke user RX notification callback. */
        if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
                s->rx.cb.func(s->rx.cb.data, s);

        /* report the mbufs that did not fit back to the caller */
        for (i = r, k = 0; i != num; i++, k++) {
/*
 * Filter a burst of IPv6 packets against the stream's (masked)
 * address and ports, then deliver the matching ones via rx_stream().
 * Rejected packets are reported through rp[]/rc[].
 */
static inline uint16_t
rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
        union ipv6_addrs *addr[], union udp_ports port[],
        struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
        for (i = 0; i != num; i++) {

                /* masked port or masked IPv6 address mismatch: reject */
                if ((port[i].raw & s->pmsk.raw) != s->port.raw ||
                        ymm_mask_cmp(&addr[i]->raw, &s->ipv6.addr.raw,
                        &s->ipv6.mask.raw) != 0) {

        /* hand the accepted packets over to the stream */
        return rx_stream(s, mb, rp + k, rc + k, n);
/*
 * IPv4 counterpart of rx_stream6(): filter a burst of packets against
 * the stream's masked address/ports and deliver matches via
 * rx_stream().  Rejected packets are reported through rp[]/rc[].
 */
static inline uint16_t
rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
        union ipv4_addrs addr[], union udp_ports port[],
        struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
        for (i = 0; i != num; i++) {

                /* masked IPv4 address or masked port mismatch: reject */
                if ((addr[i].raw & s->ipv4.mask.raw) != s->ipv4.addr.raw ||
                        (port[i].raw & s->pmsk.raw) !=

        /* hand the accepted packets over to the stream */
        return rx_stream(s, mb, rp + k, rc + k, n);
/*
 * Device RX entry point: classify a burst of packets, group
 * consecutive packets that share the same (type, port) key, look up
 * the stream for each group and deliver the packets to it.
 * Undelivered packets are returned via rp[] with error codes in rc[].
 */
tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
        struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
        struct tle_udp_stream *s;
        uint32_t i, j, k, n, p, t;
        /* per-packet classification results, VLAs sized by the burst */
        union udp_ports tp[num], port[num];
        union ipv4_addrs a4[num];
        union ipv6_addrs *pa6[num];

        /* classify every packet and extract its addresses/ports */
        for (i = 0; i != num; i++)
                tp[i] = pkt_info(dev, pkt[i], &port[i], &a4[i], &pa6[i]);

        /* process each run of packets sharing the same (type, port) */
        for (i = 0; i != num; i = j) {

                /* extend the run while the classification key matches */
                for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)

                s = rx_stream_obtain(dev, t, p);

                        n = rx_stream4(s, pkt + i, a4 + i,
                                port + i, rp + k, rc + k, j - i);

                        n = rx_stream6(s, pkt + i, pa6 + i, port + i,
                                rp + k, rc + k, j - i);

                        /* wake anyone waiting on the stream RX event */
                        if (s->rx.ev != NULL)
                                tle_event_raise(s->rx.ev);
                        rwl_release(&s->rx.use);

                /* no stream for this run: report packets as undelivered */
                for (; i != j; i++) {
/*
 * Return drained drbs to the stream's free ring and, if the stream is
 * still open, re-arm its TX notification (event or, if the send
 * buffer had been completely full, the user TX callback).
 */
stream_drb_release(struct tle_udp_stream *s, struct tle_drb * drb[],
        /* remember free-ring occupancy before refilling it */
        n = rte_ring_count(s->tx.drb.r);
        rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);

        /* If stream is still open, then mark it as available for writing. */
        if (rwl_try_acquire(&s->tx.use) > 0) {

                if (s->tx.ev != NULL)
                        tle_event_raise(s->tx.ev);

                /* if stream send buffer was full invoke TX callback */
                else if (s->tx.cb.func != NULL && n == 0)
                        s->tx.cb.func(s->tx.cb.data, s);

        rwl_release(&s->tx.use);
244 tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
247 struct tle_drb *drb[num];
248 struct tle_udp_stream *s;
250 /* extract packets from device TX queue. */
253 n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
259 /* free empty drbs and notify related streams. */
261 for (i = 0; i != k; i = j) {
263 for (j = i + 1; j != k && s == drb[i]->udata; j++)
265 stream_drb_release(s, drb + i, j - i);
/*
 * Software re-verification of checksums for packets the NIC flagged
 * as bad.  Returns 0 when the flagged checksums actually verify OK,
 * non-zero otherwise.
 */
check_pkt_csum(const struct rte_mbuf *m, uint32_t type)
        const struct ipv4_hdr *l3h4;
        const struct ipv6_hdr *l3h6;
        const struct udp_hdr *l4h;

        /* both aliases point at the L3 header; only one is meaningful */
        l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len);
        l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len);

        /* re-check the IPv4 header checksum flagged bad by the HW */
        if ((m->ol_flags & PKT_RX_IP_CKSUM_BAD) != 0) {
                csum = _ipv4x_cksum(l3h4, m->l3_len);
                /* a valid one's-complement sum folds to UINT16_MAX */
                ret = (csum != UINT16_MAX);

        if (ret == 0 && (m->ol_flags & PKT_RX_L4_CKSUM_BAD) != 0) {

                /*
                 * for IPv4 it is allowed to have zero UDP cksum,
                 * for IPv6 valid UDP cksum is mandatory.
                 */
                if (type == TLE_UDP_V4) {
                        l4h = (const struct udp_hdr *)((uintptr_t)l3h4 +
                        csum = (l4h->dgram_cksum == 0) ? UINT16_MAX :
                                _ipv4_udptcp_mbuf_cksum(m,
                                m->l2_len + m->l3_len, l3h4);

                        csum = _ipv6_udptcp_mbuf_cksum(m,
                                m->l2_len + m->l3_len, l3h6);

                ret = (csum != UINT16_MAX);
/*
 * exclude NULLs from the final list of packets: squeeze 'nb_zero'
 * NULL entries out of pkt[] in place, preserving the relative order
 * of the remaining packets.
 */
static inline uint32_t
compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
        /* walk backwards while holes remain to be removed */
        for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) {

                /* found a hole (NULL entry) */
                if (pkt[j] == NULL) {

                        /* find how big is it. */
                        for (i = j; i-- != 0 && pkt[i] == NULL; )

                        /* shift the tail left over the hole */
                        for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++)
/*
 * helper function, do the necessary pre-processing for the received
 * packets before handing them to the stream_recv caller: drop packets
 * with invalid checksums and strip the headers from the rest.
 */
static inline uint32_t
recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
        uint64_t f, flg[num], ofl[num];

        /* snapshot offload flags/metadata before modifying the mbufs */
        for (i = 0; i != num; i++) {
                flg[i] = m[i]->ol_flags;
                ofl[i] = m[i]->tx_offload;

        for (i = 0; i != num; i++) {

                f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);

                /* drop packets with invalid cksum(s). */
                if (f != 0 && check_pkt_csum(m[i], type) != 0) {
                        rte_pktmbuf_free(m[i]);

                /* strip L2/L3/L4 headers, leaving only the UDP payload */
                rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
/*
 * Read up to 'num' packets from the stream RX queue, drop the ones
 * with bad checksums, strip the remaining headers and compact the
 * result list.  Returns the number of packets placed into pkt[].
 */
tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
        n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);

        /*
         * if we still have packets to read,
         * then rearm stream RX event.
         */
        if (n == num && rte_ring_count(s->rx.q) != 0) {
                if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
                        tle_event_raise(s->rx.ev);
                rwl_release(&s->rx.use);

        /* k = number of dropped (NULLed) entries to squeeze out */
        k = recv_pkt_process(pkt, n, s->type);
        return compress_pkt_list(pkt, n, k);
/*
 * Resolve the output device/route for the given destination address
 * via the user-supplied lookup4/lookup6 callbacks, then pre-fill the
 * source/destination addresses in the cached L3 header template.
 * On success returns the device index within the context.
 */
udp_get_dest(struct tle_udp_stream *s, const void *dst_addr,
        struct tle_udp_dest *dst)
        const struct in_addr *d4;
        const struct in6_addr *d6;
        struct tle_udp_ctx *ctx;
        struct tle_udp_dev *dev;

        /* it is here just to keep gcc happy. */

        if (s->type == TLE_UDP_V4) {
                rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst);
        } else if (s->type == TLE_UDP_V6) {
                rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst);

        /* lookup failed, or returned a device from another context */
        if (rc < 0 || dst->dev == NULL || dst->dev->ctx != ctx)

        /* write src/dst addresses into the L3 header template */
        if (s->type == TLE_UDP_V4) {
                struct ipv4_hdr *l3h;
                l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len);
                l3h->src_addr = dev->prm.local_addr4.s_addr;
                l3h->dst_addr = d4->s_addr;

                struct ipv6_hdr *l3h;
                l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len);
                rte_memcpy(l3h->src_addr, &dev->prm.local_addr6,
                        sizeof(l3h->src_addr));
                rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr));

        /* device index within the context's device array */
        return dev - ctx->dev;
/*
 * Prepend the L2/L3 header template and the UDP header onto the mbuf
 * and fill in length/checksum fields according to the requested TX
 * offload flags.
 */
udp_fill_mbuf(struct rte_mbuf *m,
        uint32_t type, uint64_t ol_flags, uint32_t pid,
        union udph udph, const struct tle_udp_dest *dst)
        /* combined L2 + L3 header length of the template */
        len = dst->l2_len + dst->l3_len;

        /* copy to mbuf L2/L3 header template. */

        l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));

        /* copy L2/L3 header */
        rte_memcpy(l2h, dst->hdr, len);

        /* copy UDP header */
        l4h = (union udph *)(l2h + len);

        /* setup mbuf TX offload related fields. */
        m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
                sizeof(*l4h), 0, 0, 0);
        m->ol_flags |= ol_flags;

        /* UDP length = payload + UDP header */
        l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));

        /* update proto specific fields. */

        if (type == TLE_UDP_V4) {
                struct ipv4_hdr *l3h;
                l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
                l3h->packet_id = rte_cpu_to_be_16(pid);
                l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +

                /* HW UDP cksum: seed with the pseudo-header sum only */
                if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
                        l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,

                        l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);

                /* no HW IP cksum offload: compute it in software */
                if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
                        l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);

                struct ipv6_hdr *l3h;
                l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
                l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
                if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
                        l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);

                        l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
/*
 * Propagate offload metadata from the source packet to a fragment
 * and, when IPv4 header checksum is not offloaded to HW, recompute
 * the fragment's checksum in software.
 * Ideally rte_ipv[4,6]_fragment_packet() would do this itself.
 */
frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
        struct ipv4_hdr *l3h;

        /* fragments inherit the source packet's offload settings */
        mf->ol_flags = ms->ol_flags;
        mf->tx_offload = ms->tx_offload;

        if (type == TLE_UDP_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
                l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *);
                l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
/*
 * Split a packet that exceeds the MTU into IP fragments and re-attach
 * the L2 header to each resulting fragment.
 * Returns negative for failure to fragment or actual number of fragments.
 */
fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
        uint32_t type, const struct tle_udp_dest *dst)
        /* Remove the Ethernet header from the input packet */
        rte_pktmbuf_adj(pkt, dst->l2_len);
        mtu = dst->mtu - dst->l2_len;

        /* fragment packet */
        if (type == TLE_UDP_V4)
                frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
                        dst->head_mp, dst->head_mp);

                frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
                        dst->head_mp, dst->head_mp);

        /* fix up each fragment and restore its L2 header */
        for (i = 0; i != frag_num; i++) {

                frag_fixup(pkt, frag[i], type);

                /* Move data_off to include l2 header first */
                eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);

                /* copy l2 header into fragment */
                rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
/* return no-longer-needed drbs back to the stream's free-drb ring. */
stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
        rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
/* take up to 'nb_drb' free drbs from the stream's free-drb ring. */
static inline uint32_t
stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
        return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
/*
 * enqueue up to num packets to the destination device queue.
 * Allocates additional drbs on top of the 'nb_drb' already cached by
 * the caller; unconsumed drbs remain in drbs[] for reuse.
 */
static inline uint16_t
queue_pkt_out(struct tle_udp_stream *s, struct tle_udp_dev *dev,
        const void *pkt[], uint16_t nb_pkt,
        struct tle_drb *drbs[], uint32_t *nb_drb)
        uint32_t bsz, i, n, nb, nbc, nbm;

        /* drb capacity: how many packets fit into one drb */
        bsz = s->tx.drb.nb_elem;

        /* calculate how many drbs are needed. */
        nbm = (nb_pkt + bsz - 1) / bsz;
        /* only allocate what the caller's cached drbs don't cover */
        nb = RTE_MAX(nbm, nbc) - nbc;

        /* allocate required drbs */

        nb = stream_drb_alloc(s, drbs + nbc, nb);

        /* no free drbs, can't send anything */

        /* not enough free drbs, reduce number of packets to send. */

        /* enqueue packets to the destination device. */

        n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);

        /* if not all available drbs were consumed, move them to the start. */

        for (i = 0; i != nb; i++)
                drbs[i] = drbs[nbc + i];
/*
 * Send a burst of packets over the stream, either to the explicit
 * 'dst_addr' (which must match the stream's address family) or to the
 * stream's connected remote address.  Packets larger than the path
 * MTU are fragmented.  'k' accumulates the number of packets accepted
 * for transmission.
 */
tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
        uint16_t num, const struct sockaddr *dst_addr)
        uint32_t i, k, n, nb;
        uint32_t mtu, pid, type;
        const struct sockaddr_in *d4;
        const struct sockaddr_in6 *d6;

        struct tle_udp_dest dst;
        struct tle_drb *drb[num];

        /* start filling UDP header. */

        /* NOTE(review): s->port.dst appears to be the local port — confirm */
        udph.ports.src = s->port.dst;

        /* figure out what destination addr/port to use. */
        if (dst_addr != NULL) {
                /* explicit destination must match the stream's family */
                if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {

                if (type == TLE_UDP_V4) {
                        d4 = (const struct sockaddr_in *)dst_addr;

                        udph.ports.dst = d4->sin_port;

                        d6 = (const struct sockaddr_in6 *)dst_addr;

                        udph.ports.dst = d6->sin6_port;

                /* connected stream: reuse the stored remote addr/port */
                udph.ports.dst = s->port.src;
                if (type == TLE_UDP_V4)
                        da = &s->ipv4.addr.src;

                        da = &s->ipv6.addr.src;

        /* route lookup: fills 'dst' and yields the egress device index */
        di = udp_get_dest(s, da, &dst);

        /* reserve 'num' consecutive IP packet ids for this burst */
        pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
        mtu = dst.mtu - dst.l2_len - dst.l3_len;

        /* mark stream as not closable. */
        if (rwl_acquire(&s->tx.use) < 0)

        for (i = 0, k = 0; k != num; k = i) {

                /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */

                ol_flags = dst.dev->tx.ol_flags[type];

                /* collect a run of packets that fit into the MTU */
                while (i != num && frg == 0) {
                        frg = pkt[i]->pkt_len > mtu;
                        /* HW UDP cksum cannot be used on fragments */
                                ol_flags &= ~PKT_TX_UDP_CKSUM;
                        rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,

                /* enqueue non-fragment packets to the destination device. */

                        k += queue_pkt_out(s, dst.dev,
                                (const void **)(uintptr_t)&pkt[k], i - k,

                        /* stream TX queue is full. */

                /* enqueue packet that need to be fragmented */

                        struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];

                        /* fragment the packet. */
                        rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);

                        n = queue_pkt_out(s, dst.dev,
                                (const void **)(uintptr_t)frag, rc, drb, &nb);
                        /* free the fragments that could not be queued */
                                rte_pktmbuf_free(frag[rc]);

                        /* all fragments enqueued, free the original packet. */
                        rte_pktmbuf_free(pkt[i]);

        /* if possible, rearm socket write event. */
        if (k == num && s->tx.ev != NULL)
                tle_event_raise(s->tx.ev);

        /* free unused drbs. */

        stream_drb_free(s, drb, nb);

        /* stream can be closed. */
        rwl_release(&s->tx.use);

        /*
         * remove pkt l2/l3 headers, restore ol_flags for unsent, but
         * already modified packets.
         */
        ol_flags = ~dst.dev->tx.ol_flags[type];
        for (n = k; n != i; n++) {
                rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
                pkt[n]->ol_flags &= ol_flags;