drivers/event/dsw/dsw_event.c (deb_dpdk.git, upstream 18.11-rc1)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4
5 #include "dsw_evdev.h"
6
7 #ifdef DSW_SORT_DEQUEUED
8 #include "dsw_sort.h"
9 #endif
10
11 #include <stdbool.h>
12 #include <string.h>
13
14 #include <rte_atomic.h>
15 #include <rte_cycles.h>
16 #include <rte_memcpy.h>
17 #include <rte_random.h>
18
19 static bool
20 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21                          int32_t credits)
22 {
23         int32_t inflight_credits = port->inflight_credits;
24         int32_t missing_credits = credits - inflight_credits;
25         int32_t total_on_loan;
26         int32_t available;
27         int32_t acquired_credits;
28         int32_t new_total_on_loan;
29
30         if (likely(missing_credits <= 0)) {
31                 port->inflight_credits -= credits;
32                 return true;
33         }
34
35         total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
36         available = dsw->max_inflight - total_on_loan;
37         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38
39         if (available < acquired_credits)
40                 return false;
41
42         /* This is a race, no locks are involved, and thus some other
43          * thread can allocate tokens in between the check and the
44          * allocation.
45          */
46         new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
47                                                     acquired_credits);
48
49         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
50                 /* Some other port took the last credits */
51                 rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
52                 return false;
53         }
54
55         DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
56                         acquired_credits);
57
58         port->inflight_credits += acquired_credits;
59         port->inflight_credits -= credits;
60
61         return true;
62 }
63
64 static void
65 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
66                         int32_t credits)
67 {
68         port->inflight_credits += credits;
69
70         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
71                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
72                 int32_t return_credits =
73                         port->inflight_credits - leave_credits;
74
75                 port->inflight_credits = leave_credits;
76
77                 rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
78
79                 DSW_LOG_DP_PORT(DEBUG, port->id,
80                                 "Returned %d tokens to pool.\n",
81                                 return_credits);
82         }
83 }
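
/*
 * A note on the credit scheme implemented by the two functions above
 * (illustrative only; the actual constants live in dsw_evdev.h): each
 * port keeps a local stash of credits (inflight_credits) and only
 * touches the shared credits_on_loan counter when the stash runs dry,
 * or when dsw_port_return_credits() finds it has grown beyond
 * DSW_PORT_MAX_CREDITS, in which case the stash is trimmed back to
 * DSW_PORT_MIN_CREDITS and the excess handed back. The acquire path is
 * optimistic: it adds to the shared counter first and rolls the
 * addition back if the atomic add shows the pool was oversubscribed by
 * a concurrent port. With hypothetical numbers: if max_inflight were
 * 4096 and two ports each tried to take 64 credits with only 96 left
 * in the pool, both adds may go through, but the port whose add pushes
 * the counter past 4096 undoes it and reports failure.
 */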
84
85 static void
86 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
87                        uint16_t num_forward, uint16_t num_release)
88 {
89         port->new_enqueued += num_new;
90         port->forward_enqueued += num_forward;
91         port->release_enqueued += num_release;
92 }
93
94 static void
95 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
96 {
97         source_port->queue_enqueued[queue_id]++;
98 }
99
100 static void
101 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
102 {
103         port->dequeued += num;
104 }
105
106 static void
107 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
108 {
109         source_port->queue_dequeued[queue_id]++;
110 }
111
112 static void
113 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
114 {
115         if (dequeued > 0 && port->busy_start == 0)
116                 /* work period begins */
117                 port->busy_start = rte_get_timer_cycles();
118         else if (dequeued == 0 && port->busy_start > 0) {
119                 /* work period ends */
120                 uint64_t work_period =
121                         rte_get_timer_cycles() - port->busy_start;
122                 port->busy_cycles += work_period;
123                 port->busy_start = 0;
124         }
125 }
126
127 static int16_t
128 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
129 {
130         uint64_t passed = now - port->measurement_start;
131         uint64_t busy_cycles = port->busy_cycles;
132
133         if (port->busy_start > 0) {
134                 busy_cycles += (now - port->busy_start);
135                 port->busy_start = now;
136         }
137
138         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
139
140         port->measurement_start = now;
141         port->busy_cycles = 0;
142
143         port->total_busy_cycles += busy_cycles;
144
145         return load;
146 }
147
148 static void
149 dsw_port_load_update(struct dsw_port *port, uint64_t now)
150 {
151         int16_t old_load;
152         int16_t period_load;
153         int16_t new_load;
154
155         old_load = rte_atomic16_read(&port->load);
156
157         period_load = dsw_port_load_close_period(port, now);
158
159         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
160                 (DSW_OLD_LOAD_WEIGHT+1);
161
162         rte_atomic16_set(&port->load, new_load);
163 }
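
/*
 * The port load tracked above is an exponentially weighted moving
 * average: the just-closed measurement period contributes with weight
 * one, the previous estimate with weight DSW_OLD_LOAD_WEIGHT. As a
 * purely hypothetical example, with a weight of 4, an old load of 50%
 * and a period load of 100%, the new estimate would be
 * (100 + 4 * 50) / (4 + 1) = 60% (percentages used here instead of
 * DSW_MAX_LOAD units only for readability).
 */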
164
165 static void
166 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
167 {
168         if (now < port->next_load_update)
169                 return;
170
171         port->next_load_update = now + port->load_update_interval;
172
173         dsw_port_load_update(port, now);
174 }
175
176 static void
177 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
178 {
179         void *raw_msg;
180
181         memcpy(&raw_msg, msg, sizeof(*msg));
182
183         /* there's always room on the ring */
184         while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
185                 rte_pause();
186 }
187
188 static int
189 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
190 {
191         void *raw_msg;
192         int rc;
193
194         rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
195
196         if (rc == 0)
197                 memcpy(msg, &raw_msg, sizeof(*msg));
198
199         return rc;
200 }
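
/*
 * Note on the two helpers above: the control ring is a pointer ring,
 * so a struct dsw_ctl_msg is not referenced through the ring but
 * copied wholesale into (and out of) a void * slot. The memcpy()-based
 * marshalling implicitly relies on the message struct fitting within
 * the size of a pointer.
 */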
201
202 static void
203 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
204                        uint8_t type, uint8_t queue_id, uint16_t flow_hash)
205 {
206         uint16_t port_id;
207         struct dsw_ctl_msg msg = {
208                 .type = type,
209                 .originating_port_id = source_port->id,
210                 .queue_id = queue_id,
211                 .flow_hash = flow_hash
212         };
213
214         for (port_id = 0; port_id < dsw->num_ports; port_id++)
215                 if (port_id != source_port->id)
216                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
217 }
218
219 static bool
220 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
221                         uint16_t flow_hash)
222 {
223         uint16_t i;
224
225         for (i = 0; i < port->paused_flows_len; i++) {
226                 struct dsw_queue_flow *qf = &port->paused_flows[i];
227                 if (qf->queue_id == queue_id &&
228                     qf->flow_hash == flow_hash)
229                         return true;
230         }
231         return false;
232 }
233
234 static void
235 dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
236                          uint16_t paused_flow_hash)
237 {
238         port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
239                 .queue_id = queue_id,
240                 .flow_hash = paused_flow_hash
241         };
242         port->paused_flows_len++;
243 }
244
245 static void
246 dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
247                             uint16_t paused_flow_hash)
248 {
249         uint16_t i;
250
251         for (i = 0; i < port->paused_flows_len; i++) {
252                 struct dsw_queue_flow *qf = &port->paused_flows[i];
253
254                 if (qf->queue_id == queue_id &&
255                     qf->flow_hash == paused_flow_hash) {
256                         uint16_t last_idx = port->paused_flows_len-1;
257                         if (i != last_idx)
258                                 port->paused_flows[i] =
259                                         port->paused_flows[last_idx];
260                         port->paused_flows_len--;
261                         break;
262                 }
263         }
264 }
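
/*
 * The removal above is an unordered "swap with last" delete: the
 * paused_flows array stays compact in O(1) by moving the final entry
 * into the freed slot, at the cost of not preserving insertion order,
 * which nothing in this file depends on.
 */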
265
266 static void
267 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
268
269 static void
270 dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
271                            uint8_t originating_port_id, uint8_t queue_id,
272                            uint16_t paused_flow_hash)
273 {
274         struct dsw_ctl_msg cfm = {
275                 .type = DSW_CTL_CFM,
276                 .originating_port_id = port->id,
277                 .queue_id = queue_id,
278                 .flow_hash = paused_flow_hash
279         };
280
281         DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
282                         queue_id, paused_flow_hash);
283
284         /* There might be already-scheduled events belonging to the
285          * paused flow in the output buffers.
286          */
287         dsw_port_flush_out_buffers(dsw, port);
288
289         dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
290
291         /* Make sure any stores to the originating port's in_ring are
292          * seen before the ctl message.
293          */
294         rte_smp_wmb();
295
296         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
297 }
298
299 static void
300 dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
301                           uint8_t exclude_port_id, int16_t *port_loads,
302                           uint8_t *target_port_id, int16_t *target_load)
303 {
304         int16_t candidate_port_id = -1;
305         int16_t candidate_load = DSW_MAX_LOAD;
306         uint16_t i;
307
308         for (i = 0; i < num_port_ids; i++) {
309                 uint8_t port_id = port_ids[i];
310                 if (port_id != exclude_port_id) {
311                         int16_t load = port_loads[port_id];
312                         if (candidate_port_id == -1 ||
313                             load < candidate_load) {
314                                 candidate_port_id = port_id;
315                                 candidate_load = load;
316                         }
317                 }
318         }
319         *target_port_id = candidate_port_id;
320         *target_load = candidate_load;
321 }
322
323 struct dsw_queue_flow_burst {
324         struct dsw_queue_flow queue_flow;
325         uint16_t count;
326 };
327
328 static inline int
329 dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
330 {
331         const struct dsw_queue_flow_burst *burst_a = v_burst_a;
332         const struct dsw_queue_flow_burst *burst_b = v_burst_b;
333
334         int a_count = burst_a->count;
335         int b_count = burst_b->count;
336
337         return a_count - b_count;
338 }
339
340 #define DSW_QF_TO_INT(_qf)                                      \
341         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
342
343 static inline int
344 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
345 {
346         const struct dsw_queue_flow *qf_a = v_qf_a;
347         const struct dsw_queue_flow *qf_b = v_qf_b;
348
349         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
350 }
351
352 static uint16_t
353 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
354                        struct dsw_queue_flow_burst *bursts)
355 {
356         uint16_t i;
357         struct dsw_queue_flow_burst *current_burst = NULL;
358         uint16_t num_bursts = 0;
359
360         /* We don't need the stable property, and the list is likely
361          * large enough for qsort() to outperform dsw_stable_sort(),
362          * so we use qsort() here.
363          */
364         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
365
366         /* arrange the (now-consecutive) events into bursts */
367         for (i = 0; i < qfs_len; i++) {
368                 if (i == 0 ||
369                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
370                         current_burst = &bursts[num_bursts];
371                         current_burst->queue_flow = qfs[i];
372                         current_burst->count = 0;
373                         num_bursts++;
374                 }
375                 current_burst->count++;
376         }
377
378         qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
379
380         return num_bursts;
381 }
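
/*
 * A sketch of what dsw_sort_qfs_to_bursts() produces, with made-up
 * queue/flow pairs: given the recorded samples {q0/f1, q0/f2, q0/f1,
 * q1/f3, q0/f1}, the qsort() groups equal pairs so the loop collapses
 * them into the bursts {q0/f2: 1, q1/f3: 1, q0/f1: 3}, sorted by
 * ascending count (ties in arbitrary order). The migration logic
 * further down can thus consider small (but still active) flows before
 * large ones.
 */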
382
383 static bool
384 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
385                         int16_t load_limit)
386 {
387         bool below_limit = false;
388         uint16_t i;
389
390         for (i = 0; i < dsw->num_ports; i++) {
391                 int16_t load = rte_atomic16_read(&dsw->ports[i].load);
392                 if (load < load_limit)
393                         below_limit = true;
394                 port_loads[i] = load;
395         }
396         return below_limit;
397 }
398
399 static bool
400 dsw_select_migration_target(struct dsw_evdev *dsw,
401                             struct dsw_port *source_port,
402                             struct dsw_queue_flow_burst *bursts,
403                             uint16_t num_bursts, int16_t *port_loads,
404                             int16_t max_load, struct dsw_queue_flow *target_qf,
405                             uint8_t *target_port_id)
406 {
407         uint16_t source_load = port_loads[source_port->id];
408         uint16_t i;
409
410         for (i = 0; i < num_bursts; i++) {
411                 struct dsw_queue_flow *qf = &bursts[i].queue_flow;
412
413                 if (dsw_port_is_flow_paused(source_port, qf->queue_id,
414                                             qf->flow_hash))
415                         continue;
416
417                 struct dsw_queue *queue = &dsw->queues[qf->queue_id];
418                 int16_t target_load;
419
420                 dsw_find_lowest_load_port(queue->serving_ports,
421                                           queue->num_serving_ports,
422                                           source_port->id, port_loads,
423                                           target_port_id, &target_load);
424
425                 if (target_load < source_load &&
426                     target_load < max_load) {
427                         *target_qf = *qf;
428                         return true;
429                 }
430         }
431
432         DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
433                         "no target port found with load less than %d.\n",
434                         num_bursts, DSW_LOAD_TO_PERCENT(max_load));
435
436         return false;
437 }
438
439 static uint8_t
440 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
441 {
442         struct dsw_queue *queue = &dsw->queues[queue_id];
443         uint8_t port_id;
444
445         if (queue->num_serving_ports > 1)
446                 port_id = queue->flow_to_port_map[flow_hash];
447         else
448                 /* A single-link queue, or atomic/ordered/parallel but
449                  * with just a single serving port.
450                  */
451                 port_id = queue->serving_ports[0];
452
453         DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
454                    "to port %d.\n", queue_id, flow_hash, port_id);
455
456         return port_id;
457 }
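
/*
 * dsw_schedule() is the single place where an event's destination port
 * is decided. For queues with more than one serving port, the
 * per-queue flow_to_port_map table is the authoritative mapping; flow
 * migration (implemented further down in this file) works by rewriting
 * a single entry of that table, not by touching the events themselves.
 */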
458
459 static void
460 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
461                            uint8_t dest_port_id)
462 {
463         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
464         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
465         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
466         uint16_t enqueued = 0;
467
468         if (*buffer_len == 0)
469                 return;
470
471         /* The rings are dimensioned to fit all in-flight events (even
472          * on a single ring), so looping will work.
473          */
474         do {
475                 enqueued +=
476                         rte_event_ring_enqueue_burst(dest_port->in_ring,
477                                                      buffer+enqueued,
478                                                      *buffer_len-enqueued,
479                                                      NULL);
480         } while (unlikely(enqueued != *buffer_len));
481
482         (*buffer_len) = 0;
483 }
484
485 static uint16_t
486 dsw_port_get_parallel_flow_id(struct dsw_port *port)
487 {
488         uint16_t flow_id = port->next_parallel_flow_id;
489
490         port->next_parallel_flow_id =
491                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
492
493         return flow_id;
494 }
495
496 static void
497 dsw_port_buffer_paused(struct dsw_port *port,
498                        const struct rte_event *paused_event)
499 {
500         port->paused_events[port->paused_events_len] = *paused_event;
501         port->paused_events_len++;
502 }
503
504 static void
505 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
506                            uint8_t dest_port_id, const struct rte_event *event)
507 {
508         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
509         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
510
511         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
512                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
513
514         buffer[*buffer_len] = *event;
515
516         (*buffer_len)++;
517 }
518
519 #define DSW_FLOW_ID_BITS (24)
520 static uint16_t
521 dsw_flow_id_hash(uint32_t flow_id)
522 {
523         uint16_t hash = 0;
524         uint16_t offset = 0;
525
526         do {
527                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
528                 offset += DSW_MAX_FLOWS_BITS;
529         } while (offset < DSW_FLOW_ID_BITS);
530
531         return hash;
532 }
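
/*
 * Illustration of the hash above: the eventdev flow_id is folded into
 * a flow_hash by XOR:ing DSW_MAX_FLOWS_BITS-wide chunks of the id. If,
 * purely for the sake of example, DSW_MAX_FLOWS_BITS were 15, the
 * result would be (flow_id & 0x7fff) ^ ((flow_id >> 15) & 0x7fff). The
 * property the rest of this file relies on is simply that the same
 * flow_id always maps to the same flow_hash.
 */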
533
534 static void
535 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
536                          struct rte_event event)
537 {
538         uint8_t dest_port_id;
539
540         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
541
542         dest_port_id = dsw_schedule(dsw, event.queue_id,
543                                     dsw_flow_id_hash(event.flow_id));
544
545         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
546 }
547
548 static void
549 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
550                       const struct rte_event *event)
551 {
552         uint16_t flow_hash;
553         uint8_t dest_port_id;
554
555         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
556                      RTE_SCHED_TYPE_PARALLEL)) {
557                 dsw_port_buffer_parallel(dsw, source_port, *event);
558                 return;
559         }
560
561         flow_hash = dsw_flow_id_hash(event->flow_id);
562
563         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
564                                              flow_hash))) {
565                 dsw_port_buffer_paused(source_port, event);
566                 return;
567         }
568
569         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
570
571         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
572 }
573
574 static void
575 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
576                              struct dsw_port *source_port,
577                              uint8_t queue_id, uint16_t paused_flow_hash)
578 {
579         uint16_t paused_events_len = source_port->paused_events_len;
580         struct rte_event paused_events[paused_events_len];
581         uint8_t dest_port_id;
582         uint16_t i;
583
584         if (paused_events_len == 0)
585                 return;
586
587         if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
588                 return;
589
590         rte_memcpy(paused_events, source_port->paused_events,
591                    paused_events_len * sizeof(struct rte_event));
592
593         source_port->paused_events_len = 0;
594
595         dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
596
597         for (i = 0; i < paused_events_len; i++) {
598                 struct rte_event *event = &paused_events[i];
599                 uint16_t flow_hash;
600
601                 flow_hash = dsw_flow_id_hash(event->flow_id);
602
603                 if (event->queue_id == queue_id &&
604                     flow_hash == paused_flow_hash)
605                         dsw_port_buffer_non_paused(dsw, source_port,
606                                                    dest_port_id, event);
607                 else
608                         dsw_port_buffer_paused(source_port, event);
609         }
610 }
611
612 static void
613 dsw_port_migration_stats(struct dsw_port *port)
614 {
615         uint64_t migration_latency;
616
617         migration_latency = (rte_get_timer_cycles() - port->migration_start);
618         port->migration_latency += migration_latency;
619         port->migrations++;
620 }
621
622 static void
623 dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
624 {
625         uint8_t queue_id = port->migration_target_qf.queue_id;
626         uint16_t flow_hash = port->migration_target_qf.flow_hash;
627
628         port->migration_state = DSW_MIGRATION_STATE_IDLE;
629         port->seen_events_len = 0;
630
631         dsw_port_migration_stats(port);
632
633         if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
634                 dsw_port_remove_paused_flow(port, queue_id, flow_hash);
635                 dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
636         }
637
638         DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
639                         "%d flow_hash %d.\n", queue_id, flow_hash);
640 }
641
642 static void
643 dsw_port_consider_migration(struct dsw_evdev *dsw,
644                             struct dsw_port *source_port,
645                             uint64_t now)
646 {
647         bool any_port_below_limit;
648         struct dsw_queue_flow *seen_events = source_port->seen_events;
649         uint16_t seen_events_len = source_port->seen_events_len;
650         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
651         uint16_t num_bursts;
652         int16_t source_port_load;
653         int16_t port_loads[dsw->num_ports];
654
655         if (now < source_port->next_migration)
656                 return;
657
658         if (dsw->num_ports == 1)
659                 return;
660
661         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
662
663         /* Randomize the interval to avoid having all threads consider
664          * migration at the same point in time, which might lead to
665          * all of them choosing the same target port.
666          */
667         source_port->next_migration = now +
668                 source_port->migration_interval / 2 +
669                 rte_rand() % source_port->migration_interval;
670
671         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
672                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
673                                 "Migration already in progress.\n");
674                 return;
675         }
676
677         /* For simplicity, avoid migration in the unlikely case there
678          * are still events to consume in the in_buffer (from the last
679          * migration).
680          */
681         if (source_port->in_buffer_len > 0) {
682                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
683                                 "events in the input buffer.\n");
684                 return;
685         }
686
687         source_port_load = rte_atomic16_read(&source_port->load);
688         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
689                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
690                                 "Load %d is below threshold level %d.\n",
691                                 DSW_LOAD_TO_PERCENT(source_port_load),
692                        DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
693                 return;
694         }
695
696         /* Avoid starting any expensive operations (sorting etc), in
697          * case of a scenario with all ports above the load limit.
698          */
699         any_port_below_limit =
700                 dsw_retrieve_port_loads(dsw, port_loads,
701                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
702         if (!any_port_below_limit) {
703                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
704                                 "Candidate target ports are all too highly "
705                                 "loaded.\n");
706                 return;
707         }
708
709         /* Sort flows into 'bursts' to allow attempting to migrate
710          * small (but still active) flows first - this is to avoid
711          * having large flows moving around the worker cores too much
712          * (to avoid cache misses, among other things). Of course, the
713          * number of recorded events (queue+flow ids) is limited, and
714          * provides only a snapshot, so only so many conclusions can
715          * be drawn from this data.
716          */
717         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
718                                             bursts);
719         /* For non-big-little systems, there's no point in moving the
720          * only (known) flow.
721          */
722         if (num_bursts < 2) {
723                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
724                                 "queue_id %d flow_hash %d has been seen.\n",
725                                 bursts[0].queue_flow.queue_id,
726                                 bursts[0].queue_flow.flow_hash);
727                 return;
728         }
729
730         /* The strategy is to first try to find a flow to move to a
731          * port with low load (below the migration-attempt
732          * threshold). If that fails, we try to find a port which is
733          * below the max threshold, and also less loaded than this
734          * port is.
735          */
736         if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
737                                          port_loads,
738                                          DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
739                                          &source_port->migration_target_qf,
740                                          &source_port->migration_target_port_id)
741             &&
742             !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
743                                          port_loads,
744                                          DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
745                                          &source_port->migration_target_qf,
746                                        &source_port->migration_target_port_id))
747                 return;
748
749         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
750                         "flow_hash %d from port %d to port %d.\n",
751                         source_port->migration_target_qf.queue_id,
752                         source_port->migration_target_qf.flow_hash,
753                         source_port->id, source_port->migration_target_port_id);
754
755         /* We have a winner. */
756
757         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
758         source_port->migration_start = rte_get_timer_cycles();
759
760         /* No need to go through the whole pause procedure for
761          * parallel queues, since atomic/ordered semantics need not
762          * be maintained.
763          */
764
765         if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
766             == RTE_SCHED_TYPE_PARALLEL) {
767                 uint8_t queue_id = source_port->migration_target_qf.queue_id;
768                 uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
769                 uint8_t dest_port_id = source_port->migration_target_port_id;
770
771                 /* Single byte-sized stores are always atomic. */
772                 dsw->queues[queue_id].flow_to_port_map[flow_hash] =
773                         dest_port_id;
774                 rte_smp_wmb();
775
776                 dsw_port_end_migration(dsw, source_port);
777
778                 return;
779         }
780
781         /* There might be 'loopback' events already scheduled in the
782          * output buffers.
783          */
784         dsw_port_flush_out_buffers(dsw, source_port);
785
786         dsw_port_add_paused_flow(source_port,
787                                  source_port->migration_target_qf.queue_id,
788                                  source_port->migration_target_qf.flow_hash);
789
790         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
791                                source_port->migration_target_qf.queue_id,
792                                source_port->migration_target_qf.flow_hash);
793         source_port->cfm_cnt = 0;
794 }
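
/*
 * Taken together, dsw_port_consider_migration() above and the
 * functions below implement a small per-port migration state machine,
 * roughly:
 *
 *   IDLE -> PAUSING:       pause the flow locally and broadcast
 *                          DSW_CTL_PAUS_REQ (above).
 *   PAUSING -> FORWARDING: all other ports have confirmed with
 *                          DSW_CTL_CFM (dsw_port_handle_confirm()).
 *   FORWARDING -> UNPAUSING: dsw_port_move_migrating_flow() rewrites
 *                          flow_to_port_map, forwards in-flight events
 *                          and broadcasts DSW_CTL_UNPAUS_REQ.
 *   UNPAUSING -> IDLE:     all confirmations received; paused events
 *                          are flushed toward the new destination port.
 *
 * Parallel queues short-circuit this handshake, since neither
 * atomicity nor ordering needs to be preserved for them.
 */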
795
796 static void
797 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
798                              struct dsw_port *source_port,
799                              uint8_t queue_id, uint16_t paused_flow_hash);
800
801 static void
802 dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
803                              uint8_t originating_port_id, uint8_t queue_id,
804                              uint16_t paused_flow_hash)
805 {
806         struct dsw_ctl_msg cfm = {
807                 .type = DSW_CTL_CFM,
808                 .originating_port_id = port->id,
809                 .queue_id = queue_id,
810                 .flow_hash = paused_flow_hash
811         };
812
813         DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
814                         queue_id, paused_flow_hash);
815
816         dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
817
818         rte_smp_rmb();
819
820         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
821
822         dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
823 }
824
825 #define FORWARD_BURST_SIZE (32)
826
827 static void
828 dsw_port_forward_migrated_flow(struct dsw_port *source_port,
829                                struct rte_event_ring *dest_ring,
830                                uint8_t queue_id,
831                                uint16_t flow_hash)
832 {
833         uint16_t events_left;
834
835         /* The control ring message should be seen before the event
836          * count is read from the port's in_ring.
837          */
838         rte_smp_rmb();
839
840         events_left = rte_event_ring_count(source_port->in_ring);
841
842         while (events_left > 0) {
843                 uint16_t in_burst_size =
844                         RTE_MIN(FORWARD_BURST_SIZE, events_left);
845                 struct rte_event in_burst[in_burst_size];
846                 uint16_t in_len;
847                 uint16_t i;
848
849                 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
850                                                       in_burst,
851                                                       in_burst_size, NULL);
852                 /* No need to care about bursting forwarded events (to
853                  * the destination port's in_ring), since migration
854                  * doesn't happen very often, and also the majority of
855                  * the dequeued events will likely *not* be forwarded.
856                  */
857                 for (i = 0; i < in_len; i++) {
858                         struct rte_event *e = &in_burst[i];
859                         if (e->queue_id == queue_id &&
860                             dsw_flow_id_hash(e->flow_id) == flow_hash) {
861                                 while (rte_event_ring_enqueue_burst(dest_ring,
862                                                                     e, 1,
863                                                                     NULL) != 1)
864                                         rte_pause();
865                         } else {
866                                 uint16_t last_idx = source_port->in_buffer_len;
867                                 source_port->in_buffer[last_idx] = *e;
868                                 source_port->in_buffer_len++;
869                         }
870                 }
871
872                 events_left -= in_len;
873         }
874 }
875
876 static void
877 dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
878                              struct dsw_port *source_port)
879 {
880         uint8_t queue_id = source_port->migration_target_qf.queue_id;
881         uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
882         uint8_t dest_port_id = source_port->migration_target_port_id;
883         struct dsw_port *dest_port = &dsw->ports[dest_port_id];
884
885         dsw_port_flush_out_buffers(dsw, source_port);
886
887         rte_smp_wmb();
888
889         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
890                 dest_port_id;
891
892         dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
893                                        queue_id, flow_hash);
894
895         /* Flow table update and migration destination port's enqueues
896          * must be seen before the control message.
897          */
898         rte_smp_wmb();
899
900         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
901                                flow_hash);
902         source_port->cfm_cnt = 0;
903         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
904 }
905
906 static void
907 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
908 {
909         port->cfm_cnt++;
910
911         if (port->cfm_cnt == (dsw->num_ports-1)) {
912                 switch (port->migration_state) {
913                 case DSW_MIGRATION_STATE_PAUSING:
914                         DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
915                                         "migration state.\n");
916                         port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
917                         break;
918                 case DSW_MIGRATION_STATE_UNPAUSING:
919                         dsw_port_end_migration(dsw, port);
920                         break;
921                 default:
922                         RTE_ASSERT(0);
923                         break;
924                 }
925         }
926 }
927
928 static void
929 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
930 {
931         struct dsw_ctl_msg msg;
932
933         /* So that any table loads happen before the ring dequeue, in
934          * the case of a pause request message.
935          */
936         rte_smp_rmb();
937
938         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
939                 switch (msg.type) {
940                 case DSW_CTL_PAUS_REQ:
941                         dsw_port_handle_pause_flow(dsw, port,
942                                                    msg.originating_port_id,
943                                                    msg.queue_id, msg.flow_hash);
944                         break;
945                 case DSW_CTL_UNPAUS_REQ:
946                         dsw_port_handle_unpause_flow(dsw, port,
947                                                      msg.originating_port_id,
948                                                      msg.queue_id,
949                                                      msg.flow_hash);
950                         break;
951                 case DSW_CTL_CFM:
952                         dsw_port_handle_confirm(dsw, port);
953                         break;
954                 }
955         }
956 }
957
958 static void
959 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
960 {
961         /* To poll the control ring reasonably often on busy ports,
962          * each dequeued/enqueued event is considered an 'op' too.
963          */
964         port->ops_since_bg_task += (num_events+1);
965 }
966
967 static void
968 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
969 {
970         if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
971                      port->pending_releases == 0))
972                 dsw_port_move_migrating_flow(dsw, port);
973
974         /* Polling the control ring is relatively inexpensive, and
975          * polling it often helps bring down migration latency, so
976          * do this for every iteration.
977          */
978         dsw_port_ctl_process(dsw, port);
979
980         /* To avoid considering migration and flushing output buffers
981          * on every dequeue/enqueue call, the scheduler only performs
982          * such 'background' tasks every nth
983          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
984          */
985         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
986                 uint64_t now;
987
988                 now = rte_get_timer_cycles();
989
990                 port->last_bg = now;
991
992                 /* Logic to avoid having events linger in the output
993                  * buffer too long.
994                  */
995                 dsw_port_flush_out_buffers(dsw, port);
996
997                 dsw_port_consider_load_update(port, now);
998
999                 dsw_port_consider_migration(dsw, port, now);
1000
1001                 port->ops_since_bg_task = 0;
1002         }
1003 }
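
/*
 * The ops_since_bg_task counter is what amortizes the heavier
 * background work. With hypothetical numbers: if
 * DSW_MAX_PORT_OPS_PER_BG_TASK were 128 and an application enqueued
 * 32-event bursts, the buffer flush, load update and migration check
 * would run roughly once every fourth call, while the cheap control
 * ring poll still runs on every call.
 */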
1004
1005 static void
1006 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1007 {
1008         uint16_t dest_port_id;
1009
1010         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1011                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1012 }
1013
1014 uint16_t
1015 dsw_event_enqueue(void *port, const struct rte_event *ev)
1016 {
1017         return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1018 }
1019
1020 static __rte_always_inline uint16_t
1021 dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
1022                                 uint16_t events_len, bool op_types_known,
1023                                 uint16_t num_new, uint16_t num_release,
1024                                 uint16_t num_non_release)
1025 {
1026         struct dsw_port *source_port = port;
1027         struct dsw_evdev *dsw = source_port->dsw;
1028         bool enough_credits;
1029         uint16_t i;
1030
1031         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1032                         "events to port %d.\n", events_len, source_port->id);
1033
1034         dsw_port_bg_process(dsw, source_port);
1035
1036         /* XXX: For performance (=ring efficiency) reasons, the
1037          * scheduler relies on internal non-ring buffers instead of
1038          * immediately sending the event to the destination ring. For
1039          * a producer that doesn't intend to produce or consume any
1040          * more events, the scheduler provides a way to flush the
1041          * buffer, by means of doing an enqueue of zero events. In
1042          * addition, a port cannot be left "unattended" (e.g. unused)
1043          * for long periods of time, since that would stall
1044          * migration. Eventdev API extensions to provide a cleaner way
1045          * to achieve both of these functions should be
1046          * considered.
1047          */
1048         if (unlikely(events_len == 0)) {
1049                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1050                 return 0;
1051         }
1052
1053         if (unlikely(events_len > source_port->enqueue_depth))
1054                 events_len = source_port->enqueue_depth;
1055
1056         dsw_port_note_op(source_port, events_len);
1057
1058         if (!op_types_known)
1059                 for (i = 0; i < events_len; i++) {
1060                         switch (events[i].op) {
1061                         case RTE_EVENT_OP_RELEASE:
1062                                 num_release++;
1063                                 break;
1064                         case RTE_EVENT_OP_NEW:
1065                                 num_new++;
1066                                 /* Falls through. */
1067                         default:
1068                                 num_non_release++;
1069                                 break;
1070                         }
1071                 }
1072
1073         /* Technically, we could let the non-new events preceding the
1074          * first new event in the array into the system, but for
1075          * simplicity, we deny the whole burst if the port is above
1076          * the water mark.
1077          */
1078         if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
1079                      source_port->new_event_threshold))
1080                 return 0;
1081
1082         enough_credits = dsw_port_acquire_credits(dsw, source_port,
1083                                                   num_non_release);
1084         if (unlikely(!enough_credits))
1085                 return 0;
1086
1087         source_port->pending_releases -= num_release;
1088
1089         dsw_port_enqueue_stats(source_port, num_new,
1090                                num_non_release-num_new, num_release);
1091
1092         for (i = 0; i < events_len; i++) {
1093                 const struct rte_event *event = &events[i];
1094
1095                 if (likely(num_release == 0 ||
1096                            event->op != RTE_EVENT_OP_RELEASE))
1097                         dsw_port_buffer_event(dsw, source_port, event);
1098                 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1099         }
1100
1101         DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1102                         "accepted.\n", num_non_release);
1103
1104         return num_non_release;
1105 }
1106
1107 uint16_t
1108 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1109                         uint16_t events_len)
1110 {
1111         return dsw_event_enqueue_burst_generic(port, events, events_len, false,
1112                                                0, 0, 0);
1113 }
1114
1115 uint16_t
1116 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1117                             uint16_t events_len)
1118 {
1119         return dsw_event_enqueue_burst_generic(port, events, events_len, true,
1120                                                events_len, 0, events_len);
1121 }
1122
1123 uint16_t
1124 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1125                                 uint16_t events_len)
1126 {
1127         return dsw_event_enqueue_burst_generic(port, events, events_len, true,
1128                                                0, 0, events_len);
1129 }
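
/*
 * All three public enqueue variants above funnel into the
 * always-inlined dsw_event_enqueue_burst_generic(). For the
 * ..._new_burst() and ..._forward_burst() entry points the operation
 * mix is known at compile time (all RTE_EVENT_OP_NEW, respectively no
 * new and no release events), so those instantiations can skip the
 * per-event op inspection loop.
 */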
1130
1131 uint16_t
1132 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1133 {
1134         return dsw_event_dequeue_burst(port, events, 1, wait);
1135 }
1136
1137 static void
1138 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1139                             uint16_t num)
1140 {
1141         uint16_t i;
1142
1143         dsw_port_dequeue_stats(port, num);
1144
1145         for (i = 0; i < num; i++) {
1146                 uint16_t l_idx = port->seen_events_idx;
1147                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1148                 struct rte_event *event = &events[i];
1149                 qf->queue_id = event->queue_id;
1150                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1151
1152                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1153
1154                 dsw_port_queue_dequeued_stats(port, event->queue_id);
1155         }
1156
1157         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1158                 port->seen_events_len =
1159                         RTE_MIN(port->seen_events_len + num,
1160                                 DSW_MAX_EVENTS_RECORDED);
1161 }
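
/*
 * The seen_events array filled in above is a fixed-size circular
 * sample buffer (DSW_MAX_EVENTS_RECORDED entries) of queue_id and
 * flow_hash pairs. It is not a complete history; it merely provides
 * the snapshot that dsw_port_consider_migration() later sorts into
 * bursts when picking a candidate flow to move.
 */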
1162
1163 #ifdef DSW_SORT_DEQUEUED
1164
1165 #define DSW_EVENT_TO_INT(_event)                                \
1166         ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1167
1168 static inline int
1169 dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1170 {
1171         const struct rte_event *event_a = v_event_a;
1172         const struct rte_event *event_b = v_event_b;
1173
1174         return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1175 }
1176 #endif
1177
1178 static uint16_t
1179 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1180                        uint16_t num)
1181 {
1182         struct dsw_port *source_port = port;
1183         struct dsw_evdev *dsw = source_port->dsw;
1184
1185         dsw_port_ctl_process(dsw, source_port);
1186
1187         if (unlikely(port->in_buffer_len > 0)) {
1188                 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1189
1190                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1191                            dequeued * sizeof(struct rte_event));
1192
1193                 port->in_buffer_start += dequeued;
1194                 port->in_buffer_len -= dequeued;
1195
1196                 if (port->in_buffer_len == 0)
1197                         port->in_buffer_start = 0;
1198
1199                 return dequeued;
1200         }
1201
1202         return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1203 }
1204
1205 uint16_t
1206 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1207                         uint64_t wait __rte_unused)
1208 {
1209         struct dsw_port *source_port = port;
1210         struct dsw_evdev *dsw = source_port->dsw;
1211         uint16_t dequeued;
1212
1213         source_port->pending_releases = 0;
1214
1215         dsw_port_bg_process(dsw, source_port);
1216
1217         if (unlikely(num > source_port->dequeue_depth))
1218                 num = source_port->dequeue_depth;
1219
1220         dequeued = dsw_port_dequeue_burst(source_port, events, num);
1221
1222         source_port->pending_releases = dequeued;
1223
1224         dsw_port_load_record(source_port, dequeued);
1225
1226         dsw_port_note_op(source_port, dequeued);
1227
1228         if (dequeued > 0) {
1229                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1230                                 dequeued);
1231
1232                 dsw_port_return_credits(dsw, source_port, dequeued);
1233
1234                 /* One potential optimization would be to add a
1235                  * migration state (prior to 'pausing'), and only
1236                  * record seen events while the port is in this state
1237                  * (transitioning to 'pausing' when enough events have
1238                  * been gathered). However, that scheme doesn't seem
1239                  * to improve performance.
1240                  */
1241                 dsw_port_record_seen_events(port, events, dequeued);
1242         }
1243         /* XXX: Assuming the port can't produce any more work,
1244          *      consider flushing the output buffer, on dequeued ==
1245          *      0.
1246          */
1247
1248 #ifdef DSW_SORT_DEQUEUED
1249         dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1250 #endif
1251
1252         return dequeued;
1253 }
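
/*
 * For context, a minimal sketch (not part of this driver, and with
 * dev_id/port_id assumed to be set up by the application) of the kind
 * of worker loop that ends up in the enqueue/dequeue entry points
 * above, via the generic eventdev API:
 *
 *	struct rte_event ev[16];
 *	uint16_t n, i;
 *
 *	for (;;) {
 *		n = rte_event_dequeue_burst(dev_id, port_id, ev,
 *					    RTE_DIM(ev), 0);
 *		for (i = 0; i < n; i++) {
 *			process(&ev[i]);        (application-defined)
 *			ev[i].op = RTE_EVENT_OP_FORWARD;
 *		}
 *		rte_event_enqueue_burst(dev_id, port_id, ev, n);
 *	}
 *
 * The zero-sized enqueue that happens on an idle iteration (n == 0)
 * matters for this driver in particular: as noted in
 * dsw_event_enqueue_burst_generic(), it is what flushes the internal
 * output buffers and keeps the migration machinery from stalling on an
 * otherwise unattended port.
 */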