svm: basic support for fifo shrinking 83/19283/21
authorFlorin Coras <fcoras@cisco.com>
Thu, 2 May 2019 19:52:19 +0000 (12:52 -0700)
committerDave Barach <openvpp@barachs.net>
Fri, 3 May 2019 20:20:42 +0000 (20:20 +0000)
As opposed to growing, this is not a bulk operation, instead dependent
on how the producer/consumer advance head and tail, the fifo will shrink
in one or multiple steps.

Only once the fifo's nitems and size are reduced to their appropriate
values, equal or larger to what was requested, can the fifo chunks be
collected by the owner. Chunk collection must be done with the segment
heap pushed.

Change-Id: Iae407ccf48d85320aa3c1e0304df56c5972c88c1
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/plugins/unittest/svm_fifo_test.c
src/svm/svm_fifo.c
src/svm/svm_fifo.h

index d2afc36..7d83b10 100644 (file)
@@ -1456,8 +1456,403 @@ cleanup:
     }
 
   svm_fifo_free (f);
+  vec_free (test_data);
+  vec_free (data_buf);
+  return 0;
+}
+
+static int
+chunk_list_len (svm_fifo_chunk_t * c)
+{
+  svm_fifo_chunk_t *it;
+  int count = 0;
+
+  if (!c)
+    return 0;
+
+  count = 1;
+  it = c->next;
+  while (it && it != c)
+    {
+      it = it->next;
+      count++;
+    }
+  return count;
+}
+
+static void
+chunk_list_free (svm_fifo_chunk_t * c, svm_fifo_chunk_t * stop)
+{
+  svm_fifo_chunk_t *it, *next;
+
+  it = c;
+  while (it && it != stop)
+    {
+      next = it->next;
+      clib_mem_free (it);
+      it = next;
+    }
+}
+
+static void
+chunk_list_splice (svm_fifo_chunk_t * a, svm_fifo_chunk_t * b)
+{
+  svm_fifo_chunk_t *it;
+
+  it = a;
+  while (it->next)
+    it = it->next;
+  it->next = b;
+}
+
+static int
+sfifo_test_fifo_shrink (vlib_main_t * vm, unformat_input_t * input)
+{
+  int __clib_unused verbose = 0, fifo_size = 101, chunk_size = 100;
+  int i, rv, test_n_bytes, diff, deq_bytes;
+  svm_fifo_chunk_t *c, *prev, *collected;
+  u8 *test_data = 0, *data_buf = 0;
+  svm_fifo_t *f;
 
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (input, "verbose"))
+       verbose = 1;
+      else
+       {
+         vlib_cli_output (vm, "parse error: '%U'", format_unformat_error,
+                          input);
+         return -1;
+       }
+    }
+
+  /*
+   * Init fifo with multiple chunks
+   */
+  f = fifo_prepare (fifo_size);
+  svm_fifo_init_pointers (f, 0, 0);
+
+  prev = 0;
+  for (i = 0; i < 11; i++)
+    {
+      c = clib_mem_alloc (sizeof (svm_fifo_chunk_t) + chunk_size);
+      c->length = 100;
+      c->start_byte = ~0;
+      c->next = prev;
+      prev = c;
+    }
+
+  svm_fifo_add_chunk (f, c);
+  SFIFO_TEST (f->size == 12 * chunk_size + 1, "size expected %u is %u",
+             12 * chunk_size + 1, f->size);
+
+  /*
+   * No fifo wrap and no chunk used (one chunk)
+   */
+  rv = svm_fifo_reduce_size (f, chunk_size, 0);
+  SFIFO_TEST (rv == chunk_size, "len expected %u is %u", chunk_size, rv);
+  SFIFO_TEST (f->size == 12 * chunk_size + 1, "size expected %u is %u",
+             12 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+
+  /* Check enqueue space to force size reduction */
+  (void) svm_fifo_max_enqueue (f);
+
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
+             " be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
+             " set");
+
+  collected = c = svm_fifo_collect_chunks (f);
+  rv = chunk_list_len (c);
+  SFIFO_TEST (rv == 1, "expected %u chunks got %u", 1, rv);
+  rv = chunk_list_len (f->start_chunk);
+  SFIFO_TEST (rv == 11, "expected %u chunks got %u", 11, rv);
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  /*
+   * Fifo wrap and multiple chunks used
+   */
+
+  /* Init test data and fifo */
+  test_n_bytes = f->nitems;
+  vec_validate (test_data, test_n_bytes - 1);
+  vec_validate (data_buf, vec_len (test_data));
+
+  for (i = 0; i < vec_len (test_data); i++)
+    test_data[i] = i;
+
+  svm_fifo_init_pointers (f, f->size / 2, f->size / 2);
+  for (i = 0; i < test_n_bytes; i++)
+    {
+      rv = svm_fifo_enqueue (f, sizeof (u8), &test_data[i]);
+      if (rv < 0)
+       SFIFO_TEST (0, "enqueue returned");
+    }
+
+  /* Try to reduce fifo size with fifo full */
+  rv = svm_fifo_reduce_size (f, 3.5 * chunk_size, 0);
+  SFIFO_TEST (rv == 3 * chunk_size, "len expected %u is %u", 3 * chunk_size,
+             rv);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+
+  /* Check enqueue space to try size reduction. Should not work */
+  rv = svm_fifo_max_enqueue (f);
+
+  SFIFO_TEST (rv == 0, "free space expected %u is %u", 0, rv);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  /* Dequeue byte-by-byte up to last byte on last chunk */
+  deq_bytes = f->size - f->size / 2 - 1;
+  for (i = 0; i < deq_bytes; i++)
+    {
+      (void) svm_fifo_max_enqueue (f);
+      rv = svm_fifo_dequeue (f, 1, &data_buf[i]);
+      if (rv < 0)
+       SFIFO_TEST (0, "dequeue returned");
+    }
+
+  rv = svm_fifo_max_enqueue (f);
+
+  /* We've dequeued more than 3*chunk_size so nitems should be updated */
+  SFIFO_TEST (f->nitems == 8 * chunk_size, "nitems expected %u is %u",
+             8 * chunk_size, f->nitems);
+  /* Free space should be what was dequeued - 3 * chunk_size, which was
+   * consumed by shrinking the fifo */
+  diff = deq_bytes - 3 * chunk_size;
+  SFIFO_TEST (rv == diff, "free space expected %u is %u", diff, rv);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  /* Dequeue one more such that head goes beyond last chunk */
+  rv = svm_fifo_dequeue (f, 1, &data_buf[deq_bytes]);
+  if (rv < 0)
+    SFIFO_TEST (0, "dequeue returned");
+
+  rv = svm_fifo_max_enqueue (f);
+  SFIFO_TEST (f->nitems == 8 * chunk_size, "nitems expected %u is %u",
+             8 * chunk_size, f->nitems);
+  SFIFO_TEST (rv == diff + 1, "free space expected %u is %u", diff + 1, rv);
+  SFIFO_TEST (f->size == 8 * chunk_size + 1, "size expected %u is %u",
+             8 * chunk_size + 1, f->size);
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
+             " set");
+  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
+             " be set");
+
+  /* Dequeue the rest of the data */
+  deq_bytes += 1;
+  for (i = 0; i < test_n_bytes - deq_bytes; i++)
+    {
+      rv = svm_fifo_dequeue (f, 1, &data_buf[i + deq_bytes]);
+      if (rv < 0)
+       SFIFO_TEST (0, "dequeue returned");
+    }
+
+  rv = svm_fifo_max_enqueue (f);
+
+  SFIFO_TEST (f->size == 8 * chunk_size + 1, "size expected %u is %u",
+             8 * chunk_size + 1, f->size);
+  SFIFO_TEST (rv == 8 * chunk_size, "free space expected %u is %u",
+             8 * chunk_size, rv);
+
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & i);
+  if (rv)
+    SFIFO_TEST (0, "[%d] dequeued %u expected %u", i, data_buf[i],
+               test_data[i]);
+
+  c = svm_fifo_collect_chunks (f);
+  rv = chunk_list_len (c);
+  SFIFO_TEST (rv == 3, "expected %u chunks got %u", 3, rv);
+  rv = chunk_list_len (f->start_chunk);
+  SFIFO_TEST (rv == 8, "expected %u chunks got %u", 8, rv);
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  /*
+   * OOO segment on chunk that should be removed
+   */
+
+  svm_fifo_add_chunk (f, c);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+
+  memset (data_buf, 0, vec_len (data_buf));
+  svm_fifo_init_pointers (f, f->size / 2, f->size / 2);
+  svm_fifo_enqueue (f, 200, test_data);
+  svm_fifo_enqueue_with_offset (f, 50, vec_len (test_data) - 250,
+                               &test_data[250]);
+
+  /* Free space */
+  rv = svm_fifo_max_enqueue (f);
+  SFIFO_TEST (rv == vec_len (test_data) - 200, "free space expected %u is %u",
+             vec_len (test_data) - 200, rv);
+
+  /* Ask to reduce size */
+  rv = svm_fifo_reduce_size (f, 3.5 * chunk_size, 0);
+  SFIFO_TEST (rv == 3 * chunk_size, "len expected %u is %u", 3 * chunk_size,
+             rv);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+
+  /* Try to force size reduction but it should fail */
+  rv = svm_fifo_max_enqueue (f);
+
+  SFIFO_TEST (rv == vec_len (test_data) - 200, "free space expected %u is %u",
+             vec_len (test_data) - 200, rv);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  /* Dequeue the in order data. This should shrink nitems */
+  rv = svm_fifo_dequeue (f, 200, data_buf);
+  if (rv < 0)
+    SFIFO_TEST (0, "dequeue returned");
+
+  rv = svm_fifo_max_enqueue (f);
+  SFIFO_TEST (rv == vec_len (test_data) - 200, "free space expected %u is %u",
+             vec_len (test_data) - 200, rv);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->nitems == 11 * chunk_size - 200, "nitems expected %u is %u",
+             11 * chunk_size - 200, f->nitems);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  /* Enqueue the missing 50 bytes. Fifo will become full */
+  rv = svm_fifo_enqueue (f, 50, &test_data[200]);
+  SFIFO_TEST (rv == vec_len (test_data) - 200, "free space expected %u is %u",
+             vec_len (test_data) - 200, rv);
+
+  rv = svm_fifo_max_enqueue (f);
+
+  SFIFO_TEST (rv == 0, "free space expected %u is %u", 0, rv);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+
+  /* Dequeue a chunk and check nitems shrink but fifo still full */
+  svm_fifo_dequeue (f, 100, &data_buf[200]);
+
+  rv = svm_fifo_max_enqueue (f);
+
+  SFIFO_TEST (rv == 0, "free space expected %u is %u", 0, rv);
+  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
+             11 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->nitems == 11 * chunk_size - 300, "nitems expected %u is %u",
+             11 * chunk_size - 300, f->nitems);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  /* Dequeue enough to unwrap the fifo */
+  deq_bytes = f->size - f->size / 2 - 300;
+  svm_fifo_dequeue (f, deq_bytes, &data_buf[300]);
+  rv = svm_fifo_max_enqueue (f);
+
+  /* Overall we've dequeued deq_bytes + 300, but fifo size shrunk 300 */
+  SFIFO_TEST (rv == 300 + deq_bytes - 300, "free space expected %u is %u",
+             300 + deq_bytes - 300, rv);
+  SFIFO_TEST (f->size == 8 * chunk_size + 1, "size expected %u is %u",
+             8 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->nitems == 8 * chunk_size, "nitems expected %u is %u",
+             8 * chunk_size, f->nitems);
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
+             " set");
+  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
+             " be set");
+
+  /* Dequeue the rest */
+  svm_fifo_dequeue (f, test_n_bytes / 2, &data_buf[300 + deq_bytes]);
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & i);
+  if (rv)
+    SFIFO_TEST (0, "[%d] dequeued %u expected %u", i, data_buf[i],
+               test_data[i]);
+
+  c = svm_fifo_collect_chunks (f);
+  rv = chunk_list_len (c);
+  SFIFO_TEST (rv == 3, "expected %u chunks got %u", 3, rv);
+  rv = chunk_list_len (f->start_chunk);
+  SFIFO_TEST (rv == 8, "expected %u chunks got %u", 8, rv);
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  chunk_list_splice (collected, c);
+
+  /*
+   * Remove all chunks possible
+   */
+  svm_fifo_init_pointers (f, 601, 601);
+  rv = svm_fifo_reduce_size (f, 8 * chunk_size, 1);
+  SFIFO_TEST (rv == 7 * chunk_size, "actual len expected %u is %u",
+             7 * chunk_size, rv);
+  SFIFO_TEST (f->size == 6 * chunk_size + 1, "size expected %u is %u",
+             6 * chunk_size + 1, f->size);
+  SFIFO_TEST (f->nitems == 1 * chunk_size, "nitems expected %u is %u",
+             1 * chunk_size, f->nitems);
+  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+
+  rv = svm_fifo_max_enqueue (f);
+  SFIFO_TEST (rv == chunk_size, "free space expected %u is %u", chunk_size,
+             rv);
+
+  /* Force head/tail to move to first chunk */
+  svm_fifo_enqueue (f, 1, test_data);
+  svm_fifo_dequeue (f, 1, data_buf);
+  rv = svm_fifo_max_enqueue (f);
+
+  SFIFO_TEST (rv == chunk_size, "free space expected %u is %u", chunk_size,
+             rv);
+  SFIFO_TEST (f->size == chunk_size + 1, "size expected %u is %u",
+             chunk_size + 1, f->size);
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
+             " set");
+  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
+             " be set");
+
+  c = svm_fifo_collect_chunks (f);
+  rv = chunk_list_len (c);
+  SFIFO_TEST (rv == 7, "expected %u chunks got %u", 7, rv);
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
+             " not be set");
+  SFIFO_TEST (!(f->flags & SVM_FIFO_F_MULTI_CHUNK), "multi-chunk flag should"
+             " not be set");
+
+  chunk_list_splice (collected, c);
+
+  /*
+   * Cleanup
+   */
+
+  chunk_list_free (f->start_chunk->next, f->start_chunk);
+  chunk_list_free (collected, 0);
+  svm_fifo_free (f);
+  vec_free (test_data);
   vec_free (data_buf);
