svm: refactor fifo 86/24086/65
authorFlorin Coras <fcoras@cisco.com>
Fri, 20 Dec 2019 00:10:58 +0000 (16:10 -0800)
committerDave Barach <openvpp@barachs.net>
Tue, 25 Feb 2020 19:18:49 +0000 (19:18 +0000)
Type: refactor

Switch from a wrapped byte space to a "continuous" one wherein fifo
chunks are appended to the fifo as more data is enqueued and chunks are
removed as data is dequeued.

The fifo is still subject to a maximum size, i.e., maximum number of
bytes that can be enqueued, so the max number of chunks associated to
the fifo is also constrained.

When enqueueing data, which must fit within the available free space, if
not enough "supporting" chunk  memory is available, the fifo asks the
fifo segment for enough chunk memory to ensure that the write can
succeed. To avoid allocating large amounts of small chunks due to small
writes, if possible, the size of the chunks requested is lower capped by
min_alloc.

When dequeuing data, all the chunks that have been completely drained,
i.e., head moved beyond the chunks’ end bytes, are unlinked from the
fifo and returned to the fifo segment. The one exception to this is the
last chunk which is never unlinked.

Change-Id: I98c1dbd9135fb79650365c7e40c29238b96cd4ee
Signed-off-by: Florin Coras <fcoras@cisco.com>
17 files changed:
src/plugins/hs_apps/proxy.c
src/plugins/quic/quic.c
src/plugins/tlsopenssl/tls_openssl.c
src/plugins/unittest/svm_fifo_test.c
src/svm/CMakeLists.txt
src/svm/fifo_segment.c
src/svm/fifo_segment.h
src/svm/fifo_types.h [new file with mode: 0644]
src/svm/svm_fifo.c
src/svm/svm_fifo.h
src/vcl/vppcom.c
src/vnet/session/application_local.c
src/vnet/session/segment_manager.c
src/vnet/session/segment_manager.h
src/vnet/session/session.h
src/vppinfra/rbtree.c
src/vppinfra/rbtree.h

index a520110..d0e3bc4 100644 (file)
@@ -274,7 +274,7 @@ proxy_tx_callback (session_t * proxy_s)
   u32 min_free;
   uword *p;
 
-  min_free = clib_min (proxy_s->tx_fifo->nitems >> 3, 128 << 10);
+  min_free = clib_min (svm_fifo_size (proxy_s->tx_fifo) >> 3, 128 << 10);
   if (svm_fifo_max_enqueue (proxy_s->tx_fifo) < min_free)
     {
       svm_fifo_add_want_deq_ntf (proxy_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
@@ -356,6 +356,9 @@ active_open_connected_callback (u32 app_index, u32 opaque,
   s->tx_fifo->refcnt++;
   s->rx_fifo->refcnt++;
 
+  svm_fifo_init_ooo_lookup (s->tx_fifo, 1 /* deq ooo */ );
+  svm_fifo_init_ooo_lookup (s->rx_fifo, 0 /* enq ooo */ );
+
   hash_set (pm->proxy_session_by_active_open_handle,
            ps->vpp_active_open_handle, opaque);
 
@@ -425,7 +428,7 @@ active_open_tx_callback (session_t * ao_s)
   u32 min_free;
   uword *p;
 
-  min_free = clib_min (ao_s->tx_fifo->nitems >> 3, 128 << 10);
+  min_free = clib_min (svm_fifo_size (ao_s->tx_fifo) >> 3, 128 << 10);
   if (svm_fifo_max_enqueue (ao_s->tx_fifo) < min_free)
     {
       svm_fifo_add_want_deq_ntf (ao_s->tx_fifo, SVM_FIFO_WANT_DEQ_NOTIF);
index 5265e5b..31cfcce 100644 (file)
@@ -916,10 +916,10 @@ int
 quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
                       size_t * len, int *wrote_all)
 {
-  u32 deq_max, first_deq, max_rd_chunk, rem_offset;
   quic_stream_data_t *stream_data;
   session_t *stream_session;
   svm_fifo_t *f;
+  u32 deq_max;
 
   stream_data = (quic_stream_data_t *) stream->data;
   stream_session = get_stream_session_from_stream (stream);
@@ -943,22 +943,7 @@ quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
   if (off + *len > stream_data->app_tx_data_len)
     stream_data->app_tx_data_len = off + *len;
 
-  /* TODO, use something like : return svm_fifo_peek (f, off, *len, dst); */
-  max_rd_chunk = svm_fifo_max_read_chunk (f);
-
-  first_deq = 0;
-  if (off < max_rd_chunk)
-    {
-      first_deq = clib_min (*len, max_rd_chunk - off);
-      clib_memcpy_fast (dst, svm_fifo_head (f) + off, first_deq);
-    }
-
-  if (max_rd_chunk < off + *len)
-    {
-      rem_offset = max_rd_chunk < off ? off - max_rd_chunk : 0;
-      clib_memcpy_fast (dst + first_deq, f->head_chunk->data + rem_offset,
-                       *len - first_deq);
-    }
+  svm_fifo_peek (f, off, *len, dst);
 
   return 0;
 }
index 288f0e1..078dd2f 100644 (file)
@@ -110,70 +110,183 @@ openssl_lctx_get (u32 lctx_index)
 }
 
 static int
-openssl_try_handshake_read (openssl_ctx_t * oc, session_t * tls_session)
+openssl_read_from_bio_into_fifo (svm_fifo_t * f, BIO * bio)
 {
-  u32 deq_max, deq_now;
-  svm_fifo_t *f;
-  int wrote, rv;
+  u32 enq_now, enq_max;
+  svm_fifo_chunk_t *c;
+  int read, rv;
 
-  f = tls_session->rx_fifo;
-  deq_max = svm_fifo_max_dequeue_cons (f);
-  if (!deq_max)
+  enq_max = svm_fifo_max_enqueue_prod (f);
+  if (!enq_max)
     return 0;
 
-  deq_now = clib_min (svm_fifo_max_read_chunk (f), deq_max);
-  wrote = BIO_write (oc->wbio, svm_fifo_head (f), deq_now);
-  if (wrote <= 0)
+  svm_fifo_fill_chunk_list (f);
+
+  enq_now = clib_min (svm_fifo_max_write_chunk (f), enq_max);
+  if (!enq_now)
     return 0;
 
-  svm_fifo_dequeue_drop (f, wrote);
-  if (wrote < deq_max)
+  read = BIO_read (bio, svm_fifo_tail (f), enq_now);
+  if (read <= 0)
+    return 0;
+
+  if (read == enq_now)
     {
-      deq_now = clib_min (svm_fifo_max_read_chunk (f), deq_max - wrote);
-      rv = BIO_write (oc->wbio, svm_fifo_head (f), deq_now);
-      if (rv > 0)
+      c = svm_fifo_tail_chunk (f);
+      while (read < enq_max)
        {
-         svm_fifo_dequeue_drop (f, rv);
-         wrote += rv;
+         c = c->next;
+         enq_now = clib_min (c->length, enq_max - read);
+         rv = BIO_read (bio, c->data, enq_now);
+         read += rv > 0 ? rv : 0;
+
+         if (rv < enq_now)
+           break;
        }
     }
-  return wrote;
+
+  svm_fifo_enqueue_nocopy (f, read);
+
+  return read;
 }
 
 static int
-openssl_try_handshake_write (openssl_ctx_t * oc, session_t * tls_session)
+openssl_read_from_ssl_into_fifo (svm_fifo_t * f, SSL * ssl)
 {
-  u32 enq_max, deq_now;
-  svm_fifo_t *f;
+  u32 enq_now, enq_max;
+  svm_fifo_chunk_t *c;
   int read, rv;
 
-  if (BIO_ctrl_pending (oc->rbio) <= 0)
-    return 0;
-
-  f = tls_session->tx_fifo;
   enq_max = svm_fifo_max_enqueue_prod (f);
   if (!enq_max)
     return 0;
 
-  deq_now = clib_min (svm_fifo_max_write_chunk (f), enq_max);
-  read = BIO_read (oc->rbio, svm_fifo_tail (f), deq_now);
+  svm_fifo_fill_chunk_list (f);
+
+  enq_now = clib_min (svm_fifo_max_write_chunk (f), enq_max);
+  if (!enq_now)
+    return 0;
+
+  read = SSL_read (ssl, svm_fifo_tail (f), enq_now);
   if (read <= 0)
     return 0;
 
+  if (read == enq_now)
+    {
+      c = svm_fifo_tail_chunk (f);
+      while (read < enq_max)
+       {
+         c = c->next;
+         if (!c)
+           break;
+         enq_now = clib_min (c->length, enq_max - read);
+         rv = SSL_read (ssl, c->data, enq_now);
+         read += rv > 0 ? rv : 0;
+
+         if (rv < enq_now)
+           break;
+       }
+    }
+
   svm_fifo_enqueue_nocopy (f, read);
-  tls_add_vpp_q_tx_evt (tls_session);
 
-  if (read < enq_max)
+  return read;
+}
+
+static int
+openssl_write_from_fifo_into_bio (svm_fifo_t * f, BIO * bio, u32 len)
+{
+  svm_fifo_chunk_t *c;
+  int wrote, rv;
+  u32 deq_now;
+
+  svm_fifo_fill_chunk_list (f);
+
+  deq_now = clib_min (svm_fifo_max_read_chunk (f), len);
+  wrote = BIO_write (bio, svm_fifo_head (f), deq_now);
+  if (wrote <= 0)
+    return 0;
+
+  if (wrote == deq_now)
     {
-      deq_now = clib_min (svm_fifo_max_write_chunk (f), enq_max - read);
-      rv = BIO_read (oc->rbio, svm_fifo_tail (f), deq_now);
-      if (rv > 0)
+      c = svm_fifo_head_chunk (f);
+      while (wrote < len)
        {
-         svm_fifo_enqueue_nocopy (f, rv);
-         read += rv;
+         c = c->next;
+         deq_now = clib_min (c->length, len - wrote);
+         rv = BIO_write (bio, c->data, deq_now);
+         wrote += rv > 0 ? rv : 0;
+
+         if (rv < deq_now)
+           break;
        }
     }
 
+  svm_fifo_dequeue_drop (f, wrote);
+
+  return wrote;
+}
+
+static int
+openssl_write_from_fifo_into_ssl (svm_fifo_t * f, SSL * ssl, u32 len)
+{
+  svm_fifo_chunk_t *c;
+  int wrote = 0, rv;
+  u32 deq_now;
+
+  svm_fifo_fill_chunk_list (f);
+
+  deq_now = clib_min (svm_fifo_max_read_chunk (f), len);
+  wrote = SSL_write (ssl, svm_fifo_head (f), deq_now);
+  if (wrote <= 0)
+    return 0;
+
+  if (wrote == deq_now)
+    {
+      c = svm_fifo_head_chunk (f);
+      while (wrote < len)
+       {
+         c = c->next;
+         deq_now = clib_min (c->length, len - wrote);
+         rv = SSL_write (ssl, c->data, deq_now);
+         wrote += rv > 0 ? rv : 0;
+
+         if (rv < deq_now)
+           break;
+       }
+    }
+
+  svm_fifo_dequeue_drop (f, wrote);
+
+  return wrote;
+}
+
+static int
+openssl_try_handshake_read (openssl_ctx_t * oc, session_t * tls_session)
+{
+  svm_fifo_t *f;
+  u32 deq_max;
+
+  f = tls_session->rx_fifo;
+  deq_max = svm_fifo_max_dequeue_cons (f);
+  if (!deq_max)
+    return 0;
+
+  return openssl_write_from_fifo_into_bio (f, oc->wbio, deq_max);
+}
+
+static int
+openssl_try_handshake_write (openssl_ctx_t * oc, session_t * tls_session)
+{
+  u32 read;
+
+  if (BIO_ctrl_pending (oc->rbio) <= 0)
+    return 0;
+
+  read = openssl_read_from_bio_into_fifo (tls_session->tx_fifo, oc->rbio);
+  if (read)
+    tls_add_vpp_q_tx_evt (tls_session);
+
   return read;
 }
 
@@ -325,42 +438,33 @@ static inline int
 openssl_ctx_write (tls_ctx_t * ctx, session_t * app_session)
 {
   openssl_ctx_t *oc = (openssl_ctx_t *) ctx;
-  int wrote = 0, rv, read, max_buf = 100 * TLS_CHUNK_SIZE, max_space;
-  u32 enq_max, deq_max, deq_now, to_write;
+  int wrote = 0, read, max_buf = 100 * TLS_CHUNK_SIZE, max_space;
+  u32 deq_max, to_write;
   session_t *tls_session;
   svm_fifo_t *f;
 
   f = app_session->tx_fifo;
+
   deq_max = svm_fifo_max_dequeue_cons (f);
   if (!deq_max)
     goto check_tls_fifo;
 
+  /* Figure out how much data to write */
   max_space = max_buf - BIO_ctrl_pending (oc->rbio);
   max_space = (max_space < 0) ? 0 : max_space;
-  deq_now = clib_min (deq_max, (u32) max_space);
-  to_write = clib_min (svm_fifo_max_read_chunk (f), deq_now);
-  wrote = SSL_write (oc->ssl, svm_fifo_head (f), to_write);
-  if (wrote <= 0)
+  to_write = clib_min (deq_max, (u32) max_space);
+
+  wrote = openssl_write_from_fifo_into_ssl (f, oc->ssl, to_write);
+  if (!wrote)
     {
       tls_add_vpp_q_builtin_tx_evt (app_session);
       goto check_tls_fifo;
     }
-  svm_fifo_dequeue_drop (app_session->tx_fifo, wrote);
-  if (wrote < deq_now)
-    {
-      to_write = clib_min (svm_fifo_max_read_chunk (f), deq_now - wrote);
-      rv = SSL_write (oc->ssl, svm_fifo_head (f), to_write);
-      if (rv > 0)
-       {
-         svm_fifo_dequeue_drop (app_session->tx_fifo, rv);
-         wrote += rv;
-       }
-    }
 
-  if (svm_fifo_needs_deq_ntf (app_session->tx_fifo, wrote))
+  if (svm_fifo_needs_deq_ntf (f, wrote))
     session_dequeue_notify (app_session);
 
-  if (wrote < deq_max)
+  if (svm_fifo_max_dequeue_cons (f))
     tls_add_vpp_q_builtin_tx_evt (app_session);
 
 check_tls_fifo:
@@ -369,33 +473,16 @@ check_tls_fifo:
     return wrote;
 
   tls_session = session_get_from_handle (ctx->tls_session_handle);
-  f = tls_session->tx_fifo;
-  enq_max = svm_fifo_max_enqueue_prod (f);
-  if (!enq_max)
-    {
-      tls_add_vpp_q_builtin_tx_evt (app_session);
-      return wrote;
-    }
 
-  deq_now = clib_min (svm_fifo_max_write_chunk (f), enq_max);
-  read = BIO_read (oc->rbio, svm_fifo_tail (f), deq_now);
-  if (read <= 0)
+  read = openssl_read_from_bio_into_fifo (tls_session->tx_fifo, oc->rbio);
+  if (!read)
     {
       tls_add_vpp_q_builtin_tx_evt (app_session);
       return wrote;
     }
 
-  svm_fifo_enqueue_nocopy (f, read);
   tls_add_vpp_q_tx_evt (tls_session);
 
-  if (read < enq_max && BIO_ctrl_pending (oc->rbio) > 0)
-    {
-      deq_now = clib_min (svm_fifo_max_write_chunk (f), enq_max - read);
-      read = BIO_read (oc->rbio, svm_fifo_tail (f), deq_now);
-      if (read > 0)
-       svm_fifo_enqueue_nocopy (f, read);
-    }
-
   if (BIO_ctrl_pending (oc->rbio) > 0)
     tls_add_vpp_q_builtin_tx_evt (app_session);
   else if (ctx->app_closed)
@@ -407,9 +494,9 @@ check_tls_fifo:
 static inline int
 openssl_ctx_read (tls_ctx_t * ctx, session_t * tls_session)
 {
-  int read, wrote = 0, max_space, max_buf = 100 * TLS_CHUNK_SIZE, rv;
+  int read, wrote = 0, max_space, max_buf = 100 * TLS_CHUNK_SIZE;
   openssl_ctx_t *oc = (openssl_ctx_t *) ctx;
-  u32 deq_max, enq_max, deq_now, to_read;
+  u32 deq_max, to_write;
   session_t *app_session;
   svm_fifo_t *f;
 
@@ -422,31 +509,21 @@ openssl_ctx_read (tls_ctx_t * ctx, session_t * tls_session)
     }
 
   f = tls_session->rx_fifo;
+
   deq_max = svm_fifo_max_dequeue_cons (f);
   max_space = max_buf - BIO_ctrl_pending (oc->wbio);
   max_space = max_space < 0 ? 0 : max_space;
-  deq_now = clib_min (deq_max, max_space);
-  if (!deq_now)
+  to_write = clib_min (deq_max, max_space);
+  if (!to_write)
     goto check_app_fifo;
 
-  to_read = clib_min (svm_fifo_max_read_chunk (f), deq_now);
-  wrote = BIO_write (oc->wbio, svm_fifo_head (f), to_read);
-  if (wrote <= 0)
+  wrote = openssl_write_from_fifo_into_bio (f, oc->wbio, to_write);
+  if (!wrote)
     {
       tls_add_vpp_q_builtin_rx_evt (tls_session);
       goto check_app_fifo;
     }
-  svm_fifo_dequeue_drop (f, wrote);
-  if (wrote < deq_now)
-    {
-      to_read = clib_min (svm_fifo_max_read_chunk (f), deq_now - wrote);
-      rv = BIO_write (oc->wbio, svm_fifo_head (f), to_read);
-      if (rv > 0)
-       {
-         svm_fifo_dequeue_drop (f, rv);
-         wrote += rv;
-       }
-    }
+
   if (svm_fifo_max_dequeue_cons (f))
     tls_add_vpp_q_builtin_rx_evt (tls_session);
 
@@ -457,28 +534,13 @@ check_app_fifo:
 
   app_session = session_get_from_handle (ctx->app_session_handle);
   f = app_session->rx_fifo;
-  enq_max = svm_fifo_max_enqueue_prod (f);
-  if (!enq_max)
-    {
-      tls_add_vpp_q_builtin_rx_evt (tls_session);
-      return wrote;
-    }
 
-  deq_now = clib_min (svm_fifo_max_write_chunk (f), enq_max);
-  read = SSL_read (oc->ssl, svm_fifo_tail (f), deq_now);
-  if (read <= 0)
+  read = openssl_read_from_ssl_into_fifo (f, oc->ssl);
+  if (!read)
     {
       tls_add_vpp_q_builtin_rx_evt (tls_session);
       return wrote;
     }
-  svm_fifo_enqueue_nocopy (f, read);
-  if (read < enq_max && SSL_pending (oc->ssl) > 0)
-    {
-      deq_now = clib_min (svm_fifo_max_write_chunk (f), enq_max - read);
-      read = SSL_read (oc->ssl, svm_fifo_tail (f), deq_now);
-      if (read > 0)
-       svm_fifo_enqueue_nocopy (f, read);
-    }
 
   /* If handshake just completed, session may still be in accepting state */
   if (app_session->session_state >= SESSION_STATE_READY)
index 8b43ee3..8579cec 100644 (file)
@@ -158,18 +158,54 @@ fifo_get_validate_pattern (vlib_main_t * vm, test_pattern_t * test_data,
   return validate_pattern;
 }
 
+static fifo_segment_t *
+fifo_segment_prepare (fifo_segment_main_t * sm, char *seg_name, u32 seg_size)
+{
+  fifo_segment_create_args_t _a, *a = &_a;
+
+  clib_memset (a, 0, sizeof (*a));
+  a->segment_name = seg_name;
+  a->segment_size = seg_size ? seg_size : 32 << 20;
+
+  if (fifo_segment_create (sm, a))
+    return 0;
+
+  return fifo_segment_get_segment (sm, a->new_segment_indices[0]);
+}
+
+static void
+ft_fifo_segment_free (fifo_segment_main_t * sm, fifo_segment_t * fs)
+{
+  fifo_segment_delete (sm, fs);
+}
+
 static svm_fifo_t *
-fifo_prepare (u32 fifo_size)
+fifo_segment_alloc_fifo (fifo_segment_t * fs, u32 data_bytes,
+                        fifo_segment_ftype_t ftype)
+{
+  return fifo_segment_alloc_fifo_w_slice (fs, 0, data_bytes, ftype);
+}
+
+static svm_fifo_t *
+fifo_prepare (fifo_segment_t * fs, u32 fifo_size)
 {
   svm_fifo_t *f;
-  f = svm_fifo_create (fifo_size);
+
+  f = fifo_segment_alloc_fifo (fs, fifo_size, FIFO_SEGMENT_RX_FIFO);
 
   /* Paint fifo data vector with -1's */
-  clib_memset (f->head_chunk->data, 0xFF, fifo_size);
+  clib_memset (svm_fifo_head_chunk (f)->data, 0xFF, fifo_size);
 
+  svm_fifo_init_ooo_lookup (f, 1 /* deq ooo */ );
   return f;
 }
 
+static void
+ft_fifo_free (fifo_segment_t * fs, svm_fifo_t * f)
+{
+  fifo_segment_free_fifo (fs, f);
+}
+
 static int
 compare_data (u8 * data1, u8 * data2, u32 start, u32 len, u32 * index)
 {
@@ -189,14 +225,14 @@ compare_data (u8 * data1, u8 * data2, u32 start, u32 len, u32 * index)
 int
 sfifo_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
 {
-  svm_fifo_t *f;
-  u32 fifo_size = 1 << 20;
-  u32 *test_data = 0;
-  u32 offset;
+  u32 fifo_size = 1 << 20, *test_data = 0, offset, data_word, test_data_len;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
+  u8 *data, *s, *data_buf = 0;
   int i, rv, verbose = 0;
-  u32 data_word, test_data_len, j;
   ooo_segment_t *ooo_seg;
-  u8 *data, *s, *data_buf = 0;
+  fifo_segment_t *fs;
+  svm_fifo_t *f;
+  u32 j;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
     {
@@ -210,7 +246,8 @@ sfifo_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
   for (i = 0; i < vec_len (test_data); i++)
     test_data[i] = i;
 
-  f = fifo_prepare (fifo_size);
+  fs = fifo_segment_prepare (fsm, "fifo-test1", 0);
+  f = fifo_prepare (fs, fifo_size);
 
   /*
    * Enqueue an initial (un-dequeued) chunk
@@ -337,8 +374,8 @@ sfifo_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Test segment overlaps: last ooo segment overlaps all
    */
-  svm_fifo_free (f);
-  f = fifo_prepare (fifo_size);
+  ft_fifo_free (fs, f);
+  f = fifo_prepare (fs, fifo_size);
 
   for (i = 0; i < 4; i++)
     {
@@ -378,8 +415,8 @@ sfifo_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Test segment overlaps: enqueue and overlap ooo segments
    */
-  svm_fifo_free (f);
-  f = fifo_prepare (fifo_size);
+  ft_fifo_free (fs, f);
+  f = fifo_prepare (fs, fifo_size);
 
   for (i = 0; i < 4; i++)
     {
@@ -419,26 +456,31 @@ sfifo_test_fifo1 (vlib_main_t * vm, unformat_input_t * input)
   SFIFO_TEST ((rv == 0), "peeked %u expected 0", rv);
 
   vec_free (data_buf);
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
   vec_free (test_data);
 
   return 0;
 
 err:
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
   vec_free (test_data);
+
   return -1;
 }
 
 static int
 sfifo_test_fifo2 (vlib_main_t * vm)
 {
-  svm_fifo_t *f;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
+  test_pattern_t *tp, *vp, *test_data;
   u32 fifo_size = (1 << 20) + 1;
   int i, rv, test_data_len;
-  u64 data64;
-  test_pattern_t *tp, *vp, *test_data;
   ooo_segment_t *ooo_seg;
+  fifo_segment_t *fs;
+  svm_fifo_t *f;
+  u64 data64;
 
   test_data = test_pattern;
   test_data_len = ARRAY_LEN (test_pattern);
@@ -446,7 +488,8 @@ sfifo_test_fifo2 (vlib_main_t * vm)
   vp = fifo_get_validate_pattern (vm, test_data, test_data_len);
 
   /* Create a fifo */
-  f = fifo_prepare (fifo_size);
+  fs = fifo_segment_prepare (fsm, "fifo-test2", 0);
+  f = fifo_prepare (fs, fifo_size);
 
   /*
    * Try with sorted data
@@ -472,14 +515,14 @@ sfifo_test_fifo2 (vlib_main_t * vm)
   rv = svm_fifo_enqueue (f, sizeof (u32), (u8 *) & data64);
   SFIFO_TEST ((rv == 3000), "bytes to be enqueued %u", rv);
 
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
   vec_free (vp);
 
   /*
    * Now try it again w/ unsorted data...
    */
 
-  f = fifo_prepare (fifo_size);
+  f = fifo_prepare (fs, fifo_size);
 
   for (i = 0; i < test_data_len; i++)
     {
@@ -507,7 +550,8 @@ sfifo_test_fifo2 (vlib_main_t * vm)
 
   SFIFO_TEST ((rv == 3000), "bytes to be enqueued %u", rv);
 
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
 
   return 0;
 }
@@ -515,16 +559,15 @@ sfifo_test_fifo2 (vlib_main_t * vm)
 static int
 sfifo_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
 {
-  svm_fifo_t *f;
-  u32 fifo_size = (4 << 10) + 1;
-  u32 fifo_initial_offset = 0;
-  u32 total_size = 2 << 10;
+  u32 nsegs = 2, seg_size, length_so_far, current_offset, offset_increment;
   int overlap = 0, verbose = 0, randomize = 1, drop = 0, in_seq_all = 0;
+  u32 len_this_chunk, seed = 0xdeaddabe, j, total_size = 2 << 10;
+  u32 fifo_size = (4 << 10) + 1, fifo_initial_offset = 0;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
   u8 *data_pattern = 0, *data_buf = 0;
   test_pattern_t *tp, *generate = 0;
-  u32 nsegs = 2, seg_size, length_so_far;
-  u32 current_offset, offset_increment, len_this_chunk;
-  u32 seed = 0xdeaddabe, j;
+  fifo_segment_t *fs;
+  svm_fifo_t *f;
   int i, rv;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -649,7 +692,8 @@ sfifo_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Create a fifo and add segments
    */
-  f = fifo_prepare (fifo_size);
+  fs = fifo_segment_prepare (fsm, "fifo-test3", 0);
+  f = fifo_prepare (fs, fifo_size);
 
   /* manually set head and tail pointers to validate modular arithmetic */
   fifo_initial_offset = fifo_initial_offset % fifo_size;
@@ -730,7 +774,8 @@ sfifo_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
   SFIFO_TEST ((svm_fifo_max_dequeue (f) == 0), "fifo has %d bytes",
              svm_fifo_max_dequeue (f));
 
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
   vec_free (data_pattern);
   vec_free (data_buf);
 
@@ -740,12 +785,12 @@ sfifo_test_fifo3 (vlib_main_t * vm, unformat_input_t * input)
 static int
 sfifo_test_fifo4 (vlib_main_t * vm, unformat_input_t * input)
 {
-  svm_fifo_t *f;
-  u32 fifo_size = 6 << 10;
-  u32 fifo_initial_offset = 1000000000;
-  u32 test_n_bytes = 5000, j;
+  u32 fifo_size = 6 << 10, fifo_initial_offset = 1e9, test_n_bytes = 5000, j;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
   u8 *test_data = 0, *data_buf = 0;
   int i, rv, verbose = 0;
+  fifo_segment_t *fs;
+  svm_fifo_t *f;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
     {
@@ -763,7 +808,8 @@ sfifo_test_fifo4 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Create a fifo and add segments
    */
-  f = fifo_prepare (fifo_size);
+  fs = fifo_segment_prepare (fsm, "fifo-test4", 0);
+  f = fifo_prepare (fs, fifo_size);
 
   /* Set head and tail pointers */
   fifo_initial_offset = fifo_initial_offset % fifo_size;
@@ -780,12 +826,7 @@ sfifo_test_fifo4 (vlib_main_t * vm, unformat_input_t * input)
       if (verbose)
        vlib_cli_output (vm, "add [%d] [%d, %d]", i, i, i + sizeof (u8));
       if (rv)
-       {
-         clib_warning ("enqueue returned %d", rv);
-         svm_fifo_free (f);
-         vec_free (test_data);
-         return -1;
-       }
+       SFIFO_TEST (0, "enqueue returned %d", rv);
     }
 
   svm_fifo_enqueue (f, sizeof (u8), &test_data[0]);
@@ -799,7 +840,8 @@ sfifo_test_fifo4 (vlib_main_t * vm, unformat_input_t * input)
                     test_data[j]);
   SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
 
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
   vec_free (test_data);
   return 0;
 }
@@ -807,7 +849,7 @@ sfifo_test_fifo4 (vlib_main_t * vm, unformat_input_t * input)
 static u32
 fifo_pos (svm_fifo_t * f, u32 pos)
 {
-  return pos % f->size;
+  return pos;
 }
 
 /* Avoids exposing svm_fifo.c internal function */
@@ -822,11 +864,13 @@ ooo_seg_next (svm_fifo_t * f, ooo_segment_t * s)
 static int
 sfifo_test_fifo5 (vlib_main_t * vm, unformat_input_t * input)
 {
-  svm_fifo_t *f;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
   u32 fifo_size = 401, j = 0, offset = 200;
-  int i, rv, verbose = 0;
   u8 *test_data = 0, *data_buf = 0;
+  int i, rv, verbose = 0;
   ooo_segment_t *ooo_seg;
+  fifo_segment_t *fs;
+  svm_fifo_t *f;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
     {
@@ -841,7 +885,8 @@ sfifo_test_fifo5 (vlib_main_t * vm, unformat_input_t * input)
        }
     }
 
-  f = fifo_prepare (fifo_size);
+  fs = fifo_segment_prepare (fsm, "fifo-test5", 0);
+  f = fifo_prepare (fs, fifo_size);
   svm_fifo_init_pointers (f, offset, offset);
 
   vec_validate (test_data, 399);
@@ -930,8 +975,8 @@ sfifo_test_fifo5 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Add [100 200] and overlap it with [50 250]
    */
-  svm_fifo_free (f);
-  f = fifo_prepare (fifo_size);
+  ft_fifo_free (fs, f);
+  f = fifo_prepare (fs, fifo_size);
 
   svm_fifo_enqueue_with_offset (f, 100, 100, &test_data[100]);
   svm_fifo_enqueue_with_offset (f, 50, 200, &test_data[50]);
@@ -943,7 +988,8 @@ sfifo_test_fifo5 (vlib_main_t * vm, unformat_input_t * input)
   SFIFO_TEST ((ooo_seg->length == 200), "first seg length %u expected %u",
              ooo_seg->length, 200);
 
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
   vec_free (test_data);
   return 0;
 }
@@ -954,10 +1000,12 @@ sfifo_test_fifo5 (vlib_main_t * vm, unformat_input_t * input)
 static int
 sfifo_test_fifo6 (vlib_main_t * vm, unformat_input_t * input)
 {
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
   u32 fifo_size = 101, n_test_bytes = 100;
   int i, j, rv, __clib_unused verbose = 0;
   u8 *test_data = 0, *data_buf = 0;
   ooo_segment_t *ooo_seg;
+  fifo_segment_t *fs;
   svm_fifo_t *f;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -972,40 +1020,18 @@ sfifo_test_fifo6 (vlib_main_t * vm, unformat_input_t * input)
        }
     }
 
-  f = fifo_prepare (fifo_size);
+  fs = fifo_segment_prepare (fsm, "fifo-test6", 0);
+  f = fifo_prepare (fs, fifo_size);
+
   vec_validate (test_data, n_test_bytes - 1);
   vec_validate (data_buf, n_test_bytes - 1);
   for (i = 0; i < vec_len (test_data); i++)
     test_data[i] = i % 0xff;
 
-  /*
-   * Test ooo segment distance to/from tail with u32 wrap
-   */
-
-  /*
-   * |0|---[start]--(len5)-->|0|--(len6)-->[end]---|0|
-   */
-  rv = f_distance_from (f, ~0 - 5, 5);
-  SFIFO_TEST (rv == 11, "distance to tail should be %u is %u", 11, rv);
-
-  rv = f_distance_to (f, ~0 - 5, 5);
-  SFIFO_TEST (rv == f->size - 11, "distance from tail should be %u is %u",
-             f->size - 11, rv);
-
-  /*
-   * |0|---[end]--(len5)-->|0|--(len6)-->[start]---|0|
-   */
-  rv = f_distance_to (f, 5, ~0 - 5);
-  SFIFO_TEST (rv == 11, "distance from tail should be %u is %u", 11, rv);
-
-  rv = f_distance_from (f, 5, ~0 - 5);
-  SFIFO_TEST (rv == f->size - 11, "distance to tail should be %u is %u",
-             f->size - 11, rv);
-
   /*
    * Add ooo with tail and ooo segment start u32 wrap
    */
-  svm_fifo_init_pointers (f, ~0, ~0);
+  svm_fifo_init_pointers (f, ~0 % fifo_size, ~0 % fifo_size);
   svm_fifo_enqueue_with_offset (f, 10, 10, &test_data[10]);
   SFIFO_TEST ((svm_fifo_n_ooo_segments (f) == 1),
              "number of ooo segments %u", svm_fifo_n_ooo_segments (f));
@@ -1032,7 +1058,7 @@ sfifo_test_fifo6 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Force collect with tail u32 wrap and without ooo segment start u32 wrap
    */
-  svm_fifo_init_pointers (f, ~0 - 10, ~0 - 10);
+  svm_fifo_init_pointers (f, (~0 - 10) % fifo_size, (~0 - 10) % fifo_size);
   svm_fifo_enqueue_with_offset (f, 5, 15, &test_data[5]);
   svm_fifo_enqueue (f, 12, test_data);
 
@@ -1051,7 +1077,8 @@ sfifo_test_fifo6 (vlib_main_t * vm, unformat_input_t * input)
    */
   vec_free (test_data);
   vec_free (data_buf);
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
   return 0;
 }
 
@@ -1061,10 +1088,12 @@ sfifo_test_fifo6 (vlib_main_t * vm, unformat_input_t * input)
 static int
 sfifo_test_fifo7 (vlib_main_t * vm, unformat_input_t * input)
 {
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
   u32 fifo_size = 101, n_iterations = 100;
   int i, j, rv, __clib_unused verbose = 0;
   u8 *test_data = 0, *data_buf = 0;
   u64 n_test_bytes = 100;
+  fifo_segment_t *fs;
   svm_fifo_t *f;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -1082,8 +1111,9 @@ sfifo_test_fifo7 (vlib_main_t * vm, unformat_input_t * input)
   /*
    * Prepare data structures
    */
-  f = fifo_prepare (fifo_size);
-  svm_fifo_init_pointers (f, ~0, ~0);
+  fs = fifo_segment_prepare (fsm, "fifo-test7", 0);
+  f = fifo_prepare (fs, fifo_size);
+  svm_fifo_init_pointers (f, ~0 % fifo_size, ~0 % fifo_size);
 
   vec_validate (test_data, n_test_bytes - 1);
   vec_validate (data_buf, n_test_bytes - 1);
@@ -1118,7 +1148,7 @@ sfifo_test_fifo7 (vlib_main_t * vm, unformat_input_t * input)
       if (compare_data (data_buf, test_data, 0, n_test_bytes, (u32 *) & j))
        SFIFO_TEST (0, "[%d] dequeued %u expected %u", j, data_buf[j],
                    test_data[j]);
-      svm_fifo_init_pointers (f, ~0 - i, ~0 - i);
+      svm_fifo_init_pointers (f, (~0 - i) % f->size, (~0 - i) % f->size);
     }
   SFIFO_TEST (1, "passed multiple ooo enqueue/dequeue");
 
@@ -1127,7 +1157,8 @@ sfifo_test_fifo7 (vlib_main_t * vm, unformat_input_t * input)
    */
   vec_free (test_data);
   vec_free (data_buf);
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
   return 0;
 }
 
@@ -1137,10 +1168,12 @@ sfifo_test_fifo7 (vlib_main_t * vm, unformat_input_t * input)
 static int
 sfifo_test_fifo_large (vlib_main_t * vm, unformat_input_t * input)
 {
-  u32 n_iterations = 100, n_bytes_per_iter, half;
+  u32 n_iterations = 100, n_bytes_per_iter, half, fifo_size;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
   int i, j, rv, __clib_unused verbose = 0;
   u8 *test_data = 0, *data_buf = 0;
   u64 n_test_bytes = 100;
+  fifo_segment_t *fs;
   svm_fifo_t *f;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -1155,13 +1188,14 @@ sfifo_test_fifo_large (vlib_main_t * vm, unformat_input_t * input)
        }
     }
 
-
   n_test_bytes = 5ULL << 30;
   n_iterations = 1 << 10;
   n_bytes_per_iter = n_test_bytes / n_iterations;
+  fifo_size = n_bytes_per_iter + 1;
 
-  f = fifo_prepare (n_bytes_per_iter + 1);
-  svm_fifo_init_pointers (f, ~0, ~0);
+  fs = fifo_segment_prepare (fsm, "fifo-large", 0);
+  f = fifo_prepare (fs, fifo_size);
+  svm_fifo_init_pointers (f, ~0 % fifo_size, ~0 % fifo_size);
 
   vec_validate (test_data, n_bytes_per_iter - 1);
   vec_validate (data_buf, n_bytes_per_iter - 1);
@@ -1184,17 +1218,162 @@ sfifo_test_fifo_large (vlib_main_t * vm, unformat_input_t * input)
     }
   SFIFO_TEST (1, "passed large transfer");
 
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
+
+  return 0;
+}
+
+static void
+validate_test_and_buf_vecs (u8 ** test_data, u8 ** data_buf, u32 len)
+{
+  int i, cur_len;
+
+  cur_len = vec_len (*test_data);
+  vec_validate (*test_data, len - 1);
+  vec_validate (*data_buf, len - 1);
+
+  for (i = cur_len; i < vec_len (*test_data); i++)
+    (*test_data)[i] = i;
+}
+
+static int
+enqueue_ooo (svm_fifo_t * f, u8 * test_data, u32 len, u32 iterations)
+{
+  u32 offset, enq_now, ooo_chunk;
+  int i, rv;
+
+  ooo_chunk = len / iterations;
+  for (i = iterations; i > 0; i--)
+    {
+      offset = i * ooo_chunk;
+      enq_now = clib_min (ooo_chunk, len - offset);
+      if (!enq_now)
+       continue;
+      rv = svm_fifo_enqueue_with_offset (f, offset, enq_now,
+                                        test_data + offset);
+      if (rv)
+       return rv;
+    }
+
+  return 0;
+}
+
+static int
+enqueue_ooo_packets (svm_fifo_t * f, u32 len, u32 enq_chunk, u8 * test_data)
+{
+  u32 offset, enq_now;
+  int i, rv;
+
+  for (i = 1; i <= len / enq_chunk; i++)
+    {
+      offset = i * enq_chunk;
+      enq_now = clib_min (enq_chunk, len - offset);
+      if (!enq_now)
+       continue;
+      rv = svm_fifo_enqueue_with_offset (f, offset, enq_now,
+                                        test_data + offset);
+      if (rv)
+       return rv;
+
+      if (svm_fifo_size (f) < len - 4096)
+       svm_fifo_set_size (f, svm_fifo_size (f) + enq_now);
+      else
+       svm_fifo_set_size (f, len);
+    }
+
+  return 0;
+}
+
+static int
+enqueue_packets_inc (svm_fifo_t * f, u32 len, u32 enq_chunk, u8 * test_data)
+{
+  u32 enq_now, offset;
+  int i, rv;
+
+  for (i = 0; i <= len / enq_chunk; i++)
+    {
+      offset = i * enq_chunk;
+      enq_now = clib_min (enq_chunk, len - offset);
+      rv = svm_fifo_enqueue (f, enq_now, test_data + offset);
+      if (rv != enq_now)
+       return -1;
+      if (svm_fifo_size (f) < len - 4096)
+       svm_fifo_set_size (f, svm_fifo_size (f) + enq_now);
+      else
+       svm_fifo_set_size (f, len);
+    }
+  return 0;
+}
+
+static int
+dequeue_ooo (svm_fifo_t * f, u8 * data_buf, u32 len, u32 iterations)
+{
+  u32 offset, ooo_chunk, deq_now;
+  int i, rv;
+
+  ooo_chunk = len / iterations;
+  for (i = iterations; i >= 0; i--)
+    {
+      offset = i * ooo_chunk;
+      deq_now = clib_min (ooo_chunk, len - offset);
+      if (deq_now == 0)
+       continue;
+      rv = svm_fifo_peek (f, offset, deq_now, data_buf + offset);
+      if (rv != deq_now)
+       return rv;
+    }
+  return 0;
+}
+
+static int
+dequeue_ooo_inc (svm_fifo_t * f, u8 * data_buf, u32 len, u32 iterations)
+{
+  u32 offset, ooo_chunk, deq_now;
+  int i, rv;
+
+  ooo_chunk = len / iterations;
+  for (i = 0; i <= iterations; i++)
+    {
+      offset = i * ooo_chunk;
+      deq_now = clib_min (ooo_chunk, len - offset);
+      if (deq_now == 0)
+       continue;
+      rv = svm_fifo_peek (f, offset, deq_now, data_buf + offset);
+      if (rv != deq_now)
+       return rv;
+    }
+  return 0;
+}
+
+static int
+dequeue_packets (svm_fifo_t * f, u32 len, u32 deq_chunk, u8 * data_buf)
+{
+  u32 offset, deq_now;
+  int i, rv;
+
+  for (i = 0; i <= len / deq_chunk; i++)
+    {
+      offset = i * deq_chunk;
+      deq_now = clib_min (deq_chunk, len - offset);
+      if (deq_now == 0)
+       continue;
+      rv = svm_fifo_dequeue (f, deq_now, data_buf + offset);
+      if (rv != deq_now)
+       return rv;
+    }
   return 0;
 }
 
 static int
 sfifo_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
 {
-  int verbose = 0, fifo_size = 201, start_offset = 100, i, j, rv;
-  int test_n_bytes, deq_bytes, enq_bytes, n_deqs, n_enqs;
-  svm_fifo_chunk_t *c, *next, *prev;
+  int __clib_unused verbose = 0, fifo_size = 4096, fifo_inc = 4096, rv, i;
+  u32 enq_chunk, offset, deq_now, last_start_byte;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
   u8 *test_data = 0, *data_buf = 0;
-  u32 old_tail, offset;
+  svm_fifo_chunk_t *c;
+  fifo_segment_t *fs;
   svm_fifo_t *f;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -1209,365 +1388,364 @@ sfifo_test_fifo_grow (vlib_main_t * vm, unformat_input_t * input)
        }
     }
 
