session: segment handle in accept/connect notifications
[vpp.git] / src / svm / message_queue.c
index e97cab8..13d089a 100644 (file)
@@ -15,6 +15,7 @@
 
 #include <svm/message_queue.h>
 #include <vppinfra/mem.h>
+#include <sys/eventfd.h>
 
 static inline svm_msg_q_ring_t *
 svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
@@ -107,7 +108,7 @@ svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
   msg.ring_index = ring - mq->rings;
   msg.elt_index = ring->tail;
   ring->tail = (ring->tail + 1) % ring->nitems;
-  __sync_fetch_and_add (&ring->cursize, 1);
+  clib_atomic_fetch_add (&ring->cursize, 1);
   return msg;
 }
 
@@ -154,7 +155,7 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
     msg.ring_index = ring - mq->rings;
     msg.elt_index = ring->tail;
     ring->tail = (ring->tail + 1) % ring->nitems;
-    __sync_fetch_and_add (&ring->cursize, 1);
+    clib_atomic_fetch_add (&ring->cursize, 1);
     break;
   }
   return msg;
@@ -172,8 +173,7 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   svm_msg_q_ring_t *ring;
 
-  if (vec_len (mq->rings) <= msg->ring_index)
-    return;
+  ASSERT (vec_len (mq->rings) > msg->ring_index);
   ring = &mq->rings[msg->ring_index];
   if (msg->elt_index == ring->head)
     {
@@ -181,10 +181,11 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
     }
   else
     {
+      clib_warning ("message out of order");
       /* for now, expect messages to be processed in order */
       ASSERT (0);
     }
-  __sync_fetch_and_sub (&ring->cursize, 1);
+  clib_atomic_fetch_sub (&ring->cursize, 1);
 }
 
 static int
@@ -235,6 +236,38 @@ svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
   svm_queue_sub_raw (mq->q, (u8 *) msg);
 }
 
+void
+svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
+{
+  mq->q->consumer_evtfd = fd;
+}
+
+void
+svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
+{
+  mq->q->producer_evtfd = fd;
+}
+
+int
+svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
+{
+  int fd;
+  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+    return -1;
+  svm_msg_q_set_consumer_eventfd (mq, fd);
+  return 0;
+}
+
+int
+svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
+{
+  int fd;
+  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+    return -1;
+  svm_msg_q_set_producer_eventfd (mq, fd);
+  return 0;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *