session: use msg queue for events 58/13358/24
authorFlorin Coras <fcoras@cisco.com>
Wed, 4 Jul 2018 11:15:05 +0000 (04:15 -0700)
committerDamjan Marion <dmarion@me.com>
Tue, 17 Jul 2018 09:02:17 +0000 (09:02 +0000)
Change-Id: I3c58367eec2243fe19b75be78a175c5261863e9e
Signed-off-by: Florin Coras <fcoras@cisco.com>
27 files changed:
src/svm/message_queue.c
src/svm/message_queue.h
src/svm/queue.c
src/svm/queue.h
src/svm/svm_fifo.h
src/svm/test_svm_message_queue.c
src/tests/vnet/session/tcp_echo.c
src/tests/vnet/session/udp_echo.c
src/vcl/vcl_bapi.c
src/vcl/vcl_private.h
src/vcl/vppcom.c
src/vnet/session-apps/echo_client.c
src/vnet/session-apps/echo_client.h
src/vnet/session-apps/echo_server.c
src/vnet/session-apps/http_server.c
src/vnet/session-apps/proxy.c
src/vnet/session-apps/proxy.h
src/vnet/session/application.c
src/vnet/session/application.h
src/vnet/session/application_interface.h
src/vnet/session/segment_manager.c
src/vnet/session/segment_manager.h
src/vnet/session/session.c
src/vnet/session/session.h
src/vnet/session/session_api.c
src/vnet/session/session_node.c
src/vnet/tls/tls.c

index 4f3e764..8941114 100644 (file)
 #include <svm/message_queue.h>
 #include <vppinfra/mem.h>
 
+static inline svm_msg_q_ring_t *
+svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
+{
+  return vec_elt_at_index (mq->rings, ring_index);
+}
+
+svm_msg_q_ring_t *
+svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
+{
+  return svm_msg_q_ring_inline (mq, ring_index);
+}
+
+static inline void *
+svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
+{
+  ASSERT (elt_index < ring->nitems);
+  return (ring->data + elt_index * ring->elsize);
+}
+
 svm_msg_q_t *
 svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
 {
@@ -62,6 +81,53 @@ svm_msg_q_free (svm_msg_q_t * mq)
   clib_mem_free (mq);
 }
 
+svm_msg_q_msg_t
+svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
+{
+  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
+  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
+
+  ASSERT (ring->cursize != ring->nitems);
+  msg.ring_index = ring - mq->rings;
+  msg.elt_index = ring->tail;
+  ring->tail = (ring->tail + 1) % ring->nitems;
+  __sync_fetch_and_add (&ring->cursize, 1);
+  return msg;
+}
+
+int
+svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
+                                    u8 noblock, svm_msg_q_msg_t * msg)
+{
+  if (noblock)
+    {
+      if (svm_msg_q_try_lock (mq))
+       return -1;
+      if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index)))
+       {
+         svm_msg_q_unlock (mq);
+         return -2;
+       }
+      *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
+      if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (msg)))
+       {
+         svm_msg_q_unlock (mq);
+         return -2;
+       }
+    }
+  else
+    {
+      svm_msg_q_lock (mq);
+      *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
+      while (svm_msg_q_msg_is_invalid (msg))
+       {
+         svm_msg_q_wait (mq);
+         *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
+       }
+    }
+  return 0;
+}
+
 svm_msg_q_msg_t
 svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
 {
@@ -81,23 +147,10 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
   return msg;
 }
 
-static inline svm_msg_q_ring_t *
-svm_msg_q_get_ring (svm_msg_q_t * mq, u32 ring_index)
-{
-  return vec_elt_at_index (mq->rings, ring_index);
-}
-
-static inline void *
-svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
-{
-  ASSERT (elt_index < ring->nitems);
-  return (ring->data + elt_index * ring->elsize);
-}
-
 void *
 svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
-  svm_msg_q_ring_t *ring = svm_msg_q_get_ring (mq, msg->ring_index);
+  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, msg->ring_index);
   return svm_msg_q_ring_data (ring, msg->elt_index);
 }
 
@@ -131,7 +184,7 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
     return 0;
   ring = &mq->rings[msg->ring_index];
 
-  dist1 = ((ring->nitems + msg->ring_index) - ring->head) % ring->nitems;
+  dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems;
   if (ring->tail == ring->head)
     dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
   else
@@ -140,10 +193,17 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 }
 
 int
-svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait)
+svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
+{
+  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
+  return svm_queue_add (mq->q, (u8 *) msg, nowait);
+}
+
+void
+svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
 {
-  ASSERT (svm_msq_q_msg_is_valid (mq, &msg));
-  return svm_queue_add (mq->q, (u8 *) & msg, nowait);
+  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
+  svm_queue_add_raw (mq->q, (u8 *) msg);
 }
 
 int
@@ -153,6 +213,12 @@ svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
   return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
 }
 
+void
+svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
+{
+  svm_queue_sub_raw (mq->q, (u8 *) msg);
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index 5ec8547..708a03d 100644 (file)
 #define SRC_SVM_MESSAGE_QUEUE_H_
 
 #include <vppinfra/clib.h>
+#include <vppinfra/error.h>
 #include <svm/queue.h>
 
 typedef struct svm_msg_q_ring_
 {
   volatile u32 cursize;                        /**< current size of the ring */
   u32 nitems;                          /**< max size of the ring */
-  u32 head;                            /**< current head (for dequeue) */
-  u32 tail;                            /**< current tail (for enqueue) */
+  volatile u32 head;                   /**< current head (for dequeue) */
+  volatile u32 tail;                   /**< current tail (for enqueue) */
   u32 elsize;                          /**< size of an element */
   u8 *data;                            /**< chunk of memory for msg data */
 } svm_msg_q_ring_t;
@@ -64,6 +65,7 @@ typedef union
   u64 as_u64;
 } svm_msg_q_msg_t;
 
+#define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
 /**
  * Allocate message queue
  *
@@ -97,6 +99,36 @@ void svm_msg_q_free (svm_msg_q_t * mq);
  */
 svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
 
+/**
+ * Allocate message buffer on ring
+ *
+ * Message is allocated, on requested ring. The caller MUST check that
+ * the ring is not full.
+ *
+ * @param mq           message queue
+ * @param ring_index   ring on which the allocation should occur
+ * @return             message structure pointing to the ring and position
+ *                     allocated
+ */
+svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
+
+/**
+ * Lock message queue and allocate message buffer on ring
+ *
+ * This should be used when multiple writers/readers are expected to
+ * compete for the rings/queue. Message should be enqueued by calling
+ * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
+ * the message in enqueued.
+ *
+ * @param mq           message queue
+ * @param ring_index   ring on which the allocation should occur
+ * @param noblock      flag that indicates if request should block
+ * @param msg          pointer to message to be filled in
+ * @return             0 on success, negative number otherwise
+ */
+int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
+                                        u8 noblock, svm_msg_q_msg_t * msg);
+
 /**
  * Free message buffer
  *
@@ -106,6 +138,7 @@ svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
  * @param msg          message to be freed
  */
 void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+
 /**
  * Producer enqueue one message to queue
  *
@@ -117,7 +150,20 @@ void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
  * @param nowait       flag to indicate if request is blocking or not
  * @return             success status
  */
-int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait);
+int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
+
+/**
+ * Producer enqueue one message to queue with mutex held
+ *
+ * Prior to calling this, the producer should've obtained a message buffer
+ * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
+ * the queue mutex is held.
+ *
+ * @param mq           message queue
+ * @param msg          message (pointer to ring position) to be enqueued
+ * @return             success status
+ */
+void svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
 
 /**
  * Consumer dequeue one message from queue
@@ -129,13 +175,28 @@ int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait);
  * @param mq           message queue
  * @param msg          pointer to structure where message is to be received
  * @param cond         flag that indicates if request should block or not
+ * @param time         time to wait if condition it SVM_Q_TIMEDWAIT
  * @return             success status
  */
 int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
                   svm_q_conditional_wait_t cond, u32 time);
 
 /**
- * Get data for message in queu
+ * Consumer dequeue one message from queue with mutex held
+ *
+ * Returns the message pointing to the data in the message rings under the
+ * assumption that the message queue lock is already held. The consumer is
+ * expected to call @ref svm_msg_q_free_msg once it finishes
+ * processing/copies the message data.
+ *
+ * @param mq           message queue
+ * @param msg          pointer to structure where message is to be received
+ * @return             success status
+ */
+void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
+
+/**
+ * Get data for message in queue
  *
  * @param mq           message queue
  * @param msg          message for which the data is requested
@@ -143,6 +204,94 @@ int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
  */
 void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
 
