X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvlib%2Fbuffer_funcs.c;h=a661370a1411a01c891d89175ac6c203b746af2a;hb=83b2bb86769fef6b6ff985ca4d2d8d64551caf17;hp=fcef2d80f351b61d2abef10eaba2b8f1edb7b3ce;hpb=eee099e9579083fbce665e8c4a3617b9f0e5ea2f;p=vpp.git diff --git a/src/vlib/buffer_funcs.c b/src/vlib/buffer_funcs.c index fcef2d80f35..a661370a141 100644 --- a/src/vlib/buffer_funcs.c +++ b/src/vlib/buffer_funcs.c @@ -4,125 +4,15 @@ #include #include -#include - -typedef struct -{ - uword used_elts[VLIB_FRAME_SIZE / 64]; - u32 uword_offset; -} extract_data_t; - -static_always_inline u32 * -extract_unused_elts_x64 (u32 *elts, u16 *indices, u16 index, int n_left, - u64 *bmp, u32 *dst) -{ - u64 mask = 0; -#if defined(CLIB_HAVE_VEC128) - mask = clib_compare_u16_x64 (index, indices); - if (n_left == 64) - { - if (mask == ~0ULL) - { - clib_memcpy_u32 (dst, elts, 64); - *bmp = ~0ULL; - return dst + 64; - } - } - else - mask &= pow2_mask (n_left); - - *bmp |= mask; - -#if defined(CLIB_HAVE_VEC512_COMPRESS) - u32x16u *ev = (u32x16u *) elts; - for (int i = 0; i < 4; i++) - { - int cnt = _popcnt32 ((u16) mask); - u32x16_compress_store (ev[i], mask, dst); - dst += cnt; - mask >>= 16; - } - -#elif defined(CLIB_HAVE_VEC256_COMPRESS) - u32x8u *ev = (u32x8u *) elts; - for (int i = 0; i < 8; i++) - { - int cnt = _popcnt32 ((u8) mask); - u32x8_compress_store (ev[i], mask, dst); - dst += cnt; - mask >>= 8; - } -#elif defined(CLIB_HAVE_VEC256) - while (mask) - { - u16 bit = count_trailing_zeros (mask); - mask = clear_lowest_set_bit (mask); - dst++[0] = elts[bit]; - } -#else - while (mask) - { - u16 bit = count_trailing_zeros (mask); - mask ^= 1ULL << bit; - dst++[0] = elts[bit]; - } -#endif -#else - for (int i = 0; i < n_left; i++) - { - if (indices[i] == index) - { - dst++[0] = elts[i]; - mask |= 1ULL << i; - } - } - *bmp |= mask; -#endif - return dst; -} +#include +#include static_always_inline u32 -extract_unused_elts_by_index (extract_data_t *d, u32 *elts, u16 *indices, - u16 index, int n_left, u32 *dst) -{ - u32 *dst0 = dst; - u64 *bmp = d->used_elts; - while (n_left >= 64) - { - dst = extract_unused_elts_x64 (elts, indices, index, 64, bmp, dst); - - /* next */ - indices += 64; - elts += 64; - bmp++; - n_left -= 64; - } - - if (n_left) - dst = extract_unused_elts_x64 (elts, indices, index, n_left, bmp, dst); - - return dst - dst0; -} - -static_always_inline u32 -find_first_unused_elt (extract_data_t *d) -{ - u64 *ue = d->used_elts + d->uword_offset; - - while (PREDICT_FALSE (ue[0] == ~0)) - { - ue++; - d->uword_offset++; - } - - return d->uword_offset * 64 + count_trailing_zeros (~ue[0]); -} - -static_always_inline u32 -enqueue_one (vlib_main_t *vm, vlib_node_runtime_t *node, extract_data_t *d, +enqueue_one (vlib_main_t *vm, vlib_node_runtime_t *node, u64 *used_elt_bmp, u16 next_index, u32 *buffers, u16 *nexts, u32 n_buffers, u32 n_left, u32 *tmp) { + u64 match_bmp[VLIB_FRAME_SIZE / 64]; vlib_frame_t *f; u32 n_extracted, n_free; u32 *to; @@ -138,8 +28,12 @@ enqueue_one (vlib_main_t *vm, vlib_node_runtime_t *node, extract_data_t *d, else to = tmp; - n_extracted = extract_unused_elts_by_index (d, buffers, nexts, next_index, - n_buffers, to); + clib_mask_compare_u16 (next_index, nexts, match_bmp, n_buffers); + + n_extracted = clib_compress_u32 (to, buffers, match_bmp, n_buffers); + + for (int i = 0; i < ARRAY_LEN (match_bmp); i++) + used_elt_bmp[i] |= match_bmp[i]; if (to != tmp) { @@ -183,18 +77,26 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn) while (count >= VLIB_FRAME_SIZE) { - extract_data_t d = {}; + u64 used_elt_bmp[VLIB_FRAME_SIZE / 64] = {}; n_left = VLIB_FRAME_SIZE; + u32 off = 0; next_index = nexts[0]; - n_left = enqueue_one (vm, node, &d, next_index, buffers, nexts, + n_left = enqueue_one (vm, node, used_elt_bmp, next_index, buffers, nexts, VLIB_FRAME_SIZE, n_left, tmp); while (n_left) { - next_index = nexts[find_first_unused_elt (&d)]; - n_left = enqueue_one (vm, node, &d, next_index, buffers, nexts, - VLIB_FRAME_SIZE, n_left, tmp); + while (PREDICT_FALSE (used_elt_bmp[off] == ~0)) + { + off++; + ASSERT (off < ARRAY_LEN (used_elt_bmp)); + } + + next_index = + nexts[off * 64 + count_trailing_zeros (~used_elt_bmp[off])]; + n_left = enqueue_one (vm, node, used_elt_bmp, next_index, buffers, + nexts, VLIB_FRAME_SIZE, n_left, tmp); } buffers += VLIB_FRAME_SIZE; @@ -204,18 +106,26 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn) if (count) { - extract_data_t d = {}; + u64 used_elt_bmp[VLIB_FRAME_SIZE / 64] = {}; next_index = nexts[0]; n_left = count; + u32 off = 0; - n_left = enqueue_one (vm, node, &d, next_index, buffers, nexts, count, - n_left, tmp); + n_left = enqueue_one (vm, node, used_elt_bmp, next_index, buffers, nexts, + count, n_left, tmp); while (n_left) { - next_index = nexts[find_first_unused_elt (&d)]; - n_left = enqueue_one (vm, node, &d, next_index, buffers, nexts, - count, n_left, tmp); + while (PREDICT_FALSE (used_elt_bmp[off] == ~0)) + { + off++; + ASSERT (off < ARRAY_LEN (used_elt_bmp)); + } + + next_index = + nexts[off * 64 + count_trailing_zeros (~used_elt_bmp[off])]; + n_left = enqueue_one (vm, node, used_elt_bmp, next_index, buffers, + nexts, count, n_left, tmp); } } } @@ -258,99 +168,88 @@ next: } CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_single_next_fn); -u32 __clib_section (".vlib_buffer_enqueue_to_thread_fn") -CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) -(vlib_main_t *vm, u32 frame_queue_index, u32 *buffer_indices, - u16 *thread_indices, u32 n_packets, int drop_on_congestion) +static inline vlib_frame_queue_elt_t * +vlib_get_frame_queue_elt (vlib_frame_queue_main_t *fqm, u32 index, + int dont_wait) { - vlib_thread_main_t *tm = vlib_get_thread_main (); - vlib_frame_queue_main_t *fqm; - vlib_frame_queue_per_thread_data_t *ptd; - u32 n_left = n_packets; - u32 drop_list[VLIB_FRAME_SIZE], *dbi = drop_list, n_drop = 0; - vlib_frame_queue_elt_t *hf = 0; - u32 n_left_to_next_thread = 0, *to_next_thread = 0; - u32 next_thread_index, current_thread_index = ~0; - int i; + vlib_frame_queue_t *fq; + u64 nelts, tail, new_tail; - fqm = vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); - ptd = vec_elt_at_index (fqm->per_thread_data, vm->thread_index); + fq = fqm->vlib_frame_queues[index]; + ASSERT (fq); + nelts = fq->nelts; + +retry: + tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE); + new_tail = tail + 1; - while (n_left) + if (new_tail >= fq->head + nelts) { - next_thread_index = thread_indices[0]; + if (dont_wait) + return 0; - if (next_thread_index != current_thread_index) - { - if (drop_on_congestion && - is_vlib_frame_queue_congested ( - frame_queue_index, next_thread_index, fqm->queue_hi_thresh, - ptd->congested_handoff_queue_by_thread_index)) - { - dbi[0] = buffer_indices[0]; - dbi++; - n_drop++; - goto next; - } + /* Wait until a ring slot is available */ + while (new_tail >= fq->head + nelts) + vlib_worker_thread_barrier_check (); + } - if (hf) - hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_thread; + if (!__atomic_compare_exchange_n (&fq->tail, &tail, new_tail, 0 /* weak */, + __ATOMIC_RELAXED, __ATOMIC_RELAXED)) + goto retry; - hf = vlib_get_worker_handoff_queue_elt ( - frame_queue_index, next_thread_index, - ptd->handoff_queue_elt_by_thread_index); + return fq->elts + (new_tail & (nelts - 1)); +} - n_left_to_next_thread = VLIB_FRAME_SIZE - hf->n_vectors; - to_next_thread = &hf->buffer_index[hf->n_vectors]; - current_thread_index = next_thread_index; - } +static_always_inline u32 +vlib_buffer_enqueue_to_thread_inline (vlib_main_t *vm, + vlib_node_runtime_t *node, + vlib_frame_queue_main_t *fqm, + u32 *buffer_indices, u16 *thread_indices, + u32 n_packets, int drop_on_congestion) +{ + u32 drop_list[VLIB_FRAME_SIZE], n_drop = 0; + u64 used_elts[VLIB_FRAME_SIZE / 64] = {}; + u64 mask[VLIB_FRAME_SIZE / 64]; + vlib_frame_queue_elt_t *hf = 0; + u16 thread_index; + u32 n_comp, off = 0, n_left = n_packets; - to_next_thread[0] = buffer_indices[0]; - to_next_thread++; - n_left_to_next_thread--; + thread_index = thread_indices[0]; - if (n_left_to_next_thread == 0) - { - hf->n_vectors = VLIB_FRAME_SIZE; - vlib_put_frame_queue_elt (hf); - vlib_get_main_by_index (current_thread_index)->check_frame_queues = - 1; - current_thread_index = ~0; - ptd->handoff_queue_elt_by_thread_index[next_thread_index] = 0; - hf = 0; - } +more: + clib_mask_compare_u16 (thread_index, thread_indices, mask, n_packets); + hf = vlib_get_frame_queue_elt (fqm, thread_index, drop_on_congestion); - /* next */ - next: - thread_indices += 1; - buffer_indices += 1; - n_left -= 1; - } + n_comp = clib_compress_u32 (hf ? hf->buffer_index : drop_list + n_drop, + buffer_indices, mask, n_packets); if (hf) - hf->n_vectors = VLIB_FRAME_SIZE - n_left_to_next_thread; + { + if (node->flags & VLIB_NODE_FLAG_TRACE) + hf->maybe_trace = 1; + hf->n_vectors = n_comp; + __atomic_store_n (&hf->valid, 1, __ATOMIC_RELEASE); + vlib_get_main_by_index (thread_index)->check_frame_queues = 1; + } + else + n_drop += n_comp; - /* Ship frames to the thread nodes */ - for (i = 0; i < vec_len (ptd->handoff_queue_elt_by_thread_index); i++) + n_left -= n_comp; + + if (n_left) { - if (ptd->handoff_queue_elt_by_thread_index[i]) + for (int i = 0; i < ARRAY_LEN (used_elts); i++) + used_elts[i] |= mask[i]; + + while (PREDICT_FALSE (used_elts[off] == ~0)) { - hf = ptd->handoff_queue_elt_by_thread_index[i]; - /* - * It works better to let the handoff node - * rate-adapt, always ship the handoff queue element. - */ - if (1 || hf->n_vectors == hf->last_n_vectors) - { - vlib_put_frame_queue_elt (hf); - vlib_get_main_by_index (i)->check_frame_queues = 1; - ptd->handoff_queue_elt_by_thread_index[i] = 0; - } - else - hf->last_n_vectors = hf->n_vectors; + off++; + ASSERT (off < ARRAY_LEN (used_elts)); } - ptd->congested_handoff_queue_by_thread_index[i] = - (vlib_frame_queue_t *) (~0); + + thread_index = + thread_indices[off * 64 + count_trailing_zeros (~used_elts[off])]; + goto more; } if (drop_on_congestion && n_drop) @@ -359,25 +258,50 @@ CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) return n_packets - n_drop; } +u32 __clib_section (".vlib_buffer_enqueue_to_thread_fn") +CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn) +(vlib_main_t *vm, vlib_node_runtime_t *node, u32 frame_queue_index, + u32 *buffer_indices, u16 *thread_indices, u32 n_packets, + int drop_on_congestion) +{ + vlib_thread_main_t *tm = vlib_get_thread_main (); + vlib_frame_queue_main_t *fqm; + u32 n_enq = 0; + + fqm = vec_elt_at_index (tm->frame_queue_mains, frame_queue_index); + + while (n_packets >= VLIB_FRAME_SIZE) + { + n_enq += vlib_buffer_enqueue_to_thread_inline ( + vm, node, fqm, buffer_indices, thread_indices, VLIB_FRAME_SIZE, + drop_on_congestion); + buffer_indices += VLIB_FRAME_SIZE; + thread_indices += VLIB_FRAME_SIZE; + n_packets -= VLIB_FRAME_SIZE; + } + + if (n_packets == 0) + return n_enq; + + n_enq += vlib_buffer_enqueue_to_thread_inline (vm, node, fqm, buffer_indices, + thread_indices, n_packets, + drop_on_congestion); + + return n_enq; +} + CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_thread_fn); -/* - * Check the frame queue to see if any frames are available. - * If so, pull the packets off the frames and put them to - * the handoff node. - */ u32 __clib_section (".vlib_frame_queue_dequeue_fn") CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) (vlib_main_t *vm, vlib_frame_queue_main_t *fqm) { u32 thread_id = vm->thread_index; vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id]; + u32 mask = fq->nelts - 1; vlib_frame_queue_elt_t *elt; - u32 *from, *to; - vlib_frame_t *f; - int msg_type; - int processed = 0; - u32 vectors = 0; + u32 n_free, n_copy, *from, *to = 0, processed = 0, vectors = 0; + vlib_frame_t *f = 0; ASSERT (fq); ASSERT (vm == vlib_global_main.vlib_mains[thread_id]); @@ -397,7 +321,6 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) fqt->nelts = fq->nelts; fqt->head = fq->head; - fqt->head_hint = fq->head_hint; fqt->tail = fq->tail; fqt->threshold = fq->vector_threshold; fqt->n_in_use = fqt->tail - fqt->head; @@ -414,7 +337,7 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) /* Record a snapshot of the elements in use */ for (elix = 0; elix < fqt->nelts; elix++) { - elt = fq->elts + ((fq->head + 1 + elix) & (fq->nelts - 1)); + elt = fq->elts + ((fq->head + 1 + elix) & (mask)); if (1 || elt->valid) { fqt->n_vectors[elix] = elt->n_vectors; @@ -425,61 +348,71 @@ CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn) while (1) { - vlib_buffer_t *b; if (fq->head == fq->tail) - { - fq->head_hint = fq->head; - return processed; - } + break; - elt = fq->elts + ((fq->head + 1) & (fq->nelts - 1)); + elt = fq->elts + ((fq->head + 1) & mask); - if (!elt->valid) - { - fq->head_hint = fq->head; - return processed; - } + if (!__atomic_load_n (&elt->valid, __ATOMIC_ACQUIRE)) + break; - from = elt->buffer_index; - msg_type = elt->msg_type; + from = elt->buffer_index + elt->offset; - ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME); - ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE); + ASSERT (elt->offset + elt->n_vectors <= VLIB_FRAME_SIZE); - f = vlib_get_frame_to_node (vm, fqm->node_index); + if (f == 0) + { + f = vlib_get_frame_to_node (vm, fqm->node_index); + to = vlib_frame_vector_args (f); + n_free = VLIB_FRAME_SIZE; + } - /* If the first vector is traced, set the frame trace flag */ - b = vlib_get_buffer (vm, from[0]); - if (b->flags & VLIB_BUFFER_IS_TRACED) + if (elt->maybe_trace) f->frame_flags |= VLIB_NODE_FLAG_TRACE; - to = vlib_frame_vector_args (f); - - vlib_buffer_copy_indices (to, from, elt->n_vectors); + n_copy = clib_min (n_free, elt->n_vectors); - vectors += elt->n_vectors; - f->n_vectors = elt->n_vectors; - vlib_put_frame_to_node (vm, fqm->node_index, f); + vlib_buffer_copy_indices (to, from, n_copy); + to += n_copy; + n_free -= n_copy; + vectors += n_copy; - elt->valid = 0; - elt->n_vectors = 0; - elt->msg_type = 0xfefefefe; - CLIB_MEMORY_BARRIER (); - fq->head++; - processed++; + if (n_free == 0) + { + f->n_vectors = VLIB_FRAME_SIZE; + vlib_put_frame_to_node (vm, fqm->node_index, f); + f = 0; + } - /* - * Limit the number of packets pushed into the graph - */ - if (vectors >= fq->vector_threshold) + if (n_copy < elt->n_vectors) + { + /* not empty - leave it on the ring */ + elt->n_vectors -= n_copy; + elt->offset += n_copy; + } + else { - fq->head_hint = fq->head; - return processed; + /* empty - reset and bump head */ + u32 sz = STRUCT_OFFSET_OF (vlib_frame_queue_elt_t, end_of_reset); + clib_memset (elt, 0, sz); + __atomic_store_n (&fq->head, fq->head + 1, __ATOMIC_RELEASE); + processed++; } + + /* Limit the number of packets pushed into the graph */ + if (vectors >= fq->vector_threshold) + break; } - ASSERT (0); + + if (f) + { + f->n_vectors = VLIB_FRAME_SIZE - n_free; + vlib_put_frame_to_node (vm, fqm->node_index, f); + } + return processed; } + CLIB_MARCH_FN_REGISTRATION (vlib_frame_queue_dequeue_fn); #ifndef CLIB_MARCH_VARIANT