+
   return 0;
 }
 
@@ -1876,6 +2271,8 @@ svm_fifo_test (vlib_main_t * vm, unformat_input_t * input,
        res = sfifo_test_fifo_replay (vm, input);
       else if (unformat (input, "grow"))
        res = sfifo_test_fifo_grow (vm, input);
+      else if (unformat (input, "shrink"))
+       res = sfifo_test_fifo_shrink (vm, input);
       else if (unformat (input, "segment"))
        res = sfifo_test_fifo_segment (vm, input);
       else if (unformat (input, "all"))
@@ -1934,6 +2331,9 @@ svm_fifo_test (vlib_main_t * vm, unformat_input_t * input,
          if ((res = sfifo_test_fifo_grow (vm, input)))
            goto done;
 
+         if ((res = sfifo_test_fifo_shrink (vm, input)))
+           goto done;
+
          str = "all";
          unformat_init_cstring (input, str);
          if ((res = sfifo_test_fifo_segment (vm, input)))
index d563877..e017b61 100644 (file)
@@ -135,7 +135,7 @@ svm_fifo_free_ooo_data (svm_fifo_t * f)
 }
 
 static inline ooo_segment_t *
-ooo_segment_get_prev (svm_fifo_t * f, ooo_segment_t * s)
+ooo_segment_prev (svm_fifo_t * f, ooo_segment_t * s)
 {
   if (s->prev == OOO_SEGMENT_INVALID_INDEX)
     return 0;
@@ -222,7 +222,7 @@ ooo_segment_add (svm_fifo_t * f, u32 offset, u32 head, u32 tail, u32 length)
     s = pool_elt_at_index (f->ooo_segments, s->next);
 
   /* If we have a previous and we overlap it, use it as starting point */
-  prev = ooo_segment_get_prev (f, s);
+  prev = ooo_segment_prev (f, s);
   if (prev
       && position_leq (f, offset_pos, ooo_segment_end_pos (f, prev), tail))
     {
@@ -374,6 +374,20 @@ ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
   return bytes;
 }
 
+static ooo_segment_t *
+ooo_segment_last (svm_fifo_t * f)
+{
+  ooo_segment_t *s;
+
+  if (f->ooos_list_head == OOO_SEGMENT_INVALID_INDEX)
+    return 0;
+
+  s = svm_fifo_first_ooo_segment (f);
+  while (s->next != OOO_SEGMENT_INVALID_INDEX)
+    s = pool_elt_at_index (f->ooo_segments, s->next);
+  return s;
+}
+
 void
 svm_fifo_init (svm_fifo_t * f, u32 size)
 {
@@ -434,8 +448,63 @@ svm_fifo_chunk_alloc (u32 size)
   return c;
 }
 
+static inline u8
+svm_fifo_chunk_includes_pos (svm_fifo_chunk_t * c, u32 pos)
+{
+  return (pos >= c->start_byte && pos < c->start_byte + c->length);
+}
+
+/**
+ * Find chunk for given byte position
+ *
+ * @param f    fifo
+ * @param pos  normalized position in fifo
+ *
+ * @return chunk that includes given position or 0
+ */
+static svm_fifo_chunk_t *
+svm_fifo_find_chunk (svm_fifo_t * f, u32 pos)
+{
+  rb_tree_t *rt = &f->chunk_lookup;
+  rb_node_t *cur, *prev;
+  svm_fifo_chunk_t *c;
+
+  cur = rb_node (rt, rt->root);
+  while (pos != cur->key)
+    {
+      prev = cur;
+      if (pos < cur->key)
+       cur = rb_node_left (rt, cur);
+      else
+       cur = rb_node_right (rt, cur);
+
+      if (rb_node_is_tnil (rt, cur))
+       {
+         /* Hit tnil as a left child. Find predecessor */
+         if (pos < prev->key)
+           {
+             cur = rb_tree_predecessor (rt, prev);
+             c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+             if (svm_fifo_chunk_includes_pos (c, pos))
+               return c;
+             return 0;
+           }
+         /* Hit tnil as a right child. Check if this is the one */
+         c = uword_to_pointer (prev->opaque, svm_fifo_chunk_t *);
+         if (svm_fifo_chunk_includes_pos (c, pos))
+           return c;
+
+         return 0;
+       }
+    }
+
+  if (!rb_node_is_tnil (rt, cur))
+    return uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+  return 0;
+}
+
 static inline void
-svm_fifo_size_update (svm_fifo_t * f, svm_fifo_chunk_t * c)
+svm_fifo_grow (svm_fifo_t * f, svm_fifo_chunk_t * c)
 {
   svm_fifo_chunk_t *prev;
   u32 add_bytes = 0;
@@ -458,13 +527,13 @@ svm_fifo_size_update (svm_fifo_t * f, svm_fifo_chunk_t * c)
 }
 
 static void
-svm_fifo_try_size_update (svm_fifo_t * f, u32 new_head)
+svm_fifo_try_grow (svm_fifo_t * f, u32 new_head)
 {
   if (new_head > f->tail)
     return;
 
-  svm_fifo_size_update (f, f->new_chunks);
-  f->flags &= ~SVM_FIFO_F_SIZE_UPDATE;
+  svm_fifo_grow (f, f->new_chunks);
+  f->flags &= ~SVM_FIFO_F_GROW;
 }
 
 void
@@ -472,7 +541,8 @@ svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c)
 {
   svm_fifo_chunk_t *cur, *prev;
 
-  /* Initialize rbtree if needed and add default chunk to it */
+  /* Initialize rbtree if needed and add default chunk to it. Expectation is
+   * that this is called with the heap where the rbtree's pool is pushed. */
   if (!(f->flags & SVM_FIFO_F_MULTI_CHUNK))
     {
       rb_tree_init (&f->chunk_lookup);
@@ -480,8 +550,7 @@ svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c)
       f->flags |= SVM_FIFO_F_MULTI_CHUNK;
     }
 
-  /* Initialize chunks and add to lookup rbtree. Expectation is that this is
-   * called with the heap where the rbtree's pool is pushed. */
+  /* Initialize chunks and add to lookup rbtree */
   cur = c;
   if (f->new_chunks)
     {
@@ -506,7 +575,7 @@ svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c)
   if (!svm_fifo_is_wrapped (f))
     {
       ASSERT (!f->new_chunks);
-      svm_fifo_size_update (f, c);
+      svm_fifo_grow (f, c);
       return;
     }
 
@@ -514,63 +583,129 @@ svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c)
   if (!f->new_chunks)
     {
       f->new_chunks = c;
-      f->flags |= SVM_FIFO_F_SIZE_UPDATE;
+      f->flags |= SVM_FIFO_F_GROW;
     }
 }
 
