-#!/usr/bin/env python
+#!/usr/bin/env python3
import sys
import shutil
import argparse
import time
import threading
+import traceback
import signal
-import psutil
import re
-from multiprocessing import Process, Pipe, cpu_count
+from multiprocessing import Process, Pipe, get_context
from multiprocessing.queues import Queue
from multiprocessing.managers import BaseManager
-from framework import VppTestRunner, running_extended_tests, VppTestCase, \
+import framework
+from config import config, num_cpus, available_cpus, max_vpp_cpus
+from framework import VppTestRunner, VppTestCase, \
get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
- TEST_RUN
-from debug import spawn_gdb
+ TEST_RUN, SKIP_CPU_SHORTAGE
+from debug import spawn_gdb, start_vpp_in_gdb
from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
colorize, single_line_delim
from discover_tests import discover_tests
+import sanity_run_vpp
from subprocess import check_output, CalledProcessError
from util import check_core_path, get_core_path, is_core_present
# timeout which controls how long the child has to finish after seeing
# a core dump in test temporary directory. If this is exceeded, parent assumes
-# that child process is stuck (e.g. waiting for shm mutex, which will never
-# get unlocked) and kill the child
+# that child process is stuck (e.g. waiting for event from vpp) and kill
+# the child
core_timeout = 3
-min_req_shm = 536870912 # min 512MB shm required
-# 128MB per extra process
-shm_per_process = 134217728
class StreamQueue(Queue):
self[FAIL] = []
self[ERROR] = []
self[SKIP] = []
+ self[SKIP_CPU_SHORTAGE] = []
self[TEST_RUN] = []
self.crashed = False
self.testcase_suite = testcase_suite
def was_successful(self):
return 0 == len(self[FAIL]) == len(self[ERROR]) \
- and len(self[PASS] + self[SKIP]) \
- == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
+ and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
+ == self.testcase_suite.countTestCases()
def no_tests_run(self):
return 0 == len(self[TEST_RUN])
rerun_ids = set([])
for testcase in self.testcase_suite:
tc_id = testcase.id()
- if tc_id not in self[PASS] and tc_id not in self[SKIP]:
+ if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
rerun_ids.add(tc_id)
- if len(rerun_ids) > 0:
+ if rerun_ids:
return suite_from_failed(self.testcase_suite, rerun_ids)
def get_testcase_names(self, test_id):
- if re.match(r'.+\..+\..+', test_id):
+ # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
+ setup_teardown_match = re.match(
+ r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
+ if setup_teardown_match:
+ test_name, _, _, testcase_name = setup_teardown_match.groups()
+ if len(testcase_name.split('.')) == 2:
+ for key in self.testcases_by_id.keys():
+ if key.startswith(testcase_name):
+ testcase_name = key
+ break
+ testcase_name = self._get_testcase_doc_name(testcase_name)
+ else:
test_name = self._get_test_description(test_id)
testcase_name = self._get_testcase_doc_name(test_id)
- else:
- # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
- setup_teardown_match = re.match(
- r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
- if setup_teardown_match:
- test_name, _, _, testcase_name = setup_teardown_match.groups()
- if len(testcase_name.split('.')) == 2:
- for key in self.testcases_by_id.keys():
- if key.startswith(testcase_name):
- testcase_name = key
- break
- testcase_name = self._get_testcase_doc_name(testcase_name)
- else:
- test_name = test_id
- testcase_name = test_id
return testcase_name, test_name
def _get_test_description(self, test_id):
- return get_test_description(descriptions,
- self.testcases_by_id[test_id])
+ if test_id in self.testcases_by_id:
+ desc = get_test_description(descriptions,
+ self.testcases_by_id[test_id])
+ else:
+ desc = test_id
+ return desc
def _get_testcase_doc_name(self, test_id):
- return get_testcase_doc_name(self.testcases_by_id[test_id])
+ if test_id in self.testcases_by_id:
+ doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
+ else:
+ doc_name = test_id
+ return doc_name
def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
VppTestCase.parallel_handler = logger.handlers[0]
result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
descriptions=descriptions,
- verbosity=verbose,
+ verbosity=config.verbose,
result_pipe=result_pipe,
- failfast=failfast,
+ failfast=config.failfast,
print_summary=False).run(suite)
finished_pipe.send(result.wasSuccessful())
finished_pipe.close()
self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
self.result_parent_end, self.result_child_end = Pipe(duplex=False)
self.testcase_suite = testcase_suite
- self.stdouterr_queue = manager.StreamQueue()
+ self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
self.logger = get_parallel_logger(self.stdouterr_queue)
self.child = Process(target=test_runner_wrapper,
args=(testcase_suite,
def was_successful(self):
return self.result.was_successful()
+ @property
+ def cpus_used(self):
+ return self.testcase_suite.cpus_used
+
+ def get_assigned_cpus(self):
+ return self.testcase_suite.get_assigned_cpus()
+
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
read_testcases):
read_testcase = None
- while read_testcases.is_set() or len(unread_testcases) > 0:
- if not read_testcase:
- if len(finished_unread_testcases) > 0:
- read_testcase = finished_unread_testcases.pop()
- unread_testcases.remove(read_testcase)
- elif len(unread_testcases) > 0:
- read_testcase = unread_testcases.pop()
+ while read_testcases.is_set() or unread_testcases:
+ if finished_unread_testcases:
+ read_testcase = finished_unread_testcases.pop()
+ unread_testcases.remove(read_testcase)
+ elif unread_testcases:
+ read_testcase = unread_testcases.pop()
if read_testcase:
data = ''
while data is not None:
read_testcase = None
-def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
+def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
if last_test_temp_dir:
# Need to create link in case of a timeout or core dump without failure
lttd = os.path.basename(last_test_temp_dir)
- failed_dir = os.getenv('FAILED_DIR')
- link_path = '%s%s-FAILED' % (failed_dir, lttd)
+ link_path = '%s%s-FAILED' % (config.failed_dir, lttd)
if not os.path.exists(link_path):
os.symlink(last_test_temp_dir, link_path)
logger.error("Symlink to failed testcase directory: %s -> %s"
"Core-file exists in test temporary directory: %s!" %
core_path)
check_core_path(logger, core_path)
- logger.debug("Running `file %s':" % core_path)
+ logger.debug("Running 'file %s':" % core_path)
try:
info = check_output(["file", core_path])
logger.debug(info)
except CalledProcessError as e:
- logger.error("Could not run `file' utility on core-file, "
- "rc=%s" % e.returncode)
+ logger.error("Subprocess returned with return code "
+ "while running `file' utility on core-file "
+ "returned: "
+ "rc=%s", e.returncode)
+ except OSError as e:
+ logger.error("Subprocess returned with OS error while "
+ "running 'file' utility "
+ "on core-file: "
+ "(%s) %s", e.errno, e.strerror)
+ except Exception as e:
+ logger.exception("Unexpected error running `file' utility "
+ "on core-file")
+ logger.error(f"gdb {vpp_binary} {core_path}")
if vpp_pid:
# Copy api post mortem
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
if is_core_present(tempdir):
- print('VPP core detected in %s. Last test running was %s' %
- (tempdir, core_crash_test))
- print(single_line_delim)
- spawn_gdb(vpp_binary, get_core_path(tempdir))
- print(single_line_delim)
+ if debug_core:
+ print('VPP core detected in %s. Last test running was %s' %
+ (tempdir, core_crash_test))
+ print(single_line_delim)
+ spawn_gdb(vpp_binary, get_core_path(tempdir))
+ print(single_line_delim)
+ elif config.compress_core:
+ print("Compressing core-file in test directory `%s'" % tempdir)
+ os.system("gzip %s" % get_core_path(tempdir))
def handle_cores(failed_testcases):
- if debug_core:
- for failed_testcase in failed_testcases:
- tcs_with_core = failed_testcase.testclasess_with_core
- if len(tcs_with_core) > 0:
- for test, vpp_binary, tempdir in tcs_with_core.values():
- check_and_handle_core(vpp_binary, tempdir, test)
+ for failed_testcase in failed_testcases:
+ tcs_with_core = failed_testcase.testclasess_with_core
+ if tcs_with_core:
+ for test, vpp_binary, tempdir in tcs_with_core.values():
+ check_and_handle_core(vpp_binary, tempdir, test)
def process_finished_testsuite(wrapped_testcase_suite,
results.append(wrapped_testcase_suite.result)
finished_testcase_suites.add(wrapped_testcase_suite)
stop_run = False
- if failfast and not wrapped_testcase_suite.was_successful():
+ if config.failfast and not wrapped_testcase_suite.was_successful():
stop_run = True
if not wrapped_testcase_suite.was_successful():
failed_wrapped_testcases.add(wrapped_testcase_suite)
handle_failed_suite(wrapped_testcase_suite.logger,
wrapped_testcase_suite.last_test_temp_dir,
- wrapped_testcase_suite.vpp_pid)
+ wrapped_testcase_suite.vpp_pid,
+ wrapped_testcase_suite.last_test_vpp_binary,)
return stop_run
def run_forked(testcase_suites):
wrapped_testcase_suites = set()
+ solo_testcase_suites = []
# suites are unhashable, need to use list
results = []
finished_unread_testcases = set()
manager = StreamQueueManager()
manager.start()
- for i in range(concurrent_tests):
- if len(testcase_suites) > 0:
- wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
- manager)
- wrapped_testcase_suites.add(wrapped_testcase_suite)
- unread_testcases.add(wrapped_testcase_suite)
+ tests_running = 0
+ free_cpus = list(available_cpus)
+
+ def on_suite_start(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running + 1
+
+ def on_suite_finish(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running - 1
+ assert tests_running >= 0
+ free_cpus.extend(tc.get_assigned_cpus())
+
+ def run_suite(suite):
+ nonlocal manager
+ nonlocal wrapped_testcase_suites
+ nonlocal unread_testcases
+ nonlocal free_cpus
+ suite.assign_cpus(free_cpus[:suite.cpus_used])
+ free_cpus = free_cpus[suite.cpus_used:]
+ wrapper = TestCaseWrapper(suite, manager)
+ wrapped_testcase_suites.add(wrapper)
+ unread_testcases.add(wrapper)
+ on_suite_start(suite)
+
+ def can_run_suite(suite):
+ return (tests_running < max_concurrent_tests and
+ (suite.cpus_used <= len(free_cpus) or
+ suite.cpus_used > max_vpp_cpus))
+
+ while free_cpus and testcase_suites:
+ a_suite = testcase_suites[0]
+ if a_suite.is_tagged_run_solo:
+ a_suite = testcase_suites.pop(0)
+ solo_testcase_suites.append(a_suite)
+ continue
+ if can_run_suite(a_suite):
+ a_suite = testcase_suites.pop(0)
+ run_suite(a_suite)
else:
break
+ if tests_running == 0 and solo_testcase_suites:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
+
read_from_testcases = threading.Event()
read_from_testcases.set()
stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
failed_wrapped_testcases = set()
stop_run = False
- while len(wrapped_testcase_suites) > 0:
- finished_testcase_suites = set()
+
+ try:
+ while wrapped_testcase_suites:
+ finished_testcase_suites = set()
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ while wrapped_testcase_suite.result_parent_end.poll():
+ wrapped_testcase_suite.result.process_result(
+ *wrapped_testcase_suite.result_parent_end.recv())
+ wrapped_testcase_suite.last_heard = time.time()
+
+ while wrapped_testcase_suite.keep_alive_parent_end.poll():
+ wrapped_testcase_suite.last_test, \
+ wrapped_testcase_suite.last_test_vpp_binary, \
+ wrapped_testcase_suite.last_test_temp_dir, \
+ wrapped_testcase_suite.vpp_pid = \
+ wrapped_testcase_suite.keep_alive_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+
+ if wrapped_testcase_suite.finished_parent_end.poll():
+ wrapped_testcase_suite.finished_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+ stop_run = process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results) or stop_run
+ continue
+
+ fail = False
+ if wrapped_testcase_suite.last_heard + config.timeout < \
+ time.time():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process timed out "
+ "(last test running was `%s' in `%s')!" %
+ (wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir))
+ elif not wrapped_testcase_suite.child.is_alive():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process unexpectedly died "
+ "(last test running was `%s' in `%s')!" %
+ (wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir))
+ elif wrapped_testcase_suite.last_test_temp_dir and \
+ wrapped_testcase_suite.last_test_vpp_binary:
+ if is_core_present(
+ wrapped_testcase_suite.last_test_temp_dir):
+ wrapped_testcase_suite.add_testclass_with_core()
+ if wrapped_testcase_suite.core_detected_at is None:
+ wrapped_testcase_suite.core_detected_at = \
+ time.time()
+ elif wrapped_testcase_suite.core_detected_at + \
+ core_timeout < time.time():
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process unresponsive and "
+ "core-file exists in test temporary directory "
+ "(last test running was `%s' in `%s')!" %
+ (wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir))
+ fail = True
+
+ if fail:
+ wrapped_testcase_suite.child.terminate()
+ try:
+ # terminating the child process tends to leave orphan
+ # VPP process around
+ if wrapped_testcase_suite.vpp_pid:
+ os.kill(wrapped_testcase_suite.vpp_pid,
+ signal.SIGTERM)
+ except OSError:
+ # already dead
+ pass
+ wrapped_testcase_suite.result.crashed = True
+ wrapped_testcase_suite.result.process_result(
+ wrapped_testcase_suite.last_test_id, ERROR)
+ stop_run = process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results) or stop_run
+
+ for finished_testcase in finished_testcase_suites:
+ # Somewhat surprisingly, the join below may
+ # timeout, even if client signaled that
+ # it finished - so we note it just in case.
+ join_start = time.time()
+ finished_testcase.child.join(test_finished_join_timeout)
+ join_end = time.time()
+ if join_end - join_start >= test_finished_join_timeout:
+ finished_testcase.logger.error(
+ "Timeout joining finished test: %s (pid %d)" %
+ (finished_testcase.last_test,
+ finished_testcase.child.pid))
+ finished_testcase.close_pipes()
+ wrapped_testcase_suites.remove(finished_testcase)
+ finished_unread_testcases.add(finished_testcase)
+ finished_testcase.stdouterr_queue.put(None)
+ on_suite_finish(finished_testcase)
+ if stop_run:
+ while testcase_suites:
+ results.append(TestResult(testcase_suites.pop(0)))
+ elif testcase_suites:
+ a_suite = testcase_suites.pop(0)
+ while a_suite and a_suite.is_tagged_run_solo:
+ solo_testcase_suites.append(a_suite)
+ if testcase_suites:
+ a_suite = testcase_suites.pop(0)
+ else:
+ a_suite = None
+ if a_suite and can_run_suite(a_suite):
+ run_suite(a_suite)
+ if solo_testcase_suites and tests_running == 0:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
+ time.sleep(0.1)
+ except Exception:
for wrapped_testcase_suite in wrapped_testcase_suites:
- while wrapped_testcase_suite.result_parent_end.poll():
- wrapped_testcase_suite.result.process_result(
- *wrapped_testcase_suite.result_parent_end.recv())
- wrapped_testcase_suite.last_heard = time.time()
-
- while wrapped_testcase_suite.keep_alive_parent_end.poll():
- wrapped_testcase_suite.last_test, \
- wrapped_testcase_suite.last_test_vpp_binary, \
- wrapped_testcase_suite.last_test_temp_dir, \
- wrapped_testcase_suite.vpp_pid = \
- wrapped_testcase_suite.keep_alive_parent_end.recv()
- wrapped_testcase_suite.last_heard = time.time()
-
- if wrapped_testcase_suite.finished_parent_end.poll():
- wrapped_testcase_suite.finished_parent_end.recv()
- wrapped_testcase_suite.last_heard = time.time()
- stop_run = process_finished_testsuite(
- wrapped_testcase_suite,
- finished_testcase_suites,
- failed_wrapped_testcases,
- results) or stop_run
- continue
-
- fail = False
- if wrapped_testcase_suite.last_heard + test_timeout < time.time():
- fail = True
- wrapped_testcase_suite.logger.critical(
- "Child test runner process timed out "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
- elif not wrapped_testcase_suite.child.is_alive():
- fail = True
- wrapped_testcase_suite.logger.critical(
- "Child test runner process unexpectedly died "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
- elif wrapped_testcase_suite.last_test_temp_dir and \
- wrapped_testcase_suite.last_test_vpp_binary:
- if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
- wrapped_testcase_suite.add_testclass_with_core()
- if wrapped_testcase_suite.core_detected_at is None:
- wrapped_testcase_suite.core_detected_at = time.time()
- elif wrapped_testcase_suite.core_detected_at + \
- core_timeout < time.time():
- wrapped_testcase_suite.logger.critical(
- "Child test runner process unresponsive and core-"
- "file exists in test temporary directory "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
- fail = True
-
- if fail:
- wrapped_testcase_suite.child.terminate()
- try:
- # terminating the child process tends to leave orphan
- # VPP process around
- if wrapped_testcase_suite.vpp_pid:
- os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
- except OSError:
- # already dead
- pass
- wrapped_testcase_suite.result.crashed = True
- wrapped_testcase_suite.result.process_result(
- wrapped_testcase_suite.last_test_id, ERROR)
- stop_run = process_finished_testsuite(
- wrapped_testcase_suite,
- finished_testcase_suites,
- failed_wrapped_testcases,
- results) or stop_run
-
- for finished_testcase in finished_testcase_suites:
- finished_testcase.child.join()
- finished_testcase.close_pipes()
- wrapped_testcase_suites.remove(finished_testcase)
- finished_unread_testcases.add(finished_testcase)
- finished_testcase.stdouterr_queue.put(None)
- if stop_run:
- while len(testcase_suites) > 0:
- results.append(TestResult(testcase_suites.pop(0)))
- elif len(testcase_suites) > 0:
- new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
- wrapped_testcase_suites.add(new_testcase)
- unread_testcases.add(new_testcase)
-
- while len(unread_testcases) > 0:
- # wait for reader thread to read everything in all loggers
- pass
-
- read_from_testcases.clear()
- stdouterr_thread.join(test_timeout)
- manager.shutdown()
+ wrapped_testcase_suite.child.terminate()
+ wrapped_testcase_suite.stdouterr_queue.put(None)
+ raise
+ finally:
+ read_from_testcases.clear()
+ stdouterr_thread.join(config.timeout)
+ manager.shutdown()
+
handle_cores(failed_wrapped_testcases)
return results
+class TestSuiteWrapper(unittest.TestSuite):
+ cpus_used = 0
+
+ def __init__(self):
+ return super().__init__()
+
+ def addTest(self, test):
+ self.cpus_used = max(self.cpus_used, test.get_cpus_required())
+ super().addTest(test)
+
+ def assign_cpus(self, cpus):
+ self.cpus = cpus
+
+ def _handleClassSetUp(self, test, result):
+ if not test.__class__.skipped_due_to_cpu_lack:
+ test.assign_cpus(self.cpus)
+ super()._handleClassSetUp(test, result)
+
+ def get_assigned_cpus(self):
+ return self.cpus
+
+
class SplitToSuitesCallback:
def __init__(self, filter_callback):
self.suites = {}
self.suite_name = 'default'
self.filter_callback = filter_callback
- self.filtered = unittest.TestSuite()
+ self.filtered = TestSuiteWrapper()
def __call__(self, file_name, cls, method):
test_method = cls(method)
if self.filter_callback(file_name, cls.__name__, method):
self.suite_name = file_name + cls.__name__
if self.suite_name not in self.suites:
- self.suites[self.suite_name] = unittest.TestSuite()
+ self.suites[self.suite_name] = TestSuiteWrapper()
+ self.suites[self.suite_name].is_tagged_run_solo = False
self.suites[self.suite_name].addTest(test_method)
+ if test_method.is_tagged_run_solo():
+ self.suites[self.suite_name].is_tagged_run_solo = True
else:
self.filtered.addTest(test_method)
-test_option = "TEST"
-
-
-def parse_test_option():
- f = os.getenv(test_option, None)
+def parse_test_filter(test_filter):
+ f = test_filter
filter_file_name = None
filter_class_name = None
filter_func_name = None
def filter_tests(tests, filter_cb):
- result = unittest.suite.TestSuite()
+ result = TestSuiteWrapper()
for t in tests:
if isinstance(t, unittest.suite.TestSuite):
# this is a bunch of tests, recursively filter...
self[FAIL] = 0
self[ERROR] = 0
self[SKIP] = 0
+ self[SKIP_CPU_SHORTAGE] = 0
self[TEST_RUN] = 0
self.rerun = []
self.testsuites_no_tests_run = []
def add_results(self, result):
self.results_per_suite.append(result)
- result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
+ result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
for result_type in result_types:
self[result_type] += len(result[result_type])
print('')
print(double_line_delim)
print('TEST RESULTS:')
- print(' Scheduled tests: {}'.format(self.all_testcases))
- print(' Executed tests: {}'.format(self[TEST_RUN]))
- print(' Passed tests: {}'.format(
- colorize(str(self[PASS]), GREEN)))
- if self[SKIP] > 0:
- print(' Skipped tests: {}'.format(
- colorize(str(self[SKIP]), YELLOW)))
- if self.not_executed > 0:
- print(' Not Executed tests: {}'.format(
- colorize(str(self.not_executed), RED)))
- if self[FAIL] > 0:
- print(' Failures: {}'.format(
- colorize(str(self[FAIL]), RED)))
- if self[ERROR] > 0:
- print(' Errors: {}'.format(
- colorize(str(self[ERROR]), RED)))
+
+ def indent_results(lines):
+ lines = list(filter(None, lines))
+ maximum = max(lines, key=lambda x: x.index(":"))
+ maximum = 4 + maximum.index(":")
+ for l in lines:
+ padding = " " * (maximum - l.index(":"))
+ print(f"{padding}{l}")
+
+ indent_results([
+ f'Scheduled tests: {self.all_testcases}',
+ f'Executed tests: {self[TEST_RUN]}',
+ f'Passed tests: {colorize(self[PASS], GREEN)}',
+ f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
+ if self[SKIP] else None,
+ f'Not Executed tests: {colorize(self.not_executed, RED)}'
+ if self.not_executed else None,
+ f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
+ f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
+ 'Tests skipped due to lack of CPUS: '
+ f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
+ if self[SKIP_CPU_SHORTAGE] else None
+ ])
if self.all_failed > 0:
print('FAILURES AND ERRORS IN TESTS:')
failed_testcase_ids = result[FAIL]
errored_testcase_ids = result[ERROR]
old_testcase_name = None
- if len(failed_testcase_ids) or len(errored_testcase_ids):
+ if failed_testcase_ids:
for failed_test_id in failed_testcase_ids:
new_testcase_name, test_name = \
result.get_testcase_names(failed_test_id)
old_testcase_name = new_testcase_name
print(' FAILURE: {} [{}]'.format(
colorize(test_name, RED), failed_test_id))
- for failed_test_id in errored_testcase_ids:
+ if errored_testcase_ids:
+ for errored_test_id in errored_testcase_ids:
new_testcase_name, test_name = \
- result.get_testcase_names(failed_test_id)
+ result.get_testcase_names(errored_test_id)
if new_testcase_name != old_testcase_name:
print(' Testcase name: {}'.format(
colorize(new_testcase_name, RED)))
old_testcase_name = new_testcase_name
print(' ERROR: {} [{}]'.format(
- colorize(test_name, RED), failed_test_id))
- if len(self.testsuites_no_tests_run) > 0:
+ colorize(test_name, RED), errored_test_id))
+ if self.testsuites_no_tests_run:
print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
tc_classes = set()
for testsuite in self.testsuites_no_tests_run:
for tc_class in tc_classes:
print(' {}'.format(colorize(tc_class, RED)))
+ if self[SKIP_CPU_SHORTAGE]:
+ print()
+ print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
+ ' ENOUGH CPUS AVAILABLE', YELLOW))
print(double_line_delim)
print('')
return return_code, results_per_suite.rerun
-def parse_digit_env(env_var, default):
- value = os.getenv(env_var, default)
- if value != default:
- if value.isdigit():
- value = int(value)
- else:
- print('WARNING: unsupported value "%s" for env var "%s",'
- 'defaulting to %s' % (value, env_var, default))
- value = default
- return value
-
-
if __name__ == '__main__':
- verbose = parse_digit_env("V", 0)
-
- test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
+ print(f"Config is: {config}")
- retries = parse_digit_env("RETRIES", 0)
+ if config.sanity:
+ print("Running sanity test case.")
+ try:
+ rc = sanity_run_vpp.main()
+ if rc != 0:
+ sys.exit(rc)
+ except Exception as e:
+ print(traceback.format_exc())
+ print("Couldn't run sanity test case.")
+ sys.exit(-1)
- debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
+ test_finished_join_timeout = 15
- debug_core = os.getenv("DEBUG", "").lower() == "core"
+ debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
+ debug_core = config.debug == "core"
- step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
+ run_interactive = debug_gdb or config.step or config.force_foreground
- run_interactive = debug or step
+ max_concurrent_tests = 0
+ print(f"OS reports {num_cpus} available cpu(s).")
- test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
+ test_jobs = config.jobs
if test_jobs == 'auto':
if run_interactive:
- concurrent_tests = 1
- print('Interactive mode required, running on one core')
+ max_concurrent_tests = 1
+ print('Interactive mode required, running tests consecutively.')
else:
- shm_free = psutil.disk_usage('/dev/shm').free
- shm_max_processes = 1
- if shm_free < min_req_shm:
- raise Exception('Not enough free space in /dev/shm. Required '
- 'free space is at least %sM.'
- % (min_req_shm >> 20))
- else:
- extra_shm = shm_free - min_req_shm
- shm_max_processes += extra_shm / shm_per_process
- concurrent_tests = min(cpu_count(), shm_max_processes)
- print('Found enough resources to run tests with %s cores'
- % concurrent_tests)
- elif test_jobs.isdigit():
- concurrent_tests = int(test_jobs)
+ max_concurrent_tests = num_cpus
+ print(f"Running at most {max_concurrent_tests} python test "
+ "processes concurrently.")
else:
- concurrent_tests = 1
+ max_concurrent_tests = test_jobs
+ print(f"Running at most {max_concurrent_tests} python test processes "
+ "concurrently as set by 'TEST_JOBS'.")
+
+ print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
- if run_interactive and concurrent_tests > 1:
+ if run_interactive and max_concurrent_tests > 1:
raise NotImplementedError(
- 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
- 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
-
- parser = argparse.ArgumentParser(description="VPP unit tests")
- parser.add_argument("-f", "--failfast", action='store_true',
- help="fast failure flag")
- parser.add_argument("-d", "--dir", action='append', type=str,
- help="directory containing test files "
- "(may be specified multiple times)")
- args = parser.parse_args()
- failfast = args.failfast
+ 'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
+ 'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
+ 'supported')
+
descriptions = True
- print("Running tests using custom test runner") # debug message
- filter_file, filter_class, filter_func = parse_test_option()
+ print("Running tests using custom test runner.")
+ filter_file, filter_class, filter_func = \
+ parse_test_filter(config.filter)
- print("Active filters: file=%s, class=%s, function=%s" % (
+ print("Selected filters: file=%s, class=%s, function=%s" % (
filter_file, filter_class, filter_func))
filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
- ignore_path = os.getenv("VENV_PATH", None)
+ ignore_path = config.venv_dir
cb = SplitToSuitesCallback(filter_cb)
- for d in args.dir:
+ for d in config.test_src_dir:
print("Adding tests from directory tree %s" % d)
discover_tests(d, cb, ignore_path)
tests_amount = 0
for testcase_suite in cb.suites.values():
tests_amount += testcase_suite.countTestCases()
+ if testcase_suite.cpus_used > max_vpp_cpus:
+ # here we replace test functions with lambdas to just skip them
+ # but we also replace setUp/tearDown functions to do nothing
+ # so that the test can be "started" and "stopped", so that we can
+ # still keep those prints (test description - SKIP), which are done
+ # in stopTest() (for that to trigger, test function must run)
+ for t in testcase_suite:
+ for m in dir(t):
+ if m.startswith('test_'):
+ setattr(t, m, lambda: t.skipTest("not enough cpus"))
+ setattr(t.__class__, 'setUpClass', lambda: None)
+ setattr(t.__class__, 'tearDownClass', lambda: None)
+ setattr(t, 'setUp', lambda: None)
+ setattr(t, 'tearDown', lambda: None)
+ t.__class__.skipped_due_to_cpu_lack = True
suites.append(testcase_suite)
print("%s out of %s tests match specified filters" % (
tests_amount, tests_amount + cb.filtered.countTestCases()))
- if not running_extended_tests():
+ if not config.extended:
print("Not running extended tests (some tests will be skipped)")
- attempts = retries + 1
+ attempts = config.retries + 1
if attempts > 1:
print("Perform %s attempts to pass the suite..." % attempts)
- if run_interactive:
+ if run_interactive and suites:
# don't fork if requiring interactive terminal
- result = VppTestRunner(verbosity=verbose,
- failfast=failfast,
- print_summary=True).run(suites[0])
+ print('Running tests in foreground in the current process')
+ full_suite = unittest.TestSuite()
+ free_cpus = list(available_cpus)
+ cpu_shortage = False
+ for suite in suites:
+ if suite.cpus_used <= max_vpp_cpus:
+ suite.assign_cpus(free_cpus[:suite.cpus_used])
+ else:
+ suite.assign_cpus([])
+ cpu_shortage = True
+ full_suite.addTests(suites)
+ result = VppTestRunner(verbosity=config.verbose,
+ failfast=config.failfast,
+ print_summary=True).run(full_suite)
was_successful = result.wasSuccessful()
if not was_successful:
for test_case_info in result.failed_test_cases_info:
handle_failed_suite(test_case_info.logger,
test_case_info.tempdir,
- test_case_info.vpp_pid)
- if debug_core and \
- test_case_info in result.core_crash_test_cases_info:
+ test_case_info.vpp_pid,
+ config.vpp)
+ if test_case_info in result.core_crash_test_cases_info:
check_and_handle_core(test_case_info.vpp_bin_path,
test_case_info.tempdir,
test_case_info.core_crash_test)
+ if cpu_shortage:
+ print()
+ print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
+ ' ENOUGH CPUS AVAILABLE', YELLOW))
+ print()
sys.exit(not was_successful)
else:
+ print('Running each VPPTestCase in a separate background process'
+ f' with at most {max_concurrent_tests} parallel python test '
+ 'process(es)')
exit_code = 0
- while len(suites) > 0 and attempts > 0:
+ while suites and attempts > 0:
results = run_forked(suites)
exit_code, suites = parse_results(results)
attempts -= 1