-  f = fifo_prepare (fifo_size);
-  svm_fifo_init_pointers (f, start_offset, start_offset);
+  fs = fifo_segment_prepare (fsm, "fifo-grow", 0);
+  f = fifo_prepare (fs, fifo_size);
 
   /*
-   * Add with fifo not wrapped
+   * Grow size and alloc chunks by enqueueing in order
    */
-  c = clib_mem_alloc (sizeof (svm_fifo_chunk_t) + 100);
-  c->length = 100;
-  c->start_byte = ~0;
-  c->next = 0;
+  fifo_size += fifo_inc;
+  svm_fifo_set_size (f, fifo_size);
+  last_start_byte = 4096;
+  validate_test_and_buf_vecs (&test_data, &data_buf, fifo_size);
 
-  svm_fifo_add_chunk (f, c);
+  rv = svm_fifo_enqueue (f, fifo_size, test_data);
 
-  SFIFO_TEST (f->size == fifo_size + 100, "size expected %u is %u",
-             fifo_size + 100, f->size);
-  SFIFO_TEST (c->start_byte == fifo_size, "start byte expected %u is %u",
-             fifo_size, c->start_byte);
+  SFIFO_TEST (rv == fifo_size, "enqueue should work");
+  SFIFO_TEST (svm_fifo_size (f) == fifo_size, "size expected %u is %u",
+             fifo_size, svm_fifo_size (f));
+  SFIFO_TEST (svm_fifo_max_dequeue (f) == fifo_size, "max deq should be %u",
+             fifo_size);
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 2, "should have 2 chunks has %u", rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
+  c = f->head_chunk;
+  SFIFO_TEST (c->start_byte == 0, "head start byte should be %u", 0);
+  SFIFO_TEST (c->length == 4096, "head chunk length should be %u", 4096);
+  SFIFO_TEST (f->tail_chunk == 0, "no tail chunk");
+  SFIFO_TEST (f->ooo_enq == 0, "should have no ooo enq chunk");
+  SFIFO_TEST (f->ooo_deq == 0, "should have no ooo deq chunk");
+  c = f->end_chunk;
+  SFIFO_TEST (c->start_byte == last_start_byte, "end chunk start byte should"
+             " be %u", last_start_byte);
+  SFIFO_TEST (c->length == 4096, "end chunk length should be %u", 4096);
+
   /*
-   *  Add with fifo wrapped
+   * Dequeue and validate data
    */
 
-  svm_fifo_init_pointers (f, f->nitems - 100, f->nitems + 100);
-  c = clib_mem_alloc (sizeof (svm_fifo_chunk_t) + 100);
-  c->length = 100;
-  c->start_byte = ~0;
-  c->next = 0;
+  rv = svm_fifo_dequeue (f, fifo_size, data_buf);
+  SFIFO_TEST (rv == fifo_size, "should dequeue all data");
+  last_start_byte += 4096;     /* size of last segment */
 
-  svm_fifo_add_chunk (f, c);
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & i);
 
-  SFIFO_TEST (f->end_chunk != c, "tail chunk should not be updated");
-  SFIFO_TEST (f->size == fifo_size + 100, "size expected %u is %u",
-             fifo_size + 100, f->size);
-  SFIFO_TEST (c->start_byte == fifo_size + 100, "start byte expected %u is "
-             " %u", fifo_size + 100, c->start_byte);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", i, data_buf[i],
+                    test_data[i]);
+  SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
+  SFIFO_TEST (f->head_chunk == 0, "head chunk should be 0");
+  SFIFO_TEST (f->tail_chunk == 0, "tail chunk should be 0");
+  SFIFO_TEST (f->ooo_deq == 0, "should have no ooo deq chunk");
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
   /*
-   * Unwrap fifo
+   * Allocate one new chunk by enqueueing out of order all but first chunk
+   *
    */
-  vec_validate (data_buf, 200);
-  svm_fifo_dequeue (f, 201, data_buf);
 
-  SFIFO_TEST (f->end_chunk == c, "tail chunk should be updated");
-  SFIFO_TEST (f->size == fifo_size + 200, "size expected %u is %u",
-             fifo_size + 200, f->size);
-  SFIFO_TEST (c->start_byte == fifo_size + 100, "start byte expected %u is "
-             "%u", fifo_size + 100, c->start_byte);
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  enq_chunk = vec_len (test_data) / 10;
+  rv = enqueue_ooo (f, test_data, vec_len (test_data), 10);
+  SFIFO_TEST (!rv, "enqueue ooo should work");
 
-  /*
-   * Add N chunks
-   */
-  svm_fifo_init_pointers (f, f->nitems - 100, f->nitems + 100);
-
-  prev = 0;
-  for (i = 0; i < 5; i++)
-    {
-      c = clib_mem_alloc (sizeof (svm_fifo_chunk_t) + 100);
-      c->length = 100;
-      c->start_byte = ~0;
-      c->next = prev;
-      prev = c;
-    }
-
-  svm_fifo_add_chunk (f, c);
-  SFIFO_TEST (f->size == fifo_size + 200, "size expected %u is %u",
-             fifo_size + 200, f->size);
+  SFIFO_TEST (svm_fifo_size (f) == fifo_size, "size expected %u is %u",
+             fifo_size, svm_fifo_size (f));
+  SFIFO_TEST (svm_fifo_max_dequeue (f) == 0, "max deq should be %u", 0);
+  /* Fifo has 2 chunks because the we didn't allow the first chunk to be
+   * freed when all the data was dequeued. Could be optimized in the future */
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 2, "should have %u chunks has %u", 2, rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  prev = 0;
-  for (i = 0; i < 5; i++)
-    {
-      c = clib_mem_alloc (sizeof (svm_fifo_chunk_t) + 100);
-      c->length = 100;
-      c->start_byte = ~0;
-      c->next = prev;
-      prev = c;
-    }
-
-  svm_fifo_add_chunk (f, c);
-  SFIFO_TEST (f->size == fifo_size + 200, "size expected %u is %u",
-             fifo_size + 200, f->size);
-
-  old_tail = f->tail;
-  svm_fifo_dequeue (f, 101, data_buf);
+  SFIFO_TEST (f->head_chunk == 0, "should have no head chunk");
+  /* When new fifo chunks are allocated, tail is initialized */
+  SFIFO_TEST (f->tail_chunk != 0, "should have no tail chunk");
+  SFIFO_TEST (f->ooo_enq != 0, "should have an ooo enq chunk");
 
-  SFIFO_TEST (f->size == fifo_size + 200 + 10 * 100, "size expected %u is %u",
-             fifo_size + 200 + 10 * 100, f->size);
-  SFIFO_TEST (f->tail == old_tail, "new tail expected %u is %u", old_tail,
-             f->tail);
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  c = f->end_chunk;
+  SFIFO_TEST (c->start_byte == last_start_byte,
+             "end chunk should start at %u", last_start_byte);
+  SFIFO_TEST (c->length == 8192, "end chunk length should be %u", 8192);
+  SFIFO_TEST (f->ooo_enq == c, "ooo enq chunk should be end chunk");
 
   /*
-   * Enqueue/dequeue tests
+   * Enqueue the first chunk
    */
