session svm: fix mq producer wait on q and ring 00/36000/4
authorFlorin Coras <fcoras@cisco.com>
Wed, 20 Apr 2022 01:57:24 +0000 (18:57 -0700)
committerDave Barach <openvpp@barachs.net>
Thu, 21 Apr 2022 16:48:15 +0000 (16:48 +0000)
Make sure producer drops lock when it waits for empty ring slot.

Type: fix

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Id77d54ee8c01bed20c9eaf5ad372ed4b1e9fa712

src/svm/message_queue.c
src/svm/message_queue.h
src/vnet/session/application_interface.h
src/vnet/session/session.c

index a6af796..2880645 100644 (file)
@@ -243,8 +243,7 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
     {
       if (svm_msg_q_try_lock (mq))
        return -1;
-      if (PREDICT_FALSE (svm_msg_q_is_full (mq)
-                        || svm_msg_q_ring_is_full (mq, ring_index)))
+      if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, ring_index)))
        {
          svm_msg_q_unlock (mq);
          return -2;
@@ -254,9 +253,8 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
   else
     {
       svm_msg_q_lock (mq);
-      while (svm_msg_q_is_full (mq)
-            || svm_msg_q_ring_is_full (mq, ring_index))
-       svm_msg_q_wait_prod (mq);
+      while (svm_msg_q_or_ring_is_full (mq, ring_index))
+       svm_msg_q_or_ring_wait_prod (mq, ring_index);
       *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
     }
   return 0;
@@ -568,6 +566,35 @@ svm_msg_q_wait_prod (svm_msg_q_t *mq)
   return 0;
 }
 
+int
+svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index)
+{
+  if (mq->q.evtfd == -1)
+    {
+      while (svm_msg_q_or_ring_is_full (mq, ring_index))
+       pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+    }
+  else
+    {
+      u64 buf;
+      int rv;
+
+      while (svm_msg_q_or_ring_is_full (mq, ring_index))
+       {
+         while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+           {
+             if (errno != EAGAIN)
+               {
+                 clib_unix_warning ("read error");
+                 return rv;
+               }
+           }
+       }
+    }
+
+  return 0;
+}
+
 int
 svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
 {
index bd76eda..0780cca 100644 (file)
@@ -328,6 +328,12 @@ svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
   return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
 }
 
+static inline u8
+svm_msg_q_or_ring_is_full (svm_msg_q_t *mq, u32 ring_index)
+{
+  return (svm_msg_q_is_full (mq) || svm_msg_q_ring_is_full (mq, ring_index));
+}
+
 /**
  * Check if message queue is empty
  */
@@ -417,6 +423,14 @@ int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
  */
 int svm_msg_q_wait_prod (svm_msg_q_t *mq);
 
+/**
+ * Wait for message queue or ring event as producer
+ *
+ * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
+ * be held. Should only be called by producers.
+ */
+int svm_msg_q_or_ring_wait_prod (svm_msg_q_t *mq, u32 ring_index);
+
 /**
  * Timed wait for message queue event
  *
index 733a462..b1dab32 100644 (file)
@@ -613,8 +613,8 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, u32 session_index, u8 evt_type,
     {
       if (svm_msg_q_try_lock (mq))
        return -1;
-      if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)
-                        || svm_msg_q_is_full (mq)))
+      if (PREDICT_FALSE (
+           svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
        {
          svm_msg_q_unlock (mq);
          return -2;
@@ -629,9 +629,8 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, u32 session_index, u8 evt_type,
   else
     {
       svm_msg_q_lock (mq);
-      while (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)
-            || svm_msg_q_is_full (mq))
-       svm_msg_q_wait_prod (mq);
+      while (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))
+       svm_msg_q_or_ring_wait_prod (mq, SESSION_MQ_IO_EVT_RING);
       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->session_index = session_index;
index 2567600..d791c46 100644 (file)
@@ -36,8 +36,7 @@ session_send_evt_to_thread (void *data, void *args, u32 thread_index,
   mq = wrk->vpp_event_queue;
   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
     return -1;
-  if (PREDICT_FALSE (svm_msg_q_is_full (mq)
-                    || svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+  if (PREDICT_FALSE (svm_msg_q_or_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
     {
       svm_msg_q_unlock (mq);
       return -2;