svm: fifo ooo reads/writes with multiple chunks 69/19069/13
authorFlorin Coras <fcoras@cisco.com>
Fri, 19 Apr 2019 22:56:00 +0000 (15:56 -0700)
committerDamjan Marion <dmarion@me.com>
Wed, 24 Apr 2019 11:31:55 +0000 (11:31 +0000)
Change-Id: If23a04623a7138c9f6c98ee9ecfa587396618a60
Signed-off-by: Florin Coras <fcoras@cisco.com>
src/plugins/unittest/svm_fifo_test.c
src/svm/svm_fifo.c
src/svm/svm_fifo.h
src/svm/svm_fifo_segment.c
src/vppinfra/rbtree.c
src/vppinfra/rbtree.h

index 14ebfca..af806e4 100644 (file)
@@ -645,7 +645,7 @@ tcp_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
 
   /* manually set head and tail pointers to validate modular arithmetic */
   fifo_initial_offset = fifo_initial_offset % fifo_size;
-  svm_fifo_init_pointers (f, fifo_initial_offset);
+  svm_fifo_init_pointers (f, fifo_initial_offset, fifo_initial_offset);
 
   for (i = !randomize; i < vec_len (generate); i++)
     {
@@ -759,7 +759,7 @@ tcp_test_fifo4 (vlib_main_t * vm, unformat_input_t * input)
 
   /* Set head and tail pointers */
   fifo_initial_offset = fifo_initial_offset % fifo_size;
-  svm_fifo_init_pointers (f, fifo_initial_offset);
+  svm_fifo_init_pointers (f, fifo_initial_offset, fifo_initial_offset);
 
   vec_validate (test_data, test_n_bytes - 1);
   for (i = 0; i < vec_len (test_data); i++)
@@ -825,7 +825,7 @@ tcp_test_fifo5 (vlib_main_t * vm, unformat_input_t * input)
     }
 
   f = fifo_prepare (fifo_size);
-  svm_fifo_init_pointers (f, offset);
+  svm_fifo_init_pointers (f, offset, offset);
 
   vec_validate (test_data, 399);
   for (i = 0; i < vec_len (test_data); i++)
