/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest-priority IQ (the lowest set bit), or SW_IQS_MAX if no
 * packets are available. Recomputing the CTZ each time is faster than caching
 * the value due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
        (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)

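/* Move up to "count" events from an atomic QID's IQ into the CQ buffers of
 * mapped ports. Events of a flow already in flight stay pinned to that flow's
 * CQ; new flows pick the mapped CQ with the most free ring space. Events that
 * cannot be delivered are put back into the IQ.
 */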
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count)
{
        struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
        struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
        uint32_t nb_blocked = 0;
        uint32_t i;

        if (count > MAX_PER_IQ_DEQUEUE)
                count = MAX_PER_IQ_DEQUEUE;

        /* This is the QID ID. The QID ID is static, hence it can be
         * used to identify the stage of processing in history lists etc
         */
        uint32_t qid_id = qid->id;

        iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
        for (i = 0; i < count; i++) {
                const struct rte_event *qe = &qes[i];
                const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
                struct sw_fid_t *fid = &qid->fids[flow_id];
                int cq = fid->cq;

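                /* Flow is not pinned to a CQ yet: start from the round-robin
                 * position, then prefer the mapped CQ with the most free ring
                 * space, and pin the flow to it.
                 */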
                if (cq < 0) {
                        uint32_t cq_idx;
                        if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
                                qid->cq_next_tx = 0;
                        cq_idx = qid->cq_next_tx++;

                        cq = qid->cq_map[cq_idx];

                        /* find least used */
                        int cq_free_cnt = sw->cq_ring_space[cq];
                        for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
                                        cq_idx++) {
                                int test_cq = qid->cq_map[cq_idx];
                                int test_cq_free = sw->cq_ring_space[test_cq];
                                if (test_cq_free > cq_free_cnt) {
                                        cq = test_cq;
                                        cq_free_cnt = test_cq_free;
                                }
                        }

                        fid->cq = cq; /* this pins early */
                }

                if (sw->cq_ring_space[cq] == 0 ||
                                sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
                        blocked_qes[nb_blocked++] = *qe;
                        continue;
                }

                struct sw_port *p = &sw->ports[cq];

                /* at this point we can queue up the packet on the cq_buf */
                fid->pcount++;
                p->cq_buf[p->cq_buf_count++] = *qe;
                p->inflights++;
                sw->cq_ring_space[cq]--;

                int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
                p->hist_list[head].fid = flow_id;
                p->hist_list[head].qid = qid_id;

                p->stats.tx_pkts++;
                qid->stats.tx_pkts++;
                qid->to_port[cq]++;

                /* if we just filled in the last slot, flush the buffer */
                if (sw->cq_ring_space[cq] == 0) {
                        struct rte_event_ring *worker = p->cq_worker_ring;
                        rte_event_ring_enqueue_burst(worker, p->cq_buf,
                                        p->cq_buf_count,
                                        &sw->cq_ring_space[cq]);
                        p->cq_buf_count = 0;
                }
        }
        iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

        return count - nb_blocked;
}

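/* Move up to "count" events from a parallel QID's IQ to mapped CQs in
 * round-robin order. When keep_order is set (RTE_SCHED_TYPE_ORDERED), a
 * reorder-buffer entry is reserved for each event so that egress order can
 * be restored later. Returns the number of events scheduled.
 */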
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count, int keep_order)
{
        uint32_t i;
        uint32_t cq_idx = qid->cq_next_tx;

        /* This is the QID ID. The QID ID is static, hence it can be
         * used to identify the stage of processing in history lists etc
         */
        uint32_t qid_id = qid->id;

        if (count > MAX_PER_IQ_DEQUEUE)
                count = MAX_PER_IQ_DEQUEUE;

        if (keep_order)
                /* only schedule as many as we have reorder buffer entries */
                count = RTE_MIN(count,
                                rte_ring_count(qid->reorder_buffer_freelist));

        for (i = 0; i < count; i++) {
                const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
                uint32_t cq_check_count = 0;
                uint32_t cq;

                /*
                 * For parallel, just send to the next available CQ in
                 * round-robin fashion, so scan for an available CQ. If all
                 * CQs are full, just return and move on to the next QID.
                 */
                do {
                        if (++cq_check_count > qid->cq_num_mapped_cqs)
                                goto exit;
                        if (cq_idx >= qid->cq_num_mapped_cqs)
                                cq_idx = 0;
                        cq = qid->cq_map[cq_idx++];

                } while (rte_event_ring_free_count(
                                sw->ports[cq].cq_worker_ring) == 0 ||
                                sw->ports[cq].inflights == SW_PORT_HIST_LIST);

                struct sw_port *p = &sw->ports[cq];
                if (sw->cq_ring_space[cq] == 0 ||
                                p->inflights == SW_PORT_HIST_LIST)
                        break;

                sw->cq_ring_space[cq]--;

                qid->stats.tx_pkts++;

                const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
                p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
                p->hist_list[head].qid = qid_id;

                if (keep_order)
                        rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
                                        (void *)&p->hist_list[head].rob_entry);

                sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
                iq_pop(sw, &qid->iq[iq_num]);

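                /* Compiler barrier: keep the event copy and history-list
                 * writes above from being reordered past the bookkeeping
                 * updates below.
                 */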
                rte_compiler_barrier();
                p->inflights++;
                p->stats.tx_pkts++;
                p->hist_head++;
        }
exit:
        qid->cq_next_tx = cq_idx;
        return i;
}

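/* A directed (single-link) QID maps to exactly one CQ, so events can be
 * burst-copied straight from the IQ into that port's CQ buffer.
 */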
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count __rte_unused)
{
        uint32_t cq_id = qid->cq_map[0];
        struct sw_port *port = &sw->ports[cq_id];

        /* get max burst enq size for cq_ring */
        uint32_t count_free = sw->cq_ring_space[cq_id];
        if (count_free == 0)
                return 0;

        /* burst dequeue from the QID IQ ring */
        struct sw_iq *iq = &qid->iq[iq_num];
        uint32_t ret = iq_dequeue_burst(sw, iq,
                        &port->cq_buf[port->cq_buf_count], count_free);
        port->cq_buf_count += ret;

        /* Update QID, Port and Total TX stats */
        qid->stats.tx_pkts += ret;
        port->stats.tx_pkts += ret;

        /* Subtract credits from cached value */
        sw->cq_ring_space[cq_id] -= ret;

        return ret;
}

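/* Walk all QIDs in priority order and move events from each QID's
 * highest-priority non-empty IQ to its mapped CQs, dispatching to the
 * directed, atomic or parallel helper according to the QID's schedule type.
 */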
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
        uint32_t pkts = 0;
        uint32_t qid_idx;

        sw->sched_cq_qid_called++;

        for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
                struct sw_qid *qid = sw->qids_prioritized[qid_idx];

                int type = qid->type;
                int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

                /* zero mapped CQs indicates directed */
                if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
                        continue;

                uint32_t pkts_done = 0;
                uint32_t count = iq_count(&qid->iq[iq_num]);

                if (count > 0) {
                        if (type == SW_SCHED_TYPE_DIRECT)
                                pkts_done += sw_schedule_dir_to_cq(sw, qid,
                                                iq_num, count);
                        else if (type == RTE_SCHED_TYPE_ATOMIC)
                                pkts_done += sw_schedule_atomic_to_cq(sw, qid,
                                                iq_num, count);
                        else
                                pkts_done += sw_schedule_parallel_to_cq(sw, qid,
                                                iq_num, count,
                                                type == RTE_SCHED_TYPE_ORDERED);
                }

                /* Check if the IQ that was polled is now empty; if so, unset
                 * it in the IQ mask.
                 */
                int all_done = (pkts_done == count);

                qid->iq_pkt_mask &= ~(all_done << (iq_num));
                pkts += pkts_done;
        }

        return pkts;
}

/* This function performs the egress re-ordering of packets and injects them
 * into the appropriate QID IQ. As LB and DIR QIDs are in the same array, but
 * *NOT* contiguous in that array, this function accepts a "range" of QIDs to
 * scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
        /* Perform egress reordering */
        struct rte_event *qe;
        uint32_t pkts_iter = 0;

        for (; qid_start < qid_end; qid_start++) {
                struct sw_qid *qid = &sw->qids[qid_start];
                int i, num_entries_in_use;

                if (qid->type != RTE_SCHED_TYPE_ORDERED)
                        continue;

                num_entries_in_use = rte_ring_free_count(
                                        qid->reorder_buffer_freelist);

                for (i = 0; i < num_entries_in_use; i++) {
                        struct reorder_buffer_entry *entry;
                        int j;

                        entry = &qid->reorder_buffer[qid->reorder_buffer_index];

                        if (!entry->ready)
                                break;

                        for (j = 0; j < entry->num_fragments; j++) {
                                uint16_t dest_qid;
                                uint16_t dest_iq;

                                int idx = entry->fragment_index + j;
                                qe = &entry->fragments[idx];

                                dest_qid = qe->queue_id;
                                dest_iq  = PRIO_TO_IQ(qe->priority);

                                if (dest_qid >= sw->qid_count) {
                                        sw->stats.rx_dropped++;
                                        continue;
                                }

                                pkts_iter++;

                                struct sw_qid *q = &sw->qids[dest_qid];
                                struct sw_iq *iq = &q->iq[dest_iq];

                                /* we checked for space above, so enqueue must
                                 * succeed
                                 */
                                iq_enqueue(sw, iq, qe);
                                q->iq_pkt_mask |= (1 << (dest_iq));
                                q->iq_pkt_count[dest_iq]++;
                                q->stats.rx_pkts++;
                        }

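                        /* If some fragments could not be drained, leave the
                         * entry marked ready so the remainder is retried;
                         * otherwise recycle it to the freelist below and
                         * advance the reorder window.
                         */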
                        entry->ready = (j != entry->num_fragments);
                        entry->num_fragments -= j;
                        entry->fragment_index += j;

                        if (!entry->ready) {
                                entry->fragment_index = 0;

                                rte_ring_sp_enqueue(
                                                qid->reorder_buffer_freelist,
                                                entry);

                                qid->reorder_buffer_index++;
                                qid->reorder_buffer_index %= qid->window_size;
                        }
                }
        }
        return pkts_iter;
}

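/* Refill a port's pre-pull (shadow) buffer with a burst of events from its
 * rx worker ring.
 */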
static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
        RTE_SET_USED(sw);
        struct rte_event_ring *worker = port->rx_worker_ring;
        port->pp_buf_start = 0;
        port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
                        RTE_DIM(port->pp_buf), NULL);
}

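/* Pull events enqueued by a load-balanced port's worker. Completed events
 * release their history-list entry (and, with allow_reorder set, mark the
 * matching reorder-buffer entry ready); events carrying the VALID flag are
 * enqueued into the destination QID's IQ at the priority-derived level, or
 * appended to the reorder-buffer entry for ordered flows.
 */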
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
        static struct reorder_buffer_entry dummy_rob;
        uint32_t pkts_iter = 0;
        struct sw_port *port = &sw->ports[port_id];

        /* If shadow ring has 0 pkts, pull from worker ring */
        if (port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);

        while (port->pp_buf_count) {
                const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
                struct sw_hist_list_entry *hist_entry = NULL;
                uint8_t flags = qe->op;
                const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
                int needs_reorder = 0;
                /* with reordering disabled, treat PARTIAL (non-EOP) as NEW */
                if (!allow_reorder && !eop)
                        flags = QE_FLAG_VALID;

                /*
                 * if we don't have space for this packet in an IQ,
                 * then move on to next queue. Technically, for a
                 * packet that needs reordering, we don't need to check
                 * here, but it simplifies things not to special-case
                 */
                uint32_t iq_num = PRIO_TO_IQ(qe->priority);
                struct sw_qid *qid = &sw->qids[qe->queue_id];

                /* now process based on flags. Note that for directed
                 * queues, the enqueue_flush masks off all but the
                 * valid flag. This makes FWD and PARTIAL enqueues just
                 * NEW type, and makes DROPS no-op calls.
                 */
                if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
                        const uint32_t hist_tail = port->hist_tail &
                                        (SW_PORT_HIST_LIST - 1);

                        hist_entry = &port->hist_list[hist_tail];
                        const uint32_t hist_qid = hist_entry->qid;
                        const uint32_t hist_fid = hist_entry->fid;

                        struct sw_fid_t *fid =
                                &sw->qids[hist_qid].fids[hist_fid];
                        fid->pcount -= eop;
                        if (fid->pcount == 0)
                                fid->cq = -1;

                        if (allow_reorder) {
                                /* set reorder ready if an ordered QID */
                                uintptr_t rob_ptr =
                                        (uintptr_t)hist_entry->rob_entry;
                                const uintptr_t valid = (rob_ptr != 0);
                                needs_reorder = valid;
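                                /* Branchless select: when valid is 0,
                                 * (valid - 1) is all ones and rob_ptr becomes
                                 * &dummy_rob, making the ready write below a
                                 * harmless dummy store.
                                 */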
                                rob_ptr |=
                                        ((valid - 1) & (uintptr_t)&dummy_rob);
                                struct reorder_buffer_entry *tmp_rob_ptr =
                                        (struct reorder_buffer_entry *)rob_ptr;
                                tmp_rob_ptr->ready = eop * needs_reorder;
                        }

                        port->inflights -= eop;
                        port->hist_tail += eop;
                }
                if (flags & QE_FLAG_VALID) {
                        port->stats.rx_pkts++;

                        if (allow_reorder && needs_reorder) {
                                struct reorder_buffer_entry *rob_entry =
                                                hist_entry->rob_entry;

                                hist_entry->rob_entry = NULL;
                                /* Although fragmentation is not currently
                                 * supported by the eventdev API, we support it
                                 * here. Open: how do we alert the user that
                                 * they've exceeded max frags?
                                 */
                                int num_frag = rob_entry->num_fragments;
                                if (num_frag == SW_FRAGMENTS_MAX)
                                        sw->stats.rx_dropped++;
                                else {
                                        int idx = rob_entry->num_fragments++;
                                        rob_entry->fragments[idx] = *qe;
                                }
                                goto end_qe;
                        }

                        /* Use the iq_num from above to push the QE
                         * into the qid at the right priority
                         */

                        qid->iq_pkt_mask |= (1 << (iq_num));
                        iq_enqueue(sw, &qid->iq[iq_num], qe);
                        qid->iq_pkt_count[iq_num]++;
                        qid->stats.rx_pkts++;
                        pkts_iter++;
                }

end_qe:
                port->pp_buf_start++;
                port->pp_buf_count--;
        } /* while (avail_qes) */

        return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
        return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
        return __pull_port_lb(sw, port_id, 0);
}

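/* Pull events enqueued by a directed port's worker. Directed events carry no
 * scheduling state, so valid events go straight into the destination QID's
 * IQ.
 */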
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
        uint32_t pkts_iter = 0;
        struct sw_port *port = &sw->ports[port_id];

        /* If shadow ring has 0 pkts, pull from worker ring */
        if (port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);

        while (port->pp_buf_count) {
                const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
                uint8_t flags = qe->op;

                if ((flags & QE_FLAG_VALID) == 0)
                        goto end_qe;

                uint32_t iq_num = PRIO_TO_IQ(qe->priority);
                struct sw_qid *qid = &sw->qids[qe->queue_id];
                struct sw_iq *iq = &qid->iq[iq_num];

                port->stats.rx_pkts++;

                /* Use the iq_num from above to push the QE
                 * into the qid at the right priority
                 */
                qid->iq_pkt_mask |= (1 << (iq_num));
                iq_enqueue(sw, iq, qe);
                qid->iq_pkt_count[iq_num]++;
                qid->stats.rx_pkts++;
                pkts_iter++;

end_qe:
                port->pp_buf_start++;
                port->pp_buf_count--;
        } /* while port->pp_buf_count */

        return pkts_iter;
}

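/* Main scheduler entry point, run repeatedly by the service core: pull new
 * events in from all ports, perform egress reordering, move events from QID
 * IQs to CQs, then flush each port's buffered CQ events to its worker ring in
 * a single batch.
 */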
void
sw_event_schedule(struct rte_eventdev *dev)
{
        struct sw_evdev *sw = sw_pmd_priv(dev);
        uint32_t in_pkts, out_pkts;
        uint32_t out_pkts_total = 0, in_pkts_total = 0;
        int32_t sched_quanta = sw->sched_quanta;
        uint32_t i;

        sw->sched_called++;
        if (unlikely(!sw->started))
                return;

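        /* Alternate between pulling new events in from the ports (inner loop)
         * and scheduling IQs to CQs (outer loop) until the quanta is consumed
         * or no work remains.
         */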
        do {
                uint32_t in_pkts_this_iteration = 0;

                /* Pull from rx_ring for ports */
                do {
                        in_pkts = 0;
                        for (i = 0; i < sw->port_count; i++) {
                                /* ack the unlinks in progress as done */
                                if (sw->ports[i].unlinks_in_progress)
                                        sw->ports[i].unlinks_in_progress = 0;

                                if (sw->ports[i].is_directed)
                                        in_pkts += sw_schedule_pull_port_dir(sw, i);
                                else if (sw->ports[i].num_ordered_qids > 0)
                                        in_pkts += sw_schedule_pull_port_lb(sw, i);
                                else
                                        in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
                        }

                        /* QID scan for re-ordered */
                        in_pkts += sw_schedule_reorder(sw, 0,
                                        sw->qid_count);
                        in_pkts_this_iteration += in_pkts;
                } while (in_pkts > 4 &&
                                (int)in_pkts_this_iteration < sched_quanta);

                out_pkts = sw_schedule_qid_to_cq(sw);
                out_pkts_total += out_pkts;
                in_pkts_total += in_pkts_this_iteration;

                if (in_pkts == 0 && out_pkts == 0)
                        break;
        } while ((int)out_pkts_total < sched_quanta);

        sw->stats.tx_pkts += out_pkts_total;
        sw->stats.rx_pkts += in_pkts_total;

        sw->sched_no_iq_enqueues += (in_pkts_total == 0);
        sw->sched_no_cq_enqueues += (out_pkts_total == 0);

        /* push all the internal buffered QEs in port->cq_ring to the
         * worker cores: aka, do the ring transfers batched.
         */
        for (i = 0; i < sw->port_count; i++) {
                struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
                rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
                                sw->ports[i].cq_buf_count,
                                &sw->cq_ring_space[i]);
                sw->ports[i].cq_buf_count = 0;
        }

}