New upstream version 18.08
[deb_dpdk.git] / lib / librte_ring / rte_ring_generic.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2017 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_GENERIC_H_
11 #define _RTE_RING_GENERIC_H_
12
13 static __rte_always_inline void
14 update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
15                 uint32_t single, uint32_t enqueue)
16 {
17         if (enqueue)
18                 rte_smp_wmb();
19         else
20                 rte_smp_rmb();
21         /*
22          * If there are other enqueues/dequeues in progress that preceded us,
23          * we need to wait for them to complete
24          */
25         if (!single)
26                 while (unlikely(ht->tail != old_val))
27                         rte_pause();
28
29         ht->tail = new_val;
30 }
31
32 /**
33  * @internal This function updates the producer head for enqueue
34  *
35  * @param r
36  *   A pointer to the ring structure
37  * @param is_sp
38  *   Indicates whether multi-producer path is needed or not
39  * @param n
40  *   The number of elements we will want to enqueue, i.e. how far should the
41  *   head be moved
42  * @param behavior
43  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
44  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
45  * @param old_head
46  *   Returns head value as it was before the move, i.e. where enqueue starts
47  * @param new_head
48  *   Returns the current/new head value i.e. where enqueue finishes
49  * @param free_entries
50  *   Returns the amount of free space in the ring BEFORE head was moved
51  * @return
52  *   Actual number of objects enqueued.
53  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
54  */
55 static __rte_always_inline unsigned int
56 __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
57                 unsigned int n, enum rte_ring_queue_behavior behavior,
58                 uint32_t *old_head, uint32_t *new_head,
59                 uint32_t *free_entries)
60 {
61         const uint32_t capacity = r->capacity;
62         unsigned int max = n;
63         int success;
64
65         do {
66                 /* Reset n to the initial burst count */
67                 n = max;
68
69                 *old_head = r->prod.head;
70
71                 /* add rmb barrier to avoid load/load reorder in weak
72                  * memory model. It is noop on x86
73                  */
74                 rte_smp_rmb();
75
76                 /*
77                  *  The subtraction is done between two unsigned 32bits value
78                  * (the result is always modulo 32 bits even if we have
79                  * *old_head > cons_tail). So 'free_entries' is always between 0
80                  * and capacity (which is < size).
81                  */
82                 *free_entries = (capacity + r->cons.tail - *old_head);
83
84                 /* check that we have enough room in ring */
85                 if (unlikely(n > *free_entries))
86                         n = (behavior == RTE_RING_QUEUE_FIXED) ?
87                                         0 : *free_entries;
88
89                 if (n == 0)
90                         return 0;
91
92                 *new_head = *old_head + n;
93                 if (is_sp)
94                         r->prod.head = *new_head, success = 1;
95                 else
96                         success = rte_atomic32_cmpset(&r->prod.head,
97                                         *old_head, *new_head);
98         } while (unlikely(success == 0));
99         return n;
100 }
101
102 /**
103  * @internal This function updates the consumer head for dequeue
104  *
105  * @param r
106  *   A pointer to the ring structure
107  * @param is_sc
108  *   Indicates whether multi-consumer path is needed or not
109  * @param n
110  *   The number of elements we will want to enqueue, i.e. how far should the
111  *   head be moved
112  * @param behavior
113  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
114  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
115  * @param old_head
116  *   Returns head value as it was before the move, i.e. where dequeue starts
117  * @param new_head
118  *   Returns the current/new head value i.e. where dequeue finishes
119  * @param entries
120  *   Returns the number of entries in the ring BEFORE head was moved
121  * @return
122  *   - Actual number of objects dequeued.
123  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
124  */
125 static __rte_always_inline unsigned int
126 __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
127                 unsigned int n, enum rte_ring_queue_behavior behavior,
128                 uint32_t *old_head, uint32_t *new_head,
129                 uint32_t *entries)
130 {
131         unsigned int max = n;
132         int success;
133
134         /* move cons.head atomically */
135         do {
136                 /* Restore n as it may change every loop */
137                 n = max;
138
139                 *old_head = r->cons.head;
140
141                 /* add rmb barrier to avoid load/load reorder in weak
142                  * memory model. It is noop on x86
143                  */
144                 rte_smp_rmb();
145
146                 /* The subtraction is done between two unsigned 32bits value
147                  * (the result is always modulo 32 bits even if we have
148                  * cons_head > prod_tail). So 'entries' is always between 0
149                  * and size(ring)-1.
150                  */
151                 *entries = (r->prod.tail - *old_head);
152
153                 /* Set the actual entries for dequeue */
154                 if (n > *entries)
155                         n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
156
157                 if (unlikely(n == 0))
158                         return 0;
159
160                 *new_head = *old_head + n;
161                 if (is_sc)
162                         r->cons.head = *new_head, success = 1;
163                 else
164                         success = rte_atomic32_cmpset(&r->cons.head, *old_head,
165                                         *new_head);
166         } while (unlikely(success == 0));
167         return n;
168 }
169
170 #endif /* _RTE_RING_GENERIC_H_ */