@@ -934,10 +934,11 @@ tcp_test_fifo5 (vlib_main_t * vm, unformat_input_t * input)
 static int
 tcp_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
 {
-  int __clib_unused verbose, fifo_size = 201, start_offset = 100, i;
+  int verbose = 0, fifo_size = 201, start_offset = 100, i, j, rv;
+  int test_n_bytes, deq_bytes, enq_bytes, n_deqs, n_enqs;
   svm_fifo_chunk_t *c, *next, *prev;
+  u8 *test_data = 0, *data_buf = 0;
   svm_fifo_t *f;
-  u8 *buf = 0;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
     {
@@ -952,7 +953,7 @@ tcp_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
     }
 
   f = fifo_prepare (fifo_size);
-  svm_fifo_init_pointers (f, start_offset);
+  svm_fifo_init_pointers (f, start_offset, start_offset);
 
   /*
    * Add with fifo not wrapped
@@ -973,7 +974,7 @@ tcp_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
    *  Add with fifo wrapped
    */
 
-  f->tail = f->head + f->nitems;
+  svm_fifo_init_pointers (f, f->nitems - 100, f->nitems + 100);
   c = clib_mem_alloc (sizeof (svm_fifo_chunk_t) + 100);
   c->length = 100;
   c->start_byte = ~0;
@@ -984,14 +985,14 @@ tcp_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
   SFIFO_TEST (f->end_chunk != c, "tail chunk should not be updated");
   SFIFO_TEST (f->size == fifo_size + 100, "size expected %u is %u",
              fifo_size + 100, f->size);
-  SFIFO_TEST (c->start_byte == ~0, "start byte expected %u is %u", ~0,
-             c->start_byte);
+  SFIFO_TEST (c->start_byte == fifo_size + 100, "start byte expected %u is "
+             " %u", fifo_size + 100, c->start_byte);
 
   /*
    * Unwrap fifo
    */
-  vec_validate (buf, 200);
-  svm_fifo_dequeue_nowait (f, 201, buf);
+  vec_validate (data_buf, 200);
+  svm_fifo_dequeue_nowait (f, 201, data_buf);
 
   SFIFO_TEST (f->end_chunk == c, "tail chunk should be updated");
   SFIFO_TEST (f->size == fifo_size + 200, "size expected %u is %u",
@@ -1002,9 +1003,7 @@ tcp_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Add N chunks
    */
-
-  f->head = f->nitems - 100;
-  f->tail = f->head + f->nitems;
+  svm_fifo_init_pointers (f, f->nitems - 100, f->nitems + 100);
 
   prev = 0;
   for (i = 0; i < 5; i++)
@@ -1034,14 +1033,160 @@ tcp_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
   SFIFO_TEST (f->size == fifo_size + 200, "size expected %u is %u",
              fifo_size + 200, f->size);
 
-  svm_fifo_dequeue_nowait (f, 201, buf);
+  svm_fifo_dequeue_nowait (f, 201, data_buf);
 
   SFIFO_TEST (f->size == fifo_size + 200 + 10 * 100, "size expected %u is %u",
              fifo_size + 200 + 10 * 100, f->size);
+  /*
+   * Enqueue/dequeue tests
+   */
+
+  test_n_bytes = f->nitems;
+  vec_validate (test_data, test_n_bytes - 1);
+  vec_validate (data_buf, vec_len (test_data));
+  n_deqs = n_enqs = 6;
+  deq_bytes = enq_bytes = vec_len (test_data) / n_deqs;
+
+  for (i = 0; i < vec_len (test_data); i++)
+    test_data[i] = i;
+
+  /*
+   * Enqueue/deq boundary conditions
+   */
+  svm_fifo_init_pointers (f, 201, 201);
+  SFIFO_TEST (f->tail_chunk->start_byte == 201, "start byte expected %u is "
+             "%u", 201, f->tail_chunk->start_byte);
+
+  svm_fifo_enqueue_nowait (f, 200, test_data);
+  SFIFO_TEST (f->tail_chunk->start_byte == 401, "start byte expected %u is "
+             "%u", 401, f->tail_chunk->start_byte);
+
+  svm_fifo_dequeue_nowait (f, 200, data_buf);
+  SFIFO_TEST (f->head_chunk->start_byte == 401, "start byte expected %u is "
+             "%u", 401, f->head_chunk->start_byte);
+
+  /*
+   * Simple enqueue/deq and data validation (1)
+   */
+  svm_fifo_init_pointers (f, f->nitems / 2, f->nitems / 2);
+  for (i = 0; i < test_n_bytes; i++)
+    {
+      rv = svm_fifo_enqueue_nowait (f, sizeof (u8), &test_data[i]);
+      if (rv < 0)
+       {
+         clib_warning ("enqueue returned %d", rv);
+         goto cleanup;
+       }
+    }
+
+  SFIFO_TEST (svm_fifo_max_dequeue (f) == test_n_bytes, "max deq expected %u "
+             "is %u", test_n_bytes, svm_fifo_max_dequeue (f));
+
+  for (i = 0; i < test_n_bytes; i++)
+    svm_fifo_dequeue_nowait (f, 1, &data_buf[i]);
+
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & j);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", j, data_buf[j],
+                    test_data[j]);
+  SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
+
+  /*
+   * Simple enqueue/deq and data validation (2)
+   */
+  for (i = 0; i <= n_enqs; i++)
+    {
+      rv = svm_fifo_enqueue_nowait (f, enq_bytes, test_data + i * enq_bytes);
+      if (rv < 0)
+       {
+         clib_warning ("enqueue returned %d", rv);
+         goto cleanup;
+       }
+    }
+
+  SFIFO_TEST (svm_fifo_max_dequeue (f) == test_n_bytes, "max deq expected %u "
+             "is %u", test_n_bytes, svm_fifo_max_dequeue (f));
+
+  for (i = 0; i <= n_deqs; i++)
+    svm_fifo_dequeue_nowait (f, deq_bytes, data_buf + i * deq_bytes);
+
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & j);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", j, data_buf[j],
+                    test_data[j]);
+  SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
+
+  /*
+   * OOO enqueues/dequeues and data validation (1)
+   */
+  for (i = test_n_bytes - 1; i > 0; i--)
+    {
+      rv = svm_fifo_enqueue_with_offset (f, i, sizeof (u8), &test_data[i]);
+      if (verbose)
+       vlib_cli_output (vm, "add [%d] [%d, %d]", i, i, i + sizeof (u8));
+      if (rv)
+       {
+         clib_warning ("enqueue returned %d", rv);
+         goto cleanup;
+       }
+    }
+
+  SFIFO_TEST (svm_fifo_max_dequeue (f) == 0, "max deq expected %u is %u",
+             0, svm_fifo_max_dequeue (f));
+
+  svm_fifo_enqueue_nowait (f, sizeof (u8), &test_data[0]);
+
+  memset (data_buf, 0, vec_len (data_buf));
+  for (i = 0; i <= n_deqs; i++)
+    svm_fifo_dequeue_nowait (f, deq_bytes, data_buf + i * deq_bytes);
+
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & j);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", j, data_buf[j],
+                    test_data[j]);
+  SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
+
+  /*
+   * OOO enqueues/dequeues and data validation (2)
+   */
+
+  for (i = n_enqs; i > 0; i--)
+    {
+      u32 enq_now = clib_min (enq_bytes, vec_len (test_data) - i * enq_bytes);
+      rv = svm_fifo_enqueue_with_offset (f, i * enq_bytes, enq_now,
+                                        test_data + i * enq_bytes);
+      if (verbose)
+       vlib_cli_output (vm, "add [%d, %d]", i * enq_bytes,
+                        i * enq_bytes + enq_now);
+      if (rv)
+       {
+         clib_warning ("enqueue returned %d", rv);
+         goto cleanup;
+       }
+    }
+
+  svm_fifo_enqueue_nowait (f, enq_bytes, &test_data[0]);
+
+  memset (data_buf, 0, vec_len (data_buf));
+  for (i = 0; i <= n_deqs; i++)
+    svm_fifo_dequeue_nowait (f, deq_bytes, data_buf + i * deq_bytes);
+
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & j);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", j, data_buf[j],
+                    test_data[j]);
+  SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
 
   /*
    * Cleanup
    */
