2 *------------------------------------------------------------------
3 * svm_queue.c - unidirectional shared-memory queues
5 * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at:
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *------------------------------------------------------------------
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
/*
 * svm_queue_init: set up a queue header in caller-provided memory.
 *
 * base          start of the region; it is cast to svm_queue_t and the
 *               header (sizeof (*q) bytes) is zeroed.
 * nels, elsize  NOTE(review): the statements that consume these are not
 *               visible in this view -- confirm against the full file.
 *
 * Both event fds start at -1. The mutex is created process-shared and
 * robust; the condition variable is created process-shared. Each pthread
 * setup failure is only reported through clib_unix_warning and setup then
 * continues with the next step.
 *
 * NOTE(review): pthread_* functions return the error number instead of
 * setting errno; if clib_unix_warning formats errno, the reported reason
 * may be unrelated to the failure (possibly what the "funny-looking
 * messages" remark below refers to) -- confirm.
 */
33 svm_queue_init (void *base, int nels, int elsize)
36 pthread_mutexattr_t attr;
37 pthread_condattr_t cattr;
39 q = (svm_queue_t *) base;
40 clib_memset (q, 0, sizeof (*q));
/* -1 marks "no event fd configured" for both directions. */
44 q->producer_evtfd = -1;
45 q->consumer_evtfd = -1;
47 clib_memset (&attr, 0, sizeof (attr));
48 clib_memset (&cattr, 0, sizeof (cattr));
/* Mutex: process-shared + robust, so a later lock can report a dead owner. */
50 if (pthread_mutexattr_init (&attr))
51 clib_unix_warning ("mutexattr_init");
52 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53 clib_unix_warning ("pthread_mutexattr_setpshared");
54 if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
55 clib_unix_warning ("setrobust");
56 if (pthread_mutex_init (&q->mutex, &attr))
57 clib_unix_warning ("mutex_init");
58 if (pthread_mutexattr_destroy (&attr))
59 clib_unix_warning ("mutexattr_destroy");
/* Condition variable: process-shared. */
60 if (pthread_condattr_init (&cattr))
61 clib_unix_warning ("condattr_init");
62 /* prints funny-looking messages in the Linux target */
63 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
64 clib_unix_warning ("condattr_setpshared");
65 if (pthread_cond_init (&q->condvar, &cattr))
66 clib_unix_warning ("cond_init1");
/* NOTE(review): the "cond_init2" label below actually reports a
 * pthread_condattr_destroy failure. */
67 if (pthread_condattr_destroy (&cattr))
68 clib_unix_warning ("cond_init2");
/*
 * svm_queue_alloc_and_init: allocate a queue and initialize it.
 *
 * Allocates sizeof (svm_queue_t) + nels * elsize bytes aligned to
 * CLIB_CACHE_LINE_BYTES, initializes the header with svm_queue_init and
 * records consumer_pid in the queue.
 *
 * NOTE(review): nels * elsize is an int multiplication; large arguments
 * could overflow before the size is formed. The allocation result is not
 * checked in the visible lines.
 */
74 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
78 q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
79 + nels * elsize, CLIB_CACHE_LINE_BYTES);
/* Redundant with the header zeroing that svm_queue_init performs itself. */
80 clib_memset (q, 0, sizeof (*q));
81 q = svm_queue_init (q, nels, elsize);
82 q->consumer_pid = consumer_pid;
/*
 * svm_queue_free: tear down the queue's synchronization objects.
 *
 * Destroys the mutex and the condition variable; both results are
 * deliberately discarded. NOTE(review): whether the queue memory itself is
 * released happens (if at all) on a line not visible in this view.
 */
91 svm_queue_free (svm_queue_t * q)
93 (void) pthread_mutex_destroy (&q->mutex);
94 (void) pthread_cond_destroy (&q->condvar);
/*
 * svm_queue_lock: acquire the queue mutex, blocking if necessary.
 *
 * When the lock call reports EOWNERDEAD (previous owner died while holding
 * the robust mutex), the mutex is marked consistent so it stays usable.
 * Other error codes from pthread_mutex_lock are not examined, and the
 * result of pthread_mutex_consistent is ignored.
 */
99 svm_queue_lock (svm_queue_t * q)
101 int rv = pthread_mutex_lock (&q->mutex);
102 if (PREDICT_FALSE (rv == EOWNERDEAD))
103 pthread_mutex_consistent (&q->mutex);
/*
 * svm_queue_unlock: release the queue mutex.
 * The result of pthread_mutex_unlock is not checked.
 */
