TCP/session improvements
[vpp.git] / src / svm / svm_fifo.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <svm/svm_fifo.h>
17
18 /** create an svm fifo, in the current heap. Fails vs blow up the process */
19 svm_fifo_t *
20 svm_fifo_create (u32 data_size_in_bytes)
21 {
22   svm_fifo_t *f;
23   pthread_mutexattr_t attr;
24   pthread_condattr_t cattr;
25
26   f = clib_mem_alloc_aligned_or_null (sizeof (*f) + data_size_in_bytes,
27                                       CLIB_CACHE_LINE_BYTES);
28   if (f == 0)
29     return 0;
30
31   memset (f, 0, sizeof (*f) + data_size_in_bytes);
32   f->nitems = data_size_in_bytes;
33   f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX;
34
35   memset (&attr, 0, sizeof (attr));
36   memset (&cattr, 0, sizeof (cattr));
37
38   if (pthread_mutexattr_init (&attr))
39     clib_unix_warning ("mutexattr_init");
40   if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
41     clib_unix_warning ("pthread_mutexattr_setpshared");
42   if (pthread_mutex_init (&f->mutex, &attr))
43     clib_unix_warning ("mutex_init");
44   if (pthread_mutexattr_destroy (&attr))
45     clib_unix_warning ("mutexattr_destroy");
46   if (pthread_condattr_init (&cattr))
47     clib_unix_warning ("condattr_init");
48   if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
49     clib_unix_warning ("condattr_setpshared");
50   if (pthread_cond_init (&f->condvar, &cattr))
51     clib_unix_warning ("cond_init1");
52   if (pthread_condattr_destroy (&cattr))
53     clib_unix_warning ("cond_init2");
54
55   return (f);
56 }
57
58 always_inline ooo_segment_t *
59 ooo_segment_new (svm_fifo_t * f, u32 start, u32 length)
60 {
61   ooo_segment_t *s;
62
63   pool_get (f->ooo_segments, s);
64
65   s->fifo_position = start;
66   s->length = length;
67
68   s->prev = s->next = OOO_SEGMENT_INVALID_INDEX;
69
70   return s;
71 }
72
73 always_inline void
74 ooo_segment_del (svm_fifo_t * f, u32 index)
75 {
76   ooo_segment_t *cur, *prev = 0, *next = 0;
77   cur = pool_elt_at_index (f->ooo_segments, index);
78
79   if (cur->next != OOO_SEGMENT_INVALID_INDEX)
80     {
81       next = pool_elt_at_index (f->ooo_segments, cur->next);
82       next->prev = cur->prev;
83     }
84
85   if (cur->prev != OOO_SEGMENT_INVALID_INDEX)
86     {
87       prev = pool_elt_at_index (f->ooo_segments, cur->prev);
88       prev->next = cur->next;
89     }
90   else
91     {
92       f->ooos_list_head = cur->next;
93     }
94
95   pool_put (f->ooo_segments, cur);
96 }
97
98 /**
99  * Add segment to fifo's out-of-order segment list. Takes care of merging
100  * adjacent segments and removing overlapping ones.
101  */
102 static void
103 ooo_segment_add (svm_fifo_t * f, u32 offset, u32 length)
104 {
105   ooo_segment_t *s, *new_s, *prev, *next, *it;
106   u32 new_index, position, end_offset, s_sof, s_eof, s_index;
107
108   position = (f->tail + offset) % f->nitems;
109   end_offset = offset + length;
110
111   if (f->ooos_list_head == OOO_SEGMENT_INVALID_INDEX)
112     {
113       s = ooo_segment_new (f, position, length);
114       f->ooos_list_head = s - f->ooo_segments;
115       f->ooos_newest = f->ooos_list_head;
116       return;
117     }
118
119   /* Find first segment that starts after new segment */
120   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
121   while (s->next != OOO_SEGMENT_INVALID_INDEX
122          && ooo_segment_offset (f, s) <= offset)
123     s = pool_elt_at_index (f->ooo_segments, s->next);
124
125   s_index = s - f->ooo_segments;
126   s_sof = ooo_segment_offset (f, s);
127   s_eof = ooo_segment_end_offset (f, s);
128
129   /* No overlap, add before current segment */
130   if (end_offset < s_sof)
131     {
132       new_s = ooo_segment_new (f, position, length);
133       new_index = new_s - f->ooo_segments;
134
135       /* Pool might've moved, get segment again */
136       s = pool_elt_at_index (f->ooo_segments, s_index);
137
138       if (s->prev != OOO_SEGMENT_INVALID_INDEX)
139         {
140           new_s->prev = s->prev;
141
142           prev = pool_elt_at_index (f->ooo_segments, new_s->prev);
143           prev->next = new_index;
144         }
145       else
146         {
147           /* New head */
148           f->ooos_list_head = new_index;
149         }
150
151       new_s->next = s - f->ooo_segments;
152       s->prev = new_index;
153       f->ooos_newest = new_index;
154       return;
155     }
156   /* No overlap, add after current segment */
157   else if (s_eof < offset)
158     {
159       new_s = ooo_segment_new (f, position, length);
160       new_index = new_s - f->ooo_segments;
161
162       /* Pool might've moved, get segment again */
163       s = pool_elt_at_index (f->ooo_segments, s_index);
164
165       if (s->next != OOO_SEGMENT_INVALID_INDEX)
166         {
167           new_s->next = s->next;
168
169           next = pool_elt_at_index (f->ooo_segments, new_s->next);
170           next->prev = new_index;
171         }
172
173       new_s->prev = s - f->ooo_segments;
174       s->next = new_index;
175       f->ooos_newest = new_index;
176
177       return;
178     }
179
180   /*
181    * Merge needed
182    */
183
184   /* Merge at head */
185   if (offset <= s_sof)
186     {
187       /* If we have a previous, check if we overlap */
188       if (s->prev != OOO_SEGMENT_INVALID_INDEX)
189         {
190           prev = pool_elt_at_index (f->ooo_segments, s->prev);
191
192           /* New segment merges prev and current. Remove previous and
193            * update position of current. */
194           if (ooo_segment_end_offset (f, prev) >= offset)
195             {
196               s->fifo_position = prev->fifo_position;
197               s->length = s_eof - ooo_segment_offset (f, prev);
198               ooo_segment_del (f, s->prev);
199             }
200         }
201       else
202         {
203           s->fifo_position = position;
204           s->length = s_eof - ooo_segment_offset (f, s);
205         }
206
207       /* The new segment's tail may cover multiple smaller ones */
208       if (s_eof < end_offset)
209         {
210           /* Remove segments completely covered */
211           it = (s->next != OOO_SEGMENT_INVALID_INDEX) ?
212             pool_elt_at_index (f->ooo_segments, s->next) : 0;
213           while (it && ooo_segment_end_offset (f, it) < end_offset)
214             {
215               next = (it->next != OOO_SEGMENT_INVALID_INDEX) ?
216                 pool_elt_at_index (f->ooo_segments, it->next) : 0;
217               ooo_segment_del (f, it - f->ooo_segments);
218               it = next;
219             }
220
221           /* Update length. Segment's start might have changed. */
222           s->length = end_offset - ooo_segment_offset (f, s);
223
224           /* If partial overlap with last, merge */
225           if (it && ooo_segment_offset (f, it) < end_offset)
226             {
227               s->length +=
228                 it->length - (ooo_segment_offset (f, it) - end_offset);
229               ooo_segment_del (f, it - f->ooo_segments);
230             }
231         }
232     }
233   /* Last but overlapping previous */
234   else if (s_eof <= end_offset)
235     {
236       s->length = end_offset - ooo_segment_offset (f, s);
237     }
238   /* New segment completely covered by current one */
239   else
240     {
241       /* Do Nothing */
242     }
243
244   /* Most recently updated segment */
245   f->ooos_newest = s - f->ooo_segments;
246 }
247
248 /**
249  * Removes segments that can now be enqueued because the fifo's tail has
250  * advanced. Returns the number of bytes added to tail.
251  */
252 static int
253 ooo_segment_try_collect (svm_fifo_t * f, u32 n_bytes_enqueued)
254 {
255   ooo_segment_t *s;
256   u32 index, bytes = 0, diff;
257
258   s = pool_elt_at_index (f->ooo_segments, f->ooos_list_head);
259
260   /* If last tail update overlaps one/multiple ooo segments, remove them */
261   diff = (f->nitems + f->tail - s->fifo_position) % f->nitems;
262   while (0 < diff && diff < n_bytes_enqueued)
263     {
264       /* Segment end is beyond the tail. Advance tail and be done */
265       if (diff < s->length)
266         {
267           f->tail += s->length - diff;
268           f->tail %= f->nitems;
269           break;
270         }
271       /* If we have next go on */
272       else if (s->next != OOO_SEGMENT_INVALID_INDEX)
273         {
274           index = s - f->ooo_segments;
275           s = pool_elt_at_index (f->ooo_segments, s->next);
276           diff = (f->nitems + f->tail - s->fifo_position) % f->nitems;
277           ooo_segment_del (f, index);
278         }
279       /* End of search */
280       else
281         {
282           break;
283         }
284     }
285
286   /* If tail is adjacent to an ooo segment, 'consume' it */
287   if (diff == 0)
288     {
289       bytes = ((f->nitems - f->cursize) >= s->length) ? s->length :
290         f->nitems - f->cursize;
291
292       f->tail += bytes;
293       f->tail %= f->nitems;
294
295       ooo_segment_del (f, s - f->ooo_segments);
296     }
297
298   return bytes;
299 }
300
301 static int
302 svm_fifo_enqueue_internal (svm_fifo_t * f,
303                            int pid, u32 max_bytes, u8 * copy_from_here)
304 {
305   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
306   u32 cursize, nitems;
307
308   if (PREDICT_FALSE (f->cursize == f->nitems))
309     return -2;                  /* fifo stuffed */
310
311   /* read cursize, which can only decrease while we're working */
312   cursize = f->cursize;
313   nitems = f->nitems;
314
315   /* Number of bytes we're going to copy */
316   total_copy_bytes = (nitems - cursize) < max_bytes ?
317     (nitems - cursize) : max_bytes;
318
319   if (PREDICT_TRUE (copy_from_here != 0))
320     {
321       /* Number of bytes in first copy segment */
322       first_copy_bytes = ((nitems - f->tail) < total_copy_bytes)
323         ? (nitems - f->tail) : total_copy_bytes;
324
325       clib_memcpy (&f->data[f->tail], copy_from_here, first_copy_bytes);
326       f->tail += first_copy_bytes;
327       f->tail = (f->tail == nitems) ? 0 : f->tail;
328
329       /* Number of bytes in second copy segment, if any */
330       second_copy_bytes = total_copy_bytes - first_copy_bytes;
331       if (second_copy_bytes)
332         {
333           clib_memcpy (&f->data[f->tail], copy_from_here + first_copy_bytes,
334                        second_copy_bytes);
335           f->tail += second_copy_bytes;
336           f->tail = (f->tail == nitems) ? 0 : f->tail;
337         }
338     }
339   else
340     {
341       /* Account for a zero-copy enqueue done elsewhere */
342       ASSERT (max_bytes <= (nitems - cursize));
343       f->tail += max_bytes;
344       f->tail = f->tail % nitems;
345       total_copy_bytes = max_bytes;
346     }
347
348   /* Any out-of-order segments to collect? */
349   if (PREDICT_FALSE (f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX))
350     total_copy_bytes += ooo_segment_try_collect (f, total_copy_bytes);
351
352   /* Atomically increase the queue length */
353   __sync_fetch_and_add (&f->cursize, total_copy_bytes);
354
355   return (total_copy_bytes);
356 }
357
358 int
359 svm_fifo_enqueue_nowait (svm_fifo_t * f,
360                          int pid, u32 max_bytes, u8 * copy_from_here)
361 {
362   return svm_fifo_enqueue_internal (f, pid, max_bytes, copy_from_here);
363 }
364
365 /**
366  * Enqueue a future segment.
367  *
368  * Two choices: either copies the entire segment, or copies nothing
369  * Returns 0 of the entire segment was copied
370  * Returns -1 if none of the segment was copied due to lack of space
371  */
372 static int
373 svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
374                                        int pid,
375                                        u32 offset,
376                                        u32 required_bytes,
377                                        u8 * copy_from_here)
378 {
379   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
380   u32 cursize, nitems;
381   u32 tail_plus_offset;
382
383   ASSERT (offset > 0);
384
385   /* read cursize, which can only decrease while we're working */
386   cursize = f->cursize;
387   nitems = f->nitems;
388
389   /* Will this request fit? */
390   if ((required_bytes + offset) > (nitems - cursize))
391     return -1;
392
393   ooo_segment_add (f, offset, required_bytes);
394
395   /* Number of bytes we're going to copy */
396   total_copy_bytes = required_bytes;
397   tail_plus_offset = (f->tail + offset) % nitems;
398
399   /* Number of bytes in first copy segment */
400   first_copy_bytes = ((nitems - tail_plus_offset) < total_copy_bytes)
401     ? (nitems - tail_plus_offset) : total_copy_bytes;
402
403   clib_memcpy (&f->data[tail_plus_offset], copy_from_here, first_copy_bytes);
404
405   /* Number of bytes in second copy segment, if any */
406   second_copy_bytes = total_copy_bytes - first_copy_bytes;
407   if (second_copy_bytes)
408     {
409       tail_plus_offset += first_copy_bytes;
410       tail_plus_offset %= nitems;
411
412       ASSERT (tail_plus_offset == 0);
413
414       clib_memcpy (&f->data[tail_plus_offset],
415                    copy_from_here + first_copy_bytes, second_copy_bytes);
416     }
417
418   return (0);
419 }
420
421
422 int
423 svm_fifo_enqueue_with_offset (svm_fifo_t * f,
424                               int pid,
425                               u32 offset,
426                               u32 required_bytes, u8 * copy_from_here)
427 {
428   return svm_fifo_enqueue_with_offset_internal
429     (f, pid, offset, required_bytes, copy_from_here);
430 }
431
432
433 static int
434 svm_fifo_dequeue_internal (svm_fifo_t * f,
435                            int pid, u32 max_bytes, u8 * copy_here)
436 {
437   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
438   u32 cursize, nitems;
439
440   if (PREDICT_FALSE (f->cursize == 0))
441     return -2;                  /* nothing in the fifo */
442
443   /* read cursize, which can only increase while we're working */
444   cursize = f->cursize;
445   nitems = f->nitems;
446
447   /* Number of bytes we're going to copy */
448   total_copy_bytes = (cursize < max_bytes) ? cursize : max_bytes;
449
450   if (PREDICT_TRUE (copy_here != 0))
451     {
452       /* Number of bytes in first copy segment */
453       first_copy_bytes = ((nitems - f->head) < total_copy_bytes)
454         ? (nitems - f->head) : total_copy_bytes;
455       clib_memcpy (copy_here, &f->data[f->head], first_copy_bytes);
456       f->head += first_copy_bytes;
457       f->head = (f->head == nitems) ? 0 : f->head;
458
459       /* Number of bytes in second copy segment, if any */
460       second_copy_bytes = total_copy_bytes - first_copy_bytes;
461       if (second_copy_bytes)
462         {
463           clib_memcpy (copy_here + first_copy_bytes,
464                        &f->data[f->head], second_copy_bytes);
465           f->head += second_copy_bytes;
466           f->head = (f->head == nitems) ? 0 : f->head;
467         }
468     }
469   else
470     {
471       /* Account for a zero-copy dequeue done elsewhere */
472       ASSERT (max_bytes <= cursize);
473       f->head += max_bytes;
474       f->head = f->head % nitems;
475       cursize -= max_bytes;
476       total_copy_bytes = max_bytes;
477     }
478
479   __sync_fetch_and_sub (&f->cursize, total_copy_bytes);
480
481   return (total_copy_bytes);
482 }
483
484 int
485 svm_fifo_dequeue_nowait (svm_fifo_t * f,
486                          int pid, u32 max_bytes, u8 * copy_here)
487 {
488   return svm_fifo_dequeue_internal (f, pid, max_bytes, copy_here);
489 }
490
491 int
492 svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
493                u8 * copy_here)
494 {
495   u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
496   u32 cursize, nitems, real_head;
497
498   if (PREDICT_FALSE (f->cursize == 0))
499     return -2;                  /* nothing in the fifo */
500
501   /* read cursize, which can only increase while we're working */
502   cursize = f->cursize;
503   nitems = f->nitems;
504   real_head = f->head + offset;
505   real_head = real_head >= nitems ? real_head - nitems : real_head;
506
507   /* Number of bytes we're going to copy */
508   total_copy_bytes = (cursize < max_bytes) ? cursize : max_bytes;
509
510   if (PREDICT_TRUE (copy_here != 0))
511     {
512       /* Number of bytes in first copy segment */
513       first_copy_bytes =
514         ((nitems - real_head) < total_copy_bytes) ?
515         (nitems - real_head) : total_copy_bytes;
516       clib_memcpy (copy_here, &f->data[real_head], first_copy_bytes);
517
518       /* Number of bytes in second copy segment, if any */
519       second_copy_bytes = total_copy_bytes - first_copy_bytes;
520       if (second_copy_bytes)
521         {
522           clib_memcpy (copy_here + first_copy_bytes, &f->data[0],
523                        second_copy_bytes);
524         }
525     }
526   return total_copy_bytes;
527 }
528
529 int
530 svm_fifo_dequeue_drop (svm_fifo_t * f, int pid, u32 max_bytes)
531 {
532   u32 total_drop_bytes, first_drop_bytes, second_drop_bytes;
533   u32 cursize, nitems;
534
535   if (PREDICT_FALSE (f->cursize == 0))
536     return -2;                  /* nothing in the fifo */
537
538   /* read cursize, which can only increase while we're working */
539   cursize = f->cursize;
540   nitems = f->nitems;
541
542   /* Number of bytes we're going to drop */
543   total_drop_bytes = (cursize < max_bytes) ? cursize : max_bytes;
544
545   /* Number of bytes in first copy segment */
546   first_drop_bytes =
547     ((nitems - f->head) < total_drop_bytes) ?
548     (nitems - f->head) : total_drop_bytes;
549   f->head += first_drop_bytes;
550   f->head = (f->head == nitems) ? 0 : f->head;
551
552   /* Number of bytes in second drop segment, if any */
553   second_drop_bytes = total_drop_bytes - first_drop_bytes;
554   if (second_drop_bytes)
555     {
556       f->head += second_drop_bytes;
557       f->head = (f->head == nitems) ? 0 : f->head;
558     }
559
560   __sync_fetch_and_sub (&f->cursize, total_drop_bytes);
561
562   return total_drop_bytes;
563 }
564
565 /*
566  * fd.io coding-style-patch-verification: ON
567  *
568  * Local Variables:
569  * eval: (c-set-style "gnu")
570  * End:
571  */