2 *------------------------------------------------------------------
3 * svm_queue.c - unidirectional shared-memory queues
5 * Copyright (c) 2009-2019 Cisco and/or its affiliates.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at:
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *------------------------------------------------------------------
25 #include <vppinfra/mem.h>
26 #include <vppinfra/format.h>
27 #include <vppinfra/cache.h>
28 #include <svm/queue.h>
29 #include <vppinfra/time.h>
30 #include <vppinfra/lock.h>
/*
 * svm_queue_init - initialise a queue header in caller-supplied memory.
 *
 * base   - start of the memory region; it is reinterpreted as
 *          svm_queue_t * and the header is zeroed.
 * nels   - element count.
 * elsize - size of one element.
 *          NOTE(review): the statements consuming nels/elsize are not
 *          visible in this listing; presumably they are stored in the
 *          header - confirm against the complete file.
 *
 * The mutex is created process-shared and robust, the condition variable
 * process-shared.  Every pthread failure is only reported through
 * clib_unix_warning (); initialisation carries on regardless.
 */
33 svm_queue_init (void *base, int nels, int elsize)
36 pthread_mutexattr_t attr;
37 pthread_condattr_t cattr;
39 q = (svm_queue_t *) base;
40 clib_memset (q, 0, sizeof (*q));
/* -1 means "no event fd configured": the signal and wait helpers in this
 * file test producer_evtfd against -1 to pick the condvar path. */
44 q->producer_evtfd = -1;
45 q->consumer_evtfd = -1;
47 clib_memset (&attr, 0, sizeof (attr));
48 clib_memset (&cattr, 0, sizeof (cattr));
50 if (pthread_mutexattr_init (&attr))
51 clib_unix_warning ("mutexattr_init");
52 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53 clib_unix_warning ("pthread_mutexattr_setpshared");
/* Robust: a locker is told (EOWNERDEAD) when the previous owner died
 * while holding the mutex; svm_queue_lock/trylock handle that code. */
54 if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
55 clib_unix_warning ("setrobust");
56 if (pthread_mutex_init (&q->mutex, &attr))
57 clib_unix_warning ("mutex_init");
58 if (pthread_mutexattr_destroy (&attr))
59 clib_unix_warning ("mutexattr_destroy");
60 if (pthread_condattr_init (&cattr))
61 clib_unix_warning ("condattr_init");
62 /* prints funny-looking messages in the Linux target */
63 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
64 clib_unix_warning ("condattr_setpshared");
65 if (pthread_cond_init (&q->condvar, &cattr))
66 clib_unix_warning ("cond_init1");
/* Name the call that actually failed; this used to be reported as
 * "cond_init2", which pointed at pthread_cond_init instead. */
67 if (pthread_condattr_destroy (&cattr))
68 clib_unix_warning ("condattr_destroy");
/*
 * svm_queue_alloc_and_init - allocate a queue and initialise it.
 *
 * Allocates one chunk, aligned to CLIB_CACHE_LINE_BYTES, sized for the
 * svm_queue_t header plus nels * elsize bytes, initialises it with
 * svm_queue_init () and records consumer_pid in the header.
 *
 * NOTE(review): nels * elsize is evaluated in int arithmetic before it is
 * added to the header size, so a very large product would overflow.
 * NOTE(review): no check of the allocation result is visible in this
 * listing.
 */
74 svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
78 q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
79 + nels * elsize, CLIB_CACHE_LINE_BYTES);
/* Redundant: svm_queue_init () zeroes the same header again first thing. */
80 clib_memset (q, 0, sizeof (*q));
81 q = svm_queue_init (q, nels, elsize);
82 q->consumer_pid = consumer_pid;
/*
 * svm_queue_free - tear down a queue's synchronisation objects.
 *
 * Destroys q->mutex and q->condvar; both return codes are deliberately
 * discarded through the (void) casts.
 * NOTE(review): releasing the queue memory itself is not visible in this
 * listing - confirm against the complete file.
 */
