clib_spinlock_init (&mq->q.lock);
}
+void
+svm_msg_q_cleanup (svm_msg_q_t *mq)
+{
+ vec_free (mq->rings);
+ clib_spinlock_free (&mq->q.lock);
+ if (mq->q.evtfd != -1)
+ close (mq->q.evtfd);
+}
+
void
svm_msg_q_free (svm_msg_q_t * mq)
{
+ svm_msg_q_cleanup (mq);
clib_mem_free (mq->q.shr);
- clib_spinlock_free (&mq->q.lock);
clib_mem_free (mq);
}
{
if (svm_msg_q_try_lock (mq))
return -1;
- if (PREDICT_FALSE (svm_msg_q_is_full (mq)
- || svm_msg_q_ring_is_full (mq, ring_index)))
+ if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, ring_index)))
{
svm_msg_q_unlock (mq);
return -2;
else
{
svm_msg_q_lock (mq);
- while (svm_msg_q_is_full (mq)
- || svm_msg_q_ring_is_full (mq, ring_index))
- svm_msg_q_wait (mq, SVM_MQ_WAIT_FULL);
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ svm_msg_q_or_ring_wait_prod (mq, ring_index);
*msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
}
return 0;
if (nowait)
return (-2);
while (svm_msg_q_is_full (mq))
- svm_msg_q_wait (mq, SVM_MQ_WAIT_FULL);
+ svm_msg_q_wait_prod (mq);
}
svm_msg_q_add_raw (mq, (u8 *) msg);
svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
{
int fd;
- if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+ if ((fd = eventfd (0, 0)) < 0)
return -1;
svm_msg_q_set_eventfd (mq, fd);
return 0;
return 0;
}
+int
+svm_msg_q_wait_prod (svm_msg_q_t *mq)
+{
+ if (mq->q.evtfd == -1)
+ {
+ while (svm_msg_q_is_full (mq))
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+ int rv;
+
+ while (svm_msg_q_is_full (mq))
+ {
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return rv;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
+int
+svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index)
+{
+ if (mq->q.evtfd == -1)
+ {
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+ }
+ else
+ {
+ u64 buf;
+ int rv;
+
+ while (svm_msg_q_or_ring_is_full (mq, ring_index))
+ {
+ while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+ {
+ if (errno != EAGAIN)
+ {
+ clib_unix_warning ("read error");
+ return rv;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
int
svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
{
tv.tv_sec = (u64) timeout;
tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
- setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv,
- sizeof tv);
+ rv = setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO,
+ (const char *) &tv, sizeof tv);
+ if (rv < 0)
+ {
+ clib_unix_warning ("setsockopt");
+ return -1;
+ }
rv = read (mq->q.evtfd, &buf, sizeof (buf));
if (rv < 0)