+/**
+ * Get message queue ring
+ *
+ * @param mq           message queue
+ * @param ring_index   index of ring
+ * @return             pointer to ring
+ */
+svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
+
+/**
+ * Check if message queue is full
+ */
+static inline u8
+svm_msg_q_is_full (svm_msg_q_t * mq)
+{
+  return (mq->q->cursize == mq->q->maxsize);
+}
+
+static inline u8
+svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
+{
+  ASSERT (ring_index < vec_len (mq->rings));
+  return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
+}
+
+/**
+ * Check if message queue is empty
+ */
+static inline u8
+svm_msg_q_is_empty (svm_msg_q_t * mq)
+{
+  return (mq->q->cursize == 0);
+}
+
+/**
+ * Check length of message queue
+ */
+static inline u32
+svm_msg_q_size (svm_msg_q_t * mq)
+{
+  return mq->q->cursize;
+}
+
+/**
+ * Check if message is invalid
+ */
+static inline u8
+svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
+{
+  return (msg->as_u64 == (u64) ~ 0);
+}
+
+/**
+ * Try locking message queue
+ */
+static inline int
+svm_msg_q_try_lock (svm_msg_q_t * mq)
+{
+  return pthread_mutex_trylock (&mq->q->mutex);
+}
+
+/**
+ * Lock, or block trying, the message queue
+ */
+static inline int
+svm_msg_q_lock (svm_msg_q_t * mq)
+{
+  return pthread_mutex_lock (&mq->q->mutex);
+}
+
+static inline void
+svm_msg_q_wait (svm_msg_q_t * mq)
+{
+  pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
+}
+
+/**
+ * Unlock message queue
+ */
+static inline void
+svm_msg_q_unlock (svm_msg_q_t * mq)
+{
+  /* The other side of the connection is not polling */
+  if (mq->q->cursize < (mq->q->maxsize / 8))
+    (void) pthread_cond_broadcast (&mq->q->condvar);
+  pthread_mutex_unlock (&mq->q->mutex);
+}
+
 #endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
 
 /*
index 96e40fc..8e18f58 100644 (file)
@@ -154,26 +154,16 @@ svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
   return 0;
 }
 
-int
+void
 svm_queue_add_raw (svm_queue_t * q, u8 * elem)
 {
   i8 *tailp;
 
-  if (PREDICT_FALSE (q->cursize == q->maxsize))
-    {
-      while (q->cursize == q->maxsize)
-       ;
-    }
-
   tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
   clib_memcpy (tailp, elem, q->elsize);
 
-  q->tail++;
+  q->tail = (q->tail + 1) % q->maxsize;
   q->cursize++;
-
-  if (q->tail == q->maxsize)
-    q->tail = 0;
-  return 0;
 }
 
 
@@ -414,11 +404,9 @@ svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
   headp = (i8 *) (&q->data[0] + q->elsize * q->head);
   clib_memcpy (elem, headp, q->elsize);
 
-  q->head++;
+  q->head = (q->head + 1) % q->maxsize;
   q->cursize--;
 
-  if (q->head == q->maxsize)
-    q->head = 0;
   return 0;
 }
 
index 856c172..68a63d7 100644 (file)
@@ -69,7 +69,13 @@ void svm_queue_unlock (svm_queue_t * q);
 int svm_queue_is_full (svm_queue_t * q);
 int svm_queue_add_nolock (svm_queue_t * q, u8 * elem);
 int svm_queue_sub_raw (svm_queue_t * q, u8 * elem);
-int svm_queue_add_raw (svm_queue_t * q, u8 * elem);
+
+/**
+ * Add element to queue with mutex held
+ * @param q            queue
+ * @param elem         pointer element data to add
+ */
+void svm_queue_add_raw (svm_queue_t * q, u8 * elem);
 
 /*
  * DEPRECATED please use svm_queue_t instead
index 0d5a08b..4018290 100644 (file)
@@ -107,6 +107,12 @@ svm_fifo_max_dequeue (svm_fifo_t * f)
   return f->cursize;
 }
 
+static inline int
+svm_fifo_is_full (svm_fifo_t * f)
+{
+  return (f->cursize == f->nitems);
+}
+
 static inline u32
 svm_fifo_max_enqueue (svm_fifo_t * f)
 {
index 69ffd13..758163f 100644 (file)
@@ -88,9 +88,9 @@ test1 (int verbose)
       test1_error ("failed: msg alloc3");
 
   *(u32 *)svm_msg_q_msg_data (mq, &msg2) = 123;
-  svm_msg_q_add (mq, msg2, SVM_Q_NOWAIT);
+  svm_msg_q_add (mq, &msg2, SVM_Q_NOWAIT);
   for (i = 0; i < 12; i++)
-    svm_msg_q_add (mq, msg[i], SVM_Q_NOWAIT);
+    svm_msg_q_add (mq, &msg[i], SVM_Q_NOWAIT);
 
   if (svm_msg_q_sub (mq, &msg2, SVM_Q_NOWAIT, 0))
     test1_error ("failed: dequeue1");
index 59314f9..f8b75d9 100644 (file)
@@ -48,7 +48,7 @@ typedef struct
   svm_fifo_t *server_rx_fifo;
   svm_fifo_t *server_tx_fifo;
 
-  svm_queue_t *vpp_evt_q;
+  svm_msg_q_t *vpp_evt_q;
 
   u64 vpp_session_handle;
   u64 bytes_sent;
@@ -99,7 +99,7 @@ typedef struct
   int no_return;
 
   /* Our event queue */
-  svm_queue_t *our_event_queue;
+  svm_msg_q_t *our_event_queue;
 
   /* $$$ single thread only for the moment */
   svm_queue_t *vpp_event_queue;
@@ -426,7 +426,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
 
   ASSERT (mp->app_event_queue_address);
   em->our_event_queue = uword_to_pointer (mp->app_event_queue_address,
-                                         svm_queue_t *);
+                                         svm_msg_q_t *);
   em->state = STATE_ATTACHED;
 }
 
@@ -677,7 +677,6 @@ send_test_chunk (echo_main_t * em, session_t * s)
   svm_fifo_t *tx_fifo = s->server_tx_fifo;
   u8 *test_data = em->connect_test_data;
   u32 enq_space, min_chunk = 16 << 10;
-  session_fifo_event_t evt;
   int written;
 
   test_buf_len = vec_len (test_data);
@@ -698,12 +697,8 @@ send_test_chunk (echo_main_t * em, session_t * s)
       s->bytes_sent += written;
 
       if (svm_fifo_set_event (tx_fifo))
-       {
-         /* Fabricate TX event, send to vpp */
-         evt.fifo = tx_fifo;
-         evt.event_type = FIFO_EVENT_APP_TX;
-         svm_queue_add (s->vpp_evt_q, (u8 *) & evt, 0 /* wait for mutex */ );
-       }
+       app_send_io_evt_to_vpp (s->vpp_evt_q, tx_fifo, FIFO_EVENT_APP_TX,
+                               0 /* do wait for mutex */ );
     }
 }
 
@@ -751,6 +746,7 @@ client_rx_thread_fn (void *arg)
   session_fifo_event_t _e, *e = &_e;
   echo_main_t *em = &echo_main;
   static u8 *rx_buf = 0;
+  svm_msg_q_msg_t msg;
 
   vec_validate (rx_buf, 1 << 20);
 
@@ -759,7 +755,8 @@ client_rx_thread_fn (void *arg)
 
   while (!em->time_to_stop)
     {
-      svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
+      svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
@@ -769,6 +766,7 @@ client_rx_thread_fn (void *arg)
          clib_warning ("unknown event type %d", e->event_type);
          break;
        }
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
   pthread_exit (0);
 }
@@ -808,7 +806,7 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
   session->vpp_session_handle = mp->handle;
   session->start = clib_time_now (&em->clib_time);
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_queue_t *);
+                                        svm_msg_q_t *);
 
   hash_set (em->session_index_by_vpp_handles, mp->handle, session_index);
 
