New upstream version 18.11-rc1
[deb_dpdk.git] / examples / eventdev_pipeline / pipeline_worker_tx.c
index 3dbde92..85eb075 100644 (file)
@@ -36,10 +36,11 @@ worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
 }
 
 static __rte_always_inline void
-worker_tx_pkt(struct rte_mbuf *mbuf)
+worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
 {
-       exchange_mac(mbuf);
-       while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+       exchange_mac(ev->mbuf);
+       rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
+       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1))
                rte_pause();
 }
 
@@ -64,15 +65,15 @@ worker_do_tx_single(void *arg)
                received++;
 
                if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                       worker_tx_pkt(ev.mbuf);
+                       worker_tx_pkt(dev, port, &ev);
                        tx++;
-                       continue;
+               } else {
+                       work();
+                       ev.queue_id++;
+                       worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+                       worker_event_enqueue(dev, port, &ev);
+                       fwd++;
                }
-               work();
-               ev.queue_id++;
-               worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-               worker_event_enqueue(dev, port, &ev);
-               fwd++;
        }
 
        if (!cdata.quiet)
@@ -100,14 +101,14 @@ worker_do_tx_single_atq(void *arg)
                received++;
 
                if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                       worker_tx_pkt(ev.mbuf);
+                       worker_tx_pkt(dev, port, &ev);
                        tx++;
-                       continue;
+               } else {
+                       work();
+                       worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+                       worker_event_enqueue(dev, port, &ev);
+                       fwd++;
                }
-               work();
-               worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-               worker_event_enqueue(dev, port, &ev);
-               fwd++;
        }
 
        if (!cdata.quiet)
@@ -141,7 +142,7 @@ worker_do_tx_single_burst(void *arg)
                        rte_prefetch0(ev[i + 1].mbuf);
                        if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
 
-                               worker_tx_pkt(ev[i].mbuf);
+                               worker_tx_pkt(dev, port, &ev[i]);
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                tx++;
 
@@ -188,7 +189,7 @@ worker_do_tx_single_burst_atq(void *arg)
                        rte_prefetch0(ev[i + 1].mbuf);
                        if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
 
-                               worker_tx_pkt(ev[i].mbuf);
+                               worker_tx_pkt(dev, port, &ev[i]);
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                tx++;
                        } else
@@ -232,7 +233,7 @@ worker_do_tx(void *arg)
 
                if (cq_id >= lst_qid) {
                        if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                               worker_tx_pkt(ev.mbuf);
+                               worker_tx_pkt(dev, port, &ev);
                                tx++;
                                continue;
                        }
@@ -280,7 +281,7 @@ worker_do_tx_atq(void *arg)
 
                if (cq_id == lst_qid) {
                        if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                               worker_tx_pkt(ev.mbuf);
+                               worker_tx_pkt(dev, port, &ev);
                                tx++;
                                continue;
                        }
@@ -330,7 +331,7 @@ worker_do_tx_burst(void *arg)
 
                        if (cq_id >= lst_qid) {
                                if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                                       worker_tx_pkt(ev[i].mbuf);
+                                       worker_tx_pkt(dev, port, &ev[i]);
                                        tx++;
                                        ev[i].op = RTE_EVENT_OP_RELEASE;
                                        continue;
@@ -387,7 +388,7 @@ worker_do_tx_burst_atq(void *arg)
 
                        if (cq_id == lst_qid) {
                                if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                                       worker_tx_pkt(ev[i].mbuf);
+                                       worker_tx_pkt(dev, port, &ev[i]);
                                        tx++;
                                        ev[i].op = RTE_EVENT_OP_RELEASE;
                                        continue;
@@ -413,10 +414,8 @@ worker_do_tx_burst_atq(void *arg)
 }
 
 static int
-setup_eventdev_worker_tx(struct cons_data *cons_data,
-               struct worker_data *worker_data)
+setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
 {
-       RTE_SET_USED(cons_data);
        uint8_t i;
        const uint8_t atq = cdata.all_type_queues ? 1 : 0;
        const uint8_t dev_id = 0;
@@ -575,10 +574,9 @@ setup_eventdev_worker_tx(struct cons_data *cons_data,
        }
        rte_service_runstate_set(fdata->evdev_service_id, 1);
        rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
-       if (rte_event_dev_start(dev_id) < 0) {
-               printf("Error starting eventdev\n");
-               return -1;
-       }
+
+       if (rte_event_dev_start(dev_id) < 0)
+               rte_exit(EXIT_FAILURE, "Error starting eventdev");
 
        return dev_id;
 }
@@ -602,7 +600,7 @@ service_rx_adapter(void *arg)
 }
 
 static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
 {
        int i;
        int ret;
@@ -613,17 +611,18 @@ init_rx_adapter(uint16_t nb_ports)
        ret = rte_event_dev_info_get(evdev_id, &dev_info);
        adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
 
-       struct rte_event_port_conf rx_p_conf = {
-               .dequeue_depth = 8,
-               .enqueue_depth = 8,
-               .new_event_threshold = 1200,
+       struct rte_event_port_conf adptr_p_conf = {
+               .dequeue_depth = cdata.worker_cq_depth,
+               .enqueue_depth = 64,
+               .new_event_threshold = 4096,
        };
 
-       if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
-               rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
-       if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
-               rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
-
+       if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+               adptr_p_conf.dequeue_depth =
+                       dev_info.max_event_port_dequeue_depth;
+       if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+               adptr_p_conf.enqueue_depth =
+                       dev_info.max_event_port_enqueue_depth;
 
        struct rte_event_eth_rx_adapter_queue_conf queue_conf;
        memset(&queue_conf, 0, sizeof(queue_conf));
@@ -633,11 +632,11 @@ init_rx_adapter(uint16_t nb_ports)
                uint32_t cap;
                uint32_t service_id;
 
-               ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
+               ret = rte_event_eth_rx_adapter_create(i, evdev_id,
+                               &adptr_p_conf);
                if (ret)
                        rte_exit(EXIT_FAILURE,
-                                       "failed to create rx adapter[%d]",
-                                       cdata.rx_adapter_id);
+                                       "failed to create rx adapter[%d]", i);
 
                ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
                if (ret)
@@ -654,7 +653,6 @@ init_rx_adapter(uint16_t nb_ports)
                        rte_exit(EXIT_FAILURE,
                                        "Failed to add queues to Rx adapter");
 
-
                /* Producer needs to be scheduled. */
                if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
                        ret = rte_event_eth_rx_adapter_service_id_get(i,
@@ -680,9 +678,29 @@ init_rx_adapter(uint16_t nb_ports)
                ret = rte_event_eth_rx_adapter_start(i);
                if (ret)
                        rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
-                                       cdata.rx_adapter_id);
+                                       i);
        }
 
