a4e709c96bf8f1da8a9aee2951aee5d7a99f1592
[deb_dpdk.git] / lib / librte_port / rte_port_ring.c
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 #include <string.h>
34 #include <stdint.h>
35
36 #include <rte_mbuf.h>
37 #include <rte_ring.h>
38 #include <rte_malloc.h>
39
40 #include "rte_port_ring.h"
41
42 /*
43  * Port RING Reader
44  */
45 #ifdef RTE_PORT_STATS_COLLECT
46
47 #define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
48         port->stats.n_pkts_in += val
49 #define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
50         port->stats.n_pkts_drop += val
51
52 #else
53
54 #define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
55 #define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)
56
57 #endif
58
59 struct rte_port_ring_reader {
60         struct rte_port_in_stats stats;
61
62         struct rte_ring *ring;
63 };
64
65 static void *
66 rte_port_ring_reader_create_internal(void *params, int socket_id,
67         uint32_t is_multi)
68 {
69         struct rte_port_ring_reader_params *conf =
70                         params;
71         struct rte_port_ring_reader *port;
72
73         /* Check input parameters */
74         if ((conf == NULL) ||
75                 (conf->ring == NULL) ||
76                 (conf->ring->cons.single && is_multi) ||
77                 (!(conf->ring->cons.single) && !is_multi)) {
78                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
79                 return NULL;
80         }
81
82         /* Memory allocation */
83         port = rte_zmalloc_socket("PORT", sizeof(*port),
84                         RTE_CACHE_LINE_SIZE, socket_id);
85         if (port == NULL) {
86                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
87                 return NULL;
88         }
89
90         /* Initialization */
91         port->ring = conf->ring;
92
93         return port;
94 }
95
96 static void *
97 rte_port_ring_reader_create(void *params, int socket_id)
98 {
99         return rte_port_ring_reader_create_internal(params, socket_id, 0);
100 }
101
102 static void *
103 rte_port_ring_multi_reader_create(void *params, int socket_id)
104 {
105         return rte_port_ring_reader_create_internal(params, socket_id, 1);
106 }
107
108 static int
109 rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
110 {
111         struct rte_port_ring_reader *p = port;
112         uint32_t nb_rx;
113
114         nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts,
115                         n_pkts, NULL);
116         RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
117
118         return nb_rx;
119 }
120
121 static int
122 rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
123         uint32_t n_pkts)
124 {
125         struct rte_port_ring_reader *p = port;
126         uint32_t nb_rx;
127
128         nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts,
129                         n_pkts, NULL);
130         RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
131
132         return nb_rx;
133 }
134
135 static int
136 rte_port_ring_reader_free(void *port)
137 {
138         if (port == NULL) {
139                 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
140                 return -EINVAL;
141         }
142
143         rte_free(port);
144
145         return 0;
146 }
147
148 static int
149 rte_port_ring_reader_stats_read(void *port,
150                 struct rte_port_in_stats *stats, int clear)
151 {
152         struct rte_port_ring_reader *p =
153                 port;
154
155         if (stats != NULL)
156                 memcpy(stats, &p->stats, sizeof(p->stats));
157
158         if (clear)
159                 memset(&p->stats, 0, sizeof(p->stats));
160
161         return 0;
162 }
163
164 /*
165  * Port RING Writer
166  */
167 #ifdef RTE_PORT_STATS_COLLECT
168
169 #define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
170         port->stats.n_pkts_in += val
171 #define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
172         port->stats.n_pkts_drop += val
173
174 #else
175
176 #define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
177 #define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)
178
179 #endif
180
181 struct rte_port_ring_writer {
182         struct rte_port_out_stats stats;
183
184         struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
185         struct rte_ring *ring;
186         uint32_t tx_burst_sz;
187         uint32_t tx_buf_count;
188         uint64_t bsz_mask;
189         uint32_t is_multi;
190 };
191
192 static void *
193 rte_port_ring_writer_create_internal(void *params, int socket_id,
194         uint32_t is_multi)
195 {
196         struct rte_port_ring_writer_params *conf =
197                         params;
198         struct rte_port_ring_writer *port;
199
200         /* Check input parameters */
201         if ((conf == NULL) ||
202                 (conf->ring == NULL) ||
203                 (conf->ring->prod.single && is_multi) ||
204                 (!(conf->ring->prod.single) && !is_multi) ||
205                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
206                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
207                 return NULL;
208         }
209
210         /* Memory allocation */
211         port = rte_zmalloc_socket("PORT", sizeof(*port),
212                         RTE_CACHE_LINE_SIZE, socket_id);
213         if (port == NULL) {
214                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
215                 return NULL;
216         }
217
218         /* Initialization */
219         port->ring = conf->ring;
220         port->tx_burst_sz = conf->tx_burst_sz;
221         port->tx_buf_count = 0;
222         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
223         port->is_multi = is_multi;
224
225         return port;
226 }
227
228 static void *
229 rte_port_ring_writer_create(void *params, int socket_id)
230 {
231         return rte_port_ring_writer_create_internal(params, socket_id, 0);
232 }
233
234 static void *
235 rte_port_ring_multi_writer_create(void *params, int socket_id)
236 {
237         return rte_port_ring_writer_create_internal(params, socket_id, 1);
238 }
239
240 static inline void
241 send_burst(struct rte_port_ring_writer *p)
242 {
243         uint32_t nb_tx;
244
245         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
246                         p->tx_buf_count, NULL);
247
248         RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
249         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
250                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
251
252         p->tx_buf_count = 0;
253 }
254
255 static inline void
256 send_burst_mp(struct rte_port_ring_writer *p)
257 {
258         uint32_t nb_tx;
259
260         nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
261                         p->tx_buf_count, NULL);
262
263         RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
264         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
265                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
266
267         p->tx_buf_count = 0;
268 }
269
270 static int
271 rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
272 {
273         struct rte_port_ring_writer *p = port;
274
275         p->tx_buf[p->tx_buf_count++] = pkt;
276         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
277         if (p->tx_buf_count >= p->tx_burst_sz)
278                 send_burst(p);
279
280         return 0;
281 }
282
283 static int
284 rte_port_ring_multi_writer_tx(void *port, struct rte_mbuf *pkt)
285 {
286         struct rte_port_ring_writer *p = port;
287
288         p->tx_buf[p->tx_buf_count++] = pkt;
289         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
290         if (p->tx_buf_count >= p->tx_burst_sz)
291                 send_burst_mp(p);
292
293         return 0;
294 }
295
296 static __rte_always_inline int
297 rte_port_ring_writer_tx_bulk_internal(void *port,
298                 struct rte_mbuf **pkts,
299                 uint64_t pkts_mask,
300                 uint32_t is_multi)
301 {
302         struct rte_port_ring_writer *p =
303                 port;
304
305         uint64_t bsz_mask = p->bsz_mask;
306         uint32_t tx_buf_count = p->tx_buf_count;
307         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
308                         ((pkts_mask & bsz_mask) ^ bsz_mask);
309
310         if (expr == 0) {
311                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
312                 uint32_t n_pkts_ok;
313
314                 if (tx_buf_count) {
315                         if (is_multi)
316                                 send_burst_mp(p);
317                         else
318                                 send_burst(p);
319                 }
320
321                 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
322                 if (is_multi)
323                         n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
324                                         (void **)pkts, n_pkts, NULL);
325                 else
326                         n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
327                                         (void **)pkts, n_pkts, NULL);
328
329                 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
330                 for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
331                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
332
333                         rte_pktmbuf_free(pkt);
334                 }
335         } else {
336                 for ( ; pkts_mask; ) {
337                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
338                         uint64_t pkt_mask = 1LLU << pkt_index;
339                         struct rte_mbuf *pkt = pkts[pkt_index];
340
341                         p->tx_buf[tx_buf_count++] = pkt;
342                         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
343                         pkts_mask &= ~pkt_mask;
344                 }
345
346                 p->tx_buf_count = tx_buf_count;
347                 if (tx_buf_count >= p->tx_burst_sz) {
348                         if (is_multi)
349                                 send_burst_mp(p);
350                         else
351                                 send_burst(p);
352                 }
353         }
354
355         return 0;
356 }
357
358 static int
359 rte_port_ring_writer_tx_bulk(void *port,
360                 struct rte_mbuf **pkts,
361                 uint64_t pkts_mask)
362 {
363         return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 0);
364 }
365
366 static int
367 rte_port_ring_multi_writer_tx_bulk(void *port,
368                 struct rte_mbuf **pkts,
369                 uint64_t pkts_mask)
370 {
371         return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 1);
372 }
373
374 static int
375 rte_port_ring_writer_flush(void *port)
376 {
377         struct rte_port_ring_writer *p = port;
378
379         if (p->tx_buf_count > 0)
380                 send_burst(p);
381
382         return 0;
383 }
384
385 static int
386 rte_port_ring_multi_writer_flush(void *port)
387 {
388         struct rte_port_ring_writer *p = port;
389
390         if (p->tx_buf_count > 0)
391                 send_burst_mp(p);
392
393         return 0;
394 }
395
396 static int
397 rte_port_ring_writer_free(void *port)
398 {
399         struct rte_port_ring_writer *p = port;
400
401         if (port == NULL) {
402                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
403                 return -EINVAL;
404         }
405
406         if (p->is_multi)
407                 rte_port_ring_multi_writer_flush(port);
408         else
409                 rte_port_ring_writer_flush(port);
410
411         rte_free(port);
412
413         return 0;
414 }
415
416 static int
417 rte_port_ring_writer_stats_read(void *port,
418                 struct rte_port_out_stats *stats, int clear)
419 {
420         struct rte_port_ring_writer *p =
421                 port;
422
423         if (stats != NULL)
424                 memcpy(stats, &p->stats, sizeof(p->stats));
425
426         if (clear)
427                 memset(&p->stats, 0, sizeof(p->stats));
428
429         return 0;
430 }
431
432 /*
433  * Port RING Writer Nodrop
434  */
435 #ifdef RTE_PORT_STATS_COLLECT
436
437 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
438         port->stats.n_pkts_in += val
439 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
440         port->stats.n_pkts_drop += val
441
442 #else
443
444 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
445 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)
446
447 #endif
448
449 struct rte_port_ring_writer_nodrop {
450         struct rte_port_out_stats stats;
451
452         struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
453         struct rte_ring *ring;
454         uint32_t tx_burst_sz;
455         uint32_t tx_buf_count;
456         uint64_t bsz_mask;
457         uint64_t n_retries;
458         uint32_t is_multi;
459 };
460
461 static void *
462 rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
463         uint32_t is_multi)
464 {
465         struct rte_port_ring_writer_nodrop_params *conf =
466                         params;
467         struct rte_port_ring_writer_nodrop *port;
468
469         /* Check input parameters */
470         if ((conf == NULL) ||
471                 (conf->ring == NULL) ||
472                 (conf->ring->prod.single && is_multi) ||
473                 (!(conf->ring->prod.single) && !is_multi) ||
474                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
475                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
476                 return NULL;
477         }
478
479         /* Memory allocation */
480         port = rte_zmalloc_socket("PORT", sizeof(*port),
481                         RTE_CACHE_LINE_SIZE, socket_id);
482         if (port == NULL) {
483                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
484                 return NULL;
485         }
486
487         /* Initialization */
488         port->ring = conf->ring;
489         port->tx_burst_sz = conf->tx_burst_sz;
490         port->tx_buf_count = 0;
491         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
492         port->is_multi = is_multi;
493
494         /*
495          * When n_retries is 0 it means that we should wait for every packet to
496          * send no matter how many retries should it take. To limit number of
497          * branches in fast path, we use UINT64_MAX instead of branching.
498          */
499         port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
500
501         return port;
502 }
503
504 static void *
505 rte_port_ring_writer_nodrop_create(void *params, int socket_id)
506 {
507         return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 0);
508 }
509
510 static void *
511 rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
512 {
513         return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 1);
514 }
515
516 static inline void
517 send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
518 {
519         uint32_t nb_tx = 0, i;
520
521         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
522                                 p->tx_buf_count, NULL);
523
524         /* We sent all the packets in a first try */
525         if (nb_tx >= p->tx_buf_count) {
526                 p->tx_buf_count = 0;
527                 return;
528         }
529
530         for (i = 0; i < p->n_retries; i++) {
531                 nb_tx += rte_ring_sp_enqueue_burst(p->ring,
532                                 (void **) (p->tx_buf + nb_tx),
533                                 p->tx_buf_count - nb_tx, NULL);
534
535                 /* We sent all the packets in more than one try */
536                 if (nb_tx >= p->tx_buf_count) {
537                         p->tx_buf_count = 0;
538                         return;
539                 }
540         }
541
542         /* We didn't send the packets in maximum allowed attempts */
543         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
544         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
545                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
546
547         p->tx_buf_count = 0;
548 }
549
550 static inline void
551 send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
552 {
553         uint32_t nb_tx = 0, i;
554
555         nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
556                                 p->tx_buf_count, NULL);
557
558         /* We sent all the packets in a first try */
559         if (nb_tx >= p->tx_buf_count) {
560                 p->tx_buf_count = 0;
561                 return;
562         }
563
564         for (i = 0; i < p->n_retries; i++) {
565                 nb_tx += rte_ring_mp_enqueue_burst(p->ring,
566                                 (void **) (p->tx_buf + nb_tx),
567                                 p->tx_buf_count - nb_tx, NULL);
568
569                 /* We sent all the packets in more than one try */
570                 if (nb_tx >= p->tx_buf_count) {
571                         p->tx_buf_count = 0;
572                         return;
573                 }
574         }
575
576         /* We didn't send the packets in maximum allowed attempts */
577         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
578         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
579                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
580
581         p->tx_buf_count = 0;
582 }
583
584 static int
585 rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
586 {
587         struct rte_port_ring_writer_nodrop *p =
588                         port;
589
590         p->tx_buf[p->tx_buf_count++] = pkt;
591         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
592         if (p->tx_buf_count >= p->tx_burst_sz)
593                 send_burst_nodrop(p);
594
595         return 0;
596 }
597
598 static int
599 rte_port_ring_multi_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
600 {
601         struct rte_port_ring_writer_nodrop *p =
602                         port;
603
604         p->tx_buf[p->tx_buf_count++] = pkt;
605         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
606         if (p->tx_buf_count >= p->tx_burst_sz)
607                 send_burst_mp_nodrop(p);
608
609         return 0;
610 }
611
612 static __rte_always_inline int
613 rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
614                 struct rte_mbuf **pkts,
615                 uint64_t pkts_mask,
616                 uint32_t is_multi)
617 {
618         struct rte_port_ring_writer_nodrop *p =
619                 port;
620
621         uint64_t bsz_mask = p->bsz_mask;
622         uint32_t tx_buf_count = p->tx_buf_count;
623         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
624                         ((pkts_mask & bsz_mask) ^ bsz_mask);
625
626         if (expr == 0) {
627                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
628                 uint32_t n_pkts_ok;
629
630                 if (tx_buf_count) {
631                         if (is_multi)
632                                 send_burst_mp_nodrop(p);
633                         else
634                                 send_burst_nodrop(p);
635                 }
636
637                 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
638                 if (is_multi)
639                         n_pkts_ok =
640                                 rte_ring_mp_enqueue_burst(p->ring,
641                                                 (void **)pkts, n_pkts, NULL);
642                 else
643                         n_pkts_ok =
644                                 rte_ring_sp_enqueue_burst(p->ring,
645                                                 (void **)pkts, n_pkts, NULL);
646
647                 if (n_pkts_ok >= n_pkts)
648                         return 0;
649
650                 /*
651                  * If we didn't manage to send all packets in single burst, move
652                  * remaining packets to the buffer and call send burst.
653                  */
654                 for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
655                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
656
657                         p->tx_buf[p->tx_buf_count++] = pkt;
658                 }
659                 if (is_multi)
660                         send_burst_mp_nodrop(p);
661                 else
662                         send_burst_nodrop(p);
663         } else {
664                 for ( ; pkts_mask; ) {
665                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
666                         uint64_t pkt_mask = 1LLU << pkt_index;
667                         struct rte_mbuf *pkt = pkts[pkt_index];
668
669                         p->tx_buf[tx_buf_count++] = pkt;
670                         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
671                         pkts_mask &= ~pkt_mask;
672                 }
673
674                 p->tx_buf_count = tx_buf_count;
675                 if (tx_buf_count >= p->tx_burst_sz) {
676                         if (is_multi)
677                                 send_burst_mp_nodrop(p);
678                         else
679                                 send_burst_nodrop(p);
680                 }
681         }
682
683         return 0;
684 }
685
686 static int
687 rte_port_ring_writer_nodrop_tx_bulk(void *port,
688                 struct rte_mbuf **pkts,
689                 uint64_t pkts_mask)
690 {
691         return
692                 rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts, pkts_mask, 0);
693 }
694
695 static int
696 rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
697                 struct rte_mbuf **pkts,
698                 uint64_t pkts_mask)
699 {
700         return
701                 rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts, pkts_mask, 1);
702 }
703
704 static int
705 rte_port_ring_writer_nodrop_flush(void *port)
706 {
707         struct rte_port_ring_writer_nodrop *p =
708                         port;
709
710         if (p->tx_buf_count > 0)
711                 send_burst_nodrop(p);
712
713         return 0;
714 }
715
716 static int
717 rte_port_ring_multi_writer_nodrop_flush(void *port)
718 {
719         struct rte_port_ring_writer_nodrop *p =
720                         port;
721
722         if (p->tx_buf_count > 0)
723                 send_burst_mp_nodrop(p);
724
725         return 0;
726 }
727
728 static int
729 rte_port_ring_writer_nodrop_free(void *port)
730 {
731         struct rte_port_ring_writer_nodrop *p =
732                         port;
733
734         if (port == NULL) {
735                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
736                 return -EINVAL;
737         }
738
739         if (p->is_multi)
740                 rte_port_ring_multi_writer_nodrop_flush(port);
741         else
742                 rte_port_ring_writer_nodrop_flush(port);
743
744         rte_free(port);
745
746         return 0;
747 }
748
749 static int
750 rte_port_ring_writer_nodrop_stats_read(void *port,
751                 struct rte_port_out_stats *stats, int clear)
752 {
753         struct rte_port_ring_writer_nodrop *p =
754                 port;
755
756         if (stats != NULL)
757                 memcpy(stats, &p->stats, sizeof(p->stats));
758
759         if (clear)
760                 memset(&p->stats, 0, sizeof(p->stats));
761
762         return 0;
763 }
764
765 /*
766  * Summary of port operations
767  */
768 struct rte_port_in_ops rte_port_ring_reader_ops = {
769         .f_create = rte_port_ring_reader_create,
770         .f_free = rte_port_ring_reader_free,
771         .f_rx = rte_port_ring_reader_rx,
772         .f_stats = rte_port_ring_reader_stats_read,
773 };
774
775 struct rte_port_out_ops rte_port_ring_writer_ops = {
776         .f_create = rte_port_ring_writer_create,
777         .f_free = rte_port_ring_writer_free,
778         .f_tx = rte_port_ring_writer_tx,
779         .f_tx_bulk = rte_port_ring_writer_tx_bulk,
780         .f_flush = rte_port_ring_writer_flush,
781         .f_stats = rte_port_ring_writer_stats_read,
782 };
783
784 struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
785         .f_create = rte_port_ring_writer_nodrop_create,
786         .f_free = rte_port_ring_writer_nodrop_free,
787         .f_tx = rte_port_ring_writer_nodrop_tx,
788         .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
789         .f_flush = rte_port_ring_writer_nodrop_flush,
790         .f_stats = rte_port_ring_writer_nodrop_stats_read,
791 };
792
793 struct rte_port_in_ops rte_port_ring_multi_reader_ops = {
794         .f_create = rte_port_ring_multi_reader_create,
795         .f_free = rte_port_ring_reader_free,
796         .f_rx = rte_port_ring_multi_reader_rx,
797         .f_stats = rte_port_ring_reader_stats_read,
798 };
799
800 struct rte_port_out_ops rte_port_ring_multi_writer_ops = {
801         .f_create = rte_port_ring_multi_writer_create,
802         .f_free = rte_port_ring_writer_free,
803         .f_tx = rte_port_ring_multi_writer_tx,
804         .f_tx_bulk = rte_port_ring_multi_writer_tx_bulk,
805         .f_flush = rte_port_ring_multi_writer_flush,
806         .f_stats = rte_port_ring_writer_stats_read,
807 };
808
809 struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops = {
810         .f_create = rte_port_ring_multi_writer_nodrop_create,
811         .f_free = rte_port_ring_writer_nodrop_free,
812         .f_tx = rte_port_ring_multi_writer_nodrop_tx,
813         .f_tx_bulk = rte_port_ring_multi_writer_nodrop_tx_bulk,
814         .f_flush = rte_port_ring_multi_writer_nodrop_flush,
815         .f_stats = rte_port_ring_writer_nodrop_stats_read,
816 };