@@ -869,8 +867,8 @@ client_disconnect (echo_main_t * em, session_t * s)
 static void
 clients_run (echo_main_t * em)
 {
-  session_fifo_event_t _e, *e = &_e;
   f64 start_time, deltat, timeout = 100.0;
+  svm_msg_q_msg_t msg;
   session_t *s;
   int i;
 
@@ -918,9 +916,15 @@ clients_run (echo_main_t * em)
   start_time = clib_time_now (&em->clib_time);
   em->state = STATE_READY;
   while (em->n_active_clients)
-    if (em->our_event_queue->cursize)
-      svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_NOWAIT, 0);
-
+    if (!svm_msg_q_is_empty (em->our_event_queue))
+      {
+       if (svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0))
+         {
+           clib_warning ("svm msg q returned");
+         }
+       else
+         svm_msg_q_free_msg (em->our_event_queue, &msg);
+      }
 
   for (i = 0; i < em->n_clients; i++)
     {
@@ -1180,10 +1184,12 @@ void
 server_handle_event_queue (echo_main_t * em)
 {
   session_fifo_event_t _e, *e = &_e;
+  svm_msg_q_msg_t msg;
 
   while (1)
     {
-      svm_queue_sub (em->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
+      svm_msg_q_sub (em->our_event_queue, &msg, SVM_Q_WAIT, 0);
+      e = svm_msg_q_msg_data (em->our_event_queue, &msg);
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
@@ -1191,6 +1197,7 @@ server_handle_event_queue (echo_main_t * em)
          break;
 
        case FIFO_EVENT_DISCONNECT:
+         svm_msg_q_free_msg (em->our_event_queue, &msg);
          return;
 
        default:
@@ -1204,6 +1211,7 @@ server_handle_event_queue (echo_main_t * em)
          em->time_to_print_stats = 0;
          fformat (stdout, "%d connections\n", pool_elts (em->sessions));
        }
+      svm_msg_q_free_msg (em->our_event_queue, &msg);
     }
 }
 
index 54e00b1..d796b6b 100644 (file)
@@ -87,10 +87,10 @@ typedef struct
   u8 is_connected;
 
   /* Our event queue */
-  svm_queue_t *our_event_queue;
+  svm_msg_q_t *our_event_queue;
 
   /* $$$ single thread only for the moment */
-  svm_queue_t *vpp_event_queue;
+  svm_msg_q_t *vpp_event_queue;
 
   /* $$$$ hack: cut-through session index */
   volatile u32 cut_through_session_index;
@@ -369,7 +369,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
     }
 
   utm->our_event_queue =
-    uword_to_pointer (mp->app_event_queue_address, svm_queue_t *);
+    uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *);
 }
 
 static void
@@ -736,7 +736,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
               sizeof (ip46_address_t));
   session->transport.is_ip4 = mp->lcl_is_ip4;
   session->transport.lcl_port = mp->lcl_port;
-  session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_queue_t *);
+  session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
 
   utm->state = utm->is_connected ? STATE_BOUND : STATE_READY;
 }
@@ -896,7 +896,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
     start_time = clib_time_now (&utm->clib_time);
 
   utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
-                                          svm_queue_t *);
+                                          svm_msg_q_t *);
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
 
@@ -909,7 +909,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
     {
       clib_warning ("cut-through session");
       utm->our_event_queue = uword_to_pointer (mp->server_event_queue_address,
-                                              svm_queue_t *);
+                                              svm_msg_q_t *);
       rx_fifo->master_session_index = session_index;
       tx_fifo->master_session_index = session_index;
       utm->cut_through_session_index = session_index;
@@ -1012,23 +1012,23 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
   session->rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
   session->tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_queue_t *);
+                                        svm_msg_q_t *);
   /* Cut-through case */
   if (mp->client_event_queue_address)
     {
       clib_warning ("cut-through session");
       utm->cut_through_session_index = session - utm->sessions;
       utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
-                                              svm_queue_t *);
+                                              svm_msg_q_t *);
       utm->our_event_queue = uword_to_pointer (mp->client_event_queue_address,
-                                              svm_queue_t *);
+                                              svm_msg_q_t *);
       utm->do_echo = 1;
     }
   else
     {
       utm->connected_session = session - utm->sessions;
       utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
-                                              svm_queue_t *);
+                                              svm_msg_q_t *);
 
       clib_memcpy (&session->transport.lcl_ip, mp->lcl_ip,
                   sizeof (ip46_address_t));
@@ -1134,14 +1134,20 @@ server_handle_fifo_event_rx (udp_echo_main_t * utm, session_fifo_event_t * e)
 void
 server_handle_event_queue (udp_echo_main_t * utm)
 {
-  session_fifo_event_t _e, *e = &_e;
+  session_fifo_event_t *e;
+  svm_msg_q_msg_t msg;
 
   while (utm->state != STATE_READY)
     sleep (5);
 
   while (1)
     {
-      svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0);
+      if (svm_msg_q_sub (utm->our_event_queue, &msg, SVM_Q_WAIT, 0))
+       {
+         clib_warning ("svm msg q returned");
+         continue;
+       }
+      e = svm_msg_q_msg_data (utm->our_event_queue, &msg);
       switch (e->event_type)
        {
        case FIFO_EVENT_APP_RX:
@@ -1149,12 +1155,14 @@ server_handle_event_queue (udp_echo_main_t * utm)
          break;
 
        case FIFO_EVENT_DISCONNECT:
-         return;
+         utm->time_to_stop = 1;
+         break;
 
        default:
          clib_warning ("unknown event type %d", e->event_type);
          break;
        }
+      svm_msg_q_free_msg (utm->our_event_queue, &msg);
       if (PREDICT_FALSE (utm->time_to_stop == 1))
        return;
       if (PREDICT_FALSE (utm->time_to_print_stats == 1))
index cc3179d..ca65782 100644 (file)
@@ -99,7 +99,7 @@ vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
     }
 
   vcm->app_event_queue =
-    uword_to_pointer (mp->app_event_queue_address, svm_queue_t *);
+    uword_to_pointer (mp->app_event_queue_address, svm_msg_q_t *);
 
   vcm->app_state = STATE_APP_ATTACHED;
 }
@@ -291,7 +291,7 @@ done:
       VCL_IO_SESSIONS_UNLOCK ();
     }
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_queue_t *);
+                                        svm_msg_q_t *);
 
   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
   rx_fifo->client_session_index = session_index;
@@ -431,7 +431,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
   session->rx_fifo = rx_fifo;
   session->tx_fifo = tx_fifo;
   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
-                                        svm_queue_t *);
+                                        svm_msg_q_t *);
   session->session_state = STATE_ACCEPT;
   session->transport.rmt_port = mp->port;
   session->transport.is_ip4 = mp->is_ip4;
index 4283b6e..aba4839 100644 (file)
@@ -194,7 +194,7 @@ typedef struct vppcom_main_t_
   clib_bitmap_t *ex_bitmap;
 
   /* Our event queue */
-  svm_queue_t *app_event_queue;
+  svm_msg_q_t *app_event_queue;
 
   /* unique segment name counter */
   u32 unique_segment_index;
index 1fd138e..0d077ca 100644 (file)
@@ -1164,16 +1164,19 @@ vppcom_session_read_ready (vcl_session_t * session, u32 session_index)
     }
   rv = ready;
 
-  if (vcm->app_event_queue->cursize &&
-      !pthread_mutex_trylock (&vcm->app_event_queue->mutex))
+  if (!svm_msg_q_is_empty (vcm->app_event_queue) &&
+      !pthread_mutex_trylock (&vcm->app_event_queue->q->mutex))
     {
-      u32 i, n_to_dequeue = vcm->app_event_queue->cursize;
-      session_fifo_event_t e;
+      u32 i, n_to_dequeue = vcm->app_event_queue->q->cursize;
+      svm_msg_q_msg_t msg;
 
       for (i = 0; i < n_to_dequeue; i++)
-       svm_queue_sub_raw (vcm->app_event_queue, (u8 *) & e);
+       {
+         svm_queue_sub_raw (vcm->app_event_queue->q, (u8 *) & msg);
+         svm_msg_q_free_msg (vcm->app_event_queue, &msg);
+       }
 
-      pthread_mutex_unlock (&vcm->app_event_queue->mutex);
+      pthread_mutex_unlock (&vcm->app_event_queue->q->mutex);
     }
 done:
   return rv;
@@ -1184,8 +1187,7 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
 {
   vcl_session_t *session = 0;
   svm_fifo_t *tx_fifo = 0;
-  svm_queue_t *q;
-  session_fifo_event_t evt;
+  svm_msg_q_t *mq;
   session_state_t state;
   int rv, n_write, is_nonblocking;
   u32 poll_et;
@@ -1241,18 +1243,15 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
 
   if ((n_write > 0) && svm_fifo_set_event (tx_fifo))
     {
-      /* Fabricate TX event, send to vpp */
-      evt.fifo = tx_fifo;
-      evt.event_type = FIFO_EVENT_APP_TX;
-
+      /* Send TX event to vpp */
       VCL_SESSION_LOCK_AND_GET (session_index, &session);
-      q = session->vpp_evt_q;
-      ASSERT (q);
-      svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
+      mq = session->vpp_evt_q;
+      ASSERT (mq);
+      app_send_io_evt_to_vpp (mq, tx_fifo, FIFO_EVENT_APP_TX, SVM_Q_WAIT);
       VCL_SESSION_UNLOCK ();
       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX "
            "to vpp_event_q %p, n_write %d", getpid (),
-           vpp_handle, session_index, q, n_write);
+           vpp_handle, session_index, mq, n_write);
     }
 
   if (n_write <= 0)
