svm: Add support for thresh dequeue notification
[vpp.git] / src / svm / svm_fifo.h
index ce4c53d..42efb5a 100644 (file)
 #include <vppinfra/vec.h>
 #include <vppinfra/pool.h>
 #include <vppinfra/format.h>
-#include <vppinfra/rbtree.h>
+#include <svm/fifo_types.h>
 
-/** Out-of-order segment */
-typedef struct
-{
-  u32 next;    /**< Next linked-list element pool index */
-  u32 prev;    /**< Previous linked-list element pool index */
-  u32 start;   /**< Start of segment, normalized*/
-  u32 length;  /**< Length of segment */
-} ooo_segment_t;
-
-#define SVM_FIFO_TRACE                         (0)
 #define OOO_SEGMENT_INVALID_INDEX      ((u32)~0)
 #define SVM_FIFO_INVALID_SESSION_INDEX         ((u32)~0)
 #define SVM_FIFO_INVALID_INDEX         ((u32)~0)
-#define SVM_FIFO_MAX_EVT_SUBSCRIBERS   7
 
 typedef enum svm_fifo_deq_ntf_
 {
   SVM_FIFO_NO_DEQ_NOTIF = 0,           /**< No notification requested */
   SVM_FIFO_WANT_DEQ_NOTIF = 1,         /**< Notify on dequeue */
   SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL = 2, /**< Notify on transition from full */
-  SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY = 4,        /**< Notify on transition to empty */
+  SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY = 4, /**< Notify on transition to empty */
+  SVM_FIFO_WANT_DEQ_NOTIF_IF_LEQ_THRESH = 5, /**< Notify on transition to less
+                                              than or equal threshold */
 } svm_fifo_deq_ntf_t;
 
-typedef struct
-{
-  u32 offset;
-  u32 len;
-  u32 action;
-} svm_fifo_trace_elem_t;
-
-typedef struct svm_fifo_chunk_
-{
-  u32 start_byte;              /**< chunk start byte */
-  u32 length;                  /**< length of chunk in bytes */
-  struct svm_fifo_chunk_ *next;        /**< pointer to next chunk in linked-lists */
-  u8 data[0];                  /**< start of chunk data */
-} svm_fifo_chunk_t;
-
 typedef enum svm_fifo_flag_
 {
-  SVM_FIFO_F_MULTI_CHUNK = 1 << 0,
-  SVM_FIFO_F_GROW = 1 << 1,
-  SVM_FIFO_F_SHRINK = 1 << 2,
-  SVM_FIFO_F_COLLECT_CHUNKS = 1 << 3,
-  SVM_FIFO_F_LL_TRACKED = 1 << 4,
-  SVM_FIFO_F_SINGLE_THREAD_OWNED = 1 << 5,
+  SVM_FIFO_F_LL_TRACKED = 1 << 0,
 } svm_fifo_flag_t;
 
-typedef struct _svm_fifo
-{
-  CLIB_CACHE_LINE_ALIGN_MARK (shared_first);
-  u32 size;                    /**< size of the fifo in bytes */
-  u32 nitems;                  /**< usable size (size-1) */
-  u8 flags;                    /**< fifo flags */
-  svm_fifo_chunk_t *start_chunk;/**< first chunk in fifo chunk list */
-  svm_fifo_chunk_t *end_chunk; /**< end chunk in fifo chunk list */
-  svm_fifo_chunk_t *new_chunks;        /**< chunks yet to be added to list */
-  rb_tree_t chunk_lookup;
-
-    CLIB_CACHE_LINE_ALIGN_MARK (shared_second);
-  volatile u32 has_event;      /**< non-zero if deq event exists */
-  u32 master_session_index;    /**< session layer session index */
-  u32 client_session_index;    /**< app session index */
-  u8 master_thread_index;      /**< session layer thread index */
-  u8 client_thread_index;      /**< app worker index */
-  i8 refcnt;                   /**< reference count  */
-  u32 segment_manager;         /**< session layer segment manager index */
-  u32 segment_index;           /**< segment index in segment manager */
-  struct _svm_fifo *next;      /**< next in freelist/active chain */
-  struct _svm_fifo *prev;      /**< prev in active chain */
-  u32 size_decrement;          /**< bytes to remove from fifo */
-
-    CLIB_CACHE_LINE_ALIGN_MARK (consumer);
-  u32 head;                    /**< fifo head position/byte */
-  svm_fifo_chunk_t *head_chunk;        /**< tracks chunk where head lands */
-  svm_fifo_chunk_t *ooo_deq;   /**< last chunk used for ooo dequeue */
-  volatile u32 want_deq_ntf;   /**< producer wants nudge */
-  volatile u32 has_deq_ntf;
-
-    CLIB_CACHE_LINE_ALIGN_MARK (producer);
-  u32 tail;                    /**< fifo tail position/byte */
-  u32 ooos_list_head;          /**< Head of out-of-order linked-list */
-  svm_fifo_chunk_t *tail_chunk;        /**< tracks chunk where tail lands */
-  svm_fifo_chunk_t *ooo_enq;   /**< last chunk used for ooo enqueue */
-  ooo_segment_t *ooo_segments; /**< Pool of ooo segments */
-  u32 ooos_newest;             /**< Last segment to have been updated */
-  volatile u8 n_subscribers;   /**< Number of subscribers for io events */
-  u8 subscribers[SVM_FIFO_MAX_EVT_SUBSCRIBERS];
-
-#if SVM_FIFO_TRACE
-  svm_fifo_trace_elem_t *trace;
-#endif
-
-} svm_fifo_t;
-
 typedef enum
 {
   SVM_FIFO_EFULL = -2,
   SVM_FIFO_EEMPTY = -3,
+  SVM_FIFO_EGROW = -4,
 } svm_fifo_err_t;
 
 typedef struct svm_fifo_seg_
@@ -157,9 +82,9 @@ static inline void
 f_load_head_tail_cons (svm_fifo_t * f, u32 * head, u32 * tail)
 {
   /* load-relaxed: consumer owned index */
-  *head = f->head;
+  *head = f->shr->head;
   /* load-acq: consumer foreign index (paired with store-rel in producer) */
-  *tail = clib_atomic_load_acq_n (&f->tail);
+  *tail = clib_atomic_load_acq_n (&f->shr->tail);
 }
 
 /** Load head and tail optimized for producer
@@ -170,9 +95,9 @@ static inline void
 f_load_head_tail_prod (svm_fifo_t * f, u32 * head, u32 * tail)
 {
   /* load relaxed: producer owned index */
-  *tail = f->tail;
+  *tail = f->shr->tail;
   /* load-acq: producer foreign index (paired with store-rel in consumer) */
-  *head = clib_atomic_load_acq_n (&f->head);
+  *head = clib_atomic_load_acq_n (&f->shr->head);
 }
 
 /**
@@ -184,61 +109,111 @@ static inline void
 f_load_head_tail_all_acq (svm_fifo_t * f, u32 * head, u32 * tail)
 {
   /* load-acq : consumer foreign index (paired with store-rel) */
-  *tail = clib_atomic_load_acq_n (&f->tail);
+  *tail = clib_atomic_load_acq_n (&f->shr->tail);
   /* load-acq : producer foriegn index (paired with store-rel) */
-  *head = clib_atomic_load_acq_n (&f->head);
+  *head = clib_atomic_load_acq_n (&f->shr->head);
 }
 
 /**
- * Distance to a from b, i.e., a - b in the fifo
+ * Fifo current size, i.e., number of bytes enqueued
  *
  * Internal function.
  */
 static inline u32
-f_distance_to (svm_fifo_t * f, u32 a, u32 b)
+f_cursize (svm_fifo_t * f, u32 head, u32 tail)
 {
-  return ((f->size + a - b) % f->size);
+  return tail - head;
 }
 
 /**
- * Distance from a to b, i.e., b - a in the fifo
+ * Fifo free bytes, i.e., number of free bytes
  *
- * Internal function.
+ * Internal function
  */
 static inline u32
-f_distance_from (svm_fifo_t * f, u32 a, u32 b)
+f_free_count (svm_fifo_t * f, u32 head, u32 tail)
 {
-  return ((f->size + b - a) % f->size);
+  return (f->shr->size - f_cursize (f, head, tail));
 }
 
-/**
- * Fifo current size, i.e., number of bytes enqueued
- *
- * Internal function.
- */
-static inline u32
-f_cursize (svm_fifo_t * f, u32 head, u32 tail)
+always_inline u32
+f_chunk_end (svm_fifo_chunk_t * c)
 {
-  return (head <= tail ? tail - head : f->size + tail - head);
+  return c->start_byte + c->length;
 }
 
-/**
- * Fifo free bytes, i.e., number of free bytes
- *
- * Internal function
- */
-static inline u32
-f_free_count (svm_fifo_t * f, u32 head, u32 tail)
+always_inline int
+f_pos_lt (u32 a, u32 b)
 {
-  return (f->nitems - f_cursize (f, head, tail));
+  return ((i32) (a - b) < 0);
 }
 
-/**
- * Try to shrink fifo size.
- *
- * Internal function.
- */
-void svm_fifo_try_shrink (svm_fifo_t * f, u32 head, u32 tail);
+always_inline int
+f_pos_leq (u32 a, u32 b)
+{
+  return ((i32) (a - b) <= 0);
+}
+
+always_inline int
+f_pos_gt (u32 a, u32 b)
+{
+  return ((i32) (a - b) > 0);
+}
+
+always_inline int
+f_pos_geq (u32 a, u32 b)
+{
+  return ((i32) (a - b) >= 0);
+}
+
+always_inline u8
+f_chunk_includes_pos (svm_fifo_chunk_t * c, u32 pos)
+{
+  return (f_pos_geq (pos, c->start_byte)
+         && f_pos_lt (pos, c->start_byte + c->length));
+}
+
+always_inline svm_fifo_chunk_t *
+f_start_cptr (svm_fifo_t *f)
+{
+  return fs_chunk_ptr (f->fs_hdr, f->shr->start_chunk);
+}
+
+always_inline svm_fifo_chunk_t *
+f_end_cptr (svm_fifo_t *f)
+{
+  return fs_chunk_ptr (f->fs_hdr, f->shr->end_chunk);
+}
+
+always_inline svm_fifo_chunk_t *
+f_head_cptr (svm_fifo_t *f)
+{
+  return fs_chunk_ptr (f->fs_hdr, f->shr->head_chunk);
+}
+
+always_inline svm_fifo_chunk_t *
+f_tail_cptr (svm_fifo_t *f)
+{
+  return fs_chunk_ptr (f->fs_hdr, f->shr->tail_chunk);
+}
+
+always_inline svm_fifo_chunk_t *
+f_cptr (svm_fifo_t *f, fs_sptr_t cp)
+{
+  return fs_chunk_ptr (f->fs_hdr, cp);
+}
+
+always_inline fs_sptr_t
+f_csptr (svm_fifo_t *f, svm_fifo_chunk_t *c)
+{
+  return fs_chunk_sptr (f->fs_hdr, c);
+}
+
+always_inline void
+f_csptr_link (svm_fifo_t *f, fs_sptr_t cp, svm_fifo_chunk_t *c)
+{
+  fs_chunk_ptr (f->fs_hdr, cp)->next = fs_chunk_sptr (f->fs_hdr, c);
+}
 
 /**
  * Create fifo of requested size
@@ -249,7 +224,7 @@ void svm_fifo_try_shrink (svm_fifo_t * f, u32 head, u32 tail);
  *                     rounded to the next highest power-of-two value.
  * @return             pointer to new fifo
  */