91 svm_queue_free (svm_queue_t * q)
93 (void) pthread_mutex_destroy (&q->mutex);
94 (void) pthread_cond_destroy (&q->condvar);
/*
 * svm_queue_lock - acquire q->mutex, blocking until it is available.
 *
 * The mutex is created with PTHREAD_MUTEX_ROBUST in svm_queue_init (), so
 * pthread_mutex_lock () can return EOWNERDEAD when the previous holder
 * terminated while owning it.  Per POSIX the caller then owns the mutex
 * and must call pthread_mutex_consistent () to keep it usable.
 * Any other return code, and the result of pthread_mutex_consistent (),
 * are ignored here.
 */
99 svm_queue_lock (svm_queue_t * q)
101 int rv = pthread_mutex_lock (&q->mutex);
102 if (PREDICT_FALSE (rv == EOWNERDEAD))
103 pthread_mutex_consistent (&q->mutex);
/*
 * svm_queue_trylock - try to acquire q->mutex without blocking.
 *
 * rv holds the pthread_mutex_trylock () result.  On EOWNERDEAD it is
 * replaced by the result of pthread_mutex_consistent (), so a robust
 * mutex that was recovered successfully ends up as 0.  Callers in this
 * file treat zero as "lock acquired".
 * NOTE(review): the return statement is not visible in this listing;
 * presumably rv is returned.
 */
107 svm_queue_trylock (svm_queue_t * q)
109 int rv = pthread_mutex_trylock (&q->mutex);
110 if (PREDICT_FALSE (rv == EOWNERDEAD))
111 rv = pthread_mutex_consistent (&q->mutex);
/*
 * svm_queue_unlock - release q->mutex.
 *
 * The pthread_mutex_unlock () result is not checked.
 */
116 svm_queue_unlock (svm_queue_t * q)
118 pthread_mutex_unlock (&q->mutex);
/*
 * svm_queue_is_full - report whether the queue holds maxsize elements.
 *
 * Returns non-zero when q->cursize equals q->maxsize.  The comparison is
 * made without taking the queue lock in this function.
 */
122 svm_queue_is_full (svm_queue_t * q)
124 return q->cursize == q->maxsize;
/*
 * svm_queue_send_signal_inline - wake whoever is waiting on the queue.
 *
 * q       - queue to signal.
 * is_prod - in fd mode, selects the descriptor: non-zero writes to
 *           producer_evtfd, zero writes to consumer_evtfd.
 *
 * When producer_evtfd is -1 (the value svm_queue_init () sets) the wake-up
 * is a broadcast on q->condvar; otherwise `data` is written to an fd.
 */
128 svm_queue_send_signal_inline (svm_queue_t * q, u8 is_prod)
130 if (q->producer_evtfd == -1)
132 (void) pthread_cond_broadcast (&q->condvar);
/* NOTE(review): the branch structure and the declaration of `data` are
 * not visible in this listing; presumably the lines below form the
 * else-branch (fd mode). */
136 int __clib_unused rv, fd;
/* Both descriptors are expected to be set in fd mode.  Note that "> 0"
 * also rejects descriptor 0. */
138 ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0);
139 fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
140 rv = write (fd, &data, sizeof (data));
/* A failed write is only logged; the caller is not told. */
141 if (PREDICT_FALSE (rv < 0))
142 clib_unix_warning ("signal write on %d returned %d", fd, rv);
/*
 * svm_queue_send_signal - out-of-line entry point that forwards q and
 * is_prod unchanged to svm_queue_send_signal_inline ().
 */
147 svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
149 svm_queue_send_signal_inline (q, is_prod);
/*
 * svm_queue_wait_inline - block until the queue may have changed.
 *
 * With no event fd configured (producer_evtfd == -1) this is a
 * pthread_cond_wait () on q->condvar with q->mutex, which per POSIX
 * requires the caller to hold that mutex.
 * Otherwise it snapshots cursize, drops the lock and polls until cursize
 * differs from the snapshot.
 * NOTE(review): the polling loop body and any re-acquisition of the lock
 * after the loop are not visible in this listing.
 */
