13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from framework import (
20 get_testcase_doc_name,
29 from debug import spawn_gdb
39 from discover_tests import discover_tests
41 from subprocess import check_output, CalledProcessError
42 from util import check_core_path, get_core_path, is_core_present
44 # timeout which controls how long the child has to finish after seeing
45 # a core dump in test temporary directory. If this is exceeded, parent assumes
46 # that child process is stuck (e.g. waiting for event from vpp) and kill
51 class StreamQueue(Queue):
56 sys.__stdout__.flush()
57 sys.__stderr__.flush()
60 return self._writer.fileno()
63 class StreamQueueManager(BaseManager):
67 StreamQueueManager.register("StreamQueue", StreamQueue)
70 class TestResult(dict):
71 def __init__(self, testcase_suite, testcases_by_id=None):
72 super(TestResult, self).__init__()
77 self[SKIP_CPU_SHORTAGE] = []
80 self.testcase_suite = testcase_suite
81 self.testcases = [testcase for testcase in testcase_suite]
82 self.testcases_by_id = testcases_by_id
84 def was_successful(self):
86 0 == len(self[FAIL]) == len(self[ERROR])
87 and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
88 == self.testcase_suite.countTestCases()
def no_tests_run(self):
    """Tell whether the list stored under TEST_RUN is empty."""
    executed = self[TEST_RUN]
    return len(executed) == 0
def process_result(self, test_id, result):
    """Append *test_id* to the list stored under the *result* key."""
    bucket = self[result]
    bucket.append(test_id)
97 def suite_from_failed(self):
99 for testcase in self.testcase_suite:
100 tc_id = testcase.id()
101 if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
104 return suite_from_failed(self.testcase_suite, rerun_ids)
106 def get_testcase_names(self, test_id):
107 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
108 setup_teardown_match = re.match(
109 r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
111 if setup_teardown_match:
112 test_name, _, _, testcase_name = setup_teardown_match.groups()
113 if len(testcase_name.split(".")) == 2:
114 for key in self.testcases_by_id.keys():
115 if key.startswith(testcase_name):
118 testcase_name = self._get_testcase_doc_name(testcase_name)
120 test_name = self._get_test_description(test_id)
121 testcase_name = self._get_testcase_doc_name(test_id)
123 return testcase_name, test_name
125 def _get_test_description(self, test_id):
126 if test_id in self.testcases_by_id:
127 desc = get_test_description(descriptions, self.testcases_by_id[test_id])
132 def _get_testcase_doc_name(self, test_id):
133 if test_id in self.testcases_by_id:
134 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
140 def test_runner_wrapper(
141 suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
143 sys.stdout = stdouterr_queue
144 sys.stderr = stdouterr_queue
145 VppTestCase.parallel_handler = logger.handlers[0]
146 result = VppTestRunner(
147 keep_alive_pipe=keep_alive_pipe,
148 descriptions=descriptions,
149 verbosity=config.verbose,
150 result_pipe=result_pipe,
151 failfast=config.failfast,
154 finished_pipe.send(result.wasSuccessful())
155 finished_pipe.close()
156 keep_alive_pipe.close()
159 class TestCaseWrapper(object):
160 def __init__(self, testcase_suite, manager):
161 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
162 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
163 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
164 self.testcase_suite = testcase_suite
165 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
166 self.logger = get_parallel_logger(self.stdouterr_queue)
167 self.child = Process(
168 target=test_runner_wrapper,
171 self.keep_alive_child_end,
172 self.stdouterr_queue,
173 self.finished_child_end,
174 self.result_child_end,
179 self.last_test_temp_dir = None
180 self.last_test_vpp_binary = None
181 self._last_test = None
182 self.last_test_id = None
184 self.last_heard = time.time()
185 self.core_detected_at = None
186 self.testcases_by_id = {}
187 self.testclasess_with_core = {}
188 for testcase in self.testcase_suite:
189 self.testcases_by_id[testcase.id()] = testcase
190 self.result = TestResult(testcase_suite, self.testcases_by_id)
194 return self._last_test
197 def last_test(self, test_id):
198 self.last_test_id = test_id
199 if test_id in self.testcases_by_id:
200 testcase = self.testcases_by_id[test_id]
201 self._last_test = testcase.shortDescription()
202 if not self._last_test:
203 self._last_test = str(testcase)
205 self._last_test = test_id
207 def add_testclass_with_core(self):
208 if self.last_test_id in self.testcases_by_id:
209 test = self.testcases_by_id[self.last_test_id]
210 class_name = unittest.util.strclass(test.__class__)
211 test_name = "'{}' ({})".format(
212 get_test_description(descriptions, test), self.last_test_id
215 test_name = self.last_test_id
216 class_name = re.match(
217 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
219 if class_name not in self.testclasess_with_core:
220 self.testclasess_with_core[class_name] = (
222 self.last_test_vpp_binary,
223 self.last_test_temp_dir,
def close_pipes(self):
    """Close every end of the keep-alive, finished and result pipes.

    The three child-side ends are closed first, followed by the three
    parent-side ends, each looked up right before it is closed.
    """
    end_names = (
        "keep_alive_child_end",
        "finished_child_end",
        "result_child_end",
        "keep_alive_parent_end",
        "finished_parent_end",
        "result_parent_end",
    )
    for end_name in end_names:
        getattr(self, end_name).close()
def was_successful(self):
    """Return whatever the stored result's was_successful() reports."""
    suite_result = self.result
    return suite_result.was_successful()
239 return self.testcase_suite.cpus_used
def get_assigned_cpus(self):
    """Return the result of the wrapped suite's get_assigned_cpus()."""
    wrapped_suite = self.testcase_suite
    return wrapped_suite.get_assigned_cpus()
245 def stdouterr_reader_wrapper(
246 unread_testcases, finished_unread_testcases, read_testcases
249 while read_testcases.is_set() or unread_testcases:
250 if finished_unread_testcases:
251 read_testcase = finished_unread_testcases.pop()
252 unread_testcases.remove(read_testcase)
253 elif unread_testcases:
254 read_testcase = unread_testcases.pop()
257 while data is not None:
258 sys.stdout.write(data)
259 data = read_testcase.stdouterr_queue.get()
261 read_testcase.stdouterr_queue.close()
262 finished_unread_testcases.discard(read_testcase)
266 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
267 if last_test_temp_dir:
268 # Need to create link in case of a timeout or core dump without failure
269 lttd = os.path.basename(last_test_temp_dir)
270 link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
271 if not os.path.exists(link_path):
272 os.symlink(last_test_temp_dir, link_path)
274 "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
277 # Report core existence
278 core_path = get_core_path(last_test_temp_dir)
279 if os.path.exists(core_path):
281 "Core-file exists in test temporary directory: %s!" % core_path
283 check_core_path(logger, core_path)
284 logger.debug("Running 'file %s':" % core_path)
286 info = check_output(["file", core_path])
288 except CalledProcessError as e:
290 "Subprocess returned with return code "
291 "while running `file' utility on core-file "
298 "Subprocess returned with OS error while "
299 "running 'file' utility "
305 except Exception as e:
306 logger.exception("Unexpected error running `file' utility on core-file")
307 logger.error(f"gdb {vpp_binary} {core_path}")
310 # Copy api post mortem
311 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
312 if os.path.isfile(api_post_mortem_path):
314 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
316 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
319 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
320 if is_core_present(tempdir):
323 "VPP core detected in %s. Last test running was %s"
324 % (tempdir, core_crash_test)
326 print(single_line_delim)
327 spawn_gdb(vpp_binary, get_core_path(tempdir))
328 print(single_line_delim)
329 elif config.compress_core:
330 print("Compressing core-file in test directory `%s'" % tempdir)
331 os.system("gzip %s" % get_core_path(tempdir))
334 def handle_cores(failed_testcases):
335 for failed_testcase in failed_testcases:
336 tcs_with_core = failed_testcase.testclasess_with_core
338 for test, vpp_binary, tempdir in tcs_with_core.values():
339 check_and_handle_core(vpp_binary, tempdir, test)
342 def process_finished_testsuite(
343 wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
345 results.append(wrapped_testcase_suite.result)
346 finished_testcase_suites.add(wrapped_testcase_suite)
348 if config.failfast and not wrapped_testcase_suite.was_successful():
351 if not wrapped_testcase_suite.was_successful():
352 failed_wrapped_testcases.add(wrapped_testcase_suite)
354 wrapped_testcase_suite.logger,
355 wrapped_testcase_suite.last_test_temp_dir,
356 wrapped_testcase_suite.vpp_pid,
357 wrapped_testcase_suite.last_test_vpp_binary,
363 def run_forked(testcase_suites):
364 wrapped_testcase_suites = set()
365 solo_testcase_suites = []
367 # suites are unhashable, need to use list
369 unread_testcases = set()
370 finished_unread_testcases = set()
371 manager = StreamQueueManager()
374 free_cpus = list(available_cpus)
376 def on_suite_start(tc):
377 nonlocal tests_running
379 tests_running = tests_running + 1
381 def on_suite_finish(tc):
382 nonlocal tests_running
384 tests_running = tests_running - 1
385 assert tests_running >= 0
386 free_cpus.extend(tc.get_assigned_cpus())
388 def run_suite(suite):
390 nonlocal wrapped_testcase_suites
391 nonlocal unread_testcases
393 suite.assign_cpus(free_cpus[: suite.cpus_used])
394 free_cpus = free_cpus[suite.cpus_used :]
395 wrapper = TestCaseWrapper(suite, manager)
396 wrapped_testcase_suites.add(wrapper)
397 unread_testcases.add(wrapper)
398 on_suite_start(suite)
400 def can_run_suite(suite):
401 return tests_running < max_concurrent_tests and (
402 suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
405 while free_cpus and testcase_suites:
406 a_suite = testcase_suites[0]
407 if a_suite.is_tagged_run_solo:
408 a_suite = testcase_suites.pop(0)
409 solo_testcase_suites.append(a_suite)
411 if can_run_suite(a_suite):
412 a_suite = testcase_suites.pop(0)
417 if tests_running == 0 and solo_testcase_suites:
418 a_suite = solo_testcase_suites.pop(0)
421 read_from_testcases = threading.Event()
422 read_from_testcases.set()
423 stdouterr_thread = threading.Thread(
424 target=stdouterr_reader_wrapper,
425 args=(unread_testcases, finished_unread_testcases, read_from_testcases),
427 stdouterr_thread.start()
429 failed_wrapped_testcases = set()
433 while wrapped_testcase_suites:
434 finished_testcase_suites = set()
435 for wrapped_testcase_suite in wrapped_testcase_suites:
436 while wrapped_testcase_suite.result_parent_end.poll():
437 wrapped_testcase_suite.result.process_result(
438 *wrapped_testcase_suite.result_parent_end.recv()
440 wrapped_testcase_suite.last_heard = time.time()
442 while wrapped_testcase_suite.keep_alive_parent_end.poll():
444 wrapped_testcase_suite.last_test,
445 wrapped_testcase_suite.last_test_vpp_binary,
446 wrapped_testcase_suite.last_test_temp_dir,
447 wrapped_testcase_suite.vpp_pid,
448 ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
449 wrapped_testcase_suite.last_heard = time.time()
451 if wrapped_testcase_suite.finished_parent_end.poll():
452 wrapped_testcase_suite.finished_parent_end.recv()
453 wrapped_testcase_suite.last_heard = time.time()
455 process_finished_testsuite(
456 wrapped_testcase_suite,
457 finished_testcase_suites,
458 failed_wrapped_testcases,
466 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
468 wrapped_testcase_suite.logger.critical(
469 "Child test runner process timed out "
470 "(last test running was `%s' in `%s')!"
472 wrapped_testcase_suite.last_test,
473 wrapped_testcase_suite.last_test_temp_dir,
476 elif not wrapped_testcase_suite.child.is_alive():
478 wrapped_testcase_suite.logger.critical(
479 "Child test runner process unexpectedly died "
480 "(last test running was `%s' in `%s')!"
482 wrapped_testcase_suite.last_test,
483 wrapped_testcase_suite.last_test_temp_dir,
487 wrapped_testcase_suite.last_test_temp_dir
488 and wrapped_testcase_suite.last_test_vpp_binary
490 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
491 wrapped_testcase_suite.add_testclass_with_core()
492 if wrapped_testcase_suite.core_detected_at is None:
493 wrapped_testcase_suite.core_detected_at = time.time()
495 wrapped_testcase_suite.core_detected_at + core_timeout
498 wrapped_testcase_suite.logger.critical(
499 "Child test runner process unresponsive and "
500 "core-file exists in test temporary directory "
501 "(last test running was `%s' in `%s')!"
503 wrapped_testcase_suite.last_test,
504 wrapped_testcase_suite.last_test_temp_dir,
510 wrapped_testcase_suite.child.terminate()
512 # terminating the child process tends to leave orphan
514 if wrapped_testcase_suite.vpp_pid:
515 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
519 wrapped_testcase_suite.result.crashed = True
520 wrapped_testcase_suite.result.process_result(
521 wrapped_testcase_suite.last_test_id, ERROR
524 process_finished_testsuite(
525 wrapped_testcase_suite,
526 finished_testcase_suites,
527 failed_wrapped_testcases,
533 for finished_testcase in finished_testcase_suites:
534 # Somewhat surprisingly, the join below may
535 # timeout, even if client signaled that
536 # it finished - so we note it just in case.
537 join_start = time.time()
538 finished_testcase.child.join(test_finished_join_timeout)
539 join_end = time.time()
540 if join_end - join_start >= test_finished_join_timeout:
541 finished_testcase.logger.error(
542 "Timeout joining finished test: %s (pid %d)"
543 % (finished_testcase.last_test, finished_testcase.child.pid)
545 finished_testcase.close_pipes()
546 wrapped_testcase_suites.remove(finished_testcase)
547 finished_unread_testcases.add(finished_testcase)
548 finished_testcase.stdouterr_queue.put(None)
549 on_suite_finish(finished_testcase)
551 while testcase_suites:
552 results.append(TestResult(testcase_suites.pop(0)))
553 elif testcase_suites:
554 a_suite = testcase_suites.pop(0)
555 while a_suite and a_suite.is_tagged_run_solo:
556 solo_testcase_suites.append(a_suite)
558 a_suite = testcase_suites.pop(0)
561 if a_suite and can_run_suite(a_suite):
563 if solo_testcase_suites and tests_running == 0:
564 a_suite = solo_testcase_suites.pop(0)
568 for wrapped_testcase_suite in wrapped_testcase_suites:
569 wrapped_testcase_suite.child.terminate()
570 wrapped_testcase_suite.stdouterr_queue.put(None)
573 read_from_testcases.clear()
574 stdouterr_thread.join(config.timeout)
577 handle_cores(failed_wrapped_testcases)
581 class TestSuiteWrapper(unittest.TestSuite):
585 return super().__init__()
def addTest(self, test):
    """Add *test* to the suite, keeping cpus_used at the largest requirement.

    cpus_used becomes the greater of its current value and the value
    returned by test.get_cpus_required(); the test is then added through
    the parent class.
    """
    current = self.cpus_used
    required = test.get_cpus_required()
    self.cpus_used = required if required > current else current
    super().addTest(test)
591 def assign_cpus(self, cpus):
def _handleClassSetUp(self, test, result):
    """Pass self.cpus to the test, then run the parent class set-up handling.

    The cpu assignment is omitted when the test's class has
    skipped_due_to_cpu_lack set to a true value.
    """
    cpu_starved = test.__class__.skipped_due_to_cpu_lack
    if not cpu_starved:
        test.assign_cpus(self.cpus)
    super()._handleClassSetUp(test, result)
599 def get_assigned_cpus(self):
603 class SplitToSuitesCallback:
604 def __init__(self, filter_callback):
606 self.suite_name = "default"
607 self.filter_callback = filter_callback
608 self.filtered = TestSuiteWrapper()
610 def __call__(self, file_name, cls, method):
611 test_method = cls(method)
612 if self.filter_callback(file_name, cls.__name__, method):
613 self.suite_name = file_name + cls.__name__
614 if self.suite_name not in self.suites:
615 self.suites[self.suite_name] = TestSuiteWrapper()
616 self.suites[self.suite_name].is_tagged_run_solo = False
617 self.suites[self.suite_name].addTest(test_method)
618 if test_method.is_tagged_run_solo():
619 self.suites[self.suite_name].is_tagged_run_solo = True
622 self.filtered.addTest(test_method)
625 def parse_test_filter(test_filter):
627 filter_file_name = None
628 filter_class_name = None
629 filter_func_name = None
634 raise Exception("Unrecognized %s option: %s" % (test_option, f))
636 if parts[2] not in ("*", ""):
637 filter_func_name = parts[2]
638 if parts[1] not in ("*", ""):
639 filter_class_name = parts[1]
640 if parts[0] not in ("*", ""):
641 if parts[0].startswith("test_"):
642 filter_file_name = parts[0]
644 filter_file_name = "test_%s" % parts[0]
646 if f.startswith("test_"):
649 filter_file_name = "test_%s" % f
651 filter_file_name = "%s.py" % filter_file_name
652 return filter_file_name, filter_class_name, filter_func_name
655 def filter_tests(tests, filter_cb):
656 result = TestSuiteWrapper()
658 if isinstance(t, unittest.suite.TestSuite):
659 # this is a bunch of tests, recursively filter...
660 x = filter_tests(t, filter_cb)
661 if x.countTestCases() > 0:
663 elif isinstance(t, unittest.TestCase):
664 # this is a single test
665 parts = t.id().split(".")
666 # t.id() for common cases like this:
667 # test_classifier.TestClassifier.test_acl_ip
668 # apply filtering only if it is so
670 if not filter_cb(parts[0], parts[1], parts[2]):
674 # unexpected object, don't touch it
679 class FilterByTestOption:
680 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
681 self.filter_file_name = filter_file_name
682 self.filter_class_name = filter_class_name
683 self.filter_func_name = filter_func_name
685 def __call__(self, file_name, class_name, func_name):
686 if self.filter_file_name:
687 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
690 if self.filter_class_name and class_name != self.filter_class_name:
692 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Filter callback accepting tests whose "<file>.<class>" is listed.

    The collection of accepted "<file>.<class>" strings is supplied at
    construction time; the function name passed to the call is not used.
    """

    def __init__(self, classes_with_filenames):
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        qualified_class = ".".join((file_name, class_name))
        return qualified_class in self.classes_with_filenames
705 def suite_from_failed(suite, failed):
706 failed = {x.rsplit(".", 1)[0] for x in failed}
707 filter_cb = FilterByClassList(failed)
708 suite = filter_tests(suite, filter_cb)
712 class AllResults(dict):
714 super(AllResults, self).__init__()
715 self.all_testcases = 0
716 self.results_per_suite = []
721 self[SKIP_CPU_SHORTAGE] = 0
724 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Keep *result* and add its per-type test counts to the totals.

    The result is appended to results_per_suite, then for each result
    type the length of the result's list is added to this object's counter.
    """
    self.results_per_suite.append(result)
    counted_types = (PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE)
    for counted_type in counted_types:
        self[counted_type] += len(result[counted_type])
732 def add_result(self, result):
734 self.all_testcases += result.testcase_suite.countTestCases()
735 self.add_results(result)
737 if result.no_tests_run():
738 self.testsuites_no_tests_run.append(result.testcase_suite)
743 elif not result.was_successful():
747 self.rerun.append(result.testcase_suite)
751 def print_results(self):
753 print(double_line_delim)
754 print("TEST RESULTS:")
756 def indent_results(lines):
757 lines = list(filter(None, lines))
758 maximum = max(lines, key=lambda x: x.index(":"))
759 maximum = 4 + maximum.index(":")
761 padding = " " * (maximum - l.index(":"))
762 print(f"{padding}{l}")
766 f"Scheduled tests: {self.all_testcases}",
767 f"Executed tests: {self[TEST_RUN]}",
768 f"Passed tests: {colorize(self[PASS], GREEN)}",
769 f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
772 f"Not Executed tests: {colorize(self.not_executed, RED)}"
775 f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
776 f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
777 "Tests skipped due to lack of CPUS: "
778 f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
779 if self[SKIP_CPU_SHORTAGE]
784 if self.all_failed > 0:
785 print("FAILURES AND ERRORS IN TESTS:")
786 for result in self.results_per_suite:
787 failed_testcase_ids = result[FAIL]
788 errored_testcase_ids = result[ERROR]
789 old_testcase_name = None
790 if failed_testcase_ids:
791 for failed_test_id in failed_testcase_ids:
792 new_testcase_name, test_name = result.get_testcase_names(
795 if new_testcase_name != old_testcase_name:
797 " Testcase name: {}".format(
798 colorize(new_testcase_name, RED)
801 old_testcase_name = new_testcase_name
803 " FAILURE: {} [{}]".format(
804 colorize(test_name, RED), failed_test_id
807 if errored_testcase_ids:
808 for errored_test_id in errored_testcase_ids:
809 new_testcase_name, test_name = result.get_testcase_names(
812 if new_testcase_name != old_testcase_name:
814 " Testcase name: {}".format(
815 colorize(new_testcase_name, RED)
818 old_testcase_name = new_testcase_name
820 " ERROR: {} [{}]".format(
821 colorize(test_name, RED), errored_test_id
824 if self.testsuites_no_tests_run:
825 print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
827 for testsuite in self.testsuites_no_tests_run:
828 for testcase in testsuite:
829 tc_classes.add(get_testcase_doc_name(testcase))
830 for tc_class in tc_classes:
831 print(" {}".format(colorize(tc_class, RED)))
833 if self[SKIP_CPU_SHORTAGE]:
837 " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
838 " ENOUGH CPUS AVAILABLE",
842 print(double_line_delim)
def not_executed(self):
    """Difference between all_testcases and the TEST_RUN counter."""
    scheduled = self.all_testcases
    executed = self[TEST_RUN]
    return scheduled - executed
def all_failed(self):
    """Sum of the FAIL and ERROR counters."""
    failures = self[FAIL]
    errors = self[ERROR]
    return failures + errors
854 def parse_results(results):
856 Prints the number of scheduled, executed, not executed, passed, failed,
857 errored and skipped tests and details about failed and errored tests.
859 Also returns all suites where any test failed.
865 results_per_suite = AllResults()
868 for result in results:
869 result_code = results_per_suite.add_result(result)
872 elif result_code == -1:
875 results_per_suite.print_results()
883 return return_code, results_per_suite.rerun
886 if __name__ == "__main__":
888 print(f"Config is: {config}")
891 print("Running sanity test case.")
893 rc = sanity_run_vpp.main()
896 except Exception as e:
897 print(traceback.format_exc())
898 print("Couldn't run sanity test case.")
901 test_finished_join_timeout = 15
903 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
904 debug_core = config.debug == "core"
906 run_interactive = debug_gdb or config.step or config.force_foreground
908 max_concurrent_tests = 0
909 print(f"OS reports {num_cpus} available cpu(s).")
911 test_jobs = config.jobs
912 if test_jobs == "auto":
914 max_concurrent_tests = 1
915 print("Interactive mode required, running tests consecutively.")
917 max_concurrent_tests = num_cpus
919 f"Running at most {max_concurrent_tests} python test "
920 "processes concurrently."
923 max_concurrent_tests = test_jobs
925 f"Running at most {max_concurrent_tests} python test processes "
926 "concurrently as set by 'TEST_JOBS'."
929 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
931 if run_interactive and max_concurrent_tests > 1:
932 raise NotImplementedError(
933 "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
934 "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
940 print("Running tests using custom test runner.")
941 filter_file, filter_class, filter_func = parse_test_filter(config.filter)
944 "Selected filters: file=%s, class=%s, function=%s"
945 % (filter_file, filter_class, filter_func)
948 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
950 cb = SplitToSuitesCallback(filter_cb)
951 for d in config.test_src_dir:
952 print("Adding tests from directory tree %s" % d)
953 discover_tests(d, cb)
955 # suites are not hashable, need to use list
958 for testcase_suite in cb.suites.values():
959 tests_amount += testcase_suite.countTestCases()
960 if testcase_suite.cpus_used > max_vpp_cpus:
961 # here we replace test functions with lambdas to just skip them
962 # but we also replace setUp/tearDown functions to do nothing
963 # so that the test can be "started" and "stopped", so that we can
964 # still keep those prints (test description - SKIP), which are done
965 # in stopTest() (for that to trigger, test function must run)
966 for t in testcase_suite:
968 if m.startswith("test_"):
969 setattr(t, m, lambda: t.skipTest("not enough cpus"))
970 setattr(t.__class__, "setUpClass", lambda: None)
971 setattr(t.__class__, "tearDownClass", lambda: None)
972 setattr(t, "setUp", lambda: None)
973 setattr(t, "tearDown", lambda: None)
974 t.__class__.skipped_due_to_cpu_lack = True
975 suites.append(testcase_suite)
978 "%s out of %s tests match specified filters"
979 % (tests_amount, tests_amount + cb.filtered.countTestCases())
982 if not config.extended:
983 print("Not running extended tests (some tests will be skipped)")
985 attempts = config.retries + 1
987 print("Perform %s attempts to pass the suite..." % attempts)
989 if run_interactive and suites:
990 # don't fork if requiring interactive terminal
991 print("Running tests in foreground in the current process")
992 full_suite = unittest.TestSuite()
993 free_cpus = list(available_cpus)
996 if suite.cpus_used <= max_vpp_cpus:
997 suite.assign_cpus(free_cpus[: suite.cpus_used])
999 suite.assign_cpus([])
1001 full_suite.addTests(suites)
1002 result = VppTestRunner(
1003 verbosity=config.verbose, failfast=config.failfast, print_summary=True
1005 was_successful = result.wasSuccessful()
1006 if not was_successful:
1007 for test_case_info in result.failed_test_cases_info:
1008 handle_failed_suite(
1009 test_case_info.logger,
1010 test_case_info.tempdir,
1011 test_case_info.vpp_pid,
1014 if test_case_info in result.core_crash_test_cases_info:
1015 check_and_handle_core(
1016 test_case_info.vpp_bin_path,
1017 test_case_info.tempdir,
1018 test_case_info.core_crash_test,
1025 "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1026 " ENOUGH CPUS AVAILABLE",
1031 sys.exit(not was_successful)
1034 "Running each VPPTestCase in a separate background process"
1035 f" with at most {max_concurrent_tests} parallel python test "
1039 while suites and attempts > 0:
1040 results = run_forked(suites)
1041 exit_code, suites = parse_results(results)
1044 print("Test run was successful")
1046 print("%s attempt(s) left." % attempts)