#include <vnet/session/application_local.h>
#include <vnet/session/session_debug.h>
#include <svm/queue.h>
+#include <sys/timerfd.h>
#define app_check_thread_and_barrier(_fn, _arg) \
if (!vlib_thread_is_main_w_barrier ()) \
a->sep_ext.transport_flags = mp->flags;
if ((rv = vnet_listen (a)))
- clib_warning ("listen returned: %d", rv);
+ clib_warning ("listen returned: %U", format_session_error, rv);
app_wrk = application_get_worker (app, mp->wrk_index);
mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
rmp = (session_worker_update_reply_msg_t *) evt->data;
rmp->handle = mp->handle;
- rmp->rx_fifo = pointer_to_uword (s->rx_fifo);
- rmp->tx_fifo = pointer_to_uword (s->tx_fifo);
+ if (s->rx_fifo)
+ rmp->rx_fifo = fifo_segment_fifo_offset (s->rx_fifo);
+ if (s->tx_fifo)
+ rmp->tx_fifo = fifo_segment_fifo_offset (s->tx_fifo);
rmp->segment_handle = session_segment_handle (s);
svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
}
+static void
+session_mq_transport_attr_handler (void *data)
+{
+ session_transport_attr_msg_t *mp = (session_transport_attr_msg_t *) data;
+ session_transport_attr_reply_msg_t *rmp;
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ app_worker_t *app_wrk;
+ session_event_t *evt;
+ application_t *app;
+ session_t *s;
+ int rv;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
+
+ if (!(s = session_get_from_handle_if_valid (mp->handle)))
+ {
+ clib_warning ("invalid handle %llu", mp->handle);
+ return;
+ }
+ app_wrk = app_worker_get (s->app_wrk_index);
+ if (app_wrk->app_index != app->app_index)
+ {
+ clib_warning ("app %u does not own session %llu", app->app_index,
+ mp->handle);
+ return;
+ }
+
+ rv = session_transport_attribute (s, mp->is_get, &mp->attr);
+
+ svm_msg_q_lock_and_alloc_msg_w_ring (
+ app_wrk->event_queue, SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT, msg);
+ evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+ clib_memset (evt, 0, sizeof (*evt));
+ evt->event_type = SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY;
+ rmp = (session_transport_attr_reply_msg_t *) evt->data;
+ rmp->handle = mp->handle;
+ rmp->retval = rv;
+ rmp->is_get = mp->is_get;
+ if (!rv && mp->is_get)
+ rmp->attr = mp->attr;
+ svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
u32 next_index, u32 * to_next, u16 n_segs,
session_t * s, u32 n_trace)
{
- session_queue_trace_t *t;
- vlib_buffer_t *b;
- int i;
-
- for (i = 0; i < clib_min (n_trace, n_segs); i++)
+ while (n_trace && n_segs)
{
- b = vlib_get_buffer (vm, to_next[i]);
- vlib_trace_buffer (vm, node, next_index, b, 1 /* follow_chain */ );
- t = vlib_add_trace (vm, node, b, sizeof (*t));
- t->session_index = s->session_index;
- t->server_thread_index = s->thread_index;
+ vlib_buffer_t *b = vlib_get_buffer (vm, to_next[0]);
+ if (PREDICT_TRUE
+ (vlib_trace_buffer
+ (vm, node, next_index, b, 1 /* follow_chain */ )))
+ {
+ session_queue_trace_t *t =
+ vlib_add_trace (vm, node, b, sizeof (*t));
+ t->session_index = s->session_index;
+ t->server_thread_index = s->thread_index;
+ n_trace--;
+ }
+ to_next++;
+ n_segs--;
}
- vlib_set_trace_count (vm, node, n_trace - i);
+ vlib_set_trace_count (vm, node, n_trace);
}
always_inline void
svm_fifo_t *f = ctx->s->tx_fifo;
session_dgram_hdr_t *hdr = &ctx->hdr;
u16 deq_now;
+ u32 offset;
+
deq_now = clib_min (hdr->data_length - hdr->data_offset,
len_to_deq);
- n_bytes_read = svm_fifo_peek (f, hdr->data_offset, deq_now,
- data);
+ offset = hdr->data_offset + SESSION_CONN_HDR_LEN;
+ n_bytes_read = svm_fifo_peek (f, offset, deq_now, data);
ASSERT (n_bytes_read > 0);
hdr->data_offset += n_bytes_read;
if (hdr->data_offset == hdr->data_length)
{
- u32 offset = hdr->data_length + SESSION_CONN_HDR_LEN;
+ offset = hdr->data_length + SESSION_CONN_HDR_LEN;
svm_fifo_dequeue_drop (f, offset);
if (ctx->left_to_snd > n_bytes_read)
svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
}
next_index = smm->session_type_to_next[ctx->s->session_type];
- max_burst = VLIB_FRAME_SIZE - *n_tx_packets;
+ max_burst = SESSION_NODE_FRAME_SIZE - *n_tx_packets;
tp = session_get_transport_proto (ctx->s);
ctx->transport_vft = transport_protocol_get_vft (tp);
return SESSION_TX_NO_BUFFERS;
}
- transport_connection_update_tx_bytes (ctx->tc, ctx->max_len_to_snd);
+ if (transport_connection_is_tx_paced (ctx->tc))
+ transport_connection_tx_pacer_update_bytes (ctx->tc, ctx->max_len_to_snd);
ctx->left_to_snd = ctx->max_len_to_snd;
n_left = ctx->n_segs_per_evt;
if (!peek_data)
{
- if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
+ u32 n_dequeued = ctx->max_len_to_snd;
+ if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
+ n_dequeued += ctx->n_segs_per_evt * SESSION_CONN_HDR_LEN;
+ if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, n_dequeued))
session_dequeue_notify (ctx->s);
}
return SESSION_TX_OK;
/* Clear custom-tx flag used to request reschedule for tx */
s->flags &= ~SESSION_F_CUSTOM_TX;
- sp->max_burst_size = clib_min (VLIB_FRAME_SIZE - *n_tx_packets,
+ sp->max_burst_size = clib_min (SESSION_NODE_FRAME_SIZE - *n_tx_packets,
TRANSPORT_PACER_MAX_BURST_PKTS);
n_packets = transport_custom_tx (session_get_transport_proto (s), s, sp);
session_evt_add_head_old (wrk, elt);
}
+ if (sp->max_burst_size &&
+ svm_fifo_needs_deq_ntf (s->tx_fifo, sp->max_burst_size))
+ session_dequeue_notify (s);
+
return n_packets;
}
case SESSION_CTRL_EVT_APP_WRK_RPC:
session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
break;
+ case SESSION_CTRL_EVT_TRANSPORT_ATTR:
+ session_mq_transport_attr_handler (session_evt_ctrl_data (wrk, elt));
+ break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}
vec_reset_length (wrk->pending_tx_nexts);
}
+int
+session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq)
+{
+ svm_msg_q_msg_t _msg, *msg = &_msg;
+ u32 i, n_to_dequeue = 0;
+ session_event_t *evt;
+
+ n_to_dequeue = svm_msg_q_size (mq);
+ for (i = 0; i < n_to_dequeue; i++)
+ {
+ svm_msg_q_sub_raw (mq, msg);
+ evt = svm_msg_q_msg_data (mq, msg);
+ session_evt_add_to_list (wrk, evt);
+ svm_msg_q_free_msg (mq, msg);
+ }
+
+ return n_to_dequeue;
+}
+
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
+{
+ struct itimerspec its;
+
+ its.it_value.tv_sec = 0;
+ its.it_value.tv_nsec = time_ns;
+ its.it_interval.tv_sec = 0;
+ its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+ if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
+ clib_warning ("timerfd_settime");
+}
+
+always_inline u64
+session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
+{
+ if (state == SESSION_WRK_INTERRUPT)
+ return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
+ else if (state == SESSION_WRK_IDLE)
+ return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
+ else
+ return 0;
+}
+
+static inline void
+session_wrk_set_state (session_worker_t *wrk, session_wrk_state_t state)
+{
+ u64 time_ns;
+
+ wrk->state = state;
+ time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
+ session_wrk_timerfd_update (wrk, time_ns);
+}
+
+static void
+session_wrk_update_state (session_worker_t *wrk)
+{
+ vlib_main_t *vm = wrk->vm;
+
+ if (wrk->state == SESSION_WRK_POLLING)
+ {
+ if (pool_elts (wrk->event_elts) == 3 &&
+ vlib_last_vectors_per_main_loop (vm) < 1)
+ {
+ session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
+ vlib_node_set_state (vm, session_queue_node.index,
+ VLIB_NODE_STATE_INTERRUPT);
+ }
+ }
+ else if (wrk->state == SESSION_WRK_INTERRUPT)
+ {
+ if (pool_elts (wrk->event_elts) > 3 ||
+ vlib_last_vectors_per_main_loop (vm) > 1)
+ {
+ session_wrk_set_state (wrk, SESSION_WRK_POLLING);
+ vlib_node_set_state (vm, session_queue_node.index,
+ VLIB_NODE_STATE_POLLING);
+ }
+ else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
+ {
+ session_wrk_set_state (wrk, SESSION_WRK_IDLE);
+ }
+ }
+ else
+ {
+ if (pool_elts (wrk->event_elts))
+ {
+ session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
+ }
+ }
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
{
+ u32 thread_index = vm->thread_index, __clib_unused n_evts;
+ session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
session_main_t *smm = vnet_get_session_main ();
- u32 thread_index = vm->thread_index, n_to_dequeue;
session_worker_t *wrk = &smm->wrk[thread_index];
- session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
clib_llist_index_t ei, next_ei, old_ti;
- svm_msg_q_msg_t _msg, *msg = &_msg;
- int i = 0, n_tx_packets;
- session_event_t *evt;
- svm_msg_q_t *mq;
+ int n_tx_packets;
SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
- wrk->last_vlib_time = vlib_time_now (vm);
- wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
+ session_wrk_update_time (wrk, vlib_time_now (vm));
/*
* Update transport time
SESSION_EVT (SESSION_EVT_DSP_CNTRS, UPDATE_TIME, wrk);
/*
- * Dequeue and handle new events
+ * Dequeue new internal mq events
*/
- /* Try to dequeue what is available. Don't wait for lock.
- * XXX: we may need priorities here */
- mq = wrk->vpp_event_queue;
- n_to_dequeue = svm_msg_q_size (mq);
- if (n_to_dequeue && svm_msg_q_try_lock (mq) == 0)
- {
- for (i = 0; i < n_to_dequeue; i++)
- {
- svm_msg_q_sub_w_lock (mq, msg);
- evt = svm_msg_q_msg_data (mq, msg);
- session_evt_add_to_list (wrk, evt);
- svm_msg_q_free_msg (mq, msg);
- }
- svm_msg_q_unlock (mq);
- }
-
- SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_to_dequeue, !i);
+ n_evts = session_wrk_handle_mq (wrk, wrk->vpp_event_queue);
+ SESSION_EVT (SESSION_EVT_DSP_CNTRS, MQ_DEQ, wrk, n_evts);
/*
* Handle control events
ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
- /* *INDENT-OFF* */
clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
clib_llist_remove (wrk->event_elts, evt_list, elt);
session_event_dispatch_ctrl (wrk, elt);
}));
- /* *INDENT-ON* */
SESSION_EVT (SESSION_EVT_DSP_CNTRS, CTRL_EVTS, wrk);
old_ti = clib_llist_prev_index (old_he, evt_list);
ei = clib_llist_next_index (new_he, evt_list);
- while (ei != wrk->new_head && n_tx_packets < VLIB_FRAME_SIZE)
+ while (ei != wrk->new_head && n_tx_packets < SESSION_NODE_FRAME_SIZE)
{
elt = pool_elt_at_index (wrk->event_elts, ei);
ei = clib_llist_next_index (elt, evt_list);
old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
ei = clib_llist_next_index (old_he, evt_list);
- while (n_tx_packets < VLIB_FRAME_SIZE)
+ while (n_tx_packets < SESSION_NODE_FRAME_SIZE)
{
elt = pool_elt_at_index (wrk->event_elts, ei);
next_ei = clib_llist_next_index (elt, evt_list);
SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
+ if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
+ session_wrk_update_state (wrk);
+
return n_tx_packets;
}
};
/* *INDENT-ON* */
+static clib_error_t *
+session_wrk_tfd_read_ready (clib_file_t *cf)
+{
+ session_worker_t *wrk = session_main_get_worker (cf->private_data);
+ u64 buf;
+ int rv;
+
+ vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+ rv = read (wrk->timerfd, &buf, sizeof (buf));
+ if (rv < 0 && errno != EAGAIN)
+ clib_unix_warning ("failed");
+ return 0;
+}
+
+static clib_error_t *
+session_wrk_tfd_write_ready (clib_file_t *cf)
+{
+ return 0;
+}
+
+void
+session_wrk_enable_adaptive_mode (session_worker_t *wrk)
+{
+ u32 thread_index = wrk->vm->thread_index;
+ clib_file_t template = { 0 };
+
+ if ((wrk->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
+ clib_warning ("timerfd_create");
+
+ template.read_function = session_wrk_tfd_read_ready;
+ template.write_function = session_wrk_tfd_write_ready;
+ template.file_descriptor = wrk->timerfd;
+ template.private_data = thread_index;
+ template.polling_thread_index = thread_index;
+ template.description = format (0, "session-wrk-tfd-%u", thread_index);
+
+ wrk->timerfd_file = clib_file_add (&file_main, &template);
+ wrk->flags |= SESSION_WRK_F_ADAPTIVE;
+}
+
static clib_error_t *
session_queue_exit (vlib_main_t * vm)
{
- if (vec_len (vlib_mains) < 2)
+ if (vlib_get_n_threads () < 2)
return 0;
/*
session_queue_run_on_main (vm);
break;
case SESSION_Q_PROCESS_STOP:
+ vlib_node_set_state (vm, session_queue_process_node.index,
+ VLIB_NODE_STATE_DISABLED);
timeout = 100000.0;
break;
case ~0: