offset = sizeof (*ring) + ring->nitems * ring->elsize;
ring = (void *) ((u8 *) ring + offset);
}
+ clib_spinlock_init (&mq->q.lock);
+}
+
+void
+svm_msg_q_cleanup (svm_msg_q_t *mq)
+{
+ vec_free (mq->rings);
+ clib_spinlock_free (&mq->q.lock);
+ if (mq->q.evtfd != -1)
+ close (mq->q.evtfd);
}
void
svm_msg_q_free (svm_msg_q_t * mq)
{
+ svm_msg_q_cleanup (mq);
clib_mem_free (mq->q.shr);
clib_mem_free (mq);
}
static void
-svm_msg_q_send_signal (svm_msg_q_t *mq)
+svm_msg_q_send_signal (svm_msg_q_t *mq, u8 is_consumer)
{
if (mq->q.evtfd == -1)
{
+ if (is_consumer)
+ {
+ int rv = pthread_mutex_lock (&mq->q.shr->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ {
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+ return;
+ }
+ }
+
(void) pthread_cond_broadcast (&mq->q.shr->condvar);
+
+ if (is_consumer)
+ pthread_mutex_unlock (&mq->q.shr->mutex);
}
else
{
{
if (svm_msg_q_try_lock (mq))
return -1;
- if (PREDICT_FALSE (svm_msg_q_is_full (mq)
- || svm_msg_q_ring_is_full (mq, ring_index)))
+ if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, ring_index)))
{
svm_msg_q_unlock (mq);
return -2;
else
{
svm_msg_q_lock (mq);
- while (svm_msg_q_is_full (mq)
- || svm_msg_q_ring_is_full (mq, ring_index))
- svm_msg_q_wait (mq);
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ svm_msg_q_or_ring_wait_prod (mq, ring_index);
*msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
}
return 0;
msg.ring_index = ring - mq->rings;
msg.elt_index = sr->tail;
sr->tail = (sr->tail + 1) % ring->nitems;
- clib_atomic_fetch_add_rel (&sr->cursize, 1);
+ clib_atomic_fetch_add_relax (&sr->cursize, 1);
break;
}
return msg;
{
svm_msg_q_ring_shared_t *sr;
svm_msg_q_ring_t *ring;
- int need_signal;
+ u32 need_signal;
ASSERT (vec_len (mq->rings) > msg->ring_index);
ring = svm_msg_q_ring_inline (mq, msg->ring_index);
}
else
{
- clib_warning ("message out of order");
+ clib_warning ("message out of order: elt %u head %u ring %u",
+ msg->elt_index, sr->head, msg->ring_index);
/* for now, expect messages to be processed in order */
ASSERT (0);
}
- need_signal = sr->cursize == ring->nitems;
- clib_atomic_fetch_sub_rel (&sr->cursize, 1);
+ need_signal = clib_atomic_load_relax_n (&sr->cursize) == ring->nitems;
+ clib_atomic_fetch_sub_relax (&sr->cursize, 1);
if (PREDICT_FALSE (need_signal))
- svm_msg_q_send_signal (mq);
+ svm_msg_q_send_signal (mq, 1 /* is consumer */);
}
static int
sz = clib_atomic_fetch_add_rel (&sq->cursize, 1);
if (!sz)
- svm_msg_q_send_signal (mq);
+ svm_msg_q_send_signal (mq, 0 /* is consumer */);
}
int
if (nowait)
return (-2);
while (svm_msg_q_is_full (mq))
- svm_msg_q_wait (mq);
+ svm_msg_q_wait_prod (mq);
}
svm_msg_q_add_raw (mq, (u8 *) msg);
svm_msg_q_unlock (mq);
}
-static int
-svm_msg_q_sub_raw (svm_msg_q_t *mq, u8 *elem)
+int
+svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem)
{
svm_msg_q_shared_queue_t *sq = mq->q.shr;
i8 *headp;
sq->head = (sq->head + 1) % sq->maxsize;
- sz = clib_atomic_fetch_sub_rel (&sq->cursize, 1);
+ sz = clib_atomic_fetch_sub_relax (&sq->cursize, 1);
if (PREDICT_FALSE (sz == sq->maxsize))
- svm_msg_q_send_signal (mq);
+ svm_msg_q_send_signal (mq, 1 /* is consumer */);
return 0;
}
int
-svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
- svm_q_conditional_wait_t cond, u32 time)
+svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf, u32 n_msgs)
{
- int rc = 0;
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
+ u32 sz, to_deq;
+ i8 *headp;
+
+ sz = svm_msg_q_size (mq);
+ ASSERT (sz);
+ to_deq = clib_min (sz, n_msgs);
- if (cond == SVM_Q_NOWAIT)
+ headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
+
+ if (sq->head + to_deq < sq->maxsize)
{
- /* zero on success */
- if (svm_msg_q_try_lock (mq))
- {
- return (-1);
- }
+ clib_memcpy_fast (msg_buf, headp, sq->elsize * to_deq);
+ sq->head += to_deq;
}
else
- svm_msg_q_lock (mq);
+ {
+ u32 first_batch = sq->maxsize - sq->head;
+ clib_memcpy_fast (msg_buf, headp, sq->elsize * first_batch);
+ clib_memcpy_fast (msg_buf + first_batch, sq->data,
+ sq->elsize * (to_deq - first_batch));
+ sq->head = (sq->head + to_deq) % sq->maxsize;
+ }
- if (PREDICT_FALSE (svm_msg_q_is_empty (mq)))
+ clib_atomic_fetch_sub_relax (&sq->cursize, to_deq);
+ if (PREDICT_FALSE (sz == sq->maxsize))
+ svm_msg_q_send_signal (mq, 1 /* is consumer */);
+
+ return to_deq;
+}
+
+int
+svm_msg_q_sub (svm_msg_q_t *mq, svm_msg_q_msg_t *msg,
+ svm_q_conditional_wait_t cond, u32 time)
+{
+ int rc = 0;
+
+ if (svm_msg_q_is_empty (mq))
{
if (cond == SVM_Q_NOWAIT)
{
- svm_msg_q_unlock (mq);
return (-2);
}
else if (cond == SVM_Q_TIMEDWAIT)
{
- while (svm_msg_q_is_empty (mq) && rc == 0)
- rc = svm_msg_q_timedwait (mq, time);
-
- if (rc == ETIMEDOUT)
- {
- svm_msg_q_unlock (mq);
- return ETIMEDOUT;
- }
+ if ((rc = svm_msg_q_timedwait (mq, time)))
+ return rc;
}
else
{
- while (svm_msg_q_is_empty (mq))
- svm_msg_q_wait (mq);
+ svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
}
}
- svm_msg_q_sub_raw (mq, (u8 *) msg);
-
- svm_msg_q_unlock (mq);
+ svm_msg_q_sub_raw (mq, msg);
return 0;
}
-void
-svm_msg_q_sub_w_lock (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
-{
- svm_msg_q_sub_raw (mq, (u8 *) msg);
-}
-
void
svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd)
{
svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
{
int fd;
- if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+ if ((fd = eventfd (0, 0)) < 0)
return -1;
svm_msg_q_set_eventfd (mq, fd);
return 0;
}
-void
-svm_msg_q_wait (svm_msg_q_t *mq)
+int
+svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type)
{
+ u8 (*fn) (svm_msg_q_t *);
+ int rv;
+
+ fn = (type == SVM_MQ_WAIT_EMPTY) ? svm_msg_q_is_empty : svm_msg_q_is_full;
+
if (mq->q.evtfd == -1)
{
- pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ rv = pthread_mutex_lock (&mq->q.shr->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ {
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+ return rv;
+ }
+
+ while (fn (mq))
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+
+ pthread_mutex_unlock (&mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+
+ while (fn (mq))
+ {
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return rv;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
+int
+svm_msg_q_wait_prod (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ while (svm_msg_q_is_full (mq))
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
}
else
{
u64 buf;
int rv;
- svm_msg_q_unlock (mq);
- while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ while (svm_msg_q_is_full (mq))
{
- if (errno != EAGAIN)
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
{
- clib_unix_warning ("read error");
- return;
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return rv;
+ }
}
}
- svm_msg_q_lock (mq);
}
+
+ return 0;
+}
+
+int
+svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index)
+{
+ if (mq->q.evtfd == -1)
+ {
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+ int rv;
+
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ {
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return rv;
+ }
+ }
+ }
+ }
+
+ return 0;
}
int
{
if (mq->q.evtfd == -1)
{
+ svm_msg_q_shared_queue_t *sq = mq->q.shr;
struct timespec ts;
+ u32 sz;
+ int rv;
+
+ rv = pthread_mutex_lock (&sq->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ {
+ rv = pthread_mutex_consistent (&sq->mutex);
+ return rv;
+ }
+
+ /* check if we're still in a signalable state after grabbing lock */
+ sz = svm_msg_q_size (mq);
+ if (sz != 0 && sz != sq->maxsize)
+ {
+ pthread_mutex_unlock (&sq->mutex);
+ return 0;
+ }
+
ts.tv_sec = unix_time_now () + (u32) timeout;
ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
- return pthread_cond_timedwait (&mq->q.shr->condvar, &mq->q.shr->mutex,
- &ts);
+ rv = pthread_cond_timedwait (&sq->condvar, &sq->mutex, &ts);
+
+ pthread_mutex_unlock (&sq->mutex);
+ return rv;
}
else
{
tv.tv_sec = (u64) timeout;
tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
- setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv,
- sizeof tv);
+ rv = setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO,
+ (const char *) &tv, sizeof tv);
+ if (rv < 0)
+ {
+ clib_unix_warning ("setsockopt");
+ return -1;
+ }
- svm_msg_q_unlock (mq);
rv = read (mq->q.evtfd, &buf, sizeof (buf));
if (rv < 0)
clib_warning ("read %u", errno);
- svm_msg_q_lock (mq);
return rv < 0 ? errno : 0;
}