svm: allow mq attachments at random offsets 90/30390/53
authorFlorin Coras <fcoras@cisco.com>
Fri, 11 Dec 2020 21:58:12 +0000 (13:58 -0800)
committerFlorin Coras <fcoras@cisco.com>
Tue, 29 Dec 2020 20:11:07 +0000 (12:11 -0800)
Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ic373cd2c11272da539eb4b0db27227f36f2f9688

23 files changed:
src/plugins/hs_apps/sapi/vpp_echo.c
src/plugins/hs_apps/sapi/vpp_echo_bapi.c
src/plugins/hs_apps/sapi/vpp_echo_common.c
src/plugins/hs_apps/sapi/vpp_echo_common.h
src/plugins/hs_apps/sapi/vpp_echo_proto_udp.c
src/plugins/unittest/session_test.c
src/svm/fifo_segment.c
src/svm/fifo_segment.h
src/svm/fifo_types.h
src/svm/message_queue.c
src/svm/message_queue.h
src/svm/svm_fifo.c
src/vcl/vcl_bapi.c
src/vcl/vcl_private.c
src/vcl/vcl_private.h
src/vcl/vcl_sapi.c
src/vcl/vppcom.c
src/vnet/session/application_interface.h
src/vnet/session/segment_manager.c
src/vnet/session/session.c
src/vnet/session/session_api.c
src/vnet/session/session_debug.c
src/vnet/session/session_node.c

index a47a4d4..19b5808 100644 (file)
@@ -556,16 +556,14 @@ session_accepted_handler (session_accepted_msg_t * mp)
   session = echo_session_new (em);
 
   if (echo_attach_session (mp->segment_handle, mp->server_rx_fifo,
-                          mp->server_tx_fifo, session))
+                          mp->server_tx_fifo, mp->vpp_event_queue_address,
+                          session))
     {
       ECHO_FAIL (ECHO_FAIL_ACCEPTED_WAIT_FOR_SEG_ALLOC,
                 "accepted wait_for_segment_allocation errored");
       return;
     }
 
-  session->vpp_evt_q =
-    uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
-
   session->vpp_session_handle = mp->handle;
 
   /* session->transport needed by app_send_dgram */
@@ -617,14 +615,14 @@ session_connected_handler (session_connected_msg_t * mp)
   session = echo_session_new (em);
 
   if (echo_attach_session (mp->segment_handle, mp->server_rx_fifo,
-                          mp->server_tx_fifo, session))
+                          mp->server_tx_fifo, mp->vpp_event_queue_address,
+                          session))
     {
       ECHO_FAIL (ECHO_FAIL_CONNECTED_WAIT_FOR_SEG_ALLOC,
                 "connected wait_for_segment_allocation errored");
       return;
     }
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_msg_q_t *);
+
   session->vpp_session_handle = mp->handle;
   session->start = clib_time_now (&em->clib_time);
   session->listener_index = listener_index;
@@ -806,7 +804,7 @@ echo_process_rpcs (echo_main_t * em)
 {
   echo_rpc_msg_t *rpc;
   svm_msg_q_msg_t msg;
-  svm_msg_q_t *mq = em->rpc_msq_queue;
+  svm_msg_q_t *mq = &em->rpc_msq_queue;
 
   while (em->state < STATE_DATA_DONE && !em->time_to_stop)
     {
@@ -1321,7 +1319,7 @@ main (int argc, char **argv)
   cfg->n_rings = 1;
   cfg->q_nitems = rpc_queue_size;
   cfg->ring_cfgs = rc;
-  em->rpc_msq_queue = svm_msg_q_alloc (cfg);
+  svm_msg_q_attach (&em->rpc_msq_queue, svm_msg_q_alloc (cfg));
 
   signal (SIGINT, stop_signal);
   signal (SIGQUIT, stop_signal);
index c643cec..6ad825d 100644 (file)
@@ -264,12 +264,12 @@ echo_segment_detach (u64 segment_handle)
 
 int
 echo_attach_session (uword segment_handle, uword rxf_offset, uword txf_offset,
-                    echo_session_t *s)
+                    uword mq_offset, echo_session_t *s)
 {
   svm_fifo_shared_t *rx_fifo, *tx_fifo;
   echo_main_t *em = &echo_main;
+  u32 fs_index, eqs_index;
   fifo_segment_t *fs;
-  u32 fs_index;
 
   fs_index = echo_segment_lookup (segment_handle);
   if (fs_index == (u32) ~0)
@@ -279,6 +279,12 @@ echo_attach_session (uword segment_handle, uword rxf_offset, uword txf_offset,
       return -1;
     }
 
+  if (mq_offset != (uword) ~0)
+    {
+      eqs_index = echo_segment_lookup (ECHO_MQ_SEG_HANDLE);
+      ASSERT (eqs_index != (u32) ~0);
+    }
+
   rx_fifo = uword_to_pointer (rxf_offset, svm_fifo_shared_t *);
   tx_fifo = uword_to_pointer (txf_offset, svm_fifo_shared_t *);
   rx_fifo->client_session_index = s->session_index;
@@ -290,6 +296,39 @@ echo_attach_session (uword segment_handle, uword rxf_offset, uword txf_offset,
   s->rx_fifo = fifo_segment_alloc_fifo_w_shared (fs, rx_fifo);
   s->tx_fifo = fifo_segment_alloc_fifo_w_shared (fs, tx_fifo);
 
+  if (mq_offset != (uword) ~0)
+    {
+      fs = fifo_segment_get_segment (&em->segment_main, eqs_index);
+      s->vpp_evt_q =
+       fifo_segment_msg_q_attach (fs, mq_offset, rx_fifo->slice_index);
+    }
+
+  clib_spinlock_unlock (&em->segment_handles_lock);
+
+  return 0;
+}
+
+int
+echo_segment_attach_mq (uword segment_handle, uword mq_offset, u32 mq_index,
+                       svm_msg_q_t **mq)
+{
+  echo_main_t *em = &echo_main;
+  fifo_segment_t *fs;
+  u32 fs_index;
+
+  fs_index = echo_segment_lookup (segment_handle);
+  if (fs_index == (u32) ~0)
+    {
+      ECHO_LOG (0, "ERROR: mq segment %lx for is not attached!",
+               segment_handle);
+      return -1;
+    }
+
+  clib_spinlock_lock (&em->segment_handles_lock);
+
+  fs = fifo_segment_get_segment (&em->segment_main, fs_index);
+  *mq = fifo_segment_msg_q_attach (fs, mq_offset, mq_index);
+
   clib_spinlock_unlock (&em->segment_handles_lock);
 
   return 0;
@@ -338,8 +377,6 @@ static void
   em->state = STATE_CLEANED_CERT_KEY;
 }
 
-#define ECHO_MQ_SEG_HANDLE ((u64) ~0 - 1)
-
 static void
 vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
 {
@@ -364,8 +401,6 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
       ECHO_FAIL (ECHO_FAIL_VL_API_NULL_APP_MQ, "NULL app_mq");
       return;
     }
-  em->app_mq = uword_to_pointer (mp->app_mq, svm_msg_q_t *);
-  em->ctrl_mq = uword_to_pointer (mp->vpp_ctrl_mq, svm_msg_q_t *);
 
   if (mp->n_fds)
     {
@@ -385,6 +420,8 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
                       "svm_fifo_segment_attach failed on SSVM_SEGMENT_MEMFD");
            goto failed;
          }
+      echo_segment_attach_mq (ECHO_MQ_SEG_HANDLE, mp->vpp_ctrl_mq,
+                             mp->vpp_ctrl_mq_thread, &em->ctrl_mq);
 
       if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
        {
@@ -401,6 +438,7 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
            }
          vec_free (segment_name);
        }