+
+cleanup:
+
   c = f->start_chunk->next;
   while (c && c != f->start_chunk)
     {
@@ -1052,7 +1197,7 @@ tcp_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
 
   svm_fifo_free (f);
 
-  vec_free (buf);
+  vec_free (data_buf);
   return 0;
 }
 
index f457128..a335519 100644 (file)
@@ -22,7 +22,7 @@
 
 CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
               svm_fifo_chunk_t * c, u32 tail_idx, const u8 * src, u32 len,
-              u8 update_tail)
+              svm_fifo_chunk_t ** last)
 {
   u32 n_chunk;
 
@@ -30,18 +30,19 @@ CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
 
   tail_idx -= c->start_byte;
   n_chunk = c->length - tail_idx;
-  if (n_chunk < len)
+  if (n_chunk <= len)
     {
       u32 to_copy = len;
       clib_memcpy_fast (&c->data[tail_idx], src, n_chunk);
+      c = c->next;
       while ((to_copy -= n_chunk))
        {
-         c = c->next;
          n_chunk = clib_min (c->length, to_copy);
          clib_memcpy_fast (&c->data[0], src + (len - to_copy), n_chunk);
+         c = c->length <= to_copy ? c->next : c;
        }
-      if (update_tail)
-       f->tail_chunk = c;
+      if (*last)
+       *last = c;
     }
   else
     {
@@ -51,24 +52,27 @@ CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
 
 CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
               svm_fifo_chunk_t * c, u32 head_idx, u8 * dst, u32 len,
-              u8 update_head)
+              svm_fifo_chunk_t ** last)
 {
   u32 n_chunk;
 
+  ASSERT (head_idx >= c->start_byte && head_idx < c->start_byte + c->length);
+
   head_idx -= c->start_byte;
   n_chunk = c->length - head_idx;
-  if (n_chunk < len)
+  if (n_chunk <= len)
     {
       u32 to_copy = len;
       clib_memcpy_fast (dst, &c->data[head_idx], n_chunk);
+      c = c->next;
       while ((to_copy -= n_chunk))
        {
-         c = c->next;
          n_chunk = clib_min (c->length, to_copy);
          clib_memcpy_fast (dst + (len - to_copy), &c->data[0], n_chunk);
+         c = c->length <= to_copy ? c->next : c;
        }
-      if (update_head)
-       f->head_chunk = c;
+      if (*last)
+       *last = c;
     }
   else
     {
@@ -80,18 +84,18 @@ CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
 
 static inline void
 svm_fifo_copy_to_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 tail_idx,
-                       const u8 * src, u32 len, u8 update_tail)
+                       const u8 * src, u32 len, svm_fifo_chunk_t ** last)
 {
   CLIB_MARCH_FN_SELECT (svm_fifo_copy_to_chunk) (f, c, tail_idx, src, len,
-                                                update_tail);
+                                                last);
 }
 
 static inline void
 svm_fifo_copy_from_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 head_idx,