+  rv = svm_fifo_enqueue (f, enq_chunk, test_data);
+  SFIFO_TEST (rv == fifo_size, "enq should succeed %u", rv);
+  rv = svm_fifo_max_dequeue (f);
+  SFIFO_TEST (rv == fifo_size, "max deq should be %u is %u", fifo_size, rv);
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 2, "should have %u chunks has %u", 2, rv);
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  test_n_bytes = f->nitems;
-  vec_validate (test_data, test_n_bytes - 1);
-  vec_validate (data_buf, vec_len (test_data));
-  n_deqs = n_enqs = 6;
-  deq_bytes = enq_bytes = vec_len (test_data) / n_deqs;
-
-  for (i = 0; i < vec_len (test_data); i++)
-    test_data[i] = i;
+  SFIFO_TEST (f->head_chunk == 0, "should have no head chunk");
+  /* Fifo is full so tail and ooo_enq should be 0 */
+  SFIFO_TEST (f->tail_chunk == 0, "should have no tail chunk");
+  SFIFO_TEST (f->ooo_enq == 0, "should have no ooo enq chunk");
 
   /*
-   * Enqueue/deq boundary conditions
+   * Peek and validate data
    */
-  svm_fifo_init_pointers (f, 201, 201);
-  SFIFO_TEST (f->tail_chunk->start_byte == 201, "start byte expected %u is "
-             "%u", 201, f->tail_chunk->start_byte);
 
-  svm_fifo_enqueue (f, 200, test_data);
-  SFIFO_TEST (f->tail_chunk->start_byte == 401, "start byte expected %u is "
-             "%u", 401, f->tail_chunk->start_byte);
+  memset (data_buf, 0, vec_len (data_buf));
+
+  rv = dequeue_ooo_inc (f, data_buf, fifo_size, 10);
+  SFIFO_TEST (!rv, "ooo deq should work %d", rv);
 
-  svm_fifo_dequeue (f, 200, data_buf);
-  SFIFO_TEST (f->head_chunk->start_byte == 401, "start byte expected %u is "
-             "%u", 401, f->head_chunk->start_byte);
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & i);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", i, data_buf[i],
+                    test_data[i]);
+  SFIFO_TEST ((rv == 0), "peeked compared to original returned %d", rv);
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  /* Peeked all the data in a full fifo so ooo_deq ends up 0 */
+  SFIFO_TEST (f->ooo_deq == 0, "should have no ooo deq chunk");
 
   /*
-   * Simple enqueue/deq and data validation (1)
+   * Peek in reverse order and validate data
+   *
+   * RB tree should be exercised
    */
-  svm_fifo_init_pointers (f, f->nitems / 2, f->nitems / 2);
-  for (i = 0; i < test_n_bytes; i++)
+
+  memset (data_buf, 0, vec_len (data_buf));
+  for (i = 10; i >= 0; i--)
     {
-      rv = svm_fifo_enqueue (f, sizeof (u8), &test_data[i]);
-      if (rv < 0)
-       {
-         clib_warning ("enqueue returned %d", rv);
-         goto cleanup;
-       }
+      offset = i * enq_chunk;
+      deq_now = clib_min (enq_chunk, vec_len (test_data) - offset);
+      rv = svm_fifo_peek (f, offset, deq_now, data_buf + offset);
+      if (rv != deq_now)
+       SFIFO_TEST (0, "failed to peek");
     }
 
-  SFIFO_TEST (svm_fifo_max_dequeue (f) == test_n_bytes, "max deq expected %u "
-             "is %u", test_n_bytes, svm_fifo_max_dequeue (f));
-
-  for (i = 0; i < test_n_bytes; i++)
-    svm_fifo_dequeue (f, 1, &data_buf[i]);
-
   rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
-                    (u32 *) & j);
+                    (u32 *) & i);
+
   if (rv)
-    vlib_cli_output (vm, "[%d] dequeued %u expected %u", j, data_buf[j],
-                    test_data[j]);
-  SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", i, data_buf[i],
+                    test_data[i]);
+  SFIFO_TEST ((rv == 0), "peeked compared to original returned %d", rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  /* Last chunk peeked is the first, so ooo_deq should be non zero */
+  SFIFO_TEST (f->ooo_deq != 0, "should have ooo deq chunk");
 
   /*
-   * Simple enqueue/deq and data validation (2)
+   * Dequeue drop all bytes
    */
-  for (i = 0; i <= n_enqs; i++)
-    {
-      rv = svm_fifo_enqueue (f, enq_bytes, test_data + i * enq_bytes);
-      if (rv < 0)
-       {
-         clib_warning ("enqueue returned %d", rv);
-         goto cleanup;
-       }
-    }
+  rv = svm_fifo_dequeue_drop (f, fifo_size);
+  SFIFO_TEST ((rv == fifo_size), "all bytes should be dropped %u", rv);
+  last_start_byte += 8192;
 
-  SFIFO_TEST (svm_fifo_max_dequeue (f) == test_n_bytes, "max deq expected %u "
-             "is %u", test_n_bytes, svm_fifo_max_dequeue (f));
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  SFIFO_TEST (f->head_chunk == 0, "should have no head chunk");
+  SFIFO_TEST (f->tail_chunk == 0, "should have no tail chunk");
 
-  for (i = 0; i <= n_deqs; i++)
-    svm_fifo_dequeue (f, deq_bytes, data_buf + i * deq_bytes);
+  /* We don't remove the last chunk even when the fifo goes empty */
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 1, "should have %u chunks has %u", 1, rv);
 
-  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
-                    (u32 *) & j);
-  if (rv)
-    vlib_cli_output (vm, "[%d] dequeued %u expected %u", j, data_buf[j],
-                    test_data[j]);
-  SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  /*
+   * Increase size such that it can't be the sum of multiple chunk lengths
+   *
+   * A chunk of 16kB should be allocated
+   */
+  fifo_size += 2 * fifo_inc - 100;
+  svm_fifo_set_size (f, fifo_size);
+  validate_test_and_buf_vecs (&test_data, &data_buf, fifo_size + fifo_inc);
+  enq_chunk = vec_len (test_data) / 10;
+  memset (data_buf, 0, vec_len (data_buf));
 
   /*
-   * Simple enqueue and drop
+   * Enqueue data ooo
    */
-  for (i = 0; i <= n_enqs; i++)
-    {
-      rv = svm_fifo_enqueue (f, enq_bytes, test_data + i * enq_bytes);
-      if (rv < 0)
-       SFIFO_TEST (0, "failed to enqueue");
-    }
+  rv = enqueue_ooo (f, test_data, fifo_size, 10);
+  SFIFO_TEST (!rv, "enqueue ooo should work");
+
+  SFIFO_TEST (svm_fifo_size (f) == fifo_size, "size expected %u is %u",
+             fifo_size, svm_fifo_size (f));
+  SFIFO_TEST (svm_fifo_max_dequeue (f) == 0, "max deq should be %u", 0);
+  /* Fifo has 2 chunks because the we didn't allow the first chunk to be
+   * freed when all the data was dequeued. Could be optimized in the future */
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 2, "should have %u chunks has %u", 2, rv);
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+
+  SFIFO_TEST (f->head_chunk == 0, "should have no head chunk");
+  /* When new fifo chunks are allocated, tail is initialized */
+  SFIFO_TEST (f->tail_chunk != 0, "should have no tail chunk");
+  SFIFO_TEST (f->ooo_enq != 0, "should have an ooo enq chunk");
 
-  rv = svm_fifo_dequeue_drop (f, test_n_bytes / 2);
-  SFIFO_TEST (rv == test_n_bytes / 2, "drop should be equal");
-  SFIFO_TEST (svm_fifo_is_sane (f), "head chunk should be valid");
-  rv = svm_fifo_dequeue_drop (f, test_n_bytes / 2);
-  SFIFO_TEST (rv == test_n_bytes / 2, "drop should be equal");
-  SFIFO_TEST (svm_fifo_is_sane (f), "head chunk should be valid");
-  SFIFO_TEST (svm_fifo_max_dequeue (f) == 0, "should be empty");
+  c = f->end_chunk;
+  SFIFO_TEST (c->start_byte == last_start_byte,
+             "end chunk should start at %u", last_start_byte);
+  SFIFO_TEST (c->length == 16384, "end chunk length should be %u", 16384);
+  SFIFO_TEST (f->ooo_enq == c, "ooo enq chunk should be end chunk");
 
   /*
-   * Simple enqueue and drop all
+   * Enqueue the first chunk
    */
+  rv = svm_fifo_enqueue (f, enq_chunk, test_data);
+  SFIFO_TEST (rv == fifo_size, "enq should succeed %u", rv);
+  rv = svm_fifo_max_dequeue (f);
+  SFIFO_TEST (rv == fifo_size, "max deq should be %u is %u", fifo_size, rv);
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 2, "should have %u chunks has %u", 2, rv);
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  /* Enqueue just enough data to make sure fifo is not full */
-  for (i = 0; i <= n_enqs / 2; i++)
-    {
-      rv = svm_fifo_enqueue (f, enq_bytes, test_data + i * enq_bytes);
-      if (rv < 0)
-       SFIFO_TEST (0, "failed to enqueue");
-    }
-
-  /* check drop all as well */
-  svm_fifo_dequeue_drop_all (f);
-  SFIFO_TEST (svm_fifo_is_sane (f), "head chunk should be valid");
-  SFIFO_TEST (svm_fifo_max_dequeue (f) == 0, "should be empty");
+  /*
+   * Dequeue just a part of data
+   */
+  rv = svm_fifo_dequeue (f, fifo_inc, data_buf);
+  SFIFO_TEST (rv == fifo_inc, "should dequeue all data");
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 1, "should have %u chunks has %u", 1, rv);
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
   /*
-   * OOO enqueues/dequeues and data validation (1)
+   * Enqueue ooo as much data as it was dequeued
    */
-  for (i = test_n_bytes - 1; i > 0; i--)
-    {
-      rv = svm_fifo_enqueue_with_offset (f, i, sizeof (u8), &test_data[i]);
-      if (verbose)
-       vlib_cli_output (vm, "add [%d] [%d, %d]", i, i, i + sizeof (u8));
-      if (rv)
-       {
-         clib_warning ("enqueue returned %d", rv);
-         goto cleanup;
-       }
-    }
+  rv = enqueue_ooo (f, test_data + fifo_size, fifo_inc, 2);
+  SFIFO_TEST (!rv, "ooo enqueue should work %d", rv);
+
+  rv = svm_fifo_enqueue (f, fifo_inc / 2, test_data + fifo_size);
+  SFIFO_TEST (rv == fifo_inc, "enqueue should work %d", rv);
 
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 2, "should have %u chunks has %u", 2, rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
-  SFIFO_TEST (svm_fifo_max_dequeue (f) == 0, "max deq expected %u is %u",
-             0, svm_fifo_max_dequeue (f));
 
-  svm_fifo_enqueue (f, sizeof (u8), &test_data[0]);
+  last_start_byte += 16384;
+  c = f->end_chunk;
+  SFIFO_TEST (c->start_byte == last_start_byte,
+             "end chunk should start at %u", last_start_byte);
+  SFIFO_TEST (c->length == 4096, "end chunk length should be %u", 4096);
 
-  memset (data_buf, 0, vec_len (data_buf));
-  offset = 0;
-  for (i = 0; i <= n_deqs; i++)
-    {
-      rv = svm_fifo_peek (f, offset, deq_bytes, data_buf + i * deq_bytes);
-      if (rv < 0 || (rv != deq_bytes && i != n_deqs))
-       SFIFO_TEST (0, "unexpected peek %d", rv);
-      offset += rv;
-    }
-  svm_fifo_dequeue_drop (f, offset);
+  /*
+   * Dequeue all
+   */
+  rv = svm_fifo_dequeue (f, fifo_size, data_buf + fifo_inc);
+  SFIFO_TEST (rv == fifo_size, "should dequeue all data");
 
   rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
-                    (u32 *) & j);
+                    (u32 *) & i);
   if (rv)
-    vlib_cli_output (vm, "[%d] dequeued %u expected %u", j, data_buf[j],
-                    test_data[j]);
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", i, data_buf[i],
+                    test_data[i]);
   SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  /* fifo does not end on chunk boundary because of the - 100 */
+  SFIFO_TEST (f->head_chunk != 0, "should have head chunk");
+  SFIFO_TEST (f->tail_chunk != 0, "should have tail chunk");
 
   /*
-   * OOO enqueues/dequeues and data validation (2)
+   * Enqueue and dequeue byte-by-byte ooo
    */
 
-  for (i = n_enqs; i > 0; i--)
-    {
-      u32 enq_now = clib_min (enq_bytes, vec_len (test_data) - i * enq_bytes);
-      rv = svm_fifo_enqueue_with_offset (f, i * enq_bytes, enq_now,
-                                        test_data + i * enq_bytes);
-      if (verbose)
-       vlib_cli_output (vm, "add [%d, %d]", i * enq_bytes,
-                        i * enq_bytes + enq_now);
-      if (rv)
-       {
-         clib_warning ("enqueue returned %d", rv);
-         goto cleanup;
-       }
-    }
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  memset (data_buf, 0, vec_len (data_buf));
 
-  svm_fifo_enqueue (f, enq_bytes, &test_data[0]);
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  rv = enqueue_ooo (f, test_data, fifo_size, fifo_size);
+  SFIFO_TEST (!rv, "ooo enqueue should work %d", rv);
 
-  memset (data_buf, 0, vec_len (data_buf));
-  for (i = 0; i <= n_deqs; i++)
-    svm_fifo_dequeue (f, deq_bytes, data_buf + i * deq_bytes);
+  rv = svm_fifo_enqueue (f, 1, test_data);
+  SFIFO_TEST (rv == fifo_size, "enqueue should work %d", rv);
 
-  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
-                    (u32 *) & j);
+  rv = dequeue_ooo (f, data_buf, fifo_size, fifo_size);
+  SFIFO_TEST (!rv, "ooo deq should work %d", rv);
+
+  rv = compare_data (data_buf, test_data, 0, fifo_size, (u32 *) & i);
   if (rv)
-    vlib_cli_output (vm, "[%d] dequeued %u expected %u", j, data_buf[j],
-                    test_data[j]);
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", i, data_buf[i],
+                    test_data[i]);
   SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
+  last_start_byte += 4096;
+  c = f->end_chunk;
+  SFIFO_TEST (c->start_byte == last_start_byte,
+             "end chunk should start at %u", last_start_byte);
+  SFIFO_TEST (c->length == 16384, "end chunk length should be %u", 16384);
+
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 2, "should have %u chunks has %u", 2, rv);
+
   /*
-   * Cleanup
+   * Dequeue drop all bytes
    */
+  rv = svm_fifo_dequeue_drop (f, fifo_size);
+  SFIFO_TEST ((rv == fifo_size), "all bytes should be dropped %u", rv);
 
-cleanup:
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  SFIFO_TEST (f->head_chunk != 0, "should have head chunk");
+  SFIFO_TEST (f->tail_chunk != 0, "should have tail chunk");
 
-  c = f->start_chunk->next;
-  while (c && c != f->start_chunk)
-    {
-      next = c->next;
-      clib_mem_free (c);
-      c = next;
-    }
+  /* We don't remove the last chunk even when the fifo goes empty */
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 1, "should have %u chunks has %u", 1, rv);
+  SFIFO_TEST (f->ooo_enq == 0, "should have no ooo enq chunk");
+  SFIFO_TEST (f->ooo_deq == 0, "should have no ooo deq chunk");
 
-  svm_fifo_free (f);
-  vec_free (test_data);
-  vec_free (data_buf);
-  return 0;
-}
+  /*
+   * Grow fifo to 4MB and force only 4kB chunk allocations
+   */
+  fifo_size = 4 << 20;
+  svm_fifo_set_size (f, fifo_inc);
+  validate_test_and_buf_vecs (&test_data, &data_buf, fifo_size);
+  enq_chunk = 1500;
+  memset (data_buf, 0, vec_len (data_buf));
 
-static int
-chunk_list_len (svm_fifo_chunk_t * c)
-{
-  svm_fifo_chunk_t *it;
-  int count = 0;
+  rv = enqueue_packets_inc (f, fifo_size, enq_chunk, test_data);
+  SFIFO_TEST (!rv, "incremental packet enqueue should work");
 
-  if (!c)
-    return 0;
+  SFIFO_TEST (svm_fifo_max_dequeue (f) == fifo_size, "max deq should be %u",
+             fifo_size);
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  count = 1;
-  it = c->next;
-  while (it && it != c)
-    {
-      it = it->next;
-      count++;
-    }
-  return count;
-}
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == (fifo_size / 4096) + 1, "should have %u chunks has %u",
+             (fifo_size / 4096) + 1, rv);
 
-static void
-chunk_list_free (svm_fifo_chunk_t * c, svm_fifo_chunk_t * stop)
-{
-  svm_fifo_chunk_t *it, *next;
 
-  it = c;
-  while (it && it != stop)
-    {
-      next = it->next;
-      clib_mem_free (it);
-      it = next;
-    }
-}
+  /*
+   * Dequeue all
+   */
+  rv = svm_fifo_dequeue (f, fifo_size, data_buf);
+  SFIFO_TEST (rv == fifo_size, "should dequeue all data");
 
-static void
-chunk_list_splice (svm_fifo_chunk_t * a, svm_fifo_chunk_t * b)
-{
-  svm_fifo_chunk_t *it;
+  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
+                    (u32 *) & i);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", i, data_buf[i],
+                    test_data[i]);
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  SFIFO_TEST ((rv == 0), "dequeued compared to original returned %d", rv);
+  SFIFO_TEST (f->ooo_deq == 0, "should have no ooo deq chunk");
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 1, "should have %u chunks has %u", 1, rv);
 
-  it = a;
-  while (it->next)
-    it = it->next;
-  it->next = b;
+  /*
+   * Cleanup
+   */
+
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
+  vec_free (test_data);
+  vec_free (data_buf);
+  return 0;
 }
 
 static int
 sfifo_test_fifo_shrink (vlib_main_t * vm, unformat_input_t * input)
 {
-  int __clib_unused verbose = 0, fifo_size = 101, chunk_size = 100;
-  int i, rv, test_n_bytes, diff, deq_bytes;
-  svm_fifo_chunk_t *c, *prev, *collected;
+  int __clib_unused verbose = 0, fifo_size = 4096, deq_chunk;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
   u8 *test_data = 0, *data_buf = 0;
+  fifo_segment_t *fs;
   svm_fifo_t *f;
+  u32 enq_chunk;
+  int i, rv;
 
   while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
     {
@@ -1582,404 +1760,214 @@ sfifo_test_fifo_shrink (vlib_main_t * vm, unformat_input_t * input)
     }
 
   /*
-   * Init fifo with multiple chunks
+   * Init fifo and enqueue data such that multiple 4096 chunks are allocated
    */
-  f = fifo_prepare (fifo_size);
-  svm_fifo_init_pointers (f, 0, 0);
-
-  prev = 0;
-  for (i = 0; i < 11; i++)
-    {
-      c = clib_mem_alloc (sizeof (svm_fifo_chunk_t) + chunk_size);
-      c->length = 100;
-      c->start_byte = ~0;
-      c->next = prev;
-      prev = c;
-    }
+  fs = fifo_segment_prepare (fsm, "fifo-shrink", 0);
+  f = fifo_prepare (fs, fifo_size);
 
-  svm_fifo_add_chunk (f, c);
-  SFIFO_TEST (f->size == 12 * chunk_size + 1, "size expected %u is %u",
-             12 * chunk_size + 1, f->size);
-
-  /*
-   * No fifo wrap and no chunk used (one chunk)
-   */
-  rv = svm_fifo_reduce_size (f, chunk_size, 0);
-  SFIFO_TEST (rv == chunk_size, "len expected %u is %u", chunk_size, rv);
-  SFIFO_TEST (f->size == 12 * chunk_size + 1, "size expected %u is %u",
-             12 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  fifo_size = 4 << 20;
+  svm_fifo_set_size (f, 4096);
+  validate_test_and_buf_vecs (&test_data, &data_buf, fifo_size);
+  enq_chunk = 1500;
+  rv = enqueue_packets_inc (f, fifo_size, enq_chunk, test_data);
+  SFIFO_TEST (!rv, "incremental packet enqueue should work");
 
-  /* Check enqueue space to force size reduction */
-  (void) svm_fifo_max_enqueue (f);
-
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
-             " be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
-             " set");
+  rv = svm_fifo_max_enqueue (f);
+  SFIFO_TEST (rv == 0, "enqueue space %u", rv);
+  SFIFO_TEST (svm_fifo_max_dequeue (f) == fifo_size, "max deq should be %u",
+             fifo_size);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  collected = c = svm_fifo_collect_chunks (f);
-  rv = chunk_list_len (c);
-  SFIFO_TEST (rv == 1, "expected %u chunks got %u", 1, rv);
-  rv = chunk_list_len (f->start_chunk);
-  SFIFO_TEST (rv == 11, "expected %u chunks got %u", 11, rv);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == (fifo_size / 4096), "should have %u chunks has %u",
+             (fifo_size / 4096), rv);
 
   /*
-   * Fifo wrap and multiple chunks used
+   * Dequeue enough to collect one chunk
    */
+  deq_chunk = 4096;
+  rv = svm_fifo_dequeue (f, deq_chunk, data_buf);
+  SFIFO_TEST (rv == deq_chunk, "should dequeue all data");
 
-  /* Init test data and fifo */
-  test_n_bytes = f->nitems;
-  vec_validate (test_data, test_n_bytes - 1);
-  vec_validate (data_buf, vec_len (test_data));
-
-  for (i = 0; i < vec_len (test_data); i++)
-    test_data[i] = i;
-
-  svm_fifo_init_pointers (f, f->size / 2, f->size / 2);
-  for (i = 0; i < test_n_bytes; i++)
-    {
-      rv = svm_fifo_enqueue (f, sizeof (u8), &test_data[i]);
-      if (rv < 0)
-       SFIFO_TEST (0, "enqueue returned");
-    }
-
-  /* Try to reduce fifo size with fifo full */
-  rv = svm_fifo_reduce_size (f, 3.5 * chunk_size, 0);
-  SFIFO_TEST (rv == 3 * chunk_size, "len expected %u is %u", 3 * chunk_size,
-             rv);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == (fifo_size / 4096) - 1, "should have %u chunks has %u",
+             (fifo_size / 4096) - 1, rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  /* Check enqueue space to try size reduction. Should not work */
   rv = svm_fifo_max_enqueue (f);
+  SFIFO_TEST (rv == deq_chunk, "enqueue space %u", rv);
 
-  SFIFO_TEST (rv == 0, "free space expected %u is %u", 0, rv);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
-
-  /* Dequeue byte-by-byte up to last byte on last chunk */
-  deq_bytes = f->size - f->size / 2 - 1;
-  for (i = 0; i < deq_bytes; i++)
-    {
-      (void) svm_fifo_max_enqueue (f);
-      rv = svm_fifo_dequeue (f, 1, &data_buf[i]);
-      if (rv < 0)
-       SFIFO_TEST (0, "dequeue returned");
-    }
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  /*
+   * Dequeue ooo byte-by-byte remaining data
+   */
+  rv = dequeue_ooo (f, data_buf + deq_chunk, fifo_size - deq_chunk,
+                   fifo_size - deq_chunk);
+  SFIFO_TEST (!rv, "ooo deq should work %d", rv);
 
-  rv = svm_fifo_max_enqueue (f);
+  rv = compare_data (data_buf, test_data, 0, fifo_size, (u32 *) & i);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", i, data_buf[i],
+                    test_data[i]);
 
-  /* We've dequeued more than 3*chunk_size so nitems should be updated */
-  SFIFO_TEST (f->nitems == 8 * chunk_size, "nitems expected %u is %u",
-             8 * chunk_size, f->nitems);
-  /* Free space should be what was dequeued - 3 * chunk_size, which was
-   * consumed by shrinking the fifo */
-  diff = deq_bytes - 3 * chunk_size;
-  SFIFO_TEST (rv == diff, "free space expected %u is %u", diff, rv);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == (fifo_size / 4096) - 1, "should have %u chunks has %u",
+             (fifo_size / 4096) - 1, rv);
 
-  /* Dequeue one more such that head goes beyond last chunk */
-  rv = svm_fifo_dequeue (f, 1, &data_buf[deq_bytes]);
-  if (rv < 0)
-    SFIFO_TEST (0, "dequeue returned");
-
-  rv = svm_fifo_max_enqueue (f);
-  SFIFO_TEST (f->nitems == 8 * chunk_size, "nitems expected %u is %u",
-             8 * chunk_size, f->nitems);
-  SFIFO_TEST (rv == diff + 1, "free space expected %u is %u", diff + 1, rv);
-  SFIFO_TEST (f->size == 8 * chunk_size + 1, "size expected %u is %u",
-             8 * chunk_size + 1, f->size);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
-             " set");
-  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
-             " be set");
+  /*
+   * Drop all data
+   */
+  rv = svm_fifo_dequeue_drop (f, fifo_size - deq_chunk);
+  SFIFO_TEST (rv == fifo_size - deq_chunk, "should drop all data");
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
-
-  /* Dequeue the rest of the data */
-  deq_bytes += 1;
-  for (i = 0; i < test_n_bytes - deq_bytes; i++)
-    {
-      rv = svm_fifo_dequeue (f, 1, &data_buf[i + deq_bytes]);
-      if (rv < 0)
-       SFIFO_TEST (0, "dequeue returned");
-    }
-
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == 1, "should have %u chunks has %u", 1, rv);
   rv = svm_fifo_max_enqueue (f);
