svm: add custom q implementation for mq 20/30920/6
authorFlorin Coras <fcoras@cisco.com>
Fri, 22 Jan 2021 23:05:14 +0000 (15:05 -0800)
committerDave Barach <openvpp@barachs.net>
Mon, 25 Jan 2021 15:34:21 +0000 (15:34 +0000)
Add separate queue implementation for the message queue as it's custom
tailored for fifo segments as opposed to binary api.

Also move eventfds to the private data structures.

Type: refactor

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I6df0c824ecd94c7904516373f92a9fffc6b04736

12 files changed:
src/plugins/hs_apps/sapi/vpp_echo_bapi.c
src/plugins/unittest/session_test.c
src/svm/fifo_segment.c
src/svm/message_queue.c
src/svm/message_queue.h
src/vcl/vcl_bapi.c
src/vcl/vcl_private.c
src/vcl/vcl_sapi.c
src/vnet/session/segment_manager.c
src/vnet/session/session.c
src/vnet/session/session_api.c
src/vnet/session/session_debug.c

index 807ec62..915448a 100644 (file)
@@ -437,7 +437,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
       echo_segment_attach_mq (segment_handle, mp->app_mq, 0, &em->app_mq);
 
       if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
-       svm_msg_q_set_consumer_eventfd (em->app_mq, fds[n_fds++]);
+       svm_msg_q_set_eventfd (em->app_mq, fds[n_fds++]);
 
       vec_free (fds);
     }
index 68605b2..6496a99 100644 (file)
@@ -1839,9 +1839,8 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
   mq = app_wrk->event_queue;
   if (use_eventfd)
     {
-      svm_msg_q_alloc_producer_eventfd (mq);
-      svm_msg_q_alloc_consumer_eventfd (mq);
-      prod_fd = svm_msg_q_get_producer_eventfd (mq);
+      svm_msg_q_alloc_eventfd (mq);
+      prod_fd = svm_msg_q_get_eventfd (mq);
       SESSION_TEST (prod_fd != -1, "mq producer eventd valid %u", prod_fd);
     }
 
index d309324..3e9aecb 100644 (file)
@@ -1042,7 +1042,7 @@ fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset, u32 mq_index)
 
   mq = vec_elt_at_index (fs->mqs, mq_index);
 
-  if (!mq->q)
+  if (!mq->q.shr)
     {
       svm_msg_q_shared_t *smq;
       smq = (svm_msg_q_shared_t *) ((u8 *) fs->h + offset);
@@ -1059,10 +1059,11 @@ fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index)
 {
   svm_msg_q_t *mq = vec_elt_at_index (fs->mqs, mq_index);
 
-  if (mq->q == 0)
+  if (mq->q.shr == 0)
     return ~0ULL;
 
-  return (uword) ((u8 *) mq->q - (u8 *) fs->h) - sizeof (svm_msg_q_shared_t);
+  return (uword) ((u8 *) mq->q.shr - (u8 *) fs->h) -
+        sizeof (svm_msg_q_shared_t);
 }
 
 int
index 0ebce70..fdf9293 100644 (file)
@@ -16,7 +16,9 @@
 #include <svm/message_queue.h>
 #include <vppinfra/mem.h>
 #include <vppinfra/format.h>
+#include <vppinfra/time.h>
 #include <sys/eventfd.h>
+#include <sys/socket.h>
 
 static inline svm_msg_q_ring_t *
 svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
@@ -37,19 +39,51 @@ svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
   return (ring->shr->data + elt_index * ring->elsize);
 }
 