-                         u8 * dst, u32 len, u8 update_head)
+                         u8 * dst, u32 len, svm_fifo_chunk_t ** last)
 {
   CLIB_MARCH_FN_SELECT (svm_fifo_copy_from_chunk) (f, c, head_idx, dst, len,
-                                                  update_head);
+                                                  last);
 }
 
 static inline u8
@@ -290,8 +294,8 @@ svm_fifo_init (svm_fifo_t * f, u32 size)
   f->refcnt = 1;
   f->default_chunk.start_byte = 0;
   f->default_chunk.length = f->size;
-  f->default_chunk.next = f->start_chunk = &f->default_chunk;
-  f->end_chunk = f->head_chunk = f->tail_chunk = f->start_chunk;
+  f->default_chunk.next = f->start_chunk = f->end_chunk = &f->default_chunk;
+  f->head_chunk = f->tail_chunk = f->ooo_enq = f->ooo_deq = f->start_chunk;
 }
 
 /** create an svm fifo, in the current heap. Fails vs blow up the process */
@@ -319,12 +323,13 @@ svm_fifo_size_update (svm_fifo_t * f, svm_fifo_chunk_t * c)
   svm_fifo_chunk_t *prev;
   u32 add_bytes = 0;
 
-  prev = f->end_chunk;
+  if (!c)
+    return;
+
+  f->end_chunk->next = c;
   while (c)
     {
-      c->start_byte = prev->start_byte + prev->length;
       add_bytes += c->length;
-      prev->next = c;
       prev = c;
       c = c->next;
     }