+  SFIFO_TEST (rv == fifo_size, "enqueue space %u", rv);
 
-  SFIFO_TEST (f->size == 8 * chunk_size + 1, "size expected %u is %u",
-             8 * chunk_size + 1, f->size);
-  SFIFO_TEST (rv == 8 * chunk_size, "free space expected %u is %u",
-             8 * chunk_size, rv);
-
-  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
-                    (u32 *) & i);
-  if (rv)
-    SFIFO_TEST (0, "[%d] dequeued %u expected %u", i, data_buf[i],
-               test_data[i]);
-
-  c = svm_fifo_collect_chunks (f);
-  rv = chunk_list_len (c);
-  SFIFO_TEST (rv == 3, "expected %u chunks got %u", 3, rv);
-  rv = chunk_list_len (f->start_chunk);
-  SFIFO_TEST (rv == 8, "expected %u chunks got %u", 8, rv);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
   /*
-   * OOO segment on chunk that should be removed
+   * Reset size and enqueue ooo all data
    */
-
-  svm_fifo_add_chunk (f, c);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-
-  memset (data_buf, 0, vec_len (data_buf));
-  svm_fifo_init_pointers (f, f->size / 2, f->size / 2);
-  svm_fifo_enqueue (f, 200, test_data);
-  svm_fifo_enqueue_with_offset (f, 50, vec_len (test_data) - 250,
-                               &test_data[250]);
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
-
-  /* Free space */
-  rv = svm_fifo_max_enqueue (f);
-  SFIFO_TEST (rv == vec_len (test_data) - 200, "free space expected %u is %u",
-             vec_len (test_data) - 200, rv);
+  svm_fifo_set_size (f, 4096);
+  enq_chunk = deq_chunk = 1500;
+  rv = enqueue_ooo_packets (f, vec_len (test_data), 1500, test_data);
+  SFIFO_TEST (!rv, "enqueue ooo should work");
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  /* Ask to reduce size */
-  rv = svm_fifo_reduce_size (f, 3.5 * chunk_size, 0);
-  SFIFO_TEST (rv == 3 * chunk_size, "len expected %u is %u", 3 * chunk_size,
-             rv);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
-
-  /* Try to force size reduction but it should fail */
-  rv = svm_fifo_max_enqueue (f);
+  /* 1 additional chunk left from previous test */
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == (fifo_size / 4096) + 1, "should have %u chunks has %u",
+             (fifo_size / 4096) + 1, rv);
 
-  SFIFO_TEST (rv == vec_len (test_data) - 200, "free space expected %u is %u",
-             vec_len (test_data) - 200, rv);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
+  /*
+   * Add missing first chunk
+   */
+  rv = svm_fifo_enqueue (f, enq_chunk, test_data);
+  SFIFO_TEST (rv == fifo_size, "enq should succeed %u", rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  rv = svm_fifo_max_dequeue (f);
+  SFIFO_TEST (rv == fifo_size, "max deq should be %u is %u", fifo_size, rv);
+  rv = svm_fifo_n_chunks (f);
+  SFIFO_TEST (rv == (fifo_size / 4096) + 1, "should have %u chunks has %u",
+             (fifo_size / 4096) + 1, rv);
 
-  /* Dequeue the in order data. This should shrink nitems */
-  rv = svm_fifo_dequeue (f, 200, data_buf);
-  if (rv < 0)
-    SFIFO_TEST (0, "dequeue returned");
+  /*
+   * Dequeue as packets
+   */
+  memset (data_buf, 0, vec_len (data_buf));
+  rv = dequeue_packets (f, fifo_size, deq_chunk, data_buf);
+  SFIFO_TEST (!rv, "deq pkts should work %d", rv);
 
-  rv = svm_fifo_max_enqueue (f);
-  SFIFO_TEST (rv == vec_len (test_data) - 200, "free space expected %u is %u",
-             vec_len (test_data) - 200, rv);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->nitems == 11 * chunk_size - 200, "nitems expected %u is %u",
-             11 * chunk_size - 200, f->nitems);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  rv = compare_data (data_buf, test_data, 0, fifo_size, (u32 *) & i);
+  if (rv)
+    vlib_cli_output (vm, "[%d] dequeued %u expected %u", i, data_buf[i],
+                    test_data[i]);
 
-  /* Enqueue the missing 50 bytes. Fifo will become full */
-  rv = svm_fifo_enqueue (f, 50, &test_data[200]);
-  SFIFO_TEST (rv == vec_len (test_data) - 200, "free space expected %u is %u",
-             vec_len (test_data) - 200, rv);
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  /*
+   * Enqueue and dequeue set of packets
+   */
+  svm_fifo_set_size (f, 4096);
+  for (i = 0; i < 1000; i++)
+    {
+      rv = svm_fifo_enqueue (f, enq_chunk, test_data);
+      if (rv != enq_chunk)
+       SFIFO_TEST (0, "enq fail");
+      rv = svm_fifo_dequeue (f, deq_chunk, data_buf);
+      if (rv != deq_chunk)
+       SFIFO_TEST (0, "deq fail");
+    }
 
-  rv = svm_fifo_max_enqueue (f);
+  /*
+   * Cleanup
+   */
 
-  SFIFO_TEST (rv == 0, "free space expected %u is %u", 0, rv);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
+  vec_free (test_data);
+  vec_free (data_buf);
 
+  return 0;
+}
 
-  /* Dequeue a chunk and check nitems shrink but fifo still full */
-  svm_fifo_dequeue (f, 100, &data_buf[200]);
+static int
+sfifo_test_fifo_indirect (vlib_main_t * vm, unformat_input_t * input)
+{
+  int __clib_unused verbose = 0, fifo_size = 4096, deq_chunk;
+  fifo_segment_main_t _fsm = { 0 }, *fsm = &_fsm;
+  u8 *test_data = 0, *data_buf = 0;
+  svm_fifo_chunk_t *c;
+  fifo_segment_t *fs;
+  svm_fifo_t *f;
+  int rv;
 
-  rv = svm_fifo_max_enqueue (f);
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (input, "verbose"))
+       verbose = 1;
+      else
+       {
+         vlib_cli_output (vm, "parse error: '%U'", format_unformat_error,
+                          input);
+         return -1;
+       }
+    }
 
-  SFIFO_TEST (rv == 0, "free space expected %u is %u", 0, rv);
-  SFIFO_TEST (f->size == 11 * chunk_size + 1, "size expected %u is %u",
-             11 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->nitems == 11 * chunk_size - 300, "nitems expected %u is %u",
-             11 * chunk_size - 300, f->nitems);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  /*
+   * Init fifo and enqueue data such that multiple 4096 chunks are allocated
+   */
+  fs = fifo_segment_prepare (fsm, "fifo-indirect", 0);
+  f = fifo_prepare (fs, fifo_size);
 
-  /* Dequeue enough to unwrap the fifo */
-  deq_bytes = f->size - f->size / 2 - 300;
-  svm_fifo_dequeue (f, deq_bytes, &data_buf[300]);
-  rv = svm_fifo_max_enqueue (f);
+  fifo_size = 4 << 20;
+  svm_fifo_set_size (f, fifo_size);
+  validate_test_and_buf_vecs (&test_data, &data_buf, fifo_size);
 
-  /* Overall we've dequeued deq_bytes + 300, but fifo size shrunk 300 */
-  SFIFO_TEST (rv == 300 + deq_bytes - 300, "free space expected %u is %u",
-             300 + deq_bytes - 300, rv);
-  SFIFO_TEST (f->size == 8 * chunk_size + 1, "size expected %u is %u",
-             8 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->nitems == 8 * chunk_size, "nitems expected %u is %u",
-             8 * chunk_size, f->nitems);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
-             " set");
-  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
-             " be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  c = f->start_chunk;
+  SFIFO_TEST (c->next == 0, "no next");
 
-  /* Dequeue the rest */
-  svm_fifo_dequeue (f, test_n_bytes / 2, &data_buf[300 + deq_bytes]);
-  rv = compare_data (data_buf, test_data, 0, vec_len (test_data),
-                    (u32 *) & i);
-  if (rv)
-    SFIFO_TEST (0, "[%d] dequeued %u expected %u", i, data_buf[i],
-               test_data[i]);
-
-  c = svm_fifo_collect_chunks (f);
-  rv = chunk_list_len (c);
-  SFIFO_TEST (rv == 3, "expected %u chunks got %u", 3, rv);
-  rv = chunk_list_len (f->start_chunk);
-  SFIFO_TEST (rv == 8, "expected %u chunks got %u", 8, rv);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  svm_fifo_fill_chunk_list (f);
+  SFIFO_TEST (c->next != 0, "new chunk should've been allocated");
+  SFIFO_TEST (c->next->length == 4 << 20, "new chunk should be 4MB");
 
-  chunk_list_splice (collected, c);
+  rv = svm_fifo_max_write_chunk (f);
+  SFIFO_TEST (rv == 4096, "max write chunk %u", rv);
 
   /*
-   * Remove all chunks possible (1)
-   *
-   * Tail and head are in first chunk that is not removed
-   */
-  svm_fifo_init_pointers (f, 600, 600);
-  rv = svm_fifo_reduce_size (f, 8 * chunk_size, 1);
-  SFIFO_TEST (rv == 7 * chunk_size, "actual len expected %u is %u",
-             7 * chunk_size, rv);
-  SFIFO_TEST (f->size == 6 * chunk_size + 1, "size expected %u is %u",
-             6 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->nitems == 1 * chunk_size, "nitems expected %u is %u",
-             1 * chunk_size, f->nitems);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
-
-  rv = svm_fifo_max_enqueue (f);
-  SFIFO_TEST (rv == chunk_size, "free space expected %u is %u", chunk_size,
-             rv);
+   * Enqueue enough to fill first chunk
+   */
+  svm_fifo_enqueue_nocopy (f, 4096);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  /* Force head/tail to move to first chunk */
-  svm_fifo_enqueue (f, 1, test_data);
-  svm_fifo_dequeue (f, 1, data_buf);
-  rv = svm_fifo_max_enqueue (f);
-
-  SFIFO_TEST (rv == chunk_size, "free space expected %u is %u", chunk_size,
-             rv);
-  SFIFO_TEST (f->size == chunk_size + 1, "size expected %u is %u",
-             chunk_size + 1, f->size);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
-             " set");
-  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
-             " be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  c = svm_fifo_tail_chunk (f);
+  SFIFO_TEST (c == f->end_chunk, "tail is end chunk");
 
-  c = svm_fifo_collect_chunks (f);
-  rv = chunk_list_len (c);
-  SFIFO_TEST (rv == 7, "expected %u chunks got %u", 7, rv);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_MULTI_CHUNK), "multi-chunk flag should"
-             " not be set");
+  /* Initialize head chunk */
+  rv = svm_fifo_max_read_chunk (f);
+  SFIFO_TEST (rv == 4096, "max read chunk %u", rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
-  /* re-add chunks for next test */
-  svm_fifo_add_chunk (f, c);
-
   /*
-   * Remove all chunks possible (2)
-   *
-   * Tail and head are in the first chunk that should eventually be removed
-   */
-  svm_fifo_init_pointers (f, 601, 601);
-  rv = svm_fifo_reduce_size (f, 8 * chunk_size, 1);
-  SFIFO_TEST (rv == 7 * chunk_size, "actual len expected %u is %u",
-             7 * chunk_size, rv);
-  SFIFO_TEST (f->size == 7 * chunk_size + 1, "size expected %u is %u",
-             7 * chunk_size + 1, f->size);
-  SFIFO_TEST (f->nitems == 1 * chunk_size, "nitems expected %u is %u",
-             1 * chunk_size, f->nitems);
-  SFIFO_TEST (f->flags & SVM_FIFO_F_SHRINK, "shrink flag should be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
-
-  rv = svm_fifo_max_enqueue (f);
-  SFIFO_TEST (rv == chunk_size, "free space expected %u is %u", chunk_size,
-             rv);
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
-
-  /* Force head/tail to move to first chunk */
-  svm_fifo_enqueue (f, chunk_size, test_data);
-  svm_fifo_dequeue (f, chunk_size, data_buf);
-  rv = svm_fifo_max_enqueue (f);
+   * Move head over last segment
+   */
+  rv = svm_fifo_dequeue (f, 4096, data_buf);
+  SFIFO_TEST (rv == 4096, "dequeue should work");
 
-  SFIFO_TEST (rv == chunk_size, "free space expected %u is %u", chunk_size,
-             rv);
-  SFIFO_TEST (f->size == chunk_size + 1, "size expected %u is %u",
-             chunk_size + 1, f->size);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_SHRINK), "shrink flag should not be"
-             " set");
-  SFIFO_TEST (f->flags & SVM_FIFO_F_COLLECT_CHUNKS, "collect flag should"
-             " be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  c = svm_fifo_head_chunk (f);
+  SFIFO_TEST (c == f->end_chunk, "head chunk should be last");
 
-  c = svm_fifo_collect_chunks (f);
-  rv = chunk_list_len (c);
-  SFIFO_TEST (rv == 7, "expected %u chunks got %u", 7, rv);
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_COLLECT_CHUNKS), "collect flag should"
-             " not be set");
-  SFIFO_TEST (!(f->flags & SVM_FIFO_F_MULTI_CHUNK), "multi-chunk flag should"
-             " not be set");
-  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+  rv = svm_fifo_max_read_chunk (f);
+  SFIFO_TEST (rv == 0, "max read chunk %u", rv);
 
-  chunk_list_splice (collected, c);
+  rv = svm_fifo_max_write_chunk (f);
+  SFIFO_TEST (rv == 4 << 20, "max write chunk %u", rv);
 
   /*
    * Cleanup
    */
 
-  chunk_list_free (f->start_chunk->next, f->start_chunk);
-  chunk_list_free (collected, 0);
-  svm_fifo_free (f);
+  ft_fifo_free (fs, f);
+  ft_fifo_segment_free (fsm, fs);
   vec_free (test_data);
   vec_free (data_buf);
 
@@ -2023,13 +2011,6 @@ sfifo_test_fifo_replay (vlib_main_t * vm, unformat_input_t * input)
 
 static fifo_segment_main_t segment_main;
 