+static void
+svm_msg_q_init_mutex (svm_msg_q_shared_queue_t *sq)
+{
+  pthread_mutexattr_t attr;
+  pthread_condattr_t cattr;
+
+  clib_memset (&attr, 0, sizeof (attr));
+  clib_memset (&cattr, 0, sizeof (cattr));
+
+  if (pthread_mutexattr_init (&attr))
+    clib_unix_warning ("mutexattr_init");
+  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
+    clib_unix_warning ("pthread_mutexattr_setpshared");
+  if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
+    clib_unix_warning ("setrobust");
+  if (pthread_mutex_init (&sq->mutex, &attr))
+    clib_unix_warning ("mutex_init");
+  if (pthread_mutexattr_destroy (&attr))
+    clib_unix_warning ("mutexattr_destroy");
+  if (pthread_condattr_init (&cattr))
+    clib_unix_warning ("condattr_init");
+  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
+    clib_unix_warning ("condattr_setpshared");
+  if (pthread_cond_init (&sq->condvar, &cattr))
+    clib_unix_warning ("cond_init1");
+  if (pthread_condattr_destroy (&cattr))
+    clib_unix_warning ("cond_init2");
+}
+
 svm_msg_q_shared_t *
 svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
 {
   svm_msg_q_ring_shared_t *ring;
+  svm_msg_q_shared_queue_t *sq;
   svm_msg_q_shared_t *smq;
   u32 q_sz, offset;
   int i;
 
-  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+  q_sz = sizeof (*sq) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
 
   smq = (svm_msg_q_shared_t *) base;
-  svm_queue_init (&smq->q, cfg->q_nitems, sizeof (svm_msg_q_msg_t));
-  smq->q->consumer_pid = cfg->consumer_pid;
+  sq = smq->q;
+  clib_memset (sq, 0, sizeof (*sq));
+  sq->elsize = sizeof (svm_msg_q_msg_t);
+  sq->maxsize = cfg->q_nitems;
   smq->n_rings = cfg->n_rings;
   ring = (void *) ((u8 *) smq->q + q_sz);
   for (i = 0; i < cfg->n_rings; i++)
@@ -61,6 +95,8 @@ svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
       ring = (void *) ((u8 *) ring + offset);
     }
 
+  svm_msg_q_init_mutex (sq);
+
   return smq;
 }
 
@@ -83,7 +119,8 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
       rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
     }
 
-  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+  q_sz = sizeof (svm_msg_q_shared_queue_t) +
+        cfg->q_nitems * sizeof (svm_msg_q_msg_t);
   mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz;
 
   return mq_sz;
@@ -111,10 +148,12 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
   u32 i, n_rings, q_sz, offset;
 
   smq = (svm_msg_q_shared_t *) smq_base;
-  mq->q = smq->q;
+  mq->q.shr = smq->q;
+  mq->q.evtfd = -1;
   n_rings = smq->n_rings;
   vec_validate (mq->rings, n_rings - 1);
-  q_sz = sizeof (svm_queue_t) + mq->q->maxsize * sizeof (svm_msg_q_msg_t);
+  q_sz = sizeof (svm_msg_q_shared_queue_t) +
+        mq->q.shr->maxsize * sizeof (svm_msg_q_msg_t);
   ring = (void *) ((u8 *) smq->q + q_sz);
   for (i = 0; i < n_rings; i++)
     {
@@ -129,10 +168,31 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
 void
 svm_msg_q_free (svm_msg_q_t * mq)
 {
-  svm_queue_free (mq->q);
+  clib_mem_free (mq->q.shr);
   clib_mem_free (mq);
 }
 
+static void
+svm_msg_q_send_signal (svm_msg_q_t *mq)
+{
+  if (mq->q.evtfd == -1)
+    {
+      (void) pthread_cond_broadcast (&mq->q.shr->condvar);
+    }
+  else
+    {
+      int __clib_unused rv;
+      u64 data = 1;
+
+      if (mq->q.evtfd < 0)
+       return;
+
+      rv = write (mq->q.evtfd, &data, sizeof (data));
+      if (PREDICT_FALSE (rv < 0))
+       clib_unix_warning ("signal write on %d returned %d", mq->q.evtfd, rv);
+    }
+}
+
 svm_msg_q_msg_t
 svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
 {
@@ -147,7 +207,7 @@ svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
   msg.ring_index = ring - mq->rings;
   msg.elt_index = sr->tail;
   sr->tail = (sr->tail + 1) % ring->nitems;
-  clib_atomic_fetch_add (&sr->cursize, 1);
+  clib_atomic_fetch_add_rel (&sr->cursize, 1);
   return msg;
 }
 
@@ -193,7 +253,7 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
     msg.ring_index = ring - mq->rings;
     msg.elt_index = sr->tail;
     sr->tail = (sr->tail + 1) % ring->nitems;
-    clib_atomic_fetch_add (&sr->cursize, 1);
+    clib_atomic_fetch_add_rel (&sr->cursize, 1);
     break;
   }
   return msg;
