X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fsvm%2Fmessage_queue.c;h=b381173c70b31082d0177700d51e6f20d2231113;hb=ce266ad574c4145e837c716c46ef0ef6b02620ce;hp=0f9be9c11b7344e5432e65c042cae64427cf0d30;hpb=e91bdb3783653b34c1bbf9738a739e646d02f533;p=vpp.git diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c index 0f9be9c11b7..b381173c70b 100644 --- a/src/svm/message_queue.c +++ b/src/svm/message_queue.c @@ -15,6 +15,8 @@ #include #include +#include +#include static inline svm_msg_q_ring_t * svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index) @@ -39,12 +41,12 @@ svm_msg_q_t * svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) { svm_msg_q_ring_cfg_t *ring_cfg; + uword rings_sz = 0, mq_sz; svm_msg_q_ring_t *ring; u8 *base, *rings_ptr; - uword rings_sz = 0; vec_header_t *vh; + u32 vec_sz, q_sz; svm_msg_q_t *mq; - u32 vec_sz; int i; ASSERT (cfg); @@ -58,21 +60,26 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize; } - base = clib_mem_alloc_aligned (sizeof (svm_msg_q_t) + vec_sz + rings_sz, - CLIB_CACHE_LINE_BYTES); + q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t); + mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz; + base = clib_mem_alloc_aligned (mq_sz, CLIB_CACHE_LINE_BYTES); if (!base) return 0; mq = (svm_msg_q_t *) base; - vh = (vec_header_t *) (base + sizeof (svm_msg_q_t)); + mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems, + sizeof (svm_msg_q_msg_t)); + mq->q->consumer_pid = cfg->consumer_pid; + vh = (vec_header_t *) ((u8 *) mq->q + q_sz); vh->len = cfg->n_rings; mq->rings = (svm_msg_q_ring_t *) (vh + 1); - rings_ptr = (u8 *) mq->rings + vec_sz; + rings_ptr = (u8 *) mq->rings + sizeof (svm_msg_q_ring_t) * cfg->n_rings; for (i = 0; i < cfg->n_rings; i++) { ring = &mq->rings[i]; ring->elsize = cfg->ring_cfgs[i].elsize; ring->nitems = cfg->ring_cfgs[i].nitems; + ring->cursize = ring->head = ring->tail = 0; if (cfg->ring_cfgs[i].data) ring->data = cfg->ring_cfgs[i].data; else @@ -81,8 +88,6 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) rings_ptr += (uword) ring->nitems * ring->elsize; } } - mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t), - cfg->consumer_pid, 0); return mq; } @@ -97,14 +102,14 @@ svm_msg_q_free (svm_msg_q_t * mq) svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index) { - svm_msg_q_msg_t msg = {.as_u64 = ~0 }; + svm_msg_q_msg_t msg; svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index); - ASSERT (ring->cursize != ring->nitems); + ASSERT (ring->cursize < ring->nitems); msg.ring_index = ring - mq->rings; msg.elt_index = ring->tail; ring->tail = (ring->tail + 1) % ring->nitems; - __sync_fetch_and_add (&ring->cursize, 1); + clib_atomic_fetch_add (&ring->cursize, 1); return msg; } @@ -116,27 +121,21 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index, { if (svm_msg_q_try_lock (mq)) return -1; - if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index))) + if (PREDICT_FALSE (svm_msg_q_is_full (mq) + || svm_msg_q_ring_is_full (mq, ring_index))) { svm_msg_q_unlock (mq); return -2; } *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); - if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (msg))) - { - svm_msg_q_unlock (mq); - return -2; - } } else { svm_msg_q_lock (mq); + while (svm_msg_q_is_full (mq) + || svm_msg_q_ring_is_full (mq, ring_index)) + svm_msg_q_wait (mq); *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); - while (svm_msg_q_msg_is_invalid (msg)) - { - svm_msg_q_wait (mq); - *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); - } } return 0; } @@ -154,7 +153,7 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes) msg.ring_index = ring - mq->rings; msg.elt_index = ring->tail; ring->tail = (ring->tail + 1) % ring->nitems; - __sync_fetch_and_add (&ring->cursize, 1); + clib_atomic_fetch_add (&ring->cursize, 1); break; } return msg; @@ -171,9 +170,9 @@ void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { svm_msg_q_ring_t *ring; + int need_signal; - if (vec_len (mq->rings) <= msg->ring_index) - return; + ASSERT (vec_len (mq->rings) > msg->ring_index); ring = &mq->rings[msg->ring_index]; if (msg->elt_index == ring->head) { @@ -181,27 +180,35 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) } else { + clib_warning ("message out of order"); /* for now, expect messages to be processed in order */ ASSERT (0); } - __sync_fetch_and_sub (&ring->cursize, 1); + + need_signal = ring->cursize == ring->nitems; + clib_atomic_fetch_sub (&ring->cursize, 1); + + if (PREDICT_FALSE (need_signal)) + svm_queue_send_signal (mq->q, 0); } static int svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { + u32 dist1, dist2, tail, head; svm_msg_q_ring_t *ring; - u32 dist1, dist2; if (vec_len (mq->rings) <= msg->ring_index) return 0; ring = &mq->rings[msg->ring_index]; + tail = ring->tail; + head = ring->head; - dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems; - if (ring->tail == ring->head) + dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems; + if (tail == head) dist2 = (ring->cursize == 0) ? 0 : ring->nitems; else - dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems; + dist2 = ((ring->nitems + tail) - head) % ring->nitems; return (dist1 < dist2); } @@ -233,6 +240,51 @@ svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) svm_queue_sub_raw (mq->q, (u8 *) msg); } +void +svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd) +{ + mq->q->consumer_evtfd = fd; +} + +void +svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd) +{ + mq->q->producer_evtfd = fd; +} + +int +svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq) +{ + int fd; + if ((fd = eventfd (0, EFD_NONBLOCK)) < 0) + return -1; + svm_msg_q_set_consumer_eventfd (mq, fd); + return 0; +} + +int +svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq) +{ + int fd; + if ((fd = eventfd (0, EFD_NONBLOCK)) < 0) + return -1; + svm_msg_q_set_producer_eventfd (mq, fd); + return 0; +} + +u8 * +format_svm_msg_q (u8 * s, va_list * args) +{ + svm_msg_q_t *mq = va_arg (*args, svm_msg_q_t *); + s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize); + for (u32 i = 0; i < vec_len (mq->rings); i++) + { + s = format (s, " [R%d:%d/%d]", i, mq->rings[i].cursize, + mq->rings[i].nitems); + } + return s; +} + /* * fd.io coding-style-patch-verification: ON *