107 svm_queue_unlock (svm_queue_t * q)
109 pthread_mutex_unlock (&q->mutex);
/*
 * svm_queue_is_full: report whether the queue holds maxsize elements.
 *
 * Returns non-zero when cursize equals maxsize. The function takes no lock
 * itself, so the answer is only a snapshot unless the caller holds the
 * queue mutex.
 */
113 svm_queue_is_full (svm_queue_t * q)
115 return q->cursize == q->maxsize;
/*
 * svm_queue_send_signal_inline: wake up whoever waits on the queue.
 *
 * q        queue to signal.
 * is_prod  non-zero selects producer_evtfd, zero selects consumer_evtfd
 *          (only relevant when event fds are configured).
 *
 * With producer_evtfd == -1 the condition variable is broadcast.
 * Otherwise `data` is written to the selected fd and a failed write is
 * reported with clib_unix_warning.
 *
 * NOTE(review): the mode test looks at producer_evtfd only; the declaration
 * of `data` is not visible in this view.
 */
119 svm_queue_send_signal_inline (svm_queue_t * q, u8 is_prod)
121 if (q->producer_evtfd == -1)
123 (void) pthread_cond_broadcast (&q->condvar);
127 int __clib_unused rv, fd;
/* NOTE(review): "> 0" also rejects descriptor 0, which is a valid fd. */
129 ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
130 fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
131 rv = write (fd, &data, sizeof (data));
132 if (PREDICT_FALSE (rv < 0))
133 clib_unix_warning ("signal write on %d returned %d", fd, rv);
/*
 * svm_queue_send_signal: non-inline entry point that forwards q and
 * is_prod unchanged to svm_queue_send_signal_inline.
 */
138 svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
140 svm_queue_send_signal_inline (q, is_prod);
/*
 * svm_queue_wait_inline: wait until the queue state changes.
 *
 * With producer_evtfd == -1 this blocks in pthread_cond_wait on the queue
 * condvar/mutex pair, so the caller must already hold the queue mutex.
 * Otherwise the current cursize is sampled, the mutex is released and the
 * function polls until cursize differs from the sample.
 *
 * NOTE(review): the polling loop reads q->cursize after the unlock, i.e.
 * without holding the mutex. The loop body and any re-lock afterwards are
 * on lines not visible in this view -- confirm against the full file.
 */
144 svm_queue_wait_inline (svm_queue_t * q)
146 if (q->producer_evtfd == -1)
148 pthread_cond_wait (&q->condvar, &q->mutex);
152 /* Fake a wait for event. We could use epoll but that would mean
153 * using yet another fd. Should do for now */
154 u32 cursize = q->cursize;
155 svm_queue_unlock (q);
156 while (q->cursize == cursize)
/*
 * svm_queue_wait: non-inline entry point that forwards to
 * svm_queue_wait_inline.
 */
163 svm_queue_wait (svm_queue_t * q)
165 svm_queue_wait_inline (q);
/*
 * svm_queue_timedwait_inline: wait for a queue change, at most `timeout`
 * seconds.
 *
 * q        queue to wait on.
 * timeout  relative timeout in seconds; the fractional part is honoured.
 *
 * With producer_evtfd == -1 the result of pthread_cond_timedwait is
 * returned directly (the caller must hold the queue mutex). Otherwise the
 * mutex is released and the function polls until cursize changes or the
 * deadline passes; rv is then 0 if the deadline has not been reached and
 * ETIMEDOUT otherwise.
 *
 * NOTE(review): the loop body, any re-lock and the final return of the
 * polling path are on lines not visible in this view.
 */
169 svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
/* Absolute deadline: whole seconds of "now + timeout" plus the fractional
 * part of `timeout` only. NOTE(review): the sub-second part of the current
 * time is dropped here, so the deadline can fire up to ~1s early --
 * confirm unix_time_now () returns fractional seconds. */
172 ts.tv_sec = unix_time_now () + (u32) timeout;
173 ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
175 if (q->producer_evtfd == -1)
177 return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
/* Event-fd mode: `ts` is unused, the deadline is tracked in max_time. */
181 double max_time = unix_time_now () + timeout;
182 u32 cursize = q->cursize;
185 svm_queue_unlock (q);
186 while (q->cursize == cursize && unix_time_now () < max_time)
188 rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
/*
 * svm_queue_timedwait: non-inline entry point; returns whatever
 * svm_queue_timedwait_inline returns for the same q and timeout.
 */
