vlib: improve enqueue_to_next buffer indices extraction
[vpp.git] / src / vlib / buffer_funcs.c
index f3023a3..eaf141e 100644 (file)
  * Copyright(c) 2021 Cisco Systems, Inc.
  */
 
+#include <vppinfra/clib.h>
+#include <vppinfra/vector_funcs.h>
 #include <vlib/vlib.h>
 
-void __clib_section (".vlib_buffer_enqueue_to_next_fn")
-CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn)
-(vlib_main_t *vm, vlib_node_runtime_t *node, u32 *buffers, u16 *nexts,
- uword count)
+typedef struct
 {
-  u32 *to_next, n_left_to_next, max;
-  u16 next_index;
+  uword used_elts[VLIB_FRAME_SIZE / 64];
+  u32 uword_offset;
+} extract_data_t;
 
-  next_index = nexts[0];
-  vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
-  max = clib_min (n_left_to_next, count);
-
-  while (count)
+static_always_inline u32 *
+extract_unused_elts_x64 (u32 *elts, u16 *indices, u16 index, int n_left,
+                        u64 *bmp, u32 *dst)
+{
+  u64 mask = 0;
+#if defined(CLIB_HAVE_VEC128)
+  mask = clib_compare_u16_x64 (index, indices);
+  if (n_left == 64)
     {
-      u32 n_enqueued;
-      if ((nexts[0] != next_index) || n_left_to_next == 0)
+      if (mask == ~0ULL)
        {
-         vlib_put_next_frame (vm, node, next_index, n_left_to_next);
-         next_index = nexts[0];
-         vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
-         max = clib_min (n_left_to_next, count);
+         clib_memcpy_u32 (dst, elts, 64);
+         *bmp = ~0ULL;
+         return dst + 64;
        }
-#if defined(CLIB_HAVE_VEC512)
-      u16x32 next32 = CLIB_MEM_OVERFLOW_LOAD (u16x32_load_unaligned, nexts);
-      next32 = (next32 == u16x32_splat (next32[0]));
-      u64 bitmap = u16x32_msb_mask (next32);
-      n_enqueued = count_trailing_zeros (~bitmap);
+    }
+  else
+    mask &= pow2_mask (n_left);
+
+  *bmp |= mask;
+
+#if defined(CLIB_HAVE_VEC512_COMPRESS)
+  u32x16u *ev = (u32x16u *) elts;
+  for (int i = 0; i < 4; i++)
+    {
+      int cnt = _popcnt32 ((u16) mask);
+      u32x16_compress_store (ev[i], mask, dst);
+      dst += cnt;
+      mask >>= 16;
+    }
+
+#elif defined(CLIB_HAVE_VEC256_COMPRESS)
+  u32x8u *ev = (u32x8u *) elts;
+  for (int i = 0; i < 8; i++)
+    {
+      int cnt = _popcnt32 ((u8) mask);
+      u32x8_compress_store (ev[i], mask, dst);
+      dst += cnt;
+      mask >>= 8;
+    }
 #elif defined(CLIB_HAVE_VEC256)
-      u16x16 next16 = CLIB_MEM_OVERFLOW_LOAD (u16x16_load_unaligned, nexts);
-      next16 = (next16 == u16x16_splat (next16[0]));
-      u64 bitmap = u8x32_msb_mask ((u8x32) next16);
-      n_enqueued = count_trailing_zeros (~bitmap) / 2;
-#elif defined(CLIB_HAVE_VEC128) && defined(CLIB_HAVE_VEC128_MSB_MASK)
-      u16x8 next8 = CLIB_MEM_OVERFLOW_LOAD (u16x8_load_unaligned, nexts);
-      next8 = (next8 == u16x8_splat (next8[0]));
-      u64 bitmap = u8x16_msb_mask ((u8x16) next8);
-      n_enqueued = count_trailing_zeros (~bitmap) / 2;
+  while (mask)
+    {
+      u16 bit = count_trailing_zeros (mask);
+      mask = clear_lowest_set_bit (mask);
+      dst++[0] = elts[bit];
+    }
+#else
+  while (mask)
+    {
+      u16 bit = count_trailing_zeros (mask);
+      mask ^= 1ULL << bit;
+      dst++[0] = elts[bit];
+    }
+#endif
 #else
-      u16 x = 0;
-      if (count + 3 < max)
+  for (int i = 0; i < n_left; i++)
+    {
+      if (indices[i] == index)
        {
-         x |= next_index ^ nexts[1];
-         x |= next_index ^ nexts[2];
-         x |= next_index ^ nexts[3];
-         n_enqueued = (x == 0) ? 4 : 1;
+         dst++[0] = elts[i];
+         mask |= 1ULL << i;
        }
-      else
-       n_enqueued = 1;
+    }
+  *bmp |= mask;
 #endif
+  return dst;
+}
 
