/* vpp.git: src/vnet/dpo/replicate_dpo.c
 * (commit: "Use thread local storage for thread index") */
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/ip/lookup.h>
17 #include <vnet/dpo/replicate_dpo.h>
18 #include <vnet/dpo/drop_dpo.h>
19 #include <vnet/adj/adj.h>
20
#undef REP_DEBUG

#ifdef REP_DEBUG
/**
 * Debug logging for replicate objects: format the object's state into a
 * temporary vector, emit via clib_warning, free the vector.
 * Compiled out unless REP_DEBUG is defined.
 *
 * Fix: replicate_format() takes four arguments (repi, flags, indent, s);
 * the previous macro body passed only three, so any build with REP_DEBUG
 * defined failed to compile.
 */
#define REP_DBG(_rep, _fmt, _args...)                                   \
{                                                                       \
    u8* _tmp = NULL;                                                    \
    clib_warning("rep:[%s]:" _fmt,                                      \
                 replicate_format(replicate_get_index((_rep)),          \
                                  REPLICATE_FORMAT_NONE, 0, _tmp),      \
                 ##_args);                                              \
    vec_free(_tmp);                                                     \
}
#else
#define REP_DBG(_p, _fmt, _args...)
#endif
36
#define foreach_replicate_dpo_error                       \
_(BUFFER_ALLOCATION_FAILURE, "Buffer Allocation Failure")

/** Error counters raised by the replicate graph nodes */
typedef enum {
#define _(sym,str) REPLICATE_DPO_ERROR_##sym,
  foreach_replicate_dpo_error
#undef _
  REPLICATE_DPO_N_ERROR,
} replicate_dpo_error_t;

/** Human-readable strings matching the error enum above */
static char * replicate_dpo_error_strings[] = {
#define _(sym,string) string,
  foreach_replicate_dpo_error
#undef _
};

/**
 * Pool of all DPOs. It's not static so the DP can have fast access
 */
replicate_t *replicate_pool;

/**
 * The one instance of replicate main
 */
replicate_main_t replicate_main;
62
63 static inline index_t
64 replicate_get_index (const replicate_t *rep)
65 {
66     return (rep - replicate_pool);
67 }
68
69 static inline dpo_id_t*
70 replicate_get_buckets (replicate_t *rep)
71 {
72     if (REP_HAS_INLINE_BUCKETS(rep))
73     {
74         return (rep->rep_buckets_inline);
75     }
76     else
77     {
78         return (rep->rep_buckets);
79     }
80 }
81
82 static replicate_t *
83 replicate_alloc_i (void)
84 {
85     replicate_t *rep;
86
87     pool_get_aligned(replicate_pool, rep, CLIB_CACHE_LINE_BYTES);
88     memset(rep, 0, sizeof(*rep));
89
90     vlib_validate_combined_counter(&(replicate_main.repm_counters),
91                                    replicate_get_index(rep));
92     vlib_zero_combined_counter(&(replicate_main.repm_counters),
93                                replicate_get_index(rep));
94
95     return (rep);
96 }
97
98 static u8*
99 replicate_format (index_t repi,
100                   replicate_format_flags_t flags,
101                   u32 indent,
102                   u8 *s)
103 {
104     vlib_counter_t to;
105     replicate_t *rep;
106     dpo_id_t *buckets;
107     u32 i;
108
109     rep = replicate_get(repi);
110     vlib_get_combined_counter(&(replicate_main.repm_counters), repi, &to);
111     buckets = replicate_get_buckets(rep);
112
113     s = format(s, "%U: ", format_dpo_type, DPO_REPLICATE);
114     s = format(s, "[index:%d buckets:%d ", repi, rep->rep_n_buckets);
115     s = format(s, "to:[%Ld:%Ld]]", to.packets, to.bytes);
116
117     for (i = 0; i < rep->rep_n_buckets; i++)
118     {
119         s = format(s, "\n%U", format_white_space, indent+2);
120         s = format(s, "[%d]", i);
121         s = format(s, " %U", format_dpo_id, &buckets[i], indent+6);
122     }
123     return (s);
124 }
125
126 u8*
127 format_replicate (u8 * s, va_list * args)
128 {
129     index_t repi = va_arg(*args, index_t);
130     replicate_format_flags_t flags = va_arg(*args, replicate_format_flags_t);
131
132     return (replicate_format(repi, flags, 0, s));
133 }
134 static u8*
135 format_replicate_dpo (u8 * s, va_list * args)
136 {
137     index_t repi = va_arg(*args, index_t);
138     u32 indent = va_arg(*args, u32);
139
140     return (replicate_format(repi, REPLICATE_FORMAT_DETAIL, indent, s));
141 }
142
143
144 static replicate_t *
145 replicate_create_i (u32 num_buckets,
146                     dpo_proto_t rep_proto)
147 {
148     replicate_t *rep;
149
150     rep = replicate_alloc_i();
151     rep->rep_n_buckets = num_buckets;
152     rep->rep_proto = rep_proto;
153
154     if (!REP_HAS_INLINE_BUCKETS(rep))
155     {
156         vec_validate_aligned(rep->rep_buckets,
157                              rep->rep_n_buckets - 1,
158                              CLIB_CACHE_LINE_BYTES);
159     }
160
161     REP_DBG(rep, "create");
162
163     return (rep);
164 }
165
166 index_t
167 replicate_create (u32 n_buckets,
168                   dpo_proto_t rep_proto)
169 {
170     return (replicate_get_index(replicate_create_i(n_buckets, rep_proto)));
171 }
172
173 static inline void
174 replicate_set_bucket_i (replicate_t *rep,
175                         u32 bucket,
176                         dpo_id_t *buckets,
177                         const dpo_id_t *next)
178 {
179     dpo_stack(DPO_REPLICATE, rep->rep_proto, &buckets[bucket], next);
180 }
181
182 void
183 replicate_set_bucket (index_t repi,
184                       u32 bucket,
185                       const dpo_id_t *next)
186 {
187     replicate_t *rep;
188     dpo_id_t *buckets;
189
190     rep = replicate_get(repi);
191     buckets = replicate_get_buckets(rep);
192
193     ASSERT(bucket < rep->rep_n_buckets);
194
195     replicate_set_bucket_i(rep, bucket, buckets, next);
196 }
197
198 int
199 replicate_is_drop (const dpo_id_t *dpo)
200 {
201     replicate_t *rep;
202
203     if (DPO_REPLICATE != dpo->dpoi_type)
204         return (0);
205
206     rep = replicate_get(dpo->dpoi_index);
207
208     if (1 == rep->rep_n_buckets)
209     {
210         return (dpo_is_drop(replicate_get_bucket_i(rep, 0)));
211     }
212     return (0);
213 }
214
215 const dpo_id_t *
216 replicate_get_bucket (index_t repi,
217                       u32 bucket)
218 {
219     replicate_t *rep;
220
221     rep = replicate_get(repi);
222
223     return (replicate_get_bucket_i(rep, bucket));
224 }
225
226
227 static load_balance_path_t *
228 replicate_multipath_next_hop_fixup (load_balance_path_t *nhs,
229                                     dpo_proto_t drop_proto)
230 {
231     if (0 == vec_len(nhs))
232     {
233         load_balance_path_t *nh;
234
235         /*
236          * we need something for the replicate. so use the drop
237          */
238         vec_add2(nhs, nh, 1);
239
240         nh->path_weight = 1;
241         dpo_copy(&nh->path_dpo, drop_dpo_get(drop_proto));
242     }
243
244     return (nhs);
245 }
246
247 /*
248  * Fill in adjacencies in block based on corresponding
249  * next hop adjacencies.
250  */
251 static void
252 replicate_fill_buckets (replicate_t *rep,
253                         load_balance_path_t *nhs,
254                         dpo_id_t *buckets,
255                         u32 n_buckets)
256 {
257     load_balance_path_t * nh;
258     u16 ii, bucket;
259
260     bucket = 0;
261
262     /*
263      * the next-hops have normalised weights. that means their sum is the number
264      * of buckets we need to fill.
265      */
266     vec_foreach (nh, nhs)
267     {
268         for (ii = 0; ii < nh->path_weight; ii++)
269         {
270             ASSERT(bucket < n_buckets);
271             replicate_set_bucket_i(rep, bucket++, buckets, &nh->path_dpo);
272         }
273     }
274 }
275
/**
 * Update the advertised bucket count. Callers order this store relative
 * to the bucket-array writes (with CLIB_MEMORY_BARRIER) so in-flight
 * packets never see a count larger than the populated array.
 */
static inline void
replicate_set_n_buckets (replicate_t *rep,
                         u32 n_buckets)
{
    rep->rep_n_buckets = n_buckets;
}
282
/**
 * Replace the set of next-hops (buckets) of a replicate DPO in a way
 * that is safe with respect to packets concurrently in flight.
 *
 * Invariant: the advertised bucket count is never greater than the
 * number of valid entries in the active bucket array; memory barriers
 * order array writes against the count update (array first on growth,
 * count first on shrink).
 *
 * @param dpo       a DPO of type DPO_REPLICATE to modify
 * @param next_hops vector of paths with normalised weights; consumed -
 *                  its DPO references are released and the vector freed.
 */
void
replicate_multipath_update (const dpo_id_t *dpo,
                            load_balance_path_t * next_hops)
{
    load_balance_path_t * nh, * nhs;
    dpo_id_t *tmp_dpo;
    u32 ii, n_buckets;
    replicate_t *rep;

    ASSERT(DPO_REPLICATE == dpo->dpoi_type);
    rep = replicate_get(dpo->dpoi_index);
    /* an empty path set is replaced by a single drop path */
    nhs = replicate_multipath_next_hop_fixup(next_hops,
                                             rep->rep_proto);
    n_buckets = vec_len(nhs);

    if (0 == rep->rep_n_buckets)
    {
        /*
         * first time initialisation. no packets inflight, so we can write
         * at leisure.
         */
        replicate_set_n_buckets(rep, n_buckets);

        if (!REP_HAS_INLINE_BUCKETS(rep))
            vec_validate_aligned(rep->rep_buckets,
                                 rep->rep_n_buckets - 1,
                                 CLIB_CACHE_LINE_BYTES);

        replicate_fill_buckets(rep, nhs,
                               replicate_get_buckets(rep),
                               n_buckets);
    }
    else
    {
        /*
         * This is a modification of an existing replicate.
         * We need to ensure that packets in flight see a consistent state, that
         * is the number of reported buckets the REP has
         * is not more than it actually has. So if the
         * number of buckets is increasing, we must update the bucket array first,
         * then the reported number. vice-versa if the number of buckets goes down.
         */
        if (n_buckets == rep->rep_n_buckets)
        {
            /*
             * no change in the number of buckets. we can simply fill what
             * is new over what is old.
             */
            replicate_fill_buckets(rep, nhs,
                                   replicate_get_buckets(rep),
                                   n_buckets);
        }
        else if (n_buckets > rep->rep_n_buckets)
        {
            /*
             * we have more buckets. the old replicate map (if there is one)
             * will remain valid, i.e. mapping to indices within range, so we
             * update it last.
             */
            if (n_buckets > REP_NUM_INLINE_BUCKETS &&
                rep->rep_n_buckets <= REP_NUM_INLINE_BUCKETS)
            {
                /*
                 * the new increased number of buckets is crossing the threshold
                 * from the inline storage to out-line. Alloc the outline buckets
                 * first, then fixup the number. then reset the inlines.
                 */
                ASSERT(NULL == rep->rep_buckets);
                vec_validate_aligned(rep->rep_buckets,
                                     n_buckets - 1,
                                     CLIB_CACHE_LINE_BYTES);

                replicate_fill_buckets(rep, nhs,
                                       rep->rep_buckets,
                                       n_buckets);
                CLIB_MEMORY_BARRIER();
                replicate_set_n_buckets(rep, n_buckets);

                CLIB_MEMORY_BARRIER();

                for (ii = 0; ii < REP_NUM_INLINE_BUCKETS; ii++)
                {
                    dpo_reset(&rep->rep_buckets_inline[ii]);
                }
            }
            else
            {
                if (n_buckets <= REP_NUM_INLINE_BUCKETS)
                {
                    /*
                     * we are not crossing the threshold and it's still inline buckets.
                     * we can write the new on the old..
                     */
                    replicate_fill_buckets(rep, nhs,
                                           replicate_get_buckets(rep),
                                           n_buckets);
                    CLIB_MEMORY_BARRIER();
                    replicate_set_n_buckets(rep, n_buckets);
                }
                else
                {
                    /*
                     * we are not crossing the threshold: storage stays
                     * out-of-line, but we need a larger bucket array to
                     * hold the increased number of choices.
                     */
                    /* NOTE(review): this tmp_dpo shadows the function-scope
                     * declaration above - harmless here, worth tidying. */
                    dpo_id_t *new_buckets, *old_buckets, *tmp_dpo;

                    new_buckets = NULL;
                    old_buckets = replicate_get_buckets(rep);

                    vec_validate_aligned(new_buckets,
                                         n_buckets - 1,
                                         CLIB_CACHE_LINE_BYTES);

                    replicate_fill_buckets(rep, nhs, new_buckets, n_buckets);
                    CLIB_MEMORY_BARRIER();
                    /* swap in the new array before raising the count */
                    rep->rep_buckets = new_buckets;
                    CLIB_MEMORY_BARRIER();
                    replicate_set_n_buckets(rep, n_buckets);

                    vec_foreach(tmp_dpo, old_buckets)
                    {
                        dpo_reset(tmp_dpo);
                    }
                    vec_free(old_buckets);
                }
            }
        }
        else
        {
            /*
             * bucket size shrinkage.
             */
            if (n_buckets <= REP_NUM_INLINE_BUCKETS &&
                rep->rep_n_buckets > REP_NUM_INLINE_BUCKETS)
            {
                /*
                 * the new decreased number of buckets is crossing the threshold
                 * from out-line storage to inline:
                 *   1 - Fill the inline buckets,
                 *   2 - fixup the number (and this point the inline buckets are
                 *       used).
                 *   3 - free the outline buckets
                 */
                replicate_fill_buckets(rep, nhs,
                                       rep->rep_buckets_inline,
                                       n_buckets);
                CLIB_MEMORY_BARRIER();
                replicate_set_n_buckets(rep, n_buckets);
                CLIB_MEMORY_BARRIER();

                vec_foreach(tmp_dpo, rep->rep_buckets)
                {
                    dpo_reset(tmp_dpo);
                }
                vec_free(rep->rep_buckets);
            }
            else
            {
                /*
                 * not crossing the threshold.
                 *  1 - update the number to the smaller size
                 *  2 - write the new buckets
                 *  3 - reset those no longer used.
                 */
                dpo_id_t *buckets;
                u32 old_n_buckets;

                old_n_buckets = rep->rep_n_buckets;
                buckets = replicate_get_buckets(rep);

                replicate_set_n_buckets(rep, n_buckets);
                CLIB_MEMORY_BARRIER();

                replicate_fill_buckets(rep, nhs,
                                       buckets,
                                       n_buckets);

                for (ii = n_buckets; ii < old_n_buckets; ii++)
                {
                    dpo_reset(&buckets[ii]);
                }
            }
        }
    }

    /* the caller's path vector is consumed: drop its refs and free it */
    vec_foreach (nh, nhs)
    {
        dpo_reset(&nh->path_dpo);
    }
    vec_free(nhs);
}
475
476 static void
477 replicate_lock (dpo_id_t *dpo)
478 {
479     replicate_t *rep;
480
481     rep = replicate_get(dpo->dpoi_index);
482
483     rep->rep_locks++;
484 }
485
486 static void
487 replicate_destroy (replicate_t *rep)
488 {
489     dpo_id_t *buckets;
490     int i;
491
492     buckets = replicate_get_buckets(rep);
493
494     for (i = 0; i < rep->rep_n_buckets; i++)
495     {
496         dpo_reset(&buckets[i]);
497     }
498
499     REP_DBG(rep, "destroy");
500     if (!REP_HAS_INLINE_BUCKETS(rep))
501     {
502         vec_free(rep->rep_buckets);
503     }
504
505     pool_put(replicate_pool, rep);
506 }
507
508 static void
509 replicate_unlock (dpo_id_t *dpo)
510 {
511     replicate_t *rep;
512
513     rep = replicate_get(dpo->dpoi_index);
514
515     rep->rep_locks--;
516
517     if (0 == rep->rep_locks)
518     {
519         replicate_destroy(rep);
520     }
521 }
522
/**
 * DPO vft: report replicate pool usage (elements, capacity, element
 * size) to the FIB memory show command.
 */
static void
replicate_mem_show (void)
{
    fib_show_memory_usage("replicate",
                          pool_elts(replicate_pool),
                          pool_len(replicate_pool),
                          sizeof(replicate_t));
}
531
/**
 * Virtual function table registered for DPO_REPLICATE objects.
 */
const static dpo_vft_t rep_vft = {
    .dv_lock = replicate_lock,
    .dv_unlock = replicate_unlock,
    .dv_format = format_replicate_dpo,
    .dv_mem_show = replicate_mem_show,
};

/**
 * @brief The per-protocol VLIB graph nodes that are assigned to a replicate
 *        object.
 *
 * this means that these graph nodes are ones from which a replicate is the
 * parent object in the DPO-graph.
 */
const static char* const replicate_ip4_nodes[] =
{
    "ip4-replicate",
    NULL,
};
const static char* const replicate_ip6_nodes[] =
{
    "ip6-replicate",
    NULL,
};
const static char* const replicate_mpls_nodes[] =
{
    "mpls-replicate",
    NULL,
};

/** Per-protocol node lists, indexed by dpo_proto_t */
const static char* const * const replicate_nodes[DPO_PROTO_NUM] =
{
    [DPO_PROTO_IP4]  = replicate_ip4_nodes,
    [DPO_PROTO_IP6]  = replicate_ip6_nodes,
    [DPO_PROTO_MPLS] = replicate_mpls_nodes,
};

/**
 * Register the replicate DPO type with its vft and graph nodes.
 */
void
replicate_module_init (void)
{
    dpo_register(DPO_REPLICATE, &rep_vft, replicate_nodes);
}
574
575 static clib_error_t *
576 replicate_show (vlib_main_t * vm,
577                 unformat_input_t * input,
578                 vlib_cli_command_t * cmd)
579 {
580     index_t repi = INDEX_INVALID;
581
582     while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
583     {
584         if (unformat (input, "%d", &repi))
585             ;
586         else
587             break;
588     }
589
590     if (INDEX_INVALID != repi)
591     {
592         vlib_cli_output (vm, "%U", format_replicate, repi,
593                          REPLICATE_FORMAT_DETAIL);
594     }
595     else
596     {
597         replicate_t *rep;
598
599         pool_foreach(rep, replicate_pool,
600         ({
601             vlib_cli_output (vm, "%U", format_replicate,
602                              replicate_get_index(rep),
603                              REPLICATE_FORMAT_NONE);
604         }));
605     }
606
607     return 0;
608 }
609
/** CLI registration for the handler above */
VLIB_CLI_COMMAND (replicate_show_command, static) = {
    .path = "show replicate",
    .short_help = "show replicate [<index>]",
    .function = replicate_show,
};

/** Per-clone trace record captured by the replicate dataplane nodes */
typedef struct replicate_trace_t_
{
    index_t rep_index;  /* index of the replicate object traversed */
    dpo_id_t dpo;       /* the bucket DPO this clone was sent to */
} replicate_trace_t;
621
/**
 * Dataplane body shared by the ip4/ip6 replicate nodes: for each input
 * buffer, clone it once per bucket of its replicate object and enqueue
 * each clone to that bucket's stacked DPO.
 */
static uword
replicate_inline (vlib_main_t * vm,
                  vlib_node_runtime_t * node,
                  vlib_frame_t * frame)
{
    vlib_combined_counter_main_t * cm = &replicate_main.repm_counters;
    replicate_main_t * rm = &replicate_main;
    u32 n_left_from, * from, * to_next, next_index;
    u32 thread_index = vlib_get_thread_index();

    from = vlib_frame_vector_args (frame);
    n_left_from = frame->n_vectors;
    next_index = node->cached_next_index;

    while (n_left_from > 0)
    {
        u32 n_left_to_next;

        vlib_get_next_frame (vm, node, next_index,
                             to_next, n_left_to_next);

        while (n_left_from > 0 && n_left_to_next > 0)
        {
            u32 next0, ci0, bi0, bucket, repi0;
            const replicate_t *rep0;
            vlib_buffer_t * b0, *c0;
            const dpo_id_t *dpo0;
            u8 num_cloned;

            bi0 = from[0];
            from += 1;
            n_left_from -= 1;

            b0 = vlib_get_buffer (vm, bi0);
            /* the parent DPO stored the replicate index here */
            repi0 = vnet_buffer (b0)->ip.adj_index[VLIB_TX];
            rep0 = replicate_get(repi0);

            vlib_increment_combined_counter(
                cm, thread_index, repi0, 1,
                vlib_buffer_length_in_chain(vm, b0));

            /* per-thread scratch vector for the clone buffer indices */
            vec_validate (rm->clones[thread_index], rep0->rep_n_buckets - 1);

            /* NOTE(review): 128 is the head-room requested for each clone,
             * presumably to leave space for encap on the replicated paths
             * - confirm against vlib_buffer_clone's contract. */
            num_cloned = vlib_buffer_clone (vm, bi0, rm->clones[thread_index], rep0->rep_n_buckets, 128);

            if (num_cloned != rep0->rep_n_buckets)
              {
                /* buffer exhaustion: count it; clones that did succeed
                 * are still forwarded below */
                vlib_node_increment_counter
                  (vm, node->node_index,
                   REPLICATE_DPO_ERROR_BUFFER_ALLOCATION_FAILURE, 1);
              }

            for (bucket = 0; bucket < num_cloned; bucket++)
            {
                ci0 = rm->clones[thread_index][bucket];
                c0 = vlib_get_buffer(vm, ci0);

                to_next[0] = ci0;
                to_next += 1;
                n_left_to_next -= 1;

                /* steer this clone to the bucket's stacked DPO */
                dpo0 = replicate_get_bucket_i(rep0, bucket);
                next0 = dpo0->dpoi_next_node;
                vnet_buffer (c0)->ip.adj_index[VLIB_TX] = dpo0->dpoi_index;

                if (PREDICT_FALSE(c0->flags & VLIB_BUFFER_IS_TRACED))
                {
                    replicate_trace_t *t = vlib_add_trace (vm, node, c0, sizeof (*t));
                    t->rep_index = repi0;
                    t->dpo = *dpo0;
                }

                vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                                 to_next, n_left_to_next,
                                                 ci0, next0);
                /* the next frame may fill mid-replication: flush and refill */
                if (PREDICT_FALSE (n_left_to_next == 0))
                  {
                    vlib_put_next_frame (vm, node, next_index, n_left_to_next);
                    vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
                  }
            }
            vec_reset_length (rm->clones[thread_index]);
        }

        vlib_put_next_frame (vm, node, next_index, n_left_to_next);
    }

    return frame->n_vectors;
}
711
712 static u8 *
713 format_replicate_trace (u8 * s, va_list * args)
714 {
715   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
716   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
717   replicate_trace_t *t = va_arg (*args, replicate_trace_t *);
718
719   s = format (s, "replicate: %d via %U",
720               t->rep_index,
721               format_dpo_id, &t->dpo);
722   return s;
723 }
724
/** ip4 thin wrapper around the shared replicate dataplane body */
static uword
ip4_replicate (vlib_main_t * vm,
               vlib_node_runtime_t * node,
               vlib_frame_t * frame)
{
    return (replicate_inline (vm, node, frame));
}

