New upstream version 18.08
[deb_dpdk.git] / examples / performance-thread / common / lthread_pool.h
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2015 Intel Corporation.
4  * Copyright 2010-2011 Dmitry Vyukov
5  */
6
7 #ifndef LTHREAD_POOL_H_
8 #define LTHREAD_POOL_H_
9
10 #ifdef __cplusplus
11 extern "C" {
12 #endif
13
14 #include <rte_malloc.h>
15 #include <rte_per_lcore.h>
16 #include <rte_log.h>
17
18 #include "lthread_int.h"
19 #include "lthread_diag.h"
20
21 /*
22  * This file implements pool of queue nodes used by the queue implemented
23  * in lthread_queue.h.
24  *
25  * The pool is an intrusive lock free MPSC queue.
26  *
27  * The pool is created empty and populated lazily, i.e. on first attempt to
28  * allocate a the pool.
29  *
30  * Whenever the pool is empty more nodes are added to the pool
31  * The number of nodes preallocated in this way is a parameter of
32  * _qnode_pool_create. Freeing an object returns it to the pool.
33  *
34  * Each lthread scheduler maintains its own pool of nodes. L-threads must always
35  * allocate from this local pool ( because it is a single consumer queue ).
36  * L-threads can free nodes to any pool (because it is a multi producer queue)
37  * This enables threads that have affined to a different scheduler to free
38  * nodes safely.
39  */
40
41 struct qnode;
42 struct qnode_cache;
43
44 /*
45  * define intermediate node
46  */
47 struct qnode {
48         struct qnode *next;
49         void *data;
50         struct qnode_pool *pool;
51 } __rte_cache_aligned;
52
53 /*
54  * a pool structure
55  */
56 struct qnode_pool {
57         struct qnode *head;
58         struct qnode *stub;
59         struct qnode *fast_alloc;
60         struct qnode *tail __rte_cache_aligned;
61         int pre_alloc;
62         char name[LT_MAX_NAME_SIZE];
63
64         DIAG_COUNT_DEFINE(rd);
65         DIAG_COUNT_DEFINE(wr);
66         DIAG_COUNT_DEFINE(available);
67         DIAG_COUNT_DEFINE(prealloc);
68         DIAG_COUNT_DEFINE(capacity);
69 } __rte_cache_aligned;
70
71 /*
72  * Create a pool of qnodes
73  */
74
75 static inline struct qnode_pool *
76 _qnode_pool_create(const char *name, int prealloc_size) {
77
78         struct qnode_pool *p = rte_malloc_socket(NULL,
79                                         sizeof(struct qnode_pool),
80                                         RTE_CACHE_LINE_SIZE,
81                                         rte_socket_id());
82
83         RTE_ASSERT(p);
84
85         p->stub = rte_malloc_socket(NULL,
86                                 sizeof(struct qnode),
87                                 RTE_CACHE_LINE_SIZE,
88                                 rte_socket_id());
89
90         RTE_ASSERT(p->stub);
91
92         if (name != NULL)
93                 strncpy(p->name, name, LT_MAX_NAME_SIZE);
94         p->name[sizeof(p->name)-1] = 0;
95
96         p->stub->pool = p;
97         p->stub->next = NULL;
98         p->tail = p->stub;
99         p->head = p->stub;
100         p->pre_alloc = prealloc_size;
101
102         DIAG_COUNT_INIT(p, rd);
103         DIAG_COUNT_INIT(p, wr);
104         DIAG_COUNT_INIT(p, available);
105         DIAG_COUNT_INIT(p, prealloc);
106         DIAG_COUNT_INIT(p, capacity);
107
108         return p;
109 }
110
111
112 /*
113  * Insert a node into the pool
114  */
115 static __rte_always_inline void
116 _qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
117 {
118         n->next = NULL;
119         struct qnode *prev = n;
120         /* We insert at the head */
121         prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
122                                                 (uint64_t) prev);
123         /* there is a window of inconsistency until prev next is set */
124         /* which is why remove must retry */
125         prev->next = (n);
126 }
127
128 /*
129  * Remove a node from the pool
130  *
131  * There is a race with _qnode_pool_insert() whereby the queue could appear
132  * empty during a concurrent insert, this is handled by retrying
133  *
134  * The queue uses a stub node, which must be swung as the queue becomes
135  * empty, this requires an insert of the stub, which means that removing the
136  * last item from the queue incurs the penalty of an atomic exchange. Since the
137  * pool is maintained with a bulk pre-allocation the cost of this is amortised.
138  */
139 static __rte_always_inline struct qnode *
140 _pool_remove(struct qnode_pool *p)
141 {
142         struct qnode *head;
143         struct qnode *tail = p->tail;
144         struct qnode *next = tail->next;
145
146         /* we remove from the tail */
147         if (tail == p->stub) {
148                 if (next == NULL)
149                         return NULL;
150                 /* advance the tail */
151                 p->tail = next;
152                 tail = next;
153                 next = next->next;
154         }
155         if (likely(next != NULL)) {
156                 p->tail = next;
157                 return tail;
158         }
159
160         head = p->head;
161         if (tail == head)
162                 return NULL;
163
164         /* swing stub node */
165         _qnode_pool_insert(p, p->stub);
166
167         next = tail->next;
168         if (next) {
169                 p->tail = next;
170                 return tail;
171         }
172         return NULL;
173 }
174
175
176 /*
177  * This adds a retry to the _pool_remove function
178  * defined above
179  */
180 static __rte_always_inline struct qnode *
181 _qnode_pool_remove(struct qnode_pool *p)
182 {
183         struct qnode *n;
184
185         do {
186                 n = _pool_remove(p);
187                 if (likely(n != NULL))
188                         return n;
189
190                 rte_compiler_barrier();
191         }  while ((p->head != p->tail) &&
192                         (p->tail != p->stub));
193         return NULL;
194 }
195
196 /*
197  * Allocate a node from the pool
198  * If the pool is empty add mode nodes
199  */
200 static __rte_always_inline struct qnode *
201 _qnode_alloc(void)
202 {
203         struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
204         int prealloc_size = p->pre_alloc;
205         struct qnode *n;
206         int i;
207
208         if (likely(p->fast_alloc != NULL)) {
209                 n = p->fast_alloc;
210                 p->fast_alloc = NULL;
211                 return n;
212         }
213
214         n = _qnode_pool_remove(p);
215
216         if (unlikely(n == NULL)) {
217                 DIAG_COUNT_INC(p, prealloc);
218                 for (i = 0; i < prealloc_size; i++) {
219                         n = rte_malloc_socket(NULL,
220                                         sizeof(struct qnode),
221                                         RTE_CACHE_LINE_SIZE,
222                                         rte_socket_id());
223                         if (n == NULL)
224                                 return NULL;
225
226                         DIAG_COUNT_INC(p, available);
227                         DIAG_COUNT_INC(p, capacity);
228
229                         n->pool = p;
230                         _qnode_pool_insert(p, n);
231                 }
232                 n = _qnode_pool_remove(p);
233         }
234         n->pool = p;
235         DIAG_COUNT_INC(p, rd);
236         DIAG_COUNT_DEC(p, available);
237         return n;
238 }
239
240
241
242 /*
243 * free a queue node to the per scheduler pool from which it came
244 */
245 static __rte_always_inline void
246 _qnode_free(struct qnode *n)
247 {
248         struct qnode_pool *p = n->pool;
249
250
251         if (unlikely(p->fast_alloc != NULL) ||
252                         unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
253                 DIAG_COUNT_INC(p, wr);
254                 DIAG_COUNT_INC(p, available);
255                 _qnode_pool_insert(p, n);
256                 return;
257         }
258         p->fast_alloc = n;
259 }
260
261 /*
262  * Destroy an qnode pool
263  * queue must be empty when this is called
264  */
265 static inline int
266 _qnode_pool_destroy(struct qnode_pool *p)
267 {
268         rte_free(p->stub);
269         rte_free(p);
270         return 0;
271 }
272
273 #ifdef __cplusplus
274 }
275 #endif
276
277 #endif                          /* LTHREAD_POOL_H_ */