session svm: non blocking mq 27/30927/27
authorFlorin Coras <fcoras@cisco.com>
Tue, 26 Jan 2021 04:31:27 +0000 (20:31 -0800)
committerDave Barach <openvpp@barachs.net>
Fri, 5 Feb 2021 17:28:34 +0000 (17:28 +0000)
Avoid synchronizing producers and the consumer. Instead, only use mutex
or spinlock (if eventfds are configured) to synchronize producers.

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ie2aafbdc2e07fced5d5e46ee2df6b30a186faa2f

src/plugins/hs_apps/sapi/vpp_echo.c
src/plugins/unittest/session_test.c
src/svm/message_queue.c
src/svm/message_queue.h
src/vcl/vppcom.c
src/vnet/session/application_interface.h
src/vnet/session/session_node.c

index 19b5808..816b7d4 100644 (file)
@@ -808,15 +808,12 @@ echo_process_rpcs (echo_main_t * em)
 
   while (em->state < STATE_DATA_DONE && !em->time_to_stop)
     {
-      svm_msg_q_lock (mq);
       if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 1))
        {
-         svm_msg_q_unlock (mq);
          continue;
        }
-      svm_msg_q_sub_w_lock (mq, &msg);
+      svm_msg_q_sub_raw (mq, &msg);
       rpc = svm_msg_q_msg_data (mq, &msg);
-      svm_msg_q_unlock (mq);
       ((echo_rpc_t) rpc->fp) (em, &rpc->args);
       svm_msg_q_free_msg (mq, &msg);
     }
@@ -876,18 +873,15 @@ echo_mq_thread_fn (void *arg)
       if (em->periodic_stats_delta)
        echo_print_periodic_stats (em);
 
-      svm_msg_q_lock (mq);
       if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 1))
        {
-         svm_msg_q_unlock (mq);
          continue;
        }
       for (i = 0; i < svm_msg_q_size (mq); i++)
        {
          vec_add2 (msg_vec, msg, 1);
-         svm_msg_q_sub_w_lock (mq, msg);
+         svm_msg_q_sub_raw (mq, msg);
        }
-      svm_msg_q_unlock (mq);
 
       for (i = 0; i < vec_len (msg_vec); i++)
        {
index 6496a99..cd99b0c 100644 (file)
@@ -1738,9 +1738,7 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd)
 {
   if (!use_eventfd)
     {
-      svm_msg_q_lock (mq);
-      while (svm_msg_q_is_empty (mq))
-       svm_msg_q_wait (mq);
+      svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
     }
   else
     {
@@ -1764,10 +1762,7 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd)
            continue;
 
          if (!svm_msg_q_is_empty (mq))
-           {
-             svm_msg_q_lock (mq);
-             break;
-           }
+           break;
        }
     }
 }
@@ -1871,7 +1866,7 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
       for (i = 0; i < n_test_msgs; i++)
        {
          wait_for_event (mq, prod_fd, epfd, use_eventfd);
-         svm_msg_q_sub_w_lock (mq, &msg);
+         svm_msg_q_sub_raw (mq, &msg);
          svm_msg_q_free_msg (mq, &msg);
          svm_msg_q_unlock (mq);
          *counter = *counter + 1;
index fdf9293..b423826 100644 (file)
@@ -163,21 +163,36 @@ svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
       offset = sizeof (*ring) + ring->nitems * ring->elsize;
       ring = (void *) ((u8 *) ring + offset);
     }
+  clib_spinlock_init (&mq->q.lock);
 }
 
 void
 svm_msg_q_free (svm_msg_q_t * mq)
 {
   clib_mem_free (mq->q.shr);
+  clib_spinlock_free (&mq->q.lock);
   clib_mem_free (mq);
 }
 
 static void
