X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fsvm%2Fmessage_queue.c;h=1b2d2e148ed11d25144e4fd2760a278af0a60574;hb=54693d23307ce8944a4d97379efd3bd4dcf0485c;hp=4f3e7642740bc08ef0dbbfd9f04d03fb4be60715;hpb=95e0ce05470554e403ade1db322800e489165b1b;p=vpp.git diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c index 4f3e7642740..1b2d2e148ed 100644 --- a/src/svm/message_queue.c +++ b/src/svm/message_queue.c @@ -16,35 +16,74 @@ #include #include +static inline svm_msg_q_ring_t * +svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index) +{ + return vec_elt_at_index (mq->rings, ring_index); +} + +svm_msg_q_ring_t * +svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index) +{ + return svm_msg_q_ring_inline (mq, ring_index); +} + +static inline void * +svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index) +{ + ASSERT (elt_index < ring->nitems); + return (ring->data + elt_index * ring->elsize); +} + svm_msg_q_t * svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) { + svm_msg_q_ring_cfg_t *ring_cfg; svm_msg_q_ring_t *ring; + u8 *base, *rings_ptr; + uword rings_sz = 0; + vec_header_t *vh; svm_msg_q_t *mq; - uword size; + u32 vec_sz; int i; - if (!cfg) + ASSERT (cfg); + + vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings; + for (i = 0; i < cfg->n_rings; i++) + { + if (cfg->ring_cfgs[i].data) + continue; + ring_cfg = &cfg->ring_cfgs[i]; + rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize; + } + + base = clib_mem_alloc_aligned (sizeof (svm_msg_q_t) + vec_sz + rings_sz, + CLIB_CACHE_LINE_BYTES); + if (!base) return 0; - mq = clib_mem_alloc_aligned (sizeof (svm_msg_q_t), CLIB_CACHE_LINE_BYTES); - memset (mq, 0, sizeof (*mq)); - mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t), - cfg->consumer_pid, 0); - vec_validate (mq->rings, cfg->n_rings - 1); + mq = (svm_msg_q_t *) base; + vh = (vec_header_t *) (base + sizeof (svm_msg_q_t)); + vh->len = cfg->n_rings; + mq->rings = (svm_msg_q_ring_t *) (vh + 1); + rings_ptr = (u8 *) mq->rings + vec_sz; for (i = 0; i < cfg->n_rings; i++) { ring = &mq->rings[i]; ring->elsize = cfg->ring_cfgs[i].elsize; ring->nitems = cfg->ring_cfgs[i].nitems; + ring->cursize = ring->head = ring->tail = 0; if (cfg->ring_cfgs[i].data) ring->data = cfg->ring_cfgs[i].data; else { - size = (uword) ring->nitems * ring->elsize; - ring->data = clib_mem_alloc_aligned (size, CLIB_CACHE_LINE_BYTES); + ring->data = rings_ptr; + rings_ptr += (uword) ring->nitems * ring->elsize; } } + mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t), + cfg->consumer_pid, 0); return mq; } @@ -52,16 +91,54 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) void svm_msg_q_free (svm_msg_q_t * mq) { - svm_msg_q_ring_t *ring; - - vec_foreach (ring, mq->rings) - { - clib_mem_free (ring->data); - } - vec_free (mq->rings); + svm_queue_free (mq->q); clib_mem_free (mq); } +svm_msg_q_msg_t +svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index) +{ + svm_msg_q_msg_t msg; + svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index); + + ASSERT (ring->cursize < ring->nitems); + msg.ring_index = ring - mq->rings; + msg.elt_index = ring->tail; + ring->tail = (ring->tail + 1) % ring->nitems; + __sync_fetch_and_add (&ring->cursize, 1); + return msg; +} + +int +svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index, + u8 noblock, svm_msg_q_msg_t * msg) +{ + if (noblock) + { + if (svm_msg_q_try_lock (mq)) + return -1; + if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index))) + { + svm_msg_q_unlock (mq); + return -2; + } + *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); + if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (msg))) + { + svm_msg_q_unlock (mq); + return -2; + } + } + else + { + svm_msg_q_lock (mq); + while (svm_msg_q_ring_is_full (mq, ring_index)) + svm_msg_q_wait (mq); + *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); + } + return 0; +} + svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes) { @@ -81,23 +158,10 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes) return msg; } -static inline svm_msg_q_ring_t * -svm_msg_q_get_ring (svm_msg_q_t * mq, u32 ring_index) -{ - return vec_elt_at_index (mq->rings, ring_index); -} - -static inline void * -svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index) -{ - ASSERT (elt_index < ring->nitems); - return (ring->data + elt_index * ring->elsize); -} - void * svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { - svm_msg_q_ring_t *ring = svm_msg_q_get_ring (mq, msg->ring_index); + svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, msg->ring_index); return svm_msg_q_ring_data (ring, msg->elt_index); } @@ -124,26 +188,36 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) static int svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { + u32 dist1, dist2, tail, head; svm_msg_q_ring_t *ring; - u32 dist1, dist2; if (vec_len (mq->rings) <= msg->ring_index) return 0; ring = &mq->rings[msg->ring_index]; + tail = ring->tail; + head = ring->head; - dist1 = ((ring->nitems + msg->ring_index) - ring->head) % ring->nitems; - if (ring->tail == ring->head) + dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems; + if (tail == head) dist2 = (ring->cursize == 0) ? 0 : ring->nitems; else - dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems; + dist2 = ((ring->nitems + tail) - head) % ring->nitems; return (dist1 < dist2); } int -svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait) +svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait) { - ASSERT (svm_msq_q_msg_is_valid (mq, &msg)); - return svm_queue_add (mq->q, (u8 *) & msg, nowait); + ASSERT (svm_msq_q_msg_is_valid (mq, msg)); + return svm_queue_add (mq->q, (u8 *) msg, nowait); +} + +void +svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) +{ + ASSERT (svm_msq_q_msg_is_valid (mq, msg)); + svm_queue_add_raw (mq->q, (u8 *) msg); + svm_msg_q_unlock (mq); } int @@ -153,6 +227,12 @@ svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, return svm_queue_sub (mq->q, (u8 *) msg, cond, time); } +void +svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) +{ + svm_queue_sub_raw (mq->q, (u8 *) msg); +} + /* * fd.io coding-style-patch-verification: ON *