svm: add support for eventfd signaling to queue 80/13880/8
authorFlorin Coras <fcoras@cisco.com>
Wed, 1 Aug 2018 14:53:18 +0000 (07:53 -0700)
committerDamjan Marion <dmarion@me.com>
Fri, 3 Aug 2018 09:11:41 +0000 (09:11 +0000)
Support the use of eventfds to signal queue updates between consumer
and producer pairs.

Change-Id: Idb6133be2b731fff78ed520daf9d2e0399642aab
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/svm/message_queue.c
src/svm/queue.c
src/svm/queue.h
src/svm/test_svm_message_queue.c
src/vlibapi/api_common.h
src/vlibmemory/memory_client.c
src/vlibmemory/memory_shared.c

index 1b2d2e1..e97cab8 100644 (file)
@@ -39,12 +39,12 @@ svm_msg_q_t *
 svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
 {
   svm_msg_q_ring_cfg_t *ring_cfg;
+  uword rings_sz = 0, mq_sz;
   svm_msg_q_ring_t *ring;
   u8 *base, *rings_ptr;
-  uword rings_sz = 0;
   vec_header_t *vh;
+  u32 vec_sz, q_sz;
   svm_msg_q_t *mq;
-  u32 vec_sz;
   int i;
 
   ASSERT (cfg);
@@ -58,13 +58,17 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
       rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
     }
 
-  base = clib_mem_alloc_aligned (sizeof (svm_msg_q_t) + vec_sz + rings_sz,
-                                CLIB_CACHE_LINE_BYTES);
+  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
+  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
+  base = clib_mem_alloc_aligned (mq_sz, CLIB_CACHE_LINE_BYTES);
   if (!base)
     return 0;
 
   mq = (svm_msg_q_t *) base;
-  vh = (vec_header_t *) (base + sizeof (svm_msg_q_t));
+  mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
+                         sizeof (svm_msg_q_msg_t));
+  mq->q->consumer_pid = cfg->consumer_pid;
+  vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
   vh->len = cfg->n_rings;
   mq->rings = (svm_msg_q_ring_t *) (vh + 1);
   rings_ptr = (u8 *) mq->rings + vec_sz;
@@ -82,8 +86,6 @@ svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
          rings_ptr += (uword) ring->nitems * ring->elsize;
        }
     }
-  mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t),
-                         cfg->consumer_pid, 0);
 
   return mq;
 }
index 8e18f58..0fa1fe9 100644 (file)
 #include <vppinfra/cache.h>
 #include <svm/queue.h>
 #include <vppinfra/time.h>
-#include <signal.h>
 
-/*
- * svm_queue_init
- *
- * nels = number of elements on the queue
- * elsize = element size, presumably 4 and cacheline-size will
- *          be popular choices.
- * pid   = consumer pid
- *
- * The idea is to call this function in the queue consumer,
- * and e-mail the queue pointer to the producer(s).
- *
- * The vpp process / main thread allocates one of these
- * at startup; its main input queue. The vpp main input queue
- * has a pointer to it in the shared memory segment header.
- *
- * You probably want to be on an svm data heap before calling this
- * function.
- */
 svm_queue_t *
-svm_queue_init (int nels,
-               int elsize, int consumer_pid, int signal_when_queue_non_empty)
+svm_queue_init (void *base, int nels, int elsize)
 {
   svm_queue_t *q;
   pthread_mutexattr_t attr;
   pthread_condattr_t cattr;
 
-  q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
-                             + nels * elsize, CLIB_CACHE_LINE_BYTES);
+  q = (svm_queue_t *) base;
   memset (q, 0, sizeof (*q));
 
   q->elsize = elsize;
   q->maxsize = nels;
-  q->consumer_pid = consumer_pid;
-  q->signal_when_queue_non_empty = signal_when_queue_non_empty;
+  q->producer_evtfd = -1;
+  q->consumer_evtfd = -1;
 
   memset (&attr, 0, sizeof (attr));
   memset (&cattr, 0, sizeof (cattr));
@@ -88,6 +67,20 @@ svm_queue_init (int nels,
   return (q);
 }
 
+svm_queue_t *
+svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
+{
+  svm_queue_t *q;
+
+  q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
+                             + nels * elsize, CLIB_CACHE_LINE_BYTES);
+  memset (q, 0, sizeof (*q));
+  q = svm_queue_init (q, nels, elsize);
+  q->consumer_pid = consumer_pid;
+
+  return q;
+}
+
 /*
  * svm_queue_free
  */
@@ -117,6 +110,23 @@ svm_queue_is_full (svm_queue_t * q)
   return q->cursize == q->maxsize;
 }
 