-static inline u8
-svm_fifo_chunk_includes_pos (svm_fifo_chunk_t * c, u32 pos)
+/**
+ * Removes chunks that are after fifo end byte
+ */
+svm_fifo_chunk_t *
+svm_fifo_collect_chunks (svm_fifo_t * f)
 {
-  return (pos >= c->start_byte && pos < c->start_byte + c->length);
+  svm_fifo_chunk_t *list, *cur;
+
+  f->flags &= ~SVM_FIFO_F_COLLECT_CHUNKS;
+
+  list = f->new_chunks;
+  f->new_chunks = 0;
+  cur = list;
+  while (cur)
+    {
+      rb_tree_del (&f->chunk_lookup, cur->start_byte);
+      cur = cur->next;
+    }
+
+  return list;
+}
+
+void
+svm_fifo_try_shrink (svm_fifo_t * f, u32 head, u32 tail)
+{
+  u32 len_to_shrink = 0, tail_pos, len;
+  svm_fifo_chunk_t *cur, *prev, *next, *start;
+
+  tail_pos = tail;
+  if (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX)
+    {
+      ooo_segment_t *last = ooo_segment_last (f);
+      tail_pos = ooo_segment_end_pos (f, last);
+    }
+
+  if (f->size_decrement)
+    {
+      /* Figure out available free space considering that there may be
+       * ooo segments */
+      len = clib_min (f->size_decrement, f_free_count (f, head, tail_pos));
+      f->nitems -= len;
+      f->size_decrement -= len;
+    }
+
+  /* Remove tail chunks if the following hold:
+   * - not wrapped
+   * - last used byte less than start of last chunk
+   */
+  if (tail_pos >= head && tail_pos <= f->end_chunk->start_byte)
+    {
+      /* Lookup the last position not to be removed. Since size still needs
+       * to be nitems + 1, nitems must fall within the usable space */
+      tail_pos = tail_pos > 0 ? tail_pos - 1 : tail_pos;
+      prev = svm_fifo_find_chunk (f, clib_max (f->nitems, tail_pos));
+      next = prev->next;
+      while (next != f->start_chunk)
+       {
+         cur = next;
+         next = cur->next;
+         len_to_shrink += cur->length;
+       }
+      if (len_to_shrink)
+       {
+         f->size -= len_to_shrink;
+         start = prev->next;
+         prev->next = f->start_chunk;
+         f->end_chunk = prev;
+         cur->next = f->new_chunks;
+         f->new_chunks = start;
+       }
+    }
+
+  if (!f->size_decrement && f->size == f->nitems + 1)
+    {
+      f->flags &= ~SVM_FIFO_F_SHRINK;
+      f->flags |= SVM_FIFO_F_COLLECT_CHUNKS;
+      if (f->start_chunk == f->start_chunk->next)
+       f->flags &= ~SVM_FIFO_F_MULTI_CHUNK;
+    }
 }
 
 /**
- * Find chunk for given byte position
- *
- * @param f    fifo
- * @param pos  normalized position in fifo
- *
- * @return chunk that includes given position or 0
+ * Request to reduce fifo size by amount of bytes
  */
