X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fsvm%2Fmessage_queue.h;h=1ef773d9f0a6d1542365f76d955e4962eac9136f;hb=5398dfb2592d525018997a991a4f7bfde515adc4;hp=7716c6724d897ea09ef380338b06a4f1c46d11bf;hpb=2b97f597c6705809201ce6a6846d46c47c0933ba;p=vpp.git

diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 7716c6724d8..1ef773d9f0a 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -22,6 +22,7 @@
 
 #include <vppinfra/clib.h>
 #include <vppinfra/error.h>
+#include <vppinfra/lock.h>
 #include <svm/queue.h>
 
 typedef struct svm_msg_q_shr_queue_
@@ -41,6 +42,7 @@ typedef struct svm_msg_q_queue_
 {
   svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
   int evtfd;			 /**< producer/consumer eventfd */
+  clib_spinlock_t lock;		 /**< private lock for multi-producer */
 } svm_msg_q_queue_t;
 
 typedef struct svm_msg_q_ring_shared_
@@ -99,6 +101,13 @@ typedef union
 } svm_msg_q_msg_t;
 
 #define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
+
+typedef enum svm_msg_q_wait_type_
+{
+  SVM_MQ_WAIT_EMPTY,
+  SVM_MQ_WAIT_FULL
+} svm_msg_q_wait_type_t;
+
 /**
  * Allocate message queue
  *
@@ -206,6 +215,7 @@ void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
  * Consumer dequeue one message from queue
  *
  * This returns the message pointing to the data in the message rings.
+ * Should only be used in single consumer scenarios as no locks are grabbed.
  * The consumer is expected to call @ref svm_msg_q_free_msg once it
  * finishes processing/copies the message data.
  *
@@ -219,18 +229,34 @@ int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
 		   svm_q_conditional_wait_t cond, u32 time);
 
 /**
- * Consumer dequeue one message from queue with mutex held
+ * Consumer dequeue one message from queue
  *
- * Returns the message pointing to the data in the message rings under the
- * assumption that the message queue lock is already held. The consumer is
- * expected to call @ref svm_msg_q_free_msg once it finishes
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
  * processing/copies the message data.
  *
  * @param mq message queue
 * @param msg pointer to structure where message is to be received
 * @return success status
 */
-void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
+
+/**
+ * Consumer dequeue multiple messages from queue
+ *
+ * Returns the message pointing to the data in the message rings. Should only
+ * be used in single consumer scenarios as no locks are grabbed. The consumer
+ * is expected to call @ref svm_msg_q_free_msg once it finishes
+ * processing/copies the message data.
+ *
+ * @param mq message queue
+ * @param msg_buf pointer to array of messages to be received
+ * @param n_msgs length of msg_buf array
+ * @return number of messages dequeued
+ */
+int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
+			     u32 n_msgs);
 
 /**
  * Get data for message in queue
@@ -321,10 +347,17 @@ svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
 static inline int
 svm_msg_q_try_lock (svm_msg_q_t * mq)
 {
-  int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
-  if (PREDICT_FALSE (rv == EOWNERDEAD))
-    rv = pthread_mutex_consistent (&mq->q.shr->mutex);
-  return rv;
+  if (mq->q.evtfd == -1)
+    {
+      int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+	rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+      return rv;
+    }
+  else
+    {
+      return !clib_spinlock_trylock (&mq->q.lock);
+    }
 }
 
 /**
@@ -333,10 +366,18 @@ svm_msg_q_try_lock (svm_msg_q_t * mq)
 static inline int
 svm_msg_q_lock (svm_msg_q_t * mq)
 {
-  int rv = pthread_mutex_lock (&mq->q.shr->mutex);
-  if (PREDICT_FALSE (rv == EOWNERDEAD))
-    rv = pthread_mutex_consistent (&mq->q.shr->mutex);
-  return rv;
+  if (mq->q.evtfd == -1)
+    {
+      int rv = pthread_mutex_lock (&mq->q.shr->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+	rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+      return rv;
+    }
+  else
+    {
+      clib_spinlock_lock (&mq->q.lock);
+      return 0;
+    }
 }
 
 /**
@@ -345,7 +386,14 @@ svm_msg_q_lock (svm_msg_q_t * mq)
 static inline void
 svm_msg_q_unlock (svm_msg_q_t * mq)
 {
-  pthread_mutex_unlock (&mq->q.shr->mutex);
+  if (mq->q.evtfd == -1)
+    {
+      pthread_mutex_unlock (&mq->q.shr->mutex);
+    }
+  else
+    {
+      clib_spinlock_unlock (&mq->q.lock);
+    }
 }
 
 /**
@@ -354,7 +402,7 @@ svm_msg_q_unlock (svm_msg_q_t * mq)
  * Wait for message queue event
  *
  * Must be called with mutex held. The queue only works non-blocking
  * with eventfds, so handle blocking calls as an exception here.
  */
-void svm_msg_q_wait (svm_msg_q_t *mq);
+int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
 
 /**
  * Timed wait for message queue event
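
Usage sketch (illustrative only, not part of the patch): a minimal single-consumer drain loop against the reworked API declared above. It assumes svm_msg_q_is_empty(), svm_msg_q_msg_data() and svm_msg_q_free_msg() are available from this header, that `mq` was allocated/attached by the caller, and that error handling is omitted; the mutex-vs-spinlock selection based on evtfd is handled inside svm_msg_q_lock()/svm_msg_q_unlock().

#include <svm/message_queue.h>

/* Block until the queue is non-empty, then drain up to 32 messages.
 * Single-consumer use is assumed, as required by svm_msg_q_sub_raw_batch(). */
static void
consumer_drain (svm_msg_q_t *mq)
{
  svm_msg_q_msg_t msgs[32];
  int i, n;

  svm_msg_q_lock (mq);
  /* svm_msg_q_wait must be called with the queue lock held */
  if (svm_msg_q_is_empty (mq))
    svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
  n = svm_msg_q_sub_raw_batch (mq, msgs, 32);
  svm_msg_q_unlock (mq);

  for (i = 0; i < n; i++)
    {
      void *data = svm_msg_q_msg_data (mq, &msgs[i]);
      /* ... process data ... */
      (void) data;
      /* return the ring element once the payload has been consumed */
      svm_msg_q_free_msg (mq, &msgs[i]);
    }
}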