New upstream version 18.02
[deb_dpdk.git] / examples / distributor / main.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2017 Intel Corporation
3  */
4
5 #include <stdint.h>
6 #include <inttypes.h>
7 #include <unistd.h>
8 #include <signal.h>
9 #include <getopt.h>
10
11 #include <rte_eal.h>
12 #include <rte_ethdev.h>
13 #include <rte_cycles.h>
14 #include <rte_malloc.h>
15 #include <rte_debug.h>
16 #include <rte_prefetch.h>
17 #include <rte_distributor.h>
18 #include <rte_pause.h>
19
20 #define RX_RING_SIZE 1024
21 #define TX_RING_SIZE 1024
22 #define NUM_MBUFS ((64*1024)-1)
23 #define MBUF_CACHE_SIZE 128
24 #define BURST_SIZE 64
25 #define SCHED_RX_RING_SZ 8192
26 #define SCHED_TX_RING_SZ 65536
27 #define BURST_SIZE_TX 32
28
29 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
30
31 #define ANSI_COLOR_RED     "\x1b[31m"
32 #define ANSI_COLOR_RESET   "\x1b[0m"
33
34 /* mask of enabled ports */
35 static uint32_t enabled_port_mask;
36 volatile uint8_t quit_signal;
37 volatile uint8_t quit_signal_rx;
38 volatile uint8_t quit_signal_dist;
39 volatile uint8_t quit_signal_work;
40
41 static volatile struct app_stats {
42         struct {
43                 uint64_t rx_pkts;
44                 uint64_t returned_pkts;
45                 uint64_t enqueued_pkts;
46                 uint64_t enqdrop_pkts;
47         } rx __rte_cache_aligned;
48         int pad1 __rte_cache_aligned;
49
50         struct {
51                 uint64_t in_pkts;
52                 uint64_t ret_pkts;
53                 uint64_t sent_pkts;
54                 uint64_t enqdrop_pkts;
55         } dist __rte_cache_aligned;
56         int pad2 __rte_cache_aligned;
57
58         struct {
59                 uint64_t dequeue_pkts;
60                 uint64_t tx_pkts;
61                 uint64_t enqdrop_pkts;
62         } tx __rte_cache_aligned;
63         int pad3 __rte_cache_aligned;
64
65         uint64_t worker_pkts[64] __rte_cache_aligned;
66
67         int pad4 __rte_cache_aligned;
68
69         uint64_t worker_bursts[64][8] __rte_cache_aligned;
70
71         int pad5 __rte_cache_aligned;
72
73         uint64_t port_rx_pkts[64] __rte_cache_aligned;
74         uint64_t port_tx_pkts[64] __rte_cache_aligned;
75 } app_stats;
76
77 struct app_stats prev_app_stats;
78
79 static const struct rte_eth_conf port_conf_default = {
80         .rxmode = {
81                 .mq_mode = ETH_MQ_RX_RSS,
82                 .max_rx_pkt_len = ETHER_MAX_LEN,
83                 .ignore_offload_bitfield = 1,
84         },
85         .txmode = {
86                 .mq_mode = ETH_MQ_TX_NONE,
87         },
88         .rx_adv_conf = {
89                 .rss_conf = {
90                         .rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
91                                 ETH_RSS_TCP | ETH_RSS_SCTP,
92                 }
93         },
94 };
95
96 struct output_buffer {
97         unsigned count;
98         struct rte_mbuf *mbufs[BURST_SIZE];
99 };
100
101 static void print_stats(void);
102
103 /*
104  * Initialises a given port using global settings and with the rx buffers
105  * coming from the mbuf_pool passed as parameter
106  */
107 static inline int
108 port_init(uint16_t port, struct rte_mempool *mbuf_pool)
109 {
110         struct rte_eth_conf port_conf = port_conf_default;
111         const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
112         int retval;
113         uint16_t q;
114         uint16_t nb_rxd = RX_RING_SIZE;
115         uint16_t nb_txd = TX_RING_SIZE;
116         struct rte_eth_dev_info dev_info;
117         struct rte_eth_txconf txconf;
118
119         if (port >= rte_eth_dev_count())
120                 return -1;
121
122         rte_eth_dev_info_get(port, &dev_info);
123         if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
124                 port_conf.txmode.offloads |=
125                         DEV_TX_OFFLOAD_MBUF_FAST_FREE;
126
127         retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
128         if (retval != 0)
129                 return retval;
130
131         retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
132         if (retval != 0)
133                 return retval;
134
135         for (q = 0; q < rxRings; q++) {
136                 retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
137                                                 rte_eth_dev_socket_id(port),
138                                                 NULL, mbuf_pool);
139                 if (retval < 0)
140                         return retval;
141         }
142
143         txconf = dev_info.default_txconf;
144         txconf.txq_flags = ETH_TXQ_FLAGS_IGNORE;
145         txconf.offloads = port_conf.txmode.offloads;
146         for (q = 0; q < txRings; q++) {
147                 retval = rte_eth_tx_queue_setup(port, q, nb_txd,
148                                                 rte_eth_dev_socket_id(port),
149                                                 &txconf);
150                 if (retval < 0)
151                         return retval;
152         }
153
154         retval = rte_eth_dev_start(port);
155         if (retval < 0)
156                 return retval;
157
158         struct rte_eth_link link;
159         rte_eth_link_get_nowait(port, &link);
160         while (!link.link_status) {
161                 printf("Waiting for Link up on port %"PRIu16"\n", port);
162                 sleep(1);
163                 rte_eth_link_get_nowait(port, &link);
164         }
165
166         if (!link.link_status) {
167                 printf("Link down on port %"PRIu16"\n", port);
168                 return 0;
169         }
170
171         struct ether_addr addr;
172         rte_eth_macaddr_get(port, &addr);
173         printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
174                         " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
175                         port,
176                         addr.addr_bytes[0], addr.addr_bytes[1],
177                         addr.addr_bytes[2], addr.addr_bytes[3],
178                         addr.addr_bytes[4], addr.addr_bytes[5]);
179
180         rte_eth_promiscuous_enable(port);
181
182         return 0;
183 }
184
185 struct lcore_params {
186         unsigned worker_id;
187         struct rte_distributor *d;
188         struct rte_ring *rx_dist_ring;
189         struct rte_ring *dist_tx_ring;
190         struct rte_mempool *mem_pool;
191 };
192
193 static int
194 lcore_rx(struct lcore_params *p)
195 {
196         const uint16_t nb_ports = rte_eth_dev_count();
197         const int socket_id = rte_socket_id();
198         uint16_t port;
199         struct rte_mbuf *bufs[BURST_SIZE*2];
200
201         for (port = 0; port < nb_ports; port++) {
202                 /* skip ports that are not enabled */
203                 if ((enabled_port_mask & (1 << port)) == 0)
204                         continue;
205
206                 if (rte_eth_dev_socket_id(port) > 0 &&
207                                 rte_eth_dev_socket_id(port) != socket_id)
208                         printf("WARNING, port %u is on remote NUMA node to "
209                                         "RX thread.\n\tPerformance will not "
210                                         "be optimal.\n", port);
211         }
212
213         printf("\nCore %u doing packet RX.\n", rte_lcore_id());
214         port = 0;
215         while (!quit_signal_rx) {
216
217                 /* skip ports that are not enabled */
218                 if ((enabled_port_mask & (1 << port)) == 0) {
219                         if (++port == nb_ports)
220                                 port = 0;
221                         continue;
222                 }
223                 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
224                                 BURST_SIZE);
225                 if (unlikely(nb_rx == 0)) {
226                         if (++port == nb_ports)
227                                 port = 0;
228                         continue;
229                 }
230                 app_stats.rx.rx_pkts += nb_rx;
231
232 /*
233  * You can run the distributor on the rx core with this code. Returned
234  * packets are then send straight to the tx core.
235  */
236 #if 0
237         rte_distributor_process(d, bufs, nb_rx);
238         const uint16_t nb_ret = rte_distributor_returned_pktsd,
239                         bufs, BURST_SIZE*2);
240
241                 app_stats.rx.returned_pkts += nb_ret;
242                 if (unlikely(nb_ret == 0)) {
243                         if (++port == nb_ports)
244                                 port = 0;
245                         continue;
246                 }
247
248                 struct rte_ring *tx_ring = p->dist_tx_ring;
249                 uint16_t sent = rte_ring_enqueue_burst(tx_ring,
250                                 (void *)bufs, nb_ret, NULL);
251 #else
252                 uint16_t nb_ret = nb_rx;
253                 /*
254                  * Swap the following two lines if you want the rx traffic
255                  * to go directly to tx, no distribution.
256                  */
257                 struct rte_ring *out_ring = p->rx_dist_ring;
258                 /* struct rte_ring *out_ring = p->dist_tx_ring; */
259
260                 uint16_t sent = rte_ring_enqueue_burst(out_ring,
261                                 (void *)bufs, nb_ret, NULL);
262 #endif
263
264                 app_stats.rx.enqueued_pkts += sent;
265                 if (unlikely(sent < nb_ret)) {
266                         app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
267                         RTE_LOG_DP(DEBUG, DISTRAPP,
268                                 "%s:Packet loss due to full ring\n", __func__);
269                         while (sent < nb_ret)
270                                 rte_pktmbuf_free(bufs[sent++]);
271                 }
272                 if (++port == nb_ports)
273                         port = 0;
274         }
275         /* set worker & tx threads quit flag */
276         printf("\nCore %u exiting rx task.\n", rte_lcore_id());
277         quit_signal = 1;
278         return 0;
279 }
280
281 static inline void
282 flush_one_port(struct output_buffer *outbuf, uint8_t outp)
283 {
284         unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
285                         outbuf->mbufs, outbuf->count);
286         app_stats.tx.tx_pkts += outbuf->count;
287
288         if (unlikely(nb_tx < outbuf->count)) {
289                 app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
290                 do {
291                         rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
292                 } while (++nb_tx < outbuf->count);
293         }
294         outbuf->count = 0;
295 }
296
297 static inline void
298 flush_all_ports(struct output_buffer *tx_buffers, uint16_t nb_ports)
299 {
300         uint16_t outp;
301
302         for (outp = 0; outp < nb_ports; outp++) {
303                 /* skip ports that are not enabled */
304                 if ((enabled_port_mask & (1 << outp)) == 0)
305                         continue;
306
307                 if (tx_buffers[outp].count == 0)
308                         continue;
309
310                 flush_one_port(&tx_buffers[outp], outp);
311         }
312 }
313
314
315
316 static int
317 lcore_distributor(struct lcore_params *p)
318 {
319         struct rte_ring *in_r = p->rx_dist_ring;
320         struct rte_ring *out_r = p->dist_tx_ring;
321         struct rte_mbuf *bufs[BURST_SIZE * 4];
322         struct rte_distributor *d = p->d;
323
324         printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
325         while (!quit_signal_dist) {
326                 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
327                                 (void *)bufs, BURST_SIZE*1, NULL);
328                 if (nb_rx) {
329                         app_stats.dist.in_pkts += nb_rx;
330
331                         /* Distribute the packets */
332                         rte_distributor_process(d, bufs, nb_rx);
333                         /* Handle Returns */
334                         const uint16_t nb_ret =
335                                 rte_distributor_returned_pkts(d,
336                                         bufs, BURST_SIZE*2);
337
338                         if (unlikely(nb_ret == 0))
339                                 continue;
340                         app_stats.dist.ret_pkts += nb_ret;
341
342                         uint16_t sent = rte_ring_enqueue_burst(out_r,
343                                         (void *)bufs, nb_ret, NULL);
344                         app_stats.dist.sent_pkts += sent;
345                         if (unlikely(sent < nb_ret)) {
346                                 app_stats.dist.enqdrop_pkts += nb_ret - sent;
347                                 RTE_LOG(DEBUG, DISTRAPP,
348                                         "%s:Packet loss due to full out ring\n",
349                                         __func__);
350                                 while (sent < nb_ret)
351                                         rte_pktmbuf_free(bufs[sent++]);
352                         }
353                 }
354         }
355         printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
356         quit_signal_work = 1;
357
358         rte_distributor_flush(d);
359         /* Unblock any returns so workers can exit */
360         rte_distributor_clear_returns(d);
361         quit_signal_rx = 1;
362         return 0;
363 }
364
365
366 static int
367 lcore_tx(struct rte_ring *in_r)
368 {
369         static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
370         const uint16_t nb_ports = rte_eth_dev_count();
371         const int socket_id = rte_socket_id();
372         uint16_t port;
373
374         for (port = 0; port < nb_ports; port++) {
375                 /* skip ports that are not enabled */
376                 if ((enabled_port_mask & (1 << port)) == 0)
377                         continue;
378
379                 if (rte_eth_dev_socket_id(port) > 0 &&
380                                 rte_eth_dev_socket_id(port) != socket_id)
381                         printf("WARNING, port %u is on remote NUMA node to "
382                                         "TX thread.\n\tPerformance will not "
383                                         "be optimal.\n", port);
384         }
385
386         printf("\nCore %u doing packet TX.\n", rte_lcore_id());
387         while (!quit_signal) {
388
389                 for (port = 0; port < nb_ports; port++) {
390                         /* skip ports that are not enabled */
391                         if ((enabled_port_mask & (1 << port)) == 0)
392                                 continue;
393
394                         struct rte_mbuf *bufs[BURST_SIZE_TX];
395                         const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
396                                         (void *)bufs, BURST_SIZE_TX, NULL);
397                         app_stats.tx.dequeue_pkts += nb_rx;
398
399                         /* if we get no traffic, flush anything we have */
400                         if (unlikely(nb_rx == 0)) {
401                                 flush_all_ports(tx_buffers, nb_ports);
402                                 continue;
403                         }
404
405                         /* for traffic we receive, queue it up for transmit */
406                         uint16_t i;
407                         rte_prefetch_non_temporal((void *)bufs[0]);
408                         rte_prefetch_non_temporal((void *)bufs[1]);
409                         rte_prefetch_non_temporal((void *)bufs[2]);
410                         for (i = 0; i < nb_rx; i++) {
411                                 struct output_buffer *outbuf;
412                                 uint8_t outp;
413                                 rte_prefetch_non_temporal((void *)bufs[i + 3]);
414                                 /*
415                                  * workers should update in_port to hold the
416                                  * output port value
417                                  */
418                                 outp = bufs[i]->port;
419                                 /* skip ports that are not enabled */
420                                 if ((enabled_port_mask & (1 << outp)) == 0)
421                                         continue;
422
423                                 outbuf = &tx_buffers[outp];
424                                 outbuf->mbufs[outbuf->count++] = bufs[i];
425                                 if (outbuf->count == BURST_SIZE_TX)
426                                         flush_one_port(outbuf, outp);
427                         }
428                 }
429         }
430         printf("\nCore %u exiting tx task.\n", rte_lcore_id());
431         return 0;
432 }
433
434 static void
435 int_handler(int sig_num)
436 {
437         printf("Exiting on signal %d\n", sig_num);
438         /* set quit flag for rx thread to exit */
439         quit_signal_dist = 1;
440 }
441
442 static void
443 print_stats(void)
444 {
445         struct rte_eth_stats eth_stats;
446         unsigned int i, j;
447         const unsigned int num_workers = rte_lcore_count() - 4;
448
449         for (i = 0; i < rte_eth_dev_count(); i++) {
450                 rte_eth_stats_get(i, &eth_stats);
451                 app_stats.port_rx_pkts[i] = eth_stats.ipackets;
452                 app_stats.port_tx_pkts[i] = eth_stats.opackets;
453         }
454
455         printf("\n\nRX Thread:\n");
456         for (i = 0; i < rte_eth_dev_count(); i++) {
457                 printf("Port %u Pktsin : %5.2f\n", i,
458                                 (app_stats.port_rx_pkts[i] -
459                                 prev_app_stats.port_rx_pkts[i])/1000000.0);
460                 prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
461         }
462         printf(" - Received:    %5.2f\n",
463                         (app_stats.rx.rx_pkts -
464                         prev_app_stats.rx.rx_pkts)/1000000.0);
465         printf(" - Returned:    %5.2f\n",
466                         (app_stats.rx.returned_pkts -
467                         prev_app_stats.rx.returned_pkts)/1000000.0);
468         printf(" - Enqueued:    %5.2f\n",
469                         (app_stats.rx.enqueued_pkts -
470                         prev_app_stats.rx.enqueued_pkts)/1000000.0);
471         printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
472                         (app_stats.rx.enqdrop_pkts -
473                         prev_app_stats.rx.enqdrop_pkts)/1000000.0,
474                         ANSI_COLOR_RESET);
475
476         printf("Distributor thread:\n");
477         printf(" - In:          %5.2f\n",
478                         (app_stats.dist.in_pkts -
479                         prev_app_stats.dist.in_pkts)/1000000.0);
480         printf(" - Returned:    %5.2f\n",
481                         (app_stats.dist.ret_pkts -
482                         prev_app_stats.dist.ret_pkts)/1000000.0);
483         printf(" - Sent:        %5.2f\n",
484                         (app_stats.dist.sent_pkts -
485                         prev_app_stats.dist.sent_pkts)/1000000.0);
486         printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
487                         (app_stats.dist.enqdrop_pkts -
488                         prev_app_stats.dist.enqdrop_pkts)/1000000.0,
489                         ANSI_COLOR_RESET);
490
491         printf("TX thread:\n");
492         printf(" - Dequeued:    %5.2f\n",
493                         (app_stats.tx.dequeue_pkts -
494                         prev_app_stats.tx.dequeue_pkts)/1000000.0);
495         for (i = 0; i < rte_eth_dev_count(); i++) {
496                 printf("Port %u Pktsout: %5.2f\n",
497                                 i, (app_stats.port_tx_pkts[i] -
498                                 prev_app_stats.port_tx_pkts[i])/1000000.0);
499                 prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
500         }
501         printf(" - Transmitted: %5.2f\n",
502                         (app_stats.tx.tx_pkts -
503                         prev_app_stats.tx.tx_pkts)/1000000.0);
504         printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
505                         (app_stats.tx.enqdrop_pkts -
506                         prev_app_stats.tx.enqdrop_pkts)/1000000.0,
507                         ANSI_COLOR_RESET);
508
509         prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
510         prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
511         prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
512         prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
513         prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
514         prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
515         prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
516         prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
517         prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
518         prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
519         prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;
520
521         for (i = 0; i < num_workers; i++) {
522                 printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
523                                 (app_stats.worker_pkts[i] -
524                                 prev_app_stats.worker_pkts[i])/1000000.0);
525                 for (j = 0; j < 8; j++) {
526                         printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
527                         app_stats.worker_bursts[i][j] = 0;
528                 }
529                 printf("\n");
530                 prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
531         }
532 }
533
534 static int
535 lcore_worker(struct lcore_params *p)
536 {
537         struct rte_distributor *d = p->d;
538         const unsigned id = p->worker_id;
539         unsigned int num = 0;
540         unsigned int i;
541
542         /*
543          * for single port, xor_val will be zero so we won't modify the output
544          * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
545          */
546         const unsigned xor_val = (rte_eth_dev_count() > 1);
547         struct rte_mbuf *buf[8] __rte_cache_aligned;
548
549         for (i = 0; i < 8; i++)
550                 buf[i] = NULL;
551
552         app_stats.worker_pkts[p->worker_id] = 1;
553
554         printf("\nCore %u acting as worker core.\n", rte_lcore_id());
555         while (!quit_signal_work) {
556                 num = rte_distributor_get_pkt(d, id, buf, buf, num);
557                 /* Do a little bit of work for each packet */
558                 for (i = 0; i < num; i++) {
559                         uint64_t t = rte_rdtsc()+100;
560
561                         while (rte_rdtsc() < t)
562                                 rte_pause();
563                         buf[i]->port ^= xor_val;
564                 }
565
566                 app_stats.worker_pkts[p->worker_id] += num;
567                 if (num > 0)
568                         app_stats.worker_bursts[p->worker_id][num-1]++;
569         }
570         return 0;
571 }
572
573 /* display usage */
574 static void
575 print_usage(const char *prgname)
576 {
577         printf("%s [EAL options] -- -p PORTMASK\n"
578                         "  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
579                         prgname);
580 }
581
582 static int
583 parse_portmask(const char *portmask)
584 {
585         char *end = NULL;
586         unsigned long pm;
587
588         /* parse hexadecimal string */
589         pm = strtoul(portmask, &end, 16);
590         if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
591                 return -1;
592
593         if (pm == 0)
594                 return -1;
595
596         return pm;
597 }
598
599 /* Parse the argument given in the command line of the application */
600 static int
601 parse_args(int argc, char **argv)
602 {
603         int opt;
604         char **argvopt;
605         int option_index;
606         char *prgname = argv[0];
607         static struct option lgopts[] = {
608                 {NULL, 0, 0, 0}
609         };
610
611         argvopt = argv;
612
613         while ((opt = getopt_long(argc, argvopt, "p:",
614                         lgopts, &option_index)) != EOF) {
615
616                 switch (opt) {
617                 /* portmask */
618                 case 'p':
619                         enabled_port_mask = parse_portmask(optarg);
620                         if (enabled_port_mask == 0) {
621                                 printf("invalid portmask\n");
622                                 print_usage(prgname);
623                                 return -1;
624                         }
625                         break;
626
627                 default:
628                         print_usage(prgname);
629                         return -1;
630                 }
631         }
632
633         if (optind <= 1) {
634                 print_usage(prgname);
635                 return -1;
636         }
637
638         argv[optind-1] = prgname;
639
640         optind = 1; /* reset getopt lib */
641         return 0;
642 }
643
644 /* Main function, does initialization and calls the per-lcore functions */
645 int
646 main(int argc, char *argv[])
647 {
648         struct rte_mempool *mbuf_pool;
649         struct rte_distributor *d;
650         struct rte_ring *dist_tx_ring;
651         struct rte_ring *rx_dist_ring;
652         unsigned lcore_id, worker_id = 0;
653         unsigned nb_ports;
654         uint16_t portid;
655         uint16_t nb_ports_available;
656         uint64_t t, freq;
657
658         /* catch ctrl-c so we can print on exit */
659         signal(SIGINT, int_handler);
660
661         /* init EAL */
662         int ret = rte_eal_init(argc, argv);
663         if (ret < 0)
664                 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
665         argc -= ret;
666         argv += ret;
667
668         /* parse application arguments (after the EAL ones) */
669         ret = parse_args(argc, argv);
670         if (ret < 0)
671                 rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
672
673         if (rte_lcore_count() < 5)
674                 rte_exit(EXIT_FAILURE, "Error, This application needs at "
675                                 "least 5 logical cores to run:\n"
676                                 "1 lcore for stats (can be core 0)\n"
677                                 "1 lcore for packet RX\n"
678                                 "1 lcore for distribution\n"
679                                 "1 lcore for packet TX\n"
680                                 "and at least 1 lcore for worker threads\n");
681
682         nb_ports = rte_eth_dev_count();
683         if (nb_ports == 0)
684                 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
685         if (nb_ports != 1 && (nb_ports & 1))
686                 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
687                                 "when using a single port\n");
688
689         mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
690                 NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
691                 RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
692         if (mbuf_pool == NULL)
693                 rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
694         nb_ports_available = nb_ports;
695
696         /* initialize all ports */
697         for (portid = 0; portid < nb_ports; portid++) {
698                 /* skip ports that are not enabled */
699                 if ((enabled_port_mask & (1 << portid)) == 0) {
700                         printf("\nSkipping disabled port %d\n", portid);
701                         nb_ports_available--;
702                         continue;
703                 }
704                 /* init port */
705                 printf("Initializing port %u... done\n", portid);
706
707                 if (port_init(portid, mbuf_pool) != 0)
708                         rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
709                                         portid);
710         }
711
712         if (!nb_ports_available) {
713                 rte_exit(EXIT_FAILURE,
714                                 "All available ports are disabled. Please set portmask.\n");
715         }
716
717         d = rte_distributor_create("PKT_DIST", rte_socket_id(),
718                         rte_lcore_count() - 4,
719                         RTE_DIST_ALG_BURST);
720         if (d == NULL)
721                 rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
722
723         /*
724          * scheduler ring is read by the transmitter core, and written to
725          * by scheduler core
726          */
727         dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
728                         rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
729         if (dist_tx_ring == NULL)
730                 rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
731
732         rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
733                         rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
734         if (rx_dist_ring == NULL)
735                 rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
736
737         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
738                 if (worker_id == rte_lcore_count() - 3) {
739                         printf("Starting distributor on lcore_id %d\n",
740                                         lcore_id);
741                         /* distributor core */
742                         struct lcore_params *p =
743                                         rte_malloc(NULL, sizeof(*p), 0);
744                         if (!p)
745                                 rte_panic("malloc failure\n");
746                         *p = (struct lcore_params){worker_id, d,
747                                 rx_dist_ring, dist_tx_ring, mbuf_pool};
748                         rte_eal_remote_launch(
749                                 (lcore_function_t *)lcore_distributor,
750                                 p, lcore_id);
751                 } else if (worker_id == rte_lcore_count() - 4) {
752                         printf("Starting tx  on worker_id %d, lcore_id %d\n",
753                                         worker_id, lcore_id);
754                         /* tx core */
755                         rte_eal_remote_launch((lcore_function_t *)lcore_tx,
756                                         dist_tx_ring, lcore_id);
757                 } else if (worker_id == rte_lcore_count() - 2) {
758                         printf("Starting rx on worker_id %d, lcore_id %d\n",
759                                         worker_id, lcore_id);
760                         /* rx core */
761                         struct lcore_params *p =
762                                         rte_malloc(NULL, sizeof(*p), 0);
763                         if (!p)
764                                 rte_panic("malloc failure\n");
765                         *p = (struct lcore_params){worker_id, d, rx_dist_ring,
766                                         dist_tx_ring, mbuf_pool};
767                         rte_eal_remote_launch((lcore_function_t *)lcore_rx,
768                                         p, lcore_id);
769                 } else {
770                         printf("Starting worker on worker_id %d, lcore_id %d\n",
771                                         worker_id, lcore_id);
772                         struct lcore_params *p =
773                                         rte_malloc(NULL, sizeof(*p), 0);
774                         if (!p)
775                                 rte_panic("malloc failure\n");
776                         *p = (struct lcore_params){worker_id, d, rx_dist_ring,
777                                         dist_tx_ring, mbuf_pool};
778
779                         rte_eal_remote_launch((lcore_function_t *)lcore_worker,
780                                         p, lcore_id);
781                 }
782                 worker_id++;
783         }
784
785         freq = rte_get_timer_hz();
786         t = rte_rdtsc() + freq;
787         while (!quit_signal_dist) {
788                 if (t < rte_rdtsc()) {
789                         print_stats();
790                         t = rte_rdtsc() + freq;
791                 }
792                 usleep(1000);
793         }
794
795         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
796                 if (rte_eal_wait_lcore(lcore_id) < 0)
797                         return -1;
798         }
799
800         print_stats();
801         return 0;
802 }