session: remove fifo segment va allocator
[vpp.git] / src / svm / svm_fifo.c
index f1ac8d4..dc9e4fb 100644 (file)
 #include <svm/fifo_segment.h>
 #include <vppinfra/cpu.h>
 
-CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
-              svm_fifo_chunk_t * c, u32 tail_idx, const u8 * src, u32 len,
-              svm_fifo_chunk_t ** last)
+#define F_INVALID_CPTR (svm_fifo_chunk_ptr_t) ~0ULL
+
+CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t *f,
+              svm_fifo_chunk_t *c, u32 tail_idx, const u8 *src, u32 len,
+              svm_fifo_chunk_ptr_t *last)
 {
   u32 n_chunk;
 
@@ -36,15 +38,15 @@ CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
     {
       u32 to_copy = len;
       clib_memcpy_fast (&c->data[tail_idx], src, n_chunk);
-      c = c->next;
+      c = f_cptr (f, c->next);
       while ((to_copy -= n_chunk))
        {
          n_chunk = clib_min (c->length, to_copy);
          clib_memcpy_fast (&c->data[0], src + (len - to_copy), n_chunk);
-         c = c->length <= to_copy ? c->next : c;
+         c = c->length <= to_copy ? f_cptr (f, c->next) : c;
        }
       if (*last)
-       *last = c;
+       *last = f_csptr (f, c);
     }
   else
     {
@@ -52,9 +54,9 @@ CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
     }
 }
 
-CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
-              svm_fifo_chunk_t * c, u32 head_idx, u8 * dst, u32 len,
-              svm_fifo_chunk_t ** last)
+CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t *f,
+              svm_fifo_chunk_t *c, u32 head_idx, u8 *dst, u32 len,
+              svm_fifo_chunk_ptr_t *last)
 {
   u32 n_chunk;
 
@@ -67,17 +69,17 @@ CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
     {
       u32 to_copy = len;
       clib_memcpy_fast (dst, &c->data[head_idx], n_chunk);
-      c = c->next;
+      c = f_cptr (f, c->next);
       while ((to_copy -= n_chunk))
        {
          CLIB_MEM_UNPOISON (c, sizeof (*c));
          CLIB_MEM_UNPOISON (c->data, c->length);
          n_chunk = clib_min (c->length, to_copy);
          clib_memcpy_fast (dst + (len - to_copy), &c->data[0], n_chunk);
-         c = c->length <= to_copy ? c->next : c;
+         c = c->length <= to_copy ? f_cptr (f, c->next) : c;
        }
       if (*last)
-       *last = c;
+       *last = f_csptr (f, c);
     }
   else
     {
@@ -88,16 +90,16 @@ CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
 #ifndef CLIB_MARCH_VARIANT
 
 static inline void
-svm_fifo_copy_to_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 tail_idx,
-                       const u8 * src, u32 len, svm_fifo_chunk_t ** last)
+svm_fifo_copy_to_chunk (svm_fifo_t *f, svm_fifo_chunk_t *c, u32 tail_idx,
+                       const u8 *src, u32 len, svm_fifo_chunk_ptr_t *last)
 {
   CLIB_MARCH_FN_SELECT (svm_fifo_copy_to_chunk) (f, c, tail_idx, src, len,
                                                 last);
 }
 
 static inline void
-svm_fifo_copy_from_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 head_idx,
-                         u8 * dst, u32 len, svm_fifo_chunk_t ** last)
+svm_fifo_copy_from_chunk (svm_fifo_t *f, svm_fifo_chunk_t *c, u32 head_idx,
+                         u8 *dst, u32 len, svm_fifo_chunk_ptr_t *last)
 {
   CLIB_MARCH_FN_SELECT (svm_fifo_copy_from_chunk) (f, c, head_idx, dst, len,
                                                   last);
@@ -348,7 +350,7 @@ ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
        }
     }
 
-  ASSERT (bytes <= f->size);
+  ASSERT (bytes <= f->shr->size);
   return bytes;
 }
 
@@ -372,32 +374,33 @@ svm_fifo_init (svm_fifo_t * f, u32 size)
   svm_fifo_chunk_t *c, *prev;
   u32 min_alloc;
 
-  f->size = size;
+  f->shr->size = size;
   f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX;
   f->segment_index = SVM_FIFO_INVALID_INDEX;
   f->refcnt = 1;
-  f->head = f->tail = f->flags = 0;
-  f->head_chunk = f->tail_chunk = f->start_chunk;
+  f->shr->head = f->shr->tail = f->flags = 0;
+  f->shr->head_chunk = f->shr->tail_chunk = f->shr->start_chunk;
   f->ooo_deq = f->ooo_enq = 0;
 
   min_alloc = size > 32 << 10 ? size >> 3 : 4096;
   min_alloc = clib_min (min_alloc, 64 << 10);
-  f->min_alloc = min_alloc;
+  f->shr->min_alloc = min_alloc;
 
   /*
    * Initialize chunks
    */
