deb_dpdk.git / drivers/event/sw/sw_evdev_scheduler.c (upstream version 17.11.5)
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ with packets available, or SW_IQS_MAX if
 * no packets are available. Recomputing the CTZ is faster than caching the
 * value due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
        (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
/* map the 8-bit event priority to one of the SW_IQS_MAX internal IQs */
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)

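/* Schedule up to "count" events from one IQ of an atomic QID into port CQ
 * buffers. Each flow is pinned to a CQ while it has events in flight: an
 * unpinned flow is assigned to the mapped CQ with the most free ring space.
 * Events whose CQ is full, or whose port history list is full, are put back
 * on the IQ. Returns the number of events scheduled.
 */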
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count)
{
        struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
        struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
        uint32_t nb_blocked = 0;
        uint32_t i;

        if (count > MAX_PER_IQ_DEQUEUE)
                count = MAX_PER_IQ_DEQUEUE;

        /* The QID ID is static, so it can be used to identify the stage of
         * processing in history lists etc.
         */
        uint32_t qid_id = qid->id;

        iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
        for (i = 0; i < count; i++) {
                const struct rte_event *qe = &qes[i];
                const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
                struct sw_fid_t *fid = &qid->fids[flow_id];
                int cq = fid->cq;

                if (cq < 0) {
                        uint32_t cq_idx;
                        if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
                                qid->cq_next_tx = 0;
                        cq_idx = qid->cq_next_tx++;

                        cq = qid->cq_map[cq_idx];

                        /* find the least-loaded mapped CQ */
                        int cq_free_cnt = sw->cq_ring_space[cq];
                        for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
                                        cq_idx++) {
                                int test_cq = qid->cq_map[cq_idx];
                                int test_cq_free = sw->cq_ring_space[test_cq];
                                if (test_cq_free > cq_free_cnt) {
                                        cq = test_cq;
                                        cq_free_cnt = test_cq_free;
                                }
                        }

                        /* pin the flow to this CQ now, even if the event
                         * ends up blocked below
                         */
                        fid->cq = cq;
                }

                if (sw->cq_ring_space[cq] == 0 ||
                                sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
                        blocked_qes[nb_blocked++] = *qe;
                        continue;
                }

                struct sw_port *p = &sw->ports[cq];

                /* at this point we can queue up the packet on the cq_buf */
                fid->pcount++;
                p->cq_buf[p->cq_buf_count++] = *qe;
                p->inflights++;
                sw->cq_ring_space[cq]--;

                int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
                p->hist_list[head].fid = flow_id;
                p->hist_list[head].qid = qid_id;

                p->stats.tx_pkts++;
                qid->stats.tx_pkts++;
                qid->to_port[cq]++;

                /* if we just filled in the last slot, flush the buffer */
                if (sw->cq_ring_space[cq] == 0) {
                        struct rte_event_ring *worker = p->cq_worker_ring;
                        rte_event_ring_enqueue_burst(worker, p->cq_buf,
                                        p->cq_buf_count,
                                        &sw->cq_ring_space[cq]);
                        p->cq_buf_count = 0;
                }
        }
        iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);

        return count - nb_blocked;
}

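/* Schedule up to "count" events from one IQ of a parallel or ordered QID.
 * Events are spread round-robin over the mapped CQs, skipping any CQ with no
 * worker-ring space or a full history list. With keep_order set (ordered
 * QIDs), scheduling is limited to the number of free reorder-buffer entries
 * and one entry is claimed per event. Returns the number of events scheduled.
 */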
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count, int keep_order)
{
        uint32_t i;
        uint32_t cq_idx = qid->cq_next_tx;

        /* The QID ID is static, so it can be used to identify the stage of
         * processing in history lists etc.
         */
        uint32_t qid_id = qid->id;

        if (count > MAX_PER_IQ_DEQUEUE)
                count = MAX_PER_IQ_DEQUEUE;

        if (keep_order)
                /* only schedule as many as we have reorder buffer entries */
                count = RTE_MIN(count,
                                rte_ring_count(qid->reorder_buffer_freelist));

        for (i = 0; i < count; i++) {
                const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
                uint32_t cq_check_count = 0;
                uint32_t cq;

                /*
                 * For parallel, just send to the next available CQ in
                 * round-robin fashion, so scan for an available CQ. If all
                 * CQs are full, just return and move on to the next QID.
                 */
                do {
                        if (++cq_check_count > qid->cq_num_mapped_cqs)
                                goto exit;
                        if (cq_idx >= qid->cq_num_mapped_cqs)
                                cq_idx = 0;
                        cq = qid->cq_map[cq_idx++];

                } while (rte_event_ring_free_count(
                                sw->ports[cq].cq_worker_ring) == 0 ||
                                sw->ports[cq].inflights == SW_PORT_HIST_LIST);

                struct sw_port *p = &sw->ports[cq];
                if (sw->cq_ring_space[cq] == 0 ||
                                p->inflights == SW_PORT_HIST_LIST)
                        break;

                sw->cq_ring_space[cq]--;

                qid->stats.tx_pkts++;

                const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
                p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
                p->hist_list[head].qid = qid_id;

                if (keep_order)
                        rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
                                        (void *)&p->hist_list[head].rob_entry);

                sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
                iq_ring_pop(qid->iq[iq_num]);

                rte_compiler_barrier();
                p->inflights++;
                p->stats.tx_pkts++;
                p->hist_head++;
        }
exit:
        qid->cq_next_tx = cq_idx;
        return i;
}

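/* Schedule events from a directed (single-link) QID straight into the single
 * mapped port's CQ buffer, bounded by the cached CQ ring space.
 */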
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count __rte_unused)
{
        uint32_t cq_id = qid->cq_map[0];
        struct sw_port *port = &sw->ports[cq_id];

        /* get max burst enq size for cq_ring */
        uint32_t count_free = sw->cq_ring_space[cq_id];
        if (count_free == 0)
                return 0;

        /* burst dequeue from the QID IQ ring */
        struct iq_ring *ring = qid->iq[iq_num];
        uint32_t ret = iq_ring_dequeue_burst(ring,
                        &port->cq_buf[port->cq_buf_count], count_free);
        port->cq_buf_count += ret;

        /* Update QID, Port and Total TX stats */
        qid->stats.tx_pkts += ret;
        port->stats.tx_pkts += ret;

        /* Subtract credits from cached value */
        sw->cq_ring_space[cq_id] -= ret;

        return ret;
}

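/* Iterate over all QIDs in priority order and move events from the highest
 * priority non-empty IQ of each QID to port CQs, dispatching according to the
 * QID's scheduling type (directed, atomic, or parallel/ordered). Clears an
 * IQ's bit in iq_pkt_mask once that IQ has been fully drained.
 */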
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
        uint32_t pkts = 0;
        uint32_t qid_idx;

        sw->sched_cq_qid_called++;

        for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
                struct sw_qid *qid = sw->qids_prioritized[qid_idx];

                int type = qid->type;
                int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

                /* zero mapped CQs indicates directed */
                if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
                        continue;

                uint32_t pkts_done = 0;
                uint32_t count = iq_ring_count(qid->iq[iq_num]);

                if (count > 0) {
                        if (type == SW_SCHED_TYPE_DIRECT)
                                pkts_done += sw_schedule_dir_to_cq(sw, qid,
                                                iq_num, count);
                        else if (type == RTE_SCHED_TYPE_ATOMIC)
                                pkts_done += sw_schedule_atomic_to_cq(sw, qid,
                                                iq_num, count);
                        else
                                pkts_done += sw_schedule_parallel_to_cq(sw, qid,
                                                iq_num, count,
                                                type == RTE_SCHED_TYPE_ORDERED);
                }

                /* If the IQ that was polled is now empty, unset its bit in
                 * the IQ mask.
                 */
                int all_done = (pkts_done == count);

                qid->iq_pkt_mask &= ~(all_done << (iq_num));
                pkts += pkts_done;
        }

        return pkts;
}

/* This function performs re-ordering of packets and injects them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
        /* Perform egress reordering */
        struct rte_event *qe;
        uint32_t pkts_iter = 0;

        for (; qid_start < qid_end; qid_start++) {
                struct sw_qid *qid = &sw->qids[qid_start];
                int i, num_entries_in_use;

                if (qid->type != RTE_SCHED_TYPE_ORDERED)
                        continue;

                num_entries_in_use = rte_ring_free_count(
                                        qid->reorder_buffer_freelist);

                for (i = 0; i < num_entries_in_use; i++) {
                        struct reorder_buffer_entry *entry;
                        int j;

                        entry = &qid->reorder_buffer[qid->reorder_buffer_index];

                        if (!entry->ready)
                                break;

                        for (j = 0; j < entry->num_fragments; j++) {
                                uint16_t dest_qid;
                                uint16_t dest_iq;

                                int idx = entry->fragment_index + j;
                                qe = &entry->fragments[idx];

                                dest_qid = qe->queue_id;
                                dest_iq  = PRIO_TO_IQ(qe->priority);

                                if (dest_qid >= sw->qid_count) {
                                        sw->stats.rx_dropped++;
                                        continue;
                                }

                                struct sw_qid *dest_qid_ptr =
                                        &sw->qids[dest_qid];
                                const struct iq_ring *dest_iq_ptr =
                                        dest_qid_ptr->iq[dest_iq];
                                if (iq_ring_free_count(dest_iq_ptr) == 0)
                                        break;

                                pkts_iter++;

                                struct sw_qid *q = &sw->qids[dest_qid];
                                struct iq_ring *r = q->iq[dest_iq];

                                /* we checked for space above, so enqueue must
                                 * succeed
                                 */
                                iq_ring_enqueue(r, qe);
                                q->iq_pkt_mask |= (1 << (dest_iq));
                                q->iq_pkt_count[dest_iq]++;
                                q->stats.rx_pkts++;
                        }

                        entry->ready = (j != entry->num_fragments);
                        entry->num_fragments -= j;
                        entry->fragment_index += j;

                        if (!entry->ready) {
                                entry->fragment_index = 0;

                                rte_ring_sp_enqueue(
                                                qid->reorder_buffer_freelist,
                                                entry);

                                qid->reorder_buffer_index++;
                                qid->reorder_buffer_index %= qid->window_size;
                        }
                }
        }
        return pkts_iter;
}

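/* Refill the port's shadow buffer (pp_buf) by burst-dequeuing from the
 * port's rx worker ring.
 */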
static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
        RTE_SET_USED(sw);
        struct rte_event_ring *worker = port->rx_worker_ring;
        port->pp_buf_start = 0;
        port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
                        RTE_DIM(port->pp_buf), NULL);
}

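/* Pull events from a load-balanced port's shadow buffer into the QID IQs.
 * For events with QE_FLAG_COMPLETE set, the oldest history-list entry is
 * consulted: the flow's pin count is decremented at EOP (unpinning the flow
 * from its CQ when it reaches zero) and, if allow_reorder is set, the
 * matching reorder-buffer entry is marked ready. For events with
 * QE_FLAG_VALID set, the event is either appended to its reorder-buffer
 * entry (ordered) or enqueued directly into the destination QID IQ. Stops
 * early if a destination IQ is full.
 */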
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
        static struct reorder_buffer_entry dummy_rob;
        uint32_t pkts_iter = 0;
        struct sw_port *port = &sw->ports[port_id];

        /* If shadow ring has 0 pkts, pull from worker ring */
        if (port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);

        while (port->pp_buf_count) {
                const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
                struct sw_hist_list_entry *hist_entry = NULL;
                uint8_t flags = qe->op;
                const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
                int needs_reorder = 0;
                /* if not reordering, treat a PARTIAL event the same as NEW */
                if (!allow_reorder && !eop)
                        flags = QE_FLAG_VALID;

                /*
                 * If we don't have space for this packet in an IQ, then
                 * move on to the next queue. Technically, for a packet that
                 * needs reordering, we don't need to check here, but it
                 * simplifies things not to special-case it.
                 */
                uint32_t iq_num = PRIO_TO_IQ(qe->priority);
                struct sw_qid *qid = &sw->qids[qe->queue_id];

                if ((flags & QE_FLAG_VALID) &&
                                iq_ring_free_count(qid->iq[iq_num]) == 0)
                        break;

                /* now process based on flags. Note that for directed
                 * queues, the enqueue_flush masks off all but the
                 * valid flag. This makes FWD and PARTIAL enqueues just
                 * NEW type, and makes DROPS no-op calls.
                 */
                if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
                        const uint32_t hist_tail = port->hist_tail &
                                        (SW_PORT_HIST_LIST - 1);

                        hist_entry = &port->hist_list[hist_tail];
                        const uint32_t hist_qid = hist_entry->qid;
                        const uint32_t hist_fid = hist_entry->fid;

                        struct sw_fid_t *fid =
                                &sw->qids[hist_qid].fids[hist_fid];
                        fid->pcount -= eop;
                        if (fid->pcount == 0)
                                fid->cq = -1;

                        if (allow_reorder) {
                                /* set reorder ready if an ordered QID */
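                                /* Branch-free NULL check: "valid" is 1 when
                                 * a rob_entry exists and 0 when it does not,
                                 * so (valid - 1) is either 0 or all-ones.
                                 * OR-ing in &dummy_rob only in the NULL case
                                 * redirects the "ready" write below to a
                                 * harmless static dummy entry instead of
                                 * dereferencing NULL.
                                 */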
                                uintptr_t rob_ptr =
                                        (uintptr_t)hist_entry->rob_entry;
                                const uintptr_t valid = (rob_ptr != 0);
                                needs_reorder = valid;
                                rob_ptr |=
                                        ((valid - 1) & (uintptr_t)&dummy_rob);
                                struct reorder_buffer_entry *tmp_rob_ptr =
                                        (struct reorder_buffer_entry *)rob_ptr;
                                tmp_rob_ptr->ready = eop * needs_reorder;
                        }

                        port->inflights -= eop;
                        port->hist_tail += eop;
                }
                if (flags & QE_FLAG_VALID) {
                        port->stats.rx_pkts++;

                        if (allow_reorder && needs_reorder) {
                                struct reorder_buffer_entry *rob_entry =
                                                hist_entry->rob_entry;

                                hist_entry->rob_entry = NULL;
                                /* Although fragmentation is not currently
                                 * supported by the eventdev API, we support
                                 * it here. Open: how do we alert the user
                                 * that they've exceeded max frags?
                                 */
                                int num_frag = rob_entry->num_fragments;
                                if (num_frag == SW_FRAGMENTS_MAX)
                                        sw->stats.rx_dropped++;
                                else {
                                        int idx = rob_entry->num_fragments++;
                                        rob_entry->fragments[idx] = *qe;
                                }
                                goto end_qe;
                        }

                        /* Use the iq_num from above to push the QE
                         * into the qid at the right priority
                         */

                        qid->iq_pkt_mask |= (1 << (iq_num));
                        iq_ring_enqueue(qid->iq[iq_num], qe);
                        qid->iq_pkt_count[iq_num]++;
                        qid->stats.rx_pkts++;
                        pkts_iter++;
                }

end_qe:
                port->pp_buf_start++;
                port->pp_buf_count--;
        } /* while (avail_qes) */

        return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
        return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
        return __pull_port_lb(sw, port_id, 0);
}

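/* Pull events from a directed port's shadow buffer straight into the
 * destination QID IQs. Directed events need no flow pinning or history-list
 * tracking, so only the IQ space check and stats updates are performed.
 */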
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
        uint32_t pkts_iter = 0;
        struct sw_port *port = &sw->ports[port_id];

        /* If shadow ring has 0 pkts, pull from worker ring */
        if (port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);

        while (port->pp_buf_count) {
                const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
                uint8_t flags = qe->op;

                if ((flags & QE_FLAG_VALID) == 0)
                        goto end_qe;

                uint32_t iq_num = PRIO_TO_IQ(qe->priority);
                struct sw_qid *qid = &sw->qids[qe->queue_id];
                struct iq_ring *iq_ring = qid->iq[iq_num];

                if (iq_ring_free_count(iq_ring) == 0)
                        break; /* move to next port */

                port->stats.rx_pkts++;

                /* Use the iq_num from above to push the QE
                 * into the qid at the right priority
                 */
                qid->iq_pkt_mask |= (1 << (iq_num));
                iq_ring_enqueue(iq_ring, qe);
                qid->iq_pkt_count[iq_num]++;
                qid->stats.rx_pkts++;
                pkts_iter++;

end_qe:
                port->pp_buf_start++;
                port->pp_buf_count--;
        } /* while port->pp_buf_count */

        return pkts_iter;
}

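/* Top-level scheduling function for the sw PMD. Each call alternates between
 * pulling events from the ports' rx rings into the QID IQs (including the
 * egress reorder pass) and moving events from the IQs into the ports' CQ
 * buffers, bounded by sched_quanta. Finally, all buffered CQ events are
 * flushed to the worker rings in one batched pass and the device statistics
 * are updated.
 */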
void
sw_event_schedule(struct rte_eventdev *dev)
{
        struct sw_evdev *sw = sw_pmd_priv(dev);
        uint32_t in_pkts, out_pkts;
        uint32_t out_pkts_total = 0, in_pkts_total = 0;
        int32_t sched_quanta = sw->sched_quanta;
        uint32_t i;

        sw->sched_called++;
        if (!sw->started)
                return;

        do {
                uint32_t in_pkts_this_iteration = 0;

                /* Pull from rx_ring for ports */
                do {
                        in_pkts = 0;
                        for (i = 0; i < sw->port_count; i++)
                                if (sw->ports[i].is_directed)
                                        in_pkts += sw_schedule_pull_port_dir(sw, i);
                                else if (sw->ports[i].num_ordered_qids > 0)
                                        in_pkts += sw_schedule_pull_port_lb(sw, i);
                                else
                                        in_pkts += sw_schedule_pull_port_no_reorder(sw, i);

                        /* QID scan for re-ordered */
                        in_pkts += sw_schedule_reorder(sw, 0,
                                        sw->qid_count);
                        in_pkts_this_iteration += in_pkts;
                } while (in_pkts > 4 &&
                                (int)in_pkts_this_iteration < sched_quanta);

                out_pkts = 0;
                out_pkts += sw_schedule_qid_to_cq(sw);
                out_pkts_total += out_pkts;
                in_pkts_total += in_pkts_this_iteration;

                if (in_pkts == 0 && out_pkts == 0)
                        break;
        } while ((int)out_pkts_total < sched_quanta);

        /* push all the internal buffered QEs in port->cq_ring to the
         * worker cores: aka, do the ring transfers batched.
         */
        for (i = 0; i < sw->port_count; i++) {
                struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
                rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
                                sw->ports[i].cq_buf_count,
                                &sw->cq_ring_space[i]);
                sw->ports[i].cq_buf_count = 0;
        }

        sw->stats.tx_pkts += out_pkts_total;
        sw->stats.rx_pkts += in_pkts_total;

        sw->sched_no_iq_enqueues += (in_pkts_total == 0);
        sw->sched_no_cq_enqueues += (out_pkts_total == 0);

}