195 svm_queue_timedwait (svm_queue_t * q, double timeout)
197 return svm_queue_timedwait_inline (q, timeout);
201 * svm_queue_add_nolock
/*
 * svm_queue_add_nolock: append one element without taking the queue mutex.
 *
 * q     target queue.
 * elem  source of q->elsize bytes copied into the slot at q->tail.
 *
 * No lock or unlock call appears in the visible lines. While the queue is
 * full the function waits via svm_queue_wait_inline. After the copy,
 * need_broadcast records whether cursize is 1, the tail is tested for
 * wrap-around at maxsize, and the producer-side signal is sent.
 *
 * NOTE(review): the cursize/tail updates, the wrap assignment, any
 * condition guarding the signal and the return value are on lines not
 * visible in this view.
 */
204 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
207 int need_broadcast = 0;
209 if (PREDICT_FALSE (q->cursize == q->maxsize))
211 while (q->cursize == q->maxsize)
212 svm_queue_wait_inline (q);
/* Slot address = start of data area + element size * tail index. */
215 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
216 clib_memcpy_fast (tailp, elem, q->elsize);
221 need_broadcast = (q->cursize == 1);
223 if (q->tail == q->maxsize)
227 svm_queue_send_signal_inline (q, 1);
/*
 * svm_queue_add_raw: append one element with no locking and no fullness
 * check in the visible lines.
 *
 * Copies q->elsize bytes from elem into the slot at q->tail, advances the
 * tail modulo maxsize and sends the producer-side signal.
 *
 * NOTE(review): the cursize update, any condition guarding the signal and
 * the return value are on lines not visible in this view.
 */
232 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
236 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
237 clib_memcpy_fast (tailp, elem, q->elsize);
239 q->tail = (q->tail + 1) % q->maxsize;
243 svm_queue_send_signal_inline (q, 1);
/*
 * svm_queue_add: append one element under the queue mutex.
 *
 * q       target queue.
 * elem    source of q->elsize bytes copied into the slot at q->tail.
 * nowait  presumably selects the non-blocking path (trylock, and giving up
 *         when the queue is full); the branches that test it are not
 *         visible in this view -- confirm.
 *
 * When the queue is full the visible code either unlocks (early-exit path)
 * or waits via svm_queue_wait_inline until space appears. After the copy,
 * need_broadcast records whether cursize is 1, the tail is tested for
 * wrap-around, the producer-side signal is sent and the mutex is released.
 *
 * NOTE(review): pthread_mutex_trylock returns EOWNERDEAD (non-zero) when it
 * acquires a robust mutex whose owner died; the test below treats every
 * non-zero result as "not acquired" -- verify the mutex is not left held.
 * The cursize/tail updates and return values are on lines not visible
 * here.
 */
251 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
254 int need_broadcast = 0;
258 /* zero on success */
259 if (pthread_mutex_trylock (&q->mutex))
267 if (PREDICT_FALSE (q->cursize == q->maxsize))
271 svm_queue_unlock (q);
274 while (q->cursize == q->maxsize)
275 svm_queue_wait_inline (q);
278 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
279 clib_memcpy_fast (tailp, elem, q->elsize);
284 need_broadcast = (q->cursize == 1);
286 if (q->tail == q->maxsize)
290 svm_queue_send_signal_inline (q, 1);
292 svm_queue_unlock (q);
/*
 * svm_queue_add2: append two elements under a single acquisition of the
 * queue mutex.
 *
 * q      target queue.
 * elem   first element, copied into the slot at q->tail.
 * elem2  second element, copied into the slot at q->tail after the first
 *        wrap-around test.
 * nowait presumably selects the non-blocking path; the branches that test
 *        it are not visible in this view -- confirm.
 *
 * NOTE(review): the space check compares cursize + 1 with maxsize using
 * "==", which only matches "exactly one free slot". If the queue can be
 * completely full on entry (cursize == maxsize), neither the guard nor the
 * wait loop triggers and both copies would proceed; ">=" looks like the
 * intended comparison -- confirm against callers.
 * NOTE(review): as with any robust mutex, a non-zero trylock result may be
 * EOWNERDEAD with the lock actually held.
 * The cursize/tail updates and return values are on lines not visible
 * here.
 */
