New upstream version 18.08
[deb_dpdk.git] / app / test-eventdev / test_pipeline_atq.c
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2017 Cavium, Inc.
4  */
5
6 #include "test_pipeline_common.h"
7
8 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
9
10 static __rte_always_inline int
11 pipeline_atq_nb_event_queues(struct evt_options *opt)
12 {
13         RTE_SET_USED(opt);
14
15         return rte_eth_dev_count_avail();
16 }
17
18 static int
19 pipeline_atq_worker_single_stage_tx(void *arg)
20 {
21         PIPELINE_WROKER_SINGLE_STAGE_INIT;
22
23         while (t->done == false) {
24                 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
25
26                 if (!event) {
27                         rte_pause();
28                         continue;
29                 }
30
31                 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
32                         pipeline_tx_pkt(ev.mbuf);
33                         w->processed_pkts++;
34                         continue;
35                 }
36                 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
37                 pipeline_event_enqueue(dev, port, &ev);
38         }
39
40         return 0;
41 }
42
43 static int
44 pipeline_atq_worker_single_stage_fwd(void *arg)
45 {
46         PIPELINE_WROKER_SINGLE_STAGE_INIT;
47         const uint8_t tx_queue = t->tx_service.queue_id;
48
49         while (t->done == false) {
50                 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
51
52                 if (!event) {
53                         rte_pause();
54                         continue;
55                 }
56
57                 w->processed_pkts++;
58                 ev.queue_id = tx_queue;
59                 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
60                 pipeline_event_enqueue(dev, port, &ev);
61         }
62
63         return 0;
64 }
65
66 static int
67 pipeline_atq_worker_single_stage_burst_tx(void *arg)
68 {
69         PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
70
71         while (t->done == false) {
72                 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
73                                 BURST_SIZE, 0);
74
75                 if (!nb_rx) {
76                         rte_pause();
77                         continue;
78                 }
79
80                 for (i = 0; i < nb_rx; i++) {
81                         rte_prefetch0(ev[i + 1].mbuf);
82                         if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
83
84                                 pipeline_tx_pkt(ev[i].mbuf);
85                                 ev[i].op = RTE_EVENT_OP_RELEASE;
86                                 w->processed_pkts++;
87                         } else
88                                 pipeline_fwd_event(&ev[i],
89                                                 RTE_SCHED_TYPE_ATOMIC);
90                 }
91
92                 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
93         }
94
95         return 0;
96 }
97
98 static int
99 pipeline_atq_worker_single_stage_burst_fwd(void *arg)
100 {
101         PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
102         const uint8_t tx_queue = t->tx_service.queue_id;
103
104         while (t->done == false) {
105                 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
106                                 BURST_SIZE, 0);
107
108                 if (!nb_rx) {
109                         rte_pause();
110                         continue;
111                 }
112
113                 for (i = 0; i < nb_rx; i++) {
114                         rte_prefetch0(ev[i + 1].mbuf);
115                         ev[i].queue_id = tx_queue;
116                         pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
117                         w->processed_pkts++;
118                 }
119
120                 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
121         }
122
123         return 0;
124 }
125
126 static int
127 pipeline_atq_worker_multi_stage_tx(void *arg)
128 {
129         PIPELINE_WROKER_MULTI_STAGE_INIT;
130         const uint8_t nb_stages = t->opt->nb_stages;
131
132
133         while (t->done == false) {
134                 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
135
136                 if (!event) {
137                         rte_pause();
138                         continue;
139                 }
140
141                 cq_id = ev.sub_event_type % nb_stages;
142
143                 if (cq_id == last_queue) {
144                         if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
145
146                                 pipeline_tx_pkt(ev.mbuf);
147                                 w->processed_pkts++;
148                                 continue;
149                         }
150                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
151                 } else {
152                         ev.sub_event_type++;
153                         pipeline_fwd_event(&ev, sched_type_list[cq_id]);
154                 }
155
156                 pipeline_event_enqueue(dev, port, &ev);
157         }
158         return 0;
159 }
160
161 static int
162 pipeline_atq_worker_multi_stage_fwd(void *arg)
163 {
164         PIPELINE_WROKER_MULTI_STAGE_INIT;
165         const uint8_t nb_stages = t->opt->nb_stages;
166         const uint8_t tx_queue = t->tx_service.queue_id;
167
168         while (t->done == false) {
169                 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
170
171                 if (!event) {
172                         rte_pause();
173                         continue;
174                 }
175
176                 cq_id = ev.sub_event_type % nb_stages;
177
178                 if (cq_id == last_queue) {
179                         w->processed_pkts++;
180                         ev.queue_id = tx_queue;
181                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
182                 } else {
183                         ev.sub_event_type++;
184                         pipeline_fwd_event(&ev, sched_type_list[cq_id]);
185                 }
186
187                 pipeline_event_enqueue(dev, port, &ev);
188         }
189         return 0;
190 }
191
192 static int
193 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
194 {
195         PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
196         const uint8_t nb_stages = t->opt->nb_stages;
197
198         while (t->done == false) {
199                 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
200                                 BURST_SIZE, 0);
201
202                 if (!nb_rx) {
203                         rte_pause();
204                         continue;
205                 }
206
207                 for (i = 0; i < nb_rx; i++) {
208                         rte_prefetch0(ev[i + 1].mbuf);
209                         cq_id = ev[i].sub_event_type % nb_stages;
210
211                         if (cq_id == last_queue) {
212                                 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
213
214                                         pipeline_tx_pkt(ev[i].mbuf);
215                                         ev[i].op = RTE_EVENT_OP_RELEASE;
216                                         w->processed_pkts++;
217                                         continue;
218                                 }
219
220                                 pipeline_fwd_event(&ev[i],
221                                                 RTE_SCHED_TYPE_ATOMIC);
222                         } else {
223                                 ev[i].sub_event_type++;
224                                 pipeline_fwd_event(&ev[i],
225                                                 sched_type_list[cq_id]);
226                         }
227                 }
228
229                 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
230         }
231         return 0;
232 }
233
234 static int
235 pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
236 {
237         PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
238         const uint8_t nb_stages = t->opt->nb_stages;
239         const uint8_t tx_queue = t->tx_service.queue_id;
240
241         while (t->done == false) {
242                 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
243                                 BURST_SIZE, 0);
244
245                 if (!nb_rx) {
246                         rte_pause();
247                         continue;
248                 }
249
250                 for (i = 0; i < nb_rx; i++) {
251                         rte_prefetch0(ev[i + 1].mbuf);
252                         cq_id = ev[i].sub_event_type % nb_stages;
253
254                         if (cq_id == last_queue) {
255                                 w->processed_pkts++;
256                                 ev[i].queue_id = tx_queue;
257                                 pipeline_fwd_event(&ev[i],
258                                                 RTE_SCHED_TYPE_ATOMIC);
259                         } else {
260                                 ev[i].sub_event_type++;
261                                 pipeline_fwd_event(&ev[i],
262                                                 sched_type_list[cq_id]);
263                         }
264                 }
265
266                 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
267         }
268         return 0;
269 }
270
271 static int
272 worker_wrapper(void *arg)
273 {
274         struct worker_data *w  = arg;
275         struct evt_options *opt = w->t->opt;
276         const bool burst = evt_has_burst_mode(w->dev_id);
277         const bool mt_safe = !w->t->mt_unsafe;
278         const uint8_t nb_stages = opt->nb_stages;
279         RTE_SET_USED(opt);
280
281         if (nb_stages == 1) {
282                 if (!burst && mt_safe)
283                         return pipeline_atq_worker_single_stage_tx(arg);
284                 else if (!burst && !mt_safe)
285                         return pipeline_atq_worker_single_stage_fwd(arg);
286                 else if (burst && mt_safe)
287                         return pipeline_atq_worker_single_stage_burst_tx(arg);
288                 else if (burst && !mt_safe)
289                         return pipeline_atq_worker_single_stage_burst_fwd(arg);
290         } else {
291                 if (!burst && mt_safe)
292                         return pipeline_atq_worker_multi_stage_tx(arg);
293                 else if (!burst && !mt_safe)
294                         return pipeline_atq_worker_multi_stage_fwd(arg);
295                 if (burst && mt_safe)
296                         return pipeline_atq_worker_multi_stage_burst_tx(arg);
297                 else if (burst && !mt_safe)
298                         return pipeline_atq_worker_multi_stage_burst_fwd(arg);
299         }
300         rte_panic("invalid worker\n");
301 }
302
303 static int
304 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
305 {
306         struct test_pipeline *t = evt_test_priv(test);
307
308         if (t->mt_unsafe)
309                 rte_service_component_runstate_set(t->tx_service.service_id, 1);
310         return pipeline_launch_lcores(test, opt, worker_wrapper);
311 }
312
313 static int
314 pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
315 {
316         int ret;
317         int nb_ports;
318         int nb_queues;
319         uint8_t queue;
320         struct rte_event_dev_info info;
321         struct test_pipeline *t = evt_test_priv(test);
322         uint8_t tx_evqueue_id = 0;
323         uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
324         uint8_t nb_worker_queues = 0;
325
326         nb_ports = evt_nr_active_lcores(opt->wlcores);
327         nb_queues = rte_eth_dev_count_avail();
328
329         /* One extra port and queueu for Tx service */
330         if (t->mt_unsafe) {
331                 tx_evqueue_id = nb_queues;
332                 nb_ports++;
333                 nb_queues++;
334         }
335
336
337         rte_event_dev_info_get(opt->dev_id, &info);
338
339         const struct rte_event_dev_config config = {
340                         .nb_event_queues = nb_queues,
341                         .nb_event_ports = nb_ports,
342                         .nb_events_limit  = info.max_num_events,
343                         .nb_event_queue_flows = opt->nb_flows,
344                         .nb_event_port_dequeue_depth =
345                                 info.max_event_port_dequeue_depth,
346                         .nb_event_port_enqueue_depth =
347                                 info.max_event_port_enqueue_depth,
348         };
349         ret = rte_event_dev_configure(opt->dev_id, &config);
350         if (ret) {
351                 evt_err("failed to configure eventdev %d", opt->dev_id);
352                 return ret;
353         }
354
355         struct rte_event_queue_conf q_conf = {
356                         .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
357                         .nb_atomic_flows = opt->nb_flows,
358                         .nb_atomic_order_sequences = opt->nb_flows,
359         };
360         /* queue configurations */
361         for (queue = 0; queue < nb_queues; queue++) {
362                 q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
363
364                 if (t->mt_unsafe) {
365                         if (queue == tx_evqueue_id) {
366                                 q_conf.event_queue_cfg =
367                                         RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
368                         } else {
369                                 queue_arr[nb_worker_queues] = queue;
370                                 nb_worker_queues++;
371                         }
372                 }
373
374                 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
375                 if (ret) {
376                         evt_err("failed to setup queue=%d", queue);
377                         return ret;
378                 }
379         }
380
381         if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
382                 opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
383
384         /* port configuration */
385         const struct rte_event_port_conf p_conf = {
386                         .dequeue_depth = opt->wkr_deq_dep,
387                         .enqueue_depth = info.max_event_port_dequeue_depth,
388                         .new_event_threshold = info.max_num_events,
389         };
390
391         if (t->mt_unsafe) {
392                 ret = pipeline_event_port_setup(test, opt, queue_arr,
393                                 nb_worker_queues, p_conf);
394                 if (ret)
395                         return ret;
396
397                 ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
398                                 nb_ports - 1, p_conf);
399         } else
400                 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
401                                 p_conf);
402
403         if (ret)
404                 return ret;
405
406         /*
407          * The pipelines are setup in the following manner:
408          *
409          * eth_dev_count = 2, nb_stages = 2, atq mode
410          *
411          * Multi thread safe :
412          *      queues = 2
413          *      stride = 1
414          *
415          *      event queue pipelines:
416          *      eth0 -> q0 ->tx
417          *      eth1 -> q1 ->tx
418          *
419          *      q0, q1 are configured as ATQ so, all the different stages can
420          *      be enqueued on the same queue.
421          *
422          * Multi thread unsafe :
423          *      queues = 3
424          *      stride = 1
425          *
426          *      event queue pipelines:
427          *      eth0 -> q0
428          *                } (q3->tx) Tx service
429          *      eth1 -> q1
430          *
431          *      q0,q1 are configured as stated above.
432          *      q3 configured as SINGLE_LINK|ATOMIC.
433          */
434         ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
435         if (ret)
436                 return ret;
437
438         if (!evt_has_distributed_sched(opt->dev_id)) {
439                 uint32_t service_id;
440                 rte_event_dev_service_id_get(opt->dev_id, &service_id);
441                 ret = evt_service_setup(service_id);
442                 if (ret) {
443                         evt_err("No service lcore found to run event dev.");
444                         return ret;
445                 }
446         }
447
448         ret = rte_event_dev_start(opt->dev_id);
449         if (ret) {
450                 evt_err("failed to start eventdev %d", opt->dev_id);
451                 return ret;
452         }
453
454         return 0;
455 }
456
457 static void
458 pipeline_atq_opt_dump(struct evt_options *opt)
459 {
460         pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
461 }
462
463 static int
464 pipeline_atq_opt_check(struct evt_options *opt)
465 {
466         return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
467 }
468
469 static bool
470 pipeline_atq_capability_check(struct evt_options *opt)
471 {
472         struct rte_event_dev_info dev_info;
473
474         rte_event_dev_info_get(opt->dev_id, &dev_info);
475         if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
476                         dev_info.max_event_ports <
477                         evt_nr_active_lcores(opt->wlcores)) {
478                 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
479                         pipeline_atq_nb_event_queues(opt),
480                         dev_info.max_event_queues,
481                         evt_nr_active_lcores(opt->wlcores),
482                         dev_info.max_event_ports);
483         }
484
485         return true;
486 }
487
488 static const struct evt_test_ops pipeline_atq =  {
489         .cap_check          = pipeline_atq_capability_check,
490         .opt_check          = pipeline_atq_opt_check,
491         .opt_dump           = pipeline_atq_opt_dump,
492         .test_setup         = pipeline_test_setup,
493         .mempool_setup      = pipeline_mempool_setup,
494         .ethdev_setup       = pipeline_ethdev_setup,
495         .eventdev_setup     = pipeline_atq_eventdev_setup,
496         .launch_lcores      = pipeline_atq_launch_lcores,
497         .eventdev_destroy   = pipeline_eventdev_destroy,
498         .mempool_destroy    = pipeline_mempool_destroy,
499         .ethdev_destroy     = pipeline_ethdev_destroy,
500         .test_result        = pipeline_test_result,
501         .test_destroy       = pipeline_test_destroy,
502 };
503
504 EVT_TEST_REGISTER(pipeline_atq);