Fix fifo ooo bugs and improve testing
[vpp.git] / src / svm / svm_fifo.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <svm/svm_fifo.h>
17
18 #define offset_lt(_a, _b) ((i32)((_a)-(_b)) < 0)
19 #define offset_leq(_a, _b) ((i32)((_a)-(_b)) <= 0)
20
21 u8 *
22 format_ooo_segment (u8 * s, va_list * args)
23 {
24   ooo_segment_t *seg = va_arg (*args, ooo_segment_t *);
25
26   s = format (s, "pos %u, len %u, next %d, prev %d",
27               seg->start, seg->length, seg->next, seg->prev);
28   return s;
29 }
30
31 u8 *
32 format_ooo_list (u8 * s, va_list * args)
33 {
34   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
35   u32 ooo_segment_index = f->ooos_list_head;
36   ooo_segment_t *seg;
37
38   while (ooo_segment_index != OOO_SEGMENT_INVALID_INDEX)
39     {
40       seg = pool_elt_at_index (f->ooo_segments, ooo_segment_index);
41       s = format (s, "\n  %U", format_ooo_segment, seg);
42
43       ooo_segment_index = seg->next;
44     }
45   return s;
46 }
47
48 /** create an svm fifo, in the current heap. Fails vs blow up the process */
49 svm_fifo_t *
50 svm_fifo_create (u32 data_size_in_bytes)
51 {
52   svm_fifo_t *f;
53
54   f = clib_mem_alloc_aligned_or_null (sizeof (*f) + data_size_in_bytes,
55                                       CLIB_CACHE_LINE_BYTES);
56   if (f == 0)
57     return 0;
58
59   memset (f, 0, sizeof (*f) + data_size_in_bytes);
60   f->nitems = data_size_in_bytes;
61   f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX;
62
63   return (f);
64 }
65
66 void
67 svm_fifo_free (svm_fifo_t * f)
68 {
69   pool_free (f->ooo_segments);
70   clib_mem_free (f);
71 }
72
73 always_inline ooo_segment_t *
74 ooo_segment_new (svm_fifo_t * f, u32 start, u32 length)
75 {
76   ooo_segment_t *s;
77
78   pool_get (f->ooo_segments, s);
79
80   s->start = start;
81   s->length = length;
82
83   s->prev = s->next = OOO_SEGMENT_INVALID_INDEX;
84
85   return s;
86 }
87
88 always_inline void
89 ooo_segment_del (svm_fifo_t * f, u32 index)
90 {
91   ooo_segment_t *cur, *prev = 0, *next = 0;
92   cur = pool_elt_at_index (f->ooo_segments, index);
93
94   if (cur->next != OOO_SEGMENT_INVALID_INDEX)
95     {
96       next = pool_elt_at_index (f->ooo_segments, cur->next);
97       next->prev = cur->prev;
98     }
99
100   if (cur->prev != OOO_SEGMENT_INVALID_INDEX)
101     {
102       prev = pool_elt_at_index (f->ooo_segments, cur->prev);
103       prev->next = cur->next;
104     }
105   else
106     {
107       f->ooos_list_head = cur->next;
108     }
109
110   pool_put (f->ooo_segments, cur);
111 }
112
113 /**
114  * Add segment to fifo's out-of-order segment list. Takes care of merging
115  * adjacent segments and removing overlapping ones.
116  */
117 static void
118 ooo_segment_add (svm_fifo_t * f, u32 offset, u32 length)
119 {
120   ooo_segment_t *s, *new_s, *prev, *next, *it;
121   u32 new_index, end_offset, s_sof, s_eof, s_index;
122
123   end_offset = offset + length;
124
125   if (f->ooos_list_head == OOO_SEGMENT_INVALID_INDEX)
126     {
127       s = ooo_segment_new (f, offset, length);
128       f->ooos_list_head = s - f->ooo_segments;
129       f->ooos_newest = f->ooos_list_head;
130       return;
131     }
132
133   /* Find first segment that starts after new segment */
134   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
135   while (s->next != OOO_SEGMENT_INVALID_INDEX
136          && offset_leq (ooo_segment_offset (f, s), offset))
137     s = pool_elt_at_index (f->ooo_segments, s->next);
138
139   s_index = s - f->ooo_segments;
140   s_sof = ooo_segment_offset (f, s);
141   s_eof = ooo_segment_end_offset (f, s);
142   prev = ooo_segment_get_prev (f, s);
143
144   /* No overlap, add before current segment */
145   if (offset_lt (end_offset, s_sof)
146       && (!prev || offset_lt (prev->start + prev->length, offset)))
147     {
148       new_s = ooo_segment_new (f, offset, length);
149       new_index = new_s - f->ooo_segments;
150
151       /* Pool might've moved, get segment again */
152       s = pool_elt_at_index (f->ooo_segments, s_index);
153       if (s->prev != OOO_SEGMENT_INVALID_INDEX)
154         {
155           new_s->prev = s->prev;
156           prev = pool_elt_at_index (f->ooo_segments, new_s->prev);
157           prev->next = new_index;
158         }
159       else
160         {
161           /* New head */
162           f->ooos_list_head = new_index;
163         }
164
165       new_s->next = s - f->ooo_segments;
166       s->prev = new_index;
167       f->ooos_newest = new_index;
168       return;
169     }
170   /* No overlap, add after current segment */
171   else if (offset_lt (s_eof, offset))
172     {
173       new_s = ooo_segment_new (f, offset, length);
174       new_index = new_s - f->ooo_segments;
175
176       /* Pool might've moved, get segment again */
177       s = pool_elt_at_index (f->ooo_segments, s_index);
178
179       if (s->next != OOO_SEGMENT_INVALID_INDEX)
180         {
181           new_s->next = s->next;
182           next = pool_elt_at_index (f->ooo_segments, new_s->next);
183           next->prev = new_index;
184         }
185
186       new_s->prev = s - f->ooo_segments;
187       s->next = new_index;
188       f->ooos_newest = new_index;
189
190       return;
191     }
192
193   /*
194    * Merge needed
195    */
196
197   /* Merge at head */
198   if (offset_leq (offset, s_sof))
199     {
200       /* If we have a previous, check if we overlap */
201       if (s->prev != OOO_SEGMENT_INVALID_INDEX)
202         {
203           prev = pool_elt_at_index (f->ooo_segments, s->prev);
204
205           /* New segment merges prev and current. Remove previous and
206            * update position of current. */
207           if (offset_leq (offset, ooo_segment_end_offset (f, prev)))
208             {
209               s->start = prev->start;
210               s->length = s_eof - ooo_segment_offset (f, prev);
211               ooo_segment_del (f, s->prev);
212             }
213           else
214             {
215               s->start = offset;
216               s->length = s_eof - ooo_segment_offset (f, s);
217             }
218         }
219       else
220         {
221           s->start = offset;
222           s->length = s_eof - ooo_segment_offset (f, s);
223         }
224
225       /* The new segment's tail may cover multiple smaller ones */
226       if (offset_lt (s_eof, end_offset))
227         {
228           /* Remove segments completely covered */
229           it = (s->next != OOO_SEGMENT_INVALID_INDEX) ?
230             pool_elt_at_index (f->ooo_segments, s->next) : 0;
231           while (it && offset_lt (ooo_segment_end_offset (f, it), end_offset))
232             {
233               next = (it->next != OOO_SEGMENT_INVALID_INDEX) ?
234                 pool_elt_at_index (f->ooo_segments, it->next) : 0;
235               ooo_segment_del (f, it - f->ooo_segments);
236               it = next;
237             }
238
239           /* Update length. Segment's start might have changed. */
240           s->length = end_offset - ooo_segment_offset (f, s);
241
242           /* If partial overlap with last, merge */
243           if (it && offset_lt (ooo_segment_offset (f, it), end_offset))
244             {
245               s->length +=
246                 it->length - (ooo_segment_offset (f, it) - end_offset);
247               ooo_segment_del (f, it - f->ooo_segments);
248             }
249         }
250     }
251   /* Last but overlapping previous */
252   else if (offset_leq (s_eof, end_offset))
253     {
254       s->length = end_offset - ooo_segment_offset (f, s);
255     }
256   /* New segment completely covered by current one */
257   else
258     {
259       /* Do Nothing */
260     }
261
262   /* Most recently updated segment */
263   f->ooos_newest = s - f->ooo_segments;
264 }
265
266 /**
267  * Removes segments that can now be enqueued because the fifo's tail has
268  * advanced. Returns the number of bytes added to tail.
269  */
270 static int
271 ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued)
272 {
273   ooo_segment_t *s;
274   u32 index, bytes = 0, diff;
275   u32 cursize;
276
277   /* read cursize, which can only increase while we're working */
278   cursize = svm_fifo_max_dequeue (f);
279
280   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
281
282   /* If last tail update overlaps one/multiple ooo segments, remove them */
283   diff = (f->nitems + ((int) s->start - f->tail)) % f->nitems;
284   while (0 < diff && diff < n_bytes_enqueued)
285     {
286       /* Segment end is beyond the tail. Advance tail and be done */
287       if (diff < s->length)
288         {
289           f->tail += s->length - diff;
290           f->tail %= f->nitems;
291           break;
292         }
293       /* If we have next go on */
294       else if (s->next != OOO_SEGMENT_INVALID_INDEX)
295         {
296           index = s - f->ooo_segments;
297           s = pool_elt_at_index (f->ooo_segments, s->next);
298           diff = (f->nitems + ((int) s->start - f->tail)) % f->nitems;
299           ooo_segment_del (f, index);
300         }
301       /* End of search */
302       else
303         {
304           break;
305         }
306     }
307
308   /* If tail is adjacent to an ooo segment, 'consume' it */
309   if (diff == 0)
310     {
311       bytes = ((f->nitems - cursize) >= s->length) ? s->length :
312         f->nitems - cursize;
313
314       f->tail += bytes;
315       f->tail %= f->nitems;
316
317       ooo_segment_del (f, s - f->ooo_segments);
318     }
319
320   return bytes;
321 }
322
323 static int
324 svm_fifo_enqueue_internal (svm_fifo_t * f,
325                            int pid, u32 max_bytes, u8 * copy_from_here)
326 {
327   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
328   u32 cursize, nitems;
329
330   /* read cursize, which can only increase while we're working */
331   cursize = svm_fifo_max_dequeue (f);
332
333   if (PREDICT_FALSE (cursize == f->nitems))
334     return -2;                  /* fifo stuffed */
335
336   nitems = f->nitems;
337
338   /* Number of bytes we're going to copy */
339   total_copy_bytes = (nitems - cursize) < max_bytes ?
340     (nitems - cursize) : max_bytes;
341
342   if (PREDICT_TRUE (copy_from_here != 0))
343     {
344       /* Number of bytes in first copy segment */
345       first_copy_bytes = ((nitems - f->tail) < total_copy_bytes)
346         ? (nitems - f->tail) : total_copy_bytes;
347
348       clib_memcpy (&f->data[f->tail], copy_from_here, first_copy_bytes);
349       f->tail += first_copy_bytes;
350       f->tail = (f->tail == nitems) ? 0 : f->tail;
351
352       /* Number of bytes in second copy segment, if any */
353       second_copy_bytes = total_copy_bytes - first_copy_bytes;
354       if (second_copy_bytes)
355         {
356           clib_memcpy (&f->data[f->tail], copy_from_here + first_copy_bytes,
357                        second_copy_bytes);
358           f->tail += second_copy_bytes;
359           f->tail = (f->tail == nitems) ? 0 : f->tail;
360         }
361     }
362   else
363     {
364       /* Account for a zero-copy enqueue done elsewhere */
365       ASSERT (max_bytes <= (nitems - cursize));
366       f->tail += max_bytes;
367       f->tail = f->tail % nitems;
368       total_copy_bytes = max_bytes;
369     }
370
371   /* Any out-of-order segments to collect? */
372   if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
373     total_copy_bytes += ooo_segment_try_collect (f, total_copy_bytes);
374
375   /* Atomically increase the queue length */
376   __sync_fetch_and_add (&f->cursize, total_copy_bytes);
377
378   return (total_copy_bytes);
379 }
380
381 int
382 svm_fifo_enqueue_nowait (svm_fifo_t * f,
383                          int pid, u32 max_bytes, u8 * copy_from_here)
384 {
385   return svm_fifo_enqueue_internal (f, pid, max_bytes, copy_from_here);
386 }
387
388 /**
389  * Enqueue a future segment.
390  *
391  * Two choices: either copies the entire segment, or copies nothing
392  * Returns 0 of the entire segment was copied
393  * Returns -1 if none of the segment was copied due to lack of space
394  */
395 static int
396 svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
397                                        int pid,
398                                        u32 offset,
399                                        u32 required_bytes,
400                                        u8 * copy_from_here)
401 {
402   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
403   u32 cursize, nitems;
404   u32 normalized_offset;
405   int rv;
406
407   /* Safety: don't wrap more than nitems/2 */
408   ASSERT ((f->nitems + offset - f->tail) % f->nitems < f->nitems / 2);
409
410   /* Users would do do well to avoid this */
411   if (PREDICT_FALSE (f->tail == (offset % f->nitems)))
412     {
413       rv = svm_fifo_enqueue_internal (f, pid, required_bytes, copy_from_here);
414       if (rv > 0)
415         return 0;
416       return -1;
417     }
418
419   /* read cursize, which can only increase while we're working */
420   cursize = svm_fifo_max_dequeue (f);
421   nitems = f->nitems;
422
423   /* Will this request fit? */
424   if ((required_bytes + offset) > (nitems - cursize))
425     return -1;
426
427   ooo_segment_add (f, offset, required_bytes);
428
429   /* Number of bytes we're going to copy */
430   total_copy_bytes = required_bytes;
431   normalized_offset = offset % nitems;
432
433   /* Number of bytes in first copy segment */
434   first_copy_bytes = ((nitems - normalized_offset) < total_copy_bytes)
435     ? (nitems - normalized_offset) : total_copy_bytes;
436
437   clib_memcpy (&f->data[normalized_offset], copy_from_here, first_copy_bytes);
438
439   /* Number of bytes in second copy segment, if any */
440   second_copy_bytes = total_copy_bytes - first_copy_bytes;
441   if (second_copy_bytes)
442     {
443       normalized_offset += first_copy_bytes;
444       normalized_offset %= nitems;
445
446       ASSERT (normalized_offset == 0);
447
448       clib_memcpy (&f->data[normalized_offset],
449                    copy_from_here + first_copy_bytes, second_copy_bytes);
450     }
451
452   return (0);
453 }
454
455
456 int
457 svm_fifo_enqueue_with_offset (svm_fifo_t * f,
458                               int pid,
459                               u32 offset,
460                               u32 required_bytes, u8 * copy_from_here)
461 {
462   return svm_fifo_enqueue_with_offset_internal
463     (f, pid, offset, required_bytes, copy_from_here);
464 }
465
466
467 static int
468 svm_fifo_dequeue_internal (svm_fifo_t * f,
469                            int pid, u32 max_bytes, u8 * copy_here)
470 {
471   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
472   u32 cursize, nitems;
473
474   /* read cursize, which can only increase while we're working */
475   cursize = svm_fifo_max_dequeue (f);
476   if (PREDICT_FALSE (cursize == 0))
477     return -2;                  /* nothing in the fifo */
478
479   nitems = f->nitems;
480
481   /* Number of bytes we're going to copy */
482   total_copy_bytes = (cursize < max_bytes) ? cursize : max_bytes;
483
484   if (PREDICT_TRUE (copy_here != 0))
485     {
486       /* Number of bytes in first copy segment */
487       first_copy_bytes = ((nitems - f->head) < total_copy_bytes)
488         ? (nitems - f->head) : total_copy_bytes;
489       clib_memcpy (copy_here, &f->data[f->head], first_copy_bytes);
490       f->head += first_copy_bytes;
491       f->head = (f->head == nitems) ? 0 : f->head;
492
493       /* Number of bytes in second copy segment, if any */
494       second_copy_bytes = total_copy_bytes - first_copy_bytes;
495       if (second_copy_bytes)
496         {
497           clib_memcpy (copy_here + first_copy_bytes,
498                        &f->data[f->head], second_copy_bytes);
499           f->head += second_copy_bytes;
500           f->head = (f->head == nitems) ? 0 : f->head;
501         }
502     }
503   else
504     {
505       /* Account for a zero-copy dequeue done elsewhere */
506       ASSERT (max_bytes <= cursize);
507       f->head += max_bytes;
508       f->head = f->head % nitems;
509       cursize -= max_bytes;
510       total_copy_bytes = max_bytes;
511     }
512
513   __sync_fetch_and_sub (&f->cursize, total_copy_bytes);
514
515   return (total_copy_bytes);
516 }
517
518 int
519 svm_fifo_dequeue_nowait (svm_fifo_t * f,
520                          int pid, u32 max_bytes, u8 * copy_here)
521 {
522   return svm_fifo_dequeue_internal (f, pid, max_bytes, copy_here);
523 }
524
525 int
526 svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
527                u8 * copy_here)
528 {
529   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
530   u32 cursize, nitems, real_head;
531
532   /* read cursize, which can only increase while we're working */
533   cursize = svm_fifo_max_dequeue (f);
534   if (PREDICT_FALSE (cursize == 0))
535     return -2;                  /* nothing in the fifo */
536
537   nitems = f->nitems;
538   real_head = f->head + offset;
539   real_head = real_head >= nitems ? real_head - nitems : real_head;
540
541   /* Number of bytes we're going to copy */
542   total_copy_bytes = (cursize < max_bytes) ? cursize : max_bytes;
543
544   if (PREDICT_TRUE (copy_here != 0))
545     {
546       /* Number of bytes in first copy segment */
547       first_copy_bytes =
548         ((nitems - real_head) < total_copy_bytes) ?
549         (nitems - real_head) : total_copy_bytes;
550       clib_memcpy (copy_here, &f->data[real_head], first_copy_bytes);
551
552       /* Number of bytes in second copy segment, if any */
553       second_copy_bytes = total_copy_bytes - first_copy_bytes;
554       if (second_copy_bytes)
555         {
556           clib_memcpy (copy_here + first_copy_bytes, &f->data[0],
557                        second_copy_bytes);
558         }
559     }
560   return total_copy_bytes;
561 }
562
563 int
564 svm_fifo_dequeue_drop (svm_fifo_t * f, int pid, u32 max_bytes)
565 {
566   u32 total_drop_bytes, first_drop_bytes, second_drop_bytes;
567   u32 cursize, nitems;
568
569   /* read cursize, which can only increase while we're working */
570   cursize = svm_fifo_max_dequeue (f);
571   if (PREDICT_FALSE (cursize == 0))
572     return -2;                  /* nothing in the fifo */
573
574   nitems = f->nitems;
575
576   /* Number of bytes we're going to drop */
577   total_drop_bytes = (cursize < max_bytes) ? cursize : max_bytes;
578
579   /* Number of bytes in first copy segment */
580   first_drop_bytes =
581     ((nitems - f->head) < total_drop_bytes) ?
582     (nitems - f->head) : total_drop_bytes;
583   f->head += first_drop_bytes;
584   f->head = (f->head == nitems) ? 0 : f->head;
585
586   /* Number of bytes in second drop segment, if any */
587   second_drop_bytes = total_drop_bytes - first_drop_bytes;
588   if (second_drop_bytes)
589     {
590       f->head += second_drop_bytes;
591       f->head = (f->head == nitems) ? 0 : f->head;
592     }
593
594   __sync_fetch_and_sub (&f->cursize, total_drop_bytes);
595
596   return total_drop_bytes;
597 }
598
599 u8 *
600 format_svm_fifo (u8 * s, va_list * args)
601 {
602   svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
603   int verbose = va_arg (*args, int);
604
605   s = format (s, "cursize %u nitems %u has_event %d\n",
606               f->cursize, f->nitems, f->has_event);
607   s = format (s, "head %d tail %d\n", f->head, f->tail);
608
609   if (verbose > 1)
610     s = format
611       (s, "server session %d thread %d client session %d thread %d\n",
612        f->server_session_index, f->server_thread_index,
613        f->client_session_index, f->client_thread_index);
614
615   if (verbose)
616     {
617       ooo_segment_t *seg;
618       u32 seg_index;
619
620       s = format (s, "ooo pool %d active elts\n",
621                   pool_elts (f->ooo_segments));
622
623       seg_index = f->ooos_list_head;
624
625       while (seg_index != OOO_SEGMENT_INVALID_INDEX)
626         {
627           seg = pool_elt_at_index (f->ooo_segments, seg_index);
628           s = format (s, "  pos %u, len %u next %d\n",
629                       seg->start, seg->length, seg->next);
630           seg_index = seg->next;
631         }
632     }
633   return s;
634 }
635
636 u32
637 svm_fifo_number_ooo_segments (svm_fifo_t * f)
638 {
639   return pool_elts (f->ooo_segments);
640 }
641
642 ooo_segment_t *
643 svm_fifo_first_ooo_segment (svm_fifo_t * f)
644 {
645   return pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
646 }
647
648 /*
649  * fd.io coding-style-patch-verification: ON
650  *
651  * Local Variables:
652  * eval: (c-set-style "gnu")
653  * End:
654  */