-static svm_fifo_t *
-fifo_segment_alloc_fifo (fifo_segment_t * fs, u32 data_bytes,
-                        fifo_segment_ftype_t ftype)
-{
-  return fifo_segment_alloc_fifo_w_slice (fs, 0, data_bytes, ftype);
-}
-
 static int
 sfifo_test_fifo_segment_hello_world (int verbose)
 {
@@ -2084,6 +2065,7 @@ sfifo_test_fifo_segment_fifo_grow (int verbose)
   int rv, fifo_size = 4096, n_chunks, n_batch;
   fifo_segment_main_t *sm = &segment_main;
   fifo_segment_create_args_t _a, *a = &_a;
+  u8 *test_data = 0, *data_buf = 0;
   u32 n_free_chunk_bytes;
   fifo_segment_t *fs;
   svm_fifo_t *f;
@@ -2101,7 +2083,7 @@ sfifo_test_fifo_segment_fifo_grow (int verbose)
   SFIFO_TEST (!rv, "svm_fifo_segment_create returned %d", rv);
 
   /*
-   * Alloc and grow fifo
+   * Alloc fifo
    */
   fs = fifo_segment_get_segment (sm, a->new_segment_indices[0]);
   f = fifo_segment_alloc_fifo (fs, fifo_size, FIFO_SEGMENT_RX_FIFO);
@@ -2115,10 +2097,14 @@ sfifo_test_fifo_segment_fifo_grow (int verbose)
   SFIFO_TEST (rv == (n_batch - 1) * fifo_size, "free chunk bytes %u "
              "expected %u", rv, (n_batch - 1) * fifo_size);
 
-  /* Grow by preallocated fifo_size chunk */
-  fifo_segment_grow_fifo (fs, f, fifo_size);
-  SFIFO_TEST (f->size == 2 * fifo_size, "fifo size should be %u is %u",
-             2 * fifo_size, f->size);
+  /*
+   * Grow fifo by preallocated fifo_size chunk
+   */
+  svm_fifo_set_size (f, 2 * fifo_size);
+  validate_test_and_buf_vecs (&test_data, &data_buf, 2 * fifo_size);
+
+  rv = svm_fifo_enqueue (f, vec_len (test_data), test_data);
+  SFIFO_TEST (rv == vec_len (test_data), "enq should succeed %u", rv);
   SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
 
   n_chunks = fifo_segment_num_free_chunks (fs, fifo_size);
@@ -2133,9 +2119,13 @@ sfifo_test_fifo_segment_fifo_grow (int verbose)
   SFIFO_TEST (rv > 16 * fifo_size, "free bytes %u more than %u", rv,
              16 * fifo_size);
 
-  fifo_segment_grow_fifo (fs, f, 16 * fifo_size);
-  SFIFO_TEST (f->size == 18 * fifo_size, "fifo size should be %u is %u",
-             18 * fifo_size, f->size);
+  /* Force fifo growth */
+  svm_fifo_set_size (f, svm_fifo_size (f) + 16 * fifo_size);
+  validate_test_and_buf_vecs (&test_data, &data_buf, svm_fifo_size (f));
+  rv = svm_fifo_enqueue (f, vec_len (test_data), test_data);
+
+  SFIFO_TEST (svm_fifo_size (f) == 18 * fifo_size, "fifo size should be %u "
+             "is %u", 18 * fifo_size, svm_fifo_size (f));
 
   rv = fifo_segment_fl_chunk_bytes (fs);
   SFIFO_TEST (rv == (n_batch - 2) * fifo_size, "free chunk bytes %u "
@@ -2164,12 +2154,21 @@ sfifo_test_fifo_segment_fifo_grow (int verbose)
    */
   f = fifo_segment_alloc_fifo (fs, fifo_size, FIFO_SEGMENT_RX_FIFO);
 
-  fifo_segment_grow_fifo (fs, f, fifo_size);
+  /* Force chunk allocation */
+  svm_fifo_set_size (f, svm_fifo_size (f) + fifo_size);
+  rv = svm_fifo_enqueue (f, svm_fifo_size (f), test_data);
+
+  SFIFO_TEST (rv == svm_fifo_size (f), "enq should succeed %u", rv);
+  SFIFO_TEST (svm_fifo_is_sane (f), "fifo should be sane");
+
   n_chunks = fifo_segment_num_free_chunks (fs, fifo_size);
   SFIFO_TEST (n_chunks == n_batch - 2, "free 2^10B chunks should be %u is %u",
              n_batch - 2, n_chunks);
 
-  fifo_segment_grow_fifo (fs, f, 16 * fifo_size);
+  /* Grow and alloc 16 * fifo_size chunk */
+  svm_fifo_set_size (f, svm_fifo_size (f) + 16 * fifo_size);
+  rv = svm_fifo_enqueue (f, svm_fifo_size (f), test_data);
+
   n_chunks = fifo_segment_num_free_chunks (fs, 16 * fifo_size);
   SFIFO_TEST (n_chunks == 0, "free 2^14B chunks should be %u is %u", 0,
              n_chunks);
@@ -2261,88 +2260,16 @@ sfifo_test_fifo_segment_fifo_grow (int verbose)
    * Allocate fifo and try to grow beyond available space
    */
   f = fifo_segment_alloc_fifo (fs, fifo_size, FIFO_SEGMENT_RX_FIFO);
-  rv = fifo_segment_grow_fifo (fs, f, n_free_chunk_bytes);
-
-  SFIFO_TEST (rv == -1, "grow should fail");
-
-  fifo_segment_free_fifo (fs, f);
-
-  /*
-   * Cleanup
-   */
-  fifo_segment_delete (sm, fs);
-  vec_free (a->new_segment_indices);
-  return 0;
-}
-
-static int
-sfifo_test_fifo_segment_fifo_shrink (int verbose)
-{
-  int i, rv, chunk_size = 4096, n_chunks, n_free;
-  fifo_segment_main_t *sm = &segment_main;
-  fifo_segment_create_args_t _a, *a = &_a;
-  fifo_segment_t *fs;
-  svm_fifo_t *f;
-
-  clib_memset (a, 0, sizeof (*a));
-  a->segment_name = "fifo-test1";
-  a->segment_size = 256 << 10;
-
-  rv = fifo_segment_create (sm, a);
 
-  SFIFO_TEST (!rv, "svm_fifo_segment_create returned %d", rv);
-
-  /*
-   * Alloc and grow fifo
-   */
-  fs = fifo_segment_get_segment (sm, a->new_segment_indices[0]);
-  f = fifo_segment_alloc_fifo (fs, chunk_size, FIFO_SEGMENT_RX_FIFO);
-  n_free = FIFO_SEGMENT_ALLOC_BATCH_SIZE - 1;
-
-  SFIFO_TEST (f != 0, "svm_fifo_segment_alloc_fifo");
-
-  for (i = 0; i < 9; i++)
-    {
-      fifo_segment_grow_fifo (fs, f, chunk_size);
-      n_free -= 1;
-      if (f->size != (i + 2) * chunk_size)
-       SFIFO_TEST (0, "fifo size should be %u is %u",
-                   (i + 2) * chunk_size, f->size);
-    }
-
-  rv = svm_fifo_reduce_size (f, 3.5 * chunk_size, 1 /* is producer */ );
-  SFIFO_TEST (rv == 3 * chunk_size, "len expected %u is %u", 3 * chunk_size,
-             rv);
-
-  n_chunks = fifo_segment_num_free_chunks (fs, chunk_size);
-  SFIFO_TEST (n_chunks == n_free, "free chunks should be %u is %u", n_free,
-             n_chunks);
-
-  fifo_segment_collect_fifo_chunks (fs, f);
+  /* Try to force fifo growth */
+  svm_fifo_set_size (f, svm_fifo_size (f) + n_free_chunk_bytes);
+  validate_test_and_buf_vecs (&test_data, &data_buf, svm_fifo_size (f));
+  rv = svm_fifo_enqueue (f, svm_fifo_size (f), test_data);
 
-  n_free += 3;
-  n_chunks = fifo_segment_num_free_chunks (fs, chunk_size);
-  SFIFO_TEST (n_chunks == n_free, "free chunks should be %u is %u", n_free,
-             n_chunks);
-
-  rv = svm_fifo_reduce_size (f, 7 * chunk_size - 1, 1 /* is producer */ );
-  SFIFO_TEST (rv == 6 * chunk_size, "len expected %u is %u", 6 * chunk_size,
-             rv);
-
-  fifo_segment_collect_fifo_chunks (fs, f);
+  SFIFO_TEST (rv != svm_fifo_size (f), "grow should fail size %u wrote %d",
+             svm_fifo_size (f), rv);
 
-  n_free += 6;
-  n_chunks = fifo_segment_num_free_chunks (fs, chunk_size);
-  SFIFO_TEST (n_chunks == n_free, "free chunks should be %u is %u", n_free,
-             n_chunks);
-  /*
-   * Free
-   */
   fifo_segment_free_fifo (fs, f);
-  n_free += 1;
-  n_chunks = fifo_segment_num_free_chunks (fs, ~0);
-  SFIFO_TEST (n_chunks == n_free, "free chunks should be %u is %u", n_free,
-             n_chunks);
 
   /*
    * Cleanup
@@ -2666,11 +2593,6 @@ sfifo_test_fifo_segment (vlib_main_t * vm, unformat_input_t * input)
          if ((rv = sfifo_test_fifo_segment_fifo_grow (verbose)))
            return -1;
        }
-      else if (unformat (input, "shrink fifo"))
-       {
-         if ((rv = sfifo_test_fifo_segment_fifo_shrink (verbose)))
-           return -1;
-       }
       else if (unformat (input, "prealloc"))
        {
          if ((rv = sfifo_test_fifo_segment_prealloc (verbose)))
@@ -2684,8 +2606,6 @@ sfifo_test_fifo_segment (vlib_main_t * vm, unformat_input_t * input)
            return -1;
          if ((rv = sfifo_test_fifo_segment_fifo_grow (verbose)))
            return -1;
-         if ((rv = sfifo_test_fifo_segment_fifo_shrink (verbose)))
-           return -1;
          if ((rv = sfifo_test_fifo_segment_prealloc (verbose)))
            return -1;
          /* Pretty slow so avoid running it always
@@ -2735,6 +2655,8 @@ svm_fifo_test (vlib_main_t * vm, unformat_input_t * input,
        res = sfifo_test_fifo_grow (vm, input);
       else if (unformat (input, "shrink"))
        res = sfifo_test_fifo_shrink (vm, input);
+      else if (unformat (input, "indirect"))
+       res = sfifo_test_fifo_indirect (vm, input);
       else if (unformat (input, "segment"))
        res = sfifo_test_fifo_segment (vm, input);
       else if (unformat (input, "all"))
@@ -2796,6 +2718,9 @@ svm_fifo_test (vlib_main_t * vm, unformat_input_t * input,
          if ((res = sfifo_test_fifo_shrink (vm, input)))
            goto done;
 
+         if ((res = sfifo_test_fifo_indirect (vm, input)))
+           goto done;
+
          str = "all";
          unformat_init_cstring (input, str);
          if ((res = sfifo_test_fifo_segment (vm, input)))
index 7664014..2cb7694 100644 (file)
@@ -28,6 +28,7 @@ add_vpp_library(svm
 
   INSTALL_HEADERS
   fifo_segment.h
+  fifo_types.h
   message_queue.h
   queue.h
   ssvm.h
index bc46aba..49cd116 100644 (file)
@@ -127,6 +127,7 @@ fifo_segment_init (fifo_segment_t * fs)
     {
       fss = fsh_slice_get (fsh, i);
       vec_validate_init_empty (fss->free_chunks, max_chunk_sz, 0);
+      clib_spinlock_init (&fss->chunk_lock);
     }
 
   ssvm_pop_heap (oldheap);
@@ -243,6 +244,8 @@ fifo_segment_main_init (fifo_segment_main_t * sm, u64 baseva,
 static inline u32
 fs_freelist_for_size (u32 size)
 {
+  if (PREDICT_FALSE (size < FIFO_SEGMENT_MIN_FIFO_SIZE))
+    return 0;
   return max_log2 (size) - FIFO_SEGMENT_MIN_LOG2_FIFO_SIZE;
 }
 
@@ -278,9 +281,8 @@ fs_try_alloc_fifo_freelist (fifo_segment_slice_t * fss,
 
   fss->free_fifos = f->next;
   fss->free_chunks[fl_index] = c->next;
-  c->next = c;
+  c->next = 0;
   c->start_byte = 0;
-  c->length = data_bytes;
   memset (f, 0, sizeof (*f));
   f->start_chunk = c;
   f->end_chunk = c;
@@ -331,7 +333,6 @@ fs_try_alloc_fifo_freelist_multi_chunk (fifo_segment_header_t * fsh,
          c->next = first;
          first = c;
          n_alloc += fl_size;
-         c->length = clib_min (fl_size, data_bytes);
          data_bytes -= c->length;
        }
       else
@@ -370,7 +371,6 @@ fs_try_alloc_fifo_freelist_multi_chunk (fifo_segment_header_t * fsh,
 
   f->start_chunk = first;
   f->end_chunk = last;
-  last->next = first;
   fss->n_fl_chunk_bytes -= n_alloc;
   return f;
 }
@@ -412,7 +412,8 @@ fs_try_alloc_fifo_batch (fifo_segment_header_t * fsh,
       c = (svm_fifo_chunk_t *) (fmem + sizeof (*f));
       c->start_byte = 0;
       c->length = rounded_data_size;
-      c->rb_index = RBTREE_TNIL_INDEX;
+      c->enq_rb_index = RBTREE_TNIL_INDEX;
+      c->deq_rb_index = RBTREE_TNIL_INDEX;
       c->next = fss->free_chunks[fl_index];
       fss->free_chunks[fl_index] = c;
       fmem += hdrs + rounded_data_size;
@@ -466,7 +467,7 @@ fs_try_alloc_fifo (fifo_segment_header_t * fsh, fifo_segment_slice_t * fss,
   if (fifo_sz <= n_free_bytes)
     {
       void *oldheap = ssvm_push_heap (fsh->ssvm_sh);
-      f = svm_fifo_create (data_bytes);
+      f = svm_fifo_alloc (data_bytes);
       ssvm_pop_heap (oldheap);
       if (f)
        {
@@ -479,9 +480,87 @@ fs_try_alloc_fifo (fifo_segment_header_t * fsh, fifo_segment_slice_t * fss,
 
 done:
 
+  if (f)
+    f->fs_hdr = fsh;
   return f;
 }
 
+svm_fifo_chunk_t *
+fsh_alloc_chunk (fifo_segment_header_t * fsh, u32 slice_index, u32 chunk_size)
+{
+  fifo_segment_slice_t *fss;
+  svm_fifo_chunk_t *c;
+  void *oldheap;
+  int fl_index;
+
+  fl_index = fs_freelist_for_size (chunk_size);
+  fss = fsh_slice_get (fsh, slice_index);
+
+  clib_spinlock_lock (&fss->chunk_lock);
+  c = fss->free_chunks[fl_index];
+
+  if (!c)
+    {
+      fsh_check_mem (fsh);
+      chunk_size = fs_freelist_index_to_size (fl_index);
+      if (fsh_n_free_bytes (fsh) < chunk_size)
+       goto done;
+
+      oldheap = ssvm_push_heap (fsh->ssvm_sh);
+      c = svm_fifo_chunk_alloc (chunk_size);
+      ssvm_pop_heap (oldheap);
+
+      if (!c)
+       goto done;
+
+      fsh_free_bytes_sub (fsh, chunk_size + sizeof (*c));
+    }
+  else
+    {
+      fss->free_chunks[fl_index] = c->next;
+      c->next = 0;
+      fss->n_fl_chunk_bytes -= fs_freelist_index_to_size (fl_index);
+    }
+
+done:
+
+  clib_spinlock_unlock (&fss->chunk_lock);
+
+  return c;
+}
+
+static void
+fsh_slice_collect_chunks (fifo_segment_slice_t * fss, svm_fifo_chunk_t * cur)
+{
+  svm_fifo_chunk_t *next;
+  int fl_index;
+
+  clib_spinlock_lock (&fss->chunk_lock);
+
+  while (cur)
+    {
+      next = cur->next;
+      fl_index = fs_freelist_for_size (cur->length);
+      cur->next = fss->free_chunks[fl_index];
+      cur->enq_rb_index = RBTREE_TNIL_INDEX;
+      cur->deq_rb_index = RBTREE_TNIL_INDEX;
+      fss->free_chunks[fl_index] = cur;
+      fss->n_fl_chunk_bytes += fs_freelist_index_to_size (fl_index);
+      cur = next;
+    }
+
+  clib_spinlock_unlock (&fss->chunk_lock);
+}
+
+void
+fsh_collect_chunks (fifo_segment_header_t * fsh, u32 slice_index,
+                   svm_fifo_chunk_t * cur)
+{
+  fifo_segment_slice_t *fss;
+  fss = fsh_slice_get (fsh, slice_index);
+  fsh_slice_collect_chunks (fss, cur);
+}
+
 /**
  * Allocate fifo in fifo segment
  */
@@ -502,13 +581,8 @@ fifo_segment_alloc_fifo_w_slice (fifo_segment_t * fs, u32 slice_index,
 
   f->slice_index = slice_index;
 
-  /* (re)initialize the fifo, as in svm_fifo_create */
   svm_fifo_init (f, data_bytes);
 
-  /* Initialize chunks and rbtree for multi-chunk fifos */
-  if (f->start_chunk->next != f->start_chunk)
-    svm_fifo_init_chunks (f);
-
   /* If rx fifo type add to active fifos list. When cleaning up segment,
    * we need a list of active sessions that should be disconnected. Since
    * both rx and tx fifos keep pointers to the session, it's enough to track
@@ -522,7 +596,14 @@ fifo_segment_alloc_fifo_w_slice (fifo_segment_t * fs, u32 slice_index,
        }
       fss->fifos = f;
       f->flags |= SVM_FIFO_F_LL_TRACKED;
+
+      svm_fifo_init_ooo_lookup (f, 0 /* ooo enq */ );
+    }
+  else
+    {
+      svm_fifo_init_ooo_lookup (f, 1 /* ooo deq */ );
     }
+
   fsh_active_fifos_update (fsh, 1);
 
 done:
@@ -536,9 +617,7 @@ void
 fifo_segment_free_fifo (fifo_segment_t * fs, svm_fifo_t * f)
 {
   fifo_segment_header_t *fsh = fs->h;
-  svm_fifo_chunk_t *cur, *next;
   fifo_segment_slice_t *fss;
-  int fl_index;
 
   ASSERT (f->refcnt > 0);
 
@@ -565,26 +644,13 @@ fifo_segment_free_fifo (fifo_segment_t * fs, svm_fifo_t * f)
   fss->free_fifos = f;
 
   /* Free fifo chunks */
-  cur = f->start_chunk;
-  do
-    {
-      next = cur->next;
-      fl_index = fs_freelist_for_size (cur->length);
-      ASSERT (fl_index < vec_len (fss->free_chunks));
-      cur->next = fss->free_chunks[fl_index];
-      cur->rb_index = RBTREE_TNIL_INDEX;
-      fss->free_chunks[fl_index] = cur;
-      fss->n_fl_chunk_bytes += fs_freelist_index_to_size (fl_index);
-      cur = next;
-    }
-  while (cur != f->start_chunk);
+  fsh_slice_collect_chunks (fss, f->start_chunk);
 
-  f->start_chunk = f->end_chunk = f->new_chunks = 0;
+  f->start_chunk = f->end_chunk = 0;
   f->head_chunk = f->tail_chunk = f->ooo_enq = f->ooo_deq = 0;
 
-  svm_fifo_free_chunk_lookup (f);
-
   /* not allocated on segment heap */
+  svm_fifo_free_chunk_lookup (f);
   svm_fifo_free_ooo_data (f);
 
   if (CLIB_DEBUG)
@@ -751,71 +817,6 @@ fifo_segment_preallocate_fifo_pairs (fifo_segment_t * fs,
     }
 }
 
-int
-fifo_segment_grow_fifo (fifo_segment_t * fs, svm_fifo_t * f, u32 chunk_size)
-{
-  fifo_segment_header_t *fsh = fs->h;
-  fifo_segment_slice_t *fss;
-  svm_fifo_chunk_t *c;
-  void *oldheap;
-  int fl_index;
-
-  fl_index = fs_freelist_for_size (chunk_size);
-  fss = fsh_slice_get (fsh, f->slice_index);
-
-  c = fss->free_chunks[fl_index];
-
-  if (!c)
-    {
-      fsh_check_mem (fsh);
-      if (fsh_n_free_bytes (fsh) < chunk_size)
-       return -1;
-
-      oldheap = ssvm_push_heap (fsh->ssvm_sh);
-      c = svm_fifo_chunk_alloc (chunk_size);
-      ssvm_pop_heap (oldheap);
-
-      if (!c)
-       return -1;
-
-      fsh_free_bytes_sub (fsh, chunk_size + sizeof (*c));
-    }
-  else
-    {
-      fss->free_chunks[fl_index] = c->next;
-      c->next = 0;
-      fss->n_fl_chunk_bytes -= fs_freelist_index_to_size (fl_index);
-    }
-
-  svm_fifo_add_chunk (f, c);
-
-  return 0;
-}
-
-int
-fifo_segment_collect_fifo_chunks (fifo_segment_t * fs, svm_fifo_t * f)
-{
-  fifo_segment_header_t *fsh = fs->h;
-  svm_fifo_chunk_t *cur, *next;
-  fifo_segment_slice_t *fss;
-  int fl_index;
-
-  cur = svm_fifo_collect_chunks (f);
-
-  fss = fsh_slice_get (fsh, f->slice_index);
-
-  while (cur)
-    {
-      next = cur->next;
-      fl_index = fs_freelist_for_size (cur->length);
-      cur->next = fss->free_chunks[fl_index];
-      fss->free_chunks[fl_index] = cur;
-      cur = next;
-    }
-
-  return 0;
-}
-
 /**
  * Get number of active fifos
  */
index 02d45d3..8554806 100644 (file)
@@ -16,6 +16,7 @@
 #define __included_fifo_segment_h__
 
 #include <svm/ssvm.h>
+#include <svm/fifo_types.h>
 #include <svm/svm_fifo.h>
 
 typedef enum
@@ -38,26 +39,6 @@ typedef enum fifo_segment_flags_
   FIFO_SEGMENT_F_MEM_LIMIT = 1 << 2,
 } fifo_segment_flags_t;
 
-typedef struct fifo_segment_slice_
-{
-  svm_fifo_t *fifos;                   /**< Linked list of active RX fifos */
-  svm_fifo_t *free_fifos;              /**< Freelists by fifo size  */
-  svm_fifo_chunk_t **free_chunks;      /**< Freelists by chunk size */
-  uword n_fl_chunk_bytes;              /**< Chunk bytes on freelist */
-} fifo_segment_slice_t;
-
-typedef struct
-{
-  fifo_segment_slice_t *slices;                /** Fixed array of slices */
-  ssvm_shared_header_t *ssvm_sh;       /**< Pointer to fs ssvm shared hdr */
-  uword n_free_bytes;                  /**< Segment free bytes */
-  u32 n_active_fifos;                  /**< Number of active fifos */
-  u32 n_reserved_bytes;                        /**< Bytes not to be allocated */
-  u32 max_log2_chunk_size;             /**< Max log2(chunk size) for fs */
-  u8 flags;                            /**< Segment flags */
-  u8 n_slices;                         /**< Number of slices */
-} fifo_segment_header_t;
-
 typedef struct
 {
   ssvm_private_t ssvm;         /**< ssvm segment data */
@@ -158,25 +139,12 @@ void fifo_segment_preallocate_fifo_pairs (fifo_segment_t * fs,
                                          u32 rx_fifo_size,
                                          u32 tx_fifo_size,
                                          u32 * n_fifo_pairs);
-/**
- * Grow fifo size by adding an additional chunk of memory
- *
- * @param fs           fifo segment for fifo
- * @param f            fifo to be grown
- * @param chunk_size   number of bytes to be added to fifo
- * @return             0 on success or a negative number otherwise
- */
-int fifo_segment_grow_fifo (fifo_segment_t * fs, svm_fifo_t * f,
-                           u32 chunk_size);
 
-/**
- * Collect unused chunks for fifo
- *
- * @param fs           fifo segment for fifo
- * @param f            fifo whose chunks are to be collected
- * @return             0 on success, error otherwise
- */
-int fifo_segment_collect_fifo_chunks (fifo_segment_t * fs, svm_fifo_t * f);
+svm_fifo_chunk_t *fsh_alloc_chunk (fifo_segment_header_t * fsh,
+                                  u32 slice_index, u32 chunk_size);
+
+void fsh_collect_chunks (fifo_segment_header_t * fsh, u32 slice_index,
+                        svm_fifo_chunk_t * cur);
 
 /**
  * Fifo segment estimate of number of free bytes
diff --git a/src/svm/fifo_types.h b/src/svm/fifo_types.h
new file mode 100644 (file)
index 0000000..3e6a14e
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_SVM_FIFO_TYPES_H_
+#define SRC_SVM_FIFO_TYPES_H_
+
+#include <svm/ssvm.h>
+#include <vppinfra/clib.h>
+#include <vppinfra/rbtree.h>
+
+#define SVM_FIFO_TRACE                         (0)
+#define SVM_FIFO_MAX_EVT_SUBSCRIBERS   7
+
+typedef struct fifo_segment_header_ fifo_segment_header_t;
+
+typedef struct svm_fifo_chunk_
+{
+  u32 start_byte;              /**< chunk start byte */
+  u32 length;                  /**< length of chunk in bytes */
+  struct svm_fifo_chunk_ *next;        /**< pointer to next chunk in linked-lists */
+  rb_node_index_t enq_rb_index;        /**< enq node index if chunk in rbtree */
+  rb_node_index_t deq_rb_index;        /**< deq node index if chunk in rbtree */
+  u8 data[0];                  /**< start of chunk data */
+} svm_fifo_chunk_t;
+
+typedef struct
+{
+  u32 next;    /**< Next linked-list element pool index */
+  u32 prev;    /**< Previous linked-list element pool index */
+  u32 start;   /**< Start of segment, normalized*/
+  u32 length;  /**< Length of segment */
+} ooo_segment_t;
+
+typedef struct
+{
+  u32 offset;
+  u32 len;
+  u32 action;
+} svm_fifo_trace_elem_t;
+
+typedef struct _svm_fifo
+{
+  CLIB_CACHE_LINE_ALIGN_MARK (shared_first);
+  fifo_segment_header_t *fs_hdr;/**< fifo segment header for fifo */
+  svm_fifo_chunk_t *start_chunk;/**< first chunk in fifo chunk list */
+  svm_fifo_chunk_t *end_chunk; /**< end chunk in fifo chunk list */
+  u32 min_alloc;               /**< min chunk alloc if space available */
+  u32 size;                    /**< size of the fifo in bytes */
+  u8 flags;                    /**< fifo flags */
+  u8 slice_index;              /**< segment slice for fifo */
+
+    CLIB_CACHE_LINE_ALIGN_MARK (shared_second);
+  volatile u32 has_event;      /**< non-zero if deq event exists */
+  u32 master_session_index;    /**< session layer session index */
+  u32 client_session_index;    /**< app session index */
+  u8 master_thread_index;      /**< session layer thread index */
+  u8 client_thread_index;      /**< app worker index */
+  i8 refcnt;                   /**< reference count  */
+  u32 segment_manager;         /**< session layer segment manager index */
+  u32 segment_index;           /**< segment index in segment manager */
+  struct _svm_fifo *next;      /**< next in freelist/active chain */
+  struct _svm_fifo *prev;      /**< prev in active chain */
+
+    CLIB_CACHE_LINE_ALIGN_MARK (consumer);
+  rb_tree_t ooo_deq_lookup;    /**< rbtree for ooo deq chunk lookup */
+  svm_fifo_chunk_t *head_chunk;        /**< tracks chunk where head lands */
+  svm_fifo_chunk_t *ooo_deq;   /**< last chunk used for ooo dequeue */
+  u32 head;                    /**< fifo head position/byte */
+  volatile u32 want_deq_ntf;   /**< producer wants nudge */
+  volatile u32 has_deq_ntf;
+
+    CLIB_CACHE_LINE_ALIGN_MARK (producer);
+  rb_tree_t ooo_enq_lookup;    /**< rbtree for ooo enq chunk lookup */
+  u32 tail;                    /**< fifo tail position/byte */
+  u32 ooos_list_head;          /**< Head of out-of-order linked-list */
+  svm_fifo_chunk_t *tail_chunk;        /**< tracks chunk where tail lands */
+  svm_fifo_chunk_t *ooo_enq;   /**< last chunk used for ooo enqueue */
+  ooo_segment_t *ooo_segments; /**< Pool of ooo segments */
+  u32 ooos_newest;             /**< Last segment to have been updated */
+  volatile u8 n_subscribers;   /**< Number of subscribers for io events */
+  u8 subscribers[SVM_FIFO_MAX_EVT_SUBSCRIBERS];
+
+#if SVM_FIFO_TRACE
+  svm_fifo_trace_elem_t *trace;
+#endif
+
+} svm_fifo_t;
+
+typedef struct fifo_segment_slice_
+{
+  svm_fifo_t *fifos;                   /**< Linked list of active RX fifos */
+  svm_fifo_t *free_fifos;              /**< Freelists by fifo size  */
+  svm_fifo_chunk_t **free_chunks;      /**< Freelists by chunk size */
+  uword n_fl_chunk_bytes;              /**< Chunk bytes on freelist */
+  clib_spinlock_t chunk_lock;
+} fifo_segment_slice_t;
+
+struct fifo_segment_header_
+{
+  fifo_segment_slice_t *slices;                /** Fixed array of slices */
+  ssvm_shared_header_t *ssvm_sh;       /**< Pointer to fs ssvm shared hdr */
+  uword n_free_bytes;                  /**< Segment free bytes */
+  u32 n_active_fifos;                  /**< Number of active fifos */
+  u32 n_reserved_bytes;                        /**< Bytes not to be allocated */
+  u32 max_log2_chunk_size;             /**< Max log2(chunk size) for fs */
+  u8 flags;                            /**< Segment flags */
+  u8 n_slices;                         /**< Number of slices */
+};
+
+#endif /* SRC_SVM_FIFO_TYPES_H_ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
index 3df6713..0168c08 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #include <svm/svm_fifo.h>
+#include <svm/fifo_segment.h>
 #include <vppinfra/cpu.h>
 
 CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
@@ -26,7 +27,8 @@ CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
 {
   u32 n_chunk;
 
-  ASSERT (tail_idx >= c->start_byte && tail_idx < c->start_byte + c->length);
+  ASSERT (f_pos_geq (tail_idx, c->start_byte)
+         && f_pos_lt (tail_idx, c->start_byte + c->length));
 
   tail_idx -= c->start_byte;
   n_chunk = c->length - tail_idx;
@@ -56,7 +58,8 @@ CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
 {
   u32 n_chunk;
 
-  ASSERT (head_idx >= c->start_byte && head_idx < c->start_byte + c->length);
+  ASSERT (f_pos_geq (head_idx, c->start_byte)
+         && f_pos_lt (head_idx, c->start_byte + c->length));
 
   head_idx -= c->start_byte;
   n_chunk = c->length - head_idx;
@@ -98,34 +101,10 @@ svm_fifo_copy_from_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 head_idx,
                                                   last);
 }
 
-static inline u8
-position_lt (svm_fifo_t * f, u32 a, u32 b, u32 tail)
-{
-  return (f_distance_to (f, a, tail) < f_distance_to (f, b, tail));
-}
-
-static inline u8
-position_leq (svm_fifo_t * f, u32 a, u32 b, u32 tail)
-{
-  return (f_distance_to (f, a, tail) <= f_distance_to (f, b, tail));
-}
-
-static inline u8
-position_gt (svm_fifo_t * f, u32 a, u32 b, u32 tail)
-{
-  return (f_distance_to (f, a, tail) > f_distance_to (f, b, tail));
-}
-
-static inline u32
-position_diff (svm_fifo_t * f, u32 a, u32 b, u32 tail)
-{
-  return f_distance_to (f, a, tail) - f_distance_to (f, b, tail);
-}
-
 static inline u32
-ooo_segment_end_pos (svm_fifo_t * f, ooo_segment_t * s)
+ooo_segment_end_pos (ooo_segment_t * s)
 {
-  return (s->start + s->length) % f->size;
+  return (s->start + s->length);
 }
 
 void
@@ -200,10 +179,10 @@ ooo_segment_add (svm_fifo_t * f, u32 offset, u32 head, u32 tail, u32 length)
   u32 new_index, s_end_pos, s_index;
   u32 offset_pos, offset_end_pos;
 
-  ASSERT (offset + length <= f_distance_to (f, head, tail) || head == tail);
+  ASSERT (offset + length <= f_free_count (f, head, tail));
 
-  offset_pos = (tail + offset) % f->size;
-  offset_end_pos = (tail + offset + length) % f->size;
+  offset_pos = tail + offset;
+  offset_end_pos = tail + offset + length;
 
   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
 
@@ -218,28 +197,27 @@ ooo_segment_add (svm_fifo_t * f, u32 offset, u32 head, u32 tail, u32 length)
   /* Find first segment that starts after new segment */
   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
   while (s->next != OOO_SEGMENT_INVALID_INDEX
-        && position_lt (f, s->start, offset_pos, tail))
+        && f_pos_lt (s->start, offset_pos))
     s = pool_elt_at_index (f->ooo_segments, s->next);
 
   /* If we have a previous and we overlap it, use it as starting point */
   prev = ooo_segment_prev (f, s);
-  if (prev
-      && position_leq (f, offset_pos, ooo_segment_end_pos (f, prev), tail))
+  if (prev && f_pos_leq (offset_pos, ooo_segment_end_pos (prev)))
     {
       s = prev;
-      s_end_pos = ooo_segment_end_pos (f, s);
+      s_end_pos = ooo_segment_end_pos (s);
 
       /* Since we have previous, offset start position cannot be smaller
        * than prev->start. Check tail */
-      ASSERT (position_lt (f, s->start, offset_pos, tail));
+      ASSERT (f_pos_lt (s->start, offset_pos));
       goto check_tail;
     }
 
   s_index = s - f->ooo_segments;
-  s_end_pos = ooo_segment_end_pos (f, s);
+  s_end_pos = ooo_segment_end_pos (s);
 
   /* No overlap, add before current segment */
-  if (position_lt (f, offset_end_pos, s->start, tail))
+  if (f_pos_lt (offset_end_pos, s->start))
     {
       new_s = ooo_segment_alloc (f, offset_pos, length);
       new_index = new_s - f->ooo_segments;
@@ -264,7 +242,7 @@ ooo_segment_add (svm_fifo_t * f, u32 offset, u32 head, u32 tail, u32 length)
       return;
     }
   /* No overlap, add after current segment */
