New upstream version 18.08
[deb_dpdk.git] / examples / eventdev_pipeline / main.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2017 Intel Corporation
3  */
4
5 #include <getopt.h>
6 #include <stdint.h>
7 #include <stdio.h>
8 #include <signal.h>
9 #include <sched.h>
10
11 #include "pipeline_common.h"
12
13 struct config_data cdata = {
14         .num_packets = (1L << 25), /* do ~32M packets */
15         .num_fids = 512,
16         .queue_type = RTE_SCHED_TYPE_ATOMIC,
17         .next_qid = {-1},
18         .qid = {-1},
19         .num_stages = 1,
20         .worker_cq_depth = 16
21 };
22
23 static bool
24 core_in_use(unsigned int lcore_id) {
25         return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
26                 fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
27 }
28
/*
 * TX buffer error callback: keep calling rte_eth_tx_burst() on the packets
 * that could not be sent until every one of them has been accepted.
 *
 * pkts     - the unsent packets
 * unsent   - number of entries in pkts
 * userdata - the ethernet port id, carried in the pointer value
 *
 * The loop has no retry limit; it only ends once all packets are sent.
 */
static void
eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
                        void *userdata)
{
        const int port = (uintptr_t) userdata;
        unsigned int total_sent = 0;

        for (;;) {
                /* Note: hard-coded TX queue */
                total_sent += rte_eth_tx_burst(port, 0, &pkts[total_sent],
                                               unsent - total_sent);
                if (total_sent == unsent)
                        break;
        }
}
42
/*
 * Coremask parsing helpers. parse_coremask() converts a coremask given as
 * a hexadecimal string into a 64-bit mask; the caller stores the parsed
 * value in the global configuration (core roles and core counts).
 */
/*
 * Convert one hexadecimal digit character to its numeric value.
 *
 * Returns 0..15 for '0'-'9', 'a'-'f' and 'A'-'F', and -1 for any other
 * character, so that a caller that forgets to validate its input gets a
 * detectable error instead of an arbitrary number.
 */
static int xdigit2val(unsigned char c)
{
        if (c >= '0' && c <= '9')
                return c - '0';
        if (c >= 'A' && c <= 'F')
                return c - 'A' + 10;
        if (c >= 'a' && c <= 'f')
                return c - 'a' + 10;

        return -1;
}
60
61 static uint64_t
62 parse_coremask(const char *coremask)
63 {
64         int i, j, idx = 0;
65         unsigned int count = 0;
66         char c;
67         int val;
68         uint64_t mask = 0;
69         const int32_t BITS_HEX = 4;
70
71         if (coremask == NULL)
72                 return -1;
73         /* Remove all blank characters ahead and after .
74          * Remove 0x/0X if exists.
75          */
76         while (isblank(*coremask))
77                 coremask++;
78         if (coremask[0] == '0' && ((coremask[1] == 'x')
79                 || (coremask[1] == 'X')))
80                 coremask += 2;
81         i = strlen(coremask);
82         while ((i > 0) && isblank(coremask[i - 1]))
83                 i--;
84         if (i == 0)
85                 return -1;
86
87         for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
88                 c = coremask[i];
89                 if (isxdigit(c) == 0) {
90                         /* invalid characters */
91                         return -1;
92                 }
93                 val = xdigit2val(c);
94                 for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
95                         if ((1 << j) & val) {
96                                 mask |= (1UL << idx);
97                                 count++;
98                         }
99                 }
100         }
101         for (; i >= 0; i--)
102                 if (coremask[i] != '0')
103                         return -1;
104         if (count == 0)
105                 return -1;
106         return mask;
107 }
108
/*
 * Long command line options; each one maps onto the short option letter
 * handled in parse_app_args().
 *
 * "worker-mask" is accepted as an alias of "workers" because the usage
 * text advertises the worker option under that name.
 */
