From aaef1eb92bead2411dfe888c05839861538e353f Mon Sep 17 00:00:00 2001 From: Damjan Marion Date: Tue, 8 Nov 2016 17:37:01 +0100 Subject: [PATCH] threads: add support for multiple worker handoff queues Change-Id: I2452df3c493eeb0a5078d53a230df6906651c057 Signed-off-by: Damjan Marion --- vlib/vlib/threads.c | 68 +++++++++--------- vlib/vlib/threads.h | 107 ++++++++++++++++++++++++++-- vlib/vlib/threads_cli.c | 161 ++++++++++++++++++++++++++++++++++--------- vnet/vnet/devices/dpdk/cli.c | 37 +--------- vnet/vnet/handoff.c | 23 ++++--- vnet/vnet/handoff.h | 79 --------------------- 6 files changed, 281 insertions(+), 194 deletions(-) diff --git a/vlib/vlib/threads.c b/vlib/vlib/threads.c index 5581d43a55e..70d4019a1fd 100644 --- a/vlib/vlib/threads.c +++ b/vlib/vlib/threads.c @@ -556,7 +556,6 @@ start_workers (vlib_main_t * vm) vlib_worker_thread_t *w; vlib_main_t *vm_clone; void *oldheap; - vlib_frame_queue_t *fq; vlib_thread_main_t *tm = &vlib_thread_main; vlib_thread_registration_t *tr; vlib_node_runtime_t *rt; @@ -594,11 +593,6 @@ start_workers (vlib_main_t * vm) _vec_len (vlib_mains) = 0; vec_add1 (vlib_mains, vm); - vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1); - _vec_len (vlib_frame_queues) = 0; - fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS); - vec_add1 (vlib_frame_queues, fq); - vlib_worker_threads->wait_at_barrier = clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES); vlib_worker_threads->workers_at_barrier = @@ -645,19 +639,6 @@ start_workers (vlib_main_t * vm) if (tr->no_data_structure_clone) continue; - /* Allocate "to-worker-N" frame queue */ - if (tr->frame_queue_nelts) - { - fq = vlib_frame_queue_alloc (tr->frame_queue_nelts); - } - else - { - fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS); - } - - vec_validate (vlib_frame_queues, worker_thread_index); - vlib_frame_queues[worker_thread_index] = fq; - /* Fork vlib_global_main et al. Look for bugs here */ oldheap = clib_mem_set_heap (w->thread_mheap); @@ -1241,10 +1222,11 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm) * the handoff node. 
*/ static inline int -vlib_frame_queue_dequeue_internal (vlib_main_t * vm) +vlib_frame_queue_dequeue_internal (vlib_main_t * vm, + vlib_frame_queue_main_t * fqm) { u32 thread_id = vm->cpu_index; - vlib_frame_queue_t *fq = vlib_frame_queues[thread_id]; + vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id]; vlib_frame_queue_elt_t *elt; u32 *from, *to; vlib_frame_t *f; @@ -1252,12 +1234,11 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm) int processed = 0; u32 n_left_to_node; u32 vectors = 0; - vlib_thread_main_t *tm = vlib_get_thread_main (); ASSERT (fq); ASSERT (vm == vlib_mains[thread_id]); - if (PREDICT_FALSE (tm->handoff_dispatch_node_index == ~0)) + if (PREDICT_FALSE (fqm->node_index == ~0)) return 0; /* * Gather trace data for frame queues @@ -1268,7 +1249,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm) frame_queue_nelt_counter_t *fqh; u32 elix; - fqt = &tm->frame_queue_traces[thread_id]; + fqt = &fqm->frame_queue_traces[thread_id]; fqt->nelts = fq->nelts; fqt->head = fq->head; @@ -1283,7 +1264,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm) } /* Record the number of elements in use in the histogram */ - fqh = &tm->frame_queue_histogram[thread_id]; + fqh = &fqm->frame_queue_histogram[thread_id]; fqh->count[fqt->n_in_use]++; /* Record a snapshot of the elements in use */ @@ -1320,7 +1301,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm) ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME); ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE); - f = vlib_get_frame_to_node (vm, tm->handoff_dispatch_node_index); + f = vlib_get_frame_to_node (vm, fqm->node_index); to = vlib_frame_vector_args (f); @@ -1347,7 +1328,7 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm) vectors += elt->n_vectors; f->n_vectors = elt->n_vectors; - vlib_put_frame_to_node (vm, tm->handoff_dispatch_node_index, f); + vlib_put_frame_to_node (vm, fqm->node_index, f); elt->valid = 0; elt->n_vectors = 0; @@ -1373,7 +1354,9 @@ static_always_inline void vlib_worker_thread_internal (vlib_main_t * vm) { vlib_node_main_t *nm = &vm->node_main; + vlib_thread_main_t *tm = vlib_get_thread_main (); u64 cpu_time_now = clib_cpu_time_now (); + vlib_frame_queue_main_t *fqm; vec_alloc (nm->pending_interrupt_node_runtime_indices, 32); @@ -1381,7 +1364,8 @@ vlib_worker_thread_internal (vlib_main_t * vm) { vlib_worker_thread_barrier_check (); - vlib_frame_queue_dequeue_internal (vm); + vec_foreach (fqm, tm->frame_queue_mains) + vlib_frame_queue_dequeue_internal (vm, fqm); vlib_node_runtime_t *n; vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]) @@ -1465,13 +1449,35 @@ VLIB_REGISTER_THREAD (worker_thread_reg, static) = { }; /* *INDENT-ON* */ -clib_error_t * -threads_init (vlib_main_t * vm) +u32 +vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts) { vlib_thread_main_t *tm = vlib_get_thread_main (); + vlib_frame_queue_main_t *fqm; + vlib_frame_queue_t *fq; + int i; + + if (frame_queue_nelts == 0) + frame_queue_nelts = FRAME_QUEUE_NELTS; - tm->handoff_dispatch_node_index = ~0; + vec_add2 (tm->frame_queue_mains, fqm, 1); + fqm->node_index = node_index; + + vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1); + _vec_len (fqm->vlib_frame_queues) = 0; + for (i = 0; i < tm->n_vlib_mains; i++) + { + fq = vlib_frame_queue_alloc (frame_queue_nelts); + vec_add1 (fqm->vlib_frame_queues, fq); + } + + return (fqm - tm->frame_queue_mains); +} + +clib_error_t * +threads_init (vlib_main_t * vm) +{ return 0; } diff --git a/vlib/vlib/threads.h b/vlib/vlib/threads.h index 
e264435c3ed..c2db86442aa 100644 --- a/vlib/vlib/threads.h +++ b/vlib/vlib/threads.h @@ -141,7 +141,15 @@ typedef struct } vlib_frame_queue_t; -vlib_frame_queue_t **vlib_frame_queues; +typedef struct +{ + u32 node_index; + vlib_frame_queue_t **vlib_frame_queues; + + /* for frame queue tracing */ + frame_queue_trace_t *frame_queue_traces; + frame_queue_nelt_counter_t *frame_queue_histogram; +} vlib_frame_queue_main_t; /* Called early, in thread 0's context */ clib_error_t *vlib_thread_init (vlib_main_t * vm); @@ -170,6 +178,7 @@ void vlib_create_worker_threads (vlib_main_t * vm, int n, void (*thread_function) (void *)); void vlib_worker_thread_init (vlib_worker_thread_t * w); +u32 vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts); /* Check for a barrier sync request every 30ms */ #define BARRIER_SYNC_DELAY (0.030000) @@ -321,12 +330,8 @@ typedef struct vlib_efd_t efd; - /* handoff node index */ - u32 handoff_dispatch_node_index; - - /* for frame queue tracing */ - frame_queue_trace_t *frame_queue_traces; - frame_queue_nelt_counter_t *frame_queue_histogram; + /* Worker handoff queues */ + vlib_frame_queue_main_t *frame_queue_mains; /* worker thread initialization barrier */ volatile u32 worker_thread_release; @@ -388,6 +393,94 @@ vlib_get_worker_vlib_main (u32 worker_index) return vm; } +static inline void +vlib_put_frame_queue_elt (vlib_frame_queue_elt_t * hf) +{ + CLIB_MEMORY_BARRIER (); + hf->valid = 1; +} + +static inline vlib_frame_queue_elt_t * +vlib_get_frame_queue_elt (u32 frame_queue_index, u32 index) +{ + vlib_frame_queue_t *fq; + vlib_frame_queue_elt_t *elt; + vlib_thread_main_t *tm = &vlib_thread_main; + vlib_frame_queue_main_t *fqm = + vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); + u64 new_tail; + + fq = fqm->vlib_frame_queues[index]; + ASSERT (fq); + + new_tail = __sync_add_and_fetch (&fq->tail, 1); + + /* Wait until a ring slot is available */ + while (new_tail >= fq->head_hint + fq->nelts) + vlib_worker_thread_barrier_check (); + + elt = fq->elts + (new_tail & (fq->nelts - 1)); + + /* this would be very bad... 
*/
+  while (elt->valid)
+    ;
+
+  elt->msg_type = VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME;
+  elt->last_n_vectors = elt->n_vectors = 0;
+
+  return elt;
+}
+
+static inline vlib_frame_queue_t *
+is_vlib_frame_queue_congested (u32 frame_queue_index,
+			       u32 index,
+			       u32 queue_hi_thresh,
+			       vlib_frame_queue_t **
+			       handoff_queue_by_worker_index)
+{
+  vlib_frame_queue_t *fq;
+  vlib_thread_main_t *tm = &vlib_thread_main;
+  vlib_frame_queue_main_t *fqm =
+    vec_elt_at_index (tm->frame_queue_mains, frame_queue_index);
+
+  fq = handoff_queue_by_worker_index[index];
+  if (fq != (vlib_frame_queue_t *) (~0))
+    return fq;
+
+  fq = fqm->vlib_frame_queues[index];
+  ASSERT (fq);
+
+  if (PREDICT_FALSE (fq->tail >= (fq->head_hint + queue_hi_thresh)))
+    {
+      /* a valid entry in the array will indicate the queue has reached
+       * the specified threshold and is congested
+       */
+      handoff_queue_by_worker_index[index] = fq;
+      fq->enqueue_full_events++;
+      return fq;
+    }
+
+  return NULL;
+}
+
+static inline vlib_frame_queue_elt_t *
+vlib_get_worker_handoff_queue_elt (u32 frame_queue_index,
+				   u32 vlib_worker_index,
+				   vlib_frame_queue_elt_t **
+				   handoff_queue_elt_by_worker_index)
+{
+  vlib_frame_queue_elt_t *elt;
+
+  if (handoff_queue_elt_by_worker_index[vlib_worker_index])
+    return handoff_queue_elt_by_worker_index[vlib_worker_index];
+
+  elt = vlib_get_frame_queue_elt (frame_queue_index, vlib_worker_index);
+
+  handoff_queue_elt_by_worker_index[vlib_worker_index] = elt;
+
+  return elt;
+}
+
 #endif /* included_vlib_threads_h */
 
 /*
diff --git a/vlib/vlib/threads_cli.c b/vlib/vlib/threads_cli.c
index 70bf729b306..aef67576751 100644
--- a/vlib/vlib/threads_cli.c
+++ b/vlib/vlib/threads_cli.c
@@ -157,28 +157,48 @@ static clib_error_t *
 trace_frame_queue (vlib_main_t * vm, unformat_input_t * input,
 		   vlib_cli_command_t * cmd)
 {
+  unformat_input_t _line_input, *line_input = &_line_input;
   clib_error_t *error = NULL;
   frame_queue_trace_t *fqt;
   frame_queue_nelt_counter_t *fqh;
   vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
   u32 num_fq;
   u32 fqix;
-  u32 enable = 0;
+  u32 enable = 2;
+  u32 index = ~(u32) 0;
 
-  if (unformat (input, "on"))
-    {
-      enable = 1;
-    }
-  else if (unformat (input, "off"))
-    {
-      enable = 0;
-    }
-  else
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
     {
-      return clib_error_return (0, "expecting on or off");
+      if (unformat (line_input, "on"))
+	enable = 1;
+      else if (unformat (line_input, "off"))
+	enable = 0;
+      else if (unformat (line_input, "index %u", &index))
+	;
+      else
+	return clib_error_return (0, "parse error: '%U'",
+				  format_unformat_error, line_input);
     }
-  num_fq = vec_len (vlib_frame_queues);
+  unformat_free (line_input);
+
+  if (enable > 1)
+    return clib_error_return (0, "expecting on or off");
+
+  if (vec_len (tm->frame_queue_mains) == 0)
+    return clib_error_return (0, "no worker handoffs exist");
+
+  if (index > vec_len (tm->frame_queue_mains) - 1)
+    return clib_error_return (0,
+			      "expecting valid worker handoff queue index");
+
+  fqm = vec_elt_at_index (tm->frame_queue_mains, index);
+
+  num_fq = vec_len (fqm->vlib_frame_queues);
   if (num_fq == 0)
     {
       vlib_cli_output (vm, "No frame queues exist\n");
@@ -186,20 +206,20 @@ trace_frame_queue (vlib_main_t * vm, unformat_input_t * input,
     }
 
   // Allocate storage for trace if necessary
-  vec_validate_aligned (tm->frame_queue_traces, num_fq - 1,
+  vec_validate_aligned (fqm->frame_queue_traces, num_fq - 1,
 			 CLIB_CACHE_LINE_BYTES);
-  vec_validate_aligned (tm->frame_queue_histogram, num_fq - 1,
+  vec_validate_aligned (fqm->frame_queue_histogram, num_fq - 1,
 			 CLIB_CACHE_LINE_BYTES);
 
   for (fqix = 0; fqix < num_fq; fqix++)
     {
-      fqt = &tm->frame_queue_traces[fqix];
-      fqh = &tm->frame_queue_histogram[fqix];
+      fqt = &fqm->frame_queue_traces[fqix];
+      fqh = &fqm->frame_queue_histogram[fqix];
 
       memset (fqt->n_vectors, 0xff, sizeof (fqt->n_vectors));
       fqt->written = 0;
       memset (fqh, 0, sizeof (*fqh));
 
-      vlib_frame_queues[fqix]->trace = enable;
+      fqm->vlib_frame_queues[fqix]->trace = enable;
     }
   return error;
 }
@@ -236,16 +256,16 @@ compute_percent (u64 * two_counters, u64 total)
  * Display frame queue trace data gathered by threads.
  */
 static clib_error_t *
-show_frame_queue_internal (vlib_main_t * vm, u32 histogram)
+show_frame_queue_internal (vlib_main_t * vm,
+			   vlib_frame_queue_main_t * fqm, u32 histogram)
 {
-  vlib_thread_main_t *tm = vlib_get_thread_main ();
   clib_error_t *error = NULL;
   frame_queue_trace_t *fqt;
   frame_queue_nelt_counter_t *fqh;
   u32 num_fq;
   u32 fqix;
 
-  num_fq = vec_len (tm->frame_queue_traces);
+  num_fq = vec_len (fqm->frame_queue_traces);
   if (num_fq == 0)
     {
       vlib_cli_output (vm, "No trace data for frame queues\n");
@@ -260,7 +280,7 @@ show_frame_queue_internal (vlib_main_t * vm, u32 histogram)
 
   for (fqix = 0; fqix < num_fq; fqix++)
     {
-      fqt = &(tm->frame_queue_traces[fqix]);
+      fqt = &(fqm->frame_queue_traces[fqix]);
 
       vlib_cli_output (vm, "Thread %d %v\n", fqix,
 		       vlib_worker_threads[fqix].name);
@@ -273,7 +293,7 @@ show_frame_queue_internal (vlib_main_t * vm, u32 histogram)
 
       if (histogram)
 	{
-	  fqh = &(tm->frame_queue_histogram[fqix]);
+	  fqh = &(fqm->frame_queue_histogram[fqix]);
 	  u32 nelt;
 	  u64 total = 0;
 
@@ -350,14 +370,40 @@ static clib_error_t *
 show_frame_queue_trace (vlib_main_t * vm, unformat_input_t * input,
 			vlib_cli_command_t * cmd)
 {
-  return show_frame_queue_internal (vm, 0);
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
+  clib_error_t *error;
+
+  vec_foreach (fqm, tm->frame_queue_mains)
+  {
+    vlib_cli_output (vm, "Worker handoff queue index %u (next node '%U'):",
+		     fqm - tm->frame_queue_mains,
+		     format_vlib_node_name, vm, fqm->node_index);
+    error = show_frame_queue_internal (vm, fqm, 0);
+    if (error)
+      return error;
+  }
+  return 0;
 }
 
 static clib_error_t *
 show_frame_queue_histogram (vlib_main_t * vm, unformat_input_t * input,
 			    vlib_cli_command_t * cmd)
 {
-  return show_frame_queue_internal (vm, 1);
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
+  clib_error_t *error;
+
+  vec_foreach (fqm, tm->frame_queue_mains)
+  {
+    vlib_cli_output (vm, "Worker handoff queue index %u (next node '%U'):",
+		     fqm - tm->frame_queue_mains,
+		     format_vlib_node_name, vm, fqm->node_index);
+    error = show_frame_queue_internal (vm, fqm, 1);
+    if (error)
+      return error;
+  }
+  return 0;
 }
 
 /* *INDENT-OFF* */
@@ -384,18 +430,43 @@ static clib_error_t *
 test_frame_queue_nelts (vlib_main_t * vm, unformat_input_t * input,
 			vlib_cli_command_t * cmd)
 {
+  unformat_input_t _line_input, *line_input = &_line_input;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
   clib_error_t *error = NULL;
   u32 num_fq;
   u32 fqix;
   u32 nelts = 0;
+  u32 index = ~(u32) 0;
+
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
+
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
+      if (unformat (line_input, "nelts %u", &nelts))
+	;
+      else if (unformat (line_input, "index %u", &index))
+	;
+      else
+	return clib_error_return (0, "parse error: '%U'",
+				  format_unformat_error, line_input);
+    }
+
+  unformat_free (line_input);
+
+  if (index > vec_len (tm->frame_queue_mains) - 1)
+    return clib_error_return (0,
+			      "expecting valid worker handoff queue index");
+
+  fqm = vec_elt_at_index (tm->frame_queue_mains, index);
 
-  if ((unformat (input, "%d", &nelts) != 1) ||
-      ((nelts != 4) && (nelts != 8) && (nelts != 16) && (nelts != 32)))
+  if ((nelts != 4) && (nelts != 8) && (nelts != 16) && (nelts != 32))
     {
       return clib_error_return (0, "expecting 4,8,16,32");
     }
 
-  num_fq = vec_len (vlib_frame_queues);
+  num_fq = vec_len (fqm->vlib_frame_queues);
   if (num_fq == 0)
     {
       vlib_cli_output (vm, "No frame queues exist\n");
@@ -404,7 +475,7 @@ test_frame_queue_nelts (vlib_main_t * vm, unformat_input_t * input,
 
   for (fqix = 0; fqix < num_fq; fqix++)
     {
-      vlib_frame_queues[fqix]->nelts = nelts;
+      fqm->vlib_frame_queues[fqix]->nelts = nelts;
    }
 
   return error;
@@ -426,15 +497,39 @@ static clib_error_t *
 test_frame_queue_threshold (vlib_main_t * vm, unformat_input_t * input,
 			    vlib_cli_command_t * cmd)
 {
+  unformat_input_t _line_input, *line_input = &_line_input;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
+  vlib_frame_queue_main_t *fqm;
   clib_error_t *error = NULL;
   u32 num_fq;
   u32 fqix;
-  u32 threshold = 0;
+  u32 threshold = ~(u32) 0;
+  u32 index = ~(u32) 0;
+
+  if (!unformat_user (input, unformat_line_input, line_input))
+    return 0;
 
-  if (unformat (input, "%d", &threshold))
+  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
     {
+      if (unformat (line_input, "threshold %u", &threshold))
+	;
+      else if (unformat (line_input, "index %u", &index))
+	;
+      else
+	return clib_error_return (0, "parse error: '%U'",
+				  format_unformat_error, line_input);
     }
-  else
+
+  unformat_free (line_input);
+
+  if (index > vec_len (tm->frame_queue_mains) - 1)
+    return clib_error_return (0,
+			      "expecting valid worker handoff queue index");
+
+  fqm = vec_elt_at_index (tm->frame_queue_mains, index);
+
+
+  if (threshold == ~(u32) 0)
     {
       vlib_cli_output (vm, "expecting threshold value\n");
       return error;
@@ -443,7 +538,7 @@ test_frame_queue_threshold (vlib_main_t * vm, unformat_input_t * input,
   if (threshold == 0)
     threshold = ~0;
 
-  num_fq = vec_len (vlib_frame_queues);
+  num_fq = vec_len (fqm->vlib_frame_queues);
   if (num_fq == 0)
     {
       vlib_cli_output (vm, "No frame queues exist\n");
@@ -452,7 +547,7 @@ test_frame_queue_threshold (vlib_main_t * vm, unformat_input_t * input,
 
   for (fqix = 0; fqix < num_fq; fqix++)
     {
-      vlib_frame_queues[fqix]->vector_threshold = threshold;
+      fqm->vlib_frame_queues[fqix]->vector_threshold = threshold;
     }
 
   return error;
diff --git a/vnet/vnet/devices/dpdk/cli.c b/vnet/vnet/devices/dpdk/cli.c
index 8f99b272ad8..5e53a98beae 100644
--- a/vnet/vnet/devices/dpdk/cli.c
+++ b/vnet/vnet/devices/dpdk/cli.c
@@ -346,9 +346,7 @@ show_efd (vlib_main_t * vm,
   else if (unformat (input, "worker"))
     {
       vlib_thread_main_t *tm = vlib_get_thread_main ();
-      vlib_frame_queue_t *fq;
       vlib_thread_registration_t *tr;
-      int thread_id;
       u32 num_workers = 0;
       u32 first_worker_index = 0;
       uword *p;
@@ -364,27 +362,9 @@ show_efd (vlib_main_t * vm,
 
       vlib_cli_output (vm,
		       "num_workers %d\n"
-		       "first_worker_index %d\n"
-		       "vlib_frame_queues[%d]:\n",
-		       num_workers, first_worker_index, tm->n_vlib_mains);
+		       "first_worker_index %d\n",
+		       num_workers, first_worker_index);
 
-      for (thread_id = 0; thread_id < tm->n_vlib_mains; thread_id++)
-	{
-	  fq = vlib_frame_queues[thread_id];
-	  if (fq)
-	    {
-	      vlib_cli_output (vm,
-			       "%2d: frames_queued %u\n"
-			       "    frames_queued_hint %u\n"
-			       "    enqueue_full_events 
%u\n" - " enqueue_efd_discards %u\n", - thread_id, - (fq->tail - fq->head), - (fq->tail - fq->head_hint), - fq->enqueue_full_events, - fq->enqueue_efd_discards); - } - } } else if (unformat (input, "help")) { @@ -413,9 +393,6 @@ clear_efd (vlib_main_t * vm, { dpdk_main_t *dm = &dpdk_main; dpdk_device_t *xd; - vlib_thread_main_t *tm = vlib_get_thread_main (); - vlib_frame_queue_t *fq; - int thread_id; /* *INDENT-OFF* */ vec_foreach (xd, dm->devices) @@ -432,16 +409,6 @@ clear_efd (vlib_main_t * vm, } /* *INDENT-ON* */ - for (thread_id = 0; thread_id < tm->n_vlib_mains; thread_id++) - { - fq = vlib_frame_queues[thread_id]; - if (fq) - { - fq->enqueue_full_events = 0; - fq->enqueue_efd_discards = 0; - } - } - return 0; } diff --git a/vnet/vnet/handoff.c b/vnet/vnet/handoff.c index 5593aa74942..22d2ea98df7 100644 --- a/vnet/vnet/handoff.c +++ b/vnet/vnet/handoff.c @@ -34,12 +34,16 @@ typedef struct per_inteface_handoff_data_t *if_data; + /* Worker handoff index */ + u32 frame_queue_index; + /* convenience variables */ vlib_main_t *vlib_main; vnet_main_t *vnet_main; } handoff_main_t; handoff_main_t handoff_main; +vlib_node_registration_t handoff_dispatch_node; typedef struct { @@ -147,8 +151,9 @@ worker_handoff_node_fn (vlib_main_t * vm, if (hf) hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_worker; - hf = dpdk_get_handoff_queue_elt (next_worker_index, - handoff_queue_elt_by_worker_index); + hf = vlib_get_worker_handoff_queue_elt (hm->frame_queue_index, + next_worker_index, + handoff_queue_elt_by_worker_index); n_left_to_next_worker = VLIB_FRAME_SIZE - hf->n_vectors; to_next_worker = &hf->buffer_index[hf->n_vectors]; @@ -163,7 +168,7 @@ worker_handoff_node_fn (vlib_main_t * vm, if (n_left_to_next_worker == 0) { hf->n_vectors = VLIB_FRAME_SIZE; - vlib_put_handoff_queue_elt (hf); + vlib_put_frame_queue_elt (hf); current_worker_index = ~0; handoff_queue_elt_by_worker_index[next_worker_index] = 0; hf = 0; @@ -196,7 +201,7 @@ worker_handoff_node_fn (vlib_main_t * vm, */ if (1 || hf->n_vectors == hf->last_n_vectors) { - vlib_put_handoff_queue_elt (hf); + vlib_put_frame_queue_elt (hf); handoff_queue_elt_by_worker_index[i] = 0; } else @@ -247,6 +252,10 @@ interface_handoff_enable_disable (vlib_main_t * vm, u32 sw_if_index, if (clib_bitmap_last_set (bitmap) >= hm->num_workers) return VNET_API_ERROR_INVALID_WORKER; + if (hm->frame_queue_index == ~0) + hm->frame_queue_index = + vlib_frame_queue_main_init (handoff_dispatch_node.index, 0); + vec_validate (hm->if_data, sw_if_index); d = vec_elt_at_index (hm->if_data, sw_if_index); @@ -356,9 +365,6 @@ format_handoff_dispatch_trace (u8 * s, va_list * args) return s; } - -vlib_node_registration_t handoff_dispatch_node; - #define foreach_handoff_dispatch_error \ _(EXAMPLE, "example packets") @@ -556,8 +562,7 @@ handoff_init (vlib_main_t * vm) hm->vlib_main = vm; hm->vnet_main = &vnet_main; - ASSERT (tm->handoff_dispatch_node_index == ~0); - tm->handoff_dispatch_node_index = handoff_dispatch_node.index; + hm->frame_queue_index = ~0; return 0; } diff --git a/vnet/vnet/handoff.h b/vnet/vnet/handoff.h index 9320f5602b5..4fefb369475 100644 --- a/vnet/vnet/handoff.h +++ b/vnet/vnet/handoff.h @@ -32,85 +32,6 @@ typedef enum HANDOFF_DISPATCH_N_NEXT, } handoff_dispatch_next_t; -static inline void -vlib_put_handoff_queue_elt (vlib_frame_queue_elt_t * hf) -{ - CLIB_MEMORY_BARRIER (); - hf->valid = 1; -} - -static inline vlib_frame_queue_elt_t * -vlib_get_handoff_queue_elt (u32 vlib_worker_index) -{ - vlib_frame_queue_t *fq; - vlib_frame_queue_elt_t *elt; - u64 new_tail; 
- - fq = vlib_frame_queues[vlib_worker_index]; - ASSERT (fq); - - new_tail = __sync_add_and_fetch (&fq->tail, 1); - - /* Wait until a ring slot is available */ - while (new_tail >= fq->head_hint + fq->nelts) - vlib_worker_thread_barrier_check (); - - elt = fq->elts + (new_tail & (fq->nelts - 1)); - - /* this would be very bad... */ - while (elt->valid) - ; - - elt->msg_type = VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME; - elt->last_n_vectors = elt->n_vectors = 0; - - return elt; -} - -static inline vlib_frame_queue_t * -is_vlib_handoff_queue_congested (u32 vlib_worker_index, - u32 queue_hi_thresh, - vlib_frame_queue_t ** - handoff_queue_by_worker_index) -{ - vlib_frame_queue_t *fq; - - fq = handoff_queue_by_worker_index[vlib_worker_index]; - if (fq != (vlib_frame_queue_t *) (~0)) - return fq; - - fq = vlib_frame_queues[vlib_worker_index]; - ASSERT (fq); - - if (PREDICT_FALSE (fq->tail >= (fq->head_hint + queue_hi_thresh))) - { - /* a valid entry in the array will indicate the queue has reached - * the specified threshold and is congested - */ - handoff_queue_by_worker_index[vlib_worker_index] = fq; - fq->enqueue_full_events++; - return fq; - } - - return NULL; -} - -static inline vlib_frame_queue_elt_t * -dpdk_get_handoff_queue_elt (u32 vlib_worker_index, - vlib_frame_queue_elt_t ** - handoff_queue_elt_by_worker_index) -{ - vlib_frame_queue_elt_t *elt; - - if (handoff_queue_elt_by_worker_index[vlib_worker_index]) - return handoff_queue_elt_by_worker_index[vlib_worker_index]; - - elt = vlib_get_handoff_queue_elt (vlib_worker_index); - - handoff_queue_elt_by_worker_index[vlib_worker_index] = elt; - - return elt; -} static inline u64 ipv4_get_key (ip4_header_t * ip) -- 2.16.6
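
Usage sketch (not part of the patch): the fragment below shows how a feature could consume the new API, modeled on the vnet/vnet/handoff.c changes above. The my_feature_* names and the dispatch node are hypothetical placeholders; only vlib_frame_queue_main_init(), vlib_get_worker_handoff_queue_elt() and vlib_put_frame_queue_elt() come from this change.

#include <vlib/vlib.h>
#include <vlib/threads.h>

typedef struct
{
  /* Index returned by vlib_frame_queue_main_init (); selects this
   * feature's set of per-worker frame queues in tm->frame_queue_mains. */
  u32 frame_queue_index;
} my_feature_main_t;

static my_feature_main_t my_feature_main = {.frame_queue_index = ~0 };

/* Assumed to be registered elsewhere with VLIB_REGISTER_NODE (). */
extern vlib_node_registration_t my_feature_dispatch_node;

/* One-time setup, e.g. from an enable/disable handler: allocate one frame
 * queue per vlib_main, all draining into my_feature_dispatch_node.
 * Passing 0 for nelts falls back to FRAME_QUEUE_NELTS. */
static void
my_feature_handoff_enable (void)
{
  my_feature_main_t *fm = &my_feature_main;

  if (fm->frame_queue_index == ~0)
    fm->frame_queue_index =
      vlib_frame_queue_main_init (my_feature_dispatch_node.index, 0);
}

/* In the producing node: append one buffer index to the queue element
 * owned by next_worker_index, publishing the element once it fills so the
 * worker's vlib_frame_queue_dequeue_internal () loop can dispatch it. */
static_always_inline void
my_feature_enqueue_to_worker (u32 bi, u32 next_worker_index,
			      vlib_frame_queue_elt_t **
			      handoff_queue_elt_by_worker_index)
{
  my_feature_main_t *fm = &my_feature_main;
  vlib_frame_queue_elt_t *hf;

  hf = vlib_get_worker_handoff_queue_elt (fm->frame_queue_index,
					  next_worker_index,
					  handoff_queue_elt_by_worker_index);
  hf->buffer_index[hf->n_vectors++] = bi;

  if (hf->n_vectors == VLIB_FRAME_SIZE)
    {
      vlib_put_frame_queue_elt (hf);
      handoff_queue_elt_by_worker_index[next_worker_index] = 0;
    }
}

Because each vlib_frame_queue_main_init () call creates its own per-worker queue set keyed by the returned index, several features can hand off to different dispatch nodes concurrently, which is the point of replacing the single global vlib_frame_queues vector. A complete node would also flush partially filled elements once its input frame is exhausted, as worker_handoff_node_fn does above.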