session: use msg queue for events
[vpp.git] / src / svm / queue.c
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <signal.h>
31
32 /*
33  * svm_queue_init
34  *
35  * nels = number of elements on the queue
36  * elsize = element size, presumably 4 and cacheline-size will
37  *          be popular choices.
38  * pid   = consumer pid
39  *
40  * The idea is to call this function in the queue consumer,
41  * and e-mail the queue pointer to the producer(s).
42  *
43  * The vpp process / main thread allocates one of these
44  * at startup; its main input queue. The vpp main input queue
45  * has a pointer to it in the shared memory segment header.
46  *
47  * You probably want to be on an svm data heap before calling this
48  * function.
49  */
50 svm_queue_t *
51 svm_queue_init (int nels,
52                 int elsize, int consumer_pid, int signal_when_queue_non_empty)
53 {
54   svm_queue_t *q;
55   pthread_mutexattr_t attr;
56   pthread_condattr_t cattr;
57
58   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
59                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
60   memset (q, 0, sizeof (*q));
61
62   q->elsize = elsize;
63   q->maxsize = nels;
64   q->consumer_pid = consumer_pid;
65   q->signal_when_queue_non_empty = signal_when_queue_non_empty;
66
67   memset (&attr, 0, sizeof (attr));
68   memset (&cattr, 0, sizeof (cattr));
69
70   if (pthread_mutexattr_init (&attr))
71     clib_unix_warning ("mutexattr_init");
72   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
73     clib_unix_warning ("pthread_mutexattr_setpshared");
74   if (pthread_mutex_init (&q->mutex, &attr))
75     clib_unix_warning ("mutex_init");
76   if (pthread_mutexattr_destroy (&attr))
77     clib_unix_warning ("mutexattr_destroy");
78   if (pthread_condattr_init (&cattr))
79     clib_unix_warning ("condattr_init");
80   /* prints funny-looking messages in the Linux target */
81   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
82     clib_unix_warning ("condattr_setpshared");
83   if (pthread_cond_init (&q->condvar, &cattr))
84     clib_unix_warning ("cond_init1");
85   if (pthread_condattr_destroy (&cattr))
86     clib_unix_warning ("cond_init2");
87
88   return (q);
89 }
90
91 /*
92  * svm_queue_free
93  */
94 void
95 svm_queue_free (svm_queue_t * q)
96 {
97   (void) pthread_mutex_destroy (&q->mutex);
98   (void) pthread_cond_destroy (&q->condvar);
99   clib_mem_free (q);
100 }
101
102 void
103 svm_queue_lock (svm_queue_t * q)
104 {
105   pthread_mutex_lock (&q->mutex);
106 }
107
108 void
109 svm_queue_unlock (svm_queue_t * q)
110 {
111   pthread_mutex_unlock (&q->mutex);
112 }
113
114 int
115 svm_queue_is_full (svm_queue_t * q)
116 {
117   return q->cursize == q->maxsize;
118 }
119
120 /*
121  * svm_queue_add_nolock
122  */
123 int
124 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
125 {
126   i8 *tailp;
127   int need_broadcast = 0;
128
129   if (PREDICT_FALSE (q->cursize == q->maxsize))
130     {
131       while (q->cursize == q->maxsize)
132         {
133           (void) pthread_cond_wait (&q->condvar, &q->mutex);
134         }
135     }
136
137   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
138   clib_memcpy (tailp, elem, q->elsize);
139
140   q->tail++;
141   q->cursize++;
142
143   need_broadcast = (q->cursize == 1);
144
145   if (q->tail == q->maxsize)
146     q->tail = 0;
147
148   if (need_broadcast)
149     {
150       (void) pthread_cond_broadcast (&q->condvar);
151       if (q->signal_when_queue_non_empty)
152         kill (q->consumer_pid, q->signal_when_queue_non_empty);
153     }
154   return 0;
155 }
156
157 void
158 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
159 {
160   i8 *tailp;
161
162   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
163   clib_memcpy (tailp, elem, q->elsize);
164
165   q->tail = (q->tail + 1) % q->maxsize;
166   q->cursize++;
167 }
168
169
170 /*
171  * svm_queue_add
172  */
173 int
174 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
175 {
176   i8 *tailp;
177   int need_broadcast = 0;
178
179   if (nowait)
180     {
181       /* zero on success */
182       if (pthread_mutex_trylock (&q->mutex))
183         {
184           return (-1);
185         }
186     }
187   else
188     pthread_mutex_lock (&q->mutex);
189
190   if (PREDICT_FALSE (q->cursize == q->maxsize))
191     {
192       if (nowait)
193         {
194           pthread_mutex_unlock (&q->mutex);
195           return (-2);
196         }
197       while (q->cursize == q->maxsize)
198         {
199           (void) pthread_cond_wait (&q->condvar, &q->mutex);
200         }
201     }
202
203   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
204   clib_memcpy (tailp, elem, q->elsize);
205
206   q->tail++;
207   q->cursize++;
208
209   need_broadcast = (q->cursize == 1);
210
211   if (q->tail == q->maxsize)
212     q->tail = 0;
213
214   if (need_broadcast)
215     {
216       (void) pthread_cond_broadcast (&q->condvar);
217       if (q->signal_when_queue_non_empty)
218         kill (q->consumer_pid, q->signal_when_queue_non_empty);
219     }
220   pthread_mutex_unlock (&q->mutex);
221
222   return 0;
223 }
224
225 /*
226  * svm_queue_add2
227  */
228 int
229 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
230 {
231   i8 *tailp;
232   int need_broadcast = 0;
233
234   if (nowait)
235     {
236       /* zero on success */
237       if (pthread_mutex_trylock (&q->mutex))
238         {
239           return (-1);
240         }
241     }
242   else
243     pthread_mutex_lock (&q->mutex);
244
245   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
246     {
247       if (nowait)
248         {
249           pthread_mutex_unlock (&q->mutex);
250           return (-2);
251         }
252       while (q->cursize + 1 == q->maxsize)
253         {
254           (void) pthread_cond_wait (&q->condvar, &q->mutex);
255         }
256     }
257
258   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
259   clib_memcpy (tailp, elem, q->elsize);
260
261   q->tail++;
262   q->cursize++;
263
264   if (q->tail == q->maxsize)
265     q->tail = 0;
266
267   need_broadcast = (q->cursize == 1);
268
269   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
270   clib_memcpy (tailp, elem2, q->elsize);
271
272   q->tail++;
273   q->cursize++;
274
275   if (q->tail == q->maxsize)
276     q->tail = 0;
277
278   if (need_broadcast)
279     {
280       (void) pthread_cond_broadcast (&q->condvar);
281       if (q->signal_when_queue_non_empty)
282         kill (q->consumer_pid, q->signal_when_queue_non_empty);
283     }
284   pthread_mutex_unlock (&q->mutex);
285
286   return 0;
287 }
288
289 /*
290  * svm_queue_sub
291  */
292 int
293 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
294                u32 time)
295 {
296   i8 *headp;
297   int need_broadcast = 0;
298   int rc = 0;
299
300   if (cond == SVM_Q_NOWAIT)
301     {
302       /* zero on success */
303       if (pthread_mutex_trylock (&q->mutex))
304         {
305           return (-1);
306         }
307     }
308   else
309     pthread_mutex_lock (&q->mutex);
310
311   if (PREDICT_FALSE (q->cursize == 0))
312     {
313       if (cond == SVM_Q_NOWAIT)
314         {
315           pthread_mutex_unlock (&q->mutex);
316           return (-2);
317         }
318       else if (cond == SVM_Q_TIMEDWAIT)
319         {
320           struct timespec ts;
321           ts.tv_sec = unix_time_now () + time;
322           ts.tv_nsec = 0;
323           while (q->cursize == 0 && rc == 0)
324             {
325               rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
326             }
327           if (rc == ETIMEDOUT)
328             {
329               pthread_mutex_unlock (&q->mutex);
330               return ETIMEDOUT;
331             }
332         }
333       else
334         {
335           while (q->cursize == 0)
336             {
337               (void) pthread_cond_wait (&q->condvar, &q->mutex);
338             }
339         }
340     }
341
342   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
343   clib_memcpy (elem, headp, q->elsize);
344
345   q->head++;
346   /* $$$$ JFC shouldn't this be == 0? */
347   if (q->cursize == q->maxsize)
348     need_broadcast = 1;
349
350   q->cursize--;
351
352   if (q->head == q->maxsize)
353     q->head = 0;
354
355   if (need_broadcast)
356     (void) pthread_cond_broadcast (&q->condvar);
357
358   pthread_mutex_unlock (&q->mutex);
359
360   return 0;
361 }
362
363 int
364 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
365 {
366   int need_broadcast;
367   i8 *headp;
368
369   pthread_mutex_lock (&q->mutex);
370   if (q->cursize == 0)
371     {
372       pthread_mutex_unlock (&q->mutex);
373       return -1;
374     }
375
376   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
377   clib_memcpy (elem, headp, q->elsize);
378
379   q->head++;
380   need_broadcast = (q->cursize == q->maxsize / 2);
381   q->cursize--;
382
383   if (PREDICT_FALSE (q->head == q->maxsize))
384     q->head = 0;
385   pthread_mutex_unlock (&q->mutex);
386
387   if (need_broadcast)
388     (void) pthread_cond_broadcast (&q->condvar);
389
390   return 0;
391 }
392
393 int
394 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
395 {
396   i8 *headp;
397
398   if (PREDICT_FALSE (q->cursize == 0))
399     {
400       while (q->cursize == 0)
401         ;
402     }
403
404   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
405   clib_memcpy (elem, headp, q->elsize);
406
407   q->head = (q->head + 1) % q->maxsize;
408   q->cursize--;
409
410   return 0;
411 }
412
413 /*
414  * fd.io coding-style-patch-verification: ON
415  *
416  * Local Variables:
417  * eval: (c-set-style "gnu")
418  * End:
419  */