-static svm_fifo_chunk_t *
-svm_fifo_find_chunk (svm_fifo_t * f, u32 pos)
+int
+svm_fifo_reduce_size (svm_fifo_t * f, u32 len, u8 try_shrink)
 {
-  rb_tree_t *rt = &f->chunk_lookup;
-  rb_node_t *cur, *prev;
-  svm_fifo_chunk_t *c;
+  svm_fifo_chunk_t *cur;
+  u32 actual_len = 0;
 
-  cur = rb_node (rt, rt->root);
-  while (pos != cur->key)
+  if (len > f->nitems)
+    return 0;
+
+  /* last chunk that will not be removed */
+  cur = svm_fifo_find_chunk (f, f->nitems - len);
+
+  /* sum length of chunks that will be removed */
+  cur = cur->next;
+  while (cur != f->start_chunk)
     {
-      prev = cur;
-      if (pos < cur->key)
-       cur = rb_node_left (rt, cur);
-      else
-       cur = rb_node_right (rt, cur);
+      actual_len += cur->length;
+      cur = cur->next;
+    }
 
-      if (rb_node_is_tnil (rt, cur))
-       {
-         /* Hit tnil as a left child. Find predecessor */
-         if (pos < prev->key)
-           {
-             cur = rb_tree_predecessor (rt, prev);
-             c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
-             if (svm_fifo_chunk_includes_pos (c, pos))
-               return c;
-             return 0;
-           }
-         /* Hit tnil as a right child. Check if this is the one */
-         c = uword_to_pointer (prev->opaque, svm_fifo_chunk_t *);
-         if (svm_fifo_chunk_includes_pos (c, pos))
-           return c;
+  ASSERT (actual_len <= len);
+  if (!actual_len)
+    return 0;
 
-         return 0;
-       }
+  f->size_decrement = actual_len;
+  f->flags |= SVM_FIFO_F_SHRINK;
+
+  if (try_shrink)
+    {
+      u32 head, tail;
+      f_load_head_tail_prod (f, &head, &tail);
+      svm_fifo_try_shrink (f, head, tail);
     }
 
-  if (!rb_node_is_tnil (rt, cur))
-    return uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
-  return 0;
+  return actual_len;
 }
 
 void