@@ -348,26 +353,112 @@ svm_fifo_try_size_update (svm_fifo_t * f, u32 new_head)
 void
 svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c)
 {
-  if (svm_fifo_is_wrapped (f))
+  svm_fifo_chunk_t *cur, *prev;
+
+  /* Initialize rbtree if needed and add default chunk to it */
+  if (!(f->flags & SVM_FIFO_F_MULTI_CHUNK))
     {
-      if (f->new_chunks)
-       {
-         svm_fifo_chunk_t *prev;
+      rb_tree_init (&f->chunk_lookup);
+      rb_tree_add2 (&f->chunk_lookup, 0, pointer_to_uword (f->start_chunk));
+      f->flags |= SVM_FIFO_F_MULTI_CHUNK;
+    }
 
-         prev = f->new_chunks;
-         while (prev->next)
-           prev = prev->next;
-         prev->next = c;
-       }
+  /* Initialize chunks and add to lookup rbtree. Expectation is that this is
+   * called with the heap where the rbtree's pool is pushed. */
+  cur = c;
+  if (f->new_chunks)
+    {
+      prev = f->new_chunks;
+      while (prev->next)
+       prev = prev->next;
+      prev->next = c;
+    }
+  else
+    prev = f->end_chunk;
+
+  while (cur)
+    {
+      cur->start_byte = prev->start_byte + prev->length;
+      rb_tree_add2 (&f->chunk_lookup, cur->start_byte,
+                   pointer_to_uword (cur));
+      prev = cur;
+      cur = cur->next;
+    }
+
+  /* If fifo is not wrapped, update the size now */
+  if (!svm_fifo_is_wrapped (f))
+    {
+      ASSERT (!f->new_chunks);
+      svm_fifo_size_update (f, c);
+      return;
+    }
+
+  /* Postpone size update */
+  if (!f->new_chunks)
+    {
+      f->new_chunks = c;
+      f->flags |= SVM_FIFO_F_SIZE_UPDATE;
+    }
+}
+
+static inline u8
+svm_fifo_chunk_includes_pos (svm_fifo_chunk_t * c, u32 pos)
+{
+  return (pos >= c->start_byte && pos < c->start_byte + c->length);
+}
+
+/**
+ * Find chunk for given byte position
+ *
+ * @param f    fifo
+ * @param pos  normalized position in fifo
+ *
+ * @return chunk that includes given position or 0
+ */
+static svm_fifo_chunk_t *
+svm_fifo_find_chunk (svm_fifo_t * f, u32 pos)
+{
+  rb_tree_t *rt = &f->chunk_lookup;
+  rb_node_t *cur, *prev;
+  svm_fifo_chunk_t *c;
+
+  cur = rb_node (rt, rt->root);
+  while (pos != cur->key)
+    {
+      prev = cur;
+      if (pos < cur->key)
+       cur = rb_node_left (rt, cur);
       else
+       cur = rb_node_right (rt, cur);
+
+      if (rb_node_is_tnil (rt, cur))
        {
-         f->new_chunks = c;
+         /* Hit tnil as a left child. Find predecessor */
+         if (pos < prev->key)
+           {
+             cur = rb_tree_predecessor (rt, prev);
+             c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+             if (svm_fifo_chunk_includes_pos (c, pos))
+               return c;
+             return 0;
+           }
+         /* Hit tnil as a right child. Check if this is the one, otherwise
+          * search for successor */
+         c = uword_to_pointer (prev->opaque, svm_fifo_chunk_t *);
+         if (svm_fifo_chunk_includes_pos (c, pos))
+           return c;
+
+         cur = rb_tree_successor (rt, prev);
+         c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+         if (svm_fifo_chunk_includes_pos (c, pos))
+           return c;
+         return 0;
        }
-      f->flags |= SVM_FIFO_F_SIZE_UPDATE;
-      return;
     }
 
-  svm_fifo_size_update (f, c);
+  if (!rb_node_is_tnil (rt, cur))
+    return uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+  return 0;
 }
 
 void
@@ -650,7 +741,7 @@ svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 len, const u8 * src)
   len = clib_min (free_count, len);
 
   svm_fifo_copy_to_chunk (f, f->tail_chunk, tail % f->size, src, len,
-                         1 /* update tail */ );
+                         &f->tail_chunk);
   tail += len;
 
   svm_fifo_trace_add (f, head, n_total, 2);
