New upstream version 18.08
[deb_dpdk.git] / examples / performance-thread / common / lthread_queue.h
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2015 Intel Corporation.
4  * Copyright 2010-2011 Dmitry Vyukov
5  */
6
7 #ifndef LTHREAD_QUEUE_H_
8 #define LTHREAD_QUEUE_H_
9
10 #ifdef __cplusplus
11 extern "C" {
12 #endif
13
14 #include <string.h>
15
16 #include <rte_prefetch.h>
17 #include <rte_per_lcore.h>
18
19 #include "lthread_int.h"
20 #include "lthread.h"
21 #include "lthread_diag.h"
22 #include "lthread_pool.h"
23
24 struct lthread_queue;
25
26 /*
27  * This file implements an unbounded FIFO queue based on a lock free
28  * linked list.
29  *
30  * The queue is non-intrusive in that it uses intermediate nodes, and does
31  * not require these nodes to be inserted into the object being placed
32  * in the queue.
33  *
34  * This is slightly more efficient than the very similar queue in lthread_pool
35  * in that it does not have to swing a stub node as the queue becomes empty.
36  *
37  * The queue access functions allocate and free intermediate node
38  * transparently from/to a per scheduler pool ( see lthread_pool.h ).
39  *
40  * The queue provides both MPSC and SPSC insert methods
41  */
42
43 /*
44  * define a queue of lthread nodes
45  */
46 struct lthread_queue {
47         struct qnode *head;
48         struct qnode *tail __rte_cache_aligned;
49         struct lthread_queue *p;
50         char name[LT_MAX_NAME_SIZE];
51
52         DIAG_COUNT_DEFINE(rd);
53         DIAG_COUNT_DEFINE(wr);
54         DIAG_COUNT_DEFINE(size);
55
56 } __rte_cache_aligned;
57
58
59
60 static inline struct lthread_queue *
61 _lthread_queue_create(const char *name)
62 {
63         struct qnode *stub;
64         struct lthread_queue *new_queue;
65
66         new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
67                                         RTE_CACHE_LINE_SIZE,
68                                         rte_socket_id());
69         if (new_queue == NULL)
70                 return NULL;
71
72         /* allocated stub node */
73         stub = _qnode_alloc();
74         RTE_ASSERT(stub);
75
76         if (name != NULL)
77                 strncpy(new_queue->name, name, sizeof(new_queue->name));
78         new_queue->name[sizeof(new_queue->name)-1] = 0;
79
80         /* initialize queue as empty */
81         stub->next = NULL;
82         new_queue->head = stub;
83         new_queue->tail = stub;
84
85         DIAG_COUNT_INIT(new_queue, rd);
86         DIAG_COUNT_INIT(new_queue, wr);
87         DIAG_COUNT_INIT(new_queue, size);
88
89         return new_queue;
90 }
91
92 /**
93  * Return true if the queue is empty
94  */
95 static __rte_always_inline int
96 _lthread_queue_empty(struct lthread_queue *q)
97 {
98         return q->tail == q->head;
99 }
100
101
102
103 /**
104  * Destroy a queue
105  * fail if queue is not empty
106  */
107 static inline int _lthread_queue_destroy(struct lthread_queue *q)
108 {
109         if (q == NULL)
110                 return -1;
111
112         if (!_lthread_queue_empty(q))
113                 return -1;
114
115         _qnode_free(q->head);
116         rte_free(q);
117         return 0;
118 }
119
120 RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
121
122 /*
123  * Insert a node into a queue
124  * this implementation is multi producer safe
125  */
126 static __rte_always_inline struct qnode *
127 _lthread_queue_insert_mp(struct lthread_queue
128                                                           *q, void *data)
129 {
130         struct qnode *prev;
131         struct qnode *n = _qnode_alloc();
132
133         if (n == NULL)
134                 return NULL;
135
136         /* set object in node */
137         n->data = data;
138         n->next = NULL;
139
140         /* this is an MPSC method, perform a locked update */
141         prev = n;
142         prev =
143             (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
144                                                (uint64_t) prev);
145         /* there is a window of inconsistency until prev next is set,
146          * which is why remove must retry
147          */
148         prev->next = n;
149
150         DIAG_COUNT_INC(q, wr);
151         DIAG_COUNT_INC(q, size);
152
153         return n;
154 }
155
156 /*
157  * Insert an node into a queue in single producer mode
158  * this implementation is NOT mult producer safe
159  */
160 static __rte_always_inline struct qnode *
161 _lthread_queue_insert_sp(struct lthread_queue
162                                                           *q, void *data)
163 {
164         /* allocate a queue node */
165         struct qnode *prev;
166         struct qnode *n = _qnode_alloc();
167
168         if (n == NULL)
169                 return NULL;
170
171         /* set data in node */
172         n->data = data;
173         n->next = NULL;
174
175         /* this is an SPSC method, no need for locked exchange operation */
176         prev = q->head;
177         prev->next = q->head = n;
178
179         DIAG_COUNT_INC(q, wr);
180         DIAG_COUNT_INC(q, size);
181
182         return n;
183 }
184
185 /*
186  * Remove a node from a queue
187  */
188 static __rte_always_inline void *
189 _lthread_queue_poll(struct lthread_queue *q)
190 {
191         void *data = NULL;
192         struct qnode *tail = q->tail;
193         struct qnode *next = (struct qnode *)tail->next;
194         /*
195          * There is a small window of inconsistency between producer and
196          * consumer whereby the queue may appear empty if consumer and
197          * producer access it at the same time.
198          * The consumer must handle this by retrying
199          */
200
201         if (likely(next != NULL)) {
202                 q->tail = next;
203                 tail->data = next->data;
204                 data = tail->data;
205
206                 /* free the node */
207                 _qnode_free(tail);
208
209                 DIAG_COUNT_INC(q, rd);
210                 DIAG_COUNT_DEC(q, size);
211                 return data;
212         }
213         return NULL;
214 }
215
216 /*
217  * Remove a node from a queue
218  */
219 static __rte_always_inline void *
220 _lthread_queue_remove(struct lthread_queue *q)
221 {
222         void *data = NULL;
223
224         /*
225          * There is a small window of inconsistency between producer and
226          * consumer whereby the queue may appear empty if consumer and
227          * producer access it at the same time. We handle this by retrying
228          */
229         do {
230                 data = _lthread_queue_poll(q);
231
232                 if (likely(data != NULL)) {
233
234                         DIAG_COUNT_INC(q, rd);
235                         DIAG_COUNT_DEC(q, size);
236                         return data;
237                 }
238                 rte_compiler_barrier();
239         } while (unlikely(!_lthread_queue_empty(q)));
240         return NULL;
241 }
242
243 #ifdef __cplusplus
244 }
245 #endif
246
247 #endif                          /* LTHREAD_QUEUE_H_ */