@@ -660,6 +795,9 @@ svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
 
   f_load_head_tail_prod (f, &head, &tail);
 
+  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
+    svm_fifo_try_shrink (f, head, tail);
+
   /* free space in fifo can only increase during enqueue: SPSC */
   free_count = f_free_count (f, head, tail);
 
@@ -680,6 +818,22 @@ svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
   return 0;
 }
 
+/**
+ * Advance tail
+ */
+void
+svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len)
+{
+  u32 tail;
+
+  ASSERT (len <= svm_fifo_max_enqueue_prod (f));
+  /* load-relaxed: producer owned index */
+  tail = f->tail;
+  tail = (tail + len) % f->size;
+  /* store-rel: producer owned index (paired with load-acq in consumer) */
+  clib_atomic_store_rel_n (&f->tail, tail);
+}
+
 int
 svm_fifo_dequeue (svm_fifo_t * f, u32 len, u8 * dst)
 {
@@ -697,8 +851,8 @@ svm_fifo_dequeue (svm_fifo_t * f, u32 len, u8 * dst)
   svm_fifo_copy_from_chunk (f, f->head_chunk, head, dst, len, &f->head_chunk);
   head = (head + len) % f->size;
 
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SIZE_UPDATE))
-    svm_fifo_try_size_update (f, head);
+  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_GROW))
+    svm_fifo_try_grow (f, head);
 
   /* store-rel: consumer owned index (paired with load-acq in producer) */
   clib_atomic_store_rel_n (&f->head, head);
