vlib: vlib frame bitmaps
[vpp.git] / src / vlib / buffer_funcs.c
1 /* SPDX-License-Identifier: Apache-2.0
2  * Copyright(c) 2021 Cisco Systems, Inc.
3  */
4
5 #include <vppinfra/clib.h>
6 #include <vlib/vlib.h>
7 #include <vppinfra/vector/mask_compare.h>
8 #include <vppinfra/vector/compress.h>
9
10 static_always_inline u32
11 enqueue_one (vlib_main_t *vm, vlib_node_runtime_t *node,
12              vlib_frame_bitmap_t used_elt_bmp, u16 next_index, u32 *buffers,
13              u16 *nexts, u32 n_buffers, u32 n_left, u32 *tmp)
14 {
15   vlib_frame_bitmap_t match_bmp;
16   vlib_frame_t *f;
17   u32 n_extracted, n_free;
18   u32 *to;
19
20   f = vlib_get_next_frame_internal (vm, node, next_index, 0);
21
22   n_free = VLIB_FRAME_SIZE - f->n_vectors;
23
24   /* if frame contains enough space for worst case scenario, we can avoid
25    * use of tmp */
26   if (n_free >= n_left)
27     to = (u32 *) vlib_frame_vector_args (f) + f->n_vectors;
28   else
29     to = tmp;
30
31   clib_mask_compare_u16 (next_index, nexts, match_bmp, n_buffers);
32   n_extracted = clib_compress_u32 (to, buffers, match_bmp, n_buffers);
33   vlib_frame_bitmap_or (used_elt_bmp, match_bmp);
34
35   if (to != tmp)
36     {
37       /* indices already written to frame, just close it */
38       vlib_put_next_frame (vm, node, next_index, n_free - n_extracted);
39     }
40   else if (n_free >= n_extracted)
41     {
42       /* enough space in the existing frame */
43       to = (u32 *) vlib_frame_vector_args (f) + f->n_vectors;
44       vlib_buffer_copy_indices (to, tmp, n_extracted);
45       vlib_put_next_frame (vm, node, next_index, n_free - n_extracted);
46     }
47   else
48     {
49       /* full frame */
50       to = (u32 *) vlib_frame_vector_args (f) + f->n_vectors;
51       vlib_buffer_copy_indices (to, tmp, n_free);
52       vlib_put_next_frame (vm, node, next_index, 0);
53
54       /* second frame */
55       u32 n_2nd_frame = n_extracted - n_free;
56       f = vlib_get_next_frame_internal (vm, node, next_index, 1);
57       to = vlib_frame_vector_args (f);
58       vlib_buffer_copy_indices (to, tmp + n_free, n_2nd_frame);
59       vlib_put_next_frame (vm, node, next_index,
60                            VLIB_FRAME_SIZE - n_2nd_frame);
61     }
62
63   return n_left - n_extracted;
64 }
65
66 void __clib_section (".vlib_buffer_enqueue_to_next_fn")
67 CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_next_fn)
68 (vlib_main_t *vm, vlib_node_runtime_t *node, u32 *buffers, u16 *nexts,
69  uword count)
70 {
71   u32 tmp[VLIB_FRAME_SIZE];
72   u32 n_left;
73   u16 next_index;
74
75   while (count >= VLIB_FRAME_SIZE)
76     {
77       vlib_frame_bitmap_t used_elt_bmp = {};
78       n_left = VLIB_FRAME_SIZE;
79       u32 off = 0;
80
81       next_index = nexts[0];
82       n_left = enqueue_one (vm, node, used_elt_bmp, next_index, buffers, nexts,
83                             VLIB_FRAME_SIZE, n_left, tmp);
84
85       while (n_left)
86         {
87           while (PREDICT_FALSE (used_elt_bmp[off] == ~0))
88             {
89               off++;
90               ASSERT (off < ARRAY_LEN (used_elt_bmp));
91             }
92
93           next_index =
94             nexts[off * 64 + count_trailing_zeros (~used_elt_bmp[off])];
95           n_left = enqueue_one (vm, node, used_elt_bmp, next_index, buffers,
96                                 nexts, VLIB_FRAME_SIZE, n_left, tmp);
97         }
98
99       buffers += VLIB_FRAME_SIZE;
100       nexts += VLIB_FRAME_SIZE;
101       count -= VLIB_FRAME_SIZE;
102     }
103
104   if (count)
105     {
106       vlib_frame_bitmap_t used_elt_bmp = {};
107       next_index = nexts[0];
108       n_left = count;
109       u32 off = 0;
110
111       n_left = enqueue_one (vm, node, used_elt_bmp, next_index, buffers, nexts,
112                             count, n_left, tmp);
113
114       while (n_left)
115         {
116           while (PREDICT_FALSE (used_elt_bmp[off] == ~0))
117             {
118               off++;
119               ASSERT (off < ARRAY_LEN (used_elt_bmp));
120             }
121
122           next_index =
123             nexts[off * 64 + count_trailing_zeros (~used_elt_bmp[off])];
124           n_left = enqueue_one (vm, node, used_elt_bmp, next_index, buffers,
125                                 nexts, count, n_left, tmp);
126         }
127     }
128 }
129
130 CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_next_fn);
131
132 void __clib_section (".vlib_buffer_enqueue_to_single_next_fn")
133 CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_single_next_fn)
134 (vlib_main_t *vm, vlib_node_runtime_t *node, u32 *buffers, u16 next_index,
135  u32 count)
136 {
137   u32 *to_next, n_left_to_next, n_enq;
138
139   vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
140
141   if (PREDICT_TRUE (n_left_to_next >= count))
142     {
143       vlib_buffer_copy_indices (to_next, buffers, count);
144       n_left_to_next -= count;
145       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
146       return;
147     }
148
149   n_enq = n_left_to_next;
150 next:
151   vlib_buffer_copy_indices (to_next, buffers, n_enq);
152   n_left_to_next -= n_enq;
153
154   if (PREDICT_FALSE (count > n_enq))
155     {
156       count -= n_enq;
157       buffers += n_enq;
158
159       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
160       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
161       n_enq = clib_min (n_left_to_next, count);
162       goto next;
163     }
164   vlib_put_next_frame (vm, node, next_index, n_left_to_next);
165 }
166 CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_single_next_fn);
167
168 static inline vlib_frame_queue_elt_t *
169 vlib_get_frame_queue_elt (vlib_frame_queue_main_t *fqm, u32 index,
170                           int dont_wait)
171 {
172   vlib_frame_queue_t *fq;
173   u64 nelts, tail, new_tail;
174
175   fq = fqm->vlib_frame_queues[index];
176   ASSERT (fq);
177   nelts = fq->nelts;
178
179 retry:
180   tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE);
181   new_tail = tail + 1;
182
183   if (new_tail >= fq->head + nelts)
184     {
185       if (dont_wait)
186         return 0;
187
188       /* Wait until a ring slot is available */
189       while (new_tail >= fq->head + nelts)
190         vlib_worker_thread_barrier_check ();
191     }
192
193   if (!__atomic_compare_exchange_n (&fq->tail, &tail, new_tail, 0 /* weak */,
194                                     __ATOMIC_RELAXED, __ATOMIC_RELAXED))
195     goto retry;
196
197   return fq->elts + (new_tail & (nelts - 1));
198 }
199
200 static_always_inline u32
201 vlib_buffer_enqueue_to_thread_inline (vlib_main_t *vm,
202                                       vlib_node_runtime_t *node,
203                                       vlib_frame_queue_main_t *fqm,
204                                       u32 *buffer_indices, u16 *thread_indices,
205                                       u32 n_packets, int drop_on_congestion)
206 {
207   u32 drop_list[VLIB_FRAME_SIZE], n_drop = 0;
208   vlib_frame_bitmap_t mask, used_elts = {};
209   vlib_frame_queue_elt_t *hf = 0;
210   u16 thread_index;
211   u32 n_comp, off = 0, n_left = n_packets;
212
213   thread_index = thread_indices[0];
214
215 more:
216   clib_mask_compare_u16 (thread_index, thread_indices, mask, n_packets);
217   hf = vlib_get_frame_queue_elt (fqm, thread_index, drop_on_congestion);
218
219   n_comp = clib_compress_u32 (hf ? hf->buffer_index : drop_list + n_drop,
220                               buffer_indices, mask, n_packets);
221
222   if (hf)
223     {
224       if (node->flags & VLIB_NODE_FLAG_TRACE)
225         hf->maybe_trace = 1;
226       hf->n_vectors = n_comp;
227       __atomic_store_n (&hf->valid, 1, __ATOMIC_RELEASE);
228       vlib_get_main_by_index (thread_index)->check_frame_queues = 1;
229     }
230   else
231     n_drop += n_comp;
232
233   n_left -= n_comp;
234
235   if (n_left)
236     {
237       vlib_frame_bitmap_or (used_elts, mask);
238
239       while (PREDICT_FALSE (used_elts[off] == ~0))
240         {
241           off++;
242           ASSERT (off < ARRAY_LEN (used_elts));
243         }
244
245       thread_index =
246         thread_indices[off * 64 + count_trailing_zeros (~used_elts[off])];
247       goto more;
248     }
249
250   if (drop_on_congestion && n_drop)
251     vlib_buffer_free (vm, drop_list, n_drop);
252
253   return n_packets - n_drop;
254 }
255
256 u32 __clib_section (".vlib_buffer_enqueue_to_thread_fn")
257 CLIB_MULTIARCH_FN (vlib_buffer_enqueue_to_thread_fn)
258 (vlib_main_t *vm, vlib_node_runtime_t *node, u32 frame_queue_index,
259  u32 *buffer_indices, u16 *thread_indices, u32 n_packets,
260  int drop_on_congestion)
261 {
262   vlib_thread_main_t *tm = vlib_get_thread_main ();
263   vlib_frame_queue_main_t *fqm;
264   u32 n_enq = 0;
265
266   fqm = vec_elt_at_index (tm->frame_queue_mains, frame_queue_index);
267
268   while (n_packets >= VLIB_FRAME_SIZE)
269     {
270       n_enq += vlib_buffer_enqueue_to_thread_inline (
271         vm, node, fqm, buffer_indices, thread_indices, VLIB_FRAME_SIZE,
272         drop_on_congestion);
273       buffer_indices += VLIB_FRAME_SIZE;
274       thread_indices += VLIB_FRAME_SIZE;
275       n_packets -= VLIB_FRAME_SIZE;
276     }
277
278   if (n_packets == 0)
279     return n_enq;
280
281   n_enq += vlib_buffer_enqueue_to_thread_inline (vm, node, fqm, buffer_indices,
282                                                  thread_indices, n_packets,
283                                                  drop_on_congestion);
284
285   return n_enq;
286 }
287
288 CLIB_MARCH_FN_REGISTRATION (vlib_buffer_enqueue_to_thread_fn);
289
290 u32 __clib_section (".vlib_frame_queue_dequeue_fn")
291 CLIB_MULTIARCH_FN (vlib_frame_queue_dequeue_fn)
292 (vlib_main_t *vm, vlib_frame_queue_main_t *fqm)
293 {
294   u32 thread_id = vm->thread_index;
295   vlib_frame_queue_t *fq = fqm->vlib_frame_queues[thread_id];
296   u32 mask = fq->nelts - 1;
297   vlib_frame_queue_elt_t *elt;
298   u32 n_free, n_copy, *from, *to = 0, processed = 0, vectors = 0;
299   vlib_frame_t *f = 0;
300
301   ASSERT (fq);
302   ASSERT (vm == vlib_global_main.vlib_mains[thread_id]);
303
304   if (PREDICT_FALSE (fqm->node_index == ~0))
305     return 0;
306   /*
307    * Gather trace data for frame queues
308    */
309   if (PREDICT_FALSE (fq->trace))
310     {
311       frame_queue_trace_t *fqt;
312       frame_queue_nelt_counter_t *fqh;
313       u32 elix;
314
315       fqt = &fqm->frame_queue_traces[thread_id];
316
317       fqt->nelts = fq->nelts;
318       fqt->head = fq->head;
319       fqt->tail = fq->tail;
320       fqt->threshold = fq->vector_threshold;
321       fqt->n_in_use = fqt->tail - fqt->head;
322       if (fqt->n_in_use >= fqt->nelts)
323         {
324           // if beyond max then use max
325           fqt->n_in_use = fqt->nelts - 1;
326         }
327
328       /* Record the number of elements in use in the histogram */
329       fqh = &fqm->frame_queue_histogram[thread_id];
330       fqh->count[fqt->n_in_use]++;
331
332       /* Record a snapshot of the elements in use */
333       for (elix = 0; elix < fqt->nelts; elix++)
334         {
335           elt = fq->elts + ((fq->head + 1 + elix) & (mask));
336           if (1 || elt->valid)
337             {
338               fqt->n_vectors[elix] = elt->n_vectors;
339             }
340         }
341       fqt->written = 1;
342     }
343
344   while (1)
345     {
346       if (fq->head == fq->tail)
347         break;
348
349       elt = fq->elts + ((fq->head + 1) & mask);
350
351       if (!__atomic_load_n (&elt->valid, __ATOMIC_ACQUIRE))
352         break;
353
354       from = elt->buffer_index + elt->offset;
355
356       ASSERT (elt->offset + elt->n_vectors <= VLIB_FRAME_SIZE);
357
358       if (f == 0)
359         {
360           f = vlib_get_frame_to_node (vm, fqm->node_index);
361           to = vlib_frame_vector_args (f);
362           n_free = VLIB_FRAME_SIZE;
363         }
364
365       if (elt->maybe_trace)
366         f->frame_flags |= VLIB_NODE_FLAG_TRACE;
367
368       n_copy = clib_min (n_free, elt->n_vectors);
369
370       vlib_buffer_copy_indices (to, from, n_copy);
371       to += n_copy;
372       n_free -= n_copy;
373       vectors += n_copy;
374
375       if (n_free == 0)
376         {
377           f->n_vectors = VLIB_FRAME_SIZE;
378           vlib_put_frame_to_node (vm, fqm->node_index, f);
379           f = 0;
380         }
381
382       if (n_copy < elt->n_vectors)
383         {
384           /* not empty - leave it on the ring */
385           elt->n_vectors -= n_copy;
386           elt->offset += n_copy;
387         }
388       else
389         {
390           /* empty - reset and bump head */
391           u32 sz = STRUCT_OFFSET_OF (vlib_frame_queue_elt_t, end_of_reset);
392           clib_memset (elt, 0, sz);
393           __atomic_store_n (&fq->head, fq->head + 1, __ATOMIC_RELEASE);
394           processed++;
395         }
396
397       /* Limit the number of packets pushed into the graph */
398       if (vectors >= fq->vector_threshold)
399         break;
400     }
401
402   if (f)
403     {
404       f->n_vectors = VLIB_FRAME_SIZE - n_free;
405       vlib_put_frame_to_node (vm, fqm->node_index, f);
406     }
407
408   return processed;
409 }
410
411 CLIB_MARCH_FN_REGISTRATION (vlib_frame_queue_dequeue_fn);
412
413 #ifndef CLIB_MARCH_VARIANT
414 vlib_buffer_func_main_t vlib_buffer_func_main;
415
416 static clib_error_t *
417 vlib_buffer_funcs_init (vlib_main_t *vm)
418 {
419   vlib_buffer_func_main_t *bfm = &vlib_buffer_func_main;
420   bfm->buffer_enqueue_to_next_fn =
421     CLIB_MARCH_FN_POINTER (vlib_buffer_enqueue_to_next_fn);
422   bfm->buffer_enqueue_to_single_next_fn =
423     CLIB_MARCH_FN_POINTER (vlib_buffer_enqueue_to_single_next_fn);
424   bfm->buffer_enqueue_to_thread_fn =
425     CLIB_MARCH_FN_POINTER (vlib_buffer_enqueue_to_thread_fn);
426   bfm->frame_queue_dequeue_fn =
427     CLIB_MARCH_FN_POINTER (vlib_frame_queue_dequeue_fn);
428   return 0;
429 }
430
431 VLIB_INIT_FUNCTION (vlib_buffer_funcs_init);
432 #endif