Make test support for the ipfix flow-per-pkt plugin 81/4481/2
authorDave Barach <dave@barachs.net>
Fri, 23 Dec 2016 20:15:48 +0000 (15:15 -0500)
committerDamjan Marion <dmarion.lists@gmail.com>
Fri, 23 Dec 2016 21:52:47 +0000 (21:52 +0000)
Change-Id: I7a0d37fc2bc21dbbff1cea1b92dc24d43f971eec
Signed-off-by: Dave Barach <dave@barachs.net>
plugins/flowperpkt-plugin/flowperpkt/l2_node.c
plugins/flowperpkt-plugin/flowperpkt/node.c
test/test_flowperpkt.py [new file with mode: 0644]
vnet/vnet/flow/flow_report.c

index ba87d43..1c2f681 100644 (file)
@@ -79,6 +79,7 @@ static char *flowperpkt_l2_error_strings[] = {
 typedef enum
 {
   FLOWPERPKT_L2_NEXT_DROP,
+  FLOWPERPKT_L2_NEXT_IP4_LOOKUP,
   FLOWPERPKT_L2_N_NEXT,
 } flowperpkt_l2_next_t;
 
@@ -95,6 +96,7 @@ typedef enum
 
 static inline void
 add_to_flow_record_l2 (vlib_main_t * vm,
+                      vlib_node_runtime_t * node,
                       flowperpkt_main_t * fm,
                       u32 rx_sw_if_index, u32 tx_sw_if_index,
                       u8 * src_mac, u8 * dst_mac,
@@ -284,6 +286,18 @@ add_to_flow_record_l2 (vlib_main_t * vm,
 
       ASSERT (ip->checksum == ip4_header_checksum (ip));
 
+      if (PREDICT_FALSE (vlib_get_trace_count (vm, node) > 0))
+       {
+         vlib_trace_buffer (vm, node, FLOWPERPKT_L2_NEXT_IP4_LOOKUP, b0,
+                            0 /* follow chain */ );
+         flowperpkt_l2_trace_t *t =
+           vlib_add_trace (vm, node, b0, sizeof (*t));
+         memset (t, 0, sizeof (*t));
+         t->rx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+         t->tx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_TX];
+         t->buffer_size = b0->current_length;
+       }
+
       vlib_put_frame_to_node (vm, ip4_lookup_node.index,
                              fm->l2_frames_per_worker[my_cpu_number]);
       fm->l2_frames_per_worker[my_cpu_number] = 0;
@@ -299,8 +313,10 @@ flowperpkt_flush_callback_l2 (void)
 {
   vlib_main_t *vm = vlib_get_main ();
   flowperpkt_main_t *fm = &flowperpkt_main;
+  vlib_node_runtime_t *node;
+  node = vlib_node_get_runtime (vm, flowperpkt_l2_node.index);
 
-  add_to_flow_record_l2 (vm, fm, 0 /* rx_sw_if_index */ ,
+  add_to_flow_record_l2 (vm, node, fm, 0 /* rx_sw_if_index */ ,
                         0 /* tx_sw_if_index */ ,
                         0 /* src mac */ ,
                         0 /* dst mac */ ,
@@ -376,7 +392,7 @@ flowperpkt_l2_node_fn (vlib_main_t * vm,
          len0 = vlib_buffer_length_in_chain (vm, b0);
 
          if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
-           add_to_flow_record_l2 (vm, fm,
+           add_to_flow_record_l2 (vm, node, fm,
                                   vnet_buffer (b0)->sw_if_index[VLIB_RX],
                                   vnet_buffer (b0)->sw_if_index[VLIB_TX],
                                   eh0->src_address,
@@ -387,7 +403,7 @@ flowperpkt_l2_node_fn (vlib_main_t * vm,
          len1 = vlib_buffer_length_in_chain (vm, b0);
 
          if (PREDICT_TRUE ((b1->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
-           add_to_flow_record_l2 (vm, fm,
+           add_to_flow_record_l2 (vm, node, fm,
                                   vnet_buffer (b1)->sw_if_index[VLIB_RX],
                                   vnet_buffer (b1)->sw_if_index[VLIB_TX],
                                   eh1->src_address,
@@ -453,7 +469,7 @@ flowperpkt_l2_node_fn (vlib_main_t * vm,
          len0 = vlib_buffer_length_in_chain (vm, b0);
 
          if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
-           add_to_flow_record_l2 (vm, fm,
+           add_to_flow_record_l2 (vm, node, fm,
                                   vnet_buffer (b0)->sw_if_index[VLIB_RX],
                                   vnet_buffer (b0)->sw_if_index[VLIB_TX],
                                   eh0->src_address,
@@ -531,6 +547,7 @@ VLIB_REGISTER_NODE (flowperpkt_l2_node) = {
   /* edit / add dispositions here */
   .next_nodes = {
     [FLOWPERPKT_L2_NEXT_DROP] = "error-drop",
+    [FLOWPERPKT_L2_NEXT_IP4_LOOKUP] = "ip4-lookup",
   },
 };
 /* *INDENT-ON* */
index 460aa3b..f77f087 100644 (file)
@@ -78,6 +78,7 @@ static char *flowperpkt_ipv4_error_strings[] = {
 typedef enum
 {
   FLOWPERPKT_IPV4_NEXT_DROP,
+  FLOWPERPKT_IPV4_NEXT_LOOKUP,
   FLOWPERPKT_IPV4_N_NEXT,
 } flowperpkt_ipv4_next_t;
 
@@ -94,6 +95,7 @@ typedef enum
 
 static inline void
 add_to_flow_record_ipv4 (vlib_main_t * vm,
+                        vlib_node_runtime_t * node,
                         flowperpkt_main_t * fm,
                         u32 rx_sw_if_index, u32 tx_sw_if_index,
                         u32 src_address, u32 dst_address,
@@ -282,6 +284,21 @@ add_to_flow_record_ipv4 (vlib_main_t * vm,
 
       ASSERT (ip->checksum == ip4_header_checksum (ip));
 
+      if (PREDICT_FALSE (vlib_get_trace_count (vm, node) > 0))
+       {
+         vlib_trace_buffer (vm, node, FLOWPERPKT_IPV4_NEXT_LOOKUP, b0,
+                            0 /* follow chain */ );
+         flowperpkt_ipv4_trace_t *t =
+           vlib_add_trace (vm, node, b0, sizeof (*t));
+         t->rx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+         t->tx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_TX];
+         t->src_address = 0;
+         t->dst_address = 0;
+         t->tos = 0;
+         t->timestamp = 0;
+         t->buffer_size = b0->current_length;
+       }
+
       vlib_put_frame_to_node (vm, ip4_lookup_node.index,
                              fm->ipv4_frames_per_worker[my_cpu_number]);
       fm->ipv4_frames_per_worker[my_cpu_number] = 0;
@@ -297,8 +314,10 @@ flowperpkt_flush_callback_ipv4 (void)
 {
   vlib_main_t *vm = vlib_get_main ();
   flowperpkt_main_t *fm = &flowperpkt_main;
+  vlib_node_runtime_t *node;
+  node = vlib_node_get_runtime (vm, flowperpkt_ipv4_node.index);
 
-  add_to_flow_record_ipv4 (vm, fm, 0 /* rx_sw_if_index */ ,
+  add_to_flow_record_ipv4 (vm, node, fm, 0 /* rx_sw_if_index */ ,
                           0 /* tx_sw_if_index */ ,
                           0 /* src_address */ ,
                           0 /* dst_address */ ,
@@ -376,7 +395,7 @@ flowperpkt_ipv4_node_fn (vlib_main_t * vm,
          len0 = vlib_buffer_length_in_chain (vm, b0);
 
          if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
-           add_to_flow_record_ipv4 (vm, fm,
+           add_to_flow_record_ipv4 (vm, node, fm,
                                     vnet_buffer (b0)->sw_if_index[VLIB_RX],
                                     vnet_buffer (b0)->sw_if_index[VLIB_TX],
                                     ip0->src_address.as_u32,
@@ -388,7 +407,7 @@ flowperpkt_ipv4_node_fn (vlib_main_t * vm,
          len1 = vlib_buffer_length_in_chain (vm, b1);
 
          if (PREDICT_TRUE ((b1->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
-           add_to_flow_record_ipv4 (vm, fm,
+           add_to_flow_record_ipv4 (vm, node, fm,
                                     vnet_buffer (b1)->sw_if_index[VLIB_RX],
                                     vnet_buffer (b1)->sw_if_index[VLIB_TX],
                                     ip1->src_address.as_u32,
@@ -462,7 +481,7 @@ flowperpkt_ipv4_node_fn (vlib_main_t * vm,
          len0 = vlib_buffer_length_in_chain (vm, b0);
 
          if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0))
-           add_to_flow_record_ipv4 (vm, fm,
+           add_to_flow_record_ipv4 (vm, node, fm,
                                     vnet_buffer (b0)->sw_if_index[VLIB_RX],
                                     vnet_buffer (b0)->sw_if_index[VLIB_TX],
                                     ip0->src_address.as_u32,
@@ -540,6 +559,8 @@ VLIB_REGISTER_NODE (flowperpkt_ipv4_node) = {
   /* edit / add dispositions here */
   .next_nodes = {
     [FLOWPERPKT_IPV4_NEXT_DROP] = "error-drop",
+    /* Used only to trace ipfix data packets */
+    [FLOWPERPKT_IPV4_NEXT_LOOKUP] = "ip4-lookup",
   },
 };
 /* *INDENT-ON* */
diff --git a/test/test_flowperpkt.py b/test/test_flowperpkt.py
new file mode 100644 (file)
index 0000000..af68a69
--- /dev/null
@@ -0,0 +1,222 @@
+#!/usr/bin/env python
+
+import unittest
+import socket
+import binascii
+import time
+
+from framework import VppTestCase, VppTestRunner
+
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, UDP
+from scapy.utils import hexdump
+from util import ppp
+
+class TestFlowperpkt(VppTestCase):
+    """ Flow-per-packet plugin: test both L2 and IP4 reporting """
+
+    def setUp(self):
+        """
+        Set up
+
+        **Config:**
+            - create three PG interfaces
+            - create a couple of loopback interfaces
+        """
+        super(TestFlowperpkt, self).setUp()
+
+        self.create_pg_interfaces(range(3))
+
+        self.pg_if_packet_sizes = [150]
+
+        self.interfaces = list(self.pg_interfaces)
+
+        for intf in self.interfaces:
+            intf.admin_up()
+            intf.config_ip4()
+            intf.resolve_arp()
+
+    def tearDown(self):
+        """Run standard test teardown"""
+        super(TestFlowperpkt, self).tearDown()
+
+
+    def create_stream(self, src_if, dst_if, packet_sizes):
+        """Create a packet stream to tickle the plugin
+
+        :param VppInterface src_if: Source interface for packet stream
+        :param VppInterface src_if: Dst interface for packet stream
+        :param list packet_sizes: Sizes to test
+        """
+        pkts = []
+        for size in packet_sizes:
+            info = self.create_packet_info(src_if.sw_if_index, 
+                                           dst_if.sw_if_index)
+            payload = self.info_to_payload(info)
+            p = (Ether(src=src_if.local_mac, dst=dst_if.remote_mac) /
+                 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
+                 UDP(sport=1234, dport=4321) /
+                 Raw(payload))
+            info.data = p.copy()
+            self.extend_packet(p, size)
+            pkts.append(p)
+        return pkts
+
+    def verify_ipfix(self, collector_if):
+        """Check the ipfix capture"""
+        found_data_packet = 0
+        found_template_packet = 0
+        found_l2_data_packet = 0
+        found_l2_template_packet = 0
+
+        # Scapy, of course, understands ipfix not at all...
+        # These data vetted by manual inspection in wireshark
+        # X'ed out fields are timestamps, which will absolutely
+        # fail to compare. At L2, kill the pg src MAC address, which
+        # is random.
+        
+        data_udp_string = "1283128300370000000a002fXXXXXXXX00000000000000010100001f0000000100000002ac100102ac10020200XXXXXXXXXXXXXXXX0092"
+
+        template_udp_string = "12831283003c0000000a0034XXXXXXXX00000002000000010002002401000007000a0004000e000400080004000c000400050001009c000801380002"
+
+        l2_data_udp_string =  "12831283003c0000000a0034XXXXXXXX0000000100000001010100240000000100000002XXXXXXXXXXXX02020000ff020008XXXXXXXXXXXXXXXX0092"
+
+        l2_template_udp_string = "12831283003c0000000a0034XXXXXXXX00000002000000010002002401010007000a0004000e0004003800060050000601000002009c000801380002"
+
+        cap_x = "X"
+        data_udp_len = len(data_udp_string)
+        template_udp_len = len(template_udp_string)
+        l2_data_udp_len = len(l2_data_udp_string)
+        l2_template_udp_len = len(l2_template_udp_string)
+
+        self.logger.info("Look for ipfix packets on %s sw_if_index %d " 
+                         % (collector_if.name, collector_if.sw_if_index))
+        capture = collector_if.get_capture()
+
+        for p in capture:
+            data_result = ""
+            template_result = ""
+            l2_data_result = ""
+            l2_template_result = ""
+            unmasked_result = ""
+            ip = p[IP]
+            udp = p[UDP]
+            self.logger.info("src %s dst %s" % (ip.src, ip.dst))
+            self.logger.info(" udp src_port %s dst_port %s" 
+                             % (udp.sport, udp.dport))
+
+            # Hex-dump the UDP datagram 4 ways in parallel
+            # X'ing out incomparable fields
+            # Python completely bites at this sort of thing, of course
+
+            x = str(udp)
+            l = len(x)
+            i = 0
+            while i < l:
+                # If current index within range
+                if i < data_udp_len/2:
+                    # See if we're supposed to don't care the data
+                    if ord(data_udp_string[i*2]) == ord(cap_x[0]):
+                        data_result = data_result + "XX"
+                    else:
+                        data_result = data_result + ("%02x" % ord(x[i]))
+                else:
+                    # index out of range, emit actual data
+                    # The test will fail, but it may help debug, etc.
+                    data_result = data_result + ("%02x" % ord(x[i]))
+                    
+                if i < template_udp_len/2:
+                    if ord(template_udp_string[i*2]) == ord(cap_x[0]):
+                        template_result = template_result + "XX"
+                    else:
+                        template_result = template_result + ("%02x" % ord(x[i]))
+                else:
+                    template_result = template_result + ("%02x" % ord(x[i]))
+
+                if i < l2_data_udp_len/2:
+                    # See if we're supposed to don't care the data
+                    if ord(l2_data_udp_string[i*2]) == ord(cap_x[0]):
+                        l2_data_result = l2_data_result + "XX"
+                    else:
+                        l2_data_result = l2_data_result + ("%02x" % ord(x[i]))
+                else:
+                    # index out of range, emit actual data
+                    # The test will fail, but it may help debug, etc.
+                    l2_data_result = l2_data_result + ("%02x" % ord(x[i]))
+                
+                if i < l2_template_udp_len/2:
+                    if ord(l2_template_udp_string[i*2]) == ord(cap_x[0]):
+                        l2_template_result = l2_template_result + "XX"
+                    else:
+                        l2_template_result = l2_template_result + ("%02x" % ord(x[i]))
+                else:
+                    l2_template_result = l2_template_result + ("%02x" % ord(x[i]))
+                # In case we need to 
+                unmasked_result = unmasked_result + ("%02x" % ord(x[i]))
+
+                i = i + 1
+
+            if data_result == data_udp_string:
+                self.logger.info ("found ip4 data packet")
+                found_data_packet = 1
+            elif template_result == template_udp_string:
+                self.logger.info ("found ip4 template packet")
+                found_template_packet = 1
+            elif l2_data_result == l2_data_udp_string:
+                self.logger.info ("found l2 data packet")
+                found_l2_data_packet = 1
+            elif l2_template_result == l2_template_udp_string:
+                self.logger.info ("found l2 template packet")
+                found_l2_template_packet = 1
+            else:
+                self.logger.info ("unknown pkt '%s'" % unmasked_result)
+                
+        self.assertTrue (found_data_packet == 1)
+        self.assertTrue (found_template_packet == 1)
+        self.assertTrue (found_l2_data_packet == 1)
+        self.assertTrue (found_l2_template_packet == 1)
+
+    def test_L3_fpp(self):
+        """ Flow per packet L3 test """
+
+        # Configure an ipfix report on the [nonexistent] collector
+        # 172.16.3.2, as if it was connected to the pg2 interface
+        # Install a FIB entry, so the exporter's work won't turn into
+        # an ARP request
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.vapi.cli("set ip arp pg2 172.16.3.2 dead.beef.0002")
+        self.logger.info(self.vapi.cli("set ipfix exporter collector 172.16.3.2 src 172.16.3.1 path-mtu 1450 template-interval 1"))
+
+        # Export flow records for all pkts transmitted on pg1
+
+        self.logger.info(self.vapi.cli("flowperpkt feature add-del pg1"))
+        self.logger.info(self.vapi.cli("flowperpkt feature add-del pg1 l2"))
+
+        # Arrange to minimally trace generated ipfix packets
+        self.logger.info(self.vapi.cli("trace add flowperpkt-ipv4 10"))
+        self.logger.info(self.vapi.cli("trace add flowperpkt-l2 10"))
+
+        # Create a stream from pg0 -> pg1, which causes
+        # an ipfix packet to be transmitted on pg2
+        
+        pkts = self.create_stream(self.pg0, self.pg1, 
+                                  self.pg_if_packet_sizes)
+        self.pg0.add_stream(pkts)
+        self.pg_start()
+        
+        # Flush the ipfix collector, so we don't need any
+        # asinine time.sleep(5) action
+
+        self.logger.info(self.vapi.cli("ipfix flush"))
+        
+        # Make sure the 4 pkts we expect actually showed up
+        self.verify_ipfix(self.pg2)
+
+if __name__ == '__main__':
+    unittest.main(testRunner=VppTestRunner)
+            
+        
+    
+        
index 4633328..c78a78a 100644 (file)
@@ -189,7 +189,9 @@ flow_report_process (vlib_main_t * vm,
 
   while (1) 
     {
-      vlib_process_suspend (vm, 5.0);
+      vlib_process_wait_for_event_or_clock (vm, 5.0);
+      event_type = vlib_process_get_events (vm, &event_data);
+      vec_reset_length (event_data);
       
       vec_foreach (fr, frm->reports)
         {
@@ -465,6 +467,24 @@ VLIB_CLI_COMMAND (set_ipfix_exporter_command, static) = {
     .function = set_ipfix_exporter_command_fn,
 };
 
+
+static clib_error_t *
+ipfix_flush_command_fn (vlib_main_t * vm,
+                        unformat_input_t * input,
+                        vlib_cli_command_t * cmd)
+{
+  /* poke the flow reporting process */
+  vlib_process_signal_event (vm, flow_report_process_node.index,
+                             1, 0);
+  return 0;
+}
+
+VLIB_CLI_COMMAND (ipfix_flush_command, static) = {
+    .path = "ipfix flush",
+    .short_help = "flush the current ipfix data [for make test]",
+    .function = ipfix_flush_command_fn,
+};
+
 static clib_error_t * 
 flow_report_init (vlib_main_t *vm)
 {