+ /* Use linear search if rbtree is not initialized */
+ if (PREDICT_FALSE (!rb_tree_is_init (rt)))
+ {
+ f->ooo_deq = svm_fifo_find_chunk (f, start_pos);
+ return;
+ }
+
+ if (rt->root == RBTREE_TNIL_INDEX)
+ {
+ c = f_start_cptr (f);
+ ASSERT (c->deq_rb_index == RBTREE_TNIL_INDEX);
+ c->deq_rb_index = rb_tree_add_custom (rt, c->start_byte,
+ pointer_to_uword (c), f_pos_lt);
+ }
+ else
+ {
+ cur = f_find_node_rbtree (rt, start_pos);
+ c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+ ASSERT (f_pos_leq (c->start_byte, start_pos));
+ }
+
+ if (f_chunk_includes_pos (c, start_pos))
+ f->ooo_deq = c;
+
+ if (f_chunk_includes_pos (c, end_pos))
+ return;
+
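+ /* Walk forward, tracking chunks in the lookup tree until one
+ * includes end_pos or an already tracked chunk is found */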
+ do
+ {
+ c = f_cptr (f, c->next);
+ if (!c || c->deq_rb_index != RBTREE_TNIL_INDEX)
+ break;
+
+ c->deq_rb_index = rb_tree_add_custom (rt, c->start_byte,
+ pointer_to_uword (c), f_pos_lt);
+
+ if (f_chunk_includes_pos (c, start_pos))
+ f->ooo_deq = c;
+ }
+ while (!f_chunk_includes_pos (c, end_pos));
+}
+
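+/**
+ * Starting from @a start, untrack chunks from the ooo enqueue lookup
+ * rbtree until the chunk that includes @a end_pos is reached. If no ooo
+ * segments are left, that chunk is untracked as well. Returns the chunk
+ * that includes @a end_pos, if any.
+ */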
+static svm_fifo_chunk_t *
+f_lookup_clear_enq_chunks (svm_fifo_t * f, svm_fifo_chunk_t * start,
+ u32 end_pos)
+{
+ rb_tree_t *rt = &f->ooo_enq_lookup;
+ svm_fifo_chunk_t *c;
+ rb_node_t *n;
+
+ c = start;
+ while (c && !f_chunk_includes_pos (c, end_pos))
+ {
+ if (c->enq_rb_index != RBTREE_TNIL_INDEX)
+ {
+ n = rb_node (rt, c->enq_rb_index);
+ rb_tree_del_node (rt, n);
+ c->enq_rb_index = RBTREE_TNIL_INDEX;
+ }
+
+ c = f_cptr (f, c->next);
+ }
+
+ /* No ooo segments left, so make sure the current chunk
+ * is not tracked in the enq rbtree */
+ if (f->ooos_list_head == OOO_SEGMENT_INVALID_INDEX
+ && c && c->enq_rb_index != RBTREE_TNIL_INDEX)
+ {
+ n = rb_node (rt, c->enq_rb_index);
+ rb_tree_del_node (rt, n);
+ c->enq_rb_index = RBTREE_TNIL_INDEX;
+ }
+
+ return c;
+}
+
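+/**
+ * Dequeue-side counterpart of the above: untrack chunks from the ooo
+ * dequeue lookup rbtree up to, but not including, the chunk that
+ * includes @a end_pos. Returns that chunk, if any.
+ */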
+static svm_fifo_chunk_t *
+f_lookup_clear_deq_chunks (svm_fifo_t * f, svm_fifo_chunk_t * start,
+ u32 end_pos)
+{
+ rb_tree_t *rt = &f->ooo_deq_lookup;
+ svm_fifo_chunk_t *c;
+ rb_node_t *n;
+
+ c = start;
+ while (c && !f_chunk_includes_pos (c, end_pos))
+ {
+ if (c->deq_rb_index != RBTREE_TNIL_INDEX)
+ {
+ n = rb_node (rt, c->deq_rb_index);
+ rb_tree_del_node (rt, n);
+ c->deq_rb_index = RBTREE_TNIL_INDEX;
+ }
+
+ c = f_cptr (f, c->next);
+ }
+
+ return c;
+}
+
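+/**
+ * Free the rbtree nodes backing the ooo enqueue/dequeue chunk lookups.
+ */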
+void
+svm_fifo_free_chunk_lookup (svm_fifo_t * f)
+{
+ rb_tree_free_nodes (&f->ooo_enq_lookup);
+ rb_tree_free_nodes (&f->ooo_deq_lookup);
+}
+
+void
+svm_fifo_free (svm_fifo_t * f)
+{
+ ASSERT (f->refcnt > 0);
+
+ if (--f->refcnt == 0)
+ {
+ /* ooo data is not allocated on segment heap */
+ svm_fifo_free_chunk_lookup (f);
+ clib_mem_free (f);
+ }
+}
+
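+/**
+ * Overwrite len bytes starting at the current head position without
+ * advancing head. The write may spill over into at most the next chunk.
+ */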
+void
+svm_fifo_overwrite_head (svm_fifo_t * f, u8 * src, u32 len)
+{
+ u32 n_chunk;
+ u32 head, tail, head_idx;
+ svm_fifo_chunk_t *c;
+
+ ASSERT (len <= f->shr->size);
+
+ f_load_head_tail_cons (f, &head, &tail);
+
+ if (!f->shr->head_chunk)
+ f->shr->head_chunk = f_csptr (f, svm_fifo_find_chunk (f, head));
+
+ c = f_head_cptr (f);
+ head_idx = head - c->start_byte;
+ n_chunk = c->length - head_idx;
+ if (len <= n_chunk)
+ clib_memcpy_fast (&c->data[head_idx], src, len);
+ else
+ {
+ ASSERT (len - n_chunk <= f_cptr (f, c->next)->length);
+ clib_memcpy_fast (&c->data[head_idx], src, n_chunk);
+ clib_memcpy_fast (&f_cptr (f, c->next)->data[0], src + n_chunk,
+ len - n_chunk);
+ }
+}
+
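+/**
+ * Try to grow the fifo by allocating a batch of chunks and linking it
+ * after the current end chunk. The request is min_alloc bytes, capped
+ * by the space left in the fifo and raised as needed to cover the part
+ * of len not already backed by allocated chunks.
+ */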
+static int
+f_try_chunk_alloc (svm_fifo_t * f, u32 head, u32 tail, u32 len)
+{
+ svm_fifo_chunk_t *c, *cur, *prev;
+ u32 alloc_size, free_alloced;
+
+ prev = f_end_cptr (f);
+ free_alloced = f_chunk_end (prev) - tail;
+
+ alloc_size = clib_min (f->shr->min_alloc, f->shr->size - (tail - head));
+ alloc_size = clib_max (alloc_size, len - free_alloced);
+
+ c = fsh_alloc_chunk (f->fs_hdr, f->shr->slice_index, alloc_size);
+ if (PREDICT_FALSE (!c))
+ return -1;
+
+ cur = c;
+
+ while (cur)
+ {
+ cur->start_byte = prev->start_byte + prev->length;
+ cur->enq_rb_index = RBTREE_TNIL_INDEX;
+ cur->deq_rb_index = RBTREE_TNIL_INDEX;
+
+ prev = cur;
+ cur = f_cptr (f, cur->next);
+ }
+
+ f_csptr_link (f, f->shr->end_chunk, c);
+ prev->next = 0;
+ f->shr->end_chunk = f_csptr (f, prev);
+
+ if (!f->shr->tail_chunk)
+ f->shr->tail_chunk = f_csptr (f, c);
+
+ return 0;
+}
+
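+/**
+ * Enqueue data at tail. Copies as many of the len bytes in src as fit,
+ * growing the fifo if needed, and collects any ooo segments that the
+ * new tail makes contiguous. Returns the number of bytes made available
+ * to the consumer, or SVM_FIFO_EFULL/SVM_FIFO_EGROW on failure.
+ */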
+int
+svm_fifo_enqueue (svm_fifo_t * f, u32 len, const u8 * src)
+{
+ u32 tail, head, free_count;
+ svm_fifo_chunk_t *old_tail_c;
+
+ f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
+
+ f_load_head_tail_prod (f, &head, &tail);
+
+ /* free space in fifo can only increase during enqueue: SPSC */
+ free_count = f_free_count (f, head, tail);
+
+ if (PREDICT_FALSE (free_count == 0))
+ return SVM_FIFO_EFULL;
+
+ /* number of bytes we're going to copy */
+ len = clib_min (free_count, len);
+
+ if (f_pos_gt (tail + len, f_chunk_end (f_end_cptr (f))))
+ {
+ if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
+ {
+ len = f_chunk_end (f_end_cptr (f)) - tail;
+ if (!len)
+ return SVM_FIFO_EGROW;
+ }
+ }
+
+ old_tail_c = f_tail_cptr (f);
+
+ svm_fifo_copy_to_chunk (f, old_tail_c, tail, src, len, &f->shr->tail_chunk);
+ tail = tail + len;
+
+ svm_fifo_trace_add (f, head, len, 2);
+
+ /* collect out-of-order segments */
+ if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
+ {
+ len += ooo_segment_try_collect (f, len, &tail);
+ /* Tail chunk might've changed even if nothing was collected */
+ f->shr->tail_chunk =
+ f_csptr (f, f_lookup_clear_enq_chunks (f, old_tail_c, tail));
+ f->ooo_enq = 0;
+ }
+
+ /* store-rel: producer owned index (paired with load-acq in consumer) */
+ clib_atomic_store_rel_n (&f->shr->tail, tail);
+
+ return len;
+}
+
+/**
+ * Enqueue a future segment.
+ *
+ * All or nothing: either the entire segment is copied or none of it is.
+ * Returns 0 if the entire segment was copied.
+ * Returns -1 if none of the segment was copied due to lack of space.
+ */
+int
+svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
+{
+ u32 tail, head, free_count, enq_pos;
+ fs_sptr_t last = F_INVALID_CPTR;
+
+ f_load_head_tail_prod (f, &head, &tail);
+
+ /* free space in fifo can only increase during enqueue: SPSC */
+ free_count = f_free_count (f, head, tail);
+ f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
+
+ /* will this request fit? */
+ if ((len + offset) > free_count)
+ return SVM_FIFO_EFULL;
+
+ enq_pos = tail + offset;
+
+ if (f_pos_gt (enq_pos + len, f_chunk_end (f_end_cptr (f))))
+ {
+ if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, offset + len)))
+ return SVM_FIFO_EGROW;
+ }
+
+ svm_fifo_trace_add (f, offset, len, 1);
+ ooo_segment_add (f, offset, head, tail, len);
+
+ if (!f->ooo_enq || !f_chunk_includes_pos (f->ooo_enq, enq_pos))
+ f_update_ooo_enq (f, enq_pos, enq_pos + len);
+
+ svm_fifo_copy_to_chunk (f, f->ooo_enq, enq_pos, src, len, &last);
+ if (last != F_INVALID_CPTR)
+ f->ooo_enq = f_cptr (f, last);
+
+ return 0;
+}
+
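+/* Illustrative producer-side sketch (all names below are hypothetical):
+ * an out-of-order segment is staged with svm_fifo_enqueue_with_offset
+ * and collected once the in-order enqueue plugs the hole in front of it:
+ *
+ *   // seg starts hole_len bytes past the current tail
+ *   rv = svm_fifo_enqueue_with_offset (f, hole_len, seg_len, seg);
+ *   // ... once the missing in-order bytes arrive ...
+ *   rv = svm_fifo_enqueue (f, hole_len, hole); // also collects seg
+ */
+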
+/**
+ * Advance tail, marking len bytes as enqueued without copying any data.
+ */
+void
+svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len)
+{
+ u32 tail;
+
+ ASSERT (len <= svm_fifo_max_enqueue_prod (f));
+ /* load-relaxed: producer owned index */
+ tail = f->shr->tail;
+ tail = tail + len;
+
+ if (rb_tree_is_init (&f->ooo_enq_lookup))
+ {
+ f->shr->tail_chunk =
+ f_csptr (f, f_lookup_clear_enq_chunks (f, f_tail_cptr (f), tail));
+ f->ooo_enq = 0;
+ }
+ else
+ {
+ f->shr->tail_chunk =
+ f_csptr (f, svm_fifo_find_next_chunk (f, f_tail_cptr (f), tail));
+ }
+
+ /* store-rel: producer owned index (paired with load-acq in consumer) */
+ clib_atomic_store_rel_n (&f->shr->tail, tail);
+}
+
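+/**
+ * Enqueue a vector of segments at tail. If allow_partial is not set,
+ * either all segments are enqueued or the call fails; otherwise as much
+ * as fits is copied and the last segment written may be truncated.
+ * Returns the number of bytes enqueued or an SVM_FIFO_* error.
+ */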
+int
+svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
+ u32 n_segs, u8 allow_partial)
+{
+ u32 tail, head, free_count, len = 0, i;
+ svm_fifo_chunk_t *old_tail_c;
+
+ f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
+
+ f_load_head_tail_prod (f, &head, &tail);
+
+ /* free space in fifo can only increase during enqueue: SPSC */
+ free_count = f_free_count (f, head, tail);
+
+ if (PREDICT_FALSE (free_count == 0))
+ return SVM_FIFO_EFULL;
+
+ for (i = 0; i < n_segs; i++)
+ len += segs[i].len;
+
+ old_tail_c = f_tail_cptr (f);
+
+ if (!allow_partial)
+ {
+ if (PREDICT_FALSE (free_count < len))
+ return SVM_FIFO_EFULL;
+
+ if (f_pos_gt (tail + len, f_chunk_end (f_end_cptr (f))))
+ {
+ if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
+ return SVM_FIFO_EGROW;
+ }
+
+ for (i = 0; i < n_segs; i++)
+ {
+ svm_fifo_copy_to_chunk (f, f_tail_cptr (f), tail, segs[i].data,
+ segs[i].len, &f->shr->tail_chunk);
+ tail += segs[i].len;
+ }
+ }
+ else
+ {
+ len = clib_min (free_count, len);
+
+ if (f_pos_gt (tail + len, f_chunk_end (f_end_cptr (f))))
+ {
+ if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
+ {
+ len = f_chunk_end (f_end_cptr (f)) - tail;
+ if (!len)
+ return SVM_FIFO_EGROW;
+ }
+ }
+
+ i = 0;
+ while (len)
+ {
+ u32 to_copy = clib_min (segs[i].len, len);
+ svm_fifo_copy_to_chunk (f, f_tail_cptr (f), tail, segs[i].data,
+ to_copy, &f->shr->tail_chunk);
+ len -= to_copy;
+ tail += to_copy;
+ i++;
+ }
+ }
+
+ /* collect out-of-order segments */
+ if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
+ {
+ len += ooo_segment_try_collect (f, len, &tail);
+ /* Tail chunk might've changed even if nothing was collected */
+ f->shr->tail_chunk =
+ f_csptr (f, f_lookup_clear_enq_chunks (f, old_tail_c, tail));
+ f->ooo_enq = 0;
+ }
+
+ /* store-rel: producer owned index (paired with load-acq in consumer) */
+ clib_atomic_store_rel_n (&f->shr->tail, tail);
+
+ return len;
+}
+
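+/**
+ * Unlink chunks up to, but not including, the one that includes
+ * @a end_pos and return them for collection. If @a maybe_ooo is set,
+ * unlinked chunks are also removed from the ooo dequeue lookup rbtree.
+ * The last chunk is never unlinked.
+ */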
+always_inline svm_fifo_chunk_t *
+f_unlink_chunks (svm_fifo_t * f, u32 end_pos, u8 maybe_ooo)
+{
+ svm_fifo_chunk_t *start, *prev = 0, *c;
+ rb_tree_t *rt;
+ rb_node_t *n;
+
+ if (maybe_ooo)
+ rt = &f->ooo_deq_lookup;
+
+ c = f_start_cptr (f);
+ ASSERT (!f_chunk_includes_pos (c, end_pos));
+
+ do
+ {
+ if (maybe_ooo && c->deq_rb_index != RBTREE_TNIL_INDEX)
+ {
+ n = rb_node (rt, c->deq_rb_index);
+ ASSERT (n == f_find_node_rbtree (rt, c->start_byte));
+ rb_tree_del_node (rt, n);
+ c->deq_rb_index = RBTREE_TNIL_INDEX;
+ }
+ if (!c->next)
+ break;
+ prev = c;
+ c = f_cptr (f, c->next);
+ }
+ while (!f_chunk_includes_pos (c, end_pos));
+
+ if (maybe_ooo)
+ {
+ if (f->ooo_deq && f_pos_lt (f->ooo_deq->start_byte, f_chunk_end (c)))
+ f->ooo_deq = 0;
+ }
+ else
+ {
+ if (PREDICT_FALSE (f->ooo_deq != 0))
+ f->ooo_deq = 0;
+ }
+
+ /* Avoid unlinking the last chunk */
+ if (!prev)
+ return 0;
+
+ prev->next = 0;
+ start = f_start_cptr (f);
+ f->shr->start_chunk = f_csptr (f, c);
+
+ return start;
+}
+
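+/**
+ * Dequeue up to len bytes from head into dst. Chunks that head moves
+ * past are unlinked and returned to the fifo segment allocator.
+ * Returns the number of bytes dequeued or SVM_FIFO_EEMPTY.
+ */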
+int
+svm_fifo_dequeue (svm_fifo_t * f, u32 len, u8 * dst)
+{
+ u32 tail, head, cursize;
+
+ f_load_head_tail_cons (f, &head, &tail);
+
+ /* current size of fifo can only increase during dequeue: SPSC */
+ cursize = f_cursize (f, head, tail);
+
+ if (PREDICT_FALSE (cursize == 0))
+ return SVM_FIFO_EEMPTY;
+
+ len = clib_min (cursize, len);
+
+ if (!f->shr->head_chunk)
+ f->shr->head_chunk = f_csptr (f, svm_fifo_find_chunk (f, head));
+
+ svm_fifo_copy_from_chunk (f, f_head_cptr (f), head, dst, len,
+ &f->shr->head_chunk);
+ head = head + len;
+
+ /* In-order dequeues are not supported in combination with ooo peeking.
+ * Use svm_fifo_dequeue_drop instead. */
+ ASSERT (rb_tree_n_nodes (&f->ooo_deq_lookup) <= 1);
+
+ if (f_pos_geq (head, f_chunk_end (f_start_cptr (f))))
+ fsh_collect_chunks (f->fs_hdr, f->shr->slice_index,
+ f_unlink_chunks (f, head, 0));
+
+ /* store-rel: consumer owned index (paired with load-acq in producer) */
+ clib_atomic_store_rel_n (&f->shr->head, head);
+
+ return len;
+}
+
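+/**
+ * Copy out up to len bytes starting at offset past head without
+ * consuming them. The cached ooo_deq chunk and dequeue lookup rbtree
+ * avoid linear chunk searches on repeated peeks. Returns the number of
+ * bytes copied or SVM_FIFO_EEMPTY if offset is past the enqueued data.
+ */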
+int
+svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 len, u8 * dst)
+{
+ u32 tail, head, cursize, head_idx;
+ fs_sptr_t last = F_INVALID_CPTR;
+
+ f_load_head_tail_cons (f, &head, &tail);
+
+ /* current size of fifo can only increase during peek: SPSC */
+ cursize = f_cursize (f, head, tail);
+
+ if (PREDICT_FALSE (cursize < offset))
+ return SVM_FIFO_EEMPTY;
+
+ len = clib_min (cursize - offset, len);
+ head_idx = head + offset;
+
+ CLIB_MEM_UNPOISON (f->ooo_deq, sizeof (*f->ooo_deq));
+ if (!f->ooo_deq || !f_chunk_includes_pos (f->ooo_deq, head_idx))
+ f_update_ooo_deq (f, head_idx, head_idx + len);
+
+ svm_fifo_copy_from_chunk (f, f->ooo_deq, head_idx, dst, len, &last);
+ if (last != F_INVALID_CPTR)
+ f->ooo_deq = f_cptr (f, last);
+ return len;
+}
+
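+/**
+ * Drop bytes from head without copying them out.
+ */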
+int
+svm_fifo_dequeue_drop (svm_fifo_t * f, u32 len)
+{
+ u32 total_drop_bytes, tail, head, cursize;