13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from framework import (
20 get_testcase_doc_name,
23 from test_result_code import TestResultCode
24 from debug import spawn_gdb
34 from discover_tests import discover_tests
36 from subprocess import check_output, CalledProcessError
37 from util import check_core_path, get_core_path, is_core_present
39 # timeout which controls how long the child has to finish after seeing
40 # a core dump in test temporary directory. If this is exceeded, parent assumes
41 # that child process is stuck (e.g. waiting for event from vpp) and kill
46 class StreamQueue(Queue):
51 sys.__stdout__.flush()
52 sys.__stderr__.flush()
55 return self._writer.fileno()
58 class StreamQueueManager(BaseManager):
62 StreamQueueManager.register("StreamQueue", StreamQueue)
65 class TestResult(dict):
66 def __init__(self, testcase_suite, testcases_by_id=None):
67 super(TestResult, self).__init__()
68 for trc in list(TestResultCode):
71 self.testcase_suite = testcase_suite
72 self.testcases = [testcase for testcase in testcase_suite]
73 self.testcases_by_id = testcases_by_id
75 def was_successful(self):
78 == len(self[TestResultCode.FAIL])
79 == len(self[TestResultCode.ERROR])
80 == len(self[TestResultCode.UNEXPECTED_PASS])
81 and len(self[TestResultCode.PASS])
82 + len(self[TestResultCode.SKIP])
83 + len(self[TestResultCode.SKIP_CPU_SHORTAGE])
84 + len(self[TestResultCode.EXPECTED_FAIL])
85 == self.testcase_suite.countTestCases()
88 def no_tests_run(self):
89 return 0 == len(self[TestResultCode.TEST_RUN])
91 def process_result(self, test_id, result):
92 self[result].append(test_id)
94 def suite_from_failed(self):
96 for testcase in self.testcase_suite:
100 not in self[TestResultCode.PASS]
101 + self[TestResultCode.SKIP]
102 + self[TestResultCode.SKIP_CPU_SHORTAGE]
103 + self[TestResultCode.EXPECTED_FAIL]
107 return suite_from_failed(self.testcase_suite, rerun_ids)
109 def get_testcase_names(self, test_id):
110 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
111 setup_teardown_match = re.match(
112 r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
114 if setup_teardown_match:
115 test_name, _, _, testcase_name = setup_teardown_match.groups()
116 if len(testcase_name.split(".")) == 2:
117 for key in self.testcases_by_id.keys():
118 if key.startswith(testcase_name):
121 testcase_name = self._get_testcase_doc_name(testcase_name)
123 test_name = self._get_test_description(test_id)
124 testcase_name = self._get_testcase_doc_name(test_id)
126 return testcase_name, test_name
128 def _get_test_description(self, test_id):
129 if test_id in self.testcases_by_id:
130 desc = get_test_description(descriptions, self.testcases_by_id[test_id])
135 def _get_testcase_doc_name(self, test_id):
136 if test_id in self.testcases_by_id:
137 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
143 def test_runner_wrapper(
144 suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
146 sys.stdout = stdouterr_queue
147 sys.stderr = stdouterr_queue
148 VppTestCase.parallel_handler = logger.handlers[0]
149 result = VppTestRunner(
150 keep_alive_pipe=keep_alive_pipe,
151 descriptions=descriptions,
152 verbosity=config.verbose,
153 result_pipe=result_pipe,
154 failfast=config.failfast,
157 finished_pipe.send(result.wasSuccessful())
158 finished_pipe.close()
159 keep_alive_pipe.close()
162 class TestCaseWrapper(object):
163 def __init__(self, testcase_suite, manager):
164 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
165 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
166 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
167 self.testcase_suite = testcase_suite
168 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
169 self.logger = get_parallel_logger(self.stdouterr_queue)
170 self.child = Process(
171 target=test_runner_wrapper,
174 self.keep_alive_child_end,
175 self.stdouterr_queue,
176 self.finished_child_end,
177 self.result_child_end,
182 self.last_test_temp_dir = None
183 self.last_test_vpp_binary = None
184 self._last_test = None
185 self.last_test_id = None
187 self.last_heard = time.time()
188 self.core_detected_at = None
189 self.testcases_by_id = {}
190 self.testclasess_with_core = {}
191 for testcase in self.testcase_suite:
192 self.testcases_by_id[testcase.id()] = testcase
193 self.result = TestResult(testcase_suite, self.testcases_by_id)
197 return self._last_test
200 def last_test(self, test_id):
201 self.last_test_id = test_id
202 if test_id in self.testcases_by_id:
203 testcase = self.testcases_by_id[test_id]
204 self._last_test = testcase.shortDescription()
205 if not self._last_test:
206 self._last_test = str(testcase)
208 self._last_test = test_id
210 def add_testclass_with_core(self):
211 if self.last_test_id in self.testcases_by_id:
212 test = self.testcases_by_id[self.last_test_id]
213 class_name = unittest.util.strclass(test.__class__)
214 test_name = "'{}' ({})".format(
215 get_test_description(descriptions, test), self.last_test_id
218 test_name = self.last_test_id
219 class_name = re.match(
220 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
222 if class_name not in self.testclasess_with_core:
223 self.testclasess_with_core[class_name] = (
225 self.last_test_vpp_binary,
226 self.last_test_temp_dir,
229 def close_pipes(self):
230 self.keep_alive_child_end.close()
231 self.finished_child_end.close()
232 self.result_child_end.close()
233 self.keep_alive_parent_end.close()
234 self.finished_parent_end.close()
235 self.result_parent_end.close()
237 def was_successful(self):
238 return self.result.was_successful()
242 return self.testcase_suite.cpus_used
244 def get_assigned_cpus(self):
245 return self.testcase_suite.get_assigned_cpus()
248 def stdouterr_reader_wrapper(
249 unread_testcases, finished_unread_testcases, read_testcases
252 while read_testcases.is_set() or unread_testcases:
253 if finished_unread_testcases:
254 read_testcase = finished_unread_testcases.pop()
255 unread_testcases.remove(read_testcase)
256 elif unread_testcases:
257 read_testcase = unread_testcases.pop()
260 while data is not None:
261 sys.stdout.write(data)
262 data = read_testcase.stdouterr_queue.get()
264 read_testcase.stdouterr_queue.close()
265 finished_unread_testcases.discard(read_testcase)
269 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
270 if last_test_temp_dir:
271 # Need to create link in case of a timeout or core dump without failure
272 lttd = os.path.basename(last_test_temp_dir)
273 link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
274 if not os.path.exists(link_path):
275 os.symlink(last_test_temp_dir, link_path)
277 "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
280 # Report core existence
281 core_path = get_core_path(last_test_temp_dir)
282 if os.path.exists(core_path):
284 "Core-file exists in test temporary directory: %s!" % core_path
286 check_core_path(logger, core_path)
287 logger.debug("Running 'file %s':" % core_path)
289 info = check_output(["file", core_path])
291 except CalledProcessError as e:
293 "Subprocess returned with return code "
294 "while running `file' utility on core-file "
301 "Subprocess returned with OS error while "
302 "running 'file' utility "
308 except Exception as e:
309 logger.exception("Unexpected error running `file' utility on core-file")
310 logger.error(f"gdb {vpp_binary} {core_path}")
313 # Copy api post mortem
314 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
315 if os.path.isfile(api_post_mortem_path):
317 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
319 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
322 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
323 if is_core_present(tempdir):
326 "VPP core detected in %s. Last test running was %s"
327 % (tempdir, core_crash_test)
329 print(single_line_delim)
330 spawn_gdb(vpp_binary, get_core_path(tempdir))
331 print(single_line_delim)
332 elif config.compress_core:
333 print("Compressing core-file in test directory `%s'" % tempdir)
334 os.system("gzip %s" % get_core_path(tempdir))
337 def handle_cores(failed_testcases):
338 for failed_testcase in failed_testcases:
339 tcs_with_core = failed_testcase.testclasess_with_core
341 for test, vpp_binary, tempdir in tcs_with_core.values():
342 check_and_handle_core(vpp_binary, tempdir, test)
345 def process_finished_testsuite(
346 wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
348 results.append(wrapped_testcase_suite.result)
349 finished_testcase_suites.add(wrapped_testcase_suite)
351 if config.failfast and not wrapped_testcase_suite.was_successful():
354 if not wrapped_testcase_suite.was_successful():
355 failed_wrapped_testcases.add(wrapped_testcase_suite)
357 wrapped_testcase_suite.logger,
358 wrapped_testcase_suite.last_test_temp_dir,
359 wrapped_testcase_suite.vpp_pid,
360 wrapped_testcase_suite.last_test_vpp_binary,
366 def run_forked(testcase_suites):
367 wrapped_testcase_suites = set()
368 solo_testcase_suites = []
370 # suites are unhashable, need to use list
372 unread_testcases = set()
373 finished_unread_testcases = set()
374 manager = StreamQueueManager()
377 free_cpus = list(available_cpus)
379 def on_suite_start(tc):
380 nonlocal tests_running
382 tests_running = tests_running + 1
384 def on_suite_finish(tc):
385 nonlocal tests_running
387 tests_running = tests_running - 1
388 assert tests_running >= 0
389 free_cpus.extend(tc.get_assigned_cpus())
391 def run_suite(suite):
393 nonlocal wrapped_testcase_suites
394 nonlocal unread_testcases
396 suite.assign_cpus(free_cpus[: suite.cpus_used])
397 free_cpus = free_cpus[suite.cpus_used :]
398 wrapper = TestCaseWrapper(suite, manager)
399 wrapped_testcase_suites.add(wrapper)
400 unread_testcases.add(wrapper)
401 on_suite_start(suite)
403 def can_run_suite(suite):
404 return tests_running < max_concurrent_tests and (
405 suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
408 while free_cpus and testcase_suites:
409 a_suite = testcase_suites[0]
410 if a_suite.is_tagged_run_solo:
411 a_suite = testcase_suites.pop(0)
412 solo_testcase_suites.append(a_suite)
414 if can_run_suite(a_suite):
415 a_suite = testcase_suites.pop(0)
420 if tests_running == 0 and solo_testcase_suites:
421 a_suite = solo_testcase_suites.pop(0)
424 read_from_testcases = threading.Event()
425 read_from_testcases.set()
426 stdouterr_thread = threading.Thread(
427 target=stdouterr_reader_wrapper,
428 args=(unread_testcases, finished_unread_testcases, read_from_testcases),
430 stdouterr_thread.start()
432 failed_wrapped_testcases = set()
436 while wrapped_testcase_suites or testcase_suites:
437 finished_testcase_suites = set()
438 for wrapped_testcase_suite in wrapped_testcase_suites:
439 while wrapped_testcase_suite.result_parent_end.poll():
440 wrapped_testcase_suite.result.process_result(
441 *wrapped_testcase_suite.result_parent_end.recv()
443 wrapped_testcase_suite.last_heard = time.time()
445 while wrapped_testcase_suite.keep_alive_parent_end.poll():
447 wrapped_testcase_suite.last_test,
448 wrapped_testcase_suite.last_test_vpp_binary,
449 wrapped_testcase_suite.last_test_temp_dir,
450 wrapped_testcase_suite.vpp_pid,
451 ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
452 wrapped_testcase_suite.last_heard = time.time()
454 if wrapped_testcase_suite.finished_parent_end.poll():
455 wrapped_testcase_suite.finished_parent_end.recv()
456 wrapped_testcase_suite.last_heard = time.time()
458 process_finished_testsuite(
459 wrapped_testcase_suite,
460 finished_testcase_suites,
461 failed_wrapped_testcases,
469 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
471 wrapped_testcase_suite.logger.critical(
472 "Child test runner process timed out "
473 "(last test running was `%s' in `%s')!"
475 wrapped_testcase_suite.last_test,
476 wrapped_testcase_suite.last_test_temp_dir,
479 elif not wrapped_testcase_suite.child.is_alive():
481 wrapped_testcase_suite.logger.critical(
482 "Child test runner process unexpectedly died "
483 "(last test running was `%s' in `%s')!"
485 wrapped_testcase_suite.last_test,
486 wrapped_testcase_suite.last_test_temp_dir,
490 wrapped_testcase_suite.last_test_temp_dir
491 and wrapped_testcase_suite.last_test_vpp_binary
493 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
494 wrapped_testcase_suite.add_testclass_with_core()
495 if wrapped_testcase_suite.core_detected_at is None:
496 wrapped_testcase_suite.core_detected_at = time.time()
498 wrapped_testcase_suite.core_detected_at + core_timeout
501 wrapped_testcase_suite.logger.critical(
502 "Child test runner process unresponsive and "
503 "core-file exists in test temporary directory "
504 "(last test running was `%s' in `%s')!"
506 wrapped_testcase_suite.last_test,
507 wrapped_testcase_suite.last_test_temp_dir,
513 wrapped_testcase_suite.child.terminate()
515 # terminating the child process tends to leave orphan
517 if wrapped_testcase_suite.vpp_pid:
518 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
522 wrapped_testcase_suite.result.crashed = True
523 wrapped_testcase_suite.result.process_result(
524 wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
527 process_finished_testsuite(
528 wrapped_testcase_suite,
529 finished_testcase_suites,
530 failed_wrapped_testcases,
536 for finished_testcase in finished_testcase_suites:
537 # Somewhat surprisingly, the join below may
538 # timeout, even if client signaled that
539 # it finished - so we note it just in case.
540 join_start = time.time()
541 finished_testcase.child.join(test_finished_join_timeout)
542 join_end = time.time()
543 if join_end - join_start >= test_finished_join_timeout:
544 finished_testcase.logger.error(
545 "Timeout joining finished test: %s (pid %d)"
546 % (finished_testcase.last_test, finished_testcase.child.pid)
548 finished_testcase.close_pipes()
549 wrapped_testcase_suites.remove(finished_testcase)
550 finished_unread_testcases.add(finished_testcase)
551 finished_testcase.stdouterr_queue.put(None)
552 on_suite_finish(finished_testcase)
554 while testcase_suites:
555 results.append(TestResult(testcase_suites.pop(0)))
556 elif testcase_suites:
557 a_suite = testcase_suites[0]
558 while a_suite and a_suite.is_tagged_run_solo:
559 testcase_suites.pop(0)
560 solo_testcase_suites.append(a_suite)
562 a_suite = testcase_suites[0]
565 if a_suite and can_run_suite(a_suite):
566 testcase_suites.pop(0)
568 if solo_testcase_suites and tests_running == 0:
569 a_suite = solo_testcase_suites.pop(0)
573 for wrapped_testcase_suite in wrapped_testcase_suites:
574 wrapped_testcase_suite.child.terminate()
575 wrapped_testcase_suite.stdouterr_queue.put(None)
578 read_from_testcases.clear()
579 stdouterr_thread.join(config.timeout)
582 handle_cores(failed_wrapped_testcases)
586 class TestSuiteWrapper(unittest.TestSuite):
590 return super().__init__()
592 def addTest(self, test):
593 self.cpus_used = max(self.cpus_used, test.get_cpus_required())
594 super().addTest(test)
596 def assign_cpus(self, cpus):
599 def _handleClassSetUp(self, test, result):
600 if not test.__class__.skipped_due_to_cpu_lack:
601 test.assign_cpus(self.cpus)
602 super()._handleClassSetUp(test, result)
604 def get_assigned_cpus(self):
608 class SplitToSuitesCallback:
609 def __init__(self, filter_callback):
611 self.suite_name = "default"
612 self.filter_callback = filter_callback
613 self.filtered = TestSuiteWrapper()
615 def __call__(self, file_name, cls, method):
616 test_method = cls(method)
617 if self.filter_callback(file_name, cls.__name__, method):
618 self.suite_name = file_name + cls.__name__
619 if self.suite_name not in self.suites:
620 self.suites[self.suite_name] = TestSuiteWrapper()
621 self.suites[self.suite_name].is_tagged_run_solo = False
622 self.suites[self.suite_name].addTest(test_method)
623 if test_method.is_tagged_run_solo():
624 self.suites[self.suite_name].is_tagged_run_solo = True
627 self.filtered.addTest(test_method)
630 def parse_test_filter(test_filter):
632 filter_file_name = None
633 filter_class_name = None
634 filter_func_name = None
639 raise Exception(f"Invalid test filter: {test_filter}")
641 if parts[2] not in ("*", ""):
642 filter_func_name = parts[2]
643 if parts[1] not in ("*", ""):
644 filter_class_name = parts[1]
645 if parts[0] not in ("*", ""):
646 if parts[0].startswith("test_"):
647 filter_file_name = parts[0]
649 filter_file_name = "test_%s" % parts[0]
651 if f.startswith("test_"):
654 filter_file_name = "test_%s" % f
656 filter_file_name = "%s.py" % filter_file_name
657 return filter_file_name, filter_class_name, filter_func_name
660 def filter_tests(tests, filter_cb):
661 result = TestSuiteWrapper()
663 if isinstance(t, unittest.suite.TestSuite):
664 # this is a bunch of tests, recursively filter...
665 x = filter_tests(t, filter_cb)
666 if x.countTestCases() > 0:
668 elif isinstance(t, unittest.TestCase):
669 # this is a single test
670 parts = t.id().split(".")
671 # t.id() for common cases like this:
672 # test_classifier.TestClassifier.test_acl_ip
673 # apply filtering only if it is so
675 if not filter_cb(parts[0], parts[1], parts[2]):
679 # unexpected object, don't touch it
684 class FilterByTestOption:
685 def __init__(self, filters):
686 self.filters = filters
688 def __call__(self, file_name, class_name, func_name):
698 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
701 if filter_class_name and class_name != filter_class_name:
703 if filter_func_name and func_name != filter_func_name:
707 for filter_file_name, filter_class_name, filter_func_name in self.filters:
721 class FilterByClassList:
722 def __init__(self, classes_with_filenames):
723 self.classes_with_filenames = classes_with_filenames
725 def __call__(self, file_name, class_name, func_name):
726 return ".".join([file_name, class_name]) in self.classes_with_filenames
729 def suite_from_failed(suite, failed):
730 failed = {x.rsplit(".", 1)[0] for x in failed}
731 filter_cb = FilterByClassList(failed)
732 suite = filter_tests(suite, filter_cb)
736 class AllResults(dict):
738 super(AllResults, self).__init__()
739 self.all_testcases = 0
740 self.results_per_suite = []
741 for trc in list(TestResultCode):
744 self.testsuites_no_tests_run = []
746 def add_results(self, result):
747 self.results_per_suite.append(result)
748 for trc in list(TestResultCode):
749 self[trc] += len(result[trc])
751 def add_result(self, result):
753 self.all_testcases += result.testcase_suite.countTestCases()
754 self.add_results(result)
756 if result.no_tests_run():
757 self.testsuites_no_tests_run.append(result.testcase_suite)
762 elif not result.was_successful():
766 self.rerun.append(result.testcase_suite)
770 def print_results(self):
772 print(double_line_delim)
773 print("TEST RESULTS:")
775 def indent_results(lines):
776 lines = list(filter(None, lines))
777 maximum = max(lines, key=lambda x: x.index(":"))
778 maximum = 4 + maximum.index(":")
780 padding = " " * (maximum - l.index(":"))
781 print(f"{padding}{l}")
785 f"Scheduled tests: {self.all_testcases}",
786 f"Executed tests: {self[TestResultCode.TEST_RUN]}",
787 f"Passed tests: {colorize(self[TestResultCode.PASS], GREEN)}",
788 f"Expected failures: {colorize(self[TestResultCode.EXPECTED_FAIL], GREEN)}"
789 if self[TestResultCode.EXPECTED_FAIL]
791 f"Skipped tests: {colorize(self[TestResultCode.SKIP], YELLOW)}"
792 if self[TestResultCode.SKIP]
794 f"Not Executed tests: {colorize(self.not_executed, RED)}"
797 f"Failures: {colorize(self[TestResultCode.FAIL], RED)}"
798 if self[TestResultCode.FAIL]
800 f"Unexpected passes: {colorize(self[TestResultCode.UNEXPECTED_PASS], RED)}"
801 if self[TestResultCode.UNEXPECTED_PASS]
803 f"Errors: {colorize(self[TestResultCode.ERROR], RED)}"
804 if self[TestResultCode.ERROR]
806 "Tests skipped due to lack of CPUS: "
807 f"{colorize(self[TestResultCode.SKIP_CPU_SHORTAGE], YELLOW)}"
808 if self[TestResultCode.SKIP_CPU_SHORTAGE]
813 if self.all_failed > 0:
814 print("FAILURES AND ERRORS IN TESTS:")
815 for result in self.results_per_suite:
816 old_testcase_name = None
817 for tr_code, headline in (
818 (TestResultCode.FAIL, "FAILURE"),
819 (TestResultCode.ERROR, "ERROR"),
820 (TestResultCode.UNEXPECTED_PASS, "UNEXPECTED PASS"),
822 if not result[tr_code]:
825 for failed_test_id in result[tr_code]:
826 new_testcase_name, test_name = result.get_testcase_names(
829 if new_testcase_name != old_testcase_name:
831 f" Testcase name: {colorize(new_testcase_name, RED)}"
833 old_testcase_name = new_testcase_name
835 f" {headline}: {colorize(test_name, RED)} [{failed_test_id}]"
838 if self.testsuites_no_tests_run:
839 print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
841 for testsuite in self.testsuites_no_tests_run:
842 for testcase in testsuite:
843 tc_classes.add(get_testcase_doc_name(testcase))
844 for tc_class in tc_classes:
845 print(" {}".format(colorize(tc_class, RED)))
847 if self[TestResultCode.SKIP_CPU_SHORTAGE]:
851 " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
852 " ENOUGH CPUS AVAILABLE",
856 print(double_line_delim)
860 def not_executed(self):
861 return self.all_testcases - self[TestResultCode.TEST_RUN]
864 def all_failed(self):
866 self[TestResultCode.FAIL]
867 + self[TestResultCode.ERROR]
868 + self[TestResultCode.UNEXPECTED_PASS]
872 def parse_results(results):
874 Prints the number of scheduled, executed, not executed, passed, failed,
875 errored and skipped tests and details about failed and errored tests.
877 Also returns all suites where any test failed.
883 results_per_suite = AllResults()
886 for result in results:
887 result_code = results_per_suite.add_result(result)
890 elif result_code == -1:
893 results_per_suite.print_results()
901 return return_code, results_per_suite.rerun
904 if __name__ == "__main__":
906 print(f"Config is: {config}")
909 print("Running sanity test case.")
911 rc = sanity_run_vpp.main()
914 except Exception as e:
915 print(traceback.format_exc())
916 print("Couldn't run sanity test case.")
919 test_finished_join_timeout = 15
921 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
922 debug_core = config.debug == "core"
924 run_interactive = debug_gdb or config.step or config.force_foreground
926 max_concurrent_tests = 0
927 print(f"OS reports {num_cpus} available cpu(s).")
929 test_jobs = config.jobs
930 if test_jobs == "auto":
932 max_concurrent_tests = 1
933 print("Interactive mode required, running tests consecutively.")
935 max_concurrent_tests = num_cpus
937 f"Running at most {max_concurrent_tests} python test "
938 "processes concurrently."
941 max_concurrent_tests = test_jobs
943 f"Running at most {max_concurrent_tests} python test processes "
944 "concurrently as set by 'TEST_JOBS'."
947 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
949 if run_interactive and max_concurrent_tests > 1:
950 raise NotImplementedError(
951 "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
952 "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
958 print("Running tests using custom test runner.")
959 filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
962 "Selected filters: ",
964 f"file={filter_file}, class={filter_class}, function={filter_func}"
965 for filter_file, filter_class, filter_func in filters
969 filter_cb = FilterByTestOption(filters)
971 cb = SplitToSuitesCallback(filter_cb)
972 for d in config.test_src_dir:
973 print("Adding tests from directory tree %s" % d)
974 discover_tests(d, cb)
976 # suites are not hashable, need to use list
979 for testcase_suite in cb.suites.values():
980 tests_amount += testcase_suite.countTestCases()
981 if testcase_suite.cpus_used > max_vpp_cpus:
982 # here we replace test functions with lambdas to just skip them
983 # but we also replace setUp/tearDown functions to do nothing
984 # so that the test can be "started" and "stopped", so that we can
985 # still keep those prints (test description - SKIP), which are done
986 # in stopTest() (for that to trigger, test function must run)
987 for t in testcase_suite:
989 if m.startswith("test_"):
990 setattr(t, m, lambda: t.skipTest("not enough cpus"))
991 setattr(t.__class__, "setUpClass", lambda: None)
992 setattr(t.__class__, "tearDownClass", lambda: None)
993 setattr(t, "setUp", lambda: None)
994 setattr(t, "tearDown", lambda: None)
995 t.__class__.skipped_due_to_cpu_lack = True
996 suites.append(testcase_suite)
999 "%s out of %s tests match specified filters"
1000 % (tests_amount, tests_amount + cb.filtered.countTestCases())
1003 if not config.extended:
1004 print("Not running extended tests (some tests will be skipped)")
1006 attempts = config.retries + 1
1008 print("Perform %s attempts to pass the suite..." % attempts)
1010 if run_interactive and suites:
1011 # don't fork if requiring interactive terminal
1012 print("Running tests in foreground in the current process")
1013 full_suite = unittest.TestSuite()
1014 free_cpus = list(available_cpus)
1015 cpu_shortage = False
1016 for suite in suites:
1017 if suite.cpus_used <= max_vpp_cpus:
1018 suite.assign_cpus(free_cpus[: suite.cpus_used])
1020 suite.assign_cpus([])
1022 full_suite.addTests(suites)
1023 result = VppTestRunner(
1024 verbosity=config.verbose, failfast=config.failfast, print_summary=True
1026 was_successful = result.wasSuccessful()
1027 if not was_successful:
1028 for test_case_info in result.failed_test_cases_info:
1029 handle_failed_suite(
1030 test_case_info.logger,
1031 test_case_info.tempdir,
1032 test_case_info.vpp_pid,
1035 if test_case_info in result.core_crash_test_cases_info:
1036 check_and_handle_core(
1037 test_case_info.vpp_bin_path,
1038 test_case_info.tempdir,
1039 test_case_info.core_crash_test,
1046 "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1047 " ENOUGH CPUS AVAILABLE",
1052 sys.exit(not was_successful)
1055 "Running each VPPTestCase in a separate background process"
1056 f" with at most {max_concurrent_tests} parallel python test "
1060 while suites and attempts > 0:
1061 results = run_forked(suites)
1062 exit_code, suites = parse_results(results)
1065 print("Test run was successful")
1067 print("%s attempt(s) left." % attempts)