998a729dcd26ee1eca102274f905791f86584aa6
[vpp.git] / src / vnet / dpo / replicate_dpo.c
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/ip/lookup.h>
17 #include <vnet/dpo/replicate_dpo.h>
18 #include <vnet/dpo/drop_dpo.h>
19 #include <vnet/dpo/receive_dpo.h>
20 #include <vnet/adj/adj.h>
21 #include <vnet/mpls/mpls_types.h>
22
23 /**
24  * the logger
25  */
26 vlib_log_class_t replicate_logger;
27
28 #define REP_DBG(_rep, _fmt, _args...)                                   \
29 {                                                                       \
30     vlib_log_debug(replicate_logger,                                    \
31                    "rep:[%U]:" _fmt,                                    \
32                    format_replicate,                                    \
33                    replicate_get_index(_rep),                           \
34                    REPLICATE_FORMAT_NONE,                               \
35                    ##_args);                                            \
36 }
37
/*
 * X-macro listing every error the replicate nodes can count:
 * _(SYMBOL, "human readable description")
 */
#define foreach_replicate_dpo_error                       \
_(BUFFER_ALLOCATION_FAILURE, "Buffer Allocation Failure")

/* One enumerator per error, plus a trailing count. */
typedef enum
{
#define _(name, text) REPLICATE_DPO_ERROR_##name,
    foreach_replicate_dpo_error
#undef _
    REPLICATE_DPO_N_ERROR,
} replicate_dpo_error_t;

/* Descriptions, indexed by replicate_dpo_error_t. */
static char *replicate_dpo_error_strings[] =
{
#define _(name, text) text,
    foreach_replicate_dpo_error
#undef _
};
53
54 /**
55  * Pool of all DPOs. It's not static so the DP can have fast access
56  */
57 replicate_t *replicate_pool;
58
59 /**
60  * The one instance of replicate main
61  */
62 replicate_main_t replicate_main = {
63     .repm_counters = {
64         .name = "mroutes",
65         .stat_segment_name = "/net/mroute",
66     },
67 };
68
69 static inline index_t
70 replicate_get_index (const replicate_t *rep)
71 {
72     return (rep - replicate_pool);
73 }
74
75 static inline dpo_id_t*
76 replicate_get_buckets (replicate_t *rep)
77 {
78     if (REP_HAS_INLINE_BUCKETS(rep))
79     {
80         return (rep->rep_buckets_inline);
81     }
82     else
83     {
84         return (rep->rep_buckets);
85     }
86 }
87
88 static replicate_t *
89 replicate_alloc_i (void)
90 {
91     replicate_t *rep;
92
93     pool_get_aligned(replicate_pool, rep, CLIB_CACHE_LINE_BYTES);
94     clib_memset(rep, 0, sizeof(*rep));
95
96     vlib_validate_combined_counter(&(replicate_main.repm_counters),
97                                    replicate_get_index(rep));
98     vlib_zero_combined_counter(&(replicate_main.repm_counters),
99                                replicate_get_index(rep));
100
101     return (rep);
102 }
103
104 static u8*
105 format_replicate_flags (u8 *s, va_list *args)
106 {
107     int flags = va_arg (*args, int);
108
109     if (flags == REPLICATE_FLAGS_NONE)
110     {
111         s = format (s, "none");
112     }
113     else if (flags & REPLICATE_FLAGS_HAS_LOCAL)
114     {
115         s = format (s, "has-local ");
116     }
117
118     return (s);
119 }
120
121 static u8*
122 replicate_format (index_t repi,
123                   replicate_format_flags_t flags,
124                   u32 indent,
125                   u8 *s)
126 {
127     vlib_counter_t to;
128     replicate_t *rep;
129     dpo_id_t *buckets;
130     u32 i;
131
132     repi &= ~MPLS_IS_REPLICATE;
133     rep = replicate_get(repi);
134     vlib_get_combined_counter(&(replicate_main.repm_counters), repi, &to);
135     buckets = replicate_get_buckets(rep);
136
137     s = format(s, "%U: ", format_dpo_type, DPO_REPLICATE);
138     s = format(s, "[index:%d buckets:%d ", repi, rep->rep_n_buckets);
139     s = format(s, "flags:[%U] ", format_replicate_flags, rep->rep_flags);
140     s = format(s, "to:[%Ld:%Ld]]", to.packets, to.bytes);
141
142     for (i = 0; i < rep->rep_n_buckets; i++)
143     {
144         s = format(s, "\n%U", format_white_space, indent+2);
145         s = format(s, "[%d]", i);
146         s = format(s, " %U", format_dpo_id, &buckets[i], indent+6);
147     }
148     return (s);
149 }
150
151 u8*
152 format_replicate (u8 * s, va_list * args)
153 {
154     index_t repi = va_arg(*args, index_t);
155     replicate_format_flags_t flags = va_arg(*args, replicate_format_flags_t);
156
157     return (replicate_format(repi, flags, 0, s));
158 }
159 static u8*
160 format_replicate_dpo (u8 * s, va_list * args)
161 {
162     index_t repi = va_arg(*args, index_t);
163     u32 indent = va_arg(*args, u32);
164
165     return (replicate_format(repi, REPLICATE_FORMAT_DETAIL, indent, s));
166 }
167
168
169 static replicate_t *
170 replicate_create_i (u32 num_buckets,
171                     dpo_proto_t rep_proto)
172 {
173     replicate_t *rep;
174
175     rep = replicate_alloc_i();
176     rep->rep_n_buckets = num_buckets;
177     rep->rep_proto = rep_proto;
178
179     if (!REP_HAS_INLINE_BUCKETS(rep))
180     {
181         vec_validate_aligned(rep->rep_buckets,
182                              rep->rep_n_buckets - 1,
183                              CLIB_CACHE_LINE_BYTES);
184     }
185
186     REP_DBG(rep, "create");
187
188     return (rep);
189 }
190
191 index_t
192 replicate_create (u32 n_buckets,
193                   dpo_proto_t rep_proto)
194 {
195     return (replicate_get_index(replicate_create_i(n_buckets, rep_proto)));
196 }
197
198 static inline void
199 replicate_set_bucket_i (replicate_t *rep,
200                         u32 bucket,
201                         dpo_id_t *buckets,
202                         const dpo_id_t *next)
203 {
204     if (dpo_is_receive(&buckets[bucket]))
205     {
206         rep->rep_flags &= ~REPLICATE_FLAGS_HAS_LOCAL;
207     }
208     if (dpo_is_receive(next))
209     {
210         rep->rep_flags |= REPLICATE_FLAGS_HAS_LOCAL;
211     }
212     dpo_stack(DPO_REPLICATE, rep->rep_proto, &buckets[bucket], next);
213 }
214
215 void
216 replicate_set_bucket (index_t repi,
217                       u32 bucket,
218                       const dpo_id_t *next)
219 {
220     replicate_t *rep;
221     dpo_id_t *buckets;
222
223     repi &= ~MPLS_IS_REPLICATE;
224     rep = replicate_get(repi);
225     buckets = replicate_get_buckets(rep);
226
227     ASSERT(bucket < rep->rep_n_buckets);
228
229     replicate_set_bucket_i(rep, bucket, buckets, next);
230 }
231
232 int
233 replicate_is_drop (const dpo_id_t *dpo)
234 {
235     replicate_t *rep;
236     index_t repi;
237
238     if (DPO_REPLICATE != dpo->dpoi_type)
239         return (0);
240
241     repi = dpo->dpoi_index & ~MPLS_IS_REPLICATE;
242     rep = replicate_get(repi);
243
244     if (1 == rep->rep_n_buckets)
245     {
246         return (dpo_is_drop(replicate_get_bucket_i(rep, 0)));
247     }
248     return (0);
249 }
250
251 const dpo_id_t *
252 replicate_get_bucket (index_t repi,
253                       u32 bucket)
254 {
255     replicate_t *rep;
256
257     repi &= ~MPLS_IS_REPLICATE;
258     rep = replicate_get(repi);
259
260     return (replicate_get_bucket_i(rep, bucket));
261 }
262
263
264 static load_balance_path_t *
265 replicate_multipath_next_hop_fixup (load_balance_path_t *nhs,
266                                     dpo_proto_t drop_proto)
267 {
268     if (0 == vec_len(nhs))
269     {
270         load_balance_path_t *nh;
271
272         /*
273          * we need something for the replicate. so use the drop
274          */
275         vec_add2(nhs, nh, 1);
276
277         nh->path_weight = 1;
278         dpo_copy(&nh->path_dpo, drop_dpo_get(drop_proto));
279     }
280
281     return (nhs);
282 }
283
284 /*
285  * Fill in adjacencies in block based on corresponding
286  * next hop adjacencies.
287  */
288 static void
289 replicate_fill_buckets (replicate_t *rep,
290                         load_balance_path_t *nhs,
291                         dpo_id_t *buckets,
292                         u32 n_buckets)
293 {
294     load_balance_path_t * nh;
295     u16 bucket;
296
297     bucket = 0;
298
299     /*
300      * the next-hops have normalised weights. that means their sum is the number
301      * of buckets we need to fill.
302      */
303     vec_foreach (nh, nhs)
304     {
305         ASSERT(bucket < n_buckets);
306         replicate_set_bucket_i(rep, bucket++, buckets, &nh->path_dpo);
307     }
308 }
309
310 static inline void
311 replicate_set_n_buckets (replicate_t *rep,
312                          u32 n_buckets)
313 {
314     rep->rep_n_buckets = n_buckets;
315 }
316
317 void
318 replicate_multipath_update (const dpo_id_t *dpo,
319                             load_balance_path_t * next_hops)
320 {
321     load_balance_path_t * nh, * nhs;
322     dpo_id_t *tmp_dpo;
323     u32 ii, n_buckets;
324     replicate_t *rep;
325     index_t repi;
326
327     ASSERT(DPO_REPLICATE == dpo->dpoi_type);
328     repi = dpo->dpoi_index & ~MPLS_IS_REPLICATE;
329     rep = replicate_get(repi);
330     nhs = replicate_multipath_next_hop_fixup(next_hops,
331                                              rep->rep_proto);
332     n_buckets = vec_len(nhs);
333
334     if (0 == rep->rep_n_buckets)
335     {
336         /*
337          * first time initialisation. no packets inflight, so we can write
338          * at leisure.
339          */
340         replicate_set_n_buckets(rep, n_buckets);
341
342         if (!REP_HAS_INLINE_BUCKETS(rep))
343             vec_validate_aligned(rep->rep_buckets,
344                                  rep->rep_n_buckets - 1,
345                                  CLIB_CACHE_LINE_BYTES);
346
347         replicate_fill_buckets(rep, nhs,
348                                replicate_get_buckets(rep),
349                                n_buckets);
350     }
351     else
352     {
353         /*
354          * This is a modification of an existing replicate.
355          * We need to ensure that packets in flight see a consistent state, that
356          * is the number of reported buckets the REP has
357          * is not more than it actually has. So if the
358          * number of buckets is increasing, we must update the bucket array first,
359          * then the reported number. vice-versa if the number of buckets goes down.
360          */
361         if (n_buckets == rep->rep_n_buckets)
362         {
363             /*
364              * no change in the number of buckets. we can simply fill what
365              * is new over what is old.
366              */
367             replicate_fill_buckets(rep, nhs,
368                                    replicate_get_buckets(rep),
369                                    n_buckets);
370         }
371         else if (n_buckets > rep->rep_n_buckets)
372         {
373             /*
374              * we have more buckets. the old replicate map (if there is one)
375              * will remain valid, i.e. mapping to indices within range, so we
376              * update it last.
377              */
378             if (n_buckets > REP_NUM_INLINE_BUCKETS &&
379                 rep->rep_n_buckets <= REP_NUM_INLINE_BUCKETS)
380             {
381                 /*
382                  * the new increased number of buckets is crossing the threshold
383                  * from the inline storage to out-line. Alloc the outline buckets
384                  * first, then fixup the number. then reset the inlines.
385                  */
386                 ASSERT(NULL == rep->rep_buckets);
387                 vec_validate_aligned(rep->rep_buckets,
388                                      n_buckets - 1,
389                                      CLIB_CACHE_LINE_BYTES);
390
391                 replicate_fill_buckets(rep, nhs,
392                                        rep->rep_buckets,
393                                        n_buckets);
394                 CLIB_MEMORY_BARRIER();
395                 replicate_set_n_buckets(rep, n_buckets);
396
397                 CLIB_MEMORY_BARRIER();
398
399                 for (ii = 0; ii < REP_NUM_INLINE_BUCKETS; ii++)
400                 {
401                     dpo_reset(&rep->rep_buckets_inline[ii]);
402                 }
403             }
404             else
405             {
406                 if (n_buckets <= REP_NUM_INLINE_BUCKETS)
407                 {
408                     /*
409                      * we are not crossing the threshold and it's still inline buckets.
410                      * we can write the new on the old..
411                      */
412                     replicate_fill_buckets(rep, nhs,
413                                            replicate_get_buckets(rep),
414                                            n_buckets);
415                     CLIB_MEMORY_BARRIER();
416                     replicate_set_n_buckets(rep, n_buckets);
417                 }
418                 else
419                 {
420                     /*
421                      * we are not crossing the threshold. We need a new bucket array to
422                      * hold the increased number of choices.
423                      */
424                     dpo_id_t *new_buckets, *old_buckets, *tmp_dpo;
425
426                     new_buckets = NULL;
427                     old_buckets = replicate_get_buckets(rep);
428
429                     vec_validate_aligned(new_buckets,
430                                          n_buckets - 1,
431                                          CLIB_CACHE_LINE_BYTES);
432
433                     replicate_fill_buckets(rep, nhs, new_buckets, n_buckets);
434                     CLIB_MEMORY_BARRIER();
435                     rep->rep_buckets = new_buckets;
436                     CLIB_MEMORY_BARRIER();
437                     replicate_set_n_buckets(rep, n_buckets);
438
439                     vec_foreach(tmp_dpo, old_buckets)
440                     {
441                         dpo_reset(tmp_dpo);
442                     }
443                     vec_free(old_buckets);
444                 }
445             }
446         }
447         else
448         {
449             /*
450              * bucket size shrinkage.
451              */
452             if (n_buckets <= REP_NUM_INLINE_BUCKETS &&
453                 rep->rep_n_buckets > REP_NUM_INLINE_BUCKETS)
454             {
455                 /*
456                  * the new decreased number of buckets is crossing the threshold
457                  * from out-line storage to inline:
458                  *   1 - Fill the inline buckets,
459                  *   2 - fixup the number (and this point the inline buckets are
460                  *       used).
461                  *   3 - free the outline buckets
462                  */
463                 replicate_fill_buckets(rep, nhs,
464                                        rep->rep_buckets_inline,
465                                        n_buckets);
466                 CLIB_MEMORY_BARRIER();
467                 replicate_set_n_buckets(rep, n_buckets);
468                 CLIB_MEMORY_BARRIER();
469
470                 vec_foreach(tmp_dpo, rep->rep_buckets)
471                 {
472                     dpo_reset(tmp_dpo);
473                 }
474                 vec_free(rep->rep_buckets);
475             }
476             else
477             {
478                 /*
479                  * not crossing the threshold.
480                  *  1 - update the number to the smaller size
481                  *  2 - write the new buckets
482                  *  3 - reset those no longer used.
483                  */
484                 dpo_id_t *buckets;
485                 u32 old_n_buckets;
486
487                 old_n_buckets = rep->rep_n_buckets;
488                 buckets = replicate_get_buckets(rep);
489
490                 replicate_set_n_buckets(rep, n_buckets);
491                 CLIB_MEMORY_BARRIER();
492
493                 replicate_fill_buckets(rep, nhs,
494                                        buckets,
495                                        n_buckets);
496
497                 for (ii = n_buckets; ii < old_n_buckets; ii++)
498                 {
499                     dpo_reset(&buckets[ii]);
500                 }
501             }
502         }
503     }
504
505     vec_foreach (nh, nhs)
506     {
507         dpo_reset(&nh->path_dpo);
508     }
509     vec_free(nhs);
510 }
511
512 static void
513 replicate_lock (dpo_id_t *dpo)
514 {
515     replicate_t *rep;
516
517     rep = replicate_get(dpo->dpoi_index);
518
519     rep->rep_locks++;
520 }
521
522 index_t
523 replicate_dup (replicate_flags_t flags,
524                index_t repi)
525 {
526     replicate_t *rep, *copy;
527
528     rep = replicate_get(repi);
529
530     if (rep->rep_flags == flags ||
531         flags & REPLICATE_FLAGS_HAS_LOCAL)
532     {
533         /*
534          * we can include all the buckets from the original in the copy
535          */
536         return (repi);
537     }
538     else
539     {
540         /*
541          * caller doesn't want the local paths that the original has
542          */
543         if (rep->rep_n_buckets == 1)
544         {
545             /*
546              * original has only one bucket that is the local, so create
547              * a new one with only the drop
548              */
549             copy = replicate_create_i (1, rep->rep_proto);
550
551             replicate_set_bucket_i(copy, 0,
552                                    replicate_get_buckets(copy),
553                                    drop_dpo_get(rep->rep_proto));
554         }
555         else
556         {
557             dpo_id_t *old_buckets, *copy_buckets;
558             u16 bucket, pos;
559
560             copy = replicate_create_i(rep->rep_n_buckets - 1,
561                                       rep->rep_proto);
562
563             rep = replicate_get(repi);
564             old_buckets = replicate_get_buckets(rep);
565             copy_buckets = replicate_get_buckets(copy);
566             pos = 0;
567
568             for (bucket = 0; bucket < rep->rep_n_buckets; bucket++)
569             {
570                 if (!dpo_is_receive(&old_buckets[bucket]))
571                 {
572                     replicate_set_bucket_i(copy, pos, copy_buckets,
573                                            (&old_buckets[bucket]));
574                     pos++;
575                 }
576             }
577         }
578     }
579
580     return (replicate_get_index(copy));
581 }
582
583 static void
584 replicate_destroy (replicate_t *rep)
585 {
586     dpo_id_t *buckets;
587     int i;
588
589     buckets = replicate_get_buckets(rep);
590
591     for (i = 0; i < rep->rep_n_buckets; i++)
592     {
593         dpo_reset(&buckets[i]);
594     }
595
596     REP_DBG(rep, "destroy");
597     if (!REP_HAS_INLINE_BUCKETS(rep))
598     {
599         vec_free(rep->rep_buckets);
600     }
601
602     pool_put(replicate_pool, rep);
603 }
604
605 static void
606 replicate_unlock (dpo_id_t *dpo)
607 {
608     replicate_t *rep;
609
610     rep = replicate_get(dpo->dpoi_index);
611
612     rep->rep_locks--;
613
614     if (0 == rep->rep_locks)
615     {
616         replicate_destroy(rep);
617     }
618 }
619
620 static void
621 replicate_mem_show (void)
622 {
623     fib_show_memory_usage("replicate",
624                           pool_elts(replicate_pool),
625                           pool_len(replicate_pool),
626                           sizeof(replicate_t));
627 }
628
629 const static dpo_vft_t rep_vft = {
630     .dv_lock = replicate_lock,
631     .dv_unlock = replicate_unlock,
632     .dv_format = format_replicate_dpo,
633     .dv_mem_show = replicate_mem_show,
634 };
635
636 /**
637  * @brief The per-protocol VLIB graph nodes that are assigned to a replicate
638  *        object.
639  *
640  * this means that these graph nodes are ones from which a replicate is the
641  * parent object in the DPO-graph.
642  */
643 const static char* const replicate_ip4_nodes[] =
644 {
645     "ip4-replicate",
646     NULL,
647 };
648 const static char* const replicate_ip6_nodes[] =
649 {
650     "ip6-replicate",
651     NULL,
652 };
653 const static char* const replicate_mpls_nodes[] =
654 {
655     "mpls-replicate",
656     NULL,
657 };
658
659 const static char* const * const replicate_nodes[DPO_PROTO_NUM] =
660 {
661     [DPO_PROTO_IP4]  = replicate_ip4_nodes,
662     [DPO_PROTO_IP6]  = replicate_ip6_nodes,
663     [DPO_PROTO_MPLS] = replicate_mpls_nodes,
664 };
665
666 void
667 replicate_module_init (void)
668 {
669     dpo_register(DPO_REPLICATE, &rep_vft, replicate_nodes);
670     replicate_logger = vlib_log_register_class("dpo", "replicate");
671 }
672
673 static clib_error_t *
674 replicate_show (vlib_main_t * vm,
675                 unformat_input_t * input,
676                 vlib_cli_command_t * cmd)
677 {
678     index_t repi = INDEX_INVALID;
679
680     while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
681     {
682         if (unformat (input, "%d", &repi))
683             ;
684         else
685             break;
686     }
687
688     if (INDEX_INVALID != repi)
689     {
690         vlib_cli_output (vm, "%U", format_replicate, repi,
691                          REPLICATE_FORMAT_DETAIL);
692     }
693     else
694     {
695         replicate_t *rep;
696
697         pool_foreach(rep, replicate_pool,
698         ({
699             vlib_cli_output (vm, "%U", format_replicate,
700                              replicate_get_index(rep),
701                              REPLICATE_FORMAT_NONE);
702         }));
703     }
704
705     return 0;
706 }
707
708 VLIB_CLI_COMMAND (replicate_show_command, static) = {
709     .path = "show replicate",
710     .short_help = "show replicate [<index>]",
711     .function = replicate_show,
712 };
713
714 typedef struct replicate_trace_t_
715 {
716     index_t rep_index;
717     dpo_id_t dpo;
718 } replicate_trace_t;
719
720 static uword
721 replicate_inline (vlib_main_t * vm,
722                   vlib_node_runtime_t * node,
723                   vlib_frame_t * frame)
724 {
725     vlib_combined_counter_main_t * cm = &replicate_main.repm_counters;
726     replicate_main_t * rm = &replicate_main;
727     u32 n_left_from, * from, * to_next, next_index;
728     u32 thread_index = vlib_get_thread_index();
729
730     from = vlib_frame_vector_args (frame);
731     n_left_from = frame->n_vectors;
732     next_index = node->cached_next_index;
733   
734     while (n_left_from > 0)
735     {
736         u32 n_left_to_next;
737
738         vlib_get_next_frame (vm, node, next_index,
739                              to_next, n_left_to_next);
740
741         while (n_left_from > 0 && n_left_to_next > 0)
742         {
743             u32 next0, ci0, bi0, bucket, repi0;
744             const replicate_t *rep0;
745             vlib_buffer_t * b0, *c0;
746             const dpo_id_t *dpo0;
747             u8 num_cloned;
748
749             bi0 = from[0];
750             from += 1;
751             n_left_from -= 1;
752
753             b0 = vlib_get_buffer (vm, bi0);
754             repi0 = vnet_buffer (b0)->ip.adj_index[VLIB_TX];
755             rep0 = replicate_get(repi0);
756
757             vlib_increment_combined_counter(
758                 cm, thread_index, repi0, 1,
759                 vlib_buffer_length_in_chain(vm, b0));
760
761             vec_validate (rm->clones[thread_index], rep0->rep_n_buckets - 1);
762
763             num_cloned = vlib_buffer_clone (vm, bi0, rm->clones[thread_index],
764                                             rep0->rep_n_buckets,
765                                             VLIB_BUFFER_CLONE_HEAD_SIZE);
766
767             if (num_cloned != rep0->rep_n_buckets)
768               {
769                 vlib_node_increment_counter
770                   (vm, node->node_index,
771                    REPLICATE_DPO_ERROR_BUFFER_ALLOCATION_FAILURE, 1);
772               }
773
774             for (bucket = 0; bucket < num_cloned; bucket++)
775             {
776                 ci0 = rm->clones[thread_index][bucket];
777                 c0 = vlib_get_buffer(vm, ci0);
778
779                 to_next[0] = ci0;
780                 to_next += 1;
781                 n_left_to_next -= 1;
782
783                 dpo0 = replicate_get_bucket_i(rep0, bucket);
784                 next0 = dpo0->dpoi_next_node;
785                 vnet_buffer (c0)->ip.adj_index[VLIB_TX] = dpo0->dpoi_index;
786
787                 if (PREDICT_FALSE(b0->flags & VLIB_BUFFER_IS_TRACED))
788                 {
789                     replicate_trace_t *t;
790
791                     if (c0 != b0)
792                       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (c0);
793                     t = vlib_add_trace (vm, node, c0, sizeof (*t));
794                     t->rep_index = repi0;
795                     t->dpo = *dpo0;
796                 }
797
798                 vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
799                                                  to_next, n_left_to_next,
800                                                  ci0, next0);
801                 if (PREDICT_FALSE (n_left_to_next == 0))
802                   {
803                     vlib_put_next_frame (vm, node, next_index, n_left_to_next);
804                     vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
805                   }
806             }
807             vec_reset_length (rm->clones[thread_index]);
808         }
809
810         vlib_put_next_frame (vm, node, next_index, n_left_to_next);
811     }
812
813     return frame->n_vectors;
814 }
815
816 static u8 *
817 format_replicate_trace (u8 * s, va_list * args)
818 {
819   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
820   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
821   replicate_trace_t *t = va_arg (*args, replicate_trace_t *);
822
823   s = format (s, "replicate: %d via %U",
824               t->rep_index,
825               format_dpo_id, &t->dpo, 0);
826   return s;
827 }
828
829 static uword
830 ip4_replicate (vlib_main_t * vm,
831                vlib_node_runtime_t * node,
832                vlib_frame_t * frame)
833 {
834     return (replicate_inline (vm, node, frame));
835 }
836
837 /**
838  * @brief IP4 replication node
839  */
840 VLIB_REGISTER_NODE (ip4_replicate_node) = {
841   .function = ip4_replicate,
842   .name = "ip4-replicate",
843   .vector_size = sizeof (u32),
844
845   .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
846   .error_strings = replicate_dpo_error_strings,
847
848   .format_trace = format_replicate_trace,
849   .n_next_nodes = 1,
850   .next_nodes = {
851       [0] = "ip4-drop",
852   },
853 };
854
855 static uword
856 ip6_replicate (vlib_main_t * vm,
857                vlib_node_runtime_t * node,
858                vlib_frame_t * frame)
859 {
860     return (replicate_inline (vm, node, frame));
861 }
862
863 /**
864  * @brief IPv6 replication node
865  */
866 VLIB_REGISTER_NODE (ip6_replicate_node) = {
867   .function = ip6_replicate,
868   .name = "ip6-replicate",
869   .vector_size = sizeof (u32),
870
871   .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
872   .error_strings = replicate_dpo_error_strings,
873
874   .format_trace = format_replicate_trace,
875   .n_next_nodes = 1,
876   .next_nodes = {
877       [0] = "ip6-drop",
878   },
879 };
880
881 static uword
882 mpls_replicate (vlib_main_t * vm,
883                 vlib_node_runtime_t * node,
884                 vlib_frame_t * frame)
885 {
886     return (replicate_inline (vm, node, frame));
887 }
888
889 /**
890  * @brief MPLS replication node
891  */
892 VLIB_REGISTER_NODE (mpls_replicate_node) = {
893   .function = mpls_replicate,
894   .name = "mpls-replicate",
895   .vector_size = sizeof (u32),
896
897   .n_errors = ARRAY_LEN(replicate_dpo_error_strings),
898   .error_strings = replicate_dpo_error_strings,
899
900   .format_trace = format_replicate_trace,
901   .n_next_nodes = 1,
902   .next_nodes = {
903       [0] = "mpls-drop",
904   },
905 };
906
907 clib_error_t *
908 replicate_dpo_init (vlib_main_t * vm)
909 {
910   replicate_main_t * rm = &replicate_main;
911
912   vec_validate (rm->clones, vlib_num_workers());
913
914   return 0;
915 }
916
917 VLIB_INIT_FUNCTION (replicate_dpo_init);