ipfix-export: Add APIs to get/send buffers
[vpp.git] / src / vnet / ipfix-export / flow_report.c
index 38c2454..55c3b4d 100644 (file)
@@ -15,6 +15,7 @@
 /*
  * flow_report.c
  */
+#include <vppinfra/atomics.h>
 #include <vnet/ipfix-export/flow_report.h>
 #include <vnet/api_errno.h>
 #include <vnet/udp/udp.h>
@@ -238,6 +239,135 @@ vnet_flow_rewrite_generic_callback (ipfix_exporter_t *exp, flow_report_t *fr,
   return rewrite;
 }
 
+vlib_buffer_t *
+vnet_ipfix_exp_get_buffer (vlib_main_t *vm, ipfix_exporter_t *exp,
+                          flow_report_t *fr, u32 thread_index)
+{
+  u32 bi0;
+  vlib_buffer_t *b0;
+
+  if (fr->per_thread_data[thread_index].buffer)
+    return fr->per_thread_data[thread_index].buffer;
+
+  if (vlib_buffer_alloc (vm, &bi0, 1) != 1)
+    return NULL;
+
+  /* Initialize the buffer */
+  b0 = fr->per_thread_data[thread_index].buffer = vlib_get_buffer (vm, bi0);
+
+  b0->current_data = 0;
+  b0->current_length = exp->all_headers_size;
+  b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VNET_BUFFER_F_FLOW_REPORT);
+  vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
+  vnet_buffer (b0)->sw_if_index[VLIB_TX] = exp->fib_index;
+  fr->per_thread_data[thread_index].next_data_offset = b0->current_length;
+
+  return b0;
+}
+
+/*
+ * Send a buffer that is mostly populated. Has flow records but needs some
+ * header fields updated.
+ */
+void
+vnet_ipfix_exp_send_buffer (vlib_main_t *vm, ipfix_exporter_t *exp,
+                           flow_report_t *fr, flow_report_stream_t *stream,
+                           u32 thread_index, vlib_buffer_t *b0)
+{
+  flow_report_main_t *frm = &flow_report_main;
+  vlib_frame_t *f;
+  ip4_ipfix_template_packet_t *tp;
+  ipfix_set_header_t *s;
+  ipfix_message_header_t *h;
+  ip4_header_t *ip;
+  udp_header_t *udp;
+
+  /* nothing to send */
+  if (fr->per_thread_data[thread_index].next_data_offset <=
+      exp->all_headers_size)
+    return;
+
+  tp = vlib_buffer_get_current (b0);
+  ip = (ip4_header_t *) &tp->ip4;
+  udp = (udp_header_t *) (ip + 1);
+  h = (ipfix_message_header_t *) (udp + 1);
+  s = (ipfix_set_header_t *) (h + 1);
+
+  ip->ip_version_and_header_length = 0x45;
+  ip->ttl = 254;
+  ip->protocol = IP_PROTOCOL_UDP;
+  ip->flags_and_fragment_offset = 0;
+  ip->src_address.as_u32 = exp->src_address.as_u32;
+  ip->dst_address.as_u32 = exp->ipfix_collector.as_u32;
+  udp->src_port = clib_host_to_net_u16 (stream->src_port);
+  udp->dst_port = clib_host_to_net_u16 (exp->collector_port);
+  udp->checksum = 0;
+
+  /* FIXUP: message header export_time */
+  h->export_time =
+    (u32) (((f64) frm->unix_time_0) + (vlib_time_now (vm) - frm->vlib_time_0));
+  h->export_time = clib_host_to_net_u32 (h->export_time);
+  h->domain_id = clib_host_to_net_u32 (stream->domain_id);
+
+  /*
+   * RFC 7011: Section 3.2
+   *
+   * Incremental sequence counter modulo 2^32 of all IPFIX Data Records
+   * sent in the current stream from the current Observation Domain by
+   * the Exporting Process
+   */
+  h->sequence_number =
+    clib_atomic_fetch_add (&stream->sequence_number,
+                          fr->per_thread_data[thread_index].n_data_records);
+  h->sequence_number = clib_host_to_net_u32 (h->sequence_number);
+
+  /*
+   * For data records we use the template ID as the set ID.
+   * RFC 7011: 3.4.3
+   */
+  s->set_id_length = ipfix_set_id_length (
+    fr->template_id,
+    b0->current_length - (sizeof (*ip) + sizeof (*udp) + sizeof (*h)));
+  h->version_length =
+    version_length (b0->current_length - (sizeof (*ip) + sizeof (*udp)));
+
+  ip->length = clib_host_to_net_u16 (b0->current_length);
+
+  ip->checksum = ip4_header_checksum (ip);
+  udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip));
+
+  if (exp->udp_checksum)
+    {
+      /* RFC 7011 section 10.3.2. */
+      udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip);
+      if (udp->checksum == 0)
+       udp->checksum = 0xffff;
+    }
+
+  ASSERT (ip4_header_checksum_is_valid (ip));
+
+  /* Find or allocate a frame */
+  f = fr->per_thread_data[thread_index].frame;
+  if (PREDICT_FALSE (f == 0))
+    {
+      u32 *to_next;
+      f = vlib_get_frame_to_node (vm, ip4_lookup_node.index);
+      fr->per_thread_data[thread_index].frame = f;
+      u32 bi0 = vlib_get_buffer_index (vm, b0);
+
+      /* Enqueue the buffer */
+      to_next = vlib_frame_vector_args (f);
+      to_next[0] = bi0;
+      f->n_vectors = 1;
+    }
+
+  vlib_put_frame_to_node (vm, ip4_lookup_node.index, f);
+
+  fr->per_thread_data[thread_index].frame = NULL;
+  fr->per_thread_data[thread_index].buffer = NULL;
+  fr->per_thread_data[thread_index].next_data_offset = 0;
+}
+
 static uword
 flow_report_process (vlib_main_t * vm,
                     vlib_node_runtime_t * rt, vlib_frame_t * f)