-  else if (position_gt (f, offset_pos, s_end_pos, tail))
+  else if (f_pos_gt (offset_pos, s_end_pos))
     {
       new_s = ooo_segment_alloc (f, offset_pos, length);
       new_index = new_s - f->ooo_segments;
@@ -287,24 +265,23 @@ ooo_segment_add (svm_fifo_t * f, u32 offset, u32 head, u32 tail, u32 length)
    */
 
   /* Merge at head */
-  if (position_lt (f, offset_pos, s->start, tail))
+  if (f_pos_lt (offset_pos, s->start))
     {
       s->start = offset_pos;
-      s->length = position_diff (f, s_end_pos, s->start, tail);
+      s->length = s_end_pos - s->start;
       f->ooos_newest = s - f->ooo_segments;
     }
 
 check_tail:
 
   /* Overlapping tail */
-  if (position_gt (f, offset_end_pos, s_end_pos, tail))
+  if (f_pos_gt (offset_end_pos, s_end_pos))
     {
-      s->length = position_diff (f, offset_end_pos, s->start, tail);
+      s->length = offset_end_pos - s->start;
 
       /* Remove the completely overlapped segments in the tail */
       it = ooo_segment_next (f, s);
-      while (it && position_leq (f, ooo_segment_end_pos (f, it),
-                                offset_end_pos, tail))
+      while (it && f_pos_leq (ooo_segment_end_pos (it), offset_end_pos))
        {
          next = ooo_segment_next (f, it);
          ooo_segment_free (f, it - f->ooo_segments);
@@ -312,10 +289,9 @@ check_tail:
        }
 
       /* If partial overlap with last, merge */
-      if (it && position_leq (f, it->start, offset_end_pos, tail))
+      if (it && f_pos_leq (it->start, offset_end_pos))
        {
-         s->length = position_diff (f, ooo_segment_end_pos (f, it),
-                                    s->start, tail);
+         s->length = ooo_segment_end_pos (it) - s->start;
          ooo_segment_free (f, it - f->ooo_segments);
        }
       f->ooos_newest = s - f->ooo_segments;
@@ -334,7 +310,7 @@ ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
   i32 diff;
 
   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
-  diff = f_distance_from (f, s->start, *tail);
+  diff = *tail - s->start;
 
   ASSERT (diff != n_bytes_enqueued);
 
@@ -350,7 +326,7 @@ ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
       if (s->length > diff)
        {
          bytes = s->length - diff;
-         *tail = (*tail + bytes) % f->size;
+         *tail = *tail + bytes;
          ooo_segment_free (f, s_index);
          break;
        }
@@ -359,7 +335,7 @@ ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
       if (s->next != OOO_SEGMENT_INVALID_INDEX)
        {
          s = pool_elt_at_index (f->ooo_segments, s->next);
-         diff = f_distance_from (f, s->start, *tail);
+         diff = *tail - s->start;
          ooo_segment_free (f, s_index);
        }
       /* End of search */
@@ -370,11 +346,11 @@ ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
        }
     }
 
-  ASSERT (bytes <= f->nitems);
+  ASSERT (bytes <= f->size);
   return bytes;
 }
 
-static ooo_segment_t *
+__clib_unused static ooo_segment_t *
 ooo_segment_last (svm_fifo_t * f)
 {
   ooo_segment_t *s;
@@ -391,36 +367,30 @@ ooo_segment_last (svm_fifo_t * f)
 void
 svm_fifo_init (svm_fifo_t * f, u32 size)
 {
+  svm_fifo_chunk_t *c, *prev;
+  u32 first_chunk, min_alloc;
+
   f->size = size;
-  /*
-   * usable size of the fifo set to rounded_data_size - 1
-   * to differentiate between free fifo and empty fifo.
-   */
-  f->nitems = f->size - 1;
   f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX;
   f->segment_index = SVM_FIFO_INVALID_INDEX;
   f->refcnt = 1;
   f->head = f->tail = f->flags = 0;
-  f->head_chunk = f->tail_chunk = f->ooo_enq = f->ooo_deq = f->start_chunk;
-}
-
-void
-svm_fifo_init_chunks (svm_fifo_t * f)
-{
-  svm_fifo_chunk_t *c, *prev;
+  f->head_chunk = f->tail_chunk = f->start_chunk;
+  f->ooo_deq = f->ooo_enq = 0;
 
-  if (f->start_chunk->next == f->start_chunk)
-    return;
-
-  f->flags |= SVM_FIFO_F_MULTI_CHUNK;
-  rb_tree_init (&f->ooo_enq_lookup);
-  rb_tree_init (&f->ooo_deq_lookup);
+  first_chunk = f->start_chunk->length;
+  min_alloc = first_chunk > 16 << 10 ? first_chunk >> 2 : 4096;
+  min_alloc = clib_min (first_chunk, 64 << 10);
+  f->min_alloc = min_alloc;
 
+  /*
+   * Initialize chunks
+   */
   f->start_chunk->start_byte = 0;
   prev = f->start_chunk;
   c = prev->next;
 
-  while (c != f->start_chunk)
+  while (c)
     {
       c->start_byte = prev->start_byte + prev->length;
       prev = c;
@@ -428,11 +398,26 @@ svm_fifo_init_chunks (svm_fifo_t * f)
     }
 }
 
+void
+svm_fifo_init_ooo_lookup (svm_fifo_t * f, u8 ooo_type)
+{
+  if (ooo_type == 0)
+    {
+      ASSERT (!rb_tree_is_init (&f->ooo_enq_lookup));
+      rb_tree_init (&f->ooo_enq_lookup);
+    }
+  else
+    {
+      ASSERT (!rb_tree_is_init (&f->ooo_deq_lookup));
+      rb_tree_init (&f->ooo_deq_lookup);
+    }
+}
+
 /**
  * Creates a fifo in the current heap. Fails vs blow up the process
  */
 svm_fifo_t *
-svm_fifo_create (u32 data_size_in_bytes)
+svm_fifo_alloc (u32 data_size_in_bytes)
 {
   u32 rounded_data_size;
   svm_fifo_chunk_t *c;
@@ -454,13 +439,13 @@ svm_fifo_create (u32 data_size_in_bytes)
       return 0;
     }
 
-  c->next = c;
+  clib_memset (c, 0, sizeof (*c));
   c->start_byte = 0;
   c->length = data_size_in_bytes;
-  c->rb_index = RBTREE_TNIL_INDEX;
+  c->enq_rb_index = RBTREE_TNIL_INDEX;
+  c->deq_rb_index = RBTREE_TNIL_INDEX;
   f->start_chunk = f->end_chunk = c;
 
-  svm_fifo_init (f, data_size_in_bytes);
   return f;
 }
 
@@ -485,14 +470,73 @@ svm_fifo_chunk_alloc (u32 size)
   return c;
 }
 
-static inline u8
-svm_fifo_chunk_includes_pos (svm_fifo_chunk_t * c, u32 pos)
+/**
+ * Find chunk for given byte position
+ *
+ * @param f    fifo
+ * @param pos  normalized position in fifo
+ *
+ * @return chunk that includes given position or 0
+ */
+static svm_fifo_chunk_t *
+svm_fifo_find_chunk (svm_fifo_t * f, u32 pos)
+{
+  svm_fifo_chunk_t *c;
+
+  c = f->start_chunk;
+  while (c && !f_chunk_includes_pos (c, pos))
+    c = c->next;
+
+  return c;
+}
+
+static svm_fifo_chunk_t *
+svm_fifo_find_next_chunk (svm_fifo_t * f, svm_fifo_chunk_t * start, u32 pos)
+{
+  svm_fifo_chunk_t *c;
+
+  ASSERT (start != 0);
+
+  c = start;
+  while (c && !f_chunk_includes_pos (c, pos))
+    c = c->next;
+
+  return c;
+}
+
+u32
+svm_fifo_max_read_chunk (svm_fifo_t * f)
+{
+  u32 head, tail, end_chunk;
+
+  f_load_head_tail_cons (f, &head, &tail);
+  ASSERT (!f->head_chunk || f_chunk_includes_pos (f->head_chunk, head));
+
+  if (!f->head_chunk)
+    {
+      f->head_chunk = svm_fifo_find_chunk (f, head);
+      if (PREDICT_FALSE (!f->head_chunk))
+       return 0;
+    }
+
+  end_chunk = f_chunk_end (f->head_chunk);
+
+  return f_pos_lt (end_chunk, tail) ? end_chunk - head : tail - head;
+}
+
+u32
+svm_fifo_max_write_chunk (svm_fifo_t * f)
 {
-  return (pos >= c->start_byte && pos < c->start_byte + c->length);
+  u32 head, tail;
+
+  f_load_head_tail_prod (f, &head, &tail);
+  ASSERT (!f->tail_chunk || f_chunk_includes_pos (f->tail_chunk, tail));
+
+  return f->tail_chunk ? f_chunk_end (f->tail_chunk) - tail : 0;
 }
 
 static rb_node_t *
-svm_fifo_find_node_rbtree (rb_tree_t * rt, u32 pos)
+f_find_node_rbtree (rb_tree_t * rt, u32 pos)
 {
   rb_node_t *cur, *prev;
 
@@ -503,7 +547,7 @@ svm_fifo_find_node_rbtree (rb_tree_t * rt, u32 pos)
   while (pos != cur->key)
     {
       prev = cur;
-      if (pos < cur->key)
+      if (f_pos_lt (pos, cur->key))
        {
          cur = rb_node_left (rt, cur);
          if (rb_node_is_tnil (rt, cur))
@@ -530,501 +574,206 @@ svm_fifo_find_node_rbtree (rb_tree_t * rt, u32 pos)
 }
 
 static svm_fifo_chunk_t *
-svm_fifo_find_chunk_rbtree (rb_tree_t * rt, u32 pos)
+f_find_chunk_rbtree (rb_tree_t * rt, u32 pos)
 {
   svm_fifo_chunk_t *c;
   rb_node_t *n;
 
-  n = svm_fifo_find_node_rbtree (rt, pos);
+  if (!rb_tree_is_init (rt))
+    return 0;
+
+  n = f_find_node_rbtree (rt, pos);
   if (!n)
     return 0;
   c = uword_to_pointer (n->opaque, svm_fifo_chunk_t *);
-  if (svm_fifo_chunk_includes_pos (c, pos))
+  if (f_chunk_includes_pos (c, pos))
     return c;
 
   return 0;
 }
 
-/**
- * Find chunk for given byte position
- *
- * @param f    fifo
- * @param pos  normalized position in fifo
- *
- * @return chunk that includes given position or 0
- */
-static svm_fifo_chunk_t *
-svm_fifo_find_chunk (svm_fifo_t * f, u32 pos)
-{
-  svm_fifo_chunk_t *c;
-
-  c = f->start_chunk;
-  do
-    {
-      if (svm_fifo_chunk_includes_pos (c, pos))
-       return c;
-      c = c->next;
-    }
-  while (c != f->start_chunk);
-
-  return 0;
-}
-
 static void
-svm_fifo_update_ooo_enq (svm_fifo_t * f, u32 ref_pos, u32 start_pos,
-                        u32 end_pos)
+f_update_ooo_enq (svm_fifo_t * f, u32 start_pos, u32 end_pos)
 {
   rb_tree_t *rt = &f->ooo_enq_lookup;
   svm_fifo_chunk_t *c;
   rb_node_t *cur;
 
-  if (svm_fifo_chunk_includes_pos (f->ooo_enq, start_pos)
-      && svm_fifo_chunk_includes_pos (f->ooo_enq, end_pos)
-      && ref_pos < start_pos)
-    return;
+  /* Use linear search if rbtree is not initialized */
+  if (PREDICT_FALSE (!rb_tree_is_init (rt)))
+    {
+      f->ooo_enq = svm_fifo_find_next_chunk (f, f->tail_chunk, start_pos);
+      return;
+    }
 
   if (rt->root == RBTREE_TNIL_INDEX)
     {
       c = f->tail_chunk;
-      c->rb_index = rb_tree_add2 (rt, c->start_byte, pointer_to_uword (c));
+      ASSERT (c->enq_rb_index == RBTREE_TNIL_INDEX);
+      c->enq_rb_index = rb_tree_add_custom (rt, c->start_byte,
+                                           pointer_to_uword (c), f_pos_lt);
     }
   else
     {
-      cur = svm_fifo_find_node_rbtree (rt, start_pos);
+      cur = f_find_node_rbtree (rt, start_pos);
       c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
-      if (ref_pos > start_pos && c->start_byte > start_pos)
-       {
-         c = f->end_chunk;
-         ASSERT (c->rb_index != RBTREE_TNIL_INDEX);
-       }
+      ASSERT (f_pos_leq (c->start_byte, start_pos));
     }
 
-  if (svm_fifo_chunk_includes_pos (c, start_pos))
+  if (f_chunk_includes_pos (c, start_pos))
     f->ooo_enq = c;
 
-  if (svm_fifo_chunk_includes_pos (c, end_pos) && ref_pos < end_pos)
+  if (f_chunk_includes_pos (c, end_pos))
     return;
 
   do
     {
       c = c->next;
-      if (c->rb_index != RBTREE_TNIL_INDEX)
+      if (!c || c->enq_rb_index != RBTREE_TNIL_INDEX)
        break;
 
-      c->rb_index = rb_tree_add2 (rt, c->start_byte, pointer_to_uword (c));
+      c->enq_rb_index = rb_tree_add_custom (rt, c->start_byte,
+                                           pointer_to_uword (c), f_pos_lt);
 
-      if (svm_fifo_chunk_includes_pos (c, start_pos))
+      if (f_chunk_includes_pos (c, start_pos))
        f->ooo_enq = c;
-
     }
-  while (!svm_fifo_chunk_includes_pos (c, end_pos));
+  while (!f_chunk_includes_pos (c, end_pos));
 }
 
 static void
-svm_fifo_update_ooo_deq (svm_fifo_t * f, u32 ref_pos, u32 start_pos,
-                        u32 end_pos)
+f_update_ooo_deq (svm_fifo_t * f, u32 start_pos, u32 end_pos)
 {
   rb_tree_t *rt = &f->ooo_deq_lookup;
   rb_node_t *cur;
   svm_fifo_chunk_t *c;
 
-  if (svm_fifo_chunk_includes_pos (f->ooo_deq, start_pos)
-      && svm_fifo_chunk_includes_pos (f->ooo_deq, end_pos)
-      && ref_pos < start_pos)
-    return;
+  /* Use linear search if rbtree is not initialized  */
+  if (PREDICT_FALSE (!rb_tree_is_init (rt)))
+    {
+      f->ooo_deq = svm_fifo_find_chunk (f, start_pos);
+      return;
+    }
 
   if (rt->root == RBTREE_TNIL_INDEX)
     {
-      c = f->head_chunk;
-      c->rb_index = rb_tree_add2 (rt, c->start_byte, pointer_to_uword (c));
+      c = f->start_chunk;
+      ASSERT (c->deq_rb_index == RBTREE_TNIL_INDEX);
+      c->deq_rb_index = rb_tree_add_custom (rt, c->start_byte,
+                                           pointer_to_uword (c), f_pos_lt);
     }
   else
     {
-      cur = svm_fifo_find_node_rbtree (rt, start_pos);
+      cur = f_find_node_rbtree (rt, start_pos);
       c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
-      if (ref_pos > start_pos && c->start_byte > start_pos)
-       {
-         c = f->end_chunk;
-         ASSERT (c->rb_index != RBTREE_TNIL_INDEX);
-       }
+      ASSERT (f_pos_leq (c->start_byte, start_pos));
     }
 
-  if (svm_fifo_chunk_includes_pos (c, start_pos))
+  if (f_chunk_includes_pos (c, start_pos))
     f->ooo_deq = c;
 
-  if (svm_fifo_chunk_includes_pos (c, end_pos) && ref_pos < end_pos)
+  if (f_chunk_includes_pos (c, end_pos))
     return;
 
   do
     {
       c = c->next;
-      if (c->rb_index != RBTREE_TNIL_INDEX)
+      if (!c || c->deq_rb_index != RBTREE_TNIL_INDEX)
        break;
 
-      c->rb_index = rb_tree_add2 (rt, c->start_byte, pointer_to_uword (c));
+      c->deq_rb_index = rb_tree_add_custom (rt, c->start_byte,
+                                           pointer_to_uword (c), f_pos_lt);
 
-      if (svm_fifo_chunk_includes_pos (c, start_pos))
+      if (f_chunk_includes_pos (c, start_pos))
        f->ooo_deq = c;
-
     }
-  while (!svm_fifo_chunk_includes_pos (c, end_pos));
+  while (!f_chunk_includes_pos (c, end_pos));
 }
 
-void
-svm_fifo_ooo_deq_track (svm_fifo_t * f, u32 start_pos, u32 end_pos)
+static svm_fifo_chunk_t *
+f_lookup_clear_enq_chunks (svm_fifo_t * f, svm_fifo_chunk_t * start,
+                          u32 end_pos)
 {
-  rb_tree_t *rt = &f->ooo_deq_lookup;
+  rb_tree_t *rt = &f->ooo_enq_lookup;
   svm_fifo_chunk_t *c;
+  rb_node_t *n;
 
-  if (svm_fifo_chunk_includes_pos (f->ooo_deq, end_pos)
-      && start_pos < end_pos)
-    return;
-
-  c = f->ooo_deq->next;
-  do
+  c = start;
+  while (c && !f_chunk_includes_pos (c, end_pos))
     {
-      ASSERT (c->rb_index == RBTREE_TNIL_INDEX);
-      rb_tree_add2 (rt, c->start_byte, pointer_to_uword (c));
+      if (c->enq_rb_index != RBTREE_TNIL_INDEX)
+       {
+         n = rb_node (rt, c->enq_rb_index);
+         rb_tree_del_node (rt, n);
+         c->enq_rb_index = RBTREE_TNIL_INDEX;
+       }
 
       c = c->next;
     }
-  while (!svm_fifo_chunk_includes_pos (c, end_pos));
+
+  /* No ooo segments left, so make sure the current chunk
+   * is not tracked in the enq rbtree */
+  if (f->ooos_list_head == OOO_SEGMENT_INVALID_INDEX
+      && c && c->enq_rb_index != RBTREE_TNIL_INDEX)
+    {
+      n = rb_node (rt, c->enq_rb_index);
+      rb_tree_del_node (rt, n);
+      c->enq_rb_index = RBTREE_TNIL_INDEX;
+    }
+
+  return c;
 }
 
 static svm_fifo_chunk_t *
-svm_fifo_lookup_clear_chunks (svm_fifo_t * f, rb_tree_t * rt,
-                             svm_fifo_chunk_t * start, u32 start_pos,
-                             u32 end_pos)
+f_lookup_clear_deq_chunks (svm_fifo_t * f, svm_fifo_chunk_t * start,
+                          u32 end_pos)
 {
+  rb_tree_t *rt = &f->ooo_deq_lookup;
   svm_fifo_chunk_t *c;
   rb_node_t *n;
 
-  /* Nothing to do if still in the same chunk and not wrapped */
-  if (svm_fifo_chunk_includes_pos (start, end_pos) && start_pos < end_pos)
-    return start;
-
   c = start;
-  do
+  while (c && !f_chunk_includes_pos (c, end_pos))
     {
-      if (c->rb_index == RBTREE_TNIL_INDEX)
+      if (c->deq_rb_index != RBTREE_TNIL_INDEX)
        {
-         c = c->next;
-         continue;
+         n = rb_node (rt, c->deq_rb_index);
+         rb_tree_del_node (rt, n);
+         c->deq_rb_index = RBTREE_TNIL_INDEX;
        }
 
-      n = rb_node (rt, c->rb_index);
-      rb_tree_del_node (rt, n);
-      c->rb_index = RBTREE_TNIL_INDEX;
       c = c->next;
     }
-  while (!svm_fifo_chunk_includes_pos (c, end_pos));
 
   return c;
 }
 
-static inline void
-svm_fifo_grow (svm_fifo_t * f, svm_fifo_chunk_t * c)
-{
-  svm_fifo_chunk_t *prev;
-  u32 add_bytes = 0;
-
-  if (!c)
-    return;
-
-  f->end_chunk->next = c;
-  while (c)
-    {
-      add_bytes += c->length;
-      prev = c;
-      c = c->next;
-    }
-  f->end_chunk = prev;
-  prev->next = f->start_chunk;
-  f->size += add_bytes;
-  f->nitems = f->size - 1;
-  f->new_chunks = 0;
-}
-
-static void
-svm_fifo_try_grow (svm_fifo_t * f, u32 new_head)
-{
-  if (new_head > f->tail)
-    return;
-
-  svm_fifo_grow (f, f->new_chunks);
-  f->flags &= ~SVM_FIFO_F_GROW;
-}
-
 void
 svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c)
 {
   svm_fifo_chunk_t *cur, *prev;
 
-  /* Initialize rbtree if needed and add default chunk to it. Expectation is
-   * that this is called with the heap where the rbtree's pool is pushed. */
-  if (!(f->flags & SVM_FIFO_F_MULTI_CHUNK))
-    {
-      ASSERT (f->start_chunk->next == f->start_chunk);
-      rb_tree_init (&f->ooo_enq_lookup);
-      rb_tree_init (&f->ooo_deq_lookup);
-      f->flags |= SVM_FIFO_F_MULTI_CHUNK;
-    }
-
-  /* If fifo is not wrapped, update the size now */
-  if (!svm_fifo_is_wrapped (f))
-    {
-      /* Initialize chunks and add to lookup rbtree */
-      cur = c;
-      if (f->new_chunks)
-       {
-         prev = f->new_chunks;
-         while (prev->next)
-           prev = prev->next;
-         prev->next = c;
-       }
-      else
-       prev = f->end_chunk;
-
-      while (cur)
-       {
-         cur->start_byte = prev->start_byte + prev->length;
-         cur->rb_index = RBTREE_TNIL_INDEX;
-         prev = cur;
-         cur = cur->next;
-       }
-
-      ASSERT (!f->new_chunks);
-      svm_fifo_grow (f, c);
-      return;
-    }
-
-  /* Wrapped */
-  if (f->flags & SVM_FIFO_F_SINGLE_THREAD_OWNED)
-    {
-      ASSERT (f->master_thread_index == os_get_thread_index ());
-
-      if (!f->new_chunks && f->head_chunk != f->tail_chunk)
-       {
-         u32 head = 0, tail = 0;
-         f_load_head_tail_cons (f, &head, &tail);
-
-         svm_fifo_chunk_t *tmp = f->tail_chunk->next;
-
-         prev = f->tail_chunk;
-         u32 add_bytes = 0;
-         cur = prev->next;
-         while (cur != f->start_chunk)
-           {
-             /* remove any existing rb_tree entry */
-             if (cur->rb_index != RBTREE_TNIL_INDEX)
-               {
-                 rb_tree_del (&f->ooo_enq_lookup, cur->start_byte);
-                 rb_tree_del (&f->ooo_deq_lookup, cur->start_byte);
-               }
-             cur->rb_index = RBTREE_TNIL_INDEX;
-             cur = cur->next;
-           }
-
-         /* insert new chunk after the tail_chunk */
-         f->tail_chunk->next = c;
-         while (c)
-           {
-             add_bytes += c->length;
-             c->start_byte = prev->start_byte + prev->length;
-             cur->rb_index = RBTREE_TNIL_INDEX;
-
-             prev = c;
-             c = c->next;
-           }
-         prev->next = tmp;
-
-         /* shift existing chunks along */
-         cur = tmp;
-         while (cur != f->start_chunk)
-           {
-             cur->start_byte = prev->start_byte + prev->length;
-             prev = cur;
-             cur = cur->next;
-           }
-
-         f->size += add_bytes;
-         f->nitems = f->size - 1;
-         f->new_chunks = 0;
-         head += add_bytes;
-
-         clib_atomic_store_rel_n (&f->head, head);
-         ASSERT (svm_fifo_is_sane (f));
-
-         return;
-       }
-    }
-
-  /* Wrapped, and optimization of single-thread-owned fifo cannot be applied */
-  /* Initialize chunks and add to lookup rbtree */
   cur = c;
-  if (f->new_chunks)
-    {
-      prev = f->new_chunks;
-      while (prev->next)
-       prev = prev->next;
-      prev->next = c;
-    }
-  else
-    prev = f->end_chunk;
+  prev = f->end_chunk;
 
   while (cur)
     {
       cur->start_byte = prev->start_byte + prev->length;
-      cur->rb_index = RBTREE_TNIL_INDEX;
-      prev = cur;
-      cur = cur->next;
-    }
-
-  /* Postpone size update */
-  if (!f->new_chunks)
-    {
-      f->new_chunks = c;
-      f->flags |= SVM_FIFO_F_GROW;
-    }
-}
-
-/**
- * Removes chunks that are after fifo end byte
- */
-svm_fifo_chunk_t *
-svm_fifo_collect_chunks (svm_fifo_t * f)
-{
-  svm_fifo_chunk_t *list, *cur;
-
-  f->flags &= ~SVM_FIFO_F_COLLECT_CHUNKS;
-
-  list = f->new_chunks;
-  f->new_chunks = 0;
-  cur = list;
-  while (cur)
-    {
-      if (cur->rb_index != RBTREE_TNIL_INDEX)
-       {
-         rb_tree_del (&f->ooo_enq_lookup, cur->start_byte);
-         rb_tree_del (&f->ooo_deq_lookup, cur->start_byte);
-       }
-      cur = cur->next;
-    }
-
-  return list;
-}
-
-void
-svm_fifo_try_shrink (svm_fifo_t * f, u32 head, u32 tail)
-{
-  u32 len_to_shrink = 0, tail_pos, len, last_pos;
-  svm_fifo_chunk_t *cur, *prev, *next, *start;
-
-  tail_pos = tail;
-  if (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX)
-    {
-      ooo_segment_t *last = ooo_segment_last (f);
-      tail_pos = ooo_segment_end_pos (f, last);
-    }
-
-  if (f->size_decrement)
-    {
-      /* Figure out available free space considering that there may be
-       * ooo segments */
-      len = clib_min (f->size_decrement, f_free_count (f, head, tail_pos));
-      f->nitems -= len;
-      f->size_decrement -= len;
-    }
-
-  /* Remove tail chunks if the following hold:
-   * - not wrapped
-   * - last used byte less than start of last chunk
-   */
-  if (tail_pos >= head && tail_pos < f->end_chunk->start_byte)
-    {
-      /* Lookup the last position not to be removed. Since size still needs
-       * to be nitems + 1, nitems must fall within the usable space. Also,
-       * first segment is not removable, so tail_pos can be 0. */
-      last_pos = tail_pos > 0 ? tail_pos - 1 : tail_pos;
-      prev = svm_fifo_find_chunk (f, clib_max (f->nitems, last_pos));
-      next = prev->next;
-      /* If tail_pos is first position in next, skip the chunk, otherwise,
-       * we must update the tail and, if fifo size is 0, even the head.
-       * We should not invalidate the tail for the caller and must not change
-       * consumer owned variables from code that's typically called by the
-       * producer */
-      if (next->start_byte == tail_pos)
-       {
-         prev = next;
-         next = next->next;
-       }
-      while (next != f->start_chunk)
-       {
-         cur = next;
-         next = cur->next;
-         len_to_shrink += cur->length;
-       }
-      if (len_to_shrink)
-       {
-         f->size -= len_to_shrink;
-         start = prev->next;
-         prev->next = f->start_chunk;
-         f->end_chunk = prev;
-         cur->next = f->new_chunks;
-         f->new_chunks = start;
-       }
-    }
-
-  if (!f->size_decrement && f->size == f->nitems + 1)
-    {
-      f->flags &= ~SVM_FIFO_F_SHRINK;
-      f->flags |= SVM_FIFO_F_COLLECT_CHUNKS;
-      if (f->start_chunk == f->start_chunk->next)
-       f->flags &= ~SVM_FIFO_F_MULTI_CHUNK;
-    }
-}
+      cur->enq_rb_index = RBTREE_TNIL_INDEX;
+      cur->deq_rb_index = RBTREE_TNIL_INDEX;
 
-/**
- * Request to reduce fifo size by amount of bytes
- */
-int
-svm_fifo_reduce_size (svm_fifo_t * f, u32 len, u8 try_shrink)
-{
-  svm_fifo_chunk_t *cur;
-  u32 actual_len = 0;
-
-  /* Abort if trying to reduce by more than fifo size or if
-   * fifo is undergoing resizing already */
-  if (len >= f->size || f->size > f->nitems + 1
-      || (f->flags & SVM_FIFO_F_SHRINK) || (f->flags & SVM_FIFO_F_GROW))
-    return 0;
-
-  /* last chunk that will not be removed */
-  cur = svm_fifo_find_chunk (f, f->nitems - len);
-
-  /* sum length of chunks that will be removed */
-  cur = cur->next;
-  while (cur != f->start_chunk)
-    {
-      actual_len += cur->length;
+      prev = cur;
       cur = cur->next;
     }
 
-  ASSERT (actual_len <= len);
-  if (!actual_len)
-    return 0;
-
-  f->size_decrement = actual_len;
-  f->flags |= SVM_FIFO_F_SHRINK;
+  prev->next = 0;
+  f->end_chunk->next = c;
+  f->end_chunk = prev;
 
-  if (try_shrink)
-    {
-      u32 head, tail;
-      f_load_head_tail_prod (f, &head, &tail);
-      svm_fifo_try_shrink (f, head, tail);
-    }
+  if (!f->tail_chunk)
+    f->tail_chunk = c;
 
-  return actual_len;
+  return;
 }
 
 void
@@ -1054,9 +803,13 @@ svm_fifo_overwrite_head (svm_fifo_t * f, u8 * src, u32 len)
   u32 head, tail, head_idx;
   svm_fifo_chunk_t *c;
 
-  ASSERT (len <= f->nitems);
+  ASSERT (len <= f->size);
 
   f_load_head_tail_cons (f, &head, &tail);
+
+  if (!f->head_chunk)
+    f->head_chunk = svm_fifo_find_chunk (f, head);
+
   c = f->head_chunk;
   head_idx = head - c->start_byte;
   n_chunk = c->length - head_idx;
@@ -1064,30 +817,65 @@ svm_fifo_overwrite_head (svm_fifo_t * f, u8 * src, u32 len)
     clib_memcpy_fast (&c->data[head_idx], src, len);
   else
     {
+      ASSERT (len - n_chunk <= c->next->length);
       clib_memcpy_fast (&c->data[head_idx], src, n_chunk);
       clib_memcpy_fast (&c->next->data[0], src + n_chunk, len - n_chunk);
     }
 }
 