-      if (PREDICT_FALSE (n_enqueued > max))
-       n_enqueued = max;
+static_always_inline u32
+extract_unused_elts_by_index (extract_data_t *d, u32 *elts, u16 *indices,
+                             u16 index, int n_left, u32 *dst)
+{
+  u32 *dst0 = dst;
+  u64 *bmp = d->used_elts;
+  while (n_left >= 64)
+    {
+      dst = extract_unused_elts_x64 (elts, indices, index, 64, bmp, dst);
 
-#ifdef CLIB_HAVE_VEC512
-      if (n_enqueued >= 32)
-       {
-         vlib_buffer_copy_indices (to_next, buffers, 32);
-         nexts += 32;
-         to_next += 32;
-         buffers += 32;
-         n_left_to_next -= 32;
-         count -= 32;
-         max -= 32;
-         continue;
-       }
-#endif
+      /* next */
+      indices += 64;
+      elts += 64;
+      bmp++;
+      n_left -= 64;
+    }
 
-#ifdef CLIB_HAVE_VEC256
-      if (n_enqueued >= 16)
-       {
-         vlib_buffer_copy_indices (to_next, buffers, 16);
-         nexts += 16;
-         to_next += 16;
-         buffers += 16;
-         n_left_to_next -= 16;
-         count -= 16;
-         max -= 16;
-         continue;
-       }
-#endif
+  if (n_left)
+    dst = extract_unused_elts_x64 (elts, indices, index, n_left, bmp, dst);
 
-#ifdef CLIB_HAVE_VEC128
-      if (n_enqueued >= 8)
-       {
-         vlib_buffer_copy_indices (to_next, buffers, 8);
-         nexts += 8;
-         to_next += 8;
-         buffers += 8;
-         n_left_to_next -= 8;
-         count -= 8;
-         max -= 8;
-         continue;
-       }
-#endif
+  return dst - dst0;
+}
+
+static_always_inline u32
+find_first_unused_elt (extract_data_t *d)
+{
+  u64 *ue = d->used_elts + d->uword_offset;
+
+  while (PREDICT_FALSE (ue[0] == ~0))
+    {
+      ue++;
+      d->uword_offset++;
+    }
+
+  return d->uword_offset * 64 + count_trailing_zeros (~ue[0]);
+}
+
+static_always_inline u32
+enqueue_one (vlib_main_t *vm, vlib_node_runtime_t *node, extract_data_t *d,
+            u16 next_index, u32 *buffers, u16 *nexts, u32 n_buffers,
+            u32 n_left, u32 *tmp)
+{
+  vlib_frame_t *f;
+  u32 n_extracted, n_free;
+  u32 *to;
+
+  f = vlib_get_next_frame_internal (vm, node, next_index, 0);
+
+  n_free = VLIB_FRAME_SIZE - f->n_vectors;
+
+  /* if frame contains enough space for worst case scenario, we can avoid
+   * use of tmp */
+  if (n_free >= n_left)
+    to = (u32 *) vlib_frame_vector_args (f) + f->n_vectors;
+  else
+    to = tmp;
+
+  n_extracted = extract_unused_elts_by_index (d, buffers, nexts, next_index,
+                                             n_buffers, to);
+
+  if (to != tmp)
+    {
+      /* indices already written to frame, just close it */
+      vlib_put_next_frame (vm, node, next_index, n_free - n_extracted);
+    }
+  else if (n_free >= n_extracted)
+    {
+      /* enough space in the existing frame */
+      to = (u32 *) vlib_frame_vector_args (f) + f->n_vectors;
+      vlib_buffer_copy_indices (to, tmp, n_extracted);
+      vlib_put_next_frame (vm, node, next_index, n_free - n_extracted);
+    }
+  else
+    {
+      /* full frame */
+      to = (u32 *) vlib_frame_vector_args (f) + f->n_vectors;
+      vlib_buffer_copy_indices (to, tmp, n_free);
+      vlib_put_next_frame (vm, node, next_index, 0);
+
+      /* second frame */
+      u32 n_2nd_frame = n_extracted - n_free;
+      f = vlib_get_next_frame_internal (vm, node, next_index, 1);
+      to = vlib_frame_vector_args (f);
+      vlib_buffer_copy_indices (to, tmp + n_free, n_2nd_frame);
+      vlib_put_next_frame (vm, node, next_index,
+                          VLIB_FRAME_SIZE - n_2nd_frame);
+    }
+
+  return n_left - n_extracted;
+}
+
+void __clib_section (".vlib_buffer_enqueue_to_next_fn")
+CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn)
+(vlib_main_t *vm, vlib_node_runtime_t *node, u32 *buffers, u16 *nexts,
+ uword count)
+{
+  u32 tmp[VLIB_FRAME_SIZE];
+  u32 n_left;
+  u16 next_index;
+
+  while (count >= VLIB_FRAME_SIZE)
+    {
+      extract_data_t d = {};
+      n_left = VLIB_FRAME_SIZE;
 
-      if (n_enqueued >= 4)
+      next_index = nexts[0];
+      n_left = enqueue_one (vm, node, &d, next_index, buffers, nexts,
+                           VLIB_FRAME_SIZE, n_left, tmp);
+
+      while (n_left)
        {
-         vlib_buffer_copy_indices (to_next, buffers, 4);
-         nexts += 4;
-         to_next += 4;
-         buffers += 4;
-         n_left_to_next -= 4;
-         count -= 4;
-         max -= 4;
-         continue;
+         next_index = nexts[find_first_unused_elt (&d)];
+         n_left = enqueue_one (vm, node, &d, next_index, buffers, nexts,
+                               VLIB_FRAME_SIZE, n_left, tmp);
        }
 
-      /* copy */
-      to_next[0] = buffers[0];
+      buffers += VLIB_FRAME_SIZE;
+      nexts += VLIB_FRAME_SIZE;
+      count -= VLIB_FRAME_SIZE;
+    }
 
-      /* next */
-      nexts += 1;
-      to_next += 1;
-      buffers += 1;
-      n_left_to_next -= 1;
-      count -= 1;
-      max -= 1;
+  if (count)
+    {
+      extract_data_t d = {};
+      next_index = nexts[0];
+      n_left = count;
+
+      n_left = enqueue_one (vm, node, &d, next_index, buffers, nexts, count,
+                           n_left, tmp);
+
+      while (n_left)
+       {
+         next_index = nexts[find_first_unused_elt (&d)];
+         n_left = enqueue_one (vm, node, &d, next_index, buffers, nexts,
+                               count, n_left, tmp);
+       }
     }
-  vlib_put_next_frame (vm, node, next_index, n_left_to_next);
 }
 
 CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_next_fn);
@@ -161,7 +256,6 @@ next:
     }
   vlib_put_next_frame (vm, node, next_index, n_left_to_next);
 }
-
 CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_single_next_fn);
 
 u32 __clib_section (".vlib_buffer_enqueue_to_thread_fn")