+      echo_segment_attach_mq (segment_handle, mp->app_mq, 0, &em->app_mq);
 
       if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
        svm_msg_q_set_consumer_eventfd (em->app_mq, fds[n_fds++]);
index 497f56c..e24629b 100644 (file)
@@ -543,23 +543,23 @@ echo_send_rpc (echo_main_t * em, void *fp, echo_rpc_args_t * args)
 {
   svm_msg_q_msg_t msg;
   echo_rpc_msg_t *evt;
-  if (PREDICT_FALSE (svm_msg_q_lock (em->rpc_msq_queue)))
+  if (PREDICT_FALSE (svm_msg_q_lock (&em->rpc_msq_queue)))
     {
       ECHO_FAIL (ECHO_FAIL_RPC_SIZE, "RPC lock failed");
       return -1;
     }
-  if (PREDICT_FALSE (svm_msg_q_ring_is_full (em->rpc_msq_queue, 0)))
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (&em->rpc_msq_queue, 0)))
     {
-      svm_msg_q_unlock (em->rpc_msq_queue);
+      svm_msg_q_unlock (&em->rpc_msq_queue);
       ECHO_FAIL (ECHO_FAIL_RPC_SIZE, "RPC ring is full");
       return -2;
     }
-  msg = svm_msg_q_alloc_msg_w_ring (em->rpc_msq_queue, 0);
-  evt = (echo_rpc_msg_t *) svm_msg_q_msg_data (em->rpc_msq_queue, &msg);
+  msg = svm_msg_q_alloc_msg_w_ring (&em->rpc_msq_queue, 0);
+  evt = (echo_rpc_msg_t *) svm_msg_q_msg_data (&em->rpc_msq_queue, &msg);
   evt->fp = fp;
   clib_memcpy (&evt->args, args, sizeof (evt->args));
 
-  svm_msg_q_add_and_unlock (em->rpc_msq_queue, &msg);
+  svm_msg_q_add_and_unlock (&em->rpc_msq_queue, &msg);
   return 0;
 }
 
index cd2bbb6..dc5f7df 100644 (file)
@@ -38,6 +38,7 @@
 #define TIMEOUT 10.0
 #define LOGGING_BATCH (100)
 #define LOG_EVERY_N_IDLE_CYCLES (1e8)
+#define ECHO_MQ_SEG_HANDLE     ((u64) ~0 - 1)
 
 #define foreach_echo_fail_code                                          \
   _(ECHO_FAIL_NONE, "ECHO_FAIL_NONE")                                   \
@@ -300,7 +301,7 @@ typedef struct
   uword *shared_segment_handles;       /* Hash table : segment_names -> 1 */
   clib_spinlock_t segment_handles_lock;        /* Hash table lock */
   echo_proto_cb_vft_t *proto_cb_vft;
-  svm_msg_q_t *rpc_msq_queue;  /* MQ between quic_echo threads */
+  svm_msg_q_t rpc_msq_queue; /* MQ between quic_echo threads */
   fifo_segment_main_t segment_main;
 
   /* State of the connection, shared between msg RX thread and main thread */
@@ -444,7 +445,9 @@ int echo_segment_attach (u64 segment_handle, char *name,
 u32 echo_segment_lookup (u64 segment_handle);
 void echo_segment_detach (u64 segment_handle);
 int echo_attach_session (uword segment_handle, uword rxf_offset,
-                        uword txf_offset, echo_session_t *s);
+                        uword mq_offset, uword txf_offset, echo_session_t *s);
+int echo_segment_attach_mq (uword segment_handle, uword mq_offset,
+                           u32 mq_index, svm_msg_q_t **mq);
 
 /* Binary API */
 
index 9689a83..10dfcf0 100644 (file)
@@ -132,14 +132,13 @@ udp_echo_bound_uri_cb (session_bound_msg_t * mp, echo_session_t * session)
     return;
 
   if (echo_attach_session (mp->segment_handle, mp->rx_fifo, mp->tx_fifo,
-                          session))
+                          mp->vpp_evt_q, session))
     {
       ECHO_FAIL (ECHO_FAIL_ACCEPTED_WAIT_FOR_SEG_ALLOC,
                 "accepted wait_for_segment_allocation errored");
       return;
     }
 
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
   session->transport.is_ip4 = mp->lcl_is_ip4;
   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
                    sizeof (ip46_address_t));
index f54ed9f..68605b2 100644 (file)
@@ -1909,8 +1909,9 @@ session_test_mq_basic (vlib_main_t * vm, unformat_input_t * input)
   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
   svm_msg_q_msg_t msg1, msg2, msg[12];
   int __clib_unused verbose, i, rv;
-  svm_msg_q_t *mq;
+  svm_msg_q_shared_t *smq;
   svm_msg_q_ring_t *ring;
+  svm_msg_q_t _mq = { 0 }, *mq = &_mq;
   u8 *rings_ptr;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -1933,28 +1934,30 @@ session_test_mq_basic (vlib_main_t * vm, unformat_input_t * input)
   cfg->q_nitems = 16;
   cfg->ring_cfgs = rc;
 
-  mq = svm_msg_q_alloc (cfg);
+  smq = svm_msg_q_alloc (cfg);
+  svm_msg_q_attach (mq, smq);
   SESSION_TEST (mq != 0, "svm_msg_q_alloc");
   SESSION_TEST (vec_len (mq->rings) == 2, "ring allocation");
-  rings_ptr = (u8 *) mq->rings + vec_bytes (mq->rings);
+  rings_ptr = (u8 *) mq->rings[0].shr->data;
   vec_foreach (ring, mq->rings)
   {
-    SESSION_TEST (ring->data == rings_ptr, "ring data");
+    SESSION_TEST (ring->shr->data == rings_ptr, "ring data");
     rings_ptr += (uword) ring->nitems * ring->elsize;
+    rings_ptr += sizeof (svm_msg_q_ring_shared_t);
   }
 
   msg1 = svm_msg_q_alloc_msg (mq, 8);
-  rv = (mq->rings[0].cursize != 1
-       || msg1.ring_index != 0 || msg1.elt_index != 0);
+  rv = (mq->rings[0].shr->cursize != 1 || msg1.ring_index != 0 ||
+       msg1.elt_index != 0);
   SESSION_TEST (rv == 0, "msg alloc1");
 
   msg2 = svm_msg_q_alloc_msg (mq, 15);
-  rv = (mq->rings[1].cursize != 1
-       || msg2.ring_index != 1 || msg2.elt_index != 0);
+  rv = (mq->rings[1].shr->cursize != 1 || msg2.ring_index != 1 ||
+       msg2.elt_index != 0);
   SESSION_TEST (rv == 0, "msg alloc2");
 
   svm_msg_q_free_msg (mq, &msg1);
-  SESSION_TEST (mq->rings[0].cursize == 0, "free msg");
+  SESSION_TEST (mq->rings[0].shr->cursize == 0, "free msg");
 
   for (i = 0; i < 12; i++)
     {
@@ -1962,7 +1965,7 @@ session_test_mq_basic (vlib_main_t * vm, unformat_input_t * input)
       *(u32 *) svm_msg_q_msg_data (mq, &msg[i]) = i;
     }
 