index 6ee91f9..3d1af67 100644 (file)
@@ -62,13 +62,8 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
          svm_fifo_t *f = s->data.tx_fifo;
          rv = clib_min (svm_fifo_max_enqueue (f), bytes_this_chunk);
          svm_fifo_enqueue_nocopy (f, rv);
-         if (svm_fifo_set_event (f))
-           {
-             session_fifo_event_t evt;
-             evt.fifo = f;
-             evt.event_type = FIFO_EVENT_APP_TX;
-             svm_queue_add (s->data.vpp_evt_q, (u8 *) & evt, 0);
-           }
+         session_send_io_evt_to_thread_custom (f, s->thread_index,
+                                               FIFO_EVENT_APP_TX);
        }
       else
        rv = app_send_stream (&s->data, test_data + test_buf_offset,
@@ -98,13 +93,8 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
          hdr.lcl_port = at->lcl_port;
          svm_fifo_enqueue_nowait (f, sizeof (hdr), (u8 *) & hdr);
          svm_fifo_enqueue_nocopy (f, rv);
-         if (svm_fifo_set_event (f))
-           {
-             session_fifo_event_t evt;
-             evt.fifo = f;
-             evt.event_type = FIFO_EVENT_APP_TX;
-             svm_queue_add (s->data.vpp_evt_q, (u8 *) & evt, 0);
-           }
+         session_send_io_evt_to_thread_custom (f, s->thread_index,
+                                               FIFO_EVENT_APP_TX);
        }
       else
        rv = app_send_dgram (&s->data, test_data + test_buf_offset,
@@ -462,18 +452,9 @@ echo_clients_rx_callback (stream_session_t * s)
 
   if (svm_fifo_max_dequeue (s->server_rx_fifo))
     {
-      session_fifo_event_t evt;
-      svm_queue_t *q;
       if (svm_fifo_set_event (s->server_rx_fifo))
-       {
-         evt.fifo = s->server_rx_fifo;
-         evt.event_type = FIFO_EVENT_BUILTIN_RX;
-         q = session_manager_get_vpp_event_queue (s->thread_index);
-         if (PREDICT_FALSE (q->cursize == q->maxsize))
-           clib_warning ("out of event queue space");
-         else if (svm_queue_add (q, (u8 *) & evt, 0))
-           clib_warning ("failed to enqueue self-tap");
-       }
+       session_send_io_evt_to_thread (s->server_rx_fifo,
+                                      FIFO_EVENT_BUILTIN_RX);
     }
   return 0;
 }
index f3fc8d2..db5ba16 100644 (file)
@@ -46,7 +46,7 @@ typedef struct
    * Application setup parameters
    */
   svm_queue_t *vl_input_queue;         /**< vpe input queue */
-  svm_queue_t **vpp_event_queue;
+  svm_msg_q_t **vpp_event_queue;
 
   u32 cli_node_index;                  /**< cli process node index */
   u32 my_client_index;                 /**< loopback API client handle */
index 7d1ae5a..770f4ba 100644 (file)
@@ -23,7 +23,7 @@ typedef struct
   /*
    * Server app parameters
    */
-  svm_queue_t **vpp_queue;
+  svm_msg_q_t **vpp_queue;
   svm_queue_t *vl_input_queue; /**< Sever's event queue */
 
   u32 app_index;               /**< Server app index */
@@ -145,10 +145,8 @@ echo_server_rx_callback (stream_session_t * s)
   int actual_transfer;
   svm_fifo_t *tx_fifo, *rx_fifo;
   echo_server_main_t *esm = &echo_server_main;
-  session_fifo_event_t evt;
   u32 thread_index = vlib_get_thread_index ();
   app_session_transport_t at;
-  svm_queue_t *q;
 
   ASSERT (s->thread_index == thread_index);
 
@@ -170,8 +168,9 @@ echo_server_rx_callback (stream_session_t * s)
       max_dequeue = ph.data_length - ph.data_offset;
       if (!esm->vpp_queue[s->thread_index])
        {
-         q = session_manager_get_vpp_event_queue (s->thread_index);
-         esm->vpp_queue[s->thread_index] = q;
+         svm_msg_q_t *mq;
+         mq = session_manager_get_vpp_event_queue (s->thread_index);
+         esm->vpp_queue[s->thread_index] = mq;
        }
       max_enqueue -= sizeof (session_dgram_hdr_t);
     }
@@ -191,13 +190,7 @@ echo_server_rx_callback (stream_session_t * s)
       /* Program self-tap to retry */
       if (svm_fifo_set_event (rx_fifo))
        {
-         evt.fifo = rx_fifo;
-         evt.event_type = FIFO_EVENT_BUILTIN_RX;
-
-         q = esm->vpp_queue[s->thread_index];
-         if (PREDICT_FALSE (q->cursize == q->maxsize))
-           clib_warning ("out of event queue space");
-         else if (svm_queue_add (q, (u8 *) & evt, 0))
+         if (session_send_io_evt_to_thread (rx_fifo, FIFO_EVENT_BUILTIN_RX))
            clib_warning ("failed to enqueue self-tap");
 
          vec_validate (esm->rx_retries[s->thread_index], s->session_index);
index 9ad1297..ef9f760 100644 (file)
@@ -32,7 +32,7 @@ typedef struct
 typedef struct
 {
   u8 **rx_buf;
-  svm_queue_t **vpp_queue;
+  svm_msg_q_t **vpp_queue;
   u64 byte_index;
 
   uword *handler_by_get_request;
@@ -140,7 +140,6 @@ http_cli_output (uword arg, u8 * buffer, uword buffer_bytes)
 void
 send_data (stream_session_t * s, u8 * data)
 {
-  session_fifo_event_t evt;
   u32 offset, bytes_to_send;
   f64 delay = 10e-3;
   http_server_main_t *hsm = &http_server_main;
@@ -178,14 +177,8 @@ send_data (stream_session_t * s, u8 * data)
          bytes_to_send -= actual_transfer;
 
          if (svm_fifo_set_event (s->server_tx_fifo))
-           {
-             /* Fabricate TX event, send to vpp */
-             evt.fifo = s->server_tx_fifo;
-             evt.event_type = FIFO_EVENT_APP_TX;
-
-             svm_queue_add (hsm->vpp_queue[s->thread_index],
-                            (u8 *) & evt, 0 /* do wait for mutex */ );
-           }
+           session_send_io_evt_to_thread (s->server_tx_fifo,
+                                          FIFO_EVENT_APP_TX);
          delay = 10e-3;
        }
     }
@@ -379,15 +372,7 @@ http_server_rx_callback (stream_session_t * s)
 
   /* Send an RPC request via the thread-0 input node */
   if (vlib_get_thread_index () != 0)
-    {
-      session_fifo_event_t evt;
-      evt.rpc_args.fp = alloc_http_process_callback;
-      evt.rpc_args.arg = args;
-      evt.event_type = FIFO_EVENT_RPC;
-      svm_queue_add
-       (session_manager_get_vpp_event_queue (0 /* main thread */ ),
-        (u8 *) & evt, 0 /* do wait for mutex */ );
-    }
+    session_send_rpc_evt_to_thread (0, alloc_http_process_callback, args);
   else
     alloc_http_process (args);
   return 0;
index 78aa0de..6260ad3 100644 (file)
@@ -194,7 +194,6 @@ proxy_rx_callback (stream_session_t * s)
   int proxy_index;
   uword *p;
   svm_fifo_t *active_open_tx_fifo;
-  session_fifo_event_t evt;
 
   ASSERT (s->thread_index == thread_index);
 
@@ -212,10 +211,9 @@ proxy_rx_callback (stream_session_t * s)
       if (svm_fifo_set_event (active_open_tx_fifo))
        {
          u32 ao_thread_index = active_open_tx_fifo->master_thread_index;
-         evt.fifo = active_open_tx_fifo;
-         evt.event_type = FIFO_EVENT_APP_TX;
-         if (svm_queue_add (pm->active_open_event_queue[ao_thread_index],
-                            (u8 *) & evt, 0 /* do wait for mutex */ ))
+         if (session_send_io_evt_to_thread_custom (active_open_tx_fifo,
+                                                   ao_thread_index,
+                                                   FIFO_EVENT_APP_TX))
            clib_warning ("failed to enqueue tx evt");
        }
     }