301 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
304 int need_broadcast = 0;
308 /* zero on success */
309 if (pthread_mutex_trylock (&q->mutex))
317 if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
321 svm_queue_unlock (q);
324 while (q->cursize + 1 == q->maxsize)
325 svm_queue_wait_inline (q);
/* First element. */
328 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
329 clib_memcpy_fast (tailp, elem, q->elsize);
334 if (q->tail == q->maxsize)
337 need_broadcast = (q->cursize == 1);
/* Second element. */
339 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
340 clib_memcpy_fast (tailp, elem2, q->elsize);
345 if (q->tail == q->maxsize)
349 svm_queue_send_signal_inline (q, 1);
351 svm_queue_unlock (q);
/*
 * svm_queue_sub: remove the element at the head of the queue.
 *
 * q     source queue.
 * elem  destination receiving q->elsize bytes from the slot at q->head.
 * cond  wait policy: SVM_Q_NOWAIT uses a trylock and unlocks without
 *       waiting when the queue is empty; SVM_Q_TIMEDWAIT waits through
 *       svm_queue_timedwait_inline until an element arrives or that call
 *       returns non-zero; any other value waits via svm_queue_wait_inline.
 * (the parameter list continues on a line not visible in this view; `time`
 *  used below presumably comes from there.)
 *
 * After the copy the head is tested for wrap-around at maxsize, the
 * consumer-side signal (is_prod == 0) is sent and the mutex is released.
 *
 * NOTE(review): return values, the cursize/head updates and the blocking
 * lock path are on lines not visible here. A non-zero trylock result may be
 * EOWNERDEAD with the robust mutex actually held.
 */
360 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
364 int need_broadcast = 0;
367 if (cond == SVM_Q_NOWAIT)
369 /* zero on success */
370 if (pthread_mutex_trylock (&q->mutex))
378 if (PREDICT_FALSE (q->cursize == 0))
380 if (cond == SVM_Q_NOWAIT)
382 svm_queue_unlock (q);
385 else if (cond == SVM_Q_TIMEDWAIT)
387 while (q->cursize == 0 && rc == 0)
388 rc = svm_queue_timedwait_inline (q, time);
392 svm_queue_unlock (q);
398 while (q->cursize == 0)
399 svm_queue_wait_inline (q);
403 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
404 clib_memcpy_fast (elem, headp, q->elsize);
407 /* $$$$ JFC shouldn't this be == 0? */
/* The test below fires when the queue was completely full before this
 * removal (presumably where need_broadcast gets set -- the assignment is
 * not visible in this view). */
408 if (q->cursize == q->maxsize)
413 if (q->head == q->maxsize)
417 svm_queue_send_signal_inline (q, 0);
419 svm_queue_unlock (q);
/*
 * svm_queue_sub2: remove the element at the head of the queue (variant of
 * the dequeue path with a different wake-up threshold).
 *
 * q     source queue.
 * elem  destination receiving q->elsize bytes from the slot at q->head.
 *
 * need_broadcast is set when cursize equals maxsize / 2 at the time of the
 * test. In the visible lines the mutex is released before the
 * consumer-side signal is sent.
 *
 * NOTE(review): the lock acquisition, the condition leading to the first
 * unlock (presumably an empty-queue early exit), the cursize/head updates,
 * any guard on the signal and the return values are on lines not visible
 * in this view.
 */
425 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
433 svm_queue_unlock (q);
437 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
438 clib_memcpy_fast (elem, headp, q->elsize);
441 need_broadcast = (q->cursize == q->maxsize / 2);
444 if (PREDICT_FALSE (q->head == q->maxsize))
446 svm_queue_unlock (q);
449 svm_queue_send_signal_inline (q, 0);
/*
 * svm_queue_sub_raw: remove the head element with no lock or unlock call
 * in the visible lines.
 *
 * q     source queue.
 * elem  destination receiving q->elsize bytes from the slot at q->head.
 *
 * While the queue is empty the function loops (loop body not visible in
 * this view). need_broadcast is set when the queue is completely full at
 * the time of the test; the head then advances modulo maxsize and the
 * consumer-side signal is sent only if need_broadcast is set.
 *
 * NOTE(review): the cursize update and return value are on lines not
 * visible here.
 */
455 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
460 if (PREDICT_FALSE (q->cursize == 0))
462 while (q->cursize == 0)
466 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
467 clib_memcpy_fast (elem, headp, q->elsize);
469 need_broadcast = q->cursize == q->maxsize;
471 q->head = (q->head + 1) % q->maxsize;
474 if (PREDICT_FALSE (need_broadcast))
475 svm_queue_send_signal_inline (q, 0);
/*
 * svm_queue_set_producer_event_fd: store fd as the queue's producer event
 * fd. Elsewhere in this file a value other than -1 in this field switches
 * signalling and waiting from the condition variable to the fd-based path.
 */
481 svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
483 q->producer_evtfd = fd;
/*
 * svm_queue_set_consumer_event_fd: store fd as the queue's consumer event
 * fd, the descriptor written when a signal is sent with is_prod == 0 in
 * fd mode.
 */
487 svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
489 q->consumer_evtfd = fd;
493 * fd.io coding-style-patch-verification: ON
496 * eval: (c-set-style "gnu")