@@ -675,7 +766,7 @@ svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 len, const u8 * src)
 int
 svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
 {
-  u32 tail, head, free_count;
+  u32 tail, head, free_count, tail_idx;
 
   f_load_head_tail_prod (f, &head, &tail);
 
@@ -692,8 +783,12 @@ svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
 
   ooo_segment_add (f, offset, head, tail, len);
 
-  svm_fifo_copy_to_chunk (f, f->tail_chunk, (tail + offset) % f->size, src,
-                         len, 0 /* update tail */ );
+  tail_idx = (tail + offset) % f->size;
+
+  if (!svm_fifo_chunk_includes_pos (f->ooo_enq, tail_idx))
+    f->ooo_enq = svm_fifo_find_chunk (f, tail_idx);
+
+  svm_fifo_copy_to_chunk (f, f->ooo_enq, tail_idx, src, len, &f->ooo_enq);
 
   return 0;
 }
@@ -712,10 +807,9 @@ svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 len, u8 * dst)
     return -2;                 /* nothing in the fifo */
 
   len = clib_min (cursize, len);
-  ASSERT (cursize >= len);
 
   svm_fifo_copy_from_chunk (f, f->head_chunk, head % f->size, dst, len,
-                           1 /* update head */ );
+                           &f->head_chunk);
   head += len;
 
   if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SIZE_UPDATE))
@@ -728,23 +822,24 @@ svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 len, u8 * dst)
 }
 
 int
-svm_fifo_peek (svm_fifo_t * f, u32 relative_offset, u32 len, u8 * dst)
+svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 len, u8 * dst)
 {
-  u32 tail, head, cursize;
+  u32 tail, head, cursize, head_idx;
 
   f_load_head_tail_cons (f, &head, &tail);
 
   /* current size of fifo can only increase during peek: SPSC */
   cursize = f_cursize (f, head, tail);
 
-  if (PREDICT_FALSE (cursize < relative_offset))
+  if (PREDICT_FALSE (cursize < offset))
     return -2;                 /* nothing in the fifo */
 
-  len = clib_min (cursize - relative_offset, len);
+  len = clib_min (cursize - offset, len);
+  head_idx = (head + offset) % f->size;
+  if (!svm_fifo_chunk_includes_pos (f->ooo_deq, head_idx))
+    f->ooo_deq = svm_fifo_find_chunk (f, head_idx);
 
-  svm_fifo_copy_from_chunk (f, f->head_chunk,
-                           (head + relative_offset) % f->size, dst, len,
-                           0 /* update head */ );
+  svm_fifo_copy_from_chunk (f, f->ooo_deq, head_idx, dst, len, &f->ooo_deq);
   return len;
 }
 
@@ -861,10 +956,20 @@ svm_fifo_first_ooo_segment (svm_fifo_t * f)
  * Set fifo pointers to requested offset
  */
 void
-svm_fifo_init_pointers (svm_fifo_t * f, u32 pointer)
+svm_fifo_init_pointers (svm_fifo_t * f, u32 head, u32 tail)
 {
-  clib_atomic_store_rel_n (&f->head, pointer);
-  clib_atomic_store_rel_n (&f->tail, pointer);
+  clib_atomic_store_rel_n (&f->head, head);
+  clib_atomic_store_rel_n (&f->tail, tail);
+  if (f->flags & SVM_FIFO_F_MULTI_CHUNK)
+    {
+      svm_fifo_chunk_t *c;
+      c = svm_fifo_find_chunk (f, head % f->size);
+      ASSERT (c != 0);
+      f->head_chunk = f->ooo_deq = c;
+      c = svm_fifo_find_chunk (f, tail % f->size);
+      ASSERT (c != 0);
+      f->tail_chunk = f->ooo_enq = c;
+    }
 }
 
 void
index 82d2b39..619d609 100644 (file)
@@ -23,6 +23,7 @@
 #include <vppinfra/vec.h>
 #include <vppinfra/pool.h>
 #include <vppinfra/format.h>
+#include <vppinfra/rbtree.h>
 
 /** Out-of-order segment */
 typedef struct