153 svm_queue_wait_inline (svm_queue_t * q)
155 if (q->producer_evtfd == -1)
157 pthread_cond_wait (&q->condvar, &q->mutex);
161 /* Fake a wait for event. We could use epoll but that would mean
162 * using yet another fd. Should do for now */
163 u32 cursize = q->cursize;
164 svm_queue_unlock (q);
/* q->cursize is read here without holding the lock. */
165 while (q->cursize == cursize)
/*
 * svm_queue_wait - out-of-line entry point that forwards q to
 * svm_queue_wait_inline ().
 */
172 svm_queue_wait (svm_queue_t * q)
174 svm_queue_wait_inline (q);
/*
 * svm_queue_timedwait_inline - wait for a queue change for at most
 * `timeout` seconds.
 *
 * q       - queue to wait on.
 * timeout - relative timeout in seconds; fractions are honoured.
 *
 * Condvar mode (producer_evtfd == -1): returns the
 * pthread_cond_timedwait () result.
 * Polling mode: rv becomes 0 if the deadline had not passed when polling
 * stopped, ETIMEDOUT otherwise.
 * NOTE(review): the declaration of `ts`, the polling loop body, any
 * re-lock and the return of rv are not visible in this listing.
 */
178 svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
/*
 * pthread_cond_timedwait () takes an absolute time.  Build it from the
 * full (now + timeout) sum so the sub-second part of "now" is kept.
 * Splitting only `timeout` into seconds and nanoseconds placed the
 * deadline up to one second early, so short timeouts could expire
 * immediately.  This matches how max_time is computed for polling mode.
 */
double deadline = unix_time_now () + timeout;
181 ts.tv_sec = (time_t) deadline;
182 ts.tv_nsec = (deadline - ts.tv_sec) * 1e9;
184 if (q->producer_evtfd == -1)
186 return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
190 double max_time = unix_time_now () + timeout;
191 u32 cursize = q->cursize;
194 svm_queue_unlock (q);
/* Poll without the lock until cursize changes or the deadline passes. */
195 while (q->cursize == cursize && unix_time_now () < max_time)
197 rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
/*
 * svm_queue_timedwait - out-of-line entry point; returns whatever
 * svm_queue_timedwait_inline () returns for the same q and timeout.
 */
204 svm_queue_timedwait (svm_queue_t * q, double timeout)
206 return svm_queue_timedwait_inline (q, timeout);
210 * svm_queue_add_nolock
/*
 * Enqueue one element without taking the queue lock in this function.
 *
 * q    - destination queue.
 * elem - source of q->elsize bytes, copied into the slot at index q->tail.
 *
 * If the queue is full the function waits through svm_queue_wait_inline ()
 * until cursize drops below maxsize.
 * NOTE(review): the updates of q->tail / q->cursize, the condition guarding
 * the signal and the return value are not visible in this listing.
 */
213 svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
216 int need_broadcast = 0;
218 if (PREDICT_FALSE (q->cursize == q->maxsize))
220 while (q->cursize == q->maxsize)
221 svm_queue_wait_inline (q);
/* Slot address: start of the data area plus tail * elsize bytes. */
224 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
225 clib_memcpy_fast (tailp, elem, q->elsize);
/* cursize == 1 is taken as the "queue just became non-empty" marker. */
230 need_broadcast = (q->cursize == 1);
/* Ring wrap-around test for the tail index. */
232 if (q->tail == q->maxsize)
236 svm_queue_send_signal_inline (q, 1);
/*
 * svm_queue_add_raw - copy one element into the slot at q->tail and
 * advance the tail.
 *
 * q    - destination queue.
 * elem - source of q->elsize bytes.
 *
 * No lock is taken and no "queue full" test appears in the visible lines.
 * NOTE(review): the cursize update and any condition guarding the signal
 * are not visible in this listing.
 */
