session svm: non blocking mq
[vpp.git] / src / svm / message_queue.h
index 7716c67..1ef773d 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <vppinfra/clib.h>
 #include <vppinfra/error.h>
+#include <vppinfra/lock.h>
 #include <svm/queue.h>
 
 typedef struct svm_msg_q_shr_queue_
@@ -41,6 +42,7 @@ typedef struct svm_msg_q_queue_
 {
   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
   int evtfd;                    /**< producer/consumer eventfd */
+  clib_spinlock_t lock;                 /**< private lock for multi-producer */
 } svm_msg_q_queue_t;
 
 typedef struct svm_msg_q_ring_shared_
@@ -99,6 +101,13 @@ typedef union
 } svm_msg_q_msg_t;
 
 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
+
+typedef enum svm_msg_q_wait_type_
+{
+  SVM_MQ_WAIT_EMPTY,
+  SVM_MQ_WAIT_FULL
+} svm_msg_q_wait_type_t;
+
 /**
  * Allocate message queue
  *
@@ -206,6 +215,7 @@ void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
  * Consumer dequeue one message from queue
  *
  * This returns the message pointing to the data in the message rings.
+ * Should only be used in single consumer scenarios as no locks are grabbed.
  * The consumer is expected to call @ref svm_msg_q_free_msg once it
  * finishes processing/copies the message data.
  *
@@ -219,18 +229,34 @@ int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
                   svm_q_conditional_wait_t cond, u32 time);
 
 /**
- * Consumer dequeue one message from queue with mutex held
+ * Consumer dequeue one message from queue
  *
- * Returns the message pointing to the data in the message rings under the
- * assumption that the message queue lock is already held. The consumer is
- * expected to call @ref svm_msg_q_free_msg once it finishes
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
  * processing/copies the message data.
  *
  * @param mq           message queue
  * @param msg          pointer to structure where message is to be received
  * @return             success status
  */
-void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
+
+/**
+ * Consumer dequeue multiple messages from queue
+ *
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
+ * processing/copies the message data.
+ *
+ * @param mq           message queue
+ * @param msg_buf      pointer to array of messages to received
+ * @param n_msgs       lengt of msg_buf array
+ * @return             number of messages dequeued
+ */
+int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
+                            u32 n_msgs);
 
 /**
  * Get data for message in queue
@@ -321,10 +347,17 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
 static inline int
 svm_msg_q_try_lock (svm_msg_q_t * mq)
 {
-  int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
-  if (PREDICT_FALSE (rv == EOWNERDEAD))
-    rv = pthread_mutex_consistent (&mq->q.shr->mutex);
-  return rv;
+  if (mq->q.evtfd == -1)
+    {
+      int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+       rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+      return rv;
+    }
+  else
+    {
+      return !clib_spinlock_trylock (&mq->q.lock);
+    }
 }
 
 /**
@@ -333,10 +366,18 @@ svm_msg_q_try_lock (svm_msg_q_t * mq)
 static inline int
 svm_msg_q_lock (svm_msg_q_t * mq)
 {
-  int rv = pthread_mutex_lock (&mq->q.shr->mutex);
-  if (PREDICT_FALSE (rv == EOWNERDEAD))
-    rv = pthread_mutex_consistent (&mq->q.shr->mutex);
-  return rv;
+  if (mq->q.evtfd == -1)
+    {
+      int rv = pthread_mutex_lock (&mq->q.shr->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+       rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+      return rv;
+    }
+  else
+    {
+      clib_spinlock_lock (&mq->q.lock);
+      return 0;
+    }
 }
 
 /**
@@ -345,7 +386,14 @@ svm_msg_q_lock (svm_msg_q_t * mq)
 static inline void
 svm_msg_q_unlock (svm_msg_q_t * mq)
 {
-  pthread_mutex_unlock (&mq->q.shr->mutex);
+  if (mq->q.evtfd == -1)
+    {
+      pthread_mutex_unlock (&mq->q.shr->mutex);
+    }
+  else
+    {
+      clib_spinlock_unlock (&mq->q.lock);
+    }
 }
 
 /**
@@ -354,7 +402,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
  * Must be called with mutex held. The queue only works non-blocking
  * with eventfds, so handle blocking calls as an exception here.
  */
-void svm_msg_q_wait (svm_msg_q_t *mq);
+int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
 
 /**
  * Timed wait for message queue event