@@ -228,10 +288,10 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
     }
 
   need_signal = sr->cursize == ring->nitems;
-  clib_atomic_fetch_sub (&sr->cursize, 1);
+  clib_atomic_fetch_sub_rel (&sr->cursize, 1);
 
   if (PREDICT_FALSE (need_signal))
-    svm_queue_send_signal (mq->q, 0);
+    svm_msg_q_send_signal (mq);
 }
 
 static int
@@ -257,71 +317,216 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
   return (dist1 < dist2);
 }
 
+static void
+svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
+{
+  svm_msg_q_shared_queue_t *sq = mq->q.shr;
+  i8 *tailp;
+  u32 sz;
+
+  tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
+  clib_memcpy_fast (tailp, elem, sq->elsize);
+
+  sq->tail = (sq->tail + 1) % sq->maxsize;
+
+  sz = clib_atomic_fetch_add_rel (&sq->cursize, 1);
+  if (!sz)
+    svm_msg_q_send_signal (mq);
+}
+
 int
 svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
 {
   ASSERT (svm_msq_q_msg_is_valid (mq, msg));
-  return svm_queue_add (mq->q, (u8 *) msg, nowait);
+
+  if (nowait)
+    {
+      /* zero on success */
+      if (svm_msg_q_try_lock (mq))
+       {
+         return (-1);
+       }
+    }
+  else
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
+    {
+      if (nowait)
+       return (-2);
+      while (svm_msg_q_is_full (mq))
+       svm_msg_q_wait (mq);
+    }
+
+  svm_msg_q_add_raw (mq, (u8 *) msg);
+
+  svm_msg_q_unlock (mq);
+
+  return 0;
 }
 
 void
 svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   ASSERT (svm_msq_q_msg_is_valid (mq, msg));
-  svm_queue_add_raw (mq->q, (u8 *) msg);
+  svm_msg_q_add_raw (mq, (u8 *) msg);
   svm_msg_q_unlock (mq);
 }
 
+static int
+svm_msg_q_sub_raw (svm_msg_q_t *mq, u8 *elem)
+{
+  svm_msg_q_shared_queue_t *sq = mq->q.shr;
+  i8 *headp;
+  u32 sz;
+
+  ASSERT (!svm_msg_q_is_empty (mq));
+
+  headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
+  clib_memcpy_fast (elem, headp, sq->elsize);
+
+  sq->head = (sq->head + 1) % sq->maxsize;
+
+  sz = clib_atomic_fetch_sub_rel (&sq->cursize, 1);
+  if (PREDICT_FALSE (sz == sq->maxsize))
+    svm_msg_q_send_signal (mq);
+
+  return 0;
+}
+
 int
 svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
               svm_q_conditional_wait_t cond, u32 time)
 {
-  return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
-}
+  int rc = 0;
 