/**
 * @brief ip4 replicate graph-node registration
 */
VLIB_REGISTER_NODE (ip4_replicate_node) = {
  .function = ip4_replicate,
  .name = "ip4-replicate",
  .vector_size = sizeof (u32),

  .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
  .error_strings = replicate_dpo_error_strings,

  .format_trace = format_replicate_trace,
  .n_next_nodes = 1,
  .next_nodes = {
      [0] = "error-drop",
  },
};

/** ip6 thin wrapper around the shared replicate dataplane body */
static uword
ip6_replicate (vlib_main_t * vm,
               vlib_node_runtime_t * node,
               vlib_frame_t * frame)
{
    return (replicate_inline (vm, node, frame));
}

/**
 * @brief ip6 replicate graph-node registration
 */
VLIB_REGISTER_NODE (ip6_replicate_node) = {
  .function = ip6_replicate,
  .name = "ip6-replicate",
  .vector_size = sizeof (u32),

  .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
  .error_strings = replicate_dpo_error_strings,

  .format_trace = format_replicate_trace,
  .n_next_nodes = 1,
  .next_nodes = {
      [0] = "error-drop",
  },
};
776
/**
 * Init-time allocation of the per-thread clone scratch vectors.
 */
clib_error_t *
replicate_dpo_init (vlib_main_t * vm)
{
  replicate_main_t * rm = &replicate_main;

  /* one clone vector per thread: vec_validate(v, N) yields length N+1,
   * covering the main thread plus every worker */
  vec_validate (rm->clones, vlib_num_workers());

  return 0;
}

VLIB_INIT_FUNCTION (replicate_dpo_init);