+static int
+f_try_grow (svm_fifo_t * f, u32 head, u32 tail, u32 len)
+{
+  svm_fifo_chunk_t *c;
+  u32 alloc_size, free_alloced;
+
+  free_alloced = f_chunk_end (f->end_chunk) - tail;
+  ASSERT (free_alloced < len);
+
+  alloc_size = clib_min (f->min_alloc, f->size - (tail - head));
+  alloc_size = clib_max (alloc_size, len - free_alloced);
+
+  c = fsh_alloc_chunk (f->fs_hdr, f->slice_index, alloc_size);
+  if (PREDICT_FALSE (!c))
+    return -1;
+
+  svm_fifo_add_chunk (f, c);
+  return 0;
+}
+
 int
 svm_fifo_enqueue (svm_fifo_t * f, u32 len, const u8 * src)
 {
   u32 tail, head, free_count;
+  svm_fifo_chunk_t *old_tail_c;
+
+  f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
 
   f_load_head_tail_prod (f, &head, &tail);
 
   /* free space in fifo can only increase during enqueue: SPSC */
   free_count = f_free_count (f, head, tail);
 
-  f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
-
   if (PREDICT_FALSE (free_count == 0))
     return SVM_FIFO_EFULL;
 
   /* number of bytes we're going to copy */
   len = clib_min (free_count, len);
+
+  if (f_pos_gt (tail + len, f_chunk_end (f->end_chunk)))
+    {
+      if (PREDICT_FALSE (f_try_grow (f, head, tail, len)))
+       {
+         len = f_chunk_end (f->end_chunk) - tail;
+         if (!len)
+           return SVM_FIFO_EGROW;
+       }
+    }
+
+  old_tail_c = f->tail_chunk;
+
   svm_fifo_copy_to_chunk (f, f->tail_chunk, tail, src, len, &f->tail_chunk);
-  tail = (tail + len) % f->size;
+  tail = tail + len;
 
   svm_fifo_trace_add (f, head, len, 2);
 
@@ -1095,10 +883,9 @@ svm_fifo_enqueue (svm_fifo_t * f, u32 len, const u8 * src)
   if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
     {
       len += ooo_segment_try_collect (f, len, &tail);
-      if (f->flags & SVM_FIFO_F_MULTI_CHUNK)
-       f->tail_chunk = svm_fifo_lookup_clear_chunks (f, &f->ooo_enq_lookup,
-                                                     f->tail_chunk, f->tail,
-                                                     tail);
+      /* Tail chunk might've changed even if nothing was collected */
+      f->tail_chunk = f_lookup_clear_enq_chunks (f, old_tail_c, tail);
+      f->ooo_enq = 0;
     }
 
   /* store-rel: producer owned index (paired with load-acq in consumer) */
@@ -1117,30 +904,33 @@ svm_fifo_enqueue (svm_fifo_t * f, u32 len, const u8 * src)
 int
 svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
 {
-  u32 tail, head, free_count, tail_idx;
+  u32 tail, head, free_count, enq_pos;
 
   f_load_head_tail_prod (f, &head, &tail);
 
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
-    svm_fifo_try_shrink (f, head, tail);
-
   /* free space in fifo can only increase during enqueue: SPSC */
   free_count = f_free_count (f, head, tail);
+  f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
 
   /* will this request fit? */
   if ((len + offset) > free_count)
     return SVM_FIFO_EFULL;
 
-  f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
+  enq_pos = tail + offset;
+
+  if (f_pos_gt (enq_pos + len, f_chunk_end (f->end_chunk)))
+    {
+      if (PREDICT_FALSE (f_try_grow (f, head, tail, offset + len)))
+       return SVM_FIFO_EGROW;
+    }
+
   svm_fifo_trace_add (f, offset, len, 1);
   ooo_segment_add (f, offset, head, tail, len);
-  tail_idx = (tail + offset) % f->size;
 
-  if (f->flags & SVM_FIFO_F_MULTI_CHUNK)
-    svm_fifo_update_ooo_enq (f, f->tail, tail_idx,
-                            (tail_idx + len) % f->size);
+  if (!f->ooo_enq || !f_chunk_includes_pos (f->ooo_enq, enq_pos))
+    f_update_ooo_enq (f, enq_pos, enq_pos + len);
 
-  svm_fifo_copy_to_chunk (f, f->ooo_enq, tail_idx, src, len, &f->ooo_enq);
+  svm_fifo_copy_to_chunk (f, f->ooo_enq, enq_pos, src, len, &f->ooo_enq);
 
   return 0;
 }
@@ -1156,17 +946,74 @@ svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len)
   ASSERT (len <= svm_fifo_max_enqueue_prod (f));
   /* load-relaxed: producer owned index */
   tail = f->tail;
-  tail = (tail + len) % f->size;
+  tail = tail + len;
 
-  if (f->flags & SVM_FIFO_F_MULTI_CHUNK)
-    f->tail_chunk = svm_fifo_lookup_clear_chunks (f, &f->ooo_enq_lookup,
-                                                 f->tail_chunk, f->tail,
-                                                 tail);
+  if (rb_tree_is_init (&f->ooo_enq_lookup))
+    {
+      f->tail_chunk = f_lookup_clear_enq_chunks (f, f->tail_chunk, tail);
+      f->ooo_enq = 0;
+    }
+  else
+    {
+      f->tail_chunk = svm_fifo_find_next_chunk (f, f->tail_chunk, tail);
+    }
 
   /* store-rel: producer owned index (paired with load-acq in consumer) */
   clib_atomic_store_rel_n (&f->tail, tail);
 }
 
+always_inline svm_fifo_chunk_t *
+f_unlink_chunks (svm_fifo_t * f, u32 end_pos, u8 maybe_ooo)
+{
+  svm_fifo_chunk_t *start, *prev = 0, *c;
+  rb_tree_t *rt;
+  rb_node_t *n;
+
+  ASSERT (!f_chunk_includes_pos (f->start_chunk, end_pos));
+
+  if (maybe_ooo)
+    rt = &f->ooo_deq_lookup;
+
+  c = f->start_chunk;
+
+  do
+    {
+      if (maybe_ooo && c->deq_rb_index != RBTREE_TNIL_INDEX)
+       {
+         n = rb_node (rt, c->deq_rb_index);
+         ASSERT (n == f_find_node_rbtree (rt, c->start_byte));
+         rb_tree_del_node (rt, n);
+         c->deq_rb_index = RBTREE_TNIL_INDEX;
+       }
+      if (!c->next)
+       break;
+      prev = c;
+      c = c->next;
+    }
+  while (!f_chunk_includes_pos (c, end_pos));
+
+  if (maybe_ooo)
+    {
+      if (f->ooo_deq && f_pos_lt (f->ooo_deq->start_byte, f_chunk_end (c)))
+       f->ooo_deq = 0;
+    }
+  else
+    {
+      if (PREDICT_FALSE (f->ooo_deq != 0))
+       f->ooo_deq = 0;
+    }
+
+  /* Avoid unlinking the last chunk */
+  if (!prev)
+    return 0;
+
+  prev->next = 0;
+  start = f->start_chunk;
+  f->start_chunk = c;
+
+  return start;
+}
+
 int
 svm_fifo_dequeue (svm_fifo_t * f, u32 len, u8 * dst)
 {
@@ -1181,11 +1028,16 @@ svm_fifo_dequeue (svm_fifo_t * f, u32 len, u8 * dst)
     return SVM_FIFO_EEMPTY;
 
   len = clib_min (cursize, len);
+
+  if (!f->head_chunk)
+    f->head_chunk = svm_fifo_find_chunk (f, head);
+
   svm_fifo_copy_from_chunk (f, f->head_chunk, head, dst, len, &f->head_chunk);
-  head = (head + len) % f->size;
+  head = head + len;
 
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_GROW))
-    svm_fifo_try_grow (f, head);
+  if (f_pos_geq (head, f_chunk_end (f->start_chunk)))
+    fsh_collect_chunks (f->fs_hdr, f->slice_index,
+                       f_unlink_chunks (f, head, 0));
 
   /* store-rel: consumer owned index (paired with load-acq in producer) */
   clib_atomic_store_rel_n (&f->head, head);
@@ -1207,10 +1059,10 @@ svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 len, u8 * dst)
     return SVM_FIFO_EEMPTY;
 
   len = clib_min (cursize - offset, len);
-  head_idx = (head + offset) % f->size;
+  head_idx = head + offset;
 
-  if (f->flags & SVM_FIFO_F_MULTI_CHUNK)
-    svm_fifo_update_ooo_deq (f, head, head_idx, (head_idx + len) % f->size);
+  if (!f->ooo_deq || !f_chunk_includes_pos (f->ooo_deq, head_idx))
+    f_update_ooo_deq (f, head_idx, head_idx + len);
 
   svm_fifo_copy_from_chunk (f, f->ooo_deq, head_idx, dst, len, &f->ooo_deq);
   return len;
@@ -1234,15 +1086,15 @@ svm_fifo_dequeue_drop (svm_fifo_t * f, u32 len)
   svm_fifo_trace_add (f, tail, total_drop_bytes, 3);
 
   /* move head */
-  head = (head + total_drop_bytes) % f->size;
-
-  if (f->flags & SVM_FIFO_F_MULTI_CHUNK)
-    f->head_chunk = svm_fifo_lookup_clear_chunks (f, &f->ooo_deq_lookup,
-                                                 f->head_chunk, f->head,
-                                                 head);
+  head = head + total_drop_bytes;
 
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_GROW))
-    svm_fifo_try_grow (f, head);
+  if (f_pos_geq (head, f_chunk_end (f->start_chunk)))
+    {
+      fsh_collect_chunks (f->fs_hdr, f->slice_index,
+                         f_unlink_chunks (f, head, 1));
+      f->head_chunk =
+       f_chunk_includes_pos (f->start_chunk, head) ? f->start_chunk : 0;
+    }
 
   /* store-rel: consumer owned index (paired with load-acq in producer) */
   clib_atomic_store_rel_n (&f->head, head);
@@ -1250,24 +1102,47 @@ svm_fifo_dequeue_drop (svm_fifo_t * f, u32 len)
   return total_drop_bytes;
 }
 
+/**
+ * Drop all data from fifo
+ *
+ * Should be called only from vpp side because of lookup cleanup
+ */
 void
 svm_fifo_dequeue_drop_all (svm_fifo_t * f)
 {
-  /* consumer foreign index */
-  u32 tail = clib_atomic_load_acq_n (&f->tail);
+  u32 head, tail;
+
+  f_load_head_tail_all_acq (f, &head, &tail);
 
-  if (f->flags & SVM_FIFO_F_MULTI_CHUNK)
-    f->head_chunk = svm_fifo_lookup_clear_chunks (f, &f->ooo_deq_lookup,
-                                                 f->head_chunk, tail,
-                                                 tail - 1);
+  if (!f->head_chunk || !f_chunk_includes_pos (f->head_chunk, head))
+    f->head_chunk = svm_fifo_find_chunk (f, head);
 
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_GROW))
-    svm_fifo_try_grow (f, tail);
+  f->head_chunk = f_lookup_clear_deq_chunks (f, f->head_chunk, tail);
+
+  if (f_pos_geq (tail, f_chunk_end (f->start_chunk)))
+    fsh_collect_chunks (f->fs_hdr, f->slice_index,
+                       f_unlink_chunks (f, tail, 0));
 
   /* store-rel: consumer owned index (paired with load-acq in producer) */
   clib_atomic_store_rel_n (&f->head, tail);
 }
 
+int
+svm_fifo_fill_chunk_list (svm_fifo_t * f)
+{
+  u32 head, tail;
+
+  f_load_head_tail_prod (f, &head, &tail);
+
+  if (f_chunk_end (f->end_chunk) - head >= f->size)
+    return 0;
+
+  if (f_try_grow (f, head, tail, f->size - (tail - head)))
+    return SVM_FIFO_EGROW;
+
+  return 0;
+}
+
 int
 svm_fifo_segments (svm_fifo_t * f, svm_fifo_seg_t * fs)
 {
@@ -1309,7 +1184,7 @@ svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_seg_t * fs)
   head = f->head;
 
   ASSERT (fs[0].data == f->head_chunk->data + head);
-  head = (head + fs[0].len + fs[1].len) % f->size;
+  head = (head + fs[0].len + fs[1].len);
   /* store-rel: consumer owned index (paired with load-acq in producer) */
   clib_atomic_store_rel_n (&f->head, head);
 }
@@ -1325,6 +1200,10 @@ void
 svm_fifo_clone (svm_fifo_t * df, svm_fifo_t * sf)
 {
   u32 head, tail;
+
+  /* Support only single chunk clones for now */
+  ASSERT (svm_fifo_n_chunks (sf) == 1);
+
   clib_memcpy_fast (df->head_chunk->data, sf->head_chunk->data, sf->size);
 
   f_load_head_tail_all_acq (sf, &head, &tail);
@@ -1350,20 +1229,17 @@ svm_fifo_first_ooo_segment (svm_fifo_t * f)
 void
 svm_fifo_init_pointers (svm_fifo_t * f, u32 head, u32 tail)
 {
-  head = head % f->size;
-  tail = tail % f->size;
+  svm_fifo_chunk_t *c;
+
   clib_atomic_store_rel_n (&f->head, head);
   clib_atomic_store_rel_n (&f->tail, tail);
-  if (f->flags & SVM_FIFO_F_MULTI_CHUNK)
-    {
-      svm_fifo_chunk_t *c;
-      c = svm_fifo_find_chunk (f, head);
-      ASSERT (c != 0);
-      f->head_chunk = f->ooo_deq = c;
-      c = svm_fifo_find_chunk (f, tail);
-      ASSERT (c != 0);
-      f->tail_chunk = f->ooo_enq = c;
-    }
+
+  c = svm_fifo_find_chunk (f, head);
+  ASSERT (c != 0);
+  f->head_chunk = f->ooo_deq = c;
+  c = svm_fifo_find_chunk (f, tail);
+  ASSERT (c != 0);
+  f->tail_chunk = f->ooo_enq = c;
 }
 
 void
@@ -1392,20 +1268,54 @@ svm_fifo_del_subscriber (svm_fifo_t * f, u8 subscriber)
 u8
 svm_fifo_is_sane (svm_fifo_t * f)
 {
-  if (f->size - 1 != f->nitems && !(f->flags & SVM_FIFO_F_SHRINK))
-    return 0;
-  if (!svm_fifo_chunk_includes_pos (f->head_chunk, f->head))
+  svm_fifo_chunk_t *tmp;
+
+  if (f->head_chunk && !f_chunk_includes_pos (f->head_chunk, f->head))
     return 0;
-  if (!svm_fifo_chunk_includes_pos (f->tail_chunk, f->tail))
+  if (f->tail_chunk && !f_chunk_includes_pos (f->tail_chunk, f->tail))
     return 0;
+  if (f->ooo_deq)
+    {
+      if (rb_tree_is_init (&f->ooo_deq_lookup))
+       {
+         if (f_pos_lt (f->ooo_deq->start_byte, f->start_chunk->start_byte)
+             || f_pos_gt (f->ooo_deq->start_byte,
+                          f_chunk_end (f->end_chunk)))
+           return 0;
 
-  if (f->start_chunk->next != f->start_chunk)
+         tmp = f_find_chunk_rbtree (&f->ooo_deq_lookup,
+                                    f->ooo_deq->start_byte);
+       }
+      else
+       tmp = svm_fifo_find_chunk (f, f->ooo_deq->start_byte);
+      if (tmp != f->ooo_deq)
+       return 0;
+    }
+  if (f->ooo_enq)
     {
-      svm_fifo_chunk_t *c, *prev = 0, *tmp;
-      u32 size = 0;
+      if (rb_tree_is_init (&f->ooo_enq_lookup))
+       {
+         if (f_pos_lt (f->ooo_enq->start_byte, f->start_chunk->start_byte)
+             || f_pos_gt (f->ooo_enq->start_byte,
+                          f_chunk_end (f->end_chunk)))
+           return 0;
 
-      if (!(f->flags & SVM_FIFO_F_MULTI_CHUNK))
+         tmp = f_find_chunk_rbtree (&f->ooo_enq_lookup,
+                                    f->ooo_enq->start_byte);
+       }
+      else
+       {
+         tmp = svm_fifo_find_next_chunk (f, f->tail_chunk,
+                                         f->ooo_enq->start_byte);
+       }
+      if (tmp != f->ooo_enq)
        return 0;
+    }
+
+  if (f->start_chunk->next)
+    {
+      svm_fifo_chunk_t *c, *prev = 0, *tmp;
+      u32 chunks_bytes = 0;
 
       c = f->start_chunk;
       do
@@ -1416,75 +1326,61 @@ svm_fifo_is_sane (svm_fifo_t * f)
          if (prev && (prev->start_byte + prev->length != c->start_byte))
            return 0;
 
-         if (c->rb_index != RBTREE_TNIL_INDEX)
+         if (c->enq_rb_index != RBTREE_TNIL_INDEX)
            {
-             u8 found = 0;
-
-             tmp = svm_fifo_find_chunk_rbtree (&f->ooo_enq_lookup,
-                                               c->start_byte);
+             tmp = f_find_chunk_rbtree (&f->ooo_enq_lookup, c->start_byte);
              if (tmp)
                {
-                 found = 1;
                  if (tmp != c)
                    return 0;
                }
-
-             tmp = svm_fifo_find_chunk_rbtree (&f->ooo_deq_lookup,
-                                               c->start_byte);
+           }
+         if (c->deq_rb_index != RBTREE_TNIL_INDEX)
+           {
+             tmp = f_find_chunk_rbtree (&f->ooo_deq_lookup, c->start_byte);
              if (tmp)
                {
-                 if (found)
-                   return 0;
-
-                 found = 1;
                  if (tmp != c)
                    return 0;
                }
-             if (!found)
-               return 0;
            }
 
-         size += c->length;
+         chunks_bytes += c->length;
          prev = c;
          c = c->next;
        }
-      while (c != f->start_chunk);
+      while (c);
 
-      if (size != f->size)
+      if (chunks_bytes < f->tail - f->head)
        return 0;
     }
 
   return 1;
 }
 
-u8
-svm_fifo_set_single_thread_owned (svm_fifo_t * f)
+u32
+svm_fifo_n_chunks (svm_fifo_t * f)
 {
-  if (f->flags & SVM_FIFO_F_SINGLE_THREAD_OWNED)
-    {
-      if (f->master_thread_index == os_get_thread_index ())
-       {
-         /* just a duplicate call */
-         return 0;
-       }
+  svm_fifo_chunk_t *c;
+  int n_chunks = 0;
 
-      /* already owned by another thread */
-      return 1;
+  c = f->start_chunk;
+  while (c)
+    {
+      n_chunks++;
+      c = c->next;
     }
 
-  f->flags |= SVM_FIFO_F_SINGLE_THREAD_OWNED;
-  return 0;
+  return n_chunks;
 }
 
 u8 *
 format_ooo_segment (u8 * s, va_list * args)
 {
-  svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
+  svm_fifo_t __clib_unused *f = va_arg (*args, svm_fifo_t *);
   ooo_segment_t *seg = va_arg (*args, ooo_segment_t *);
-  u32 normalized_start = (seg->start + f->nitems - f->tail) % f->size;
-  s = format (s, "[%u, %u], len %u, next %d, prev %d", normalized_start,
-             (normalized_start + seg->length) % f->size, seg->length,
-             seg->next, seg->prev);
+  s = format (s, "[%u, %u], len %u, next %d, prev %d", seg->start,
+             seg->start + seg->length, seg->length, seg->next, seg->prev);
   return s;
 }
 
@@ -1532,9 +1428,10 @@ svm_fifo_replay (u8 * s, svm_fifo_t * f, u8 no_read, u8 verbose)
   trace_len = 0;
 #endif
 
-  dummy_fifo = svm_fifo_create (f->size);
-  clib_memset (f->head_chunk->data, 0xFF, f->nitems);
-  vec_validate (data, f->nitems);
+  dummy_fifo = svm_fifo_alloc (f->size);
+  svm_fifo_init (f, f->size);
+  clib_memset (f->head_chunk->data, 0xFF, f->size);
+  vec_validate (data, f->size);
   for (i = 0; i < vec_len (data); i++)
     data[i] = i;
 
@@ -1545,7 +1442,7 @@ svm_fifo_replay (u8 * s, svm_fifo_t * f, u8 no_read, u8 verbose)
        {
          if (verbose)
            s = format (s, "adding [%u, %u]:", trace[i].offset,
-                       (trace[i].offset + trace[i].len) % dummy_fifo->size);
+                       (trace[i].offset + trace[i].len));
          svm_fifo_enqueue_with_offset (dummy_fifo, trace[i].offset,
                                        trace[i].len, &data[offset]);
        }
