0fa1fe9b230ec9d3b6e0785530f1f1d5f5006078
[vpp.git] / src / svm / queue.c
1 /*
2  *------------------------------------------------------------------
3  * svm_queue.c - unidirectional shared-memory queues
4  *
5  * Copyright (c) 2009 Cisco and/or its affiliates.
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at:
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *------------------------------------------------------------------
18  */
19
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <pthread.h>
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30
31 svm_queue_t *
32 svm_queue_init (void *base, int nels, int elsize)
33 {
34   svm_queue_t *q;
35   pthread_mutexattr_t attr;
36   pthread_condattr_t cattr;
37
38   q = (svm_queue_t *) base;
39   memset (q, 0, sizeof (*q));
40
41   q->elsize = elsize;
42   q->maxsize = nels;
43   q->producer_evtfd = -1;
44   q->consumer_evtfd = -1;
45
46   memset (&attr, 0, sizeof (attr));
47   memset (&cattr, 0, sizeof (cattr));
48
49   if (pthread_mutexattr_init (&attr))
50     clib_unix_warning ("mutexattr_init");
51   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
52     clib_unix_warning ("pthread_mutexattr_setpshared");
53   if (pthread_mutex_init (&q->mutex, &attr))
54     clib_unix_warning ("mutex_init");
55   if (pthread_mutexattr_destroy (&attr))
56     clib_unix_warning ("mutexattr_destroy");
57   if (pthread_condattr_init (&cattr))
58     clib_unix_warning ("condattr_init");
59   /* prints funny-looking messages in the Linux target */
60   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
61     clib_unix_warning ("condattr_setpshared");
62   if (pthread_cond_init (&q->condvar, &cattr))
63     clib_unix_warning ("cond_init1");
64   if (pthread_condattr_destroy (&cattr))
65     clib_unix_warning ("cond_init2");
66
67   return (q);
68 }
69
70 svm_queue_t *
71 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
72 {
73   svm_queue_t *q;
74
75   q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
76                               + nels * elsize, CLIB_CACHE_LINE_BYTES);
77   memset (q, 0, sizeof (*q));
78   q = svm_queue_init (q, nels, elsize);
79   q->consumer_pid = consumer_pid;
80
81   return q;
82 }
83
84 /*
85  * svm_queue_free
86  */
87 void
88 svm_queue_free (svm_queue_t * q)
89 {
90   (void) pthread_mutex_destroy (&q->mutex);
91   (void) pthread_cond_destroy (&q->condvar);
92   clib_mem_free (q);
93 }
94
95 void
96 svm_queue_lock (svm_queue_t * q)
97 {
98   pthread_mutex_lock (&q->mutex);
99 }
100
101 void
102 svm_queue_unlock (svm_queue_t * q)
103 {
104   pthread_mutex_unlock (&q->mutex);
105 }
106
107 int
108 svm_queue_is_full (svm_queue_t * q)
109 {
110   return q->cursize == q->maxsize;
111 }
112
113 static inline void
114 svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
115 {
116   if (q->producer_evtfd == -1)
117     {
118       (void) pthread_cond_broadcast (&q->condvar);
119     }
120   else
121     {
122       int __clib_unused rv, fd;
123       u64 data = 1;
124       ASSERT (q->consumer_evtfd != -1);
125       fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
126       rv = write (fd, &data, sizeof (data));
127     }
128 }
129
130 /*
131  * svm_queue_add_nolock
132  */
133 int
134 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
135 {
136   i8 *tailp;
137   int need_broadcast = 0;
138
139   if (PREDICT_FALSE (q->cursize == q->maxsize))
140     {
141       while (q->cursize == q->maxsize)
142         {
143           (void) pthread_cond_wait (&q->condvar, &q->mutex);
144         }
145     }
146
147   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
148   clib_memcpy (tailp, elem, q->elsize);
149
150   q->tail++;
151   q->cursize++;
152
153   need_broadcast = (q->cursize == 1);
154
155   if (q->tail == q->maxsize)
156     q->tail = 0;
157
158   if (need_broadcast)
159     svm_queue_send_signal (q, 1);
160   return 0;
161 }
162
163 void
164 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
165 {
166   i8 *tailp;
167
168   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
169   clib_memcpy (tailp, elem, q->elsize);
170
171   q->tail = (q->tail + 1) % q->maxsize;
172   q->cursize++;
173 }
174
175
176 /*
177  * svm_queue_add
178  */
179 int
180 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
181 {
182   i8 *tailp;
183   int need_broadcast = 0;
184
185   if (nowait)
186     {
187       /* zero on success */
188       if (pthread_mutex_trylock (&q->mutex))
189         {
190           return (-1);
191         }
192     }
193   else
194     pthread_mutex_lock (&q->mutex);
195
196   if (PREDICT_FALSE (q->cursize == q->maxsize))
197     {
198       if (nowait)
199         {
200           pthread_mutex_unlock (&q->mutex);
201           return (-2);
202         }
203       while (q->cursize == q->maxsize)
204         {
205           (void) pthread_cond_wait (&q->condvar, &q->mutex);
206         }
207     }
208
209   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
210   clib_memcpy (tailp, elem, q->elsize);
211
212   q->tail++;
213   q->cursize++;
214
215   need_broadcast = (q->cursize == 1);
216
217   if (q->tail == q->maxsize)
218     q->tail = 0;
219
220   if (need_broadcast)
221     svm_queue_send_signal (q, 1);
222
223   pthread_mutex_unlock (&q->mutex);
224
225   return 0;
226 }
227
228 /*
229  * svm_queue_add2
230  */
231 int
232 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
233 {
234   i8 *tailp;
235   int need_broadcast = 0;
236
237   if (nowait)
238     {
239       /* zero on success */
240       if (pthread_mutex_trylock (&q->mutex))
241         {
242           return (-1);
243         }
244     }
245   else
246     pthread_mutex_lock (&q->mutex);
247
248   if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
249     {
250       if (nowait)
251         {
252           pthread_mutex_unlock (&q->mutex);
253           return (-2);
254         }
255       while (q->cursize + 1 == q->maxsize)
256         {
257           (void) pthread_cond_wait (&q->condvar, &q->mutex);
258         }
259     }
260
261   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
262   clib_memcpy (tailp, elem, q->elsize);
263
264   q->tail++;
265   q->cursize++;
266
267   if (q->tail == q->maxsize)
268     q->tail = 0;
269
270   need_broadcast = (q->cursize == 1);
271
272   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
273   clib_memcpy (tailp, elem2, q->elsize);
274
275   q->tail++;
276   q->cursize++;
277
278   if (q->tail == q->maxsize)
279     q->tail = 0;
280
281   if (need_broadcast)
282     svm_queue_send_signal (q, 1);
283
284   pthread_mutex_unlock (&q->mutex);
285
286   return 0;
287 }
288
289 /*
290  * svm_queue_sub
291  */
292 int
293 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
294                u32 time)
295 {
296   i8 *headp;
297   int need_broadcast = 0;
298   int rc = 0;
299
300   if (cond == SVM_Q_NOWAIT)
301     {
302       /* zero on success */
303       if (pthread_mutex_trylock (&q->mutex))
304         {
305           return (-1);
306         }
307     }
308   else
309     pthread_mutex_lock (&q->mutex);
310
311   if (PREDICT_FALSE (q->cursize == 0))
312     {
313       if (cond == SVM_Q_NOWAIT)
314         {
315           pthread_mutex_unlock (&q->mutex);
316           return (-2);
317         }
318       else if (cond == SVM_Q_TIMEDWAIT)
319         {
320           struct timespec ts;
321           ts.tv_sec = unix_time_now () + time;
322           ts.tv_nsec = 0;
323           while (q->cursize == 0 && rc == 0)
324             {
325               rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
326             }
327           if (rc == ETIMEDOUT)
328             {
329               pthread_mutex_unlock (&q->mutex);
330               return ETIMEDOUT;
331             }
332         }
333       else
334         {
335           while (q->cursize == 0)
336             {
337               (void) pthread_cond_wait (&q->condvar, &q->mutex);
338             }
339         }
340     }
341
342   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
343   clib_memcpy (elem, headp, q->elsize);
344
345   q->head++;
346   /* $$$$ JFC shouldn't this be == 0? */
347   if (q->cursize == q->maxsize)
348     need_broadcast = 1;
349
350   q->cursize--;
351
352   if (q->head == q->maxsize)
353     q->head = 0;
354
355   if (need_broadcast)
356     svm_queue_send_signal (q, 0);
357
358   pthread_mutex_unlock (&q->mutex);
359
360   return 0;
361 }
362
363 int
364 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
365 {
366   int need_broadcast;
367   i8 *headp;
368
369   pthread_mutex_lock (&q->mutex);
370   if (q->cursize == 0)
371     {
372       pthread_mutex_unlock (&q->mutex);
373       return -1;
374     }
375
376   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
377   clib_memcpy (elem, headp, q->elsize);
378
379   q->head++;
380   need_broadcast = (q->cursize == q->maxsize / 2);
381   q->cursize--;
382
383   if (PREDICT_FALSE (q->head == q->maxsize))
384     q->head = 0;
385   pthread_mutex_unlock (&q->mutex);
386
387   if (need_broadcast)
388     svm_queue_send_signal (q, 0);
389
390   return 0;
391 }
392
393 int
394 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
395 {
396   i8 *headp;
397
398   if (PREDICT_FALSE (q->cursize == 0))
399     {
400       while (q->cursize == 0)
401         ;
402     }
403
404   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
405   clib_memcpy (elem, headp, q->elsize);
406
407   q->head = (q->head + 1) % q->maxsize;
408   q->cursize--;
409
410   return 0;
411 }
412
413 void
414 svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
415 {
416   q->producer_evtfd = fd;
417 }
418
419 void
420 svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
421 {
422   q->consumer_evtfd = fd;
423 }
424
425 /*
426  * fd.io coding-style-patch-verification: ON
427  *
428  * Local Variables:
429  * eval: (c-set-style "gnu")
430  * End:
431  */