+ u8 (*fn) (svm_msg_q_t *);
+ int rv;
+
+ fn = (type == SVM_MQ_WAIT_EMPTY) ? svm_msg_q_is_empty : svm_msg_q_is_full;
+
+ if (mq->q.evtfd == -1)
+ {
+ rv = pthread_mutex_lock (&mq->q.shr->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ {
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+ return rv;
+ }
+
+ while (fn (mq))
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+
+ pthread_mutex_unlock (&mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+
+ while (fn (mq))
+ {
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return rv;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
+int
+svm_msg_q_wait_prod (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ while (svm_msg_q_is_full (mq))
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+ int rv;
+
+ while (svm_msg_q_is_full (mq))
+ {
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return rv;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
+int
+svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
+{
+ if (mq->q.evtfd == -1)
+ {
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
+ struct timespec ts;
+ u32 sz;
+ int rv;
+
+ rv = pthread_mutex_lock (&sq->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ {
+ rv = pthread_mutex_consistent (&sq->mutex);
+ return rv;
+ }
+
+ /* check if we're still in a signalable state after grabbing lock */
+ sz = svm_msg_q_size (mq);
+ if (sz != 0 && sz != sq->maxsize)
+ {
+ pthread_mutex_unlock (&sq->mutex);
+ return 0;
+ }
+
+ ts.tv_sec = unix_time_now () + (u32) timeout;
+ ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
+ rv = pthread_cond_timedwait (&sq->condvar, &sq->mutex, &ts);
+
+ pthread_mutex_unlock (&sq->mutex);
+ return rv;
+ }
+ else
+ {
+ struct timeval tv;
+ u64 buf;
+ int rv;
+
+ tv.tv_sec = (u64) timeout;
+ tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
+ rv = setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO,
+ (const char *) &tv, sizeof tv);
+ if (rv < 0)
+ {
+ clib_unix_warning ("setsockopt");
+ return -1;
+ }
+
+ rv = read (mq->q.evtfd, &buf, sizeof (buf));
+ if (rv < 0)
+ clib_warning ("read %u", errno);
+
+ return rv < 0 ? errno : 0;
+ }
+}
+
+u8 *
+format_svm_msg_q (u8 * s, va_list * args)
+{
+ svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *);
+ s = format (s, " [Q:%d/%d]", mq->q.shr->cursize, mq->q.shr->maxsize);
+ for (u32 i = 0; i < vec_len (mq->rings); i++)
+ {
+ s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,
+ mq->rings[i].nitems);
+ }
+ return s;