Imported Upstream version 16.04
[deb_dpdk.git] / lib / librte_port / rte_port_ring.c
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 #include <string.h>
34 #include <stdint.h>
35
36 #include <rte_mbuf.h>
37 #include <rte_ring.h>
38 #include <rte_malloc.h>
39
40 #include "rte_port_ring.h"
41
42 /*
43  * Port RING Reader
44  */
45 #ifdef RTE_PORT_STATS_COLLECT
46
47 #define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
48         port->stats.n_pkts_in += val
49 #define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
50         port->stats.n_pkts_drop += val
51
52 #else
53
54 #define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
55 #define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)
56
57 #endif
58
59 struct rte_port_ring_reader {
60         struct rte_port_in_stats stats;
61
62         struct rte_ring *ring;
63 };
64
65 static void *
66 rte_port_ring_reader_create_internal(void *params, int socket_id,
67         uint32_t is_multi)
68 {
69         struct rte_port_ring_reader_params *conf =
70                         (struct rte_port_ring_reader_params *) params;
71         struct rte_port_ring_reader *port;
72
73         /* Check input parameters */
74         if ((conf == NULL) ||
75                 (conf->ring == NULL) ||
76                 (conf->ring->cons.sc_dequeue && is_multi) ||
77                 (!(conf->ring->cons.sc_dequeue) && !is_multi)) {
78                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
79                 return NULL;
80         }
81
82         /* Memory allocation */
83         port = rte_zmalloc_socket("PORT", sizeof(*port),
84                         RTE_CACHE_LINE_SIZE, socket_id);
85         if (port == NULL) {
86                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
87                 return NULL;
88         }
89
90         /* Initialization */
91         port->ring = conf->ring;
92
93         return port;
94 }
95
96 static void *
97 rte_port_ring_reader_create(void *params, int socket_id)
98 {
99         return rte_port_ring_reader_create_internal(params, socket_id, 0);
100 }
101
102 static void *
103 rte_port_ring_multi_reader_create(void *params, int socket_id)
104 {
105         return rte_port_ring_reader_create_internal(params, socket_id, 1);
106 }
107
108 static int
109 rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
110 {
111         struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
112         uint32_t nb_rx;
113
114         nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
115         RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
116
117         return nb_rx;
118 }
119
120 static int
121 rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
122         uint32_t n_pkts)
123 {
124         struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
125         uint32_t nb_rx;
126
127         nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
128         RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
129
130         return nb_rx;
131 }
132
133 static int
134 rte_port_ring_reader_free(void *port)
135 {
136         if (port == NULL) {
137                 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
138                 return -EINVAL;
139         }
140
141         rte_free(port);
142
143         return 0;
144 }
145
146 static int
147 rte_port_ring_reader_stats_read(void *port,
148                 struct rte_port_in_stats *stats, int clear)
149 {
150         struct rte_port_ring_reader *p =
151                 (struct rte_port_ring_reader *) port;
152
153         if (stats != NULL)
154                 memcpy(stats, &p->stats, sizeof(p->stats));
155
156         if (clear)
157                 memset(&p->stats, 0, sizeof(p->stats));
158
159         return 0;
160 }
161
162 /*
163  * Port RING Writer
164  */
165 #ifdef RTE_PORT_STATS_COLLECT
166
167 #define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
168         port->stats.n_pkts_in += val
169 #define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
170         port->stats.n_pkts_drop += val
171
172 #else
173
174 #define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
175 #define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)
176
177 #endif
178
179 struct rte_port_ring_writer {
180         struct rte_port_out_stats stats;
181
182         struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
183         struct rte_ring *ring;
184         uint32_t tx_burst_sz;
185         uint32_t tx_buf_count;
186         uint64_t bsz_mask;
187         uint32_t is_multi;
188 };
189
190 static void *
191 rte_port_ring_writer_create_internal(void *params, int socket_id,
192         uint32_t is_multi)
193 {
194         struct rte_port_ring_writer_params *conf =
195                         (struct rte_port_ring_writer_params *) params;
196         struct rte_port_ring_writer *port;
197
198         /* Check input parameters */
199         if ((conf == NULL) ||
200                 (conf->ring == NULL) ||
201                 (conf->ring->prod.sp_enqueue && is_multi) ||
202                 (!(conf->ring->prod.sp_enqueue) && !is_multi) ||
203                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
204                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
205                 return NULL;
206         }
207
208         /* Memory allocation */
209         port = rte_zmalloc_socket("PORT", sizeof(*port),
210                         RTE_CACHE_LINE_SIZE, socket_id);
211         if (port == NULL) {
212                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
213                 return NULL;
214         }
215
216         /* Initialization */
217         port->ring = conf->ring;
218         port->tx_burst_sz = conf->tx_burst_sz;
219         port->tx_buf_count = 0;
220         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
221         port->is_multi = is_multi;
222
223         return port;
224 }
225
226 static void *
227 rte_port_ring_writer_create(void *params, int socket_id)
228 {
229         return rte_port_ring_writer_create_internal(params, socket_id, 0);
230 }
231
232 static void *
233 rte_port_ring_multi_writer_create(void *params, int socket_id)
234 {
235         return rte_port_ring_writer_create_internal(params, socket_id, 1);
236 }
237
238 static inline void
239 send_burst(struct rte_port_ring_writer *p)
240 {
241         uint32_t nb_tx;
242
243         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
244                         p->tx_buf_count);
245
246         RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
247         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
248                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
249
250         p->tx_buf_count = 0;
251 }
252
253 static inline void
254 send_burst_mp(struct rte_port_ring_writer *p)
255 {
256         uint32_t nb_tx;
257
258         nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
259                         p->tx_buf_count);
260
261         RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
262         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
263                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
264
265         p->tx_buf_count = 0;
266 }
267
268 static int
269 rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
270 {
271         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
272
273         p->tx_buf[p->tx_buf_count++] = pkt;
274         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
275         if (p->tx_buf_count >= p->tx_burst_sz)
276                 send_burst(p);
277
278         return 0;
279 }
280
281 static int
282 rte_port_ring_multi_writer_tx(void *port, struct rte_mbuf *pkt)
283 {
284         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
285
286         p->tx_buf[p->tx_buf_count++] = pkt;
287         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
288         if (p->tx_buf_count >= p->tx_burst_sz)
289                 send_burst_mp(p);
290
291         return 0;
292 }
293
294 static inline int __attribute__((always_inline))
295 rte_port_ring_writer_tx_bulk_internal(void *port,
296                 struct rte_mbuf **pkts,
297                 uint64_t pkts_mask,
298                 uint32_t is_multi)
299 {
300         struct rte_port_ring_writer *p =
301                 (struct rte_port_ring_writer *) port;
302
303         uint64_t bsz_mask = p->bsz_mask;
304         uint32_t tx_buf_count = p->tx_buf_count;
305         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
306                         ((pkts_mask & bsz_mask) ^ bsz_mask);
307
308         if (expr == 0) {
309                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
310                 uint32_t n_pkts_ok;
311
312                 if (tx_buf_count) {
313                         if (is_multi)
314                                 send_burst_mp(p);
315                         else
316                                 send_burst(p);
317                 }
318
319                 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
320                 if (is_multi)
321                         n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring, (void **)pkts,
322                                 n_pkts);
323                 else
324                         n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring, (void **)pkts,
325                                 n_pkts);
326
327                 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
328                 for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
329                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
330
331                         rte_pktmbuf_free(pkt);
332                 }
333         } else {
334                 for ( ; pkts_mask; ) {
335                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
336                         uint64_t pkt_mask = 1LLU << pkt_index;
337                         struct rte_mbuf *pkt = pkts[pkt_index];
338
339                         p->tx_buf[tx_buf_count++] = pkt;
340                         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
341                         pkts_mask &= ~pkt_mask;
342                 }
343
344                 p->tx_buf_count = tx_buf_count;
345                 if (tx_buf_count >= p->tx_burst_sz) {
346                         if (is_multi)
347                                 send_burst_mp(p);
348                         else
349                                 send_burst(p);
350                 }
351         }
352
353         return 0;
354 }
355
356 static int
357 rte_port_ring_writer_tx_bulk(void *port,
358                 struct rte_mbuf **pkts,
359                 uint64_t pkts_mask)
360 {
361         return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 0);
362 }
363
364 static int
365 rte_port_ring_multi_writer_tx_bulk(void *port,
366                 struct rte_mbuf **pkts,
367                 uint64_t pkts_mask)
368 {
369         return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 1);
370 }
371
372 static int
373 rte_port_ring_writer_flush(void *port)
374 {
375         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
376
377         if (p->tx_buf_count > 0)
378                 send_burst(p);
379
380         return 0;
381 }
382
383 static int
384 rte_port_ring_multi_writer_flush(void *port)
385 {
386         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
387
388         if (p->tx_buf_count > 0)
389                 send_burst_mp(p);
390
391         return 0;
392 }
393
394 static int
395 rte_port_ring_writer_free(void *port)
396 {
397         struct rte_port_ring_writer *p = (struct rte_port_ring_writer *) port;
398
399         if (port == NULL) {
400                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
401                 return -EINVAL;
402         }
403
404         if (p->is_multi)
405                 rte_port_ring_multi_writer_flush(port);
406         else
407                 rte_port_ring_writer_flush(port);
408
409         rte_free(port);
410
411         return 0;
412 }
413
414 static int
415 rte_port_ring_writer_stats_read(void *port,
416                 struct rte_port_out_stats *stats, int clear)
417 {
418         struct rte_port_ring_writer *p =
419                 (struct rte_port_ring_writer *) port;
420
421         if (stats != NULL)
422                 memcpy(stats, &p->stats, sizeof(p->stats));
423
424         if (clear)
425                 memset(&p->stats, 0, sizeof(p->stats));
426
427         return 0;
428 }
429
430 /*
431  * Port RING Writer Nodrop
432  */
433 #ifdef RTE_PORT_STATS_COLLECT
434
435 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
436         port->stats.n_pkts_in += val
437 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
438         port->stats.n_pkts_drop += val
439
440 #else
441
442 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
443 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)
444
445 #endif
446
447 struct rte_port_ring_writer_nodrop {
448         struct rte_port_out_stats stats;
449
450         struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
451         struct rte_ring *ring;
452         uint32_t tx_burst_sz;
453         uint32_t tx_buf_count;
454         uint64_t bsz_mask;
455         uint64_t n_retries;
456         uint32_t is_multi;
457 };
458
459 static void *
460 rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
461         uint32_t is_multi)
462 {
463         struct rte_port_ring_writer_nodrop_params *conf =
464                         (struct rte_port_ring_writer_nodrop_params *) params;
465         struct rte_port_ring_writer_nodrop *port;
466
467         /* Check input parameters */
468         if ((conf == NULL) ||
469                 (conf->ring == NULL) ||
470                 (conf->ring->prod.sp_enqueue && is_multi) ||
471                 (!(conf->ring->prod.sp_enqueue) && !is_multi) ||
472                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
473                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
474                 return NULL;
475         }
476
477         /* Memory allocation */
478         port = rte_zmalloc_socket("PORT", sizeof(*port),
479                         RTE_CACHE_LINE_SIZE, socket_id);
480         if (port == NULL) {
481                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
482                 return NULL;
483         }
484
485         /* Initialization */
486         port->ring = conf->ring;
487         port->tx_burst_sz = conf->tx_burst_sz;
488         port->tx_buf_count = 0;
489         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
490         port->is_multi = is_multi;
491
492         /*
493          * When n_retries is 0 it means that we should wait for every packet to
494          * send no matter how many retries should it take. To limit number of
495          * branches in fast path, we use UINT64_MAX instead of branching.
496          */
497         port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
498
499         return port;
500 }
501
502 static void *
503 rte_port_ring_writer_nodrop_create(void *params, int socket_id)
504 {
505         return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 0);
506 }
507
508 static void *
509 rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
510 {
511         return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 1);
512 }
513
514 static inline void
515 send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
516 {
517         uint32_t nb_tx = 0, i;
518
519         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
520                                 p->tx_buf_count);
521
522         /* We sent all the packets in a first try */
523         if (nb_tx >= p->tx_buf_count) {
524                 p->tx_buf_count = 0;
525                 return;
526         }
527
528         for (i = 0; i < p->n_retries; i++) {
529                 nb_tx += rte_ring_sp_enqueue_burst(p->ring,
530                                 (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
531
532                 /* We sent all the packets in more than one try */
533                 if (nb_tx >= p->tx_buf_count) {
534                         p->tx_buf_count = 0;
535                         return;
536                 }
537         }
538
539         /* We didn't send the packets in maximum allowed attempts */
540         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
541         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
542                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
543
544         p->tx_buf_count = 0;
545 }
546
547 static inline void
548 send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
549 {
550         uint32_t nb_tx = 0, i;
551
552         nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
553                                 p->tx_buf_count);
554
555         /* We sent all the packets in a first try */
556         if (nb_tx >= p->tx_buf_count) {
557                 p->tx_buf_count = 0;
558                 return;
559         }
560
561         for (i = 0; i < p->n_retries; i++) {
562                 nb_tx += rte_ring_mp_enqueue_burst(p->ring,
563                                 (void **) (p->tx_buf + nb_tx), p->tx_buf_count - nb_tx);
564
565                 /* We sent all the packets in more than one try */
566                 if (nb_tx >= p->tx_buf_count) {
567                         p->tx_buf_count = 0;
568                         return;
569                 }
570         }
571
572         /* We didn't send the packets in maximum allowed attempts */
573         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
574         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
575                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
576
577         p->tx_buf_count = 0;
578 }
579
580 static int
581 rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
582 {
583         struct rte_port_ring_writer_nodrop *p =
584                         (struct rte_port_ring_writer_nodrop *) port;
585
586         p->tx_buf[p->tx_buf_count++] = pkt;
587         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
588         if (p->tx_buf_count >= p->tx_burst_sz)
589                 send_burst_nodrop(p);
590
591         return 0;
592 }
593
594 static int
595 rte_port_ring_multi_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
596 {
597         struct rte_port_ring_writer_nodrop *p =
598                         (struct rte_port_ring_writer_nodrop *) port;
599
600         p->tx_buf[p->tx_buf_count++] = pkt;
601         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
602         if (p->tx_buf_count >= p->tx_burst_sz)
603                 send_burst_mp_nodrop(p);
604
605         return 0;
606 }
607
608 static inline int __attribute__((always_inline))
609 rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
610                 struct rte_mbuf **pkts,
611                 uint64_t pkts_mask,
612                 uint32_t is_multi)
613 {
614         struct rte_port_ring_writer_nodrop *p =
615                 (struct rte_port_ring_writer_nodrop *) port;
616
617         uint64_t bsz_mask = p->bsz_mask;
618         uint32_t tx_buf_count = p->tx_buf_count;
619         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
620                         ((pkts_mask & bsz_mask) ^ bsz_mask);
621
622         if (expr == 0) {
623                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
624                 uint32_t n_pkts_ok;
625
626                 if (tx_buf_count) {
627                         if (is_multi)
628                                 send_burst_mp_nodrop(p);
629                         else
630                                 send_burst_nodrop(p);
631                 }
632
633                 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
634                 if (is_multi)
635                         n_pkts_ok =
636                                 rte_ring_mp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
637                 else
638                         n_pkts_ok =
639                                 rte_ring_sp_enqueue_burst(p->ring, (void **)pkts, n_pkts);
640
641                 if (n_pkts_ok >= n_pkts)
642                         return 0;
643
644                 /*
645                  * If we didn't manage to send all packets in single burst, move
646                  * remaining packets to the buffer and call send burst.
647                  */
648                 for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
649                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
650
651                         p->tx_buf[p->tx_buf_count++] = pkt;
652                 }
653                 if (is_multi)
654                         send_burst_mp_nodrop(p);
655                 else
656                         send_burst_nodrop(p);
657         } else {
658                 for ( ; pkts_mask; ) {
659                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
660                         uint64_t pkt_mask = 1LLU << pkt_index;
661                         struct rte_mbuf *pkt = pkts[pkt_index];
662
663                         p->tx_buf[tx_buf_count++] = pkt;
664                         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
665                         pkts_mask &= ~pkt_mask;
666                 }
667
668                 p->tx_buf_count = tx_buf_count;
669                 if (tx_buf_count >= p->tx_burst_sz) {
670                         if (is_multi)
671                                 send_burst_mp_nodrop(p);
672                         else
673                                 send_burst_nodrop(p);
674                 }
675         }
676
677         return 0;
678 }
679
680 static int
681 rte_port_ring_writer_nodrop_tx_bulk(void *port,
682                 struct rte_mbuf **pkts,
683                 uint64_t pkts_mask)
684 {
685         return
686                 rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts, pkts_mask, 0);
687 }
688
689 static int
690 rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
691                 struct rte_mbuf **pkts,
692                 uint64_t pkts_mask)
693 {
694         return
695                 rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts, pkts_mask, 1);
696 }
697
698 static int
699 rte_port_ring_writer_nodrop_flush(void *port)
700 {
701         struct rte_port_ring_writer_nodrop *p =
702                         (struct rte_port_ring_writer_nodrop *) port;
703
704         if (p->tx_buf_count > 0)
705                 send_burst_nodrop(p);
706
707         return 0;
708 }
709
710 static int
711 rte_port_ring_multi_writer_nodrop_flush(void *port)
712 {
713         struct rte_port_ring_writer_nodrop *p =
714                         (struct rte_port_ring_writer_nodrop *) port;
715
716         if (p->tx_buf_count > 0)
717                 send_burst_mp_nodrop(p);
718
719         return 0;
720 }
721
722 static int
723 rte_port_ring_writer_nodrop_free(void *port)
724 {
725         struct rte_port_ring_writer_nodrop *p =
726                         (struct rte_port_ring_writer_nodrop *) port;
727
728         if (port == NULL) {
729                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
730                 return -EINVAL;
731         }
732
733         if (p->is_multi)
734                 rte_port_ring_multi_writer_nodrop_flush(port);
735         else
736                 rte_port_ring_writer_nodrop_flush(port);
737
738         rte_free(port);
739
740         return 0;
741 }
742
743 static int
744 rte_port_ring_writer_nodrop_stats_read(void *port,
745                 struct rte_port_out_stats *stats, int clear)
746 {
747         struct rte_port_ring_writer_nodrop *p =
748                 (struct rte_port_ring_writer_nodrop *) port;
749
750         if (stats != NULL)
751                 memcpy(stats, &p->stats, sizeof(p->stats));
752
753         if (clear)
754                 memset(&p->stats, 0, sizeof(p->stats));
755
756         return 0;
757 }
758
759 /*
760  * Summary of port operations
761  */
762 struct rte_port_in_ops rte_port_ring_reader_ops = {
763         .f_create = rte_port_ring_reader_create,
764         .f_free = rte_port_ring_reader_free,
765         .f_rx = rte_port_ring_reader_rx,
766         .f_stats = rte_port_ring_reader_stats_read,
767 };
768
769 struct rte_port_out_ops rte_port_ring_writer_ops = {
770         .f_create = rte_port_ring_writer_create,
771         .f_free = rte_port_ring_writer_free,
772         .f_tx = rte_port_ring_writer_tx,
773         .f_tx_bulk = rte_port_ring_writer_tx_bulk,
774         .f_flush = rte_port_ring_writer_flush,
775         .f_stats = rte_port_ring_writer_stats_read,
776 };
777
778 struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
779         .f_create = rte_port_ring_writer_nodrop_create,
780         .f_free = rte_port_ring_writer_nodrop_free,
781         .f_tx = rte_port_ring_writer_nodrop_tx,
782         .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
783         .f_flush = rte_port_ring_writer_nodrop_flush,
784         .f_stats = rte_port_ring_writer_nodrop_stats_read,
785 };
786
787 struct rte_port_in_ops rte_port_ring_multi_reader_ops = {
788         .f_create = rte_port_ring_multi_reader_create,
789         .f_free = rte_port_ring_reader_free,
790         .f_rx = rte_port_ring_multi_reader_rx,
791         .f_stats = rte_port_ring_reader_stats_read,
792 };
793
794 struct rte_port_out_ops rte_port_ring_multi_writer_ops = {
795         .f_create = rte_port_ring_multi_writer_create,
796         .f_free = rte_port_ring_writer_free,
797         .f_tx = rte_port_ring_multi_writer_tx,
798         .f_tx_bulk = rte_port_ring_multi_writer_tx_bulk,
799         .f_flush = rte_port_ring_multi_writer_flush,
800         .f_stats = rte_port_ring_writer_stats_read,
801 };
802
803 struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops = {
804         .f_create = rte_port_ring_multi_writer_nodrop_create,
805         .f_free = rte_port_ring_writer_nodrop_free,
806         .f_tx = rte_port_ring_multi_writer_nodrop_tx,
807         .f_tx_bulk = rte_port_ring_multi_writer_nodrop_tx_bulk,
808         .f_flush = rte_port_ring_multi_writer_nodrop_flush,
809         .f_stats = rte_port_ring_writer_nodrop_stats_read,
810 };