-#!/usr/bin/env python
+#!/usr/bin/env python3
import sys
import shutil
import signal
import psutil
import re
+import multiprocessing
from multiprocessing import Process, Pipe, cpu_count
from multiprocessing.queues import Queue
from multiprocessing.managers import BaseManager
+import framework
from framework import VppTestRunner, running_extended_tests, VppTestCase, \
get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
TEST_RUN
-from debug import spawn_gdb
+from debug import spawn_gdb, start_vpp_in_gdb
from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
colorize, single_line_delim
from discover_tests import discover_tests
# timeout which controls how long the child has to finish after seeing
# a core dump in test temporary directory. If this is exceeded, parent assumes
-# that child process is stuck (e.g. waiting for shm mutex, which will never
-# get unlocked) and kill the child
+# that child process is stuck (e.g. waiting for event from vpp) and kill
+# the child
core_timeout = 3
-min_req_shm = 536870912 # min 512MB shm required
-# 128MB per extra process
-shm_per_process = 134217728
class StreamQueue(Queue):
except Exception as e:
logger.exception("Unexpected error running `file' utility "
"on core-file")
+ logger.error("gdb %s %s" %
+ (os.getenv('VPP_BIN', 'vpp'), core_path))
if vpp_pid:
# Copy api post mortem
def run_forked(testcase_suites):
wrapped_testcase_suites = set()
+ solo_testcase_suites = []
+ total_test_runners = 0
# suites are unhashable, need to use list
results = []
finished_unread_testcases = set()
manager = StreamQueueManager()
manager.start()
- for i in range(concurrent_tests):
+ total_test_runners = 0
+ while total_test_runners < concurrent_tests:
if testcase_suites:
- wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
+ a_suite = testcase_suites.pop(0)
+ if a_suite.is_tagged_run_solo:
+ solo_testcase_suites.append(a_suite)
+ continue
+ wrapped_testcase_suite = TestCaseWrapper(a_suite,
manager)
wrapped_testcase_suites.add(wrapped_testcase_suite)
unread_testcases.add(wrapped_testcase_suite)
+ total_test_runners = total_test_runners + 1
+ else:
+ break
+
+ while total_test_runners < 1 and solo_testcase_suites:
+ if solo_testcase_suites:
+ a_suite = solo_testcase_suites.pop(0)
+ wrapped_testcase_suite = TestCaseWrapper(a_suite,
+ manager)
+ wrapped_testcase_suites.add(wrapped_testcase_suite)
+ unread_testcases.add(wrapped_testcase_suite)
+ total_test_runners = total_test_runners + 1
else:
break
results) or stop_run
for finished_testcase in finished_testcase_suites:
- finished_testcase.child.join()
+ # Somewhat surprisingly, the join below may
+ # timeout, even if client signaled that
+ # it finished - so we note it just in case.
+ join_start = time.time()
+ finished_testcase.child.join(test_finished_join_timeout)
+ join_end = time.time()
+ if join_end - join_start >= test_finished_join_timeout:
+ finished_testcase.logger.error(
+ "Timeout joining finished test: %s (pid %d)" %
+ (finished_testcase.last_test,
+ finished_testcase.child.pid))
finished_testcase.close_pipes()
wrapped_testcase_suites.remove(finished_testcase)
finished_unread_testcases.add(finished_testcase)
finished_testcase.stdouterr_queue.put(None)
+ total_test_runners = total_test_runners - 1
if stop_run:
while testcase_suites:
results.append(TestResult(testcase_suites.pop(0)))
elif testcase_suites:
- new_testcase = TestCaseWrapper(testcase_suites.pop(0),
+ a_testcase = testcase_suites.pop(0)
+ while a_testcase and a_testcase.is_tagged_run_solo:
+ solo_testcase_suites.append(a_testcase)
+ if testcase_suites:
+ a_testcase = testcase_suites.pop(0)
+ else:
+ a_testcase = None
+ if a_testcase:
+ new_testcase = TestCaseWrapper(a_testcase,
+ manager)
+ wrapped_testcase_suites.add(new_testcase)
+ total_test_runners = total_test_runners + 1
+ unread_testcases.add(new_testcase)
+ if solo_testcase_suites and total_test_runners == 0:
+ a_testcase = solo_testcase_suites.pop(0)
+ new_testcase = TestCaseWrapper(a_testcase,
manager)
wrapped_testcase_suites.add(new_testcase)
+ total_test_runners = total_test_runners + 1
unread_testcases.add(new_testcase)
time.sleep(0.1)
except Exception:
self.suite_name = file_name + cls.__name__
if self.suite_name not in self.suites:
self.suites[self.suite_name] = unittest.TestSuite()
+ self.suites[self.suite_name].is_tagged_run_solo = False
self.suites[self.suite_name].addTest(test_method)
+ if test_method.is_tagged_run_solo():
+ self.suites[self.suite_name].is_tagged_run_solo = True
else:
self.filtered.addTest(test_method)
failed_testcase_ids = result[FAIL]
errored_testcase_ids = result[ERROR]
old_testcase_name = None
- if failed_testcase_ids or errored_testcase_ids:
+ if failed_testcase_ids:
for failed_test_id in failed_testcase_ids:
new_testcase_name, test_name = \
result.get_testcase_names(failed_test_id)
old_testcase_name = new_testcase_name
print(' FAILURE: {} [{}]'.format(
colorize(test_name, RED), failed_test_id))
- for failed_test_id in errored_testcase_ids:
+ if errored_testcase_ids:
+ for errored_test_id in errored_testcase_ids:
new_testcase_name, test_name = \
- result.get_testcase_names(failed_test_id)
+ result.get_testcase_names(errored_test_id)
if new_testcase_name != old_testcase_name:
print(' Testcase name: {}'.format(
colorize(new_testcase_name, RED)))
old_testcase_name = new_testcase_name
print(' ERROR: {} [{}]'.format(
- colorize(test_name, RED), failed_test_id))
+ colorize(test_name, RED), errored_test_id))
if self.testsuites_no_tests_run:
print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
tc_classes = set()
test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
+ test_finished_join_timeout = 15
+
retries = parse_digit_env("RETRIES", 0)
- debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
+ debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver", "attach"]
debug_core = os.getenv("DEBUG", "").lower() == "core"
- compress_core = os.getenv("CORE_COMPRESS", "").lower() in ("y", "yes", "1")
+ compress_core = framework.BoolEnvironmentVariable("CORE_COMPRESS")
- step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
+ if os.getenv("VPP_IN_GDB", "n").lower() in ["1", "y", "yes"]:
+ start_vpp_in_gdb()
+ exit()
- force_foreground = os.getenv("FORCE_FOREGROUND", "").lower() in \
- ("y", "yes", "1")
+ step = framework.BoolEnvironmentVariable("STEP")
+ force_foreground = framework.BoolEnvironmentVariable("FORCE_FOREGROUND")
run_interactive = debug or step or force_foreground
+ try:
+ num_cpus = len(os.sched_getaffinity(0))
+ except AttributeError:
+ num_cpus = multiprocessing.cpu_count()
+
+ print("OS reports %s available cpu(s)." % num_cpus)
+
test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
if test_jobs == 'auto':
if run_interactive:
concurrent_tests = 1
print('Interactive mode required, running on one core')
else:
- shm_free = psutil.disk_usage('/dev/shm').free
- shm_max_processes = 1
- if shm_free < min_req_shm:
- raise Exception('Not enough free space in /dev/shm. Required '
- 'free space is at least %sM.'
- % (min_req_shm >> 20))
- else:
- extra_shm = shm_free - min_req_shm
- shm_max_processes += extra_shm / shm_per_process
- concurrent_tests = min(cpu_count(), shm_max_processes)
+ concurrent_tests = num_cpus
print('Found enough resources to run tests with %s cores'
% concurrent_tests)
elif test_jobs.isdigit():
concurrent_tests = int(test_jobs)
+ print("Running on %s core(s) as set by 'TEST_JOBS'." %
+ concurrent_tests)
else:
concurrent_tests = 1
+ print('Running on one core.')
if run_interactive and concurrent_tests > 1:
raise NotImplementedError(
- 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
- 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
+ 'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
+ 'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
+ 'supported')
parser = argparse.ArgumentParser(description="VPP unit tests")
parser.add_argument("-f", "--failfast", action='store_true',
# don't fork if requiring interactive terminal
print('Running tests in foreground in the current process')
full_suite = unittest.TestSuite()
- map(full_suite.addTests, suites)
+ full_suite.addTests(suites)
result = VppTestRunner(verbosity=verbose,
failfast=failfast,
print_summary=True).run(full_suite)