-  f->start_chunk->start_byte = 0;
-  prev = f->start_chunk;
+  prev = f_start_cptr (f);
+  prev->start_byte = 0;
   prev->enq_rb_index = prev->deq_rb_index = RBTREE_TNIL_INDEX;
-  c = prev->next;
+  c = f_cptr (f, prev->next);
 
   while (c)
     {
       c->start_byte = prev->start_byte + prev->length;
       c->enq_rb_index = c->deq_rb_index = RBTREE_TNIL_INDEX;
+      ASSERT (c->length >= 1 << FS_MIN_LOG2_CHUNK_SZ);
       prev = c;
-      c = c->next;
+      c = f_cptr (f, c->next);
     }
 }
 
@@ -447,7 +450,7 @@ svm_fifo_alloc (u32 data_size_in_bytes)
   c->length = data_size_in_bytes;
   c->enq_rb_index = RBTREE_TNIL_INDEX;
   c->deq_rb_index = RBTREE_TNIL_INDEX;
-  f->start_chunk = f->end_chunk = c;
+  f->shr->start_chunk = f->shr->end_chunk = f_csptr (f, c);
 
   return f;
 }
@@ -486,9 +489,9 @@ svm_fifo_find_chunk (svm_fifo_t * f, u32 pos)
 {
   svm_fifo_chunk_t *c;
 
-  c = f->start_chunk;
+  c = f_start_cptr (f);
   while (c && !f_chunk_includes_pos (c, pos))
-    c = c->next;
+    c = f_cptr (f, c->next);
 
   return c;
 }
@@ -502,7 +505,7 @@ svm_fifo_find_next_chunk (svm_fifo_t * f, svm_fifo_chunk_t * start, u32 pos)
 
   c = start;
   while (c && !f_chunk_includes_pos (c, pos))
-    c = c->next;
+    c = f_cptr (f, c->next);
 
   return c;
 }
@@ -513,16 +516,16 @@ svm_fifo_max_read_chunk (svm_fifo_t * f)
   u32 head, tail, end_chunk;
 
   f_load_head_tail_cons (f, &head, &tail);
-  ASSERT (!f->head_chunk || f_chunk_includes_pos (f->head_chunk, head));
+  ASSERT (!f->shr->head_chunk || f_chunk_includes_pos (f_head_cptr (f), head));
 
-  if (!f->head_chunk)
+  if (!f->shr->head_chunk)
     {
-      f->head_chunk = svm_fifo_find_chunk (f, head);
-      if (PREDICT_FALSE (!f->head_chunk))
+      f->shr->head_chunk = f_csptr (f, svm_fifo_find_chunk (f, head));
+      if (PREDICT_FALSE (!f->shr->head_chunk))
        return 0;
     }
 
-  end_chunk = f_chunk_end (f->head_chunk);
+  end_chunk = f_chunk_end (f_head_cptr (f));
 
   return f_pos_lt (end_chunk, tail) ? end_chunk - head : tail - head;
 }
@@ -530,12 +533,15 @@ svm_fifo_max_read_chunk (svm_fifo_t * f)
 u32
 svm_fifo_max_write_chunk (svm_fifo_t * f)
 {
+  svm_fifo_chunk_t *tail_chunk;
   u32 head, tail;
 
   f_load_head_tail_prod (f, &head, &tail);
-  ASSERT (!f->tail_chunk || f_chunk_includes_pos (f->tail_chunk, tail));
+  tail_chunk = f_tail_cptr (f);
+
+  ASSERT (!tail_chunk || f_chunk_includes_pos (tail_chunk, tail));
 
-  return f->tail_chunk ? f_chunk_end (f->tail_chunk) - tail : 0;
+  return tail_chunk ? f_chunk_end (tail_chunk) - tail : 0;
 }
 
 static rb_node_t *
