svm: refactor fifo chunk tracking
[vpp.git] / src / svm / svm_fifo.h
index 96ca3ee..2b6e854 100644 (file)
@@ -40,12 +40,13 @@ typedef struct
 #define SVM_FIFO_INVALID_INDEX         ((u32)~0)
 #define SVM_FIFO_MAX_EVT_SUBSCRIBERS   7
 
-typedef enum svm_fifo_tx_ntf_
+typedef enum svm_fifo_deq_ntf_
 {
-  SVM_FIFO_NO_TX_NOTIF = 0,
-  SVM_FIFO_WANT_TX_NOTIF = 1,
-  SVM_FIFO_WANT_TX_NOTIF_IF_FULL = 2,
-} svm_fifo_tx_ntf_t;
+  SVM_FIFO_NO_DEQ_NOTIF = 0,           /**< No notification requested */
+  SVM_FIFO_WANT_DEQ_NOTIF = 1,         /**< Notify on dequeue */
+  SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL = 2, /**< Notify on transition from full */
+  SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY = 4,        /**< Notify on transition to empty */
+} svm_fifo_deq_ntf_t;
 
 typedef struct
 {
@@ -59,14 +60,18 @@ typedef struct svm_fifo_chunk_
   u32 start_byte;              /**< chunk start byte */
   u32 length;                  /**< length of chunk in bytes */
   struct svm_fifo_chunk_ *next;        /**< pointer to next chunk in linked-lists */
+  rb_node_index_t rb_index;    /**< node index if chunk in rbtree */
   u8 data[0];                  /**< start of chunk data */
 } svm_fifo_chunk_t;
 
 typedef enum svm_fifo_flag_
 {
-  SVM_FIFO_F_SIZE_UPDATE = 1 << 0,
-  SVM_FIFO_F_MULTI_CHUNK = 1 << 1,
-  SVM_FIFO_F_LL_TRACKED = 1 << 2,
+  SVM_FIFO_F_MULTI_CHUNK = 1 << 0,
+  SVM_FIFO_F_GROW = 1 << 1,
+  SVM_FIFO_F_SHRINK = 1 << 2,
+  SVM_FIFO_F_COLLECT_CHUNKS = 1 << 3,
+  SVM_FIFO_F_LL_TRACKED = 1 << 4,
+  SVM_FIFO_F_SINGLE_THREAD_OWNED = 1 << 5,
 } svm_fifo_flag_t;
 
 typedef struct _svm_fifo
@@ -74,11 +79,12 @@ typedef struct _svm_fifo
   CLIB_CACHE_LINE_ALIGN_MARK (shared_first);
   u32 size;                    /**< size of the fifo in bytes */
   u32 nitems;                  /**< usable size (size-1) */
-  u8 flags;                    /**< fifo flags */
   svm_fifo_chunk_t *start_chunk;/**< first chunk in fifo chunk list */
   svm_fifo_chunk_t *end_chunk; /**< end chunk in fifo chunk list */
-  svm_fifo_chunk_t *new_chunks;        /**< chunks yet to be added to list */
-  rb_tree_t chunk_lookup;
+  rb_tree_t ooo_enq_lookup;    /**< rbtree for ooo enq chunk lookup */
+  rb_tree_t ooo_deq_lookup;    /**< rbtree for ooo deq chunk lookup */
+  u8 flags;                    /**< fifo flags */
+  u8 slice_index;              /**< segment slice for fifo */
 
     CLIB_CACHE_LINE_ALIGN_MARK (shared_second);
   volatile u32 has_event;      /**< non-zero if deq event exists */
@@ -86,19 +92,20 @@ typedef struct _svm_fifo
   u32 client_session_index;    /**< app session index */
   u8 master_thread_index;      /**< session layer thread index */
   u8 client_thread_index;      /**< app worker index */
+  i8 refcnt;                   /**< reference count  */
   u32 segment_manager;         /**< session layer segment manager index */
   u32 segment_index;           /**< segment index in segment manager */