-svm_fifo_t *svm_fifo_create (u32 size);
+svm_fifo_t *svm_fifo_alloc (u32 size);
 /**
  * Initialize fifo
  *
@@ -257,12 +232,6 @@ svm_fifo_t *svm_fifo_create (u32 size);
  * @param size         size for fifo
  */
 void svm_fifo_init (svm_fifo_t * f, u32 size);
-/**
- * Initialize fifo chunks and rbtree
- *
- * @param f            fifo
- */
-void svm_fifo_init_chunks (svm_fifo_t * f);
 /**
  * Allocate a fifo chunk on heap
  *
@@ -275,40 +244,35 @@ void svm_fifo_init_chunks (svm_fifo_t * f);
  */
 svm_fifo_chunk_t *svm_fifo_chunk_alloc (u32 size);
 /**
- * Grow fifo size by adding chunk to chunk list
+ * Ensure the whole fifo size is writeable
  *
- * If fifos are allocated on a segment, this should be called with
- * the segment's heap pushed.
+ * Allocates enough chunks to cover the whole fifo size.
  *
- * @param f    fifo to be extended
- * @param c    chunk or linked list of chunks to be added
+ * @param f    fifo
  */
-void svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c);
+int svm_fifo_fill_chunk_list (svm_fifo_t * f);
 /**
- * Request to reduce fifo size by amount of bytes
+ * Provision and return chunks for number of bytes requested
  *
- * Because the producer might be enqueuing data when this is called, the
- * actual size update is only applied when producer tries to enqueue new
- * data, unless @param try_shrink is set.
+ * Allocates enough chunks to cover the bytes requested and returns them
+ * in the fifo segment array. The number of bytes provisioned may be less
+ * than requested if not enough segments were provided.
  *
  * @param f            fifo
- * @param len          number of bytes to remove from fifo. The actual number
- *                     of bytes to be removed will be less or equal to this
- *                     value.
- * @param try_shrink   flg to indicate if it's safe to try to shrink fifo
- *                     size. It should be set only if this is called by the
- *                     producer of if the producer is not using the fifo
- * @return             actual length fifo size will be reduced by
+ * @param fs           array of fifo segments
+ * @param n_segs       length of fifo segments array
+ * @param len          number of bytes to preallocate
+ * @return             number of fifo segments provisioned or error
  */
-int svm_fifo_reduce_size (svm_fifo_t * f, u32 len, u8 try_shrink);
+int svm_fifo_provision_chunks (svm_fifo_t *f, svm_fifo_seg_t *fs, u32 n_segs,
+                              u32 len);
 /**
- * Removes chunks that are after fifo end byte
+ * Initialize rbtrees used for ooo lookups
  *
- * Needs to be called with segment heap pushed.
- *
- * @param f fifo
+ * @param f            fifo
+ * @param ooo_type     type of ooo operation (0 enqueue, 1 dequeue)
  */
-svm_fifo_chunk_t *svm_fifo_collect_chunks (svm_fifo_t * f);
+void svm_fifo_init_ooo_lookup (svm_fifo_t * f, u8 ooo_type);
 /**
  * Free fifo and associated state
  *
@@ -386,11 +350,22 @@ int svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len,
  * @param len          number of bytes to add to tail
  */
 void svm_fifo_enqueue_nocopy (svm_fifo_t * f, u32 len);
