X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fsvm%2Fmessage_queue.c;h=1b2d2e148ed11d25144e4fd2760a278af0a60574;hb=54693d23307ce8944a4d97379efd3bd4dcf0485c;hp=89411143c1243eb1e9d1dff788847722d4842c29;hpb=3c2fed5145d9e40a9ecd178c2866c813eddc6203;p=vpp.git diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c index 89411143c12..1b2d2e148ed 100644 --- a/src/svm/message_queue.c +++ b/src/svm/message_queue.c @@ -38,32 +38,52 @@ svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index) svm_msg_q_t * svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) { + svm_msg_q_ring_cfg_t *ring_cfg; svm_msg_q_ring_t *ring; + u8 *base, *rings_ptr; + uword rings_sz = 0; + vec_header_t *vh; svm_msg_q_t *mq; - uword size; + u32 vec_sz; int i; - if (!cfg) + ASSERT (cfg); + + vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings; + for (i = 0; i < cfg->n_rings; i++) + { + if (cfg->ring_cfgs[i].data) + continue; + ring_cfg = &cfg->ring_cfgs[i]; + rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize; + } + + base = clib_mem_alloc_aligned (sizeof (svm_msg_q_t) + vec_sz + rings_sz, + CLIB_CACHE_LINE_BYTES); + if (!base) return 0; - mq = clib_mem_alloc_aligned (sizeof (svm_msg_q_t), CLIB_CACHE_LINE_BYTES); - memset (mq, 0, sizeof (*mq)); - mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t), - cfg->consumer_pid, 0); - vec_validate (mq->rings, cfg->n_rings - 1); + mq = (svm_msg_q_t *) base; + vh = (vec_header_t *) (base + sizeof (svm_msg_q_t)); + vh->len = cfg->n_rings; + mq->rings = (svm_msg_q_ring_t *) (vh + 1); + rings_ptr = (u8 *) mq->rings + vec_sz; for (i = 0; i < cfg->n_rings; i++) { ring = &mq->rings[i]; ring->elsize = cfg->ring_cfgs[i].elsize; ring->nitems = cfg->ring_cfgs[i].nitems; + ring->cursize = ring->head = ring->tail = 0; if (cfg->ring_cfgs[i].data) ring->data = cfg->ring_cfgs[i].data; else { - size = (uword) ring->nitems * ring->elsize; - ring->data = clib_mem_alloc_aligned (size, CLIB_CACHE_LINE_BYTES); + ring->data = rings_ptr; + rings_ptr += (uword) ring->nitems * ring->elsize; } } + mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t), + cfg->consumer_pid, 0); return mq; } @@ -71,23 +91,17 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) void svm_msg_q_free (svm_msg_q_t * mq) { - svm_msg_q_ring_t *ring; - - vec_foreach (ring, mq->rings) - { - clib_mem_free (ring->data); - } - vec_free (mq->rings); + svm_queue_free (mq->q); clib_mem_free (mq); } svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index) { - svm_msg_q_msg_t msg = {.as_u64 = ~0 }; + svm_msg_q_msg_t msg; svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index); - ASSERT (ring->cursize != ring->nitems); + ASSERT (ring->cursize < ring->nitems); msg.ring_index = ring - mq->rings; msg.elt_index = ring->tail; ring->tail = (ring->tail + 1) % ring->nitems; @@ -118,12 +132,9 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index, else { svm_msg_q_lock (mq); + while (svm_msg_q_ring_is_full (mq, ring_index)) + svm_msg_q_wait (mq); *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); - while (svm_msg_q_msg_is_invalid (msg)) - { - svm_msg_q_wait (mq); - *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); - } } return 0; } @@ -177,18 +188,20 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) static int svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { + u32 dist1, dist2, tail, head; svm_msg_q_ring_t *ring; - u32 dist1, dist2; if (vec_len (mq->rings) <= msg->ring_index) return 0; ring = &mq->rings[msg->ring_index]; + tail = ring->tail; + head = ring->head; - dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems; - if (ring->tail == ring->head) + dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems; + if (tail == head) dist2 = (ring->cursize == 0) ? 0 : ring->nitems; else - dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems; + dist2 = ((ring->nitems + tail) - head) % ring->nitems; return (dist1 < dist2); } @@ -200,10 +213,11 @@ svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait) } void -svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) +svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { ASSERT (svm_msq_q_msg_is_valid (mq, msg)); svm_queue_add_raw (mq->q, (u8 *) msg); + svm_msg_q_unlock (mq); } int