-  rv = (mq->rings[0].cursize != 8 || mq->rings[1].cursize != 5);
+  rv = (mq->rings[0].shr->cursize != 8 || mq->rings[1].shr->cursize != 5);
   SESSION_TEST (rv == 0, "msg alloc3");
 
   *(u32 *) svm_msg_q_msg_data (mq, &msg2) = 123;
@@ -1998,7 +2001,7 @@ session_test_mq_basic (vlib_main_t * vm, unformat_input_t * input)
        SESSION_TEST (0, "dequeue2 wrong data");
       svm_msg_q_free_msg (mq, &msg[i]);
     }
-  rv = (mq->rings[0].cursize == 0 && mq->rings[1].cursize == 0);
+  rv = (mq->rings[0].shr->cursize == 0 && mq->rings[1].shr->cursize == 0);
   SESSION_TEST (rv, "post dequeue");
 
   return 0;
index 636f223..0c3a79a 100644 (file)
@@ -764,9 +764,15 @@ void
 fifo_segment_cleanup (fifo_segment_t *fs)
 {
   int slice_index;
+  svm_msg_q_t *mq = 0;
 
   for (slice_index = 0; slice_index < fs->n_slices; slice_index++)
     clib_mem_bulk_destroy (fs->slices[slice_index].fifos);
+
+  vec_foreach (fs->mqs, mq)
+    vec_free (mq->rings);
+
+  vec_free (fs->mqs);
 }
 
 /**
@@ -944,6 +950,69 @@ fifo_segment_attach_fifo (fifo_segment_t * fs, svm_fifo_t * f,
     }
 }
 
+svm_msg_q_t *
+fifo_segment_msg_q_alloc (fifo_segment_t *fs, u32 mq_index,
+                         svm_msg_q_cfg_t *cfg)
+{
+  fifo_segment_header_t *fsh = fs->h;
+  svm_msg_q_shared_t *smq;
+  svm_msg_q_t *mq;
+  void *base;
+  u32 size;
+
+  if (!fs->mqs)
+    {
+      u32 n_mqs = clib_max (fs->h->n_mqs, 1);
+      vec_validate (fs->mqs, n_mqs - 1);
+    }
+
+  size = svm_msg_q_size_to_alloc (cfg);
+  base = fsh_alloc_aligned (fsh, size, 8);
+  fsh->n_reserved_bytes += size;
+
+  smq = svm_msg_q_init (base, cfg);
+  mq = vec_elt_at_index (fs->mqs, mq_index);
+  svm_msg_q_attach (mq, smq);
+
+  return mq;
+}
+
+svm_msg_q_t *
+fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset, u32 mq_index)
+{
+  svm_msg_q_t *mq;
+
+  if (!fs->mqs)
+    {
+      u32 n_mqs = clib_max (fs->h->n_mqs, 1);
+      vec_validate (fs->mqs, n_mqs - 1);
+    }
+
+  mq = vec_elt_at_index (fs->mqs, mq_index);
+
+  if (!mq->q)
+    {
+      svm_msg_q_shared_t *smq;
+      smq = (svm_msg_q_shared_t *) ((u8 *) fs->h + offset);
+      svm_msg_q_attach (mq, smq);
+    }
+
+  ASSERT (fifo_segment_msg_q_offset (fs, mq_index) == offset);
+
+  return mq;
+}
+
+uword
+fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index)
+{
+  svm_msg_q_t *mq = vec_elt_at_index (fs->mqs, mq_index);
+
+  if (mq->q == 0)
+    return ~0ULL;
+
+  return (uword) ((u8 *) mq->q - (u8 *) fs->h) - sizeof (svm_msg_q_shared_t);
+}
+
 int
 fifo_segment_prealloc_fifo_hdrs (fifo_segment_t * fs, u32 slice_index,
                                 u32 batch_size)
index 006ffc4..195869a 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <svm/ssvm.h>
 #include <svm/fifo_types.h>
+#include <svm/message_queue.h>
 #include <svm/svm_fifo.h>
 
 typedef enum
@@ -70,6 +71,7 @@ typedef struct
   uword max_byte_index;
   u8 n_slices;                 /**< number of fifo segment slices */
   fifo_slice_private_t *slices; /**< private slice information */
+  svm_msg_q_t *mqs;            /**< private vec of attached mqs */
 } fifo_segment_t;
 
 typedef struct
@@ -129,6 +131,37 @@ void fifo_segment_detach_fifo (fifo_segment_t * fs, svm_fifo_t * f);
 void fifo_segment_attach_fifo (fifo_segment_t * fs, svm_fifo_t * f,
                               u32 slice_index);
 
+/**
+ * Allocate message queue on segment
+ *
+ * @param fs           fifo segment for mq
+ * @param mq_index     index in private mqs vector to use to attach
+ * @param cfg          configuration for mq
+ * @return             attached message queue
+ */
+svm_msg_q_t *fifo_segment_msg_q_alloc (fifo_segment_t *fs, u32 mq_index,
+                                      svm_msg_q_cfg_t *cfg);
+
+/**
+ *  Attach message queue at fifo segment offset
+ *
+ *  @param fs          fifo segment for mq
+ *  @param offset      offset for shared mq on the segment
+ *  @param mq_index    index in private mqs vector to use to attach
+ *  @return            attached message queue
+ */
+svm_msg_q_t *fifo_segment_msg_q_attach (fifo_segment_t *fs, uword offset,
+                                       u32 mq_index);
+
+/**
+ * Message queue offset on segment
+ *
+ * @param fs           fifo segment for mq
+ * @param mq_index     index of mq in private mqs vector
+ * @return             offset of the shared mq the private mq is attached to
+ */
+uword fifo_segment_msg_q_offset (fifo_segment_t *fs, u32 mq_index);
+
 /**
  * Try to preallocate fifo headers
  *
index 85e67bb..bfd1a41 100644 (file)
@@ -146,6 +146,7 @@ struct fifo_segment_header_
   u8 high_watermark;                   /**< Memory pressure watermark high */
   u8 low_watermark;                    /**< Memory pressure watermark low */
   u8 pct_first_alloc;                  /**< Pct of fifo size to alloc */
+  u8 n_mqs;                            /**< Num mqs for mqs segment */
   CLIB_CACHE_LINE_ALIGN_MARK (allocator);
   uword byte_index;
   uword max_byte_index;
index e586841..0ebce70 100644 (file)
@@ -34,45 +34,34 @@ static inline void *
 svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
 {
   ASSERT (elt_index < ring->nitems);
-  return (ring->data + elt_index * ring->elsize);
+  return (ring->shr->data + elt_index * ring->elsize);
 }
 