-svm_msg_q_send_signal (svm_msg_q_t *mq)
+svm_msg_q_send_signal (svm_msg_q_t *mq, u8 is_consumer)
 {
   if (mq->q.evtfd == -1)
     {
+      if (is_consumer)
+       {
+         int rv = pthread_mutex_lock (&mq->q.shr->mutex);
+         if (PREDICT_FALSE (rv == EOWNERDEAD))
+           {
+             rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+             return;
+           }
+       }
+
       (void) pthread_cond_broadcast (&mq->q.shr->condvar);
+
+      if (is_consumer)
+       pthread_mutex_unlock (&mq->q.shr->mutex);
     }
   else
     {
@@ -232,7 +247,7 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
       svm_msg_q_lock (mq);
       while (svm_msg_q_is_full (mq)
             || svm_msg_q_ring_is_full (mq, ring_index))
-       svm_msg_q_wait (mq);
+       svm_msg_q_wait (mq, SVM_MQ_WAIT_FULL);
       *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
     }
   return 0;
@@ -253,7 +268,7 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
     msg.ring_index = ring - mq->rings;
     msg.elt_index = sr->tail;
     sr->tail = (sr->tail + 1) % ring->nitems;
-    clib_atomic_fetch_add_rel (&sr->cursize, 1);
+    clib_atomic_fetch_add_relax (&sr->cursize, 1);
     break;
   }
   return msg;
@@ -271,7 +286,7 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   svm_msg_q_ring_shared_t *sr;
   svm_msg_q_ring_t *ring;
-  int need_signal;
+  u32 need_signal;
 
   ASSERT (vec_len (mq->rings) > msg->ring_index);
   ring = svm_msg_q_ring_inline (mq, msg->ring_index);
@@ -282,16 +297,17 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
     }
   else
     {
-      clib_warning ("message out of order");
+      clib_warning ("message out of order: elt %u head %u ring %u",
+                   msg->elt_index, sr->head, msg->ring_index);
       /* for now, expect messages to be processed in order */
       ASSERT (0);
     }
 
-  need_signal = sr->cursize == ring->nitems;
-  clib_atomic_fetch_sub_rel (&sr->cursize, 1);
+  need_signal = clib_atomic_load_relax_n (&sr->cursize) == ring->nitems;
+  clib_atomic_fetch_sub_relax (&sr->cursize, 1);
 
   if (PREDICT_FALSE (need_signal))
-    svm_msg_q_send_signal (mq);
+    svm_msg_q_send_signal (mq, 1 /* is consumer */);
 }
 
 static int
@@ -331,7 +347,7 @@ svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
 
   sz = clib_atomic_fetch_add_rel (&sq->cursize, 1);
   if (!sz)
-    svm_msg_q_send_signal (mq);
+    svm_msg_q_send_signal (mq, 0 /* is consumer */);
 }
 
 int
@@ -355,7 +371,7 @@ svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
       if (nowait)
        return (-2);
       while (svm_msg_q_is_full (mq))
-       svm_msg_q_wait (mq);
+       svm_msg_q_wait (mq, SVM_MQ_WAIT_FULL);
     }
 
   svm_msg_q_add_raw (mq, (u8 *) msg);
@@ -373,8 +389,8 @@ svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
   svm_msg_q_unlock (mq);
 }
 
-static int
-svm_msg_q_sub_raw (svm_msg_q_t *mq, u8 *elem)
+int
+svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem)
 {
   svm_msg_q_shared_queue_t *sq = mq->q.shr;
   i8 *headp;
@@ -387,68 +403,75 @@ svm_msg_q_sub_raw (svm_msg_q_t *mq, u8 *elem)
 
   sq->head = (sq->head + 1) % sq->maxsize;
 
-  sz = clib_atomic_fetch_sub_rel (&sq->cursize, 1);
+  sz = clib_atomic_fetch_sub_relax (&sq->cursize, 1);
   if (PREDICT_FALSE (sz == sq->maxsize))
-    svm_msg_q_send_signal (mq);
+    svm_msg_q_send_signal (mq, 1 /* is consumer */);
 
   return 0;
 }
 
 int
-svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
-              svm_q_conditional_wait_t cond, u32 time)
+svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf, u32 n_msgs)
 {
-  int rc = 0;
+  svm_msg_q_shared_queue_t *sq = mq->q.shr;
+  u32 sz, to_deq;
+  i8 *headp;
 
-  if (cond == SVM_Q_NOWAIT)
+  sz = svm_msg_q_size (mq);
+  ASSERT (sz);
+  to_deq = clib_min (sz, n_msgs);
+
+  headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
+
+  if (sq->head + to_deq < sq->maxsize)
     {
-      /* zero on success */
-      if (svm_msg_q_try_lock (mq))
-       {
-         return (-1);
-       }
+      clib_memcpy_fast (msg_buf, headp, sq->elsize * to_deq);
+      sq->head += to_deq;
     }
   else
-    svm_msg_q_lock (mq);
+    {
+      u32 first_batch = sq->maxsize - sq->head;
+      clib_memcpy_fast (msg_buf, headp, sq->elsize * first_batch);
+      clib_memcpy_fast (msg_buf + first_batch, sq->data,
+                       sq->elsize * (to_deq - first_batch));
+      sq->head = (sq->head + to_deq) % sq->maxsize;
+    }
+
+  clib_atomic_fetch_sub_relax (&sq->cursize, to_deq);
+  if (PREDICT_FALSE (sz == sq->maxsize))
+    svm_msg_q_send_signal (mq, 1 /* is consumer */);
+
+  return to_deq;
+}
+
+int
+svm_msg_q_sub (svm_msg_q_t *mq, svm_msg_q_msg_t *msg,
+              svm_q_conditional_wait_t cond, u32 time)
+{
+  int rc = 0;
 
-  if (PREDICT_FALSE (svm_msg_q_is_empty (mq)))
+  if (svm_msg_q_is_empty (mq))
     {
       if (cond == SVM_Q_NOWAIT)
        {
-         svm_msg_q_unlock (mq);
          return (-2);
        }
       else if (cond == SVM_Q_TIMEDWAIT)
        {
-         while (svm_msg_q_is_empty (mq) && rc == 0)
-           rc = svm_msg_q_timedwait (mq, time);
-
-         if (rc == ETIMEDOUT)
-           {
-             svm_msg_q_unlock (mq);
-             return ETIMEDOUT;
-           }
+         if ((rc = svm_msg_q_timedwait (mq, time)))
+           return rc;
        }
       else
        {
-         while (svm_msg_q_is_empty (mq))
-           svm_msg_q_wait (mq);
+         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
        }
     }
 
-  svm_msg_q_sub_raw (mq, (u8 *) msg);
-
-  svm_msg_q_unlock (mq);
+  svm_msg_q_sub_raw (mq, msg);
 
   return 0;
 }
 
-void
-svm_msg_q_sub_w_lock (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
-{
-  svm_msg_q_sub_raw (mq, (u8 *) msg);
-}
-
 void
 svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd)
 {
@@ -465,29 +488,46 @@ svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
   return 0;
 }
 