@@ -38,7 +39,7 @@ typedef struct
 #define OOO_SEGMENT_INVALID_INDEX      ((u32)~0)
 #define SVM_FIFO_INVALID_SESSION_INDEX         ((u32)~0)
 #define SVM_FIFO_INVALID_INDEX         ((u32)~0)
-#define SVM_FIFO_MAX_EVT_SUBSCRIBERS   8
+#define SVM_FIFO_MAX_EVT_SUBSCRIBERS   7
 
 enum svm_fifo_tx_ntf_
 {
@@ -65,6 +66,7 @@ typedef struct svm_fifo_chunk_
 typedef enum svm_fifo_flag_
 {
   SVM_FIFO_F_SIZE_UPDATE = 1 << 0,
+  SVM_FIFO_F_MULTI_CHUNK = 1 << 1,
 } svm_fifo_flag_t;
 
 typedef struct _svm_fifo
@@ -76,6 +78,7 @@ typedef struct _svm_fifo
   svm_fifo_chunk_t *start_chunk;/**< first chunk in fifo chunk list */
   svm_fifo_chunk_t *end_chunk; /**< end chunk in fifo chunk list */
   svm_fifo_chunk_t *new_chunks;        /**< chunks yet to be added to list */
+  rb_tree_t chunk_lookup;
 
     CLIB_CACHE_LINE_ALIGN_MARK (shared_second);
   volatile u32 has_event;      /**< non-zero if deq event exists */
@@ -93,17 +96,18 @@ typedef struct _svm_fifo
   struct _svm_fifo *prev;      /**< prev in active chain */
 
     CLIB_CACHE_LINE_ALIGN_MARK (consumer);
-  u32 head;
+  u32 head;                    /**< fifo head position/byte */
   svm_fifo_chunk_t *head_chunk;        /**< tracks chunk where head lands */
+  svm_fifo_chunk_t *ooo_deq;   /**< last chunk used for ooo dequeue */
   volatile u32 want_tx_ntf;    /**< producer wants nudge */
   volatile u32 has_tx_ntf;
 
     CLIB_CACHE_LINE_ALIGN_MARK (producer);
-  u32 tail;
+  u32 tail;                    /**< fifo tail position/byte */
+  u32 ooos_list_head;          /**< Head of out-of-order linked-list */
   svm_fifo_chunk_t *tail_chunk;        /**< tracks chunk where tail lands */
-
+  svm_fifo_chunk_t *ooo_enq;   /**< last chunk used for ooo enqueue */
   ooo_segment_t *ooo_segments; /**< Pool of ooo segments */
-  u32 ooos_list_head;          /**< Head of out-of-order linked-list */
   u32 ooos_newest;             /**< Last segment to have been updated */
   volatile u8 n_subscribers;
   u8 subscribers[SVM_FIFO_MAX_EVT_SUBSCRIBERS];
@@ -327,6 +331,15 @@ svm_fifo_unset_event (svm_fifo_t * f)
 
 svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
 void svm_fifo_init (svm_fifo_t * f, u32 size);
+/**
+ * Grow fifo size by adding chunk to chunk list
+ *
+ * If fifos are allocated on a segment, this should be called with
+ * the segment's heap pushed.
+ *
+ * @param f    fifo to be extended
+ * @param c    chunk or linked list of chunks to be added
+ */
 void svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c);
 void svm_fifo_free (svm_fifo_t * f);
 
@@ -341,7 +354,7 @@ int svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes);
 void svm_fifo_dequeue_drop_all (svm_fifo_t * f);
 int svm_fifo_segments (svm_fifo_t * f, svm_fifo_segment_t * fs);
 void svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_segment_t * fs);
-void svm_fifo_init_pointers (svm_fifo_t * f, u32 pointer);
+void svm_fifo_init_pointers (svm_fifo_t * f, u32 head, u32 tail);
 void svm_fifo_clone (svm_fifo_t * df, svm_fifo_t * sf);
 void svm_fifo_overwrite_head (svm_fifo_t * f, u8 * data, u32 len);
 void svm_fifo_add_subscriber (svm_fifo_t * f, u8 subscriber);
