New upstream version 18.08
[deb_dpdk.git] / drivers / event / sw / sw_evdev_scheduler.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2017 Intel Corporation
3  */
4
5 #include <rte_ring.h>
6 #include <rte_hash_crc.h>
7 #include <rte_event_ring.h>
8 #include "sw_evdev.h"
9 #include "iq_chunk.h"
10
11 #define SW_IQS_MASK (SW_IQS_MAX-1)
12
13 /* Retrieve the highest priority IQ or -1 if no pkts available. Doing the
14  * CLZ twice is faster than caching the value due to data dependencies
15  */
16 #define PKT_MASK_TO_IQ(pkts) \
17         (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
18
19 #if SW_IQS_MAX != 4
20 #error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
21 #endif
22 #define PRIO_TO_IQ(prio) (prio >> 6)
23
24 #define MAX_PER_IQ_DEQUEUE 48
25 #define FLOWID_MASK (SW_QID_NUM_FIDS-1)
26 /* use cheap bit mixing, we only need to lose a few bits */
27 #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
28
29 static inline uint32_t
30 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
31                 uint32_t iq_num, unsigned int count)
32 {
33         struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
34         struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
35         uint32_t nb_blocked = 0;
36         uint32_t i;
37
38         if (count > MAX_PER_IQ_DEQUEUE)
39                 count = MAX_PER_IQ_DEQUEUE;
40
41         /* This is the QID ID. The QID ID is static, hence it can be
42          * used to identify the stage of processing in history lists etc
43          */
44         uint32_t qid_id = qid->id;
45
46         iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
47         for (i = 0; i < count; i++) {
48                 const struct rte_event *qe = &qes[i];
49                 const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
50                 struct sw_fid_t *fid = &qid->fids[flow_id];
51                 int cq = fid->cq;
52
53                 if (cq < 0) {
54                         uint32_t cq_idx = qid->cq_next_tx++;
55                         if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
56                                 qid->cq_next_tx = 0;
57                         cq = qid->cq_map[cq_idx];
58
59                         /* find least used */
60                         int cq_free_cnt = sw->cq_ring_space[cq];
61                         for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
62                                         cq_idx++) {
63                                 int test_cq = qid->cq_map[cq_idx];
64                                 int test_cq_free = sw->cq_ring_space[test_cq];
65                                 if (test_cq_free > cq_free_cnt) {
66                                         cq = test_cq;
67                                         cq_free_cnt = test_cq_free;
68                                 }
69                         }
70
71                         fid->cq = cq; /* this pins early */
72                 }
73
74                 if (sw->cq_ring_space[cq] == 0 ||
75                                 sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
76                         blocked_qes[nb_blocked++] = *qe;
77                         continue;
78                 }
79
80                 struct sw_port *p = &sw->ports[cq];
81
82                 /* at this point we can queue up the packet on the cq_buf */
83                 fid->pcount++;
84                 p->cq_buf[p->cq_buf_count++] = *qe;
85                 p->inflights++;
86                 sw->cq_ring_space[cq]--;
87
88                 int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
89                 p->hist_list[head].fid = flow_id;
90                 p->hist_list[head].qid = qid_id;
91
92                 p->stats.tx_pkts++;
93                 qid->stats.tx_pkts++;
94                 qid->to_port[cq]++;
95
96                 /* if we just filled in the last slot, flush the buffer */
97                 if (sw->cq_ring_space[cq] == 0) {
98                         struct rte_event_ring *worker = p->cq_worker_ring;
99                         rte_event_ring_enqueue_burst(worker, p->cq_buf,
100                                         p->cq_buf_count,
101                                         &sw->cq_ring_space[cq]);
102                         p->cq_buf_count = 0;
103                 }
104         }
105         iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);
106
107         return count - nb_blocked;
108 }
109
110 static inline uint32_t
111 sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
112                 uint32_t iq_num, unsigned int count, int keep_order)
113 {
114         uint32_t i;
115         uint32_t cq_idx = qid->cq_next_tx;
116
117         /* This is the QID ID. The QID ID is static, hence it can be
118          * used to identify the stage of processing in history lists etc
119          */
120         uint32_t qid_id = qid->id;
121
122         if (count > MAX_PER_IQ_DEQUEUE)
123                 count = MAX_PER_IQ_DEQUEUE;
124
125         if (keep_order)
126                 /* only schedule as many as we have reorder buffer entries */
127                 count = RTE_MIN(count,
128                                 rte_ring_count(qid->reorder_buffer_freelist));
129
130         for (i = 0; i < count; i++) {
131                 const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
132                 uint32_t cq_check_count = 0;
133                 uint32_t cq;
134
135                 /*
136                  *  for parallel, just send to next available CQ in round-robin
137                  * fashion. So scan for an available CQ. If all CQs are full
138                  * just return and move on to next QID
139                  */
140                 do {
141                         if (++cq_check_count > qid->cq_num_mapped_cqs)
142                                 goto exit;
143                         cq = qid->cq_map[cq_idx];
144                         if (++cq_idx == qid->cq_num_mapped_cqs)
145                                 cq_idx = 0;
146                 } while (rte_event_ring_free_count(
147                                 sw->ports[cq].cq_worker_ring) == 0 ||
148                                 sw->ports[cq].inflights == SW_PORT_HIST_LIST);
149
150                 struct sw_port *p = &sw->ports[cq];
151                 if (sw->cq_ring_space[cq] == 0 ||
152                                 p->inflights == SW_PORT_HIST_LIST)
153                         break;
154
155                 sw->cq_ring_space[cq]--;
156
157                 qid->stats.tx_pkts++;
158
159                 const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
160                 p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
161                 p->hist_list[head].qid = qid_id;
162
163                 if (keep_order)
164                         rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
165                                         (void *)&p->hist_list[head].rob_entry);
166
167                 sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
168                 iq_pop(sw, &qid->iq[iq_num]);
169
170                 rte_compiler_barrier();
171                 p->inflights++;
172                 p->stats.tx_pkts++;
173                 p->hist_head++;
174         }
175 exit:
176         qid->cq_next_tx = cq_idx;
177         return i;
178 }
179
180 static uint32_t
181 sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
182                 uint32_t iq_num, unsigned int count __rte_unused)
183 {
184         uint32_t cq_id = qid->cq_map[0];
185         struct sw_port *port = &sw->ports[cq_id];
186
187         /* get max burst enq size for cq_ring */
188         uint32_t count_free = sw->cq_ring_space[cq_id];
189         if (count_free == 0)
190                 return 0;
191
192         /* burst dequeue from the QID IQ ring */
193         struct sw_iq *iq = &qid->iq[iq_num];
194         uint32_t ret = iq_dequeue_burst(sw, iq,
195                         &port->cq_buf[port->cq_buf_count], count_free);
196         port->cq_buf_count += ret;
197
198         /* Update QID, Port and Total TX stats */
199         qid->stats.tx_pkts += ret;
200         port->stats.tx_pkts += ret;
201
202         /* Subtract credits from cached value */
203         sw->cq_ring_space[cq_id] -= ret;
204
205         return ret;
206 }
207
208 static uint32_t
209 sw_schedule_qid_to_cq(struct sw_evdev *sw)
210 {
211         uint32_t pkts = 0;
212         uint32_t qid_idx;
213
214         sw->sched_cq_qid_called++;
215
216         for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
217                 struct sw_qid *qid = sw->qids_prioritized[qid_idx];
218
219                 int type = qid->type;
220                 int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
221
222                 /* zero mapped CQs indicates directed */
223                 if (iq_num >= SW_IQS_MAX)
224                         continue;
225
226                 uint32_t pkts_done = 0;
227                 uint32_t count = iq_count(&qid->iq[iq_num]);
228
229                 if (count > 0) {
230                         if (type == SW_SCHED_TYPE_DIRECT)
231                                 pkts_done += sw_schedule_dir_to_cq(sw, qid,
232                                                 iq_num, count);
233                         else if (type == RTE_SCHED_TYPE_ATOMIC)
234                                 pkts_done += sw_schedule_atomic_to_cq(sw, qid,
235                                                 iq_num, count);
236                         else
237                                 pkts_done += sw_schedule_parallel_to_cq(sw, qid,
238                                                 iq_num, count,
239                                                 type == RTE_SCHED_TYPE_ORDERED);
240                 }
241
242                 /* Check if the IQ that was polled is now empty, and unset it
243                  * in the IQ mask if its empty.
244                  */
245                 int all_done = (pkts_done == count);
246
247                 qid->iq_pkt_mask &= ~(all_done << (iq_num));
248                 pkts += pkts_done;
249         }
250
251         return pkts;
252 }
253
254 /* This function will perform re-ordering of packets, and injecting into
255  * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
256  * contiguous in that array, this function accepts a "range" of QIDs to scan.
257  */
258 static uint16_t
259 sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
260 {
261         /* Perform egress reordering */
262         struct rte_event *qe;
263         uint32_t pkts_iter = 0;
264
265         for (; qid_start < qid_end; qid_start++) {
266                 struct sw_qid *qid = &sw->qids[qid_start];
267                 int i, num_entries_in_use;
268
269                 if (qid->type != RTE_SCHED_TYPE_ORDERED)
270                         continue;
271
272                 num_entries_in_use = rte_ring_free_count(
273                                         qid->reorder_buffer_freelist);
274
275                 for (i = 0; i < num_entries_in_use; i++) {
276                         struct reorder_buffer_entry *entry;
277                         int j;
278
279                         entry = &qid->reorder_buffer[qid->reorder_buffer_index];
280
281                         if (!entry->ready)
282                                 break;
283
284                         for (j = 0; j < entry->num_fragments; j++) {
285                                 uint16_t dest_qid;
286                                 uint16_t dest_iq;
287
288                                 int idx = entry->fragment_index + j;
289                                 qe = &entry->fragments[idx];
290
291                                 dest_qid = qe->queue_id;
292                                 dest_iq  = PRIO_TO_IQ(qe->priority);
293
294                                 if (dest_qid >= sw->qid_count) {
295                                         sw->stats.rx_dropped++;
296                                         continue;
297                                 }
298
299                                 pkts_iter++;
300
301                                 struct sw_qid *q = &sw->qids[dest_qid];
302                                 struct sw_iq *iq = &q->iq[dest_iq];
303
304                                 /* we checked for space above, so enqueue must
305                                  * succeed
306                                  */
307                                 iq_enqueue(sw, iq, qe);
308                                 q->iq_pkt_mask |= (1 << (dest_iq));
309                                 q->iq_pkt_count[dest_iq]++;
310                                 q->stats.rx_pkts++;
311                         }
312
313                         entry->ready = (j != entry->num_fragments);
314                         entry->num_fragments -= j;
315                         entry->fragment_index += j;
316
317                         if (!entry->ready) {
318                                 entry->fragment_index = 0;
319
320                                 rte_ring_sp_enqueue(
321                                                 qid->reorder_buffer_freelist,
322                                                 entry);
323
324                                 qid->reorder_buffer_index++;
325                                 qid->reorder_buffer_index %= qid->window_size;
326                         }
327                 }
328         }
329         return pkts_iter;
330 }
331
332 static __rte_always_inline void
333 sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
334 {
335         RTE_SET_USED(sw);
336         struct rte_event_ring *worker = port->rx_worker_ring;
337         port->pp_buf_start = 0;
338         port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
339                         RTE_DIM(port->pp_buf), NULL);
340 }
341
342 static __rte_always_inline uint32_t
343 __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
344 {
345         static struct reorder_buffer_entry dummy_rob;
346         uint32_t pkts_iter = 0;
347         struct sw_port *port = &sw->ports[port_id];
348
349         /* If shadow ring has 0 pkts, pull from worker ring */
350         if (port->pp_buf_count == 0)
351                 sw_refill_pp_buf(sw, port);
352
353         while (port->pp_buf_count) {
354                 const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
355                 struct sw_hist_list_entry *hist_entry = NULL;
356                 uint8_t flags = qe->op;
357                 const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
358                 int needs_reorder = 0;
359                 /* if no-reordering, having PARTIAL == NEW */
360                 if (!allow_reorder && !eop)
361                         flags = QE_FLAG_VALID;
362
363                 /*
364                  * if we don't have space for this packet in an IQ,
365                  * then move on to next queue. Technically, for a
366                  * packet that needs reordering, we don't need to check
367                  * here, but it simplifies things not to special-case
368                  */
369                 uint32_t iq_num = PRIO_TO_IQ(qe->priority);
370                 struct sw_qid *qid = &sw->qids[qe->queue_id];
371
372                 /* now process based on flags. Note that for directed
373                  * queues, the enqueue_flush masks off all but the
374                  * valid flag. This makes FWD and PARTIAL enqueues just
375                  * NEW type, and makes DROPS no-op calls.
376                  */
377                 if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
378                         const uint32_t hist_tail = port->hist_tail &
379                                         (SW_PORT_HIST_LIST - 1);
380
381                         hist_entry = &port->hist_list[hist_tail];
382                         const uint32_t hist_qid = hist_entry->qid;
383                         const uint32_t hist_fid = hist_entry->fid;
384
385                         struct sw_fid_t *fid =
386                                 &sw->qids[hist_qid].fids[hist_fid];
387                         fid->pcount -= eop;
388                         if (fid->pcount == 0)
389                                 fid->cq = -1;
390
391                         if (allow_reorder) {
392                                 /* set reorder ready if an ordered QID */
393                                 uintptr_t rob_ptr =
394                                         (uintptr_t)hist_entry->rob_entry;
395                                 const uintptr_t valid = (rob_ptr != 0);
396                                 needs_reorder = valid;
397                                 rob_ptr |=
398                                         ((valid - 1) & (uintptr_t)&dummy_rob);
399                                 struct reorder_buffer_entry *tmp_rob_ptr =
400                                         (struct reorder_buffer_entry *)rob_ptr;
401                                 tmp_rob_ptr->ready = eop * needs_reorder;
402                         }
403
404                         port->inflights -= eop;
405                         port->hist_tail += eop;
406                 }
407                 if (flags & QE_FLAG_VALID) {
408                         port->stats.rx_pkts++;
409
410                         if (allow_reorder && needs_reorder) {
411                                 struct reorder_buffer_entry *rob_entry =
412                                                 hist_entry->rob_entry;
413
414                                 hist_entry->rob_entry = NULL;
415                                 /* Although fragmentation not currently
416                                  * supported by eventdev API, we support it
417                                  * here. Open: How do we alert the user that
418                                  * they've exceeded max frags?
419                                  */
420                                 int num_frag = rob_entry->num_fragments;
421                                 if (num_frag == SW_FRAGMENTS_MAX)
422                                         sw->stats.rx_dropped++;
423                                 else {
424                                         int idx = rob_entry->num_fragments++;
425                                         rob_entry->fragments[idx] = *qe;
426                                 }
427                                 goto end_qe;
428                         }
429
430                         /* Use the iq_num from above to push the QE
431                          * into the qid at the right priority
432                          */
433
434                         qid->iq_pkt_mask |= (1 << (iq_num));
435                         iq_enqueue(sw, &qid->iq[iq_num], qe);
436                         qid->iq_pkt_count[iq_num]++;
437                         qid->stats.rx_pkts++;
438                         pkts_iter++;
439                 }
440
441 end_qe:
442                 port->pp_buf_start++;
443                 port->pp_buf_count--;
444         } /* while (avail_qes) */
445
446         return pkts_iter;
447 }
448
449 static uint32_t
450 sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
451 {
452         return __pull_port_lb(sw, port_id, 1);
453 }
454
455 static uint32_t
456 sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
457 {
458         return __pull_port_lb(sw, port_id, 0);
459 }
460
461 static uint32_t
462 sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
463 {
464         uint32_t pkts_iter = 0;
465         struct sw_port *port = &sw->ports[port_id];
466
467         /* If shadow ring has 0 pkts, pull from worker ring */
468         if (port->pp_buf_count == 0)
469                 sw_refill_pp_buf(sw, port);
470
471         while (port->pp_buf_count) {
472                 const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
473                 uint8_t flags = qe->op;
474
475                 if ((flags & QE_FLAG_VALID) == 0)
476                         goto end_qe;
477
478                 uint32_t iq_num = PRIO_TO_IQ(qe->priority);
479                 struct sw_qid *qid = &sw->qids[qe->queue_id];
480                 struct sw_iq *iq = &qid->iq[iq_num];
481
482                 port->stats.rx_pkts++;
483
484                 /* Use the iq_num from above to push the QE
485                  * into the qid at the right priority
486                  */
487                 qid->iq_pkt_mask |= (1 << (iq_num));
488                 iq_enqueue(sw, iq, qe);
489                 qid->iq_pkt_count[iq_num]++;
490                 qid->stats.rx_pkts++;
491                 pkts_iter++;
492
493 end_qe:
494                 port->pp_buf_start++;
495                 port->pp_buf_count--;
496         } /* while port->pp_buf_count */
497
498         return pkts_iter;
499 }
500
501 void
502 sw_event_schedule(struct rte_eventdev *dev)
503 {
504         struct sw_evdev *sw = sw_pmd_priv(dev);
505         uint32_t in_pkts, out_pkts;
506         uint32_t out_pkts_total = 0, in_pkts_total = 0;
507         int32_t sched_quanta = sw->sched_quanta;
508         uint32_t i;
509
510         sw->sched_called++;
511         if (unlikely(!sw->started))
512                 return;
513
514         do {
515                 uint32_t in_pkts_this_iteration = 0;
516
517                 /* Pull from rx_ring for ports */
518                 do {
519                         in_pkts = 0;
520                         for (i = 0; i < sw->port_count; i++)
521                                 if (sw->ports[i].is_directed)
522                                         in_pkts += sw_schedule_pull_port_dir(sw, i);
523                                 else if (sw->ports[i].num_ordered_qids > 0)
524                                         in_pkts += sw_schedule_pull_port_lb(sw, i);
525                                 else
526                                         in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
527
528                         /* QID scan for re-ordered */
529                         in_pkts += sw_schedule_reorder(sw, 0,
530                                         sw->qid_count);
531                         in_pkts_this_iteration += in_pkts;
532                 } while (in_pkts > 4 &&
533                                 (int)in_pkts_this_iteration < sched_quanta);
534
535                 out_pkts = sw_schedule_qid_to_cq(sw);
536                 out_pkts_total += out_pkts;
537                 in_pkts_total += in_pkts_this_iteration;
538
539                 if (in_pkts == 0 && out_pkts == 0)
540                         break;
541         } while ((int)out_pkts_total < sched_quanta);
542
543         sw->stats.tx_pkts += out_pkts_total;
544         sw->stats.rx_pkts += in_pkts_total;
545
546         sw->sched_no_iq_enqueues += (in_pkts_total == 0);
547         sw->sched_no_cq_enqueues += (out_pkts_total == 0);
548
549         /* push all the internal buffered QEs in port->cq_ring to the
550          * worker cores: aka, do the ring transfers batched.
551          */
552         for (i = 0; i < sw->port_count; i++) {
553                 struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
554                 rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
555                                 sw->ports[i].cq_buf_count,
556                                 &sw->cq_ring_space[i]);
557                 sw->ports[i].cq_buf_count = 0;
558         }
559
560 }