@@ -605,13 +611,13 @@ f_update_ooo_enq (svm_fifo_t * f, u32 start_pos, u32 end_pos)
   /* Use linear search if rbtree is not initialized */
   if (PREDICT_FALSE (!rb_tree_is_init (rt)))
     {
-      f->ooo_enq = svm_fifo_find_next_chunk (f, f->tail_chunk, start_pos);
+      f->ooo_enq = svm_fifo_find_next_chunk (f, f_tail_cptr (f), start_pos);
       return;
     }
 
   if (rt->root == RBTREE_TNIL_INDEX)
     {
-      c = f->tail_chunk;
+      c = f_tail_cptr (f);
       ASSERT (c->enq_rb_index == RBTREE_TNIL_INDEX);
       c->enq_rb_index = rb_tree_add_custom (rt, c->start_byte,
                                            pointer_to_uword (c), f_pos_lt);
@@ -631,7 +637,7 @@ f_update_ooo_enq (svm_fifo_t * f, u32 start_pos, u32 end_pos)
 
   do
     {
-      c = c->next;
+      c = f_cptr (f, c->next);
       if (!c || c->enq_rb_index != RBTREE_TNIL_INDEX)
        break;
 
@@ -660,7 +666,7 @@ f_update_ooo_deq (svm_fifo_t * f, u32 start_pos, u32 end_pos)
 
   if (rt->root == RBTREE_TNIL_INDEX)
     {
-      c = f->start_chunk;
+      c = f_start_cptr (f);
       ASSERT (c->deq_rb_index == RBTREE_TNIL_INDEX);
       c->deq_rb_index = rb_tree_add_custom (rt, c->start_byte,
                                            pointer_to_uword (c), f_pos_lt);
@@ -680,7 +686,7 @@ f_update_ooo_deq (svm_fifo_t * f, u32 start_pos, u32 end_pos)
 
   do
     {
-      c = c->next;
+      c = f_cptr (f, c->next);
       if (!c || c->deq_rb_index != RBTREE_TNIL_INDEX)
        break;
 
@@ -711,7 +717,7 @@ f_lookup_clear_enq_chunks (svm_fifo_t * f, svm_fifo_chunk_t * start,
          c->enq_rb_index = RBTREE_TNIL_INDEX;
        }
 
-      c = c->next;
+      c = f_cptr (f, c->next);
     }
 
   /* No ooo segments left, so make sure the current chunk
@@ -745,7 +751,7 @@ f_lookup_clear_deq_chunks (svm_fifo_t * f, svm_fifo_chunk_t * start,
          c->deq_rb_index = RBTREE_TNIL_INDEX;
        }
 
-      c = c->next;
+      c = f_cptr (f, c->next);
     }
 
   return c;
@@ -778,23 +784,24 @@ svm_fifo_overwrite_head (svm_fifo_t * f, u8 * src, u32 len)
   u32 head, tail, head_idx;
   svm_fifo_chunk_t *c;
 
-  ASSERT (len <= f->size);
+  ASSERT (len <= f->shr->size);
 
   f_load_head_tail_cons (f, &head, &tail);
 
-  if (!f->head_chunk)
-    f->head_chunk = svm_fifo_find_chunk (f, head);
+  if (!f->shr->head_chunk)
+    f->shr->head_chunk = f_csptr (f, svm_fifo_find_chunk (f, head));
 
-  c = f->head_chunk;
+  c = f_head_cptr (f);
   head_idx = head - c->start_byte;
   n_chunk = c->length - head_idx;
   if (len <= n_chunk)
     clib_memcpy_fast (&c->data[head_idx], src, len);
   else
     {
-      ASSERT (len - n_chunk <= c->next->length);
+      ASSERT (len - n_chunk <= f_cptr (f, c->next)->length);
       clib_memcpy_fast (&c->data[head_idx], src, n_chunk);
-      clib_memcpy_fast (&c->next->data[0], src + n_chunk, len - n_chunk);
+      clib_memcpy_fast (&f_cptr (f, c->next)->data[0], src + n_chunk,
+                       len - n_chunk);
     }
 }
 
@@ -804,17 +811,17 @@ f_try_chunk_alloc (svm_fifo_t * f, u32 head, u32 tail, u32 len)
   svm_fifo_chunk_t *c, *cur, *prev;
   u32 alloc_size, free_alloced;
 
-  free_alloced = f_chunk_end (f->end_chunk) - tail;
+  prev = f_end_cptr (f);
+  free_alloced = f_chunk_end (prev) - tail;
 
-  alloc_size = clib_min (f->min_alloc, f->size - (tail - head));
+  alloc_size = clib_min (f->shr->min_alloc, f->shr->size - (tail - head));
   alloc_size = clib_max (alloc_size, len - free_alloced);
 
-  c = fsh_alloc_chunk (f->fs_hdr, f->slice_index, alloc_size);
+  c = fsh_alloc_chunk (f->fs_hdr, f->shr->slice_index, alloc_size);
   if (PREDICT_FALSE (!c))
     return -1;
 
   cur = c;
-  prev = f->end_chunk;
 
   while (cur)
     {
@@ -823,15 +830,15 @@ f_try_chunk_alloc (svm_fifo_t * f, u32 head, u32 tail, u32 len)
       cur->deq_rb_index = RBTREE_TNIL_INDEX;
 
       prev = cur;
-      cur = cur->next;
+      cur = f_cptr (f, cur->next);
     }
 
+  f_csptr_link (f, f->shr->end_chunk, c);
   prev->next = 0;
-  f->end_chunk->next = c;
-  f->end_chunk = prev;
+  f->shr->end_chunk = f_csptr (f, prev);
 
-  if (!f->tail_chunk)
-    f->tail_chunk = c;
+  if (!f->shr->tail_chunk)
+    f->shr->tail_chunk = f_csptr (f, c);
 
   return 0;
 }
@@ -855,19 +862,19 @@ svm_fifo_enqueue (svm_fifo_t * f, u32 len, const u8 * src)
   /* number of bytes we're going to copy */
   len = clib_min (free_count, len);
 
-  if (f_pos_gt (tail + len, f_chunk_end (f->end_chunk)))
+  if (f_pos_gt (tail + len, f_chunk_end (f_end_cptr (f))))
     {
       if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
        {
-         len = f_chunk_end (f->end_chunk) - tail;
+         len = f_chunk_end (f_end_cptr (f)) - tail;
          if (!len)
            return SVM_FIFO_EGROW;
        }
     }
 
-  old_tail_c = f->tail_chunk;
+  old_tail_c = f_tail_cptr (f);
 
-  svm_fifo_copy_to_chunk (f, f->tail_chunk, tail, src, len, &f->tail_chunk);
+  svm_fifo_copy_to_chunk (f, old_tail_c, tail, src, len, &f->shr->tail_chunk);
   tail = tail + len;
 
   svm_fifo_trace_add (f, head, len, 2);
@@ -877,12 +884,13 @@ svm_fifo_enqueue (svm_fifo_t * f, u32 len, const u8 * src)
     {
       len += ooo_segment_try_collect (f, len, &tail);
       /* Tail chunk might've changed even if nothing was collected */
-      f->tail_chunk = f_lookup_clear_enq_chunks (f, old_tail_c, tail);
+      f->shr->tail_chunk =
+       f_csptr (f, f_lookup_clear_enq_chunks (f, old_tail_c, tail));
       f->ooo_enq = 0;
     }
 
   /* store-rel: producer owned index (paired with load-acq in consumer) */
-  clib_atomic_store_rel_n (&f->tail, tail);
+  clib_atomic_store_rel_n (&f->shr->tail, tail);
 
   return len;
 }
@@ -898,6 +906,7 @@ int
 svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
 {
   u32 tail, head, free_count, enq_pos;
+  svm_fifo_chunk_ptr_t last = F_INVALID_CPTR;
 
   f_load_head_tail_prod (f, &head, &tail);
 
@@ -911,7 +920,7 @@ svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
 
   enq_pos = tail + offset;
 
-  if (f_pos_gt (enq_pos + len, f_chunk_end (f->end_chunk)))
+  if (f_pos_gt (enq_pos + len, f_chunk_end (f_end_cptr (f))))
     {
       if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, offset + len)))
        return SVM_FIFO_EGROW;
@@ -923,7 +932,9 @@ svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
   if (!f->ooo_enq || !f_chunk_includes_pos (f->ooo_enq, enq_pos))
     f_update_ooo_enq (f, enq_pos, enq_pos + len);
 
-  svm_fifo_copy_to_chunk (f, f->ooo_enq, enq_pos, src, len, &f->ooo_enq);
+  svm_fifo_copy_to_chunk (f, f->ooo_enq, enq_pos, src, len, &last);
+  if (last != F_INVALID_CPTR)
+    f->ooo_enq = f_cptr (f, last);
 
   return 0;
 }
