api: refactor vlibmemory
[vpp.git] / src / svm / queue.c
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <signal.h>
30
31 /*
32  * svm_queue_init
33  *
34  * nels = number of elements on the queue
35  * elsize = element size, presumably 4 and cacheline-size will
36  *          be popular choices.
37  * pid   = consumer pid
38  *
39  * The idea is to call this function in the queue consumer,
40  * and e-mail the queue pointer to the producer(s).
41  *
42  * The vpp process / main thread allocates one of these
43  * at startup; its main input queue. The vpp main input queue
44  * has a pointer to it in the shared memory segment header.
45  *
46  * You probably want to be on an svm data heap before calling this
47  * function.
48  */
49 svm_queue_t *
50 svm_queue_init (int nels,
51                 int elsize, int consumer_pid, int signal_when_queue_non_empty)
52 {
53   svm_queue_t *q;
54   pthread_mutexattr_t attr;
55   pthread_condattr_t cattr;
56
57   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
58                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
59   memset (q, 0, sizeof (*q));
60
61   q->elsize = elsize;
62   q->maxsize = nels;
63   q->consumer_pid = consumer_pid;
64   q->signal_when_queue_non_empty = signal_when_queue_non_empty;
65
66   memset (&attr, 0, sizeof (attr));
67   memset (&cattr, 0, sizeof (cattr));
68
69   if (pthread_mutexattr_init (&attr))
70     clib_unix_warning ("mutexattr_init");
71   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
72     clib_unix_warning ("pthread_mutexattr_setpshared");
73   if (pthread_mutex_init (&q->mutex, &attr))
74     clib_unix_warning ("mutex_init");
75   if (pthread_mutexattr_destroy (&attr))
76     clib_unix_warning ("mutexattr_destroy");
77   if (pthread_condattr_init (&cattr))
78     clib_unix_warning ("condattr_init");
79   /* prints funny-looking messages in the Linux target */
80   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
81     clib_unix_warning ("condattr_setpshared");
82   if (pthread_cond_init (&q->condvar, &cattr))
83     clib_unix_warning ("cond_init1");
84   if (pthread_condattr_destroy (&cattr))
85     clib_unix_warning ("cond_init2");
86
87   return (q);
88 }
89
90 /*
91  * svm_queue_free
92  */
93 void
94 svm_queue_free (svm_queue_t * q)
95 {
96   (void) pthread_mutex_destroy (&q->mutex);
97   (void) pthread_cond_destroy (&q->condvar);
98   clib_mem_free (q);
99 }
100
101 void
102 svm_queue_lock (svm_queue_t * q)
103 {
104   pthread_mutex_lock (&q->mutex);
105 }
106
107 void
108 svm_queue_unlock (svm_queue_t * q)
109 {
110   pthread_mutex_unlock (&q->mutex);
111 }
112
113 int
114 svm_queue_is_full (svm_queue_t * q)
115 {
116   return q->cursize == q->maxsize;
117 }
118
119 /*
120  * svm_queue_add_nolock
121  */
122 int
123 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
124 {
125   i8 *tailp;
126   int need_broadcast = 0;
127
128   if (PREDICT_FALSE (q->cursize == q->maxsize))
129     {
130       while (q->cursize == q->maxsize)
131         {
132           (void) pthread_cond_wait (&q->condvar, &q->mutex);
133         }
134     }
135
136   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
137   clib_memcpy (tailp, elem, q->elsize);
138
139   q->tail++;
140   q->cursize++;
141
142   need_broadcast = (q->cursize == 1);
143
144   if (q->tail == q->maxsize)
145     q->tail = 0;
146
147   if (need_broadcast)
148     {
149       (void) pthread_cond_broadcast (&q->condvar);
150       if (q->signal_when_queue_non_empty)
151         kill (q->consumer_pid, q->signal_when_queue_non_empty);
152     }
153   return 0;
154 }
155
156 int
157 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
158 {
159   i8 *tailp;
160
161   if (PREDICT_FALSE (q->cursize == q->maxsize))
162     {
163       while (q->cursize == q->maxsize)
164         ;
165     }
166
167   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
168   clib_memcpy (tailp, elem, q->elsize);
169
170   q->tail++;
171   q->cursize++;
172
173   if (q->tail == q->maxsize)
174     q->tail = 0;
175   return 0;
176 }
177
178
179 /*
180  * svm_queue_add
181  */
182 int
183 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
184 {
185   i8 *tailp;
186   int need_broadcast = 0;
187
188   if (nowait)
189     {
190       /* zero on success */
191       if (pthread_mutex_trylock (&q->mutex))
192         {
193           return (-1);
194         }
195     }
196   else
197     pthread_mutex_lock (&q->mutex);
198
199   if (PREDICT_FALSE (q->cursize == q->maxsize))
200     {
201       if (nowait)
202         {
203           pthread_mutex_unlock (&q->mutex);
204           return (-2);
205         }
206       while (q->cursize == q->maxsize)
207         {
208           (void) pthread_cond_wait (&q->condvar, &q->mutex);
209         }
210     }
211
212   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
213   clib_memcpy (tailp, elem, q->elsize);
214
215   q->tail++;
216   q->cursize++;
217
218   need_broadcast = (q->cursize == 1);
219
220   if (q->tail == q->maxsize)
221     q->tail = 0;
222
223   if (need_broadcast)
224     {
225       (void) pthread_cond_broadcast (&q->condvar);
226       if (q->signal_when_queue_non_empty)
227         kill (q->consumer_pid, q->signal_when_queue_non_empty);
228     }
229   pthread_mutex_unlock (&q->mutex);
230
231   return 0;
232 }
233
234 /*
235  * svm_queue_add2
236  */
237 int
238 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
239 {
240   i8 *tailp;
241   int need_broadcast = 0;
242
243   if (nowait)
244     {
245       /* zero on success */
246       if (pthread_mutex_trylock (&q->mutex))
247         {
248           return (-1);
249         }
250     }
251   else
252     pthread_mutex_lock (&q->mutex);
253
254   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
255     {
256       if (nowait)
257         {
258           pthread_mutex_unlock (&q->mutex);
259           return (-2);
260         }
261       while (q->cursize + 1 == q->maxsize)
262         {
263           (void) pthread_cond_wait (&q->condvar, &q->mutex);
264         }
265     }
266
267   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
268   clib_memcpy (tailp, elem, q->elsize);
269
270   q->tail++;
271   q->cursize++;
272
273   if (q->tail == q->maxsize)
274     q->tail = 0;
275
276   need_broadcast = (q->cursize == 1);
277
278   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
279   clib_memcpy (tailp, elem2, q->elsize);
280
281   q->tail++;
282   q->cursize++;
283
284   if (q->tail == q->maxsize)
285     q->tail = 0;
286
287   if (need_broadcast)
288     {
289       (void) pthread_cond_broadcast (&q->condvar);
290       if (q->signal_when_queue_non_empty)
291         kill (q->consumer_pid, q->signal_when_queue_non_empty);
292     }
293   pthread_mutex_unlock (&q->mutex);
294
295   return 0;
296 }
297
298 /*
299  * svm_queue_sub
300  */
301 int
302 svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait)
303 {
304   i8 *headp;
305   int need_broadcast = 0;
306
307   if (nowait)
308     {
309       /* zero on success */
310       if (pthread_mutex_trylock (&q->mutex))
311         {
312           return (-1);
313         }
314     }
315   else
316     pthread_mutex_lock (&q->mutex);
317
318   if (PREDICT_FALSE (q->cursize == 0))
319     {
320       if (nowait)
321         {
322           pthread_mutex_unlock (&q->mutex);
323           return (-2);
324         }
325       while (q->cursize == 0)
326         {
327           (void) pthread_cond_wait (&q->condvar, &q->mutex);
328         }
329     }
330
331   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
332   clib_memcpy (elem, headp, q->elsize);
333
334   q->head++;
335   /* $$$$ JFC shouldn't this be == 0? */
336   if (q->cursize == q->maxsize)
337     need_broadcast = 1;
338
339   q->cursize--;
340
341   if (q->head == q->maxsize)
342     q->head = 0;
343
344   if (need_broadcast)
345     (void) pthread_cond_broadcast (&q->condvar);
346
347   pthread_mutex_unlock (&q->mutex);
348
349   return 0;
350 }
351
352 int
353 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
354 {
355   int need_broadcast;
356   i8 *headp;
357
358   pthread_mutex_lock (&q->mutex);
359   if (q->cursize == 0)
360     {
361       pthread_mutex_unlock (&q->mutex);
362       return -1;
363     }
364
365   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
366   clib_memcpy (elem, headp, q->elsize);
367
368   q->head++;
369   need_broadcast = (q->cursize == q->maxsize / 2);
370   q->cursize--;
371
372   if (PREDICT_FALSE (q->head == q->maxsize))
373     q->head = 0;
374   pthread_mutex_unlock (&q->mutex);
375
376   if (need_broadcast)
377     (void) pthread_cond_broadcast (&q->condvar);
378
379   return 0;
380 }
381
382 int
383 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
384 {
385   i8 *headp;
386
387   if (PREDICT_FALSE (q->cursize == 0))
388     {
389       while (q->cursize == 0)
390         ;
391     }
392
393   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
394   clib_memcpy (elem, headp, q->elsize);
395
396   q->head++;
397   q->cursize--;
398
399   if (q->head == q->maxsize)
400     q->head = 0;
401   return 0;
402 }
403
404 /*
405  * fd.io coding-style-patch-verification: ON
406  *
407  * Local Variables:
408  * eval: (c-set-style "gnu")
409  * End:
410  */