-void
-svm_msg_q_wait (svm_msg_q_t *mq)
+int
+svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type)
 {
+  u8 (*fn) (svm_msg_q_t *);
+  int rv;
+
+  fn = (type == SVM_MQ_WAIT_EMPTY) ? svm_msg_q_is_empty : svm_msg_q_is_full;
+
   if (mq->q.evtfd == -1)
     {
-      pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+      rv = pthread_mutex_lock (&mq->q.shr->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+       {
+         rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+         return rv;
+       }
+
+      while (fn (mq))
+       pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+
+      pthread_mutex_unlock (&mq->q.shr->mutex);
     }
   else
     {
       u64 buf;
-      int rv;
 
-      svm_msg_q_unlock (mq);
-      while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+      while (fn (mq))
        {
-         if (errno != EAGAIN)
+         while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
            {
-             clib_unix_warning ("read error");
-             return;
+             if (errno != EAGAIN)
+               {
+                 clib_unix_warning ("read error");
+                 return rv;
+               }
            }
        }
-      svm_msg_q_lock (mq);
     }
+
+  return 0;
 }
 
 int
@@ -495,11 +535,32 @@ svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
 {
   if (mq->q.evtfd == -1)
     {
+      svm_msg_q_shared_queue_t *sq = mq->q.shr;
       struct timespec ts;
+      u32 sz;
+      int rv;
+
+      rv = pthread_mutex_lock (&sq->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+       {
+         rv = pthread_mutex_consistent (&sq->mutex);
+         return rv;
+       }
+
+      /* check if we're still in a signalable state after grabbing lock */
+      sz = svm_msg_q_size (mq);
+      if (sz != 0 && sz != sq->maxsize)
+       {
+         pthread_mutex_unlock (&sq->mutex);
+         return 0;
+       }
+
       ts.tv_sec = unix_time_now () + (u32) timeout;
       ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
-      return pthread_cond_timedwait (&mq->q.shr->condvar, &mq->q.shr->mutex,
-                                    &ts);
+      rv = pthread_cond_timedwait (&sq->condvar, &sq->mutex, &ts);
+
+      pthread_mutex_unlock (&sq->mutex);
+      return rv;
     }
   else
     {
@@ -512,11 +573,9 @@ svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
       setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv,
                  sizeof tv);
 
-      svm_msg_q_unlock (mq);
       rv = read (mq->q.evtfd, &buf, sizeof (buf));
       if (rv < 0)
        clib_warning ("read %u", errno);
-      svm_msg_q_lock (mq);
 
       return rv < 0 ? errno : 0;
     }
index 7716c67..1ef773d 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <vppinfra/clib.h>
 #include <vppinfra/error.h>
+#include <vppinfra/lock.h>
 #include <svm/queue.h>
 
 typedef struct svm_msg_q_shr_queue_
@@ -41,6 +42,7 @@ typedef struct svm_msg_q_queue_
 {
   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
   int evtfd;                    /**< producer/consumer eventfd */
+  clib_spinlock_t lock;                 /**< private lock for multi-producer */
 } svm_msg_q_queue_t;
 
 typedef struct svm_msg_q_ring_shared_
@@ -99,6 +101,13 @@ typedef union
 } svm_msg_q_msg_t;
 
 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
+
+typedef enum svm_msg_q_wait_type_
+{
+  SVM_MQ_WAIT_EMPTY,
+  SVM_MQ_WAIT_FULL
+} svm_msg_q_wait_type_t;
+
 /**
  * Allocate message queue
  *
@@ -206,6 +215,7 @@ void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
  * Consumer dequeue one message from queue
  *
  * This returns the message pointing to the data in the message rings.
+ * Should only be used in single consumer scenarios as no locks are grabbed.
  * The consumer is expected to call @ref svm_msg_q_free_msg once it
  * finishes processing/copies the message data.
  *
@@ -219,18 +229,34 @@ int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
                   svm_q_conditional_wait_t cond, u32 time);
 
 /**
- * Consumer dequeue one message from queue with mutex held
+ * Consumer dequeue one message from queue
  *
- * Returns the message pointing to the data in the message rings under the
- * assumption that the message queue lock is already held. The consumer is
- * expected to call @ref svm_msg_q_free_msg once it finishes
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
  * processing/copies the message data.
  *
  * @param mq           message queue
  * @param msg          pointer to structure where message is to be received
  * @return             success status
  */
-void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
+
+/**
+ * Consumer dequeue multiple messages from queue
+ *
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
+ * processing/copies the message data.
+ *
+ * @param mq           message queue
+ * @param msg_buf      pointer to array of messages to received
+ * @param n_msgs       lengt of msg_buf array
+ * @return             number of messages dequeued
+ */
+int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
+                            u32 n_msgs);
 
 /**
  * Get data for message in queue
@@ -321,10 +347,17 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
 static inline int
 svm_msg_q_try_lock (svm_msg_q_t * mq)
 {
-  int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
-  if (PREDICT_FALSE (rv == EOWNERDEAD))
-    rv = pthread_mutex_consistent (&mq->q.shr->mutex);
-  return rv;
+  if (mq->q.evtfd == -1)
+    {
+      int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+       rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+      return rv;
+    }
+  else
+    {
+      return !clib_spinlock_trylock (&mq->q.lock);
+    }
 }
 
 /**
@@ -333,10 +366,18 @@ svm_msg_q_try_lock (svm_msg_q_t * mq)
 static inline int
 svm_msg_q_lock (svm_msg_q_t * mq)
 {
-  int rv = pthread_mutex_lock (&mq->q.shr->mutex);
-  if (PREDICT_FALSE (rv == EOWNERDEAD))
-    rv = pthread_mutex_consistent (&mq->q.shr->mutex);
-  return rv;
+  if (mq->q.evtfd == -1)
+    {
+      int rv = pthread_mutex_lock (&mq->q.shr->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+       rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+      return rv;
+    }
+  else
+    {
+      clib_spinlock_lock (&mq->q.lock);
+      return 0;
+    }
 }
 
 /**
@@ -345,7 +386,14 @@ svm_msg_q_lock (svm_msg_q_t * mq)
 static inline void
 svm_msg_q_unlock (svm_msg_q_t * mq)
 {
-  pthread_mutex_unlock (&mq->q.shr->mutex);
+  if (mq->q.evtfd == -1)
+    {
+      pthread_mutex_unlock (&mq->q.shr->mutex);
+    }
+  else
+    {
+      clib_spinlock_unlock (&mq->q.lock);
+    }
 }
 
 /**
@@ -354,7 +402,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
  * Must be called with mutex held. The queue only works non-blocking
  * with eventfds, so handle blocking calls as an exception here.
  */