+/**
+ * Enqueue array of @ref svm_fifo_seg_t in order
+ *
+ * @param f            fifo
+ * @param segs         array of segments to enqueue
+ * @param n_segs       number of segments
+ * @param allow_partial        if set partial enqueues are allowed
+ * @return             len if enqueue was successful, error otherwise
+ */
+int svm_fifo_enqueue_segments (svm_fifo_t * f, const svm_fifo_seg_t segs[],
+                              u32 n_segs, u8 allow_partial);
 /**
  * Overwrite fifo head with new data
  *
  * This should be typically used by dgram transport protocols that need
- * to update the dgram header after dequeueing a chunk of data. It assumes
+ * to update the dgram header after dequeuing a chunk of data. It assumes
  * that the dgram header is at most spread over two chunks.
  *
  * @param f            fifo
@@ -402,7 +377,9 @@ void svm_fifo_overwrite_head (svm_fifo_t * f, u8 * src, u32 len);
  * Dequeue data from fifo
  *
  * Data is dequeued to consumer provided buffer and head is atomically
- * updated.
+ * updated. This should not be used in combination with ooo lookups. If
+ * ooo peeking of data is needed in combination with dequeuing use @ref
+ * svm_fifo_dequeue_drop.
  *
  * @param f            fifo
  * @param len          length of data to dequeue
@@ -441,8 +418,23 @@ int svm_fifo_dequeue_drop (svm_fifo_t * f, u32 len);
  * @param f            fifo
  */
 void svm_fifo_dequeue_drop_all (svm_fifo_t * f);
-int svm_fifo_segments (svm_fifo_t * f, svm_fifo_seg_t * fs);
-void svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_seg_t * fs);
+/**
+ * Get pointers to fifo chunks data in @ref svm_fifo_seg_t array
+ *
+ * Populates fifo segment array with pointers to fifo chunk data and lengths.
+ * Because this returns pointers to data, it must be paired with
+ * @ref svm_fifo_dequeue_drop to actually release the fifo chunks after the
+ * data is consumed.
+ *
+ * @param f            fifo
+ * @param offset       offset from where to retrieve segments
+ * @param fs           array of fifo segments allocated by caller
+ * @param n_segs       number of fifo segments in array
+ * @param max_bytes    max bytes to be mapped to fifo segments
+ * @return             number of bytes in fifo segments or SVM_FIFO_EEMPTY
+ */
+int svm_fifo_segments (svm_fifo_t * f, u32 offset, svm_fifo_seg_t * fs,
+                      u32 n_segs, u32 max_bytes);
 /**
  * Add io events subscriber to list
  *
@@ -479,13 +471,12 @@ ooo_segment_t *svm_fifo_first_ooo_segment (svm_fifo_t * f);
  */
 u8 svm_fifo_is_sane (svm_fifo_t * f);
 /**
- * Declare this fifo is used by only a single thread.
- * In this special case, fifo-growth can be done in an efficient way without delay.
+ * Number of chunks linked into the fifo
  *
- * @param f             fifo
- * @return              1 if the fifo is already owned by another thread, 0 otherwise
+ * @param f    fifo
+ * @return     number of chunks in fifo linked list
  */
-u8 svm_fifo_set_single_thread_owned (svm_fifo_t * f);
+u32 svm_fifo_n_chunks (svm_fifo_t * f);
 format_function_t format_svm_fifo;
 
 /**
@@ -540,7 +531,7 @@ svm_fifo_max_dequeue (svm_fifo_t * f)
 static inline int
 svm_fifo_is_full_prod (svm_fifo_t * f)
 {
-  return (svm_fifo_max_dequeue_prod (f) == f->nitems);
+  return (svm_fifo_max_dequeue_prod (f) == f->shr->size);
 }
 
 /* Check if fifo is full.
@@ -552,7 +543,7 @@ svm_fifo_is_full_prod (svm_fifo_t * f)
 static inline int
 svm_fifo_is_full (svm_fifo_t * f)
 {
-  return (svm_fifo_max_dequeue (f) == f->nitems);
+  return (svm_fifo_max_dequeue (f) == f->shr->size);
 }
 
 /**
@@ -619,8 +610,6 @@ svm_fifo_max_enqueue_prod (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_prod (f, &head, &tail);
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
-    svm_fifo_try_shrink (f, head, tail);
   return f_free_count (f, head, tail);
 }
 
@@ -635,51 +624,33 @@ svm_fifo_max_enqueue (svm_fifo_t * f)
 {
   u32 head, tail;
   f_load_head_tail_all_acq (f, &head, &tail);
-  if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SHRINK))
-    svm_fifo_try_shrink (f, head, tail);
   return f_free_count (f, head, tail);
 }
 
 /**
- * Max contiguous chunk of data that can be read
+ * Max contiguous chunk of data that can be read.
+ *
+ * Should only be called by consumers.
  */