@@ -938,21 +949,23 @@ svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len)
 
   ASSERT (len <= svm_fifo_max_enqueue_prod (f));
   /* load-relaxed: producer owned index */
-  tail = f->tail;
+  tail = f->shr->tail;
   tail = tail + len;
 
   if (rb_tree_is_init (&f->ooo_enq_lookup))
     {
-      f->tail_chunk = f_lookup_clear_enq_chunks (f, f->tail_chunk, tail);
+      f->shr->tail_chunk =
+       f_csptr (f, f_lookup_clear_enq_chunks (f, f_tail_cptr (f), tail));
       f->ooo_enq = 0;
     }
   else
     {
-      f->tail_chunk = svm_fifo_find_next_chunk (f, f->tail_chunk, tail);
+      f->shr->tail_chunk =
+       f_csptr (f, svm_fifo_find_next_chunk (f, f_tail_cptr (f), tail));
     }
 
   /* store-rel: producer owned index (paired with load-acq in consumer) */
-  clib_atomic_store_rel_n (&f->tail, tail);
+  clib_atomic_store_rel_n (&f->shr->tail, tail);
 }
 
 int
@@ -975,14 +988,14 @@ svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
   for (i = 0; i < n_segs; i++)
     len += segs[i].len;
 
-  old_tail_c = f->tail_chunk;
+  old_tail_c = f_tail_cptr (f);
 
   if (!allow_partial)
     {
       if (PREDICT_FALSE (free_count < len))
        return SVM_FIFO_EFULL;
 
-      if (f_pos_gt (tail + len, f_chunk_end (f->end_chunk)))
+      if (f_pos_gt (tail + len, f_chunk_end (f_end_cptr (f))))
        {
          if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
            return SVM_FIFO_EGROW;
@@ -990,8 +1003,8 @@ svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
 
       for (i = 0; i < n_segs; i++)
        {
-         svm_fifo_copy_to_chunk (f, f->tail_chunk, tail, segs[i].data,
-                                 segs[i].len, &f->tail_chunk);
+         svm_fifo_copy_to_chunk (f, f_tail_cptr (f), tail, segs[i].data,
+                                 segs[i].len, &f->shr->tail_chunk);
          tail += segs[i].len;
        }
     }
@@ -999,11 +1012,11 @@ svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
     {
       len = clib_min (free_count, len);
 
-      if (f_pos_gt (tail + len, f_chunk_end (f->end_chunk)))
+      if (f_pos_gt (tail + len, f_chunk_end (f_end_cptr (f))))
        {
          if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
            {
-             len = f_chunk_end (f->end_chunk) - tail;
+             len = f_chunk_end (f_end_cptr (f)) - tail;
              if (!len)
                return SVM_FIFO_EGROW;
            }
@@ -1013,8 +1026,8 @@ svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
       while (len)
        {
          u32 to_copy = clib_min (segs[i].len, len);
-         svm_fifo_copy_to_chunk (f, f->tail_chunk, tail, segs[i].data,
-                                 to_copy, &f->tail_chunk);
+         svm_fifo_copy_to_chunk (f, f_tail_cptr (f), tail, segs[i].data,
+                                 to_copy, &f->shr->tail_chunk);
          len -= to_copy;
          tail += to_copy;
          i++;
@@ -1026,12 +1039,13 @@ svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
     {
       len += ooo_segment_try_collect (f, len, &tail);
       /* Tail chunk might've changed even if nothing was collected */
-      f->tail_chunk = f_lookup_clear_enq_chunks (f, old_tail_c, tail);
+      f->shr->tail_chunk =
+       f_csptr (f, f_lookup_clear_enq_chunks (f, old_tail_c, tail));
       f->ooo_enq = 0;
     }
 
   /* store-rel: producer owned index (paired with load-acq in consumer) */
-  clib_atomic_store_rel_n (&f->tail, tail);
+  clib_atomic_store_rel_n (&f->shr->tail, tail);
 
   return len;
 }
@@ -1043,12 +1057,11 @@ f_unlink_chunks (svm_fifo_t * f, u32 end_pos, u8 maybe_ooo)
   rb_tree_t *rt;
   rb_node_t *n;
 
-  ASSERT (!f_chunk_includes_pos (f->start_chunk, end_pos));
-
   if (maybe_ooo)
     rt = &f->ooo_deq_lookup;
 
-  c = f->start_chunk;
+  c = f_start_cptr (f);
+  ASSERT (!f_chunk_includes_pos (c, end_pos));
 
   do
     {
@@ -1062,7 +1075,7 @@ f_unlink_chunks (svm_fifo_t * f, u32 end_pos, u8 maybe_ooo)
       if (!c->next)
        break;
       prev = c;
-      c = c->next;
+      c = f_cptr (f, c->next);
     }
   while (!f_chunk_includes_pos (c, end_pos));
 
@@ -1082,8 +1095,8 @@ f_unlink_chunks (svm_fifo_t * f, u32 end_pos, u8 maybe_ooo)
     return 0;
 
   prev->next = 0;
-  start = f->start_chunk;
-  f->start_chunk = c;
+  start = f_start_cptr (f);
+  f->shr->start_chunk = f_csptr (f, c);
 
   return start;
 }
@@ -1103,22 +1116,23 @@ svm_fifo_dequeue (svm_fifo_t * f, u32 len, u8 * dst)
 
   len = clib_min (cursize, len);
 
-  if (!f->head_chunk)
-    f->head_chunk = svm_fifo_find_chunk (f, head);
+  if (!f->shr->head_chunk)
+    f->shr->head_chunk = f_csptr (f, svm_fifo_find_chunk (f, head));
 
-  svm_fifo_copy_from_chunk (f, f->head_chunk, head, dst, len, &f->head_chunk);
+  svm_fifo_copy_from_chunk (f, f_head_cptr (f), head, dst, len,
+                           &f->shr->head_chunk);
   head = head + len;
 
   /* In order dequeues are not supported in combination with ooo peeking.
    * Use svm_fifo_dequeue_drop instead. */
   ASSERT (rb_tree_n_nodes (&f->ooo_deq_lookup) <= 1);
 
-  if (f_pos_geq (head, f_chunk_end (f->start_chunk)))
-    fsh_collect_chunks (f->fs_hdr, f->slice_index,
+  if (f_pos_geq (head, f_chunk_end (f_start_cptr (f))))
+    fsh_collect_chunks (f->fs_hdr, f->shr->slice_index,
                        f_unlink_chunks (f, head, 0));
 
   /* store-rel: consumer owned index (paired with load-acq in producer) */
-  clib_atomic_store_rel_n (&f->head, head);
+  clib_atomic_store_rel_n (&f->shr->head, head);
 
   return len;
 }
@@ -1127,6 +1141,7 @@ int
 svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 len, u8 * dst)
 {
   u32 tail, head, cursize, head_idx;
+  svm_fifo_chunk_ptr_t last = F_INVALID_CPTR;
 
   f_load_head_tail_cons (f, &head, &tail);
 
@@ -1143,7 +1158,9 @@ svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 len, u8 * dst)
   if (!f->ooo_deq || !f_chunk_includes_pos (f->ooo_deq, head_idx))
     f_update_ooo_deq (f, head_idx, head_idx + len);
 
-  svm_fifo_copy_from_chunk (f, f->ooo_deq, head_idx, dst, len, &f->ooo_deq);
+  svm_fifo_copy_from_chunk (f, f->ooo_deq, head_idx, dst, len, &last);
+  if (last != F_INVALID_CPTR)
+    f->ooo_deq = f_cptr (f, last);
   return len;
 }
 
