session: fix vpp to app msg generation 68/20868/4
authorFlorin Coras <fcoras@cisco.com>
Thu, 25 Jul 2019 21:51:09 +0000 (14:51 -0700)
committerDave Barach <openvpp@barachs.net>
Mon, 29 Jul 2019 20:46:46 +0000 (20:46 +0000)
Type:fix

Freeing mq messages in vpp (producer), if enqueueing fails, invalidates
consumer assumption that messages can be freed without a lock.

Change-Id: I748a33b8846597bdad865945d8e899346d482434
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Tal Saiag <tal.saiag@gmail.com>
src/svm/message_queue.c
src/vnet/session/application_worker.c
src/vnet/session/session_node.c

index 6304420..a40c4a4 100644 (file)
@@ -120,7 +120,8 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
     {
       if (svm_msg_q_try_lock (mq))
        return -1;
-      if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index)))
+      if (PREDICT_FALSE (svm_msg_q_is_full (mq)
+                        || svm_msg_q_ring_is_full (mq, ring_index)))
        {
          svm_msg_q_unlock (mq);
          return -2;
@@ -135,7 +136,8 @@ svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
   else
     {
       svm_msg_q_lock (mq);
-      while (svm_msg_q_ring_is_full (mq, ring_index))
+      while (svm_msg_q_is_full (mq)
+            || svm_msg_q_ring_is_full (mq, ring_index))
        svm_msg_q_wait (mq);
       *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
     }
index cab96b5..0be1a2e 100644 (file)
@@ -535,34 +535,7 @@ app_worker_application_is_builtin (app_worker_t * app_wrk)
 }
 
 static inline int
-app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
-{
-  if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
-    {
-      clib_warning ("evt q full");
-      svm_msg_q_free_msg (mq, msg);
-      if (lock)
-       svm_msg_q_unlock (mq);
-      return -1;
-    }
-
-  if (lock)
-    {
-      svm_msg_q_add_and_unlock (mq, msg);
-      return 0;
-    }
-
-  /* Even when not locking the ring, we must wait for queue mutex */
-  if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
-    {
-      clib_warning ("msg q add returned");
-      return -1;
-    }
-  return 0;
-}
-
-static inline int
-app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock)
+app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s)
 {
   session_event_t *evt;
   svm_msg_q_msg_t msg;
@@ -579,33 +552,35 @@ app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s, u8 lock)
     return 0;
 
   mq = app_wrk->event_queue;
-  if (lock)
-    svm_msg_q_lock (mq);
+  svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
+    {
+      clib_warning ("evt q full");
+      svm_msg_q_unlock (mq);
+      return -1;
+    }
 
   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
     {
       clib_warning ("evt q rings full");
-      if (lock)
-       svm_msg_q_unlock (mq);
+      svm_msg_q_unlock (mq);
       return -1;
     }
 
   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
-  ASSERT (!svm_msg_q_msg_is_invalid (&msg));
-
   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
   evt->session_index = s->rx_fifo->client_session_index;
   evt->event_type = SESSION_IO_EVT_RX;
 
   (void) svm_fifo_set_event (s->rx_fifo);
+  svm_msg_q_add_and_unlock (mq, &msg);
 
-  if (app_enqueue_evt (mq, &msg, lock))
-    return -1;
   return 0;
 }
 
 static inline int
-app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock)
+app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s)
 {
   svm_msg_q_t *mq;
   session_event_t *evt;
@@ -615,50 +590,40 @@ app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s, u8 lock)
     return app_worker_builtin_tx (app_wrk, s);
 
   mq = app_wrk->event_queue;
-  if (lock)
-    svm_msg_q_lock (mq);
+  svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
+    {
+      clib_warning ("evt q full");
+      svm_msg_q_unlock (mq);
+      return -1;
+    }
 
   if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
     {
       clib_warning ("evt q rings full");
-      if (lock)
-       svm_msg_q_unlock (mq);
+      svm_msg_q_unlock (mq);
       return -1;
     }
 
   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
-  ASSERT (!svm_msg_q_msg_is_invalid (&msg));
-
   evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
   evt->event_type = SESSION_IO_EVT_TX;
   evt->session_index = s->tx_fifo->client_session_index;
 
-  return app_enqueue_evt (mq, &msg, lock);
+  svm_msg_q_add_and_unlock (mq, &msg);
+  return 0;
 }
 
 /* *INDENT-OFF* */
 typedef int (app_send_evt_handler_fn) (app_worker_t *app,
-                                      session_t *s,
-                                      u8 lock);
+                                      session_t *s);
 static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
     app_send_io_evt_rx,
     app_send_io_evt_tx,
 };
 /* *INDENT-ON* */
 
-/**
- * Send event to application
- *
- * Logic from queue perspective is non-blocking. If there's
- * not enough space to enqueue a message, we return.
- */
-int
-app_worker_send_event (app_worker_t * app, session_t * s, u8 evt_type)
-{
-  ASSERT (app && evt_type <= SESSION_IO_EVT_TX);
-  return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
-}
-
 /**
  * Send event to application
  *
@@ -669,7 +634,7 @@ int
 app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
                                u8 evt_type)
 {
-  return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
+  return app_send_evt_handler_fns[evt_type] (app, s);
 }
 
 u8 *
index 35704b7..bffae62 100644 (file)
@@ -175,7 +175,6 @@ session_mq_disconnected_handler (void *data)
   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
                                       SESSION_MQ_CTRL_EVT_RING,
                                       SVM_Q_WAIT, msg);
-  svm_msg_q_unlock (app_wrk->event_queue);
   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
   clib_memset (evt, 0, sizeof (*evt));
   evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
@@ -183,7 +182,7 @@ session_mq_disconnected_handler (void *data)
   rmp->handle = mp->handle;
   rmp->context = mp->context;
   rmp->retval = rv;
-  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
 }
 
 static void
@@ -250,13 +249,12 @@ session_mq_worker_update_handler (void *data)
       svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
                                           SESSION_MQ_CTRL_EVT_RING,
                                           SVM_Q_WAIT, msg);
-      svm_msg_q_unlock (app_wrk->event_queue);
       evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
       clib_memset (evt, 0, sizeof (*evt));
       evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
       wump = (session_req_worker_update_msg_t *) evt->data;
       wump->session_handle = mp->handle;
-      svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+      svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
       return;
     }
 
@@ -268,7 +266,6 @@ session_mq_worker_update_handler (void *data)
   svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
                                       SESSION_MQ_CTRL_EVT_RING,
                                       SVM_Q_WAIT, msg);
-  svm_msg_q_unlock (app_wrk->event_queue);
   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
   clib_memset (evt, 0, sizeof (*evt));
   evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
@@ -277,7 +274,7 @@ session_mq_worker_update_handler (void *data)
   rmp->rx_fifo = pointer_to_uword (s->rx_fifo);
   rmp->tx_fifo = pointer_to_uword (s->tx_fifo);
   rmp->segment_handle = session_segment_handle (s);
-  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
 
   /*
    * Retransmit messages that may have been lost