-void svm_msg_q_wait (svm_msg_q_t *mq);
+int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
 
 /**
  * Timed wait for message queue event
index cf8bb72..2189243 100644 (file)
@@ -25,15 +25,14 @@ __thread uword __vcl_worker_index = ~0;
 static inline int
 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq, u32 n_max_msg)
 {
-  svm_msg_q_msg_t *msg;
-  u32 n_msgs;
-  int i;
+  u32 n_msgs = 0, sz, len;
 
-  n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
-  for (i = 0; i < n_msgs; i++)
+  while ((sz = svm_msg_q_size (mq)))
     {
-      vec_add2 (wrk->mq_msg_vector, msg, 1);
-      svm_msg_q_sub_w_lock (mq, msg);
+      len = vec_len (wrk->mq_msg_vector);
+      vec_validate (wrk->mq_msg_vector, len + sz - 1);
+      svm_msg_q_sub_raw_batch (mq, wrk->mq_msg_vector + len, sz);
+      n_msgs += sz;
     }
   return n_msgs;
 }
@@ -1101,18 +1100,15 @@ vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
 }
 
 void
-vcl_flush_mq_events (void)
+vcl_worker_flush_mq_events (vcl_worker_t *wrk)
 {
-  vcl_worker_t *wrk = vcl_worker_get_current ();
   svm_msg_q_msg_t *msg;
   session_event_t *e;
   svm_msg_q_t *mq;
   int i;
 
   mq = wrk->app_event_queue;
-  svm_msg_q_lock (mq);
   vcl_mq_dequeue_batch (wrk, mq, ~0);
-  svm_msg_q_unlock (mq);
 
   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
     {
@@ -1125,6 +1121,12 @@ vcl_flush_mq_events (void)
   vcl_handle_pending_wrk_updates (wrk);
 }
 
+void
+vcl_flush_mq_events (void)
+{
+  vcl_worker_flush_mq_events (vcl_worker_get_current ());
+}
+
 static int
 vppcom_session_unbind (u32 session_handle)
 {
@@ -1539,11 +1541,11 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
   vcl_session_t *listen_session = 0;
   vcl_session_t *client_session = 0;
   vcl_session_msg_t *evt;
-  svm_msg_q_msg_t msg;
-  session_event_t *e;
   u8 is_nonblocking;
   int rv;
 
+again:
+
   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
   if (!listen_session)
     return VPPCOM_EBADFD;
@@ -1567,19 +1569,9 @@ vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
       if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
        return VPPCOM_EAGAIN;
 
-      if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
-       return VPPCOM_EAGAIN;
-
-      e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
-      if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
-       {
-         vcl_handle_mq_event (wrk, e);
-         svm_msg_q_free_msg (wrk->app_event_queue, &msg);
-         continue;
-       }
-      clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
-      svm_msg_q_free_msg (wrk->app_event_queue, &msg);
-      break;
+      svm_msg_q_wait (wrk->app_event_queue, SVM_MQ_WAIT_EMPTY);
+      vcl_worker_flush_mq_events (wrk);
+      goto again;
     }
 
 handle:
@@ -1785,12 +1777,6 @@ vppcom_session_stream_connect (uint32_t session_handle,
   return rv;
 }
 
-static u8
-vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
-{
-  return (e->event_type == SESSION_IO_EVT_RX && e->session_index == sid);
-}
-
 static inline int
 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
                              u8 peek)
@@ -1799,7 +1785,6 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
   int rv, n_read = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
-  svm_msg_q_msg_t msg;
   session_event_t *e;
   svm_msg_q_t *mq;
   u8 is_ct;
@@ -1844,16 +1829,9 @@ vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
          if (is_ct)
            svm_fifo_unset_event (s->rx_fifo);
          svm_fifo_unset_event (rx_fifo);
-         svm_msg_q_lock (mq);
-         if (svm_msg_q_is_empty (mq))
-           svm_msg_q_wait (mq);
-
-         svm_msg_q_sub_w_lock (mq, &msg);
-         e = svm_msg_q_msg_data (mq, &msg);
-         svm_msg_q_unlock (mq);
-         if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
-           vcl_handle_mq_event (wrk, e);
-         svm_msg_q_free_msg (mq, &msg);
+
+         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+         vcl_worker_flush_mq_events (wrk);
        }
     }
 
@@ -1924,8 +1902,6 @@ vppcom_session_read_segments (uint32_t session_handle,
   int n_read = 0, is_nonblocking;
   vcl_session_t *s = 0;
   svm_fifo_t *rx_fifo;
-  svm_msg_q_msg_t msg;
-  session_event_t *e;
   svm_msg_q_t *mq;
   u8 is_ct;
 
@@ -1959,16 +1935,9 @@ vppcom_session_read_segments (uint32_t session_handle,
          if (is_ct)
            svm_fifo_unset_event (s->rx_fifo);
          svm_fifo_unset_event (rx_fifo);
-         svm_msg_q_lock (mq);
-         if (svm_msg_q_is_empty (mq))
-           svm_msg_q_wait (mq);
-
-         svm_msg_q_sub_w_lock (mq, &msg);
-         e = svm_msg_q_msg_data (mq, &msg);
-         svm_msg_q_unlock (mq);
-         if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
-           vcl_handle_mq_event (wrk, e);
-         svm_msg_q_free_msg (mq, &msg);
+
+         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+         vcl_worker_flush_mq_events (wrk);
        }
     }
 
@@ -2015,12 +1984,6 @@ vppcom_session_free_segments (uint32_t session_handle, uint32_t n_bytes)
   s->rx_bytes_pending -= n_bytes;
 }
 
-static u8
-vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
-{
-  return (e->event_type == SESSION_IO_EVT_TX && e->session_index == sid);
-}
-
 always_inline u8
 vcl_fifo_is_writeable (svm_fifo_t * f, u32 len, u8 is_dgram)
 {
@@ -2037,9 +2000,7 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
 {
   int n_write, is_nonblocking;
   session_evt_type_t et;
-  svm_msg_q_msg_t msg;
   svm_fifo_t *tx_fifo;
-  session_event_t *e;
   svm_msg_q_t *mq;
   u8 is_ct;
 
@@ -2077,17 +2038,9 @@ vppcom_session_write_inline (vcl_worker_t * wrk, vcl_session_t * s, void *buf,
          svm_fifo_add_want_deq_ntf (tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
          if (vcl_session_is_closing (s))
            return vcl_session_closing_error (s);
-         svm_msg_q_lock (mq);
-         if (svm_msg_q_is_empty (mq))
-           svm_msg_q_wait (mq);
 
-         svm_msg_q_sub_w_lock (mq, &msg);
-         e = svm_msg_q_msg_data (mq, &msg);
-         svm_msg_q_unlock (mq);
-
-         if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
-           vcl_handle_mq_event (wrk, e);
-         svm_msg_q_free_msg (mq, &msg);
+         svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+         vcl_worker_flush_mq_events (wrk);
        }
     }
 
@@ -2290,35 +2243,22 @@ vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
   session_event_t *e;
   u32 i;
 
-  svm_msg_q_lock (mq);
   if (svm_msg_q_is_empty (mq))
     {
       if (*bits_set)
-       {
-         svm_msg_q_unlock (mq);
-         return 0;
-       }
+       return 0;
 
       if (!time_to_wait)
-       {
-         svm_msg_q_unlock (mq);
-         return 0;
-       }
+       return 0;
       else if (time_to_wait < 0)
-       {
-         svm_msg_q_wait (mq);
-       }
+       svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
       else
        {
          if (svm_msg_q_timedwait (mq, time_to_wait))
-           {
-             svm_msg_q_unlock (mq);
-             return 0;
-           }
+           return 0;
        }
     }
   vcl_mq_dequeue_batch (wrk, mq, ~0);
-  svm_msg_q_unlock (mq);
 
   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
     {
@@ -2939,30 +2879,20 @@ vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
     goto handle_dequeued;
 
-  svm_msg_q_lock (mq);
   if (svm_msg_q_is_empty (mq))
     {
       if (!wait_for_time)
-       {
-         svm_msg_q_unlock (mq);
-         return 0;
-       }
+       return 0;
       else if (wait_for_time < 0)
-       {
-         svm_msg_q_wait (mq);
-       }
+       svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
       else
        {
          if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
-           {
-             svm_msg_q_unlock (mq);
-             return 0;
-           }
+           return 0;
        }
     }
   ASSERT (maxevents > *num_ev);
   vcl_mq_dequeue_batch (wrk, mq, ~0);
-  svm_msg_q_unlock (mq);
 
 handle_dequeued:
   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
index b2e0ef9..87bcd90 100644 (file)
@@ -600,7 +600,7 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, u32 session_index, u8 evt_type,
       svm_msg_q_lock (mq);
       while (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)
             || svm_msg_q_is_full (mq))
-       svm_msg_q_wait (mq);
+       svm_msg_q_wait (mq, SVM_MQ_WAIT_FULL);
       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->session_index = session_index;
index 37df0c4..2c91a2f 100644 (file)
@@ -1425,16 +1425,15 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
    * XXX: we may need priorities here */
   mq = wrk->vpp_event_queue;
   n_to_dequeue = svm_msg_q_size (mq);
-  if (n_to_dequeue && svm_msg_q_try_lock (mq) == 0)
+  if (n_to_dequeue)
     {
       for (i = 0; i < n_to_dequeue; i++)
        {
-         svm_msg_q_sub_w_lock (mq, msg);
+         svm_msg_q_sub_raw (mq, msg);
          evt = svm_msg_q_msg_data (mq, msg);
          session_evt_add_to_list (wrk, evt);
          svm_msg_q_free_msg (mq, msg);
        }
-      svm_msg_q_unlock (mq);
     }
 
   SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_to_dequeue, !i);