+/**
+ * @brief Main dispatch function for the session queue input node.
+ *
+ * Runs once per dispatch cycle on each worker thread. Updates transport
+ * time, drains this thread's vpp event message queue without blocking,
+ * appends events postponed from previous cycles, and dispatches each
+ * event (app tx, disconnects, builtin rx/tx, rpc, ctrl replies).
+ *
+ * @param vm    vlib main for the current thread
+ * @param node  this node's runtime
+ * @param frame unused
+ * @return number of packets transmitted this dispatch cycle
+ */
+static uword
+session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
+ vlib_frame_t * frame)
+{
+ session_manager_main_t *smm = vnet_get_session_manager_main ();
+ u32 thread_index = vm->thread_index, n_to_dequeue, n_events;
+ session_event_t *pending_events, *e;
+ session_event_t *fifo_events;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ f64 now = vlib_time_now (vm);
+ int n_tx_packets = 0, i, rv;
+ app_worker_t *app_wrk;
+ application_t *app;
+ svm_msg_q_t *mq;
+ void (*fp) (void *);
+
+ SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
+
+ /*
+ * Update transport time
+ */
+ transport_update_time (now, thread_index);
+
+ /*
+ * Get vpp queue events that we can dequeue without blocking
+ */
+ mq = smm->vpp_event_queues[thread_index];
+ fifo_events = smm->free_event_vector[thread_index];
+ n_to_dequeue = svm_msg_q_size (mq);
+ pending_events = smm->pending_event_vector[thread_index];
+
+ /* Nothing new in the mq and nothing left over from last cycle: done */
+ if (!n_to_dequeue && !vec_len (pending_events)
+ && !vec_len (smm->pending_disconnects[thread_index]))
+ return 0;
+
+ SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
+
+ /*
+ * If we didn't manage to process previous events try going
+ * over them again without dequeuing new ones.
+ * XXX: Handle senders to sessions that can't keep up
+ */
+ /* NOTE(review): branch deliberately disabled — the `0 &&` short-circuits
+ * the condition to false, so fresh events are always dequeued. See the
+ * XXX above. */
+ if (0 && vec_len (pending_events) >= 100)
+ {
+ clib_warning ("too many fifo events unsolved");
+ goto skip_dequeue;
+ }
+
+ /* See you in the next life, don't be late
+ * XXX: we may need priorities here */
+ /* Non-blocking lock attempt: if the queue is contended, skip this
+ * dispatch cycle entirely and retry on the next one */
+ if (svm_msg_q_try_lock (mq))
+ return 0;
+
+ /* Copy each message payload into the local event vector and free the
+ * ring slot, all while holding the queue lock */
+ for (i = 0; i < n_to_dequeue; i++)
+ {
+ vec_add2 (fifo_events, e, 1);
+ svm_msg_q_sub_w_lock (mq, msg);
+ clib_memcpy (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
+ svm_msg_q_free_msg (mq, msg);
+ }
+
+ svm_msg_q_unlock (mq);
+
+ /* Leftovers from the previous cycle run after the fresh events, and
+ * postponed disconnects run last */
+ vec_append (fifo_events, pending_events);
+ vec_append (fifo_events, smm->pending_disconnects[thread_index]);
+
+ /* Reset lengths without freeing — these per-thread vectors are
+ * recycled on the next cycle */
+ _vec_len (pending_events) = 0;
+ smm->pending_event_vector[thread_index] = pending_events;
+ _vec_len (smm->pending_disconnects[thread_index]) = 0;
+
+skip_dequeue:
+ n_events = vec_len (fifo_events);
+ for (i = 0; i < n_events; i++)
+ {
+ stream_session_t *s; /* $$$ prefetch 1 ahead maybe */
+ session_event_t *e;
+ u8 want_tx_evt;
+
+ e = &fifo_events[i];
+ switch (e->event_type)
+ {
+ case FIFO_EVENT_APP_TX:
+ /* Don't try to send more that one frame per dispatch cycle */
+ if (n_tx_packets == VLIB_FRAME_SIZE)
+ {
+ /* Frame budget exhausted: requeue for the next cycle */
+ vec_add1 (smm->pending_event_vector[thread_index], *e);
+ break;
+ }
+
+ s = session_event_get_session (e, thread_index);
+ if (PREDICT_FALSE (!s))
+ {
+ clib_warning ("It's dead, Jim!");
+ continue;
+ }
+
+ /* Snapshot the flag before the tx fn runs */
+ want_tx_evt = svm_fifo_want_tx_evt (s->server_tx_fifo);
+ /* Spray packets in per session type frames, since they go to
+ * different nodes */
+ rv = (smm->session_tx_fns[s->session_type]) (vm, node, e, s,
+ &n_tx_packets);
+ if (PREDICT_TRUE (rv == SESSION_TX_OK))
+ {
+ /* Tx succeeded: clear the flag and issue a dequeue
+ * notification for the session */
+ if (PREDICT_FALSE (want_tx_evt))
+ {
+ svm_fifo_set_want_tx_evt (s->server_tx_fifo, 0);
+ session_dequeue_notify (s);
+ }
+ }
+ else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
+ {
+ /* Out of vlib buffers: count the error; the event is dropped,
+ * not requeued */
+ vlib_node_increment_counter (vm, node->node_index,
+ SESSION_QUEUE_ERROR_NO_BUFFER, 1);
+ continue;
+ }
+ break;
+ case FIFO_EVENT_DISCONNECT:
+ /* Make sure stream disconnects run after the pending list is
+ * drained */
+ s = session_get_from_handle (e->session_handle);
+ if (!e->postponed)
+ {
+ /* First sighting: postpone once so other pending events for
+ * this session get a chance to run first */
+ e->postponed = 1;
+ vec_add1 (smm->pending_disconnects[thread_index], *e);
+ continue;
+ }
+ /* If tx queue is still not empty, wait */
+ if (svm_fifo_max_dequeue (s->server_tx_fifo))
+ {
+ vec_add1 (smm->pending_disconnects[thread_index], *e);
+ continue;
+ }
+
+ stream_session_disconnect_transport (s);
+ break;
+ case FIFO_EVENT_BUILTIN_RX:
+ s = session_event_get_session (e, thread_index);
+ /* Skip sessions that are gone or already closing */
+ if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING))
+ continue;
+ svm_fifo_unset_event (s->server_rx_fifo);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app = application_get (app_wrk->app_index);
+ /* Invoke the builtin (in-process) app's rx callback directly */
+ app->cb_fns.builtin_app_rx_callback (s);
+ break;
+ case FIFO_EVENT_BUILTIN_TX:
+ s = session_get_from_handle_if_valid (e->session_handle);
+ if (PREDICT_TRUE (s != 0))
+ session_tx_fifo_dequeue_internal (vm, node, e, s, &n_tx_packets);
+ break;
+ case FIFO_EVENT_RPC:
+ /* Run an arbitrary function with its argument on this thread */
+ fp = e->rpc_args.fp;
+ (*fp) (e->rpc_args.arg);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ session_mq_disconnected_handler (e->data);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+ session_mq_accepted_reply_handler (e->data);
+ break;
+ case SESSION_CTRL_EVT_CONNECTED_REPLY:
+ /* No action required for connected replies */
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
+ session_mq_disconnected_reply_handler (e->data);
+ break;
+ case SESSION_CTRL_EVT_RESET_REPLY:
+ session_mq_reset_reply_handler (e->data);
+ break;
+ default:
+ clib_warning ("unhandled event type %d", e->event_type);
+ }
+ }
+
+ /* Recycle the (now empty) event vector for the next cycle */
+ _vec_len (fifo_events) = 0;
+ smm->free_event_vector[thread_index] = fifo_events;
+
+ vlib_node_increment_counter (vm, session_queue_node.index,
+ SESSION_QUEUE_ERROR_TX, n_tx_packets);
+
+ SESSION_EVT_DBG (SESSION_EVT_DISPATCH_END, smm, thread_index);
+
+ return n_tx_packets;
+}
+
+/* *INDENT-OFF* */
+/* Registration for the session-queue input node, dispatched by
+ * session_queue_node_fn. Registered in the disabled state. */
+VLIB_REGISTER_NODE (session_queue_node) =
+{
+ .function = session_queue_node_fn,
+ .name = "session-queue",
+ .format_trace = format_session_queue_trace,
+ .type = VLIB_NODE_TYPE_INPUT,
+ .n_errors = ARRAY_LEN (session_queue_error_strings),
+ .error_strings = session_queue_error_strings,
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+/* *INDENT-ON* */
+