-svm_msg_q_t *
+svm_msg_q_shared_t *
 svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg)
 {
-  svm_msg_q_ring_t *ring;
-  vec_header_t *vh;
-  svm_msg_q_t *mq;
-  u8 *rings_ptr;
-  u32 q_sz;
+  svm_msg_q_ring_shared_t *ring;
+  svm_msg_q_shared_t *smq;
+  u32 q_sz, offset;
   int i;
 
   q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
 
-  mq = (svm_msg_q_t *) base;
-  mq->q = svm_queue_init (base + sizeof (svm_msg_q_t), cfg->q_nitems,
-                         sizeof (svm_msg_q_msg_t));
-  mq->q->consumer_pid = cfg->consumer_pid;
-  vh = (vec_header_t *) ((u8 *) mq->q + q_sz);
-  vh->len = cfg->n_rings;
-  mq->rings = (svm_msg_q_ring_t *) (vh + 1);
-  rings_ptr = (u8 *) mq->rings + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
+  smq = (svm_msg_q_shared_t *) base;
+  svm_queue_init (&smq->q, cfg->q_nitems, sizeof (svm_msg_q_msg_t));
+  smq->q->consumer_pid = cfg->consumer_pid;
+  smq->n_rings = cfg->n_rings;
+  ring = (void *) ((u8 *) smq->q + q_sz);
   for (i = 0; i < cfg->n_rings; i++)
     {
-      ring = &mq->rings[i];
       ring->elsize = cfg->ring_cfgs[i].elsize;
       ring->nitems = cfg->ring_cfgs[i].nitems;
       ring->cursize = ring->head = ring->tail = 0;
-      if (cfg->ring_cfgs[i].data)
-       ring->data = cfg->ring_cfgs[i].data;
-      else
-       {
-         ring->data = rings_ptr;
-         rings_ptr += (uword) ring->nitems * ring->elsize;
-       }
+      offset = sizeof (*ring) + ring->nitems * ring->elsize;
+      ring = (void *) ((u8 *) ring + offset);
     }
 
-  return mq;
+  return smq;
 }
 
 uword
@@ -80,12 +69,12 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
 {
   svm_msg_q_ring_cfg_t *ring_cfg;
   uword rings_sz = 0, mq_sz;
-  u32 vec_sz, q_sz;
+  u32 q_sz;
   int i;
 
   ASSERT (cfg);
 
-  vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
+  rings_sz = sizeof (svm_msg_q_ring_shared_t) * cfg->n_rings;
   for (i = 0; i < cfg->n_rings; i++)
     {
       if (cfg->ring_cfgs[i].data)
@@ -95,33 +84,18 @@ svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg)
     }
 
   q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
-  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
+  mq_sz = sizeof (svm_msg_q_shared_t) + q_sz + rings_sz;
 
   return mq_sz;
 }
 
-svm_msg_q_t *
+svm_msg_q_shared_t *
 svm_msg_q_alloc (svm_msg_q_cfg_t *cfg)
 {
-  svm_msg_q_ring_cfg_t *ring_cfg;
-  uword rings_sz = 0, mq_sz;
-  u32 vec_sz, q_sz;
+  uword mq_sz;
   u8 *base;
-  int i;
-
-  ASSERT (cfg);
 
-  vec_sz = vec_header_bytes (0) + sizeof (svm_msg_q_ring_t) * cfg->n_rings;
-  for (i = 0; i < cfg->n_rings; i++)
-    {
-      if (cfg->ring_cfgs[i].data)
-       continue;
-      ring_cfg = &cfg->ring_cfgs[i];
-      rings_sz += (uword) ring_cfg->nitems * ring_cfg->elsize;
-    }
-
-  q_sz = sizeof (svm_queue_t) + cfg->q_nitems * sizeof (svm_msg_q_msg_t);
-  mq_sz = sizeof (svm_msg_q_t) + vec_sz + rings_sz + q_sz;
+  mq_sz = svm_msg_q_size_to_alloc (cfg);
   base = clib_mem_alloc_aligned (mq_sz, CLIB_CACHE_LINE_BYTES);
   if (!base)
     return 0;
@@ -129,6 +103,29 @@ svm_msg_q_alloc (svm_msg_q_cfg_t *cfg)
   return svm_msg_q_init (base, cfg);
 }
 