@@ -278,7 +276,6 @@ active_open_connected_callback (u32 app_index, u32 opaque,
   proxy_main_t *pm = &proxy_main;
   proxy_session_t *ps;
   u8 thread_index = vlib_get_thread_index ();
-  session_fifo_event_t evt;
 
   if (is_fail)
     {
@@ -320,15 +317,9 @@ active_open_connected_callback (u32 app_index, u32 opaque,
   /*
    * Send event for active open tx fifo
    */
+  ASSERT (s->thread_index == thread_index);
   if (svm_fifo_set_event (s->server_tx_fifo))
-    {
-      evt.fifo = s->server_tx_fifo;
-      evt.event_type = FIFO_EVENT_APP_TX;
-      if (svm_queue_add
-         (pm->active_open_event_queue[thread_index], (u8 *) & evt,
-          0 /* do wait for mutex */ ))
-       clib_warning ("failed to enqueue tx evt");
-    }
+    session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
 
   return 0;
 }
@@ -354,8 +345,6 @@ active_open_disconnect_callback (stream_session_t * s)
 static int
 active_open_rx_callback (stream_session_t * s)
 {
-  proxy_main_t *pm = &proxy_main;
-  session_fifo_event_t evt;
   svm_fifo_t *proxy_tx_fifo;
 
   proxy_tx_fifo = s->server_rx_fifo;
@@ -365,12 +354,10 @@ active_open_rx_callback (stream_session_t * s)
    */
   if (svm_fifo_set_event (proxy_tx_fifo))
     {
-      u32 p_thread_index = proxy_tx_fifo->master_thread_index;
-      evt.fifo = proxy_tx_fifo;
-      evt.event_type = FIFO_EVENT_APP_TX;
-      if (svm_queue_add (pm->server_event_queue[p_thread_index], (u8 *) & evt,
-                        0 /* do wait for mutex */ ))
-       clib_warning ("failed to enqueue server rx evt");
+      u8 thread_index = proxy_tx_fifo->master_thread_index;
+      return session_send_io_evt_to_thread_custom (proxy_tx_fifo,
+                                                  thread_index,
+                                                  FIFO_EVENT_APP_TX);
     }
 
   return 0;
index 4bca0a0..c221a5e 100644 (file)
@@ -40,8 +40,8 @@ typedef struct
 {
   svm_queue_t *vl_input_queue; /**< vpe input queue */
   /** per-thread vectors */
-  svm_queue_t **server_event_queue;
-  svm_queue_t **active_open_event_queue;
+  svm_msg_q_t **server_event_queue;
+  svm_msg_q_t **active_open_event_queue;
   u8 **rx_buf;                         /**< intermediate rx buffers */
 
   u32 cli_node_index;                  /**< cli process node index */
index 60019dd..6041b49 100644 (file)
@@ -807,6 +807,143 @@ application_get_segment_manager_properties (u32 app_index)
   return &app->sm_properties;
 }
 
+static inline int
+app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
+{
+  if (PREDICT_TRUE (!svm_msg_q_is_full (mq)))
+    {
+      if (lock)
+       {
+         svm_msg_q_add_w_lock (mq, msg);
+         svm_msg_q_unlock (mq);
+       }
+      else if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
+       {
+         clib_warning ("msg q add returned");
+         if (lock)
+           svm_msg_q_unlock (mq);
+         return -1;
+       }
+    }
+  else
+    {
+      clib_warning ("evt q full");
+      svm_msg_q_free_msg (mq, msg);
+      if (lock)
+       svm_msg_q_unlock (mq);
+      return -1;
+    }
+  return 0;
+}
+
+static inline int
+app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
+{
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+  svm_msg_q_t *mq;
+
+  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+    {
+      /* Session is closed so app will never clean up. Flush rx fifo */
+      svm_fifo_dequeue_drop_all (s->server_rx_fifo);
+      return 0;
+    }
+
+  /* Built-in app? Hand event to the callback... */
+  if (app->cb_fns.builtin_app_rx_callback)
+    return app->cb_fns.builtin_app_rx_callback (s);
+
+  /* If no need for event, return */
+  if (!svm_fifo_set_event (s->server_rx_fifo))
+    return 0;
+
+  mq = app->event_queue;
+  if (lock)
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+    {
+      clib_warning ("evt q rings full");
+      if (lock)
+       svm_msg_q_unlock (mq);
+      return -1;
+    }
+
+  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt->fifo = s->server_rx_fifo;
+  evt->event_type = FIFO_EVENT_APP_RX;
+
+  return app_enqueue_evt (mq, &msg, lock);
+}
+
+static inline int
+app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
+{
+  svm_msg_q_t *mq;
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+
+  if (application_is_builtin (app))
+    return 0;
+
+  mq = app->event_queue;
+  if (lock)
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+    {
+      clib_warning ("evt q rings full");
+      if (lock)
+       svm_msg_q_unlock (mq);
+      return -1;
+    }
+
+  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt->event_type = FIFO_EVENT_APP_TX;
+  evt->fifo = s->server_tx_fifo;
+
+  return app_enqueue_evt (mq, &msg, lock);
+}
+
+/* *INDENT-OFF* */
+typedef int (app_send_evt_handler_fn) (application_t *app,
+                                      stream_session_t *s,
+                                      u8 lock);
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
+    app_send_io_evt_rx,
+    app_send_io_evt_tx,
+};
+/* *INDENT-ON* */
+
+/**
+ * Send event to application
+ *
+ * Logic from queue perspective is non-blocking. That is, if there's
+ * not enough space to enqueue a message, we return. However, if the lock
+ * flag is set, we do wait for queue mutex.
+ */
+int
+application_send_event (application_t * app, stream_session_t * s,
+                       u8 evt_type)
+{
+  ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
+  return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
+}
+
+int
+application_lock_and_send_event (application_t * app, stream_session_t * s,
+                                u8 evt_type)
+{
+  return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
+}
+
 local_session_t *
 application_alloc_local_session (application_t * app)
 {
@@ -949,14 +1086,14 @@ application_local_session_connect (u32 table_index, application_t * client,
   svm_fifo_segment_private_t *seg;
   segment_manager_t *sm;
   local_session_t *ls;
-  svm_queue_t *sq, *cq;
+  svm_msg_q_t *sq, *cq;
 
   ls = application_alloc_local_session (server);
 
   props = application_segment_manager_properties (server);
   cprops = application_segment_manager_properties (client);
   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
-  evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t);
+  evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
   seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
 
   has_transport = session_has_transport ((stream_session_t *) ll);
index aad7089..f6c8127 100644 (file)
@@ -72,7 +72,7 @@ typedef struct _application
   u32 ns_index;
 
   /** Application listens for events on this svm queue */
-  svm_queue_t *event_queue;
+  svm_msg_q_t *event_queue;
 
   /*
    * Callbacks: shoulder-taps for the server/client
@@ -207,6 +207,11 @@ int application_local_session_disconnect_w_index (u32 app_index,
                                                  u32 ls_index);
 void application_local_sessions_del (application_t * app);
 
+int application_send_event (application_t * app, stream_session_t * s,
+                           u8 evt);
+int application_lock_and_send_event (application_t * app,
+                                    stream_session_t * s, u8 evt_type);
+
 always_inline u32
 local_session_id (local_session_t * ll)
 {
index 2c17148..50c0434 100644 (file)
@@ -136,7 +136,7 @@ typedef enum
   _(IS_BUILTIN, "Application is builtin")                      \
   _(IS_PROXY, "Application is proxying")                       \
   _(USE_GLOBAL_SCOPE, "App can use global session scope")      \
-  _(USE_LOCAL_SCOPE, "App can use local session scope")
+  _(USE_LOCAL_SCOPE, "App can use local session scope")                \
 
 typedef enum _app_options
 {
@@ -187,7 +187,7 @@ typedef struct app_session_transport_
   _(volatile u8, session_state)                /**< session state */           \
   _(u32, session_index)                        /**< index in owning pool */    \
   _(app_session_transport_t, transport)        /**< transport info */          \
-  _(svm_queue_t, *vpp_evt_q)           /**< vpp event queue  */        \
+  _(svm_msg_q_t, *vpp_evt_q)           /**< vpp event queue  */        \
   _(u8, is_dgram)                      /**< flag for dgram mode */     \
 
 typedef struct
@@ -197,13 +197,73 @@ typedef struct
 #undef _
 } app_session_t;
 
+/**
+ * Send fifo io event to vpp worker thread
+ *
+ * Because there may be multiple writers to one of vpp's queues, this
+ * protects message allocation and enqueueing.
+ *
+ * @param mq           vpp message queue
+ * @param f            fifo for which the event is sent
+ * @param evt_type     type of event
+ * @param noblock      flag to indicate is request is blocking or not
+ * @return             0 if success, negative integer otherwise
+ */
+static inline int
+app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
+                       u8 noblock)
+{
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+
+  if (noblock)
+    {
+      if (svm_msg_q_try_lock (mq))
+       return -1;
+      if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+       {
+         svm_msg_q_unlock (mq);
+         return -2;
+       }
+      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+      if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+       {
+         svm_msg_q_unlock (mq);
+         return -2;
+       }
+      evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+      evt->fifo = f;
+      evt->event_type = evt_type;
+      svm_msg_q_add_w_lock (mq, &msg);
+      svm_msg_q_unlock (mq);
+      return 0;
+    }
+  else
+    {
+      svm_msg_q_lock (mq);
+      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+      while (svm_msg_q_msg_is_invalid (&msg))
+       {
+         svm_msg_q_wait (mq);
+         msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+       }
+      evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+      evt->fifo = f;
+      evt->event_type = evt_type;
+      if (svm_msg_q_is_full (mq))
+       svm_msg_q_wait (mq);
+      svm_msg_q_add_w_lock (mq, &msg);
+      svm_msg_q_unlock (mq);
+      return 0;
+    }
+}
+
 always_inline int
 app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
-                   svm_queue_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
+                   svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
 {
   u32 max_enqueue, actual_write;
   session_dgram_hdr_t hdr;
-  session_fifo_event_t evt;
   int rv;
 
   max_enqueue = svm_fifo_max_enqueue (f);
@@ -225,11 +285,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
   if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0)
     {
       if (svm_fifo_set_event (f))
-       {
-         evt.fifo = f;
-         evt.event_type = FIFO_EVENT_APP_TX;
-         svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock);
-       }
+       app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
     }
   ASSERT (rv);
   return rv;
