X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fsvm%2Fmessage_queue.c;h=d6a77e783e3f0d35aaf61010f31ec5b82e9dba27;hb=993683150202254c6ba8dd43e087a7229edd5d4c;hp=0f9be9c11b7344e5432e65c042cae64427cf0d30;hpb=e91bdb3783653b34c1bbf9738a739e646d02f533;p=vpp.git diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c index 0f9be9c11b7..d6a77e783e3 100644 --- a/src/svm/message_queue.c +++ b/src/svm/message_queue.c @@ -15,6 +15,7 @@ #include #include +#include static inline svm_msg_q_ring_t * svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index) @@ -39,12 +40,12 @@ svm_msg_q_t * svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) { svm_msg_q_ring_cfg_t *ring_cfg; + uword rings_sz = 0, mq_sz; svm_msg_q_ring_t *ring; u8 *base, *rings_ptr; - uword rings_sz = 0; vec_header_t *vh; + u32 vec_sz, q_sz; svm_msg_q_t *mq; - u32 vec_sz; int i; ASSERT (cfg); @@ -58,13 +59,17 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize; } - base = clib_mem_alloc_aligned (sizeof (svm_msg_q_t) + vec_sz + rings_sz, - CLIB_CACHE_LINE_BYTES); + q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t); + mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz; + base = clib_mem_alloc_aligned (mq_sz, CLIB_CACHE_LINE_BYTES); if (!base) return 0; mq = (svm_msg_q_t *) base; - vh = (vec_header_t *) (base + sizeof (svm_msg_q_t)); + mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems, + sizeof (svm_msg_q_msg_t)); + mq->q->consumer_pid = cfg->consumer_pid; + vh = (vec_header_t *) ((u8 *) mq->q + q_sz); vh->len = cfg->n_rings; mq->rings = (svm_msg_q_ring_t *) (vh + 1); rings_ptr = (u8 *) mq->rings + vec_sz; @@ -73,6 +78,7 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) ring = &mq->rings[i]; ring->elsize = cfg->ring_cfgs[i].elsize; ring->nitems = cfg->ring_cfgs[i].nitems; + ring->cursize = ring->head = ring->tail = 0; if (cfg->ring_cfgs[i].data) ring->data = cfg->ring_cfgs[i].data; else @@ -81,8 +87,6 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) rings_ptr += (uword) ring->nitems * ring->elsize; } } - mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t), - cfg->consumer_pid, 0); return mq; } @@ -97,10 +101,10 @@ svm_msg_q_free (svm_msg_q_t * mq) svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index) { - svm_msg_q_msg_t msg = {.as_u64 = ~0 }; + svm_msg_q_msg_t msg; svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index); - ASSERT (ring->cursize != ring->nitems); + ASSERT (ring->cursize < ring->nitems); msg.ring_index = ring - mq->rings; msg.elt_index = ring->tail; ring->tail = (ring->tail + 1) % ring->nitems; @@ -131,12 +135,9 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index, else { svm_msg_q_lock (mq); + while (svm_msg_q_ring_is_full (mq, ring_index)) + svm_msg_q_wait (mq); *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); - while (svm_msg_q_msg_is_invalid (msg)) - { - svm_msg_q_wait (mq); - *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index); - } } return 0; } @@ -190,18 +191,20 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) static int svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) { + u32 dist1, dist2, tail, head; svm_msg_q_ring_t *ring; - u32 dist1, dist2; if (vec_len (mq->rings) <= msg->ring_index) return 0; ring = &mq->rings[msg->ring_index]; + tail = ring->tail; + head = ring->head; - dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems; - if (ring->tail == ring->head) + dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems; + if (tail == head) dist2 = (ring->cursize == 0) ? 0 : ring->nitems; else - dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems; + dist2 = ((ring->nitems + tail) - head) % ring->nitems; return (dist1 < dist2); } @@ -233,6 +236,38 @@ svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) svm_queue_sub_raw (mq->q, (u8 *) msg); } +void +svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd) +{ + mq->q->consumer_evtfd = fd; +} + +void +svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd) +{ + mq->q->producer_evtfd = fd; +} + +int +svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq) +{ + int fd; + if ((fd = eventfd (0, EFD_NONBLOCK)) < 0) + return -1; + svm_msg_q_set_consumer_eventfd (mq, fd); + return 0; +} + +int +svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq) +{ + int fd; + if ((fd = eventfd (0, EFD_NONBLOCK)) < 0) + return -1; + svm_msg_q_set_producer_eventfd (mq, fd); + return 0; +} + /* * fd.io coding-style-patch-verification: ON *