Flowprobe: Stateful flows and IPv6, L4 recording
[vpp.git] / src / plugins / flowprobe / node.c
1 /*
2  * node.c - ipfix probe graph node
3  *
4  * Copyright (c) 2017 Cisco and/or its affiliates.
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 #include <vlib/vlib.h>
18 #include <vnet/vnet.h>
19 #include <vnet/pg/pg.h>
20 #include <vppinfra/error.h>
21 #include <flowprobe/flowprobe.h>
22 #include <vnet/ip/ip6_packet.h>
23
24 static void flowprobe_export_entry (vlib_main_t * vm, flowprobe_entry_t * e);
25
26 /**
27  * @file flow record generator graph node
28  */
29
30 typedef struct
31 {
32   /** interface handle */
33   u32 rx_sw_if_index;
34   u32 tx_sw_if_index;
35   /** packet timestamp */
36   u64 timestamp;
37   /** size of the buffer */
38   u16 buffer_size;
39
40   /** L2 information */
41   u8 src_mac[6];
42   u8 dst_mac[6];
43   /** Ethertype */
44   u16 ethertype;
45
46   /** L3 information */
47   ip46_address_t src_address;
48   ip46_address_t dst_address;
49   u8 protocol;
50   u8 tos;
51
52   /** L4 information */
53   u16 src_port;
54   u16 dst_port;
55
56   flowprobe_variant_t which;
57 } flowprobe_trace_t;
58
59 static char *flowprobe_variant_strings[] = {
60   [FLOW_VARIANT_IP4] = "IP4",
61   [FLOW_VARIANT_IP6] = "IP6",
62   [FLOW_VARIANT_L2] = "L2",
63   [FLOW_VARIANT_L2_IP4] = "L2-IP4",
64   [FLOW_VARIANT_L2_IP6] = "L2-IP6",
65 };
66
67 /* packet trace format function */
68 static u8 *
69 format_flowprobe_trace (u8 * s, va_list * args)
70 {
71   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
72   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
73   flowprobe_trace_t *t = va_arg (*args, flowprobe_trace_t *);
74   uword indent = format_get_indent (s);
75
76   s = format (s,
77               "FLOWPROBE[%s]: rx_sw_if_index %d, tx_sw_if_index %d, "
78               "timestamp %lld, size %d", flowprobe_variant_strings[t->which],
79               t->rx_sw_if_index, t->tx_sw_if_index,
80               t->timestamp, t->buffer_size);
81
82   if (t->which == FLOW_VARIANT_L2)
83     s = format (s, "\n%U -> %U", format_white_space, indent,
84                 format_ethernet_address, &t->src_mac,
85                 format_ethernet_address, &t->dst_mac);
86
87   if (t->protocol > 0
88       && (t->which == FLOW_VARIANT_L2_IP4 || t->which == FLOW_VARIANT_IP4
89           || t->which == FLOW_VARIANT_L2_IP6 || t->which == FLOW_VARIANT_IP6))
90     s =
91       format (s, "\n%U%U: %U -> %U", format_white_space, indent,
92               format_ip_protocol, t->protocol, format_ip46_address,
93               &t->src_address, IP46_TYPE_ANY, format_ip46_address,
94               &t->dst_address, IP46_TYPE_ANY);
95   return s;
96 }
97
98 vlib_node_registration_t flowprobe_ip4_node;
99 vlib_node_registration_t flowprobe_ip6_node;
100 vlib_node_registration_t flowprobe_l2_node;
101
102 /* No counters at the moment */
103 #define foreach_flowprobe_error                 \
104 _(COLLISION, "Hash table collisions")           \
105 _(BUFFER, "Buffer allocation error")            \
106 _(EXPORTED_PACKETS, "Exported packets")         \
107 _(INPATH, "Exported packets in path")
108
109 typedef enum
110 {
111 #define _(sym,str) FLOWPROBE_ERROR_##sym,
112   foreach_flowprobe_error
113 #undef _
114     FLOWPROBE_N_ERROR,
115 } flowprobe_error_t;
116
117 static char *flowprobe_error_strings[] = {
118 #define _(sym,string) string,
119   foreach_flowprobe_error
120 #undef _
121 };
122
123 typedef enum
124 {
125   FLOWPROBE_NEXT_DROP,
126   FLOWPROBE_NEXT_IP4_LOOKUP,
127   FLOWPROBE_N_NEXT,
128 } flowprobe_next_t;
129
130 #define FLOWPROBE_NEXT_NODES {                                  \
131     [FLOWPROBE_NEXT_DROP] = "error-drop",                       \
132     [FLOWPROBE_NEXT_IP4_LOOKUP] = "ip4-lookup",         \
133 }
134
135 static inline flowprobe_variant_t
136 flowprobe_get_variant (flowprobe_variant_t which,
137                        flowprobe_record_t flags, u16 ethertype)
138 {
139   if (which == FLOW_VARIANT_L2
140       && (flags & FLOW_RECORD_L3 || flags & FLOW_RECORD_L4))
141     return ethertype == ETHERNET_TYPE_IP6 ? FLOW_VARIANT_L2_IP6 : ethertype ==
142       ETHERNET_TYPE_IP4 ? FLOW_VARIANT_L2_IP4 : FLOW_VARIANT_L2;
143   return which;
144 }
145
146 static inline u32
147 flowprobe_common_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset)
148 {
149   u16 start = offset;
150
151   /* Ingress interface */
152   u32 rx_if = clib_host_to_net_u32 (e->key.rx_sw_if_index);
153   clib_memcpy (to_b->data + offset, &rx_if, sizeof (rx_if));
154   offset += sizeof (rx_if);
155
156   /* Egress interface */
157   u32 tx_if = clib_host_to_net_u32 (e->key.tx_sw_if_index);
158   clib_memcpy (to_b->data + offset, &tx_if, sizeof (tx_if));
159   offset += sizeof (tx_if);
160
161   /* packet delta count */
162   u64 packetdelta = clib_host_to_net_u64 (e->packetcount);
163   clib_memcpy (to_b->data + offset, &packetdelta, sizeof (u64));
164   offset += sizeof (u64);
165
166   return offset - start;
167 }
168
169 static inline u32
170 flowprobe_l2_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset)
171 {
172   u16 start = offset;
173
174   /* src mac address */
175   clib_memcpy (to_b->data + offset, &e->key.src_mac, 6);
176   offset += 6;
177
178   /* dst mac address */
179   clib_memcpy (to_b->data + offset, &e->key.dst_mac, 6);
180   offset += 6;
181
182   /* ethertype */
183   clib_memcpy (to_b->data + offset, &e->key.ethertype, 2);
184   offset += 2;
185
186   return offset - start;
187 }
188
189 static inline u32
190 flowprobe_l3_ip6_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset)
191 {
192   u16 start = offset;
193
194   /* ip6 src address */
195   clib_memcpy (to_b->data + offset, &e->key.src_address,
196                sizeof (ip6_address_t));
197   offset += sizeof (ip6_address_t);
198
199   /* ip6 dst address */
200   clib_memcpy (to_b->data + offset, &e->key.dst_address,
201                sizeof (ip6_address_t));
202   offset += sizeof (ip6_address_t);
203
204   /* Protocol */
205   to_b->data[offset++] = e->key.protocol;
206
207   /* octetDeltaCount */
208   u64 octetdelta = clib_host_to_net_u64 (e->octetcount);
209   clib_memcpy (to_b->data + offset, &octetdelta, sizeof (u64));
210   offset += sizeof (u64);
211
212   return offset - start;
213 }
214
215 static inline u32
216 flowprobe_l3_ip4_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset)
217 {
218   u16 start = offset;
219
220   /* ip4 src address */
221   clib_memcpy (to_b->data + offset, &e->key.src_address.ip4,
222                sizeof (ip4_address_t));
223   offset += sizeof (ip4_address_t);
224
225   /* ip4 dst address */
226   clib_memcpy (to_b->data + offset, &e->key.dst_address.ip4,
227                sizeof (ip4_address_t));
228   offset += sizeof (ip4_address_t);
229
230   /* Protocol */
231   to_b->data[offset++] = e->key.protocol;
232
233   /* octetDeltaCount */
234   u64 octetdelta = clib_host_to_net_u64 (e->octetcount);
235   clib_memcpy (to_b->data + offset, &octetdelta, sizeof (u64));
236   offset += sizeof (u64);
237
238   return offset - start;
239 }
240
241 static inline u32
242 flowprobe_l4_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset)
243 {
244   u16 start = offset;
245
246   /* src port */
247   clib_memcpy (to_b->data + offset, &e->key.src_port, 2);
248   offset += 2;
249
250   /* dst port */
251   clib_memcpy (to_b->data + offset, &e->key.dst_port, 2);
252   offset += 2;
253
254   return offset - start;
255 }
256
257 static inline u32
258 flowprobe_hash (flowprobe_key_t * k)
259 {
260   flowprobe_main_t *fm = &flowprobe_main;
261   int i;
262   u32 h = 0;
263   for (i = 0; i < sizeof (k->as_u32) / sizeof (u32); i++)
264     h = crc_u32 (k->as_u32[i], h);
265   return h >> (32 - fm->ht_log2len);
266 }
267
268 flowprobe_entry_t *
269 flowprobe_lookup (u32 my_cpu_number, flowprobe_key_t * k, u32 * poolindex,
270                   bool * collision)
271 {
272   flowprobe_main_t *fm = &flowprobe_main;
273   flowprobe_entry_t *e;
274   u32 h;
275
276   h = (fm->active_timer) ? flowprobe_hash (k) : 0;
277
278   /* Lookup in the flow state pool */
279   *poolindex = fm->hash_per_worker[my_cpu_number][h];
280   if (*poolindex != ~0)
281     {
282       e = pool_elt_at_index (fm->pool_per_worker[my_cpu_number], *poolindex);
283       if (e)
284         {
285           /* Verify key or report collision */
286           if (memcmp (k, &e->key, sizeof (flowprobe_key_t)))
287             *collision = true;
288           return e;
289         }
290     }
291
292   return 0;
293 }
294
295 flowprobe_entry_t *
296 flowprobe_create (u32 my_cpu_number, flowprobe_key_t * k, u32 * poolindex)
297 {
298   flowprobe_main_t *fm = &flowprobe_main;
299   u32 h;
300
301   flowprobe_entry_t *e;
302
303   /* Get my index */
304   h = (fm->active_timer) ? flowprobe_hash (k) : 0;
305
306   pool_get (fm->pool_per_worker[my_cpu_number], e);
307   *poolindex = e - fm->pool_per_worker[my_cpu_number];
308   fm->hash_per_worker[my_cpu_number][h] = *poolindex;
309
310   e->key = *k;
311
312   if (fm->passive_timer > 0)
313     {
314       e->passive_timer_handle = tw_timer_start_2t_1w_2048sl
315         (fm->timers_per_worker[my_cpu_number], *poolindex, 0,
316          fm->passive_timer);
317     }
318   return e;
319 }
320
321 static inline void
322 add_to_flow_record_state (vlib_main_t * vm, vlib_node_runtime_t * node,
323                           flowprobe_main_t * fm, vlib_buffer_t * b,
324                           u64 timestamp, u16 length,
325                           flowprobe_variant_t which, flowprobe_trace_t * t)
326 {
327   if (fm->disabled)
328     return;
329
330   u32 my_cpu_number = vm->thread_index;
331   u16 octets = 0;
332
333   flowprobe_record_t flags = fm->context[which].flags;
334   bool collect_ip4 = false, collect_ip6 = false;
335   ASSERT (b);
336   ethernet_header_t *eth = vlib_buffer_get_current (b);
337   u16 ethertype = clib_net_to_host_u16 (eth->type);
338   /* *INDENT-OFF* */
339   flowprobe_key_t k = { {0} };
340   /* *INDENT-ON* */
341   ip4_header_t *ip4 = 0;
342   ip6_header_t *ip6 = 0;
343   udp_header_t *udp = 0;
344
345   if (flags & FLOW_RECORD_L3 || flags & FLOW_RECORD_L4)
346     {
347       collect_ip4 = which == FLOW_VARIANT_L2_IP4 || which == FLOW_VARIANT_IP4;
348       collect_ip6 = which == FLOW_VARIANT_L2_IP6 || which == FLOW_VARIANT_IP6;
349     }
350
351   k.rx_sw_if_index = vnet_buffer (b)->sw_if_index[VLIB_RX];
352   k.tx_sw_if_index = vnet_buffer (b)->sw_if_index[VLIB_TX];
353
354   k.which = which;
355
356   if (flags & FLOW_RECORD_L2)
357     {
358       clib_memcpy (k.src_mac, eth->src_address, 6);
359       clib_memcpy (k.dst_mac, eth->dst_address, 6);
360       k.ethertype = ethertype;
361     }
362   if (collect_ip6 && ethertype == ETHERNET_TYPE_IP6)
363     {
364       ip6 = (ip6_header_t *) (eth + 1);
365       udp = (udp_header_t *) (ip6 + 1);
366       if (flags & FLOW_RECORD_L3)
367         {
368           k.src_address.as_u64[0] = ip6->src_address.as_u64[0];
369           k.src_address.as_u64[1] = ip6->src_address.as_u64[1];
370           k.dst_address.as_u64[0] = ip6->dst_address.as_u64[0];
371           k.dst_address.as_u64[1] = ip6->dst_address.as_u64[1];
372         }
373       k.protocol = ip6->protocol;
374       octets = clib_net_to_host_u16 (ip6->payload_length)
375         + sizeof (ip6_header_t);
376     }
377   if (collect_ip4 && ethertype == ETHERNET_TYPE_IP4)
378     {
379       ip4 = (ip4_header_t *) (eth + 1);
380       udp = (udp_header_t *) (ip4 + 1);
381       if (flags & FLOW_RECORD_L3)
382         {
383           k.src_address.ip4.as_u32 = ip4->src_address.as_u32;
384           k.dst_address.ip4.as_u32 = ip4->dst_address.as_u32;
385         }
386       k.protocol = ip4->protocol;
387       octets = clib_net_to_host_u16 (ip4->length);
388     }
389   if ((flags & FLOW_RECORD_L4) && udp &&
390       (k.protocol == IP_PROTOCOL_TCP || k.protocol == IP_PROTOCOL_UDP))
391     {
392       k.src_port = udp->src_port;
393       k.dst_port = udp->dst_port;
394     }
395
396   if (t)
397     {
398       t->rx_sw_if_index = k.rx_sw_if_index;
399       t->tx_sw_if_index = k.tx_sw_if_index;
400       clib_memcpy (t->src_mac, k.src_mac, 6);
401       clib_memcpy (t->dst_mac, k.dst_mac, 6);
402       t->ethertype = k.ethertype;
403       t->src_address.ip4.as_u32 = k.src_address.ip4.as_u32;
404       t->dst_address.ip4.as_u32 = k.dst_address.ip4.as_u32;
405       t->protocol = k.protocol;
406       t->src_port = k.src_port;
407       t->dst_port = k.dst_port;
408       t->which = k.which;
409     }
410
411   flowprobe_entry_t *e = 0;
412   f64 now = vlib_time_now (vm);
413   if (fm->active_timer > 0)
414     {
415       u32 poolindex = ~0;
416       bool collision = false;
417
418       e = flowprobe_lookup (my_cpu_number, &k, &poolindex, &collision);
419       if (collision)
420         {
421           /* Flush data and clean up entry for reuse. */
422           if (e->packetcount)
423             flowprobe_export_entry (vm, e);
424           e->key = k;
425           vlib_node_increment_counter (vm, node->node_index,
426                                        FLOWPROBE_ERROR_COLLISION, 1);
427         }
428       if (!e)                   /* Create new entry */
429         {
430           e = flowprobe_create (my_cpu_number, &k, &poolindex);
431           e->last_exported = now;
432         }
433     }
434   else
435     {
436       e = &fm->stateless_entry[my_cpu_number];
437       e->key = k;
438     }
439
440   if (e)
441     {
442       /* Updating entry */
443       e->packetcount++;
444       e->octetcount += octets;
445       e->last_updated = now;
446
447       if (fm->active_timer == 0
448           || (now > e->last_exported + fm->active_timer))
449         flowprobe_export_entry (vm, e);
450     }
451 }
452
453 static u16
454 flowprobe_get_headersize (void)
455 {
456   return sizeof (ip4_header_t) + sizeof (udp_header_t) +
457     sizeof (ipfix_message_header_t) + sizeof (ipfix_set_header_t);
458 }
459
460 static void
461 flowprobe_export_send (vlib_main_t * vm, vlib_buffer_t * b0,
462                        flowprobe_variant_t which)
463 {
464   flowprobe_main_t *fm = &flowprobe_main;
465   flow_report_main_t *frm = &flow_report_main;
466   vlib_frame_t *f;
467   ip4_ipfix_template_packet_t *tp;
468   ipfix_set_header_t *s;
469   ipfix_message_header_t *h;
470   ip4_header_t *ip;
471   udp_header_t *udp;
472   flowprobe_record_t flags = fm->context[which].flags;
473   u32 my_cpu_number = vm->thread_index;
474
475   /* Fill in header */
476   flow_report_stream_t *stream;
477
478   /* Nothing to send */
479   if (fm->context[which].next_record_offset_per_worker[my_cpu_number] <=
480       flowprobe_get_headersize ())
481     return;
482
483   u32 i, index = vec_len (frm->streams);
484   for (i = 0; i < index; i++)
485     if (frm->streams[i].domain_id == 1)
486       {
487         index = i;
488         break;
489       }
490   if (i == vec_len (frm->streams))
491     {
492       vec_validate (frm->streams, index);
493       frm->streams[index].domain_id = 1;
494     }
495   stream = &frm->streams[index];
496
497   tp = vlib_buffer_get_current (b0);
498   ip = (ip4_header_t *) & tp->ip4;
499   udp = (udp_header_t *) (ip + 1);
500   h = (ipfix_message_header_t *) (udp + 1);
501   s = (ipfix_set_header_t *) (h + 1);
502
503   ip->ip_version_and_header_length = 0x45;
504   ip->ttl = 254;
505   ip->protocol = IP_PROTOCOL_UDP;
506   ip->flags_and_fragment_offset = 0;
507   ip->src_address.as_u32 = frm->src_address.as_u32;
508   ip->dst_address.as_u32 = frm->ipfix_collector.as_u32;
509   udp->src_port = clib_host_to_net_u16 (UDP_DST_PORT_ipfix);
510   udp->dst_port = clib_host_to_net_u16 (UDP_DST_PORT_ipfix);
511   udp->checksum = 0;
512
513   /* FIXUP: message header export_time */
514   h->export_time = (u32)
515     (((f64) frm->unix_time_0) +
516      (vlib_time_now (frm->vlib_main) - frm->vlib_time_0));
517   h->export_time = clib_host_to_net_u32 (h->export_time);
518   h->domain_id = clib_host_to_net_u32 (stream->domain_id);
519
520   /* FIXUP: message header sequence_number */
521   h->sequence_number = stream->sequence_number++;
522   h->sequence_number = clib_host_to_net_u32 (h->sequence_number);
523
524   s->set_id_length = ipfix_set_id_length (fm->template_reports[flags],
525                                           b0->current_length -
526                                           (sizeof (*ip) + sizeof (*udp) +
527                                            sizeof (*h)));
528   h->version_length = version_length (b0->current_length -
529                                       (sizeof (*ip) + sizeof (*udp)));
530
531   ip->length = clib_host_to_net_u16 (b0->current_length);
532
533   ip->checksum = ip4_header_checksum (ip);
534   udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip));
535
536   if (frm->udp_checksum)
537     {
538       /* RFC 7011 section 10.3.2. */
539       udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip);
540       if (udp->checksum == 0)
541         udp->checksum = 0xffff;
542     }
543
544   ASSERT (ip->checksum == ip4_header_checksum (ip));
545
546   /* Find or allocate a frame */
547   f = fm->context[which].frames_per_worker[my_cpu_number];
548   if (PREDICT_FALSE (f == 0))
549     {
550       u32 *to_next;
551       f = vlib_get_frame_to_node (vm, ip4_lookup_node.index);
552       fm->context[which].frames_per_worker[my_cpu_number] = f;
553       u32 bi0 = vlib_get_buffer_index (vm, b0);
554
555       /* Enqueue the buffer */
556       to_next = vlib_frame_vector_args (f);
557       to_next[0] = bi0;
558       f->n_vectors = 1;
559     }
560
561   vlib_put_frame_to_node (vm, ip4_lookup_node.index, f);
562   vlib_node_increment_counter (vm, flowprobe_l2_node.index,
563                                FLOWPROBE_ERROR_EXPORTED_PACKETS, 1);
564
565   fm->context[which].frames_per_worker[my_cpu_number] = 0;
566   fm->context[which].buffers_per_worker[my_cpu_number] = 0;
567   fm->context[which].next_record_offset_per_worker[my_cpu_number] =
568     flowprobe_get_headersize ();
569 }
570
571 static vlib_buffer_t *
572 flowprobe_get_buffer (vlib_main_t * vm, flowprobe_variant_t which)
573 {
574   flowprobe_main_t *fm = &flowprobe_main;
575   flow_report_main_t *frm = &flow_report_main;
576   vlib_buffer_t *b0;
577   u32 bi0;
578   vlib_buffer_free_list_t *fl;
579   u32 my_cpu_number = vm->thread_index;
580
581   /* Find or allocate a buffer */
582   b0 = fm->context[which].buffers_per_worker[my_cpu_number];
583
584   /* Need to allocate a buffer? */
585   if (PREDICT_FALSE (b0 == 0))
586     {
587       if (vlib_buffer_alloc (vm, &bi0, 1) != 1)
588         {
589           vlib_node_increment_counter (vm, flowprobe_l2_node.index,
590                                        FLOWPROBE_ERROR_BUFFER, 1);
591           return 0;
592         }
593
594       /* Initialize the buffer */
595       b0 = fm->context[which].buffers_per_worker[my_cpu_number] =
596         vlib_get_buffer (vm, bi0);
597       fl =
598         vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
599       vlib_buffer_init_for_free_list (b0, fl);
600       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);
601
602       b0->current_data = 0;
603       b0->current_length = flowprobe_get_headersize ();
604       b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VLIB_BUFFER_FLOW_REPORT);
605       vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
606       vnet_buffer (b0)->sw_if_index[VLIB_TX] = frm->fib_index;
607       fm->context[which].next_record_offset_per_worker[my_cpu_number] =
608         b0->current_length;
609     }
610
611   return b0;
612 }
613
614 static void
615 flowprobe_export_entry (vlib_main_t * vm, flowprobe_entry_t * e)
616 {
617   u32 my_cpu_number = vm->thread_index;
618   flowprobe_main_t *fm = &flowprobe_main;
619   flow_report_main_t *frm = &flow_report_main;
620   vlib_buffer_t *b0;
621   bool collect_ip4 = false, collect_ip6 = false;
622   flowprobe_variant_t which = e->key.which;
623   flowprobe_record_t flags = fm->context[which].flags;
624   u16 offset =
625     fm->context[which].next_record_offset_per_worker[my_cpu_number];
626
627   if (offset < flowprobe_get_headersize ())
628     offset = flowprobe_get_headersize ();
629
630   b0 = flowprobe_get_buffer (vm, which);
631   /* No available buffer, what to do... */
632   if (b0 == 0)
633     return;
634
635   if (flags & FLOW_RECORD_L3)
636     {
637       collect_ip4 = which == FLOW_VARIANT_L2_IP4 || which == FLOW_VARIANT_IP4;
638       collect_ip6 = which == FLOW_VARIANT_L2_IP6 || which == FLOW_VARIANT_IP6;
639     }
640
641   offset += flowprobe_common_add (b0, e, offset);
642
643   if (flags & FLOW_RECORD_L2)
644     offset += flowprobe_l2_add (b0, e, offset);
645   if (collect_ip6)
646     offset += flowprobe_l3_ip6_add (b0, e, offset);
647   if (collect_ip4)
648     offset += flowprobe_l3_ip4_add (b0, e, offset);
649   if (flags & FLOW_RECORD_L4)
650     offset += flowprobe_l4_add (b0, e, offset);
651
652   /* Reset per flow-export counters */
653   e->packetcount = 0;
654   e->octetcount = 0;
655   e->last_exported = vlib_time_now (vm);
656
657   b0->current_length = offset;
658
659   fm->context[which].next_record_offset_per_worker[my_cpu_number] = offset;
660   /* Time to flush the buffer? */
661   if (offset + fm->template_size[flags] > frm->path_mtu)
662     flowprobe_export_send (vm, b0, which);
663 }
664
665 uword
666 flowprobe_node_fn (vlib_main_t * vm,
667                    vlib_node_runtime_t * node, vlib_frame_t * frame,
668                    flowprobe_variant_t which)
669 {
670   u32 n_left_from, *from, *to_next;
671   flowprobe_next_t next_index;
672   flowprobe_main_t *fm = &flowprobe_main;
673   u64 now;
674
675   now = (u64) ((vlib_time_now (vm) - fm->vlib_time_0) * 1e9);
676   now += fm->nanosecond_time_0;
677
678   from = vlib_frame_vector_args (frame);
679   n_left_from = frame->n_vectors;
680   next_index = node->cached_next_index;
681
682   while (n_left_from > 0)
683     {
684       u32 n_left_to_next;
685
686       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
687
688       while (n_left_from >= 4 && n_left_to_next >= 2)
689         {
690           u32 next0 = FLOWPROBE_NEXT_DROP;
691           u32 next1 = FLOWPROBE_NEXT_DROP;
692           u16 len0, len1;
693           u32 bi0, bi1;
694           vlib_buffer_t *b0, *b1;
695
696           /* Prefetch next iteration. */
697           {
698             vlib_buffer_t *p2, *p3;
699
700             p2 = vlib_get_buffer (vm, from[2]);
701             p3 = vlib_get_buffer (vm, from[3]);
702
703             vlib_prefetch_buffer_header (p2, LOAD);
704             vlib_prefetch_buffer_header (p3, LOAD);
705
706             CLIB_PREFETCH (p2->data, CLIB_CACHE_LINE_BYTES, STORE);
707             CLIB_PREFETCH (p3->data, CLIB_CACHE_LINE_BYTES, STORE);
708           }
709
710           /* speculatively enqueue b0 and b1 to the current next frame */
711           to_next[0] = bi0 = from[0];
712           to_next[1] = bi1 = from[1];
713           from += 2;
714           to_next += 2;
715           n_left_from -= 2;
716           n_left_to_next -= 2;
717
718           b0 = vlib_get_buffer (vm, bi0);
719           b1 = vlib_get_buffer (vm, bi1);
720
721           vnet_feature_next (vnet_buffer (b0)->sw_if_index[VLIB_TX],
722                              &next0, b0);
723           vnet_feature_next (vnet_buffer (b1)->sw_if_index[VLIB_TX],
724                              &next1, b1);
725
726           len0 = vlib_buffer_length_in_chain (vm, b0);
727           ethernet_header_t *eh0 = vlib_buffer_get_current (b0);
728           u16 ethertype0 = clib_net_to_host_u16 (eh0->type);
729
730           if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
731             add_to_flow_record_state (vm, node, fm, b0, now, len0,
732                                       flowprobe_get_variant
733                                       (which, fm->context[which].flags,
734                                        ethertype0), 0);
735
736           len1 = vlib_buffer_length_in_chain (vm, b1);
737           ethernet_header_t *eh1 = vlib_buffer_get_current (b1);
738           u16 ethertype1 = clib_net_to_host_u16 (eh1->type);
739
740           if (PREDICT_TRUE ((b1->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
741             add_to_flow_record_state (vm, node, fm, b1, now, len1,
742                                       flowprobe_get_variant
743                                       (which, fm->context[which].flags,
744                                        ethertype1), 0);
745
746           /* verify speculative enqueues, maybe switch current next frame */
747           vlib_validate_buffer_enqueue_x2 (vm, node, next_index,
748                                            to_next, n_left_to_next,
749                                            bi0, bi1, next0, next1);
750         }
751
752       while (n_left_from > 0 && n_left_to_next > 0)
753         {
754           u32 bi0;
755           vlib_buffer_t *b0;
756           u32 next0 = FLOWPROBE_NEXT_DROP;
757           u16 len0;
758
759           /* speculatively enqueue b0 to the current next frame */
760           bi0 = from[0];
761           to_next[0] = bi0;
762           from += 1;
763           to_next += 1;
764           n_left_from -= 1;
765           n_left_to_next -= 1;
766
767           b0 = vlib_get_buffer (vm, bi0);
768
769           vnet_feature_next (vnet_buffer (b0)->sw_if_index[VLIB_TX],
770                              &next0, b0);
771
772           len0 = vlib_buffer_length_in_chain (vm, b0);
773           ethernet_header_t *eh0 = vlib_buffer_get_current (b0);
774           u16 ethertype0 = clib_net_to_host_u16 (eh0->type);
775
776           if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
777             {
778               flowprobe_trace_t *t = 0;
779               if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE)
780                                  && (b0->flags & VLIB_BUFFER_IS_TRACED)))
781                 t = vlib_add_trace (vm, node, b0, sizeof (*t));
782
783               add_to_flow_record_state (vm, node, fm, b0, now, len0,
784                                         flowprobe_get_variant
785                                         (which, fm->context[which].flags,
786                                          ethertype0), t);
787             }
788
789           /* verify speculative enqueue, maybe switch current next frame */
790           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
791                                            to_next, n_left_to_next,
792                                            bi0, next0);
793         }
794
795       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
796     }
797   return frame->n_vectors;
798 }
799
800 static uword
801 flowprobe_ip4_node_fn (vlib_main_t * vm,
802                        vlib_node_runtime_t * node, vlib_frame_t * frame)
803 {
804   return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP4);
805 }
806
807 static uword
808 flowprobe_ip6_node_fn (vlib_main_t * vm,
809                        vlib_node_runtime_t * node, vlib_frame_t * frame)
810 {
811   return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP6);
812 }
813
814 static uword
815 flowprobe_l2_node_fn (vlib_main_t * vm,
816                       vlib_node_runtime_t * node, vlib_frame_t * frame)
817 {
818   return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_L2);
819 }
820
821 static inline void
822 flush_record (flowprobe_variant_t which)
823 {
824   vlib_main_t *vm = vlib_get_main ();
825   vlib_buffer_t *b = flowprobe_get_buffer (vm, which);
826   if (b)
827     flowprobe_export_send (vm, b, which);
828 }
829
830 void
831 flowprobe_flush_callback_ip4 (void)
832 {
833   flush_record (FLOW_VARIANT_IP4);
834 }
835
836 void
837 flowprobe_flush_callback_ip6 (void)
838 {
839   flush_record (FLOW_VARIANT_IP6);
840 }
841
842 void
843 flowprobe_flush_callback_l2 (void)
844 {
845   flush_record (FLOW_VARIANT_L2);
846   flush_record (FLOW_VARIANT_L2_IP4);
847   flush_record (FLOW_VARIANT_L2_IP6);
848 }
849
850
851 static void
852 flowprobe_delete_by_index (u32 my_cpu_number, u32 poolindex)
853 {
854   flowprobe_main_t *fm = &flowprobe_main;
855   flowprobe_entry_t *e;
856   u32 h;
857
858   e = pool_elt_at_index (fm->pool_per_worker[my_cpu_number], poolindex);
859
860   /* Get my index */
861   h = flowprobe_hash (&e->key);
862
863   /* Reset hash */
864   fm->hash_per_worker[my_cpu_number][h] = ~0;
865
866   pool_put_index (fm->pool_per_worker[my_cpu_number], poolindex);
867 }
868
869
870 /* Per worker process processing the active/passive expired entries */
871 static uword
872 flowprobe_walker_process (vlib_main_t * vm,
873                           vlib_node_runtime_t * rt, vlib_frame_t * f)
874 {
875   flowprobe_main_t *fm = &flowprobe_main;
876   flow_report_main_t *frm = &flow_report_main;
877   flowprobe_entry_t *e;
878
879   /*
880    * $$$$ Remove this check from here and track FRM status and disable
881    * this process if required.
882    */
883   if (frm->ipfix_collector.as_u32 == 0 || frm->src_address.as_u32 == 0)
884     {
885       fm->disabled = true;
886       return 0;
887     }
888   fm->disabled = false;
889
890   u32 cpu_index = os_get_thread_index ();
891   u32 *to_be_removed = 0, *i;
892
893   /*
894    * Tick the timer when required and process the vector of expired
895    * timers
896    */
897   f64 start_time = vlib_time_now (vm);
898   u32 count = 0;
899
900   tw_timer_expire_timers_2t_1w_2048sl (fm->timers_per_worker[cpu_index],
901                                        start_time);
902
903   vec_foreach (i, fm->expired_passive_per_worker[cpu_index])
904   {
905     u32 exported = 0;
906     f64 now = vlib_time_now (vm);
907     if (now > start_time + 100e-6
908         || exported > FLOW_MAXIMUM_EXPORT_ENTRIES - 1)
909       break;
910
911     if (pool_is_free_index (fm->pool_per_worker[cpu_index], *i))
912       {
913         clib_warning ("Element is %d is freed already\n", *i);
914         continue;
915       }
916     else
917       e = pool_elt_at_index (fm->pool_per_worker[cpu_index], *i);
918
919     /* Check last update timestamp. If it is longer than passive time nuke
920      * entry. Otherwise restart timer with what's left
921      * Premature passive timer by more than 10%
922      */
923     if ((now - e->last_updated) < (fm->passive_timer * 0.9))
924       {
925         f64 delta = fm->passive_timer - (now - e->last_updated);
926         e->passive_timer_handle = tw_timer_start_2t_1w_2048sl
927           (fm->timers_per_worker[cpu_index], *i, 0, delta);
928       }
929     else                        /* Nuke entry */
930       {
931         vec_add1 (to_be_removed, *i);
932       }
933     /* If anything to report send it to the exporter */
934     if (e->packetcount && now > e->last_exported + fm->active_timer)
935       {
936         exported++;
937         flowprobe_export_entry (vm, e);
938       }
939     count++;
940   }
941   if (count)
942     vec_delete (fm->expired_passive_per_worker[cpu_index], count, 0);
943
944   vec_foreach (i, to_be_removed) flowprobe_delete_by_index (cpu_index, *i);
945   vec_free (to_be_removed);
946
947   return 0;
948 }
949
950 /* *INDENT-OFF* */
951 VLIB_REGISTER_NODE (flowprobe_ip4_node) = {
952   .function = flowprobe_ip4_node_fn,
953   .name = "flowprobe-ip4",
954   .vector_size = sizeof (u32),
955   .format_trace = format_flowprobe_trace,
956   .type = VLIB_NODE_TYPE_INTERNAL,
957   .n_errors = ARRAY_LEN(flowprobe_error_strings),
958   .error_strings = flowprobe_error_strings,
959   .n_next_nodes = FLOWPROBE_N_NEXT,
960   .next_nodes = FLOWPROBE_NEXT_NODES,
961 };
962 VLIB_REGISTER_NODE (flowprobe_ip6_node) = {
963   .function = flowprobe_ip6_node_fn,
964   .name = "flowprobe-ip6",
965   .vector_size = sizeof (u32),
966   .format_trace = format_flowprobe_trace,
967   .type = VLIB_NODE_TYPE_INTERNAL,
968   .n_errors = ARRAY_LEN(flowprobe_error_strings),
969   .error_strings = flowprobe_error_strings,
970   .n_next_nodes = FLOWPROBE_N_NEXT,
971   .next_nodes = FLOWPROBE_NEXT_NODES,
972 };
973 VLIB_REGISTER_NODE (flowprobe_l2_node) = {
974   .function = flowprobe_l2_node_fn,
975   .name = "flowprobe-l2",
976   .vector_size = sizeof (u32),
977   .format_trace = format_flowprobe_trace,
978   .type = VLIB_NODE_TYPE_INTERNAL,
979   .n_errors = ARRAY_LEN(flowprobe_error_strings),
980   .error_strings = flowprobe_error_strings,
981   .n_next_nodes = FLOWPROBE_N_NEXT,
982   .next_nodes = FLOWPROBE_NEXT_NODES,
983 };
984 VLIB_REGISTER_NODE (flowprobe_walker_node) = {
985   .function = flowprobe_walker_process,
986   .name = "flowprobe-walker",
987   .type = VLIB_NODE_TYPE_INPUT,
988   .state = VLIB_NODE_STATE_INTERRUPT,
989 };
990 /* *INDENT-ON* */
991
992 /*
993  * fd.io coding-style-patch-verification: ON
994  *
995  * Local Variables:
996  * eval: (c-set-style "gnu")
997  * End:
998  */