BIER: fix support for longer bit-string lengths
[vpp.git] / src / vnet / dpo / replicate_dpo.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/ip/lookup.h>
17 #include <vnet/dpo/replicate_dpo.h>
18 #include <vnet/dpo/drop_dpo.h>
19 #include <vnet/adj/adj.h>
20 #include <vnet/mpls/mpls_types.h>
21
#undef REP_DEBUG

#ifdef REP_DEBUG
/**
 * Debug logging of replicate object state.
 *
 * Fix: replicate_format() takes four arguments (index, flags, indent,
 * vector); the previous expansion passed only three, so the file failed
 * to compile whenever REP_DEBUG was defined.
 */
#define REP_DBG(_rep, _fmt, _args...)                                   \
{                                                                       \
    u8* _tmp = NULL;                                                    \
    clib_warning("rep:[%s]:" _fmt,                                      \
                 replicate_format(replicate_get_index((_rep)),          \
                                  REPLICATE_FORMAT_NONE,                \
                                  0, _tmp),                             \
                 ##_args);                                              \
    vec_free(_tmp);                                                     \
}
#else
/* compiled-out stub when REP_DEBUG is not defined */
#define REP_DBG(_p, _fmt, _args...)
#endif
37
/**
 * Errors counted by the replicate graph nodes.
 * Only buffer-clone allocation failure is tracked today.
 */
#define foreach_replicate_dpo_error                       \
_(BUFFER_ALLOCATION_FAILURE, "Buffer Allocation Failure")

typedef enum {
#define _(sym,str) REPLICATE_DPO_ERROR_##sym,
  foreach_replicate_dpo_error
#undef _
  REPLICATE_DPO_N_ERROR,
} replicate_dpo_error_t;

/* human-readable strings, indexed by replicate_dpo_error_t */
static char * replicate_dpo_error_strings[] = {
#define _(sym,string) string,
  foreach_replicate_dpo_error
#undef _
};
53
/**
 * Pool of all DPOs. It's not static so the DP can have fast access
 */
replicate_t *replicate_pool;

/**
 * The one instance of replicate main
 * (holds the per-object combined counters and per-thread clone vectors)
 */
replicate_main_t replicate_main;
63
64 static inline index_t
65 replicate_get_index (const replicate_t *rep)
66 {
67     return (rep - replicate_pool);
68 }
69
70 static inline dpo_id_t*
71 replicate_get_buckets (replicate_t *rep)
72 {
73     if (REP_HAS_INLINE_BUCKETS(rep))
74     {
75         return (rep->rep_buckets_inline);
76     }
77     else
78     {
79         return (rep->rep_buckets);
80     }
81 }
82
/**
 * Allocate a new, zeroed replicate object from the pool and ensure its
 * per-object combined counter exists and starts at zero.
 */
static replicate_t *
replicate_alloc_i (void)
{
    replicate_t *rep;

    pool_get_aligned(replicate_pool, rep, CLIB_CACHE_LINE_BYTES);
    memset(rep, 0, sizeof(*rep));

    /* counters are indexed by pool index; validate grows the counter vector */
    vlib_validate_combined_counter(&(replicate_main.repm_counters),
                                   replicate_get_index(rep));
    vlib_zero_combined_counter(&(replicate_main.repm_counters),
                               replicate_get_index(rep));

    return (rep);
}
98
99 static u8*
100 replicate_format (index_t repi,
101                   replicate_format_flags_t flags,
102                   u32 indent,
103                   u8 *s)
104 {
105     vlib_counter_t to;
106     replicate_t *rep;
107     dpo_id_t *buckets;
108     u32 i;
109
110     repi &= ~MPLS_IS_REPLICATE;
111     rep = replicate_get(repi);
112     vlib_get_combined_counter(&(replicate_main.repm_counters), repi, &to);
113     buckets = replicate_get_buckets(rep);
114
115     s = format(s, "%U: ", format_dpo_type, DPO_REPLICATE);
116     s = format(s, "[index:%d buckets:%d ", repi, rep->rep_n_buckets);
117     s = format(s, "to:[%Ld:%Ld]]", to.packets, to.bytes);
118
119     for (i = 0; i < rep->rep_n_buckets; i++)
120     {
121         s = format(s, "\n%U", format_white_space, indent+2);
122         s = format(s, "[%d]", i);
123         s = format(s, " %U", format_dpo_id, &buckets[i], indent+6);
124     }
125     return (s);
126 }
127
128 u8*
129 format_replicate (u8 * s, va_list * args)
130 {
131     index_t repi = va_arg(*args, index_t);
132     replicate_format_flags_t flags = va_arg(*args, replicate_format_flags_t);
133
134     return (replicate_format(repi, flags, 0, s));
135 }
136 static u8*
137 format_replicate_dpo (u8 * s, va_list * args)
138 {
139     index_t repi = va_arg(*args, index_t);
140     u32 indent = va_arg(*args, u32);
141
142     return (replicate_format(repi, REPLICATE_FORMAT_DETAIL, indent, s));
143 }
144
145
/**
 * Allocate a replicate with 'num_buckets' buckets for payload protocol
 * 'rep_proto'. When the bucket count exceeds the inline capacity the
 * bucket array is vector-allocated out-of-line.
 */
static replicate_t *
replicate_create_i (u32 num_buckets,
                    dpo_proto_t rep_proto)
{
    replicate_t *rep;

    rep = replicate_alloc_i();
    rep->rep_n_buckets = num_buckets;
    rep->rep_proto = rep_proto;

    if (!REP_HAS_INLINE_BUCKETS(rep))
    {
        /* too many buckets for inline storage: allocate the vector now */
        vec_validate_aligned(rep->rep_buckets,
                             rep->rep_n_buckets - 1,
                             CLIB_CACHE_LINE_BYTES);
    }

    REP_DBG(rep, "create");

    return (rep);
}
167
168 index_t
169 replicate_create (u32 n_buckets,
170                   dpo_proto_t rep_proto)
171 {
172     return (replicate_get_index(replicate_create_i(n_buckets, rep_proto)));
173 }
174
/**
 * Stack bucket 'bucket' onto 'next': the replicate becomes the parent
 * of 'next' in the DPO graph.
 */
static inline void
replicate_set_bucket_i (replicate_t *rep,
                        u32 bucket,
                        dpo_id_t *buckets,
                        const dpo_id_t *next)
{
    dpo_stack(DPO_REPLICATE, rep->rep_proto, &buckets[bucket], next);
}
183
184 void
185 replicate_set_bucket (index_t repi,
186                       u32 bucket,
187                       const dpo_id_t *next)
188 {
189     replicate_t *rep;
190     dpo_id_t *buckets;
191
192     repi &= ~MPLS_IS_REPLICATE;
193     rep = replicate_get(repi);
194     buckets = replicate_get_buckets(rep);
195
196     ASSERT(bucket < rep->rep_n_buckets);
197
198     replicate_set_bucket_i(rep, bucket, buckets, next);
199 }
200
201 int
202 replicate_is_drop (const dpo_id_t *dpo)
203 {
204     replicate_t *rep;
205     index_t repi;
206
207     if (DPO_REPLICATE != dpo->dpoi_type)
208         return (0);
209
210     repi = dpo->dpoi_index & ~MPLS_IS_REPLICATE;
211     rep = replicate_get(repi);
212
213     if (1 == rep->rep_n_buckets)
214     {
215         return (dpo_is_drop(replicate_get_bucket_i(rep, 0)));
216     }
217     return (0);
218 }
219
220 const dpo_id_t *
221 replicate_get_bucket (index_t repi,
222                       u32 bucket)
223 {
224     replicate_t *rep;
225
226     repi &= ~MPLS_IS_REPLICATE;
227     rep = replicate_get(repi);
228
229     return (replicate_get_bucket_i(rep, bucket));
230 }
231
232
/**
 * Ensure the next-hop set is non-empty: an empty set is replaced with a
 * single unit-weight path stacked on the drop DPO for 'drop_proto'.
 * May grow (and thus reallocate) 'nhs'; the caller must use the return.
 */
static load_balance_path_t *
replicate_multipath_next_hop_fixup (load_balance_path_t *nhs,
                                    dpo_proto_t drop_proto)
{
    if (0 == vec_len(nhs))
    {
        load_balance_path_t *nh;

        /*
         * we need something for the replicate. so use the drop
         */
        vec_add2(nhs, nh, 1);

        nh->path_weight = 1;
        dpo_copy(&nh->path_dpo, drop_dpo_get(drop_proto));
    }

    return (nhs);
}
252
253 /*
254  * Fill in adjacencies in block based on corresponding
255  * next hop adjacencies.
256  */
257 static void
258 replicate_fill_buckets (replicate_t *rep,
259                         load_balance_path_t *nhs,
260                         dpo_id_t *buckets,
261                         u32 n_buckets)
262 {
263     load_balance_path_t * nh;
264     u16 bucket;
265
266     bucket = 0;
267
268     /*
269      * the next-hops have normalised weights. that means their sum is the number
270      * of buckets we need to fill.
271      */
272     vec_foreach (nh, nhs)
273     {
274         ASSERT(bucket < n_buckets);
275         replicate_set_bucket_i(rep, bucket++, buckets, &nh->path_dpo);
276     }
277 }
278
/*
 * Update the advertised bucket count. Callers order this store around
 * bucket-array writes with CLIB_MEMORY_BARRIER() so in-flight packets
 * never see a count larger than the populated array.
 */
static inline void
replicate_set_n_buckets (replicate_t *rep,
                         u32 n_buckets)
{
    rep->rep_n_buckets = n_buckets;
}
285
/**
 * Replace the replicate's bucket set with one bucket per path in
 * 'next_hops', preserving consistency for packets in flight: the
 * advertised bucket count is never allowed to exceed the number of
 * populated buckets, which dictates the store ordering (and memory
 * barriers) in every branch below.
 *
 * Ownership: 'next_hops' is consumed — every path DPO is reset and the
 * vector freed before returning.
 */
void
replicate_multipath_update (const dpo_id_t *dpo,
                            load_balance_path_t * next_hops)
{
    load_balance_path_t * nh, * nhs;
    dpo_id_t *tmp_dpo;  /* NOTE: shadowed by an inner 'tmp_dpo' below */
    u32 ii, n_buckets;
    replicate_t *rep;
    index_t repi;

    ASSERT(DPO_REPLICATE == dpo->dpoi_type);
    repi = dpo->dpoi_index & ~MPLS_IS_REPLICATE;
    rep = replicate_get(repi);
    /* an empty path set is replaced with a single drop path */
    nhs = replicate_multipath_next_hop_fixup(next_hops,
                                             rep->rep_proto);
    n_buckets = vec_len(nhs);

    if (0 == rep->rep_n_buckets)
    {
        /*
         * first time initialisation. no packets inflight, so we can write
         * at leisure.
         */
        replicate_set_n_buckets(rep, n_buckets);

        if (!REP_HAS_INLINE_BUCKETS(rep))
            vec_validate_aligned(rep->rep_buckets,
                                 rep->rep_n_buckets - 1,
                                 CLIB_CACHE_LINE_BYTES);

        replicate_fill_buckets(rep, nhs,
                               replicate_get_buckets(rep),
                               n_buckets);
    }
    else
    {
        /*
         * This is a modification of an existing replicate.
         * We need to ensure that packets in flight see a consistent state, that
         * is the number of reported buckets the REP has
         * is not more than it actually has. So if the
         * number of buckets is increasing, we must update the bucket array first,
         * then the reported number. vice-versa if the number of buckets goes down.
         */
        if (n_buckets == rep->rep_n_buckets)
        {
            /*
             * no change in the number of buckets. we can simply fill what
             * is new over what is old.
             */
            replicate_fill_buckets(rep, nhs,
                                   replicate_get_buckets(rep),
                                   n_buckets);
        }
        else if (n_buckets > rep->rep_n_buckets)
        {
            /*
             * we have more buckets. the old replicate map (if there is one)
             * will remain valid, i.e. mapping to indices within range, so we
             * update it last.
             */
            if (n_buckets > REP_NUM_INLINE_BUCKETS &&
                rep->rep_n_buckets <= REP_NUM_INLINE_BUCKETS)
            {
                /*
                 * the new increased number of buckets is crossing the threshold
                 * from the inline storage to out-line. Alloc the outline buckets
                 * first, then fixup the number. then reset the inlines.
                 */
                ASSERT(NULL == rep->rep_buckets);
                vec_validate_aligned(rep->rep_buckets,
                                     n_buckets - 1,
                                     CLIB_CACHE_LINE_BYTES);

                replicate_fill_buckets(rep, nhs,
                                       rep->rep_buckets,
                                       n_buckets);
                CLIB_MEMORY_BARRIER();
                /* switch readers over to the out-of-line array */
                replicate_set_n_buckets(rep, n_buckets);

                CLIB_MEMORY_BARRIER();

                /* the inline buckets are no longer reachable; unstack them */
                for (ii = 0; ii < REP_NUM_INLINE_BUCKETS; ii++)
                {
                    dpo_reset(&rep->rep_buckets_inline[ii]);
                }
            }
            else
            {
                if (n_buckets <= REP_NUM_INLINE_BUCKETS)
                {
                    /*
                     * we are not crossing the threshold and it's still inline buckets.
                     * we can write the new on the old..
                     */
                    replicate_fill_buckets(rep, nhs,
                                           replicate_get_buckets(rep),
                                           n_buckets);
                    CLIB_MEMORY_BARRIER();
                    replicate_set_n_buckets(rep, n_buckets);
                }
                else
                {
                    /*
                     * we are not crossing the threshold. We need a new bucket array to
                     * hold the increased number of choices.
                     */
                    dpo_id_t *new_buckets, *old_buckets, *tmp_dpo;

                    new_buckets = NULL;
                    old_buckets = replicate_get_buckets(rep);

                    vec_validate_aligned(new_buckets,
                                         n_buckets - 1,
                                         CLIB_CACHE_LINE_BYTES);

                    replicate_fill_buckets(rep, nhs, new_buckets, n_buckets);
                    CLIB_MEMORY_BARRIER();
                    /* publish the new array before the larger count */
                    rep->rep_buckets = new_buckets;
                    CLIB_MEMORY_BARRIER();
                    replicate_set_n_buckets(rep, n_buckets);

                    vec_foreach(tmp_dpo, old_buckets)
                    {
                        dpo_reset(tmp_dpo);
                    }
                    vec_free(old_buckets);
                }
            }
        }
        else
        {
            /*
             * bucket size shrinkage.
             */
            if (n_buckets <= REP_NUM_INLINE_BUCKETS &&
                rep->rep_n_buckets > REP_NUM_INLINE_BUCKETS)
            {
                /*
                 * the new decreased number of buckets is crossing the threshold
                 * from out-line storage to inline:
                 *   1 - Fill the inline buckets,
                 *   2 - fixup the number (and this point the inline buckets are
                 *       used).
                 *   3 - free the outline buckets
                 */
                replicate_fill_buckets(rep, nhs,
                                       rep->rep_buckets_inline,
                                       n_buckets);
                CLIB_MEMORY_BARRIER();
                replicate_set_n_buckets(rep, n_buckets);
                CLIB_MEMORY_BARRIER();

                vec_foreach(tmp_dpo, rep->rep_buckets)
                {
                    dpo_reset(tmp_dpo);
                }
                vec_free(rep->rep_buckets);
            }
            else
            {
                /*
                 * not crossing the threshold.
                 *  1 - update the number to the smaller size
                 *  2 - write the new buckets
                 *  3 - reset those no longer used.
                 */
                dpo_id_t *buckets;
                u32 old_n_buckets;

                old_n_buckets = rep->rep_n_buckets;
                buckets = replicate_get_buckets(rep);

                replicate_set_n_buckets(rep, n_buckets);
                CLIB_MEMORY_BARRIER();

                replicate_fill_buckets(rep, nhs,
                                       buckets,
                                       n_buckets);

                /* unstack the tail buckets that fell out of range */
                for (ii = n_buckets; ii < old_n_buckets; ii++)
                {
                    dpo_reset(&buckets[ii]);
                }
            }
        }
    }

    /* the path set is consumed: release each path's DPO and the vector */
    vec_foreach (nh, nhs)
    {
        dpo_reset(&nh->path_dpo);
    }
    vec_free(nhs);
}
480
/**
 * DPO framework lock callback: take a reference on the replicate.
 * NOTE(review): unlike the accessors above, dpoi_index is used here
 * without masking MPLS_IS_REPLICATE — presumably a locked dpo_id_t never
 * carries that bit; confirm against the MPLS lookup code paths.
 */
static void
replicate_lock (dpo_id_t *dpo)
{
    replicate_t *rep;

    rep = replicate_get(dpo->dpoi_index);

    rep->rep_locks++;
}
490
491 static void
492 replicate_destroy (replicate_t *rep)
493 {
494     dpo_id_t *buckets;
495     int i;
496
497     buckets = replicate_get_buckets(rep);
498
499     for (i = 0; i < rep->rep_n_buckets; i++)
500     {
501         dpo_reset(&buckets[i]);
502     }
503
504     REP_DBG(rep, "destroy");
505     if (!REP_HAS_INLINE_BUCKETS(rep))
506     {
507         vec_free(rep->rep_buckets);
508     }
509
510     pool_put(replicate_pool, rep);
511 }
512
513 static void
514 replicate_unlock (dpo_id_t *dpo)
515 {
516     replicate_t *rep;
517
518     rep = replicate_get(dpo->dpoi_index);
519
520     rep->rep_locks--;
521
522     if (0 == rep->rep_locks)
523     {
524         replicate_destroy(rep);
525     }
526 }
527
/* memory-usage report callback for 'show fib memory' style CLI */
static void
replicate_mem_show (void)
{
    fib_show_memory_usage("replicate",
                          pool_elts(replicate_pool),
                          pool_len(replicate_pool),
                          sizeof(replicate_t));
}

/* virtual function table registered with the DPO framework */
const static dpo_vft_t rep_vft = {
    .dv_lock = replicate_lock,
    .dv_unlock = replicate_unlock,
    .dv_format = format_replicate_dpo,
    .dv_mem_show = replicate_mem_show,
};
543
/**
 * @brief The per-protocol VLIB graph nodes that are assigned to a replicate
 *        object.
 *
 * this means that these graph nodes are ones from which a replicate is the
 * parent object in the DPO-graph.
 */
const static char* const replicate_ip4_nodes[] =
{
    "ip4-replicate",
    NULL,
};
const static char* const replicate_ip6_nodes[] =
{
    "ip6-replicate",
    NULL,
};
const static char* const replicate_mpls_nodes[] =
{
    "mpls-replicate",
    NULL,
};

/* per-protocol node lists; protocols not listed have no replicate node */
const static char* const * const replicate_nodes[DPO_PROTO_NUM] =
{
    [DPO_PROTO_IP4]  = replicate_ip4_nodes,
    [DPO_PROTO_IP6]  = replicate_ip6_nodes,
    [DPO_PROTO_MPLS] = replicate_mpls_nodes,
};

/**
 * Register the replicate DPO type, its vft and graph nodes with the
 * DPO framework.
 */
void
replicate_module_init (void)
{
    dpo_register(DPO_REPLICATE, &rep_vft, replicate_nodes);
}
579
/**
 * CLI handler for 'show replicate [<index>]': with an index, show that
 * replicate in detail; without, list every replicate in the pool.
 */
static clib_error_t *
replicate_show (vlib_main_t * vm,
                unformat_input_t * input,
                vlib_cli_command_t * cmd)
{
    index_t repi = INDEX_INVALID;

    while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
    {
        if (unformat (input, "%d", &repi))
            ;
        else
            break;
    }

    if (INDEX_INVALID != repi)
    {
        /* a specific index was given: detailed output */
        vlib_cli_output (vm, "%U", format_replicate, repi,
                         REPLICATE_FORMAT_DETAIL);
    }
    else
    {
        replicate_t *rep;

        /* no index: brief output for every replicate in the pool */
        pool_foreach(rep, replicate_pool,
        ({
            vlib_cli_output (vm, "%U", format_replicate,
                             replicate_get_index(rep),
                             REPLICATE_FORMAT_NONE);
        }));
    }

    return 0;
}

VLIB_CLI_COMMAND (replicate_show_command, static) = {
    .path = "show replicate",
    .short_help = "show replicate [<index>]",
    .function = replicate_show,
};
620
/**
 * Per-packet trace data captured in the replicate nodes.
 */
typedef struct replicate_trace_t_
{
    index_t rep_index;  /* index of the replicate object traversed */
    dpo_id_t dpo;       /* the bucket's child DPO this clone was sent to */
} replicate_trace_t;
626
/**
 * Data-plane worker shared by the ip4/ip6/mpls replicate nodes.
 * For each input buffer: look up the replicate via the TX adjacency
 * index, clone the buffer once per bucket, stamp each clone with its
 * bucket's child DPO and enqueue it to that DPO's next node.
 */
static uword
replicate_inline (vlib_main_t * vm,
                  vlib_node_runtime_t * node,
                  vlib_frame_t * frame)
{
    vlib_combined_counter_main_t * cm = &replicate_main.repm_counters;
    replicate_main_t * rm = &replicate_main;
    u32 n_left_from, * from, * to_next, next_index;
    u32 thread_index = vlib_get_thread_index();

    from = vlib_frame_vector_args (frame);
    n_left_from = frame->n_vectors;
    next_index = node->cached_next_index;
  
    while (n_left_from > 0)
    {
        u32 n_left_to_next;

        vlib_get_next_frame (vm, node, next_index,
                             to_next, n_left_to_next);

        while (n_left_from > 0 && n_left_to_next > 0)
        {
            u32 next0, ci0, bi0, bucket, repi0;
            const replicate_t *rep0;
            vlib_buffer_t * b0, *c0;
            const dpo_id_t *dpo0;
            /* NOTE(review): u8 caps the clone count at 255 while
             * rep_n_buckets is wider — confirm bucket counts stay <= 255
             * or widen this */
            u8 num_cloned;

            bi0 = from[0];
            from += 1;
            n_left_from -= 1;

            b0 = vlib_get_buffer (vm, bi0);
            /* the replicate's index was stored by the parent lookup */
            repi0 = vnet_buffer (b0)->ip.adj_index[VLIB_TX];
            rep0 = replicate_get(repi0);

            vlib_increment_combined_counter(
                cm, thread_index, repi0, 1,
                vlib_buffer_length_in_chain(vm, b0));

            /* per-thread scratch vector of clone buffer indices */
            vec_validate (rm->clones[thread_index], rep0->rep_n_buckets - 1);

            /* 128: bytes of headroom reserved in each clone — presumably
             * for downstream encap rewrites; confirm against callers */
            num_cloned = vlib_buffer_clone (vm, bi0, rm->clones[thread_index],
                                            rep0->rep_n_buckets, 128);

            if (num_cloned != rep0->rep_n_buckets)
              {
                /* short clone set: count the failure, still forward the
                 * clones we did get */
                vlib_node_increment_counter
                  (vm, node->node_index,
                   REPLICATE_DPO_ERROR_BUFFER_ALLOCATION_FAILURE, 1);
              }

            for (bucket = 0; bucket < num_cloned; bucket++)
            {
                ci0 = rm->clones[thread_index][bucket];
                c0 = vlib_get_buffer(vm, ci0);

                to_next[0] = ci0;
                to_next += 1;
                n_left_to_next -= 1;

                /* send this clone to the bucket's child DPO */
                dpo0 = replicate_get_bucket_i(rep0, bucket);
                next0 = dpo0->dpoi_next_node;
                vnet_buffer (c0)->ip.adj_index[VLIB_TX] = dpo0->dpoi_index;

                if (PREDICT_FALSE(c0->flags & VLIB_BUFFER_IS_TRACED))
                {
                    replicate_trace_t *t;

                    vlib_trace_buffer (vm, node, next0, c0, 0);
                    t = vlib_add_trace (vm, node, c0, sizeof (*t));
                    t->rep_index = repi0;
                    t->dpo = *dpo0;
                }

                vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                                 to_next, n_left_to_next,
                                                 ci0, next0);
                /* one input can emit many clones: refill the frame mid-packet */
                if (PREDICT_FALSE (n_left_to_next == 0))
                  {
                    vlib_put_next_frame (vm, node, next_index, n_left_to_next);
                    vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
                  }
            }
            vec_reset_length (rm->clones[thread_index]);
        }

        vlib_put_next_frame (vm, node, next_index, n_left_to_next);
    }

    return frame->n_vectors;
}
720
721 static u8 *
722 format_replicate_trace (u8 * s, va_list * args)
723 {
724   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
725   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
726   replicate_trace_t *t = va_arg (*args, replicate_trace_t *);
727
728   s = format (s, "replicate: %d via %U",
729               t->rep_index,
730               format_dpo_id, &t->dpo, 0);
731   return s;
732 }
733
/* thin per-protocol wrapper over the shared replicate worker */
static uword
ip4_replicate (vlib_main_t * vm,
               vlib_node_runtime_t * node,
               vlib_frame_t * frame)
{
    return (replicate_inline (vm, node, frame));
}