@@ -1600,11 +1497,10 @@ format_svm_fifo (u8 * s, va_list * args)
     return s;
 
   indent = format_get_indent (s);
-  s = format (s, "cursize %u nitems %u has_event %d\n",
-             svm_fifo_max_dequeue (f), f->nitems, f->has_event);
+  s = format (s, "cursize %u nitems %u has_event %d min_alloc %u\n",
+             svm_fifo_max_dequeue (f), f->size, f->has_event, f->min_alloc);
   s = format (s, "%Uhead %u tail %u segment manager %u\n", format_white_space,
-             indent, (f->head % f->size), (f->tail % f->size),
-             f->segment_manager);
+             indent, f->head, f->tail, f->segment_manager);
 
   if (verbose > 1)
     s = format (s, "%Uvpp session %d thread %d app session %d thread %d\n",
index 2b6e854..8d5e480 100644 (file)
 #include <vppinfra/vec.h>
 #include <vppinfra/pool.h>
 #include <vppinfra/format.h>
-#include <vppinfra/rbtree.h>
+#include <svm/fifo_types.h>
 
-/** Out-of-order segment */
-typedef struct
-{
-  u32 next;    /**< Next linked-list element pool index */
-  u32 prev;    /**< Previous linked-list element pool index */
-  u32 start;   /**< Start of segment, normalized*/
-  u32 length;  /**< Length of segment */
-} ooo_segment_t;
-
-#define SVM_FIFO_TRACE                         (0)
 #define OOO_SEGMENT_INVALID_INDEX      ((u32)~0)
 #define SVM_FIFO_INVALID_SESSION_INDEX         ((u32)~0)
 #define SVM_FIFO_INVALID_INDEX         ((u32)~0)
-#define SVM_FIFO_MAX_EVT_SUBSCRIBERS   7
 
 typedef enum svm_fifo_deq_ntf_
 {
@@ -48,85 +37,16 @@ typedef enum svm_fifo_deq_ntf_
   SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY = 4,        /**< Notify on transition to empty */
 } svm_fifo_deq_ntf_t;
 
-typedef struct
-{
-  u32 offset;
-  u32 len;
-  u32 action;
-} svm_fifo_trace_elem_t;
-
-typedef struct svm_fifo_chunk_
-{
-  u32 start_byte;              /**< chunk start byte */
-  u32 length;                  /**< length of chunk in bytes */
-  struct svm_fifo_chunk_ *next;        /**< pointer to next chunk in linked-lists */
-  rb_node_index_t rb_index;    /**< node index if chunk in rbtree */
-  u8 data[0];                  /**< start of chunk data */
-} svm_fifo_chunk_t;
-
 typedef enum svm_fifo_flag_
 {
-  SVM_FIFO_F_MULTI_CHUNK = 1 << 0,
-  SVM_FIFO_F_GROW = 1 << 1,
-  SVM_FIFO_F_SHRINK = 1 << 2,
-  SVM_FIFO_F_COLLECT_CHUNKS = 1 << 3,
-  SVM_FIFO_F_LL_TRACKED = 1 << 4,
-  SVM_FIFO_F_SINGLE_THREAD_OWNED = 1 << 5,
+  SVM_FIFO_F_LL_TRACKED = 1 << 0,
 } svm_fifo_flag_t;
 
-typedef struct _svm_fifo
-{
-  CLIB_CACHE_LINE_ALIGN_MARK (shared_first);
-  u32 size;                    /**< size of the fifo in bytes */
-  u32 nitems;                  /**< usable size (size-1) */
-  svm_fifo_chunk_t *start_chunk;/**< first chunk in fifo chunk list */
-  svm_fifo_chunk_t *end_chunk; /**< end chunk in fifo chunk list */
-  rb_tree_t ooo_enq_lookup;    /**< rbtree for ooo enq chunk lookup */
-  rb_tree_t ooo_deq_lookup;    /**< rbtree for ooo deq chunk lookup */
-  u8 flags;                    /**< fifo flags */
-  u8 slice_index;              /**< segment slice for fifo */
-
-    CLIB_CACHE_LINE_ALIGN_MARK (shared_second);
-  volatile u32 has_event;      /**< non-zero if deq event exists */
-  u32 master_session_index;    /**< session layer session index */
-  u32 client_session_index;    /**< app session index */
-  u8 master_thread_index;      /**< session layer thread index */
-  u8 client_thread_index;      /**< app worker index */
-  i8 refcnt;                   /**< reference count  */
-  u32 segment_manager;         /**< session layer segment manager index */
-  u32 segment_index;           /**< segment index in segment manager */
-  struct _svm_fifo *next;      /**< next in freelist/active chain */
-  struct _svm_fifo *prev;      /**< prev in active chain */
-  svm_fifo_chunk_t *new_chunks;        /**< chunks yet to be added to list */
-  u32 size_decrement;          /**< bytes to remove from fifo */
-
-    CLIB_CACHE_LINE_ALIGN_MARK (consumer);
-  u32 head;                    /**< fifo head position/byte */
-  svm_fifo_chunk_t *head_chunk;        /**< tracks chunk where head lands */
-  svm_fifo_chunk_t *ooo_deq;   /**< last chunk used for ooo dequeue */
-  volatile u32 want_deq_ntf;   /**< producer wants nudge */
-  volatile u32 has_deq_ntf;
-
-    CLIB_CACHE_LINE_ALIGN_MARK (producer);
-  u32 tail;                    /**< fifo tail position/byte */
-  u32 ooos_list_head;          /**< Head of out-of-order linked-list */
-  svm_fifo_chunk_t *tail_chunk;        /**< tracks chunk where tail lands */
-  svm_fifo_chunk_t *ooo_enq;   /**< last chunk used for ooo enqueue */
-  ooo_segment_t *ooo_segments; /**< Pool of ooo segments */
-  u32 ooos_newest;             /**< Last segment to have been updated */
-  volatile u8 n_subscribers;   /**< Number of subscribers for io events */
-  u8 subscribers[SVM_FIFO_MAX_EVT_SUBSCRIBERS];
-
-#if SVM_FIFO_TRACE
-  svm_fifo_trace_elem_t *trace;
-#endif
-
-} svm_fifo_t;
-
 typedef enum
 {
   SVM_FIFO_EFULL = -2,
   SVM_FIFO_EEMPTY = -3,
+  SVM_FIFO_EGROW = -4,
 } svm_fifo_err_t;
 
 typedef struct svm_fifo_seg_
@@ -193,55 +113,63 @@ f_load_head_tail_all_acq (svm_fifo_t * f, u32 * head, u32 * tail)
 }
 
 /**
- * Distance to a from b, i.e., a - b in the fifo
+ * Fifo current size, i.e., number of bytes enqueued
  *
  * Internal function.
  */
 static inline u32
-f_distance_to (svm_fifo_t * f, u32 a, u32 b)
+f_cursize (svm_fifo_t * f, u32 head, u32 tail)
 {
-  return ((f->size + a - b) % f->size);
+  return tail - head;
 }
 
 /**
- * Distance from a to b, i.e., b - a in the fifo
+ * Fifo free bytes, i.e., number of free bytes
  *
- * Internal function.
+ * Internal function
  */
 static inline u32
-f_distance_from (svm_fifo_t * f, u32 a, u32 b)
+f_free_count (svm_fifo_t * f, u32 head, u32 tail)
 {
-  return ((f->size + b - a) % f->size);
+  return (f->size - f_cursize (f, head, tail));
 }
 
-/**
- * Fifo current size, i.e., number of bytes enqueued
- *
- * Internal function.
- */
-static inline u32
-f_cursize (svm_fifo_t * f, u32 head, u32 tail)
+always_inline u32
+f_chunk_end (svm_fifo_chunk_t * c)
 {
-  return (head <= tail ? tail - head : f->size + tail - head);
+  return c->start_byte + c->length;
 }
 
-/**
- * Fifo free bytes, i.e., number of free bytes
- *
- * Internal function
- */
-static inline u32
-f_free_count (svm_fifo_t * f, u32 head, u32 tail)
+always_inline int
+f_pos_lt (u32 a, u32 b)
 {
-  return (f->nitems - f_cursize (f, head, tail));
+  return ((i32) (a - b) < 0);
 }
 
-/**
- * Try to shrink fifo size.
- *
- * Internal function.
- */
-void svm_fifo_try_shrink (svm_fifo_t * f, u32 head, u32 tail);
+always_inline int
+f_pos_leq (u32 a, u32 b)
+{
+  return ((i32) (a - b) <= 0);
+}
+
+always_inline int
+f_pos_gt (u32 a, u32 b)
+{
+  return ((i32) (a - b) > 0);
+}
+
+always_inline int
+f_pos_geq (u32 a, u32 b)
+{
+  return ((i32) (a - b) >= 0);
+}
+
+always_inline u8
+f_chunk_includes_pos (svm_fifo_chunk_t * c, u32 pos)
+{
+  return (f_pos_geq (pos, c->start_byte)
+         && f_pos_lt (pos, c->start_byte + c->length));
+}
 
 /**
  * Create fifo of requested size
@@ -252,7 +180,7 @@ void svm_fifo_try_shrink (svm_fifo_t * f, u32 head, u32 tail);
  *                     rounded to the next highest power-of-two value.
  * @return             pointer to new fifo
  */
-svm_fifo_t *svm_fifo_create (u32 size);
+svm_fifo_t *svm_fifo_alloc (u32 size);
 /**
  * Initialize fifo
  *
@@ -260,12 +188,6 @@ svm_fifo_t *svm_fifo_create (u32 size);
  * @param size         size for fifo
  */
 void svm_fifo_init (svm_fifo_t * f, u32 size);
-/**
- * Initialize fifo chunks and rbtree
- *
- * @param f            fifo
- */
-void svm_fifo_init_chunks (svm_fifo_t * f);
 /**
  * Allocate a fifo chunk on heap
  *
@@ -287,31 +209,8 @@ svm_fifo_chunk_t *svm_fifo_chunk_alloc (u32 size);
  * @param c    chunk or linked list of chunks to be added
  */
 void svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c);
-/**
- * Request to reduce fifo size by amount of bytes
- *
- * Because the producer might be enqueuing data when this is called, the
- * actual size update is only applied when producer tries to enqueue new
- * data, unless @param try_shrink is set.
- *
- * @param f            fifo
- * @param len          number of bytes to remove from fifo. The actual number
- *                     of bytes to be removed will be less or equal to this
- *                     value.
- * @param try_shrink   flg to indicate if it's safe to try to shrink fifo
- *                     size. It should be set only if this is called by the
- *                     producer of if the producer is not using the fifo
- * @return             actual length fifo size will be reduced by
- */
-int svm_fifo_reduce_size (svm_fifo_t * f, u32 len, u8 try_shrink);
-/**
- * Removes chunks that are after fifo end byte
- *
- * Needs to be called with segment heap pushed.
- *
- * @param f fifo
- */
-svm_fifo_chunk_t *svm_fifo_collect_chunks (svm_fifo_t * f);
+int svm_fifo_fill_chunk_list (svm_fifo_t * f);
+void svm_fifo_init_ooo_lookup (svm_fifo_t * f, u8 ooo_type);
 /**
  * Free fifo and associated state
  *
@@ -481,14 +380,7 @@ ooo_segment_t *svm_fifo_first_ooo_segment (svm_fifo_t * f);
  * @return     1 if sane, 0 otherwise
  */
 u8 svm_fifo_is_sane (svm_fifo_t * f);
-/**
- * Declare this fifo is used by only a single thread.
- * In this special case, fifo-growth can be done in an efficient way without delay.
- *
- * @param f             fifo
- * @return              1 if the fifo is already owned by another thread, 0 otherwise
- */
-u8 svm_fifo_set_single_thread_owned (svm_fifo_t * f);
+u32 svm_fifo_n_chunks (svm_fifo_t * f);
 format_function_t format_svm_fifo;
 
 /**
@@ -543,7 +435,7 @@ svm_fifo_max_dequeue (svm_fifo_t * f)
 static inline int
 svm_fifo_is_full_prod (svm_fifo_t * f)
 {
-  return (svm_fifo_max_dequeue_prod (f) == f->nitems);
+  return (svm_fifo_max_dequeue_prod (f) == f->size);
 }
 
 /* Check if fifo is full.
@@ -555,7 +447,7 @@ svm_fifo_is_full_prod (svm_fifo_t * f)
 static inline int
 svm_fifo_is_full (svm_fifo_t * f)
 {
-  return (svm_fifo_max_dequeue (f) == f->nitems);
+  return (svm_fifo_max_dequeue (f) == f->size);
 }
 
 /**
@@ -622,8 +514,6 @@ svm_fifo_max_enqueue_prod (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_prod (f, &head, &tail);
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
-    svm_fifo_try_shrink (f, head, tail);
   return f_free_count (f, head, tail);
 }
 
@@ -638,40 +528,44 @@ svm_fifo_max_enqueue (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_all_acq (f, &head, &tail);
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
-    svm_fifo_try_shrink (f, head, tail);
   return f_free_count (f, head, tail);
 }
 
 /**
- * Max contiguous chunk of data that can be read
+ * Max contiguous chunk of data that can be read.
+ *
+ * Should only be called by consumers.
  */
-static inline u32
-svm_fifo_max_read_chunk (svm_fifo_t * f)
-{
-  u32 head, tail;
-  f_load_head_tail_cons (f, &head, &tail);
-  return tail >= head ? (tail - head) : (f->size - head);
-}
+u32 svm_fifo_max_read_chunk (svm_fifo_t * f);
 
 /**
  * Max contiguous chunk of data that can be written
+ *
+ * Should only be called by producers
  */
-static inline u32
-svm_fifo_max_write_chunk (svm_fifo_t * f)
+u32 svm_fifo_max_write_chunk (svm_fifo_t * f);
+
+static inline svm_fifo_chunk_t *
+svm_fifo_head_chunk (svm_fifo_t * f)
 {
-  u32 head, tail;
-  f_load_head_tail_prod (f, &head, &tail);
-  return tail >= head ? f->size - tail : f_free_count (f, head, tail);
+  return f->head_chunk;
 }
 
 static inline u8 *
 svm_fifo_head (svm_fifo_t * f)
 {
+  if (!f->head_chunk)
+    return 0;
   /* load-relaxed: consumer owned index */
   return (f->head_chunk->data + (f->head - f->head_chunk->start_byte));
 }
 
+static inline svm_fifo_chunk_t *
+svm_fifo_tail_chunk (svm_fifo_t * f)
+{
+  return f->tail_chunk;
+}
+
 static inline u8 *
 svm_fifo_tail (svm_fifo_t * f)
 {
@@ -718,7 +612,7 @@ ooo_segment_offset_prod (svm_fifo_t * f, ooo_segment_t * s)
   /* load-relaxed: producer owned index */
   tail = f->tail;
 
-  return f_distance_to (f, s->start, tail);
+  return (s->start - tail);
 }
 
 static inline u32
@@ -727,6 +621,18 @@ ooo_segment_length (svm_fifo_t * f, ooo_segment_t * s)
   return s->length;
 }
 
+static inline u32
+svm_fifo_size (svm_fifo_t * f)
+{
+  return f->size;
+}
+
+static inline void
+svm_fifo_set_size (svm_fifo_t * f, u32 size)
+{
+  f->size = size;
+}
+
 /**
  * Check if fifo has io event
  *
@@ -850,9 +756,8 @@ svm_fifo_needs_deq_ntf (svm_fifo_t * f, u32 n_last_deq)
   if (want_ntf & SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL)
     {
       u32 max_deq = svm_fifo_max_dequeue_cons (f);
-      u32 nitems = f->nitems;
-      if (!f->has_deq_ntf && max_deq < nitems
-         && max_deq + n_last_deq >= nitems)
+      u32 size = f->size;
+      if (!f->has_deq_ntf && max_deq < size && max_deq + n_last_deq >= size)
        return 1;
     }
   if (want_ntf & SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY)
index 08a22c7..46cea2a 100644 (file)
@@ -3197,7 +3197,8 @@ vppcom_session_attr (uint32_t session_handle, uint32_t op,
 
          /* VPP-TBD */
          *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
-                               session->tx_fifo ? session->tx_fifo->nitems :
+                               session->tx_fifo ?
+                               svm_fifo_size (session->tx_fifo) :
                                vcm->cfg.tx_fifo_size);
          *buflen = sizeof (u32);
 
@@ -3228,7 +3229,8 @@ vppcom_session_attr (uint32_t session_handle, uint32_t op,
 
          /* VPP-TBD */
          *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
-                               session->rx_fifo ? session->rx_fifo->nitems :
+                               session->rx_fifo ?
+                               svm_fifo_size (session->rx_fifo) :
                                vcm->cfg.rx_fifo_size);
          *buflen = sizeof (u32);
 
index 25fb39e..cc02026 100644 (file)
@@ -180,7 +180,11 @@ ct_init_local_session (app_worker_t * client_wrk, app_worker_t * server_wrk,
   props = application_segment_manager_properties (server);
   round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
   round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
-  seg_size = round_rx_fifo_sz + round_tx_fifo_sz + margin;
+  /* Increase size because of inefficient chunk allocations. Depending on
+   * how data is consumed, it may happen that more chunks than needed are
+   * allocated.
+   * TODO should remove once allocations are done more efficiently */
+  seg_size = 4 * (round_rx_fifo_sz + round_tx_fifo_sz + margin);
 
   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
   seg_index = segment_manager_add_segment (sm, seg_size);
index 0600b67..0a54b96 100644 (file)
@@ -686,50 +686,6 @@ segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
     segment_manager_segment_reader_unlock (sm);
 }
 
-int
-segment_manager_grow_fifo (segment_manager_t * sm, svm_fifo_t * f, u32 size)
-{
-  fifo_segment_t *fs;
-  int rv;
-
-  fs = segment_manager_get_segment_w_lock (sm, f->segment_index);
-  rv = fifo_segment_grow_fifo (fs, f, size);
-  segment_manager_segment_reader_unlock (sm);
-
-  return rv;
-}
-
-int
-segment_manager_collect_fifo_chunks (segment_manager_t * sm, svm_fifo_t * f)
-{
-  fifo_segment_t *fs;
-  int rv;
-
-  fs = segment_manager_get_segment_w_lock (sm, f->segment_index);
-  rv = fifo_segment_collect_fifo_chunks (fs, f);
-  segment_manager_segment_reader_unlock (sm);
-
-  return rv;
-}
-
-int
-segment_manager_shrink_fifo (segment_manager_t * sm, svm_fifo_t * f, u32 size,
-                            u8 is_producer)
-{
-  int rv;
-
-  rv = svm_fifo_reduce_size (f, size, is_producer);
-
-  /* Nothing to collect at this point */
-  if (!is_producer)
-    return rv;
-
-  if (f->flags & SVM_FIFO_F_COLLECT_CHUNKS)
-    segment_manager_collect_fifo_chunks (sm, f);
-
-  return rv;
-}
-
 u32
 segment_manager_evt_q_expected_size (u32 q_len)
 {
index b409260..52f89ee 100644 (file)
@@ -118,50 +118,6 @@ int segment_manager_try_alloc_fifos (fifo_segment_t * fs,
 void segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo,
                                    svm_fifo_t * tx_fifo);
 
-/**
- * Grows fifo owned by segment manager
- *
- * @param sm   segment manager that owns the fifo
- * @param f    fifo to be grown
- * @param size amount of bytes to add to fifo
- * @return     0 on success, negative number otherwise
- */
-int segment_manager_grow_fifo (segment_manager_t * sm, svm_fifo_t * f,
-                              u32 size);
-
-/**
- * Request to shrink fifo owned by segment manager
- *
- * If this is not called by the producer, no attempt is made to reduce the
- * size until the producer tries to enqueue more data. To collect the chunks
- * that are to be removed call @ref segment_manager_collect_fifo_chunks
- *
- * Size reduction does not affect fifo chunk boundaries. Therefore chunks are
- * not split and the amount of bytes to be removed can be equal to or less
- * than what was requested.
- *
- * @param sm           segment manager that owns the fifo
- * @param f            fifo to be shrunk
- * @param size         amount of bytes to remove from fifo
- * @param is_producer  flag that indicates is caller is the producer for the
- *                     fifo.
- * @return             actual number of bytes to be removed
- */
-int segment_manager_shrink_fifo (segment_manager_t * sm, svm_fifo_t * f,
-                                u32 size, u8 is_producer);
-
-/**
- * Collect fifo chunks that are no longer used
- *
- * This should not be called unless SVM_FIFO_F_COLLECT_CHUNKS is set for
- * the fifo. The chunks are returned to the fifo segment freelist.
- *
- * @param sm           segment manager that owns the fifo
- * @param f            fifo whose chunks are to be collected
- * @return             0 on success, error otherwise
- */
-int segment_manager_collect_fifo_chunks (segment_manager_t * sm,
-                                        svm_fifo_t * f);
 u8 segment_manager_has_fifos (segment_manager_t * sm);
 
 svm_msg_q_t *segment_manager_alloc_queue (fifo_segment_t * fs,
index 346af54..e856372 100644 (file)
@@ -492,14 +492,14 @@ always_inline u32
 transport_rx_fifo_size (transport_connection_t * tc)
 {
   session_t *s = session_get (tc->s_index, tc->thread_index);
-  return s->rx_fifo->nitems;
+  return svm_fifo_size (s->rx_fifo);
 }
 
 always_inline u32
 transport_tx_fifo_size (transport_connection_t * tc)
 {
   session_t *s = session_get (tc->s_index, tc->thread_index);
-  return s->tx_fifo->nitems;
+  return svm_fifo_size (s->tx_fifo);
 }
 
 always_inline u8
index ff86b0f..f7383cb 100644 (file)
@@ -207,6 +207,7 @@ rb_tree_add_custom (rb_tree_t * rt, u32 key, uword opaque, rb_tree_lt_fn ltfn)
     {
       x = rb_node (rt, xi);
       y = x;
+      ASSERT (z->key != x->key);
       if (ltfn (z->key, x->key))
        xi = x->left;
       else
@@ -314,8 +315,8 @@ rb_tree_transplant (rb_tree_t * rt, rb_node_t * u, rb_node_t * v)
   v->parent = u->parent;
 }
 
-void
-rb_tree_del_node (rb_tree_t * rt, rb_node_t * z)
+static void
+rb_tree_del_node_internal (rb_tree_t * rt, rb_node_t * z)
 {
   rb_node_color_t y_original_color;
   rb_node_t *x, *y, *yr, *yl, *xp, *w, *wl, *wr;
@@ -440,16 +441,20 @@ rb_tree_del_node (rb_tree_t * rt, rb_node_t * z)
   x->color = RBTREE_BLACK;
 }
 
+void
+rb_tree_del_node (rb_tree_t * rt, rb_node_t * z)
+{
+  rb_tree_del_node_internal (rt, z);
+  pool_put (rt->nodes, z);
+}
+
 void
 rb_tree_del (rb_tree_t * rt, u32 key)
 {
   rb_node_t *n;
   n = rb_tree_search_subtree (rt, rb_node (rt, rt->root), key);
   if (rb_node_index (rt, n) != RBTREE_TNIL_INDEX)
-    {
-      rb_tree_del_node (rt, n);
-      pool_put (rt->nodes, n);
-    }
+    rb_tree_del_node (rt, n);
 }
 
 void
@@ -458,10 +463,7 @@ rb_tree_del_custom (rb_tree_t * rt, u32 key, rb_tree_lt_fn ltfn)
   rb_node_t *n;
   n = rb_tree_search_subtree_custom (rt, rb_node (rt, rt->root), key, ltfn);
   if (rb_node_index (rt, n) != RBTREE_TNIL_INDEX)
-    {
-      rb_tree_del_node (rt, n);
-      pool_put (rt->nodes, n);
-    }
+    rb_tree_del_node (rt, n);
 }
 
 u32
@@ -490,6 +492,14 @@ rb_tree_init (rb_tree_t * rt)
   tnil->color = RBTREE_BLACK;
 }
 
+int
+rb_tree_is_init (rb_tree_t * rt)
+{
+  if (pool_elts (rt->nodes) == 0)
+    return 0;
+  return 1;
+}
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index dde2fbf..3ab9a33 100644 (file)
@@ -64,6 +64,7 @@ rb_node_t *rb_tree_search_subtree_custom (rb_tree_t * rt, rb_node_t * x,
                                          u32 key, rb_tree_lt_fn ltfn);
 rb_node_t *rb_tree_successor (rb_tree_t * rt, rb_node_t * x);
 rb_node_t *rb_tree_predecessor (rb_tree_t * rt, rb_node_t * x);
+int rb_tree_is_init (rb_tree_t * rt);
 
 static inline rb_node_index_t
 rb_node_index (rb_tree_t * rt, rb_node_t * n)