@@ -1167,16 +1184,17 @@ svm_fifo_dequeue_drop (svm_fifo_t * f, u32 len)
   /* move head */
   head = head + total_drop_bytes;
 
-  if (f_pos_geq (head, f_chunk_end (f->start_chunk)))
+  if (f_pos_geq (head, f_chunk_end (f_start_cptr (f))))
     {
-      fsh_collect_chunks (f->fs_hdr, f->slice_index,
+      fsh_collect_chunks (f->fs_hdr, f->shr->slice_index,
                          f_unlink_chunks (f, head, 1));
-      f->head_chunk =
-       f_chunk_includes_pos (f->start_chunk, head) ? f->start_chunk : 0;
+      f->shr->head_chunk = f_chunk_includes_pos (f_start_cptr (f), head) ?
+                            f->shr->start_chunk :
+                            0;
     }
 
   /* store-rel: consumer owned index (paired with load-acq in producer) */
-  clib_atomic_store_rel_n (&f->head, head);
+  clib_atomic_store_rel_n (&f->shr->head, head);
 
   return total_drop_bytes;
 }
@@ -1192,17 +1210,18 @@ svm_fifo_dequeue_drop_all (svm_fifo_t * f)
 
   f_load_head_tail_all_acq (f, &head, &tail);
 
