+class TestCaseWrapper(object):
+ def __init__(self, testcase_suite, manager):
+ self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
+ self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
+ self.result_parent_end, self.result_child_end = Pipe(duplex=False)
+ self.testcase_suite = testcase_suite
+ self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
+ self.logger = get_parallel_logger(self.stdouterr_queue)
+ self.child = Process(
+ target=test_runner_wrapper,
+ args=(
+ testcase_suite,
+ self.keep_alive_child_end,
+ self.stdouterr_queue,
+ self.finished_child_end,
+ self.result_child_end,
+ self.logger,
+ ),
+ )
+ self.child.start()
+ self.last_test_temp_dir = None
+ self.last_test_vpp_binary = None
+ self._last_test = None
+ self.last_test_id = None
+ self.vpp_pid = None
+ self.last_heard = time.time()
+ self.core_detected_at = None
+ self.testcases_by_id = {}
+ self.testclasess_with_core = {}
+ for testcase in self.testcase_suite:
+ self.testcases_by_id[testcase.id()] = testcase
+ self.result = TestResult(testcase_suite, self.testcases_by_id)
+
+ @property
+ def last_test(self):
+ return self._last_test
+
+ @last_test.setter
+ def last_test(self, test_id):
+ self.last_test_id = test_id
+ if test_id in self.testcases_by_id:
+ testcase = self.testcases_by_id[test_id]
+ self._last_test = testcase.shortDescription()
+ if not self._last_test:
+ self._last_test = str(testcase)
+ else:
+ self._last_test = test_id
+
+ def add_testclass_with_core(self):
+ if self.last_test_id in self.testcases_by_id:
+ test = self.testcases_by_id[self.last_test_id]
+ class_name = unittest.util.strclass(test.__class__)
+ test_name = "'{}' ({})".format(
+ get_test_description(descriptions, test), self.last_test_id
+ )
+ else:
+ test_name = self.last_test_id
+ class_name = re.match(
+ r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
+ ).groups()[3]
+ if class_name not in self.testclasess_with_core:
+ self.testclasess_with_core[class_name] = (
+ test_name,
+ self.last_test_vpp_binary,
+ self.last_test_temp_dir,
+ )
+
+ def close_pipes(self):
+ self.keep_alive_child_end.close()
+ self.finished_child_end.close()
+ self.result_child_end.close()
+ self.keep_alive_parent_end.close()
+ self.finished_parent_end.close()
+ self.result_parent_end.close()
+
+ def was_successful(self):
+ return self.result.was_successful()
+
+ @property
+ def cpus_used(self):
+ return self.testcase_suite.cpus_used
+
+ def get_assigned_cpus(self):
+ return self.testcase_suite.get_assigned_cpus()
+
+
+def stdouterr_reader_wrapper(
+ unread_testcases, finished_unread_testcases, read_testcases
+):
+ read_testcase = None
+ while read_testcases.is_set() or unread_testcases:
+ if finished_unread_testcases:
+ read_testcase = finished_unread_testcases.pop()
+ unread_testcases.remove(read_testcase)
+ elif unread_testcases:
+ read_testcase = unread_testcases.pop()
+ if read_testcase:
+ data = ""
+ while data is not None:
+ sys.stdout.write(data)
+ data = read_testcase.stdouterr_queue.get()
+
+ read_testcase.stdouterr_queue.close()
+ finished_unread_testcases.discard(read_testcase)
+ read_testcase = None
+
+
+def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
+ if last_test_temp_dir:
+ # Need to create link in case of a timeout or core dump without failure
+ lttd = os.path.basename(last_test_temp_dir)
+ link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
+ if not os.path.exists(link_path):
+ os.symlink(last_test_temp_dir, link_path)
+ logger.error(
+ "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
+ )
+
+ # Report core existence
+ core_path = get_core_path(last_test_temp_dir)
+ if os.path.exists(core_path):
+ logger.error(
+ "Core-file exists in test temporary directory: %s!" % core_path
+ )
+ check_core_path(logger, core_path)
+ logger.debug("Running 'file %s':" % core_path)
+ try:
+ info = check_output(["file", core_path])
+ logger.debug(info)
+ except CalledProcessError as e:
+ logger.error(
+ "Subprocess returned with return code "
+ "while running `file' utility on core-file "
+ "returned: "
+ "rc=%s",
+ e.returncode,
+ )
+ except OSError as e:
+ logger.error(
+ "Subprocess returned with OS error while "
+ "running 'file' utility "
+ "on core-file: "
+ "(%s) %s",
+ e.errno,
+ e.strerror,
+ )
+ except Exception as e:
+ logger.exception("Unexpected error running `file' utility on core-file")
+ logger.error(f"gdb {vpp_binary} {core_path}")
+
+ if vpp_pid:
+ # Copy api post mortem
+ api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
+ if os.path.isfile(api_post_mortem_path):
+ logger.error(
+ "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
+ )
+ shutil.copy2(api_post_mortem_path, last_test_temp_dir)
+
+
+def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
+ if is_core_present(tempdir):
+ if debug_core:
+ print(
+ "VPP core detected in %s. Last test running was %s"
+ % (tempdir, core_crash_test)
+ )
+ print(single_line_delim)
+ spawn_gdb(vpp_binary, get_core_path(tempdir))
+ print(single_line_delim)
+ elif config.compress_core:
+ print("Compressing core-file in test directory `%s'" % tempdir)
+ os.system("gzip %s" % get_core_path(tempdir))
+
+
+def handle_cores(failed_testcases):
+ for failed_testcase in failed_testcases:
+ tcs_with_core = failed_testcase.testclasess_with_core
+ if tcs_with_core:
+ for test, vpp_binary, tempdir in tcs_with_core.values():
+ check_and_handle_core(vpp_binary, tempdir, test)
+
+
+def process_finished_testsuite(
+ wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
+):
+ results.append(wrapped_testcase_suite.result)
+ finished_testcase_suites.add(wrapped_testcase_suite)
+ stop_run = False
+ if config.failfast and not wrapped_testcase_suite.was_successful():
+ stop_run = True
+
+ if not wrapped_testcase_suite.was_successful():
+ failed_wrapped_testcases.add(wrapped_testcase_suite)
+ handle_failed_suite(
+ wrapped_testcase_suite.logger,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid,
+ wrapped_testcase_suite.last_test_vpp_binary,
+ )
+
+ return stop_run
+
+
+def run_forked(testcase_suites):
+ wrapped_testcase_suites = set()
+ solo_testcase_suites = []
+
+ # suites are unhashable, need to use list
+ results = []
+ unread_testcases = set()
+ finished_unread_testcases = set()
+ manager = StreamQueueManager()
+ manager.start()
+ tests_running = 0
+ free_cpus = list(available_cpus)
+
+ def on_suite_start(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running + 1
+
+ def on_suite_finish(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running - 1
+ assert tests_running >= 0
+ free_cpus.extend(tc.get_assigned_cpus())
+
+ def run_suite(suite):
+ nonlocal manager
+ nonlocal wrapped_testcase_suites
+ nonlocal unread_testcases
+ nonlocal free_cpus
+ suite.assign_cpus(free_cpus[: suite.cpus_used])
+ free_cpus = free_cpus[suite.cpus_used :]
+ wrapper = TestCaseWrapper(suite, manager)
+ wrapped_testcase_suites.add(wrapper)
+ unread_testcases.add(wrapper)
+ on_suite_start(suite)
+
+ def can_run_suite(suite):
+ return tests_running < max_concurrent_tests and (
+ suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
+ )
+
+ while free_cpus and testcase_suites:
+ a_suite = testcase_suites[0]
+ if a_suite.is_tagged_run_solo:
+ a_suite = testcase_suites.pop(0)
+ solo_testcase_suites.append(a_suite)
+ continue
+ if can_run_suite(a_suite):
+ a_suite = testcase_suites.pop(0)
+ run_suite(a_suite)
+ else:
+ break
+
+ if tests_running == 0 and solo_testcase_suites:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
+
+ read_from_testcases = threading.Event()
+ read_from_testcases.set()
+ stdouterr_thread = threading.Thread(
+ target=stdouterr_reader_wrapper,
+ args=(unread_testcases, finished_unread_testcases, read_from_testcases),
+ )
+ stdouterr_thread.start()
+
+ failed_wrapped_testcases = set()
+ stop_run = False
+
+ try:
+ while wrapped_testcase_suites or testcase_suites:
+ finished_testcase_suites = set()
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ while wrapped_testcase_suite.result_parent_end.poll():
+ wrapped_testcase_suite.result.process_result(
+ *wrapped_testcase_suite.result_parent_end.recv()
+ )
+ wrapped_testcase_suite.last_heard = time.time()
+
+ while wrapped_testcase_suite.keep_alive_parent_end.poll():
+ (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_vpp_binary,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid,
+ ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+
+ if wrapped_testcase_suite.finished_parent_end.poll():
+ wrapped_testcase_suite.finished_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+ stop_run = (
+ process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results,
+ )
+ or stop_run
+ )
+ continue
+
+ fail = False
+ if wrapped_testcase_suite.last_heard + config.timeout < time.time():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process timed out "
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
+ elif not wrapped_testcase_suite.child.is_alive():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process unexpectedly died "
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
+ elif (
+ wrapped_testcase_suite.last_test_temp_dir
+ and wrapped_testcase_suite.last_test_vpp_binary
+ ):
+ if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
+ wrapped_testcase_suite.add_testclass_with_core()
+ if wrapped_testcase_suite.core_detected_at is None:
+ wrapped_testcase_suite.core_detected_at = time.time()
+ elif (
+ wrapped_testcase_suite.core_detected_at + core_timeout
+ < time.time()
+ ):
+ wrapped_testcase_suite.logger.critical(
+ "Child test runner process unresponsive and "
+ "core-file exists in test temporary directory "
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
+ fail = True
+
+ if fail:
+ wrapped_testcase_suite.child.terminate()
+ try:
+ # terminating the child process tends to leave orphan
+ # VPP process around
+ if wrapped_testcase_suite.vpp_pid:
+ os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
+ except OSError:
+ # already dead
+ pass
+ wrapped_testcase_suite.result.crashed = True
+ wrapped_testcase_suite.result.process_result(
+ wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
+ )
+ stop_run = (
+ process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results,
+ )
+ or stop_run
+ )
+
+ for finished_testcase in finished_testcase_suites:
+ # Somewhat surprisingly, the join below may
+ # timeout, even if client signaled that
+ # it finished - so we note it just in case.
+ join_start = time.time()
+ finished_testcase.child.join(test_finished_join_timeout)
+ join_end = time.time()
+ if join_end - join_start >= test_finished_join_timeout:
+ finished_testcase.logger.error(
+ "Timeout joining finished test: %s (pid %d)"
+ % (finished_testcase.last_test, finished_testcase.child.pid)
+ )
+ finished_testcase.close_pipes()
+ wrapped_testcase_suites.remove(finished_testcase)
+ finished_unread_testcases.add(finished_testcase)
+ finished_testcase.stdouterr_queue.put(None)
+ on_suite_finish(finished_testcase)
+ if stop_run:
+ while testcase_suites:
+ results.append(TestResult(testcase_suites.pop(0)))
+ elif testcase_suites:
+ a_suite = testcase_suites[0]
+ while a_suite and a_suite.is_tagged_run_solo:
+ testcase_suites.pop(0)
+ solo_testcase_suites.append(a_suite)
+ if testcase_suites:
+ a_suite = testcase_suites[0]
+ else:
+ a_suite = None
+ if a_suite and can_run_suite(a_suite):
+ testcase_suites.pop(0)
+ run_suite(a_suite)
+ if solo_testcase_suites and tests_running == 0:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
+ time.sleep(0.1)
+ except Exception:
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ wrapped_testcase_suite.child.terminate()
+ wrapped_testcase_suite.stdouterr_queue.put(None)
+ raise
+ finally:
+ read_from_testcases.clear()
+ stdouterr_thread.join(config.timeout)
+ manager.shutdown()
+
+ handle_cores(failed_wrapped_testcases)
+ return results
+
+
+class TestSuiteWrapper(unittest.TestSuite):
+ cpus_used = 0
+
+ def __init__(self):
+ return super().__init__()
+
+ def addTest(self, test):
+ self.cpus_used = max(self.cpus_used, test.get_cpus_required())
+ super().addTest(test)
+
+ def assign_cpus(self, cpus):
+ self.cpus = cpus
+
+ def _handleClassSetUp(self, test, result):
+ if not test.__class__.skipped_due_to_cpu_lack:
+ test.assign_cpus(self.cpus)
+ super()._handleClassSetUp(test, result)
+
+ def get_assigned_cpus(self):
+ return self.cpus
+
+
+class SplitToSuitesCallback:
+ def __init__(self, filter_callback):
+ self.suites = {}
+ self.suite_name = "default"
+ self.filter_callback = filter_callback
+ self.filtered = TestSuiteWrapper()