-  u32 freelist_index;          /**< aka log2(allocated_size) - const. */
-  i8 refcnt;                   /**< reference count  */
   struct _svm_fifo *next;      /**< next in freelist/active chain */
   struct _svm_fifo *prev;      /**< prev in active chain */
+  svm_fifo_chunk_t *new_chunks;        /**< chunks yet to be added to list */
+  u32 size_decrement;          /**< bytes to remove from fifo */
 
     CLIB_CACHE_LINE_ALIGN_MARK (consumer);
   u32 head;                    /**< fifo head position/byte */
   svm_fifo_chunk_t *head_chunk;        /**< tracks chunk where head lands */
   svm_fifo_chunk_t *ooo_deq;   /**< last chunk used for ooo dequeue */
-  volatile u32 want_tx_ntf;    /**< producer wants nudge */
-  volatile u32 has_tx_ntf;
+  volatile u32 want_deq_ntf;   /**< producer wants nudge */
+  volatile u32 has_deq_ntf;
 
     CLIB_CACHE_LINE_ALIGN_MARK (producer);
   u32 tail;                    /**< fifo tail position/byte */
@@ -114,7 +121,6 @@ typedef struct _svm_fifo
   svm_fifo_trace_elem_t *trace;
 #endif
 
-  svm_fifo_chunk_t default_chunk;
 } svm_fifo_t;
 
 typedef enum
@@ -172,7 +178,8 @@ f_load_head_tail_prod (svm_fifo_t * f, u32 * head, u32 * tail)
   *head = clib_atomic_load_acq_n (&f->head);
 }
 
-/* Load head and tail independent of producer/consumer role
+/**
+ * Load head and tail independent of producer/consumer role
  *
  * Internal function.
  */
@@ -186,49 +193,56 @@ f_load_head_tail_all_acq (svm_fifo_t * f, u32 * head, u32 * tail)
 }
 
 /**
- * Fifo free bytes, i.e., number of free bytes
+ * Distance to a from b, i.e., a - b in the fifo
  *
- * Internal function
+ * Internal function.
  */
 static inline u32
-f_free_count (svm_fifo_t * f, u32 head, u32 tail)
+f_distance_to (svm_fifo_t * f, u32 a, u32 b)
 {
-  return (f->nitems + head - tail);
+  return ((f->size + a - b) % f->size);
 }
 
 /**
- * Fifo current size, i.e., number of bytes enqueued
+ * Distance from a to b, i.e., b - a in the fifo
  *
  * Internal function.
  */
 static inline u32
-f_cursize (svm_fifo_t * f, u32 head, u32 tail)
+f_distance_from (svm_fifo_t * f, u32 a, u32 b)
 {
-  return (f->nitems - f_free_count (f, head, tail));
+  return ((f->size + b - a) % f->size);
 }
 
 /**
- * Distance to a from b, i.e., a - b in the fifo
+ * Fifo current size, i.e., number of bytes enqueued
  *
  * Internal function.
  */
 static inline u32
-f_distance_to (svm_fifo_t * f, u32 a, u32 b)
+f_cursize (svm_fifo_t * f, u32 head, u32 tail)
 {
-  return ((f->size + a - b) % f->size);
+  return (head <= tail ? tail - head : f->size + tail - head);
 }
 
 /**
- * Distance from a to b, i.e., b - a in the fifo
+ * Fifo free bytes, i.e., number of free bytes
  *
- * Internal function.
+ * Internal function
  */
 static inline u32
-f_distance_from (svm_fifo_t * f, u32 a, u32 b)
+f_free_count (svm_fifo_t * f, u32 head, u32 tail)
 {
-  return ((f->size + b - a) % f->size);
+  return (f->nitems - f_cursize (f, head, tail));
 }
 
+/**
+ * Try to shrink fifo size.
+ *
+ * Internal function.
+ */
+void svm_fifo_try_shrink (svm_fifo_t * f, u32 head, u32 tail);
+
 /**
  * Create fifo of requested size
  *
@@ -242,9 +256,16 @@ svm_fifo_t *svm_fifo_create (u32 size);
 /**
  * Initialize fifo
  *
+ * @param f            fifo
  * @param size         size for fifo
  */
 void svm_fifo_init (svm_fifo_t * f, u32 size);
