-#!/usr/bin/env python
+#!/usr/bin/env python3
from __future__ import print_function
import binascii
import random
from scapy.layers.inet import IP, TCP, UDP
from scapy.layers.inet6 import IPv6
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner, running_extended_tests
+from framework import tag_run_solo
from vpp_object import VppObject
from vpp_pg_interface import CaptureTimeoutError
from util import ppp
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_papi.macaddress import mac_ntop
+from socket import inet_ntop
+from vpp_papi import VppEnum
class VppCFLOW(VppObject):
def add_vpp_config(self):
self.enable_exporter()
- self._test.vapi.ppcli("flowprobe params record %s active %s "
- "passive %s" % (self._collect, self._active,
- self._passive))
+ l2_flag = 0
+ l3_flag = 0
+ l4_flag = 0
+ if 'l2' in self._collect.lower():
+ l2_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
+ FLOWPROBE_RECORD_FLAG_L2)
+ if 'l3' in self._collect.lower():
+ l3_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
+ FLOWPROBE_RECORD_FLAG_L3)
+ if 'l4' in self._collect.lower():
+ l4_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
+ FLOWPROBE_RECORD_FLAG_L4)
+ self._test.vapi.flowprobe_params(
+ record_flags=(l2_flag | l3_flag | l4_flag),
+ active_timer=self._active, passive_timer=self._passive)
self.enable_flowprobe_feature()
self._test.vapi.cli("ipfix flush")
self._configured = True
def enable_exporter(self):
self._test.vapi.set_ipfix_exporter(
- collector_address=self._test.pg0.remote_ip4n,
- src_address=self._test.pg0.local_ip4n,
+ collector_address=self._test.pg0.remote_ip4,
+ src_address=self._test.pg0.local_ip4,
path_mtu=self._mtu,
template_interval=self._timeout)
(self._intf, self._datapath))
def object_id(self):
- return "ipfix-collector-%s" % (self._src, self.dst)
+ return "ipfix-collector-%s-%s" % (self._src, self.dst)
def query_vpp_config(self):
return self._configured
# Create BD with MAC learning and unknown unicast flooding disabled
# and put interfaces to this BD
cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
- cls.vapi.sw_interface_set_l2_bridge(cls.pg1._sw_if_index, bd_id=1)
- cls.vapi.sw_interface_set_l2_bridge(cls.pg2._sw_if_index, bd_id=1)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1)
# Set up all interfaces
for i in cls.pg_interfaces:
ip_layer = capture[0][IPv6]
if data_set is not None:
for record in data:
- # skip flow if in/out gress interface is 0
+ # skip flow if ingress/egress interface is 0
if int(binascii.hexlify(record[10]), 16) == 0:
continue
if int(binascii.hexlify(record[14]), 16) == 0:
return p
+@tag_run_solo
+@tag_fixme_vpp_workers
class Flowprobe(MethodHolder):
"""Template verification, timer tests"""
src=self.pg7.remote_mac) /
IP(src=self.pg7.remote_ip4, dst="9.0.0.100") /
TCP(sport=1234, dport=4321, flags=80) /
- Raw('\xa5' * 100))]
+ Raw(b'\xa5' * 100))]
nowUTC = int(time.time())
nowUNIX = nowUTC+2208988800
# packets
self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
# src mac
- self.assertEqual(':'.join(re.findall('..', record[56].encode(
- 'hex'))), self.pg8.local_mac)
+ self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
# dst mac
- self.assertEqual(':'.join(re.findall('..', record[80].encode(
- 'hex'))), self.pg8.remote_mac)
+ self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
# flow start timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
# ethernet type
self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
# src ip
- self.assertEqual('.'.join(re.findall('..', record[8].encode(
- 'hex'))),
- '.'.join('{:02x}'.format(int(n)) for n in
- self.pg7.remote_ip4.split('.')))
+ self.assertEqual(inet_ntop(socket.AF_INET, record[8]),
+ self.pg7.remote_ip4)
# dst ip
- self.assertEqual('.'.join(re.findall('..', record[12].encode(
- 'hex'))),
- '.'.join('{:02x}'.format(int(n)) for n in
- "9.0.0.100".split('.')))
+ self.assertEqual(inet_ntop(socket.AF_INET, record[12]),
+ "9.0.0.100")
# protocol (TCP)
self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
# src port
self.logger.info("FFP_TEST_FINISH_0000")
+@tag_fixme_vpp_workers
class Datapath(MethodHolder):
"""collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
ipfix.add_vpp_config()
# template packet should arrive immediately
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
ipfix.verify_templates(timeout=3, count=1)
self.collector.get_capture(1)
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 8})
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 4: 17,
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'})
ipfix.add_vpp_config()
# template packet should arrive immediately
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
ipfix.verify_templates(timeout=3, count=1)
self.collector.get_capture(1)
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 8})
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{1: 'octets', 2: 'packets',
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'})
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 56710},
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets',
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'},
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[1])
self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
self.collector.get_capture(4)
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
templates = ipfix.verify_templates(ipfix_decoder)
self.create_stream(packets=6)
# make sure the one packet we expect actually showed up
cflows = []
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflows.append(self.wait_for_cflow_packet(self.collector,
templates[1]))
cflows.append(self.wait_for_cflow_packet(self.collector,
self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
- # disble IPFIX
+ # disable IPFIX
ipfix.disable_exporter()
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in 1 minute
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
# disable IPFIX
ipfix.disable_exporter()
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in active timer span
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
- # disble IPFIX
+ # disable IPFIX
ipfix.disable_flowprobe_feature()
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in active timer span
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
self.create_stream()
self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1], 5)
self.collector.get_capture(4)
- # disble FPP feature
+ # disable FPP feature
ipfix.disable_flowprobe_feature()
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in active timer span
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1], 5,
expected=False)
self.collector.get_capture(0)
# enable FPP feature
ipfix.enable_flowprobe_feature()
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
self.send_packets()
# make sure the next packets (templates and data) we expect actually
# showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1], 5)
self.collector.get_capture(4)