-void
-svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
-{
-  svm_queue_sub_raw (mq->q, (u8 *) msg);
+  if (cond == SVM_Q_NOWAIT)
+    {
+      /* zero on success */
+      if (svm_msg_q_try_lock (mq))
+       {
+         return (-1);
+       }
+    }
+  else
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_is_empty (mq)))
+    {
+      if (cond == SVM_Q_NOWAIT)
+       {
+         svm_msg_q_unlock (mq);
+         return (-2);
+       }
+      else if (cond == SVM_Q_TIMEDWAIT)
+       {
+         while (svm_msg_q_is_empty (mq) && rc == 0)
+           rc = svm_msg_q_timedwait (mq, time);
+
+         if (rc == ETIMEDOUT)
+           {
+             svm_msg_q_unlock (mq);
+             return ETIMEDOUT;
+           }
+       }
+      else
+       {
+         while (svm_msg_q_is_empty (mq))
+           svm_msg_q_wait (mq);
+       }
+    }
+
+  svm_msg_q_sub_raw (mq, (u8 *) msg);
+
+  svm_msg_q_unlock (mq);
+
+  return 0;
 }
 
 void
-svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_sub_w_lock (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
 {
-  mq->q->consumer_evtfd = fd;
+  svm_msg_q_sub_raw (mq, (u8 *) msg);
 }
 
 void
-svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd)
 {
-  mq->q->producer_evtfd = fd;
+  mq->q.evtfd = fd;
 }
 
 int
-svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
 {
   int fd;
   if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
     return -1;
-  svm_msg_q_set_consumer_eventfd (mq, fd);
+  svm_msg_q_set_eventfd (mq, fd);
   return 0;
 }
 
+void
+svm_msg_q_wait (svm_msg_q_t *mq)
+{
+  if (mq->q.evtfd == -1)
+    {
+      pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+    }
+  else
+    {
+      u64 buf;
+      int rv;
+
+      svm_msg_q_unlock (mq);
+      while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+       {
+         if (errno != EAGAIN)
+           {
+             clib_unix_warning ("read error");
+             return;
+           }
+       }
+      svm_msg_q_lock (mq);
+    }
+}
+
 int
-svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
 {
-  int fd;
-  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
-    return -1;
-  svm_msg_q_set_producer_eventfd (mq, fd);
-  return 0;
+  if (mq->q.evtfd == -1)
+    {
+      struct timespec ts;
+      ts.tv_sec = unix_time_now () + (u32) timeout;
+      ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
+      return pthread_cond_timedwait (&mq->q.shr->condvar, &mq->q.shr->mutex,
+                                    &ts);
+    }
+  else
+    {
+      struct timeval tv;
+      u64 buf;
+      int rv;
+
+      tv.tv_sec = (u64) timeout;
+      tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
+      setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv,
+                 sizeof tv);
+
+      svm_msg_q_unlock (mq);
+      rv = read (mq->q.evtfd, &buf, sizeof (buf));
+      if (rv < 0)
+       clib_warning ("read %u", errno);
+      svm_msg_q_lock (mq);
+
+      return rv < 0 ? errno : 0;
+    }
 }
 
 u8 *
 format_svm_msg_q (u8 * s, va_list * args)
 {
   svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *);
