#!/usr/bin/env python
+from __future__ import print_function
+import binascii
import random
import socket
import unittest
def add_vpp_config(self):
self.enable_exporter()
- self._test.vapi.ppcli("flowprobe params record %s active %s "
- "passive %s" % (self._collect, self._active,
- self._passive))
+ self._test.vapi.flowprobe_params(
+ record_l2=1 if 'l2' in self._collect.lower() else 0,
+ record_l3=1 if 'l3' in self._collect.lower() else 0,
+ record_l4=1 if 'l4' in self._collect.lower() else 0,
+ active_timer=self._active, passive_timer=self._passive)
self.enable_flowprobe_feature()
self._test.vapi.cli("ipfix flush")
self._configured = True
(self._intf, self._datapath))
def object_id(self):
- return "ipfix-collector-%s" % (self._src, self.dst)
+ return "ipfix-collector-%s-%s" % (self._src, self.dst)
def query_vpp_config(self):
return self._configured
# Create BD with MAC learning and unknown unicast flooding disabled
# and put interfaces to this BD
cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
- cls.vapi.sw_interface_set_l2_bridge(cls.pg1._sw_if_index, bd_id=1)
- cls.vapi.sw_interface_set_l2_bridge(cls.pg2._sw_if_index, bd_id=1)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1)
# Set up all interfaces
for i in cls.pg_interfaces:
super(MethodHolder, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(MethodHolder, cls).tearDownClass()
+
def create_stream(self, src_if=None, dst_if=None, packets=None,
size=None, ip_ver='v4'):
"""Create a packet stream to tickle the plugin
if cflow.haslayer(Data):
data = decoder.decode_data_set(cflow.getlayer(Set))
for record in data:
- self.assertEqual(int(record[1].encode('hex'), 16), octets)
- self.assertEqual(int(record[2].encode('hex'), 16), packets)
+ self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
def send_packets(self, src_if=None, dst_if=None):
if src_if is None:
data_set={1: 'octets', 2: 'packets'},
ip_ver='v4'):
if self.debug_print:
- print capture[0].show()
+ print(capture[0].show())
if cflow.haslayer(Data):
data = decoder.decode_data_set(cflow.getlayer(Set))
if self.debug_print:
- print data
+ print(data)
if ip_ver == 'v4':
ip_layer = capture[0][IP]
else:
ip_layer = capture[0][IPv6]
if data_set is not None:
for record in data:
- # skip flow if in/out gress interface is 0
- if int(record[10].encode('hex'), 16) == 0:
+ # skip flow if ingress/egress interface is 0
+ if int(binascii.hexlify(record[10]), 16) == 0:
continue
- if int(record[14].encode('hex'), 16) == 0:
+ if int(binascii.hexlify(record[14]), 16) == 0:
continue
for field in data_set:
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.src)
- value = int(ip.encode('hex'), 16)
+ value = int(binascii.hexlify(ip), 16)
elif value == 'dst_ip':
if ip_ver == 'v4':
ip = socket.inet_pton(socket.AF_INET,
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.dst)
- value = int(ip.encode('hex'), 16)
+ value = int(binascii.hexlify(ip), 16)
elif value == 'sport':
value = int(capture[0][UDP].sport)
elif value == 'dport':
value = int(capture[0][UDP].dport)
- self.assertEqual(int(record[field].encode('hex'), 16),
+ self.assertEqual(int(binascii.hexlify(
+ record[field]), 16),
value)
def verify_cflow_data_notimer(self, decoder, capture, cflows):
for rec in data:
p = capture[idx]
idx += 1
- self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16))
- self.assertEqual(1, int(rec[2].encode('hex'), 16))
+ self.assertEqual(p[IP].len, int(
+ binascii.hexlify(rec[1]), 16))
+ self.assertEqual(1, int(
+ binascii.hexlify(rec[2]), 16))
self.assertEqual(len(capture), idx)
def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1,
class Flowprobe(MethodHolder):
"""Template verification, timer tests"""
+ @classmethod
+ def setUpClass(cls):
+ super(Flowprobe, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(Flowprobe, cls).tearDownClass()
+
def test_0001(self):
""" timer less than template timeout"""
self.logger.info("FFP_TEST_START_0001")
if cflow.haslayer(Data):
record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
# ingress interface
- self.assertEqual(int(record[10].encode('hex'), 16), 8)
+ self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
# egress interface
- self.assertEqual(int(record[14].encode('hex'), 16), 9)
+ self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
# packets
- self.assertEqual(int(record[2].encode('hex'), 16), 1)
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
# src mac
self.assertEqual(':'.join(re.findall('..', record[56].encode(
'hex'))), self.pg8.local_mac)
# dst mac
self.assertEqual(':'.join(re.findall('..', record[80].encode(
'hex'))), self.pg8.remote_mac)
- flowTimestamp = int(record[156].encode('hex'), 16) >> 32
+ flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
# flow start timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
- flowTimestamp = int(record[157].encode('hex'), 16) >> 32
+ flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
# flow end timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
# ethernet type
- self.assertEqual(int(record[256].encode('hex'), 16), 8)
+ self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
# src ip
self.assertEqual('.'.join(re.findall('..', record[8].encode(
'hex'))),
'.'.join('{:02x}'.format(int(n)) for n in
"9.0.0.100".split('.')))
# protocol (TCP)
- self.assertEqual(int(record[4].encode('hex'), 16), 6)
+ self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
# src port
- self.assertEqual(int(record[7].encode('hex'), 16), 1234)
+ self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
# dst port
- self.assertEqual(int(record[11].encode('hex'), 16), 4321)
+ self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
# tcp flags
- self.assertEqual(int(record[6].encode('hex'), 16), 80)
+ self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0000")
class Datapath(MethodHolder):
"""collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
+ @classmethod
+ def setUpClass(cls):
+ super(Datapath, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(Datapath, cls).tearDownClass()
+
def test_templatesL2(self):
""" verify template on L2 datapath"""
self.logger.info("FFP_TEST_START_0000")
ipfix.add_vpp_config()
# template packet should arrive immediately
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
ipfix.verify_templates(timeout=3, count=1)
self.collector.get_capture(1)
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 8})
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 4: 17,
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'})
ipfix.add_vpp_config()
# template packet should arrive immediately
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
ipfix.verify_templates(timeout=3, count=1)
self.collector.get_capture(1)
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 8})
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{1: 'octets', 2: 'packets',
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'})
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 56710},
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets',
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'},
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[1])
self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
self.collector.get_capture(4)
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
templates = ipfix.verify_templates(ipfix_decoder)
self.create_stream(packets=6)
# make sure the one packet we expect actually showed up
cflows = []
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
cflows.append(self.wait_for_cflow_packet(self.collector,
templates[1]))
cflows.append(self.wait_for_cflow_packet(self.collector,
self.logger.info("FFP_TEST_FINISH_0002")
-@unittest.skipUnless(running_extended_tests(), "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class DisableIPFIX(MethodHolder):
"""Disable IPFIX"""
+ @classmethod
+ def setUpClass(cls):
+ super(DisableIPFIX, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(DisableIPFIX, cls).tearDownClass()
+
def test_0001(self):
""" disable IPFIX after first packets"""
self.logger.info("FFP_TEST_START_0001")
self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
- # disble IPFIX
+ # disable IPFIX
ipfix.disable_exporter()
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in 1 minute
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
self.logger.info("FFP_TEST_FINISH_0001")
-@unittest.skipUnless(running_extended_tests(), "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class ReenableIPFIX(MethodHolder):
"""Re-enable IPFIX"""
+ @classmethod
+ def setUpClass(cls):
+ super(ReenableIPFIX, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(ReenableIPFIX, cls).tearDownClass()
+
def test_0011(self):
""" disable IPFIX after first packets and re-enable after few packets
"""
self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
- # disble IPFIX
+ # disable IPFIX
ipfix.disable_exporter()
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in active timer span
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
self.logger.info("FFP_TEST_FINISH_0001")
-@unittest.skipUnless(running_extended_tests(), "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class DisableFP(MethodHolder):
"""Disable Flowprobe feature"""
+ @classmethod
+ def setUpClass(cls):
+ super(DisableFP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(DisableFP, cls).tearDownClass()
+
def test_0001(self):
""" disable flowprobe feature after first packets"""
self.logger.info("FFP_TEST_START_0001")
self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
- # disble IPFIX
+ # disable IPFIX
ipfix.disable_flowprobe_feature()
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in active timer span
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
self.logger.info("FFP_TEST_FINISH_0001")
-@unittest.skipUnless(running_extended_tests(), "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class ReenableFP(MethodHolder):
"""Re-enable Flowprobe feature"""
+ @classmethod
+ def setUpClass(cls):
+ super(ReenableFP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(ReenableFP, cls).tearDownClass()
+
def test_0001(self):
""" disable flowprobe feature after first packets and re-enable
after few packets """
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
self.create_stream()
self.send_packets()
# make sure the one packet we expect actually showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1], 5)
self.collector.get_capture(4)
- # disble FPP feature
+ # disable FPP feature
ipfix.disable_flowprobe_feature()
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in active timer span
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1], 5,
expected=False)
self.collector.get_capture(0)
# enable FPP feature
ipfix.enable_flowprobe_feature()
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
self.send_packets()
# make sure the next packets (templates and data) we expect actually
# showed up
- self.vapi.cli("ipfix flush")
+ self.vapi.ipfix_flush()
self.wait_for_cflow_packet(self.collector, templates[1], 5)
self.collector.get_capture(4)