svm: reorganize fifo march code
[vpp.git] / src / svm / svm_fifo.c
1 /*
2  * Copyright (c) 2016-2019 Cisco and/or its affiliates.
3  * Copyright (c) 2019 Arm Limited
4  * Copyright (c) 2010-2017 Intel Corporation and/or its affiliates.
5  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6  * Inspired from DPDK rte_ring.h (SPSC only) (derived from freebsd bufring.h).
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at:
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 #include <svm/svm_fifo.h>
21 #include <vppinfra/cpu.h>
22
23 CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
24                svm_fifo_chunk_t * c, u32 tail_idx, const u8 * src, u32 len,
25                u8 update_tail)
26 {
27   u32 n_chunk;
28
29   ASSERT (tail_idx >= c->start_byte && tail_idx < c->start_byte + c->length);
30
31   tail_idx -= c->start_byte;
32   n_chunk = c->length - tail_idx;
33   if (n_chunk < len)
34     {
35       u32 to_copy = len;
36       clib_memcpy_fast (&c->data[tail_idx], src, n_chunk);
37       while ((to_copy -= n_chunk))
38         {
39           c = c->next;
40           n_chunk = clib_min (c->length, to_copy);
41           clib_memcpy_fast (&c->data[0], src + (len - to_copy), n_chunk);
42         }
43       if (update_tail)
44         f->tail_chunk = c;
45     }
46   else
47     {
48       clib_memcpy_fast (&c->data[tail_idx], src, len);
49     }
50 }
51
52 CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
53                svm_fifo_chunk_t * c, u32 head_idx, u8 * dst, u32 len,
54                u8 update_head)
55 {
56   u32 n_chunk;
57
58   head_idx -= c->start_byte;
59   n_chunk = c->length - head_idx;
60   if (n_chunk < len)
61     {
62       u32 to_copy = len;
63       clib_memcpy_fast (dst, &c->data[head_idx], n_chunk);
64       while ((to_copy -= n_chunk))
65         {
66           c = c->next;
67           n_chunk = clib_min (c->length, to_copy);
68           clib_memcpy_fast (dst + (len - to_copy), &c->data[0], n_chunk);
69         }
70       if (update_head)
71         f->head_chunk = c;
72     }
73   else
74     {
75       clib_memcpy_fast (dst, &c->data[head_idx], len);
76     }
77 }
78
79 #ifndef CLIB_MARCH_VARIANT
80
81 static inline void
82 svm_fifo_copy_to_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 tail_idx,
83                         const u8 * src, u32 len, u8 update_tail)
84 {
85   CLIB_MARCH_FN_SELECT (svm_fifo_copy_to_chunk) (f, c, tail_idx, src, len,
86                                                  update_tail);
87 }
88
89 static inline void
90 svm_fifo_copy_from_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 head_idx,
91                           u8 * dst, u32 len, u8 update_head)
92 {
93   CLIB_MARCH_FN_SELECT (svm_fifo_copy_from_chunk) (f, c, head_idx, dst, len,
94                                                    update_head);
95 }
96
97 static inline u8
98 position_lt (svm_fifo_t * f, u32 a, u32 b, u32 tail)
99 {
100   return (ooo_segment_distance_from_tail (f, a, tail)
101           < ooo_segment_distance_from_tail (f, b, tail));
102 }
103
104 static inline u8
105 position_leq (svm_fifo_t * f, u32 a, u32 b, u32 tail)
106 {
107   return (ooo_segment_distance_from_tail (f, a, tail)
108           <= ooo_segment_distance_from_tail (f, b, tail));
109 }
110
111 static inline u8
112 position_gt (svm_fifo_t * f, u32 a, u32 b, u32 tail)
113 {
114   return (ooo_segment_distance_from_tail (f, a, tail)
115           > ooo_segment_distance_from_tail (f, b, tail));
116 }
117
118 static inline u32
119 position_diff (svm_fifo_t * f, u32 posa, u32 posb, u32 tail)
120 {
121   return ooo_segment_distance_from_tail (f, posa, tail)
122     - ooo_segment_distance_from_tail (f, posb, tail);
123 }
124
125 static inline u32
126 ooo_segment_end_pos (svm_fifo_t * f, ooo_segment_t * s)
127 {
128   return s->start + s->length;
129 }
130
131 u8 *
132 format_ooo_segment (u8 * s, va_list * args)
133 {
134   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
135   ooo_segment_t *seg = va_arg (*args, ooo_segment_t *);
136   u32 normalized_start = (seg->start + f->nitems - f->tail) % f->size;
137   s = format (s, "[%u, %u], len %u, next %d, prev %d", normalized_start,
138               (normalized_start + seg->length) % f->size, seg->length,
139               seg->next, seg->prev);
140   return s;
141 }
142
143 u8 *
144 svm_fifo_dump_trace (u8 * s, svm_fifo_t * f)
145 {
146 #if SVM_FIFO_TRACE
147   svm_fifo_trace_elem_t *seg = 0;
148   int i = 0;
149
150   if (f->trace)
151     {
152       vec_foreach (seg, f->trace)
153       {
154         s = format (s, "{%u, %u, %u}, ", seg->offset, seg->len, seg->action);
155         i++;
156         if (i % 5 == 0)
157           s = format (s, "\n");
158       }
159       s = format (s, "\n");
160     }
161   return s;
162 #else
163   return 0;
164 #endif
165 }
166
167 u8 *
168 svm_fifo_replay (u8 * s, svm_fifo_t * f, u8 no_read, u8 verbose)
169 {
170   int i, trace_len;
171   u8 *data = 0;
172   svm_fifo_trace_elem_t *trace;
173   u32 offset;
174   svm_fifo_t *dummy_fifo;
175
176   if (!f)
177     return s;
178
179 #if SVM_FIFO_TRACE
180   trace = f->trace;
181   trace_len = vec_len (trace);
182 #else
183   trace = 0;
184   trace_len = 0;
185 #endif
186
187   dummy_fifo = svm_fifo_create (f->size);
188   clib_memset (f->head_chunk->data, 0xFF, f->nitems);
189   vec_validate (data, f->nitems);
190   for (i = 0; i < vec_len (data); i++)
191     data[i] = i;
192
193   for (i = 0; i < trace_len; i++)
194     {
195       offset = trace[i].offset;
196       if (trace[i].action == 1)
197         {
198           if (verbose)
199             s = format (s, "adding [%u, %u]:", trace[i].offset,
200                         (trace[i].offset + trace[i].len) % dummy_fifo->size);
201           svm_fifo_enqueue_with_offset (dummy_fifo, trace[i].offset,
202                                         trace[i].len, &data[offset]);
203         }
204       else if (trace[i].action == 2)
205         {
206           if (verbose)
207             s = format (s, "adding [%u, %u]:", 0, trace[i].len);
208           svm_fifo_enqueue_nowait (dummy_fifo, trace[i].len, &data[offset]);
209         }
210       else if (!no_read)
211         {
212           if (verbose)
213             s = format (s, "read: %u", trace[i].len);
214           svm_fifo_dequeue_drop (dummy_fifo, trace[i].len);
215         }
216       if (verbose)
217         s = format (s, "%U", format_svm_fifo, dummy_fifo, 1);
218     }
219
220   s = format (s, "result: %U", format_svm_fifo, dummy_fifo, 1);
221
222   return s;
223 }
224
225 u8 *
226 format_ooo_list (u8 * s, va_list * args)
227 {
228   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
229   u32 indent = va_arg (*args, u32);
230   u32 ooo_segment_index = f->ooos_list_head;
231   ooo_segment_t *seg;
232
233   while (ooo_segment_index != OOO_SEGMENT_INVALID_INDEX)
234     {
235       seg = pool_elt_at_index (f->ooo_segments, ooo_segment_index);
236       s = format (s, "%U%U\n", format_white_space, indent, format_ooo_segment,
237                   f, seg);
238       ooo_segment_index = seg->next;
239     }
240
241   return s;
242 }
243
244 u8 *
245 format_svm_fifo (u8 * s, va_list * args)
246 {
247   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
248   int verbose = va_arg (*args, int);
249   u32 indent;
250
251   if (!s)
252     return s;
253
254   indent = format_get_indent (s);
255   s = format (s, "cursize %u nitems %u has_event %d\n",
256               svm_fifo_max_dequeue (f), f->nitems, f->has_event);
257   s = format (s, "%Uhead %u tail %u segment manager %u\n", format_white_space,
258               indent, (f->head % f->size), (f->tail % f->size),
259               f->segment_manager);
260
261   if (verbose > 1)
262     s = format (s, "%Uvpp session %d thread %d app session %d thread %d\n",
263                 format_white_space, indent, f->master_session_index,
264                 f->master_thread_index, f->client_session_index,
265                 f->client_thread_index);
266
267   if (verbose)
268     {
269       s = format (s, "%Uooo pool %d active elts newest %u\n",
270                   format_white_space, indent, pool_elts (f->ooo_segments),
271                   f->ooos_newest);
272       if (svm_fifo_has_ooo_data (f))
273         s = format (s, " %U", format_ooo_list, f, indent, verbose);
274     }
275   return s;
276 }
277
278 void
279 svm_fifo_init (svm_fifo_t * f, u32 size)
280 {
281   f->size = size;
282   /*
283    * usable size of the fifo set to rounded_data_size - 1
284    * to differentiate between free fifo and empty fifo.
285    */
286   f->nitems = f->size - 1;
287   f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX;
288   f->ct_session_index = SVM_FIFO_INVALID_SESSION_INDEX;
289   f->segment_index = SVM_FIFO_INVALID_INDEX;
290   f->refcnt = 1;
291   f->default_chunk.start_byte = 0;
292   f->default_chunk.length = f->size;
293   f->default_chunk.next = f->start_chunk = &f->default_chunk;
294   f->end_chunk = f->head_chunk = f->tail_chunk = f->start_chunk;
295 }
296
297 /** create an svm fifo, in the current heap. Fails vs blow up the process */
298 svm_fifo_t *
299 svm_fifo_create (u32 data_size_in_bytes)
300 {
301   svm_fifo_t *f;
302   u32 rounded_data_size;
303
304   /* always round fifo data size to the next highest power-of-two */
305   rounded_data_size = (1 << (max_log2 (data_size_in_bytes)));
306   f = clib_mem_alloc_aligned_or_null (sizeof (*f) + rounded_data_size,
307                                       CLIB_CACHE_LINE_BYTES);
308   if (f == 0)
309     return 0;
310
311   clib_memset (f, 0, sizeof (*f));
312   svm_fifo_init (f, data_size_in_bytes);
313   return f;
314 }
315
316 static inline void
317 svm_fifo_size_update (svm_fifo_t * f, svm_fifo_chunk_t * c)
318 {
319   svm_fifo_chunk_t *prev;
320   u32 add_bytes = 0;
321
322   prev = f->end_chunk;
323   while (c)
324     {
325       c->start_byte = prev->start_byte + prev->length;
326       add_bytes += c->length;
327       prev->next = c;
328       prev = c;
329       c = c->next;
330     }
331   f->end_chunk = prev;
332   prev->next = f->start_chunk;
333   f->size += add_bytes;
334   f->nitems = f->size - 1;
335   f->new_chunks = 0;
336 }
337
338 static void
339 svm_fifo_try_size_update (svm_fifo_t * f, u32 new_head)
340 {
341   if (new_head % f->size > f->tail % f->size)
342     return;
343
344   svm_fifo_size_update (f, f->new_chunks);
345   f->flags &= ~SVM_FIFO_F_SIZE_UPDATE;
346 }
347
348 void
349 svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c)
350 {
351   if (svm_fifo_is_wrapped (f))
352     {
353       if (f->new_chunks)
354         {
355           svm_fifo_chunk_t *prev;
356
357           prev = f->new_chunks;
358           while (prev->next)
359             prev = prev->next;
360           prev->next = c;
361         }
362       else
363         {
364           f->new_chunks = c;
365         }
366       f->flags |= SVM_FIFO_F_SIZE_UPDATE;
367       return;
368     }
369
370   svm_fifo_size_update (f, c);
371 }
372
373 void
374 svm_fifo_free (svm_fifo_t * f)
375 {
376   ASSERT (f->refcnt > 0);
377
378   if (--f->refcnt == 0)
379     {
380       pool_free (f->ooo_segments);
381       clib_mem_free (f);
382     }
383 }
384
385 always_inline ooo_segment_t *
386 ooo_segment_new (svm_fifo_t * f, u32 start, u32 length)
387 {
388   ooo_segment_t *s;
389
390   pool_get (f->ooo_segments, s);
391
392   s->start = start;
393   s->length = length;
394
395   s->prev = s->next = OOO_SEGMENT_INVALID_INDEX;
396
397   return s;
398 }
399
400 always_inline void
401 ooo_segment_del (svm_fifo_t * f, u32 index)
402 {
403   ooo_segment_t *cur, *prev = 0, *next = 0;
404   cur = pool_elt_at_index (f->ooo_segments, index);
405
406   if (cur->next != OOO_SEGMENT_INVALID_INDEX)
407     {
408       next = pool_elt_at_index (f->ooo_segments, cur->next);
409       next->prev = cur->prev;
410     }
411
412   if (cur->prev != OOO_SEGMENT_INVALID_INDEX)
413     {
414       prev = pool_elt_at_index (f->ooo_segments, cur->prev);
415       prev->next = cur->next;
416     }
417   else
418     {
419       f->ooos_list_head = cur->next;
420     }
421
422   pool_put (f->ooo_segments, cur);
423 }
424
425 /**
426  * Add segment to fifo's out-of-order segment list. Takes care of merging
427  * adjacent segments and removing overlapping ones.
428  */
429 static void
430 ooo_segment_add (svm_fifo_t * f, u32 offset, u32 head, u32 tail, u32 length)
431 {
432   ooo_segment_t *s, *new_s, *prev, *next, *it;
433   u32 new_index, s_end_pos, s_index;
434   u32 offset_pos, offset_end_pos;
435
436   ASSERT (offset + length <= ooo_segment_distance_from_tail (f, head, tail)
437           || head == tail);
438
439   offset_pos = tail + offset;
440   offset_end_pos = tail + offset + length;
441
442   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
443
444   if (f->ooos_list_head == OOO_SEGMENT_INVALID_INDEX)
445     {
446       s = ooo_segment_new (f, offset_pos, length);
447       f->ooos_list_head = s - f->ooo_segments;
448       f->ooos_newest = f->ooos_list_head;
449       return;
450     }
451
452   /* Find first segment that starts after new segment */
453   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
454   while (s->next != OOO_SEGMENT_INVALID_INDEX
455          && position_lt (f, s->start, offset_pos, tail))
456     s = pool_elt_at_index (f->ooo_segments, s->next);
457
458   /* If we have a previous and we overlap it, use it as starting point */
459   prev = ooo_segment_get_prev (f, s);
460   if (prev
461       && position_leq (f, offset_pos, ooo_segment_end_pos (f, prev), tail))
462     {
463       s = prev;
464       s_end_pos = ooo_segment_end_pos (f, s);
465
466       /* Since we have previous, offset start position cannot be smaller
467        * than prev->start. Check tail */
468       ASSERT (position_lt (f, s->start, offset_pos, tail));
469       goto check_tail;
470     }
471
472   s_index = s - f->ooo_segments;
473   s_end_pos = ooo_segment_end_pos (f, s);
474
475   /* No overlap, add before current segment */
476   if (position_lt (f, offset_end_pos, s->start, tail))
477     {
478       new_s = ooo_segment_new (f, offset_pos, length);
479       new_index = new_s - f->ooo_segments;
480
481       /* Pool might've moved, get segment again */
482       s = pool_elt_at_index (f->ooo_segments, s_index);
483       if (s->prev != OOO_SEGMENT_INVALID_INDEX)
484         {
485           new_s->prev = s->prev;
486           prev = pool_elt_at_index (f->ooo_segments, new_s->prev);
487           prev->next = new_index;
488         }
489       else
490         {
491           /* New head */
492           f->ooos_list_head = new_index;
493         }
494
495       new_s->next = s_index;
496       s->prev = new_index;
497       f->ooos_newest = new_index;
498       return;
499     }
500   /* No overlap, add after current segment */
501   else if (position_gt (f, offset_pos, s_end_pos, tail))
502     {
503       new_s = ooo_segment_new (f, offset_pos, length);
504       new_index = new_s - f->ooo_segments;
505
506       /* Pool might've moved, get segment again */
507       s = pool_elt_at_index (f->ooo_segments, s_index);
508
509       /* Needs to be last */
510       ASSERT (s->next == OOO_SEGMENT_INVALID_INDEX);
511
512       new_s->prev = s_index;
513       s->next = new_index;
514       f->ooos_newest = new_index;
515
516       return;
517     }
518
519   /*
520    * Merge needed
521    */
522
523   /* Merge at head */
524   if (position_lt (f, offset_pos, s->start, tail))
525     {
526       s->start = offset_pos;
527       s->length = position_diff (f, s_end_pos, s->start, tail);
528       f->ooos_newest = s - f->ooo_segments;
529     }
530
531 check_tail:
532
533   /* Overlapping tail */
534   if (position_gt (f, offset_end_pos, s_end_pos, tail))
535     {
536       s->length = position_diff (f, offset_end_pos, s->start, tail);
537
538       /* Remove the completely overlapped segments in the tail */
539       it = ooo_segment_next (f, s);
540       while (it && position_leq (f, ooo_segment_end_pos (f, it),
541                                  offset_end_pos, tail))
542         {
543           next = ooo_segment_next (f, it);
544           ooo_segment_del (f, it - f->ooo_segments);
545           it = next;
546         }
547
548       /* If partial overlap with last, merge */
549       if (it && position_leq (f, it->start, offset_end_pos, tail))
550         {
551           s->length = position_diff (f, ooo_segment_end_pos (f, it),
552                                      s->start, tail);
553           ooo_segment_del (f, it - f->ooo_segments);
554         }
555       f->ooos_newest = s - f->ooo_segments;
556     }
557 }
558
559 /**
560  * Removes segments that can now be enqueued because the fifo's tail has
561  * advanced. Returns the number of bytes added to tail.
562  */
563 static int
564 ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
565 {
566   ooo_segment_t *s;
567   u32 index, bytes = 0;
568   i32 diff;
569
570   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
571   diff = ooo_segment_distance_to_tail (f, s->start, *tail);
572
573   ASSERT (diff != n_bytes_enqueued);
574
575   if (diff > n_bytes_enqueued)
576     return 0;
577
578   /* If last tail update overlaps one/multiple ooo segments, remove them */
579   while (0 <= diff && diff < n_bytes_enqueued)
580     {
581       index = s - f->ooo_segments;
582
583       /* Segment end is beyond the tail. Advance tail and remove segment */
584       if (s->length > diff)
585         {
586           bytes = s->length - diff;
587           *tail = *tail + bytes;
588           ooo_segment_del (f, index);
589           break;
590         }
591
592       /* If we have next go on */
593       if (s->next != OOO_SEGMENT_INVALID_INDEX)
594         {
595           s = pool_elt_at_index (f->ooo_segments, s->next);
596           diff = ooo_segment_distance_to_tail (f, s->start, *tail);
597           ooo_segment_del (f, index);
598         }
599       /* End of search */
600       else
601         {
602           ooo_segment_del (f, index);
603           break;
604         }
605     }
606
607   ASSERT (bytes <= f->nitems);
608   return bytes;
609 }
610
611 void
612 svm_fifo_overwrite_head (svm_fifo_t * f, u8 * data, u32 len)
613 {
614   u32 n_chunk;
615   u32 head, tail, head_idx;
616   svm_fifo_chunk_t *c;
617
618   ASSERT (len <= f->nitems);
619
620   f_load_head_tail_cons (f, &head, &tail);
621   c = f->head_chunk;
622   head_idx = head % f->size;
623   head_idx -= c->start_byte;
624   n_chunk = c->length - head_idx;
625   if (len <= n_chunk)
626     clib_memcpy_fast (&c->data[head_idx], data, len);
627   else
628     {
629       clib_memcpy_fast (&c->data[head_idx], data, n_chunk);
630       clib_memcpy_fast (&c->next->data[0], data + n_chunk, len - n_chunk);
631     }
632 }
633
634 int
635 svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 len, const u8 * src)
636 {
637   u32 tail, head, free_count;
638
639   f_load_head_tail_prod (f, &head, &tail);
640
641   /* free space in fifo can only increase during enqueue: SPSC */
642   free_count = f_free_count (f, head, tail);
643
644   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
645
646   if (PREDICT_FALSE (free_count == 0))
647     return SVM_FIFO_FULL;
648
649   /* number of bytes we're going to copy */
650   len = clib_min (free_count, len);
651
652   svm_fifo_copy_to_chunk (f, f->tail_chunk, tail % f->size, src, len,
653                           1 /* update tail */ );
654   tail += len;
655
656   svm_fifo_trace_add (f, head, n_total, 2);
657
658   /* collect out-of-order segments */
659   if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
660     len += ooo_segment_try_collect (f, len, &tail);
661
662   /* store-rel: producer owned index (paired with load-acq in consumer) */
663   clib_atomic_store_rel_n (&f->tail, tail);
664
665   return len;
666 }
667
668 /**
669  * Enqueue a future segment.
670  *
671  * Two choices: either copies the entire segment, or copies nothing
672  * Returns 0 of the entire segment was copied
673  * Returns -1 if none of the segment was copied due to lack of space
674  */
675 int
676 svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
677 {
678   u32 tail, head, free_count;
679
680   f_load_head_tail_prod (f, &head, &tail);
681
682   /* free space in fifo can only increase during enqueue: SPSC */
683   free_count = f_free_count (f, head, tail);
684
685   /* will this request fit? */
686   if ((len + offset) > free_count)
687     return -1;
688
689   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
690
691   svm_fifo_trace_add (f, offset, len, 1);
692
693   ooo_segment_add (f, offset, head, tail, len);
694
695   svm_fifo_copy_to_chunk (f, f->tail_chunk, (tail + offset) % f->size, src,
696                           len, 0 /* update tail */ );
697
698   return 0;
699 }
700
701 int
702 svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 len, u8 * dst)
703 {
704   u32 tail, head, cursize;
705
706   f_load_head_tail_cons (f, &head, &tail);
707
708   /* current size of fifo can only increase during dequeue: SPSC */
709   cursize = f_cursize (f, head, tail);
710
711   if (PREDICT_FALSE (cursize == 0))
712     return -2;                  /* nothing in the fifo */
713
714   len = clib_min (cursize, len);
715   ASSERT (cursize >= len);
716
717   svm_fifo_copy_from_chunk (f, f->head_chunk, head % f->size, dst, len,
718                             1 /* update head */ );
719   head += len;
720
721   if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SIZE_UPDATE))
722     svm_fifo_try_size_update (f, head);
723
724   /* store-rel: consumer owned index (paired with load-acq in producer) */
725   clib_atomic_store_rel_n (&f->head, head);
726
727   return len;
728 }
729
730 int
731 svm_fifo_peek (svm_fifo_t * f, u32 relative_offset, u32 len, u8 * dst)
732 {
733   u32 tail, head, cursize;
734
735   f_load_head_tail_cons (f, &head, &tail);
736
737   /* current size of fifo can only increase during peek: SPSC */
738   cursize = f_cursize (f, head, tail);
739
740   if (PREDICT_FALSE (cursize < relative_offset))
741     return -2;                  /* nothing in the fifo */
742
743   len = clib_min (cursize - relative_offset, len);
744
745   svm_fifo_copy_from_chunk (f, f->head_chunk,
746                             (head + relative_offset) % f->size, dst, len,
747                             0 /* update head */ );
748   return len;
749 }
750
751 int
752 svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes)
753 {
754   u32 total_drop_bytes;
755   u32 tail, head, cursize;
756
757   f_load_head_tail_cons (f, &head, &tail);
758
759   /* number of bytes we're going to drop */
760   cursize = f_cursize (f, head, tail);
761
762   if (PREDICT_FALSE (cursize == 0))
763     return -2;                  /* nothing in the fifo */
764
765   svm_fifo_trace_add (f, tail, total_drop_bytes, 3);
766
767   /* number of bytes we're going to drop */
768   total_drop_bytes = (cursize < max_bytes) ? cursize : max_bytes;
769
770   /* move head */
771   head += total_drop_bytes;
772
773   ASSERT (cursize >= total_drop_bytes);
774   /* store-rel: consumer owned index (paired with load-acq in producer) */
775   clib_atomic_store_rel_n (&f->head, head);
776
777   return total_drop_bytes;
778 }
779
780 void
781 svm_fifo_dequeue_drop_all (svm_fifo_t * f)
782 {
783   /* consumer foreign index */
784   u32 tail = clib_atomic_load_acq_n (&f->tail);
785   /* store-rel: consumer owned index (paired with load-acq in producer) */
786   clib_atomic_store_rel_n (&f->head, tail);
787 }
788
789 int
790 svm_fifo_segments (svm_fifo_t * f, svm_fifo_segment_t * fs)
791 {
792   u32 cursize, head, tail, head_idx;
793
794   f_load_head_tail_cons (f, &head, &tail);
795
796   /* consumer function, cursize can only increase while we're working */
797   cursize = f_cursize (f, head, tail);
798
799   if (PREDICT_FALSE (cursize == 0))
800     return -2;                  /* nothing in the fifo */
801
802   head_idx = head % f->size;
803
804   if (tail < head)
805     {
806       fs[0].len = f->size - head_idx;
807       fs[0].data = f->head_chunk->data + head_idx;
808       fs[1].len = cursize - fs[0].len;
809       fs[1].data = f->head_chunk->data;
810     }
811   else
812     {
813       fs[0].len = cursize;
814       fs[0].data = f->head_chunk->data + head_idx;
815       fs[1].len = 0;
816       fs[1].data = 0;
817     }
818   return cursize;
819 }
820
821 void
822 svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_segment_t * fs)
823 {
824   u32 head, head_idx;
825
826   /* consumer owned index */
827   head = f->head;
828   head_idx = head % f->size;
829
830   ASSERT (fs[0].data == f->head_chunk->data + head_idx);
831   head += fs[0].len + fs[1].len;
832   /* store-rel: consumer owned index (paired with load-acq in producer) */
833   clib_atomic_store_rel_n (&f->head, head);
834 }
835
836 /* Assumption: no prod and cons are accessing either dest or src fifo */
837 void
838 svm_fifo_clone (svm_fifo_t * df, svm_fifo_t * sf)
839 {
840   u32 head, tail;
841   clib_memcpy_fast (df->head_chunk->data, sf->head_chunk->data, sf->size);
842
843   f_load_head_tail_all_acq (sf, &head, &tail);
844   clib_atomic_store_rel_n (&df->head, head);
845   clib_atomic_store_rel_n (&df->tail, tail);
846 }
847
848 u32
849 svm_fifo_number_ooo_segments (svm_fifo_t * f)
850 {
851   return pool_elts (f->ooo_segments);
852 }
853
854 ooo_segment_t *
855 svm_fifo_first_ooo_segment (svm_fifo_t * f)
856 {
857   return pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
858 }
859
860 /**
861  * Set fifo pointers to requested offset
862  */
863 void
864 svm_fifo_init_pointers (svm_fifo_t * f, u32 pointer)
865 {
866   clib_atomic_store_rel_n (&f->head, pointer);
867   clib_atomic_store_rel_n (&f->tail, pointer);
868 }
869
870 void
871 svm_fifo_add_subscriber (svm_fifo_t * f, u8 subscriber)
872 {
873   if (f->n_subscribers >= SVM_FIFO_MAX_EVT_SUBSCRIBERS)
874     return;
875   f->subscribers[f->n_subscribers++] = subscriber;
876 }
877
878 void
879 svm_fifo_del_subscriber (svm_fifo_t * f, u8 subscriber)
880 {
881   int i;
882
883   for (i = 0; i < f->n_subscribers; i++)
884     {
885       if (f->subscribers[i] != subscriber)
886         continue;
887       f->subscribers[i] = f->subscribers[f->n_subscribers - 1];
888       f->n_subscribers--;
889       break;
890     }
891 }
892
893 #endif
894 /*
895  * fd.io coding-style-patch-verification: ON
896  *
897  * Local Variables:
898  * eval: (c-set-style "gnu")
899  * End:
900  */