New upstream version 18.11-rc1
[deb_dpdk.git] / drivers / event / sw / sw_evdev_scheduler.c
index a333a6f..cff747d 100644 (file)
@@ -1,40 +1,12 @@
-/*-
- *   BSD LICENSE
- *
- *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
- *
- *   Redistribution and use in source and binary forms, with or without
- *   modification, are permitted provided that the following conditions
- *   are met:
- *
- *     * Redistributions of source code must retain the above copyright
- *       notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in
- *       the documentation and/or other materials provided with the
- *       distribution.
- *     * Neither the name of Intel Corporation nor the names of its
- *       contributors may be used to endorse or promote products derived
- *       from this software without specific prior written permission.
- *
- *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2016-2017 Intel Corporation
  */
 
 #include <rte_ring.h>
 #include <rte_hash_crc.h>
+#include <rte_event_ring.h>
 #include "sw_evdev.h"
-#include "iq_ring.h"
-#include "event_ring.h"
+#include "iq_chunk.h"
 
 #define SW_IQS_MASK (SW_IQS_MAX-1)
 
@@ -71,7 +43,7 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
         */
        uint32_t qid_id = qid->id;
 
-       iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
+       iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
        for (i = 0; i < count; i++) {
                const struct rte_event *qe = &qes[i];
                const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
@@ -79,9 +51,11 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                int cq = fid->cq;
 
                if (cq < 0) {
-                       uint32_t cq_idx = qid->cq_next_tx++;
-                       if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
+                       uint32_t cq_idx;
+                       if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
                                qid->cq_next_tx = 0;
+                       cq_idx = qid->cq_next_tx++;
+
                        cq = qid->cq_map[cq_idx];
 
                        /* find least used */
@@ -119,17 +93,18 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 
                p->stats.tx_pkts++;
                qid->stats.tx_pkts++;
+               qid->to_port[cq]++;
 
                /* if we just filled in the last slot, flush the buffer */
                if (sw->cq_ring_space[cq] == 0) {
-                       struct qe_ring *worker = p->cq_worker_ring;
-                       qe_ring_enqueue_burst(worker, p->cq_buf,
+                       struct rte_event_ring *worker = p->cq_worker_ring;
+                       rte_event_ring_enqueue_burst(worker, p->cq_buf,
                                        p->cq_buf_count,
                                        &sw->cq_ring_space[cq]);
                        p->cq_buf_count = 0;
                }
        }
-       iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
+       iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);
 
        return count - nb_blocked;
 }
@@ -155,7 +130,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                                rte_ring_count(qid->reorder_buffer_freelist));
 
        for (i = 0; i < count; i++) {
-               const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
+               const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
                uint32_t cq_check_count = 0;
                uint32_t cq;
 
@@ -167,10 +142,12 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                do {
                        if (++cq_check_count > qid->cq_num_mapped_cqs)
                                goto exit;
-                       cq = qid->cq_map[cq_idx];
-                       if (++cq_idx == qid->cq_num_mapped_cqs)
+                       if (cq_idx >= qid->cq_num_mapped_cqs)
                                cq_idx = 0;
-               } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
+                       cq = qid->cq_map[cq_idx++];
+
+               } while (rte_event_ring_free_count(
+                               sw->ports[cq].cq_worker_ring) == 0 ||
                                sw->ports[cq].inflights == SW_PORT_HIST_LIST);
 
                struct sw_port *p = &sw->ports[cq];
@@ -191,7 +168,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                                        (void *)&p->hist_list[head].rob_entry);
 
                sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
-               iq_ring_pop(qid->iq[iq_num]);
+               iq_pop(sw, &qid->iq[iq_num]);
 
                rte_compiler_barrier();
                p->inflights++;
@@ -216,8 +193,8 @@ sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                return 0;
 
        /* burst dequeue from the QID IQ ring */
-       struct iq_ring *ring = qid->iq[iq_num];
-       uint32_t ret = iq_ring_dequeue_burst(ring,
+       struct sw_iq *iq = &qid->iq[iq_num];
+       uint32_t ret = iq_dequeue_burst(sw, iq,
                        &port->cq_buf[port->cq_buf_count], count_free);
        port->cq_buf_count += ret;
 
@@ -246,11 +223,11 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
                int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
 
                /* zero mapped CQs indicates directed */
-               if (iq_num >= SW_IQS_MAX)
+               if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
                        continue;
 
                uint32_t pkts_done = 0;
-               uint32_t count = iq_ring_count(qid->iq[iq_num]);
+               uint32_t count = iq_count(&qid->iq[iq_num]);
 
                if (count > 0) {
                        if (type == SW_SCHED_TYPE_DIRECT)
@@ -322,22 +299,15 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
                                        continue;
                                }
 
-                               struct sw_qid *dest_qid_ptr =
-                                       &sw->qids[dest_qid];
-                               const struct iq_ring *dest_iq_ptr =
-                                       dest_qid_ptr->iq[dest_iq];
-                               if (iq_ring_free_count(dest_iq_ptr) == 0)
-                                       break;
-
                                pkts_iter++;
 
                                struct sw_qid *q = &sw->qids[dest_qid];
-                               struct iq_ring *r = q->iq[dest_iq];
+                               struct sw_iq *iq = &q->iq[dest_iq];
 
                                /* we checked for space above, so enqueue must
                                 * succeed
                                 */
-                               iq_ring_enqueue(r, qe);
+                               iq_enqueue(sw, iq, qe);
                                q->iq_pkt_mask |= (1 << (dest_iq));
                                q->iq_pkt_count[dest_iq]++;
                                q->stats.rx_pkts++;
@@ -362,17 +332,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
        return pkts_iter;
 }
 