@@ -243,20 +299,15 @@ app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock)
 }
 
 always_inline int
-app_send_stream_raw (svm_fifo_t * f, svm_queue_t * vpp_evt_q, u8 * data,
+app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
                     u32 len, u8 noblock)
 {
-  session_fifo_event_t evt;
   int rv;
 
   if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0)
     {
       if (svm_fifo_set_event (f))
-       {
-         evt.fifo = f;
-         evt.event_type = FIFO_EVENT_APP_TX;
-         svm_queue_add (vpp_evt_q, (u8 *) & evt, noblock);
-       }
+       app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
     }
   return rv;
 }
index 31bb0c3..b00bcd5 100644 (file)
@@ -599,25 +599,52 @@ segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
     segment_manager_segment_reader_unlock (sm);
 }
 
+u32
+segment_manager_evt_q_expected_size (u32 q_len)
+{
+  u32 fifo_evt_size, notif_q_size, q_hdrs;
+  u32 msg_q_sz, fifo_evt_ring_sz, session_ntf_ring_sz;
+
+  fifo_evt_size = 1 << max_log2 (sizeof (session_fifo_event_t));
+  notif_q_size = clib_max (16, q_len >> 4);
+
+  msg_q_sz = q_len * sizeof (svm_msg_q_msg_t);
+  fifo_evt_ring_sz = q_len * fifo_evt_size;
+  session_ntf_ring_sz = notif_q_size * 256;
+  q_hdrs = sizeof (svm_queue_t) + sizeof (svm_msg_q_t);
+
+  return (msg_q_sz + fifo_evt_ring_sz + session_ntf_ring_sz + q_hdrs);
+}
+
 /**
  * Allocates shm queue in the first segment
  *
  * Must be called with lock held
  */
-svm_queue_t *
+svm_msg_q_t *
 segment_manager_alloc_queue (svm_fifo_segment_private_t * segment,
                             u32 queue_size)
 {
-  ssvm_shared_header_t *sh;
-  svm_queue_t *q;
+  u32 fifo_evt_size, session_evt_size = 256, notif_q_size;
+  svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+  svm_msg_q_t *q;
   void *oldheap;
 
-  sh = segment->ssvm.sh;
+  fifo_evt_size = sizeof (session_fifo_event_t);
+  notif_q_size = clib_max (16, queue_size >> 4);
+  /* *INDENT-OFF* */
+  svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+    {queue_size, fifo_evt_size, 0},
+    {notif_q_size, session_evt_size, 0}
+  };
+  /* *INDENT-ON* */
+  cfg->consumer_pid = 0;
+  cfg->n_rings = 2;
+  cfg->q_nitems = queue_size;
+  cfg->ring_cfgs = rc;
 
-  oldheap = ssvm_push_heap (sh);
-  q = svm_queue_init (queue_size, sizeof (session_fifo_event_t),
-                     0 /* consumer pid */ ,
-                     0 /* signal when queue non-empty */ );
+  oldheap = ssvm_push_heap (segment->ssvm.sh);
+  q = svm_msg_q_alloc (cfg);
   ssvm_pop_heap (oldheap);
   return q;
 }
index 62e5e97..73cb482 100644 (file)
@@ -17,7 +17,7 @@
 
 #include <vnet/vnet.h>
 #include <svm/svm_fifo_segment.h>
-#include <svm/queue.h>
+#include <svm/message_queue.h>
 #include <vlibmemory/api.h>
 #include <vppinfra/lock.h>
 #include <vppinfra/valloc.h>
@@ -61,7 +61,7 @@ typedef struct _segment_manager
   /**
    * App event queue allocated in first segment
    */
-  svm_queue_t *event_queue;
+  svm_msg_q_t *event_queue;
 } segment_manager_t;
 
 #define segment_manager_foreach_segment_w_lock(VAR, SM, BODY)          \
@@ -114,7 +114,7 @@ segment_manager_index (segment_manager_t * sm)
   return sm - segment_manager_main.segment_managers;
 }
 
-always_inline svm_queue_t *
+always_inline svm_msg_q_t *
 segment_manager_event_queue (segment_manager_t * sm)
 {
   return sm->event_queue;
@@ -152,7 +152,8 @@ int segment_manager_try_alloc_fifos (svm_fifo_segment_private_t * fs,
                                     svm_fifo_t ** tx_fifo);
 void segment_manager_dealloc_fifos (u32 segment_index, svm_fifo_t * rx_fifo,
                                    svm_fifo_t * tx_fifo);
-svm_queue_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
+u32 segment_manager_evt_q_expected_size (u32 q_size);
+svm_msg_q_t *segment_manager_alloc_queue (svm_fifo_segment_private_t * fs,
                                          u32 queue_size);
 void segment_manager_dealloc_queue (segment_manager_t * sm, svm_queue_t * q);
 void segment_manager_app_detach (segment_manager_t * sm);
index 26bc70e..38a0521 100644 (file)
 session_manager_main_t session_manager_main;
 extern transport_proto_vft_t *tp_vfts;
 
-static void
-session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
-                           u32 thread_index, void *fp, void *rpc_args)
+static inline int
+session_send_evt_to_thread (void *data, void *args, u32 thread_index,
+                           session_evt_type_t evt_type)
 {
-  session_fifo_event_t evt = { {0}, };
-  svm_queue_t *q;
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+  svm_msg_q_t *mq;
   u32 tries = 0, max_tries;
 
-  evt.event_type = evt_type;
-  if (evt_type == FIFO_EVENT_RPC)
-    {
-      evt.rpc_args.fp = fp;
-      evt.rpc_args.arg = rpc_args;
-    }
-  else
-    evt.session_handle = session_handle;
-
-  q = session_manager_get_vpp_event_queue (thread_index);
-  while (svm_queue_add (q, (u8 *) & evt, 1))
+  mq = session_manager_get_vpp_event_queue (thread_index);
+  while (svm_msg_q_try_lock (mq))
     {
       max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
       if (tries++ == max_tries)
        {
          SESSION_DBG ("failed to enqueue evt");
-         break;
+         return -1;
        }
     }
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+    {
+      svm_msg_q_unlock (mq);
+      return -2;
+    }
+  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (&msg)))
+    {
+      svm_msg_q_unlock (mq);
+      return -2;
+    }
+  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt->event_type = evt_type;
+  switch (evt_type)
+    {
+    case FIFO_EVENT_RPC:
+      evt->rpc_args.fp = data;
+      evt->rpc_args.arg = args;
+      break;
+    case FIFO_EVENT_APP_TX:
+    case FIFO_EVENT_BUILTIN_RX:
+      evt->fifo = data;
+      break;
+    case FIFO_EVENT_DISCONNECT:
+      evt->session_handle = session_handle ((stream_session_t *) data);
+      break;
+    default:
+      clib_warning ("evt unhandled!");
+      svm_msg_q_unlock (mq);
+      return -1;
+    }
+
+  svm_msg_q_add_w_lock (mq, &msg);
+  svm_msg_q_unlock (mq);
+  return 0;
 }
 
