9fdb9a05071517bb0214de649a2e8e5de24ff8f9
[vpp.git] / src / vnet / dpo / replicate_dpo.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/ip/lookup.h>
17 #include <vnet/dpo/replicate_dpo.h>
18 #include <vnet/dpo/drop_dpo.h>
19 #include <vnet/adj/adj.h>
20 #include <vnet/mpls/mpls_types.h>
21
#undef REP_DEBUG

#ifdef REP_DEBUG
/*
 * Debug logging for replicate objects. Formats the object then logs
 * via clib_warning.
 * Fix: replicate_format() takes (repi, flags, indent, s) — the old
 * expansion passed only three arguments, so builds with REP_DEBUG
 * defined failed to compile.
 */
#define REP_DBG(_rep, _fmt, _args...)                                   \
{                                                                       \
    u8* _tmp = NULL;                                                    \
    clib_warning("rep:[%s]:" _fmt,                                      \
                 replicate_format(replicate_get_index((_rep)),          \
                                  REPLICATE_FORMAT_NONE,                \
                                  0, _tmp),                             \
                 ##_args);                                              \
    vec_free(_tmp);                                                     \
}
#else
#define REP_DBG(_p, _fmt, _args...)
#endif
37
/**
 * Errors raised by the replicate data-path nodes. The x-macro is used
 * to generate both the enum and the matching string table below, so
 * the two can never drift apart.
 */
#define foreach_replicate_dpo_error                       \
_(BUFFER_ALLOCATION_FAILURE, "Buffer Allocation Failure")

typedef enum {
#define _(sym,str) REPLICATE_DPO_ERROR_##sym,
  foreach_replicate_dpo_error
#undef _
  REPLICATE_DPO_N_ERROR,
} replicate_dpo_error_t;

/* Error counter names, indexed by replicate_dpo_error_t */
static char * replicate_dpo_error_strings[] = {
#define _(sym,string) string,
  foreach_replicate_dpo_error
#undef _
};
53
/**
 * Pool of all replicate DPOs. It's not static so the data-plane nodes
 * (which index it per-packet) can have fast access.
 */
replicate_t *replicate_pool;

/**
 * The one instance of replicate main; holds the combined counters and
 * the per-thread clone scratch vectors used by replicate_inline().
 */
replicate_main_t replicate_main;
63
64 static inline index_t
65 replicate_get_index (const replicate_t *rep)
66 {
67     return (rep - replicate_pool);
68 }
69
70 static inline dpo_id_t*
71 replicate_get_buckets (replicate_t *rep)
72 {
73     if (REP_HAS_INLINE_BUCKETS(rep))
74     {
75         return (rep->rep_buckets_inline);
76     }
77     else
78     {
79         return (rep->rep_buckets);
80     }
81 }
82
83 static replicate_t *
84 replicate_alloc_i (void)
85 {
86     replicate_t *rep;
87
88     pool_get_aligned(replicate_pool, rep, CLIB_CACHE_LINE_BYTES);
89     memset(rep, 0, sizeof(*rep));
90
91     vlib_validate_combined_counter(&(replicate_main.repm_counters),
92                                    replicate_get_index(rep));
93     vlib_zero_combined_counter(&(replicate_main.repm_counters),
94                                replicate_get_index(rep));
95
96     return (rep);
97 }
98
/**
 * Render a replicate object as a human-readable string.
 *
 * @param repi   replicate index; the MPLS_IS_REPLICATE flag bit is
 *               masked off before use
 * @param flags  format flags. NOTE(review): currently unused — DETAIL
 *               and NONE render identically; confirm whether detail
 *               output was intended.
 * @param indent indentation applied to the per-bucket lines
 * @param s      vec string to append to
 * @return the (possibly reallocated) string
 */
static u8*
replicate_format (index_t repi,
                  replicate_format_flags_t flags,
                  u32 indent,
                  u8 *s)
{
    vlib_counter_t to;
    replicate_t *rep;
    dpo_id_t *buckets;
    u32 i;

    repi &= ~MPLS_IS_REPLICATE;
    rep = replicate_get(repi);
    /* combined packet/byte counters accumulated by the data-path */
    vlib_get_combined_counter(&(replicate_main.repm_counters), repi, &to);
    buckets = replicate_get_buckets(rep);

    s = format(s, "%U: ", format_dpo_type, DPO_REPLICATE);
    s = format(s, "[index:%d buckets:%d ", repi, rep->rep_n_buckets);
    s = format(s, "to:[%Ld:%Ld]]", to.packets, to.bytes);

    /* one line per bucket, showing the child DPO */
    for (i = 0; i < rep->rep_n_buckets; i++)
    {
        s = format(s, "\n%U", format_white_space, indent+2);
        s = format(s, "[%d]", i);
        s = format(s, " %U", format_dpo_id, &buckets[i], indent+6);
    }
    return (s);
}
127
128 u8*
129 format_replicate (u8 * s, va_list * args)
130 {
131     index_t repi = va_arg(*args, index_t);
132     replicate_format_flags_t flags = va_arg(*args, replicate_format_flags_t);
133
134     return (replicate_format(repi, flags, 0, s));
135 }
136 static u8*
137 format_replicate_dpo (u8 * s, va_list * args)
138 {
139     index_t repi = va_arg(*args, index_t);
140     u32 indent = va_arg(*args, u32);
141
142     return (replicate_format(repi, REPLICATE_FORMAT_DETAIL, indent, s));
143 }
144
145
146 static replicate_t *
147 replicate_create_i (u32 num_buckets,
148                     dpo_proto_t rep_proto)
149 {
150     replicate_t *rep;
151
152     rep = replicate_alloc_i();
153     rep->rep_n_buckets = num_buckets;
154     rep->rep_proto = rep_proto;
155
156     if (!REP_HAS_INLINE_BUCKETS(rep))
157     {
158         vec_validate_aligned(rep->rep_buckets,
159                              rep->rep_n_buckets - 1,
160                              CLIB_CACHE_LINE_BYTES);
161     }
162
163     REP_DBG(rep, "create");
164
165     return (rep);
166 }
167
168 index_t
169 replicate_create (u32 n_buckets,
170                   dpo_proto_t rep_proto)
171 {
172     return (replicate_get_index(replicate_create_i(n_buckets, rep_proto)));
173 }
174
175 static inline void
176 replicate_set_bucket_i (replicate_t *rep,
177                         u32 bucket,
178                         dpo_id_t *buckets,
179                         const dpo_id_t *next)
180 {
181     dpo_stack(DPO_REPLICATE, rep->rep_proto, &buckets[bucket], next);
182 }
183
184 void
185 replicate_set_bucket (index_t repi,
186                       u32 bucket,
187                       const dpo_id_t *next)
188 {
189     replicate_t *rep;
190     dpo_id_t *buckets;
191
192     repi &= ~MPLS_IS_REPLICATE;
193     rep = replicate_get(repi);
194     buckets = replicate_get_buckets(rep);
195
196     ASSERT(bucket < rep->rep_n_buckets);
197
198     replicate_set_bucket_i(rep, bucket, buckets, next);
199 }
200
201 int
202 replicate_is_drop (const dpo_id_t *dpo)
203 {
204     replicate_t *rep;
205     index_t repi;
206
207     if (DPO_REPLICATE != dpo->dpoi_type)
208         return (0);
209
210     repi = dpo->dpoi_index & ~MPLS_IS_REPLICATE;
211     rep = replicate_get(repi);
212
213     if (1 == rep->rep_n_buckets)
214     {
215         return (dpo_is_drop(replicate_get_bucket_i(rep, 0)));
216     }
217     return (0);
218 }
219
220 const dpo_id_t *
221 replicate_get_bucket (index_t repi,
222                       u32 bucket)
223 {
224     replicate_t *rep;
225
226     repi &= ~MPLS_IS_REPLICATE;
227     rep = replicate_get(repi);
228
229     return (replicate_get_bucket_i(rep, bucket));
230 }
231
232
233 static load_balance_path_t *
234 replicate_multipath_next_hop_fixup (load_balance_path_t *nhs,
235                                     dpo_proto_t drop_proto)
236 {
237     if (0 == vec_len(nhs))
238     {
239         load_balance_path_t *nh;
240
241         /*
242          * we need something for the replicate. so use the drop
243          */
244         vec_add2(nhs, nh, 1);
245
246         nh->path_weight = 1;
247         dpo_copy(&nh->path_dpo, drop_dpo_get(drop_proto));
248     }
249
250     return (nhs);
251 }
252
/*
 * Fill in adjacencies in block based on corresponding
 * next hop adjacencies: each path occupies path_weight consecutive
 * bucket slots.
 */
static void
replicate_fill_buckets (replicate_t *rep,
                        load_balance_path_t *nhs,
                        dpo_id_t *buckets,
                        u32 n_buckets)
{
    load_balance_path_t * nh;
    /* NOTE(review): u16 counters assume the normalised weight sum
     * (== n_buckets) fits in 16 bits — confirm against the callers'
     * normalisation. */
    u16 ii, bucket;

    bucket = 0;

    /*
     * the next-hops have normalised weights. that means their sum is the number
     * of buckets we need to fill.
     */
    vec_foreach (nh, nhs)
    {
        for (ii = 0; ii < nh->path_weight; ii++)
        {
            ASSERT(bucket < n_buckets);
            replicate_set_bucket_i(rep, bucket++, buckets, &nh->path_dpo);
        }
    }
}
281
/**
 * Publish the bucket count. Callers in replicate_multipath_update()
 * order this write relative to bucket-array writes with memory
 * barriers, so in-flight readers never see a count larger than the
 * array they can reach.
 */
static inline void
replicate_set_n_buckets (replicate_t *rep,
                         u32 n_buckets)
{
    rep->rep_n_buckets = n_buckets;
}
288
/**
 * Replace the bucket set of an existing replicate DPO with a new set
 * of next-hops.
 *
 * @param dpo       the replicate to update (MPLS_IS_REPLICATE flag is
 *                  masked off its index)
 * @param next_hops vector of normalised-weight paths. Consumed: every
 *                  path DPO is reset and the vector freed on return.
 *
 * Updates must be safe against packets concurrently walking the
 * buckets, so writes to the bucket storage and to rep_n_buckets are
 * ordered with CLIB_MEMORY_BARRIER() such that a reader never sees a
 * bucket count that exceeds the initialised storage it points at.
 */
void
replicate_multipath_update (const dpo_id_t *dpo,
                            load_balance_path_t * next_hops)
{
    load_balance_path_t * nh, * nhs;
    dpo_id_t *tmp_dpo;
    u32 ii, n_buckets;
    replicate_t *rep;
    index_t repi;

    ASSERT(DPO_REPLICATE == dpo->dpoi_type);
    repi = dpo->dpoi_index & ~MPLS_IS_REPLICATE;
    rep = replicate_get(repi);
    /* an empty path set is replaced with a single drop path */
    nhs = replicate_multipath_next_hop_fixup(next_hops,
                                             rep->rep_proto);
    n_buckets = vec_len(nhs);

    if (0 == rep->rep_n_buckets)
    {
        /*
         * first time initialisation. no packets inflight, so we can write
         * at leisure.
         */
        replicate_set_n_buckets(rep, n_buckets);

        if (!REP_HAS_INLINE_BUCKETS(rep))
            vec_validate_aligned(rep->rep_buckets,
                                 rep->rep_n_buckets - 1,
                                 CLIB_CACHE_LINE_BYTES);

        replicate_fill_buckets(rep, nhs,
                               replicate_get_buckets(rep),
                               n_buckets);
    }
    else
    {
        /*
         * This is a modification of an existing replicate.
         * We need to ensure that packets in flight see a consistent state, that
         * is the number of reported buckets the REP has
         * is not more than it actually has. So if the
         * number of buckets is increasing, we must update the bucket array first,
         * then the reported number. vice-versa if the number of buckets goes down.
         */
        if (n_buckets == rep->rep_n_buckets)
        {
            /*
             * no change in the number of buckets. we can simply fill what
             * is new over what is old.
             */
            replicate_fill_buckets(rep, nhs,
                                   replicate_get_buckets(rep),
                                   n_buckets);
        }
        else if (n_buckets > rep->rep_n_buckets)
        {
            /*
             * we have more buckets. the old replicate map (if there is one)
             * will remain valid, i.e. mapping to indices within range, so we
             * update it last.
             */
            if (n_buckets > REP_NUM_INLINE_BUCKETS &&
                rep->rep_n_buckets <= REP_NUM_INLINE_BUCKETS)
            {
                /*
                 * the new increased number of buckets is crossing the threshold
                 * from the inline storage to out-line. Alloc the outline buckets
                 * first, then fixup the number. then reset the inlines.
                 */
                ASSERT(NULL == rep->rep_buckets);
                vec_validate_aligned(rep->rep_buckets,
                                     n_buckets - 1,
                                     CLIB_CACHE_LINE_BYTES);

                replicate_fill_buckets(rep, nhs,
                                       rep->rep_buckets,
                                       n_buckets);
                CLIB_MEMORY_BARRIER();
                /* readers switch to the out-line array only once it is
                 * fully populated */
                replicate_set_n_buckets(rep, n_buckets);

                CLIB_MEMORY_BARRIER();

                /* the inline buckets are now unreachable; drop their refs */
                for (ii = 0; ii < REP_NUM_INLINE_BUCKETS; ii++)
                {
                    dpo_reset(&rep->rep_buckets_inline[ii]);
                }
            }
            else
            {
                if (n_buckets <= REP_NUM_INLINE_BUCKETS)
                {
                    /*
                     * we are not crossing the threshold and it's still inline buckets.
                     * we can write the new on the old, then bump the count.
                     */
                    replicate_fill_buckets(rep, nhs,
                                           replicate_get_buckets(rep),
                                           n_buckets);
                    CLIB_MEMORY_BARRIER();
                    replicate_set_n_buckets(rep, n_buckets);
                }
                else
                {
                    /*
                     * old and new are both out-line. We need a new bucket
                     * array to hold the increased number of choices.
                     */
                    dpo_id_t *new_buckets, *old_buckets, *tmp_dpo;

                    new_buckets = NULL;
                    old_buckets = replicate_get_buckets(rep);

                    vec_validate_aligned(new_buckets,
                                         n_buckets - 1,
                                         CLIB_CACHE_LINE_BYTES);

                    /* populate, publish the array, then publish the count */
                    replicate_fill_buckets(rep, nhs, new_buckets, n_buckets);
                    CLIB_MEMORY_BARRIER();
                    rep->rep_buckets = new_buckets;
                    CLIB_MEMORY_BARRIER();
                    replicate_set_n_buckets(rep, n_buckets);

                    /* NOTE(review): old_buckets is freed immediately;
                     * assumes in-flight readers have finished with it —
                     * confirm against VPP's worker-barrier model. */
                    vec_foreach(tmp_dpo, old_buckets)
                    {
                        dpo_reset(tmp_dpo);
                    }
                    vec_free(old_buckets);
                }
            }
        }
        else
        {
            /*
             * bucket size shrinkage.
             */
            if (n_buckets <= REP_NUM_INLINE_BUCKETS &&
                rep->rep_n_buckets > REP_NUM_INLINE_BUCKETS)
            {
                /*
                 * the new decreased number of buckets is crossing the threshold
                 * from out-line storage to inline:
                 *   1 - Fill the inline buckets,
                 *   2 - fixup the number (and this point the inline buckets are
                 *       used).
                 *   3 - free the outline buckets
                 */
                replicate_fill_buckets(rep, nhs,
                                       rep->rep_buckets_inline,
                                       n_buckets);
                CLIB_MEMORY_BARRIER();
                replicate_set_n_buckets(rep, n_buckets);
                CLIB_MEMORY_BARRIER();

                vec_foreach(tmp_dpo, rep->rep_buckets)
                {
                    dpo_reset(tmp_dpo);
                }
                vec_free(rep->rep_buckets);
            }
            else
            {
                /*
                 * not crossing the threshold.
                 *  1 - update the number to the smaller size
                 *  2 - write the new buckets
                 *  3 - reset those no longer used.
                 */
                dpo_id_t *buckets;
                u32 old_n_buckets;

                old_n_buckets = rep->rep_n_buckets;
                buckets = replicate_get_buckets(rep);

                /* shrink the visible count first so readers never index
                 * a slot that is about to be reset */
                replicate_set_n_buckets(rep, n_buckets);
                CLIB_MEMORY_BARRIER();

                replicate_fill_buckets(rep, nhs,
                                       buckets,
                                       n_buckets);

                for (ii = n_buckets; ii < old_n_buckets; ii++)
                {
                    dpo_reset(&buckets[ii]);
                }
            }
        }
    }

    /* the path DPOs were copied into the buckets; release the
     * caller's vector */
    vec_foreach (nh, nhs)
    {
        dpo_reset(&nh->path_dpo);
    }
    vec_free(nhs);
}
483
484 static void
485 replicate_lock (dpo_id_t *dpo)
486 {
487     replicate_t *rep;
488
489     rep = replicate_get(dpo->dpoi_index);
490
491     rep->rep_locks++;
492 }
493
494 static void
495 replicate_destroy (replicate_t *rep)
496 {
497     dpo_id_t *buckets;
498     int i;
499
500     buckets = replicate_get_buckets(rep);
501
502     for (i = 0; i < rep->rep_n_buckets; i++)
503     {
504         dpo_reset(&buckets[i]);
505     }
506
507     REP_DBG(rep, "destroy");
508     if (!REP_HAS_INLINE_BUCKETS(rep))
509     {
510         vec_free(rep->rep_buckets);
511     }
512
513     pool_put(replicate_pool, rep);
514 }
515
516 static void
517 replicate_unlock (dpo_id_t *dpo)
518 {
519     replicate_t *rep;
520
521     rep = replicate_get(dpo->dpoi_index);
522
523     rep->rep_locks--;
524
525     if (0 == rep->rep_locks)
526     {
527         replicate_destroy(rep);
528     }
529 }
530
/* DPO infra mem-show callback: report pool occupancy and element size */
static void
replicate_mem_show (void)
{
    fib_show_memory_usage("replicate",
                          pool_elts(replicate_pool),
                          pool_len(replicate_pool),
                          sizeof(replicate_t));
}
539
/**
 * Virtual function table registered with the DPO infrastructure for
 * DPO_REPLICATE objects.
 */
const static dpo_vft_t rep_vft = {
    .dv_lock = replicate_lock,
    .dv_unlock = replicate_unlock,
    .dv_format = format_replicate_dpo,
    .dv_mem_show = replicate_mem_show,
};
546
/**
 * @brief The per-protocol VLIB graph nodes that are assigned to a replicate
 *        object.
 *
 * this means that these graph nodes are ones from which a replicate is the
 * parent object in the DPO-graph. Each list is NULL-terminated.
 */
const static char* const replicate_ip4_nodes[] =
{
    "ip4-replicate",
    NULL,
};
const static char* const replicate_ip6_nodes[] =
{
    "ip6-replicate",
    NULL,
};
const static char* const replicate_mpls_nodes[] =
{
    "mpls-replicate",
    NULL,
};

/* indexed by dpo_proto_t; protocols without an entry have no
 * replicate node */
const static char* const * const replicate_nodes[DPO_PROTO_NUM] =
{
    [DPO_PROTO_IP4]  = replicate_ip4_nodes,
    [DPO_PROTO_IP6]  = replicate_ip6_nodes,
    [DPO_PROTO_MPLS] = replicate_mpls_nodes,
};
576
/**
 * Register the replicate DPO type, its VFT and its per-protocol graph
 * nodes with the DPO infrastructure.
 */
void
replicate_module_init (void)
{
    dpo_register(DPO_REPLICATE, &rep_vft, replicate_nodes);
}
582
583 static clib_error_t *
584 replicate_show (vlib_main_t * vm,
585                 unformat_input_t * input,
586                 vlib_cli_command_t * cmd)
587 {
588     index_t repi = INDEX_INVALID;
589
590     while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
591     {
592         if (unformat (input, "%d", &repi))
593             ;
594         else
595             break;
596     }
597
598     if (INDEX_INVALID != repi)
599     {
600         vlib_cli_output (vm, "%U", format_replicate, repi,
601                          REPLICATE_FORMAT_DETAIL);
602     }
603     else
604     {
605         replicate_t *rep;
606
607         pool_foreach(rep, replicate_pool,
608         ({
609             vlib_cli_output (vm, "%U", format_replicate,
610                              replicate_get_index(rep),
611                              REPLICATE_FORMAT_NONE);
612         }));
613     }
614
615     return 0;
616 }
617
/* CLI registration for "show replicate [<index>]" */
VLIB_CLI_COMMAND (replicate_show_command, static) = {
    .path = "show replicate",
    .short_help = "show replicate [<index>]",
    .function = replicate_show,
};
623
/** Per-clone trace record captured in replicate_inline() */
typedef struct replicate_trace_t_
{
    index_t rep_index;  /* index of the replicate object traversed */
    dpo_id_t dpo;       /* the bucket (child DPO) this clone was sent to */
} replicate_trace_t;
629
/**
 * Data-path: for each input buffer, clone it once per bucket of its
 * replicate object and enqueue each clone to that bucket's child DPO.
 * Shared by the ip4/ip6/mpls replicate nodes below.
 */
static uword
replicate_inline (vlib_main_t * vm,
                  vlib_node_runtime_t * node,
                  vlib_frame_t * frame)
{
    vlib_combined_counter_main_t * cm = &replicate_main.repm_counters;
    replicate_main_t * rm = &replicate_main;
    u32 n_left_from, * from, * to_next, next_index;
    u32 thread_index = vlib_get_thread_index();

    from = vlib_frame_vector_args (frame);
    n_left_from = frame->n_vectors;
    next_index = node->cached_next_index;

    while (n_left_from > 0)
    {
        u32 n_left_to_next;

        vlib_get_next_frame (vm, node, next_index,
                             to_next, n_left_to_next);

        while (n_left_from > 0 && n_left_to_next > 0)
        {
            u32 next0, ci0, bi0, bucket, repi0;
            const replicate_t *rep0;
            vlib_buffer_t * b0, *c0;
            const dpo_id_t *dpo0;
            /* NOTE(review): u8 assumes rep_n_buckets <= 255 here —
             * confirm an upstream limit enforces this */
            u8 num_cloned;

            bi0 = from[0];
            from += 1;
            n_left_from -= 1;

            b0 = vlib_get_buffer (vm, bi0);
            /* the parent DPO stored the replicate index in the TX
             * adjacency slot */
            repi0 = vnet_buffer (b0)->ip.adj_index[VLIB_TX];
            rep0 = replicate_get(repi0);

            vlib_increment_combined_counter(
                cm, thread_index, repi0, 1,
                vlib_buffer_length_in_chain(vm, b0));

            /* per-thread scratch vector holding the clone indices */
            vec_validate (rm->clones[thread_index], rep0->rep_n_buckets - 1);

            num_cloned = vlib_buffer_clone (vm, bi0, rm->clones[thread_index], rep0->rep_n_buckets, 128);

            if (num_cloned != rep0->rep_n_buckets)
              {
                /* allocation failure: count it; only the clones that
                 * were made are forwarded below */
                vlib_node_increment_counter
                  (vm, node->node_index,
                   REPLICATE_DPO_ERROR_BUFFER_ALLOCATION_FAILURE, 1);
              }

            for (bucket = 0; bucket < num_cloned; bucket++)
            {
                ci0 = rm->clones[thread_index][bucket];
                c0 = vlib_get_buffer(vm, ci0);

                to_next[0] = ci0;
                to_next += 1;
                n_left_to_next -= 1;

                /* steer the clone to this bucket's child DPO */
                dpo0 = replicate_get_bucket_i(rep0, bucket);
                next0 = dpo0->dpoi_next_node;
                vnet_buffer (c0)->ip.adj_index[VLIB_TX] = dpo0->dpoi_index;

                if (PREDICT_FALSE(c0->flags & VLIB_BUFFER_IS_TRACED))
                {
                    replicate_trace_t *t = vlib_add_trace (vm, node, c0, sizeof (*t));
                    t->rep_index = repi0;
                    t->dpo = *dpo0;
                }

                vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                                                 to_next, n_left_to_next,
                                                 ci0, next0);
                /* the enqueue above may consume the frame; refill it */
                if (PREDICT_FALSE (n_left_to_next == 0))
                  {
                    vlib_put_next_frame (vm, node, next_index, n_left_to_next);
                    vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
                  }
            }
            vec_reset_length (rm->clones[thread_index]);
        }

        vlib_put_next_frame (vm, node, next_index, n_left_to_next);
    }

    return frame->n_vectors;
}
719
720 static u8 *
721 format_replicate_trace (u8 * s, va_list * args)
722 {
723   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
724   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
725   replicate_trace_t *t = va_arg (*args, replicate_trace_t *);
726
727   s = format (s, "replicate: %d via %U",
728               t->rep_index,
729               format_dpo_id, &t->dpo, 0);
730   return s;
731 }
732
733 static uword
734 ip4_replicate (vlib_main_t * vm,
735                vlib_node_runtime_t * node,
736                vlib_frame_t * frame)
737 {
738     return (replicate_inline (vm, node, frame));
739 }
740
/**
 * @brief IP4 replication node
 */
