From 1033b4997e4194e5172b7b12ddf638bfdfdaae22 Mon Sep 17 00:00:00 2001 From: Damjan Marion Date: Wed, 3 Jun 2020 12:20:41 +0200 Subject: [PATCH] vlib: improve node interrupt handling - add ability to pass data together with interrupt - avoid locking for local interrupts (same thread) Type: improvement Change-Id: I73a2ab2e716bb887a1f02c87788ae83e329f9b40 Signed-off-by: Damjan Marion --- src/vlib/main.c | 81 +++++++++++++++++++++++++++------------------------ src/vlib/node.h | 17 +++++++---- src/vlib/node_funcs.h | 30 ++++++++++++++++--- 3 files changed, 81 insertions(+), 47 deletions(-) diff --git a/src/vlib/main.c b/src/vlib/main.c index c3ca8b153f0..2e100b24df7 100644 --- a/src/vlib/main.c +++ b/src/vlib/main.c @@ -1702,6 +1702,26 @@ vl_api_send_pending_rpc_requests (vlib_main_t * vm) { } +static_always_inline u64 +dispatch_pending_interrupts (vlib_main_t * vm, vlib_node_main_t * nm, + u64 cpu_time_now) +{ + vlib_node_runtime_t *n; + + for (int i = 0; i < _vec_len (nm->pending_local_interrupts); i++) + { + vlib_node_interrupt_t *in; + in = vec_elt_at_index (nm->pending_local_interrupts, i); + n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT], + in->node_runtime_index); + n->interrupt_data = in->data; + cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, + VLIB_NODE_STATE_INTERRUPT, /* frame */ 0, + cpu_time_now); + } + vec_reset_length (nm->pending_local_interrupts); + return cpu_time_now; +} static_always_inline void vlib_main_or_worker_loop (vlib_main_t * vm, int is_main) @@ -1712,7 +1732,6 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main) u64 cpu_time_now; f64 now; vlib_frame_queue_main_t *fqm; - u32 *last_node_runtime_indices = 0; u32 frame_queue_check_counter = 0; /* Initialize pending node vector. */ @@ -1732,10 +1751,9 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main) cpu_time_now = clib_cpu_time_now (); /* Pre-allocate interupt runtime indices and lock. */ - vec_alloc (nm->pending_interrupt_node_runtime_indices, 32); - vec_alloc (last_node_runtime_indices, 32); - if (!is_main) - clib_spinlock_init (&nm->pending_interrupt_lock); + vec_alloc (nm->pending_local_interrupts, 32); + vec_alloc (nm->pending_remote_interrupts, 32); + clib_spinlock_init (&nm->pending_interrupt_lock); /* Pre-allocate expired nodes. */ if (!nm->polling_threshold_vector_length) @@ -1821,40 +1839,27 @@ vlib_main_or_worker_loop (vlib_main_t * vm, int is_main) if (PREDICT_TRUE (is_main && vm->queue_signal_pending == 0)) vm->queue_signal_callback (vm); - /* Next handle interrupts. */ - { - /* unlocked read, for performance */ - uword l = _vec_len (nm->pending_interrupt_node_runtime_indices); - uword i; - if (PREDICT_FALSE (l > 0)) - { - u32 *tmp; - if (!is_main) - { - clib_spinlock_lock (&nm->pending_interrupt_lock); - /* Re-read w/ lock held, in case another thread added an item */ - l = _vec_len (nm->pending_interrupt_node_runtime_indices); - } + /* handle local interruots */ + if (_vec_len (nm->pending_local_interrupts)) + cpu_time_now = dispatch_pending_interrupts (vm, nm, cpu_time_now); + + /* handle remote interruots */ + if (_vec_len (nm->pending_remote_interrupts)) + { + vlib_node_interrupt_t *in; + + /* at this point it is known that + * vec_len (nm->pending_local_interrupts) is zero so we quickly swap + * local and remote vector under the spinlock */ + clib_spinlock_lock (&nm->pending_interrupt_lock); + in = nm->pending_local_interrupts; + nm->pending_local_interrupts = nm->pending_remote_interrupts; + nm->pending_remote_interrupts = in; + clib_spinlock_unlock (&nm->pending_interrupt_lock); + + cpu_time_now = dispatch_pending_interrupts (vm, nm, cpu_time_now); + } - tmp = nm->pending_interrupt_node_runtime_indices; - nm->pending_interrupt_node_runtime_indices = - last_node_runtime_indices; - last_node_runtime_indices = tmp; - _vec_len (last_node_runtime_indices) = 0; - if (!is_main) - clib_spinlock_unlock (&nm->pending_interrupt_lock); - for (i = 0; i < l; i++) - { - n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT], - last_node_runtime_indices[i]); - cpu_time_now = - dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, - VLIB_NODE_STATE_INTERRUPT, - /* frame */ 0, - cpu_time_now); - } - } - } /* Input nodes may have added work to the pending vector. Process pending vector until there is nothing left. All pending vectors will be processed from input -> output. */ diff --git a/src/vlib/node.h b/src/vlib/node.h index 1bdb3bb7797..ca7564a13e7 100644 --- a/src/vlib/node.h +++ b/src/vlib/node.h @@ -471,10 +471,6 @@ typedef struct vlib_node_runtime_t vlib_error_t *errors; /**< Vector of errors for this node. */ -#if __SIZEOF_POINTER__ == 4 - u8 pad[8]; -#endif - u32 clocks_since_last_overflow; /**< Number of clock cycles. */ u32 max_clock; /**< Maximum clock cycle for an @@ -512,6 +508,10 @@ typedef struct vlib_node_runtime_t u16 state; /**< Input node state. */ + u32 interrupt_data; /**< Data passed together with interrupt. + Valid only when state is + VLIB_NODE_STATE_INTERRUPT */ + u16 n_next_nodes; u16 cached_next_index; /**< Next frame index that vector @@ -674,6 +674,12 @@ vlib_timing_wheel_data_get_index (u32 d) return d / 2; } +typedef struct +{ + u32 node_runtime_index; + u32 data; +} vlib_node_interrupt_t; + typedef struct { /* Public nodes. */ @@ -690,7 +696,8 @@ typedef struct vlib_node_runtime_t *nodes_by_type[VLIB_N_NODE_TYPE]; /* Node runtime indices for input nodes with pending interrupts. */ - u32 *pending_interrupt_node_runtime_indices; + vlib_node_interrupt_t *pending_local_interrupts; + vlib_node_interrupt_t *pending_remote_interrupts; clib_spinlock_t pending_interrupt_lock; /* Input nodes are switched from/to interrupt to/from polling mode diff --git a/src/vlib/node_funcs.h b/src/vlib/node_funcs.h index d6d04fb16c2..263017d0ec7 100644 --- a/src/vlib/node_funcs.h +++ b/src/vlib/node_funcs.h @@ -194,14 +194,36 @@ vlib_node_get_state (vlib_main_t * vm, u32 node_index) } always_inline void -vlib_node_set_interrupt_pending (vlib_main_t * vm, u32 node_index) +vlib_node_set_interrupt_pending_with_data (vlib_main_t * vm, u32 node_index, + u32 data) { vlib_node_main_t *nm = &vm->node_main; vlib_node_t *n = vec_elt (nm->nodes, node_index); + vlib_node_interrupt_t *i; ASSERT (n->type == VLIB_NODE_TYPE_INPUT); - clib_spinlock_lock_if_init (&nm->pending_interrupt_lock); - vec_add1 (nm->pending_interrupt_node_runtime_indices, n->runtime_index); - clib_spinlock_unlock_if_init (&nm->pending_interrupt_lock); + + if (vm == vlib_get_main ()) + { + /* local thread */ + vec_add2 (nm->pending_local_interrupts, i, 1); + i->node_runtime_index = n->runtime_index; + i->data = data; + } + else + { + /* remote thread */ + clib_spinlock_lock (&nm->pending_interrupt_lock); + vec_add2 (nm->pending_remote_interrupts, i, 1); + i->node_runtime_index = n->runtime_index; + i->data = data; + clib_spinlock_unlock (&nm->pending_interrupt_lock); + } +} + +always_inline void +vlib_node_set_interrupt_pending (vlib_main_t * vm, u32 node_index) +{ + vlib_node_set_interrupt_pending_with_data (vm, node_index, 0); } always_inline vlib_process_t * -- 2.16.6