vcl: use events for epoll/select/read/write
[vpp.git] / src / svm / message_queue.c
index 0f9be9c..1b2d2e1 100644 (file)
@@ -73,6 +73,7 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
       ring = &mq->rings[i];
       ring->elsize = cfg->ring_cfgs[i].elsize;
       ring->nitems = cfg->ring_cfgs[i].nitems;
+      ring->cursize = ring->head = ring->tail = 0;
       if (cfg->ring_cfgs[i].data)
        ring->data = cfg->ring_cfgs[i].data;
       else
@@ -97,10 +98,10 @@ svm_msg_q_free (svm_msg_q_t * mq)
 svm_msg_q_msg_t
 svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
 {
-  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
+  svm_msg_q_msg_t msg;
   svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
 
-  ASSERT (ring->cursize != ring->nitems);
+  ASSERT (ring->cursize < ring->nitems);
   msg.ring_index = ring - mq->rings;
   msg.elt_index = ring->tail;
   ring->tail = (ring->tail + 1) % ring->nitems;
@@ -131,12 +132,9 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
   else
     {
       svm_msg_q_lock (mq);
+      while (svm_msg_q_ring_is_full (mq, ring_index))
+       svm_msg_q_wait (mq);
       *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
-      while (svm_msg_q_msg_is_invalid (msg))
-       {
-         svm_msg_q_wait (mq);
-         *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
-       }
     }
   return 0;
 }
@@ -190,18 +188,20 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 static int
 svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
+  u32 dist1, dist2, tail, head;
   svm_msg_q_ring_t *ring;
-  u32 dist1, dist2;
 
   if (vec_len (mq->rings) <= msg->ring_index)
     return 0;
   ring = &mq->rings[msg->ring_index];
+  tail = ring->tail;
+  head = ring->head;
 
-  dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems;
-  if (ring->tail == ring->head)
+  dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
+  if (tail == head)
     dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
   else
-    dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems;
+    dist2 = ((ring->nitems + tail) - head) % ring->nitems;
   return (dist1 < dist2);
 }