New upstream version 18.02
[deb_dpdk.git] / lib / librte_port / rte_port_ring.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4 #include <string.h>
5 #include <stdint.h>
6
7 #include <rte_mbuf.h>
8 #include <rte_ring.h>
9 #include <rte_malloc.h>
10
11 #include "rte_port_ring.h"
12
13 /*
14  * Port RING Reader
15  */
16 #ifdef RTE_PORT_STATS_COLLECT
17
18 #define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val) \
19         port->stats.n_pkts_in += val
20 #define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val) \
21         port->stats.n_pkts_drop += val
22
23 #else
24
25 #define RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(port, val)
26 #define RTE_PORT_RING_READER_STATS_PKTS_DROP_ADD(port, val)
27
28 #endif
29
30 struct rte_port_ring_reader {
31         struct rte_port_in_stats stats;
32
33         struct rte_ring *ring;
34 };
35
36 static void *
37 rte_port_ring_reader_create_internal(void *params, int socket_id,
38         uint32_t is_multi)
39 {
40         struct rte_port_ring_reader_params *conf =
41                         params;
42         struct rte_port_ring_reader *port;
43
44         /* Check input parameters */
45         if ((conf == NULL) ||
46                 (conf->ring == NULL) ||
47                 (conf->ring->cons.single && is_multi) ||
48                 (!(conf->ring->cons.single) && !is_multi)) {
49                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
50                 return NULL;
51         }
52
53         /* Memory allocation */
54         port = rte_zmalloc_socket("PORT", sizeof(*port),
55                         RTE_CACHE_LINE_SIZE, socket_id);
56         if (port == NULL) {
57                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
58                 return NULL;
59         }
60
61         /* Initialization */
62         port->ring = conf->ring;
63
64         return port;
65 }
66
67 static void *
68 rte_port_ring_reader_create(void *params, int socket_id)
69 {
70         return rte_port_ring_reader_create_internal(params, socket_id, 0);
71 }
72
73 static void *
74 rte_port_ring_multi_reader_create(void *params, int socket_id)
75 {
76         return rte_port_ring_reader_create_internal(params, socket_id, 1);
77 }
78
79 static int
80 rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
81 {
82         struct rte_port_ring_reader *p = port;
83         uint32_t nb_rx;
84
85         nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts,
86                         n_pkts, NULL);
87         RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
88
89         return nb_rx;
90 }
91
92 static int
93 rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
94         uint32_t n_pkts)
95 {
96         struct rte_port_ring_reader *p = port;
97         uint32_t nb_rx;
98
99         nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts,
100                         n_pkts, NULL);
101         RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
102
103         return nb_rx;
104 }
105
106 static int
107 rte_port_ring_reader_free(void *port)
108 {
109         if (port == NULL) {
110                 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__);
111                 return -EINVAL;
112         }
113
114         rte_free(port);
115
116         return 0;
117 }
118
119 static int
120 rte_port_ring_reader_stats_read(void *port,
121                 struct rte_port_in_stats *stats, int clear)
122 {
123         struct rte_port_ring_reader *p =
124                 port;
125
126         if (stats != NULL)
127                 memcpy(stats, &p->stats, sizeof(p->stats));
128
129         if (clear)
130                 memset(&p->stats, 0, sizeof(p->stats));
131
132         return 0;
133 }
134
135 /*
136  * Port RING Writer
137  */
138 #ifdef RTE_PORT_STATS_COLLECT
139
140 #define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val) \
141         port->stats.n_pkts_in += val
142 #define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val) \
143         port->stats.n_pkts_drop += val
144
145 #else
146
147 #define RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(port, val)
148 #define RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(port, val)
149
150 #endif
151
152 struct rte_port_ring_writer {
153         struct rte_port_out_stats stats;
154
155         struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
156         struct rte_ring *ring;
157         uint32_t tx_burst_sz;
158         uint32_t tx_buf_count;
159         uint64_t bsz_mask;
160         uint32_t is_multi;
161 };
162
163 static void *
164 rte_port_ring_writer_create_internal(void *params, int socket_id,
165         uint32_t is_multi)
166 {
167         struct rte_port_ring_writer_params *conf =
168                         params;
169         struct rte_port_ring_writer *port;
170
171         /* Check input parameters */
172         if ((conf == NULL) ||
173                 (conf->ring == NULL) ||
174                 (conf->ring->prod.single && is_multi) ||
175                 (!(conf->ring->prod.single) && !is_multi) ||
176                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
177                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
178                 return NULL;
179         }
180
181         /* Memory allocation */
182         port = rte_zmalloc_socket("PORT", sizeof(*port),
183                         RTE_CACHE_LINE_SIZE, socket_id);
184         if (port == NULL) {
185                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
186                 return NULL;
187         }
188
189         /* Initialization */
190         port->ring = conf->ring;
191         port->tx_burst_sz = conf->tx_burst_sz;
192         port->tx_buf_count = 0;
193         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
194         port->is_multi = is_multi;
195
196         return port;
197 }
198
199 static void *
200 rte_port_ring_writer_create(void *params, int socket_id)
201 {
202         return rte_port_ring_writer_create_internal(params, socket_id, 0);
203 }
204
205 static void *
206 rte_port_ring_multi_writer_create(void *params, int socket_id)
207 {
208         return rte_port_ring_writer_create_internal(params, socket_id, 1);
209 }
210
211 static inline void
212 send_burst(struct rte_port_ring_writer *p)
213 {
214         uint32_t nb_tx;
215
216         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
217                         p->tx_buf_count, NULL);
218
219         RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
220         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
221                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
222
223         p->tx_buf_count = 0;
224 }
225
226 static inline void
227 send_burst_mp(struct rte_port_ring_writer *p)
228 {
229         uint32_t nb_tx;
230
231         nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
232                         p->tx_buf_count, NULL);
233
234         RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
235         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
236                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
237
238         p->tx_buf_count = 0;
239 }
240
241 static int
242 rte_port_ring_writer_tx(void *port, struct rte_mbuf *pkt)
243 {
244         struct rte_port_ring_writer *p = port;
245
246         p->tx_buf[p->tx_buf_count++] = pkt;
247         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
248         if (p->tx_buf_count >= p->tx_burst_sz)
249                 send_burst(p);
250
251         return 0;
252 }
253
254 static int
255 rte_port_ring_multi_writer_tx(void *port, struct rte_mbuf *pkt)
256 {
257         struct rte_port_ring_writer *p = port;
258
259         p->tx_buf[p->tx_buf_count++] = pkt;
260         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
261         if (p->tx_buf_count >= p->tx_burst_sz)
262                 send_burst_mp(p);
263
264         return 0;
265 }
266
267 static __rte_always_inline int
268 rte_port_ring_writer_tx_bulk_internal(void *port,
269                 struct rte_mbuf **pkts,
270                 uint64_t pkts_mask,
271                 uint32_t is_multi)
272 {
273         struct rte_port_ring_writer *p =
274                 port;
275
276         uint64_t bsz_mask = p->bsz_mask;
277         uint32_t tx_buf_count = p->tx_buf_count;
278         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
279                         ((pkts_mask & bsz_mask) ^ bsz_mask);
280
281         if (expr == 0) {
282                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
283                 uint32_t n_pkts_ok;
284
285                 if (tx_buf_count) {
286                         if (is_multi)
287                                 send_burst_mp(p);
288                         else
289                                 send_burst(p);
290                 }
291
292                 RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, n_pkts);
293                 if (is_multi)
294                         n_pkts_ok = rte_ring_mp_enqueue_burst(p->ring,
295                                         (void **)pkts, n_pkts, NULL);
296                 else
297                         n_pkts_ok = rte_ring_sp_enqueue_burst(p->ring,
298                                         (void **)pkts, n_pkts, NULL);
299
300                 RTE_PORT_RING_WRITER_STATS_PKTS_DROP_ADD(p, n_pkts - n_pkts_ok);
301                 for ( ; n_pkts_ok < n_pkts; n_pkts_ok++) {
302                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
303
304                         rte_pktmbuf_free(pkt);
305                 }
306         } else {
307                 for ( ; pkts_mask; ) {
308                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
309                         uint64_t pkt_mask = 1LLU << pkt_index;
310                         struct rte_mbuf *pkt = pkts[pkt_index];
311
312                         p->tx_buf[tx_buf_count++] = pkt;
313                         RTE_PORT_RING_WRITER_STATS_PKTS_IN_ADD(p, 1);
314                         pkts_mask &= ~pkt_mask;
315                 }
316
317                 p->tx_buf_count = tx_buf_count;
318                 if (tx_buf_count >= p->tx_burst_sz) {
319                         if (is_multi)
320                                 send_burst_mp(p);
321                         else
322                                 send_burst(p);
323                 }
324         }
325
326         return 0;
327 }
328
329 static int
330 rte_port_ring_writer_tx_bulk(void *port,
331                 struct rte_mbuf **pkts,
332                 uint64_t pkts_mask)
333 {
334         return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 0);
335 }
336
337 static int
338 rte_port_ring_multi_writer_tx_bulk(void *port,
339                 struct rte_mbuf **pkts,
340                 uint64_t pkts_mask)
341 {
342         return rte_port_ring_writer_tx_bulk_internal(port, pkts, pkts_mask, 1);
343 }
344
345 static int
346 rte_port_ring_writer_flush(void *port)
347 {
348         struct rte_port_ring_writer *p = port;
349
350         if (p->tx_buf_count > 0)
351                 send_burst(p);
352
353         return 0;
354 }
355
356 static int
357 rte_port_ring_multi_writer_flush(void *port)
358 {
359         struct rte_port_ring_writer *p = port;
360
361         if (p->tx_buf_count > 0)
362                 send_burst_mp(p);
363
364         return 0;
365 }
366
367 static int
368 rte_port_ring_writer_free(void *port)
369 {
370         struct rte_port_ring_writer *p = port;
371
372         if (port == NULL) {
373                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
374                 return -EINVAL;
375         }
376
377         if (p->is_multi)
378                 rte_port_ring_multi_writer_flush(port);
379         else
380                 rte_port_ring_writer_flush(port);
381
382         rte_free(port);
383
384         return 0;
385 }
386
387 static int
388 rte_port_ring_writer_stats_read(void *port,
389                 struct rte_port_out_stats *stats, int clear)
390 {
391         struct rte_port_ring_writer *p =
392                 port;
393
394         if (stats != NULL)
395                 memcpy(stats, &p->stats, sizeof(p->stats));
396
397         if (clear)
398                 memset(&p->stats, 0, sizeof(p->stats));
399
400         return 0;
401 }
402
403 /*
404  * Port RING Writer Nodrop
405  */
406 #ifdef RTE_PORT_STATS_COLLECT
407
408 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \
409         port->stats.n_pkts_in += val
410 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \
411         port->stats.n_pkts_drop += val
412
413 #else
414
415 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val)
416 #define RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val)
417
418 #endif
419
420 struct rte_port_ring_writer_nodrop {
421         struct rte_port_out_stats stats;
422
423         struct rte_mbuf *tx_buf[2 * RTE_PORT_IN_BURST_SIZE_MAX];
424         struct rte_ring *ring;
425         uint32_t tx_burst_sz;
426         uint32_t tx_buf_count;
427         uint64_t bsz_mask;
428         uint64_t n_retries;
429         uint32_t is_multi;
430 };
431
432 static void *
433 rte_port_ring_writer_nodrop_create_internal(void *params, int socket_id,
434         uint32_t is_multi)
435 {
436         struct rte_port_ring_writer_nodrop_params *conf =
437                         params;
438         struct rte_port_ring_writer_nodrop *port;
439
440         /* Check input parameters */
441         if ((conf == NULL) ||
442                 (conf->ring == NULL) ||
443                 (conf->ring->prod.single && is_multi) ||
444                 (!(conf->ring->prod.single) && !is_multi) ||
445                 (conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
446                 RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
447                 return NULL;
448         }
449
450         /* Memory allocation */
451         port = rte_zmalloc_socket("PORT", sizeof(*port),
452                         RTE_CACHE_LINE_SIZE, socket_id);
453         if (port == NULL) {
454                 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__);
455                 return NULL;
456         }
457
458         /* Initialization */
459         port->ring = conf->ring;
460         port->tx_burst_sz = conf->tx_burst_sz;
461         port->tx_buf_count = 0;
462         port->bsz_mask = 1LLU << (conf->tx_burst_sz - 1);
463         port->is_multi = is_multi;
464
465         /*
466          * When n_retries is 0 it means that we should wait for every packet to
467          * send no matter how many retries should it take. To limit number of
468          * branches in fast path, we use UINT64_MAX instead of branching.
469          */
470         port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries;
471
472         return port;
473 }
474
475 static void *
476 rte_port_ring_writer_nodrop_create(void *params, int socket_id)
477 {
478         return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 0);
479 }
480
481 static void *
482 rte_port_ring_multi_writer_nodrop_create(void *params, int socket_id)
483 {
484         return rte_port_ring_writer_nodrop_create_internal(params, socket_id, 1);
485 }
486
487 static inline void
488 send_burst_nodrop(struct rte_port_ring_writer_nodrop *p)
489 {
490         uint32_t nb_tx = 0, i;
491
492         nb_tx = rte_ring_sp_enqueue_burst(p->ring, (void **)p->tx_buf,
493                                 p->tx_buf_count, NULL);
494
495         /* We sent all the packets in a first try */
496         if (nb_tx >= p->tx_buf_count) {
497                 p->tx_buf_count = 0;
498                 return;
499         }
500
501         for (i = 0; i < p->n_retries; i++) {
502                 nb_tx += rte_ring_sp_enqueue_burst(p->ring,
503                                 (void **) (p->tx_buf + nb_tx),
504                                 p->tx_buf_count - nb_tx, NULL);
505
506                 /* We sent all the packets in more than one try */
507                 if (nb_tx >= p->tx_buf_count) {
508                         p->tx_buf_count = 0;
509                         return;
510                 }
511         }
512
513         /* We didn't send the packets in maximum allowed attempts */
514         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
515         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
516                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
517
518         p->tx_buf_count = 0;
519 }
520
521 static inline void
522 send_burst_mp_nodrop(struct rte_port_ring_writer_nodrop *p)
523 {
524         uint32_t nb_tx = 0, i;
525
526         nb_tx = rte_ring_mp_enqueue_burst(p->ring, (void **)p->tx_buf,
527                                 p->tx_buf_count, NULL);
528
529         /* We sent all the packets in a first try */
530         if (nb_tx >= p->tx_buf_count) {
531                 p->tx_buf_count = 0;
532                 return;
533         }
534
535         for (i = 0; i < p->n_retries; i++) {
536                 nb_tx += rte_ring_mp_enqueue_burst(p->ring,
537                                 (void **) (p->tx_buf + nb_tx),
538                                 p->tx_buf_count - nb_tx, NULL);
539
540                 /* We sent all the packets in more than one try */
541                 if (nb_tx >= p->tx_buf_count) {
542                         p->tx_buf_count = 0;
543                         return;
544                 }
545         }
546
547         /* We didn't send the packets in maximum allowed attempts */
548         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, p->tx_buf_count - nb_tx);
549         for ( ; nb_tx < p->tx_buf_count; nb_tx++)
550                 rte_pktmbuf_free(p->tx_buf[nb_tx]);
551
552         p->tx_buf_count = 0;
553 }
554
555 static int
556 rte_port_ring_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
557 {
558         struct rte_port_ring_writer_nodrop *p =
559                         port;
560
561         p->tx_buf[p->tx_buf_count++] = pkt;
562         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
563         if (p->tx_buf_count >= p->tx_burst_sz)
564                 send_burst_nodrop(p);
565
566         return 0;
567 }
568
569 static int
570 rte_port_ring_multi_writer_nodrop_tx(void *port, struct rte_mbuf *pkt)
571 {
572         struct rte_port_ring_writer_nodrop *p =
573                         port;
574
575         p->tx_buf[p->tx_buf_count++] = pkt;
576         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
577         if (p->tx_buf_count >= p->tx_burst_sz)
578                 send_burst_mp_nodrop(p);
579
580         return 0;
581 }
582
583 static __rte_always_inline int
584 rte_port_ring_writer_nodrop_tx_bulk_internal(void *port,
585                 struct rte_mbuf **pkts,
586                 uint64_t pkts_mask,
587                 uint32_t is_multi)
588 {
589         struct rte_port_ring_writer_nodrop *p =
590                 port;
591
592         uint64_t bsz_mask = p->bsz_mask;
593         uint32_t tx_buf_count = p->tx_buf_count;
594         uint64_t expr = (pkts_mask & (pkts_mask + 1)) |
595                         ((pkts_mask & bsz_mask) ^ bsz_mask);
596
597         if (expr == 0) {
598                 uint64_t n_pkts = __builtin_popcountll(pkts_mask);
599                 uint32_t n_pkts_ok;
600
601                 if (tx_buf_count) {
602                         if (is_multi)
603                                 send_burst_mp_nodrop(p);
604                         else
605                                 send_burst_nodrop(p);
606                 }
607
608                 RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts);
609                 if (is_multi)
610                         n_pkts_ok =
611                                 rte_ring_mp_enqueue_burst(p->ring,
612                                                 (void **)pkts, n_pkts, NULL);
613                 else
614                         n_pkts_ok =
615                                 rte_ring_sp_enqueue_burst(p->ring,
616                                                 (void **)pkts, n_pkts, NULL);
617
618                 if (n_pkts_ok >= n_pkts)
619                         return 0;
620
621                 /*
622                  * If we didn't manage to send all packets in single burst, move
623                  * remaining packets to the buffer and call send burst.
624                  */
625                 for (; n_pkts_ok < n_pkts; n_pkts_ok++) {
626                         struct rte_mbuf *pkt = pkts[n_pkts_ok];
627
628                         p->tx_buf[p->tx_buf_count++] = pkt;
629                 }
630                 if (is_multi)
631                         send_burst_mp_nodrop(p);
632                 else
633                         send_burst_nodrop(p);
634         } else {
635                 for ( ; pkts_mask; ) {
636                         uint32_t pkt_index = __builtin_ctzll(pkts_mask);
637                         uint64_t pkt_mask = 1LLU << pkt_index;
638                         struct rte_mbuf *pkt = pkts[pkt_index];
639
640                         p->tx_buf[tx_buf_count++] = pkt;
641                         RTE_PORT_RING_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1);
642                         pkts_mask &= ~pkt_mask;
643                 }
644
645                 p->tx_buf_count = tx_buf_count;
646                 if (tx_buf_count >= p->tx_burst_sz) {
647                         if (is_multi)
648                                 send_burst_mp_nodrop(p);
649                         else
650                                 send_burst_nodrop(p);
651                 }
652         }
653
654         return 0;
655 }
656
657 static int
658 rte_port_ring_writer_nodrop_tx_bulk(void *port,
659                 struct rte_mbuf **pkts,
660                 uint64_t pkts_mask)
661 {
662         return
663                 rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts, pkts_mask, 0);
664 }
665
666 static int
667 rte_port_ring_multi_writer_nodrop_tx_bulk(void *port,
668                 struct rte_mbuf **pkts,
669                 uint64_t pkts_mask)
670 {
671         return
672                 rte_port_ring_writer_nodrop_tx_bulk_internal(port, pkts, pkts_mask, 1);
673 }
674
675 static int
676 rte_port_ring_writer_nodrop_flush(void *port)
677 {
678         struct rte_port_ring_writer_nodrop *p =
679                         port;
680
681         if (p->tx_buf_count > 0)
682                 send_burst_nodrop(p);
683
684         return 0;
685 }
686
687 static int
688 rte_port_ring_multi_writer_nodrop_flush(void *port)
689 {
690         struct rte_port_ring_writer_nodrop *p =
691                         port;
692
693         if (p->tx_buf_count > 0)
694                 send_burst_mp_nodrop(p);
695
696         return 0;
697 }
698
699 static int
700 rte_port_ring_writer_nodrop_free(void *port)
701 {
702         struct rte_port_ring_writer_nodrop *p =
703                         port;
704
705         if (port == NULL) {
706                 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__);
707                 return -EINVAL;
708         }
709
710         if (p->is_multi)
711                 rte_port_ring_multi_writer_nodrop_flush(port);
712         else
713                 rte_port_ring_writer_nodrop_flush(port);
714
715         rte_free(port);
716
717         return 0;
718 }
719
720 static int
721 rte_port_ring_writer_nodrop_stats_read(void *port,
722                 struct rte_port_out_stats *stats, int clear)
723 {
724         struct rte_port_ring_writer_nodrop *p =
725                 port;
726
727         if (stats != NULL)
728                 memcpy(stats, &p->stats, sizeof(p->stats));
729
730         if (clear)
731                 memset(&p->stats, 0, sizeof(p->stats));
732
733         return 0;
734 }
735
736 /*
737  * Summary of port operations
738  */
739 struct rte_port_in_ops rte_port_ring_reader_ops = {
740         .f_create = rte_port_ring_reader_create,
741         .f_free = rte_port_ring_reader_free,
742         .f_rx = rte_port_ring_reader_rx,
743         .f_stats = rte_port_ring_reader_stats_read,
744 };
745
746 struct rte_port_out_ops rte_port_ring_writer_ops = {
747         .f_create = rte_port_ring_writer_create,
748         .f_free = rte_port_ring_writer_free,
749         .f_tx = rte_port_ring_writer_tx,
750         .f_tx_bulk = rte_port_ring_writer_tx_bulk,
751         .f_flush = rte_port_ring_writer_flush,
752         .f_stats = rte_port_ring_writer_stats_read,
753 };
754
755 struct rte_port_out_ops rte_port_ring_writer_nodrop_ops = {
756         .f_create = rte_port_ring_writer_nodrop_create,
757         .f_free = rte_port_ring_writer_nodrop_free,
758         .f_tx = rte_port_ring_writer_nodrop_tx,
759         .f_tx_bulk = rte_port_ring_writer_nodrop_tx_bulk,
760         .f_flush = rte_port_ring_writer_nodrop_flush,
761         .f_stats = rte_port_ring_writer_nodrop_stats_read,
762 };
763
764 struct rte_port_in_ops rte_port_ring_multi_reader_ops = {
765         .f_create = rte_port_ring_multi_reader_create,
766         .f_free = rte_port_ring_reader_free,
767         .f_rx = rte_port_ring_multi_reader_rx,
768         .f_stats = rte_port_ring_reader_stats_read,
769 };
770
771 struct rte_port_out_ops rte_port_ring_multi_writer_ops = {
772         .f_create = rte_port_ring_multi_writer_create,
773         .f_free = rte_port_ring_writer_free,
774         .f_tx = rte_port_ring_multi_writer_tx,
775         .f_tx_bulk = rte_port_ring_multi_writer_tx_bulk,
776         .f_flush = rte_port_ring_multi_writer_flush,
777         .f_stats = rte_port_ring_writer_stats_read,
778 };
779
780 struct rte_port_out_ops rte_port_ring_multi_writer_nodrop_ops = {
781         .f_create = rte_port_ring_multi_writer_nodrop_create,
782         .f_free = rte_port_ring_writer_nodrop_free,
783         .f_tx = rte_port_ring_multi_writer_nodrop_tx,
784         .f_tx_bulk = rte_port_ring_multi_writer_nodrop_tx_bulk,
785         .f_flush = rte_port_ring_multi_writer_nodrop_flush,
786         .f_stats = rte_port_ring_writer_nodrop_stats_read,
787 };