#!/usr/bin/env python
+from __future__ import print_function
+import binascii
import random
import socket
import unittest
import time
+import re
from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
+from scapy.layers.inet import IP, TCP, UDP
from scapy.layers.inet6 import IPv6
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase, VppTestRunner, running_extended_tests
from vpp_object import VppObject
from vpp_pg_interface import CaptureTimeoutError
from util import ppp
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
+from vpp_ip_route import VppIpRoute, VppRoutePath
class VppCFLOW(VppObject):
"""CFLOW object for IPFIX exporter and Flowprobe feature"""
- def __init__(self, test, intf='pg2', active=0, passive=0, timeout=300,
- mtu=512, datapath='l2', layer='l2 l3 l4'):
+ def __init__(self, test, intf='pg2', active=0, passive=0, timeout=100,
+ mtu=1024, datapath='l2', layer='l2 l3 l4'):
self._test = test
self._intf = intf
self._active = active
if passive == 0 or passive < active:
- self._passive = active+5
+ self._passive = active+1
else:
self._passive = passive
self._datapath = datapath # l2 ip4 ip6
# Test variables
debug_print = False
- max_number_of_packets = 16
+ max_number_of_packets = 10
pkts = []
@classmethod
super(MethodHolder, cls).setUpClass()
try:
# Create pg interfaces
- cls.create_pg_interfaces(range(7))
+ cls.create_pg_interfaces(range(9))
# Packet sizes
cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
cls.pg3.resolve_arp()
cls.pg4.config_ip4()
cls.pg4.resolve_arp()
+ cls.pg7.config_ip4()
+ cls.pg8.config_ip4()
+ cls.pg8.configure_ipv4_neighbors()
cls.pg5.config_ip6()
cls.pg5.resolve_ndp()
p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
else:
p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
- p /= (UDP(sport=1234, dport=4321) / Raw(payload))
+ p /= UDP(sport=1234, dport=4321)
+ p /= Raw(payload)
info.data = p.copy()
self.extend_packet(p, pkt_size)
self.pkts.append(p)
if cflow.haslayer(Data):
data = decoder.decode_data_set(cflow.getlayer(Set))
for record in data:
- self.assertEqual(int(record[1].encode('hex'), 16), octets)
- self.assertEqual(int(record[2].encode('hex'), 16), packets)
+ self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
+
+ def send_packets(self, src_if=None, dst_if=None):
+ if src_if is None:
+ src_if = self.pg1
+ if dst_if is None:
+ dst_if = self.pg2
+ self.pg_enable_capture([dst_if])
+ src_if.add_stream(self.pkts)
+ self.pg_start()
+ return dst_if.get_capture(len(self.pkts))
def verify_cflow_data_detail(self, decoder, capture, cflow,
data_set={1: 'octets', 2: 'packets'},
ip_ver='v4'):
if self.debug_print:
- print capture[0].show()
+ print(capture[0].show())
if cflow.haslayer(Data):
data = decoder.decode_data_set(cflow.getlayer(Set))
if self.debug_print:
- print data
+ print(data)
if ip_ver == 'v4':
ip_layer = capture[0][IP]
else:
if data_set is not None:
for record in data:
# skip flow if in/out gress interface is 0
- if int(record[10].encode('hex'), 16) == 0:
+ if int(binascii.hexlify(record[10]), 16) == 0:
continue
- if int(record[14].encode('hex'), 16) == 0:
+ if int(binascii.hexlify(record[14]), 16) == 0:
continue
for field in data_set:
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.src)
- value = int(ip.encode('hex'), 16)
+ value = int(binascii.hexlify(ip), 16)
elif value == 'dst_ip':
if ip_ver == 'v4':
ip = socket.inet_pton(socket.AF_INET,
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.dst)
- value = int(ip.encode('hex'), 16)
+ value = int(binascii.hexlify(ip), 16)
elif value == 'sport':
value = int(capture[0][UDP].sport)
elif value == 'dport':
value = int(capture[0][UDP].dport)
- self.assertEqual(int(record[field].encode('hex'), 16),
+ self.assertEqual(int(binascii.hexlify(
+ record[field]), 16),
value)
def verify_cflow_data_notimer(self, decoder, capture, cflows):
for rec in data:
p = capture[idx]
idx += 1
- self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16))
- self.assertEqual(1, int(rec[2].encode('hex'), 16))
+ self.assertEqual(p[IP].len, int(
+ binascii.hexlify(rec[1]), 16))
+ self.assertEqual(1, int(
+ binascii.hexlify(rec[2]), 16))
self.assertEqual(len(capture), idx)
def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1,
break
return p
- def send_packets(self, src_if=None, dst_if=None):
- self.sleep(3)
- if src_if is None:
- src_if = self.pg1
- if dst_if is None:
- dst_if = self.pg2
- self.pg_enable_capture([dst_if])
- src_if.add_stream(self.pkts)
- self.pg_start()
- return dst_if.get_capture(len(self.pkts))
-
-class TestFFP_Timers(MethodHolder):
+class Flowprobe(MethodHolder):
"""Template verification, timer tests"""
def test_0001(self):
- """ receive template data packets"""
+ """ timer less than template timeout"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
-
- ipfix = VppCFLOW(test=self, active=10)
- ipfix.add_vpp_config()
-
- # template packet should arrive immediately
- ipfix.verify_templates(timeout=3)
-
- ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0001")
-
- def test_0002(self):
- """ timer=10s, less than template timeout"""
- self.logger.info("FFP_TEST_START_0002")
- self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=20)
+ ipfix = VppCFLOW(test=self, active=2)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
+ templates = ipfix.verify_templates(ipfix_decoder)
- self.create_stream()
- capture = self.send_packets()
+ self.create_stream(packets=1)
+ self.send_packets()
+ capture = self.pg2.get_capture(1)
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[1], 39)
+ cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
self.verify_cflow_data(ipfix_decoder, capture, cflow)
ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0002")
+ self.logger.info("FFP_TEST_FINISH_0001")
- def test_0003(self):
- """ timer=30s, greater than template timeout"""
- self.logger.info("FFP_TEST_START_0003")
+ def test_0002(self):
+ """ timer greater than template timeout"""
+ self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, timeout=20, active=25)
+ ipfix = VppCFLOW(test=self, timeout=3, active=4)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- ipfix.verify_templates(timeout=3)
+ ipfix.verify_templates()
- self.create_stream()
- capture = self.send_packets()
+ self.create_stream(packets=2)
+ self.send_packets()
+ capture = self.pg2.get_capture(2)
# next set of template packet should arrive after 20 seconds
# template packet should arrive within 20 s
- templates = ipfix.verify_templates(ipfix_decoder, timeout=25)
+ templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[1], 55)
+ cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
self.verify_cflow_data(ipfix_decoder, capture, cflow)
ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0003")
+ self.logger.info("FFP_TEST_FINISH_0002")
- def test_0004(self):
- """ sent packet after first cflow packet arrived"""
- self.logger.info("FFP_TEST_START_0004")
+ def test_cflow_packet(self):
+ """verify cflow packet fields"""
+ self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=20)
+ ipfix = VppCFLOW(test=self, intf='pg8', datapath="ip4",
+ layer='l2 l3 l4', active=2)
ipfix.add_vpp_config()
+ route_9001 = VppIpRoute(self, "9.0.0.0", 24,
+ [VppRoutePath(self.pg8._remote_hosts[0].ip4,
+ self.pg8.sw_if_index)])
+ route_9001.add_vpp_config()
+
ipfix_decoder = IPFIXDecoder()
- # template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
- self.create_stream()
- self.send_packets()
+ self.pkts = [(Ether(dst=self.pg7.local_mac,
+ src=self.pg7.remote_mac) /
+ IP(src=self.pg7.remote_ip4, dst="9.0.0.100") /
+ TCP(sport=1234, dport=4321, flags=80) /
+ Raw('\xa5' * 100))]
- # make sure the one packet we expect actually showed up
- self.wait_for_cflow_packet(self.collector, templates[1], 39)
+ nowUTC = int(time.time())
+ nowUNIX = nowUTC+2208988800
+ self.send_packets(src_if=self.pg7, dst_if=self.pg8)
- self.pg_enable_capture([self.pg2])
-
- capture = self.send_packets()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
+ self.collector.get_capture(2)
- # make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[1], 30)
- self.verify_cflow_data(ipfix_decoder, capture, cflow)
+ if cflow[0].haslayer(IPFIX):
+ self.assertEqual(cflow[IPFIX].version, 10)
+ self.assertEqual(cflow[IPFIX].observationDomainID, 1)
+ self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
+ self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
+ if cflow.haslayer(Data):
+ record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
+ # ingress interface
+ self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
+ # egress interface
+ self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
+ # packets
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
+ # src mac
+ self.assertEqual(':'.join(re.findall('..', record[56].encode(
+ 'hex'))), self.pg8.local_mac)
+ # dst mac
+ self.assertEqual(':'.join(re.findall('..', record[80].encode(
+ 'hex'))), self.pg8.remote_mac)
+ flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
+ # flow start timestamp
+ self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
+ flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
+ # flow end timestamp
+ self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
+ # ethernet type
+ self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
+ # src ip
+ self.assertEqual('.'.join(re.findall('..', record[8].encode(
+ 'hex'))),
+ '.'.join('{:02x}'.format(int(n)) for n in
+ self.pg7.remote_ip4.split('.')))
+ # dst ip
+ self.assertEqual('.'.join(re.findall('..', record[12].encode(
+ 'hex'))),
+ '.'.join('{:02x}'.format(int(n)) for n in
+ "9.0.0.100".split('.')))
+ # protocol (TCP)
+ self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
+ # src port
+ self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
+ # dst port
+ self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
+ # tcp flags
+ self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
ipfix.remove_vpp_config()
- self.logger.info("FFP_TEST_FINISH_0004")
+ self.logger.info("FFP_TEST_FINISH_0000")
-class TestFFP_DatapathL2(MethodHolder):
- """collect information on Ethernet datapath"""
+class Datapath(MethodHolder):
+ """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
- def test_0000(self):
+ def test_templatesL2(self):
""" verify template on L2 datapath"""
self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, active=10)
+ ipfix = VppCFLOW(test=self, layer='l2')
ipfix.add_vpp_config()
# template packet should arrive immediately
- ipfix.verify_templates(timeout=3, count=3)
- self.collector.get_capture(3)
+ self.vapi.cli("ipfix flush")
+ ipfix.verify_templates(timeout=3, count=1)
+ self.collector.get_capture(1)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0000")
- def test_0001(self):
+ def test_L2onL2(self):
""" L2 data on L2 datapath"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=10, layer='l2')
+ ipfix = VppCFLOW(test=self, layer='l2')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1)
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
self.create_stream(packets=1)
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 8})
-
- # expected two templates and one cflow packet
self.collector.get_capture(2)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0001")
- def test_0002(self):
+ def test_L3onL2(self):
""" L3 data on L2 datapath"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=10, layer='l3')
+ ipfix = VppCFLOW(test=self, layer='l3')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=2)
+ templates = ipfix.verify_templates(ipfix_decoder, count=2)
self.create_stream(packets=1)
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 4: 17,
8: 'src_ip', 12: 'dst_ip'})
- # expected one template and one cflow packet
self.collector.get_capture(3)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0002")
- def test_0003(self):
+ def test_L4onL2(self):
""" L4 data on L2 datapath"""
self.logger.info("FFP_TEST_START_0003")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=10, layer='l4')
+ ipfix = VppCFLOW(test=self, layer='l4')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=2)
+ templates = ipfix.verify_templates(ipfix_decoder, count=2)
self.create_stream(packets=1)
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'})
- # expected one template and one cflow packet
self.collector.get_capture(3)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0003")
-
-class TestFFP_DatapathIP4(MethodHolder):
- """collect information on IP4 datapath"""
-
- def test_0000(self):
+ def test_templatesIp4(self):
""" verify templates on IP4 datapath"""
self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, active=10, datapath='ip4')
+ ipfix = VppCFLOW(test=self, datapath='ip4')
ipfix.add_vpp_config()
# template packet should arrive immediately
+ self.vapi.cli("ipfix flush")
ipfix.verify_templates(timeout=3, count=1)
self.collector.get_capture(1)
self.logger.info("FFP_TEST_FINISH_0000")
- def test_0001(self):
+ def test_L2onIP4(self):
""" L2 data on IP4 datapath"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg4', active=10,
- layer='l2', datapath='ip4')
+ ipfix = VppCFLOW(test=self, intf='pg4', layer='l2', datapath='ip4')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1)
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 8})
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0001")
- def test_0002(self):
+ def test_L3onIP4(self):
""" L3 data on IP4 datapath"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg4', active=10,
- layer='l3', datapath='ip4')
+ ipfix = VppCFLOW(test=self, intf='pg4', layer='l3', datapath='ip4')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1)
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{1: 'octets', 2: 'packets',
8: 'src_ip', 12: 'dst_ip'})
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0002")
- def test_0003(self):
+ def test_L4onIP4(self):
""" L4 data on IP4 datapath"""
self.logger.info("FFP_TEST_START_0003")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg4', active=10,
- layer='l4', datapath='ip4')
+ ipfix = VppCFLOW(test=self, intf='pg4', layer='l4', datapath='ip4')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1)
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'})
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0003")
-
-class TestFFP_DatapathIP6(MethodHolder):
- """collect information on IP6 datapath"""
-
- def test_0000(self):
+ def test_templatesIP6(self):
""" verify templates on IP6 datapath"""
self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, active=10, datapath='ip6')
+ ipfix = VppCFLOW(test=self, datapath='ip6')
ipfix.add_vpp_config()
# template packet should arrive immediately
- ipfix.verify_templates(timeout=3, count=1)
+ ipfix.verify_templates(count=1)
self.collector.get_capture(1)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0000")
- def test_0001(self):
+ def test_L2onIP6(self):
""" L2 data on IP6 datapath"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg6', active=20,
- layer='l2', datapath='ip6')
+ ipfix = VppCFLOW(test=self, intf='pg6', layer='l2', datapath='ip6')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1)
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
ip_ver='IPv6')
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 256: 56710},
ip_ver='v6')
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0001")
- def test_0002(self):
+ def test_L3onIP6(self):
""" L3 data on IP6 datapath"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg6', active=10,
- layer='l3', datapath='ip6')
+ ipfix = VppCFLOW(test=self, intf='pg6', layer='l3', datapath='ip6')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1)
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
ip_ver='IPv6')
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets',
27: 'src_ip', 28: 'dst_ip'},
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0002")
- def test_0003(self):
+ def test_L4onIP6(self):
""" L4 data on IP6 datapath"""
self.logger.info("FFP_TEST_START_0003")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg6', active=10,
- layer='l4', datapath='ip6')
+ ipfix = VppCFLOW(test=self, intf='pg6', layer='l4', datapath='ip6')
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1)
+ templates = ipfix.verify_templates(ipfix_decoder, count=1)
self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
ip_ver='IPv6')
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
{2: 'packets', 7: 'sport', 11: 'dport'},
ip_ver='v6')
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0003")
-
-class TestFFP_NoTimers(MethodHolder):
- """No timers"""
-
def test_0001(self):
""" no timers, one CFLOW packet, 9 Flows inside"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=0)
+ ipfix = VppCFLOW(test=self)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
+ templates = ipfix.verify_templates(ipfix_decoder)
self.create_stream(packets=9)
capture = self.send_packets()
# make sure the one packet we expect actually showed up
- cflow = self.wait_for_cflow_packet(self.collector, templates[1], 10)
+ self.vapi.cli("ipfix flush")
+ cflow = self.wait_for_cflow_packet(self.collector, templates[1])
self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
- self.wait_for_cflow_packet(self.collector, templates[1], 10,
- expected=False)
self.collector.get_capture(4)
ipfix.remove_vpp_config()
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=0, mtu=256)
+ ipfix = VppCFLOW(test=self, mtu=256)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
+ self.vapi.cli("ipfix flush")
+ templates = ipfix.verify_templates(ipfix_decoder)
self.create_stream(packets=6)
capture = self.send_packets()
# make sure the one packet we expect actually showed up
cflows = []
+ self.vapi.cli("ipfix flush")
cflows.append(self.wait_for_cflow_packet(self.collector,
- templates[1], 10))
+ templates[1]))
cflows.append(self.wait_for_cflow_packet(self.collector,
- templates[1], 10))
+ templates[1]))
self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
self.collector.get_capture(5)
self.logger.info("FFP_TEST_FINISH_0002")
-class TestFFP_DisableIPFIX(MethodHolder):
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class DisableIPFIX(MethodHolder):
"""Disable IPFIX"""
def test_0001(self):
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=10)
+ ipfix = VppCFLOW(test=self)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=30)
+ templates = ipfix.verify_templates(ipfix_decoder)
self.create_stream()
self.send_packets()
# make sure the one packet we expect actually showed up
- self.wait_for_cflow_packet(self.collector, templates[1], 30)
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
# disble IPFIX
self.send_packets()
# make sure no one packet arrived in 1 minute
- self.wait_for_cflow_packet(self.collector, templates[1], 30,
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
self.logger.info("FFP_TEST_FINISH_0001")
-class TestFFP_ReenableIPFIX(MethodHolder):
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class ReenableIPFIX(MethodHolder):
"""Re-enable IPFIX"""
- def test_0001(self):
+ def test_0011(self):
""" disable IPFIX after first packets and re-enable after few packets
"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=10)
+ ipfix = VppCFLOW(test=self)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
+ templates = ipfix.verify_templates(ipfix_decoder)
- self.create_stream()
+ self.create_stream(packets=5)
self.send_packets()
# make sure the one packet we expect actually showed up
- self.wait_for_cflow_packet(self.collector, templates[1], 30)
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
# disble IPFIX
ipfix.disable_exporter()
+ self.vapi.cli("ipfix flush")
self.pg_enable_capture([self.collector])
self.send_packets()
# make sure no one packet arrived in active timer span
- self.wait_for_cflow_packet(self.collector, templates[1], 30,
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
+ self.pg2.get_capture(5)
# enable IPFIX
ipfix.enable_exporter()
- self.vapi.cli("ipfix flush")
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
- self.send_packets()
-
- # make sure the next packets (templates and data) we expect actually
- # showed up
- self.wait_for_cflow_packet(self.collector, templates[1], 30)
- self.collector.get_capture(4)
+ capture = self.collector.get_capture(4)
+ nr_templates = 0
+ nr_data = 0
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ nr_templates += 1
+ self.assertTrue(nr_templates, 3)
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Data):
+ nr_data += 1
+ self.assertTrue(nr_templates, 1)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0001")
-class TestFFP_DisableFFP(MethodHolder):
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class DisableFP(MethodHolder):
"""Disable Flowprobe feature"""
def test_0001(self):
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=10)
+ ipfix = VppCFLOW(test=self)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
- templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
+ templates = ipfix.verify_templates(ipfix_decoder)
self.create_stream()
self.send_packets()
# make sure the one packet we expect actually showed up
- self.wait_for_cflow_packet(self.collector, templates[1], 30)
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
# disble IPFIX
self.send_packets()
# make sure no one packet arrived in active timer span
- self.wait_for_cflow_packet(self.collector, templates[1], 30,
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1],
expected=False)
self.collector.get_capture(0)
self.logger.info("FFP_TEST_FINISH_0001")
-class TestFFP_ReenableFFP(MethodHolder):
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
+class ReenableFP(MethodHolder):
"""Re-enable Flowprobe feature"""
def test_0001(self):
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, active=10)
+ ipfix = VppCFLOW(test=self)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
+ self.vapi.cli("ipfix flush")
templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
self.create_stream()
self.send_packets()
# make sure the one packet we expect actually showed up
- self.wait_for_cflow_packet(self.collector, templates[1], 30)
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1], 5)
self.collector.get_capture(4)
# disble FPP feature
self.send_packets()
# make sure no one packet arrived in active timer span
- self.wait_for_cflow_packet(self.collector, templates[1], 30,
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1], 5,
expected=False)
self.collector.get_capture(0)
# make sure the next packets (templates and data) we expect actually
# showed up
- self.wait_for_cflow_packet(self.collector, templates[1], 30)
+ self.vapi.cli("ipfix flush")
+ self.wait_for_cflow_packet(self.collector, templates[1], 5)
self.collector.get_capture(4)
ipfix.remove_vpp_config()