241 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
245 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
246 clib_memcpy_fast (tailp, elem, q->elsize);
/* Advance the tail with wrap-around in a single step. */
248 q->tail = (q->tail + 1) % q->maxsize;
252 svm_queue_send_signal_inline (q, 1);
/*
 * svm_queue_add - enqueue one element under the queue lock.
 *
 * q      - destination queue.
 * elem   - source of q->elsize bytes, copied into the slot at q->tail.
 * nowait - NOTE(review): the tests on nowait are elided from this listing;
 *          presumably it selects the trylock / unlock-and-bail paths
 *          below instead of blocking - confirm.
 *
 * Visible flow: (try)lock, handle a full queue either by unlocking or by
 * waiting, copy the element, detect the empty -> non-empty transition,
 * wrap the tail, signal, unlock.
 * NOTE(review): the tail/cursize updates, the condition guarding the
 * signal and the return values are not visible in this listing.
 */
260 svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
263 int need_broadcast = 0;
267 /* zero on success */
268 if (svm_queue_trylock (q))
276 if (PREDICT_FALSE (q->cursize == q->maxsize))
/* Full queue: this unlock precedes an exit path that is not visible. */
280 svm_queue_unlock (q);
283 while (q->cursize == q->maxsize)
284 svm_queue_wait_inline (q);
287 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
288 clib_memcpy_fast (tailp, elem, q->elsize);
/* cursize == 1 is taken as the "queue just became non-empty" marker. */
293 need_broadcast = (q->cursize == 1);
295 if (q->tail == q->maxsize)
299 svm_queue_send_signal_inline (q, 1);
301 svm_queue_unlock (q);
/*
 * svm_queue_add2 - enqueue two elements under a single lock acquisition.
 *
 * q      - destination queue.
 * elem   - first element, q->elsize bytes.
 * elem2  - second element, q->elsize bytes, stored right after the first.
 * nowait - NOTE(review): the tests on nowait are elided from this listing;
 *          presumably it selects the trylock / unlock-and-bail paths
 *          below instead of blocking - confirm.
 *
 * NOTE(review): the tail/cursize updates, the condition guarding the
 * signal and the return values are not visible in this listing.
 */
310 svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
313 int need_broadcast = 0;
317 /* zero on success */
318 if (svm_queue_trylock (q))
/*
 * Two free slots are needed, i.e. cursize + 2 <= maxsize.  The previous
 * "==" test only caught "exactly one slot left"; a completely full queue
 * (cursize == maxsize) slipped through and both copies overwrote unread
 * elements.  ">=" covers both cases.
 */
326 if (PREDICT_FALSE (q->cursize + 1 >= q->maxsize))
330 svm_queue_unlock (q);
333 while (q->cursize + 1 >= q->maxsize)
334 svm_queue_wait_inline (q);
337 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
338 clib_memcpy_fast (tailp, elem, q->elsize);
343 if (q->tail == q->maxsize)
/* Evaluated between the two copies: cursize == 1 is taken as the
 * "queue just became non-empty" marker. */
346 need_broadcast = (q->cursize == 1);
348 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
349 clib_memcpy_fast (tailp, elem2, q->elsize);
354 if (q->tail == q->maxsize)
358 svm_queue_send_signal_inline (q, 1);
360 svm_queue_unlock (q);
/*
 * svm_queue_sub - dequeue one element under the queue lock.
 *
 * q    - source queue.
 * elem - destination for q->elsize bytes copied from the slot at q->head.
 * cond - wait policy: SVM_Q_NOWAIT uses trylock and gives up on an empty
 *        queue; SVM_Q_TIMEDWAIT waits via svm_queue_timedwait_inline ();
 *        the remaining path waits via svm_queue_wait_inline ().
 *
 * NOTE(review): the rest of the parameter list (the `time` value passed to
 * the timed wait), the declaration of rc, the head/cursize updates and the
 * return values are not visible in this listing.
 */