index f5fe60a..8a8bd4a 100644 (file)
@@ -25,8 +25,8 @@ fifo_init_for_segment (svm_fifo_segment_header_t * fsh, u8 * fifo_space,
   f->freelist_index = freelist_index;
   f->default_chunk.start_byte = 0;
   f->default_chunk.length = size;
-  f->default_chunk.next = f->start_chunk = &f->default_chunk;
-  f->end_chunk = f->head_chunk = f->tail_chunk = f->start_chunk;
+  f->default_chunk.next = f->start_chunk = f->end_chunk = &f->default_chunk;
+  f->head_chunk = f->tail_chunk = f->ooo_enq = f->ooo_deq = f->start_chunk;
   f->next = fsh->free_fifos[freelist_index];
   fsh->free_fifos[freelist_index] = f;
 }
index 3770c23..95e9d10 100644 (file)
 
 #include <vppinfra/rbtree.h>
 
-static inline rb_node_t *
-rb_node_right (rb_tree_t * rt, rb_node_t * n)
-{
-  return pool_elt_at_index (rt->nodes, n->right);
-}
-
-static inline rb_node_t *
-rb_node_left (rb_tree_t * rt, rb_node_t * n)
-{
-  return pool_elt_at_index (rt->nodes, n->left);
-}
-
-static inline rb_node_t *
-rb_node_parent (rb_tree_t * rt, rb_node_t * n)
-{
-  return pool_elt_at_index (rt->nodes, n->parent);
-}
-
 static inline void
 rb_tree_rotate_left (rb_tree_t * rt, rb_node_t * x)
 {
@@ -188,7 +170,7 @@ rb_tree_add (rb_tree_t * rt, u32 key)
 }
 
 rb_node_index_t
-rb_tree_add2 (rb_tree_t * rt, u32 key, u32 opaque)
+rb_tree_add2 (rb_tree_t * rt, u32 key, uword opaque)
 {
   rb_node_t *n;
 
index 73180d1..79437cd 100644 (file)
@@ -36,7 +36,7 @@ typedef struct rb_node_
   rb_node_index_t left;                /**< left child index */
   rb_node_index_t right;       /**< right child index */
   u32 key;                     /**< node key */
-  u32 opaque;                  /**< value stored by node */
+  uword opaque;                        /**< value stored by node */
 } rb_node_t;
 
 typedef struct rb_tree_
@@ -47,7 +47,7 @@ typedef struct rb_tree_
 
 void rb_tree_init (rb_tree_t * rt);
 rb_node_index_t rb_tree_add (rb_tree_t * rt, u32 key);
-rb_node_index_t rb_tree_add2 (rb_tree_t * rt, u32 key, u32 opaque);
+rb_node_index_t rb_tree_add2 (rb_tree_t * rt, u32 key, uword opaque);
 void rb_tree_del (rb_tree_t * rt, u32 key);
 void rb_tree_free_nodes (rb_tree_t * rt);
 u32 rb_tree_n_nodes (rb_tree_t * rt);
@@ -75,6 +75,24 @@ rb_node (rb_tree_t * rt, rb_node_index_t ri)
   return pool_elt_at_index (rt->nodes, ri);
 }
 
+static inline rb_node_t *
+rb_node_right (rb_tree_t * rt, rb_node_t * n)
+{
+  return pool_elt_at_index (rt->nodes, n->right);
+}
+
+static inline rb_node_t *
+rb_node_left (rb_tree_t * rt, rb_node_t * n)
+{
+  return pool_elt_at_index (rt->nodes, n->left);
+}
+
+static inline rb_node_t *
+rb_node_parent (rb_tree_t * rt, rb_node_t * n)
+{
+  return pool_elt_at_index (rt->nodes, n->parent);
+}
+
 #endif /* SRC_VPPINFRA_RBTREE_H_ */
 
 /*