import tempfile
import time
import resource
+import faulthandler
from collections import deque
from threading import Thread, Event
from inspect import getdoc
#: Store the index of the destination packet generator interface
#: of the packet.
dst = -1
+ #: Store expected ip version
+ ip = -1
+ #: Store expected upper protocol
+ proto = -1
#: Store the copy of the former packet.
data = None
# of properly terminating the loop
+def running_extended_tests():
+ try:
+ s = os.getenv("EXTENDED_TESTS")
+ return True if s.lower() in ("y", "yes", "1") else False
+ except:
+ return False
+ return False
+
+
class VppTestCase(unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
return
dl = d.lower()
if dl == "core":
- if resource.getrlimit(resource.RLIMIT_CORE)[0] <= 0:
- # give a heads up if this is actually useless
- print(colorize("WARNING: core size limit is set 0, core files "
- "will NOT be created", RED))
cls.debug_core = True
elif dl == "gdb":
cls.debug_gdb = True
cls.set_debug_flags(d)
cls.vpp_bin = os.getenv('VPP_TEST_BIN', "vpp")
cls.plugin_path = os.getenv('VPP_TEST_PLUGIN_PATH')
+ cls.extern_plugin_path = os.getenv('EXTERN_PLUGINS')
+ plugin_path = None
+ if cls.plugin_path is not None:
+ if cls.extern_plugin_path is not None:
+ plugin_path = "%s:%s" % (
+ cls.plugin_path, cls.extern_plugin_path)
+ else:
+ plugin_path = cls.plugin_path
+ elif cls.extern_plugin_path is not None:
+ plugin_path = cls.extern_plugin_path
debug_cli = ""
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
debug_cli = "cli-listen localhost:5002"
- cls.vpp_cmdline = [cls.vpp_bin,
- "unix", "{", "nodaemon", debug_cli, "}",
- "api-segment", "{", "prefix", cls.shm_prefix, "}"]
- if cls.plugin_path is not None:
- cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
+ coredump_size = None
+ try:
+ size = os.getenv("COREDUMP_SIZE")
+ if size is not None:
+ coredump_size = "coredump-size %s" % size
+ except:
+ pass
+ if coredump_size is None:
+ coredump_size = "coredump-size unlimited"
+ cls.vpp_cmdline = [cls.vpp_bin, "unix",
+ "{", "nodaemon", debug_cli, coredump_size, "}",
+ "api-trace", "{", "on", "}",
+ "api-segment", "{", "prefix", cls.shm_prefix, "}",
+ "plugins", "{", "plugin", "dpdk_plugin.so", "{",
+ "disable", "}", "}"]
+ if plugin_path is not None:
+ cls.vpp_cmdline.extend(["plugin_path", plugin_path])
cls.logger.info("vpp_cmdline: %s" % cls.vpp_cmdline)
@classmethod
cls.logger = getLogger(cls.__name__)
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-' + cls.__name__ + '-')
- file_handler = FileHandler("%s/log.txt" % cls.tempdir)
- file_handler.setFormatter(
+ cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
+ cls.file_handler.setFormatter(
Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
datefmt="%H:%M:%S"))
- file_handler.setLevel(DEBUG)
- cls.logger.addHandler(file_handler)
+ cls.file_handler.setLevel(DEBUG)
+ cls.logger.addHandler(cls.file_handler)
cls.shm_prefix = cls.tempdir.split("/")[-1]
os.chdir(cls.tempdir)
cls.logger.info("Temporary dir is %s, shm prefix is %s",
cls.verbose = 0
cls.vpp_dead = False
cls.registry = VppObjectRegistry()
- print(double_line_delim)
- print(colorize(getdoc(cls).splitlines()[0], YELLOW))
- print(double_line_delim)
+ cls.vpp_startup_failed = False
# need to catch exceptions here because if we raise, then the cleanup
# doesn't get called and we might end with a zombie vpp
try:
hook = PollHook(cls)
cls.vapi.register_hook(hook)
cls.sleep(0.1, "after vpp startup, before initial poll")
- hook.poll_vpp()
+ try:
+ hook.poll_vpp()
+ except:
+ cls.vpp_startup_failed = True
+ cls.logger.critical(
+ "VPP died shortly after startup, check the"
+ " output to standard error for possible cause")
+ raise
try:
cls.vapi.connect()
except:
cls.vpp.communicate()
del cls.vpp
+ if cls.vpp_startup_failed:
+ stdout_log = cls.logger.info
+ stderr_log = cls.logger.critical
+ else:
+ stdout_log = cls.logger.info
+ stderr_log = cls.logger.info
+
if hasattr(cls, 'vpp_stdout_deque'):
- cls.logger.info(single_line_delim)
- cls.logger.info('VPP output to stdout while running %s:',
- cls.__name__)
- cls.logger.info(single_line_delim)
- f = open(cls.tempdir + '/vpp_stdout.txt', 'w')
+ stdout_log(single_line_delim)
+ stdout_log('VPP output to stdout while running %s:', cls.__name__)
+ stdout_log(single_line_delim)
vpp_output = "".join(cls.vpp_stdout_deque)
- f.write(vpp_output)
- cls.logger.info('\n%s', vpp_output)
- cls.logger.info(single_line_delim)
+ with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
+ f.write(vpp_output)
+ stdout_log('\n%s', vpp_output)
+ stdout_log(single_line_delim)
if hasattr(cls, 'vpp_stderr_deque'):
- cls.logger.info(single_line_delim)
- cls.logger.info('VPP output to stderr while running %s:',
- cls.__name__)
- cls.logger.info(single_line_delim)
- f = open(cls.tempdir + '/vpp_stderr.txt', 'w')
+ stderr_log(single_line_delim)
+ stderr_log('VPP output to stderr while running %s:', cls.__name__)
+ stderr_log(single_line_delim)
vpp_output = "".join(cls.vpp_stderr_deque)
- f.write(vpp_output)
- cls.logger.info('\n%s', vpp_output)
- cls.logger.info(single_line_delim)
+ with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
+ f.write(vpp_output)
+ stderr_log('\n%s', vpp_output)
+ stderr_log(single_line_delim)
@classmethod
def tearDownClass(cls):
""" Perform final cleanup after running all tests in this test-case """
cls.quit()
+ cls.file_handler.close()
def tearDown(self):
""" Show various debug prints after each test """
self._testMethodDoc))
if not self.vpp_dead:
self.logger.debug(self.vapi.cli("show trace"))
- self.logger.info(self.vapi.ppcli("show int"))
+ self.logger.info(self.vapi.ppcli("show interface"))
self.logger.info(self.vapi.ppcli("show hardware"))
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show run"))
self.registry.remove_vpp_config(self.logger)
+ # Save/Dump VPP api trace log
+ api_trace = "vpp_api_trace.%s.log" % self._testMethodName
+ tmp_api_trace = "/tmp/%s" % api_trace
+ vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
+ self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
+ self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
+ vpp_api_trace_log))
+ os.rename(tmp_api_trace, vpp_api_trace_log)
+ self.logger.info(self.vapi.ppcli("api trace dump %s" %
+ vpp_api_trace_log))
+ else:
+ self.registry.unregister_all(self.logger)
def setUp(self):
""" Clear trace before running each test"""
:returns: string containing serialized data from packet info
"""
- return "%d %d %d" % (info.index, info.src, info.dst)
+ return "%d %d %d %d %d" % (info.index, info.src, info.dst,
+ info.ip, info.proto)
@staticmethod
def payload_to_info(payload):
info.index = int(numbers[0])
info.src = int(numbers[1])
info.dst = int(numbers[2])
+ info.ip = int(numbers[3])
+ info.proto = int(numbers[4])
return info
def get_next_packet_info(self, info):
def assert_equal(self, real_value, expected_value, name_or_class=None):
if name_or_class is None:
- self.assertEqual(real_value, expected_value, msg)
+ self.assertEqual(real_value, expected_value)
return
try:
msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
@classmethod
def sleep(cls, timeout, remark=None):
if hasattr(cls, 'logger'):
- cls.logger.debug("Sleeping for %ss (%s)" % (timeout, remark))
+ cls.logger.debug("Starting sleep for %ss (%s)" % (timeout, remark))
+ before = time.time()
time.sleep(timeout)
+ after = time.time()
+ if after - before > 2 * timeout:
+ cls.logger.error(
+ "time.sleep() derp! slept for %ss instead of ~%ss!" % (
+ after - before, timeout))
+ if hasattr(cls, 'logger'):
+ cls.logger.debug(
+ "Finished sleep (%s) - slept %ss (wanted %ss)" % (
+ remark, after - before, timeout))
+
+
+class TestCasePrinter(object):
+ _shared_state = {}
+
+ def __init__(self):
+ self.__dict__ = self._shared_state
+ if not hasattr(self, "_test_case_set"):
+ self._test_case_set = set()
+
+ def print_test_case_heading_if_first_time(self, case):
+ if case.__class__ not in self._test_case_set:
+ print(double_line_delim)
+ print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
+ print(double_line_delim)
+ self._test_case_set.add(case.__class__)
class VppTestResult(unittest.TestResult):
self.descriptions = descriptions
self.verbosity = verbosity
self.result_string = None
+ self.printer = TestCasePrinter()
def addSuccess(self, test):
"""
:param test:
"""
+ self.printer.print_test_case_heading_if_first_time(test)
unittest.TestResult.startTest(self, test)
if self.verbosity > 0:
self.stream.writeln(
:param test:
"""
- gc.disable() # disable garbage collection, we'll do that manually
+ faulthandler.enable() # emit stack trace to stderr if killed by signal
print("Running tests using custom test runner") # debug message
filter_file, filter_class, filter_func = self.parse_test_option()
print("Active filters: file=%s, class=%s, function=%s" % (
filter_func)
print("%s out of %s tests match specified filters" % (
filtered.countTestCases(), test.countTestCases()))
+ if not running_extended_tests():
+ print("Not running extended tests (some tests will be skipped)")
return super(VppTestRunner, self).run(filtered)