From 60183db3a8b25714539882cca05ba3b9e9e54489 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Sat, 20 Jul 2019 15:53:16 -0700 Subject: [PATCH] session: reorganize dispatch logic Type:refactor Change-Id: Id796d0103e61e15c35a586d8cbd3d8916487b84d Signed-off-by: Florin Coras --- src/vnet/session/session.c | 4 +- src/vnet/session/session.h | 29 ++--- src/vnet/session/session_node.c | 261 ++++++++++++++++++++-------------------- 3 files changed, 141 insertions(+), 153 deletions(-) diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 4e340687041..ea1a8c9fc5e 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -1377,12 +1377,12 @@ session_manager_main_enable (vlib_main_t * vm) { wrk = &smm->wrk[i]; wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list); - wrk->pending_head = clib_llist_make_head (wrk->event_elts, evt_list); + wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->postponed_head = clib_llist_make_head (wrk->event_elts, evt_list); wrk->disconnects_head = clib_llist_make_head (wrk->event_elts, evt_list); + wrk->vm = vlib_mains[i]; wrk->last_vlib_time = vlib_time_now (vlib_mains[i]); - wrk->dispatch_period = 500e-6; if (num_threads > 1) clib_rwlock_init (&smm->wrk[i].peekers_rw_locks); diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index 1e3229171da..a3f2a01929b 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -78,12 +78,12 @@ typedef struct session_worker_ /** vpp event message queue for worker */ svm_msg_q_t *vpp_event_queue; - /** Our approximation of a "complete" dispatch loop period */ - f64 dispatch_period; - /** vlib_time_now last time around the track */ f64 last_vlib_time; + /** Convenience pointer to this worker's vlib_main */ + vlib_main_t *vm; + /** Per-proto vector of sessions to enqueue */ u32 *session_to_enqueue[TRANSPORT_N_PROTO]; @@ -100,7 +100,7 @@ typedef struct session_worker_ clib_llist_index_t new_head; /** Head of list of pending events */ - clib_llist_index_t pending_head; + clib_llist_index_t old_head; /** Head of list of postponed events */ clib_llist_index_t postponed_head; @@ -111,18 +111,15 @@ typedef struct session_worker_ /** Peekers rw lock */ clib_rwlock_t peekers_rw_locks; - u32 last_tx_packets; - #if SESSION_DEBUG /** last event poll time by thread */ f64 last_event_poll; #endif } session_worker_t; -typedef int (session_fifo_rx_fn) (vlib_main_t * vm, +typedef int (session_fifo_rx_fn) (session_worker_t * wrk, vlib_node_runtime_t * node, - session_worker_t * wrk, - session_evt_elt_t * e, int *n_tx_pkts); + session_evt_elt_t * e, int *n_tx_packets); extern session_fifo_rx_fn session_tx_fifo_peek_and_snd; extern session_fifo_rx_fn session_tx_fifo_dequeue_and_snd; @@ -207,9 +204,9 @@ session_evt_elt_free (session_worker_t * wrk, session_evt_elt_t * elt) } static inline session_evt_elt_t * -session_evt_pending_head (session_worker_t * wrk) +session_evt_old_head (session_worker_t * wrk) { - return pool_elt_at_index (wrk->event_elts, wrk->pending_head); + return pool_elt_at_index (wrk->event_elts, wrk->old_head); } static inline session_evt_elt_t * @@ -225,10 +222,10 @@ session_evt_pending_disconnects_head (session_worker_t * wrk) } static inline void -session_evt_add_pending (session_worker_t * wrk, session_evt_elt_t * elt) +session_evt_add_old (session_worker_t * wrk, session_evt_elt_t * elt) { clib_llist_add_tail (wrk->event_elts, evt_list, elt, - session_evt_pending_head (wrk)); + session_evt_old_head (wrk)); } static inline void @@ -477,12 +474,6 @@ transport_rx_fifo_has_ooo_data (transport_connection_t * tc) return svm_fifo_has_ooo_data (s->rx_fifo); } -always_inline f64 -transport_dispatch_period (u32 thread_index) -{ - return session_main.wrk[thread_index].dispatch_period; -} - always_inline f64 transport_time_now (u32 thread_index) { diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index bfe7702388c..999776f02bd 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -613,16 +613,17 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx, } always_inline int -session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, - session_worker_t * wrk, - session_evt_elt_t * elt, int *n_tx_packets, - u8 peek_data) +session_tx_fifo_read_and_snd_i (session_worker_t * wrk, + vlib_node_runtime_t * node, + session_evt_elt_t * elt, + int *n_tx_packets, u8 peek_data) { - u32 next_index, next0, next1, *to_next, n_left_to_next, n_left, pbi; - u32 n_trace = vlib_get_trace_count (vm, node), n_bufs_needed = 0; - session_main_t *smm = &session_main; + u32 next_index, next0, next1, *to_next, n_left_to_next; + u32 n_trace, n_bufs_needed = 0, n_left, pbi; session_tx_context_t *ctx = &wrk->ctx; + session_main_t *smm = &session_main; session_event_t *e = &elt->evt; + vlib_main_t *vm = wrk->vm; transport_proto_t tp; vlib_buffer_t *pb; u16 n_bufs, rv; @@ -630,7 +631,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, if (PREDICT_FALSE ((rv = session_tx_not_ready (ctx->s, peek_data)))) { if (rv < 2) - session_evt_add_pending (wrk, elt); + session_evt_add_old (wrk, elt); return SESSION_TX_NO_DATA; } @@ -649,12 +650,13 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, } ctx->snd_space = transport_connection_snd_space (ctx->tc, - vm->clib_time. + wrk->vm->clib_time. last_cpu_time, ctx->snd_mss); + if (ctx->snd_space == 0 || ctx->snd_mss == 0) { - session_evt_add_pending (wrk, elt); + session_evt_add_old (wrk, elt); return SESSION_TX_NO_DATA; } @@ -676,7 +678,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, { if (n_bufs) vlib_buffer_free (vm, wrk->tx_buffers, n_bufs); - session_evt_add_pending (wrk, elt); + session_evt_add_old (wrk, elt); return SESSION_TX_NO_BUFFERS; } @@ -757,13 +759,13 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, n_left_to_next, bi0, next0); } - if (PREDICT_FALSE (n_trace > 0)) + if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node)) > 0)) session_tx_trace_frame (vm, node, next_index, to_next, ctx->n_segs_per_evt, ctx->s, n_trace); + if (PREDICT_FALSE (n_bufs)) - { - vlib_buffer_free (vm, wrk->tx_buffers, n_bufs); - } + vlib_buffer_free (vm, wrk->tx_buffers, n_bufs); + *n_tx_packets += ctx->n_segs_per_evt; transport_connection_update_tx_stats (ctx->tc, ctx->max_len_to_snd); vlib_put_next_frame (vm, node, next_index, n_left_to_next); @@ -775,7 +777,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, ASSERT (ctx->left_to_snd == 0); if (ctx->max_len_to_snd < ctx->max_dequeue) if (svm_fifo_set_event (ctx->s->tx_fifo)) - session_evt_add_pending (wrk, elt); + session_evt_add_old (wrk, elt); if (!peek_data && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM) @@ -787,7 +789,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, /* More data needs to be read */ else if (svm_fifo_max_dequeue_cons (ctx->s->tx_fifo) > 0) if (svm_fifo_set_event (ctx->s->tx_fifo)) - session_evt_add_pending (wrk, elt); + session_evt_add_old (wrk, elt); if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd)) session_dequeue_notify (ctx->s); @@ -796,26 +798,25 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, } int -session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node, - session_worker_t * wrk, - session_evt_elt_t * e, int *n_tx_pkts) +session_tx_fifo_peek_and_snd (session_worker_t * wrk, + vlib_node_runtime_t * node, + session_evt_elt_t * e, int *n_tx_packets) { - return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 1); + return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 1); } int -session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node, - session_worker_t * wrk, - session_evt_elt_t * e, int *n_tx_pkts) +session_tx_fifo_dequeue_and_snd (session_worker_t * wrk, + vlib_node_runtime_t * node, + session_evt_elt_t * e, int *n_tx_packets) { - return session_tx_fifo_read_and_snd_i (vm, node, wrk, e, n_tx_pkts, 0); + return session_tx_fifo_read_and_snd_i (wrk, node, e, n_tx_packets, 0); } int -session_tx_fifo_dequeue_internal (vlib_main_t * vm, +session_tx_fifo_dequeue_internal (session_worker_t * wrk, vlib_node_runtime_t * node, - session_worker_t * wrk, - session_evt_elt_t * e, int *n_tx_pkts) + session_evt_elt_t * e, int *n_tx_packets) { session_t *s = wrk->ctx.s; @@ -831,16 +832,100 @@ session_event_get_session (session_event_t * e, u8 thread_index) return session_get_if_valid (e->session_index, thread_index); } -static void -session_update_dispatch_period (session_worker_t * wrk, f64 now, - u32 thread_index) +always_inline void +session_event_dispatch (session_worker_t * wrk, vlib_node_runtime_t * node, + session_evt_elt_t * elt, u32 thread_index, + int *n_tx_packets) { - if (wrk->last_tx_packets) + session_main_t *smm = &session_main; + app_worker_t *app_wrk; + void (*fp) (void *); + session_event_t *e; + session_t *s; + int rv; + + e = &elt->evt; + switch (e->event_type) { - f64 sample = now - wrk->last_vlib_time; - wrk->dispatch_period = (wrk->dispatch_period + sample) * 0.5; + case SESSION_IO_EVT_TX_FLUSH: + case SESSION_IO_EVT_TX: + /* Don't try to send more that one frame per dispatch cycle */ + if (*n_tx_packets == VLIB_FRAME_SIZE) + { + session_evt_add_postponed (wrk, elt); + return; + } + + s = session_event_get_session (e, thread_index); + if (PREDICT_FALSE (!s)) + { + clib_warning ("session was freed!"); + break; + } + CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD); + wrk->ctx.s = s; + /* Spray packets in per session type frames, since they go to + * different nodes */ + rv = (smm->session_tx_fns[s->session_type]) (wrk, node, elt, + n_tx_packets); + if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS)) + { + vlib_node_increment_counter (wrk->vm, node->node_index, + SESSION_QUEUE_ERROR_NO_BUFFER, 1); + break; + } + break; + case SESSION_IO_EVT_RX: + s = session_event_get_session (e, thread_index); + if (!s) + break; + transport_app_rx_evt (session_get_transport_proto (s), + s->connection_index, s->thread_index); + break; + case SESSION_CTRL_EVT_CLOSE: + s = session_get_from_handle_if_valid (e->session_handle); + if (PREDICT_FALSE (!s)) + break; + session_transport_close (s); + break; + case SESSION_IO_EVT_BUILTIN_RX: + s = session_event_get_session (e, thread_index); + if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING)) + break; + svm_fifo_unset_event (s->rx_fifo); + app_wrk = app_worker_get (s->app_wrk_index); + app_worker_builtin_rx (app_wrk, s); + break; + case SESSION_IO_EVT_BUILTIN_TX: + s = session_get_from_handle_if_valid (e->session_handle); + wrk->ctx.s = s; + if (PREDICT_TRUE (s != 0)) + session_tx_fifo_dequeue_internal (wrk, node, elt, n_tx_packets); + break; + case SESSION_CTRL_EVT_RPC: + fp = e->rpc_args.fp; + (*fp) (e->rpc_args.arg); + break; + case SESSION_CTRL_EVT_DISCONNECTED: + session_mq_disconnected_handler (e->data); + break; + case SESSION_CTRL_EVT_ACCEPTED_REPLY: + session_mq_accepted_reply_handler (e->data); + break; + case SESSION_CTRL_EVT_CONNECTED_REPLY: + break; + case SESSION_CTRL_EVT_DISCONNECTED_REPLY: + session_mq_disconnected_reply_handler (e->data); + break; + case SESSION_CTRL_EVT_RESET_REPLY: + session_mq_reset_reply_handler (e->data); + break; + case SESSION_CTRL_EVT_WORKER_UPDATE: + session_mq_worker_update_handler (e->data); + break; + default: + clib_warning ("unhandled event type %d", e->event_type); } - wrk->last_vlib_time = now; } static uword @@ -850,22 +935,20 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, session_main_t *smm = vnet_get_session_main (); u32 thread_index = vm->thread_index, n_to_dequeue; session_worker_t *wrk = &smm->wrk[thread_index]; - session_evt_elt_t *elt, *new_he, *new_te, *pending_he; + session_evt_elt_t *elt, *new_he, *new_te, *old_he; session_evt_elt_t *disconnects_he, *postponed_he; svm_msg_q_msg_t _msg, *msg = &_msg; - f64 now = vlib_time_now (vm); - int n_tx_packets = 0, i, rv; - app_worker_t *app_wrk; + int i, n_tx_packets = 0; svm_msg_q_t *mq; - void (*fp) (void *); SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk); + wrk->last_vlib_time = vlib_time_now (vm); + /* * Update transport time */ - session_update_dispatch_period (wrk, now, thread_index); - transport_update_time (now, thread_index); + transport_update_time (wrk->last_vlib_time, thread_index); /* Make sure postponed events are handled first */ new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); @@ -896,106 +979,22 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, svm_msg_q_unlock (mq); } - pending_he = pool_elt_at_index (wrk->event_elts, wrk->pending_head); - postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head); + old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head); disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head); new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he); - clib_llist_splice (wrk->event_elts, evt_list, new_te, pending_he); + clib_llist_splice (wrk->event_elts, evt_list, new_te, old_he); new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he); clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he); while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he)) { clib_llist_index_t ei; - session_event_t *e; - session_t *s; clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he); ei = clib_llist_entry_index (wrk->event_elts, elt); - e = &elt->evt; - switch (e->event_type) - { - case SESSION_IO_EVT_TX_FLUSH: - case SESSION_IO_EVT_TX: - /* Don't try to send more that one frame per dispatch cycle */ - if (n_tx_packets == VLIB_FRAME_SIZE) - { - session_evt_add_postponed (wrk, elt); - continue; - } - s = session_event_get_session (e, thread_index); - if (PREDICT_FALSE (!s)) - { - clib_warning ("session was freed!"); - break; - } - CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD); - wrk->ctx.s = s; - /* Spray packets in per session type frames, since they go to - * different nodes */ - rv = (smm->session_tx_fns[s->session_type]) (vm, node, wrk, elt, - &n_tx_packets); - if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS)) - { - vlib_node_increment_counter (vm, node->node_index, - SESSION_QUEUE_ERROR_NO_BUFFER, 1); - break; - } - break; - case SESSION_IO_EVT_RX: - s = session_event_get_session (e, thread_index); - if (!s) - break; - transport_app_rx_evt (session_get_transport_proto (s), - s->connection_index, s->thread_index); - break; - case SESSION_CTRL_EVT_CLOSE: - s = session_get_from_handle_if_valid (e->session_handle); - if (PREDICT_FALSE (!s)) - break; - session_transport_close (s); - break; - case SESSION_IO_EVT_BUILTIN_RX: - s = session_event_get_session (e, thread_index); - if (PREDICT_FALSE (!s || s->session_state >= SESSION_STATE_CLOSING)) - break; - svm_fifo_unset_event (s->rx_fifo); - app_wrk = app_worker_get (s->app_wrk_index); - app_worker_builtin_rx (app_wrk, s); - break; - case SESSION_IO_EVT_BUILTIN_TX: - s = session_get_from_handle_if_valid (e->session_handle); - wrk->ctx.s = s; - if (PREDICT_TRUE (s != 0)) - session_tx_fifo_dequeue_internal (vm, node, wrk, elt, - &n_tx_packets); - break; - case SESSION_CTRL_EVT_RPC: - fp = e->rpc_args.fp; - (*fp) (e->rpc_args.arg); - break; - case SESSION_CTRL_EVT_DISCONNECTED: - session_mq_disconnected_handler (e->data); - break; - case SESSION_CTRL_EVT_ACCEPTED_REPLY: - session_mq_accepted_reply_handler (e->data); - break; - case SESSION_CTRL_EVT_CONNECTED_REPLY: - break; - case SESSION_CTRL_EVT_DISCONNECTED_REPLY: - session_mq_disconnected_reply_handler (e->data); - break; - case SESSION_CTRL_EVT_RESET_REPLY: - session_mq_reset_reply_handler (e->data); - break; - case SESSION_CTRL_EVT_WORKER_UPDATE: - session_mq_worker_update_handler (e->data); - break; - default: - clib_warning ("unhandled event type %d", e->event_type); - } + session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets); /* Regrab elements in case pool moved */ elt = pool_elt_at_index (wrk->event_elts, ei); @@ -1005,8 +1004,6 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head); } - wrk->last_tx_packets = n_tx_packets; - vlib_node_increment_counter (vm, session_queue_node.index, SESSION_QUEUE_ERROR_TX, n_tx_packets); @@ -1154,7 +1151,7 @@ session_node_lookup_fifo_event (svm_fifo_t * f, session_event_t * e) /* *INDENT-OFF* */ clib_llist_foreach (wrk->event_elts, evt_list, - session_evt_pending_head (wrk), elt, ({ + session_evt_old_head (wrk), elt, ({ found = session_node_cmp_event (&elt->evt, f); if (found) { -- 2.16.6