VLIB_REGISTER_NODE (ip4_replicate_node) = {
  .function = ip4_replicate,
  .name = "ip4-replicate",
  .vector_size = sizeof (u32),

  .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
  .error_strings = replicate_dpo_error_strings,

  .format_trace = format_replicate_trace,
  /* drop is the only statically-declared next; per-bucket nexts are
   * installed when child DPOs stack on this node */
  .n_next_nodes = 1,
  .next_nodes = {
      [0] = "ip4-drop",
  },
};
758
759 static uword
760 ip6_replicate (vlib_main_t * vm,
761                vlib_node_runtime_t * node,
762                vlib_frame_t * frame)
763 {
764     return (replicate_inline (vm, node, frame));
765 }
766
/**
 * @brief IPv6 replication node
 */
VLIB_REGISTER_NODE (ip6_replicate_node) = {
  .function = ip6_replicate,
  .name = "ip6-replicate",
  .vector_size = sizeof (u32),

  .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
  .error_strings = replicate_dpo_error_strings,

  .format_trace = format_replicate_trace,
  /* drop is the only statically-declared next; per-bucket nexts are
   * installed when child DPOs stack on this node */
  .n_next_nodes = 1,
  .next_nodes = {
      [0] = "ip6-drop",
  },
};
784
785 static uword
786 mpls_replicate (vlib_main_t * vm,
787                 vlib_node_runtime_t * node,
788                 vlib_frame_t * frame)
789 {
790     return (replicate_inline (vm, node, frame));
791 }
792
/**
 * @brief MPLS replication node
 */
VLIB_REGISTER_NODE (mpls_replicate_node) = {
  .function = mpls_replicate,
  .name = "mpls-replicate",
  .vector_size = sizeof (u32),

  .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
  .error_strings = replicate_dpo_error_strings,

  .format_trace = format_replicate_trace,
  /* drop is the only statically-declared next; per-bucket nexts are
   * installed when child DPOs stack on this node */
  .n_next_nodes = 1,
  .next_nodes = {
      [0] = "mpls-drop",
  },
};
810
/**
 * Init-time setup: size the per-thread clone scratch vector — one
 * entry per thread (main thread 0 plus each worker).
 */
clib_error_t *
replicate_dpo_init (vlib_main_t * vm)
{
  replicate_main_t * rm = &replicate_main;

  /* vec_validate to index vlib_num_workers() yields num_workers+1
   * slots, covering the main thread as well */
  vec_validate (rm->clones, vlib_num_workers());

  return 0;
}

VLIB_INIT_FUNCTION (replicate_dpo_init);