-  s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize);
+  s = format (s, " [Q:%d/%d]", mq->q.shr->cursize, mq->q.shr->maxsize);
   for (u32 i = 0; i < vec_len (mq->rings); i++)
     {
       s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,
index 4b314b8..7716c67 100644 (file)
 #include <vppinfra/error.h>
 #include <svm/queue.h>
 
+typedef struct svm_msg_q_shr_queue_
+{
+  pthread_mutex_t mutex;  /* 8 bytes */
+  pthread_cond_t condvar; /* 8 bytes */
+  u32 head;
+  u32 tail;
+  volatile u32 cursize;
+  u32 maxsize;
+  u32 elsize;
+  u32 pad;
+  u8 data[0];
+} svm_msg_q_shared_queue_t;
+
+typedef struct svm_msg_q_queue_
+{
+  svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
+  int evtfd;                    /**< producer/consumer eventfd */
+} svm_msg_q_queue_t;
+
 typedef struct svm_msg_q_ring_shared_
 {
   volatile u32 cursize;                        /**< current size of the ring */
@@ -43,14 +62,14 @@ typedef struct svm_msg_q_ring_
 
 typedef struct svm_msg_q_shared_
 {
-  u32 n_rings;     /**< number of rings after q */
-  u32 pad;         /**< 8 byte alignment for q */
-  svm_queue_t q[0]; /**< queue for exchanging messages */
+  u32 n_rings;                  /**< number of rings after q */
+  u32 pad;                      /**< 8 byte alignment for q */
+  svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
 } __clib_packed svm_msg_q_shared_t;
 
 typedef struct svm_msg_q_
 {
-  svm_queue_t *q;                      /**< queue for exchanging messages */
+  svm_msg_q_queue_t q;                 /**< queue for exchanging messages */
   svm_msg_q_ring_t *rings;             /**< rings with message data*/
 } __clib_packed svm_msg_q_t;
 
@@ -232,7 +251,7 @@ void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
 svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
 
 /**
- * Set event fd for queue consumer
+ * Set event fd for queue
  *
  * If set, queue will exclusively use eventfds for signaling. Moreover,
  * afterwards, the queue should only be used in non-blocking mode. Waiting
@@ -241,35 +260,26 @@ svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
  * @param mq           message queue
  * @param fd           consumer eventfd
  */
-void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);
+void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
 
 /**
- * Set event fd for queue producer
- *
- * If set, queue will exclusively use eventfds for signaling. Moreover,
- * afterwards, the queue should only be used in non-blocking mode. Waiting
- * for events should be done externally using something like epoll.
- *
- * @param mq           message queue
- * @param fd           producer eventfd
+ * Allocate event fd for queue
  */
-void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);
+int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
 
 /**
- * Allocate event fd for queue consumer
+ * Format message queue, shows msg count for each ring
  */
-int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);
+u8 *format_svm_msg_q (u8 *s, va_list *args);
 
 /**
- * Allocate event fd for queue consumer
- */
-int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);
-
-
-/**
- * Format message queue, shows msg count for each ring
+ * Check length of message queue
  */
-u8 *format_svm_msg_q (u8 * s, va_list * args);
+static inline u32
+svm_msg_q_size (svm_msg_q_t *mq)
+{
+  return clib_atomic_load_relax_n (&mq->q.shr->cursize);
+}
 
 /**
  * Check if message queue is full
@@ -277,14 +287,14 @@ u8 *format_svm_msg_q (u8 * s, va_list * args);
 static inline u8
 svm_msg_q_is_full (svm_msg_q_t * mq)
 {
-  return (mq->q->cursize == mq->q->maxsize);
+  return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
 }
 
 static inline u8
 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
 {
   svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
-  return (ring->shr->cursize >= ring->nitems);
+  return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
 }
 
 /**
@@ -293,16 +303,7 @@ svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
 static inline u8
 svm_msg_q_is_empty (svm_msg_q_t * mq)
 {
-  return (mq->q->cursize == 0);
-}
-
-/**
- * Check length of message queue
- */
-static inline u32
-svm_msg_q_size (svm_msg_q_t * mq)
-{
-  return mq->q->cursize;
+  return (svm_msg_q_size (mq) == 0);
 }
 
 /**
@@ -320,9 +321,9 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
 static inline int
 svm_msg_q_try_lock (svm_msg_q_t * mq)
 {
-  int rv = pthread_mutex_trylock (&mq->q->mutex);
+  int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
   if (PREDICT_FALSE (rv == EOWNERDEAD))
-    rv = pthread_mutex_consistent (&mq->q->mutex);
+    rv = pthread_mutex_consistent (&mq->q.shr->mutex);
   return rv;
 }
 
@@ -332,9 +333,9 @@ svm_msg_q_try_lock (svm_msg_q_t * mq)
 static inline int
 svm_msg_q_lock (svm_msg_q_t * mq)
 {
-  int rv = pthread_mutex_lock (&mq->q->mutex);
+  int rv = pthread_mutex_lock (&mq->q.shr->mutex);
   if (PREDICT_FALSE (rv == EOWNERDEAD))
-    rv = pthread_mutex_consistent (&mq->q->mutex);
+    rv = pthread_mutex_consistent (&mq->q.shr->mutex);
   return rv;
 }
 
@@ -344,7 +345,7 @@ svm_msg_q_lock (svm_msg_q_t * mq)
 static inline void
 svm_msg_q_unlock (svm_msg_q_t * mq)
 {
-  pthread_mutex_unlock (&mq->q->mutex);
+  pthread_mutex_unlock (&mq->q.shr->mutex);
 }
 
 /**
@@ -353,11 +354,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
  * Must be called with mutex held. The queue only works non-blocking
  * with eventfds, so handle blocking calls as an exception here.
  */
