#include <vppinfra/clib.h>
#include <vppinfra/error.h>
+#include <vppinfra/lock.h>
#include <svm/queue.h>
typedef struct svm_msg_q_shr_queue_
{
svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
int evtfd; /**< producer/consumer eventfd */
+ clib_spinlock_t lock; /**< private lock for multi-producer */
} svm_msg_q_queue_t;
typedef struct svm_msg_q_ring_shared_
} svm_msg_q_msg_t;
#define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
+
+typedef enum svm_msg_q_wait_type_
+{
+ SVM_MQ_WAIT_EMPTY,
+ SVM_MQ_WAIT_FULL
+} svm_msg_q_wait_type_t;
+
/**
* Allocate message queue
*
* Consumer dequeue one message from queue
*
* This returns the message pointing to the data in the message rings.
+ * Should only be used in single consumer scenarios as no locks are grabbed.
* The consumer is expected to call @ref svm_msg_q_free_msg once it
* finishes processing/copies the message data.
*
svm_q_conditional_wait_t cond, u32 time);
/**
- * Consumer dequeue one message from queue with mutex held
+ * Consumer dequeue one message from queue
*
- * Returns the message pointing to the data in the message rings under the
- * assumption that the message queue lock is already held. The consumer is
- * expected to call @ref svm_msg_q_free_msg once it finishes
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
* processing/copies the message data.
*
* @param mq message queue
* @param msg pointer to structure where message is to be received
* @return success status
*/
-void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
+
+/**
+ * Consumer dequeue multiple messages from queue
+ *
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
+ * processing/copies the message data.
+ *
+ * @param mq message queue
+ * @param msg_buf pointer to array of messages to received
+ * @param n_msgs lengt of msg_buf array
+ * @return number of messages dequeued
+ */
+int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
+ u32 n_msgs);
/**
* Get data for message in queue
static inline int
svm_msg_q_try_lock (svm_msg_q_t * mq)
{
- int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
- if (PREDICT_FALSE (rv == EOWNERDEAD))
- rv = pthread_mutex_consistent (&mq->q.shr->mutex);
- return rv;
+ if (mq->q.evtfd == -1)
+ {
+ int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+ return rv;
+ }
+ else
+ {
+ return !clib_spinlock_trylock (&mq->q.lock);
+ }
}
/**
static inline int
svm_msg_q_lock (svm_msg_q_t * mq)
{
- int rv = pthread_mutex_lock (&mq->q.shr->mutex);
- if (PREDICT_FALSE (rv == EOWNERDEAD))
- rv = pthread_mutex_consistent (&mq->q.shr->mutex);
- return rv;
+ if (mq->q.evtfd == -1)
+ {
+ int rv = pthread_mutex_lock (&mq->q.shr->mutex);
+ if (PREDICT_FALSE (rv == EOWNERDEAD))
+ rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+ return rv;
+ }
+ else
+ {
+ clib_spinlock_lock (&mq->q.lock);
+ return 0;
+ }
}
/**
static inline void
svm_msg_q_unlock (svm_msg_q_t * mq)
{
- pthread_mutex_unlock (&mq->q.shr->mutex);
+ if (mq->q.evtfd == -1)
+ {
+ pthread_mutex_unlock (&mq->q.shr->mutex);
+ }
+ else
+ {
+ clib_spinlock_unlock (&mq->q.lock);
+ }
}
/**
* Must be called with mutex held. The queue only works non-blocking
* with eventfds, so handle blocking calls as an exception here.
*/
-void svm_msg_q_wait (svm_msg_q_t *mq);
+int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
/**
* Timed wait for message queue event