+       /* We already know that Tx adapter has INTERNAL port cap*/
+       ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+                       &adptr_p_conf);
+       if (ret)
+               rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+                               cdata.tx_adapter_id);
+
+       for (i = 0; i < nb_ports; i++) {
+               ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+                               -1);
+               if (ret)
+                       rte_exit(EXIT_FAILURE,
+                                       "Failed to add queues to Tx adapter");
+       }
+
+       ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+       if (ret)
+               rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+                               cdata.tx_adapter_id);
+
        if (adptr_services->nb_rx_adptrs) {
                struct rte_service_spec service;
 
@@ -695,8 +713,7 @@ init_rx_adapter(uint16_t nb_ports)
                                &fdata->rxadptr_service_id);
                if (ret)
                        rte_exit(EXIT_FAILURE,
-                               "Rx adapter[%d] service register failed",
-                               cdata.rx_adapter_id);
+                               "Rx adapter service register failed");
 
                rte_service_runstate_set(fdata->rxadptr_service_id, 1);
                rte_service_component_runstate_set(fdata->rxadptr_service_id,
@@ -708,23 +725,19 @@ init_rx_adapter(uint16_t nb_ports)
                rte_free(adptr_services);
        }
 
-       if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
-                       (dev_info.event_dev_cap &
+       if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap &
                         RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
                fdata->cap.scheduler = NULL;
-
-       if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
-               memset(fdata->sched_core, 0,
-                               sizeof(unsigned int) * MAX_NUM_CORE);
 }
 
 static void
-worker_tx_opt_check(void)
+worker_tx_enq_opt_check(void)
 {
        int i;
        int ret;
        uint32_t cap = 0;
        uint8_t rx_needed = 0;
+       uint8_t sched_needed = 0;
        struct rte_event_dev_info eventdev_info;
 
        memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
@@ -734,32 +747,38 @@ worker_tx_opt_check(void)
                                RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
                rte_exit(EXIT_FAILURE,
                                "Event dev doesn't support all type queues\n");
+       sched_needed = !(eventdev_info.event_dev_cap &
+               RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);
 
        RTE_ETH_FOREACH_DEV(i) {
                ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
                if (ret)
                        rte_exit(EXIT_FAILURE,
-                                       "failed to get event rx adapter "
-                                       "capabilities");
+                               "failed to get event rx adapter capabilities");
                rx_needed |=
                        !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
        }
 
        if (cdata.worker_lcore_mask == 0 ||
                        (rx_needed && cdata.rx_lcore_mask == 0) ||
-                       (cdata.sched_lcore_mask == 0 &&
-                        !(eventdev_info.event_dev_cap &
-                                RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+                       (sched_needed && cdata.sched_lcore_mask == 0)) {
                printf("Core part of pipeline was not assigned any cores. "
                        "This will stall the pipeline, please check core masks "
                        "(use -h for details on setting core masks):\n"
-                       "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
-                       "\n\tworkers: %"PRIu64"\n",
-                       cdata.rx_lcore_mask, cdata.tx_lcore_mask,
-                       cdata.sched_lcore_mask,
-                       cdata.worker_lcore_mask);
+                       "\trx: %"PRIu64"\n\tsched: %"PRIu64
+                       "\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask,
+                       cdata.sched_lcore_mask, cdata.worker_lcore_mask);
                rte_exit(-1, "Fix core masks\n");
        }
+
+       if (!sched_needed)
+               memset(fdata->sched_core, 0,
+                               sizeof(unsigned int) * MAX_NUM_CORE);
+       if (!rx_needed)
+               memset(fdata->rx_core, 0,
+                               sizeof(unsigned int) * MAX_NUM_CORE);
+
+       memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
 }
 
 static worker_loop
@@ -821,18 +840,15 @@ get_worker_multi_stage(bool burst)
 }
 
 void
-set_worker_tx_setup_data(struct setup_data *caps, bool burst)
+set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
 {
        if (cdata.num_stages == 1)
                caps->worker = get_worker_single_stage(burst);
        else
                caps->worker = get_worker_multi_stage(burst);
 
-       memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
-
-       caps->check_opt = worker_tx_opt_check;
-       caps->consumer = NULL;
+       caps->check_opt = worker_tx_enq_opt_check;
        caps->scheduler = schedule_devices;
-       caps->evdev_setup = setup_eventdev_worker_tx;
-       caps->adptr_setup = init_rx_adapter;
+       caps->evdev_setup = setup_eventdev_worker_tx_enq;
+       caps->adptr_setup = init_adapters;
 }