96e40fc2aec749940e32f110d75aefbd062f725e
[vpp.git] / src / svm / queue.c
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <signal.h>
31
32 /*
33  * svm_queue_init
34  *
35  * nels = number of elements on the queue
36  * elsize = element size, presumably 4 and cacheline-size will
37  *          be popular choices.
38  * pid   = consumer pid
39  *
40  * The idea is to call this function in the queue consumer,
41  * and e-mail the queue pointer to the producer(s).
42  *
43  * The vpp process / main thread allocates one of these
44  * at startup; its main input queue. The vpp main input queue
45  * has a pointer to it in the shared memory segment header.
46  *
47  * You probably want to be on an svm data heap before calling this
48  * function.
49  */
50 svm_queue_t *
51 svm_queue_init (int nels,
52                 int elsize, int consumer_pid, int signal_when_queue_non_empty)
53 {
54   svm_queue_t *q;
55   pthread_mutexattr_t attr;
56   pthread_condattr_t cattr;
57
58   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
59                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
60   memset (q, 0, sizeof (*q));
61
62   q->elsize = elsize;
63   q->maxsize = nels;
64   q->consumer_pid = consumer_pid;
65   q->signal_when_queue_non_empty = signal_when_queue_non_empty;
66
67   memset (&attr, 0, sizeof (attr));
68   memset (&cattr, 0, sizeof (cattr));
69
70   if (pthread_mutexattr_init (&attr))
71     clib_unix_warning ("mutexattr_init");
72   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
73     clib_unix_warning ("pthread_mutexattr_setpshared");
74   if (pthread_mutex_init (&q->mutex, &attr))
75     clib_unix_warning ("mutex_init");
76   if (pthread_mutexattr_destroy (&attr))
77     clib_unix_warning ("mutexattr_destroy");
78   if (pthread_condattr_init (&cattr))
79     clib_unix_warning ("condattr_init");
80   /* prints funny-looking messages in the Linux target */
81   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
82     clib_unix_warning ("condattr_setpshared");
83   if (pthread_cond_init (&q->condvar, &cattr))
84     clib_unix_warning ("cond_init1");
85   if (pthread_condattr_destroy (&cattr))
86     clib_unix_warning ("cond_init2");
87
88   return (q);
89 }
90
91 /*
92  * svm_queue_free
93  */
94 void
95 svm_queue_free (svm_queue_t * q)
96 {
97   (void) pthread_mutex_destroy (&q->mutex);
98   (void) pthread_cond_destroy (&q->condvar);
99   clib_mem_free (q);
100 }
101
102 void
103 svm_queue_lock (svm_queue_t * q)
104 {
105   pthread_mutex_lock (&q->mutex);
106 }
107
108 void
109 svm_queue_unlock (svm_queue_t * q)
110 {
111   pthread_mutex_unlock (&q->mutex);
112 }
113
114 int
115 svm_queue_is_full (svm_queue_t * q)
116 {
117   return q->cursize == q->maxsize;
118 }
119
120 /*
121  * svm_queue_add_nolock
122  */
123 int
124 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
125 {
126   i8 *tailp;
127   int need_broadcast = 0;
128
129   if (PREDICT_FALSE (q->cursize == q->maxsize))
130     {
131       while (q->cursize == q->maxsize)
132         {
133           (void) pthread_cond_wait (&q->condvar, &q->mutex);
134         }
135     }
136
137   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
138   clib_memcpy (tailp, elem, q->elsize);
139
140   q->tail++;
141   q->cursize++;
142
143   need_broadcast = (q->cursize == 1);
144
145   if (q->tail == q->maxsize)
146     q->tail = 0;
147
148   if (need_broadcast)
149     {
150       (void) pthread_cond_broadcast (&q->condvar);
151       if (q->signal_when_queue_non_empty)
152         kill (q->consumer_pid, q->signal_when_queue_non_empty);
153     }
154   return 0;
155 }
156
157 int
158 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
159 {
160   i8 *tailp;
161
162   if (PREDICT_FALSE (q->cursize == q->maxsize))
163     {
164       while (q->cursize == q->maxsize)
165         ;
166     }
167
168   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
169   clib_memcpy (tailp, elem, q->elsize);
170
171   q->tail++;
172   q->cursize++;
173
174   if (q->tail == q->maxsize)
175     q->tail = 0;
176   return 0;
177 }
178
179
180 /*
181  * svm_queue_add
182  */
183 int
184 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
185 {
186   i8 *tailp;
187   int need_broadcast = 0;
188
189   if (nowait)
190     {
191       /* zero on success */
192       if (pthread_mutex_trylock (&q->mutex))
193         {
194           return (-1);
195         }
196     }
197   else
198     pthread_mutex_lock (&q->mutex);
199
200   if (PREDICT_FALSE (q->cursize == q->maxsize))
201     {
202       if (nowait)
203         {
204           pthread_mutex_unlock (&q->mutex);
205           return (-2);
206         }
207       while (q->cursize == q->maxsize)
208         {
209           (void) pthread_cond_wait (&q->condvar, &q->mutex);
210         }
211     }
212
213   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
214   clib_memcpy (tailp, elem, q->elsize);
215
216   q->tail++;
217   q->cursize++;
218
219   need_broadcast = (q->cursize == 1);
220
221   if (q->tail == q->maxsize)
222     q->tail = 0;
223
224   if (need_broadcast)
225     {
226       (void) pthread_cond_broadcast (&q->condvar);
227       if (q->signal_when_queue_non_empty)
228         kill (q->consumer_pid, q->signal_when_queue_non_empty);
229     }
230   pthread_mutex_unlock (&q->mutex);
231
232   return 0;
233 }
234
235 /*
236  * svm_queue_add2
237  */
238 int
239 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
240 {
241   i8 *tailp;
242   int need_broadcast = 0;
243
244   if (nowait)
245     {
246       /* zero on success */
247       if (pthread_mutex_trylock (&q->mutex))
248         {
249           return (-1);
250         }
251     }
252   else
253     pthread_mutex_lock (&q->mutex);
254
255   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
256     {
257       if (nowait)
258         {
259           pthread_mutex_unlock (&q->mutex);
260           return (-2);
261         }
262       while (q->cursize + 1 == q->maxsize)
263         {
264           (void) pthread_cond_wait (&q->condvar, &q->mutex);
265         }
266     }
267
268   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
269   clib_memcpy (tailp, elem, q->elsize);
270
271   q->tail++;
272   q->cursize++;
273
274   if (q->tail == q->maxsize)
275     q->tail = 0;
276
277   need_broadcast = (q->cursize == 1);
278
279   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
280   clib_memcpy (tailp, elem2, q->elsize);
281
282   q->tail++;
283   q->cursize++;
284
285   if (q->tail == q->maxsize)
286     q->tail = 0;
287
288   if (need_broadcast)
289     {
290       (void) pthread_cond_broadcast (&q->condvar);
291       if (q->signal_when_queue_non_empty)
292         kill (q->consumer_pid, q->signal_when_queue_non_empty);
293     }
294   pthread_mutex_unlock (&q->mutex);
295
296   return 0;
297 }
298
299 /*
300  * svm_queue_sub
301  */
302 int
303 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
304                u32 time)
305 {
306   i8 *headp;
307   int need_broadcast = 0;
308   int rc = 0;
309
310   if (cond == SVM_Q_NOWAIT)
311     {
312       /* zero on success */
313       if (pthread_mutex_trylock (&q->mutex))
314         {
315           return (-1);
316         }
317     }
318   else
319     pthread_mutex_lock (&q->mutex);
320
321   if (PREDICT_FALSE (q->cursize == 0))
322     {
323       if (cond == SVM_Q_NOWAIT)
324         {
325           pthread_mutex_unlock (&q->mutex);
326           return (-2);
327         }
328       else if (cond == SVM_Q_TIMEDWAIT)
329         {
330           struct timespec ts;
331           ts.tv_sec = unix_time_now () + time;
332           ts.tv_nsec = 0;
333           while (q->cursize == 0 && rc == 0)
334             {
335               rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
336             }
337           if (rc == ETIMEDOUT)
338             {
339               pthread_mutex_unlock (&q->mutex);
340               return ETIMEDOUT;
341             }
342         }
343       else
344         {
345           while (q->cursize == 0)
346             {
347               (void) pthread_cond_wait (&q->condvar, &q->mutex);
348             }
349         }
350     }
351
352   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
353   clib_memcpy (elem, headp, q->elsize);
354
355   q->head++;
356   /* $$$$ JFC shouldn't this be == 0? */
357   if (q->cursize == q->maxsize)
358     need_broadcast = 1;
359
360   q->cursize--;
361
362   if (q->head == q->maxsize)
363     q->head = 0;
364
365   if (need_broadcast)
366     (void) pthread_cond_broadcast (&q->condvar);
367
368   pthread_mutex_unlock (&q->mutex);
369
370   return 0;
371 }
372
373 int
374 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
375 {
376   int need_broadcast;
377   i8 *headp;
378
379   pthread_mutex_lock (&q->mutex);
380   if (q->cursize == 0)
381     {
382       pthread_mutex_unlock (&q->mutex);
383       return -1;
384     }
385
386   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
387   clib_memcpy (elem, headp, q->elsize);
388
389   q->head++;
390   need_broadcast = (q->cursize == q->maxsize / 2);
391   q->cursize--;
392
393   if (PREDICT_FALSE (q->head == q->maxsize))
394     q->head = 0;
395   pthread_mutex_unlock (&q->mutex);
396
397   if (need_broadcast)
398     (void) pthread_cond_broadcast (&q->condvar);
399
400   return 0;
401 }
402
403 int
404 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
405 {
406   i8 *headp;
407
408   if (PREDICT_FALSE (q->cursize == 0))
409     {
410       while (q->cursize == 0)
411         ;
412     }
413
414   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
415   clib_memcpy (elem, headp, q->elsize);
416
417   q->head++;
418   q->cursize--;
419
420   if (q->head == q->maxsize)
421     q->head = 0;
422   return 0;
423 }
424
425 /*
426  * fd.io coding-style-patch-verification: ON
427  *
428  * Local Variables:
429  * eval: (c-set-style "gnu")
430  * End:
431  */