X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvlib%2Fthreads.c;h=c99458ddaec3865c4d4fb186041ce71b762c4a4c;hb=b7b929931a07fbb27b43d5cd105f366c3e29807e;hp=e3ea3c9cb478c69b9669ba7db3ee9f6988850d1e;hpb=bd69a5f24c6e83e9101f203dd124864fb2877a17;p=vpp.git diff --git a/src/vlib/threads.c b/src/vlib/threads.c index e3ea3c9cb47..c99458ddaec 100644 --- a/src/vlib/threads.c +++ b/src/vlib/threads.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -24,7 +25,7 @@ DECLARE_CJ_GLOBAL_LOG; -#define FRAME_QUEUE_NELTS 32 +#define FRAME_QUEUE_NELTS 64 u32 vl (void *p) @@ -35,31 +36,138 @@ vl (void *p) vlib_worker_thread_t *vlib_worker_threads; vlib_thread_main_t vlib_thread_main; -uword -os_get_cpu_number (void) +/* + * Barrier tracing can be enabled on a normal build to collect information + * on barrier use, including timings and call stacks. Deliberately not + * keyed off CLIB_DEBUG, because that can add significant overhead which + * imapacts observed timings. + */ + +u32 +elog_global_id_for_msg_name (const char *msg_name) { - void *sp; - uword n; - u32 len; + uword *p, r; + static uword *h; + u8 *name_copy; - len = vec_len (vlib_thread_stacks); - if (len == 0) - return 0; + if (!h) + h = hash_create_string (0, sizeof (uword)); + + p = hash_get_mem (h, msg_name); + if (p) + return p[0]; + r = elog_string (&vlib_global_main.elog_main, "%s", msg_name); + + name_copy = format (0, "%s%c", msg_name, 0); + + hash_set_mem (h, name_copy, r); - /* Get any old stack address. */ - sp = &sp; + return r; +} - n = ((uword) sp - (uword) vlib_thread_stacks[0]) - >> VLIB_LOG2_THREAD_STACK_SIZE; +static inline void +barrier_trace_sync (f64 t_entry, f64 t_open, f64 t_closed) +{ + if (!vlib_worker_threads->barrier_elog_enabled) + return; + + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "bar-trace-%s-#%d", + .format_args = "T4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 caller, count, t_entry, t_open, t_closed; + } *ed = 0; + + ed = ELOG_DATA (&vlib_global_main.elog_main, e); + ed->count = (int) vlib_worker_threads[0].barrier_sync_count; + ed->caller = elog_global_id_for_msg_name + (vlib_worker_threads[0].barrier_caller); + ed->t_entry = (int) (1000000.0 * t_entry); + ed->t_open = (int) (1000000.0 * t_open); + ed->t_closed = (int) (1000000.0 * t_closed); +} + +static inline void +barrier_trace_sync_rec (f64 t_entry) +{ + if (!vlib_worker_threads->barrier_elog_enabled) + return; + + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "bar-syncrec-%s-#%d", + .format_args = "T4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 caller, depth; + } *ed = 0; + + ed = ELOG_DATA (&vlib_global_main.elog_main, e); + ed->depth = (int) vlib_worker_threads[0].recursion_level - 1; + ed->caller = elog_global_id_for_msg_name + (vlib_worker_threads[0].barrier_caller); +} + +static inline void +barrier_trace_release_rec (f64 t_entry) +{ + if (!vlib_worker_threads->barrier_elog_enabled) + return; + + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "bar-relrrec-#%d", + .format_args = "i4", + }; + /* *INDENT-ON* */ + struct + { + u32 depth; + } *ed = 0; + + ed = ELOG_DATA (&vlib_global_main.elog_main, e); + ed->depth = (int) vlib_worker_threads[0].recursion_level; +} - /* "processes" have their own stacks, and they always run in thread 0 */ - n = n >= len ? 0 : n; +static inline void +barrier_trace_release (f64 t_entry, f64 t_closed_total, f64 t_update_main) +{ + if (!vlib_worker_threads->barrier_elog_enabled) + return; - return n; + /* *INDENT-OFF* */ + ELOG_TYPE_DECLARE (e) = + { + .format = "bar-rel-#%d-e%d-u%d-t%d", + .format_args = "i4i4i4i4", + }; + /* *INDENT-ON* */ + struct + { + u32 count, t_entry, t_update_main, t_closed_total; + } *ed = 0; + + ed = ELOG_DATA (&vlib_global_main.elog_main, e); + ed->t_entry = (int) (1000000.0 * t_entry); + ed->t_update_main = (int) (1000000.0 * t_update_main); + ed->t_closed_total = (int) (1000000.0 * t_closed_total); + ed->count = (int) vlib_worker_threads[0].barrier_sync_count; + + /* Reset context for next trace */ + vlib_worker_threads[0].barrier_context = NULL; } uword -os_get_ncpus (void) +os_get_nthreads (void) { u32 len; @@ -96,7 +204,7 @@ sort_registrations_by_no_clone (void *a0, void *a1) } static uword * -vlib_sysfs_list_to_bitmap (char *filename) +clib_sysfs_list_to_bitmap (char *filename) { FILE *fp; uword *r = 0; @@ -138,9 +246,9 @@ vlib_thread_init (vlib_main_t * vm) /* get bitmaps of active cpu cores and sockets */ tm->cpu_core_bitmap = - vlib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online"); + clib_sysfs_list_to_bitmap ("/sys/devices/system/cpu/online"); tm->cpu_socket_bitmap = - vlib_sysfs_list_to_bitmap ("/sys/devices/system/node/online"); + clib_sysfs_list_to_bitmap ("/sys/devices/system/node/online"); avail_cpu = clib_bitmap_dup (tm->cpu_core_bitmap); @@ -155,9 +263,13 @@ vlib_thread_init (vlib_main_t * vm) } /* grab cpu for main thread */ - if (!tm->main_lcore) + if (tm->main_lcore == ~0) { - tm->main_lcore = clib_bitmap_first_set (avail_cpu); + /* if main-lcore is not set, we try to use lcore 1 */ + if (clib_bitmap_get (avail_cpu, 1)) + tm->main_lcore = 1; + else + tm->main_lcore = clib_bitmap_first_set (avail_cpu); if (tm->main_lcore == (u8) ~ 0) return clib_error_return (0, "no available cpus to be used for the" " main thread"); @@ -196,7 +308,7 @@ vlib_thread_init (vlib_main_t * vm) w = vlib_worker_threads; w->thread_mheap = clib_mem_get_heap (); w->thread_stack = vlib_thread_stacks[0]; - w->lcore_id = tm->main_lcore; + w->cpu_id = tm->main_lcore; w->lwp = syscall (SYS_gettid); w->thread_id = pthread_self (); tm->n_vlib_mains = 1; @@ -275,28 +387,13 @@ vlib_thread_init (vlib_main_t * vm) return 0; } -vlib_worker_thread_t * -vlib_alloc_thread (vlib_main_t * vm) -{ - vlib_worker_thread_t *w; - - if (vec_len (vlib_worker_threads) >= vec_len (vlib_thread_stacks)) - { - clib_warning ("out of worker threads... Quitting..."); - exit (1); - } - vec_add2 (vlib_worker_threads, w, 1); - w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads]; - return w; -} - vlib_frame_queue_t * vlib_frame_queue_alloc (int nelts) { vlib_frame_queue_t *fq; fq = clib_mem_alloc_aligned (sizeof (*fq), CLIB_CACHE_LINE_BYTES); - memset (fq, 0, sizeof (*fq)); + clib_memset (fq, 0, sizeof (*fq)); fq->nelts = nelts; fq->vector_threshold = 128; // packets vec_validate_aligned (fq->elts, nelts - 1, CLIB_CACHE_LINE_BYTES); @@ -419,7 +516,7 @@ vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index, ASSERT (fq); - new_tail = __sync_add_and_fetch (&fq->tail, 1); + new_tail = clib_atomic_add_fetch (&fq->tail, 1); /* Wait until a ring slot is available */ while (new_tail >= fq->head + fq->nelts) @@ -427,7 +524,7 @@ vlib_frame_queue_enqueue (vlib_main_t * vm, u32 node_runtime_index, f64 b4 = vlib_time_now_ticks (vm, before); vlib_worker_thread_barrier_check (vm, b4); /* Bad idea. Dequeue -> enqueue -> dequeue -> trouble */ - // vlib_frame_queue_dequeue (vm->cpu_index, vm, nm); + // vlib_frame_queue_dequeue (vm->thread_index, vm, nm); } elt = fq->elts + (new_tail & (fq->nelts - 1)); @@ -479,12 +576,12 @@ vlib_worker_thread_init (vlib_worker_thread_t * w) { /* Initial barrier sync, for both worker and i/o threads */ - clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1); + clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1); while (*vlib_worker_threads->wait_at_barrier) ; - clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1); + clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1); } } @@ -497,6 +594,8 @@ vlib_worker_thread_bootstrap_fn (void *arg) w->lwp = syscall (SYS_gettid); w->thread_id = pthread_self (); + __os_thread_index = w - vlib_worker_threads; + rv = (void *) clib_calljmp ((uword (*)(uword)) w->thread_function, (uword) arg, w->thread_stack + VLIB_THREAD_STACK_SIZE); @@ -504,21 +603,42 @@ vlib_worker_thread_bootstrap_fn (void *arg) return rv; } +static void +vlib_get_thread_core_socket (vlib_worker_thread_t * w, unsigned cpu_id) +{ + const char *sys_cpu_path = "/sys/devices/system/cpu/cpu"; + u8 *p = 0; + int core_id = -1, socket_id = -1; + + p = format (p, "%s%u/topology/core_id%c", sys_cpu_path, cpu_id, 0); + clib_sysfs_read ((char *) p, "%d", &core_id); + vec_reset_length (p); + p = + format (p, "%s%u/topology/physical_package_id%c", sys_cpu_path, cpu_id, + 0); + clib_sysfs_read ((char *) p, "%d", &socket_id); + vec_free (p); + + w->core_id = core_id; + w->socket_id = socket_id; +} + static clib_error_t * -vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned lcore_id) +vlib_launch_thread_int (void *fp, vlib_worker_thread_t * w, unsigned cpu_id) { vlib_thread_main_t *tm = &vlib_thread_main; void *(*fp_arg) (void *) = fp; - w->lcore_id = lcore_id; + w->cpu_id = cpu_id; + vlib_get_thread_core_socket (w, cpu_id); if (tm->cb.vlib_launch_thread_cb && !w->registration->use_pthreads) - return tm->cb.vlib_launch_thread_cb (fp, (void *) w, lcore_id); + return tm->cb.vlib_launch_thread_cb (fp, (void *) w, cpu_id); else { pthread_t worker; cpu_set_t cpuset; CPU_ZERO (&cpuset); - CPU_SET (lcore_id, &cpuset); + CPU_SET (cpu_id, &cpuset); if (pthread_create (&worker, NULL /* attr */ , fp_arg, (void *) w)) return clib_error_return_unix (0, "pthread_create"); @@ -543,7 +663,6 @@ start_workers (vlib_main_t * vm) u32 n_vlib_mains = tm->n_vlib_mains; u32 worker_thread_index; u8 *main_heap = clib_mem_get_per_cpu_heap (); - mheap_t *main_heap_header = mheap_header (main_heap); vec_reset_length (vlib_worker_threads); @@ -558,37 +677,45 @@ start_workers (vlib_main_t * vm) vlib_set_thread_name ((char *) w->name); } - /* - * Truth of the matter: we always use at least two - * threads. So, make the main heap thread-safe - * and make the event log thread-safe. - */ - main_heap_header->flags |= MHEAP_FLAG_THREAD_SAFE; vm->elog_main.lock = clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES); vm->elog_main.lock[0] = 0; if (n_vlib_mains > 1) { - vec_validate (vlib_mains, tm->n_vlib_mains - 1); + /* Replace hand-crafted length-1 vector with a real vector */ + vlib_mains = 0; + + vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1, + CLIB_CACHE_LINE_BYTES); _vec_len (vlib_mains) = 0; - vec_add1 (vlib_mains, vm); + vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES); vlib_worker_threads->wait_at_barrier = clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES); vlib_worker_threads->workers_at_barrier = clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES); + vlib_worker_threads->node_reforks_required = + clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES); + /* Ask for an initial barrier sync */ *vlib_worker_threads->workers_at_barrier = 0; *vlib_worker_threads->wait_at_barrier = 1; + /* Without update or refork */ + *vlib_worker_threads->node_reforks_required = 0; + vm->need_vlib_worker_thread_node_runtime_update = 0; + + /* init timing */ + vm->barrier_epoch = 0; + vm->barrier_no_close_before = 0; + worker_thread_index = 1; for (i = 0; i < vec_len (tm->registrations); i++) { vlib_node_main_t *nm, *nm_clone; - vlib_buffer_main_t *bm_clone; vlib_buffer_free_list_t *fl_clone, *fl_orig; vlib_buffer_free_list_t *orig_freelist_pool; int k; @@ -600,13 +727,25 @@ start_workers (vlib_main_t * vm) for (k = 0; k < tr->count; k++) { + vlib_node_t *n; + vec_add2 (vlib_worker_threads, w, 1); + /* Currently unused, may not really work */ if (tr->mheap_size) - w->thread_mheap = - mheap_alloc (0 /* use VM */ , tr->mheap_size); + { +#if USE_DLMALLOC == 0 + w->thread_mheap = + mheap_alloc (0 /* use VM */ , tr->mheap_size); +#else + w->thread_mheap = create_mspace (tr->mheap_size, + 0 /* unlocked */ ); +#endif + } else w->thread_mheap = main_heap; - w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads]; + + w->thread_stack = + vlib_thread_stack_init (w - vlib_worker_threads); w->thread_function = tr->function; w->thread_function_arg = w; w->instance_id = k; @@ -623,19 +762,27 @@ start_workers (vlib_main_t * vm) /* Fork vlib_global_main et al. Look for bugs here */ oldheap = clib_mem_set_heap (w->thread_mheap); - vm_clone = clib_mem_alloc (sizeof (*vm_clone)); + vm_clone = clib_mem_alloc_aligned (sizeof (*vm_clone), + CLIB_CACHE_LINE_BYTES); clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone)); - vm_clone->cpu_index = worker_thread_index; + vm_clone->thread_index = worker_thread_index; vm_clone->heap_base = w->thread_mheap; - vm_clone->mbuf_alloc_list = 0; - memset (&vm_clone->random_buffer, 0, - sizeof (vm_clone->random_buffer)); + vm_clone->heap_aligned_base = (void *) + (((uword) w->thread_mheap) & ~(VLIB_FRAME_ALIGN - 1)); + vm_clone->init_functions_called = + hash_create (0, /* value bytes */ 0); + vm_clone->pending_rpc_requests = 0; + vec_validate (vm_clone->pending_rpc_requests, 0); + _vec_len (vm_clone->pending_rpc_requests) = 0; + clib_memset (&vm_clone->random_buffer, 0, + sizeof (vm_clone->random_buffer)); nm = &vlib_mains[0]->node_main; nm_clone = &vm_clone->node_main; /* fork next frames array, preserving node runtime indices */ - nm_clone->next_frames = vec_dup (nm->next_frames); + nm_clone->next_frames = vec_dup_aligned (nm->next_frames, + CLIB_CACHE_LINE_BYTES); for (j = 0; j < vec_len (nm_clone->next_frames); j++) { vlib_next_frame_t *nf = &nm_clone->next_frames[j]; @@ -656,56 +803,79 @@ start_workers (vlib_main_t * vm) /* fork nodes */ nm_clone->nodes = 0; + + /* Allocate all nodes in single block for speed */ + n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n)); + for (j = 0; j < vec_len (nm->nodes); j++) { - vlib_node_t *n; - n = clib_mem_alloc_no_fail (sizeof (*n)); clib_memcpy (n, nm->nodes[j], sizeof (*n)); /* none of the copied nodes have enqueue rights given out */ n->owner_node_index = VLIB_INVALID_NODE_INDEX; - memset (&n->stats_total, 0, sizeof (n->stats_total)); - memset (&n->stats_last_clear, 0, - sizeof (n->stats_last_clear)); + clib_memset (&n->stats_total, 0, sizeof (n->stats_total)); + clib_memset (&n->stats_last_clear, 0, + sizeof (n->stats_last_clear)); vec_add1 (nm_clone->nodes, n); + n++; } nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] = - vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]); + vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL], + CLIB_CACHE_LINE_BYTES); + vec_foreach (rt, + nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]) + { + vlib_node_t *n = vlib_get_node (vm, rt->node_index); + rt->thread_index = vm_clone->thread_index; + /* copy initial runtime_data from node */ + if (n->runtime_data && n->runtime_data_bytes > 0) + clib_memcpy (rt->runtime_data, n->runtime_data, + clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, + n->runtime_data_bytes)); + } nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] = - vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]); + vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT], + CLIB_CACHE_LINE_BYTES); vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]) - rt->cpu_index = vm_clone->cpu_index; + { + vlib_node_t *n = vlib_get_node (vm, rt->node_index); + rt->thread_index = vm_clone->thread_index; + /* copy initial runtime_data from node */ + if (n->runtime_data && n->runtime_data_bytes > 0) + clib_memcpy (rt->runtime_data, n->runtime_data, + clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, + n->runtime_data_bytes)); + } - nm_clone->processes = vec_dup (nm->processes); + nm_clone->processes = vec_dup_aligned (nm->processes, + CLIB_CACHE_LINE_BYTES); /* zap the (per worker) frame freelists, etc */ nm_clone->frame_sizes = 0; - nm_clone->frame_size_hash = 0; + nm_clone->frame_size_hash = hash_create (0, sizeof (uword)); /* Packet trace buffers are guaranteed to be empty, nothing to do here */ clib_mem_set_heap (oldheap); - vec_add1 (vlib_mains, vm_clone); + vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES); - vm_clone->error_main.counters = - vec_dup (vlib_mains[0]->error_main.counters); - vm_clone->error_main.counters_last_clear = - vec_dup (vlib_mains[0]->error_main.counters_last_clear); + vm_clone->error_main.counters = vec_dup_aligned + (vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES); + vm_clone->error_main.counters_last_clear = vec_dup_aligned + (vlib_mains[0]->error_main.counters_last_clear, + CLIB_CACHE_LINE_BYTES); /* Fork the vlib_buffer_main_t free lists, etc. */ - bm_clone = vec_dup (vm_clone->buffer_main); - vm_clone->buffer_main = bm_clone; - - orig_freelist_pool = bm_clone->buffer_free_list_pool; - bm_clone->buffer_free_list_pool = 0; + orig_freelist_pool = vm_clone->buffer_free_list_pool; + vm_clone->buffer_free_list_pool = 0; /* *INDENT-OFF* */ pool_foreach (fl_orig, orig_freelist_pool, ({ - pool_get_aligned (bm_clone->buffer_free_list_pool, + pool_get_aligned (vm_clone->buffer_free_list_pool, fl_clone, CLIB_CACHE_LINE_BYTES); ASSERT (fl_orig - orig_freelist_pool - == fl_clone - bm_clone->buffer_free_list_pool); + == fl_clone - vm_clone->buffer_free_list_pool); fl_clone[0] = fl_orig[0]; fl_clone->buffers = 0; @@ -728,11 +898,19 @@ start_workers (vlib_main_t * vm) { vec_add2 (vlib_worker_threads, w, 1); if (tr->mheap_size) - w->thread_mheap = - mheap_alloc (0 /* use VM */ , tr->mheap_size); + { +#if USE_DLMALLOC == 0 + w->thread_mheap = + mheap_alloc (0 /* use VM */ , tr->mheap_size); +#else + w->thread_mheap = + create_mspace (tr->mheap_size, 0 /* locked */ ); +#endif + } else w->thread_mheap = main_heap; - w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads]; + w->thread_stack = + vlib_thread_stack_init (w - vlib_worker_threads); w->thread_function = tr->function; w->thread_function_arg = w; w->instance_id = j; @@ -786,32 +964,26 @@ start_workers (vlib_main_t * vm) VLIB_MAIN_LOOP_ENTER_FUNCTION (start_workers); -void -vlib_worker_thread_node_runtime_update (void) + +static inline void +worker_thread_node_runtime_update_internal (void) { int i, j; - vlib_worker_thread_t *w; vlib_main_t *vm; vlib_node_main_t *nm, *nm_clone; - vlib_node_t **old_nodes_clone; vlib_main_t *vm_clone; - vlib_node_runtime_t *rt, *old_rt; - void *oldheap; + vlib_node_runtime_t *rt; never_inline void vlib_node_runtime_sync_stats (vlib_main_t * vm, vlib_node_runtime_t * r, uword n_calls, uword n_vectors, uword n_clocks); - ASSERT (os_get_cpu_number () == 0); - - if (vec_len (vlib_mains) == 0) - return; + ASSERT (vlib_get_thread_index () == 0); vm = vlib_mains[0]; nm = &vm->node_main; - ASSERT (os_get_cpu_number () == 0); ASSERT (*vlib_worker_threads->wait_at_barrier == 1); /* @@ -841,117 +1013,173 @@ vlib_worker_thread_node_runtime_update (void) } } - for (i = 1; i < vec_len (vlib_mains); i++) - { - vlib_node_runtime_t *rt; - w = vlib_worker_threads + i; - oldheap = clib_mem_set_heap (w->thread_mheap); - - vm_clone = vlib_mains[i]; - - /* Re-clone error heap */ - u64 *old_counters = vm_clone->error_main.counters; - u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear; - clib_memcpy (&vm_clone->error_main, &vm->error_main, - sizeof (vm->error_main)); - j = vec_len (vm->error_main.counters) - 1; - vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES); - vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES); - vm_clone->error_main.counters = old_counters; - vm_clone->error_main.counters_last_clear = old_counters_all_clear; - - nm_clone = &vm_clone->node_main; - vec_free (nm_clone->next_frames); - nm_clone->next_frames = vec_dup (nm->next_frames); - - for (j = 0; j < vec_len (nm_clone->next_frames); j++) - { - vlib_next_frame_t *nf = &nm_clone->next_frames[j]; - u32 save_node_runtime_index; - u32 save_flags; - - save_node_runtime_index = nf->node_runtime_index; - save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH; - vlib_next_frame_init (nf); - nf->node_runtime_index = save_node_runtime_index; - nf->flags = save_flags; - } + /* Per-worker clone rebuilds are now done on each thread */ +} - old_nodes_clone = nm_clone->nodes; - nm_clone->nodes = 0; - /* re-fork nodes */ - for (j = 0; j < vec_len (nm->nodes); j++) - { - vlib_node_t *old_n_clone; - vlib_node_t *new_n, *new_n_clone; +void +vlib_worker_thread_node_refork (void) +{ + vlib_main_t *vm, *vm_clone; + vlib_node_main_t *nm, *nm_clone; + vlib_node_t **old_nodes_clone; + vlib_node_runtime_t *rt, *old_rt; - new_n = nm->nodes[j]; - old_n_clone = old_nodes_clone[j]; + vlib_node_t *new_n_clone; - new_n_clone = clib_mem_alloc_no_fail (sizeof (*new_n_clone)); - clib_memcpy (new_n_clone, new_n, sizeof (*new_n)); - /* none of the copied nodes have enqueue rights given out */ - new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX; + int j; - if (j >= vec_len (old_nodes_clone)) - { - /* new node, set to zero */ - memset (&new_n_clone->stats_total, 0, - sizeof (new_n_clone->stats_total)); - memset (&new_n_clone->stats_last_clear, 0, - sizeof (new_n_clone->stats_last_clear)); - } - else - { - /* Copy stats if the old data is valid */ - clib_memcpy (&new_n_clone->stats_total, - &old_n_clone->stats_total, - sizeof (new_n_clone->stats_total)); - clib_memcpy (&new_n_clone->stats_last_clear, - &old_n_clone->stats_last_clear, - sizeof (new_n_clone->stats_last_clear)); - - /* keep previous node state */ - new_n_clone->state = old_n_clone->state; - } - vec_add1 (nm_clone->nodes, new_n_clone); - } - /* Free the old node clone */ - for (j = 0; j < vec_len (old_nodes_clone); j++) - clib_mem_free (old_nodes_clone[j]); - vec_free (old_nodes_clone); + vm = vlib_mains[0]; + nm = &vm->node_main; + vm_clone = vlib_get_main (); + nm_clone = &vm_clone->node_main; + + /* Re-clone error heap */ + u64 *old_counters = vm_clone->error_main.counters; + u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear; + + clib_memcpy (&vm_clone->error_main, &vm->error_main, + sizeof (vm->error_main)); + j = vec_len (vm->error_main.counters) - 1; + vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES); + vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES); + vm_clone->error_main.counters = old_counters; + vm_clone->error_main.counters_last_clear = old_counters_all_clear; + + nm_clone = &vm_clone->node_main; + vec_free (nm_clone->next_frames); + nm_clone->next_frames = vec_dup_aligned (nm->next_frames, + CLIB_CACHE_LINE_BYTES); + + for (j = 0; j < vec_len (nm_clone->next_frames); j++) + { + vlib_next_frame_t *nf = &nm_clone->next_frames[j]; + u32 save_node_runtime_index; + u32 save_flags; + + save_node_runtime_index = nf->node_runtime_index; + save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH; + vlib_next_frame_init (nf); + nf->node_runtime_index = save_node_runtime_index; + nf->flags = save_flags; + } - vec_free (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]); + old_nodes_clone = nm_clone->nodes; + nm_clone->nodes = 0; - nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] = - vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]); + /* re-fork nodes */ - /* clone input node runtime */ - old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]; + /* Allocate all nodes in single block for speed */ + new_n_clone = + clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone)); + for (j = 0; j < vec_len (nm->nodes); j++) + { + vlib_node_t *old_n_clone; + vlib_node_t *new_n; - nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] = - vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]); + new_n = nm->nodes[j]; + old_n_clone = old_nodes_clone[j]; - vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]) - { - rt->cpu_index = vm_clone->cpu_index; - } + clib_memcpy (new_n_clone, new_n, sizeof (*new_n)); + /* none of the copied nodes have enqueue rights given out */ + new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX; - for (j = 0; j < vec_len (old_rt); j++) + if (j >= vec_len (old_nodes_clone)) { - rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); - rt->state = old_rt[j].state; + /* new node, set to zero */ + clib_memset (&new_n_clone->stats_total, 0, + sizeof (new_n_clone->stats_total)); + clib_memset (&new_n_clone->stats_last_clear, 0, + sizeof (new_n_clone->stats_last_clear)); } + else + { + /* Copy stats if the old data is valid */ + clib_memcpy (&new_n_clone->stats_total, + &old_n_clone->stats_total, + sizeof (new_n_clone->stats_total)); + clib_memcpy (&new_n_clone->stats_last_clear, + &old_n_clone->stats_last_clear, + sizeof (new_n_clone->stats_last_clear)); + + /* keep previous node state */ + new_n_clone->state = old_n_clone->state; + } + vec_add1 (nm_clone->nodes, new_n_clone); + new_n_clone++; + } + /* Free the old node clones */ + clib_mem_free (old_nodes_clone[0]); + + vec_free (old_nodes_clone); + + + /* re-clone internal nodes */ + old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]; + nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] = + vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL], + CLIB_CACHE_LINE_BYTES); + + vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]) + { + vlib_node_t *n = vlib_get_node (vm, rt->node_index); + rt->thread_index = vm_clone->thread_index; + /* copy runtime_data, will be overwritten later for existing rt */ + if (n->runtime_data && n->runtime_data_bytes > 0) + clib_memcpy (rt->runtime_data, n->runtime_data, + clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, + n->runtime_data_bytes)); + } + + for (j = 0; j < vec_len (old_rt); j++) + { + rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); + rt->state = old_rt[j].state; + clib_memcpy (rt->runtime_data, old_rt[j].runtime_data, + VLIB_NODE_RUNTIME_DATA_SIZE); + } - vec_free (old_rt); + vec_free (old_rt); + + /* re-clone input nodes */ + old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]; + nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] = + vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT], + CLIB_CACHE_LINE_BYTES); + + vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]) + { + vlib_node_t *n = vlib_get_node (vm, rt->node_index); + rt->thread_index = vm_clone->thread_index; + /* copy runtime_data, will be overwritten later for existing rt */ + if (n->runtime_data && n->runtime_data_bytes > 0) + clib_memcpy (rt->runtime_data, n->runtime_data, + clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, + n->runtime_data_bytes)); + } + + for (j = 0; j < vec_len (old_rt); j++) + { + rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); + rt->state = old_rt[j].state; + clib_memcpy (rt->runtime_data, old_rt[j].runtime_data, + VLIB_NODE_RUNTIME_DATA_SIZE); + } - nm_clone->processes = vec_dup (nm->processes); + vec_free (old_rt); - clib_mem_set_heap (oldheap); + nm_clone->processes = vec_dup_aligned (nm->processes, + CLIB_CACHE_LINE_BYTES); +} - // vnet_main_fork_fixup (i); - } +void +vlib_worker_thread_node_runtime_update (void) +{ + /* + * Make a note that we need to do a node runtime update + * prior to releasing the barrier. + */ + vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1; } u32 @@ -975,7 +1203,6 @@ cpu_config (vlib_main_t * vm, unformat_input_t * input) uword *p; vlib_thread_main_t *tm = &vlib_thread_main; u8 *name; - u64 coremask; uword *bitmap; u32 count; @@ -984,6 +1211,7 @@ cpu_config (vlib_main_t * vm, unformat_input_t * input) tm->n_thread_stacks = 1; /* account for main thread */ tm->sched_policy = ~0; tm->sched_priority = ~0; + tm->main_lcore = ~0; tr = tm->next; @@ -1003,25 +1231,10 @@ cpu_config (vlib_main_t * vm, unformat_input_t * input) ; else if (unformat (input, "skip-cores %u", &tm->skip_cores)) ; - else if (unformat (input, "coremask-%s %llx", &name, &coremask)) - { - p = hash_get_mem (tm->thread_registrations_by_name, name); - if (p == 0) - return clib_error_return (0, "no such thread type '%s'", name); - - tr = (vlib_thread_registration_t *) p[0]; - - if (tr->use_pthreads) - return clib_error_return (0, - "coremask cannot be set for '%s' threads", - name); - - tr->coremask = clib_bitmap_set_multiple - (tr->coremask, 0, coremask, BITS (coremask)); - tr->count = clib_bitmap_count_set_bits (tr->coremask); - } - else if (unformat (input, "corelist-%s %U", &name, unformat_bitmap_list, - &bitmap)) + else if (unformat (input, "coremask-%s %U", &name, + unformat_bitmap_mask, &bitmap) || + unformat (input, "corelist-%s %U", &name, + unformat_bitmap_list, &bitmap)) { p = hash_get_mem (tm->thread_registrations_by_name, name); if (p == 0) @@ -1097,22 +1310,6 @@ cpu_config (vlib_main_t * vm, unformat_input_t * input) VLIB_EARLY_CONFIG_FUNCTION (cpu_config, "cpu"); -#if !defined (__x86_64__) && !defined (__aarch64__) && !defined (__powerpc64__) && !defined(__arm__) -void -__sync_fetch_and_add_8 (void) -{ - fformat (stderr, "%s called\n", __FUNCTION__); - abort (); -} - -void -__sync_add_and_fetch_8 (void) -{ - fformat (stderr, "%s called\n", __FUNCTION__); - abort (); -} -#endif - void vnet_main_fixup (vlib_fork_fixup_t which) __attribute__ ((weak)); void vnet_main_fixup (vlib_fork_fixup_t which) @@ -1127,7 +1324,7 @@ vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which) if (vlib_mains == 0) return; - ASSERT (os_get_cpu_number () == 0); + ASSERT (vlib_get_thread_index () == 0); vlib_worker_thread_barrier_sync (vm); switch (which) @@ -1142,61 +1339,198 @@ vlib_worker_thread_fork_fixup (vlib_fork_fixup_t which) vlib_worker_thread_barrier_release (vm); } + /* + * Enforce minimum open time to minimize packet loss due to Rx overflow, + * based on a test based heuristic that barrier should be open for at least + * 3 time as long as it is closed (with an upper bound of 1ms because by that + * point it is probably too late to make a difference) + */ + +#ifndef BARRIER_MINIMUM_OPEN_LIMIT +#define BARRIER_MINIMUM_OPEN_LIMIT 0.001 +#endif + +#ifndef BARRIER_MINIMUM_OPEN_FACTOR +#define BARRIER_MINIMUM_OPEN_FACTOR 3 +#endif + void -vlib_worker_thread_barrier_sync (vlib_main_t * vm) +vlib_worker_thread_barrier_sync_int (vlib_main_t * vm) { f64 deadline; + f64 now; + f64 t_entry; + f64 t_open; + f64 t_closed; u32 count; - if (!vlib_mains) + if (vec_len (vlib_mains) < 2) return; + ASSERT (vlib_get_thread_index () == 0); + count = vec_len (vlib_mains) - 1; + /* Record entry relative to last close */ + now = vlib_time_now (vm); + t_entry = now - vm->barrier_epoch; + /* Tolerate recursive calls */ if (++vlib_worker_threads[0].recursion_level > 1) - return; + { + barrier_trace_sync_rec (t_entry); + return; + } vlib_worker_threads[0].barrier_sync_count++; - ASSERT (os_get_cpu_number () == 0); + /* Enforce minimum barrier open time to minimize packet loss */ + ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT)); + + while (1) + { + now = vlib_time_now (vm); + /* Barrier hold-down timer expired? */ + if (now >= vm->barrier_no_close_before) + break; + if ((vm->barrier_no_close_before - now) + > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT)) + { + clib_warning ("clock change: would have waited for %.4f seconds", + (vm->barrier_no_close_before - now)); + break; + } + } + /* Record time of closure */ + t_open = now - vm->barrier_epoch; + vm->barrier_epoch = now; - deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT; + deadline = now + BARRIER_SYNC_TIMEOUT; *vlib_worker_threads->wait_at_barrier = 1; while (*vlib_worker_threads->workers_at_barrier != count) { - if (vlib_time_now (vm) > deadline) + if ((now = vlib_time_now (vm)) > deadline) { fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__); os_panic (); } } + + t_closed = now - vm->barrier_epoch; + + barrier_trace_sync (t_entry, t_open, t_closed); + +} + +void vlib_stat_segment_lock (void) __attribute__ ((weak)); +void +vlib_stat_segment_lock (void) +{ +} + +void vlib_stat_segment_unlock (void) __attribute__ ((weak)); +void +vlib_stat_segment_unlock (void) +{ } void vlib_worker_thread_barrier_release (vlib_main_t * vm) { f64 deadline; - - if (!vlib_mains) + f64 now; + f64 minimum_open; + f64 t_entry; + f64 t_closed_total; + f64 t_update_main = 0.0; + int refork_needed = 0; + + if (vec_len (vlib_mains) < 2) return; + ASSERT (vlib_get_thread_index () == 0); + + + now = vlib_time_now (vm); + t_entry = now - vm->barrier_epoch; + if (--vlib_worker_threads[0].recursion_level > 0) - return; + { + barrier_trace_release_rec (t_entry); + return; + } + + /* Update (all) node runtimes before releasing the barrier, if needed */ + if (vm->need_vlib_worker_thread_node_runtime_update) + { + /* + * Lock stat segment here, so we's safe when + * rebuilding the stat segment node clones from the + * stat thread... + */ + vlib_stat_segment_lock (); + + /* Do stats elements on main thread */ + worker_thread_node_runtime_update_internal (); + vm->need_vlib_worker_thread_node_runtime_update = 0; + + /* Do per thread rebuilds in parallel */ + refork_needed = 1; + clib_atomic_fetch_add (vlib_worker_threads->node_reforks_required, + (vec_len (vlib_mains) - 1)); + now = vlib_time_now (vm); + t_update_main = now - vm->barrier_epoch; + } - deadline = vlib_time_now (vm) + BARRIER_SYNC_TIMEOUT; + deadline = now + BARRIER_SYNC_TIMEOUT; *vlib_worker_threads->wait_at_barrier = 0; while (*vlib_worker_threads->workers_at_barrier > 0) { - if (vlib_time_now (vm) > deadline) + if ((now = vlib_time_now (vm)) > deadline) { fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__); os_panic (); } } + + /* Wait for reforks before continuing */ + if (refork_needed) + { + now = vlib_time_now (vm); + + deadline = now + BARRIER_SYNC_TIMEOUT; + + while (*vlib_worker_threads->node_reforks_required > 0) + { + if ((now = vlib_time_now (vm)) > deadline) + { + fformat (stderr, "%s: worker thread refork deadlock\n", + __FUNCTION__); + os_panic (); + } + } + vlib_stat_segment_unlock (); + } + + t_closed_total = now - vm->barrier_epoch; + + minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR; + + if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT) + { + minimum_open = BARRIER_MINIMUM_OPEN_LIMIT; + } + + vm->barrier_no_close_before = now + minimum_open; + + /* Record barrier epoch (used to enforce minimum open time) */ + vm->barrier_epoch = now; + + barrier_trace_release (t_entry, t_closed_total, t_update_main); + } /* @@ -1204,11 +1538,10 @@ vlib_worker_thread_barrier_release (vlib_main_t * vm) * If so, pull the packets off the frames and put them to * the handoff node. */ -static inline int -vlib_frame_queue_dequeue_internal (vlib_main_t * vm, - vlib_frame_queue_main_t * fqm) +int +vlib_frame_queue_dequeue (vlib_main_t * vm, vlib_frame_queue_main_t * fqm) { - u32 thread_id = vm->cpu_index; + u32 thread_id = vm->thread_index; vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id]; vlib_frame_queue_elt_t *elt; u32 *from, *to; @@ -1333,83 +1666,15 @@ vlib_frame_queue_dequeue_internal (vlib_main_t * vm, return processed; } -static_always_inline void -vlib_worker_thread_internal (vlib_main_t * vm) -{ - vlib_node_main_t *nm = &vm->node_main; - vlib_thread_main_t *tm = vlib_get_thread_main (); - u64 cpu_time_now = clib_cpu_time_now (); - vlib_frame_queue_main_t *fqm; - - vec_alloc (nm->pending_interrupt_node_runtime_indices, 32); - - while (1) - { - vlib_worker_thread_barrier_check (); - - vec_foreach (fqm, tm->frame_queue_mains) - vlib_frame_queue_dequeue_internal (vm, fqm); - - vlib_node_runtime_t *n; - vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]) - { - cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, - VLIB_NODE_STATE_POLLING, /* frame */ 0, - cpu_time_now); - } - - /* Next handle interrupts. */ - { - uword l = _vec_len (nm->pending_interrupt_node_runtime_indices); - uword i; - if (l > 0) - { - _vec_len (nm->pending_interrupt_node_runtime_indices) = 0; - for (i = 0; i < l; i++) - { - n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT], - nm-> - pending_interrupt_node_runtime_indices - [i]); - cpu_time_now = - dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, - VLIB_NODE_STATE_INTERRUPT, - /* frame */ 0, - cpu_time_now); - } - } - } - - if (_vec_len (nm->pending_frames)) - { - int i; - cpu_time_now = clib_cpu_time_now (); - for (i = 0; i < _vec_len (nm->pending_frames); i++) - { - vlib_pending_frame_t *p; - - p = nm->pending_frames + i; - - cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now); - } - _vec_len (nm->pending_frames) = 0; - } - vlib_increment_main_loop_counter (vm); - - /* Record time stamp in case there are no enabled nodes and above - calls do not update time stamp. */ - cpu_time_now = clib_cpu_time_now (); - } -} - void vlib_worker_thread_fn (void *arg) { vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg; vlib_thread_main_t *tm = vlib_get_thread_main (); vlib_main_t *vm = vlib_get_main (); + clib_error_t *e; - ASSERT (vm->cpu_index == os_get_cpu_number ()); + ASSERT (vm->thread_index == vlib_get_thread_index ()); vlib_worker_thread_init (w); clib_time_init (&vm->clib_time); @@ -1419,7 +1684,12 @@ vlib_worker_thread_fn (void *arg) while (tm->extern_thread_mgmt && tm->worker_thread_release == 0) vlib_worker_thread_barrier_check (); - vlib_worker_thread_internal (vm); + e = vlib_call_init_exit_functions + (vm, vm->worker_init_function_registrations, 1 /* call_once */ ); + if (e) + clib_error_report (e); + + vlib_worker_loop (vm); } /* *INDENT-OFF* */ @@ -1441,22 +1711,34 @@ vlib_frame_queue_main_init (u32 node_index, u32 frame_queue_nelts) if (frame_queue_nelts == 0) frame_queue_nelts = FRAME_QUEUE_NELTS; + ASSERT (frame_queue_nelts >= 8); + vec_add2 (tm->frame_queue_mains, fqm, 1); fqm->node_index = node_index; + fqm->frame_queue_nelts = frame_queue_nelts; + fqm->queue_hi_thresh = frame_queue_nelts - 2; vec_validate (fqm->vlib_frame_queues, tm->n_vlib_mains - 1); + vec_validate (fqm->per_thread_data, tm->n_vlib_mains - 1); _vec_len (fqm->vlib_frame_queues) = 0; for (i = 0; i < tm->n_vlib_mains; i++) { + vlib_frame_queue_per_thread_data_t *ptd; fq = vlib_frame_queue_alloc (frame_queue_nelts); vec_add1 (fqm->vlib_frame_queues, fq); + + ptd = vec_elt_at_index (fqm->per_thread_data, i); + vec_validate (ptd->handoff_queue_elt_by_thread_index, + tm->n_vlib_mains - 1); + vec_validate_init_empty (ptd->congested_handoff_queue_by_thread_index, + tm->n_vlib_mains - 1, + (vlib_frame_queue_t *) (~0)); } return (fqm - tm->frame_queue_mains); } - int vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb) { @@ -1470,6 +1752,29 @@ vlib_thread_cb_register (struct vlib_main_t *vm, vlib_thread_callbacks_t * cb) return 0; } +void +vlib_process_signal_event_mt_helper (vlib_process_signal_event_mt_args_t * + args) +{ + ASSERT (vlib_get_thread_index () == 0); + vlib_process_signal_event (vlib_get_main (), args->node_index, + args->type_opaque, args->data); +} + +void *rpc_call_main_thread_cb_fn; + +void +vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size) +{ + if (rpc_call_main_thread_cb_fn) + { + void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn; + (*fp) (callback, args, arg_size); + } + else + clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!"); +} + clib_error_t * threads_init (vlib_main_t * vm) {