+void
+svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base)
+{
+  svm_msg_q_ring_shared_t *ring;
+  svm_msg_q_shared_t *smq;
+  u32 i, n_rings, q_sz, offset;
+
+  smq = (svm_msg_q_shared_t *) smq_base;
+  mq->q = smq->q;
+  n_rings = smq->n_rings;
+  vec_validate (mq->rings, n_rings - 1);
+  q_sz = sizeof (svm_queue_t) + mq->q->maxsize * sizeof (svm_msg_q_msg_t);
+  ring = (void *) ((u8 *) smq->q + q_sz);
+  for (i = 0; i < n_rings; i++)
+    {
+      mq->rings[i].nitems = ring->nitems;
+      mq->rings[i].elsize = ring->elsize;
+      mq->rings[i].shr = ring;
+      offset = sizeof (*ring) + ring->nitems * ring->elsize;
+      ring = (void *) ((u8 *) ring + offset);
+    }
+}
+
 void
 svm_msg_q_free (svm_msg_q_t * mq)
 {
@@ -139,14 +136,18 @@ svm_msg_q_free (svm_msg_q_t * mq)
 svm_msg_q_msg_t
 svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
 {
+  svm_msg_q_ring_shared_t *sr;
+  svm_msg_q_ring_t *ring;
   svm_msg_q_msg_t msg;
-  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
 
-  ASSERT (ring->cursize < ring->nitems);
+  ring = svm_msg_q_ring_inline (mq, ring_index);
+  sr = ring->shr;
+
+  ASSERT (sr->cursize < ring->nitems);
   msg.ring_index = ring - mq->rings;
-  msg.elt_index = ring->tail;
-  ring->tail = (ring->tail + 1) % ring->nitems;
-  clib_atomic_fetch_add (&ring->cursize, 1);
+  msg.elt_index = sr->tail;
+  sr->tail = (sr->tail + 1) % ring->nitems;
+  clib_atomic_fetch_add (&sr->cursize, 1);
   return msg;
 }
 
@@ -181,16 +182,18 @@ svm_msg_q_msg_t
 svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
 {
   svm_msg_q_msg_t msg = {.as_u64 = ~0 };
+  svm_msg_q_ring_shared_t *sr;
   svm_msg_q_ring_t *ring;
 
   vec_foreach (ring, mq->rings)
   {
-    if (ring->elsize < nbytes || ring->cursize == ring->nitems)
+    sr = ring->shr;
+    if (ring->elsize < nbytes || sr->cursize == ring->nitems)
       continue;
     msg.ring_index = ring - mq->rings;
-    msg.elt_index = ring->tail;
-    ring->tail = (ring->tail + 1) % ring->nitems;
-    clib_atomic_fetch_add (&ring->cursize, 1);
+    msg.elt_index = sr->tail;
+    sr->tail = (sr->tail + 1) % ring->nitems;
+    clib_atomic_fetch_add (&sr->cursize, 1);
     break;
   }
   return msg;
@@ -206,14 +209,16 @@ svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 void
 svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
+  svm_msg_q_ring_shared_t *sr;
   svm_msg_q_ring_t *ring;
   int need_signal;
 
   ASSERT (vec_len (mq->rings) > msg->ring_index);
-  ring = &mq->rings[msg->ring_index];
-  if (msg->elt_index == ring->head)
+  ring = svm_msg_q_ring_inline (mq, msg->ring_index);
+  sr = ring->shr;
+  if (msg->elt_index == sr->head)
     {
-      ring->head = (ring->head + 1) % ring->nitems;
+      sr->head = (sr->head + 1) % ring->nitems;
     }
   else
     {
@@ -222,8 +227,8 @@ svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
       ASSERT (0);
     }
 
-  need_signal = ring->cursize == ring->nitems;
-  clib_atomic_fetch_sub (&ring->cursize, 1);
+  need_signal = sr->cursize == ring->nitems;
+  clib_atomic_fetch_sub (&sr->cursize, 1);
 
   if (PREDICT_FALSE (need_signal))
     svm_queue_send_signal (mq->q, 0);
@@ -233,17 +238,20 @@ static int
 svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
   u32 dist1, dist2, tail, head;
+  svm_msg_q_ring_shared_t *sr;
   svm_msg_q_ring_t *ring;
 
   if (vec_len (mq->rings) <= msg->ring_index)
     return 0;
-  ring = &mq->rings[msg->ring_index];
-  tail = ring->tail;
-  head = ring->head;
+
+  ring = svm_msg_q_ring_inline (mq, msg->ring_index);
+  sr = ring->shr;
+  tail = sr->tail;
+  head = sr->head;
 
   dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
   if (tail == head)
-    dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
+    dist2 = (sr->cursize == 0) ? 0 : ring->nitems;
   else
     dist2 = ((ring->nitems + tail) - head) % ring->nitems;
   return (dist1 < dist2);
@@ -316,7 +324,7 @@ format_svm_msg_q (u8 * s, va_list * args)
   s = format (s, " [Q:%d/%d]", mq->q->cursize, mq->q->maxsize);
   for (u32 i = 0; i < vec_len (mq->rings); i++)
     {
-      s = format (s, " [R%d:%d/%d]", i, mq->rings[i].cursize,
+      s = format (s, " [R%d:%d/%d]", i, mq->rings[i].shr->cursize,
                  mq->rings[i].nitems);
     }
   return s;
index 50f79fb..4b314b8 100644 (file)
 #include <vppinfra/error.h>
 #include <svm/queue.h>
 
-typedef struct svm_msg_q_ring_
+typedef struct svm_msg_q_ring_shared_
 {
   volatile u32 cursize;                        /**< current size of the ring */
   u32 nitems;                          /**< max size of the ring */
   volatile u32 head;                   /**< current head (for dequeue) */
   volatile u32 tail;                   /**< current tail (for enqueue) */
   u32 elsize;                          /**< size of an element */
-  u8 *data;                            /**< chunk of memory for msg data */
+  u8 data[0];                          /**< chunk of memory for msg data */
+} svm_msg_q_ring_shared_t;
+
+typedef struct svm_msg_q_ring_
+{
+  u32 nitems;                  /**< max size of the ring */
+  u32 elsize;                  /**< size of an element */
+  svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
 } __clib_packed svm_msg_q_ring_t;
 
+typedef struct svm_msg_q_shared_
+{
+  u32 n_rings;     /**< number of rings after q */
+  u32 pad;         /**< 8 byte alignment for q */
+  svm_queue_t q[0]; /**< queue for exchanging messages */
+} __clib_packed svm_msg_q_shared_t;
+
 typedef struct svm_msg_q_
 {
   svm_queue_t *q;                      /**< queue for exchanging messages */
@@ -77,10 +91,12 @@ typedef union
  *                     ring configs
  * @return             message queue
  */
-svm_msg_q_t *svm_msg_q_alloc (svm_msg_q_cfg_t * cfg);
-svm_msg_q_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
+svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
+svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
 uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
 
+void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
+
 /**
  * Free message queue
  *
@@ -267,8 +283,8 @@ svm_msg_q_is_full (svm_msg_q_t * mq)
 static inline u8
 svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
 {
-  ASSERT (ring_index < vec_len (mq->rings));
-  return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
+  svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
+  return (ring->shr->cursize >= ring->nitems);
 }
 
 /**
index f79f37a..1472971 100644 (file)
@@ -398,6 +398,7 @@ svm_fifo_init (svm_fifo_t * f, u32 size)
     {
       c->start_byte = prev->start_byte + prev->length;
       c->enq_rb_index = c->deq_rb_index = RBTREE_TNIL_INDEX;
+      ASSERT (c->length >= 1 << FS_MIN_LOG2_CHUNK_SZ);
       prev = c;
       c = f_cptr (f, c->next);
     }
index e245c4d..a13948d 100644 (file)
@@ -65,7 +65,6 @@ static void
 vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
 {
   vcl_worker_t *wrk = vcl_worker_get (0);
-  svm_msg_q_t *ctrl_mq;
   u64 segment_handle;
   int *fds = 0, i, rv;
   u32 n_fds = 0;
@@ -77,9 +76,6 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
       goto failed;
     }
 
-  wrk->app_event_queue = uword_to_pointer (mp->app_mq, svm_msg_q_t *);
-  ctrl_mq = uword_to_pointer (mp->vpp_ctrl_mq, svm_msg_q_t *);
-  vcm->ctrl_mq = wrk->ctrl_mq = ctrl_mq;
   segment_handle = clib_net_to_host_u64 (mp->segment_handle);
   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
     {
@@ -100,6 +96,11 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
                                fds[n_fds++]))
          goto failed;
 
+      vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0),
+                            mp->vpp_ctrl_mq, mp->vpp_ctrl_mq_thread,
+                            &wrk->ctrl_mq);
+      vcm->ctrl_mq = wrk->ctrl_mq;
+
       if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
        {
          segment_name = vl_api_from_api_to_new_c_string (&mp->segment_name);
@@ -111,6 +112,8 @@ vl_api_app_attach_reply_t_handler (vl_api_app_attach_reply_t * mp)
            goto failed;
        }
 
+      vcl_segment_attach_mq (segment_handle, mp->app_mq, 0,
+                            &wrk->app_event_queue);
 
       if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
        {
@@ -169,8 +172,6 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t *
     return;
 
   wrk->vpp_wrk_index = clib_net_to_host_u32 (mp->wrk_index);
-  wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address,
-                                          svm_msg_q_t *);
   wrk->ctrl_mq = vcm->ctrl_mq;
 
   segment_handle = clib_net_to_host_u64 (mp->segment_handle);
@@ -204,6 +205,9 @@ vl_api_app_worker_add_del_reply_t_handler (vl_api_app_worker_add_del_reply_t *
            goto failed;
        }
 
+      vcl_segment_attach_mq (segment_handle, mp->app_event_queue_address, 0,
+                            &wrk->app_event_queue);
+
       if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
        {
          svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
index ea93811..5b41235 100644 (file)
@@ -376,12 +376,14 @@ vcl_segment_detach (u64 segment_handle)
 
 int
 vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
-                           uword txf_offset, u8 is_ct, vcl_session_t *s)
+                           uword txf_offset, uword mq_offset, u8 is_ct,
+                           vcl_session_t *s)
 {
   svm_fifo_shared_t *rxsf, *txsf;
+  u32 fs_index, eqs_index;
   svm_fifo_t *rxf, *txf;
   fifo_segment_t *fs;
-  u32 fs_index;
+  u64 eqs_handle;
 
   fs_index = vcl_segment_table_lookup (segment_handle);
   if (fs_index == VCL_INVALID_SEGMENT_INDEX)
@@ -391,6 +393,13 @@ vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
       return -1;
     }
 
+  if (mq_offset != (uword) ~0)
+    {
+      eqs_handle = vcl_vpp_worker_segment_handle (0);
+      eqs_index = vcl_segment_table_lookup (eqs_handle);
+      ASSERT (eqs_index != VCL_INVALID_SEGMENT_INDEX);
+    }
+
   rxsf = uword_to_pointer (rxf_offset, svm_fifo_shared_t *);
   txsf = uword_to_pointer (txf_offset, svm_fifo_shared_t *);
 
@@ -400,6 +409,13 @@ vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
   rxf = fifo_segment_alloc_fifo_w_shared (fs, rxsf);
   txf = fifo_segment_alloc_fifo_w_shared (fs, txsf);
 
+  if (!is_ct && mq_offset != (uword) ~0)
+    {
+      fs = fifo_segment_get_segment (&vcm->segment_main, eqs_index);
+      s->vpp_evt_q =
+       fifo_segment_msg_q_attach (fs, mq_offset, rxf->shr->slice_index);
+    }
+
   clib_rwlock_reader_unlock (&vcm->segment_table_lock);
 
   if (!is_ct)
@@ -420,6 +436,30 @@ vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
   return 0;
 }
 
+int
+vcl_segment_attach_mq (uword segment_handle, uword mq_offset, u32 mq_index,
+                      svm_msg_q_t **mq)
+{
+  fifo_segment_t *fs;
+  u32 fs_index;
+
+  fs_index = vcl_segment_table_lookup (segment_handle);
+  if (fs_index == VCL_INVALID_SEGMENT_INDEX)
+    {
+      VDBG (0, "ERROR: mq segment %lx for is not attached!", segment_handle);
+      return -1;
+    }
+
+  clib_rwlock_reader_lock (&vcm->segment_table_lock);
+
+  fs = fifo_segment_get_segment (&vcm->segment_main, fs_index);
+  *mq = fifo_segment_msg_q_attach (fs, mq_offset, mq_index);
+
+  clib_rwlock_reader_unlock (&vcm->segment_table_lock);
+
+  return 0;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index 637581b..c864375 100644 (file)
@@ -686,7 +686,10 @@ void vcl_segment_detach (u64 segment_handle);
 void vcl_send_session_unlisten (vcl_worker_t * wrk, vcl_session_t * s);
 
 int vcl_segment_attach_session (uword segment_handle, uword rxf_offset,
-                               uword txf_offset, u8 is_ct, vcl_session_t *s);
+                               uword txf_offset, uword mq_offset, u8 is_ct,
+                               vcl_session_t *s);
+int vcl_segment_attach_mq (uword segment_handle, uword mq_offset, u32 mq_index,
+                          svm_msg_q_t **mq);
 
 /*
  * VCL Binary API
index 7651b35..bc44272 100644 (file)
@@ -46,7 +46,6 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
 {
   vcl_worker_t *wrk = vcl_worker_get_current ();
   int i, rv, n_fds_used = 0;
-  svm_msg_q_t *ctrl_mq;
   u64 segment_handle;
   u8 *segment_name;
 
@@ -57,9 +56,6 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
     }
 
   wrk->api_client_handle = mp->api_client_handle;
-  wrk->app_event_queue = uword_to_pointer (mp->app_mq, svm_msg_q_t *);
-  ctrl_mq = uword_to_pointer (mp->vpp_ctrl_mq, svm_msg_q_t *);
-  vcm->ctrl_mq = wrk->ctrl_mq = ctrl_mq;
   segment_handle = mp->segment_handle;
   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
     {
@@ -75,6 +71,10 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
                            SSVM_SEGMENT_MEMFD, fds[n_fds_used++]))
       goto failed;
 
+  vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_ctrl_mq,
+                        mp->vpp_ctrl_mq_thread, &wrk->ctrl_mq);
+  vcm->ctrl_mq = wrk->ctrl_mq;
+
   if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT)
     {
       segment_name = format (0, "memfd-%ld%c", segment_handle, 0);
@@ -85,6 +85,8 @@ vcl_api_attach_reply_handler (app_sapi_attach_reply_msg_t * mp, int *fds)
        goto failed;
     }
 
+  vcl_segment_attach_mq (segment_handle, mp->app_mq, 0, &wrk->app_event_queue);
+
   if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
     {
       svm_msg_q_set_consumer_eventfd (wrk->app_event_queue,
@@ -201,8 +203,6 @@ vcl_api_add_del_worker_reply_handler (app_sapi_worker_add_del_reply_msg_t *
   wrk = vcl_worker_get_current ();
   wrk->api_client_handle = mp->api_client_handle;
   wrk->vpp_wrk_index = mp->wrk_index;
-  wrk->app_event_queue = uword_to_pointer (mp->app_event_queue_address,
-                                          svm_msg_q_t *);
   wrk->ctrl_mq = vcm->ctrl_mq;
 
   segment_handle = mp->segment_handle;
@@ -231,6 +231,9 @@ vcl_api_add_del_worker_reply_handler (app_sapi_worker_add_del_reply_msg_t *
        goto failed;
     }
 
+  vcl_segment_attach_mq (segment_handle, mp->app_event_queue_address, 0,
+                        &wrk->app_event_queue);
+
   if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD)
     {
       svm_msg_q_set_consumer_eventfd (wrk->app_event_queue, fds[n_fds]);
index 734d062..dbb2cd5 100644 (file)
@@ -374,11 +374,9 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
       goto error;
     }
 
-  session->vpp_evt_q =
-    uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
-
   if (vcl_segment_attach_session (mp->segment_handle, mp->server_rx_fifo,
-                                 mp->server_tx_fifo, 0, session))
+                                 mp->server_tx_fifo,
+                                 mp->vpp_event_queue_address, 0, session))
     {
       VDBG (0, "failed to attach fifos for %u", session->session_index);
       goto error;
@@ -412,7 +410,8 @@ vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp,
   return session->session_index;
 
 error:
-  evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
+  vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0),
+                        mp->vpp_event_queue_address, mp->mq_index, &evt_q);
   vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
                                   VNET_API_ERROR_INVALID_ARGUMENT);
   vcl_session_free (wrk, session);
@@ -444,11 +443,10 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
     }
 
   session->vpp_handle = mp->handle;
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_msg_q_t *);
 
   if (vcl_segment_attach_session (mp->segment_handle, mp->server_rx_fifo,
-                                 mp->server_tx_fifo, 0, session))
+                                 mp->server_tx_fifo,
+                                 mp->vpp_event_queue_address, 0, session))
     {
       VDBG (0, "failed to attach fifos for %u", session->session_index);
       session->session_state = VCL_STATE_DETACHED;
@@ -459,7 +457,7 @@ vcl_session_connected_handler (vcl_worker_t * wrk,
   if (mp->ct_rx_fifo)
     {
       if (vcl_segment_attach_session (mp->ct_segment_handle, mp->ct_rx_fifo,
-                                     mp->ct_tx_fifo, 1, session))
+                                     mp->ct_tx_fifo, (uword) ~0, 1, session))
        {
          VDBG (0, "failed to attach ct fifos for %u", session->session_index);
          session->session_state = VCL_STATE_DETACHED;
@@ -571,12 +569,11 @@ vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
   session->transport.lcl_port = mp->lcl_port;
   vcl_session_table_add_listener (wrk, mp->handle, sid);
   session->session_state = VCL_STATE_LISTEN;
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
 
   if (vcl_session_is_cl (session))
     {
       if (vcl_segment_attach_session (mp->segment_handle, mp->rx_fifo,
-                                     mp->tx_fifo, 0, session))
+                                     mp->tx_fifo, mp->vpp_evt_q, 0, session))
        {
          VDBG (0, "failed to attach fifos for %u", session->session_index);
          session->session_state = VCL_STATE_DETACHED;
@@ -645,7 +642,9 @@ vcl_session_migrated_handler (vcl_worker_t * wrk, void *data)
     }
 
   s->vpp_handle = mp->new_handle;
-  s->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
+
+  vcl_segment_attach_mq (vcl_vpp_worker_segment_handle (0), mp->vpp_evt_q,
+                        mp->vpp_thread_index, &s->vpp_evt_q);
 
   vcl_session_table_del_vpp_handle (wrk, mp->handle);
   vcl_session_table_add_vpp_handle (wrk, mp->new_handle, s->session_index);
@@ -856,7 +855,7 @@ vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
   if (s->rx_fifo)
     {
       if (vcl_segment_attach_session (msg->segment_handle, msg->rx_fifo,
-                                     msg->tx_fifo, 0, s))
+                                     msg->tx_fifo, (uword) ~0, 0, s))
        {
          VDBG (0, "failed to attach fifos for %u", s->session_index);
          return;
index 096af1e..9614257 100644 (file)
@@ -370,6 +370,7 @@ typedef struct session_accepted_msg_
   uword server_tx_fifo;
   u64 segment_handle;
   uword vpp_event_queue_address;
+  u32 mq_index;
   transport_endpoint_t rmt;
   u8 flags;
 } __clib_packed session_accepted_msg_t;
index 7683760..eca199f 100644 (file)
@@ -875,7 +875,6 @@ segment_manager_alloc_queue (fifo_segment_t * segment,
   u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
   svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
   svm_msg_q_t *q;
-  void *base;
 
   fifo_evt_size = sizeof (session_event_t);
   notif_q_size = clib_max (16, props->evt_q_size >> 4);
@@ -890,8 +889,7 @@ segment_manager_alloc_queue (fifo_segment_t * segment,
   cfg->q_nitems = props->evt_q_size;
   cfg->ring_cfgs = rc;
 
-  base = fifo_segment_alloc (segment, svm_msg_q_size_to_alloc (cfg));
-  q = svm_msg_q_init (base, cfg);
+  q = fifo_segment_msg_q_alloc (segment, 0, cfg);
 
   if (props->use_mq_eventfd)
     {
index 259e212..9a4d29b 100644 (file)
@@ -1508,7 +1508,6 @@ session_vpp_event_queues_allocate (session_main_t * smm)
   fifo_segment_t *eqs = &smm->evt_qs_segment;
   uword eqs_size = 64 << 20;
   pid_t vpp_pid = getpid ();
-  void *base;
   int i;
 
   if (smm->configured_event_queue_length)
@@ -1531,6 +1530,9 @@ session_vpp_event_queues_allocate (session_main_t * smm)
 
   fifo_segment_init (eqs);
 
+  /* Special fifo segment that's filled only with mqs */
+  eqs->h->n_mqs = vec_len (smm->wrk);
+
   for (i = 0; i < vec_len (smm->wrk); i++)
     {
       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
@@ -1544,8 +1546,7 @@ session_vpp_event_queues_allocate (session_main_t * smm)
       cfg->q_nitems = evt_q_length;
       cfg->ring_cfgs = rc;
 
-      base = fifo_segment_alloc (eqs, svm_msg_q_size_to_alloc (cfg));
-      smm->wrk[i].vpp_event_queue = svm_msg_q_init (base, cfg);
+      smm->wrk[i].vpp_event_queue = fifo_segment_msg_q_alloc (eqs, i, cfg);
 
       if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
        clib_warning ("eventfd returned");
index 8fe1acf..4602a78 100644 (file)
@@ -130,7 +130,8 @@ mq_send_session_accepted_cb (session_t * s)
 {
   app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
   svm_msg_q_msg_t _msg, *msg = &_msg;
-  svm_msg_q_t *vpp_queue, *app_mq;
+  svm_msg_q_t *app_mq;
+  fifo_segment_t *eq_seg;
   session_t *listener;
   session_accepted_msg_t *mp;
   session_event_t *evt;
@@ -152,6 +153,8 @@ mq_send_session_accepted_cb (session_t * s)
   mp->segment_handle = session_segment_handle (s);
   mp->flags = s->flags;
 
+  eq_seg = session_main_get_evt_q_segment ();
+
   if (session_has_transport (s))
     {
       listener = listen_session_get_from_handle (s->listener_handle);
@@ -164,8 +167,9 @@ mq_send_session_accepted_cb (session_t * s)
          if (listener)
            mp->listener_handle = listen_session_get_handle (listener);
        }
-      vpp_queue = session_main_get_vpp_event_queue (s->thread_index);
-      mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+      mp->vpp_event_queue_address =
+       fifo_segment_msg_q_offset (eq_seg, s->thread_index);
+      mp->mq_index = s->thread_index;
       mp->handle = session_handle (s);
 
       session_get_endpoint (s, &mp->rmt, 0 /* is_lcl */ );
@@ -180,8 +184,9 @@ mq_send_session_accepted_cb (session_t * s)
       mp->rmt.is_ip4 = session_type_is_ip4 (listener->session_type);
       mp->rmt.port = ct->c_rmt_port;
       mp->handle = session_handle (s);
-      vpp_queue = session_main_get_vpp_event_queue (s->thread_index);
-      mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+      mp->vpp_event_queue_address =
+       fifo_segment_msg_q_offset (eq_seg, s->thread_index);
+      mp->mq_index = s->thread_index;
     }
   svm_msg_q_add_and_unlock (app_mq, msg);
 
@@ -262,8 +267,9 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
 {
   svm_msg_q_msg_t _msg, *msg = &_msg;
   session_connected_msg_t *mp;
-  svm_msg_q_t *vpp_mq, *app_mq;
+  svm_msg_q_t *app_mq;
   transport_connection_t *tc;
+  fifo_segment_t *eq_seg;
   app_worker_t *app_wrk;
   session_event_t *evt;
 
@@ -289,6 +295,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
   if (err)
     goto done;
 
+  eq_seg = session_main_get_evt_q_segment ();
+
   if (session_has_transport (s))
     {
       tc = session_get_transport (s);
@@ -299,9 +307,9 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
          goto done;
        }
 
-      vpp_mq = session_main_get_vpp_event_queue (s->thread_index);
       mp->handle = session_handle (s);
-      mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
+      mp->vpp_event_queue_address =
+       fifo_segment_msg_q_offset (eq_seg, s->thread_index);
 
       session_get_endpoint (s, &mp->lcl, 1 /* is_lcl */ );
 
@@ -318,8 +326,8 @@ mq_send_session_connected_cb (u32 app_wrk_index, u32 api_context,
       mp->handle = session_handle (s);
       mp->lcl.port = cct->c_lcl_port;
       mp->lcl.is_ip4 = cct->c_is_ip4;
-      vpp_mq = session_main_get_vpp_event_queue (s->thread_index);
-      mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
+      mp->vpp_event_queue_address =
+       fifo_segment_msg_q_offset (eq_seg, s->thread_index);
       mp->server_rx_fifo = pointer_to_uword (s->rx_fifo->shr);
       mp->server_tx_fifo = pointer_to_uword (s->tx_fifo->shr);
       mp->segment_handle = session_segment_handle (s);
@@ -341,9 +349,10 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
                          session_handle_t handle, int rv)
 {
   svm_msg_q_msg_t _msg, *msg = &_msg;
-  svm_msg_q_t *app_mq, *vpp_evt_q;
+  svm_msg_q_t *app_mq;
   transport_endpoint_t tep;
   session_bound_msg_t *mp;
+  fifo_segment_t *eq_seg;
   app_worker_t *app_wrk;
   session_event_t *evt;
   app_listener_t *al;
@@ -381,8 +390,8 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
   mp->lcl_is_ip4 = tep.is_ip4;
   clib_memcpy_fast (mp->lcl_ip, &tep.ip, sizeof (tep.ip));
 
-  vpp_evt_q = session_main_get_vpp_event_queue (0);
-  mp->vpp_evt_q = pointer_to_uword (vpp_evt_q);
+  eq_seg = session_main_get_evt_q_segment ();
+  mp->vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index);
 
   if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL)
     {
@@ -425,10 +434,14 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
 {
   svm_msg_q_msg_t _msg, *msg = &_msg;
   session_migrated_msg_t *mp;
-  svm_msg_q_t *vpp_evt_q;
+  fifo_segment_t *eq_seg;
   app_worker_t *app_wrk;
   session_event_t *evt;
   svm_msg_q_t *app_mq;
+  u32 thread_index;
+
+  thread_index = session_thread_from_handle (new_sh);
+  eq_seg = session_main_get_evt_q_segment ();
 
   app_wrk = app_worker_get (s->app_wrk_index);
   app_mq = app_wrk->event_queue;
@@ -441,9 +454,8 @@ mq_send_session_migrate_cb (session_t * s, session_handle_t new_sh)
   mp = (session_migrated_msg_t *) evt->data;
   mp->handle = session_handle (s);
   mp->new_handle = new_sh;
-  mp->vpp_thread_index = session_thread_from_handle (new_sh);
-  vpp_evt_q = session_main_get_vpp_event_queue (mp->vpp_thread_index);
-  mp->vpp_evt_q = pointer_to_uword (vpp_evt_q);
+  mp->vpp_thread_index = thread_index;
+  mp->vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, thread_index);
   mp->segment_handle = session_segment_handle (s);
   svm_msg_q_add_and_unlock (app_mq, msg);
 }
@@ -601,12 +613,10 @@ vl_api_app_attach_t_handler (vl_api_app_attach_t * mp)
 {
   int rv = 0, fds[SESSION_N_FD_TYPE], n_fds = 0;
   vl_api_app_attach_reply_t *rmp;
-  ssvm_private_t *segp;
+  fifo_segment_t *segp, *evt_q_segment = 0;
   vnet_app_attach_args_t _a, *a = &_a;
-  fifo_segment_t *evt_q_segment;
   u8 fd_flags = 0, ctrl_thread;
   vl_api_registration_t *reg;
-  svm_msg_q_t *ctrl_mq;
 
   reg = vl_api_client_index_to_registration (mp->client_index);
   if (!reg)
@@ -669,19 +679,19 @@ done:
     if (!rv)
       {
        ctrl_thread = vlib_num_workers () ? 1 : 0;
-       ctrl_mq = session_main_get_vpp_event_queue (ctrl_thread);
-       segp = a->segment;
+       segp = (fifo_segment_t *) a->segment;
        rmp->app_index = clib_host_to_net_u32 (a->app_index);
-       rmp->app_mq = pointer_to_uword (a->app_evt_q);
-       rmp->vpp_ctrl_mq = pointer_to_uword (ctrl_mq);
+       rmp->app_mq = fifo_segment_msg_q_offset (segp, 0);
+       rmp->vpp_ctrl_mq =
+         fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
        rmp->vpp_ctrl_mq_thread = ctrl_thread;
        rmp->n_fds = n_fds;
        rmp->fd_flags = fd_flags;
-       if (vec_len (segp->name))
+       if (vec_len (segp->ssvm.name))
          {
-           vl_api_vec_to_api_string (segp->name, &rmp->segment_name);
+           vl_api_vec_to_api_string (segp->ssvm.name, &rmp->segment_name);
          }
-       rmp->segment_size = segp->ssvm_size;
+       rmp->segment_size = segp->ssvm.ssvm_size;
        rmp->segment_handle = clib_host_to_net_u64 (a->segment_handle);
       }
   }));
@@ -755,13 +765,14 @@ done:
     rmp->segment_handle = clib_host_to_net_u64 (args.segment_handle);
     if (!rv && mp->is_add)
       {
+       rmp->app_event_queue_address =
+         fifo_segment_msg_q_offset ((fifo_segment_t *) args.segment, 0);
+       rmp->n_fds = n_fds;
+       rmp->fd_flags = fd_flags;
        if (vec_len (args.segment->name))
          {
            vl_api_vec_to_api_string (args.segment->name, &rmp->segment_name);
          }
-       rmp->app_event_queue_address = pointer_to_uword (args.evt_q);
-       rmp->n_fds = n_fds;
-       rmp->fd_flags = fd_flags;
       }
   }));
   /* *INDENT-ON* */
