return False
+def running_on_centos():
+ try:
+ os_id = os.getenv("OS_ID")
+ return True if "centos" in os_id.lower() else False
+ except:
+ return False
+ return False
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
if not desc:
desc = str(test)
- self.pipe.send((desc, test.vpp_bin, test.tempdir))
+ self.pipe.send((desc, test.vpp_bin, test.tempdir, test.vpp.pid))
class VppTestCase(unittest.TestCase):
if coredump_size is None:
coredump_size = "coredump-size unlimited"
cls.vpp_cmdline = [cls.vpp_bin, "unix",
- "{", "nodaemon", debug_cli, coredump_size, "}",
- "api-trace", "{", "on", "}",
+ "{", "nodaemon", debug_cli, "full-coredump",
+ coredump_size, "}", "api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.shm_prefix, "}",
"plugins", "{", "plugin", "dpdk_plugin.so", "{",
"disable", "}", "}"]
gc.collect() # run garbage collection first
cls.logger = getLogger(cls.__name__)
cls.tempdir = tempfile.mkdtemp(
- prefix='vpp-unittest-' + cls.__name__ + '-')
+ prefix='vpp-unittest-%s-' % cls.__name__)
cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
cls.file_handler.setFormatter(
Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
cls.registry = VppObjectRegistry()
cls.vpp_startup_failed = False
cls.reporter = KeepAliveReporter()
- cls.reporter.send_keep_alive(cls)
# need to catch exceptions here because if we raise, then the cleanup
# doesn't get called and we might end with a zombie vpp
try:
cls.run_vpp()
+ cls.reporter.send_keep_alive(cls)
cls.vpp_stdout_deque = deque()
cls.vpp_stderr_deque = deque()
cls.pump_thread_stop_flag = Event()
unittest.TestResult.addSkip(self, test, reason)
self.result_string = colorize("SKIP", YELLOW)
+ def symlink_failed(self, test):
+ logger = None
+ if hasattr(test, 'logger'):
+ logger = test.logger
+ if hasattr(test, 'tempdir'):
+ try:
+ failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
+ link_path = '%s/%s-FAILED' % (failed_dir,
+ test.tempdir.split("/")[-1])
+ if logger:
+ logger.debug("creating a link to the failed test")
+ logger.debug("os.symlink(%s, %s)" %
+ (test.tempdir, link_path))
+ os.symlink(test.tempdir, link_path)
+ except Exception as e:
+ if logger:
+ logger.error(e)
+
+ def send_failure_through_pipe(self, test):
+ if hasattr(self, 'test_framework_failed_pipe'):
+ pipe = self.test_framework_failed_pipe
+ if pipe:
+ pipe.send(test.__class__)
+
def addFailure(self, test, err):
"""
Record a test failed result
if hasattr(test, 'tempdir'):
self.result_string = colorize("FAIL", RED) + \
' [ temp dir used by test case: ' + test.tempdir + ' ]'
+ self.symlink_failed(test)
else:
self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
+ self.send_failure_through_pipe(test)
+
def addError(self, test, err):
"""
Record a test error result
if hasattr(test, 'tempdir'):
self.result_string = colorize("ERROR", RED) + \
' [ temp dir used by test case: ' + test.tempdir + ' ]'
+ self.symlink_failed(test)
else:
self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
+ self.send_failure_through_pipe(test)
+
def getDescription(self, test):
"""
Get test description
self.stream.writeln("%s" % err)
+class Filter_by_test_option:
+ def __init__(self, filter_file_name, filter_class_name, filter_func_name):
+ self.filter_file_name = filter_file_name
+ self.filter_class_name = filter_class_name
+ self.filter_func_name = filter_func_name
+
+ def __call__(self, file_name, class_name, func_name):
+ if self.filter_file_name and file_name != self.filter_file_name:
+ return False
+ if self.filter_class_name and class_name != self.filter_class_name:
+ return False
+ if self.filter_func_name and func_name != self.filter_func_name:
+ return False
+ return True
+
+
class VppTestRunner(unittest.TextTestRunner):
"""
A basic test runner implementation which prints results to standard error.
"""Class maintaining the results of the tests"""
return VppTestResult
- def __init__(self, pipe=None, stream=sys.stderr, descriptions=True,
+ def __init__(self, keep_alive_pipe=None, failed_pipe=None,
+ stream=sys.stderr, descriptions=True,
verbosity=1, failfast=False, buffer=False, resultclass=None):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
verbosity, failfast, buffer,
resultclass)
reporter = KeepAliveReporter()
- reporter.pipe = pipe
+ reporter.pipe = keep_alive_pipe
+ # this is super-ugly, but very simple to implement and works as long
+ # as we run only one test at the same time
+ VppTestResult.test_framework_failed_pipe = failed_pipe
test_option = "TEST"
filter_file_name = 'test_%s' % f
return filter_file_name, filter_class_name, filter_func_name
- def filter_tests(self, tests, filter_file, filter_class, filter_func):
+ @staticmethod
+ def filter_tests(tests, filter_cb):
result = unittest.suite.TestSuite()
for t in tests:
if isinstance(t, unittest.suite.TestSuite):
# this is a bunch of tests, recursively filter...
- x = self.filter_tests(t, filter_file, filter_class,
- filter_func)
+ x = filter_tests(t, filter_cb)
if x.countTestCases() > 0:
result.addTest(x)
elif isinstance(t, unittest.TestCase):
# test_classifier.TestClassifier.test_acl_ip
# apply filtering only if it is so
if len(parts) == 3:
- if filter_file and filter_file != parts[0]:
- continue
- if filter_class and filter_class != parts[1]:
- continue
- if filter_func and filter_func != parts[2]:
+ if not filter_cb(parts[0], parts[1], parts[2]):
continue
result.addTest(t)
else:
filter_file, filter_class, filter_func = self.parse_test_option()
print("Active filters: file=%s, class=%s, function=%s" % (
filter_file, filter_class, filter_func))
- filtered = self.filter_tests(test, filter_file, filter_class,
- filter_func)
+ filter_cb = Filter_by_test_option(
+ filter_file, filter_class, filter_func)
+ filtered = self.filter_tests(test, filter_cb)
print("%s out of %s tests match specified filters" % (
filtered.countTestCases(), test.countTestCases()))
if not running_extended_tests():
print("Not running extended tests (some tests will be skipped)")
return super(VppTestRunner, self).run(filtered)
+
+
+class Worker(Thread):
+ def __init__(self, args, logger):
+ self.logger = logger
+ self.args = args
+ self.result = None
+ super(Worker, self).__init__()
+
+ def run(self):
+ executable = self.args[0]
+ self.logger.debug("Running executable w/args `%s'" % self.args)
+ env = os.environ.copy()
+ env["CK_LOG_FILE_NAME"] = "-"
+ self.process = subprocess.Popen(
+ self.args, shell=False, env=env, preexec_fn=os.setpgrp,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out, err = self.process.communicate()
+ self.logger.debug("Finished running `%s'" % executable)
+ self.logger.info("Return code is `%s'" % self.process.returncode)
+ self.logger.info(single_line_delim)
+ self.logger.info("Executable `%s' wrote to stdout:" % executable)
+ self.logger.info(single_line_delim)
+ self.logger.info(out)
+ self.logger.info(single_line_delim)
+ self.logger.info("Executable `%s' wrote to stderr:" % executable)
+ self.logger.info(single_line_delim)
+ self.logger.error(err)
+ self.logger.info(single_line_delim)
+ self.result = self.process.returncode