-static inline u32
-svm_fifo_max_read_chunk (svm_fifo_t * f)
-{
-  u32 head, tail;
-  f_load_head_tail_cons (f, &head, &tail);
-  return tail >= head ? (tail - head) : (f->size - head);
-}
+u32 svm_fifo_max_read_chunk (svm_fifo_t * f);
 
 /**
  * Max contiguous chunk of data that can be written
+ *
+ * Should only be called by producers
  */
-static inline u32
-svm_fifo_max_write_chunk (svm_fifo_t * f)
-{
-  u32 head, tail;
-  f_load_head_tail_prod (f, &head, &tail);
-  return tail >= head ? f->size - tail : f_free_count (f, head, tail);
-}
-
-static inline u8 *
-svm_fifo_head (svm_fifo_t * f)
-{
-  /* load-relaxed: consumer owned index */
-  return (f->head_chunk->data + (f->head - f->head_chunk->start_byte));
-}
-
-static inline u8 *
-svm_fifo_tail (svm_fifo_t * f)
-{
-  /* load-relaxed: producer owned index */
-  return (f->tail_chunk->data + (f->tail - f->tail_chunk->start_byte));
-}
+u32 svm_fifo_max_write_chunk (svm_fifo_t * f);
 
+/**
+ * Fifo number of subscribers getter
+ *
+ * @param f    fifo
+ * @return     number of subscribers
+ */
 static inline u8
 svm_fifo_n_subscribers (svm_fifo_t * f)
 {
-  return f->n_subscribers;
+  return f->shr->n_subscribers;
 }
 
 /**
@@ -713,9 +684,9 @@ ooo_segment_offset_prod (svm_fifo_t * f, ooo_segment_t * s)
 {
   u32 tail;
   /* load-relaxed: producer owned index */
-  tail = f->tail;
+  tail = f->shr->tail;
 
-  return f_distance_to (f, s->start, tail);
+  return (s->start - tail);
 }
 
 static inline u32
@@ -724,6 +695,22 @@ ooo_segment_length (svm_fifo_t * f, ooo_segment_t * s)
   return s->length;
 }
 
+static inline u32
+svm_fifo_size (svm_fifo_t * f)
+{
+  return f->shr->size;
+}
+
+static inline void
+svm_fifo_set_size (svm_fifo_t * f, u32 size)
+{
+  if (size > (1 << f->fs_hdr->max_log2_fifo_size))
+    return;
+  fsh_virtual_mem_update (f->fs_hdr, f->shr->slice_index,
+                         (int) f->shr->size - size);
+  f->shr->size = size;
+}
+
 /**
  * Check if fifo has io event
  *
@@ -733,7 +720,7 @@ ooo_segment_length (svm_fifo_t * f, ooo_segment_t * s)
 static inline int
 svm_fifo_has_event (svm_fifo_t * f)
 {
-  return f->has_event;
+  return f->shr->has_event;
 }
 
 /**
@@ -747,7 +734,7 @@ svm_fifo_has_event (svm_fifo_t * f)
 always_inline u8
 svm_fifo_set_event (svm_fifo_t * f)
 {
-  return !clib_atomic_swap_rel_n (&f->has_event, 1);
+  return !clib_atomic_swap_rel_n (&f->shr->has_event, 1);
 }
 
 /**
@@ -760,7 +747,7 @@ svm_fifo_set_event (svm_fifo_t * f)
 always_inline void
 svm_fifo_unset_event (svm_fifo_t * f)
 {
-  clib_atomic_swap_acq_n (&f->has_event, 0);
+  clib_atomic_swap_acq_n (&f->shr->has_event, 0);
 }
 
 /**
@@ -774,7 +761,7 @@ svm_fifo_unset_event (svm_fifo_t * f)
 static inline void
 svm_fifo_add_want_deq_ntf (svm_fifo_t * f, u8 ntf_type)
 {
-  f->want_deq_ntf |= ntf_type;
+  f->shr->want_deq_ntf |= ntf_type;
 }
 
 /**
@@ -788,7 +775,7 @@ svm_fifo_add_want_deq_ntf (svm_fifo_t * f, u8 ntf_type)
 static inline void
 svm_fifo_del_want_deq_ntf (svm_fifo_t * f, u8 ntf_type)
 {
-  f->want_deq_ntf &= ~ntf_type;
+  f->shr->want_deq_ntf &= ~ntf_type;
 }
 
 /**
@@ -806,8 +793,10 @@ static inline void
 svm_fifo_clear_deq_ntf (svm_fifo_t * f)
 {
   /* Set the flag if want_notif_if_full was the only ntf requested */
