13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from framework import (
20 get_testcase_doc_name,
29 from debug import spawn_gdb
39 from discover_tests import discover_tests
41 from subprocess import check_output, CalledProcessError
42 from util import check_core_path, get_core_path, is_core_present
44 # timeout which controls how long the child has to finish after seeing
45 # a core dump in test temporary directory. If this is exceeded, parent assumes
46 # that child process is stuck (e.g. waiting for event from vpp) and kill
51 class StreamQueue(Queue):
56 sys.__stdout__.flush()
57 sys.__stderr__.flush()
60 return self._writer.fileno()
63 class StreamQueueManager(BaseManager):
67 StreamQueueManager.register("StreamQueue", StreamQueue)
70 class TestResult(dict):
71 def __init__(self, testcase_suite, testcases_by_id=None):
72 super(TestResult, self).__init__()
77 self[SKIP_CPU_SHORTAGE] = []
80 self.testcase_suite = testcase_suite
81 self.testcases = [testcase for testcase in testcase_suite]
82 self.testcases_by_id = testcases_by_id
84 def was_successful(self):
86 0 == len(self[FAIL]) == len(self[ERROR])
87 and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
88 == self.testcase_suite.countTestCases()
91 def no_tests_run(self):
92 return 0 == len(self[TEST_RUN])
94 def process_result(self, test_id, result):
95 self[result].append(test_id)
97 def suite_from_failed(self):
99 for testcase in self.testcase_suite:
100 tc_id = testcase.id()
101 if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
104 return suite_from_failed(self.testcase_suite, rerun_ids)
106 def get_testcase_names(self, test_id):
107 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
108 setup_teardown_match = re.match(
109 r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
111 if setup_teardown_match:
112 test_name, _, _, testcase_name = setup_teardown_match.groups()
113 if len(testcase_name.split(".")) == 2:
114 for key in self.testcases_by_id.keys():
115 if key.startswith(testcase_name):
118 testcase_name = self._get_testcase_doc_name(testcase_name)
120 test_name = self._get_test_description(test_id)
121 testcase_name = self._get_testcase_doc_name(test_id)
123 return testcase_name, test_name
125 def _get_test_description(self, test_id):
126 if test_id in self.testcases_by_id:
127 desc = get_test_description(descriptions, self.testcases_by_id[test_id])
132 def _get_testcase_doc_name(self, test_id):
133 if test_id in self.testcases_by_id:
134 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
140 def test_runner_wrapper(
141 suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
143 sys.stdout = stdouterr_queue
144 sys.stderr = stdouterr_queue
145 VppTestCase.parallel_handler = logger.handlers[0]
146 result = VppTestRunner(
147 keep_alive_pipe=keep_alive_pipe,
148 descriptions=descriptions,
149 verbosity=config.verbose,
150 result_pipe=result_pipe,
151 failfast=config.failfast,
154 finished_pipe.send(result.wasSuccessful())
155 finished_pipe.close()
156 keep_alive_pipe.close()
159 class TestCaseWrapper(object):
160 def __init__(self, testcase_suite, manager):
161 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
162 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
163 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
164 self.testcase_suite = testcase_suite
165 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
166 self.logger = get_parallel_logger(self.stdouterr_queue)
167 self.child = Process(
168 target=test_runner_wrapper,
171 self.keep_alive_child_end,
172 self.stdouterr_queue,
173 self.finished_child_end,
174 self.result_child_end,
179 self.last_test_temp_dir = None
180 self.last_test_vpp_binary = None
181 self._last_test = None
182 self.last_test_id = None
184 self.last_heard = time.time()
185 self.core_detected_at = None
186 self.testcases_by_id = {}
187 self.testclasess_with_core = {}
188 for testcase in self.testcase_suite:
189 self.testcases_by_id[testcase.id()] = testcase
190 self.result = TestResult(testcase_suite, self.testcases_by_id)
194 return self._last_test
197 def last_test(self, test_id):
198 self.last_test_id = test_id
199 if test_id in self.testcases_by_id:
200 testcase = self.testcases_by_id[test_id]
201 self._last_test = testcase.shortDescription()
202 if not self._last_test:
203 self._last_test = str(testcase)
205 self._last_test = test_id
207 def add_testclass_with_core(self):
208 if self.last_test_id in self.testcases_by_id:
209 test = self.testcases_by_id[self.last_test_id]
210 class_name = unittest.util.strclass(test.__class__)
211 test_name = "'{}' ({})".format(
212 get_test_description(descriptions, test), self.last_test_id
215 test_name = self.last_test_id
216 class_name = re.match(
217 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
219 if class_name not in self.testclasess_with_core:
220 self.testclasess_with_core[class_name] = (
222 self.last_test_vpp_binary,
223 self.last_test_temp_dir,
226 def close_pipes(self):
227 self.keep_alive_child_end.close()
228 self.finished_child_end.close()
229 self.result_child_end.close()
230 self.keep_alive_parent_end.close()
231 self.finished_parent_end.close()
232 self.result_parent_end.close()
234 def was_successful(self):
235 return self.result.was_successful()
239 return self.testcase_suite.cpus_used
241 def get_assigned_cpus(self):
242 return self.testcase_suite.get_assigned_cpus()
245 def stdouterr_reader_wrapper(
246 unread_testcases, finished_unread_testcases, read_testcases
249 while read_testcases.is_set() or unread_testcases:
250 if finished_unread_testcases:
251 read_testcase = finished_unread_testcases.pop()
252 unread_testcases.remove(read_testcase)
253 elif unread_testcases:
254 read_testcase = unread_testcases.pop()
257 while data is not None:
258 sys.stdout.write(data)
259 data = read_testcase.stdouterr_queue.get()
261 read_testcase.stdouterr_queue.close()
262 finished_unread_testcases.discard(read_testcase)
266 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
267 if last_test_temp_dir:
268 # Need to create link in case of a timeout or core dump without failure
269 lttd = os.path.basename(last_test_temp_dir)
270 link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
271 if not os.path.exists(link_path):
272 os.symlink(last_test_temp_dir, link_path)
274 "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
277 # Report core existence
278 core_path = get_core_path(last_test_temp_dir)
279 if os.path.exists(core_path):
281 "Core-file exists in test temporary directory: %s!" % core_path
283 check_core_path(logger, core_path)
284 logger.debug("Running 'file %s':" % core_path)
286 info = check_output(["file", core_path])
288 except CalledProcessError as e:
290 "Subprocess returned with return code "
291 "while running `file' utility on core-file "
298 "Subprocess returned with OS error while "
299 "running 'file' utility "
305 except Exception as e:
306 logger.exception("Unexpected error running `file' utility on core-file")
307 logger.error(f"gdb {vpp_binary} {core_path}")
310 # Copy api post mortem
311 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
312 if os.path.isfile(api_post_mortem_path):
314 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
316 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
319 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
320 if is_core_present(tempdir):
323 "VPP core detected in %s. Last test running was %s"
324 % (tempdir, core_crash_test)
326 print(single_line_delim)
327 spawn_gdb(vpp_binary, get_core_path(tempdir))
328 print(single_line_delim)
329 elif config.compress_core:
330 print("Compressing core-file in test directory `%s'" % tempdir)
331 os.system("gzip %s" % get_core_path(tempdir))
334 def handle_cores(failed_testcases):
335 for failed_testcase in failed_testcases:
336 tcs_with_core = failed_testcase.testclasess_with_core
338 for test, vpp_binary, tempdir in tcs_with_core.values():
339 check_and_handle_core(vpp_binary, tempdir, test)
342 def process_finished_testsuite(
343 wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
345 results.append(wrapped_testcase_suite.result)
346 finished_testcase_suites.add(wrapped_testcase_suite)
348 if config.failfast and not wrapped_testcase_suite.was_successful():
351 if not wrapped_testcase_suite.was_successful():
352 failed_wrapped_testcases.add(wrapped_testcase_suite)
354 wrapped_testcase_suite.logger,
355 wrapped_testcase_suite.last_test_temp_dir,
356 wrapped_testcase_suite.vpp_pid,
357 wrapped_testcase_suite.last_test_vpp_binary,
363 def run_forked(testcase_suites):
364 wrapped_testcase_suites = set()
365 solo_testcase_suites = []
367 # suites are unhashable, need to use list
369 unread_testcases = set()
370 finished_unread_testcases = set()
371 manager = StreamQueueManager()
374 free_cpus = list(available_cpus)
376 def on_suite_start(tc):
377 nonlocal tests_running
379 tests_running = tests_running + 1
381 def on_suite_finish(tc):
382 nonlocal tests_running
384 tests_running = tests_running - 1
385 assert tests_running >= 0
386 free_cpus.extend(tc.get_assigned_cpus())
388 def run_suite(suite):
390 nonlocal wrapped_testcase_suites
391 nonlocal unread_testcases
393 suite.assign_cpus(free_cpus[: suite.cpus_used])
394 free_cpus = free_cpus[suite.cpus_used :]
395 wrapper = TestCaseWrapper(suite, manager)
396 wrapped_testcase_suites.add(wrapper)
397 unread_testcases.add(wrapper)
398 on_suite_start(suite)
400 def can_run_suite(suite):
401 return tests_running < max_concurrent_tests and (
402 suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
405 while free_cpus and testcase_suites:
406 a_suite = testcase_suites[0]
407 if a_suite.is_tagged_run_solo:
408 a_suite = testcase_suites.pop(0)
409 solo_testcase_suites.append(a_suite)
411 if can_run_suite(a_suite):
412 a_suite = testcase_suites.pop(0)
417 if tests_running == 0 and solo_testcase_suites:
418 a_suite = solo_testcase_suites.pop(0)
421 read_from_testcases = threading.Event()
422 read_from_testcases.set()
423 stdouterr_thread = threading.Thread(
424 target=stdouterr_reader_wrapper,
425 args=(unread_testcases, finished_unread_testcases, read_from_testcases),
427 stdouterr_thread.start()
429 failed_wrapped_testcases = set()
433 while wrapped_testcase_suites:
434 finished_testcase_suites = set()
435 for wrapped_testcase_suite in wrapped_testcase_suites:
436 while wrapped_testcase_suite.result_parent_end.poll():
437 wrapped_testcase_suite.result.process_result(
438 *wrapped_testcase_suite.result_parent_end.recv()
440 wrapped_testcase_suite.last_heard = time.time()
442 while wrapped_testcase_suite.keep_alive_parent_end.poll():
444 wrapped_testcase_suite.last_test,
445 wrapped_testcase_suite.last_test_vpp_binary,
446 wrapped_testcase_suite.last_test_temp_dir,
447 wrapped_testcase_suite.vpp_pid,
448 ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
449 wrapped_testcase_suite.last_heard = time.time()
451 if wrapped_testcase_suite.finished_parent_end.poll():
452 wrapped_testcase_suite.finished_parent_end.recv()
453 wrapped_testcase_suite.last_heard = time.time()
455 process_finished_testsuite(
456 wrapped_testcase_suite,
457 finished_testcase_suites,
458 failed_wrapped_testcases,
466 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
468 wrapped_testcase_suite.logger.critical(
469 "Child test runner process timed out "
470 "(last test running was `%s' in `%s')!"
472 wrapped_testcase_suite.last_test,
473 wrapped_testcase_suite.last_test_temp_dir,
476 elif not wrapped_testcase_suite.child.is_alive():
478 wrapped_testcase_suite.logger.critical(
479 "Child test runner process unexpectedly died "
480 "(last test running was `%s' in `%s')!"
482 wrapped_testcase_suite.last_test,
483 wrapped_testcase_suite.last_test_temp_dir,
487 wrapped_testcase_suite.last_test_temp_dir
488 and wrapped_testcase_suite.last_test_vpp_binary
490 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
491 wrapped_testcase_suite.add_testclass_with_core()
492 if wrapped_testcase_suite.core_detected_at is None:
493 wrapped_testcase_suite.core_detected_at = time.time()
495 wrapped_testcase_suite.core_detected_at + core_timeout
498 wrapped_testcase_suite.logger.critical(
499 "Child test runner process unresponsive and "
500 "core-file exists in test temporary directory "
501 "(last test running was `%s' in `%s')!"
503 wrapped_testcase_suite.last_test,
504 wrapped_testcase_suite.last_test_temp_dir,
510 wrapped_testcase_suite.child.terminate()
512 # terminating the child process tends to leave orphan
514 if wrapped_testcase_suite.vpp_pid:
515 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
519 wrapped_testcase_suite.result.crashed = True
520 wrapped_testcase_suite.result.process_result(
521 wrapped_testcase_suite.last_test_id, ERROR
524 process_finished_testsuite(
525 wrapped_testcase_suite,
526 finished_testcase_suites,
527 failed_wrapped_testcases,
533 for finished_testcase in finished_testcase_suites:
534 # Somewhat surprisingly, the join below may
535 # timeout, even if client signaled that
536 # it finished - so we note it just in case.
537 join_start = time.time()
538 finished_testcase.child.join(test_finished_join_timeout)
539 join_end = time.time()
540 if join_end - join_start >= test_finished_join_timeout:
541 finished_testcase.logger.error(
542 "Timeout joining finished test: %s (pid %d)"
543 % (finished_testcase.last_test, finished_testcase.child.pid)
545 finished_testcase.close_pipes()
546 wrapped_testcase_suites.remove(finished_testcase)
547 finished_unread_testcases.add(finished_testcase)
548 finished_testcase.stdouterr_queue.put(None)
549 on_suite_finish(finished_testcase)
551 while testcase_suites:
552 results.append(TestResult(testcase_suites.pop(0)))
553 elif testcase_suites:
554 a_suite = testcase_suites.pop(0)
555 while a_suite and a_suite.is_tagged_run_solo:
556 solo_testcase_suites.append(a_suite)
558 a_suite = testcase_suites.pop(0)
561 if a_suite and can_run_suite(a_suite):
563 if solo_testcase_suites and tests_running == 0:
564 a_suite = solo_testcase_suites.pop(0)
568 for wrapped_testcase_suite in wrapped_testcase_suites:
569 wrapped_testcase_suite.child.terminate()
570 wrapped_testcase_suite.stdouterr_queue.put(None)
573 read_from_testcases.clear()
574 stdouterr_thread.join(config.timeout)
577 handle_cores(failed_wrapped_testcases)
581 class TestSuiteWrapper(unittest.TestSuite):
585 return super().__init__()
587 def addTest(self, test):
588 self.cpus_used = max(self.cpus_used, test.get_cpus_required())
589 super().addTest(test)
591 def assign_cpus(self, cpus):
594 def _handleClassSetUp(self, test, result):
595 if not test.__class__.skipped_due_to_cpu_lack:
596 test.assign_cpus(self.cpus)
597 super()._handleClassSetUp(test, result)
599 def get_assigned_cpus(self):
603 class SplitToSuitesCallback:
604 def __init__(self, filter_callback):
606 self.suite_name = "default"
607 self.filter_callback = filter_callback
608 self.filtered = TestSuiteWrapper()
610 def __call__(self, file_name, cls, method):
611 test_method = cls(method)
612 if self.filter_callback(file_name, cls.__name__, method):
613 self.suite_name = file_name + cls.__name__
614 if self.suite_name not in self.suites:
615 self.suites[self.suite_name] = TestSuiteWrapper()
616 self.suites[self.suite_name].is_tagged_run_solo = False
617 self.suites[self.suite_name].addTest(test_method)
618 if test_method.is_tagged_run_solo():
619 self.suites[self.suite_name].is_tagged_run_solo = True
622 self.filtered.addTest(test_method)
625 def parse_test_filter(test_filter):
627 filter_file_name = None
628 filter_class_name = None
629 filter_func_name = None
634 raise Exception(f"Invalid test filter: {test_filter}")
636 if parts[2] not in ("*", ""):
637 filter_func_name = parts[2]
638 if parts[1] not in ("*", ""):
639 filter_class_name = parts[1]
640 if parts[0] not in ("*", ""):
641 if parts[0].startswith("test_"):
642 filter_file_name = parts[0]
644 filter_file_name = "test_%s" % parts[0]
646 if f.startswith("test_"):
649 filter_file_name = "test_%s" % f
651 filter_file_name = "%s.py" % filter_file_name
652 return filter_file_name, filter_class_name, filter_func_name
655 def filter_tests(tests, filter_cb):
656 result = TestSuiteWrapper()
658 if isinstance(t, unittest.suite.TestSuite):
659 # this is a bunch of tests, recursively filter...
660 x = filter_tests(t, filter_cb)
661 if x.countTestCases() > 0:
663 elif isinstance(t, unittest.TestCase):
664 # this is a single test
665 parts = t.id().split(".")
666 # t.id() for common cases like this:
667 # test_classifier.TestClassifier.test_acl_ip
668 # apply filtering only if it is so
670 if not filter_cb(parts[0], parts[1], parts[2]):
674 # unexpected object, don't touch it
679 class FilterByTestOption:
680 def __init__(self, filters):
681 self.filters = filters
683 def __call__(self, file_name, class_name, func_name):
693 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
696 if filter_class_name and class_name != filter_class_name:
698 if filter_func_name and func_name != filter_func_name:
702 for filter_file_name, filter_class_name, filter_func_name in self.filters:
716 class FilterByClassList:
717 def __init__(self, classes_with_filenames):
718 self.classes_with_filenames = classes_with_filenames
720 def __call__(self, file_name, class_name, func_name):
721 return ".".join([file_name, class_name]) in self.classes_with_filenames
724 def suite_from_failed(suite, failed):
725 failed = {x.rsplit(".", 1)[0] for x in failed}
726 filter_cb = FilterByClassList(failed)
727 suite = filter_tests(suite, filter_cb)
731 class AllResults(dict):
733 super(AllResults, self).__init__()
734 self.all_testcases = 0
735 self.results_per_suite = []
740 self[SKIP_CPU_SHORTAGE] = 0
743 self.testsuites_no_tests_run = []
745 def add_results(self, result):
746 self.results_per_suite.append(result)
747 result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
748 for result_type in result_types:
749 self[result_type] += len(result[result_type])
751 def add_result(self, result):
753 self.all_testcases += result.testcase_suite.countTestCases()
754 self.add_results(result)
756 if result.no_tests_run():
757 self.testsuites_no_tests_run.append(result.testcase_suite)
762 elif not result.was_successful():
766 self.rerun.append(result.testcase_suite)
770 def print_results(self):
772 print(double_line_delim)
773 print("TEST RESULTS:")
775 def indent_results(lines):
776 lines = list(filter(None, lines))
777 maximum = max(lines, key=lambda x: x.index(":"))
778 maximum = 4 + maximum.index(":")
780 padding = " " * (maximum - l.index(":"))
781 print(f"{padding}{l}")
785 f"Scheduled tests: {self.all_testcases}",
786 f"Executed tests: {self[TEST_RUN]}",
787 f"Passed tests: {colorize(self[PASS], GREEN)}",
788 f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
791 f"Not Executed tests: {colorize(self.not_executed, RED)}"
794 f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
795 f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
796 "Tests skipped due to lack of CPUS: "
797 f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
798 if self[SKIP_CPU_SHORTAGE]
803 if self.all_failed > 0:
804 print("FAILURES AND ERRORS IN TESTS:")
805 for result in self.results_per_suite:
806 failed_testcase_ids = result[FAIL]
807 errored_testcase_ids = result[ERROR]
808 old_testcase_name = None
809 if failed_testcase_ids:
810 for failed_test_id in failed_testcase_ids:
811 new_testcase_name, test_name = result.get_testcase_names(
814 if new_testcase_name != old_testcase_name:
816 " Testcase name: {}".format(
817 colorize(new_testcase_name, RED)
820 old_testcase_name = new_testcase_name
822 " FAILURE: {} [{}]".format(
823 colorize(test_name, RED), failed_test_id
826 if errored_testcase_ids:
827 for errored_test_id in errored_testcase_ids:
828 new_testcase_name, test_name = result.get_testcase_names(
831 if new_testcase_name != old_testcase_name:
833 " Testcase name: {}".format(
834 colorize(new_testcase_name, RED)
837 old_testcase_name = new_testcase_name
839 " ERROR: {} [{}]".format(
840 colorize(test_name, RED), errored_test_id
843 if self.testsuites_no_tests_run:
844 print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
846 for testsuite in self.testsuites_no_tests_run:
847 for testcase in testsuite:
848 tc_classes.add(get_testcase_doc_name(testcase))
849 for tc_class in tc_classes:
850 print(" {}".format(colorize(tc_class, RED)))
852 if self[SKIP_CPU_SHORTAGE]:
856 " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
857 " ENOUGH CPUS AVAILABLE",
861 print(double_line_delim)
865 def not_executed(self):
866 return self.all_testcases - self[TEST_RUN]
869 def all_failed(self):
870 return self[FAIL] + self[ERROR]
873 def parse_results(results):
875 Prints the number of scheduled, executed, not executed, passed, failed,
876 errored and skipped tests and details about failed and errored tests.
878 Also returns all suites where any test failed.
884 results_per_suite = AllResults()
887 for result in results:
888 result_code = results_per_suite.add_result(result)
891 elif result_code == -1:
894 results_per_suite.print_results()
902 return return_code, results_per_suite.rerun
905 if __name__ == "__main__":
907 print(f"Config is: {config}")
910 print("Running sanity test case.")
912 rc = sanity_run_vpp.main()
915 except Exception as e:
916 print(traceback.format_exc())
917 print("Couldn't run sanity test case.")
920 test_finished_join_timeout = 15
922 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
923 debug_core = config.debug == "core"
925 run_interactive = debug_gdb or config.step or config.force_foreground
927 max_concurrent_tests = 0
928 print(f"OS reports {num_cpus} available cpu(s).")
930 test_jobs = config.jobs
931 if test_jobs == "auto":
933 max_concurrent_tests = 1
934 print("Interactive mode required, running tests consecutively.")
936 max_concurrent_tests = num_cpus
938 f"Running at most {max_concurrent_tests} python test "
939 "processes concurrently."
942 max_concurrent_tests = test_jobs
944 f"Running at most {max_concurrent_tests} python test processes "
945 "concurrently as set by 'TEST_JOBS'."
948 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
950 if run_interactive and max_concurrent_tests > 1:
951 raise NotImplementedError(
952 "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
953 "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
959 print("Running tests using custom test runner.")
960 filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
963 "Selected filters: ",
965 f"file={filter_file}, class={filter_class}, function={filter_func}"
966 for filter_file, filter_class, filter_func in filters
970 filter_cb = FilterByTestOption(filters)
972 cb = SplitToSuitesCallback(filter_cb)
973 for d in config.test_src_dir:
974 print("Adding tests from directory tree %s" % d)
975 discover_tests(d, cb)
977 # suites are not hashable, need to use list
980 for testcase_suite in cb.suites.values():
981 tests_amount += testcase_suite.countTestCases()
982 if testcase_suite.cpus_used > max_vpp_cpus:
983 # here we replace test functions with lambdas to just skip them
984 # but we also replace setUp/tearDown functions to do nothing
985 # so that the test can be "started" and "stopped", so that we can
986 # still keep those prints (test description - SKIP), which are done
987 # in stopTest() (for that to trigger, test function must run)
988 for t in testcase_suite:
990 if m.startswith("test_"):
991 setattr(t, m, lambda: t.skipTest("not enough cpus"))
992 setattr(t.__class__, "setUpClass", lambda: None)
993 setattr(t.__class__, "tearDownClass", lambda: None)
994 setattr(t, "setUp", lambda: None)
995 setattr(t, "tearDown", lambda: None)
996 t.__class__.skipped_due_to_cpu_lack = True
997 suites.append(testcase_suite)
1000 "%s out of %s tests match specified filters"
1001 % (tests_amount, tests_amount + cb.filtered.countTestCases())
1004 if not config.extended:
1005 print("Not running extended tests (some tests will be skipped)")
1007 attempts = config.retries + 1
1009 print("Perform %s attempts to pass the suite..." % attempts)
1011 if run_interactive and suites:
1012 # don't fork if requiring interactive terminal
1013 print("Running tests in foreground in the current process")
1014 full_suite = unittest.TestSuite()
1015 free_cpus = list(available_cpus)
1016 cpu_shortage = False
1017 for suite in suites:
1018 if suite.cpus_used <= max_vpp_cpus:
1019 suite.assign_cpus(free_cpus[: suite.cpus_used])
1021 suite.assign_cpus([])
1023 full_suite.addTests(suites)
1024 result = VppTestRunner(
1025 verbosity=config.verbose, failfast=config.failfast, print_summary=True
1027 was_successful = result.wasSuccessful()
1028 if not was_successful:
1029 for test_case_info in result.failed_test_cases_info:
1030 handle_failed_suite(
1031 test_case_info.logger,
1032 test_case_info.tempdir,
1033 test_case_info.vpp_pid,
1036 if test_case_info in result.core_crash_test_cases_info:
1037 check_and_handle_core(
1038 test_case_info.vpp_bin_path,
1039 test_case_info.tempdir,
1040 test_case_info.core_crash_test,
1047 "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1048 " ENOUGH CPUS AVAILABLE",
1053 sys.exit(not was_successful)
1056 "Running each VPPTestCase in a separate background process"
1057 f" with at most {max_concurrent_tests} parallel python test "
1061 while suites and attempts > 0:
1062 results = run_forked(suites)
1063 exit_code, suites = parse_results(results)
1066 print("Test run was successful")
1068 print("%s attempt(s) left." % attempts)