From 65784c1602c7c8171effd00384f65f546d93a13b Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Wed, 4 Jul 2018 04:17:41 -0700 Subject: [PATCH] svm: add unidirectional message queue Meant for single reader/writer message exchanges. Supports multiple message rings. Change-Id: I925de9a6ae19226c5c39a63caff76424ed123a13 Signed-off-by: Florin Coras --- src/svm.am | 19 +++-- src/svm/message_queue.c | 159 +++++++++++++++++++++++++++++++++++ src/svm/message_queue.h | 154 ++++++++++++++++++++++++++++++++++ src/svm/ssvm.c | 10 ++- src/svm/test_svm_message_queue.c | 176 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 510 insertions(+), 8 deletions(-) create mode 100644 src/svm/message_queue.c create mode 100644 src/svm/message_queue.h create mode 100644 src/svm/test_svm_message_queue.c diff --git a/src/svm.am b/src/svm.am index c29072e4f52..7413e672bce 100644 --- a/src/svm.am +++ b/src/svm.am @@ -13,13 +13,14 @@ bin_PROGRAMS += svmtool svmdbtool -nobase_include_HEADERS += \ +nobase_include_HEADERS += \ svm/ssvm.h \ svm/svm_common.h \ svm/svmdb.h \ svm/svm_fifo.h \ - svm/svm_fifo_segment.h \ + svm/svm_fifo_segment.h \ svm/queue.h \ + svm/message_queue.h \ svm/svm.h lib_LTLIBRARIES += libsvm.la libsvmdb.la @@ -28,8 +29,9 @@ libsvm_la_SOURCES = \ svm/svm.c \ svm/ssvm.c \ svm/svm_fifo.c \ - svm/svm_fifo_segment.c \ - svm/queue.c + svm/svm_fifo_segment.c \ + svm/queue.c \ + svm/message_queue.c libsvm_la_LIBADD = libvppinfra.la -lrt -lpthread libsvm_la_DEPENDENCIES = libvppinfra.la @@ -44,9 +46,16 @@ libsvmdb_la_SOURCES = svm/svmdb.c svmdbtool_SOURCES = svm/svmdbtool.c svmdbtool_LDADD = libsvmdb.la libsvm.la libvppinfra.la -lpthread -lrt -noinst_PROGRAMS += test_svm_fifo1 +noinst_PROGRAMS += \ + test_svm_fifo1 \ + test_svm_message_queue + test_svm_fifo1_SOURCES = svm/test_svm_fifo1.c test_svm_fifo1_LDADD = libsvm.la libvppinfra.la -lpthread -lrt test_svm_fifo1_LDFLAGS = -static +test_svm_message_queue_SOURCES = svm/test_svm_message_queue.c +test_svm_message_queue_LDADD = libsvm.la libvppinfra.la -lpthread -lrt +test_svm_message_queue_LDFLAGS = -static + # vi:syntax=automake diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c new file mode 100644 index 00000000000..dc0f6251120 --- /dev/null +++ b/src/svm/message_queue.c @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2018 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +svm_msg_q_t * +svm_msg_q_alloc (svm_msg_q_cfg_t * cfg) +{ + svm_msg_q_ring_t *ring; + svm_msg_q_t *mq; + int i; + + if (!cfg) + return 0; + + mq = clib_mem_alloc_aligned (sizeof (svm_msg_q_t), CLIB_CACHE_LINE_BYTES); + memset (mq, 0, sizeof (*mq)); + mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t), + cfg->consumer_pid, 0); + vec_validate (mq->rings, cfg->n_rings - 1); + for (i = 0; i < cfg->n_rings; i++) + { + ring = &mq->rings[i]; + ring->elsize = cfg->ring_cfgs[i].elsize; + ring->nitems = cfg->ring_cfgs[i].nitems; + if (cfg->ring_cfgs[i].data) + ring->data = cfg->ring_cfgs[i].data; + else + ring->data = clib_mem_alloc_aligned (ring->nitems * ring->elsize, + CLIB_CACHE_LINE_BYTES); + } + + return mq; +} + +void +svm_msg_q_free (svm_msg_q_t * mq) +{ + svm_msg_q_ring_t *ring; + + vec_foreach (ring, mq->rings) + { + clib_mem_free (ring->data); + } + vec_free (mq->rings); + clib_mem_free (mq); +} + +svm_msg_q_msg_t +svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes) +{ + svm_msg_q_msg_t msg = {.as_u64 = ~0 }; + svm_msg_q_ring_t *ring; + + vec_foreach (ring, mq->rings) + { + if (ring->elsize < nbytes || ring->cursize == ring->nitems) + continue; + msg.ring_index = ring - mq->rings; + msg.elt_index = ring->tail; + ring->tail = (ring->tail + 1) % ring->nitems; + __sync_fetch_and_add (&ring->cursize, 1); + break; + } + return msg; +} + +static inline svm_msg_q_ring_t * +svm_msg_q_get_ring (svm_msg_q_t * mq, u32 ring_index) +{ + return vec_elt_at_index (mq->rings, ring_index); +} + +static inline void * +svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index) +{ + ASSERT (elt_index < ring->nitems); + return (ring->data + elt_index * ring->elsize); +} + +void * +svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) +{ + svm_msg_q_ring_t *ring = svm_msg_q_get_ring (mq, msg->ring_index); + return svm_msg_q_ring_data (ring, msg->elt_index); +} + +void +svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) +{ + svm_msg_q_ring_t *ring; + + if (vec_len (mq->rings) <= msg->ring_index) + return; + ring = &mq->rings[msg->ring_index]; + if (msg->elt_index == ring->head) + { + ring->head = (ring->head + 1) % ring->nitems; + } + else + { + /* for now, expect messages to be processed in order */ + ASSERT (0); + } + __sync_fetch_and_sub (&ring->cursize, 1); +} + +static int +svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg) +{ + svm_msg_q_ring_t *ring; + u32 dist1, dist2; + + if (vec_len (mq->rings) <= msg->ring_index) + return 0; + ring = &mq->rings[msg->ring_index]; + + dist1 = ((ring->nitems + msg->ring_index) - ring->head) % ring->nitems; + if (ring->tail == ring->head) + dist2 = (ring->cursize == 0) ? 0 : ring->nitems; + else + dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems; + return (dist1 < dist2); +} + +int +svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait) +{ + ASSERT (svm_msq_q_msg_is_valid (mq, &msg)); + return svm_queue_add (mq->q, (u8 *) & msg, nowait); +} + +int +svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, + svm_q_conditional_wait_t cond, u32 time) +{ + return svm_queue_sub (mq->q, (u8 *) msg, cond, time); +} + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h new file mode 100644 index 00000000000..5ec8547016e --- /dev/null +++ b/src/svm/message_queue.h @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2018 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * @file + * @brief Unidirectional shared-memory multi-ring message queue + */ + +#ifndef SRC_SVM_MESSAGE_QUEUE_H_ +#define SRC_SVM_MESSAGE_QUEUE_H_ + +#include +#include + +typedef struct svm_msg_q_ring_ +{ + volatile u32 cursize; /**< current size of the ring */ + u32 nitems; /**< max size of the ring */ + u32 head; /**< current head (for dequeue) */ + u32 tail; /**< current tail (for enqueue) */ + u32 elsize; /**< size of an element */ + u8 *data; /**< chunk of memory for msg data */ +} svm_msg_q_ring_t; + +typedef struct svm_msg_q_ +{ + svm_queue_t *q; /**< queue for exchanging messages */ + svm_msg_q_ring_t *rings; /**< rings with message data*/ +} svm_msg_q_t; + +typedef struct svm_msg_q_ring_cfg_ +{ + u32 nitems; + u32 elsize; + void *data; +} svm_msg_q_ring_cfg_t; + +typedef struct svm_msg_q_cfg_ +{ + int consumer_pid; /**< pid of msg consumer */ + u32 q_nitems; /**< msg queue size (not rings) */ + u32 n_rings; /**< number of msg rings */ + svm_msg_q_ring_cfg_t *ring_cfgs; /**< array of ring cfgs */ +} svm_msg_q_cfg_t; + +typedef union +{ + struct + { + u32 ring_index; /**< ring index, could be u8 */ + u32 elt_index; /**< index in ring */ + }; + u64 as_u64; +} svm_msg_q_msg_t; + +/** + * Allocate message queue + * + * Allocates a message queue on the heap. Based on the configuration options, + * apart from the message queue this also allocates (one or multiple) + * shared-memory rings for the messages. + * + * @param cfg configuration options: queue len, consumer pid, + * ring configs + * @return message queue + */ +svm_msg_q_t *svm_msg_q_alloc (svm_msg_q_cfg_t * cfg); + +/** + * Free message queue + * + * @param mq message queue to be freed + */ +void svm_msg_q_free (svm_msg_q_t * mq); + +/** + * Allocate message buffer + * + * Message is allocated on the first available ring capable of holding + * the requested number of bytes. + * + * @param mq message queue + * @param nbytes number of bytes needed for message + * @return message structure pointing to the ring and position + * allocated + */ +svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes); + +/** + * Free message buffer + * + * Marks message buffer on ring as free. + * + * @param mq message queue + * @param msg message to be freed + */ +void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); +/** + * Producer enqueue one message to queue + * + * Prior to calling this, the producer should've obtained a message buffer + * from one of the rings by calling @ref svm_msg_q_alloc_msg. + * + * @param mq message queue + * @param msg message (pointer to ring position) to be enqueued + * @param nowait flag to indicate if request is blocking or not + * @return success status + */ +int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait); + +/** + * Consumer dequeue one message from queue + * + * This returns the message pointing to the data in the message rings. + * The consumer is expected to call @ref svm_msg_q_free_msg once it + * finishes processing/copies the message data. + * + * @param mq message queue + * @param msg pointer to structure where message is to be received + * @param cond flag that indicates if request should block or not + * @return success status + */ +int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, + svm_q_conditional_wait_t cond, u32 time); + +/** + * Get data for message in queu + * + * @param mq message queue + * @param msg message for which the data is requested + * @return pointer to data + */ +void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg); + +#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/svm/ssvm.c b/src/svm/ssvm.c index 04e0efa3d62..dfb68c039ad 100644 --- a/src/svm/ssvm.c +++ b/src/svm/ssvm.c @@ -29,7 +29,6 @@ int ssvm_master_init_shm (ssvm_private_t * ssvm) { int ssvm_fd, mh_flags = MHEAP_FLAG_DISABLE_VM | MHEAP_FLAG_THREAD_SAFE; - svm_main_region_t *smr = svm_get_root_rp ()->data_base; clib_mem_vm_map_t mapa = { 0 }; u8 junk = 0, *ssvm_filename; ssvm_shared_header_t *sh; @@ -56,8 +55,13 @@ ssvm_master_init_shm (ssvm_private_t * ssvm) if (fchmod (ssvm_fd, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP) < 0) clib_unix_warning ("ssvm segment chmod"); - if (fchown (ssvm_fd, smr->uid, smr->gid) < 0) - clib_unix_warning ("ssvm segment chown"); + if (svm_get_root_rp ()) + { + /* TODO: is this really needed? */ + svm_main_region_t *smr = svm_get_root_rp ()->data_base; + if (fchown (ssvm_fd, smr->uid, smr->gid) < 0) + clib_unix_warning ("ssvm segment chown"); + } if (lseek (ssvm_fd, ssvm->ssvm_size, SEEK_SET) < 0) { diff --git a/src/svm/test_svm_message_queue.c b/src/svm/test_svm_message_queue.c new file mode 100644 index 00000000000..69ffd131ac2 --- /dev/null +++ b/src/svm/test_svm_message_queue.c @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2018 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#define test1_error(_fmt, _args...) \ +{ \ + ssvm_pop_heap (oldheap); \ + error = clib_error_return (0, _fmt, ##_args); \ + goto done; \ +} + +clib_error_t * +test1 (int verbose) +{ + ssvm_private_t _ssvm, *ssvm = &_ssvm; + svm_msg_q_cfg_t _cfg, *cfg = &_cfg; + svm_msg_q_msg_t msg1, msg2, msg[12]; + ssvm_shared_header_t *sh; + clib_error_t *error = 0; + svm_msg_q_t *mq; + void *oldheap; + int i; + + memset (ssvm, 0, sizeof (*ssvm)); + + ssvm->ssvm_size = 1 << 20; + ssvm->i_am_master = 1; + ssvm->my_pid = getpid (); + ssvm->name = format (0, "%s%c", "test", 0); + ssvm->requested_va = 0; + + if (ssvm_master_init (ssvm, SSVM_SEGMENT_SHM)) + return clib_error_return (0, "failed: segment allocation"); + sh = ssvm->sh; + + svm_msg_q_ring_cfg_t rc[2]= {{8, 8, 0}, {8, 16, 0}}; + cfg->consumer_pid = ~0; + cfg->n_rings = 2; + cfg->q_nitems = 16; + cfg->ring_cfgs = rc; + + oldheap = ssvm_push_heap (sh); + mq = svm_msg_q_alloc (cfg); + if (!mq) + test1_error ("failed: alloc"); + + if (vec_len (mq->rings) != 2) + test1_error ("failed: ring allocation"); + + msg1 = svm_msg_q_alloc_msg (mq, 8); + if (mq->rings[0].cursize != 1 + || msg1.ring_index != 0 + || msg1.elt_index != 0) + test1_error ("failed: msg alloc1"); + + msg2 = svm_msg_q_alloc_msg (mq, 15); + if (mq->rings[1].cursize != 1 + || msg2.ring_index != 1 + || msg2.elt_index != 0) + test1_error ("failed: msg alloc2"); + + svm_msg_q_free_msg (mq, &msg1); + if (mq->rings[0].cursize != 0) + test1_error("failed: free msg"); + + for (i = 0; i < 12; i++) + { + msg[i] = svm_msg_q_alloc_msg (mq, 7); + *(u32 *)svm_msg_q_msg_data (mq, &msg[i]) = i; + } + + if (mq->rings[0].cursize != 8 + || mq->rings[1].cursize != 5) + test1_error ("failed: msg alloc3"); + + *(u32 *)svm_msg_q_msg_data (mq, &msg2) = 123; + svm_msg_q_add (mq, msg2, SVM_Q_NOWAIT); + for (i = 0; i < 12; i++) + svm_msg_q_add (mq, msg[i], SVM_Q_NOWAIT); + + if (svm_msg_q_sub (mq, &msg2, SVM_Q_NOWAIT, 0)) + test1_error ("failed: dequeue1"); + + if (msg2.ring_index != 1 || msg2.elt_index != 0) + test1_error ("failed: dequeue1 result"); + if (*(u32 *)svm_msg_q_msg_data (mq, &msg2) != 123) + test1_error ("failed: dequeue1 wrong data"); + + svm_msg_q_free_msg (mq, &msg2); + + for (i = 0; i < 12; i++) + { + if (svm_msg_q_sub (mq, &msg[i], SVM_Q_NOWAIT, 0)) + test1_error ("failed: dequeue2"); + if (i < 8) + { + if (msg[i].ring_index != 0 || msg[i].elt_index != (i + 1) % 8) + test1_error ("failed: dequeue2 result2"); + } + else + { + if (msg[i].ring_index != 1 || msg[i].elt_index != (i - 8) + 1) + test1_error ("failed: dequeue2 result3"); + } + if (*(u32 *)svm_msg_q_msg_data (mq, &msg[i]) != i) + test1_error ("failed: dequeue2 wrong data"); + svm_msg_q_free_msg (mq, &msg[i]); + } + if (mq->rings[0].cursize != 0 || mq->rings[1].cursize != 0) + test1_error ("failed: post dequeue"); + + ssvm_pop_heap (oldheap); + +done: + ssvm_delete (ssvm); + return error; +} + +int +test_svm_message_queue (unformat_input_t * input) +{ + clib_error_t *error = 0; + int verbose = 0; + int test_id = 0; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "test1")) + test_id = 1; + else + { + error = clib_error_create ("unknown input `%U'\n", + format_unformat_error, input); + goto out; + } + } + + switch (test_id) + { + case 1: + error = test1 (verbose); + } +out: + if (error) + clib_error_report (error); + else + clib_warning ("success"); + + return 0; +} + +int +main (int argc, char *argv[]) +{ + unformat_input_t i; + int r; + + unformat_init_command_line (&i, argv); + r = test_svm_message_queue (&i); + unformat_free (&i); + return r; +} -- 2.16.6