-static inline void __attribute__((always_inline))
+static __rte_always_inline void
 sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
 {
        RTE_SET_USED(sw);
-       struct qe_ring *worker = port->rx_worker_ring;
+       struct rte_event_ring *worker = port->rx_worker_ring;
        port->pp_buf_start = 0;
-       port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
-                       RTE_DIM(port->pp_buf));
+       port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
+                       RTE_DIM(port->pp_buf), NULL);
 }
 
-static inline uint32_t __attribute__((always_inline))
+static __rte_always_inline uint32_t
 __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 {
        static struct reorder_buffer_entry dummy_rob;
@@ -402,10 +372,6 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
                uint32_t iq_num = PRIO_TO_IQ(qe->priority);
                struct sw_qid *qid = &sw->qids[qe->queue_id];
 
-               if ((flags & QE_FLAG_VALID) &&
-                               iq_ring_free_count(qid->iq[iq_num]) == 0)
-                       break;
-
                /* now process based on flags. Note that for directed
                 * queues, the enqueue_flush masks off all but the
                 * valid flag. This makes FWD and PARTIAL enqueues just
@@ -469,7 +435,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
                         */
 
                        qid->iq_pkt_mask |= (1 << (iq_num));
-                       iq_ring_enqueue(qid->iq[iq_num], qe);
+                       iq_enqueue(sw, &qid->iq[iq_num], qe);
                        qid->iq_pkt_count[iq_num]++;
                        qid->stats.rx_pkts++;
                        pkts_iter++;
@@ -514,10 +480,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
 
                uint32_t iq_num = PRIO_TO_IQ(qe->priority);
                struct sw_qid *qid = &sw->qids[qe->queue_id];
-               struct iq_ring *iq_ring = qid->iq[iq_num];
-
-               if (iq_ring_free_count(iq_ring) == 0)
-                       break; /* move to next port */
+               struct sw_iq *iq = &qid->iq[iq_num];
 
                port->stats.rx_pkts++;
 
@@ -525,7 +488,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
                 * into the qid at the right priority
                 */
                qid->iq_pkt_mask |= (1 << (iq_num));
-               iq_ring_enqueue(iq_ring, qe);
+               iq_enqueue(sw, iq, qe);
                qid->iq_pkt_count[iq_num]++;
                qid->stats.rx_pkts++;
                pkts_iter++;
@@ -548,7 +511,7 @@ sw_event_schedule(struct rte_eventdev *dev)
        uint32_t i;
 
        sw->sched_called++;
-       if (!sw->started)
+       if (unlikely(!sw->started))
                return;
 
        do {
@@ -557,13 +520,18 @@ sw_event_schedule(struct rte_eventdev *dev)
                /* Pull from rx_ring for ports */
                do {
                        in_pkts = 0;
-                       for (i = 0; i < sw->port_count; i++)
+                       for (i = 0; i < sw->port_count; i++) {
+                               /* ack the unlinks in progress as done */
+                               if (sw->ports[i].unlinks_in_progress)
+                                       sw->ports[i].unlinks_in_progress = 0;
+
                                if (sw->ports[i].is_directed)
                                        in_pkts += sw_schedule_pull_port_dir(sw, i);
                                else if (sw->ports[i].num_ordered_qids > 0)
                                        in_pkts += sw_schedule_pull_port_lb(sw, i);
                                else
                                        in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
+                       }
 
                        /* QID scan for re-ordered */
                        in_pkts += sw_schedule_reorder(sw, 0,
@@ -572,8 +540,7 @@ sw_event_schedule(struct rte_eventdev *dev)
                } while (in_pkts > 4 &&
                                (int)in_pkts_this_iteration < sched_quanta);
 
-               out_pkts = 0;
-               out_pkts += sw_schedule_qid_to_cq(sw);
+               out_pkts = sw_schedule_qid_to_cq(sw);
                out_pkts_total += out_pkts;
                in_pkts_total += in_pkts_this_iteration;
 
@@ -581,21 +548,21 @@ sw_event_schedule(struct rte_eventdev *dev)
                        break;
        } while ((int)out_pkts_total < sched_quanta);
 
+       sw->stats.tx_pkts += out_pkts_total;
+       sw->stats.rx_pkts += in_pkts_total;
+
+       sw->sched_no_iq_enqueues += (in_pkts_total == 0);
+       sw->sched_no_cq_enqueues += (out_pkts_total == 0);
+
        /* push all the internal buffered QEs in port->cq_ring to the
         * worker cores: aka, do the ring transfers batched.
         */
        for (i = 0; i < sw->port_count; i++) {
-               struct qe_ring *worker = sw->ports[i].cq_worker_ring;
-               qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
+               struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
+               rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
                                sw->ports[i].cq_buf_count,
                                &sw->cq_ring_space[i]);
                sw->ports[i].cq_buf_count = 0;
        }
 
-       sw->stats.tx_pkts += out_pkts_total;
-       sw->stats.rx_pkts += in_pkts_total;
-
-       sw->sched_no_iq_enqueues += (in_pkts_total == 0);
-       sw->sched_no_cq_enqueues += (out_pkts_total == 0);
-
 }