static struct option long_options[] = {
        {"workers", required_argument, 0, 'w'},
        {"worker-mask", required_argument, 0, 'w'},
        {"packets", required_argument, 0, 'n'},
        {"atomic-flows", required_argument, 0, 'f'},
        {"num_stages", required_argument, 0, 's'},
        {"rx-mask", required_argument, 0, 'r'},
        {"tx-mask", required_argument, 0, 't'},
        {"sched-mask", required_argument, 0, 'e'},
        {"cq-depth", required_argument, 0, 'c'},
        {"work-cycles", required_argument, 0, 'W'},
        {"mempool-size", required_argument, 0, 'm'},
        {"queue-priority", no_argument, 0, 'P'},
        {"parallel", no_argument, 0, 'p'},
        {"ordered", no_argument, 0, 'o'},
        {"quiet", no_argument, 0, 'q'},
        {"use-atq", no_argument, 0, 'a'},
        {"dump", no_argument, 0, 'D'},
        {0, 0, 0, 0} /* terminator required by getopt_long() */
};
128
/*
 * Print the command line help on stderr and terminate the process with
 * exit status 1. This function does not return.
 */
static void
usage(void)
{
        /*
         * The option names and defaults shown here must match long_options
         * and the initial values in cdata.
         */
        const char *usage_str =
                "  Usage: eventdev_demo [options]\n"
                "  Options:\n"
                "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
                "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
                "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
                "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
                "  -w, --workers=core mask      Run worker on CPUs in core mask\n"
                "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
                "  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
                "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
                "  -W  --work-cycles=N          Worker cycles (default 0)\n"
                "  -P  --queue-priority         Enable scheduler queue prioritization\n"
                "  -o, --ordered                Use ordered scheduling\n"
                "  -p, --parallel               Use parallel scheduling\n"
                "  -q, --quiet                  Minimize printed output\n"
                "  -a, --use-atq                Use all type queues\n"
                "  -m, --mempool-size=N         Dictate the mempool size\n"
                "  -D, --dump                   Print detailed statistics before exit"
                "\n";
        fprintf(stderr, "%s", usage_str);
        exit(1);
}
155
156 static void
157 parse_app_args(int argc, char **argv)
158 {
159         /* Parse cli options*/
160         int option_index;
161         int c;
162         opterr = 0;
163         uint64_t rx_lcore_mask = 0;
164         uint64_t tx_lcore_mask = 0;
165         uint64_t sched_lcore_mask = 0;
166         uint64_t worker_lcore_mask = 0;
167         int i;
168
169         for (;;) {
170                 c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
171                                 long_options, &option_index);
172                 if (c == -1)
173                         break;
174
175                 int popcnt = 0;
176                 switch (c) {
177                 case 'n':
178                         cdata.num_packets = (int64_t)atol(optarg);
179                         if (cdata.num_packets == 0)
180                                 cdata.num_packets = INT64_MAX;
181                         break;
182                 case 'f':
183                         cdata.num_fids = (unsigned int)atoi(optarg);
184                         break;
185                 case 's':
186                         cdata.num_stages = (unsigned int)atoi(optarg);
187                         break;
188                 case 'c':
189                         cdata.worker_cq_depth = (unsigned int)atoi(optarg);
190                         break;
191                 case 'W':
192                         cdata.worker_cycles = (unsigned int)atoi(optarg);
193                         break;
194                 case 'P':
195                         cdata.enable_queue_priorities = 1;
196                         break;
197                 case 'o':
198                         cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
199                         break;
200                 case 'p':
201                         cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
202                         break;
203                 case 'a':
204                         cdata.all_type_queues = 1;
205                         break;
206                 case 'q':
207                         cdata.quiet = 1;
208                         break;
209                 case 'D':
210                         cdata.dump_dev = 1;
211                         break;
212                 case 'w':
213                         worker_lcore_mask = parse_coremask(optarg);
214                         break;
215                 case 'r':
216                         rx_lcore_mask = parse_coremask(optarg);
217                         popcnt = __builtin_popcountll(rx_lcore_mask);
218                         fdata->rx_single = (popcnt == 1);
219                         break;
220                 case 't':
221                         tx_lcore_mask = parse_coremask(optarg);
222                         popcnt = __builtin_popcountll(tx_lcore_mask);
223                         fdata->tx_single = (popcnt == 1);
224                         break;
225                 case 'e':
226                         sched_lcore_mask = parse_coremask(optarg);
227                         popcnt = __builtin_popcountll(sched_lcore_mask);
228                         fdata->sched_single = (popcnt == 1);
229                         break;
230                 case 'm':
231                         cdata.num_mbuf = (uint64_t)atol(optarg);
232                         break;
233                 default:
234                         usage();
235                 }
236         }
237
238         cdata.worker_lcore_mask = worker_lcore_mask;
239         cdata.sched_lcore_mask = sched_lcore_mask;
240         cdata.rx_lcore_mask = rx_lcore_mask;
241         cdata.tx_lcore_mask = tx_lcore_mask;
242
243         if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
244                 usage();
245
246         for (i = 0; i < MAX_NUM_CORE; i++) {
247                 fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
248                 fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
249                 fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
250                 fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));
251
252                 if (fdata->worker_core[i])
253                         cdata.num_workers++;
254                 if (core_in_use(i))
255                         cdata.active_cores++;
256         }
257 }
258
259 /*
260  * Initializes a given port using global settings and with the RX buffers
261  * coming from the mbuf_pool passed as a parameter.
262  */
263 static inline int
264 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
265 {
266         static const struct rte_eth_conf port_conf_default = {
267                 .rxmode = {
268                         .mq_mode = ETH_MQ_RX_RSS,
269                         .max_rx_pkt_len = ETHER_MAX_LEN,
270                 },
271                 .rx_adv_conf = {
272                         .rss_conf = {
273                                 .rss_hf = ETH_RSS_IP |
274                                           ETH_RSS_TCP |
275                                           ETH_RSS_UDP,
276                         }
277                 }
278         };
279         const uint16_t rx_rings = 1, tx_rings = 1;
280         const uint16_t rx_ring_size = 512, tx_ring_size = 512;
281         struct rte_eth_conf port_conf = port_conf_default;
282         int retval;
283         uint16_t q;
284         struct rte_eth_dev_info dev_info;
285         struct rte_eth_txconf txconf;
286
287         if (!rte_eth_dev_is_valid_port(port))
288                 return -1;
289
290         rte_eth_dev_info_get(port, &dev_info);
291         if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
292                 port_conf.txmode.offloads |=
293                         DEV_TX_OFFLOAD_MBUF_FAST_FREE;
294
295         port_conf.rx_adv_conf.rss_conf.rss_hf &=
296                 dev_info.flow_type_rss_offloads;
297         if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
298                         port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
299                 printf("Port %u modified RSS hash function based on hardware support,"
300                         "requested:%#"PRIx64" configured:%#"PRIx64"\n",
301                         port,
302                         port_conf_default.rx_adv_conf.rss_conf.rss_hf,
303                         port_conf.rx_adv_conf.rss_conf.rss_hf);
304         }
305
306         /* Configure the Ethernet device. */
307         retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
308         if (retval != 0)
309                 return retval;
310
311         /* Allocate and set up 1 RX queue per Ethernet port. */
312         for (q = 0; q < rx_rings; q++) {
313                 retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
314                                 rte_eth_dev_socket_id(port), NULL, mbuf_pool);
315                 if (retval < 0)
316                         return retval;
317         }
318
319         txconf = dev_info.default_txconf;
320         txconf.offloads = port_conf_default.txmode.offloads;
321         /* Allocate and set up 1 TX queue per Ethernet port. */
322         for (q = 0; q < tx_rings; q++) {
323                 retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
324                                 rte_eth_dev_socket_id(port), &txconf);
325                 if (retval < 0)
326                         return retval;
327         }
328
329         /* Start the Ethernet port. */
330         retval = rte_eth_dev_start(port);
331         if (retval < 0)
332                 return retval;
333
334         /* Display the port MAC address. */
335         struct ether_addr addr;
336         rte_eth_macaddr_get(port, &addr);
337         printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
338                            " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
339                         (unsigned int)port,
340                         addr.addr_bytes[0], addr.addr_bytes[1],
341                         addr.addr_bytes[2], addr.addr_bytes[3],
342                         addr.addr_bytes[4], addr.addr_bytes[5]);
343
344         /* Enable RX in promiscuous mode for the Ethernet device. */
345         rte_eth_promiscuous_enable(port);
346
347         return 0;
348 }
349
350 static int
351 init_ports(uint16_t num_ports)
352 {
353         uint16_t portid, i;
354
355         if (!cdata.num_mbuf)
356                 cdata.num_mbuf = 16384 * num_ports;
357
358         struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
359                         /* mbufs */ cdata.num_mbuf,
360                         /* cache_size */ 512,
361                         /* priv_size*/ 0,
362                         /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
363                         rte_socket_id());
364
365         RTE_ETH_FOREACH_DEV(portid)
366                 if (port_init(portid, mp) != 0)
367                         rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
368                                         portid);
369
370         RTE_ETH_FOREACH_DEV(i) {
371                 void *userdata = (void *)(uintptr_t) i;
372                 fdata->tx_buf[i] =
373                         rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
374                 if (fdata->tx_buf[i] == NULL)
375                         rte_panic("Out of memory\n");
376                 rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
377                 rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
378                                                    eth_tx_buffer_retry,
379                                                    userdata);
380         }
381
382         return 0;
383 }
384
385 static void
386 do_capability_setup(uint8_t eventdev_id)
387 {
388         uint16_t i;
389         uint8_t mt_unsafe = 0;
390         uint8_t burst = 0;
391
392         RTE_ETH_FOREACH_DEV(i) {
393                 struct rte_eth_dev_info dev_info;
394                 memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
395
396                 rte_eth_dev_info_get(i, &dev_info);
397                 /* Check if it is safe ask worker to tx. */
398                 mt_unsafe |= !(dev_info.tx_offload_capa &
399                                 DEV_TX_OFFLOAD_MT_LOCKFREE);
400         }
401
402         struct rte_event_dev_info eventdev_info;
403         memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
404
405         rte_event_dev_info_get(eventdev_id, &eventdev_info);
406         burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
407                 0;
408
409         if (mt_unsafe)
410                 set_worker_generic_setup_data(&fdata->cap, burst);
411         else
412                 set_worker_tx_setup_data(&fdata->cap, burst);
413 }
414
415 static void
416 signal_handler(int signum)
417 {
418         if (fdata->done)
419                 rte_exit(1, "Exiting on signal %d\n", signum);
420         if (signum == SIGINT || signum == SIGTERM) {
421                 printf("\n\nSignal %d received, preparing to exit...\n",
422                                 signum);
423                 fdata->done = 1;
424         }
425         if (signum == SIGTSTP)
426                 rte_event_dev_dump(0, stdout);
427 }
428
/*
 * Fetch the extended statistic named "port_<p>_rx" from event device
 * dev_id and return whatever rte_event_dev_xstats_by_name_get() reports
 * for it.
 */