-static inline void
-svm_msg_q_wait (svm_msg_q_t * mq)
-{
-  svm_queue_wait (mq->q);
-}
+void svm_msg_q_wait (svm_msg_q_t *mq);
 
 /**
  * Timed wait for message queue event
@@ -367,22 +364,12 @@ svm_msg_q_wait (svm_msg_q_t * mq)
  * @param mq           message queue
  * @param timeout      time in seconds
  */
-static inline int
-svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
-{
-  return svm_queue_timedwait (mq->q, timeout);
-}
-
-static inline int
-svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
-{
-  return mq->q->consumer_evtfd;
-}
+int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
 
 static inline int
-svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_get_eventfd (svm_msg_q_t *mq)
 {
-  return mq->q->producer_evtfd;
+  return mq->q.evtfd;
 }
 
 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
index 7d24162..48695a3 100644 (file)
@@ -121,7 +121,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
 
       if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
        {
-         svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
+         svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
          vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
          n_fds++;
        }
@@ -215,7 +215,7 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t *
 
       if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
        {
-         svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
+         svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
          vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
          n_fds++;
        }
index a140e5e..b9745d2 100644 (file)
@@ -46,7 +46,7 @@ vcl_mq_epoll_add_evfd (vcl_worker_t * wrk, svm_msg_q_t * mq)
   u32 mqc_index;
   int mq_fd;
 
-  mq_fd = svm_msg_q_get_consumer_eventfd (mq);
+  mq_fd = svm_msg_q_get_eventfd (mq);
 
   if (wrk->mqs_epfd < 0 || mq_fd == -1)
     return -1;
index bc44272..1bab7ea 100644 (file)
@@ -89,8 +89,7 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
 
   if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
     {
-      svm_msg_q_set_consumer_eventfd (wrk->app_event_queue,
-                                     fds[n_fds_used++]);
+      svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds_used++]);
       vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
     }
 
@@ -236,7 +235,7 @@ vcl_api_add_del_worker_reply_handler (app_sapi_worker_add_del_reply_msg_t *
 
   if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
     {
-      svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
+      svm_msg_q_set_eventfd (wrk->app_event_queue, fds[n_fds]);
       vcl_mq_epoll_add_evfd (wrk, wrk->app_event_queue);
       n_fds++;
     }
index 6110832..ffac4e0 100644 (file)
@@ -870,7 +870,7 @@ segment_manager_alloc_queue (fifo_segment_t * segment,
 
   if (props->use_mq_eventfd)
     {
-      if (svm_msg_q_alloc_producer_eventfd (q))
+      if (svm_msg_q_alloc_eventfd (q))
        clib_warning ("failed to alloc eventfd");
     }
   return q;
index d1f21da..169cca5 100644 (file)
@@ -1548,9 +1548,6 @@ session_vpp_event_queues_allocate (session_main_t * smm)
       cfg->ring_cfgs = rc;
 
       smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (eqs, i, cfg);
-
-      if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
-       clib_warning ("eventfd returned");
     }
 }
 
