#include <vnet/session/application_local.h>
#include <vnet/session/session_debug.h>
#include <svm/queue.h>
+#include <sys/timerfd.h>
#define app_check_thread_and_barrier(_fn, _arg) \
if (!vlib_thread_is_main_w_barrier ()) \
svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
}
+static void
+session_mq_transport_attr_handler (void *data)
+{
+ session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
+ session_transport_attr_reply_msg_t *rmp;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ app_worker_t *app_wrk;
+ session_event_t *evt;
+ application_t *app;
+ session_t *s;
+ int rv;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ if (!(s = session_get_from_handle_if_valid (mp->handle)))
+ {
+ clib_warning ("invalid handle %llu", mp->handle);
+ return;
+ }
+ app_wrk = app_worker_get (s->app_wrk_index);
+ if (app_wrk->app_index != app->app_index)
+ {
+ clib_warning ("app %u does not own session %llu", app->app_index,
+ mp->handle);
+ return;
+ }
+
+ rv = session_transport_attribute (s, mp->is_get, &mp->attr);
+
+ svm_msg_q_lock_and_alloc_msg_w_ring (
+ app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
+ rmp = (session_transport_attr_reply_msg_t *) evt->data;
+ rmp->handle = mp->handle;
+ rmp->retval = rv;
+ rmp->is_get = mp->is_get;
+ if (!rv && mp->is_get)
+ rmp->attr = mp->attr;
+ svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
case SESSION_CTRL_EVT_APP_WRK_RPC:
session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
break;
+ case SESSION_CTRL_EVT_TRANSPORT_ATTR:
+ session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
+ break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}
vec_reset_length (wrk->pending_tx_nexts);
}
+int
+session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
+{
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ u32 i, n_to_dequeue = 0;
+ session_event_t *evt;
+
+ n_to_dequeue = svm_msg_q_size (mq);
+ for (i = 0; i < n_to_dequeue; i++)
+ {
+ svm_msg_q_sub_raw (mq, msg);
+ evt = svm_msg_q_msg_data (mq, msg);
+ session_evt_add_to_list (wrk, evt);
+ svm_msg_q_free_msg (mq, msg);
+ }
+
+ return n_to_dequeue;
+}
+
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
+{
+ struct itimerspec its;
+
+ its.it_value.tv_sec = 0;
+ its.it_value.tv_nsec = time_ns;
+ its.it_interval.tv_sec = 0;
+ its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+ if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
+ clib_warning ("timerfd_settime");
+}
+
+always_inline u64
+session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
+{
+ if (state == SESSION_WRK_INTERRUPT)
+ return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
+ else if (state == SESSION_WRK_IDLE)
+ return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
+ else
+ return 0;
+}
+
+static inline void
+session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
+{
+ u64 time_ns;
+
+ wrk->state = state;
+ time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
+ session_wrk_timerfd_update (wrk, time_ns);
+}
+
+static void
+session_wrk_update_state (session_worker_t *wrk)
+{
+ vlib_main_t *vm = wrk->vm;
+
+ if (wrk->state == SESSION_WRK_POLLING)
+ {
+ if (pool_elts (wrk->event_elts) == 3 &&
+ vlib_last_vectors_per_main_loop (vm) < 1)
+ {
+ session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
+ vlib_node_set_state (vm, session_queue_node.index,
+ VLIB_NODE_STATE_INTERRUPT);
+ }
+ }
+ else if (wrk->state == SESSION_WRK_INTERRUPT)
+ {
+ if (pool_elts (wrk->event_elts) > 3 ||
+ vlib_last_vectors_per_main_loop (vm) > 1)
+ {
+ session_wrk_set_state (wrk, SESSION_WRK_POLLING);
+ vlib_node_set_state (vm, session_queue_node.index,
+ VLIB_NODE_STATE_POLLING);
+ }
+ else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
+ {
+ session_wrk_set_state (wrk, SESSION_WRK_IDLE);
+ }
+ }
+ else
+ {
+ if (pool_elts (wrk->event_elts))
+ {
+ session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
+ }
+ }
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
+ u32 thread_index = vm->thread_index, __clib_unused n_evts;
+ session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
session_main_t *smm = vnet_get_session_main ();
- u32 thread_index = vm->thread_index, n_to_dequeue;
session_worker_t *wrk = &smm->wrk[thread_index];
- session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
clib_llist_index_t ei, next_ei, old_ti;
- svm_msg_q_msg_t _msg, *msg = &_msg;
- int i = 0, n_tx_packets;
- session_event_t *evt;
- svm_msg_q_t *mq;
+ int n_tx_packets;
SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
- wrk->last_vlib_time = vlib_time_now (vm);
- wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
+ session_wrk_update_time (wrk, vlib_time_now (vm));
/*
* Update transport time
SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
/*
- * Dequeue and handle new events
+ * Dequeue new internal mq events
*/
- /* Try to dequeue what is available. Don't wait for lock.
- * XXX: we may need priorities here */
- mq = wrk->vpp_event_queue;
- n_to_dequeue = svm_msg_q_size (mq);
- if (n_to_dequeue)
- {
- for (i = 0; i < n_to_dequeue; i++)
- {
- svm_msg_q_sub_raw (mq, msg);
- evt = svm_msg_q_msg_data (mq, msg);
- session_evt_add_to_list (wrk, evt);
- svm_msg_q_free_msg (mq, msg);
- }
- }
-
- SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_to_dequeue, !i);
+ n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
+ SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
/*
* Handle control events
ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
- /* *INDENT-OFF* */
clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
clib_llist_remove (wrk->event_elts, evt_list, elt);
session_event_dispatch_ctrl (wrk, elt);
}));
- /* *INDENT-ON* */
SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
+ if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
+ session_wrk_update_state (wrk);
+
return n_tx_packets;
}
};
/* *INDENT-ON* */
+static clib_error_t *
+session_wrk_tfd_read_ready (clib_file_t *cf)
+{
+ session_worker_t *wrk = session_main_get_worker (cf->private_data);
+ u64 buf;
+ int rv;
+
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+ rv = read (wrk->timerfd, &buf, sizeof (buf));
+ if (rv < 0 && errno != EAGAIN)
+ clib_unix_warning ("failed");
+ return 0;
+}
+
+static clib_error_t *
+session_wrk_tfd_write_ready (clib_file_t *cf)
+{
+ return 0;
+}
+
+void
+session_wrk_enable_adaptive_mode (session_worker_t *wrk)
+{
+ u32 thread_index = wrk->vm->thread_index;
+ clib_file_t template = { 0 };
+
+ if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
+ clib_warning ("timerfd_create");
+
+ template.read_function = session_wrk_tfd_read_ready;
+ template.write_function = session_wrk_tfd_write_ready;
+ template.file_descriptor = wrk->timerfd;
+ template.private_data = thread_index;
+ template.polling_thread_index = thread_index;
+ template.description = format (0, "session-wrk-tfd-%u", thread_index);
+
+ wrk->timerfd_file = clib_file_add (&file_main, &template);
+ wrk->flags |= SESSION_WRK_F_ADAPTIVE;
+}
+
static clib_error_t *
session_queue_exit (vlib_main_t * vm)
{