import sys
import shutil
import os
-import select
+import fnmatch
import unittest
import argparse
import time
import threading
import signal
import psutil
+import re
from multiprocessing import Process, Pipe, cpu_count
from multiprocessing.queues import Queue
from multiprocessing.managers import BaseManager
TEST_RUN
from debug import spawn_gdb
from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
- colorize
+ colorize, single_line_delim
from discover_tests import discover_tests
from subprocess import check_output, CalledProcessError
-from util import check_core_path
+from util import check_core_path, get_core_path, is_core_present
# timeout which controls how long the child has to finish after seeing
# a core dump in test temporary directory. If this is exceeded, parent assumes
class TestResult(dict):
- def __init__(self, testcase_suite):
+ def __init__(self, testcase_suite, testcases_by_id=None):
super(TestResult, self).__init__()
self[PASS] = []
self[FAIL] = []
self[ERROR] = []
self[SKIP] = []
self[TEST_RUN] = []
+ self.crashed = False
self.testcase_suite = testcase_suite
self.testcases = [testcase for testcase in testcase_suite]
- self.testcases_by_id = {}
+ self.testcases_by_id = testcases_by_id
def was_successful(self):
- return len(self[PASS] + self[SKIP]) \
- == self.testcase_suite.countTestCases()
+ return 0 == len(self[FAIL]) == len(self[ERROR]) \
+ and len(self[PASS] + self[SKIP]) \
+ == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
def no_tests_run(self):
return 0 == len(self[TEST_RUN])
def process_result(self, test_id, result):
self[result].append(test_id)
- for testcase in self.testcases:
- if testcase.id() == test_id:
- self.testcases_by_id[test_id] = testcase
- self.testcases.remove(testcase)
- break
def suite_from_failed(self):
rerun_ids = set([])
return suite_from_failed(self.testcase_suite, rerun_ids)
def get_testcase_names(self, test_id):
- return self._get_testcase_class(test_id), \
- self._get_test_description(test_id)
+ if re.match(r'.+\..+\..+', test_id):
+ test_name = self._get_test_description(test_id)
+ testcase_name = self._get_testcase_doc_name(test_id)
+ else:
+ # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
+ setup_teardown_match = re.match(
+ r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
+ if setup_teardown_match:
+ test_name, _, _, testcase_name = setup_teardown_match.groups()
+ if len(testcase_name.split('.')) == 2:
+ for key in self.testcases_by_id.keys():
+ if key.startswith(testcase_name):
+ testcase_name = key
+ break
+ testcase_name = self._get_testcase_doc_name(testcase_name)
+ else:
+ test_name = test_id
+ testcase_name = test_id
+
+ return testcase_name, test_name
def _get_test_description(self, test_id):
- if test_id in self.testcases_by_id:
- return get_test_description(descriptions,
- self.testcases_by_id[test_id])
- else:
- return test_id
+ return get_test_description(descriptions,
+ self.testcases_by_id[test_id])
- def _get_testcase_class(self, test_id):
- if test_id in self.testcases_by_id:
- return get_testcase_doc_name(self.testcases_by_id[test_id])
- else:
- return test_id
+ def _get_testcase_doc_name(self, test_id):
+ return get_testcase_doc_name(self.testcases_by_id[test_id])
def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
finished_pipe, result_pipe, logger):
sys.stdout = stdouterr_queue
sys.stderr = stdouterr_queue
- VppTestCase.logger = logger
+ VppTestCase.parallel_handler = logger.handlers[0]
result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
descriptions=descriptions,
verbosity=verbose,
result_pipe=result_pipe,
- failfast=failfast).run(suite)
+ failfast=failfast,
+ print_summary=False).run(suite)
finished_pipe.send(result.wasSuccessful())
finished_pipe.close()
keep_alive_pipe.close()
self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
self.result_parent_end, self.result_child_end = Pipe(duplex=False)
self.testcase_suite = testcase_suite
- self.stdouterr_queue = manager.StreamQueue()
+ if sys.version[0] == '2':
+ self.stdouterr_queue = manager.StreamQueue()
+ else:
+ from multiprocessing import get_context
+ self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
self.logger = get_parallel_logger(self.stdouterr_queue)
self.child = Process(target=test_runner_wrapper,
args=(testcase_suite,
self.child.start()
self.last_test_temp_dir = None
self.last_test_vpp_binary = None
- self.last_test = None
- self.result = None
+ self._last_test = None
+ self.last_test_id = None
self.vpp_pid = None
self.last_heard = time.time()
self.core_detected_at = None
- self.result = TestResult(testcase_suite)
+ self.testcases_by_id = {}
+ self.testclasess_with_core = {}
+ for testcase in self.testcase_suite:
+ self.testcases_by_id[testcase.id()] = testcase
+ self.result = TestResult(testcase_suite, self.testcases_by_id)
+
+ @property
+ def last_test(self):
+ return self._last_test
+
+ @last_test.setter
+ def last_test(self, test_id):
+ self.last_test_id = test_id
+ if test_id in self.testcases_by_id:
+ testcase = self.testcases_by_id[test_id]
+ self._last_test = testcase.shortDescription()
+ if not self._last_test:
+ self._last_test = str(testcase)
+ else:
+ self._last_test = test_id
+
+ def add_testclass_with_core(self):
+ if self.last_test_id in self.testcases_by_id:
+ test = self.testcases_by_id[self.last_test_id]
+ class_name = unittest.util.strclass(test.__class__)
+ test_name = "'{}' ({})".format(get_test_description(descriptions,
+ test),
+ self.last_test_id)
+ else:
+ test_name = self.last_test_id
+ class_name = re.match(r'((tearDownClass)|(setUpClass)) '
+ r'\((.+\..+)\)', test_name).groups()[3]
+ if class_name not in self.testclasess_with_core:
+ self.testclasess_with_core[class_name] = (
+ test_name,
+ self.last_test_vpp_binary,
+ self.last_test_temp_dir)
def close_pipes(self):
self.keep_alive_child_end.close()
self.finished_parent_end.close()
self.result_parent_end.close()
+ def was_successful(self):
+ return self.result.was_successful()
+
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
read_testcases):
read_testcase = None
- while read_testcases.is_set() or len(unread_testcases) > 0:
- if not read_testcase:
- if len(finished_unread_testcases) > 0:
- read_testcase = finished_unread_testcases.pop()
- unread_testcases.remove(read_testcase)
- elif len(unread_testcases) > 0:
- read_testcase = unread_testcases.pop()
+ while read_testcases.is_set() or len(unread_testcases):
+ if len(finished_unread_testcases):
+ read_testcase = finished_unread_testcases.pop()
+ unread_testcases.remove(read_testcase)
+ elif len(unread_testcases):
+ read_testcase = unread_testcases.pop()
if read_testcase:
data = ''
while data is not None:
read_testcase = None
+def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
+ if last_test_temp_dir:
+ # Need to create link in case of a timeout or core dump without failure
+ lttd = os.path.basename(last_test_temp_dir)
+ failed_dir = os.getenv('FAILED_DIR')
+ link_path = '%s%s-FAILED' % (failed_dir, lttd)
+ if not os.path.exists(link_path):
+ os.symlink(last_test_temp_dir, link_path)
+ logger.error("Symlink to failed testcase directory: %s -> %s"
+ % (link_path, lttd))
+
+ # Report core existence
+ core_path = get_core_path(last_test_temp_dir)
+ if os.path.exists(core_path):
+ logger.error(
+ "Core-file exists in test temporary directory: %s!" %
+ core_path)
+ check_core_path(logger, core_path)
+ logger.debug("Running `file %s':" % core_path)
+ try:
+ info = check_output(["file", core_path])
+ logger.debug(info)
+ except CalledProcessError as e:
+ logger.error("Could not run `file' utility on core-file, "
+ "rc=%s" % e.returncode)
+
+ if vpp_pid:
+ # Copy api post mortem
+ api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
+ if os.path.isfile(api_post_mortem_path):
+ logger.error("Copying api_post_mortem.%d to %s" %
+ (vpp_pid, last_test_temp_dir))
+ shutil.copy2(api_post_mortem_path, last_test_temp_dir)
+
+
+def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
+ if is_core_present(tempdir):
+ print('VPP core detected in %s. Last test running was %s' %
+ (tempdir, core_crash_test))
+ print(single_line_delim)
+ spawn_gdb(vpp_binary, get_core_path(tempdir))
+ print(single_line_delim)
+
+
+def handle_cores(failed_testcases):
+ if debug_core:
+ for failed_testcase in failed_testcases:
+ tcs_with_core = failed_testcase.testclasess_with_core
+ if len(tcs_with_core) > 0:
+ for test, vpp_binary, tempdir in tcs_with_core.values():
+ check_and_handle_core(vpp_binary, tempdir, test)
+
+
+def process_finished_testsuite(wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results):
+ results.append(wrapped_testcase_suite.result)
+ finished_testcase_suites.add(wrapped_testcase_suite)
+ stop_run = False
+ if failfast and not wrapped_testcase_suite.was_successful():
+ stop_run = True
+
+ if not wrapped_testcase_suite.was_successful():
+ failed_wrapped_testcases.add(wrapped_testcase_suite)
+ handle_failed_suite(wrapped_testcase_suite.logger,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid)
+
+ return stop_run
+
+
def run_forked(testcase_suites):
wrapped_testcase_suites = set()
# suites are unhashable, need to use list
results = []
- debug_core = os.getenv("DEBUG", "").lower() == "core"
unread_testcases = set()
finished_unread_testcases = set()
manager = StreamQueueManager()
manager)
wrapped_testcase_suites.add(wrapped_testcase_suite)
unread_testcases.add(wrapped_testcase_suite)
- # time.sleep(1)
else:
break
read_from_testcases))
stdouterr_thread.start()
- while len(wrapped_testcase_suites) > 0:
- finished_testcase_suites = set()
- for wrapped_testcase_suite in wrapped_testcase_suites:
- while wrapped_testcase_suite.result_parent_end.poll():
- wrapped_testcase_suite.result.process_result(
- *wrapped_testcase_suite.result_parent_end.recv())
- wrapped_testcase_suite.last_heard = time.time()
-
- if wrapped_testcase_suite.finished_parent_end.poll():
- wrapped_testcase_suite.finished_parent_end.recv()
- results.append(wrapped_testcase_suite.result)
- finished_testcase_suites.add(wrapped_testcase_suite)
- continue
-
- while wrapped_testcase_suite.keep_alive_parent_end.poll():
- wrapped_testcase_suite.last_test, \
- wrapped_testcase_suite.last_test_vpp_binary, \
- wrapped_testcase_suite.last_test_temp_dir, \
- wrapped_testcase_suite.vpp_pid = \
- wrapped_testcase_suite.keep_alive_parent_end.recv()
- wrapped_testcase_suite.last_heard = time.time()
-
- fail = False
- if wrapped_testcase_suite.last_heard + test_timeout < time.time() \
- and not os.path.isfile(
- "%s/_core_handled" %
- wrapped_testcase_suite.last_test_temp_dir):
- fail = True
- wrapped_testcase_suite.logger.critical(
- "Timeout while waiting for child test "
- "runner process (last test running was "
- "`%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
- elif not wrapped_testcase_suite.child.is_alive():
- fail = True
- wrapped_testcase_suite.logger.critical(
- "Child python process unexpectedly died "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
- elif wrapped_testcase_suite.last_test_temp_dir and \
- wrapped_testcase_suite.last_test_vpp_binary:
- core_path = "%s/core" % \
- wrapped_testcase_suite.last_test_temp_dir
- if os.path.isfile(core_path):
- if wrapped_testcase_suite.core_detected_at is None:
- wrapped_testcase_suite.core_detected_at = time.time()
- elif wrapped_testcase_suite.core_detected_at + \
- core_timeout < time.time():
- if not os.path.isfile(
- "%s/_core_handled" %
- wrapped_testcase_suite.
- last_test_temp_dir):
- wrapped_testcase_suite.logger.critical(
- "Child python process unresponsive and core-"
- "file exists in test temporary directory!")
- fail = True
+ failed_wrapped_testcases = set()
+ stop_run = False
+
+ try:
+ while len(wrapped_testcase_suites) > 0:
+ finished_testcase_suites = set()
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ while wrapped_testcase_suite.result_parent_end.poll():
+ wrapped_testcase_suite.result.process_result(
+ *wrapped_testcase_suite.result_parent_end.recv())
+ wrapped_testcase_suite.last_heard = time.time()
+
+ while wrapped_testcase_suite.keep_alive_parent_end.poll():
+ wrapped_testcase_suite.last_test, \
+ wrapped_testcase_suite.last_test_vpp_binary, \
+ wrapped_testcase_suite.last_test_temp_dir, \
+ wrapped_testcase_suite.vpp_pid = \
+ wrapped_testcase_suite.keep_alive_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+
+ if wrapped_testcase_suite.finished_parent_end.poll():
+ wrapped_testcase_suite.finished_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+ stop_run = process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results) or stop_run
+ continue
- if fail:
- failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
- if wrapped_testcase_suite.last_test_temp_dir:
- lttd = os.path.basename(
- wrapped_testcase_suite.last_test_temp_dir)
- else:
- lttd = None
- link_path = '%s%s-FAILED' % (failed_dir, lttd)
- wrapped_testcase_suite.logger.error(
- "Creating a link to the failed test: %s -> %s" %
- (link_path, lttd))
- if not os.path.exists(link_path) \
- and wrapped_testcase_suite.last_test_temp_dir:
- os.symlink(wrapped_testcase_suite.last_test_temp_dir,
- link_path)
- api_post_mortem_path = "/tmp/api_post_mortem.%d" % \
- wrapped_testcase_suite.vpp_pid
- if os.path.isfile(api_post_mortem_path):
- wrapped_testcase_suite.logger.error(
- "Copying api_post_mortem.%d to %s" %
- (wrapped_testcase_suite.vpp_pid,
+ fail = False
+ if wrapped_testcase_suite.last_heard + test_timeout < \
+ time.time():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process timed out "
+ "(last test running was `%s' in `%s')!" %
+ (wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir))
+ elif not wrapped_testcase_suite.child.is_alive():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process unexpectedly died "
+ "(last test running was `%s' in `%s')!" %
+ (wrapped_testcase_suite.last_test,
wrapped_testcase_suite.last_test_temp_dir))
- shutil.copy2(api_post_mortem_path,
- wrapped_testcase_suite.last_test_temp_dir)
- if wrapped_testcase_suite.last_test_temp_dir and \
+ elif wrapped_testcase_suite.last_test_temp_dir and \
wrapped_testcase_suite.last_test_vpp_binary:
- core_path = "%s/core" % \
- wrapped_testcase_suite.last_test_temp_dir
- if os.path.isfile(core_path):
- wrapped_testcase_suite.logger.error(
- "Core-file exists in test temporary directory: %s!"
- % core_path)
- check_core_path(wrapped_testcase_suite.logger,
- core_path)
- wrapped_testcase_suite.logger.debug(
- "Running `file %s':" % core_path)
- try:
- info = check_output(["file", core_path])
- wrapped_testcase_suite.logger.debug(info)
- except CalledProcessError as e:
- wrapped_testcase_suite.logger.error(
- "Could not run `file' utility on core-file, "
- "rc=%s" % e.returncode)
- pass
- if debug_core:
- spawn_gdb(
- wrapped_testcase_suite.last_test_vpp_binary,
- core_path, wrapped_testcase_suite.logger)
- wrapped_testcase_suite.child.terminate()
- try:
- # terminating the child process tends to leave orphan
- # VPP process around
- os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
- except OSError:
- # already dead
- pass
- results.append(wrapped_testcase_suite.result)
- finished_testcase_suites.add(wrapped_testcase_suite)
-
- for finished_testcase in finished_testcase_suites:
- finished_testcase.child.join()
- finished_testcase.close_pipes()
- wrapped_testcase_suites.remove(finished_testcase)
- finished_unread_testcases.add(finished_testcase)
- finished_testcase.stdouterr_queue.put(None)
- if len(testcase_suites) > 0:
- new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
- wrapped_testcase_suites.add(new_testcase)
- unread_testcases.add(new_testcase)
-
- read_from_testcases.clear()
- stdouterr_thread.join(test_timeout)
- manager.shutdown()
+ if is_core_present(
+ wrapped_testcase_suite.last_test_temp_dir):
+ wrapped_testcase_suite.add_testclass_with_core()
+ if wrapped_testcase_suite.core_detected_at is None:
+ wrapped_testcase_suite.core_detected_at = \
+ time.time()
+ elif wrapped_testcase_suite.core_detected_at + \
+ core_timeout < time.time():
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process unresponsive and "
+ "core-file exists in test temporary directory "
+ "(last test running was `%s' in `%s')!" %
+ (wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir))
+ fail = True
+
+ if fail:
+ wrapped_testcase_suite.child.terminate()
+ try:
+ # terminating the child process tends to leave orphan
+ # VPP process around
+ if wrapped_testcase_suite.vpp_pid:
+ os.kill(wrapped_testcase_suite.vpp_pid,
+ signal.SIGTERM)
+ except OSError:
+ # already dead
+ pass
+ wrapped_testcase_suite.result.crashed = True
+ wrapped_testcase_suite.result.process_result(
+ wrapped_testcase_suite.last_test_id, ERROR)
+ stop_run = process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results) or stop_run
+
+ for finished_testcase in finished_testcase_suites:
+ finished_testcase.child.join()
+ finished_testcase.close_pipes()
+ wrapped_testcase_suites.remove(finished_testcase)
+ finished_unread_testcases.add(finished_testcase)
+ finished_testcase.stdouterr_queue.put(None)
+ if stop_run:
+ while len(testcase_suites) > 0:
+ results.append(TestResult(testcase_suites.pop(0)))
+ elif len(testcase_suites) > 0:
+ new_testcase = TestCaseWrapper(testcase_suites.pop(0),
+ manager)
+ wrapped_testcase_suites.add(new_testcase)
+ unread_testcases.add(new_testcase)
+ except Exception:
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ wrapped_testcase_suite.child.terminate()
+ wrapped_testcase_suite.stdouterr_queue.put(None)
+ raise
+ finally:
+ read_from_testcases.clear()
+ stdouterr_thread.join(test_timeout)
+ manager.shutdown()
+
+ handle_cores(failed_wrapped_testcases)
return results
self.filter_func_name = filter_func_name
def __call__(self, file_name, class_name, func_name):
- if self.filter_file_name and file_name != self.filter_file_name:
- return False
+ if self.filter_file_name:
+ fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
+ if not fn_match:
+ return False
if self.filter_class_name and class_name != self.filter_class_name:
return False
if self.filter_func_name and func_name != self.filter_func_name:
def add_result(self, result):
retval = 0
self.all_testcases += result.testcase_suite.countTestCases()
- if not result.no_tests_run():
- if not result.was_successful():
- retval = 1
+ self.add_results(result)
- self.add_results(result)
- else:
+ if result.no_tests_run():
self.testsuites_no_tests_run.append(result.testcase_suite)
- retval = -1
+ if result.crashed:
+ retval = -1
+ else:
+ retval = 1
+ elif not result.was_successful():
+ retval = 1
if retval != 0:
- if concurrent_tests == 1:
- if not result.no_tests_run():
- self.rerun.append(result.suite_from_failed())
- else:
- self.rerun.append(result.testcase_suite)
- else:
- self.rerun.append(result.testcase_suite)
+ self.rerun.append(result.testcase_suite)
return retval
colorize(str(self[ERROR]), RED)))
if self.all_failed > 0:
- print('FAILED TESTS:')
+ print('FAILURES AND ERRORS IN TESTS:')
for result in self.results_per_suite:
failed_testcase_ids = result[FAIL]
errored_testcase_ids = result[ERROR]
print(' Testcase name: {}'.format(
colorize(new_testcase_name, RED)))
old_testcase_name = new_testcase_name
- print(' FAILED: {}'.format(
- colorize(test_name, RED)))
+ print(' FAILURE: {} [{}]'.format(
+ colorize(test_name, RED), failed_test_id))
for failed_test_id in errored_testcase_ids:
new_testcase_name, test_name = \
result.get_testcase_names(failed_test_id)
print(' Testcase name: {}'.format(
colorize(new_testcase_name, RED)))
old_testcase_name = new_testcase_name
- print(' ERRORED: {}'.format(
- colorize(test_name, RED)))
+ print(' ERROR: {} [{}]'.format(
+ colorize(test_name, RED), failed_test_id))
if len(self.testsuites_no_tests_run) > 0:
print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
- tc_classes = set([])
+ tc_classes = set()
for testsuite in self.testsuites_no_tests_run:
for testcase in testsuite:
tc_classes.add(get_testcase_doc_name(testcase))
debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
- step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
+ debug_core = os.getenv("DEBUG", "").lower() == "core"
- force_foreground = \
- os.getenv("FORCE_FOREGROUND", "n").lower() in ("y", "yes", "1")
+ step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
- run_interactive = debug or step or force_foreground
+ run_interactive = debug or step
test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
if test_jobs == 'auto':
if run_interactive and concurrent_tests > 1:
raise NotImplementedError(
- 'Running tests interactively (DEBUG, STEP or FORCE_FOREGROUND is '
- 'set) in parallel (TEST_JOBS is more than 1) is not '
- 'supported')
+ 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
+ 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
parser = argparse.ArgumentParser(description="VPP unit tests")
parser.add_argument("-f", "--failfast", action='store_true',
filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
+ ignore_path = os.getenv("VENV_PATH", None)
cb = SplitToSuitesCallback(filter_cb)
for d in args.dir:
print("Adding tests from directory tree %s" % d)
- discover_tests(d, cb)
+ discover_tests(d, cb, ignore_path)
# suites are not hashable, need to use list
suites = []
tests_amount += testcase_suite.countTestCases()
suites.append(testcase_suite)
- if concurrent_tests == 1:
- new_suite = unittest.TestSuite()
- for suite in suites:
- new_suite.addTests(suite)
-
- suites = [new_suite]
-
print("%s out of %s tests match specified filters" % (
tests_amount, tests_amount + cb.filtered.countTestCases()))
if run_interactive:
# don't fork if requiring interactive terminal
- sys.exit(not VppTestRunner(
- verbosity=verbose, failfast=failfast)
- .run(suites[0]).wasSuccessful())
+ result = VppTestRunner(verbosity=verbose,
+ failfast=failfast,
+ print_summary=True).run(suites[0])
+ was_successful = result.wasSuccessful()
+ if not was_successful:
+ for test_case_info in result.failed_test_cases_info:
+ handle_failed_suite(test_case_info.logger,
+ test_case_info.tempdir,
+ test_case_info.vpp_pid)
+ if debug_core and \
+ test_case_info in result.core_crash_test_cases_info:
+ check_and_handle_core(test_case_info.vpp_bin_path,
+ test_case_info.tempdir,
+ test_case_info.core_crash_test)
+
+ sys.exit(not was_successful)
else:
exit_code = 0
while len(suites) > 0 and attempts > 0:
- tests_amount = sum([x.countTestCases() for x in suites])
results = run_forked(suites)
exit_code, suites = parse_results(results)
attempts -= 1