index 390e117..417b0ec 100644 (file)
@@ -64,9 +64,11 @@ typedef struct svm_fifo_chunk_
 
 typedef enum svm_fifo_flag_
 {
-  SVM_FIFO_F_SIZE_UPDATE = 1 << 0,
-  SVM_FIFO_F_MULTI_CHUNK = 1 << 1,
-  SVM_FIFO_F_LL_TRACKED = 1 << 2,
+  SVM_FIFO_F_MULTI_CHUNK = 1 << 0,
+  SVM_FIFO_F_GROW = 1 << 1,
+  SVM_FIFO_F_SHRINK = 1 << 2,
+  SVM_FIFO_F_COLLECT_CHUNKS = 1 << 3,
+  SVM_FIFO_F_LL_TRACKED = 1 << 4,
 } svm_fifo_flag_t;
 
 typedef struct _svm_fifo
@@ -86,12 +88,13 @@ typedef struct _svm_fifo
   u32 client_session_index;    /**< app session index */
   u8 master_thread_index;      /**< session layer thread index */
   u8 client_thread_index;      /**< app worker index */
+  i8 refcnt;                   /**< reference count  */
   u32 segment_manager;         /**< session layer segment manager index */
   u32 segment_index;           /**< segment index in segment manager */
   u32 freelist_index;          /**< aka log2(allocated_size) - const. */