/**
 * @brief IP4 replication node
 */
VLIB_REGISTER_NODE (ip4_replicate_node) = {
  .function = ip4_replicate,
  .name = "ip4-replicate",
  .vector_size = sizeof (u32),

  .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
  .error_strings = replicate_dpo_error_strings,

  .format_trace = format_replicate_trace,
  .n_next_nodes = 1,
  .next_nodes = {
      [0] = "ip4-drop",
  },
};
759
/* thin per-protocol wrapper over the shared replicate worker */
static uword
ip6_replicate (vlib_main_t * vm,
               vlib_node_runtime_t * node,
               vlib_frame_t * frame)
{
    return (replicate_inline (vm, node, frame));
}

/**
 * @brief IPv6 replication node
 */
VLIB_REGISTER_NODE (ip6_replicate_node) = {
  .function = ip6_replicate,
  .name = "ip6-replicate",
  .vector_size = sizeof (u32),

  .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
  .error_strings = replicate_dpo_error_strings,

  .format_trace = format_replicate_trace,
  .n_next_nodes = 1,
  .next_nodes = {
      [0] = "ip6-drop",
  },
};
785
/* thin per-protocol wrapper over the shared replicate worker */
static uword
mpls_replicate (vlib_main_t * vm,
                vlib_node_runtime_t * node,
                vlib_frame_t * frame)
{
    return (replicate_inline (vm, node, frame));
}

/**
 * @brief MPLS replication node
 */
VLIB_REGISTER_NODE (mpls_replicate_node) = {
  .function = mpls_replicate,
  .name = "mpls-replicate",
  .vector_size = sizeof (u32),

  .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
  .error_strings = replicate_dpo_error_strings,

  .format_trace = format_replicate_trace,
  .n_next_nodes = 1,
  .next_nodes = {
      [0] = "mpls-drop",
  },
};
811
/**
 * Init-time setup: size the per-thread clone-index scratch vectors.
 * Indices run 0..vlib_num_workers() (main thread plus each worker).
 */
clib_error_t *
replicate_dpo_init (vlib_main_t * vm)
{
  replicate_main_t * rm = &replicate_main;

  vec_validate (rm->clones, vlib_num_workers());

  return 0;
}

VLIB_INIT_FUNCTION (replicate_dpo_init);