@@ -1341,7 +1352,6 @@ session_api_attach_handler (app_namespace_t * app_ns, clib_socket_t * cs,
   app_ns_api_handle_t *handle;
   app_sapi_msg_t msg = { 0 };
   app_worker_t *app_wrk;
-  svm_msg_q_t *ctrl_mq;
   application_t *app;
 
   /* Make sure name is null terminated */
@@ -1390,10 +1400,11 @@ done:
   if (!rv)
     {
       ctrl_thread = vlib_num_workers ()? 1 : 0;
-      ctrl_mq = session_main_get_vpp_event_queue (ctrl_thread);
       rmp->app_index = a->app_index;
-      rmp->app_mq = pointer_to_uword (a->app_evt_q);
-      rmp->vpp_ctrl_mq = pointer_to_uword (ctrl_mq);
+      rmp->app_mq =
+       fifo_segment_msg_q_offset ((fifo_segment_t *) a->segment, 0);
+      rmp->vpp_ctrl_mq =
+       fifo_segment_msg_q_offset (evt_q_segment, ctrl_thread);
       rmp->vpp_ctrl_mq_thread = ctrl_thread;
       rmp->n_fds = n_fds;
       rmp->fd_flags = fd_flags;
@@ -1502,7 +1513,8 @@ done:
   if (!rv && mp->is_add)
     {
       /* No segment name and size. This supports only memfds */
-      rmp->app_event_queue_address = pointer_to_uword (args.evt_q);
+      rmp->app_event_queue_address =
+       fifo_segment_msg_q_offset ((fifo_segment_t *) args.segment, 0);
       rmp->n_fds = n_fds;
       rmp->fd_flags = fd_flags;
 
index c2718f3..cd4198c 100644 (file)
@@ -120,7 +120,6 @@ session_debug_init (void)
 void
 dump_thread_0_event_queue (void)
 {
-  session_main_t *smm = vnet_get_session_main ();
   vlib_main_t *vm = &vlib_global_main;
   u32 my_thread_index = vm->thread_index;
   session_event_t _e, *e = &_e;
@@ -130,7 +129,7 @@ dump_thread_0_event_queue (void)
   svm_msg_q_t *mq;
   int i, index;
 
-  mq = smm->wrk[my_thread_index].vpp_event_queue;
+  mq = session_main_get_vpp_event_queue (my_thread_index);
   index = mq->q->head;
 
   for (i = 0; i < mq->q->cursize; i++)
index 2fde85b..49e4e5a 100644 (file)
@@ -472,8 +472,8 @@ session_mq_worker_update_handler (void *data)
   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
   rmp = (session_worker_update_reply_msg_t *) evt->data;
   rmp->handle = mp->handle;
-  rmp->rx_fifo = pointer_to_uword (s->rx_fifo);
-  rmp->tx_fifo = pointer_to_uword (s->tx_fifo);
+  rmp->rx_fifo = pointer_to_uword (s->rx_fifo->shr);
+  rmp->tx_fifo = pointer_to_uword (s->tx_fifo->shr);
   rmp->segment_handle = session_segment_handle (s);
   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);