-  if (!f->head_chunk || !f_chunk_includes_pos (f->head_chunk, head))
-    f->head_chunk = svm_fifo_find_chunk (f, head);
+  if (!f->shr->head_chunk || !f_chunk_includes_pos (f_head_cptr (f), head))
+    f->shr->head_chunk = f_csptr (f, svm_fifo_find_chunk (f, head));
 
-  f->head_chunk = f_lookup_clear_deq_chunks (f, f->head_chunk, tail);
+  f->shr->head_chunk =
+    f_csptr (f, f_lookup_clear_deq_chunks (f, f_head_cptr (f), tail));
 
-  if (f_pos_geq (tail, f_chunk_end (f->start_chunk)))
-    fsh_collect_chunks (f->fs_hdr, f->slice_index,
+  if (f_pos_geq (tail, f_chunk_end (f_start_cptr (f))))
+    fsh_collect_chunks (f->fs_hdr, f->shr->slice_index,
                        f_unlink_chunks (f, tail, 0));
 
   /* store-rel: consumer owned index (paired with load-acq in producer) */
-  clib_atomic_store_rel_n (&f->head, tail);
+  clib_atomic_store_rel_n (&f->shr->head, tail);
 }
 
 int
@@ -1212,15 +1231,51 @@ svm_fifo_fill_chunk_list (svm_fifo_t * f)
 
   f_load_head_tail_prod (f, &head, &tail);
 
-  if (f_chunk_end (f->end_chunk) - head >= f->size)
+  if (f_chunk_end (f_end_cptr (f)) - head >= f->shr->size)
     return 0;
 
-  if (f_try_chunk_alloc (f, head, tail, f->size - (tail - head)))
+  if (f_try_chunk_alloc (f, head, tail, f->shr->size - (tail - head)))
     return SVM_FIFO_EGROW;
 
   return 0;
 }
 
+int
+svm_fifo_provision_chunks (svm_fifo_t *f, svm_fifo_seg_t *fs, u32 n_segs,
+                          u32 len)
+{
+  u32 head, tail, n_avail, head_pos, n_bytes, fs_index = 1, clen;
+  svm_fifo_chunk_t *c;
+
+  f_load_head_tail_prod (f, &head, &tail);
+
+  if (f_free_count (f, head, tail) < len)
+    return SVM_FIFO_EFULL;
+
+  n_avail = f_chunk_end (f_end_cptr (f)) - tail;
+
+  if (n_avail < len && f_try_chunk_alloc (f, head, tail, len))
+    return SVM_FIFO_EGROW;
+
+  c = f_tail_cptr (f);
+  head_pos = (tail - c->start_byte);
+  fs[0].data = c->data + head_pos;
+  fs[0].len = clib_min (c->length - head_pos, len);
+  n_bytes = fs[0].len;
+
+  while (n_bytes < len && fs_index < n_segs)
+    {
+      c = f_cptr (f, c->next);
+      clen = clib_min (c->length, len - n_bytes);
+      fs[fs_index].data = c->data;
+      fs[fs_index].len = clen;
+      n_bytes += clen;
+      fs_index += 1;
+    }
+
+  return fs_index;
+}
+
 int
 svm_fifo_segments (svm_fifo_t * f, u32 offset, svm_fifo_seg_t * fs,
                   u32 n_segs, u32 max_bytes)
