}
else
{
+ if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
+ {
+ clib_warning ("mq eventfds can only be used if socket transport is "
+ "used for api");
+ return VNET_API_ERROR_APP_UNSUPPORTED_CFG;
+ }
seg_type = SSVM_SEGMENT_PRIVATE;
}
props->tx_fifo_size = options[APP_OPTIONS_TX_FIFO_SIZE];
if (options[APP_OPTIONS_EVT_QUEUE_SIZE])
props->evt_q_size = options[APP_OPTIONS_EVT_QUEUE_SIZE];
+ if (options[APP_OPTIONS_FLAGS] & APP_OPTIONS_FLAGS_EVT_MQ_USE_EVENTFD)
+ props->use_mq_eventfd = 1;
if (options[APP_OPTIONS_TLS_ENGINE])
app->tls_engine = options[APP_OPTIONS_TLS_ENGINE];
props->segment_type = seg_type;
return &app->sm_properties;
}
+static inline int
+app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
+{
+ if (PREDICT_FALSE (svm_msg_q_is_full (mq)))
+ {
+ clib_warning ("evt q full");
+ svm_msg_q_free_msg (mq, msg);
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ if (lock)
+ {
+ svm_msg_q_add_and_unlock (mq, msg);
+ return 0;
+ }
+
+ /* Even when not locking the ring, we must wait for queue mutex */
+ if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
+ {
+ clib_warning ("msg q add returned");
+ return -1;
+ }
+ return 0;
+}
+
+static inline int
+app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
+{
+ session_event_t *evt;
+ svm_msg_q_msg_t msg;
+ svm_msg_q_t *mq;
+
+ if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY
+ && s->session_state != SESSION_STATE_LISTENING))
+ {
+ /* Session is closed so app will never clean up. Flush rx fifo */
+ if (s->session_state == SESSION_STATE_CLOSED)
+ svm_fifo_dequeue_drop_all (s->server_rx_fifo);
+ return 0;
+ }
+
+ if (app->cb_fns.builtin_app_rx_callback)
+ return app->cb_fns.builtin_app_rx_callback (s);
+
+ if (svm_fifo_has_event (s->server_rx_fifo)
+ || svm_fifo_is_empty (s->server_rx_fifo))
+ return 0;
+
+ mq = app->event_queue;
+ if (lock)
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ clib_warning ("evt q rings full");
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+ evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->fifo = s->server_rx_fifo;
+ evt->event_type = FIFO_EVENT_APP_RX;
+
+ if (app_enqueue_evt (mq, &msg, lock))
+ return -1;
+ (void) svm_fifo_set_event (s->server_rx_fifo);
+ return 0;
+}
+
+static inline int
+app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
+{
+ svm_msg_q_t *mq;
+ session_event_t *evt;
+ svm_msg_q_msg_t msg;
+
+ if (application_is_builtin (app))
+ return 0;
+
+ mq = app->event_queue;
+ if (lock)
+ svm_msg_q_lock (mq);
+
+ if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+ {
+ clib_warning ("evt q rings full");
+ if (lock)
+ svm_msg_q_unlock (mq);
+ return -1;
+ }
+
+ msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+ evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
+ evt->event_type = FIFO_EVENT_APP_TX;
+ evt->fifo = s->server_tx_fifo;
+
+ return app_enqueue_evt (mq, &msg, lock);
+}
+
+/* *INDENT-OFF* */
+typedef int (app_send_evt_handler_fn) (application_t *app,
+ stream_session_t *s,
+ u8 lock);
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
+ app_send_io_evt_rx,
+ 0,
+ app_send_io_evt_tx,
+};
+/* *INDENT-ON* */
+
+/**
+ * Send event to application
+ *
+ * Logic from queue perspective is non-blocking. That is, if there's
+ * not enough space to enqueue a message, we return. However, if the lock
+ * flag is set, we do wait for queue mutex.
+ */
+int
+application_send_event (application_t * app, stream_session_t * s,
+ u8 evt_type)
+{
+ ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
+ return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
+}
+
+int
+application_lock_and_send_event (application_t * app, stream_session_t * s,
+ u8 evt_type)
+{
+ return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
+}
+
local_session_t *
application_alloc_local_session (application_t * app)
{
local_session_t *
application_get_local_session (application_t * app, u32 session_index)
{
+ if (pool_is_free_index (app->local_sessions, session_index))
+ return 0;
return pool_elt_at_index (app->local_sessions, session_index);
}
return 0;
}
+static void
+application_local_session_fix_eventds (svm_msg_q_t * sq, svm_msg_q_t * cq)
+{
+ int fd;
+
+ /*
+ * segment manager initializes only the producer eventds, since vpp is
+ * typically the producer. But for local sessions, we also pass to the
+ * apps the mqs they listen on for events from peer apps, so they are also
+ * consumer fds.
+ */
+ fd = svm_msg_q_get_producer_eventfd (sq);
+ svm_msg_q_set_consumer_eventfd (sq, fd);
+ fd = svm_msg_q_get_producer_eventfd (cq);
+ svm_msg_q_set_consumer_eventfd (cq, fd);
+}
+
int
application_local_session_connect (u32 table_index, application_t * client,
application_t * server,
{
u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
segment_manager_properties_t *props, *cprops;
+ u32 round_rx_fifo_sz, round_tx_fifo_sz;
int rv, has_transport, seg_index;
svm_fifo_segment_private_t *seg;
segment_manager_t *sm;
local_session_t *ls;
- svm_queue_t *sq, *cq;
+ svm_msg_q_t *sq, *cq;
ls = application_alloc_local_session (server);
props = application_segment_manager_properties (server);
cprops = application_segment_manager_properties (client);
evt_q_elts = props->evt_q_size + cprops->evt_q_size;
- evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t);
- seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
+ evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
+ round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
+ round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
+ seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
has_transport = session_has_transport ((stream_session_t *) ll);
if (!has_transport)
return seg_index;
}
seg = segment_manager_get_segment_w_lock (sm, seg_index);
- sq = segment_manager_alloc_queue (seg, props->evt_q_size);
- cq = segment_manager_alloc_queue (seg, cprops->evt_q_size);
+ sq = segment_manager_alloc_queue (seg, props);
+ cq = segment_manager_alloc_queue (seg, cprops);
+
+ if (props->use_mq_eventfd)
+ application_local_session_fix_eventds (sq, cq);
+
ls->server_evt_q = pointer_to_uword (sq);
ls->client_evt_q = pointer_to_uword (cq);
rv = segment_manager_try_alloc_fifos (seg, props->rx_fifo_size,
if (app_index == ls->client_index)
{
- send_local_session_disconnect_callback (ls->app_index, ls);
+ mq_send_local_session_disconnected_cb (ls->app_index, ls);
}
else
{
}
else
{
- send_local_session_disconnect_callback (client->index, ls);
+ mq_send_local_session_disconnected_cb (client->index, ls);
}
}