svm: support for multi-segment enqueues
[vpp.git] / src / svm / svm_fifo.c
index 8e3bb0a..f1ac8d4 100644 (file)
@@ -70,6 +70,8 @@ CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
       c = c->next;
       while ((to_copy -= n_chunk))
        {
+         CLIB_MEM_UNPOISON (c, sizeof (*c));
+         CLIB_MEM_UNPOISON (c->data, c->length);
          n_chunk = clib_min (c->length, to_copy);
          clib_memcpy_fast (dst + (len - to_copy), &c->data[0], n_chunk);
          c = c->length <= to_copy ? c->next : c;
@@ -387,11 +389,13 @@ svm_fifo_init (svm_fifo_t * f, u32 size)
    */
   f->start_chunk->start_byte = 0;
   prev = f->start_chunk;
+  prev->enq_rb_index = prev->deq_rb_index = RBTREE_TNIL_INDEX;
   c = prev->next;
 
   while (c)
     {
       c->start_byte = prev->start_byte + prev->length;
+      c->enq_rb_index = c->deq_rb_index = RBTREE_TNIL_INDEX;
       prev = c;
       c = c->next;
     }
@@ -951,6 +955,87 @@ svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len)
   clib_atomic_store_rel_n (&f->tail, tail);
 }
 
+int
+svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
+                          u32 n_segs, u8 allow_partial)
+{
+  u32 tail, head, free_count, len = 0, i;
+  svm_fifo_chunk_t *old_tail_c;
+
+  f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
+
+  f_load_head_tail_prod (f, &head, &tail);
+
+  /* free space in fifo can only increase during enqueue: SPSC */
+  free_count = f_free_count (f, head, tail);
+
+  if (PREDICT_FALSE (free_count == 0))
+    return SVM_FIFO_EFULL;
+
+  for (i = 0; i < n_segs; i++)
+    len += segs[i].len;
+
+  old_tail_c = f->tail_chunk;
+
+  if (!allow_partial)
+    {
+      if (PREDICT_FALSE (free_count < len))
+       return SVM_FIFO_EFULL;
+
+      if (f_pos_gt (tail + len, f_chunk_end (f->end_chunk)))
+       {
+         if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
+           return SVM_FIFO_EGROW;
+       }
+
+      for (i = 0; i < n_segs; i++)
+       {
+         svm_fifo_copy_to_chunk (f, f->tail_chunk, tail, segs[i].data,
+                                 segs[i].len, &f->tail_chunk);
+         tail += segs[i].len;
+       }
+    }
+  else
+    {
+      len = clib_min (free_count, len);
+
+      if (f_pos_gt (tail + len, f_chunk_end (f->end_chunk)))
+       {
+         if (PREDICT_FALSE (f_try_chunk_alloc (f, head, tail, len)))
+           {
+             len = f_chunk_end (f->end_chunk) - tail;
+             if (!len)
+               return SVM_FIFO_EGROW;
+           }
+       }
+
+      i = 0;
+      while (len)
+       {
+         u32 to_copy = clib_min (segs[i].len, len);
+         svm_fifo_copy_to_chunk (f, f->tail_chunk, tail, segs[i].data,
+                                 to_copy, &f->tail_chunk);
+         len -= to_copy;
+         tail += to_copy;
+         i++;
+       }
+    }
+
+  /* collect out-of-order segments */
+  if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
+    {
+      len += ooo_segment_try_collect (f, len, &tail);
+      /* Tail chunk might've changed even if nothing was collected */
+      f->tail_chunk = f_lookup_clear_enq_chunks (f, old_tail_c, tail);
+      f->ooo_enq = 0;
+    }
+
+  /* store-rel: producer owned index (paired with load-acq in consumer) */
+  clib_atomic_store_rel_n (&f->tail, tail);
+
+  return len;
+}
+
 always_inline svm_fifo_chunk_t *
 f_unlink_chunks (svm_fifo_t * f, u32 end_pos, u8 maybe_ooo)
 {
@@ -1024,6 +1109,10 @@ svm_fifo_dequeue (svm_fifo_t * f, u32 len, u8 * dst)
   svm_fifo_copy_from_chunk (f, f->head_chunk, head, dst, len, &f->head_chunk);
   head = head + len;
 
+  /* In order dequeues are not supported in combination with ooo peeking.
+   * Use svm_fifo_dequeue_drop instead. */
+  ASSERT (rb_tree_n_nodes (&f->ooo_deq_lookup) <= 1);
+
   if (f_pos_geq (head, f_chunk_end (f->start_chunk)))
     fsh_collect_chunks (f->fs_hdr, f->slice_index,
                        f_unlink_chunks (f, head, 0));
@@ -1050,6 +1139,7 @@ svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 len, u8 * dst)
   len = clib_min (cursize - offset, len);
   head_idx = head + offset;
 