+/**
+ * Initialize fifo chunks and rbtree
+ *
+ * @param f            fifo
+ */
+void svm_fifo_init_chunks (svm_fifo_t * f);
 /**
  * Allocate a fifo chunk on heap
  *
@@ -266,6 +287,31 @@ svm_fifo_chunk_t *svm_fifo_chunk_alloc (u32 size);
  * @param c    chunk or linked list of chunks to be added
  */
 void svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c);
+/**
+ * Request to reduce fifo size by amount of bytes
+ *
+ * Because the producer might be enqueuing data when this is called, the
+ * actual size update is only applied when producer tries to enqueue new
+ * data, unless @param try_shrink is set.
+ *
+ * @param f            fifo
+ * @param len          number of bytes to remove from fifo. The actual number
+ *                     of bytes to be removed will be less or equal to this
+ *                     value.
+ * @param try_shrink   flg to indicate if it's safe to try to shrink fifo
+ *                     size. It should be set only if this is called by the
+ *                     producer of if the producer is not using the fifo
+ * @return             actual length fifo size will be reduced by
+ */
+int svm_fifo_reduce_size (svm_fifo_t * f, u32 len, u8 try_shrink);
+/**
+ * Removes chunks that are after fifo end byte
+ *
+ * Needs to be called with segment heap pushed.
+ *
+ * @param f fifo
+ */
+svm_fifo_chunk_t *svm_fifo_collect_chunks (svm_fifo_t * f);
 /**
  * Free fifo and associated state
  *
@@ -333,6 +379,16 @@ int svm_fifo_enqueue (svm_fifo_t * f, u32 len, const u8 * src);
  */
 int svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len,
                                  u8 * src);
+
+/**
+ * Advance tail pointer
+ *
+ * Useful for moving tail pointer after external enqueue.
+ *
+ * @param f            fifo
+ * @param len          number of bytes to add to tail
+ */
+void svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len);
 /**
  * Overwrite fifo head with new data
  *
@@ -411,13 +467,28 @@ void svm_fifo_del_subscriber (svm_fifo_t * f, u8 subscriber);
  * @return     number of out of order segments
  */
 u32 svm_fifo_n_ooo_segments (svm_fifo_t * f);
-/*
+/**
  * First out-of-order segment for fifo
  *
  * @param f    fifo
  * @return     first out-of-order segment for fifo
  */
 ooo_segment_t *svm_fifo_first_ooo_segment (svm_fifo_t * f);
