New upstream version 18.11-rc2
[deb_dpdk.git] / lib / librte_ring / rte_ring_c11_mem.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2017,2018 HXT-semitech Corporation.
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_C11_MEM_H_
11 #define _RTE_RING_C11_MEM_H_
12
13 static __rte_always_inline void
14 update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
15                 uint32_t single, uint32_t enqueue)
16 {
17         RTE_SET_USED(enqueue);
18
19         /*
20          * If there are other enqueues/dequeues in progress that preceded us,
21          * we need to wait for them to complete
22          */
23         if (!single)
24                 while (unlikely(ht->tail != old_val))
25                         rte_pause();
26
27         __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
28 }
29
30 /**
31  * @internal This function updates the producer head for enqueue
32  *
33  * @param r
34  *   A pointer to the ring structure
35  * @param is_sp
36  *   Indicates whether multi-producer path is needed or not
37  * @param n
38  *   The number of elements we will want to enqueue, i.e. how far should the
39  *   head be moved
40  * @param behavior
41  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
42  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
43  * @param old_head
44  *   Returns head value as it was before the move, i.e. where enqueue starts
45  * @param new_head
46  *   Returns the current/new head value i.e. where enqueue finishes
47  * @param free_entries
48  *   Returns the amount of free space in the ring BEFORE head was moved
49  * @return
50  *   Actual number of objects enqueued.
51  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
52  */
53 static __rte_always_inline unsigned int
54 __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
55                 unsigned int n, enum rte_ring_queue_behavior behavior,
56                 uint32_t *old_head, uint32_t *new_head,
57                 uint32_t *free_entries)
58 {
59         const uint32_t capacity = r->capacity;
60         uint32_t cons_tail;
61         unsigned int max = n;
62         int success;
63
64         *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
65         do {
66                 /* Reset n to the initial burst count */
67                 n = max;
68
69                 /* load-acquire synchronize with store-release of ht->tail
70                  * in update_tail.
71                  */
72                 cons_tail = __atomic_load_n(&r->cons.tail,
73                                         __ATOMIC_ACQUIRE);
74
75                 /* The subtraction is done between two unsigned 32bits value
76                  * (the result is always modulo 32 bits even if we have
77                  * *old_head > cons_tail). So 'free_entries' is always between 0
78                  * and capacity (which is < size).
79                  */
80                 *free_entries = (capacity + cons_tail - *old_head);
81
82                 /* check that we have enough room in ring */
83                 if (unlikely(n > *free_entries))
84                         n = (behavior == RTE_RING_QUEUE_FIXED) ?
85                                         0 : *free_entries;
86
87                 if (n == 0)
88                         return 0;
89
90                 *new_head = *old_head + n;
91                 if (is_sp)
92                         r->prod.head = *new_head, success = 1;
93                 else
94                         /* on failure, *old_head is updated */
95                         success = __atomic_compare_exchange_n(&r->prod.head,
96                                         old_head, *new_head,
97                                         0, __ATOMIC_ACQUIRE,
98                                         __ATOMIC_RELAXED);
99         } while (unlikely(success == 0));
100         return n;
101 }
102
103 /**
104  * @internal This function updates the consumer head for dequeue
105  *
106  * @param r
107  *   A pointer to the ring structure
108  * @param is_sc
109  *   Indicates whether multi-consumer path is needed or not
110  * @param n
111  *   The number of elements we will want to enqueue, i.e. how far should the
112  *   head be moved
113  * @param behavior
114  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
115  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
116  * @param old_head
117  *   Returns head value as it was before the move, i.e. where dequeue starts
118  * @param new_head
119  *   Returns the current/new head value i.e. where dequeue finishes
120  * @param entries
121  *   Returns the number of entries in the ring BEFORE head was moved
122  * @return
123  *   - Actual number of objects dequeued.
124  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
125  */
126 static __rte_always_inline unsigned int
127 __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
128                 unsigned int n, enum rte_ring_queue_behavior behavior,
129                 uint32_t *old_head, uint32_t *new_head,
130                 uint32_t *entries)
131 {
132         unsigned int max = n;
133         uint32_t prod_tail;
134         int success;
135
136         /* move cons.head atomically */
137         *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE);
138         do {
139                 /* Restore n as it may change every loop */
140                 n = max;
141
142                 /* this load-acquire synchronize with store-release of ht->tail
143                  * in update_tail.
144                  */
145                 prod_tail = __atomic_load_n(&r->prod.tail,
146                                         __ATOMIC_ACQUIRE);
147
148                 /* The subtraction is done between two unsigned 32bits value
149                  * (the result is always modulo 32 bits even if we have
150                  * cons_head > prod_tail). So 'entries' is always between 0
151                  * and size(ring)-1.
152                  */
153                 *entries = (prod_tail - *old_head);
154
155                 /* Set the actual entries for dequeue */
156                 if (n > *entries)
157                         n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
158
159                 if (unlikely(n == 0))
160                         return 0;
161
162                 *new_head = *old_head + n;
163                 if (is_sc)
164                         r->cons.head = *new_head, success = 1;
165                 else
166                         /* on failure, *old_head will be updated */
167                         success = __atomic_compare_exchange_n(&r->cons.head,
168                                                         old_head, *new_head,
169                                                         0, __ATOMIC_ACQUIRE,
170                                                         __ATOMIC_RELAXED);
171         } while (unlikely(success == 0));
172         return n;
173 }
174
175 #endif /* _RTE_RING_C11_MEM_H_ */