svm: support addition of chunks to fifos
[vpp.git] / src / svm / svm_fifo.c
1 /*
2  * Copyright (c) 2016-2019 Cisco and/or its affiliates.
3  * Copyright (c) 2019 Arm Limited
4  * Copyright (c) 2010-2017 Intel Corporation and/or its affiliates.
5  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
6  * Inspired from DPDK rte_ring.h (SPSC only) (derived from freebsd bufring.h).
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at:
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 #include <svm/svm_fifo.h>
21 #include <vppinfra/cpu.h>
22
23 static inline u8
24 position_lt (svm_fifo_t * f, u32 a, u32 b, u32 tail)
25 {
26   return (ooo_segment_distance_from_tail (f, a, tail)
27           < ooo_segment_distance_from_tail (f, b, tail));
28 }
29
30 static inline u8
31 position_leq (svm_fifo_t * f, u32 a, u32 b, u32 tail)
32 {
33   return (ooo_segment_distance_from_tail (f, a, tail)
34           <= ooo_segment_distance_from_tail (f, b, tail));
35 }
36
37 static inline u8
38 position_gt (svm_fifo_t * f, u32 a, u32 b, u32 tail)
39 {
40   return (ooo_segment_distance_from_tail (f, a, tail)
41           > ooo_segment_distance_from_tail (f, b, tail));
42 }
43
44 static inline u32
45 position_diff (svm_fifo_t * f, u32 posa, u32 posb, u32 tail)
46 {
47   return ooo_segment_distance_from_tail (f, posa, tail)
48     - ooo_segment_distance_from_tail (f, posb, tail);
49 }
50
51 static inline u32
52 ooo_segment_end_pos (svm_fifo_t * f, ooo_segment_t * s)
53 {
54   return s->start + s->length;
55 }
56
57 #ifndef CLIB_MARCH_VARIANT
58
59 u8 *
60 format_ooo_segment (u8 * s, va_list * args)
61 {
62   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
63   ooo_segment_t *seg = va_arg (*args, ooo_segment_t *);
64   u32 normalized_start = (seg->start + f->nitems - f->tail) % f->size;
65   s = format (s, "[%u, %u], len %u, next %d, prev %d", normalized_start,
66               (normalized_start + seg->length) % f->size, seg->length,
67               seg->next, seg->prev);
68   return s;
69 }
70
71 u8 *
72 svm_fifo_dump_trace (u8 * s, svm_fifo_t * f)
73 {
74 #if SVM_FIFO_TRACE
75   svm_fifo_trace_elem_t *seg = 0;
76   int i = 0;
77
78   if (f->trace)
79     {
80       vec_foreach (seg, f->trace)
81       {
82         s = format (s, "{%u, %u, %u}, ", seg->offset, seg->len, seg->action);
83         i++;
84         if (i % 5 == 0)
85           s = format (s, "\n");
86       }
87       s = format (s, "\n");
88     }
89   return s;
90 #else
91   return 0;
92 #endif
93 }
94
95 u8 *
96 svm_fifo_replay (u8 * s, svm_fifo_t * f, u8 no_read, u8 verbose)
97 {
98   int i, trace_len;
99   u8 *data = 0;
100   svm_fifo_trace_elem_t *trace;
101   u32 offset;
102   svm_fifo_t *dummy_fifo;
103
104   if (!f)
105     return s;
106
107 #if SVM_FIFO_TRACE
108   trace = f->trace;
109   trace_len = vec_len (trace);
110 #else
111   trace = 0;
112   trace_len = 0;
113 #endif
114
115   dummy_fifo = svm_fifo_create (f->size);
116   clib_memset (f->head_chunk->data, 0xFF, f->nitems);
117   vec_validate (data, f->nitems);
118   for (i = 0; i < vec_len (data); i++)
119     data[i] = i;
120
121   for (i = 0; i < trace_len; i++)
122     {
123       offset = trace[i].offset;
124       if (trace[i].action == 1)
125         {
126           if (verbose)
127             s = format (s, "adding [%u, %u]:", trace[i].offset,
128                         (trace[i].offset + trace[i].len) % dummy_fifo->size);
129           svm_fifo_enqueue_with_offset (dummy_fifo, trace[i].offset,
130                                         trace[i].len, &data[offset]);
131         }
132       else if (trace[i].action == 2)
133         {
134           if (verbose)
135             s = format (s, "adding [%u, %u]:", 0, trace[i].len);
136           svm_fifo_enqueue_nowait (dummy_fifo, trace[i].len, &data[offset]);
137         }
138       else if (!no_read)
139         {
140           if (verbose)
141             s = format (s, "read: %u", trace[i].len);
142           svm_fifo_dequeue_drop (dummy_fifo, trace[i].len);
143         }
144       if (verbose)
145         s = format (s, "%U", format_svm_fifo, dummy_fifo, 1);
146     }
147
148   s = format (s, "result: %U", format_svm_fifo, dummy_fifo, 1);
149
150   return s;
151 }
152
153 u8 *
154 format_ooo_list (u8 * s, va_list * args)
155 {
156   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
157   u32 indent = va_arg (*args, u32);
158   u32 ooo_segment_index = f->ooos_list_head;
159   ooo_segment_t *seg;
160
161   while (ooo_segment_index != OOO_SEGMENT_INVALID_INDEX)
162     {
163       seg = pool_elt_at_index (f->ooo_segments, ooo_segment_index);
164       s = format (s, "%U%U\n", format_white_space, indent, format_ooo_segment,
165                   f, seg);
166       ooo_segment_index = seg->next;
167     }
168
169   return s;
170 }
171
172 u8 *
173 format_svm_fifo (u8 * s, va_list * args)
174 {
175   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
176   int verbose = va_arg (*args, int);
177   u32 indent;
178
179   if (!s)
180     return s;
181
182   indent = format_get_indent (s);
183   s = format (s, "cursize %u nitems %u has_event %d\n",
184               svm_fifo_max_dequeue (f), f->nitems, f->has_event);
185   s = format (s, "%Uhead %u tail %u segment manager %u\n", format_white_space,
186               indent, (f->head % f->size), (f->tail % f->size),
187               f->segment_manager);
188
189   if (verbose > 1)
190     s = format (s, "%Uvpp session %d thread %d app session %d thread %d\n",
191                 format_white_space, indent, f->master_session_index,
192                 f->master_thread_index, f->client_session_index,
193                 f->client_thread_index);
194
195   if (verbose)
196     {
197       s = format (s, "%Uooo pool %d active elts newest %u\n",
198                   format_white_space, indent, pool_elts (f->ooo_segments),
199                   f->ooos_newest);
200       if (svm_fifo_has_ooo_data (f))
201         s = format (s, " %U", format_ooo_list, f, indent, verbose);
202     }
203   return s;
204 }
205
206 void
207 svm_fifo_init (svm_fifo_t * f, u32 size)
208 {
209   f->size = size;
210   /*
211    * usable size of the fifo set to rounded_data_size - 1
212    * to differentiate between free fifo and empty fifo.
213    */
214   f->nitems = f->size - 1;
215   f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX;
216   f->ct_session_index = SVM_FIFO_INVALID_SESSION_INDEX;
217   f->segment_index = SVM_FIFO_INVALID_INDEX;
218   f->refcnt = 1;
219   f->default_chunk.start_byte = 0;
220   f->default_chunk.length = f->size;
221   f->default_chunk.next = f->start_chunk = &f->default_chunk;
222   f->end_chunk = f->head_chunk = f->tail_chunk = f->start_chunk;
223 }
224
225 /** create an svm fifo, in the current heap. Fails vs blow up the process */
226 svm_fifo_t *
227 svm_fifo_create (u32 data_size_in_bytes)
228 {
229   svm_fifo_t *f;
230   u32 rounded_data_size;
231
232   /* always round fifo data size to the next highest power-of-two */
233   rounded_data_size = (1 << (max_log2 (data_size_in_bytes)));
234   f = clib_mem_alloc_aligned_or_null (sizeof (*f) + rounded_data_size,
235                                       CLIB_CACHE_LINE_BYTES);
236   if (f == 0)
237     return 0;
238
239   clib_memset (f, 0, sizeof (*f));
240   svm_fifo_init (f, data_size_in_bytes);
241   return f;
242 }
243
244 void
245 svm_fifo_free (svm_fifo_t * f)
246 {
247   ASSERT (f->refcnt > 0);
248
249   if (--f->refcnt == 0)
250     {
251       pool_free (f->ooo_segments);
252       clib_mem_free (f);
253     }
254 }
255 #endif
256
257 always_inline ooo_segment_t *
258 ooo_segment_new (svm_fifo_t * f, u32 start, u32 length)
259 {
260   ooo_segment_t *s;
261
262   pool_get (f->ooo_segments, s);
263
264   s->start = start;
265   s->length = length;
266
267   s->prev = s->next = OOO_SEGMENT_INVALID_INDEX;
268
269   return s;
270 }
271
272 always_inline void
273 ooo_segment_del (svm_fifo_t * f, u32 index)
274 {
275   ooo_segment_t *cur, *prev = 0, *next = 0;
276   cur = pool_elt_at_index (f->ooo_segments, index);
277
278   if (cur->next != OOO_SEGMENT_INVALID_INDEX)
279     {
280       next = pool_elt_at_index (f->ooo_segments, cur->next);
281       next->prev = cur->prev;
282     }
283
284   if (cur->prev != OOO_SEGMENT_INVALID_INDEX)
285     {
286       prev = pool_elt_at_index (f->ooo_segments, cur->prev);
287       prev->next = cur->next;
288     }
289   else
290     {
291       f->ooos_list_head = cur->next;
292     }
293
294   pool_put (f->ooo_segments, cur);
295 }
296
297 /**
298  * Add segment to fifo's out-of-order segment list. Takes care of merging
299  * adjacent segments and removing overlapping ones.
300  */
301 static void
302 ooo_segment_add (svm_fifo_t * f, u32 offset, u32 head, u32 tail, u32 length)
303 {
304   ooo_segment_t *s, *new_s, *prev, *next, *it;
305   u32 new_index, s_end_pos, s_index;
306   u32 offset_pos, offset_end_pos;
307
308   ASSERT (offset + length <= ooo_segment_distance_from_tail (f, head, tail)
309           || head == tail);
310
311   offset_pos = tail + offset;
312   offset_end_pos = tail + offset + length;
313
314   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
315
316   if (f->ooos_list_head == OOO_SEGMENT_INVALID_INDEX)
317     {
318       s = ooo_segment_new (f, offset_pos, length);
319       f->ooos_list_head = s - f->ooo_segments;
320       f->ooos_newest = f->ooos_list_head;
321       return;
322     }
323
324   /* Find first segment that starts after new segment */
325   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
326   while (s->next != OOO_SEGMENT_INVALID_INDEX
327          && position_lt (f, s->start, offset_pos, tail))
328     s = pool_elt_at_index (f->ooo_segments, s->next);
329
330   /* If we have a previous and we overlap it, use it as starting point */
331   prev = ooo_segment_get_prev (f, s);
332   if (prev
333       && position_leq (f, offset_pos, ooo_segment_end_pos (f, prev), tail))
334     {
335       s = prev;
336       s_end_pos = ooo_segment_end_pos (f, s);
337
338       /* Since we have previous, offset start position cannot be smaller
339        * than prev->start. Check tail */
340       ASSERT (position_lt (f, s->start, offset_pos, tail));
341       goto check_tail;
342     }
343
344   s_index = s - f->ooo_segments;
345   s_end_pos = ooo_segment_end_pos (f, s);
346
347   /* No overlap, add before current segment */
348   if (position_lt (f, offset_end_pos, s->start, tail))
349     {
350       new_s = ooo_segment_new (f, offset_pos, length);
351       new_index = new_s - f->ooo_segments;
352
353       /* Pool might've moved, get segment again */
354       s = pool_elt_at_index (f->ooo_segments, s_index);
355       if (s->prev != OOO_SEGMENT_INVALID_INDEX)
356         {
357           new_s->prev = s->prev;
358           prev = pool_elt_at_index (f->ooo_segments, new_s->prev);
359           prev->next = new_index;
360         }
361       else
362         {
363           /* New head */
364           f->ooos_list_head = new_index;
365         }
366
367       new_s->next = s_index;
368       s->prev = new_index;
369       f->ooos_newest = new_index;
370       return;
371     }
372   /* No overlap, add after current segment */
373   else if (position_gt (f, offset_pos, s_end_pos, tail))
374     {
375       new_s = ooo_segment_new (f, offset_pos, length);
376       new_index = new_s - f->ooo_segments;
377
378       /* Pool might've moved, get segment again */
379       s = pool_elt_at_index (f->ooo_segments, s_index);
380
381       /* Needs to be last */
382       ASSERT (s->next == OOO_SEGMENT_INVALID_INDEX);
383
384       new_s->prev = s_index;
385       s->next = new_index;
386       f->ooos_newest = new_index;
387
388       return;
389     }
390
391   /*
392    * Merge needed
393    */
394
395   /* Merge at head */
396   if (position_lt (f, offset_pos, s->start, tail))
397     {
398       s->start = offset_pos;
399       s->length = position_diff (f, s_end_pos, s->start, tail);
400       f->ooos_newest = s - f->ooo_segments;
401     }
402
403 check_tail:
404
405   /* Overlapping tail */
406   if (position_gt (f, offset_end_pos, s_end_pos, tail))
407     {
408       s->length = position_diff (f, offset_end_pos, s->start, tail);
409
410       /* Remove the completely overlapped segments in the tail */
411       it = ooo_segment_next (f, s);
412       while (it && position_leq (f, ooo_segment_end_pos (f, it),
413                                  offset_end_pos, tail))
414         {
415           next = ooo_segment_next (f, it);
416           ooo_segment_del (f, it - f->ooo_segments);
417           it = next;
418         }
419
420       /* If partial overlap with last, merge */
421       if (it && position_leq (f, it->start, offset_end_pos, tail))
422         {
423           s->length = position_diff (f, ooo_segment_end_pos (f, it),
424                                      s->start, tail);
425           ooo_segment_del (f, it - f->ooo_segments);
426         }
427       f->ooos_newest = s - f->ooo_segments;
428     }
429 }
430
431 /**
432  * Removes segments that can now be enqueued because the fifo's tail has
433  * advanced. Returns the number of bytes added to tail.
434  */
435 static int
436 ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued, u32 * tail)
437 {
438   ooo_segment_t *s;
439   u32 index, bytes = 0;
440   i32 diff;
441
442   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
443   diff = ooo_segment_distance_to_tail (f, s->start, *tail);
444
445   ASSERT (diff != n_bytes_enqueued);
446
447   if (diff > n_bytes_enqueued)
448     return 0;
449
450   /* If last tail update overlaps one/multiple ooo segments, remove them */
451   while (0 <= diff && diff < n_bytes_enqueued)
452     {
453       index = s - f->ooo_segments;
454
455       /* Segment end is beyond the tail. Advance tail and remove segment */
456       if (s->length > diff)
457         {
458           bytes = s->length - diff;
459           *tail = *tail + bytes;
460           ooo_segment_del (f, index);
461           break;
462         }
463
464       /* If we have next go on */
465       if (s->next != OOO_SEGMENT_INVALID_INDEX)
466         {
467           s = pool_elt_at_index (f->ooo_segments, s->next);
468           diff = ooo_segment_distance_to_tail (f, s->start, *tail);
469           ooo_segment_del (f, index);
470         }
471       /* End of search */
472       else
473         {
474           ooo_segment_del (f, index);
475           break;
476         }
477     }
478
479   ASSERT (bytes <= f->nitems);
480   return bytes;
481 }
482
483 CLIB_MARCH_FN (svm_fifo_enqueue_nowait, int, svm_fifo_t * f, u32 len,
484                const u8 * src)
485 {
486   u32 n_chunk, to_copy, tail, head, free_count, tail_idx;
487   svm_fifo_chunk_t *c;
488
489   f_load_head_tail_prod (f, &head, &tail);
490
491   /* free space in fifo can only increase during enqueue: SPSC */
492   free_count = f_free_count (f, head, tail);
493
494   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
495
496   if (PREDICT_FALSE (free_count == 0))
497     return SVM_FIFO_FULL;
498
499   /* number of bytes we're going to copy */
500   to_copy = len = clib_min (free_count, len);
501
502   c = f->tail_chunk;
503   tail_idx = tail % f->size;
504   ASSERT (tail_idx >= c->start_byte);
505   tail_idx -= c->start_byte;
506   n_chunk = c->length - tail_idx;
507
508   if (n_chunk < to_copy)
509     {
510       clib_memcpy_fast (&c->data[tail_idx], src, n_chunk);
511       while ((to_copy -= n_chunk))
512         {
513           c = c->next;
514           n_chunk = clib_min (c->length, to_copy);
515           clib_memcpy_fast (&c->data[0], src + (len - to_copy), n_chunk);
516         }
517       f->tail_chunk = c;
518     }
519   else
520     {
521       clib_memcpy_fast (&c->data[tail_idx], src, to_copy);
522     }
523   tail += len;
524
525   svm_fifo_trace_add (f, head, n_total, 2);
526
527   /* collect out-of-order segments */
528   if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
529     len += ooo_segment_try_collect (f, len, &tail);
530
531   ASSERT (len <= free_count);
532
533   /* store-rel: producer owned index (paired with load-acq in consumer) */
534   clib_atomic_store_rel_n (&f->tail, tail);
535
536   return len;
537 }
538
539 #ifndef CLIB_MARCH_VARIANT
540 int
541 svm_fifo_enqueue_nowait (svm_fifo_t * f, u32 max_bytes,
542                          const u8 * copy_from_here)
543 {
544   return CLIB_MARCH_FN_SELECT (svm_fifo_enqueue_nowait) (f, max_bytes,
545                                                          copy_from_here);
546 }
547 #endif
548
549 /**
550  * Enqueue a future segment.
551  *
552  * Two choices: either copies the entire segment, or copies nothing
553  * Returns 0 of the entire segment was copied
554  * Returns -1 if none of the segment was copied due to lack of space
555  */
556 CLIB_MARCH_FN (svm_fifo_enqueue_with_offset, int, svm_fifo_t * f,
557                u32 offset, u32 len, u8 * src)
558 {
559   u32 to_copy, n_chunk, tail, head, free_count, tail_offset_idx;
560   svm_fifo_chunk_t *c;
561
562   f_load_head_tail_prod (f, &head, &tail);
563
564   /* free space in fifo can only increase during enqueue: SPSC */
565   free_count = f_free_count (f, head, tail);
566
567   /* will this request fit? */
568   if ((len + offset) > free_count)
569     return -1;
570
571   f->ooos_newest = OOO_SEGMENT_INVALID_INDEX;
572
573   ASSERT (len < f->nitems);
574   svm_fifo_trace_add (f, offset, len, 1);
575
576   ooo_segment_add (f, offset, head, tail, len);
577
578   c = f->tail_chunk;
579   tail_offset_idx = (tail + offset) % f->size;
580   tail_offset_idx -= c->start_byte;
581   n_chunk = c->length - tail_offset_idx;
582   to_copy = len;
583
584   if (n_chunk < to_copy)
585     {
586       clib_memcpy_fast (&c->data[tail_offset_idx], src, n_chunk);
587       while ((to_copy -= n_chunk))
588         {
589           c = c->next;
590           n_chunk = clib_min (c->length, to_copy);
591           clib_memcpy_fast (&c->data[0], src + (len - to_copy), n_chunk);
592         }
593     }
594   else
595     {
596       clib_memcpy_fast (&c->data[tail_offset_idx], src, len);
597     }
598
599   return 0;
600 }
601
602 #ifndef CLIB_MARCH_VARIANT
603
604 int
605 svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 required_bytes,
606                               u8 * copy_from_here)
607 {
608   return CLIB_MARCH_FN_SELECT (svm_fifo_enqueue_with_offset) (f, offset,
609                                                               required_bytes,
610                                                               copy_from_here);
611 }
612
613 void
614 svm_fifo_overwrite_head (svm_fifo_t * f, u8 * data, u32 len)
615 {
616   u32 n_chunk;
617   u32 head, tail, head_idx;
618   svm_fifo_chunk_t *c;
619
620   ASSERT (len <= f->nitems);
621
622   f_load_head_tail_cons (f, &head, &tail);
623   c = f->head_chunk;
624   head_idx = head % f->size;
625   head_idx -= c->start_byte;
626   n_chunk = c->length - head_idx;
627   if (len <= n_chunk)
628     clib_memcpy_fast (&c->data[head_idx], data, len);
629   else
630     {
631       clib_memcpy_fast (&c->data[head_idx], data, n_chunk);
632       clib_memcpy_fast (&c->next->data[0], data + n_chunk, len - n_chunk);
633     }
634 }
635 #endif
636
637 CLIB_MARCH_FN (svm_fifo_dequeue_nowait, int, svm_fifo_t * f, u32 len,
638                u8 * dst)
639 {
640   u32 to_copy, n_chunk, tail, head, cursize, head_idx;
641   svm_fifo_chunk_t *c;
642
643   f_load_head_tail_cons (f, &head, &tail);
644
645   /* current size of fifo can only increase during dequeue: SPSC */
646   cursize = f_cursize (f, head, tail);
647
648   if (PREDICT_FALSE (cursize == 0))
649     return -2;                  /* nothing in the fifo */
650
651   to_copy = len = clib_min (cursize, len);
652   ASSERT (cursize >= to_copy);
653
654   c = f->head_chunk;
655   head_idx = head % f->size;
656   head_idx -= c->start_byte;
657   n_chunk = c->length - head_idx;
658
659   if (n_chunk < to_copy)
660     {
661       clib_memcpy_fast (dst, &c->data[head_idx], n_chunk);
662       while ((to_copy -= n_chunk))
663         {
664           c = c->next;
665           n_chunk = clib_min (c->length, to_copy);
666           clib_memcpy_fast (dst + (len - to_copy), &c->data[0], n_chunk);
667         }
668       f->head_chunk = c;
669     }
670   else
671     {
672       clib_memcpy_fast (dst, &c->data[head_idx], to_copy);
673     }
674   head += len;
675
676   if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SIZE_UPDATE))
677     svm_fifo_try_size_update (f, head);
678
679   /* store-rel: consumer owned index (paired with load-acq in producer) */
680   clib_atomic_store_rel_n (&f->head, head);
681
682   return len;
683 }
684
685 #ifndef CLIB_MARCH_VARIANT
686
687 int
688 svm_fifo_dequeue_nowait (svm_fifo_t * f, u32 max_bytes, u8 * copy_here)
689 {
690   return CLIB_MARCH_FN_SELECT (svm_fifo_dequeue_nowait) (f, max_bytes,
691                                                          copy_here);
692 }
693 #endif
694
695 CLIB_MARCH_FN (svm_fifo_peek, int, svm_fifo_t * f, u32 relative_offset,
696                u32 len, u8 * dst)
697 {
698   u32 to_copy, n_chunk, tail, head, cursize, head_idx;
699   svm_fifo_chunk_t *c;
700
701   f_load_head_tail_cons (f, &head, &tail);
702
703   /* current size of fifo can only increase during peek: SPSC */
704   cursize = f_cursize (f, head, tail);
705
706   if (PREDICT_FALSE (cursize < relative_offset))
707     return -2;                  /* nothing in the fifo */
708
709   to_copy = len = clib_min (cursize - relative_offset, len);
710
711   c = f->head_chunk;
712   head_idx = (head + relative_offset) % f->size;
713   head_idx -= c->start_byte;
714   n_chunk = c->length - head_idx;
715
716   if (n_chunk < to_copy)
717     {
718       clib_memcpy_fast (dst, &c->data[head_idx], n_chunk);
719       while ((to_copy -= n_chunk))
720         {
721           c = c->next;
722           n_chunk = clib_min (c->length, to_copy);
723           clib_memcpy_fast (dst + (len - to_copy), &c->data[0], n_chunk);
724         }
725       f->head_chunk = c;
726     }
727   else
728     {
729       clib_memcpy_fast (dst, &c->data[head_idx], to_copy);
730     }
731   return len;
732 }
733
734 #ifndef CLIB_MARCH_VARIANT
735
736 int
737 svm_fifo_peek (svm_fifo_t * f, u32 relative_offset, u32 max_bytes,
738                u8 * copy_here)
739 {
740   return CLIB_MARCH_FN_SELECT (svm_fifo_peek) (f, relative_offset, max_bytes,
741                                                copy_here);
742 }
743
744 int
745 svm_fifo_dequeue_drop (svm_fifo_t * f, u32 max_bytes)
746 {
747   u32 total_drop_bytes;
748   u32 tail, head, cursize;
749
750   f_load_head_tail_cons (f, &head, &tail);
751
752   /* number of bytes we're going to drop */
753   cursize = f_cursize (f, head, tail);
754
755   if (PREDICT_FALSE (cursize == 0))
756     return -2;                  /* nothing in the fifo */
757
758   svm_fifo_trace_add (f, tail, total_drop_bytes, 3);
759
760   /* number of bytes we're going to drop */
761   total_drop_bytes = (cursize < max_bytes) ? cursize : max_bytes;
762
763   /* move head */
764   head += total_drop_bytes;
765
766   ASSERT (cursize >= total_drop_bytes);
767   /* store-rel: consumer owned index (paired with load-acq in producer) */
768   clib_atomic_store_rel_n (&f->head, head);
769
770   return total_drop_bytes;
771 }
772
773 void
774 svm_fifo_dequeue_drop_all (svm_fifo_t * f)
775 {
776   /* consumer foreign index */
777   u32 tail = clib_atomic_load_acq_n (&f->tail);
778   /* store-rel: consumer owned index (paired with load-acq in producer) */
779   clib_atomic_store_rel_n (&f->head, tail);
780 }
781
782 int
783 svm_fifo_segments (svm_fifo_t * f, svm_fifo_segment_t * fs)
784 {
785   u32 cursize, head, tail, head_idx;
786
787   f_load_head_tail_cons (f, &head, &tail);
788
789   /* consumer function, cursize can only increase while we're working */
790   cursize = f_cursize (f, head, tail);
791
792   if (PREDICT_FALSE (cursize == 0))
793     return -2;                  /* nothing in the fifo */
794
795   head_idx = head % f->size;
796
797   if (tail < head)
798     {
799       fs[0].len = f->size - head_idx;
800       fs[0].data = f->head_chunk->data + head_idx;
801       fs[1].len = cursize - fs[0].len;
802       fs[1].data = f->head_chunk->data;
803     }
804   else
805     {
806       fs[0].len = cursize;
807       fs[0].data = f->head_chunk->data + head_idx;
808       fs[1].len = 0;
809       fs[1].data = 0;
810     }
811   return cursize;
812 }
813
814 void
815 svm_fifo_segments_free (svm_fifo_t * f, svm_fifo_segment_t * fs)
816 {
817   u32 head, head_idx;
818
819   /* consumer owned index */
820   head = f->head;
821   head_idx = head % f->size;
822
823   ASSERT (fs[0].data == f->head_chunk->data + head_idx);
824   head += fs[0].len + fs[1].len;
825   /* store-rel: consumer owned index (paired with load-acq in producer) */
826   clib_atomic_store_rel_n (&f->head, head);
827 }
828
829 /* Assumption: no prod and cons are accessing either dest or src fifo */
830 void
831 svm_fifo_clone (svm_fifo_t * df, svm_fifo_t * sf)
832 {
833   u32 head, tail;
834   clib_memcpy_fast (df->head_chunk->data, sf->head_chunk->data, sf->size);
835
836   f_load_head_tail_all_acq (sf, &head, &tail);
837   clib_atomic_store_rel_n (&df->head, head);
838   clib_atomic_store_rel_n (&df->tail, tail);
839 }
840
841 u32
842 svm_fifo_number_ooo_segments (svm_fifo_t * f)
843 {
844   return pool_elts (f->ooo_segments);
845 }
846
847 ooo_segment_t *
848 svm_fifo_first_ooo_segment (svm_fifo_t * f)
849 {
850   return pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
851 }
852
853 /**
854  * Set fifo pointers to requested offset
855  */
856 void
857 svm_fifo_init_pointers (svm_fifo_t * f, u32 pointer)
858 {
859   clib_atomic_store_rel_n (&f->head, pointer);
860   clib_atomic_store_rel_n (&f->tail, pointer);
861 }
862
863 void
864 svm_fifo_add_subscriber (svm_fifo_t * f, u8 subscriber)
865 {
866   if (f->n_subscribers >= SVM_FIFO_MAX_EVT_SUBSCRIBERS)
867     return;
868   f->subscribers[f->n_subscribers++] = subscriber;
869 }
870
871 void
872 svm_fifo_del_subscriber (svm_fifo_t * f, u8 subscriber)
873 {
874   int i;
875
876   for (i = 0; i < f->n_subscribers; i++)
877     {
878       if (f->subscribers[i] != subscriber)
879         continue;
880       f->subscribers[i] = f->subscribers[f->n_subscribers - 1];
881       f->n_subscribers--;
882       break;
883     }
884 }
885
886 #endif
887 /*
888  * fd.io coding-style-patch-verification: ON
889  *
890  * Local Variables:
891  * eval: (c-set-style "gnu")
892  * End:
893  */