+/**
+ * Check if fifo is sane. Debug only.
+ *
+ * @param f    fifo
+ * @return     1 if sane, 0 otherwise
+ */
+u8 svm_fifo_is_sane (svm_fifo_t * f);
+/**
+ * Declare this fifo is used by only a single thread.
+ * In this special case, fifo-growth can be done in an efficient way without delay.
+ *
+ * @param f             fifo
+ * @return              1 if the fifo is already owned by another thread, 0 otherwise
+ */
+u8 svm_fifo_set_single_thread_owned (svm_fifo_t * f);
 format_function_t format_svm_fifo;
 
 /**
@@ -535,7 +606,7 @@ svm_fifo_is_wrapped (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_all_acq (f, &head, &tail);
-  return head % f->size > tail % f->size;
+  return head > tail;
 }
 
 /**
@@ -551,6 +622,8 @@ svm_fifo_max_enqueue_prod (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_prod (f, &head, &tail);
+  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
+    svm_fifo_try_shrink (f, head, tail);
   return f_free_count (f, head, tail);
 }
 
@@ -565,6 +638,8 @@ svm_fifo_max_enqueue (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_all_acq (f, &head, &tail);
+  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
+    svm_fifo_try_shrink (f, head, tail);
   return f_free_count (f, head, tail);
 }
 
@@ -575,11 +650,8 @@ static inline u32
 svm_fifo_max_read_chunk (svm_fifo_t * f)
 {
   u32 head, tail;
-  u32 head_idx, tail_idx;
   f_load_head_tail_cons (f, &head, &tail);
-  head_idx = head % f->size;
-  tail_idx = tail % f->size;
-  return tail_idx > head_idx ? (tail_idx - head_idx) : (f->size - head_idx);
+  return tail >= head ? (tail - head) : (f->size - head);
 }
 
 /**
@@ -589,46 +661,22 @@ static inline u32
 svm_fifo_max_write_chunk (svm_fifo_t * f)
 {
   u32 head, tail;
-  u32 head_idx, tail_idx;
   f_load_head_tail_prod (f, &head, &tail);
-  head_idx = head % f->size;
-  tail_idx = tail % f->size;
-  return tail_idx >= head_idx ? (f->size - tail_idx) : (head_idx - tail_idx);
-}
-
-/**
- * Advance tail pointer
- *
- * Useful for moving tail pointer after external enqueue.
- *
- * @param f            fifo
- * @param len          number of bytes to add to tail
- */
-static inline void
-svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len)
-{
-  ASSERT (len <= svm_fifo_max_enqueue_prod (f));
-  /* load-relaxed: producer owned index */
-  u32 tail = f->tail;
-  tail += len;
-  /* store-rel: producer owned index (paired with load-acq in consumer) */
-  clib_atomic_store_rel_n (&f->tail, tail);
+  return tail >= head ? f->size - tail : f_free_count (f, head, tail);
 }
 
 static inline u8 *
 svm_fifo_head (svm_fifo_t * f)
 {
   /* load-relaxed: consumer owned index */
-  return (f->head_chunk->data
-         + ((f->head % f->size) - f->head_chunk->start_byte));
+  return (f->head_chunk->data + (f->head - f->head_chunk->start_byte));
 }
 
 static inline u8 *
 svm_fifo_tail (svm_fifo_t * f)
 {
   /* load-relaxed: producer owned index */
-  return (f->tail_chunk->data
-         + ((f->tail % f->size) - f->tail_chunk->start_byte));
+  return (f->tail_chunk->data + (f->tail - f->tail_chunk->start_byte));
 }
 
 static inline u8
@@ -719,95 +767,98 @@ svm_fifo_unset_event (svm_fifo_t * f)
 }
 
 /**
- * Set specific want tx notification flag
+ * Set specific want notification flag
  *
- * For list of flags see @ref svm_fifo_tx_ntf_t
+ * For list of flags see @ref svm_fifo_deq_ntf_t
  *
  * @param f            fifo
  * @param ntf_type     type of notification requested
  */
 static inline void
-svm_fifo_add_want_tx_ntf (svm_fifo_t * f, u8 ntf_type)
+svm_fifo_add_want_deq_ntf (svm_fifo_t * f, u8 ntf_type)
 {
-  f->want_tx_ntf |= ntf_type;
+  f->want_deq_ntf |= ntf_type;
 }
 
 /**
- * Clear specific want tx notification flag
+ * Clear specific want notification flag
  *
- * For list of flags see @ref svm_fifo_tx_ntf_t
+ * For list of flags see @ref svm_fifo_ntf_t
  *
  * @param f            fifo
  * @param ntf_type     type of notification to be cleared
  */
 static inline void
-svm_fifo_del_want_tx_ntf (svm_fifo_t * f, u8 ntf_type)
+svm_fifo_del_want_deq_ntf (svm_fifo_t * f, u8 ntf_type)
 {
-  f->want_tx_ntf &= ~ntf_type;
+  f->want_deq_ntf &= ~ntf_type;
 }
 
 /**
- * Clear the want tx notification flag and set has tx notification
+ * Clear the want notification flag and set has notification
  *
- * Should be used after enqueuing a tx event. This clears the
- * SVM_FIFO_WANT_TX_NOTIF flag but it does not clear
- * SVM_FIFO_WANT_TX_NOTIF_IF_FULL. If the latter was set, has_tx_ntf is
- * set to avoid enqueueing tx events for for all dequeue operations until
+ * Should be used after enqueuing an event. This clears the
+ * SVM_FIFO_WANT_NOTIF flag but it does not clear
+ * SVM_FIFO_WANT_NOTIF_IF_FULL. If the latter was set, has_ntf is
+ * set to avoid enqueueing events for for all dequeue operations until
  * it is manually cleared.
  *
  * @param f    fifo
  */
 static inline void
