X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fsvm%2Fmessage_queue.c;h=a6af7962f73082ad17b8939632bdfad456fe5a99;hb=961d3631125df357c966f6ea4ee7b5fe57421c45;hp=a40c4a46a390ba2a51f058dec2edd8964d80e6ef;hpb=2b5fed8696ce2a9b67e63cf5b5dbf49505172c9a;p=vpp.git

diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index a40c4a46a39..a6af7962f73 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -15,7 +15,10 @@
 
 #include <svm/message_queue.h>
 #include <vppinfra/mem.h>
+#include <vppinfra/format.h>
+#include <vppinfra/time.h>
 #include <sys/eventfd.h>
+#include <sys/socket.h>
 
 static inline svm_msg_q_ring_t *
 svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
@@ -33,24 +36,81 @@ static inline void *
 svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
 {
   ASSERT (elt_index < ring->nitems);
-  return (ring->data + elt_index * ring->elsize);
+  return (ring->shr->data + elt_index * ring->elsize);
 }
 
-svm_msg_q_t *
-svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
+static void
+svm_msg_q_init_mutex (svm_msg_q_shared_queue_t *sq)
+{
+  pthread_mutexattr_t attr;
+  pthread_condattr_t cattr;
+
+  clib_memset (&attr, 0, sizeof (attr));
+  clib_memset (&cattr, 0, sizeof (cattr));
+
+  if (pthread_mutexattr_init (&attr))
+    clib_unix_warning ("mutexattr_init");
+  if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
+    clib_unix_warning ("pthread_mutexattr_setpshared");
+  if (pthread_mutexattr_setrobust (&attr, PTHREAD_MUTEX_ROBUST))
+    clib_unix_warning ("setrobust");
+  if (pthread_mutex_init (&sq->mutex, &attr))
+    clib_unix_warning ("mutex_init");
+  if (pthread_mutexattr_destroy (&attr))
+    clib_unix_warning ("mutexattr_destroy");
+  if (pthread_condattr_init (&cattr))
+    clib_unix_warning ("condattr_init");
+  if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
+    clib_unix_warning ("condattr_setpshared");
+  if (pthread_cond_init (&sq->condvar, &cattr))
+    clib_unix_warning ("cond_init1");
+  if (pthread_condattr_destroy (&cattr))
+    clib_unix_warning ("cond_init2");
+}
+
+svm_msg_q_shared_t *
+svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
+{
+  svm_msg_q_ring_shared_t *ring;
+  svm_msg_q_shared_queue_t *sq;
+  svm_msg_q_shared_t *smq;
+  u32 q_sz, offset;
+  int i;
+
+  q_sz = sizeof (*sq) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+
+  smq = (svm_msg_q_shared_t *) base;
+  sq = smq->q;
+  clib_memset (sq, 0, sizeof (*sq));
+  sq->elsize = sizeof (svm_msg_q_msg_t);
+  sq->maxsize = cfg->q_nitems;
+  smq->n_rings = cfg->n_rings;
+  ring = (void *) ((u8 *) smq->q + q_sz);
+  for (i = 0; i < cfg->n_rings; i++)
+    {
+      ring->elsize = cfg->ring_cfgs[i].elsize;
+      ring->nitems = cfg->ring_cfgs[i].nitems;
+      ring->cursize = ring->head = ring->tail = 0;
+      offset = sizeof (*ring) + ring->nitems * ring->elsize;
+      ring = (void *) ((u8 *) ring + offset);
+    }
+
+  svm_msg_q_init_mutex (sq);
+
+  return smq;
+}
+
+uword
+svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
 {
   svm_msg_q_ring_cfg_t *ring_cfg;
   uword rings_sz = 0, mq_sz;
-  svm_msg_q_ring_t *ring;
-  u8 *base, *rings_ptr;
-  vec_header_t *vh;
-  u32 vec_sz, q_sz;
-  svm_msg_q_t *mq;
+  u32 q_sz;
   int i;
 
   ASSERT (cfg);
 
-  vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
+  rings_sz = sizeof (svm_msg_q_ring_shared_t) * cfg->n_rings;
   for (i = 0; i < cfg->n_rings; i++)
     {
       if (cfg->ring_cfgs[i].data)
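[Editor's note, not part of the patch: the change splits the queue into a process-private handle (svm_msg_q_t) and an in-place shared-memory layout (svm_msg_q_shared_t, followed by the queue header and the ring headers/data), sized by svm_msg_q_size_to_alloc() and initialized by svm_msg_q_init(). A minimal producer-side sketch under those assumptions; shared_segment_alloc() is a hypothetical stand-in for however the shared block is actually obtained:

  svm_msg_q_ring_cfg_t rc = { .nitems = 32, .elsize = 256, .data = 0 };
  svm_msg_q_cfg_t cfg = { .q_nitems = 32, .n_rings = 1, .ring_cfgs = &rc };
  uword sz = svm_msg_q_size_to_alloc (&cfg);    /* header + queue + rings */
  void *base = shared_segment_alloc (sz);       /* hypothetical allocator */
  svm_msg_q_shared_t *smq = svm_msg_q_init (base, &cfg);
]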
@@ -59,56 +119,119 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
       rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
     }
 
-  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
-  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
+  q_sz = sizeof (svm_msg_q_shared_queue_t) +
+	 cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+  mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz;
+
+  return mq_sz;
+}
+
+svm_msg_q_shared_t *
+svm_msg_q_alloc (svm_msg_q_cfg_t *cfg)
+{
+  uword mq_sz;
+  u8 *base;
+
+  mq_sz = svm_msg_q_size_to_alloc (cfg);
   base = clib_mem_alloc_aligned (mq_sz, CLIB_CACHE_LINE_BYTES);
   if (!base)
     return 0;
 
-  mq = (svm_msg_q_t *) base;
-  mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
-			  sizeof (svm_msg_q_msg_t));
-  mq->q->consumer_pid = cfg->consumer_pid;
-  vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
-  vh->len = cfg->n_rings;
-  mq->rings = (svm_msg_q_ring_t *) (vh + 1);
-  rings_ptr = (u8 *) mq->rings + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
-  for (i = 0; i < cfg->n_rings; i++)
+  return svm_msg_q_init (base, cfg);
+}
+
+void
+svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
+{
+  svm_msg_q_ring_shared_t *ring;
+  svm_msg_q_shared_t *smq;
+  u32 i, n_rings, q_sz, offset;
+
+  smq = (svm_msg_q_shared_t *) smq_base;
+  mq->q.shr = smq->q;
+  mq->q.evtfd = -1;
+  n_rings = smq->n_rings;
+  vec_validate (mq->rings, n_rings - 1);
+  q_sz = sizeof (svm_msg_q_shared_queue_t) +
+	 mq->q.shr->maxsize * sizeof (svm_msg_q_msg_t);
+  ring = (void *) ((u8 *) smq->q + q_sz);
+  for (i = 0; i < n_rings; i++)
     {
-      ring = &mq->rings[i];
-      ring->elsize = cfg->ring_cfgs[i].elsize;
-      ring->nitems = cfg->ring_cfgs[i].nitems;
-      ring->cursize = ring->head = ring->tail = 0;
-      if (cfg->ring_cfgs[i].data)
-	ring->data = cfg->ring_cfgs[i].data;
-      else
-	{
-	  ring->data = rings_ptr;
-	  rings_ptr += (uword) ring->nitems * ring->elsize;
-	}
+      mq->rings[i].nitems = ring->nitems;
+      mq->rings[i].elsize = ring->elsize;
+      mq->rings[i].shr = ring;
+      offset = sizeof (*ring) + ring->nitems * ring->elsize;
+      ring = (void *) ((u8 *) ring + offset);
     }
+  clib_spinlock_init (&mq->q.lock);
+}
 
-  return mq;
+void
+svm_msg_q_cleanup (svm_msg_q_t *mq)
+{
+  vec_free (mq->rings);
+  clib_spinlock_free (&mq->q.lock);
+  if (mq->q.evtfd != -1)
+    close (mq->q.evtfd);
 }
 
 void
 svm_msg_q_free (svm_msg_q_t * mq)
 {
-  svm_queue_free (mq->q);
+  svm_msg_q_cleanup (mq);
+  clib_mem_free (mq->q.shr);
   clib_mem_free (mq);
 }
 
+static void
+svm_msg_q_send_signal (svm_msg_q_t *mq, u8 is_consumer)
+{
+  if (mq->q.evtfd == -1)
+    {
+      if (is_consumer)
+	{
+	  int rv = pthread_mutex_lock (&mq->q.shr->mutex);
+	  if (PREDICT_FALSE (rv == EOWNERDEAD))
+	    {
+	      rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+	      return;
+	    }
+	}
+
+      (void) pthread_cond_broadcast (&mq->q.shr->condvar);
+
+      if (is_consumer)
+	pthread_mutex_unlock (&mq->q.shr->mutex);
+    }
+  else
+    {
+      int __clib_unused rv;
+      u64 data = 1;
+
+      if (mq->q.evtfd < 0)
+	return;
+
+      rv = write (mq->q.evtfd, &data, sizeof (data));
+      if (PREDICT_FALSE (rv < 0))
+	clib_unix_warning ("signal write on %d returned %d", mq->q.evtfd, rv);
+    }
+}
+
 svm_msg_q_msg_t
 svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
 {
+  svm_msg_q_ring_shared_t *sr;
+  svm_msg_q_ring_t *ring;
   svm_msg_q_msg_t msg;
-  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
 
-  ASSERT (ring->cursize < ring->nitems);
+  ring = svm_msg_q_ring_inline (mq, ring_index);
+  sr = ring->shr;
+
+  ASSERT (sr->cursize < ring->nitems);
   msg.ring_index = ring - mq->rings;
-  msg.elt_index = ring->tail;
-  ring->tail = (ring->tail + 1) % ring->nitems;
-  clib_atomic_fetch_add (&ring->cursize, 1);
+  msg.elt_index = sr->tail;
+  sr->tail = (sr->tail + 1) % ring->nitems;
+  clib_atomic_fetch_add_rel (&sr->cursize, 1);
   return msg;
 }
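[Editor's note, not part of the patch: any process that maps the shared block can bind a private handle to it with svm_msg_q_attach(). A rough consumer-side sketch; map_shared_segment() is a hypothetical stand-in for whatever maps the producer's allocation into this process:

  svm_msg_q_t mq = { 0 };
  void *smq_base = map_shared_segment ();       /* hypothetical mapping call */
  svm_msg_q_attach (&mq, smq_base);
  /* ... exchange messages ... */
  svm_msg_q_cleanup (&mq);                      /* releases only the local wrapper state */
]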
@@ -127,18 +250,13 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
 	  return -2;
 	}
       *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
-      if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (msg)))
-	{
-	  svm_msg_q_unlock (mq);
-	  return -2;
-	}
     }
   else
     {
       svm_msg_q_lock (mq);
       while (svm_msg_q_is_full (mq)
	      || svm_msg_q_ring_is_full (mq, ring_index))
-	svm_msg_q_wait (mq);
+	svm_msg_q_wait_prod (mq);
       *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
     }
   return 0;
 }
@@ -148,16 +266,18 @@ svm_msg_q_msg_t
 svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
 {
   svm_msg_q_msg_t msg = {.as_u64 = ~0 };
+  svm_msg_q_ring_shared_t *sr;
   svm_msg_q_ring_t *ring;
 
   vec_foreach (ring, mq->rings)
   {
-    if (ring->elsize < nbytes || ring->cursize == ring->nitems)
+    sr = ring->shr;
+    if (ring->elsize < nbytes || sr->cursize == ring->nitems)
       continue;
     msg.ring_index = ring - mq->rings;
-    msg.elt_index = ring->tail;
-    ring->tail = (ring->tail + 1) % ring->nitems;
-    clib_atomic_fetch_add (&ring->cursize, 1);
+    msg.elt_index = sr->tail;
+    sr->tail = (sr->tail + 1) % ring->nitems;
+    clib_atomic_fetch_add_relax (&sr->cursize, 1);
    break;
   }
   return msg;
@@ -173,103 +293,350 @@ svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 void
 svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
+  svm_msg_q_ring_shared_t *sr;
   svm_msg_q_ring_t *ring;
+  u32 need_signal;
 
   ASSERT (vec_len (mq->rings) > msg->ring_index);
-  ring = &mq->rings[msg->ring_index];
-  if (msg->elt_index == ring->head)
+  ring = svm_msg_q_ring_inline (mq, msg->ring_index);
+  sr = ring->shr;
+  if (msg->elt_index == sr->head)
     {
-      ring->head = (ring->head + 1) % ring->nitems;
+      sr->head = (sr->head + 1) % ring->nitems;
     }
   else
    {
-      clib_warning ("message out of order");
+      clib_warning ("message out of order: elt %u head %u ring %u",
+		    msg->elt_index, sr->head, msg->ring_index);
      /* for now, expect messages to be processed in order */
      ASSERT (0);
    }
-  clib_atomic_fetch_sub (&ring->cursize, 1);
+
+  need_signal = clib_atomic_load_relax_n (&sr->cursize) == ring->nitems;
+  clib_atomic_fetch_sub_relax (&sr->cursize, 1);
+
+  if (PREDICT_FALSE (need_signal))
+    svm_msg_q_send_signal (mq, 1 /* is consumer */);
 }
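[Editor's note, not part of the patch: the producer path is to reserve a slot in a data ring, fill it, then queue the small svm_msg_q_msg_t descriptor. An illustrative sketch using the helpers shown in this diff; my_evt_t and the choice of ring 0 are assumptions (the ring's elsize must fit the payload):

  svm_msg_q_msg_t msg;
  my_evt_t *e;                                  /* hypothetical payload type */

  if (svm_msg_q_lock_and_alloc_msg_w_ring (mq, 0 /* ring */, 0 /* block */, &msg))
    return;                                     /* queue or ring full (nonblocking case) */
  e = svm_msg_q_msg_data (mq, &msg);
  e->code = 42;
  svm_msg_q_add_and_unlock (mq, &msg);
]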
 
 static int
 svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   u32 dist1, dist2, tail, head;
+  svm_msg_q_ring_shared_t *sr;
   svm_msg_q_ring_t *ring;
 
   if (vec_len (mq->rings) <= msg->ring_index)
     return 0;
-  ring = &mq->rings[msg->ring_index];
-  tail = ring->tail;
-  head = ring->head;
+
+  ring = svm_msg_q_ring_inline (mq, msg->ring_index);
+  sr = ring->shr;
+  tail = sr->tail;
+  head = sr->head;
 
   dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
   if (tail == head)
-    dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
+    dist2 = (sr->cursize == 0) ? 0 : ring->nitems;
   else
     dist2 = ((ring->nitems + tail) - head) % ring->nitems;
   return (dist1 < dist2);
 }
 
+static void
+svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
+{
+  svm_msg_q_shared_queue_t *sq = mq->q.shr;
+  i8 *tailp;
+  u32 sz;
+
+  tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
+  clib_memcpy_fast (tailp, elem, sq->elsize);
+
+  sq->tail = (sq->tail + 1) % sq->maxsize;
+
+  sz = clib_atomic_fetch_add_rel (&sq->cursize, 1);
+  if (!sz)
+    svm_msg_q_send_signal (mq, 0 /* is consumer */);
+}
+
 int
 svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
 {
   ASSERT (svm_msq_q_msg_is_valid (mq, msg));
-  return svm_queue_add (mq->q, (u8 *) msg, nowait);
+
+  if (nowait)
+    {
+      /* zero on success */
+      if (svm_msg_q_try_lock (mq))
+	{
+	  return (-1);
+	}
+    }
+  else
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
+    {
+      if (nowait)
+	return (-2);
+      while (svm_msg_q_is_full (mq))
+	svm_msg_q_wait_prod (mq);
+    }
+
+  svm_msg_q_add_raw (mq, (u8 *) msg);
+
+  svm_msg_q_unlock (mq);
+
+  return 0;
 }
 
 void
 svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   ASSERT (svm_msq_q_msg_is_valid (mq, msg));
-  svm_queue_add_raw (mq->q, (u8 *) msg);
+  svm_msg_q_add_raw (mq, (u8 *) msg);
   svm_msg_q_unlock (mq);
 }
 
 int
-svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
-	       svm_q_conditional_wait_t cond, u32 time)
+svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem)
 {
-  return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
+  svm_msg_q_shared_queue_t *sq = mq->q.shr;
+  i8 *headp;
+  u32 sz;
+
+  ASSERT (!svm_msg_q_is_empty (mq));
+
+  headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
+  clib_memcpy_fast (elem, headp, sq->elsize);
+
+  sq->head = (sq->head + 1) % sq->maxsize;
+
+  sz = clib_atomic_fetch_sub_relax (&sq->cursize, 1);
+  if (PREDICT_FALSE (sz == sq->maxsize))
+    svm_msg_q_send_signal (mq, 1 /* is consumer */);
+
+  return 0;
 }
 
-void
-svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
+int
+svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf, u32 n_msgs)
 {
-  svm_queue_sub_raw (mq->q, (u8 *) msg);
+  svm_msg_q_shared_queue_t *sq = mq->q.shr;
+  u32 sz, to_deq;
+  i8 *headp;
+
+  sz = svm_msg_q_size (mq);
+  ASSERT (sz);
+  to_deq = clib_min (sz, n_msgs);
+
+  headp = (i8 *) (&sq->data[0] + sq->elsize * sq->head);
+
+  if (sq->head + to_deq < sq->maxsize)
+    {
+      clib_memcpy_fast (msg_buf, headp, sq->elsize * to_deq);
+      sq->head += to_deq;
+    }
+  else
+    {
+      u32 first_batch = sq->maxsize - sq->head;
+      clib_memcpy_fast (msg_buf, headp, sq->elsize * first_batch);
+      clib_memcpy_fast (msg_buf + first_batch, sq->data,
+			sq->elsize * (to_deq - first_batch));
+      sq->head = (sq->head + to_deq) % sq->maxsize;
+    }
+
+  clib_atomic_fetch_sub_relax (&sq->cursize, to_deq);
+  if (PREDICT_FALSE (sz == sq->maxsize))
+    svm_msg_q_send_signal (mq, 1 /* is consumer */);
+
+  return to_deq;
 }
 
-void
-svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
+int
+svm_msg_q_sub (svm_msg_q_t *mq, svm_msg_q_msg_t *msg,
+	       svm_q_conditional_wait_t cond, u32 time)
 {
-  mq->q->consumer_evtfd = fd;
+  int rc = 0;
+
+  if (svm_msg_q_is_empty (mq))
+    {
+      if (cond == SVM_Q_NOWAIT)
+	{
+	  return (-2);
+	}
+      else if (cond == SVM_Q_TIMEDWAIT)
+	{
+	  if ((rc = svm_msg_q_timedwait (mq, time)))
+	    return rc;
+	}
+      else
+	{
+	  svm_msg_q_wait (mq, SVM_MQ_WAIT_EMPTY);
+	}
+    }
+
+  svm_msg_q_sub_raw (mq, msg);
+
+  return 0;
 }
 
 void
-svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
+svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd)
 {
-  mq->q->producer_evtfd = fd;
+  mq->q.evtfd = fd;
 }
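[Editor's note, not part of the patch: a typical single-consumer dequeue loop blocks until the queue has work, handles the payload, then releases the ring slot. A rough sketch; process_evt() is a placeholder handler and SVM_Q_WAIT is assumed to be the plain blocking mode from the svm queue API:

  svm_msg_q_msg_t msg;

  while (1)
    {
      if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0 /* timeout unused */))
	continue;
      process_evt (svm_msg_q_msg_data (mq, &msg));   /* hypothetical handler */
      svm_msg_q_free_msg (mq, &msg);
    }
]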
 int
-svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_alloc_eventfd (svm_msg_q_t *mq)
 {
   int fd;
-  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+  if ((fd = eventfd (0, 0)) < 0)
     return -1;
-  svm_msg_q_set_consumer_eventfd (mq, fd);
+  svm_msg_q_set_eventfd (mq, fd);
   return 0;
 }
 
 int
-svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
+svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type)
 {
-  int fd;
-  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
-    return -1;
-  svm_msg_q_set_producer_eventfd (mq, fd);
+  u8 (*fn) (svm_msg_q_t *);
+  int rv;
+
+  fn = (type == SVM_MQ_WAIT_EMPTY) ? svm_msg_q_is_empty : svm_msg_q_is_full;
+
+  if (mq->q.evtfd == -1)
+    {
+      rv = pthread_mutex_lock (&mq->q.shr->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+	{
+	  rv = pthread_mutex_consistent (&mq->q.shr->mutex);
+	  return rv;
+	}
+
+      while (fn (mq))
+	pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+
+      pthread_mutex_unlock (&mq->q.shr->mutex);
+    }
+  else
+    {
+      u64 buf;
+
+      while (fn (mq))
+	{
+	  while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+	    {
+	      if (errno != EAGAIN)
+		{
+		  clib_unix_warning ("read error");
+		  return rv;
+		}
+	    }
+	}
+    }
+
+  return 0;
+}
+
+int
+svm_msg_q_wait_prod (svm_msg_q_t *mq)
+{
+  if (mq->q.evtfd == -1)
+    {
+      while (svm_msg_q_is_full (mq))
+	pthread_cond_wait (&mq->q.shr->condvar, &mq->q.shr->mutex);
+    }
+  else
+    {
+      u64 buf;
+      int rv;
+
+      while (svm_msg_q_is_full (mq))
+	{
+	  while ((rv = read (mq->q.evtfd, &buf, sizeof (buf))) < 0)
+	    {
+	      if (errno != EAGAIN)
+		{
+		  clib_unix_warning ("read error");
+		  return rv;
+		}
+	    }
+	}
+    }
   return 0;
 }
 
+int
+svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout)
+{
+  if (mq->q.evtfd == -1)
+    {
+      svm_msg_q_shared_queue_t *sq = mq->q.shr;
+      struct timespec ts;
+      u32 sz;
+      int rv;
+
+      rv = pthread_mutex_lock (&sq->mutex);
+      if (PREDICT_FALSE (rv == EOWNERDEAD))
+	{
+	  rv = pthread_mutex_consistent (&sq->mutex);
+	  return rv;
+	}
+
+      /* check if we're still in a signalable state after grabbing lock */
+      sz = svm_msg_q_size (mq);
+      if (sz != 0 && sz != sq->maxsize)
+	{
+	  pthread_mutex_unlock (&sq->mutex);
+	  return 0;
+	}
+
+      ts.tv_sec = unix_time_now () + (u32) timeout;
+      ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
+      rv = pthread_cond_timedwait (&sq->condvar, &sq->mutex, &ts);
+
+      pthread_mutex_unlock (&sq->mutex);
+      return rv;
+    }
+  else
+    {
+      struct timeval tv;
+      u64 buf;
+      int rv;
+
+      tv.tv_sec = (u64) timeout;
+      tv.tv_usec = ((u64) timeout - (u64) timeout) * 1e9;
+      rv = setsockopt (mq->q.evtfd, SOL_SOCKET, SO_RCVTIMEO,
+		       (const char *) &tv, sizeof tv);
+      if (rv < 0)
+	{
+	  clib_unix_warning ("setsockopt");
+	  return -1;
+	}
+
+      rv = read (mq->q.evtfd, &buf, sizeof (buf));
+      if (rv < 0)
+	clib_warning ("read %u", errno);
+
+      return rv < 0 ? errno : 0;
+    }
+}
+
+u8 *
+format_svm_msg_q (u8 * s, va_list * args)
+{
+  svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *);
+  s = format (s, " [Q:%d/%d]", mq->q.shr->cursize, mq->q.shr->maxsize);
+  for (u32 i = 0; i < vec_len (mq->rings); i++)
+    {
+      s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,
+		  mq->rings[i].nitems);
+    }
+  return s;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
 *