-  i8 refcnt;                   /**< reference count  */
   struct _svm_fifo *next;      /**< next in freelist/active chain */
   struct _svm_fifo *prev;      /**< prev in active chain */
+  u32 size_decrement;          /**< bytes to remove from fifo */
 
     CLIB_CACHE_LINE_ALIGN_MARK (consumer);
   u32 head;                    /**< fifo head position/byte */
@@ -172,7 +175,8 @@ f_load_head_tail_prod (svm_fifo_t * f, u32 * head, u32 * tail)
   *head = clib_atomic_load_acq_n (&f->head);
 }
 
-/* Load head and tail independent of producer/consumer role
+/**
+ * Load head and tail independent of producer/consumer role
  *
  * Internal function.
  */
@@ -229,6 +233,13 @@ f_free_count (svm_fifo_t * f, u32 head, u32 tail)
   return (f->nitems - f_cursize (f, head, tail));
 }
 
+/**
+ * Try to shrink fifo size.
+ *
+ * Internal function.
+ */
+void svm_fifo_try_shrink (svm_fifo_t * f, u32 head, u32 tail);
+
 /**
  * Create fifo of requested size
  *
@@ -266,6 +277,31 @@ svm_fifo_chunk_t *svm_fifo_chunk_alloc (u32 size);
  * @param c    chunk or linked list of chunks to be added
  */
 void svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c);
