diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 4f3e7642740..13d089a97cc 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -15,34 +15,76 @@
 #include <svm/message_queue.h>
 #include <vppinfra/mem.h>
+#include <sys/eventfd.h>
+
+static inline svm_msg_q_ring_t *
+svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
+{
+  return vec_elt_at_index (mq->rings, ring_index);
+}
+
+svm_msg_q_ring_t *
+svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
+{
+  return svm_msg_q_ring_inline (mq, ring_index);
+}
+
+static inline void *
+svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
+{
+  ASSERT (elt_index < ring->nitems);
+  return (ring->data + elt_index * ring->elsize);
+}
 
 svm_msg_q_t *
 svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
 {
+  svm_msg_q_ring_cfg_t *ring_cfg;
+  uword rings_sz = 0, mq_sz;
   svm_msg_q_ring_t *ring;
+  u8 *base, *rings_ptr;
+  vec_header_t *vh;
+  u32 vec_sz, q_sz;
   svm_msg_q_t *mq;
-  uword size;
   int i;
 
-  if (!cfg)
+  ASSERT (cfg);
+
+  vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
+  for (i = 0; i < cfg->n_rings; i++)
+    {
+      if (cfg->ring_cfgs[i].data)
+	continue;
+      ring_cfg = &cfg->ring_cfgs[i];
+      rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
+    }
+
+  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
+  base = clib_mem_alloc_aligned (mq_sz, CLIB_CACHE_LINE_BYTES);
+  if (!base)
     return 0;
 
-  mq = clib_mem_alloc_aligned (sizeof (svm_msg_q_t), CLIB_CACHE_LINE_BYTES);
-  memset (mq, 0, sizeof (*mq));
-  mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t),
-			  cfg->consumer_pid, 0);
-  vec_validate (mq->rings, cfg->n_rings - 1);
+  mq = (svm_msg_q_t *) base;
+  mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
+			  sizeof (svm_msg_q_msg_t));
+  mq->q->consumer_pid = cfg->consumer_pid;
+  vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
+  vh->len = cfg->n_rings;
+  mq->rings = (svm_msg_q_ring_t *) (vh + 1);
+  rings_ptr = (u8 *) mq->rings + vec_sz;
   for (i = 0; i < cfg->n_rings; i++)
     {
      ring = &mq->rings[i];
      ring->elsize = cfg->ring_cfgs[i].elsize;
      ring->nitems = cfg->ring_cfgs[i].nitems;
+      ring->cursize = ring->head = ring->tail = 0;
      if (cfg->ring_cfgs[i].data)
	ring->data = cfg->ring_cfgs[i].data;
      else
	{
-	  size = (uword) ring->nitems * ring->elsize;
-	  ring->data = clib_mem_alloc_aligned (size, CLIB_CACHE_LINE_BYTES);
+	  ring->data = rings_ptr;
+	  rings_ptr += (uword) ring->nitems * ring->elsize;
	}
     }
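
For context when reading the hunk above: svm_msg_q_alloc now carves the queue, the ring-descriptor vector and all ring data out of a single cache-line-aligned allocation. A minimal caller-side sketch follows; it is not part of the patch, and everything in it except the type, field and function names already visible in this diff is illustrative:

/* Sketch (not part of this patch): allocate a message queue with two
 * rings.  Sizes are arbitrary; .data = 0 asks svm_msg_q_alloc to place
 * the ring storage inside the single contiguous allocation above. */
#include <svm/message_queue.h>

svm_msg_q_t *
example_mq_alloc (void)
{
  svm_msg_q_ring_cfg_t rc[2] = {
    { .nitems = 32, .elsize = 64, .data = 0 },	/* small messages */
    { .nitems = 16, .elsize = 256, .data = 0 },	/* large messages */
  };
  svm_msg_q_cfg_t cfg = {
    .q_nitems = 64,		/* depth of the underlying svm_queue_t */
    .n_rings = 2,
    .ring_cfgs = rc,
    .consumer_pid = 0,
  };
  return svm_msg_q_alloc (&cfg);
}
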
@@ -52,16 +94,54 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
 void
 svm_msg_q_free (svm_msg_q_t * mq)
 {
-  svm_msg_q_ring_t *ring;
-
-  vec_foreach (ring, mq->rings)
-  {
-    clib_mem_free (ring->data);
-  }
-  vec_free (mq->rings);
+  svm_queue_free (mq->q);
   clib_mem_free (mq);
 }
 
+svm_msg_q_msg_t
+svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
+{
+  svm_msg_q_msg_t msg;
+  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
+
+  ASSERT (ring->cursize < ring->nitems);
+  msg.ring_index = ring - mq->rings;
+  msg.elt_index = ring->tail;
+  ring->tail = (ring->tail + 1) % ring->nitems;
+  clib_atomic_fetch_add (&ring->cursize, 1);
+  return msg;
+}
+
+int
+svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
+				     u8 noblock, svm_msg_q_msg_t * msg)
+{
+  if (noblock)
+    {
+      if (svm_msg_q_try_lock (mq))
+	return -1;
+      if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index)))
+	{
+	  svm_msg_q_unlock (mq);
+	  return -2;
+	}
+      *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
+      if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (msg)))
+	{
+	  svm_msg_q_unlock (mq);
+	  return -2;
+	}
+    }
+  else
+    {
+      svm_msg_q_lock (mq);
+      while (svm_msg_q_ring_is_full (mq, ring_index))
+	svm_msg_q_wait (mq);
+      *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
+    }
+  return 0;
+}
+
 svm_msg_q_msg_t
 svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
 {
@@ -75,29 +155,16 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
      msg.ring_index = ring - mq->rings;
      msg.elt_index = ring->tail;
      ring->tail = (ring->tail + 1) % ring->nitems;
-      __sync_fetch_and_add (&ring->cursize, 1);
+      clib_atomic_fetch_add (&ring->cursize, 1);
      break;
     }
   return msg;
 }
 
-static inline svm_msg_q_ring_t *
-svm_msg_q_get_ring (svm_msg_q_t * mq, u32 ring_index)
-{
-  return vec_elt_at_index (mq->rings, ring_index);
-}
-
-static inline void *
-svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
-{
-  ASSERT (elt_index < ring->nitems);
-  return (ring->data + elt_index * ring->elsize);
-}
-
 void *
 svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
-  svm_msg_q_ring_t *ring = svm_msg_q_get_ring (mq, msg->ring_index);
+  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, msg->ring_index);
   return svm_msg_q_ring_data (ring, msg->elt_index);
 }
 
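
The functions above give producers a two-step path: reserve a ring element, then enqueue its handle. A blocking producer sketch, not part of the patch; the event type is invented for illustration, and svm_msg_q_add_and_unlock is the helper added further down in this diff:

#include <svm/message_queue.h>

/* Illustrative payload type, not from the patch */
typedef struct { u32 session_index; u32 event_type; } example_evt_t;

static void
example_produce (svm_msg_q_t * mq, example_evt_t * e)
{
  svm_msg_q_msg_t msg;
  example_evt_t *payload;

  /* Take the queue mutex and reserve an element in ring 0; the
   * noblock = 0 path above waits while the ring is full */
  svm_msg_q_lock_and_alloc_msg_w_ring (mq, 0 /* ring_index */, 0, &msg);
  payload = (example_evt_t *) svm_msg_q_msg_data (mq, &msg);
  *payload = *e;
  /* Enqueue the handle and drop the mutex in one call */
  svm_msg_q_add_and_unlock (mq, &msg);
}
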
@@ -106,8 +173,7 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   svm_msg_q_ring_t *ring;
 
-  if (vec_len (mq->rings) <= msg->ring_index)
-    return;
+  ASSERT (vec_len (mq->rings) > msg->ring_index);
   ring = &mq->rings[msg->ring_index];
   if (msg->elt_index == ring->head)
     {
@@ -115,35 +181,46 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
     }
   else
     {
+      clib_warning ("message out of order");
      /* for now, expect messages to be processed in order */
      ASSERT (0);
     }
-  __sync_fetch_and_sub (&ring->cursize, 1);
+  clib_atomic_fetch_sub (&ring->cursize, 1);
 }
 
 static int
 svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
+  u32 dist1, dist2, tail, head;
   svm_msg_q_ring_t *ring;
-  u32 dist1, dist2;
 
   if (vec_len (mq->rings) <= msg->ring_index)
     return 0;
   ring = &mq->rings[msg->ring_index];
+  tail = ring->tail;
+  head = ring->head;
 
-  dist1 = ((ring->nitems + msg->ring_index) - ring->head) % ring->nitems;
-  if (ring->tail == ring->head)
+  dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
+  if (tail == head)
     dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
   else
-    dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems;
+    dist2 = ((ring->nitems + tail) - head) % ring->nitems;
   return (dist1 < dist2);
 }
 
 int
-svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait)
+svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
+{
+  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
+  return svm_queue_add (mq->q, (u8 *) msg, nowait);
+}
+
+void
+svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
-  ASSERT (svm_msq_q_msg_is_valid (mq, &msg));
-  return svm_queue_add (mq->q, (u8 *) & msg, nowait);
+  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
+  svm_queue_add_raw (mq->q, (u8 *) msg);
+  svm_msg_q_unlock (mq);
 }
 
 int
@@ -153,6 +230,44 @@ svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
   return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
 }
 
+void
+svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
+{
+  svm_queue_sub_raw (mq->q, (u8 *) msg);
+}
+
+void
+svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd)
+{
+  mq->q->consumer_evtfd = fd;
+}
+
+void
+svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd)
+{
+  mq->q->producer_evtfd = fd;
+}
+
+int
+svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq)
+{
+  int fd;
+  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+    return -1;
+  svm_msg_q_set_consumer_eventfd (mq, fd);
+  return 0;
+}
+
+int
+svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq)
+{
+  int fd;
+  if ((fd = eventfd (0, EFD_NONBLOCK)) < 0)
+    return -1;
+  svm_msg_q_set_producer_eventfd (mq, fd);
+  return 0;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
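
With the eventfd helpers above, a consumer can wait in epoll instead of blocking on the queue's mutex and condvar. A sketch, not part of the patch: it assumes the underlying svm_queue signals consumer_evtfd on enqueue once the fd is set, and that SVM_Q_NOWAIT is the non-blocking conditional-wait flag from svm/queue.h; error handling is elided.

#include <sys/epoll.h>
#include <unistd.h>
#include <svm/message_queue.h>

static void
example_consume_evt (svm_msg_q_t * mq, int epfd)
{
  struct epoll_event ev = { .events = EPOLLIN };
  svm_msg_q_msg_t msg;
  u64 cnt;

  /* Create the non-blocking eventfd and register it with epoll */
  svm_msg_q_alloc_consumer_eventfd (mq);
  ev.data.fd = mq->q->consumer_evtfd;
  epoll_ctl (epfd, EPOLL_CTL_ADD, mq->q->consumer_evtfd, &ev);

  /* Wait for the producer's signal, then drain the eventfd counter */
  epoll_wait (epfd, &ev, 1, -1 /* no timeout */);
  read (mq->q->consumer_evtfd, &cnt, sizeof (cnt));

  /* Dequeue without blocking and resolve the handle to its payload */
  if (svm_msg_q_sub (mq, &msg, SVM_Q_NOWAIT, 0) == 0)
    {
      void *data = svm_msg_q_msg_data (mq, &msg);
      (void) data;		/* ... process message ... */
      svm_msg_q_free_msg (mq, &msg);
    }
}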