clib_spinlock_init (&mq->q.lock);
}
+void
+svm_msg_q_cleanup (svm_msg_q_t *mq)
+{
+ vec_free (mq->rings);
+ clib_spinlock_free (&mq->q.lock);
+ if (mq->q.evtfd != -1)
+ close (mq->q.evtfd);
+}
+
void
svm_msg_q_free (svm_msg_q_t * mq)
{
+ svm_msg_q_cleanup (mq);
clib_mem_free (mq->q.shr);
- clib_spinlock_free (&mq->q.lock);
clib_mem_free (mq);
}
{
if (svm_msg_q_try_lock (mq))
return -1;
- if (PREDICT_FALSE (svm_msg_q_is_full (mq)
- || svm_msg_q_ring_is_full (mq, ring_index)))
+ if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, ring_index)))
{
svm_msg_q_unlock (mq);
return -2;
else
{
svm_msg_q_lock (mq);
- while (svm_msg_q_is_full (mq)
- || svm_msg_q_ring_is_full (mq, ring_index))
- svm_msg_q_wait_prod (mq);
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ svm_msg_q_or_ring_wait_prod (mq, ring_index);
*msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
}
return 0;
return (dist1 < dist2);
}
-static void
-svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
+void
+svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
{
svm_msg_q_shared_queue_t *sq = mq->q.shr;
i8 *tailp;
u32 sz;
tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
- clib_memcpy_fast (tailp, elem, sq->elsize);
+ clib_memcpy_fast (tailp, msg, sq->elsize);
sq->tail = (sq->tail + 1) % sq->maxsize;
svm_msg_q_wait_prod (mq);
}
- svm_msg_q_add_raw (mq, (u8 *) msg);
+ svm_msg_q_add_raw (mq, msg);
svm_msg_q_unlock (mq);
svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
- svm_msg_q_add_raw (mq, (u8 *) msg);
+ svm_msg_q_add_raw (mq, msg);
svm_msg_q_unlock (mq);
}
svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
{
int fd;
- if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+ if ((fd = eventfd (0, 0)) < 0)
return -1;
svm_msg_q_set_eventfd (mq, fd);
return 0;
return 0;
}
+int
+svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index)
+{
+ if (mq->q.evtfd == -1)
+ {
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+ int rv;
+
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ {
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return rv;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
int
svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
{