+session_event_get_session (session_worker_t * wrk, session_event_t * e)
+{
+ if (PREDICT_FALSE (pool_is_free_index (wrk->sessions, e->session_index)))
+ return 0;
+
+ ASSERT (session_is_valid (e->session_index, wrk->vm->thread_index));
+ return pool_elt_at_index (wrk->sessions, e->session_index);
+}
+
+always_inline void
+session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
+{
+ clib_llist_index_t ei;
+ void (*fp) (void *);
+ session_event_t *e;
+ session_t *s;
+
+ ei = clib_llist_entry_index (wrk->event_elts, elt);
+ e = &elt->evt;
+
+ switch (e->event_type)
+ {
+ case SESSION_CTRL_EVT_RPC:
+ fp = e->rpc_args.fp;
+ (*fp) (e->rpc_args.arg);
+ break;
+ case SESSION_CTRL_EVT_CLOSE:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ if (PREDICT_FALSE (!s))
+ break;
+ session_transport_close (s);
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ if (PREDICT_FALSE (!s))
+ break;
+ session_transport_reset (s);
+ break;
+ case SESSION_CTRL_EVT_LISTEN:
+ session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_LISTEN_URI:
+ session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN:
+ session_mq_unlisten_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_CONNECT:
+ session_mq_connect_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_CONNECT_URI:
+ session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECT:
+ session_mq_disconnect_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+ session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
+ session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
+ elt));
+ break;
+ case SESSION_CTRL_EVT_RESET_REPLY:
+ session_mq_reset_reply_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE:
+ session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_APP_DETACH:
+ app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_APP_WRK_RPC:
+ session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ case SESSION_CTRL_EVT_TRANSPORT_ATTR:
+ session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
+ break;
+ default:
+ clib_warning ("unhandled event type %d", e->event_type);
+ }
+
+ /* Regrab elements in case pool moved */
+ elt = pool_elt_at_index (wrk->event_elts, ei);
+ if (!clib_llist_elt_is_linked (elt, evt_list))
+ {
+ e = &elt->evt;
+ if (e->event_type >= SESSION_CTRL_EVT_BOUND)
+ session_evt_ctrl_data_free (wrk, elt);
+ session_evt_elt_free (wrk, elt);
+ }
+ SESSION_EVT (SESSION_EVT_COUNTS, CNT_CTRL_EVTS, 1, wrk);
+}
+
+always_inline void
+session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
+ session_evt_elt_t * elt, int *n_tx_packets)
+{
+ session_main_t *smm = &session_main;
+ app_worker_t *app_wrk;
+ clib_llist_index_t ei;
+ session_event_t *e;
+ session_t *s;
+
+ ei = clib_llist_entry_index (wrk->event_elts, elt);
+ e = &elt->evt;
+
+ switch (e->event_type)
+ {
+ case SESSION_IO_EVT_TX_FLUSH:
+ case SESSION_IO_EVT_TX:
+ s = session_event_get_session (wrk, e);
+ if (PREDICT_FALSE (!s))
+ break;
+ CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
+ wrk->ctx.s = s;
+ /* Spray packets in per session type frames, since they go to
+ * different nodes */
+ (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
+ break;
+ case SESSION_IO_EVT_RX:
+ s = session_event_get_session (wrk, e);
+ if (!s)
+ break;
+ transport_app_rx_evt (session_get_transport_proto (s),
+ s->connection_index, s->thread_index);
+ break;
+ case SESSION_IO_EVT_BUILTIN_RX:
+ s = session_event_get_session (wrk, e);
+ if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
+ break;
+ svm_fifo_unset_event (s->rx_fifo);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app_worker_builtin_rx (app_wrk, s);
+ break;
+ case SESSION_IO_EVT_BUILTIN_TX:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ wrk->ctx.s = s;
+ if (PREDICT_TRUE (s != 0))
+ session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets);
+ break;
+ default:
+ clib_warning ("unhandled event type %d", e->event_type);
+ }
+
+ SESSION_EVT (SESSION_IO_EVT_COUNTS, e->event_type, 1, wrk);
+
+ /* Regrab elements in case pool moved */
+ elt = pool_elt_at_index (wrk->event_elts, ei);
+ if (!clib_llist_elt_is_linked (elt, evt_list))
+ session_evt_elt_free (wrk, elt);
+}
+
+/* *INDENT-OFF* */
+static const u32 session_evt_msg_sizes[] = {
+#define _(symc, sym) \
+ [SESSION_CTRL_EVT_ ## symc] = sizeof (session_ ## sym ##_msg_t),
+ foreach_session_ctrl_evt
+#undef _
+};
+/* *INDENT-ON* */
+
+always_inline void
+session_evt_add_to_list (session_worker_t * wrk, session_event_t * evt)
+{
+ session_evt_elt_t *elt;
+
+ if (evt->event_type >= SESSION_CTRL_EVT_RPC)
+ {
+ elt = session_evt_alloc_ctrl (wrk);
+ if (evt->event_type >= SESSION_CTRL_EVT_BOUND)
+ {
+ elt->evt.ctrl_data_index = session_evt_ctrl_data_alloc (wrk);
+ elt->evt.event_type = evt->event_type;
+ clib_memcpy_fast (session_evt_ctrl_data (wrk, elt), evt->data,
+ session_evt_msg_sizes[evt->event_type]);
+ }
+ else
+ {
+ /* Internal control events fit into io events footprint */
+ clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+ }
+ }
+ else
+ {
+ elt = session_evt_alloc_new (wrk);
+ clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
+ }
+}
+
+static void
+session_flush_pending_tx_buffers (session_worker_t * wrk,
+ vlib_node_runtime_t * node)
+{
+ vlib_buffer_enqueue_to_next (wrk->vm, node, wrk->pending_tx_buffers,
+ wrk->pending_tx_nexts,
+ vec_len (wrk->pending_tx_nexts));
+ vec_reset_length (wrk->pending_tx_buffers);
+ vec_reset_length (wrk->pending_tx_nexts);
+}
+
+int
+session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
+{
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ u32 i, n_to_dequeue = 0;
+ session_event_t *evt;
+
+ n_to_dequeue = svm_msg_q_size (mq);
+ for (i = 0; i < n_to_dequeue; i++)
+ {
+ svm_msg_q_sub_raw (mq, msg);
+ evt = svm_msg_q_msg_data (mq, msg);
+ session_evt_add_to_list (wrk, evt);
+ svm_msg_q_free_msg (mq, msg);
+ }
+
+ return n_to_dequeue;
+}
+
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)