-svm_fifo_clear_tx_ntf (svm_fifo_t * f)
+svm_fifo_clear_deq_ntf (svm_fifo_t * f)
 {
-  /* Set the flag if want_tx_notif_if_full was the only ntf requested */
-  f->has_tx_ntf = f->want_tx_ntf == SVM_FIFO_WANT_TX_NOTIF_IF_FULL;
-  svm_fifo_del_want_tx_ntf (f, SVM_FIFO_WANT_TX_NOTIF);
+  /* Set the flag if want_notif_if_full was the only ntf requested */
+  f->has_deq_ntf = f->want_deq_ntf == SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL;
+  svm_fifo_del_want_deq_ntf (f, SVM_FIFO_WANT_DEQ_NOTIF);
 }
 
 /**
- * Clear has tx notification flag
+ * Clear has notification flag
  *
- * The fifo generates only one event per SVM_FIFO_WANT_TX_NOTIF_IF_FULL
- * request and sets has_tx_ntf. To received new events the flag must be
+ * The fifo generates only one event per SVM_FIFO_WANT_NOTIF_IF_FULL
+ * request and sets has_ntf. To received new events the flag must be
  * cleared using this function.
  *
  * @param f    fifo
  */
 static inline void
-svm_fifo_reset_tx_ntf (svm_fifo_t * f)
+svm_fifo_reset_has_deq_ntf (svm_fifo_t * f)
 {
-  f->has_tx_ntf = 0;
+  f->has_deq_ntf = 0;
 }
 
 /**
- * Check if fifo needs tx notification
+ * Check if fifo needs dequeue notification
  *
- * Determines based on tx notification request flags and state of the fifo if
- * a tx io event should be generated.
+ * Determines based on notification request flags and state of the fifo if
+ * an event should be generated.
  *
  * @param f            fifo
  * @param n_last_deq   number of bytes last dequeued
- * @return             1 if tx io event should be generated, 0 otherwise
+ * @return             1 if event should be generated, 0 otherwise
  */
 static inline u8
-svm_fifo_needs_tx_ntf (svm_fifo_t * f, u32 n_last_deq)
+svm_fifo_needs_deq_ntf (svm_fifo_t * f, u32 n_last_deq)
 {
-  u8 want_ntf = f->want_tx_ntf;
+  u8 want_ntf = f->want_deq_ntf;
 
-  if (PREDICT_TRUE (want_ntf == SVM_FIFO_NO_TX_NOTIF))
+  if (PREDICT_TRUE (want_ntf == SVM_FIFO_NO_DEQ_NOTIF))
     return 0;
-  else if (want_ntf & SVM_FIFO_WANT_TX_NOTIF)
+  else if (want_ntf & SVM_FIFO_WANT_DEQ_NOTIF)
     return 1;
-  else if (want_ntf & SVM_FIFO_WANT_TX_NOTIF_IF_FULL)
+  if (want_ntf & SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL)
     {
       u32 max_deq = svm_fifo_max_dequeue_cons (f);
       u32 nitems = f->nitems;
-      if (!f->has_tx_ntf && max_deq < nitems
+      if (!f->has_deq_ntf && max_deq < nitems
          && max_deq + n_last_deq >= nitems)
        return 1;
-
-      return 0;
+    }
+  if (want_ntf & SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY)
+    {
+      if (!f->has_deq_ntf && svm_fifo_is_empty (f))
+       return 1;
     }
   return 0;
 }