svm_fifo rework to avoid contention on cursize
[vpp.git] / src / svm / svm_fifo.c
1 /*
2  * Copyright (c) 2016-2019 Cisco and/or its affiliates.
3  * Copyright (c) 2019 Arm Limited
4  * Copyright (c) 2010-2017 Intel Corporation and/or its affiliates.
5  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6  * Inspired from DPDK rte_ring.h (SPSC only) (derived from freebsd bufring.h).
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at:
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 #include <svm/svm_fifo.h>
21 #include <vppinfra/cpu.h>
22
23 static inline u8
24 position_lt (svm_fifo_t * f, u32 a, u32 b, u32 tail)
25 {
26   return (ooo_segment_distance_from_tail (f, a, tail)
27           < ooo_segment_distance_from_tail (f, b, tail));
28 }
29
30 static inline u8
31 position_leq (svm_fifo_t * f, u32 a, u32 b, u32 tail)
32 {
33   return (ooo_segment_distance_from_tail (f, a, tail)
34           <= ooo_segment_distance_from_tail (f, b, tail));
35 }
36
37 static inline u8
38 position_gt (svm_fifo_t * f, u32 a, u32 b, u32 tail)
39 {
40   return (ooo_segment_distance_from_tail (f, a, tail)
41           > ooo_segment_distance_from_tail (f, b, tail));
42 }
43
44 static inline u32
45 position_diff (svm_fifo_t * f, u32 posa, u32 posb, u32 tail)
46 {
47   return ooo_segment_distance_from_tail (f, posa, tail)
48     - ooo_segment_distance_from_tail (f, posb, tail);
49 }
50
51 static inline u32
52 ooo_segment_end_pos (svm_fifo_t * f, ooo_segment_t * s)
53 {
54   return s->start + s->length;
55 }
56
57 #ifndef CLIB_MARCH_VARIANT
58
59 u8 *
60 format_ooo_segment (u8 * s, va_list * args)
61 {
62   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
63   ooo_segment_t *seg = va_arg (*args, ooo_segment_t *);
64   u32 normalized_start = (seg->start + f->nitems - f->tail) % f->size;
65   s = format (s, "[%u, %u], len %u, next %d, prev %d", normalized_start,
66               (normalized_start + seg->length) % f->size, seg->length,
67               seg->next, seg->prev);
68   return s;
69 }
70
71 u8 *
72 svm_fifo_dump_trace (u8 * s, svm_fifo_t * f)
73 {
74 #if SVM_FIFO_TRACE
75   svm_fifo_trace_elem_t *seg = 0;
76   int i = 0;
77
78   if (f->trace)
79     {
80       vec_foreach (seg, f->trace)
81       {
82         s = format (s, "{%u, %u, %u}, ", seg->offset, seg->len, seg->action);
83         i++;
84         if (i % 5 == 0)
85           s = format (s, "\n");
86       }
87       s = format (s, "\n");
88     }
89   return s;
90 #else
91   return 0;
92 #endif
93 }
94
95 u8 *
96 svm_fifo_replay (u8 * s, svm_fifo_t * f, u8 no_read, u8 verbose)
97 {
98   int i, trace_len;
99   u8 *data = 0;
100   svm_fifo_trace_elem_t *trace;
101   u32 offset;
102   svm_fifo_t *dummy_fifo;
103
104   if (!f)
105     return s;
106
107 #if SVM_FIFO_TRACE
108   trace = f->trace;
109   trace_len = vec_len (trace);
110 #else
111   trace = 0;
112   trace_len = 0;
113 #endif
114
115   dummy_fifo = svm_fifo_create (f->size);
116   clib_memset (f->data, 0xFF, f->nitems);
117
118   vec_validate (data, f->nitems);
119   for (i = 0; i < vec_len (data); i++)
120     data[i] = i;
121
122   for (i = 0; i < trace_len; i++)
123     {
124       offset = trace[i].offset;
125       if (trace[i].action == 1)
126         {
127           if (verbose)
128             s = format (s, "adding [%u, %u]:", trace[i].offset,
129                         (trace[i].offset + trace[i].len) % dummy_fifo->size);
130           svm_fifo_enqueue_with_offset (dummy_fifo, trace[i].offset,
131                                         trace[i].len, &data[offset]);
132         }
133       else if (trace[i].action == 2)
134         {
135           if (verbose)
136             s = format (s, "adding [%u, %u]:", 0, trace[i].len);
137           svm_fifo_enqueue_nowait (dummy_fifo, trace[i].len, &data[offset]);
138         }
139       else if (!no_read)
140         {
141           if (verbose)
142             s = format (s, "read: %u", trace[i].len);
143           svm_fifo_dequeue_drop (dummy_fifo, trace[i].len);
144         }
145       if (verbose)
146         s = format (s, "%U", format_svm_fifo, dummy_fifo, 1);
147     }
148
149   s = format (s, "result: %U", format_svm_fifo, dummy_fifo, 1);
150
151   return s;
152 }
153
154 u8 *
155 format_ooo_list (u8 * s, va_list * args)
156 {
157   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
158   u32 indent = va_arg (*args, u32);
159   u32 ooo_segment_index = f->ooos_list_head;
160   ooo_segment_t *seg;
161
162   while (ooo_segment_index != OOO_SEGMENT_INVALID_INDEX)
163     {
164       seg = pool_elt_at_index (f->ooo_segments, ooo_segment_index);
165       s = format (s, "%U%U\n", format_white_space, indent, format_ooo_segment,
166                   f, seg);
167       ooo_segment_index = seg->next;
168     }
169
170   return s;
171 }
172
173 u8 *
174 format_svm_fifo (u8 * s, va_list * args)
175 {
176   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
177   int verbose = va_arg (*args, int);
178   u32 indent;
179
180   if (!s)
181     return s;
182
183   indent = format_get_indent (s);
184   s = format (s, "cursize %u nitems %u has_event %d\n",
185               svm_fifo_max_dequeue (f), f->nitems, f->has_event);
186   s = format (s, "%Uhead %u tail %u segment manager %u\n", format_white_space,
187               indent, (f->head % f->size), (f->tail % f->size),
188               f->segment_manager);
189
190   if (verbose > 1)
191     s = format (s, "%Uvpp session %d thread %d app session %d thread %d\n",
192                 format_white_space, indent, f->master_session_index,
193                 f->master_thread_index, f->client_session_index,
194                 f->client_thread_index);
195
196   if (verbose)
197     {
198       s = format (s, "%Uooo pool %d active elts newest %u\n",
199                   format_white_space, indent, pool_elts (f->ooo_segments),
200                   f->ooos_newest);
201       if (svm_fifo_has_ooo_data (f))
202         s = format (s, " %U", format_ooo_list, f, indent, verbose);
203     }
204   return s;
205 }
206
207 /** create an svm fifo, in the current heap. Fails vs blow up the process */
208 svm_fifo_t *
209 svm_fifo_create (u32 data_size_in_bytes)
210 {
211   svm_fifo_t *f;
212   u32 rounded_data_size;
213
214   /* always round fifo data size to the next highest power-of-two */
215   rounded_data_size = (1 << (max_log2 (data_size_in_bytes)));
216   f = clib_mem_alloc_aligned_or_null (sizeof (*f) + rounded_data_size,
217                                       CLIB_CACHE_LINE_BYTES);
218   if (f == 0)
219     return 0;
220
221   clib_memset (f, 0, sizeof (*f));
222   f->size = rounded_data_size;
223   /*
224    * usable size of the fifo set to rounded_data_size - 1
225    * to differentiate between free fifo and empty fifo.
226    */
227   f->nitems = f->size - 1;
228   f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX;
229   f->ct_session_index = SVM_FIFO_INVALID_SESSION_INDEX;
230   f->segment_index = SVM_FIFO_INVALID_INDEX;
231   f->refcnt = 1;
232   return (f);
233 }
234
235 void
236 svm_fifo_free (svm_fifo_t * f)
237 {
238   ASSERT (f->refcnt > 0);
239
240   if (--f->refcnt == 0)
241     {
242       pool_free (f->ooo_segments);
243       clib_mem_free (f);
244     }
245 }
246 #endif
247
248 always_inline ooo_segment_t *
249 ooo_segment_new (svm_fifo_t * f, u32 start, u32 length)
250 {
251   ooo_segment_t *s;
252
253   pool_get (f->ooo_segments, s);
254
255   s->start = start;
256   s->length = length;
257
258   s->prev = s->next = OOO_SEGMENT_INVALID_INDEX;
259
260   return s;
261 }
262
263 always_inline void
264 ooo_segment_del (svm_fifo_t * f, u32 index)
265 {
266   ooo_segment_t *cur, *prev = 0, *next = 0;
267   cur = pool_elt_at_index (f->ooo_segments, index);
268
269   if (cur->next != OOO_SEGMENT_INVALID_INDEX)
270     {
271       next = pool_elt_at_index (f->ooo_segments, cur->next);
272       next->prev = cur->prev;
273     }
274
275   if (cur->prev != OOO_SEGMENT_INVALID_INDEX)
276     {
277       prev = pool_elt_at_index (f->ooo_segments, cur->prev);
278       prev->next = cur->next;
279     }
280   else
281     {
282       f->ooos_list_head = cur->next;
283     }
284
285   pool_put (f->ooo_segments, cur);
286 }
287
288 /**
289  * Add segment to fifo's out-of-order segment list. Takes care of merging
290  * adjacent segments and removing overlapping ones.
291  */
292 static void
293 ooo_segment_add (svm_fifo_t * f, u32 offset, u32 head, u32 tail, u32 length)
294 {
295   ooo_segment_t *s, *new_s, *prev, *next, *it;
296   u32 new_index, s_end_pos, s_index;
297   u32 offset_pos, offset_end_pos;
298
299   ASSERT (offset + length <= ooo_segment_distance_from_tail (f, head, tail));
300   offset_pos = tail + offset;
301   offset_end_pos = tail + offset + length;
302
303   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
304
305   if (f->ooos_list_head == OOO_SEGMENT_INVALID_INDEX)
306     {
307       s = ooo_segment_new (f, offset_pos, length);
308       f->ooos_list_head = s - f->ooo_segments;
309       f->ooos_newest = f->ooos_list_head;
310       return;
311     }
312
313   /* Find first segment that starts after new segment */
314   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
315   while (s->next != OOO_SEGMENT_INVALID_INDEX
316          && position_lt (f, s->start, offset_pos, tail))
317     s = pool_elt_at_index (f->ooo_segments, s->next);
318
319   /* If we have a previous and we overlap it, use it as starting point */
320   prev = ooo_segment_get_prev (f, s);
321   if (prev
322       && position_leq (f, offset_pos, ooo_segment_end_pos (f, prev), tail))
323     {
324       s = prev;
325       s_end_pos = ooo_segment_end_pos (f, s);
326
327       /* Since we have previous, offset start position cannot be smaller
328        * than prev->start. Check tail */
329       ASSERT (position_lt (f, s->start, offset_pos, tail));
330       goto check_tail;
331     }
332
333   s_index = s - f->ooo_segments;
334   s_end_pos = ooo_segment_end_pos (f, s);
335
336   /* No overlap, add before current segment */
337   if (position_lt (f, offset_end_pos, s->start, tail))
338     {
339       new_s = ooo_segment_new (f, offset_pos, length);
340       new_index = new_s - f->ooo_segments;
341
342       /* Pool might've moved, get segment again */
343       s = pool_elt_at_index (f->ooo_segments, s_index);
344       if (s->prev != OOO_SEGMENT_INVALID_INDEX)
345         {
346           new_s->prev = s->prev;
347           prev = pool_elt_at_index (f->ooo_segments, new_s->prev);
348           prev->next = new_index;
349         }
350       else
351         {
352           /* New head */
353           f->ooos_list_head = new_index;
354         }
355
356       new_s->next = s_index;
357       s->prev = new_index;
358       f->ooos_newest = new_index;
359       return;
360     }
361   /* No overlap, add after current segment */
362   else if (position_gt (f, offset_pos, s_end_pos, tail))
363     {
364       new_s = ooo_segment_new (f, offset_pos, length);
365       new_index = new_s - f->ooo_segments;
366
367       /* Pool might've moved, get segment again */
368       s = pool_elt_at_index (f->ooo_segments, s_index);
369
370       /* Needs to be last */
371       ASSERT (s->next == OOO_SEGMENT_INVALID_INDEX);
372
373       new_s->prev = s_index;
374       s->next = new_index;
375       f->ooos_newest = new_index;
376
377       return;
378     }
379
380   /*
381    * Merge needed
382    */
383
384   /* Merge at head */
385   if (position_lt (f, offset_pos, s->start, tail))
386     {
387       s->start = offset_pos;
388       s->length = position_diff (f, s_end_pos, s->start, tail);
389       f->ooos_newest = s - f->ooo_segments;
390     }
391
392 check_tail:
393
394   /* Overlapping tail */
395   if (position_gt (f, offset_end_pos, s_end_pos, tail))
396     {
397       s->length = position_diff (f, offset_end_pos, s->start, tail);
398
399       /* Remove the completely overlapped segments in the tail */
400       it = ooo_segment_next (f, s);
401       while (it && position_leq (f, ooo_segment_end_pos (f, it),
402                                  offset_end_pos, tail))
403         {
404           next = ooo_segment_next (f, it);
405           ooo_segment_del (f, it - f->ooo_segments);
406           it = next;
407         }
408
409       /* If partial overlap with last, merge */
410       if (it && position_leq (f, it->start, offset_end_pos, tail))
411         {
412           s->length = position_diff (f, ooo_segment_end_pos (f, it),
413                                      s->start, tail);
414           ooo_segment_del (f, it - f->ooo_segments);
415         }
416       f->ooos_newest = s - f->ooo_segments;
417     }
418 }
419
420 /**
421  * Removes segments that can now be enqueued because the fifo's tail has
422  * advanced. Returns the number of bytes added to tail.
423  */
424 static int
425 ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
426 {
427   ooo_segment_t *s;
428   u32 index, bytes = 0;
429   i32 diff;
430
431   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
432   diff = ooo_segment_distance_to_tail (f, s->start, *tail);
433
434   ASSERT (diff != n_bytes_enqueued);
435
436   if (diff > n_bytes_enqueued)
437     return 0;
438
439   /* If last tail update overlaps one/multiple ooo segments, remove them */
440   while (0 <= diff && diff < n_bytes_enqueued)
441     {
442       index = s - f->ooo_segments;
443
444       /* Segment end is beyond the tail. Advance tail and remove segment */
445       if (s->length > diff)
446         {
447           bytes = s->length - diff;
448           *tail = *tail + bytes;
449           ooo_segment_del (f, index);
450           break;
451         }
452
453       /* If we have next go on */
454       if (s->next != OOO_SEGMENT_INVALID_INDEX)
455         {
456           s = pool_elt_at_index (f->ooo_segments, s->next);
457           diff = ooo_segment_distance_to_tail (f, s->start, *tail);
458           ooo_segment_del (f, index);
459         }
460       /* End of search */
461       else
462         {
463           ooo_segment_del (f, index);
464           break;
465         }
466     }
467
468   ASSERT (bytes <= f->nitems);
469   return bytes;
470 }
471
472 CLIB_MARCH_FN (svm_fifo_enqueue_nowait, int, svm_fifo_t * f, u32 max_bytes,
473                const u8 * copy_from_here)
474 {
475   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
476   u32 tail, head, free_count, tail_idx;
477
478   f_load_head_tail_prod (f, &head, &tail);
479
480   /* free space in fifo can only increase during enqueue: SPSC */
481   free_count = f_free_count (f, head, tail);
482
483   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
484
485   if (PREDICT_FALSE (free_count == 0))
486     return SVM_FIFO_FULL;
487
488   /* number of bytes we're going to copy */
489   total_copy_bytes = free_count < max_bytes ? free_count : max_bytes;
490
491   tail_idx = tail % f->size;
492
493   if (PREDICT_TRUE (copy_from_here != 0))
494     {
495       first_copy_bytes = f->size - tail_idx;
496       if (first_copy_bytes < total_copy_bytes)
497         {
498           clib_memcpy_fast (&f->data[tail_idx], copy_from_here,
499                             first_copy_bytes);
500           /* number of bytes in second copy segment */
501           second_copy_bytes = total_copy_bytes - first_copy_bytes;
502           /* wrap around */
503           clib_memcpy_fast (&f->data[0],
504                             copy_from_here + first_copy_bytes,
505                             second_copy_bytes);
506         }
507       else
508         {
509           clib_memcpy_fast (&f->data[tail_idx], copy_from_here,
510                             total_copy_bytes);
511         }
512       tail += total_copy_bytes;
513     }
514   else
515     {
516       ASSERT (0);
517       /* Account for a zero-copy enqueue done elsewhere */
518       tail += max_bytes;
519     }
520
521   svm_fifo_trace_add (f, head, total_copy_bytes, 2);
522
523   /* collect out-of-order segments */
524   if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
525     total_copy_bytes += ooo_segment_try_collect (f, total_copy_bytes, &tail);
526
527   ASSERT (total_copy_bytes <= free_count);
528
529   /* store-rel: producer owned index (paired with load-acq in consumer) */
530   clib_atomic_store_rel_n (&f->tail, tail);
531
532   return total_copy_bytes;
533 }
534
535 #ifndef CLIB_MARCH_VARIANT
536 int
537 svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 max_bytes,
538                          const u8 * copy_from_here)
539 {
540   return CLIB_MARCH_FN_SELECT (svm_fifo_enqueue_nowait) (f, max_bytes,
541                                                          copy_from_here);
542 }
543 #endif
544
545 /**
546  * Enqueue a future segment.
547  *
548  * Two choices: either copies the entire segment, or copies nothing
549  * Returns 0 of the entire segment was copied
550  * Returns -1 if none of the segment was copied due to lack of space
551  */
552 CLIB_MARCH_FN (svm_fifo_enqueue_with_offset, int, svm_fifo_t * f,
553                u32 offset, u32 required_bytes, u8 * copy_from_here)
554 {
555   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
556   u32 tail_offset;
557   u32 tail, head, free_count, tail_offset_idx;
558
559   f_load_head_tail_prod (f, &head, &tail);
560
561   /* free space in fifo can only increase during enqueue: SPSC */
562   free_count = f_free_count (f, head, tail);
563
564   /* will this request fit? */
565   if ((required_bytes + offset) > free_count)
566     return -1;
567
568   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
569
570   ASSERT (required_bytes < f->nitems);
571
572   tail_offset = tail + offset;
573   tail_offset_idx = tail_offset % f->size;
574
575   svm_fifo_trace_add (f, offset, required_bytes, 1);
576
577   ooo_segment_add (f, offset, head, tail, required_bytes);
578
579   /* number of bytes we're going to copy */
580   total_copy_bytes = required_bytes;
581
582   /* number of bytes in first copy segment */
583   first_copy_bytes = f->size - tail_offset_idx;
584
585   if (first_copy_bytes < total_copy_bytes)
586     {
587       clib_memcpy_fast (&f->data[tail_offset_idx], copy_from_here,
588                         first_copy_bytes);
589
590       /* number of bytes in second copy segment */
591       second_copy_bytes = total_copy_bytes - first_copy_bytes;
592       /* wrap around */
593       clib_memcpy_fast (&f->data[0],
594                         copy_from_here + first_copy_bytes, second_copy_bytes);
595     }
596   else
597     {
598       clib_memcpy_fast (&f->data[tail_offset_idx], copy_from_here,
599                         total_copy_bytes);
600     }
601
602   return 0;
603 }
604
605 #ifndef CLIB_MARCH_VARIANT
606
607 int
608 svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 required_bytes,
609                               u8 * copy_from_here)
610 {
611   return CLIB_MARCH_FN_SELECT (svm_fifo_enqueue_with_offset) (f, offset,
612                                                               required_bytes,
613                                                               copy_from_here);
614 }
615
616 void
617 svm_fifo_overwrite_head (svm_fifo_t * f, u8 * data, u32 len)
618 {
619   u32 first_chunk;
620   u32 head, tail, head_idx;
621
622   f_load_head_tail_cons (f, &head, &tail);
623   head_idx = head % f->size;
624   first_chunk = f->size - (head_idx);
625   ASSERT (len <= f->nitems);
626   if (len <= first_chunk)
627     clib_memcpy_fast (&f->data[head_idx], data, len);
628   else
629     {
630       clib_memcpy_fast (&f->data[head_idx], data, first_chunk);
631       clib_memcpy_fast (&f->data[0], data + first_chunk, len - first_chunk);
632     }
633 }
634 #endif
635
636 CLIB_MARCH_FN (svm_fifo_dequeue_nowait, int, svm_fifo_t * f, u32 max_bytes,
637                u8 * copy_here)
638 {
639   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
640   u32 tail, head, cursize, head_idx;
641
642   f_load_head_tail_cons (f, &head, &tail);
643
644   /* current size of fifo can only increase during dequeue: SPSC */
645   cursize = f_cursize (f, head, tail);
646
647   if (PREDICT_FALSE (cursize == 0))
648     return -2;                  /* nothing in the fifo */
649
650   /* number of bytes we're going to copy */
651   total_copy_bytes = (cursize < max_bytes) ? cursize : max_bytes;
652
653   head_idx = head % f->size;
654
655   if (PREDICT_TRUE (copy_here != 0))
656     {
657       /* number of bytes in first copy segment */
658       first_copy_bytes = f->size - head_idx;
659       if (first_copy_bytes < total_copy_bytes)
660         {
661           clib_memcpy_fast (copy_here, &f->data[head_idx], first_copy_bytes);
662           /* number of bytes in second copy segment */
663           second_copy_bytes = total_copy_bytes - first_copy_bytes;
664           /* wrap around */
665           clib_memcpy_fast (copy_here + first_copy_bytes,
666                             &f->data[0], second_copy_bytes);
667         }
668       else
669         {
670           clib_memcpy_fast (copy_here, &f->data[head_idx], total_copy_bytes);
671         }
672       head += total_copy_bytes;
673     }
674   else
675     {
676       ASSERT (0);
677       /* Account for a zero-copy dequeue done elsewhere */
678       head += max_bytes;
679     }
680
681   ASSERT (cursize >= total_copy_bytes);
682   /* store-rel: consumer owned index (paired with load-acq in producer) */
683   clib_atomic_store_rel_n (&f->head, head);
684
685   return total_copy_bytes;
686 }
687
688 #ifndef CLIB_MARCH_VARIANT
689
690 int
691 svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_here)
692 {
693   return CLIB_MARCH_FN_SELECT (svm_fifo_dequeue_nowait) (f, max_bytes,
694                                                          copy_here);
695 }
696 #endif
697
698 CLIB_MARCH_FN (svm_fifo_peek, int, svm_fifo_t * f, u32 relative_offset,
699                u32 max_bytes, u8 * copy_here)
700 {
701   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
702   u32 tail, head, cursize;
703   u32 relative_head_idx;
704
705   f_load_head_tail_cons (f, &head, &tail);
706
707   /* current size of fifo can only increase during peek: SPSC */
708   cursize = f_cursize (f, head, tail);
709
710   if (PREDICT_FALSE (cursize < relative_offset))
711     return -2;                  /* nothing in the fifo */
712
713   relative_head_idx = (head + relative_offset) % f->size;
714
715   /* number of bytes we're going to copy */
716   total_copy_bytes = ((cursize - relative_offset) < max_bytes) ?
717     cursize - relative_offset : max_bytes;
718
719   if (PREDICT_TRUE (copy_here != 0))
720     {
721       /* number of bytes in first copy segment */
722       first_copy_bytes = f->size - relative_head_idx;
723       if (first_copy_bytes < total_copy_bytes)
724         {
725           clib_memcpy_fast (copy_here, &f->data[relative_head_idx],
726                             first_copy_bytes);
727
728           /* number of bytes in second copy segment */
729           second_copy_bytes = total_copy_bytes - first_copy_bytes;
730           clib_memcpy_fast (copy_here + first_copy_bytes, &f->data[0],
731                             second_copy_bytes);
732         }
733       else
734         {
735           clib_memcpy_fast (copy_here, &f->data[relative_head_idx],
736                             total_copy_bytes);
737         }
738     }
739   return total_copy_bytes;
740 }
741
742 #ifndef CLIB_MARCH_VARIANT
743
744 int
745 svm_fifo_peek (svm_fifo_t * f, u32 relative_offset, u32 max_bytes,
746                u8 * copy_here)
747 {
748   return CLIB_MARCH_FN_SELECT (svm_fifo_peek) (f, relative_offset, max_bytes,
749                                                copy_here);
750 }
751
752 int
753 svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes)
754 {
755   u32 total_drop_bytes;
756   u32 tail, head, cursize;
757
758   f_load_head_tail_cons (f, &head, &tail);
759
760   /* number of bytes we're going to drop */
761   cursize = f_cursize (f, head, tail);
762
763   if (PREDICT_FALSE (cursize == 0))
764     return -2;                  /* nothing in the fifo */
765
766   svm_fifo_trace_add (f, tail, total_drop_bytes, 3);
767
768   /* number of bytes we're going to drop */
769   total_drop_bytes = (cursize < max_bytes) ? cursize : max_bytes;
770
771   /* move head */
772   head += total_drop_bytes;
773
774   ASSERT (cursize >= total_drop_bytes);
775   /* store-rel: consumer owned index (paired with load-acq in producer) */
776   clib_atomic_store_rel_n (&f->head, head);
777
778   return total_drop_bytes;
779 }
780
781 void
782 svm_fifo_dequeue_drop_all (svm_fifo_t * f)
783 {
784   /* consumer foreign index */
785   u32 tail = clib_atomic_load_acq_n (&f->tail);
786   /* store-rel: consumer owned index (paired with load-acq in producer) */
787   clib_atomic_store_rel_n (&f->head, tail);
788 }
789
790 int
791 svm_fifo_segments (svm_fifo_t * f, svm_fifo_segment_t * fs)
792 {
793   u32 cursize, head, tail, head_idx;
794
795   f_load_head_tail_cons (f, &head, &tail);
796
797   /* consumer function, cursize can only increase while we're working */
798   cursize = f_cursize (f, head, tail);
799
800   if (PREDICT_FALSE (cursize == 0))
801     return -2;                  /* nothing in the fifo */
802
803   head_idx = head % f->size;
804
805   if (tail < head)
806     {
807       fs[0].len = f->size - head_idx;
808       fs[0].data = f->data + head_idx;
809       fs[1].len = cursize - fs[0].len;
810       fs[1].data = f->data;
811     }
812   else
813     {
814       fs[0].len = cursize;
815       fs[0].data = f->data + head_idx;
816       fs[1].len = 0;
817       fs[1].data = 0;
818     }
819   return cursize;
820 }
821
822 void
823 svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_segment_t * fs)
824 {
825   u32 head, head_idx;
826
827   /* consumer owned index */
828   head = f->head;
829   head_idx = head % f->size;
830
831   ASSERT (fs[0].data == f->data + head_idx);
832   head += fs[0].len + fs[1].len;
833   /* store-rel: consumer owned index (paired with load-acq in producer) */
834   clib_atomic_store_rel_n (&f->head, head);
835 }
836
837 /* Assumption: no prod and cons are accessing either dest or src fifo */
838 void
839 svm_fifo_clone (svm_fifo_t * df, svm_fifo_t * sf)
840 {
841   u32 head, tail;
842   clib_memcpy_fast (df->data, sf->data, sf->size);
843
844   f_load_head_tail_all_acq (sf, &head, &tail);
845   clib_atomic_store_rel_n (&df->head, head);
846   clib_atomic_store_rel_n (&df->tail, tail);
847 }
848
849 u32
850 svm_fifo_number_ooo_segments (svm_fifo_t * f)
851 {
852   return pool_elts (f->ooo_segments);
853 }
854
855 ooo_segment_t *
856 svm_fifo_first_ooo_segment (svm_fifo_t * f)
857 {
858   return pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
859 }
860
861 /**
862  * Set fifo pointers to requested offset
863  */
864 void
865 svm_fifo_init_pointers (svm_fifo_t * f, u32 pointer)
866 {
867   clib_atomic_store_rel_n (&f->head, pointer);
868   clib_atomic_store_rel_n (&f->tail, pointer);
869 }
870
871 void
872 svm_fifo_add_subscriber (svm_fifo_t * f, u8 subscriber)
873 {
874   if (f->n_subscribers >= SVM_FIFO_MAX_EVT_SUBSCRIBERS)
875     return;
876   f->subscribers[f->n_subscribers++] = subscriber;
877 }
878
879 void
880 svm_fifo_del_subscriber (svm_fifo_t * f, u8 subscriber)
881 {
882   int i;
883
884   for (i = 0; i < f->n_subscribers; i++)
885     {
886       if (f->subscribers[i] != subscriber)
887         continue;
888       f->subscribers[i] = f->subscribers[f->n_subscribers - 1];
889       f->n_subscribers--;
890       break;
891     }
892 }
893
894 #endif
895 /*
896  * fd.io coding-style-patch-verification: ON
897  *
898  * Local Variables:
899  * eval: (c-set-style "gnu")
900  * End:
901  */