+static inline void
+svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
+{
+  if (q->producer_evtfd == -1)
+    {
+      (void) pthread_cond_broadcast (&q->condvar);
+    }
+  else
+    {
+      int __clib_unused rv, fd;
+      u64 data = 1;
+      ASSERT (q->consumer_evtfd != -1);
+      fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
+      rv = write (fd, &data, sizeof (data));
+    }
+}
+
 /*
  * svm_queue_add_nolock
  */
@@ -146,11 +156,7 @@ svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
     q->tail = 0;
 
   if (need_broadcast)
-    {
-      (void) pthread_cond_broadcast (&q->condvar);
-      if (q->signal_when_queue_non_empty)
-       kill (q->consumer_pid, q->signal_when_queue_non_empty);
-    }
+    svm_queue_send_signal (q, 1);
   return 0;
 }
 
@@ -212,11 +218,8 @@ svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
     q->tail = 0;
 
   if (need_broadcast)
-    {
-      (void) pthread_cond_broadcast (&q->condvar);
-      if (q->signal_when_queue_non_empty)
-       kill (q->consumer_pid, q->signal_when_queue_non_empty);
-    }
+    svm_queue_send_signal (q, 1);
+
   pthread_mutex_unlock (&q->mutex);
 
   return 0;
@@ -276,11 +279,8 @@ svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
     q->tail = 0;
 
   if (need_broadcast)
-    {
-      (void) pthread_cond_broadcast (&q->condvar);
-      if (q->signal_when_queue_non_empty)
-       kill (q->consumer_pid, q->signal_when_queue_non_empty);
-    }
+    svm_queue_send_signal (q, 1);
+
   pthread_mutex_unlock (&q->mutex);
 
   return 0;
@@ -353,7 +353,7 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
     q->head = 0;
 
   if (need_broadcast)
-    (void) pthread_cond_broadcast (&q->condvar);
+    svm_queue_send_signal (q, 0);
 
   pthread_mutex_unlock (&q->mutex);
 
@@ -385,7 +385,7 @@ svm_queue_sub2 (svm_queue_t * q, u8 * elem)
   pthread_mutex_unlock (&q->mutex);
 
   if (need_broadcast)
-    (void) pthread_cond_broadcast (&q->condvar);
+    svm_queue_send_signal (q, 0);
 
   return 0;
 }
@@ -410,6 +410,18 @@ svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
   return 0;
 }
 
+void
+svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
+{
+  q->producer_evtfd = fd;
+}
+
+void
+svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
+{
+  q->consumer_evtfd = fd;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index 68a63d7..75e63a4 100644 (file)
@@ -32,32 +32,43 @@ typedef struct _svm_queue
   int maxsize;
   int elsize;
   int consumer_pid;
-  int signal_when_queue_non_empty;
+  int producer_evtfd;
+  int consumer_evtfd;
   char data[0];
 } svm_queue_t;
 
 typedef enum
 {
-  /**
-   * blocking call
-   */
-  SVM_Q_WAIT = 0,
-
-  /**
-   * non-blocking call
-   */
-  SVM_Q_NOWAIT,
-
-  /**
-   * blocking call, return on signal or time-out
-   */
-  SVM_Q_TIMEDWAIT,
+  SVM_Q_WAIT = 0,      /**< blocking call - must be used only in combination
+                            with condvars */
+  SVM_Q_NOWAIT,                /**< non-blocking call - works with both condvar and
+                            eventfd signaling */
+  SVM_Q_TIMEDWAIT,     /**< blocking call, returns on signal or time-out -
+                            must be used only in combination with condvars */
 } svm_q_conditional_wait_t;
 
-svm_queue_t *svm_queue_init (int nels,
-                            int elsize,
-                            int consumer_pid,
-                            int signal_when_queue_non_empty);
+/**
+ * Allocate and initialize svm queue
+ *
+ * @param nels         number of elements on the queue
+ * @param elsize       element size, presumably 4 and cacheline-size will
+ *                     be popular choices.
+ * @param pid          consumer pid
+ * @return             a newly initialized svm queue
+ *
+ * The idea is to call this function in the queue consumer,
+ * and e-mail the queue pointer to the producer(s).
+ *
+ * The vpp process / main thread allocates one of these
+ * at startup; its main input queue. The vpp main input queue
+ * has a pointer to it in the shared memory segment header.
+ *
+ * You probably want to be on an svm data heap before calling this
+ * function.
+ */
+svm_queue_t *svm_queue_alloc_and_init (int nels, int elsize,
+                                      int consumer_pid);
+svm_queue_t *svm_queue_init (void *base, int nels, int elsize);
 void svm_queue_free (svm_queue_t * q);
 int svm_queue_add (svm_queue_t * q, u8 * elem, int nowait);
 int svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait);
@@ -77,6 +88,25 @@ int svm_queue_sub_raw (svm_queue_t * q, u8 * elem);
  */
 void svm_queue_add_raw (svm_queue_t * q, u8 * elem);
 
