14 from multiprocessing import Process, Pipe, get_context
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
18 from config import config, num_cpus, available_cpus, max_vpp_cpus
19 from framework import (
22 get_testcase_doc_name,
31 from debug import spawn_gdb, start_vpp_in_gdb
41 from discover_tests import discover_tests
43 from subprocess import check_output, CalledProcessError
44 from util import check_core_path, get_core_path, is_core_present
46 # timeout which controls how long the child has to finish after seeing
47 # a core dump in test temporary directory. If this is exceeded, parent assumes
48 # that child process is stuck (e.g. waiting for event from vpp) and kill
53 class StreamQueue(Queue):
58 sys.__stdout__.flush()
59 sys.__stderr__.flush()
62 return self._writer.fileno()
65 class StreamQueueManager(BaseManager):
69 StreamQueueManager.register("StreamQueue", StreamQueue)
72 class TestResult(dict):
73 def __init__(self, testcase_suite, testcases_by_id=None):
74 super(TestResult, self).__init__()
79 self[SKIP_CPU_SHORTAGE] = []
82 self.testcase_suite = testcase_suite
83 self.testcases = [testcase for testcase in testcase_suite]
84 self.testcases_by_id = testcases_by_id
86 def was_successful(self):
88 0 == len(self[FAIL]) == len(self[ERROR])
89 and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
90 == self.testcase_suite.countTestCases()
def no_tests_run(self):
    """Return True when the list stored under TEST_RUN is empty."""
    executed = self[TEST_RUN]
    return len(executed) == 0
def process_result(self, test_id, result):
    """Append *test_id* to the list stored under the *result* key."""
    bucket = self[result]
    bucket.append(test_id)
99 def suite_from_failed(self):
101 for testcase in self.testcase_suite:
102 tc_id = testcase.id()
103 if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
106 return suite_from_failed(self.testcase_suite, rerun_ids)
108 def get_testcase_names(self, test_id):
109 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
110 setup_teardown_match = re.match(
111 r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
113 if setup_teardown_match:
114 test_name, _, _, testcase_name = setup_teardown_match.groups()
115 if len(testcase_name.split(".")) == 2:
116 for key in self.testcases_by_id.keys():
117 if key.startswith(testcase_name):
120 testcase_name = self._get_testcase_doc_name(testcase_name)
122 test_name = self._get_test_description(test_id)
123 testcase_name = self._get_testcase_doc_name(test_id)
125 return testcase_name, test_name
127 def _get_test_description(self, test_id):
128 if test_id in self.testcases_by_id:
129 desc = get_test_description(descriptions, self.testcases_by_id[test_id])
134 def _get_testcase_doc_name(self, test_id):
135 if test_id in self.testcases_by_id:
136 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
142 def test_runner_wrapper(
143 suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
145 sys.stdout = stdouterr_queue
146 sys.stderr = stdouterr_queue
147 VppTestCase.parallel_handler = logger.handlers[0]
148 result = VppTestRunner(
149 keep_alive_pipe=keep_alive_pipe,
150 descriptions=descriptions,
151 verbosity=config.verbose,
152 result_pipe=result_pipe,
153 failfast=config.failfast,
156 finished_pipe.send(result.wasSuccessful())
157 finished_pipe.close()
158 keep_alive_pipe.close()
161 class TestCaseWrapper(object):
162 def __init__(self, testcase_suite, manager):
163 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
164 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
165 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
166 self.testcase_suite = testcase_suite
167 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
168 self.logger = get_parallel_logger(self.stdouterr_queue)
169 self.child = Process(
170 target=test_runner_wrapper,
173 self.keep_alive_child_end,
174 self.stdouterr_queue,
175 self.finished_child_end,
176 self.result_child_end,
181 self.last_test_temp_dir = None
182 self.last_test_vpp_binary = None
183 self._last_test = None
184 self.last_test_id = None
186 self.last_heard = time.time()
187 self.core_detected_at = None
188 self.testcases_by_id = {}
189 self.testclasess_with_core = {}
190 for testcase in self.testcase_suite:
191 self.testcases_by_id[testcase.id()] = testcase
192 self.result = TestResult(testcase_suite, self.testcases_by_id)
196 return self._last_test
199 def last_test(self, test_id):
200 self.last_test_id = test_id
201 if test_id in self.testcases_by_id:
202 testcase = self.testcases_by_id[test_id]
203 self._last_test = testcase.shortDescription()
204 if not self._last_test:
205 self._last_test = str(testcase)
207 self._last_test = test_id
209 def add_testclass_with_core(self):
210 if self.last_test_id in self.testcases_by_id:
211 test = self.testcases_by_id[self.last_test_id]
212 class_name = unittest.util.strclass(test.__class__)
213 test_name = "'{}' ({})".format(
214 get_test_description(descriptions, test), self.last_test_id
217 test_name = self.last_test_id
218 class_name = re.match(
219 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
221 if class_name not in self.testclasess_with_core:
222 self.testclasess_with_core[class_name] = (
224 self.last_test_vpp_binary,
225 self.last_test_temp_dir,
def close_pipes(self):
    """Close all six pipe ends of this wrapper: child ends first, then parent ends."""
    # Attributes are looked up one at a time, in order, so each end is
    # fetched only after the previous one has been closed.
    for attr_name in (
        "keep_alive_child_end",
        "finished_child_end",
        "result_child_end",
        "keep_alive_parent_end",
        "finished_parent_end",
        "result_parent_end",
    ):
        getattr(self, attr_name).close()
def was_successful(self):
    """Return whatever ``self.result.was_successful()`` reports."""
    outcome = self.result
    return outcome.was_successful()
241 return self.testcase_suite.cpus_used
def get_assigned_cpus(self):
    """Return the result of ``get_assigned_cpus()`` on the wrapped test suite."""
    suite = self.testcase_suite
    return suite.get_assigned_cpus()
247 def stdouterr_reader_wrapper(
248 unread_testcases, finished_unread_testcases, read_testcases
251 while read_testcases.is_set() or unread_testcases:
252 if finished_unread_testcases:
253 read_testcase = finished_unread_testcases.pop()
254 unread_testcases.remove(read_testcase)
255 elif unread_testcases:
256 read_testcase = unread_testcases.pop()
259 while data is not None:
260 sys.stdout.write(data)
261 data = read_testcase.stdouterr_queue.get()
263 read_testcase.stdouterr_queue.close()
264 finished_unread_testcases.discard(read_testcase)
268 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
269 if last_test_temp_dir:
270 # Need to create link in case of a timeout or core dump without failure
271 lttd = os.path.basename(last_test_temp_dir)
272 link_path = "%s%s-FAILED" % (config.failed_dir, lttd)
273 if not os.path.exists(link_path):
274 os.symlink(last_test_temp_dir, link_path)
276 "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
279 # Report core existence
280 core_path = get_core_path(last_test_temp_dir)
281 if os.path.exists(core_path):
283 "Core-file exists in test temporary directory: %s!" % core_path
285 check_core_path(logger, core_path)
286 logger.debug("Running 'file %s':" % core_path)
288 info = check_output(["file", core_path])
290 except CalledProcessError as e:
292 "Subprocess returned with return code "
293 "while running `file' utility on core-file "
300 "Subprocess returned with OS error while "
301 "running 'file' utility "
307 except Exception as e:
308 logger.exception("Unexpected error running `file' utility on core-file")
309 logger.error(f"gdb {vpp_binary} {core_path}")
312 # Copy api post mortem
313 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
314 if os.path.isfile(api_post_mortem_path):
316 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
318 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
321 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
322 if is_core_present(tempdir):
325 "VPP core detected in %s. Last test running was %s"
326 % (tempdir, core_crash_test)
328 print(single_line_delim)
329 spawn_gdb(vpp_binary, get_core_path(tempdir))
330 print(single_line_delim)
331 elif config.compress_core:
332 print("Compressing core-file in test directory `%s'" % tempdir)
333 os.system("gzip %s" % get_core_path(tempdir))
336 def handle_cores(failed_testcases):
337 for failed_testcase in failed_testcases:
338 tcs_with_core = failed_testcase.testclasess_with_core
340 for test, vpp_binary, tempdir in tcs_with_core.values():
341 check_and_handle_core(vpp_binary, tempdir, test)
344 def process_finished_testsuite(
345 wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
347 results.append(wrapped_testcase_suite.result)
348 finished_testcase_suites.add(wrapped_testcase_suite)
350 if config.failfast and not wrapped_testcase_suite.was_successful():
353 if not wrapped_testcase_suite.was_successful():
354 failed_wrapped_testcases.add(wrapped_testcase_suite)
356 wrapped_testcase_suite.logger,
357 wrapped_testcase_suite.last_test_temp_dir,
358 wrapped_testcase_suite.vpp_pid,
359 wrapped_testcase_suite.last_test_vpp_binary,
365 def run_forked(testcase_suites):
366 wrapped_testcase_suites = set()
367 solo_testcase_suites = []
369 # suites are unhashable, need to use list
371 unread_testcases = set()
372 finished_unread_testcases = set()
373 manager = StreamQueueManager()
376 free_cpus = list(available_cpus)
378 def on_suite_start(tc):
379 nonlocal tests_running
381 tests_running = tests_running + 1
383 def on_suite_finish(tc):
384 nonlocal tests_running
386 tests_running = tests_running - 1
387 assert tests_running >= 0
388 free_cpus.extend(tc.get_assigned_cpus())
390 def run_suite(suite):
392 nonlocal wrapped_testcase_suites
393 nonlocal unread_testcases
395 suite.assign_cpus(free_cpus[: suite.cpus_used])
396 free_cpus = free_cpus[suite.cpus_used :]
397 wrapper = TestCaseWrapper(suite, manager)
398 wrapped_testcase_suites.add(wrapper)
399 unread_testcases.add(wrapper)
400 on_suite_start(suite)
402 def can_run_suite(suite):
403 return tests_running < max_concurrent_tests and (
404 suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
407 while free_cpus and testcase_suites:
408 a_suite = testcase_suites[0]
409 if a_suite.is_tagged_run_solo:
410 a_suite = testcase_suites.pop(0)
411 solo_testcase_suites.append(a_suite)
413 if can_run_suite(a_suite):
414 a_suite = testcase_suites.pop(0)
419 if tests_running == 0 and solo_testcase_suites:
420 a_suite = solo_testcase_suites.pop(0)
423 read_from_testcases = threading.Event()
424 read_from_testcases.set()
425 stdouterr_thread = threading.Thread(
426 target=stdouterr_reader_wrapper,
427 args=(unread_testcases, finished_unread_testcases, read_from_testcases),
429 stdouterr_thread.start()
431 failed_wrapped_testcases = set()
435 while wrapped_testcase_suites:
436 finished_testcase_suites = set()
437 for wrapped_testcase_suite in wrapped_testcase_suites:
438 while wrapped_testcase_suite.result_parent_end.poll():
439 wrapped_testcase_suite.result.process_result(
440 *wrapped_testcase_suite.result_parent_end.recv()
442 wrapped_testcase_suite.last_heard = time.time()
444 while wrapped_testcase_suite.keep_alive_parent_end.poll():
446 wrapped_testcase_suite.last_test,
447 wrapped_testcase_suite.last_test_vpp_binary,
448 wrapped_testcase_suite.last_test_temp_dir,
449 wrapped_testcase_suite.vpp_pid,
450 ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
451 wrapped_testcase_suite.last_heard = time.time()
453 if wrapped_testcase_suite.finished_parent_end.poll():
454 wrapped_testcase_suite.finished_parent_end.recv()
455 wrapped_testcase_suite.last_heard = time.time()
457 process_finished_testsuite(
458 wrapped_testcase_suite,
459 finished_testcase_suites,
460 failed_wrapped_testcases,
468 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
470 wrapped_testcase_suite.logger.critical(
471 "Child test runner process timed out "
472 "(last test running was `%s' in `%s')!"
474 wrapped_testcase_suite.last_test,
475 wrapped_testcase_suite.last_test_temp_dir,
478 elif not wrapped_testcase_suite.child.is_alive():
480 wrapped_testcase_suite.logger.critical(
481 "Child test runner process unexpectedly died "
482 "(last test running was `%s' in `%s')!"
484 wrapped_testcase_suite.last_test,
485 wrapped_testcase_suite.last_test_temp_dir,
489 wrapped_testcase_suite.last_test_temp_dir
490 and wrapped_testcase_suite.last_test_vpp_binary
492 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
493 wrapped_testcase_suite.add_testclass_with_core()
494 if wrapped_testcase_suite.core_detected_at is None:
495 wrapped_testcase_suite.core_detected_at = time.time()
497 wrapped_testcase_suite.core_detected_at + core_timeout
500 wrapped_testcase_suite.logger.critical(
501 "Child test runner process unresponsive and "
502 "core-file exists in test temporary directory "
503 "(last test running was `%s' in `%s')!"
505 wrapped_testcase_suite.last_test,
506 wrapped_testcase_suite.last_test_temp_dir,
512 wrapped_testcase_suite.child.terminate()
514 # terminating the child process tends to leave orphan
516 if wrapped_testcase_suite.vpp_pid:
517 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
521 wrapped_testcase_suite.result.crashed = True
522 wrapped_testcase_suite.result.process_result(
523 wrapped_testcase_suite.last_test_id, ERROR
526 process_finished_testsuite(
527 wrapped_testcase_suite,
528 finished_testcase_suites,
529 failed_wrapped_testcases,
535 for finished_testcase in finished_testcase_suites:
536 # Somewhat surprisingly, the join below may
537 # timeout, even if client signaled that
538 # it finished - so we note it just in case.
539 join_start = time.time()
540 finished_testcase.child.join(test_finished_join_timeout)
541 join_end = time.time()
542 if join_end - join_start >= test_finished_join_timeout:
543 finished_testcase.logger.error(
544 "Timeout joining finished test: %s (pid %d)"
545 % (finished_testcase.last_test, finished_testcase.child.pid)
547 finished_testcase.close_pipes()
548 wrapped_testcase_suites.remove(finished_testcase)
549 finished_unread_testcases.add(finished_testcase)
550 finished_testcase.stdouterr_queue.put(None)
551 on_suite_finish(finished_testcase)
553 while testcase_suites:
554 results.append(TestResult(testcase_suites.pop(0)))
555 elif testcase_suites:
556 a_suite = testcase_suites.pop(0)
557 while a_suite and a_suite.is_tagged_run_solo:
558 solo_testcase_suites.append(a_suite)
560 a_suite = testcase_suites.pop(0)
563 if a_suite and can_run_suite(a_suite):
565 if solo_testcase_suites and tests_running == 0:
566 a_suite = solo_testcase_suites.pop(0)
570 for wrapped_testcase_suite in wrapped_testcase_suites:
571 wrapped_testcase_suite.child.terminate()
572 wrapped_testcase_suite.stdouterr_queue.put(None)
575 read_from_testcases.clear()
576 stdouterr_thread.join(config.timeout)
579 handle_cores(failed_wrapped_testcases)
583 class TestSuiteWrapper(unittest.TestSuite):
587 return super().__init__()
589 def addTest(self, test):
590 self.cpus_used = max(self.cpus_used, test.get_cpus_required())
591 super().addTest(test)
593 def assign_cpus(self, cpus):
596 def _handleClassSetUp(self, test, result):
597 if not test.__class__.skipped_due_to_cpu_lack:
598 test.assign_cpus(self.cpus)
599 super()._handleClassSetUp(test, result)
601 def get_assigned_cpus(self):
605 class SplitToSuitesCallback:
606 def __init__(self, filter_callback):
608 self.suite_name = "default"
609 self.filter_callback = filter_callback
610 self.filtered = TestSuiteWrapper()
612 def __call__(self, file_name, cls, method):
613 test_method = cls(method)
614 if self.filter_callback(file_name, cls.__name__, method):
615 self.suite_name = file_name + cls.__name__
616 if self.suite_name not in self.suites:
617 self.suites[self.suite_name] = TestSuiteWrapper()
618 self.suites[self.suite_name].is_tagged_run_solo = False
619 self.suites[self.suite_name].addTest(test_method)
620 if test_method.is_tagged_run_solo():
621 self.suites[self.suite_name].is_tagged_run_solo = True
624 self.filtered.addTest(test_method)
627 def parse_test_filter(test_filter):
629 filter_file_name = None
630 filter_class_name = None
631 filter_func_name = None
636 raise Exception("Unrecognized %s option: %s" % (test_option, f))
638 if parts[2] not in ("*", ""):
639 filter_func_name = parts[2]
640 if parts[1] not in ("*", ""):
641 filter_class_name = parts[1]
642 if parts[0] not in ("*", ""):
643 if parts[0].startswith("test_"):
644 filter_file_name = parts[0]
646 filter_file_name = "test_%s" % parts[0]
648 if f.startswith("test_"):
651 filter_file_name = "test_%s" % f
653 filter_file_name = "%s.py" % filter_file_name
654 return filter_file_name, filter_class_name, filter_func_name
657 def filter_tests(tests, filter_cb):
658 result = TestSuiteWrapper()
660 if isinstance(t, unittest.suite.TestSuite):
661 # this is a bunch of tests, recursively filter...
662 x = filter_tests(t, filter_cb)
663 if x.countTestCases() > 0:
665 elif isinstance(t, unittest.TestCase):
666 # this is a single test
667 parts = t.id().split(".")
668 # t.id() for common cases like this:
669 # test_classifier.TestClassifier.test_acl_ip
670 # apply filtering only if it is so
672 if not filter_cb(parts[0], parts[1], parts[2]):
676 # unexpected object, don't touch it
681 class FilterByTestOption:
682 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
683 self.filter_file_name = filter_file_name
684 self.filter_class_name = filter_class_name
685 self.filter_func_name = filter_func_name
687 def __call__(self, file_name, class_name, func_name):
688 if self.filter_file_name:
689 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
692 if self.filter_class_name and class_name != self.filter_class_name:
694 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Callable filter that accepts tests whose "<file>.<class>" key is listed."""

    def __init__(self, classes_with_filenames):
        # Collection of "<file_name>.<class_name>" strings to accept.
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return whether "<file_name>.<class_name>" is in the stored collection.

        ``func_name`` is accepted for signature compatibility and not used.
        """
        qualified_name = ".".join((file_name, class_name))
        return qualified_name in self.classes_with_filenames
707 def suite_from_failed(suite, failed):
708 failed = {x.rsplit(".", 1)[0] for x in failed}
709 filter_cb = FilterByClassList(failed)
710 suite = filter_tests(suite, filter_cb)
714 class AllResults(dict):
716 super(AllResults, self).__init__()
717 self.all_testcases = 0
718 self.results_per_suite = []
723 self[SKIP_CPU_SHORTAGE] = 0
726 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Keep *result* and add the length of each of its per-type lists to the totals."""
    self.results_per_suite.append(result)
    for kind in (PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE):
        self[kind] += len(result[kind])
734 def add_result(self, result):
736 self.all_testcases += result.testcase_suite.countTestCases()
737 self.add_results(result)
739 if result.no_tests_run():
740 self.testsuites_no_tests_run.append(result.testcase_suite)
745 elif not result.was_successful():
749 self.rerun.append(result.testcase_suite)
753 def print_results(self):
755 print(double_line_delim)
756 print("TEST RESULTS:")
758 def indent_results(lines):
759 lines = list(filter(None, lines))
760 maximum = max(lines, key=lambda x: x.index(":"))
761 maximum = 4 + maximum.index(":")
763 padding = " " * (maximum - l.index(":"))
764 print(f"{padding}{l}")
768 f"Scheduled tests: {self.all_testcases}",
769 f"Executed tests: {self[TEST_RUN]}",
770 f"Passed tests: {colorize(self[PASS], GREEN)}",
771 f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
774 f"Not Executed tests: {colorize(self.not_executed, RED)}"
777 f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
778 f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
779 "Tests skipped due to lack of CPUS: "
780 f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
781 if self[SKIP_CPU_SHORTAGE]
786 if self.all_failed > 0:
787 print("FAILURES AND ERRORS IN TESTS:")
788 for result in self.results_per_suite:
789 failed_testcase_ids = result[FAIL]
790 errored_testcase_ids = result[ERROR]
791 old_testcase_name = None
792 if failed_testcase_ids:
793 for failed_test_id in failed_testcase_ids:
794 new_testcase_name, test_name = result.get_testcase_names(
797 if new_testcase_name != old_testcase_name:
799 " Testcase name: {}".format(
800 colorize(new_testcase_name, RED)
803 old_testcase_name = new_testcase_name
805 " FAILURE: {} [{}]".format(
806 colorize(test_name, RED), failed_test_id
809 if errored_testcase_ids:
810 for errored_test_id in errored_testcase_ids:
811 new_testcase_name, test_name = result.get_testcase_names(
814 if new_testcase_name != old_testcase_name:
816 " Testcase name: {}".format(
817 colorize(new_testcase_name, RED)
820 old_testcase_name = new_testcase_name
822 " ERROR: {} [{}]".format(
823 colorize(test_name, RED), errored_test_id
826 if self.testsuites_no_tests_run:
827 print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
829 for testsuite in self.testsuites_no_tests_run:
830 for testcase in testsuite:
831 tc_classes.add(get_testcase_doc_name(testcase))
832 for tc_class in tc_classes:
833 print(" {}".format(colorize(tc_class, RED)))
835 if self[SKIP_CPU_SHORTAGE]:
839 " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
840 " ENOUGH CPUS AVAILABLE",
844 print(double_line_delim)
def not_executed(self):
    """Return ``all_testcases`` minus the value stored under TEST_RUN."""
    scheduled = self.all_testcases
    return scheduled - self[TEST_RUN]
def all_failed(self):
    """Return the sum of the values stored under FAIL and ERROR."""
    failures = self[FAIL]
    errors = self[ERROR]
    return failures + errors
856 def parse_results(results):
858 Prints the number of scheduled, executed, not executed, passed, failed,
859 errored and skipped tests and details about failed and errored tests.
861 Also returns all suites where any test failed.
867 results_per_suite = AllResults()
870 for result in results:
871 result_code = results_per_suite.add_result(result)
874 elif result_code == -1:
877 results_per_suite.print_results()
885 return return_code, results_per_suite.rerun
888 if __name__ == "__main__":
890 print(f"Config is: {config}")
893 print("Running sanity test case.")
895 rc = sanity_run_vpp.main()
898 except Exception as e:
899 print(traceback.format_exc())
900 print("Couldn't run sanity test case.")
903 test_finished_join_timeout = 15
905 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
906 debug_core = config.debug == "core"
908 run_interactive = debug_gdb or config.step or config.force_foreground
910 max_concurrent_tests = 0
911 print(f"OS reports {num_cpus} available cpu(s).")
913 test_jobs = config.jobs
914 if test_jobs == "auto":
916 max_concurrent_tests = 1
917 print("Interactive mode required, running tests consecutively.")
919 max_concurrent_tests = num_cpus
921 f"Running at most {max_concurrent_tests} python test "
922 "processes concurrently."
925 max_concurrent_tests = test_jobs
927 f"Running at most {max_concurrent_tests} python test processes "
928 "concurrently as set by 'TEST_JOBS'."
931 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
933 if run_interactive and max_concurrent_tests > 1:
934 raise NotImplementedError(
935 "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
936 "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
942 print("Running tests using custom test runner.")
943 filter_file, filter_class, filter_func = parse_test_filter(config.filter)
946 "Selected filters: file=%s, class=%s, function=%s"
947 % (filter_file, filter_class, filter_func)
950 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
952 ignore_path = config.venv_dir
953 cb = SplitToSuitesCallback(filter_cb)
954 for d in config.test_src_dir:
955 print("Adding tests from directory tree %s" % d)
956 discover_tests(d, cb, ignore_path)
958 # suites are not hashable, need to use list
961 for testcase_suite in cb.suites.values():
962 tests_amount += testcase_suite.countTestCases()
963 if testcase_suite.cpus_used > max_vpp_cpus:
964 # here we replace test functions with lambdas to just skip them
965 # but we also replace setUp/tearDown functions to do nothing
966 # so that the test can be "started" and "stopped", so that we can
967 # still keep those prints (test description - SKIP), which are done
968 # in stopTest() (for that to trigger, test function must run)
969 for t in testcase_suite:
971 if m.startswith("test_"):
972 setattr(t, m, lambda: t.skipTest("not enough cpus"))
973 setattr(t.__class__, "setUpClass", lambda: None)
974 setattr(t.__class__, "tearDownClass", lambda: None)
975 setattr(t, "setUp", lambda: None)
976 setattr(t, "tearDown", lambda: None)
977 t.__class__.skipped_due_to_cpu_lack = True
978 suites.append(testcase_suite)
981 "%s out of %s tests match specified filters"
982 % (tests_amount, tests_amount + cb.filtered.countTestCases())
985 if not config.extended:
986 print("Not running extended tests (some tests will be skipped)")
988 attempts = config.retries + 1
990 print("Perform %s attempts to pass the suite..." % attempts)
992 if run_interactive and suites:
993 # don't fork if requiring interactive terminal
994 print("Running tests in foreground in the current process")
995 full_suite = unittest.TestSuite()
996 free_cpus = list(available_cpus)
999 if suite.cpus_used <= max_vpp_cpus:
1000 suite.assign_cpus(free_cpus[: suite.cpus_used])
1002 suite.assign_cpus([])
1004 full_suite.addTests(suites)
1005 result = VppTestRunner(
1006 verbosity=config.verbose, failfast=config.failfast, print_summary=True
1008 was_successful = result.wasSuccessful()
1009 if not was_successful:
1010 for test_case_info in result.failed_test_cases_info:
1011 handle_failed_suite(
1012 test_case_info.logger,
1013 test_case_info.tempdir,
1014 test_case_info.vpp_pid,
1017 if test_case_info in result.core_crash_test_cases_info:
1018 check_and_handle_core(
1019 test_case_info.vpp_bin_path,
1020 test_case_info.tempdir,
1021 test_case_info.core_crash_test,
1028 "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1029 " ENOUGH CPUS AVAILABLE",
1034 sys.exit(not was_successful)
1037 "Running each VPPTestCase in a separate background process"
1038 f" with at most {max_concurrent_tests} parallel python test "
1042 while suites and attempts > 0:
1043 results = run_forked(suites)
1044 exit_code, suites = parse_results(results)
1047 print("Test run was successful")
1049 print("%s attempt(s) left." % attempts)