-void
-session_send_session_evt_to_thread (u64 session_handle,
-                                   fifo_event_type_t evt_type,
-                                   u32 thread_index)
+int
+session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
+{
+  return session_send_evt_to_thread (f, 0, f->master_thread_index, evt_type);
+}
+
+int
+session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+                                     session_evt_type_t evt_type)
 {
-  session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0);
+  return session_send_evt_to_thread (f, 0, thread_index, evt_type);
+}
+
+int
+session_send_ctrl_evt_to_thread (stream_session_t * s,
+                                session_evt_type_t evt_type)
+{
+  /* only event supported for now is disconnect */
+  ASSERT (evt_type == FIFO_EVENT_DISCONNECT);
+  return session_send_evt_to_thread (s, 0, s->thread_index,
+                                    FIFO_EVENT_DISCONNECT);
 }
 
 void
 session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
 {
   if (thread_index != vlib_get_thread_index ())
-    session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp,
-                               rpc_args);
+    session_send_evt_to_thread (fp, rpc_args, thread_index, FIFO_EVENT_RPC);
   else
     {
       void (*fnp) (void *) = fp;
@@ -440,24 +481,15 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
 /**
  * Notify session peer that new data has been enqueued.
  *
- * @param s Stream session for which the event is to be generated.
- * @param block Flag to indicate if call should block if event queue is full.
+ * @param s    Stream session for which the event is to be generated.
+ * @param lock         Flag to indicate if call should lock message queue.
  *
- * @return 0 on succes or negative number if failed to send notification.
+ * @return 0 on success or negative number if failed to send notification.
  */
-static int
-session_enqueue_notify (stream_session_t * s, u8 block)
+static inline int
+session_enqueue_notify (stream_session_t * s, u8 lock)
 {
   application_t *app;
-  session_fifo_event_t evt;
-  svm_queue_t *q;
-
-  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
-    {
-      /* Session is closed so app will never clean up. Flush rx fifo */
-      svm_fifo_dequeue_drop_all (s->server_rx_fifo);
-      return 0;
-    }
 
   app = application_get_if_valid (s->app_index);
   if (PREDICT_FALSE (app == 0))
@@ -466,68 +498,32 @@ session_enqueue_notify (stream_session_t * s, u8 block)
       return 0;
     }
 
-  /* Built-in app? Hand event to the callback... */
-  if (app->cb_fns.builtin_app_rx_callback)
-    return app->cb_fns.builtin_app_rx_callback (s);
-
-  /* If no event, send one */
-  if (svm_fifo_set_event (s->server_rx_fifo))
-    {
-      /* Fabricate event */
-      evt.fifo = s->server_rx_fifo;
-      evt.event_type = FIFO_EVENT_APP_RX;
-
-      /* Add event to server's event queue */
-      q = app->event_queue;
-
-      /* Based on request block (or not) for lack of space */
-      if (block || PREDICT_TRUE (q->cursize < q->maxsize))
-       svm_queue_add (app->event_queue, (u8 *) & evt,
-                      0 /* do wait for mutex */ );
-      else
-       {
-         clib_warning ("fifo full");
-         return -1;
-       }
-    }
-
   /* *INDENT-OFF* */
   SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
-      ed->data[0] = evt.event_type;
+      ed->data[0] = FIFO_EVENT_APP_RX;
       ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
   }));
   /* *INDENT-ON* */
 
-  return 0;
+  if (lock)
+    return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+
+  return application_send_event (app, s, FIFO_EVENT_APP_RX);
 }
 
 int
 session_dequeue_notify (stream_session_t * s)
 {
   application_t *app;
-  svm_queue_t *q;
 
   app = application_get_if_valid (s->app_index);
   if (PREDICT_FALSE (!app))
     return -1;
 
-  if (application_is_builtin (app))
-    return 0;
+  if (session_transport_service_type (s) == TRANSPORT_SERVICE_CL)
+    return application_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
 
-  q = app->event_queue;
-  if (PREDICT_TRUE (q->cursize < q->maxsize))
-    {
-      session_fifo_event_t evt = {
-       .event_type = FIFO_EVENT_APP_TX,
-       .fifo = s->server_tx_fifo
-      };
-      svm_queue_add (app->event_queue, (u8 *) & evt, SVM_Q_WAIT);
-    }
-  else
-    {
-      return -1;
-    }
-  return 0;
+  return application_send_event (app, s, FIFO_EVENT_APP_TX);
 }
 
 /**
@@ -542,16 +538,24 @@ int
 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
 {
   session_manager_main_t *smm = &session_manager_main;
-  u32 *indices;
+  transport_service_type_t tp_service;
+  int i, errors = 0, lock;
   stream_session_t *s;
-  int i, errors = 0;
+  u32 *indices;
 
   indices = smm->session_to_enqueue[transport_proto][thread_index];
+  tp_service = transport_protocol_service_type (transport_proto);
+  lock = tp_service == TRANSPORT_SERVICE_CL;
 
   for (i = 0; i < vec_len (indices); i++)
     {
       s = session_get_if_valid (indices[i], thread_index);
-      if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ ))
+      if (PREDICT_FALSE (!s))
+       {
+         errors++;
+         continue;
+       }
+      if (PREDICT_FALSE (session_enqueue_notify (s, lock)))
        errors++;
     }
 
@@ -1118,9 +1122,7 @@ stream_session_disconnect (stream_session_t * s)
       evt->event_type = FIFO_EVENT_DISCONNECT;
     }
   else
-    session_send_session_evt_to_thread (session_handle (s),
-                                       FIFO_EVENT_DISCONNECT,
-                                       s->thread_index);
+    session_send_ctrl_evt_to_thread (s, FIFO_EVENT_DISCONNECT);
 }
 
 /**
@@ -1231,8 +1233,18 @@ session_vpp_event_queues_allocate (session_manager_main_t * smm)
 
   for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
     {
-      smm->vpp_event_queues[i] = svm_queue_init (evt_q_length, evt_size,
-                                                vpp_pid, 0);
+      svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
+      u32 notif_q_size = clib_max (16, evt_q_length >> 4);
+      svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
+       {evt_q_length, evt_size, 0}
+       ,
+       {notif_q_size, 256, 0}
+      };
+      cfg->consumer_pid = 0;
+      cfg->n_rings = 2;
+      cfg->q_nitems = evt_q_length;
+      cfg->ring_cfgs = rc;
+      smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
     }
 
   if (smm->evt_qs_use_memfd_seg)
index fe3477b..879b382 100644 (file)
@@ -38,10 +38,10 @@ typedef enum
   FIFO_EVENT_DISCONNECT,
   FIFO_EVENT_BUILTIN_RX,
   FIFO_EVENT_RPC,
-} fifo_event_type_t;
+} session_evt_type_t;
 
 static inline const char *
-fifo_event_type_str (fifo_event_type_t et)
+fifo_event_type_str (session_evt_type_t et)
 {
   switch (et)
     {
@@ -62,6 +62,13 @@ fifo_event_type_str (fifo_event_type_t et)
     }
 }
 
+typedef enum
+{
+  SESSION_MQ_IO_EVT_RING,
+  SESSION_MQ_CTRL_EVT_RING,
+  SESSION_MQ_N_RINGS
+} session_mq_rings_e;
+
 #define foreach_session_input_error                                            \
 _(NO_SESSION, "No session drops")                                       \
 _(NO_LISTENER, "No listener for dst port drops")                        \
@@ -86,23 +93,30 @@ typedef struct
 {
   void *fp;
   void *arg;
-} rpc_args_t;
+} session_rpc_args_t;
 
 typedef u64 session_handle_t;
 
 /* *INDENT-OFF* */
-typedef CLIB_PACKED (struct {
+typedef struct
+{
+  u8 event_type;
+  u8 postponed;
   union
+  {
+    svm_fifo_t *fifo;
+    session_handle_t session_handle;
+    session_rpc_args_t rpc_args;
+    struct
     {
-      svm_fifo_t * fifo;
-      session_handle_t session_handle;
-      rpc_args_t rpc_args;
+      u8 data[0];
     };
-  u8 event_type;
-  u8 postponed;
-}) session_fifo_event_t;
+  };
+} __clib_packed session_fifo_event_t;
 /* *INDENT-ON* */
 
