New upstream version 18.08
[deb_dpdk.git] / drivers / event / opdl / opdl_ring.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4
5 #include <stdbool.h>
6 #include <stddef.h>
7 #include <stdint.h>
8 #include <stdio.h>
9
10 #include <rte_branch_prediction.h>
11 #include <rte_debug.h>
12 #include <rte_lcore.h>
13 #include <rte_log.h>
14 #include <rte_malloc.h>
15 #include <rte_memcpy.h>
16 #include <rte_memory.h>
17 #include <rte_memzone.h>
18 #include <rte_eal_memconfig.h>
19
20 #include "opdl_ring.h"
21 #include "opdl_log.h"
22
23 #define LIB_NAME "opdl_ring"
24
25 #define OPDL_NAME_SIZE 64
26
27
28 #define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
29 #define OPDL_FLOWID_MASK (0xFFFFF)
30 #define OPDL_OPA_MASK    (0xFF)
31 #define OPDL_OPA_OFFSET  (0x38)
32
33 int opdl_logtype_driver;
34
35 /* Types of dependency between stages */
36 enum dep_type {
37         DEP_NONE = 0,  /* no dependency */
38         DEP_DIRECT,  /* stage has direct dependency */
39         DEP_INDIRECT,  /* in-direct dependency through other stage(s) */
40         DEP_SELF,  /* stage dependency on itself, used to detect loops */
41 };
42
43 /* Shared section of stage state.
44  * Care is needed when accessing and the layout is important, especially to
45  * limit the adjacent cache-line HW prefetcher from impacting performance.
46  */
47 struct shared_state {
48         /* Last known minimum sequence number of dependencies, used for multi
49          * thread operation
50          */
51         uint32_t available_seq;
52         char _pad1[RTE_CACHE_LINE_SIZE * 3];
53         uint32_t head;  /* Head sequence number (for multi thread operation) */
54         char _pad2[RTE_CACHE_LINE_SIZE * 3];
55         struct opdl_stage *stage;  /* back pointer */
56         uint32_t tail;  /* Tail sequence number */
57         char _pad3[RTE_CACHE_LINE_SIZE * 2];
58 } __rte_cache_aligned;
59
60 /* A structure to keep track of "unfinished" claims. This is only used for
61  * stages that are threadsafe. Each lcore accesses its own instance of this
62  * structure to record the entries it has claimed. This allows one lcore to make
63  * multiple claims without being blocked by another. When disclaiming it moves
64  * forward the shared tail when the shared tail matches the tail value recorded
65  * here.
66  */
67 struct claim_manager {
68         uint32_t num_to_disclaim;
69         uint32_t num_claimed;
70         uint32_t mgr_head;
71         uint32_t mgr_tail;
72         struct {
73                 uint32_t head;
74                 uint32_t tail;
75         } claims[OPDL_DISCLAIMS_PER_LCORE];
76 } __rte_cache_aligned;
77
78 /* Context for each stage of opdl_ring.
79  * Calculations on sequence numbers need to be done with other uint32_t values
80  * so that results are modulus 2^32, and not undefined.
81  */
82 struct opdl_stage {
83         struct opdl_ring *t;  /* back pointer, set at init */
84         uint32_t num_slots;  /* Number of slots for entries, set at init */
85         uint32_t index;  /* ID for this stage, set at init */
86         bool threadsafe;  /* Set to 1 if this stage supports threadsafe use */
87         /* Last known min seq number of dependencies for used for single thread
88          * operation
89          */
90         uint32_t available_seq;
91         uint32_t head;  /* Current head for single-thread operation */
92         uint32_t nb_instance;  /* Number of instances */
93         uint32_t instance_id;  /* ID of this stage instance */
94         uint16_t num_claimed;  /* Number of slots claimed */
95         uint16_t num_event;             /* Number of events */
96         uint32_t seq;                   /* sequence number  */
97         uint32_t num_deps;  /* Number of direct dependencies */
98         /* Keep track of all dependencies, used during init only */
99         enum dep_type *dep_tracking;
100         /* Direct dependencies of this stage */
101         struct shared_state **deps;
102         /* Other stages read this! */
103         struct shared_state shared __rte_cache_aligned;
104         /* For managing disclaims in multi-threaded processing stages */
105         struct claim_manager pending_disclaims[RTE_MAX_LCORE]
106                                                __rte_cache_aligned;
107         uint32_t shadow_head;  /* Shadow head for single-thread operation */
108         uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
109         uint32_t pos;           /* Atomic scan position */
110 } __rte_cache_aligned;
111
112 /* Context for opdl_ring */
113 struct opdl_ring {
114         char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
115         int socket;  /* NUMA socket that memory is allocated on */
116         uint32_t num_slots;  /* Number of slots for entries */
117         uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
118         uint32_t slot_size;  /* Size of each slot in bytes */
119         uint32_t num_stages;  /* Number of stages that have been added */
120         uint32_t max_num_stages;  /* Max number of stages */
121         /* Stages indexed by ID */
122         struct opdl_stage *stages;
123         /* Memory for storing slot data */
124         uint8_t slots[0] __rte_cache_aligned;
125 };
126
127
128 /* Return input stage of a opdl_ring */
129 static __rte_always_inline struct opdl_stage *
130 input_stage(const struct opdl_ring *t)
131 {
132         return &t->stages[0];
133 }
134
135 /* Check if a stage is the input stage */
136 static __rte_always_inline bool
137 is_input_stage(const struct opdl_stage *s)
138 {
139         return s->index == 0;
140 }
141
142 /* Get slot pointer from sequence number */
143 static __rte_always_inline void *
144 get_slot(const struct opdl_ring *t, uint32_t n)
145 {
146         return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
147 }
148
149 /* Find how many entries are available for processing */
150 static __rte_always_inline uint32_t
151 available(const struct opdl_stage *s)
152 {
153         if (s->threadsafe == true) {
154                 uint32_t n = __atomic_load_n(&s->shared.available_seq,
155                                 __ATOMIC_ACQUIRE) -
156                                 __atomic_load_n(&s->shared.head,
157                                 __ATOMIC_ACQUIRE);
158
159                 /* Return 0 if available_seq needs to be updated */
160                 return (n <= s->num_slots) ? n : 0;
161         }
162
163         /* Single threaded */
164         return s->available_seq - s->head;
165 }
166
167 /* Read sequence number of dependencies and find minimum */
168 static __rte_always_inline void
169 update_available_seq(struct opdl_stage *s)
170 {
171         uint32_t i;
172         uint32_t this_tail = s->shared.tail;
173         uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
174         /* Input stage sequence numbers are greater than the sequence numbers of
175          * its dependencies so an offset of t->num_slots is needed when
176          * calculating available slots and also the condition which is used to
177          * determine the dependencies minimum sequence number must be reverted.
178          */
179         uint32_t wrap;
180
181         if (is_input_stage(s)) {
182                 wrap = s->num_slots;
183                 for (i = 1; i < s->num_deps; i++) {
184                         uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
185                                         __ATOMIC_ACQUIRE);
186                         if ((this_tail - seq) > (this_tail - min_seq))
187                                 min_seq = seq;
188                 }
189         } else {
190                 wrap = 0;
191                 for (i = 1; i < s->num_deps; i++) {
192                         uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
193                                         __ATOMIC_ACQUIRE);
194                         if ((seq - this_tail) < (min_seq - this_tail))
195                                 min_seq = seq;
196                 }
197         }
198
199         if (s->threadsafe == false)
200                 s->available_seq = min_seq + wrap;
201         else
202                 __atomic_store_n(&s->shared.available_seq, min_seq + wrap,
203                                 __ATOMIC_RELEASE);
204 }
205
206 /* Wait until the number of available slots reaches number requested */
207 static __rte_always_inline void
208 wait_for_available(struct opdl_stage *s, uint32_t n)
209 {
210         while (available(s) < n) {
211                 rte_pause();
212                 update_available_seq(s);
213         }
214 }
215
216 /* Return number of slots to process based on number requested and mode */
217 static __rte_always_inline uint32_t
218 num_to_process(struct opdl_stage *s, uint32_t n, bool block)
219 {
220         /* Don't read tail sequences of dependencies if not needed */
221         if (available(s) >= n)
222                 return n;
223
224         update_available_seq(s);
225
226         if (block == false) {
227                 uint32_t avail = available(s);
228
229                 if (avail == 0) {
230                         rte_pause();
231                         return 0;
232                 }
233                 return (avail <= n) ? avail : n;
234         }
235
236         if (unlikely(n > s->num_slots)) {
237                 PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
238                                 n, s->num_slots);
239                 return 0;  /* Avoid infinite loop */
240         }
241         /* blocking */
242         wait_for_available(s, n);
243         return n;
244 }
245
246 /* Copy entries in to slots with wrap-around */
247 static __rte_always_inline void
248 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
249                 uint32_t num_entries)
250 {
251         uint32_t slot_size = t->slot_size;
252         uint32_t slot_index = start & t->mask;
253
254         if (slot_index + num_entries <= t->num_slots) {
255                 rte_memcpy(get_slot(t, start), entries,
256                                 num_entries * slot_size);
257         } else {
258                 uint32_t split = t->num_slots - slot_index;
259
260                 rte_memcpy(get_slot(t, start), entries, split * slot_size);
261                 rte_memcpy(get_slot(t, 0),
262                                 RTE_PTR_ADD(entries, split * slot_size),
263                                 (num_entries - split) * slot_size);
264         }
265 }
266
267 /* Copy entries out from slots with wrap-around */
268 static __rte_always_inline void
269 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
270                 uint32_t num_entries)
271 {
272         uint32_t slot_size = t->slot_size;
273         uint32_t slot_index = start & t->mask;
274
275         if (slot_index + num_entries <= t->num_slots) {
276                 rte_memcpy(entries, get_slot(t, start),
277                                 num_entries * slot_size);
278         } else {
279                 uint32_t split = t->num_slots - slot_index;
280
281                 rte_memcpy(entries, get_slot(t, start), split * slot_size);
282                 rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
283                                 get_slot(t, 0),
284                                 (num_entries - split) * slot_size);
285         }
286 }
287
288 /* Input function optimised for single thread */
289 static __rte_always_inline uint32_t
290 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
291                 uint32_t num_entries, bool block)
292 {
293         struct opdl_stage *s = input_stage(t);
294         uint32_t head = s->head;
295
296         num_entries = num_to_process(s, num_entries, block);
297         if (num_entries == 0)
298                 return 0;
299
300         copy_entries_in(t, head, entries, num_entries);
301
302         s->head += num_entries;
303         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
304
305         return num_entries;
306 }
307
308 /* Convert head and tail of claim_manager into valid index */
309 static __rte_always_inline uint32_t
310 claim_mgr_index(uint32_t n)
311 {
312         return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
313 }
314
315 /* Check if there are available slots in claim_manager */
316 static __rte_always_inline bool
317 claim_mgr_available(struct claim_manager *mgr)
318 {
319         return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
320                         true : false;
321 }
322
323 /* Record a new claim. Only use after first checking an entry is available */
324 static __rte_always_inline void
325 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
326 {
327         if ((mgr->mgr_head != mgr->mgr_tail) &&
328                         (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
329                         tail)) {
330                 /* Combine with previous claim */
331                 mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
332         } else {
333                 mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
334                 mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
335                 mgr->mgr_head++;
336         }
337
338         mgr->num_claimed += (head - tail);
339 }
340
341 /* Read the oldest recorded claim */
342 static __rte_always_inline bool
343 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
344 {
345         if (mgr->mgr_head == mgr->mgr_tail)
346                 return false;
347
348         *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
349         *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
350         return true;
351 }
352
353 /* Remove the oldest recorded claim. Only use after first reading the entry */
354 static __rte_always_inline void
355 claim_mgr_remove(struct claim_manager *mgr)
356 {
357         mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
358                         mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
359         mgr->mgr_tail++;
360 }
361
362 /* Update tail in the oldest claim. Only use after first reading the entry */
363 static __rte_always_inline void
364 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
365 {
366         mgr->num_claimed -= num_entries;
367         mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
368 }
369
370 static __rte_always_inline void
371 opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
372                 uint32_t num_entries, bool block)
373 {
374         struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
375         uint32_t head;
376         uint32_t tail;
377
378         while (num_entries) {
379                 bool ret = claim_mgr_read(disclaims, &tail, &head);
380
381                 if (ret == false)
382                         break;  /* nothing is claimed */
383                 /* There should be no race condition here. If shared.tail
384                  * matches, no other core can update it until this one does.
385                  */
386                 if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
387                                 tail) {
388                         if (num_entries >= (head - tail)) {
389                                 claim_mgr_remove(disclaims);
390                                 __atomic_store_n(&s->shared.tail, head,
391                                                 __ATOMIC_RELEASE);
392                                 num_entries -= (head - tail);
393                         } else {
394                                 claim_mgr_move_tail(disclaims, num_entries);
395                                 __atomic_store_n(&s->shared.tail,
396                                                 num_entries + tail,
397                                                 __ATOMIC_RELEASE);
398                                 num_entries = 0;
399                         }
400                 } else if (block == false)
401                         break;  /* blocked by other thread */
402                 /* Keep going until num_entries are disclaimed. */
403                 rte_pause();
404         }
405
406         disclaims->num_to_disclaim = num_entries;
407 }
408
409 /* Move head atomically, returning number of entries available to process and
410  * the original value of head. For non-input stages, the claim is recorded
411  * so that the tail can be updated later by opdl_stage_disclaim().
412  */
413 static __rte_always_inline void
414 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
415                 uint32_t *old_head, bool block, bool claim_func)
416 {
417         uint32_t orig_num_entries = *num_entries;
418         uint32_t ret;
419         struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
420
421         /* Attempt to disclaim any outstanding claims */
422         opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
423                         false);
424
425         *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
426         while (true) {
427                 bool success;
428                 /* If called by opdl_ring_input(), claim does not need to be
429                  * recorded, as there will be no disclaim.
430                  */
431                 if (claim_func) {
432                         /* Check that the claim can be recorded */
433                         ret = claim_mgr_available(disclaims);
434                         if (ret == false) {
435                                 /* exit out if claim can't be recorded */
436                                 *num_entries = 0;
437                                 return;
438                         }
439                 }
440
441                 *num_entries = num_to_process(s, orig_num_entries, block);
442                 if (*num_entries == 0)
443                         return;
444
445                 success = __atomic_compare_exchange_n(&s->shared.head, old_head,
446                                 *old_head + *num_entries,
447                                 true,  /* may fail spuriously */
448                                 __ATOMIC_RELEASE,  /* memory order on success */
449                                 __ATOMIC_ACQUIRE);  /* memory order on fail */
450                 if (likely(success))
451                         break;
452                 rte_pause();
453         }
454
455         if (claim_func)
456                 /* Store the claim record */
457                 claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
458 }
459
460 /* Input function that supports multiple threads */
461 static __rte_always_inline uint32_t
462 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
463                 uint32_t num_entries, bool block)
464 {
465         struct opdl_stage *s = input_stage(t);
466         uint32_t old_head;
467
468         move_head_atomically(s, &num_entries, &old_head, block, false);
469         if (num_entries == 0)
470                 return 0;
471
472         copy_entries_in(t, old_head, entries, num_entries);
473
474         /* If another thread started inputting before this one, but hasn't
475          * finished, we need to wait for it to complete to update the tail.
476          */
477         while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
478                         old_head))
479                 rte_pause();
480
481         __atomic_store_n(&s->shared.tail, old_head + num_entries,
482                         __ATOMIC_RELEASE);
483
484         return num_entries;
485 }
486
487 static __rte_always_inline uint32_t
488 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
489                 uint8_t this_lcore)
490 {
491         return ((nb_p_lcores <= 1) ? 0 :
492                         (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
493                         nb_p_lcores);
494 }
495
496 /* Claim slots to process, optimised for single-thread operation */
497 static __rte_always_inline uint32_t
498 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
499                 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
500 {
501         uint32_t i = 0, j = 0,  offset;
502         uint32_t opa_id   = 0;
503         uint32_t flow_id  = 0;
504         uint64_t event    = 0;
505         void *get_slots;
506         struct rte_event *ev;
507         RTE_SET_USED(seq);
508         struct opdl_ring *t = s->t;
509         uint8_t *entries_offset = (uint8_t *)entries;
510
511         if (!atomic) {
512
513                 offset = opdl_first_entry_id(s->seq, s->nb_instance,
514                                 s->instance_id);
515
516                 num_entries = s->nb_instance * num_entries;
517
518                 num_entries = num_to_process(s, num_entries, block);
519
520                 for (; offset < num_entries; offset += s->nb_instance) {
521                         get_slots = get_slot(t, s->head + offset);
522                         memcpy(entries_offset, get_slots, t->slot_size);
523                         entries_offset += t->slot_size;
524                         i++;
525                 }
526         } else {
527                 num_entries = num_to_process(s, num_entries, block);
528
529                 for (j = 0; j < num_entries; j++) {
530                         ev = (struct rte_event *)get_slot(t, s->head+j);
531
532                         event  = __atomic_load_n(&(ev->event),
533                                         __ATOMIC_ACQUIRE);
534
535                         opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
536                         flow_id  = OPDL_FLOWID_MASK & event;
537
538                         if (opa_id >= s->queue_id)
539                                 continue;
540
541                         if ((flow_id % s->nb_instance) == s->instance_id) {
542                                 memcpy(entries_offset, ev, t->slot_size);
543                                 entries_offset += t->slot_size;
544                                 i++;
545                         }
546                 }
547         }
548         s->shadow_head = s->head;
549         s->head += num_entries;
550         s->num_claimed = num_entries;
551         s->num_event = i;
552         s->pos = 0;
553
554         /* automatically disclaim entries if number of rte_events is zero */
555         if (unlikely(i == 0))
556                 opdl_stage_disclaim(s, 0, false);
557
558         return i;
559 }
560
561 /* Thread-safe version of function to claim slots for processing */
562 static __rte_always_inline uint32_t
563 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
564                 uint32_t num_entries, uint32_t *seq, bool block)
565 {
566         uint32_t old_head;
567         struct opdl_ring *t = s->t;
568         uint32_t i = 0, offset;
569         uint8_t *entries_offset = (uint8_t *)entries;
570
571         if (seq == NULL) {
572                 PMD_DRV_LOG(ERR, "Invalid seq PTR");
573                 return 0;
574         }
575         offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
576         num_entries = offset + (s->nb_instance * num_entries);
577
578         move_head_atomically(s, &num_entries, &old_head, block, true);
579
580         for (; offset < num_entries; offset += s->nb_instance) {
581                 memcpy(entries_offset, get_slot(t, s->head + offset),
582                         t->slot_size);
583                 entries_offset += t->slot_size;
584                 i++;
585         }
586
587         *seq = old_head;
588
589         return i;
590 }
591
592 /* Claim and copy slot pointers, optimised for single-thread operation */
593 static __rte_always_inline uint32_t
594 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
595                 uint32_t num_entries, uint32_t *seq, bool block)
596 {
597         num_entries = num_to_process(s, num_entries, block);
598         if (num_entries == 0)
599                 return 0;
600         copy_entries_out(s->t, s->head, entries, num_entries);
601         if (seq != NULL)
602                 *seq = s->head;
603         s->head += num_entries;
604         return num_entries;
605 }
606
607 /* Thread-safe version of function to claim and copy pointers to slots */
608 static __rte_always_inline uint32_t
609 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
610                 uint32_t num_entries, uint32_t *seq, bool block)
611 {
612         uint32_t old_head;
613
614         move_head_atomically(s, &num_entries, &old_head, block, true);
615         if (num_entries == 0)
616                 return 0;
617         copy_entries_out(s->t, old_head, entries, num_entries);
618         if (seq != NULL)
619                 *seq = old_head;
620         return num_entries;
621 }
622
623 static __rte_always_inline void
624 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
625                 uint32_t num_entries)
626 {
627         uint32_t old_tail = s->shared.tail;
628
629         if (unlikely(num_entries > (s->head - old_tail))) {
630                 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
631                                 num_entries, s->head - old_tail);
632                 num_entries = s->head - old_tail;
633         }
634         __atomic_store_n(&s->shared.tail, num_entries + old_tail,
635                         __ATOMIC_RELEASE);
636 }
637
638 uint32_t
639 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
640                 bool block)
641 {
642         if (input_stage(t)->threadsafe == false)
643                 return opdl_ring_input_singlethread(t, entries, num_entries,
644                                 block);
645         else
646                 return opdl_ring_input_multithread(t, entries, num_entries,
647                                 block);
648 }
649
650 uint32_t
651 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
652                 const void *entries, uint32_t num_entries, bool block)
653 {
654         uint32_t head = s->head;
655
656         num_entries = num_to_process(s, num_entries, block);
657
658         if (num_entries == 0)
659                 return 0;
660
661         copy_entries_in(t, head, entries, num_entries);
662
663         s->head += num_entries;
664         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
665
666         return num_entries;
667
668 }
669
670 uint32_t
671 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
672                 void *entries, uint32_t num_entries, bool block)
673 {
674         uint32_t head = s->head;
675
676         num_entries = num_to_process(s, num_entries, block);
677         if (num_entries == 0)
678                 return 0;
679
680         copy_entries_out(t, head, entries, num_entries);
681
682         s->head += num_entries;
683         __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
684
685         return num_entries;
686 }
687
688 uint32_t
689 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
690 {
691         /* return (num_to_process(s, num_entries, false)); */
692
693         if (available(s) >= num_entries)
694                 return num_entries;
695
696         update_available_seq(s);
697
698         uint32_t avail = available(s);
699
700         if (avail == 0) {
701                 rte_pause();
702                 return 0;
703         }
704         return (avail <= num_entries) ? avail : num_entries;
705 }
706
707 uint32_t
708 opdl_stage_claim(struct opdl_stage *s, void *entries,
709                 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
710 {
711         if (s->threadsafe == false)
712                 return opdl_stage_claim_singlethread(s, entries, num_entries,
713                                 seq, block, atomic);
714         else
715                 return opdl_stage_claim_multithread(s, entries, num_entries,
716                                 seq, block);
717 }
718
719 uint32_t
720 opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
721                 uint32_t num_entries, uint32_t *seq, bool block)
722 {
723         if (s->threadsafe == false)
724                 return opdl_stage_claim_copy_singlethread(s, entries,
725                                 num_entries, seq, block);
726         else
727                 return opdl_stage_claim_copy_multithread(s, entries,
728                                 num_entries, seq, block);
729 }
730
731 void
732 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
733                 bool block)
734 {
735
736         if (s->threadsafe == false) {
737                 opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
738         } else {
739                 struct claim_manager *disclaims =
740                         &s->pending_disclaims[rte_lcore_id()];
741
742                 if (unlikely(num_entries > s->num_slots)) {
743                         PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
744                                         num_entries, disclaims->num_claimed);
745                         num_entries = disclaims->num_claimed;
746                 }
747
748                 num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
749                                 disclaims->num_claimed);
750                 opdl_stage_disclaim_multithread_n(s, num_entries, block);
751         }
752 }
753
754 int
755 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
756 {
757         if (num_entries != s->num_event) {
758                 rte_errno = -EINVAL;
759                 return 0;
760         }
761         if (s->threadsafe == false) {
762                 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
763                 s->seq += s->num_claimed;
764                 s->shadow_head = s->head;
765                 s->num_claimed = 0;
766         } else {
767                 struct claim_manager *disclaims =
768                                 &s->pending_disclaims[rte_lcore_id()];
769                 opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
770                                 block);
771         }
772         return num_entries;
773 }
774
775 uint32_t
776 opdl_ring_available(struct opdl_ring *t)
777 {
778         return opdl_stage_available(&t->stages[0]);
779 }
780
781 uint32_t
782 opdl_stage_available(struct opdl_stage *s)
783 {
784         update_available_seq(s);
785         return available(s);
786 }
787
788 void
789 opdl_ring_flush(struct opdl_ring *t)
790 {
791         struct opdl_stage *s = input_stage(t);
792
793         wait_for_available(s, s->num_slots);
794 }
795
796 /******************** Non performance sensitive functions ********************/
797
798 /* Initial setup of a new stage's context */
799 static int
800 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
801                 bool is_input)
802 {
803         uint32_t available = (is_input) ? t->num_slots : 0;
804
805         s->t = t;
806         s->num_slots = t->num_slots;
807         s->index = t->num_stages;
808         s->threadsafe = threadsafe;
809         s->shared.stage = s;
810
811         /* Alloc memory for deps */
812         s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
813                         t->max_num_stages * sizeof(enum dep_type),
814                         0, t->socket);
815         if (s->dep_tracking == NULL)
816                 return -ENOMEM;
817
818         s->deps = rte_zmalloc_socket(LIB_NAME,
819                         t->max_num_stages * sizeof(struct shared_state *),
820                         0, t->socket);
821         if (s->deps == NULL) {
822                 rte_free(s->dep_tracking);
823                 return -ENOMEM;
824         }
825
826         s->dep_tracking[s->index] = DEP_SELF;
827
828         if (threadsafe == true)
829                 s->shared.available_seq = available;
830         else
831                 s->available_seq = available;
832
833         return 0;
834 }
835
836 /* Add direct or indirect dependencies between stages */
837 static int
838 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
839                 enum dep_type type)
840 {
841         struct opdl_ring *t = dependent->t;
842         uint32_t i;
843
844         /* Add new direct dependency */
845         if ((type == DEP_DIRECT) &&
846                         (dependent->dep_tracking[dependency->index] ==
847                                         DEP_NONE)) {
848                 PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
849                                 t->name, dependent->index, dependency->index);
850                 dependent->dep_tracking[dependency->index] = DEP_DIRECT;
851         }
852
853         /* Add new indirect dependency or change direct to indirect */
854         if ((type == DEP_INDIRECT) &&
855                         ((dependent->dep_tracking[dependency->index] ==
856                         DEP_NONE) ||
857                         (dependent->dep_tracking[dependency->index] ==
858                         DEP_DIRECT))) {
859                 PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
860                                 t->name, dependent->index, dependency->index);
861                 dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
862         }
863
864         /* Shouldn't happen... */
865         if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
866                         (dependent != input_stage(t))) {
867                 PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
868                                 t->name, dependent->index);
869                 return -EINVAL;
870         }
871
872         /* Keep going to dependencies of the dependency, until input stage */
873         if (dependency != input_stage(t))
874                 for (i = 0; i < dependency->num_deps; i++) {
875                         int ret = add_dep(dependent, dependency->deps[i]->stage,
876                                         DEP_INDIRECT);
877
878                         if (ret < 0)
879                                 return ret;
880                 }
881
882         /* Make list of sequence numbers for direct dependencies only */
883         if (type == DEP_DIRECT)
884                 for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
885                         if (dependent->dep_tracking[i] == DEP_DIRECT) {
886                                 if ((i == 0) && (dependent->num_deps > 1))
887                                         rte_panic("%s:%u depends on > input",
888                                                         t->name,
889                                                         dependent->index);
890                                 dependent->deps[dependent->num_deps++] =
891                                                 &t->stages[i].shared;
892                         }
893
894         return 0;
895 }
896
897 struct opdl_ring *
898 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
899                 uint32_t max_num_stages, int socket)
900 {
901         struct opdl_ring *t;
902         char mz_name[RTE_MEMZONE_NAMESIZE];
903         int mz_flags = 0;
904         struct opdl_stage *st = NULL;
905         const struct rte_memzone *mz = NULL;
906         size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
907                         (num_slots * slot_size));
908
909         /* Compile time checking */
910         RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
911                         0);
912         RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
913                         RTE_CACHE_LINE_MASK) != 0);
914         RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
915                         RTE_CACHE_LINE_MASK) != 0);
916         RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
917
918         /* Parameter checking */
919         if (name == NULL) {
920                 PMD_DRV_LOG(ERR, "name param is NULL");
921                 return NULL;
922         }
923         if (!rte_is_power_of_2(num_slots)) {
924                 PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
925                                 num_slots, name);
926                 return NULL;
927         }
928
929         /* Alloc memory for stages */
930         st = rte_zmalloc_socket(LIB_NAME,
931                 max_num_stages * sizeof(struct opdl_stage),
932                 RTE_CACHE_LINE_SIZE, socket);
933         if (st == NULL)
934                 goto exit_fail;
935
936         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
937
938         /* Alloc memory for memzone */
939         mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
940         if (mz == NULL)
941                 goto exit_fail;
942
943         t = mz->addr;
944
945         /* Initialise opdl_ring queue */
946         memset(t, 0, sizeof(*t));
947         snprintf(t->name, sizeof(t->name), "%s", name);
948         t->socket = socket;
949         t->num_slots = num_slots;
950         t->mask = num_slots - 1;
951         t->slot_size = slot_size;
952         t->max_num_stages = max_num_stages;
953         t->stages = st;
954
955         PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
956                         t->name, t, num_slots, socket, slot_size);
957
958         return t;
959
960 exit_fail:
961         PMD_DRV_LOG(ERR, "Cannot reserve memory");
962         rte_free(st);
963         rte_memzone_free(mz);
964
965         return NULL;
966 }
967
968 void *
969 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
970 {
971         return get_slot(t, index);
972 }
973
974 bool
975 opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
976                 uint32_t index, bool atomic)
977 {
978         uint32_t i = 0, offset;
979         struct opdl_ring *t = s->t;
980         struct rte_event *ev_orig = NULL;
981         bool ev_updated = false;
982         uint64_t ev_temp    = 0;
983         uint64_t ev_update  = 0;
984
985         uint32_t opa_id   = 0;
986         uint32_t flow_id  = 0;
987         uint64_t event    = 0;
988
989         if (index > s->num_event) {
990                 PMD_DRV_LOG(ERR, "index is overflow");
991                 return ev_updated;
992         }
993
994         ev_temp = ev->event & OPDL_EVENT_MASK;
995
996         if (!atomic) {
997                 offset = opdl_first_entry_id(s->seq, s->nb_instance,
998                                 s->instance_id);
999                 offset += index*s->nb_instance;
1000                 ev_orig = get_slot(t, s->shadow_head+offset);
1001                 if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
1002                         ev_orig->event = ev->event;
1003                         ev_updated = true;
1004                 }
1005                 if (ev_orig->u64 != ev->u64) {
1006                         ev_orig->u64 = ev->u64;
1007                         ev_updated = true;
1008                 }
1009
1010         } else {
1011                 for (i = s->pos; i < s->num_claimed; i++) {
1012                         ev_orig = (struct rte_event *)
1013                                 get_slot(t, s->shadow_head+i);
1014
1015                         event  = __atomic_load_n(&(ev_orig->event),
1016                                         __ATOMIC_ACQUIRE);
1017
1018                         opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
1019                         flow_id  = OPDL_FLOWID_MASK & event;
1020
1021                         if (opa_id >= s->queue_id)
1022                                 continue;
1023
1024                         if ((flow_id % s->nb_instance) == s->instance_id) {
1025                                 ev_update = s->queue_id;
1026                                 ev_update = (ev_update << OPDL_OPA_OFFSET)
1027                                         | ev->event;
1028
1029                                 s->pos = i + 1;
1030
1031                                 if ((event & OPDL_EVENT_MASK) !=
1032                                                 ev_temp) {
1033                                         __atomic_store_n(&(ev_orig->event),
1034                                                         ev_update,
1035                                                         __ATOMIC_RELEASE);
1036                                         ev_updated = true;
1037                                 }
1038                                 if (ev_orig->u64 != ev->u64) {
1039                                         ev_orig->u64 = ev->u64;
1040                                         ev_updated = true;
1041                                 }
1042
1043                                 break;
1044                         }
1045                 }
1046
1047         }
1048
1049         return ev_updated;
1050 }
1051
1052 int
1053 opdl_ring_get_socket(const struct opdl_ring *t)
1054 {
1055         return t->socket;
1056 }
1057
1058 uint32_t
1059 opdl_ring_get_num_slots(const struct opdl_ring *t)
1060 {
1061         return t->num_slots;
1062 }
1063
1064 const char *
1065 opdl_ring_get_name(const struct opdl_ring *t)
1066 {
1067         return t->name;
1068 }
1069
1070 /* Check dependency list is valid for a given opdl_ring */
1071 static int
1072 check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1073                 uint32_t num_deps)
1074 {
1075         unsigned int i;
1076
1077         for (i = 0; i < num_deps; ++i) {
1078                 if (!deps[i]) {
1079                         PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1080                         return -EINVAL;
1081                 }
1082                 if (t != deps[i]->t) {
1083                         PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1084                                         i, deps[i]->t->name, t->name);
1085                         return -EINVAL;
1086                 }
1087         }
1088
1089         return 0;
1090 }
1091
1092 struct opdl_stage *
1093 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1094 {
1095         struct opdl_stage *s;
1096
1097         /* Parameter checking */
1098         if (!t) {
1099                 PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1100                 return NULL;
1101         }
1102         if (t->num_stages == t->max_num_stages) {
1103                 PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1104                                 t->name, t->max_num_stages);
1105                 return NULL;
1106         }
1107
1108         s = &t->stages[t->num_stages];
1109
1110         if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1111                 PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1112                                 &s->shared, t->name);
1113
1114         if (init_stage(t, s, threadsafe, is_input) < 0) {
1115                 PMD_DRV_LOG(ERR, "Cannot reserve memory");
1116                 return NULL;
1117         }
1118         t->num_stages++;
1119
1120         return s;
1121 }
1122
1123 uint32_t
1124 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1125                 uint32_t nb_instance, uint32_t instance_id,
1126                 struct opdl_stage *deps[],
1127                 uint32_t num_deps)
1128 {
1129         uint32_t i;
1130         int ret = 0;
1131
1132         if ((num_deps > 0) && (!deps)) {
1133                 PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1134                 return -1;
1135         }
1136         ret = check_deps(t, deps, num_deps);
1137         if (ret < 0)
1138                 return ret;
1139
1140         for (i = 0; i < num_deps; i++) {
1141                 ret = add_dep(s, deps[i], DEP_DIRECT);
1142                 if (ret < 0)
1143                         return ret;
1144         }
1145
1146         s->nb_instance = nb_instance;
1147         s->instance_id = instance_id;
1148
1149         return ret;
1150 }
1151
1152 struct opdl_stage *
1153 opdl_ring_get_input_stage(const struct opdl_ring *t)
1154 {
1155         return input_stage(t);
1156 }
1157
1158 int
1159 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1160                 uint32_t num_deps)
1161 {
1162         unsigned int i;
1163         int ret;
1164
1165         if ((num_deps == 0) || (!deps)) {
1166                 PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1167                 return -EINVAL;
1168         }
1169
1170         ret = check_deps(s->t, deps, num_deps);
1171         if (ret < 0)
1172                 return ret;
1173
1174         /* Update deps */
1175         for (i = 0; i < num_deps; i++)
1176                 s->deps[i] = &deps[i]->shared;
1177         s->num_deps = num_deps;
1178
1179         return 0;
1180 }
1181
1182 struct opdl_ring *
1183 opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1184 {
1185         return s->t;
1186 }
1187
1188 void
1189 opdl_stage_set_queue_id(struct opdl_stage *s,
1190                 uint32_t queue_id)
1191 {
1192         s->queue_id = queue_id;
1193 }
1194
1195 void
1196 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1197 {
1198         uint32_t i;
1199
1200         if (t == NULL) {
1201                 fprintf(f, "NULL OPDL!\n");
1202                 return;
1203         }
1204         fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1205                         t->name, t->num_slots, t->mask, t->slot_size,
1206                         t->num_stages, t->socket);
1207         for (i = 0; i < t->num_stages; i++) {
1208                 uint32_t j;
1209                 const struct opdl_stage *s = &t->stages[i];
1210
1211                 fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1212                                 t->name, i, (s->threadsafe) ? "true" : "false",
1213                                 (s->threadsafe) ? s->shared.head : s->head,
1214                                 (s->threadsafe) ? s->shared.available_seq :
1215                                 s->available_seq,
1216                                 s->shared.tail, (s->num_deps > 0) ?
1217                                 s->deps[0]->stage->index : 0);
1218                 for (j = 1; j < s->num_deps; j++)
1219                         fprintf(f, ",%u", s->deps[j]->stage->index);
1220                 fprintf(f, "\n");
1221         }
1222         fflush(f);
1223 }
1224
1225 void
1226 opdl_ring_free(struct opdl_ring *t)
1227 {
1228         uint32_t i;
1229         const struct rte_memzone *mz;
1230         char mz_name[RTE_MEMZONE_NAMESIZE];
1231
1232         if (t == NULL) {
1233                 PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1234                 return;
1235         }
1236
1237         PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1238
1239         for (i = 0; i < t->num_stages; ++i) {
1240                 rte_free(t->stages[i].deps);
1241                 rte_free(t->stages[i].dep_tracking);
1242         }
1243
1244         rte_free(t->stages);
1245
1246         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1247         mz = rte_memzone_lookup(mz_name);
1248         if (rte_memzone_free(mz) != 0)
1249                 PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1250 }
1251
1252 /* search a opdl_ring from its name */
1253 struct opdl_ring *
1254 opdl_ring_lookup(const char *name)
1255 {
1256         const struct rte_memzone *mz;
1257         char mz_name[RTE_MEMZONE_NAMESIZE];
1258
1259         snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1260
1261         mz = rte_memzone_lookup(mz_name);
1262         if (mz == NULL)
1263                 return NULL;
1264
1265         return mz->addr;
1266 }
1267
1268 void
1269 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1270 {
1271         s->threadsafe = threadsafe;
1272 }