71367147b88bf20d00f272ce283a1eb468c988b4
[tldk.git] / lib / libtle_udp / udp_rxtx.c
1 /*
2  * Copyright (c) 2016  Intel Corporation.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <rte_malloc.h>
17 #include <rte_errno.h>
18 #include <rte_ethdev.h>
19 #include <rte_ip.h>
20 #include <rte_ip_frag.h>
21 #include <rte_udp.h>
22
23 #include "udp_impl.h"
24 #include "misc.h"
25
26 static inline struct tle_udp_stream *
27 rx_stream_obtain(struct tle_udp_dev *dev, uint32_t type, uint32_t port)
28 {
29         struct tle_udp_stream *s;
30
31         if (type >= TLE_UDP_VNUM || dev->dp[type] == NULL)
32                 return NULL;
33
34         s = dev->dp[type]->streams[port];
35         if (s == NULL)
36                 return NULL;
37
38         if (rwl_acquire(&s->rx.use) < 0)
39                 return NULL;
40
41         return s;
42 }
43
44 static inline uint16_t
45 get_pkt_type(const struct rte_mbuf *m)
46 {
47         uint32_t v;
48
49         v = m->packet_type &
50                 (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_MASK);
51         if (v == (RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_UDP))
52                 return TLE_UDP_V4;
53         else if (v == (RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP))
54                 return TLE_UDP_V6;
55         else
56                 return TLE_UDP_VNUM;
57 }
58
59 static inline union udp_ports
60 pkt_info(const struct tle_udp_dev *dev, struct rte_mbuf *m,
61         union udp_ports *ports, union ipv4_addrs *addr4,
62         union ipv6_addrs **addr6)
63 {
64         uint32_t len;
65         union udp_ports ret, *up;
66         union ipv4_addrs *pa4;
67
68         ret.src = get_pkt_type(m);
69
70         len = m->l2_len;
71         if (ret.src == TLE_UDP_V4) {
72                 pa4 = rte_pktmbuf_mtod_offset(m, union ipv4_addrs *,
73                         len + offsetof(struct ipv4_hdr, src_addr));
74                 addr4->raw = pa4->raw;
75                 m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V4];
76         } else if (ret.src == TLE_UDP_V6) {
77                 *addr6 = rte_pktmbuf_mtod_offset(m, union ipv6_addrs *,
78                         len + offsetof(struct ipv6_hdr, src_addr));
79                 m->ol_flags |= dev->rx.ol_flags[TLE_UDP_V6];
80         }
81
82         len += m->l3_len;
83         up = rte_pktmbuf_mtod_offset(m, union udp_ports *,
84                 len + offsetof(struct udp_hdr, src_port));
85         ports->raw = up->raw;
86         ret.dst = ports->dst;
87         return ret;
88 }
89
90 /*
91  * Helper routine, enqueues packets to the stream and calls RX
92  * notification callback, if needed.
93  */
94 static inline uint16_t
95 rx_stream(struct tle_udp_stream *s, void *mb[], struct rte_mbuf *rp[],
96         int32_t rc[], uint32_t num)
97 {
98         uint32_t i, k, r;
99
100         r = rte_ring_enqueue_burst(s->rx.q, mb, num);
101
102         /* if RX queue was empty invoke user RX notification callback. */
103         if (s->rx.cb.func != NULL && r != 0 && rte_ring_count(s->rx.q) == r)
104                 s->rx.cb.func(s->rx.cb.data, s);
105
106         for (i = r, k = 0; i != num; i++, k++) {
107                 rc[k] = ENOBUFS;
108                 rp[k] = mb[i];
109         }
110
111         return r;
112 }
113
114 static inline uint16_t
115 rx_stream6(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
116         union ipv6_addrs *addr[], union udp_ports port[],
117         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
118 {
119         uint32_t i, k, n;
120         void *mb[num];
121
122         k = 0;
123         n = 0;
124
125         for (i = 0; i != num; i++) {
126
127                 if ((port[i].raw & s->pmsk.raw) != s->port.raw ||
128                                 ymm_mask_cmp(&addr[i]->raw, &s->ipv6.addr.raw,
129                                 &s->ipv6.mask.raw) != 0) {
130                         rc[k] = ENOENT;
131                         rp[k] = pkt[i];
132                         k++;
133                 } else {
134                         mb[n] = pkt[i];
135                         n++;
136                 }
137         }
138
139         return rx_stream(s, mb, rp + k, rc + k, n);
140 }
141
142 static inline uint16_t
143 rx_stream4(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
144         union ipv4_addrs addr[], union udp_ports port[],
145         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
146 {
147         uint32_t i, k, n;
148         void *mb[num];
149
150         k = 0;
151         n = 0;
152
153         for (i = 0; i != num; i++) {
154
155                 if ((addr[i].raw & s->ipv4.mask.raw) != s->ipv4.addr.raw ||
156                                 (port[i].raw & s->pmsk.raw) !=
157                                 s->port.raw) {
158                         rc[k] = ENOENT;
159                         rp[k] = pkt[i];
160                         k++;
161                 } else {
162                         mb[n] = pkt[i];
163                         n++;
164                 }
165         }
166
167         return rx_stream(s, mb, rp + k, rc + k, n);
168 }
169
170 uint16_t
171 tle_udp_rx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[],
172         struct rte_mbuf *rp[], int32_t rc[], uint16_t num)
173 {
174         struct tle_udp_stream *s;
175         uint32_t i, j, k, n, p, t;
176         union udp_ports tp[num], port[num];
177         union ipv4_addrs a4[num];
178         union ipv6_addrs *pa6[num];
179
180         for (i = 0; i != num; i++)
181                 tp[i] = pkt_info(dev, pkt[i], &port[i], &a4[i], &pa6[i]);
182
183         k = 0;
184         for (i = 0; i != num; i = j) {
185
186                 for (j = i + 1; j != num && tp[j].raw == tp[i].raw; j++)
187                         ;
188
189                 t = tp[i].src;
190                 p = tp[i].dst;
191                 s = rx_stream_obtain(dev, t, p);
192                 if (s != NULL) {
193
194                         if (t == TLE_UDP_V4)
195                                 n = rx_stream4(s, pkt + i, a4 + i,
196                                         port + i, rp + k, rc + k, j - i);
197                         else
198                                 n = rx_stream6(s, pkt + i, pa6 + i, port + i,
199                                         rp + k, rc + k, j - i);
200
201                         k += j - i - n;
202
203                         if (s->rx.ev != NULL)
204                                 tle_event_raise(s->rx.ev);
205                         rwl_release(&s->rx.use);
206
207                 } else {
208                         for (; i != j; i++) {
209                                 rc[k] = ENOENT;
210                                 rp[k] = pkt[i];
211                                 k++;
212                         }
213                 }
214         }
215
216         return num - k;
217 }
218
219 static inline void
220 stream_drb_release(struct tle_udp_stream *s, struct tle_drb * drb[],
221         uint32_t nb_drb)
222 {
223         uint32_t n;
224
225         n = rte_ring_count(s->tx.drb.r);
226         rte_ring_enqueue_burst(s->tx.drb.r, (void **)drb, nb_drb);
227
228         /* If stream is still open, then mark it as avaialble for writing. */
229         if (rwl_try_acquire(&s->tx.use) > 0) {
230
231                 if (s->tx.ev != NULL)
232                         tle_event_raise(s->tx.ev);
233
234                 /* if stream send buffer was full invoke TX callback */
235                 else if (s->tx.cb.func != NULL && n == 0)
236                         s->tx.cb.func(s->tx.cb.data, s);
237
238         }
239
240         rwl_release(&s->tx.use);
241 }
242
243 uint16_t
244 tle_udp_tx_bulk(struct tle_udp_dev *dev, struct rte_mbuf *pkt[], uint16_t num)
245 {
246         uint32_t i, j, k, n;
247         struct tle_drb *drb[num];
248         struct tle_udp_stream *s;
249
250         /* extract packets from device TX queue. */
251
252         k = num;
253         n = tle_dring_sc_dequeue(&dev->tx.dr, (const void **)(uintptr_t)pkt,
254                 num, drb, &k);
255
256         if (n == 0)
257                 return 0;
258
259         /* free empty drbs and notify related streams. */
260
261         for (i = 0; i != k; i = j) {
262                 s = drb[i]->udata;
263                 for (j = i + 1; j != k && s == drb[i]->udata; j++)
264                         ;
265                 stream_drb_release(s, drb + i, j - i);
266         }
267
268         return n;
269 }
270
271 static int
272 check_pkt_csum(const struct rte_mbuf *m, uint32_t type)
273 {
274         const struct ipv4_hdr *l3h4;
275         const struct ipv6_hdr *l3h6;
276         const struct udp_hdr *l4h;
277         int32_t ret;
278         uint16_t csum;
279
280         ret = 0;
281         l3h4 = rte_pktmbuf_mtod_offset(m, const struct ipv4_hdr *, m->l2_len);
282         l3h6 = rte_pktmbuf_mtod_offset(m, const struct ipv6_hdr *, m->l2_len);
283
284         if ((m->ol_flags & PKT_RX_IP_CKSUM_BAD) != 0) {
285                 csum = _ipv4x_cksum(l3h4, m->l3_len);
286                 ret = (csum != UINT16_MAX);
287         }
288
289         if (ret == 0 && (m->ol_flags & PKT_RX_L4_CKSUM_BAD) != 0) {
290
291                 /*
292                  * for IPv4 it is allowed to have zero UDP cksum,
293                  * for IPv6 valid UDP cksum is mandatory.
294                  */
295                 if (type == TLE_UDP_V4) {
296                         l4h = (const struct udp_hdr *)((uintptr_t)l3h4 +
297                                 m->l3_len);
298                         csum = (l4h->dgram_cksum == 0) ? UINT16_MAX :
299                                 _ipv4_udptcp_mbuf_cksum(m,
300                                 m->l2_len + m->l3_len, l3h4);
301                 } else
302                         csum = _ipv6_udptcp_mbuf_cksum(m,
303                                 m->l2_len + m->l3_len, l3h6);
304
305                 ret = (csum != UINT16_MAX);
306         }
307
308         return ret;
309 }
310
311 /* exclude NULLs from the final list of packets. */
312 static inline uint32_t
313 compress_pkt_list(struct rte_mbuf *pkt[], uint32_t nb_pkt, uint32_t nb_zero)
314 {
315         uint32_t i, j, k, l;
316
317         for (j = nb_pkt; nb_zero != 0 && j-- != 0; ) {
318
319                 /* found a hole. */
320                 if (pkt[j] == NULL) {
321
322                         /* find how big is it. */
323                         for (i = j; i-- != 0 && pkt[i] == NULL; )
324                                 ;
325                         /* fill the hole. */
326                         for (k = j + 1, l = i + 1; k != nb_pkt; k++, l++)
327                                 pkt[l] = pkt[k];
328
329                         nb_pkt -= j - i;
330                         nb_zero -= j - i;
331                 }
332         }
333
334         return nb_pkt;
335 }
336
337 /*
338  * helper function, do the necessary pre-processing for the received packets
339  * before handiing them to the strem_recv caller.
340  */
341 static inline uint32_t
342 recv_pkt_process(struct rte_mbuf *m[], uint32_t num, uint32_t type)
343 {
344         uint32_t i, k;
345         uint64_t f, flg[num], ofl[num];
346
347         for (i = 0; i != num; i++) {
348                 flg[i] = m[i]->ol_flags;
349                 ofl[i] = m[i]->tx_offload;
350         }
351
352         k = 0;
353         for (i = 0; i != num; i++) {
354
355                 f = flg[i] & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD);
356
357                 /* drop packets with invalid cksum(s). */
358                 if (f != 0 && check_pkt_csum(m[i], type) != 0) {
359                         rte_pktmbuf_free(m[i]);
360                         m[i] = NULL;
361                         k++;
362                 }
363
364                 m[i]->ol_flags ^= f;
365                 rte_pktmbuf_adj(m[i], _tx_offload_l4_offset(ofl[i]));
366         }
367
368         return k;
369 }
370
371 uint16_t
372 tle_udp_stream_recv(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
373         uint16_t num)
374 {
375         uint32_t k, n;
376
377         n = rte_ring_mc_dequeue_burst(s->rx.q, (void **)pkt, num);
378         if (n == 0)
379                 return 0;
380
381         /*
382          * if we still have packets to read,
383          * then rearm stream RX event.
384          */
385         if (n == num && rte_ring_count(s->rx.q) != 0) {
386                 if (rwl_try_acquire(&s->rx.use) > 0 && s->rx.ev != NULL)
387                         tle_event_raise(s->rx.ev);
388                 rwl_release(&s->rx.use);
389         }
390
391         k = recv_pkt_process(pkt, n, s->type);
392         return compress_pkt_list(pkt, n, k);
393 }
394
395 static int32_t
396 udp_get_dest(struct tle_udp_stream *s, const void *dst_addr,
397         struct tle_udp_dest *dst)
398 {
399         int32_t rc;
400         const struct in_addr *d4;
401         const struct in6_addr *d6;
402         struct tle_udp_ctx *ctx;
403         struct tle_udp_dev *dev;
404
405         ctx = s->ctx;
406
407         /* it is here just to keep gcc happy. */
408         d4 = NULL;
409
410         if (s->type == TLE_UDP_V4) {
411                 d4 = dst_addr;
412                 rc = ctx->prm.lookup4(ctx->prm.lookup4_data, d4, dst);
413         } else if (s->type == TLE_UDP_V6) {
414                 d6 = dst_addr;
415                 rc = ctx->prm.lookup6(ctx->prm.lookup6_data, d6, dst);
416         } else
417                 rc = -ENOENT;
418
419         if (rc < 0 || dst->dev == NULL || dst->dev->ctx != ctx)
420                 return -ENOENT;
421
422         dev = dst->dev;
423         if (s->type == TLE_UDP_V4) {
424                 struct ipv4_hdr *l3h;
425                 l3h = (struct ipv4_hdr *)(dst->hdr + dst->l2_len);
426                 l3h->src_addr = dev->prm.local_addr4.s_addr;
427                 l3h->dst_addr = d4->s_addr;
428         } else {
429                 struct ipv6_hdr *l3h;
430                 l3h = (struct ipv6_hdr *)(dst->hdr + dst->l2_len);
431                 rte_memcpy(l3h->src_addr, &dev->prm.local_addr6,
432                         sizeof(l3h->src_addr));
433                 rte_memcpy(l3h->dst_addr, d6, sizeof(l3h->dst_addr));
434         }
435
436         return dev - ctx->dev;
437 }
438
439 static inline int
440 udp_fill_mbuf(struct rte_mbuf *m,
441         uint32_t type, uint64_t ol_flags, uint32_t pid,
442         union udph udph, const struct tle_udp_dest *dst)
443 {
444         uint32_t len, plen;
445         char *l2h;
446         union udph *l4h;
447
448         len = dst->l2_len + dst->l3_len;
449         plen = m->pkt_len;
450
451         /* copy to mbuf L2/L3 header template. */
452
453         l2h = rte_pktmbuf_prepend(m, len + sizeof(*l4h));
454         if (l2h == NULL)
455                 return -ENOBUFS;
456
457         /* copy L2/L3 header */
458         rte_memcpy(l2h, dst->hdr, len);
459
460         /* copy UDP header */
461         l4h = (union udph *)(l2h + len);
462         l4h->raw = udph.raw;
463
464         /* setup mbuf TX offload related fields. */
465         m->tx_offload = _mbuf_tx_offload(dst->l2_len, dst->l3_len,
466                 sizeof(*l4h), 0, 0, 0);
467         m->ol_flags |= ol_flags;
468
469         l4h->len = rte_cpu_to_be_16(plen + sizeof(*l4h));
470
471         /* update proto specific fields. */
472
473         if (type == TLE_UDP_V4) {
474                 struct ipv4_hdr *l3h;
475                 l3h = (struct ipv4_hdr *)(l2h + dst->l2_len);
476                 l3h->packet_id = rte_cpu_to_be_16(pid);
477                 l3h->total_length = rte_cpu_to_be_16(plen + dst->l3_len +
478                         sizeof(*l4h));
479
480                 if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
481                         l4h->cksum = _ipv4x_phdr_cksum(l3h, m->l3_len,
482                                 ol_flags);
483                 else
484                         l4h->cksum = _ipv4_udptcp_mbuf_cksum(m, len, l3h);
485
486                 if ((ol_flags & PKT_TX_IP_CKSUM) == 0)
487                         l3h->hdr_checksum = _ipv4x_cksum(l3h, m->l3_len);
488         } else {
489                 struct ipv6_hdr *l3h;
490                 l3h = (struct ipv6_hdr *)(l2h + dst->l2_len);
491                 l3h->payload_len = rte_cpu_to_be_16(plen + sizeof(*l4h));
492                 if ((ol_flags & PKT_TX_UDP_CKSUM) != 0)
493                         l4h->cksum = rte_ipv6_phdr_cksum(l3h, ol_flags);
494                 else
495                         l4h->cksum = _ipv6_udptcp_mbuf_cksum(m, len, l3h);
496         }
497
498         return 0;
499 }
500
501 /* ???
502  * probably this function should be there -
503  * rte_ipv[4,6]_fragment_packet should do that.
504  */
505 static inline void
506 frag_fixup(const struct rte_mbuf *ms, struct rte_mbuf *mf, uint32_t type)
507 {
508         struct ipv4_hdr *l3h;
509
510         mf->ol_flags = ms->ol_flags;
511         mf->tx_offload = ms->tx_offload;
512
513         if (type == TLE_UDP_V4 && (ms->ol_flags & PKT_TX_IP_CKSUM) == 0) {
514                 l3h = rte_pktmbuf_mtod(mf, struct ipv4_hdr *);
515                 l3h->hdr_checksum = _ipv4x_cksum(l3h, mf->l3_len);
516         }
517 }
518
519 /*
520  * Returns negative for failure to fragment or actual number of fragments.
521  */
522 static inline int
523 fragment(struct rte_mbuf *pkt, struct rte_mbuf *frag[], uint32_t num,
524         uint32_t type, const struct tle_udp_dest *dst)
525 {
526         int32_t frag_num, i;
527         uint16_t mtu;
528         void *eth_hdr;
529
530         /* Remove the Ethernet header from the input packet */
531         rte_pktmbuf_adj(pkt, dst->l2_len);
532         mtu = dst->mtu - dst->l2_len;
533
534         /* fragment packet */
535         if (type == TLE_UDP_V4)
536                 frag_num = rte_ipv4_fragment_packet(pkt, frag, num, mtu,
537                         dst->head_mp, dst->head_mp);
538         else
539                 frag_num = rte_ipv6_fragment_packet(pkt, frag, num, mtu,
540                         dst->head_mp, dst->head_mp);
541
542         if (frag_num > 0) {
543                 for (i = 0; i != frag_num; i++) {
544
545                         frag_fixup(pkt, frag[i], type);
546
547                         /* Move data_off to include l2 header first */
548                         eth_hdr = rte_pktmbuf_prepend(frag[i], dst->l2_len);
549
550                         /* copy l2 header into fragment */
551                         rte_memcpy(eth_hdr, dst->hdr, dst->l2_len);
552                 }
553         }
554
555         return frag_num;
556 }
557
558 static inline void
559 stream_drb_free(struct tle_udp_stream *s, struct tle_drb *drbs[],
560         uint32_t nb_drb)
561 {
562         rte_ring_enqueue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
563 }
564
565 static inline uint32_t
566 stream_drb_alloc(struct tle_udp_stream *s, struct tle_drb *drbs[],
567         uint32_t nb_drb)
568 {
569         return rte_ring_dequeue_burst(s->tx.drb.r, (void **)drbs, nb_drb);
570 }
571
572 /* enqueue up to num packets to the destination device queue. */
573 static inline uint16_t
574 queue_pkt_out(struct tle_udp_stream *s, struct tle_udp_dev *dev,
575                 const void *pkt[], uint16_t nb_pkt,
576                 struct tle_drb *drbs[], uint32_t *nb_drb)
577 {
578         uint32_t bsz, i, n, nb, nbc, nbm;
579
580         bsz = s->tx.drb.nb_elem;
581
582         /* calulate how many drbs are needed.*/
583         nbc = *nb_drb;
584         nbm = (nb_pkt + bsz - 1) / bsz;
585         nb = RTE_MAX(nbm, nbc) - nbc;
586
587         /* allocate required drbs */
588         if (nb != 0)
589                 nb = stream_drb_alloc(s, drbs + nbc, nb);
590
591         nb += nbc;
592
593         /* no free drbs, can't send anything */
594         if (nb == 0)
595                 return 0;
596
597         /* not enough free drbs, reduce number of packets to send. */
598         else if (nb != nbm)
599                 nb_pkt = nb * bsz;
600
601         /* enqueue packets to the destination device. */
602         nbc = nb;
603         n = tle_dring_mp_enqueue(&dev->tx.dr, pkt, nb_pkt, drbs, &nb);
604
605         /* if not all available drbs were consumed, move them to the start. */
606         nbc -= nb;
607         for (i = 0; i != nb; i++)
608                 drbs[i] = drbs[nbc + i];
609
610         *nb_drb = nb;
611         return n;
612 }
613
614 uint16_t
615 tle_udp_stream_send(struct tle_udp_stream *s, struct rte_mbuf *pkt[],
616         uint16_t num, const struct sockaddr *dst_addr)
617 {
618         int32_t di, frg, rc;
619         uint64_t ol_flags;
620         uint32_t i, k, n, nb;
621         uint32_t mtu, pid, type;
622         const struct sockaddr_in *d4;
623         const struct sockaddr_in6 *d6;
624         const void *da;
625         union udph udph;
626         struct tle_udp_dest dst;
627         struct tle_drb *drb[num];
628
629         type = s->type;
630
631         /* start filling UDP header. */
632         udph.raw = 0;
633         udph.ports.src = s->port.dst;
634
635         /* figure out what destination addr/port to use. */
636         if (dst_addr != NULL) {
637                 if (dst_addr->sa_family != s->prm.remote_addr.ss_family) {
638                         rte_errno = EINVAL;
639                         return 0;
640                 }
641                 if (type == TLE_UDP_V4) {
642                         d4 = (const struct sockaddr_in *)dst_addr;
643                         da = &d4->sin_addr;
644                         udph.ports.dst = d4->sin_port;
645                 } else {
646                         d6 = (const struct sockaddr_in6 *)dst_addr;
647                         da = &d6->sin6_addr;
648                         udph.ports.dst = d6->sin6_port;
649                 }
650         } else {
651                 udph.ports.dst = s->port.src;
652                 if (type == TLE_UDP_V4)
653                         da = &s->ipv4.addr.src;
654                 else
655                         da = &s->ipv6.addr.src;
656         }
657
658         di = udp_get_dest(s, da, &dst);
659         if (di < 0) {
660                 rte_errno = -di;
661                 return 0;
662         }
663
664         pid = rte_atomic32_add_return(&dst.dev->tx.packet_id[type], num) - num;
665         mtu = dst.mtu - dst.l2_len - dst.l3_len;
666
667         /* mark stream as not closable. */
668         if (rwl_acquire(&s->tx.use) < 0)
669                 return 0;
670
671         nb = 0;
672         for (i = 0, k = 0; k != num; k = i) {
673
674                 /* copy L2/L3/L4 headers into mbufs, setup mbufs metadata. */
675
676                 frg = 0;
677                 ol_flags = dst.dev->tx.ol_flags[type];
678
679                 while (i != num && frg == 0) {
680                         frg = pkt[i]->pkt_len > mtu;
681                         if (frg != 0)
682                                 ol_flags &= ~PKT_TX_UDP_CKSUM;
683                         rc = udp_fill_mbuf(pkt[i], type, ol_flags, pid + i,
684                                 udph, &dst);
685                         if (rc != 0) {
686                                 rte_errno = -rc;
687                                 goto out;
688                         }
689                         i += (frg == 0);
690                 }
691
692                 /* enqueue non-fragment packets to the destination device. */
693                 if (k != i) {
694                         k += queue_pkt_out(s, dst.dev,
695                                 (const void **)(uintptr_t)&pkt[k], i - k,
696                                 drb, &nb);
697
698                         /* stream TX queue is full. */
699                         if (k != i)
700                                 break;
701                 }
702
703                 /* enqueue packet that need to be fragmented */
704                 if (i != num) {
705
706                         struct rte_mbuf *frag[RTE_LIBRTE_IP_FRAG_MAX_FRAG];
707
708                         /* fragment the packet. */
709                         rc = fragment(pkt[i], frag, RTE_DIM(frag), type, &dst);
710                         if (rc < 0) {
711                                 rte_errno = -rc;
712                                 break;
713                         }
714
715                         n = queue_pkt_out(s, dst.dev,
716                                 (const void **)(uintptr_t)frag, rc, drb, &nb);
717                         if (n == 0) {
718                                 while (rc-- != 0)
719                                         rte_pktmbuf_free(frag[rc]);
720                                 break;
721                         }
722
723                         /* all fragments enqueued, free the original packet. */
724                         rte_pktmbuf_free(pkt[i]);
725                         i++;
726                 }
727         }
728
729         /* if possible, rearm socket write event. */
730         if (k == num && s->tx.ev != NULL)
731                 tle_event_raise(s->tx.ev);
732
733 out:
734         /* free unused drbs. */
735         if (nb != 0)
736                 stream_drb_free(s, drb, nb);
737
738         /* stream can be closed. */
739         rwl_release(&s->tx.use);
740
741         /*
742          * remove pkt l2/l3 headers, restore ol_flags for unsent, but
743          * already modified packets.
744          */
745         ol_flags = ~dst.dev->tx.ol_flags[type];
746         for (n = k; n != i; n++) {
747                 rte_pktmbuf_adj(pkt[n], dst.l2_len + dst.l3_len + sizeof(udph));
748                 pkt[n]->ol_flags &= ol_flags;
749         }
750
751         return k;
752 }