+#define SESSION_MSG_NULL { }
+
 typedef struct session_dgram_pre_hdr_
 {
   u32 data_length;
@@ -193,7 +207,7 @@ struct _session_manager_main
   session_tx_context_t *ctx;
 
   /** vpp fifo event queue */
-  svm_queue_t **vpp_event_queues;
+  svm_msg_q_t **vpp_event_queues;
 
   /** Event queues memfd segment initialized only if so configured */
   ssvm_private_t evt_qs_segment;
@@ -533,9 +547,12 @@ int stream_session_stop_listen (stream_session_t * s);
 void stream_session_disconnect (stream_session_t * s);
 void stream_session_disconnect_transport (stream_session_t * s);
 void stream_session_cleanup (stream_session_t * s);
-void session_send_session_evt_to_thread (u64 session_handle,
-                                        fifo_event_type_t evt_type,
-                                        u32 thread_index);
+int session_send_io_evt_to_thread (svm_fifo_t * f,
+                                  session_evt_type_t evt_type);
+int session_send_io_evt_to_thread_custom (svm_fifo_t * f, u32 thread_index,
+                                         session_evt_type_t evt_type);
+void session_send_rpc_evt_to_thread (u32 thread_index, void *fp,
+                                    void *rpc_args);
 ssvm_private_t *session_manager_get_evt_q_segment (void);
 
 u8 *format_stream_session (u8 * s, va_list * args);
@@ -549,7 +566,7 @@ void session_register_transport (transport_proto_t transport_proto,
 
 clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
 
-always_inline svm_queue_t *
+always_inline svm_msg_q_t *
 session_manager_get_vpp_event_queue (u32 thread_index)
 {
   return session_manager_main.vpp_event_queues[thread_index];
index 1a41dbd..f9fddea 100755 (executable)
@@ -155,7 +155,7 @@ send_session_accept_callback (stream_session_t * s)
   vl_api_registration_t *reg;
   transport_connection_t *tc;
   stream_session_t *listener;
-  svm_queue_t *vpp_queue;
+  svm_msg_q_t *vpp_queue;
 
   reg = vl_mem_api_client_index_to_registration (server->api_client_index);
   if (!reg)
@@ -300,7 +300,7 @@ send_session_connected_callback (u32 app_index, u32 api_context,
   vl_api_connect_session_reply_t *mp;
   transport_connection_t *tc;
   vl_api_registration_t *reg;
-  svm_queue_t *vpp_queue;
+  svm_msg_q_t *vpp_queue;
   application_t *app;
 
   app = application_get (app_index);
@@ -485,7 +485,7 @@ vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
   vl_api_bind_uri_reply_t *rmp;
   stream_session_t *s;
   application_t *app = 0;
-  svm_queue_t *vpp_evt_q;
+  svm_msg_q_t *vpp_evt_q;
   int rv;
 
   if (session_manager_is_enabled () == 0)
@@ -759,7 +759,7 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
   stream_session_t *s;
   transport_connection_t *tc = 0;
   ip46_address_t *ip46;
-  svm_queue_t *vpp_evt_q;
+  svm_msg_q_t *vpp_evt_q;
 
   if (session_manager_is_enabled () == 0)
     {
index 85fd28d..350282b 100644 (file)
@@ -575,15 +575,14 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
                       vlib_frame_t * frame)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
+  u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
   session_fifo_event_t *pending_events, *e;
   session_fifo_event_t *fifo_events;
-  u32 n_to_dequeue, n_events;
-  svm_queue_t *q;
-  application_t *app;
-  int n_tx_packets = 0;
-  u32 thread_index = vm->thread_index;
-  int i, rv;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
   f64 now = vlib_time_now (vm);
+  int n_tx_packets = 0, i, rv;
+  application_t *app;
+  svm_msg_q_t *mq;
   void (*fp) (void *);
 
   SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
@@ -594,16 +593,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
   transport_update_time (now, thread_index);
 
   /*
-   * Get vpp queue events
+   * Get vpp queue events that we can dequeue without blocking
    */
-  q = smm->vpp_event_queues[thread_index];
-  if (PREDICT_FALSE (q == 0))
-    return 0;
-
+  mq = smm->vpp_event_queues[thread_index];
   fifo_events = smm->free_event_vector[thread_index];
-
-  /* min number of events we can dequeue without blocking */
-  n_to_dequeue = q->cursize;
+  n_to_dequeue = svm_msg_q_size (mq);
   pending_events = smm->pending_event_vector[thread_index];
 
   if (!n_to_dequeue && !vec_len (pending_events)
@@ -624,21 +618,19 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
     }
 
   /* See you in the next life, don't be late
-   * XXX: we may need priorities here
-   */
-  if (pthread_mutex_trylock (&q->mutex))
+   * XXX: we may need priorities here */
+  if (svm_msg_q_try_lock (mq))
     return 0;
 
   for (i = 0; i < n_to_dequeue; i++)
     {
       vec_add2 (fifo_events, e, 1);
-      svm_queue_sub_raw (q, (u8 *) e);
+      svm_msg_q_sub_w_lock (mq, msg);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+      svm_msg_q_free_msg (mq, msg);
     }
 
-  /* The other side of the connection is not polling */
-  if (q->cursize < (q->maxsize / 8))
-    (void) pthread_cond_broadcast (&q->condvar);
-  pthread_mutex_unlock (&q->mutex);
+  svm_msg_q_unlock (mq);
 
   vec_append (fifo_events, pending_events);
   vec_append (fifo_events, smm->pending_disconnects[thread_index]);
@@ -760,19 +752,20 @@ dump_thread_0_event_queue (void)
   vlib_main_t *vm = &vlib_global_main;
   u32 my_thread_index = vm->thread_index;
   session_fifo_event_t _e, *e = &_e;
+  svm_msg_q_ring_t *ring;
   stream_session_t *s0;
+  svm_msg_q_msg_t *msg;
+  svm_msg_q_t *mq;
   int i, index;
-  i8 *headp;
-
-  svm_queue_t *q;
-  q = smm->vpp_event_queues[my_thread_index];
 
-  index = q->head;
+  mq = smm->vpp_event_queues[my_thread_index];
+  index = mq->q->head;
 
-  for (i = 0; i < q->cursize; i++)
+  for (i = 0; i < mq->q->cursize; i++)
     {
-      headp = (i8 *) (&q->data[0] + q->elsize * index);
-      clib_memcpy (e, headp, q->elsize);
+      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+      ring = svm_msg_q_ring (mq, msg->ring_index);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
 
       switch (e->event_type)
        {
@@ -805,7 +798,7 @@ dump_thread_0_event_queue (void)
 
       index++;
 
-      if (index == q->maxsize)
+      if (index == mq->q->maxsize)
        index = 0;
     }
 }
@@ -844,10 +837,11 @@ u8
 session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
-  svm_queue_t *q;
+  svm_msg_q_t *mq;
   session_fifo_event_t *pending_event_vector, *evt;
   int i, index, found = 0;
-  i8 *headp;
+  svm_msg_q_msg_t *msg;
+  svm_msg_q_ring_t *ring;
   u8 thread_index;
 
   ASSERT (e);
@@ -855,16 +849,17 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_fifo_event_t * e)
   /*
    * Search evt queue
    */
-  q = smm->vpp_event_queues[thread_index];
-  index = q->head;
-  for (i = 0; i < q->cursize; i++)
+  mq = smm->vpp_event_queues[thread_index];
+  index = mq->q->head;
+  for (i = 0; i < mq->q->cursize; i++)
     {
-      headp = (i8 *) (&q->data[0] + q->elsize * index);
-      clib_memcpy (e, headp, q->elsize);
+      msg = (svm_msg_q_msg_t *) (&mq->q->data[0] + mq->q->elsize * index);
+      ring = svm_msg_q_ring (mq, msg->ring_index);
+      clib_memcpy (e, svm_msg_q_msg_data (mq, msg), ring->elsize);
       found = session_node_cmp_event (e, f);
       if (found)
        return 1;
-      if (++index == q->maxsize)
+      if (++index == mq->q->maxsize)
        index = 0;
     }
   /*
index 492b3cc..cea4491 100644 (file)
@@ -41,63 +41,15 @@ tls_get_available_engine (void)
 int
 tls_add_vpp_q_evt (svm_fifo_t * f, u8 evt_type)
 {
-  session_fifo_event_t evt;
-  svm_queue_t *q;
-
   if (svm_fifo_set_event (f))
-    {
-      evt.fifo = f;
-      evt.event_type = evt_type;
-
-      q = session_manager_get_vpp_event_queue (f->master_thread_index);
-      if (PREDICT_TRUE (q->cursize < q->maxsize))
-       {
-         svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
-       }
-      else
-       {
-         clib_warning ("vpp's evt q full");
-         return -1;
-       }
-    }
+    session_send_io_evt_to_thread (f, evt_type);
   return 0;
 }
 
 static inline int
 tls_add_app_q_evt (application_t * app, stream_session_t * app_session)
 {
-  session_fifo_event_t evt;
-  svm_queue_t *q;
-
-  if (PREDICT_FALSE (app_session->session_state == SESSION_STATE_CLOSED))
-    {
-      /* Session is closed so app will never clean up. Flush rx fifo */
-      u32 to_dequeue = svm_fifo_max_dequeue (app_session->server_rx_fifo);
-      if (to_dequeue)
-       svm_fifo_dequeue_drop (app_session->server_rx_fifo, to_dequeue);
-      return 0;
-    }
-
-  if (app->cb_fns.builtin_app_rx_callback)
-    return app->cb_fns.builtin_app_rx_callback (app_session);
-
-  if (svm_fifo_set_event (app_session->server_rx_fifo))
-    {
-      evt.fifo = app_session->server_rx_fifo;
-      evt.event_type = FIFO_EVENT_APP_RX;
-      q = app->event_queue;
-
-      if (PREDICT_TRUE (q->cursize < q->maxsize))
-       {
-         svm_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
-       }
-      else
-       {
-         clib_warning ("app evt q full");
-         return -1;
-       }
-    }
-  return 0;
+  return application_send_event (app, app_session, FIFO_EVENT_APP_RX);
 }
 
 u32