-  f->has_deq_ntf = f->want_deq_ntf == SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL;
-  svm_fifo_del_want_deq_ntf (f, SVM_FIFO_WANT_DEQ_NOTIF);
+  f->shr->has_deq_ntf =
+    f->shr->want_deq_ntf == SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL;
+  svm_fifo_del_want_deq_ntf (f, SVM_FIFO_WANT_DEQ_NOTIF |
+                                 SVM_FIFO_WANT_DEQ_NOTIF_IF_LEQ_THRESH);
 }
 
 /**
@@ -822,7 +811,7 @@ svm_fifo_clear_deq_ntf (svm_fifo_t * f)
 static inline void
 svm_fifo_reset_has_deq_ntf (svm_fifo_t * f)
 {
-  f->has_deq_ntf = 0;
+  f->shr->has_deq_ntf = 0;
 }
 
 /**
@@ -838,7 +827,7 @@ svm_fifo_reset_has_deq_ntf (svm_fifo_t * f)
 static inline u8
 svm_fifo_needs_deq_ntf (svm_fifo_t * f, u32 n_last_deq)
 {
-  u8 want_ntf = f->want_deq_ntf;
+  u8 want_ntf = f->shr->want_deq_ntf;
 
   if (PREDICT_TRUE (want_ntf == SVM_FIFO_NO_DEQ_NOTIF))
     return 0;
@@ -847,19 +836,37 @@ svm_fifo_needs_deq_ntf (svm_fifo_t * f, u32 n_last_deq)
   if (want_ntf & SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL)
     {
       u32 max_deq = svm_fifo_max_dequeue_cons (f);
-      u32 nitems = f->nitems;
-      if (!f->has_deq_ntf && max_deq < nitems
-         && max_deq + n_last_deq >= nitems)
+      u32 size = f->shr->size;
+      if (!f->shr->has_deq_ntf && max_deq < size &&
+         max_deq + n_last_deq >= size)
        return 1;
     }
   if (want_ntf & SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY)
     {
-      if (!f->has_deq_ntf && svm_fifo_is_empty (f))
+      if (!f->shr->has_deq_ntf && svm_fifo_is_empty (f))
+       return 1;
+    }
+  if (want_ntf & SVM_FIFO_WANT_DEQ_NOTIF_IF_LEQ_THRESH)
+    {
+      if (!f->shr->has_deq_ntf &&
+         (svm_fifo_max_dequeue (f) <= f->shr->deq_thresh))
        return 1;
     }
   return 0;
 }
 
+/**
+ * Set the fifo dequeue threshold which will be used for notifications.
+ *
+ * Note: If not set, by default threshold is zero, equivalent to
+ * empty.
+ */
+static inline void
+svm_fifo_set_deq_thresh (svm_fifo_t *f, u32 thresh)
+{
+  f->shr->deq_thresh = thresh;
+}
+
 #endif /* __included_ssvm_fifo_h__ */
 
 /*