New upstream version 18.02
[deb_dpdk.git] / lib / librte_ring / rte_ring_generic.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2017 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_GENERIC_H_
11 #define _RTE_RING_GENERIC_H_
12
13 static __rte_always_inline void
14 update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
15                 uint32_t single, uint32_t enqueue)
16 {
17         if (enqueue)
18                 rte_smp_wmb();
19         else
20                 rte_smp_rmb();
21         /*
22          * If there are other enqueues/dequeues in progress that preceded us,
23          * we need to wait for them to complete
24          */
25         if (!single)
26                 while (unlikely(ht->tail != old_val))
27                         rte_pause();
28
29         ht->tail = new_val;
30 }
31
32 /**
33  * @internal This function updates the producer head for enqueue
34  *
35  * @param r
36  *   A pointer to the ring structure
37  * @param is_sp
38  *   Indicates whether multi-producer path is needed or not
39  * @param n
40  *   The number of elements we will want to enqueue, i.e. how far should the
41  *   head be moved
42  * @param behavior
43  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
44  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
45  * @param old_head
46  *   Returns head value as it was before the move, i.e. where enqueue starts
47  * @param new_head
48  *   Returns the current/new head value i.e. where enqueue finishes
49  * @param free_entries
50  *   Returns the amount of free space in the ring BEFORE head was moved
51  * @return
52  *   Actual number of objects enqueued.
53  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
54  */
55 static __rte_always_inline unsigned int
56 __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
57                 unsigned int n, enum rte_ring_queue_behavior behavior,
58                 uint32_t *old_head, uint32_t *new_head,
59                 uint32_t *free_entries)
60 {
61         const uint32_t capacity = r->capacity;
62         unsigned int max = n;
63         int success;
64
65         do {
66                 /* Reset n to the initial burst count */
67                 n = max;
68
69                 *old_head = r->prod.head;
70
71                 /* add rmb barrier to avoid load/load reorder in weak
72                  * memory model. It is noop on x86
73                  */
74                 rte_smp_rmb();
75
76                 const uint32_t cons_tail = r->cons.tail;
77                 /*
78                  *  The subtraction is done between two unsigned 32bits value
79                  * (the result is always modulo 32 bits even if we have
80                  * *old_head > cons_tail). So 'free_entries' is always between 0
81                  * and capacity (which is < size).
82                  */
83                 *free_entries = (capacity + cons_tail - *old_head);
84
85                 /* check that we have enough room in ring */
86                 if (unlikely(n > *free_entries))
87                         n = (behavior == RTE_RING_QUEUE_FIXED) ?
88                                         0 : *free_entries;
89
90                 if (n == 0)
91                         return 0;
92
93                 *new_head = *old_head + n;
94                 if (is_sp)
95                         r->prod.head = *new_head, success = 1;
96                 else
97                         success = rte_atomic32_cmpset(&r->prod.head,
98                                         *old_head, *new_head);
99         } while (unlikely(success == 0));
100         return n;
101 }
102
103 /**
104  * @internal This function updates the consumer head for dequeue
105  *
106  * @param r
107  *   A pointer to the ring structure
108  * @param is_sc
109  *   Indicates whether multi-consumer path is needed or not
110  * @param n
111  *   The number of elements we will want to enqueue, i.e. how far should the
112  *   head be moved
113  * @param behavior
114  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
115  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
116  * @param old_head
117  *   Returns head value as it was before the move, i.e. where dequeue starts
118  * @param new_head
119  *   Returns the current/new head value i.e. where dequeue finishes
120  * @param entries
121  *   Returns the number of entries in the ring BEFORE head was moved
122  * @return
123  *   - Actual number of objects dequeued.
124  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
125  */
126 static __rte_always_inline unsigned int
127 __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
128                 unsigned int n, enum rte_ring_queue_behavior behavior,
129                 uint32_t *old_head, uint32_t *new_head,
130                 uint32_t *entries)
131 {
132         unsigned int max = n;
133         int success;
134
135         /* move cons.head atomically */
136         do {
137                 /* Restore n as it may change every loop */
138                 n = max;
139
140                 *old_head = r->cons.head;
141
142                 /* add rmb barrier to avoid load/load reorder in weak
143                  * memory model. It is noop on x86
144                  */
145                 rte_smp_rmb();
146
147                 const uint32_t prod_tail = r->prod.tail;
148                 /* The subtraction is done between two unsigned 32bits value
149                  * (the result is always modulo 32 bits even if we have
150                  * cons_head > prod_tail). So 'entries' is always between 0
151                  * and size(ring)-1.
152                  */
153                 *entries = (prod_tail - *old_head);
154
155                 /* Set the actual entries for dequeue */
156                 if (n > *entries)
157                         n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
158
159                 if (unlikely(n == 0))
160                         return 0;
161
162                 *new_head = *old_head + n;
163                 if (is_sc)
164                         r->cons.head = *new_head, success = 1;
165                 else
166                         success = rte_atomic32_cmpset(&r->cons.head, *old_head,
167                                         *new_head);
168         } while (unlikely(success == 0));
169         return n;
170 }
171
172 #endif /* _RTE_RING_GENERIC_H_ */