static inline uint64_t
port_stat(int dev_id, int32_t p)
{
        char xstat_name[64];
        uint64_t value;

        snprintf(xstat_name, sizeof(xstat_name), "port_%u_rx", p);
        value = rte_event_dev_xstats_by_name_get(dev_id, xstat_name, NULL);

        return value;
}
436
437 int
438 main(int argc, char **argv)
439 {
440         struct worker_data *worker_data;
441         uint16_t num_ports;
442         int lcore_id;
443         int err;
444
445         signal(SIGINT, signal_handler);
446         signal(SIGTERM, signal_handler);
447         signal(SIGTSTP, signal_handler);
448
449         err = rte_eal_init(argc, argv);
450         if (err < 0)
451                 rte_panic("Invalid EAL arguments\n");
452
453         argc -= err;
454         argv += err;
455
456         fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
457         if (fdata == NULL)
458                 rte_panic("Out of memory\n");
459
460         /* Parse cli options*/
461         parse_app_args(argc, argv);
462
463         num_ports = rte_eth_dev_count_avail();
464         if (num_ports == 0)
465                 rte_panic("No ethernet ports found\n");
466
467         const unsigned int cores_needed = cdata.active_cores;
468
469         if (!cdata.quiet) {
470                 printf("  Config:\n");
471                 printf("\tports: %u\n", num_ports);
472                 printf("\tworkers: %u\n", cdata.num_workers);
473                 printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
474                 printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
475                 if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
476                         printf("\tqid0 type: ordered\n");
477                 if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
478                         printf("\tqid0 type: atomic\n");
479                 printf("\tCores available: %u\n", rte_lcore_count());
480                 printf("\tCores used: %u\n", cores_needed);
481         }
482
483         if (rte_lcore_count() < cores_needed)
484                 rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
485                                 cores_needed);
486
487         const unsigned int ndevs = rte_event_dev_count();
488         if (ndevs == 0)
489                 rte_panic("No dev_id devs found. Pasl in a --vdev eventdev.\n");
490         if (ndevs > 1)
491                 fprintf(stderr, "Warning: More than one eventdev, using idx 0");
492
493
494         do_capability_setup(0);
495         fdata->cap.check_opt();
496
497         worker_data = rte_calloc(0, cdata.num_workers,
498                         sizeof(worker_data[0]), 0);
499         if (worker_data == NULL)
500                 rte_panic("rte_calloc failed\n");
501
502         int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
503         if (dev_id < 0)
504                 rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
505
506         init_ports(num_ports);
507         fdata->cap.adptr_setup(num_ports);
508
509         int worker_idx = 0;
510         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
511                 if (lcore_id >= MAX_NUM_CORE)
512                         break;
513
514                 if (!fdata->rx_core[lcore_id] &&
515                         !fdata->worker_core[lcore_id] &&
516                         !fdata->tx_core[lcore_id] &&
517                         !fdata->sched_core[lcore_id])
518                         continue;
519
520                 if (fdata->rx_core[lcore_id])
521                         printf(
522                                 "[%s()] lcore %d executing NIC Rx\n",
523                                 __func__, lcore_id);
524
525                 if (fdata->tx_core[lcore_id])
526                         printf(
527                                 "[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
528                                 __func__, lcore_id, cons_data.port_id);
529
530                 if (fdata->sched_core[lcore_id])
531                         printf("[%s()] lcore %d executing scheduler\n",
532                                         __func__, lcore_id);
533
534                 if (fdata->worker_core[lcore_id])
535                         printf(
536                                 "[%s()] lcore %d executing worker, using eventdev port %u\n",
537                                 __func__, lcore_id,
538                                 worker_data[worker_idx].port_id);
539
540                 err = rte_eal_remote_launch(fdata->cap.worker,
541                                 &worker_data[worker_idx], lcore_id);
542                 if (err) {
543                         rte_panic("Failed to launch worker on core %d\n",
544                                         lcore_id);
545                         continue;
546                 }
547                 if (fdata->worker_core[lcore_id])
548                         worker_idx++;
549         }
550
551         lcore_id = rte_lcore_id();
552
553         if (core_in_use(lcore_id))
554                 fdata->cap.worker(&worker_data[worker_idx++]);
555
556         rte_eal_mp_wait_lcore();
557
558         if (cdata.dump_dev)
559                 rte_event_dev_dump(dev_id, stdout);
560
561         if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
562                         (uint64_t)-ENOTSUP)) {
563                 printf("\nPort Workload distribution:\n");
564                 uint32_t i;
565                 uint64_t tot_pkts = 0;
566                 uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
567                 for (i = 0; i < cdata.num_workers; i++) {
568                         pkts_per_wkr[i] =
569                                 port_stat(dev_id, worker_data[i].port_id);
570                         tot_pkts += pkts_per_wkr[i];
571                 }
572                 for (i = 0; i < cdata.num_workers; i++) {
573                         float pc = pkts_per_wkr[i]  * 100 /
574                                 ((float)tot_pkts);
575                         printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
576                                         i, pc, pkts_per_wkr[i]);
577                 }
578
579         }
580
581         return 0;
582 }