index 2e215f7..0116a7e 100644 (file)
@@ -667,7 +667,7 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
   if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
     {
       fd_flags |= SESSION_FD_F_MQ_EVENTFD;
-      fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
+      fds[n_fds] = svm_msg_q_get_eventfd (a->app_evt_q);
       n_fds += 1;
     }
 
@@ -751,7 +751,7 @@ vl_api_app_worker_add_del_t_handler (vl_api_app_worker_add_del_t * mp)
   if (application_segment_manager_properties (app)->use_mq_eventfd)
     {
       fd_flags |= SESSION_FD_F_MQ_EVENTFD;
-      fds[n_fds] = svm_msg_q_get_producer_eventfd (args.evt_q);
+      fds[n_fds] = svm_msg_q_get_eventfd (args.evt_q);
       n_fds += 1;
     }
 
@@ -1317,7 +1317,7 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
   if (a->options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
     {
       fd_flags |= SESSION_FD_F_MQ_EVENTFD;
-      fds[n_fds] = svm_msg_q_get_producer_eventfd (a->app_evt_q);
+      fds[n_fds] = svm_msg_q_get_eventfd (a->app_evt_q);
       n_fds += 1;
     }
 
@@ -1426,7 +1426,7 @@ sapi_add_del_worker_handler (app_namespace_t * app_ns,
   if (application_segment_manager_properties (app)->use_mq_eventfd)
     {
       fd_flags |= SESSION_FD_F_MQ_EVENTFD;
-      fds[n_fds] = svm_msg_q_get_producer_eventfd (args.evt_q);
+      fds[n_fds] = svm_msg_q_get_eventfd (args.evt_q);
       n_fds += 1;
     }
 
index cd4198c..c042e9e 100644 (file)
@@ -123,6 +123,7 @@ dump_thread_0_event_queue (void)
   vlib_main_t *vm = &vlib_global_main;
   u32 my_thread_index = vm->thread_index;
   session_event_t _e, *e = &_e;
+  svm_msg_q_shared_queue_t *sq;
   svm_msg_q_ring_t *ring;
   session_t *s0;
   svm_msg_q_msg_t *msg;
@@ -130,11 +131,12 @@ dump_thread_0_event_queue (void)
   int i, index;
 
   mq = session_main_get_vpp_event_queue (my_thread_index);
-  index = mq->q->head;
+  sq = mq->q.shr;
+  index = sq->head;
 
-  for (i = 0; i < mq->q->cursize; i++)
+  for (i = 0; i < sq->cursize; i++)
     {
-      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+      msg = (svm_msg_q_msg_t *) (&sq->data[0] + sq->elsize * index);
       ring = svm_msg_q_ring (mq, msg->ring_index);
       clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
 
@@ -170,7 +172,7 @@ dump_thread_0_event_queue (void)
 
       index++;
 
-      if (index == mq->q->maxsize)
+      if (index == sq->maxsize)
        index = 0;
     }
 }
@@ -210,6 +212,7 @@ session_node_cmp_event (session_event_t * e, svm_fifo_t * f)
 u8
 session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
 {
+  svm_msg_q_shared_queue_t *sq;
   session_evt_elt_t *elt;
   session_worker_t *wrk;
   int i, index, found = 0;
@@ -226,16 +229,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e)
    * Search evt queue
    */
   mq = wrk->vpp_event_queue;
-  index = mq->q->head;
-  for (i = 0; i < mq->q->cursize; i++)
+  sq = mq->q.shr;
+  index = sq->head;
+  for (i = 0; i < sq->cursize; i++)
     {
-      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+      msg = (svm_msg_q_msg_t *) (&sq->data[0] + sq->elsize * index);
       ring = svm_msg_q_ring (mq, msg->ring_index);
       clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
       found = session_node_cmp_event (e, f);
       if (found)
        return 1;
-      index = (index + 1) % mq->q->maxsize;
+      index = (index + 1) % sq->maxsize;
     }
   /*
    * Search pending events vector