369 svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
373 int need_broadcast = 0;
376 if (cond == SVM_Q_NOWAIT)
378 /* zero on success */
379 if (svm_queue_trylock (q))
387 if (PREDICT_FALSE (q->cursize == 0))
389 if (cond == SVM_Q_NOWAIT)
391 svm_queue_unlock (q);
394 else if (cond == SVM_Q_TIMEDWAIT)
/* Keep waiting while the queue is empty and the wait has not failed. */
396 while (q->cursize == 0 && rc == 0)
397 rc = svm_queue_timedwait_inline (q, time);
401 svm_queue_unlock (q);
407 while (q->cursize == 0)
408 svm_queue_wait_inline (q);
412 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
413 clib_memcpy_fast (elem, headp, q->elsize);
416 /* $$$$ JFC shouldn't this be == 0? */
/* NOTE(review): testing for "full" looks intentional if this runs before
 * cursize is decremented: adders in this file wait only while the queue is
 * full, so that is when a wake-up is needed.  The decrement is not visible
 * in this listing - confirm the ordering. */
417 if (q->cursize == q->maxsize)
422 if (q->head == q->maxsize)
426 svm_queue_send_signal_inline (q, 0);
428 svm_queue_unlock (q);
/*
 * svm_queue_sub2 - dequeue one element, signalling at the half-full mark.
 *
 * q    - source queue.
 * elem - destination for q->elsize bytes copied from the slot at q->head.
 *
 * Differences from svm_queue_sub () that are visible here: need_broadcast
 * is set when cursize equals maxsize / 2 rather than maxsize, and the
 * signal is sent after the lock has been released.
 * NOTE(review): lock acquisition, the condition for the early unlock, the
 * head/cursize updates and the return values are not visible in this
 * listing.
 */
434 svm_queue_sub2 (svm_queue_t * q, u8 * elem)
/* Early exit path; its guarding condition is not visible. */
442 svm_queue_unlock (q);
446 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
447 clib_memcpy_fast (elem, headp, q->elsize);
450 need_broadcast = (q->cursize == q->maxsize / 2);
453 if (PREDICT_FALSE (q->head == q->maxsize))
455 svm_queue_unlock (q);
458 svm_queue_send_signal_inline (q, 0);
/*
 * svm_queue_sub_raw - dequeue one element without taking the queue lock in
 * this function.
 *
 * q    - source queue.
 * elem - destination for q->elsize bytes copied from the slot at q->head.
 *
 * NOTE(review): the body of the empty-queue loop, the cursize update and
 * the return value are not visible in this listing.
 */
464 svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
469 if (PREDICT_FALSE (q->cursize == 0))
/* Loops until another party makes cursize non-zero. */
471 while (q->cursize == 0)
475 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
476 clib_memcpy_fast (elem, headp, q->elsize);
/* Only a full queue triggers a wake-up. */
478 need_broadcast = q->cursize == q->maxsize;
/* Advance the head with wrap-around in a single step. */
480 q->head = (q->head + 1) % q->maxsize;
483 if (PREDICT_FALSE (need_broadcast))
484 svm_queue_send_signal_inline (q, 0);
/*
 * svm_queue_set_producer_event_fd - store fd as the queue's producer event
 * descriptor.
 *
 * Any value other than -1 switches the signal and wait helpers in this
 * file from the condition variable to their fd-based paths, since they
 * test producer_evtfd against -1.
 */
490 svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
492 q->producer_evtfd = fd;
/*
 * svm_queue_set_consumer_event_fd - store fd as the queue's consumer event
 * descriptor.
 *
 * svm_queue_send_signal_inline () writes to this descriptor when called
 * with is_prod == 0 in fd mode.
 */
496 svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
498 q->consumer_evtfd = fd;
502 * fd.io coding-style-patch-verification: ON
505 * eval: (c-set-style "gnu")