+/**
+ * Request to reduce fifo size by amount of bytes
+ *
+ * Because the producer might be enqueuing data when this is called, the
+ * actual size update is only applied when producer tries to enqueue new
+ * data, unless @param try_shrink is set.
+ *
+ * @param f            fifo
+ * @param len          number of bytes to remove from fifo. The actual number
+ *                     of bytes to be removed will be less or equal to this
+ *                     value.
+ * @param try_shrink   flg to indicate if it's safe to try to shrink fifo
+ *                     size. It should be set only if this is called by the
+ *                     producer of if the producer is not using the fifo
+ * @return             actual length fifo size will be reduced by
+ */
+int svm_fifo_reduce_size (svm_fifo_t * f, u32 len, u8 try_shrink);
+/**
+ * Removes chunks that are after fifo end byte
+ *
+ * Needs to be called with segment heap pushed.
+ *
+ * @param f fifo
+ */
+svm_fifo_chunk_t *svm_fifo_collect_chunks (svm_fifo_t * f);
 /**
  * Free fifo and associated state
  *
@@ -333,6 +369,16 @@ int svm_fifo_enqueue (svm_fifo_t * f, u32 len, const u8 * src);
  */
 int svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len,
                                  u8 * src);
+
+/**
+ * Advance tail pointer
+ *
+ * Useful for moving tail pointer after external enqueue.
+ *
+ * @param f            fifo
+ * @param len          number of bytes to add to tail
+ */
+void svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len);
 /**
  * Overwrite fifo head with new data
  *
@@ -551,6 +597,8 @@ svm_fifo_max_enqueue_prod (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_prod (f, &head, &tail);
+  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
+    svm_fifo_try_shrink (f, head, tail);
   return f_free_count (f, head, tail);
 }
 
@@ -565,6 +613,8 @@ svm_fifo_max_enqueue (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_all_acq (f, &head, &tail);
+  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
+    svm_fifo_try_shrink (f, head, tail);
   return f_free_count (f, head, tail);
 }
 
@@ -590,25 +640,6 @@ svm_fifo_max_write_chunk (svm_fifo_t * f)
   return tail > head ? f->size - tail : f_free_count (f, head, tail);
 }
 
-/**
- * Advance tail pointer
- *
- * Useful for moving tail pointer after external enqueue.
- *
- * @param f            fifo
- * @param len          number of bytes to add to tail
- */
-static inline void
-svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len)
-{
-  ASSERT (len <= svm_fifo_max_enqueue_prod (f));
-  /* load-relaxed: producer owned index */
-  u32 tail = f->tail;
-  tail = (tail + len) % f->size;
-  /* store-rel: producer owned index (paired with load-acq in consumer) */
-  clib_atomic_store_rel_n (&f->tail, tail);
-}
-
 static inline u8 *
 svm_fifo_head (svm_fifo_t * f)
 {