vcl: support for eventfd mq signaling
[vpp.git] / src / vnet / session / application.c
index 6041b49..3e127df 100644 (file)
@@ -312,6 +312,12 @@ application_init (application_t * app, u32 api_client_index, u8 * app_name,
     }
   else
     {
+      if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
+       {
+         clib_warning ("mq eventfds can only be used if socket transport is "
+                       "used for api");
+         return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
+       }
       seg_type = SSVM_SEGMENT_PRIVATE;
     }
 
@@ -336,6 +342,8 @@ application_init (application_t * app, u32 api_client_index, u8 * app_name,
     props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE];
   if (options[APP_OPTIONS_EVT_QUEUE_SIZE])
     props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE];
+  if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
+    props->use_mq_eventfd = 1;
   if (options[APP_OPTIONS_TLS_ENGINE])
     app->tls_engine = options[APP_OPTIONS_TLS_ENGINE];
   props->segment_type = seg_type;
@@ -810,22 +818,7 @@ application_get_segment_manager_properties (u32 app_index)
 static inline int
 app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
 {
-  if (PREDICT_TRUE (!svm_msg_q_is_full (mq)))
-    {
-      if (lock)
-       {
-         svm_msg_q_add_w_lock (mq, msg);
-         svm_msg_q_unlock (mq);
-       }
-      else if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
-       {
-         clib_warning ("msg q add returned");
-         if (lock)
-           svm_msg_q_unlock (mq);
-         return -1;
-       }
-    }
-  else
+  if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
     {
       clib_warning ("evt q full");
       svm_msg_q_free_msg (mq, msg);
@@ -833,29 +826,43 @@ app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
        svm_msg_q_unlock (mq);
       return -1;
     }
+
+  if (lock)
+    {
+      svm_msg_q_add_and_unlock (mq, msg);
+      return 0;
+    }
+
+  /* Even when not locking the ring, we must wait for queue mutex */
+  if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
+    {
+      clib_warning ("msg q add returned");
+      return -1;
+    }
   return 0;
 }
 
 static inline int
 app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
 {
-  session_fifo_event_t *evt;
+  session_event_t *evt;
   svm_msg_q_msg_t msg;
   svm_msg_q_t *mq;
 
-  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+  if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY
+                    && s->session_state != SESSION_STATE_LISTENING))
     {
       /* Session is closed so app will never clean up. Flush rx fifo */
-      svm_fifo_dequeue_drop_all (s->server_rx_fifo);
+      if (s->session_state == SESSION_STATE_CLOSED)
+       svm_fifo_dequeue_drop_all (s->server_rx_fifo);
       return 0;
     }
 
-  /* Built-in app? Hand event to the callback... */
   if (app->cb_fns.builtin_app_rx_callback)
     return app->cb_fns.builtin_app_rx_callback (s);
 
-  /* If no need for event, return */
-  if (!svm_fifo_set_event (s->server_rx_fifo))
+  if (svm_fifo_has_event (s->server_rx_fifo)
+      || svm_fifo_is_empty (s->server_rx_fifo))
     return 0;
 
   mq = app->event_queue;
@@ -873,18 +880,21 @@ app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
 
-  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
   evt->fifo = s->server_rx_fifo;
   evt->event_type = FIFO_EVENT_APP_RX;
 
-  return app_enqueue_evt (mq, &msg, lock);
+  if (app_enqueue_evt (mq, &msg, lock))
+    return -1;
+  (void) svm_fifo_set_event (s->server_rx_fifo);
+  return 0;
 }
 
 static inline int
 app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
 {
   svm_msg_q_t *mq;
-  session_fifo_event_t *evt;
+  session_event_t *evt;
   svm_msg_q_msg_t msg;
 
   if (application_is_builtin (app))
@@ -905,7 +915,7 @@ app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
   msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
   ASSERT (!svm_msg_q_msg_is_invalid (&msg));
 
-  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
   evt->event_type = FIFO_EVENT_APP_TX;
   evt->fifo = s->server_tx_fifo;
 
@@ -916,8 +926,9 @@ app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
 typedef int (app_send_evt_handler_fn) (application_t *app,
                                       stream_session_t *s,
                                       u8 lock);
-static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
     app_send_io_evt_rx,
+    0,
     app_send_io_evt_tx,
 };
 /* *INDENT-ON* */
@@ -967,6 +978,8 @@ application_free_local_session (application_t * app, local_session_t * s)
 local_session_t *
 application_get_local_session (application_t * app, u32 session_index)
 {
+  if (pool_is_free_index (app->local_sessions, session_index))
+    return 0;
   return pool_elt_at_index (app->local_sessions, session_index);
 }
 
@@ -1075,6 +1088,23 @@ application_stop_local_listen (application_t * server, session_handle_t lh)
   return 0;
 }
 
+static void
+application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
+{
+  int fd;
+
+  /*
+   * segment manager initializes only the producer eventds, since vpp is
+   * typically the producer. But for local sessions, we also pass to the
+   * apps the mqs they listen on for events from peer apps, so they are also
+   * consumer fds.
+   */
+  fd = svm_msg_q_get_producer_eventfd (sq);
+  svm_msg_q_set_consumer_eventfd (sq, fd);
+  fd = svm_msg_q_get_producer_eventfd (cq);
+  svm_msg_q_set_consumer_eventfd (cq, fd);
+}
+
 int
 application_local_session_connect (u32 table_index, application_t * client,
                                   application_t * server,
@@ -1082,6 +1112,7 @@ application_local_session_connect (u32 table_index, application_t * client,
 {
   u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
   segment_manager_properties_t *props, *cprops;
+  u32 round_rx_fifo_sz, round_tx_fifo_sz;
   int rv, has_transport, seg_index;
   svm_fifo_segment_private_t *seg;
   segment_manager_t *sm;
@@ -1094,7 +1125,9 @@ application_local_session_connect (u32 table_index, application_t * client,
   cprops = application_segment_manager_properties (client);
   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
   evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
-  seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
+  round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
+  round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
+  seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
 
   has_transport = session_has_transport ((stream_session_t *) ll);
   if (!has_transport)
@@ -1119,8 +1152,12 @@ application_local_session_connect (u32 table_index, application_t * client,
       return seg_index;
     }
   seg = segment_manager_get_segment_w_lock (sm, seg_index);
-  sq = segment_manager_alloc_queue (seg, props->evt_q_size);
-  cq = segment_manager_alloc_queue (seg, cprops->evt_q_size);
+  sq = segment_manager_alloc_queue (seg, props);
+  cq = segment_manager_alloc_queue (seg, cprops);
+
+  if (props->use_mq_eventfd)
+    application_local_session_fix_eventds (sq, cq);
+
   ls->server_evt_q = pointer_to_uword (sq);
   ls->client_evt_q = pointer_to_uword (cq);
   rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
@@ -1267,7 +1304,7 @@ application_local_session_disconnect (u32 app_index, local_session_t * ls)
 
   if (app_index == ls->client_index)
     {
-      send_local_session_disconnect_callback (ls->app_index, ls);
+      mq_send_local_session_disconnected_cb (ls->app_index, ls);
     }
   else
     {
@@ -1286,7 +1323,7 @@ application_local_session_disconnect (u32 app_index, local_session_t * ls)
        }
       else
        {
-         send_local_session_disconnect_callback (client->index, ls);
+         mq_send_local_session_disconnected_cb (client->index, ls);
        }
     }