13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from framework import (
20 get_testcase_doc_name,
29 from debug import spawn_gdb
39 from discover_tests import discover_tests
41 from subprocess import check_output, CalledProcessError
42 from util import check_core_path, get_core_path, is_core_present
44 # timeout which controls how long the child has to finish after seeing
45 # a core dump in test temporary directory. If this is exceeded, parent assumes
46 # that child process is stuck (e.g. waiting for event from vpp) and kill
51 class StreamQueue(Queue):
56 sys.__stdout__.flush()
57 sys.__stderr__.flush()
60 return self._writer.fileno()
63 class StreamQueueManager(BaseManager):
67 StreamQueueManager.register("StreamQueue", StreamQueue)
70 class TestResult(dict):
71 def __init__(self, testcase_suite, testcases_by_id=None):
72 super(TestResult, self).__init__()
77 self[SKIP_CPU_SHORTAGE] = []
80 self.testcase_suite = testcase_suite
81 self.testcases = [testcase for testcase in testcase_suite]
82 self.testcases_by_id = testcases_by_id
84 def was_successful(self):
86 0 == len(self[FAIL]) == len(self[ERROR])
87 and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
88 == self.testcase_suite.countTestCases()
def no_tests_run(self):
    """Tell whether the TEST_RUN bucket of this result holds no entries."""
    executed = self[TEST_RUN]
    return len(executed) == 0
def process_result(self, test_id, result):
    """Record ``test_id`` in the list stored under the ``result`` key.

    :param test_id: identifier of the test being recorded.
    :param result: key of an already existing list in this mapping.
    """
    bucket = self[result]
    bucket.append(test_id)
97 def suite_from_failed(self):
99 for testcase in self.testcase_suite:
100 tc_id = testcase.id()
101 if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
104 return suite_from_failed(self.testcase_suite, rerun_ids)
106 def get_testcase_names(self, test_id):
107 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
108 setup_teardown_match = re.match(
109 r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
111 if setup_teardown_match:
112 test_name, _, _, testcase_name = setup_teardown_match.groups()
113 if len(testcase_name.split(".")) == 2:
114 for key in self.testcases_by_id.keys():
115 if key.startswith(testcase_name):
118 testcase_name = self._get_testcase_doc_name(testcase_name)
120 test_name = self._get_test_description(test_id)
121 testcase_name = self._get_testcase_doc_name(test_id)
123 return testcase_name, test_name
125 def _get_test_description(self, test_id):
126 if test_id in self.testcases_by_id:
127 desc = get_test_description(descriptions, self.testcases_by_id[test_id])
132 def _get_testcase_doc_name(self, test_id):
133 if test_id in self.testcases_by_id:
134 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
140 def test_runner_wrapper(
141 suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
143 sys.stdout = stdouterr_queue
144 sys.stderr = stdouterr_queue
145 VppTestCase.parallel_handler = logger.handlers[0]
146 result = VppTestRunner(
147 keep_alive_pipe=keep_alive_pipe,
148 descriptions=descriptions,
149 verbosity=config.verbose,
150 result_pipe=result_pipe,
151 failfast=config.failfast,
154 finished_pipe.send(result.wasSuccessful())
155 finished_pipe.close()
156 keep_alive_pipe.close()
159 class TestCaseWrapper(object):
160 def __init__(self, testcase_suite, manager):
161 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
162 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
163 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
164 self.testcase_suite = testcase_suite
165 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
166 self.logger = get_parallel_logger(self.stdouterr_queue)
167 self.child = Process(
168 target=test_runner_wrapper,
171 self.keep_alive_child_end,
172 self.stdouterr_queue,
173 self.finished_child_end,
174 self.result_child_end,
179 self.last_test_temp_dir = None
180 self.last_test_vpp_binary = None
181 self._last_test = None
182 self.last_test_id = None
184 self.last_heard = time.time()
185 self.core_detected_at = None
186 self.testcases_by_id = {}
187 self.testclasess_with_core = {}
188 for testcase in self.testcase_suite:
189 self.testcases_by_id[testcase.id()] = testcase
190 self.result = TestResult(testcase_suite, self.testcases_by_id)
194 return self._last_test
197 def last_test(self, test_id):
198 self.last_test_id = test_id
199 if test_id in self.testcases_by_id:
200 testcase = self.testcases_by_id[test_id]
201 self._last_test = testcase.shortDescription()
202 if not self._last_test:
203 self._last_test = str(testcase)
205 self._last_test = test_id
207 def add_testclass_with_core(self):
208 if self.last_test_id in self.testcases_by_id:
209 test = self.testcases_by_id[self.last_test_id]
210 class_name = unittest.util.strclass(test.__class__)
211 test_name = "'{}' ({})".format(
212 get_test_description(descriptions, test), self.last_test_id
215 test_name = self.last_test_id
216 class_name = re.match(
217 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
219 if class_name not in self.testclasess_with_core:
220 self.testclasess_with_core[class_name] = (
222 self.last_test_vpp_binary,
223 self.last_test_temp_dir,
def close_pipes(self):
    """Close both ends of the keep-alive, finished and result pipes.

    The child ends are closed first, followed by the parent ends.
    """
    end_names = (
        "keep_alive_child_end",
        "finished_child_end",
        "result_child_end",
        "keep_alive_parent_end",
        "finished_parent_end",
        "result_parent_end",
    )
    for end_name in end_names:
        getattr(self, end_name).close()
def was_successful(self):
    """Return whatever the wrapped result object's was_successful() reports."""
    outcome = self.result
    return outcome.was_successful()
239 return self.testcase_suite.cpus_used
def get_assigned_cpus(self):
    """Return the cpus reported by the wrapped suite's get_assigned_cpus()."""
    suite = self.testcase_suite
    return suite.get_assigned_cpus()
245 def stdouterr_reader_wrapper(
246 unread_testcases, finished_unread_testcases, read_testcases
249 while read_testcases.is_set() or unread_testcases:
250 if finished_unread_testcases:
251 read_testcase = finished_unread_testcases.pop()
252 unread_testcases.remove(read_testcase)
253 elif unread_testcases:
254 read_testcase = unread_testcases.pop()
257 while data is not None:
258 sys.stdout.write(data)
259 data = read_testcase.stdouterr_queue.get()
261 read_testcase.stdouterr_queue.close()
262 finished_unread_testcases.discard(read_testcase)
266 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
267 if last_test_temp_dir:
268 # Need to create link in case of a timeout or core dump without failure
269 lttd = os.path.basename(last_test_temp_dir)
270 link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
271 if not os.path.exists(link_path):
272 os.symlink(last_test_temp_dir, link_path)
274 "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
277 # Report core existence
278 core_path = get_core_path(last_test_temp_dir)
279 if os.path.exists(core_path):
281 "Core-file exists in test temporary directory: %s!" % core_path
283 check_core_path(logger, core_path)
284 logger.debug("Running 'file %s':" % core_path)
286 info = check_output(["file", core_path])
288 except CalledProcessError as e:
290 "Subprocess returned with return code "
291 "while running `file' utility on core-file "
298 "Subprocess returned with OS error while "
299 "running 'file' utility "
305 except Exception as e:
306 logger.exception("Unexpected error running `file' utility on core-file")
307 logger.error(f"gdb {vpp_binary} {core_path}")
310 # Copy api post mortem
311 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
312 if os.path.isfile(api_post_mortem_path):
314 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
316 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
319 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
320 if is_core_present(tempdir):
323 "VPP core detected in %s. Last test running was %s"
324 % (tempdir, core_crash_test)
326 print(single_line_delim)
327 spawn_gdb(vpp_binary, get_core_path(tempdir))
328 print(single_line_delim)
329 elif config.compress_core:
330 print("Compressing core-file in test directory `%s'" % tempdir)
331 os.system("gzip %s" % get_core_path(tempdir))
334 def handle_cores(failed_testcases):
335 for failed_testcase in failed_testcases:
336 tcs_with_core = failed_testcase.testclasess_with_core
338 for test, vpp_binary, tempdir in tcs_with_core.values():
339 check_and_handle_core(vpp_binary, tempdir, test)
342 def process_finished_testsuite(
343 wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
345 results.append(wrapped_testcase_suite.result)
346 finished_testcase_suites.add(wrapped_testcase_suite)
348 if config.failfast and not wrapped_testcase_suite.was_successful():
351 if not wrapped_testcase_suite.was_successful():
352 failed_wrapped_testcases.add(wrapped_testcase_suite)
354 wrapped_testcase_suite.logger,
355 wrapped_testcase_suite.last_test_temp_dir,
356 wrapped_testcase_suite.vpp_pid,
357 wrapped_testcase_suite.last_test_vpp_binary,
363 def run_forked(testcase_suites):
364 wrapped_testcase_suites = set()
365 solo_testcase_suites = []
367 # suites are unhashable, need to use list
369 unread_testcases = set()
370 finished_unread_testcases = set()
371 manager = StreamQueueManager()
374 free_cpus = list(available_cpus)
376 def on_suite_start(tc):
377 nonlocal tests_running
379 tests_running = tests_running + 1
381 def on_suite_finish(tc):
382 nonlocal tests_running
384 tests_running = tests_running - 1
385 assert tests_running >= 0
386 free_cpus.extend(tc.get_assigned_cpus())
388 def run_suite(suite):
390 nonlocal wrapped_testcase_suites
391 nonlocal unread_testcases
393 suite.assign_cpus(free_cpus[: suite.cpus_used])
394 free_cpus = free_cpus[suite.cpus_used :]
395 wrapper = TestCaseWrapper(suite, manager)
396 wrapped_testcase_suites.add(wrapper)
397 unread_testcases.add(wrapper)
398 on_suite_start(suite)
400 def can_run_suite(suite):
401 return tests_running < max_concurrent_tests and (
402 suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
405 while free_cpus and testcase_suites:
406 a_suite = testcase_suites[0]
407 if a_suite.is_tagged_run_solo:
408 a_suite = testcase_suites.pop(0)
409 solo_testcase_suites.append(a_suite)
411 if can_run_suite(a_suite):
412 a_suite = testcase_suites.pop(0)
417 if tests_running == 0 and solo_testcase_suites:
418 a_suite = solo_testcase_suites.pop(0)
421 read_from_testcases = threading.Event()
422 read_from_testcases.set()
423 stdouterr_thread = threading.Thread(
424 target=stdouterr_reader_wrapper,
425 args=(unread_testcases, finished_unread_testcases, read_from_testcases),
427 stdouterr_thread.start()
429 failed_wrapped_testcases = set()
433 while wrapped_testcase_suites or testcase_suites:
434 finished_testcase_suites = set()
435 for wrapped_testcase_suite in wrapped_testcase_suites:
436 while wrapped_testcase_suite.result_parent_end.poll():
437 wrapped_testcase_suite.result.process_result(
438 *wrapped_testcase_suite.result_parent_end.recv()
440 wrapped_testcase_suite.last_heard = time.time()
442 while wrapped_testcase_suite.keep_alive_parent_end.poll():
444 wrapped_testcase_suite.last_test,
445 wrapped_testcase_suite.last_test_vpp_binary,
446 wrapped_testcase_suite.last_test_temp_dir,
447 wrapped_testcase_suite.vpp_pid,
448 ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
449 wrapped_testcase_suite.last_heard = time.time()
451 if wrapped_testcase_suite.finished_parent_end.poll():
452 wrapped_testcase_suite.finished_parent_end.recv()
453 wrapped_testcase_suite.last_heard = time.time()
455 process_finished_testsuite(
456 wrapped_testcase_suite,
457 finished_testcase_suites,
458 failed_wrapped_testcases,
466 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
468 wrapped_testcase_suite.logger.critical(
469 "Child test runner process timed out "
470 "(last test running was `%s' in `%s')!"
472 wrapped_testcase_suite.last_test,
473 wrapped_testcase_suite.last_test_temp_dir,
476 elif not wrapped_testcase_suite.child.is_alive():
478 wrapped_testcase_suite.logger.critical(
479 "Child test runner process unexpectedly died "
480 "(last test running was `%s' in `%s')!"
482 wrapped_testcase_suite.last_test,
483 wrapped_testcase_suite.last_test_temp_dir,
487 wrapped_testcase_suite.last_test_temp_dir
488 and wrapped_testcase_suite.last_test_vpp_binary
490 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
491 wrapped_testcase_suite.add_testclass_with_core()
492 if wrapped_testcase_suite.core_detected_at is None:
493 wrapped_testcase_suite.core_detected_at = time.time()
495 wrapped_testcase_suite.core_detected_at + core_timeout
498 wrapped_testcase_suite.logger.critical(
499 "Child test runner process unresponsive and "
500 "core-file exists in test temporary directory "
501 "(last test running was `%s' in `%s')!"
503 wrapped_testcase_suite.last_test,
504 wrapped_testcase_suite.last_test_temp_dir,
510 wrapped_testcase_suite.child.terminate()
512 # terminating the child process tends to leave orphan
514 if wrapped_testcase_suite.vpp_pid:
515 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
519 wrapped_testcase_suite.result.crashed = True
520 wrapped_testcase_suite.result.process_result(
521 wrapped_testcase_suite.last_test_id, ERROR
524 process_finished_testsuite(
525 wrapped_testcase_suite,
526 finished_testcase_suites,
527 failed_wrapped_testcases,
533 for finished_testcase in finished_testcase_suites:
534 # Somewhat surprisingly, the join below may
535 # timeout, even if client signaled that
536 # it finished - so we note it just in case.
537 join_start = time.time()
538 finished_testcase.child.join(test_finished_join_timeout)
539 join_end = time.time()
540 if join_end - join_start >= test_finished_join_timeout:
541 finished_testcase.logger.error(
542 "Timeout joining finished test: %s (pid %d)"
543 % (finished_testcase.last_test, finished_testcase.child.pid)
545 finished_testcase.close_pipes()
546 wrapped_testcase_suites.remove(finished_testcase)
547 finished_unread_testcases.add(finished_testcase)
548 finished_testcase.stdouterr_queue.put(None)
549 on_suite_finish(finished_testcase)
551 while testcase_suites:
552 results.append(TestResult(testcase_suites.pop(0)))
553 elif testcase_suites:
554 a_suite = testcase_suites[0]
555 while a_suite and a_suite.is_tagged_run_solo:
556 testcase_suites.pop(0)
557 solo_testcase_suites.append(a_suite)
559 a_suite = testcase_suites[0]
562 if a_suite and can_run_suite(a_suite):
563 testcase_suites.pop(0)
565 if solo_testcase_suites and tests_running == 0:
566 a_suite = solo_testcase_suites.pop(0)
570 for wrapped_testcase_suite in wrapped_testcase_suites:
571 wrapped_testcase_suite.child.terminate()
572 wrapped_testcase_suite.stdouterr_queue.put(None)
575 read_from_testcases.clear()
576 stdouterr_thread.join(config.timeout)
579 handle_cores(failed_wrapped_testcases)
583 class TestSuiteWrapper(unittest.TestSuite):
587 return super().__init__()
def addTest(self, test):
    """Add ``test`` and keep ``cpus_used`` at the largest requirement seen.

    ``cpus_used`` is raised to ``test.get_cpus_required()`` when that is
    larger than the current value; the test is then handed to the parent
    class' ``addTest``.
    """
    current = self.cpus_used
    self.cpus_used = max(current, test.get_cpus_required())
    super().addTest(test)
593 def assign_cpus(self, cpus):
def _handleClassSetUp(self, test, result):
    """Hand this suite's cpus to ``test`` before the class-level setup runs.

    Cpus are not assigned when the test's class is flagged
    ``skipped_due_to_cpu_lack``; the parent class' setup handling runs in
    both cases.
    """
    cpu_starved = test.__class__.skipped_due_to_cpu_lack
    if not cpu_starved:
        test.assign_cpus(self.cpus)
    super()._handleClassSetUp(test, result)
601 def get_assigned_cpus(self):
605 class SplitToSuitesCallback:
606 def __init__(self, filter_callback):
608 self.suite_name = "default"
609 self.filter_callback = filter_callback
610 self.filtered = TestSuiteWrapper()
612 def __call__(self, file_name, cls, method):
613 test_method = cls(method)
614 if self.filter_callback(file_name, cls.__name__, method):
615 self.suite_name = file_name + cls.__name__
616 if self.suite_name not in self.suites:
617 self.suites[self.suite_name] = TestSuiteWrapper()
618 self.suites[self.suite_name].is_tagged_run_solo = False
619 self.suites[self.suite_name].addTest(test_method)
620 if test_method.is_tagged_run_solo():
621 self.suites[self.suite_name].is_tagged_run_solo = True
624 self.filtered.addTest(test_method)
627 def parse_test_filter(test_filter):
629 filter_file_name = None
630 filter_class_name = None
631 filter_func_name = None
636 raise Exception(f"Invalid test filter: {test_filter}")
638 if parts[2] not in ("*", ""):
639 filter_func_name = parts[2]
640 if parts[1] not in ("*", ""):
641 filter_class_name = parts[1]
642 if parts[0] not in ("*", ""):
643 if parts[0].startswith("test_"):
644 filter_file_name = parts[0]
646 filter_file_name = "test_%s" % parts[0]
648 if f.startswith("test_"):
651 filter_file_name = "test_%s" % f
653 filter_file_name = "%s.py" % filter_file_name
654 return filter_file_name, filter_class_name, filter_func_name
657 def filter_tests(tests, filter_cb):
658 result = TestSuiteWrapper()
660 if isinstance(t, unittest.suite.TestSuite):
661 # this is a bunch of tests, recursively filter...
662 x = filter_tests(t, filter_cb)
663 if x.countTestCases() > 0:
665 elif isinstance(t, unittest.TestCase):
666 # this is a single test
667 parts = t.id().split(".")
668 # t.id() for common cases like this:
669 # test_classifier.TestClassifier.test_acl_ip
670 # apply filtering only if it is so
672 if not filter_cb(parts[0], parts[1], parts[2]):
676 # unexpected object, don't touch it
681 class FilterByTestOption:
682 def __init__(self, filters):
683 self.filters = filters
685 def __call__(self, file_name, class_name, func_name):
695 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
698 if filter_class_name and class_name != filter_class_name:
700 if filter_func_name and func_name != filter_func_name:
704 for filter_file_name, filter_class_name, filter_func_name in self.filters:
class FilterByClassList:
    """Filter callback accepting tests whose ``file.class`` name is listed."""

    def __init__(self, classes_with_filenames):
        # container of "<file_name>.<class_name>" strings to accept
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return True when ``file_name.class_name`` is in the accepted set.

        ``func_name`` is accepted for signature compatibility and not used.
        """
        qualified_name = ".".join((file_name, class_name))
        return qualified_name in self.classes_with_filenames
726 def suite_from_failed(suite, failed):
727 failed = {x.rsplit(".", 1)[0] for x in failed}
728 filter_cb = FilterByClassList(failed)
729 suite = filter_tests(suite, filter_cb)
733 class AllResults(dict):
735 super(AllResults, self).__init__()
736 self.all_testcases = 0
737 self.results_per_suite = []
742 self[SKIP_CPU_SHORTAGE] = 0
745 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Store a suite result and add its per-outcome counts to the totals.

    :param result: mapping from each outcome key to a list of test ids.
    """
    self.results_per_suite.append(result)
    for outcome in (PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE):
        self[outcome] = self[outcome] + len(result[outcome])
753 def add_result(self, result):
755 self.all_testcases += result.testcase_suite.countTestCases()
756 self.add_results(result)
758 if result.no_tests_run():
759 self.testsuites_no_tests_run.append(result.testcase_suite)
764 elif not result.was_successful():
768 self.rerun.append(result.testcase_suite)
772 def print_results(self):
774 print(double_line_delim)
775 print("TEST RESULTS:")
777 def indent_results(lines):
778 lines = list(filter(None, lines))
779 maximum = max(lines, key=lambda x: x.index(":"))
780 maximum = 4 + maximum.index(":")
782 padding = " " * (maximum - l.index(":"))
783 print(f"{padding}{l}")
787 f"Scheduled tests: {self.all_testcases}",
788 f"Executed tests: {self[TEST_RUN]}",
789 f"Passed tests: {colorize(self[PASS], GREEN)}",
790 f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
793 f"Not Executed tests: {colorize(self.not_executed, RED)}"
796 f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
797 f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
798 "Tests skipped due to lack of CPUS: "
799 f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
800 if self[SKIP_CPU_SHORTAGE]
805 if self.all_failed > 0:
806 print("FAILURES AND ERRORS IN TESTS:")
807 for result in self.results_per_suite:
808 failed_testcase_ids = result[FAIL]
809 errored_testcase_ids = result[ERROR]
810 old_testcase_name = None
811 if failed_testcase_ids:
812 for failed_test_id in failed_testcase_ids:
813 new_testcase_name, test_name = result.get_testcase_names(
816 if new_testcase_name != old_testcase_name:
818 " Testcase name: {}".format(
819 colorize(new_testcase_name, RED)
822 old_testcase_name = new_testcase_name
824 " FAILURE: {} [{}]".format(
825 colorize(test_name, RED), failed_test_id
828 if errored_testcase_ids:
829 for errored_test_id in errored_testcase_ids:
830 new_testcase_name, test_name = result.get_testcase_names(
833 if new_testcase_name != old_testcase_name:
835 " Testcase name: {}".format(
836 colorize(new_testcase_name, RED)
839 old_testcase_name = new_testcase_name
841 " ERROR: {} [{}]".format(
842 colorize(test_name, RED), errored_test_id
845 if self.testsuites_no_tests_run:
846 print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
848 for testsuite in self.testsuites_no_tests_run:
849 for testcase in testsuite:
850 tc_classes.add(get_testcase_doc_name(testcase))
851 for tc_class in tc_classes:
852 print(" {}".format(colorize(tc_class, RED)))
854 if self[SKIP_CPU_SHORTAGE]:
858 " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
859 " ENOUGH CPUS AVAILABLE",
863 print(double_line_delim)
def not_executed(self):
    """Count of scheduled tests minus the TEST_RUN total."""
    scheduled = self.all_testcases
    return scheduled - self[TEST_RUN]
def all_failed(self):
    """Sum of the FAIL and ERROR totals."""
    failures = self[FAIL]
    return failures + self[ERROR]
875 def parse_results(results):
877 Prints the number of scheduled, executed, not executed, passed, failed,
878 errored and skipped tests and details about failed and errored tests.
880 Also returns all suites where any test failed.
886 results_per_suite = AllResults()
889 for result in results:
890 result_code = results_per_suite.add_result(result)
893 elif result_code == -1:
896 results_per_suite.print_results()
904 return return_code, results_per_suite.rerun
907 if __name__ == "__main__":
909 print(f"Config is: {config}")
912 print("Running sanity test case.")
914 rc = sanity_run_vpp.main()
917 except Exception as e:
918 print(traceback.format_exc())
919 print("Couldn't run sanity test case.")
922 test_finished_join_timeout = 15
924 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
925 debug_core = config.debug == "core"
927 run_interactive = debug_gdb or config.step or config.force_foreground
929 max_concurrent_tests = 0
930 print(f"OS reports {num_cpus} available cpu(s).")
932 test_jobs = config.jobs
933 if test_jobs == "auto":
935 max_concurrent_tests = 1
936 print("Interactive mode required, running tests consecutively.")
938 max_concurrent_tests = num_cpus
940 f"Running at most {max_concurrent_tests} python test "
941 "processes concurrently."
944 max_concurrent_tests = test_jobs
946 f"Running at most {max_concurrent_tests} python test processes "
947 "concurrently as set by 'TEST_JOBS'."
950 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
952 if run_interactive and max_concurrent_tests > 1:
953 raise NotImplementedError(
954 "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
955 "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
961 print("Running tests using custom test runner.")
962 filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
965 "Selected filters: ",
967 f"file={filter_file}, class={filter_class}, function={filter_func}"
968 for filter_file, filter_class, filter_func in filters
972 filter_cb = FilterByTestOption(filters)
974 cb = SplitToSuitesCallback(filter_cb)
975 for d in config.test_src_dir:
976 print("Adding tests from directory tree %s" % d)
977 discover_tests(d, cb)
979 # suites are not hashable, need to use list
982 for testcase_suite in cb.suites.values():
983 tests_amount += testcase_suite.countTestCases()
984 if testcase_suite.cpus_used > max_vpp_cpus:
985 # here we replace test functions with lambdas to just skip them
986 # but we also replace setUp/tearDown functions to do nothing
987 # so that the test can be "started" and "stopped", so that we can
988 # still keep those prints (test description - SKIP), which are done
989 # in stopTest() (for that to trigger, test function must run)
990 for t in testcase_suite:
992 if m.startswith("test_"):
993 setattr(t, m, lambda: t.skipTest("not enough cpus"))
994 setattr(t.__class__, "setUpClass", lambda: None)
995 setattr(t.__class__, "tearDownClass", lambda: None)
996 setattr(t, "setUp", lambda: None)
997 setattr(t, "tearDown", lambda: None)
998 t.__class__.skipped_due_to_cpu_lack = True
999 suites.append(testcase_suite)
1002 "%s out of %s tests match specified filters"
1003 % (tests_amount, tests_amount + cb.filtered.countTestCases())
1006 if not config.extended:
1007 print("Not running extended tests (some tests will be skipped)")
1009 attempts = config.retries + 1
1011 print("Perform %s attempts to pass the suite..." % attempts)
1013 if run_interactive and suites:
1014 # don't fork if requiring interactive terminal
1015 print("Running tests in foreground in the current process")
1016 full_suite = unittest.TestSuite()
1017 free_cpus = list(available_cpus)
1018 cpu_shortage = False
1019 for suite in suites:
1020 if suite.cpus_used <= max_vpp_cpus:
1021 suite.assign_cpus(free_cpus[: suite.cpus_used])
1023 suite.assign_cpus([])
1025 full_suite.addTests(suites)
1026 result = VppTestRunner(
1027 verbosity=config.verbose, failfast=config.failfast, print_summary=True
1029 was_successful = result.wasSuccessful()
1030 if not was_successful:
1031 for test_case_info in result.failed_test_cases_info:
1032 handle_failed_suite(
1033 test_case_info.logger,
1034 test_case_info.tempdir,
1035 test_case_info.vpp_pid,
1038 if test_case_info in result.core_crash_test_cases_info:
1039 check_and_handle_core(
1040 test_case_info.vpp_bin_path,
1041 test_case_info.tempdir,
1042 test_case_info.core_crash_test,
1049 "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1050 " ENOUGH CPUS AVAILABLE",
1055 sys.exit(not was_successful)
1058 "Running each VPPTestCase in a separate background process"
1059 f" with at most {max_concurrent_tests} parallel python test "
1063 while suites and attempts > 0:
1064 results = run_forked(suites)
1065 exit_code, suites = parse_results(results)
1068 print("Test run was successful")
1070 print("%s attempt(s) left." % attempts)