@@ -346,6 +476,10 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
   flow_report_t *fr;
   flow_report_stream_t *stream;
   u32 si;
+  vlib_thread_main_t *tm = &vlib_thread_main;
+  flow_report_main_t *frm = &flow_report_main;
+  vlib_main_t *vm = frm->vlib_main;
+  int size;
 
   si = find_stream (exp, a->domain_id, a->src_port);
   if (si == -2)
@@ -371,6 +505,19 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
     {
       if (found_index != ~0)
        {
+         for (int i = 0;
+              i < vec_len (exp->reports[found_index].per_thread_data); i++)
+           {
+             u32 bi;
+             if (exp->reports[found_index].per_thread_data[i].buffer)
+               {
+                 bi = vlib_get_buffer_index (
+                   vm, exp->reports[found_index].per_thread_data[i].buffer);
+                 vlib_buffer_free (vm, &bi, 1);
+               }
+           }
+         vec_free (exp->reports[found_index].per_thread_data);
+
          vec_delete (exp->reports, 1, found_index);
          stream = &exp->streams[si];
          stream->n_reports--;
@@ -410,6 +557,14 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
   fr->report_elements = a->report_elements;
   fr->n_report_elements = a->n_report_elements;
   fr->stream_indexp = a->stream_indexp;
+  vec_validate (fr->per_thread_data, tm->n_threads);
+  /* Store the flow_report index back in the args struct */
+  a->flow_report_index = fr - exp->reports;
+
+  size = 0;
+  for (int i = 0; i < fr->n_report_elements; i++)
+    size += fr->report_elements[i].size;
+  fr->data_record_size = size;
   if (template_id)
     *template_id = fr->template_id;
 
@@ -539,6 +694,11 @@ set_ipfix_exporter_command_fn (vlib_main_t * vm,
   if (path_mtu < 68)
     return clib_error_return (0, "too small path-mtu value, minimum is 68");
 
+  /* Calculate how much header data we need. */
+  exp->all_headers_size = sizeof (ip4_header_t) + sizeof (udp_header_t) +
+                         sizeof (ipfix_message_header_t) +
+                         sizeof (ipfix_set_header_t);
+
   /* Reset report streams if we are reconfiguring IP addresses */
   if (exp->ipfix_collector.as_u32 != collector.as_u32 ||
       exp->src_address.as_u32 != src.as_u32 ||