Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
"""
+ super(VppTestCase, cls).setUpClass()
gc.collect() # run garbage collection first
random.seed()
cls.logger = get_logger(cls.__name__)
def tearDown(self):
""" Show various debug prints after each test """
+ super(VppTestCase, self).tearDown()
self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
self._testMethodDoc))
def setUp(self):
""" Clear trace before running each test"""
+ super(VppTestCase, self).setUp()
self.reporter.send_keep_alive(self)
self.logger.debug("--- setUp() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
i.resolve_arp()
i.resolve_ndp()
+ @classmethod
+ def tearDownClass(cls):
+ super(ACLPluginConnTestCase, cls).tearDownClass()
+
def tearDown(self):
"""Run standard test teardown and log various show commands
"""
super(BFDAPITestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDAPITestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDAPITestCase, self).setUp()
self.factory = AuthKeyFactory()
self.assertFalse(echo_source.have_usable_ip6)
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTestSession(object):
""" BFD session as seen from test framework side """
super(BFD4TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFD4TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFD4TestCase, self).setUp()
self.factory = AuthKeyFactory()
super(BFD6TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFD6TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFD6TestCase, self).setUp()
self.factory = AuthKeyFactory()
vpp_session = None
test_session = None
+ @classmethod
+ def setUpClass(cls):
+ super(BFDFIBTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDFIBTestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDFIBTestCase, self).setUp()
self.create_pg_interfaces(range(1))
super(BFDSHA1TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDSHA1TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDSHA1TestCase, self).setUp()
self.factory = AuthKeyFactory()
super(BFDAuthOnOffTestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDAuthOnOffTestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDAuthOnOffTestCase, self).setUp()
self.factory = AuthKeyFactory()
super(BFDCLITestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDCLITestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDCLITestCase, self).setUp()
self.factory = AuthKeyFactory()
miss_next_index=0,
current_data_flag=1,
current_data_offset=data_offset)
- self.assertIsNotNone(r, msg='No response msg for add_del_table')
+ self.assertIsNotNone(r, 'No response msg for add_del_table')
self.acl_tbl_idx[key] = r.new_table_index
def create_classify_session(self, table_index, match, pbr_option=0,
opaque_index=0,
action=pbr_option,
metadata=vrfid)
- self.assertIsNotNone(r, msg='No response msg for add_del_session')
+ self.assertIsNotNone(r, 'No response msg for add_del_session')
def input_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Input ACL interface
is_add,
intf.sw_if_index,
ip4_table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+ self.assertIsNotNone(r, 'No response msg for acl_set_interface')
def output_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Output ACL interface
is_add,
intf.sw_if_index,
ip4_table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+ self.assertIsNotNone(r, 'No response msg for acl_set_interface')
# Tests split to different test case classes because of issue reported in
miss_next_index=0,
current_data_flag=1,
current_data_offset=data_offset)
- self.assertIsNotNone(r, msg='No response msg for add_del_table')
+ self.assertIsNotNone(r, 'No response msg for add_del_table')
self.acl_tbl_idx[key] = r.new_table_index
def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
binascii.unhexlify(match),
opaque_index=0,
metadata=vrfid)
- self.assertIsNotNone(r, msg='No response msg for add_del_session')
+ self.assertIsNotNone(r, 'No response msg for add_del_session')
def input_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Input ACL interface
is_add,
intf.sw_if_index,
ip6_table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+ self.assertIsNotNone(r, 'No response msg for acl_set_interface')
def output_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Output ACL interface
is_add,
intf.sw_if_index,
ip6_table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+ self.assertIsNotNone(r, 'No response msg for acl_set_interface')
class TestClassifierIP6(TestClassifier):
miss_next_index=0,
current_data_flag=1,
current_data_offset=data_offset)
- self.assertIsNotNone(r, msg='No response msg for add_del_table')
+ self.assertIsNotNone(r, 'No response msg for add_del_table')
self.acl_tbl_idx[key] = r.new_table_index
def create_classify_session(self, intf, table_index, match,
table_index,
binascii.unhexlify(match),
hit_next_index=hit_next_index)
- self.assertIsNotNone(r, msg='No response msg for add_del_session')
+ self.assertIsNotNone(r, 'No response msg for add_del_session')
def input_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Input ACL interface
is_add,
intf.sw_if_index,
l2_table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+ self.assertIsNotNone(r, 'No response msg for acl_set_interface')
def output_acl_set_interface(self, intf, table_index, is_add=1):
"""Configure Output ACL interface
is_add,
intf.sw_if_index,
l2_table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
+ self.assertIsNotNone(r, 'No response msg for acl_set_interface')
def create_hosts(self, count, start=0):
"""
i.resolve_arp()
i.resolve_ndp()
+ @classmethod
+ def tearDownClass(cls):
+ super(ContainerIntegrationTestCase, cls).tearDownClass()
+
def tearDown(self):
"""Run standard test teardown and log various show commands
"""
super(MethodHolder, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(MethodHolder, cls).tearDownClass()
+
def create_stream(self, src_if=None, dst_if=None, packets=None,
size=None, ip_ver='v4'):
"""Create a packet stream to tickle the plugin
class Flowprobe(MethodHolder):
"""Template verification, timer tests"""
+ @classmethod
+ def setUpClass(cls):
+ super(Flowprobe, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(Flowprobe, cls).tearDownClass()
+
def test_0001(self):
""" timer less than template timeout"""
self.logger.info("FFP_TEST_START_0001")
class Datapath(MethodHolder):
"""collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
+ @classmethod
+ def setUpClass(cls):
+ super(Datapath, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(Datapath, cls).tearDownClass()
+
def test_templatesL2(self):
""" verify template on L2 datapath"""
self.logger.info("FFP_TEST_START_0000")
class DisableIPFIX(MethodHolder):
"""Disable IPFIX"""
+ @classmethod
+ def setUpClass(cls):
+ super(DisableIPFIX, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(DisableIPFIX, cls).tearDownClass()
+
def test_0001(self):
""" disable IPFIX after first packets"""
self.logger.info("FFP_TEST_START_0001")
class ReenableIPFIX(MethodHolder):
"""Re-enable IPFIX"""
+ @classmethod
+ def setUpClass(cls):
+ super(ReenableIPFIX, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(ReenableIPFIX, cls).tearDownClass()
+
def test_0011(self):
""" disable IPFIX after first packets and re-enable after few packets
"""
self.wait_for_cflow_packet(self.collector, templates[1])
self.collector.get_capture(4)
- # disble IPFIX
+ # disable IPFIX
ipfix.disable_exporter()
self.vapi.cli("ipfix flush")
self.pg_enable_capture([self.collector])
class DisableFP(MethodHolder):
"""Disable Flowprobe feature"""
+ @classmethod
+ def setUpClass(cls):
+ super(DisableFP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(DisableFP, cls).tearDownClass()
+
def test_0001(self):
""" disable flowprobe feature after first packets"""
self.logger.info("FFP_TEST_START_0001")
class ReenableFP(MethodHolder):
"""Re-enable Flowprobe feature"""
+ @classmethod
+ def setUpClass(cls):
+ super(ReenableFP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(ReenableFP, cls).tearDownClass()
+
def test_0001(self):
""" disable flowprobe feature after first packets and re-enable
after few packets """
cls.interfaces = list(cls.pg_interfaces)
def setUp(self):
+ super(TestIPIP6, self).setUp()
for i in self.interfaces:
i.admin_up()
i.config_ip4()
class TestJVpp(VppTestCase):
""" JVPP Core Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestJVpp, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestJVpp, cls).tearDownClass()
+
def invoke_for_jvpp_core(self, api_jar_name, test_class_name):
self.jvpp_connection_test(api_jar_name=api_jar_name,
test_class_name=test_class_name,
portsCheck = dict()
nr_packets = 256
+ @classmethod
+ def setUpClass(cls):
+ super(TestPuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestPuntSocket, cls).tearDownClass()
+
@classmethod
def setUpConstants(cls):
cls.extra_vpp_punt_config = [
def tearDown(self):
del self.sock_servers[:]
+ super(TestPuntSocket, self).tearDown()
def socket_client_create(self, sock_name, id=None):
thread = serverSocketThread(id, sock_name, self.portsCheck)
class TestIP4PuntSocket(TestPuntSocket):
""" Punt Socket for IPv4 """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIP4PuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIP4PuntSocket, cls).tearDownClass()
+
def setUp(self):
super(TestIP4PuntSocket, self).setUp()
class TestIP6PuntSocket(TestPuntSocket):
""" Punt Socket for IPv6"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestIP6PuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIP6PuntSocket, cls).tearDownClass()
+
def setUp(self):
super(TestIP6PuntSocket, self).setUp()
match_n_vectors=(len(mask) - 1) // 32 + 1,
current_data_flag=1,
skip_n_vectors=2) # data offset
- self.assertIsNotNone(r, msg='No response msg for add_del_table')
+ self.assertIsNotNone(r, 'No response msg for add_del_table')
table_index = r.new_table_index
# add the source routign node as a ip6 inacl netxt node
hit_next_index=inacl_next_node_index,
action=3,
metadata=0) # sr policy index
- self.assertIsNotNone(r, msg='No response msg for add_del_session')
+ self.assertIsNotNone(r, 'No response msg for add_del_session')
# log the classify table used in the steering policy
self.logger.info(self.vapi.cli("show classify table"))
sw_if_index=self.pg3.sw_if_index,
ip6_table_index=table_index)
self.assertIsNotNone(r,
- msg='No response msg for input_acl_set_interface')
+ 'No response msg for input_acl_set_interface')
# log the ip6 inacl
self.logger.info(self.vapi.cli("show inacl type ip6"))
sw_if_index=self.pg3.sw_if_index,
ip6_table_index=table_index)
self.assertIsNotNone(r,
- msg='No response msg for input_acl_set_interface')
+ 'No response msg for input_acl_set_interface')
# log the ip6 inacl after cleaning
self.logger.info(self.vapi.cli("show inacl type ip6"))
0,
table_index,
binascii.unhexlify(match))
- self.assertIsNotNone(r, msg='No response msg for add_del_session')
+ self.assertIsNotNone(r, 'No response msg for add_del_session')
r = self.vapi.classify_add_del_table(
0,
binascii.unhexlify(mask),
table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for add_del_table')
+ self.assertIsNotNone(r, 'No response msg for add_del_table')
self.logger.info(self.vapi.cli("show classify table"))
class VAPITestCase(VppTestCase):
""" VAPI test """
+ @classmethod
+ def setUpClass(cls):
+ super(VAPITestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VAPITestCase, cls).tearDownClass()
+
def test_vapi_c(self):
""" run C VAPI tests """
var = "TEST_DIR"
class VCLTestCase(VppTestCase):
""" VCL Test Class """
- def __init__(self, methodName):
+ @classmethod
+ def setUpClass(cls):
+ super(VCLTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLTestCase, cls).tearDownClass()
+
+ def setUp(self):
var = "VPP_BUILD_DIR"
self.build_dir = os.getenv(var, None)
if self.build_dir is None:
if os.path.isfile("/tmp/ldp_server_af_unix_socket"):
os.remove("/tmp/ldp_server_af_unix_socket")
- super(VCLTestCase, self).__init__(methodName)
+ super(VCLTestCase, self).setUp()
def cut_thru_setup(self):
self.vapi.session_enable_disable(is_enabled=1)
class LDPCutThruTestCase(VCLTestCase):
""" LDP Cut Thru Tests """
+ @classmethod
+ def setUpClass(cls):
+ super(LDPCutThruTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPCutThruTestCase, cls).tearDownClass()
+
def setUp(self):
super(LDPCutThruTestCase, self).setUp()
class VCLCutThruTestCase(VCLTestCase):
""" VCL Cut Thru Tests """
+ @classmethod
+ def setUpClass(cls):
+ super(VCLCutThruTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLCutThruTestCase, cls).tearDownClass()
+
def setUp(self):
super(VCLCutThruTestCase, self).setUp()
class LDPThruHostStackEcho(VCLTestCase):
""" LDP Thru Host Stack Echo """
+ @classmethod
+ def setUpClass(cls):
+ super(LDPThruHostStackEcho, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPThruHostStackEcho, cls).tearDownClass()
+
def setUp(self):
super(LDPThruHostStackEcho, self).setUp()
class VCLThruHostStackEcho(VCLTestCase):
""" VCL Thru Host Stack Echo """
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackEcho, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackEcho, cls).tearDownClass()
+
def setUp(self):
super(VCLThruHostStackEcho, self).setUp()
class VCLThruHostStackBidirNsock(VCLTestCase):
""" VCL Thru Host Stack Bidir Nsock """
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackBidirNsock, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackBidirNsock, cls).tearDownClass()
+
def setUp(self):
super(VCLThruHostStackBidirNsock, self).setUp()
class LDPThruHostStackBidirNsock(VCLTestCase):
""" LDP Thru Host Stack Bidir Nsock """
+ @classmethod
+ def setUpClass(cls):
+ super(LDPThruHostStackBidirNsock, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPThruHostStackBidirNsock, cls).tearDownClass()
+
def setUp(self):
super(LDPThruHostStackBidirNsock, self).setUp()
class LDPThruHostStackNsock(VCLTestCase):
""" LDP Thru Host Stack Nsock """
+ @classmethod
+ def setUpClass(cls):
+ super(LDPThruHostStackNsock, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPThruHostStackNsock, cls).tearDownClass()
+
def setUp(self):
super(LDPThruHostStackNsock, self).setUp()
class VCLThruHostStackNsock(VCLTestCase):
""" VCL Thru Host Stack Nsock """
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackNsock, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackNsock, cls).tearDownClass()
+
def setUp(self):
super(VCLThruHostStackNsock, self).setUp()
class LDPThruHostStackIperf(VCLTestCase):
""" LDP Thru Host Stack Iperf """
+ @classmethod
+ def setUpClass(cls):
+ super(LDPThruHostStackIperf, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPThruHostStackIperf, cls).tearDownClass()
+
def setUp(self):
super(LDPThruHostStackIperf, self).setUp()
class LDPIpv6CutThruTestCase(VCLTestCase):
""" LDP IPv6 Cut Thru Tests """
+ @classmethod
+ def setUpClass(cls):
+ super(LDPIpv6CutThruTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(LDPIpv6CutThruTestCase, cls).tearDownClass()
+
def setUp(self):
super(LDPIpv6CutThruTestCase, self).setUp()
class VCLIpv6CutThruTestCase(VCLTestCase):
""" VCL IPv6 Cut Thru Tests """
+ @classmethod
+ def setUpClass(cls):
+ super(VCLIpv6CutThruTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLIpv6CutThruTestCase, cls).tearDownClass()
+
def setUp(self):
super(VCLIpv6CutThruTestCase, self).setUp()
class VCLIpv6ThruHostStackEcho(VCLTestCase):
""" VCL IPv6 Thru Host Stack Echo """
+ @classmethod
+ def setUpClass(cls):
+ super(VCLIpv6ThruHostStackEcho, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLIpv6ThruHostStackEcho, cls).tearDownClass()
+
def setUp(self):
super(VCLIpv6ThruHostStackEcho, self).setUp()
class VOMTestCase(VppTestCase):
""" VPP Object Model Test """
+ @classmethod
+ def setUpClass(cls):
+ super(VOMTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VOMTestCase, cls).tearDownClass()
+
def test_vom_cpp(self):
""" run C++ VOM tests """
var = "TEST_DIR"
super(TestVxlan6, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestVxlan6, cls).tearDownClass()
+
# Method to define VPP actions before tear down of the test case.
# Overrides tearDown method in VppTestCase class.
# @param self The object pointer.
super(TestVxlanGpe, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestVxlanGpe, cls).tearDownClass()
+
@unittest.skip("test disabled for vxlan-gpe")
def test_mcast_flood(self):
""" inherited from BridgeDomain """