@@ -1243,22 +1298,22 @@ svm_fifo_segments (svm_fifo_t * f, u32 offset, svm_fifo_seg_t * fs,
   to_read = clib_min (cursize - offset, max_bytes);
   start = head + offset;
 
-  if (!f->head_chunk)
-    f->head_chunk = svm_fifo_find_chunk (f, head);
+  if (!f->shr->head_chunk)
+    f->shr->head_chunk = f_csptr (f, svm_fifo_find_chunk (f, head));
 
-  c = f->head_chunk;
+  c = f_head_cptr (f);
 
   while (!f_chunk_includes_pos (c, start))
-    c = c->next;
+    c = f_cptr (f, c->next);
 
   head_pos = start - c->start_byte;
   fs[0].data = c->data + head_pos;
-  fs[0].len = clib_min (c->length - head_pos, cursize - offset);
+  fs[0].len = clib_min (c->length - head_pos, to_read);
   n_bytes = fs[0].len;
 
   while (n_bytes < to_read && fs_index < n_segs)
     {
-      c = c->next;
+      c = f_cptr (f, c->next);
       len = clib_min (c->length, to_read - n_bytes);
       fs[fs_index].data = c->data;
       fs[fs_index].len = len;
@@ -1284,11 +1339,12 @@ svm_fifo_clone (svm_fifo_t * df, svm_fifo_t * sf)
   /* Support only single chunk clones for now */
   ASSERT (svm_fifo_n_chunks (sf) == 1);
 
-  clib_memcpy_fast (df->head_chunk->data, sf->head_chunk->data, sf->size);
+  clib_memcpy_fast (f_head_cptr (df)->data, f_head_cptr (sf)->data,
+                   f_head_cptr (sf)->length);
 
   f_load_head_tail_all_acq (sf, &head, &tail);
-  clib_atomic_store_rel_n (&df->head, head);
-  clib_atomic_store_rel_n (&df->tail, tail);
+  clib_atomic_store_rel_n (&df->shr->head, head);
+  clib_atomic_store_rel_n (&df->shr->tail, tail);
 }
 
 u32
@@ -1311,23 +1367,25 @@ svm_fifo_init_pointers (svm_fifo_t * f, u32 head, u32 tail)
 {
   svm_fifo_chunk_t *c;
 
-  clib_atomic_store_rel_n (&f->head, head);
-  clib_atomic_store_rel_n (&f->tail, tail);
+  clib_atomic_store_rel_n (&f->shr->head, head);
+  clib_atomic_store_rel_n (&f->shr->tail, tail);
 
   c = svm_fifo_find_chunk (f, head);
   ASSERT (c != 0);
-  f->head_chunk = f->ooo_deq = c;
+  f->ooo_deq = c;
+  f->shr->head_chunk = f_csptr (f, c);
   c = svm_fifo_find_chunk (f, tail);
   ASSERT (c != 0);
-  f->tail_chunk = f->ooo_enq = c;
+  f->ooo_enq = c;
+  f->shr->tail_chunk = f_csptr (f, c);
 }
 
 void
 svm_fifo_add_subscriber (svm_fifo_t * f, u8 subscriber)
 {
-  if (f->n_subscribers >= SVM_FIFO_MAX_EVT_SUBSCRIBERS)
+  if (f->shr->n_subscribers >= SVM_FIFO_MAX_EVT_SUBSCRIBERS)
     return;
-  f->subscribers[f->n_subscribers++] = subscriber;
+  f->shr->subscribers[f->shr->n_subscribers++] = subscriber;
 }
 
 void
@@ -1335,12 +1393,12 @@ svm_fifo_del_subscriber (svm_fifo_t * f, u8 subscriber)
 {
   int i;
 
-  for (i = 0; i < f->n_subscribers; i++)
+  for (i = 0; i < f->shr->n_subscribers; i++)
     {
-      if (f->subscribers[i] != subscriber)
+      if (f->shr->subscribers[i] != subscriber)
        continue;
-      f->subscribers[i] = f->subscribers[f->n_subscribers - 1];
-      f->n_subscribers--;
+      f->shr->subscribers[i] = f->shr->subscribers[f->shr->n_subscribers - 1];
+      f->shr->n_subscribers--;
       break;
     }
 }
@@ -1350,17 +1408,19 @@ svm_fifo_is_sane (svm_fifo_t * f)
 {
   svm_fifo_chunk_t *tmp;
 
-  if (f->head_chunk && !f_chunk_includes_pos (f->head_chunk, f->head))
+  if (f->shr->head_chunk &&
+      !f_chunk_includes_pos (f_head_cptr (f), f->shr->head))
     return 0;
-  if (f->tail_chunk && !f_chunk_includes_pos (f->tail_chunk, f->tail))
+  if (f->shr->tail_chunk &&
+      !f_chunk_includes_pos (f_tail_cptr (f), f->shr->tail))
     return 0;
   if (f->ooo_deq)
     {
       if (rb_tree_is_init (&f->ooo_deq_lookup))
        {
-         if (f_pos_lt (f->ooo_deq->start_byte, f->start_chunk->start_byte)
-             || f_pos_gt (f->ooo_deq->start_byte,
-                          f_chunk_end (f->end_chunk)))
+         if (f_pos_lt (f->ooo_deq->start_byte,
+                       f_start_cptr (f)->start_byte) ||
+             f_pos_gt (f->ooo_deq->start_byte, f_chunk_end (f_end_cptr (f))))
            return 0;
 
          tmp = f_find_chunk_rbtree (&f->ooo_deq_lookup,
@@ -1375,9 +1435,9 @@ svm_fifo_is_sane (svm_fifo_t * f)
     {
       if (rb_tree_is_init (&f->ooo_enq_lookup))
        {
-         if (f_pos_lt (f->ooo_enq->start_byte, f->start_chunk->start_byte)
-             || f_pos_gt (f->ooo_enq->start_byte,
-                          f_chunk_end (f->end_chunk)))
+         if (f_pos_lt (f->ooo_enq->start_byte,
+                       f_start_cptr (f)->start_byte) ||
+             f_pos_gt (f->ooo_enq->start_byte, f_chunk_end (f_end_cptr (f))))
            return 0;
 
          tmp = f_find_chunk_rbtree (&f->ooo_enq_lookup,
@@ -1385,19 +1445,19 @@ svm_fifo_is_sane (svm_fifo_t * f)
        }
       else
        {
-         tmp = svm_fifo_find_next_chunk (f, f->tail_chunk,
+         tmp = svm_fifo_find_next_chunk (f, f_tail_cptr (f),
                                          f->ooo_enq->start_byte);
        }
       if (tmp != f->ooo_enq)
        return 0;
     }
 
-  if (f->start_chunk->next)
+  if (f_start_cptr (f)->next)
     {
       svm_fifo_chunk_t *c, *prev = 0, *tmp;
       u32 chunks_bytes = 0;
 
-      c = f->start_chunk;
+      c = f_start_cptr (f);
       do
        {
          tmp = svm_fifo_find_chunk (f, c->start_byte);
@@ -1427,11 +1487,11 @@ svm_fifo_is_sane (svm_fifo_t * f)
 
          chunks_bytes += c->length;
          prev = c;
-         c = c->next;
+         c = f_cptr (f, c->next);
        }
       while (c);
 
-      if (chunks_bytes < f->tail - f->head)
+      if (chunks_bytes < f->shr->tail - f->shr->head)
        return 0;
     }
 
@@ -1444,11 +1504,11 @@ svm_fifo_n_chunks (svm_fifo_t * f)
   svm_fifo_chunk_t *c;
   int n_chunks = 0;
 
-  c = f->start_chunk;
+  c = f_start_cptr (f);
   while (c)
     {
       n_chunks++;
-      c = c->next;
+      c = f_cptr (f, c->next);
     }
 
   return n_chunks;
@@ -1508,10 +1568,10 @@ svm_fifo_replay (u8 * s, svm_fifo_t * f, u8 no_read, u8 verbose)
   trace_len = 0;
 #endif
 
-  placeholder_fifo = svm_fifo_alloc (f->size);
-  svm_fifo_init (f, f->size);
-  clib_memset (f->head_chunk->data, 0xFF, f->size);
-  vec_validate (data, f->size);
+  placeholder_fifo = svm_fifo_alloc (f->shr->size);
+  svm_fifo_init (f, f->shr->size);
+  clib_memset (f_head_cptr (f)->data, 0xFF, f->shr->size);
+  vec_validate (data, f->shr->size);
   for (i = 0; i < vec_len (data); i++)
     data[i] = i;
 
@@ -1578,14 +1638,15 @@ format_svm_fifo (u8 * s, va_list * args)
 
   indent = format_get_indent (s);
   s = format (s, "cursize %u nitems %u has_event %d min_alloc %u\n",
-             svm_fifo_max_dequeue (f), f->size, f->has_event, f->min_alloc);
+             svm_fifo_max_dequeue (f), f->shr->size, f->shr->has_event,
+             f->shr->min_alloc);
   s = format (s, "%Uhead %u tail %u segment manager %u\n", format_white_space,
-             indent, f->head, f->tail, f->segment_manager);
+             indent, f->shr->head, f->shr->tail, f->segment_manager);
 
   if (verbose > 1)
     s = format (s, "%Uvpp session %d thread %d app session %d thread %d\n",
-               format_white_space, indent, f->master_session_index,
-               f->master_thread_index, f->client_session_index,
+               format_white_space, indent, f->shr->master_session_index,
+               f->master_thread_index, f->shr->client_session_index,
                f->client_thread_index);
 
   if (verbose)