13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from asfframework import (
19 get_testcase_doc_name,
21 get_failed_testcase_linkname,
24 from framework import VppTestCase
25 from test_result_code import TestResultCode
26 from debug import spawn_gdb
36 from discover_tests import discover_tests
38 from subprocess import check_output, CalledProcessError
39 from util import check_core_path, get_core_path, is_core_present
41 # timeout which controls how long the child has to finish after seeing
42 # a core dump in test temporary directory. If this is exceeded, parent assumes
43 # that child process is stuck (e.g. waiting for event from vpp) and kill
48 class StreamQueue(Queue):
53 sys.__stdout__.flush()
54 sys.__stderr__.flush()
57 return self._writer.fileno()
60 class StreamQueueManager(BaseManager):
64 StreamQueueManager.register("StreamQueue", StreamQueue)
67 class TestResult(dict):
68 def __init__(self, testcase_suite, testcases_by_id=None):
69 super(TestResult, self).__init__()
70 for trc in list(TestResultCode):
73 self.testcase_suite = testcase_suite
74 self.testcases = [testcase for testcase in testcase_suite]
75 self.testcases_by_id = testcases_by_id
77 def was_successful(self):
80 == len(self[TestResultCode.FAIL])
81 == len(self[TestResultCode.ERROR])
82 == len(self[TestResultCode.UNEXPECTED_PASS])
83 and len(self[TestResultCode.PASS])
84 + len(self[TestResultCode.SKIP])
85 + len(self[TestResultCode.SKIP_CPU_SHORTAGE])
86 + len(self[TestResultCode.EXPECTED_FAIL])
87 == self.testcase_suite.countTestCases()
90 def no_tests_run(self):
91 return 0 == len(self[TestResultCode.TEST_RUN])
93 def process_result(self, test_id, result):
94 self[result].append(test_id)
96 def suite_from_failed(self):
98 for testcase in self.testcase_suite:
102 not in self[TestResultCode.PASS]
103 + self[TestResultCode.SKIP]
104 + self[TestResultCode.SKIP_CPU_SHORTAGE]
105 + self[TestResultCode.EXPECTED_FAIL]
109 return suite_from_failed(self.testcase_suite, rerun_ids)
111 def get_testcase_names(self, test_id):
112 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
113 setup_teardown_match = re.match(
114 r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
116 if setup_teardown_match:
117 test_name, _, _, testcase_name = setup_teardown_match.groups()
118 if len(testcase_name.split(".")) == 2:
119 for key in self.testcases_by_id.keys():
120 if key.startswith(testcase_name):
123 testcase_name = self._get_testcase_doc_name(testcase_name)
125 test_name = self._get_test_description(test_id)
126 testcase_name = self._get_testcase_doc_name(test_id)
128 return testcase_name, test_name
130 def _get_test_description(self, test_id):
131 if test_id in self.testcases_by_id:
132 desc = get_test_description(descriptions, self.testcases_by_id[test_id])
137 def _get_testcase_doc_name(self, test_id):
138 if test_id in self.testcases_by_id:
139 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
145 def test_runner_wrapper(
146 suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
148 sys.stdout = stdouterr_queue
149 sys.stderr = stdouterr_queue
150 VppTestCase.parallel_handler = logger.handlers[0]
151 result = VppTestRunner(
152 keep_alive_pipe=keep_alive_pipe,
153 descriptions=descriptions,
154 verbosity=config.verbose,
155 result_pipe=result_pipe,
156 failfast=config.failfast,
159 finished_pipe.send(result.wasSuccessful())
160 finished_pipe.close()
161 keep_alive_pipe.close()
164 class TestCaseWrapper(object):
165 def __init__(self, testcase_suite, manager):
166 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
167 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
168 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
169 self.testcase_suite = testcase_suite
170 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
171 self.logger = get_parallel_logger(self.stdouterr_queue)
172 self.child = Process(
173 target=test_runner_wrapper,
176 self.keep_alive_child_end,
177 self.stdouterr_queue,
178 self.finished_child_end,
179 self.result_child_end,
184 self.last_test_temp_dir = None
185 self.last_test_vpp_binary = None
186 self._last_test = None
187 self.last_test_id = None
189 self.last_heard = time.time()
190 self.core_detected_at = None
191 self.testcases_by_id = {}
192 self.testclasess_with_core = {}
193 for testcase in self.testcase_suite:
194 self.testcases_by_id[testcase.id()] = testcase
195 self.result = TestResult(testcase_suite, self.testcases_by_id)
199 return self._last_test
202 def last_test(self, test_id):
203 self.last_test_id = test_id
204 if test_id in self.testcases_by_id:
205 testcase = self.testcases_by_id[test_id]
206 self._last_test = testcase.shortDescription()
207 if not self._last_test:
208 self._last_test = str(testcase)
210 self._last_test = test_id
212 def add_testclass_with_core(self):
213 if self.last_test_id in self.testcases_by_id:
214 test = self.testcases_by_id[self.last_test_id]
215 class_name = unittest.util.strclass(test.__class__)
216 test_name = "'{}' ({})".format(
217 get_test_description(descriptions, test), self.last_test_id
220 test_name = self.last_test_id
221 class_name = re.match(
222 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
224 if class_name not in self.testclasess_with_core:
225 self.testclasess_with_core[class_name] = (
227 self.last_test_vpp_binary,
228 self.last_test_temp_dir,
231 def close_pipes(self):
232 self.keep_alive_child_end.close()
233 self.finished_child_end.close()
234 self.result_child_end.close()
235 self.keep_alive_parent_end.close()
236 self.finished_parent_end.close()
237 self.result_parent_end.close()
239 def was_successful(self):
240 return self.result.was_successful()
244 return self.testcase_suite.cpus_used
246 def get_assigned_cpus(self):
247 return self.testcase_suite.get_assigned_cpus()
250 def stdouterr_reader_wrapper(
251 unread_testcases, finished_unread_testcases, read_testcases
254 while read_testcases.is_set() or unread_testcases:
255 if finished_unread_testcases:
256 read_testcase = finished_unread_testcases.pop()
257 unread_testcases.remove(read_testcase)
258 elif unread_testcases:
259 read_testcase = unread_testcases.pop()
262 while data is not None:
263 sys.stdout.write(data)
264 data = read_testcase.stdouterr_queue.get()
266 read_testcase.stdouterr_queue.close()
267 finished_unread_testcases.discard(read_testcase)
271 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
272 if last_test_temp_dir:
273 # Need to create link in case of a timeout or core dump without failure
274 lttd = os.path.basename(last_test_temp_dir)
275 link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
276 if not os.path.exists(link_path):
277 os.symlink(last_test_temp_dir, link_path)
279 "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
282 # Report core existence
283 core_path = get_core_path(last_test_temp_dir)
284 if os.path.exists(core_path):
286 "Core-file exists in test temporary directory: %s!" % core_path
288 check_core_path(logger, core_path)
289 logger.debug("Running 'file %s':" % core_path)
291 info = check_output(["file", core_path])
293 except CalledProcessError as e:
295 "Subprocess returned with return code "
296 "while running `file' utility on core-file "
303 "Subprocess returned with OS error while "
304 "running 'file' utility "
310 except Exception as e:
311 logger.exception("Unexpected error running `file' utility on core-file")
312 logger.error(f"gdb {vpp_binary} {core_path}")
315 # Copy api post mortem
316 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
317 if os.path.isfile(api_post_mortem_path):
319 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
321 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
324 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
325 if is_core_present(tempdir):
328 "VPP core detected in %s. Last test running was %s"
329 % (tempdir, core_crash_test)
331 print(single_line_delim)
332 spawn_gdb(vpp_binary, get_core_path(tempdir))
333 print(single_line_delim)
334 elif config.compress_core:
335 print("Compressing core-file in test directory `%s'" % tempdir)
336 os.system("gzip %s" % get_core_path(tempdir))
339 def handle_cores(failed_testcases):
340 for failed_testcase in failed_testcases:
341 tcs_with_core = failed_testcase.testclasess_with_core
343 for test, vpp_binary, tempdir in tcs_with_core.values():
344 check_and_handle_core(vpp_binary, tempdir, test)
347 def process_finished_testsuite(
348 wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
350 results.append(wrapped_testcase_suite.result)
351 finished_testcase_suites.add(wrapped_testcase_suite)
353 if config.failfast and not wrapped_testcase_suite.was_successful():
356 if not wrapped_testcase_suite.was_successful():
357 failed_wrapped_testcases.add(wrapped_testcase_suite)
359 wrapped_testcase_suite.logger,
360 wrapped_testcase_suite.last_test_temp_dir,
361 wrapped_testcase_suite.vpp_pid,
362 wrapped_testcase_suite.last_test_vpp_binary,
368 def run_forked(testcase_suites):
369 wrapped_testcase_suites = set()
370 solo_testcase_suites = []
372 # suites are unhashable, need to use list
374 unread_testcases = set()
375 finished_unread_testcases = set()
376 manager = StreamQueueManager()
379 free_cpus = list(available_cpus)
381 def on_suite_start(tc):
382 nonlocal tests_running
384 tests_running = tests_running + 1
386 def on_suite_finish(tc):
387 nonlocal tests_running
389 tests_running = tests_running - 1
390 assert tests_running >= 0
391 free_cpus.extend(tc.get_assigned_cpus())
393 def run_suite(suite):
395 nonlocal wrapped_testcase_suites
396 nonlocal unread_testcases
398 suite.assign_cpus(free_cpus[: suite.cpus_used])
399 free_cpus = free_cpus[suite.cpus_used :]
400 wrapper = TestCaseWrapper(suite, manager)
401 wrapped_testcase_suites.add(wrapper)
402 unread_testcases.add(wrapper)
403 on_suite_start(suite)
405 def can_run_suite(suite):
406 return tests_running < max_concurrent_tests and (
407 suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
410 while free_cpus and testcase_suites:
411 a_suite = testcase_suites[0]
412 if a_suite.is_tagged_run_solo:
413 a_suite = testcase_suites.pop(0)
414 solo_testcase_suites.append(a_suite)
416 if can_run_suite(a_suite):
417 a_suite = testcase_suites.pop(0)
422 if tests_running == 0 and solo_testcase_suites:
423 a_suite = solo_testcase_suites.pop(0)
426 read_from_testcases = threading.Event()
427 read_from_testcases.set()
428 stdouterr_thread = threading.Thread(
429 target=stdouterr_reader_wrapper,
430 args=(unread_testcases, finished_unread_testcases, read_from_testcases),
432 stdouterr_thread.start()
434 failed_wrapped_testcases = set()
438 while wrapped_testcase_suites or testcase_suites:
439 finished_testcase_suites = set()
440 for wrapped_testcase_suite in wrapped_testcase_suites:
441 while wrapped_testcase_suite.result_parent_end.poll():
442 wrapped_testcase_suite.result.process_result(
443 *wrapped_testcase_suite.result_parent_end.recv()
445 wrapped_testcase_suite.last_heard = time.time()
447 while wrapped_testcase_suite.keep_alive_parent_end.poll():
449 wrapped_testcase_suite.last_test,
450 wrapped_testcase_suite.last_test_vpp_binary,
451 wrapped_testcase_suite.last_test_temp_dir,
452 wrapped_testcase_suite.vpp_pid,
453 ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
454 wrapped_testcase_suite.last_heard = time.time()
456 if wrapped_testcase_suite.finished_parent_end.poll():
457 wrapped_testcase_suite.finished_parent_end.recv()
458 wrapped_testcase_suite.last_heard = time.time()
460 process_finished_testsuite(
461 wrapped_testcase_suite,
462 finished_testcase_suites,
463 failed_wrapped_testcases,
471 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
473 wrapped_testcase_suite.logger.critical(
474 "Child test runner process timed out "
475 "(last test running was `%s' in `%s')!"
477 wrapped_testcase_suite.last_test,
478 wrapped_testcase_suite.last_test_temp_dir,
481 elif not wrapped_testcase_suite.child.is_alive():
483 wrapped_testcase_suite.logger.critical(
484 "Child test runner process unexpectedly died "
485 "(last test running was `%s' in `%s')!"
487 wrapped_testcase_suite.last_test,
488 wrapped_testcase_suite.last_test_temp_dir,
492 wrapped_testcase_suite.last_test_temp_dir
493 and wrapped_testcase_suite.last_test_vpp_binary
495 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
496 wrapped_testcase_suite.add_testclass_with_core()
497 if wrapped_testcase_suite.core_detected_at is None:
498 wrapped_testcase_suite.core_detected_at = time.time()
500 wrapped_testcase_suite.core_detected_at + core_timeout
503 wrapped_testcase_suite.logger.critical(
504 "Child test runner process unresponsive and "
505 "core-file exists in test temporary directory "
506 "(last test running was `%s' in `%s')!"
508 wrapped_testcase_suite.last_test,
509 wrapped_testcase_suite.last_test_temp_dir,
515 wrapped_testcase_suite.child.terminate()
517 # terminating the child process tends to leave orphan
519 if wrapped_testcase_suite.vpp_pid:
520 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
524 wrapped_testcase_suite.result.crashed = True
525 wrapped_testcase_suite.result.process_result(
526 wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
529 process_finished_testsuite(
530 wrapped_testcase_suite,
531 finished_testcase_suites,
532 failed_wrapped_testcases,
538 for finished_testcase in finished_testcase_suites:
539 # Somewhat surprisingly, the join below may
540 # timeout, even if client signaled that
541 # it finished - so we note it just in case.
542 join_start = time.time()
543 finished_testcase.child.join(test_finished_join_timeout)
544 join_end = time.time()
545 if join_end - join_start >= test_finished_join_timeout:
546 finished_testcase.logger.error(
547 "Timeout joining finished test: %s (pid %d)"
548 % (finished_testcase.last_test, finished_testcase.child.pid)
550 finished_testcase.close_pipes()
551 wrapped_testcase_suites.remove(finished_testcase)
552 finished_unread_testcases.add(finished_testcase)
553 finished_testcase.stdouterr_queue.put(None)
554 on_suite_finish(finished_testcase)
556 while testcase_suites:
557 results.append(TestResult(testcase_suites.pop(0)))
558 elif testcase_suites:
559 a_suite = testcase_suites[0]
560 while a_suite and a_suite.is_tagged_run_solo:
561 testcase_suites.pop(0)
562 solo_testcase_suites.append(a_suite)
564 a_suite = testcase_suites[0]
567 if a_suite and can_run_suite(a_suite):
568 testcase_suites.pop(0)
570 if solo_testcase_suites and tests_running == 0:
571 a_suite = solo_testcase_suites.pop(0)
575 for wrapped_testcase_suite in wrapped_testcase_suites:
576 wrapped_testcase_suite.child.terminate()
577 wrapped_testcase_suite.stdouterr_queue.put(None)
580 read_from_testcases.clear()
581 stdouterr_thread.join(config.timeout)
584 handle_cores(failed_wrapped_testcases)
588 class TestSuiteWrapper(unittest.TestSuite):
592 return super().__init__()
594 def addTest(self, test):
595 self.cpus_used = max(self.cpus_used, test.get_cpus_required())
596 super().addTest(test)
598 def assign_cpus(self, cpus):
601 def _handleClassSetUp(self, test, result):
602 if not test.__class__.skipped_due_to_cpu_lack:
603 test.assign_cpus(self.cpus)
604 super()._handleClassSetUp(test, result)
606 def get_assigned_cpus(self):
610 class SplitToSuitesCallback:
611 def __init__(self, filter_callback):
613 self.suite_name = "default"
614 self.filter_callback = filter_callback
615 self.filtered = TestSuiteWrapper()
617 def __call__(self, file_name, cls, method):
618 test_method = cls(method)
619 if self.filter_callback(file_name, cls.__name__, method):
620 self.suite_name = file_name + cls.__name__
621 if self.suite_name not in self.suites:
622 self.suites[self.suite_name] = TestSuiteWrapper()
623 self.suites[self.suite_name].is_tagged_run_solo = False
624 self.suites[self.suite_name].addTest(test_method)
625 if test_method.is_tagged_run_solo():
626 self.suites[self.suite_name].is_tagged_run_solo = True
629 self.filtered.addTest(test_method)
632 def parse_test_filter(test_filter):
634 filter_file_name = None
635 filter_class_name = None
636 filter_func_name = None
641 raise Exception(f"Invalid test filter: {test_filter}")
643 if parts[2] not in ("*", ""):
644 filter_func_name = parts[2]
645 if parts[1] not in ("*", ""):
646 filter_class_name = parts[1]
647 if parts[0] not in ("*", ""):
648 if parts[0].startswith("test_"):
649 filter_file_name = parts[0]
651 filter_file_name = "test_%s" % parts[0]
653 if f.startswith("test_"):
656 filter_file_name = "test_%s" % f
658 filter_file_name = "%s.py" % filter_file_name
659 return filter_file_name, filter_class_name, filter_func_name
662 def filter_tests(tests, filter_cb):
663 result = TestSuiteWrapper()
665 if isinstance(t, unittest.suite.TestSuite):
666 # this is a bunch of tests, recursively filter...
667 x = filter_tests(t, filter_cb)
668 if x.countTestCases() > 0:
670 elif isinstance(t, unittest.TestCase):
671 # this is a single test
672 parts = t.id().split(".")
673 # t.id() for common cases like this:
674 # test_classifier.TestClassifier.test_acl_ip
675 # apply filtering only if it is so
677 if not filter_cb(parts[0], parts[1], parts[2]):
681 # unexpected object, don't touch it
686 class FilterByTestOption:
687 def __init__(self, filters):
688 self.filters = filters
690 def __call__(self, file_name, class_name, func_name):
700 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
703 if filter_class_name and class_name != filter_class_name:
705 if filter_func_name and func_name != filter_func_name:
709 for filter_file_name, filter_class_name, filter_func_name in self.filters:
723 class FilterByClassList:
724 def __init__(self, classes_with_filenames):
725 self.classes_with_filenames = classes_with_filenames
727 def __call__(self, file_name, class_name, func_name):
728 return ".".join([file_name, class_name]) in self.classes_with_filenames
731 def suite_from_failed(suite, failed):
732 failed = {x.rsplit(".", 1)[0] for x in failed}
733 filter_cb = FilterByClassList(failed)
734 suite = filter_tests(suite, filter_cb)
738 class AllResults(dict):
740 super(AllResults, self).__init__()
741 self.all_testcases = 0
742 self.results_per_suite = []
743 for trc in list(TestResultCode):
746 self.testsuites_no_tests_run = []
748 def add_results(self, result):
749 self.results_per_suite.append(result)
750 for trc in list(TestResultCode):
751 self[trc] += len(result[trc])
753 def add_result(self, result):
755 self.all_testcases += result.testcase_suite.countTestCases()
756 self.add_results(result)
758 if result.no_tests_run():
759 self.testsuites_no_tests_run.append(result.testcase_suite)
764 elif not result.was_successful():
768 self.rerun.append(result.testcase_suite)
772 def print_results(self):
774 print(double_line_delim)
775 print("TEST RESULTS:")
777 def indent_results(lines):
778 lines = list(filter(None, lines))
779 maximum = max(lines, key=lambda x: x.index(":"))
780 maximum = 4 + maximum.index(":")
782 padding = " " * (maximum - l.index(":"))
783 print(f"{padding}{l}")
787 f"Scheduled tests: {self.all_testcases}",
788 f"Executed tests: {self[TestResultCode.TEST_RUN]}",
789 f"Passed tests: {colorize(self[TestResultCode.PASS], GREEN)}",
790 f"Expected failures: {colorize(self[TestResultCode.EXPECTED_FAIL], GREEN)}"
791 if self[TestResultCode.EXPECTED_FAIL]
793 f"Skipped tests: {colorize(self[TestResultCode.SKIP], YELLOW)}"
794 if self[TestResultCode.SKIP]
796 f"Not Executed tests: {colorize(self.not_executed, RED)}"
799 f"Failures: {colorize(self[TestResultCode.FAIL], RED)}"
800 if self[TestResultCode.FAIL]
802 f"Unexpected passes: {colorize(self[TestResultCode.UNEXPECTED_PASS], RED)}"
803 if self[TestResultCode.UNEXPECTED_PASS]
805 f"Errors: {colorize(self[TestResultCode.ERROR], RED)}"
806 if self[TestResultCode.ERROR]
808 "Tests skipped due to lack of CPUS: "
809 f"{colorize(self[TestResultCode.SKIP_CPU_SHORTAGE], YELLOW)}"
810 if self[TestResultCode.SKIP_CPU_SHORTAGE]
815 if self.all_failed > 0:
816 print("FAILURES AND ERRORS IN TESTS:")
817 for result in self.results_per_suite:
818 old_testcase_name = None
819 for tr_code, headline in (
820 (TestResultCode.FAIL, "FAILURE"),
821 (TestResultCode.ERROR, "ERROR"),
822 (TestResultCode.UNEXPECTED_PASS, "UNEXPECTED PASS"),
824 if not result[tr_code]:
827 for failed_test_id in result[tr_code]:
828 new_testcase_name, test_name = result.get_testcase_names(
831 if new_testcase_name != old_testcase_name:
833 f" Testcase name: {colorize(new_testcase_name, RED)}"
835 old_testcase_name = new_testcase_name
837 f" {headline}: {colorize(test_name, RED)} [{failed_test_id}]"
840 if self.testsuites_no_tests_run:
841 print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
843 for testsuite in self.testsuites_no_tests_run:
844 for testcase in testsuite:
845 tc_classes.add(get_testcase_doc_name(testcase))
846 for tc_class in tc_classes:
847 print(" {}".format(colorize(tc_class, RED)))
849 if self[TestResultCode.SKIP_CPU_SHORTAGE]:
853 " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
854 " ENOUGH CPUS AVAILABLE",
858 print(double_line_delim)
862 def not_executed(self):
863 return self.all_testcases - self[TestResultCode.TEST_RUN]
866 def all_failed(self):
868 self[TestResultCode.FAIL]
869 + self[TestResultCode.ERROR]
870 + self[TestResultCode.UNEXPECTED_PASS]
874 def parse_results(results):
876 Prints the number of scheduled, executed, not executed, passed, failed,
877 errored and skipped tests and details about failed and errored tests.
879 Also returns all suites where any test failed.
885 results_per_suite = AllResults()
888 for result in results:
889 result_code = results_per_suite.add_result(result)
892 elif result_code == -1:
895 results_per_suite.print_results()
903 return return_code, results_per_suite.rerun
906 if __name__ == "__main__":
907 print(f"Config is: {config}")
910 print("Running sanity test case.")
912 rc = sanity_run_vpp.main()
915 except Exception as e:
916 print(traceback.format_exc())
917 print("Couldn't run sanity test case.")
920 test_finished_join_timeout = 15
922 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
923 debug_core = config.debug == "core"
925 run_interactive = debug_gdb or config.step or config.force_foreground
927 max_concurrent_tests = 0
928 print(f"OS reports {num_cpus} available cpu(s).")
930 test_jobs = config.jobs
931 if test_jobs == "auto":
933 max_concurrent_tests = 1
934 print("Interactive mode required, running tests consecutively.")
936 max_concurrent_tests = num_cpus
938 f"Running at most {max_concurrent_tests} python test "
939 "processes concurrently."
942 max_concurrent_tests = test_jobs
944 f"Running at most {max_concurrent_tests} python test processes "
945 "concurrently as set by 'TEST_JOBS'."
948 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
950 if run_interactive and max_concurrent_tests > 1:
951 raise NotImplementedError(
952 "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
953 "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
959 print("Running tests using custom test runner.")
960 filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
963 "Selected filters: ",
965 f"file={filter_file}, class={filter_class}, function={filter_func}"
966 for filter_file, filter_class, filter_func in filters
970 filter_cb = FilterByTestOption(filters)
972 cb = SplitToSuitesCallback(filter_cb)
973 for d in config.test_src_dir:
974 print("Adding tests from directory tree %s" % d)
975 discover_tests(d, cb)
977 # suites are not hashable, need to use list
980 for testcase_suite in cb.suites.values():
981 tests_amount += testcase_suite.countTestCases()
982 if testcase_suite.cpus_used > max_vpp_cpus:
983 # here we replace test functions with lambdas to just skip them
984 # but we also replace setUp/tearDown functions to do nothing
985 # so that the test can be "started" and "stopped", so that we can
986 # still keep those prints (test description - SKIP), which are done
987 # in stopTest() (for that to trigger, test function must run)
988 for t in testcase_suite:
990 if m.startswith("test_"):
991 setattr(t, m, lambda: t.skipTest("not enough cpus"))
992 setattr(t.__class__, "setUpClass", lambda: None)
993 setattr(t.__class__, "tearDownClass", lambda: None)
994 setattr(t, "setUp", lambda: None)
995 setattr(t, "tearDown", lambda: None)
996 t.__class__.skipped_due_to_cpu_lack = True
997 suites.append(testcase_suite)
1000 "%s out of %s tests match specified filters"
1001 % (tests_amount, tests_amount + cb.filtered.countTestCases())
1004 if not config.extended:
1005 print("Not running extended tests (some tests will be skipped)")
1007 attempts = config.retries + 1
1009 print("Perform %s attempts to pass the suite..." % attempts)
1011 if run_interactive and suites:
1012 # don't fork if requiring interactive terminal
1013 print("Running tests in foreground in the current process")
1014 full_suite = unittest.TestSuite()
1015 free_cpus = list(available_cpus)
1016 cpu_shortage = False
1017 for suite in suites:
1018 if suite.cpus_used <= max_vpp_cpus:
1019 suite.assign_cpus(free_cpus[: suite.cpus_used])
1021 suite.assign_cpus([])
1023 full_suite.addTests(suites)
1024 result = VppTestRunner(
1025 verbosity=config.verbose, failfast=config.failfast, print_summary=True
1027 was_successful = result.wasSuccessful()
1028 if not was_successful:
1029 for test_case_info in result.failed_test_cases_info:
1030 handle_failed_suite(
1031 test_case_info.logger,
1032 test_case_info.tempdir,
1033 test_case_info.vpp_pid,
1036 if test_case_info in result.core_crash_test_cases_info:
1037 check_and_handle_core(
1038 test_case_info.vpp_bin_path,
1039 test_case_info.tempdir,
1040 test_case_info.core_crash_test,
1047 "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1048 " ENOUGH CPUS AVAILABLE",
1053 sys.exit(not was_successful)
1056 "Running each VPPTestCase in a separate background process"
1057 f" with at most {max_concurrent_tests} parallel python test "
1061 while suites and attempts > 0:
1062 for suite in suites:
1063 failed_link = get_failed_testcase_linkname(
1065 f"{get_testcase_dirname(suite._tests[0].__class__.__name__)}",
1067 if os.path.islink(failed_link):
1068 os.unlink(failed_link)
1069 results = run_forked(suites)
1070 exit_code, suites = parse_results(results)
1073 print("Test run was successful")
1075 print("%s attempt(s) left." % attempts)