+ # enable ip4 datapath for an interface
+ # set active and passive timers
+ ipfix = VppCFLOW(
+ test=self,
+ active=2,
+ passive=3,
+ intf="pg3",
+ layer="l3 l4",
+ datapath="ip4",
+ direction="rx",
+ mtu=100,
+ )
+ ipfix.add_vpp_config()
+
+ # template packet should arrive immediately
+ ipfix_decoder = IPFIXDecoder()
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+ # make a tcp packet
+ self.pkts = [
+ (
+ Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
+ / TCP(sport=1234, dport=4321)
+ / Raw(b"\xa5" * 50)
+ )
+ ]
+
+ # send the tcp packet two times, each time with new set of flags
+ tcp_flags = (
+ TCP_F_SYN | TCP_F_ACK,
+ TCP_F_RST | TCP_F_PSH,
+ )
+ for f in tcp_flags:
+ self.pkts[0][TCP].flags = f
+ capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
+
+ # verify meta info - packet/octet delta and tcp flags
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
+ self.verify_cflow_data(ipfix_decoder, capture, cflow)
+ self.verify_cflow_data_detail(
+ ipfix_decoder,
+ capture,
+ cflow,
+ {
+ IPFIX_TCP_FLAGS_ID: f,
+ IPFIX_SRC_TRANS_PORT_ID: 1234,
+ IPFIX_DST_TRANS_PORT_ID: 4321,
+ },
+ )
+
+ self.collector.get_capture(3)
+
+ # cleanup
+ ipfix.remove_vpp_config()
+
+ def test_interface_dump(self):
+ """Dump interfaces with IPFIX flow record generation enabled"""
+ self.logger.info("FFP_TEST_START_0003")
+
+ # Enable feature for 3 interfaces
+ ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
+ ipfix1.add_vpp_config()
+
+ ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
+ ipfix2.enable_flowprobe_feature()
+
+ ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
+ ipfix3.enable_flowprobe_feature()
+
+ # When request "all", dump should contain all enabled interfaces
+ dump = self.vapi.flowprobe_interface_dump()
+ self.assertEqual(len(dump), 3)
+
+ # Verify 1st interface
+ self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
+ self.assertEqual(
+ dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
+ )
+ self.assertEqual(
+ dump[0].direction,
+ VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
+ )
+
+ # Verify 2nd interface
+ self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
+ self.assertEqual(
+ dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
+ )
+ self.assertEqual(
+ dump[1].direction,
+ VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+ )
+
+ # Verify 3rd interface
+ self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
+ self.assertEqual(
+ dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
+ )
+ self.assertEqual(
+ dump[2].direction,
+ VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
+ )
+
+ # When request 2nd interface, dump should contain only the specified interface
+ dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
+ self.assertEqual(len(dump), 1)
+
+ # Verify 2nd interface
+ self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
+ self.assertEqual(
+ dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
+ )
+ self.assertEqual(
+ dump[0].direction,
+ VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+ )
+
+ # When request 99th interface, dump should be empty
+ dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
+ self.assertEqual(len(dump), 0)
+
+ ipfix1.remove_vpp_config()
+ ipfix2.remove_vpp_config()
+ ipfix3.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0003")
+
+ def test_get_params(self):
+ """Get IPFIX flow record generation parameters"""
+ self.logger.info("FFP_TEST_START_0004")
+
+ # Enable feature for an interface with custom parameters
+ ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
+ ipfix.add_vpp_config()
+
+ # Get and verify parameters
+ params = self.vapi.flowprobe_get_params()
+ self.assertEqual(params.active_timer, 20)
+ self.assertEqual(params.passive_timer, 40)
+ record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
+ record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
+ record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
+ self.assertEqual(params.record_flags, record_flags)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0004")
+
+
+class DatapathTestsHolder(object):