+/**
+ * Set producer's event fd
+ *
+ * When the producer must generate an event it writes 1 to the provided fd.
+ * Once this is set, condvars are not used anymore for signaling.
+ */
+void svm_queue_set_producer_event_fd (svm_queue_t * q, int fd);
+
+/**
+ * Set consumer's event fd
+ *
+ * When the consumer must generate an event it writes 1 to the provided fd.
+ * Although in practice the two fds point to the same underlying file
+ * description, because the producer and consumer are different processes
+ * the descriptors will be different. It's the caller's responsibility to
+ * ensure the file descriptors are properly exchanged between the two peers.
+ */
+void svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd);
+
 /*
  * DEPRECATED please use svm_queue_t instead
  */
index 758163f..9441c59 100644 (file)
@@ -169,6 +169,7 @@ main (int argc, char *argv[])
   unformat_input_t i;
   int r;
 
+  clib_mem_init_thread_safe (0, 256 << 20);
   unformat_init_command_line (&i, argv);
   r = test_svm_message_queue (&i);
   unformat_free (&i);
index 73c5a50..9432082 100644 (file)
@@ -314,9 +314,6 @@ typedef struct
    */
   vl_api_registration_t *my_registration;
 
-  /** (Historical) signal-based queue non-empty signal, to be removed */
-  i32 vlib_signal;
-
   /** vpp/vlib input queue length */
   u32 vlib_input_queue_length;
 
index 55ef001..07eb83e 100644 (file)
@@ -199,8 +199,8 @@ vl_client_connect (const char *name, int ctx_quota, int input_queue_size)
 
   pthread_mutex_lock (&svm->mutex);
   oldheap = svm_push_data_heap (svm);
-  vl_input_queue = svm_queue_init (input_queue_size, sizeof (uword),
-                                  getpid (), 0);
+  vl_input_queue = svm_queue_alloc_and_init (input_queue_size, sizeof (uword),
+                                            getpid ());
   svm_pop_heap (oldheap);
   pthread_mutex_unlock (&svm->mutex);
 
index c8c071f..084d0b7 100644 (file)
@@ -393,13 +393,13 @@ vl_api_default_mem_config (vl_shmem_hdr_t * shmem_hdr)
     vlib_input_queue_length = am->vlib_input_queue_length;
 
   shmem_hdr->vl_input_queue =
-    svm_queue_init (vlib_input_queue_length, sizeof (uword),
-                   getpid (), am->vlib_signal);
+    svm_queue_alloc_and_init (vlib_input_queue_length, sizeof (uword),
+                             getpid ());
 
 #define _(sz,n)                                                 \
     do {                                                        \
         ring_alloc_t _rp;                                       \
-        _rp.rp = svm_queue_init ((n), (sz), 0, 0); \
+        _rp.rp = svm_queue_alloc_and_init ((n), (sz), 0);      \
         _rp.size = (sz);                                        \
         _rp.nitems = n;                                         \
         _rp.hits = 0;                                           \
@@ -413,7 +413,7 @@ vl_api_default_mem_config (vl_shmem_hdr_t * shmem_hdr)
 #define _(sz,n)                                                 \
     do {                                                        \
         ring_alloc_t _rp;                                       \
-        _rp.rp = svm_queue_init ((n), (sz), 0, 0); \
+        _rp.rp = svm_queue_alloc_and_init ((n), (sz), 0);      \
         _rp.size = (sz);                                        \
         _rp.nitems = n;                                         \
         _rp.hits = 0;                                           \
@@ -428,7 +428,6 @@ vl_api_default_mem_config (vl_shmem_hdr_t * shmem_hdr)
 void
 vl_api_mem_config (vl_shmem_hdr_t * hdr, vl_api_shm_elem_config_t * config)
 {
-  api_main_t *am = &api_main;
   vl_api_shm_elem_config_t *c;
   ring_alloc_t *rp;
   u32 size;
@@ -444,9 +443,8 @@ vl_api_mem_config (vl_shmem_hdr_t * hdr, vl_api_shm_elem_config_t * config)
     switch (c->type)
       {
       case VL_API_QUEUE:
-       hdr->vl_input_queue = svm_queue_init (c->count,
-                                             c->size,
-                                             getpid (), am->vlib_signal);
+       hdr->vl_input_queue = svm_queue_alloc_and_init (c->count, c->size,
+                                                       getpid ());
        continue;
       case VL_API_VLIB_RING:
        vec_add2 (hdr->vl_rings, rp, 1);
@@ -460,7 +458,7 @@ vl_api_mem_config (vl_shmem_hdr_t * hdr, vl_api_shm_elem_config_t * config)
       }
 
     size = sizeof (ring_alloc_t) + c->size;
-    rp->rp = svm_queue_init (c->count, size, 0, 0);
+    rp->rp = svm_queue_alloc_and_init (c->count, size, 0);
     rp->size = size;
     rp->nitems = c->count;
     rp->hits = 0;