+  CLIB_MEM_UNPOISON (f->ooo_deq, sizeof (*f->ooo_deq));
   if (!f->ooo_deq || !f_chunk_includes_pos (f->ooo_deq, head_idx))
     f_update_ooo_deq (f, head_idx, head_idx + len);
 
@@ -1132,9 +1222,12 @@ svm_fifo_fill_chunk_list (svm_fifo_t * f)
 }
 
 int
-svm_fifo_segments (svm_fifo_t * f, svm_fifo_seg_t * fs)
+svm_fifo_segments (svm_fifo_t * f, u32 offset, svm_fifo_seg_t * fs,
+                  u32 n_segs, u32 max_bytes)
 {
-  u32 cursize, head, tail, head_idx;
+  u32 cursize, to_read, head, tail, fs_index = 1;
+  u32 n_bytes, head_pos, len, start;
+  svm_fifo_chunk_t *c;
 
   f_load_head_tail_cons (f, &head, &tail);
 
@@ -1144,37 +1237,36 @@ svm_fifo_segments (svm_fifo_t * f, svm_fifo_seg_t * fs)
   if (PREDICT_FALSE (cursize == 0))
     return SVM_FIFO_EEMPTY;
 
-  head_idx = head;
+  if (offset >= cursize)
+    return SVM_FIFO_EEMPTY;
 
-  if (tail < head)
-    {
-      fs[0].len = f->size - head_idx;
-      fs[0].data = f->head_chunk->data + head_idx;
-      fs[1].len = cursize - fs[0].len;
-      fs[1].data = f->head_chunk->data;
-    }
-  else
-    {
-      fs[0].len = cursize;
-      fs[0].data = f->head_chunk->data + head_idx;
-      fs[1].len = 0;
-      fs[1].data = 0;
-    }
-  return cursize;
-}
+  to_read = clib_min (cursize - offset, max_bytes);
+  start = head + offset;
 
-void
-svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_seg_t * fs)
-{
-  u32 head;
+  if (!f->head_chunk)
+    f->head_chunk = svm_fifo_find_chunk (f, head);
 
-  /* consumer owned index */
-  head = f->head;
+  c = f->head_chunk;
 
-  ASSERT (fs[0].data == f->head_chunk->data + head);
-  head = (head + fs[0].len + fs[1].len);
-  /* store-rel: consumer owned index (paired with load-acq in producer) */
-  clib_atomic_store_rel_n (&f->head, head);
+  while (!f_chunk_includes_pos (c, start))
+    c = c->next;
+
+  head_pos = start - c->start_byte;
+  fs[0].data = c->data + head_pos;
+  fs[0].len = clib_min (c->length - head_pos, cursize - offset);
+  n_bytes = fs[0].len;
+
+  while (n_bytes < to_read && fs_index < n_segs)
+    {
+      c = c->next;
+      len = clib_min (c->length, to_read - n_bytes);
+      fs[fs_index].data = c->data;
+      fs[fs_index].len = len;
+      n_bytes += len;
+      fs_index += 1;
+    }
+
+  return n_bytes;
 }
 
 /**