VOM: fix cflags
[vpp.git] / src / vlibmemory / unix_shared_memory_queue.c
1 /*
2  *------------------------------------------------------------------
3  * unix_shared_memory_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <pthread.h>
24 #include <vppinfra/mem.h>
25 #include <vppinfra/format.h>
26 #include <vppinfra/cache.h>
27 #include <vlibmemory/unix_shared_memory_queue.h>
28 #include <signal.h>
29
30 /*
31  * unix_shared_memory_queue_init
32  *
33  * nels = number of elements on the queue
34  * elsize = element size, presumably 4 and cacheline-size will
35  *          be popular choices.
36  * pid   = consumer pid
37  *
38  * The idea is to call this function in the queue consumer,
39  * and e-mail the queue pointer to the producer(s).
40  *
41  * The vpp process / main thread allocates one of these
42  * at startup; its main input queue. The vpp main input queue
43  * has a pointer to it in the shared memory segment header.
44  *
45  * You probably want to be on an svm data heap before calling this
46  * function.
47  */
48 unix_shared_memory_queue_t *
49 unix_shared_memory_queue_init (int nels,
50                                int elsize,
51                                int consumer_pid,
52                                int signal_when_queue_non_empty)
53 {
54   unix_shared_memory_queue_t *q;
55   pthread_mutexattr_t attr;
56   pthread_condattr_t cattr;
57
58   q = clib_mem_alloc_aligned (sizeof (unix_shared_memory_queue_t)
59                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
60   memset (q, 0, sizeof (*q));
61
62   q->elsize = elsize;
63   q->maxsize = nels;
64   q->consumer_pid = consumer_pid;
65   q->signal_when_queue_non_empty = signal_when_queue_non_empty;
66
67   memset (&attr, 0, sizeof (attr));
68   memset (&cattr, 0, sizeof (cattr));
69
70   if (pthread_mutexattr_init (&attr))
71     clib_unix_warning ("mutexattr_init");
72   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
73     clib_unix_warning ("pthread_mutexattr_setpshared");
74   if (pthread_mutex_init (&q->mutex, &attr))
75     clib_unix_warning ("mutex_init");
76   if (pthread_mutexattr_destroy (&attr))
77     clib_unix_warning ("mutexattr_destroy");
78   if (pthread_condattr_init (&cattr))
79     clib_unix_warning ("condattr_init");
80   /* prints funny-looking messages in the Linux target */
81   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
82     clib_unix_warning ("condattr_setpshared");
83   if (pthread_cond_init (&q->condvar, &cattr))
84     clib_unix_warning ("cond_init1");
85   if (pthread_condattr_destroy (&cattr))
86     clib_unix_warning ("cond_init2");
87
88   return (q);
89 }
90
91 /*
92  * unix_shared_memory_queue_free
93  */
94 void
95 unix_shared_memory_queue_free (unix_shared_memory_queue_t * q)
96 {
97   (void) pthread_mutex_destroy (&q->mutex);
98   (void) pthread_cond_destroy (&q->condvar);
99   clib_mem_free (q);
100 }
101
102 void
103 unix_shared_memory_queue_lock (unix_shared_memory_queue_t * q)
104 {
105   pthread_mutex_lock (&q->mutex);
106 }
107
108 void
109 unix_shared_memory_queue_unlock (unix_shared_memory_queue_t * q)
110 {
111   pthread_mutex_unlock (&q->mutex);
112 }
113
114 int
115 unix_shared_memory_queue_is_full (unix_shared_memory_queue_t * q)
116 {
117   return q->cursize == q->maxsize;
118 }
119
120 /*
121  * unix_shared_memory_queue_add_nolock
122  */
123 int
124 unix_shared_memory_queue_add_nolock (unix_shared_memory_queue_t * q,
125                                      u8 * elem)
126 {
127   i8 *tailp;
128   int need_broadcast = 0;
129
130   if (PREDICT_FALSE (q->cursize == q->maxsize))
131     {
132       while (q->cursize == q->maxsize)
133         {
134           (void) pthread_cond_wait (&q->condvar, &q->mutex);
135         }
136     }
137
138   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
139   clib_memcpy (tailp, elem, q->elsize);
140
141   q->tail++;
142   q->cursize++;
143
144   need_broadcast = (q->cursize == 1);
145
146   if (q->tail == q->maxsize)
147     q->tail = 0;
148
149   if (need_broadcast)
150     {
151       (void) pthread_cond_broadcast (&q->condvar);
152       if (q->signal_when_queue_non_empty)
153         kill (q->consumer_pid, q->signal_when_queue_non_empty);
154     }
155   return 0;
156 }
157
158 int
159 unix_shared_memory_queue_add_raw (unix_shared_memory_queue_t * q, u8 * elem)
160 {
161   i8 *tailp;
162
163   if (PREDICT_FALSE (q->cursize == q->maxsize))
164     {
165       while (q->cursize == q->maxsize)
166         ;
167     }
168
169   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
170   clib_memcpy (tailp, elem, q->elsize);
171
172   q->tail++;
173   q->cursize++;
174
175   if (q->tail == q->maxsize)
176     q->tail = 0;
177   return 0;
178 }
179
180
181 /*
182  * unix_shared_memory_queue_add
183  */
184 int
185 unix_shared_memory_queue_add (unix_shared_memory_queue_t * q,
186                               u8 * elem, int nowait)
187 {
188   i8 *tailp;
189   int need_broadcast = 0;
190
191   if (nowait)
192     {
193       /* zero on success */
194       if (pthread_mutex_trylock (&q->mutex))
195         {
196           return (-1);
197         }
198     }
199   else
200     pthread_mutex_lock (&q->mutex);
201
202   if (PREDICT_FALSE (q->cursize == q->maxsize))
203     {
204       if (nowait)
205         {
206           pthread_mutex_unlock (&q->mutex);
207           return (-2);
208         }
209       while (q->cursize == q->maxsize)
210         {
211           (void) pthread_cond_wait (&q->condvar, &q->mutex);
212         }
213     }
214
215   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
216   clib_memcpy (tailp, elem, q->elsize);
217
218   q->tail++;
219   q->cursize++;
220
221   need_broadcast = (q->cursize == 1);
222
223   if (q->tail == q->maxsize)
224     q->tail = 0;
225
226   if (need_broadcast)
227     {
228       (void) pthread_cond_broadcast (&q->condvar);
229       if (q->signal_when_queue_non_empty)
230         kill (q->consumer_pid, q->signal_when_queue_non_empty);
231     }
232   pthread_mutex_unlock (&q->mutex);
233
234   return 0;
235 }
236
237 /*
238  * unix_shared_memory_queue_add2
239  */
240 int
241 unix_shared_memory_queue_add2 (unix_shared_memory_queue_t * q, u8 * elem,
242                                u8 * elem2, int nowait)
243 {
244   i8 *tailp;
245   int need_broadcast = 0;
246
247   if (nowait)
248     {
249       /* zero on success */
250       if (pthread_mutex_trylock (&q->mutex))
251         {
252           return (-1);
253         }
254     }
255   else
256     pthread_mutex_lock (&q->mutex);
257
258   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
259     {
260       if (nowait)
261         {
262           pthread_mutex_unlock (&q->mutex);
263           return (-2);
264         }
265       while (q->cursize + 1 == q->maxsize)
266         {
267           (void) pthread_cond_wait (&q->condvar, &q->mutex);
268         }
269     }
270
271   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
272   clib_memcpy (tailp, elem, q->elsize);
273
274   q->tail++;
275   q->cursize++;
276
277   if (q->tail == q->maxsize)
278     q->tail = 0;
279
280   need_broadcast = (q->cursize == 1);
281
282   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
283   clib_memcpy (tailp, elem2, q->elsize);
284
285   q->tail++;
286   q->cursize++;
287
288   if (q->tail == q->maxsize)
289     q->tail = 0;
290
291   if (need_broadcast)
292     {
293       (void) pthread_cond_broadcast (&q->condvar);
294       if (q->signal_when_queue_non_empty)
295         kill (q->consumer_pid, q->signal_when_queue_non_empty);
296     }
297   pthread_mutex_unlock (&q->mutex);
298
299   return 0;
300 }
301
302 /*
303  * unix_shared_memory_queue_sub
304  */
305 int
306 unix_shared_memory_queue_sub (unix_shared_memory_queue_t * q,
307                               u8 * elem, int nowait)
308 {
309   i8 *headp;
310   int need_broadcast = 0;
311
312   if (nowait)
313     {
314       /* zero on success */
315       if (pthread_mutex_trylock (&q->mutex))
316         {
317           return (-1);
318         }
319     }
320   else
321     pthread_mutex_lock (&q->mutex);
322
323   if (PREDICT_FALSE (q->cursize == 0))
324     {
325       if (nowait)
326         {
327           pthread_mutex_unlock (&q->mutex);
328           return (-2);
329         }
330       while (q->cursize == 0)
331         {
332           (void) pthread_cond_wait (&q->condvar, &q->mutex);
333         }
334     }
335
336   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
337   clib_memcpy (elem, headp, q->elsize);
338
339   q->head++;
340   /* $$$$ JFC shouldn't this be == 0? */
341   if (q->cursize == q->maxsize)
342     need_broadcast = 1;
343
344   q->cursize--;
345
346   if (q->head == q->maxsize)
347     q->head = 0;
348
349   if (need_broadcast)
350     (void) pthread_cond_broadcast (&q->condvar);
351
352   pthread_mutex_unlock (&q->mutex);
353
354   return 0;
355 }
356
357 int
358 unix_shared_memory_queue_sub_raw (unix_shared_memory_queue_t * q, u8 * elem)
359 {
360   i8 *headp;
361
362   if (PREDICT_FALSE (q->cursize == 0))
363     {
364       while (q->cursize == 0)
365         ;
366     }
367
368   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
369   clib_memcpy (elem, headp, q->elsize);
370
371   q->head++;
372   q->cursize--;
373
374   if (q->head == q->maxsize)
375     q->head = 0;
376   return 0;
377 }
378
379 /*
380  * fd.io coding-style-patch-verification: ON
381  *
382  * Local Variables:
383  * eval: (c-set-style "gnu")
384  * End:
385  */