13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
16 from config import config, num_cpus, available_cpus, max_vpp_cpus
17 from vpp_papi import VPPApiJSONFiles
18 from asfframework import (
20 get_testcase_doc_name,
22 get_failed_testcase_linkname,
25 from framework import VppTestCase
26 from test_result_code import TestResultCode
27 from debug import spawn_gdb
37 from discover_tests import discover_tests
39 from subprocess import check_output, CalledProcessError
40 from util import check_core_path, get_core_path, is_core_present
42 # timeout which controls how long the child has to finish after seeing
43 # a core dump in test temporary directory. If this is exceeded, parent assumes
44 # that child process is stuck (e.g. waiting for event from vpp) and kill
49 class StreamQueue(Queue):
54 sys.__stdout__.flush()
55 sys.__stderr__.flush()
58 return self._writer.fileno()
61 class StreamQueueManager(BaseManager):
65 StreamQueueManager.register("StreamQueue", StreamQueue)
68 class TestResult(dict):
69 def __init__(self, testcase_suite, testcases_by_id=None):
70 super(TestResult, self).__init__()
71 for trc in list(TestResultCode):
74 self.testcase_suite = testcase_suite
75 self.testcases = [testcase for testcase in testcase_suite]
76 self.testcases_by_id = testcases_by_id
78 def was_successful(self):
81 == len(self[TestResultCode.FAIL])
82 == len(self[TestResultCode.ERROR])
83 == len(self[TestResultCode.UNEXPECTED_PASS])
84 and len(self[TestResultCode.PASS])
85 + len(self[TestResultCode.SKIP])
86 + len(self[TestResultCode.SKIP_CPU_SHORTAGE])
87 + len(self[TestResultCode.EXPECTED_FAIL])
88 == self.testcase_suite.countTestCases()
def no_tests_run(self):
    """Tell whether nothing is recorded under ``TestResultCode.TEST_RUN``.

    Returns:
        bool: True when the collection stored for
        ``TestResultCode.TEST_RUN`` has length zero.
    """
    executed = self[TestResultCode.TEST_RUN]
    return len(executed) == 0
def process_result(self, test_id, result):
    """Record ``test_id`` under the outcome ``result``.

    Args:
        test_id: identifier of the test being recorded.
        result: key of this mapping whose collection receives ``test_id``.
    """
    bucket = self[result]
    bucket.append(test_id)
97 def suite_from_failed(self):
99 for testcase in self.testcase_suite:
100 tc_id = testcase.id()
103 not in self[TestResultCode.PASS]
104 + self[TestResultCode.SKIP]
105 + self[TestResultCode.SKIP_CPU_SHORTAGE]
106 + self[TestResultCode.EXPECTED_FAIL]
110 return suite_from_failed(self.testcase_suite, rerun_ids)
112 def get_testcase_names(self, test_id):
113 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
114 setup_teardown_match = re.match(
115 r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
117 if setup_teardown_match:
118 test_name, _, _, testcase_name = setup_teardown_match.groups()
119 if len(testcase_name.split(".")) == 2:
120 for key in self.testcases_by_id.keys():
121 if key.startswith(testcase_name):
124 testcase_name = self._get_testcase_doc_name(testcase_name)
126 test_name = self._get_test_description(test_id)
127 testcase_name = self._get_testcase_doc_name(test_id)
129 return testcase_name, test_name
131 def _get_test_description(self, test_id):
132 if test_id in self.testcases_by_id:
133 desc = get_test_description(descriptions, self.testcases_by_id[test_id])
138 def _get_testcase_doc_name(self, test_id):
139 if test_id in self.testcases_by_id:
140 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
146 def test_runner_wrapper(
147 suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
149 sys.stdout = stdouterr_queue
150 sys.stderr = stdouterr_queue
151 VppTestCase.parallel_handler = logger.handlers[0]
152 result = VppTestRunner(
153 keep_alive_pipe=keep_alive_pipe,
154 descriptions=descriptions,
155 verbosity=config.verbose,
156 result_pipe=result_pipe,
157 failfast=config.failfast,
160 finished_pipe.send(result.wasSuccessful())
161 finished_pipe.close()
162 keep_alive_pipe.close()
165 class TestCaseWrapper(object):
166 def __init__(self, testcase_suite, manager):
167 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
168 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
169 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
170 self.testcase_suite = testcase_suite
171 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
172 self.logger = get_parallel_logger(self.stdouterr_queue)
173 self.child = Process(
174 target=test_runner_wrapper,
177 self.keep_alive_child_end,
178 self.stdouterr_queue,
179 self.finished_child_end,
180 self.result_child_end,
185 self.last_test_temp_dir = None
186 self.last_test_vpp_binary = None
187 self._last_test = None
188 self.last_test_id = None
190 self.last_heard = time.time()
191 self.core_detected_at = None
192 self.testcases_by_id = {}
193 self.testclasess_with_core = {}
194 for testcase in self.testcase_suite:
195 self.testcases_by_id[testcase.id()] = testcase
196 self.result = TestResult(testcase_suite, self.testcases_by_id)
200 return self._last_test
203 def last_test(self, test_id):
204 self.last_test_id = test_id
205 if test_id in self.testcases_by_id:
206 testcase = self.testcases_by_id[test_id]
207 self._last_test = testcase.shortDescription()
208 if not self._last_test:
209 self._last_test = str(testcase)
211 self._last_test = test_id
213 def add_testclass_with_core(self):
214 if self.last_test_id in self.testcases_by_id:
215 test = self.testcases_by_id[self.last_test_id]
216 class_name = unittest.util.strclass(test.__class__)
217 test_name = "'{}' ({})".format(
218 get_test_description(descriptions, test), self.last_test_id
221 test_name = self.last_test_id
222 class_name = re.match(
223 r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
225 if class_name not in self.testclasess_with_core:
226 self.testclasess_with_core[class_name] = (
228 self.last_test_vpp_binary,
229 self.last_test_temp_dir,
def close_pipes(self):
    """Close every pipe end held by this wrapper.

    The three child-side ends (keep-alive, finished, result) are closed
    first, followed by the three parent-side ends in the same order.
    """
    end_names = (
        "keep_alive_child_end",
        "finished_child_end",
        "result_child_end",
        "keep_alive_parent_end",
        "finished_parent_end",
        "result_parent_end",
    )
    # Look each end up right before closing it, in the listed order.
    for end_name in end_names:
        getattr(self, end_name).close()
def was_successful(self):
    """Report the verdict of the wrapped suite's result object.

    Returns:
        Whatever ``self.result.was_successful()`` returns.
    """
    verdict = self.result.was_successful()
    return verdict
245 return self.testcase_suite.cpus_used
def get_assigned_cpus(self):
    """Return the CPUs assigned to the wrapped test suite.

    Returns:
        Whatever ``self.testcase_suite.get_assigned_cpus()`` returns.
    """
    suite = self.testcase_suite
    return suite.get_assigned_cpus()
251 def stdouterr_reader_wrapper(
252 unread_testcases, finished_unread_testcases, read_testcases
255 while read_testcases.is_set() or unread_testcases:
256 if finished_unread_testcases:
257 read_testcase = finished_unread_testcases.pop()
258 unread_testcases.remove(read_testcase)
259 elif unread_testcases:
260 read_testcase = unread_testcases.pop()
263 while data is not None:
264 sys.stdout.write(data)
265 data = read_testcase.stdouterr_queue.get()
267 read_testcase.stdouterr_queue.close()
268 finished_unread_testcases.discard(read_testcase)
272 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
273 if last_test_temp_dir:
274 # Need to create link in case of a timeout or core dump without failure
275 lttd = os.path.basename(last_test_temp_dir)
276 link_path = os.path.join(config.failed_dir, f"{lttd}-FAILED")
277 if not os.path.exists(link_path):
278 os.symlink(last_test_temp_dir, link_path)
280 "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
283 # Report core existence
284 core_path = get_core_path(last_test_temp_dir)
285 if os.path.exists(core_path):
287 "Core-file exists in test temporary directory: %s!" % core_path
289 check_core_path(logger, core_path)
290 logger.debug("Running 'file %s':" % core_path)
292 info = check_output(["file", core_path])
294 except CalledProcessError as e:
296 "Subprocess returned with return code "
297 "while running `file' utility on core-file "
304 "Subprocess returned with OS error while "
305 "running 'file' utility "
311 except Exception as e:
312 logger.exception("Unexpected error running `file' utility on core-file")
313 logger.error(f"gdb {vpp_binary} {core_path}")
316 # Copy api post mortem
317 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
318 if os.path.isfile(api_post_mortem_path):
320 "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
322 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
325 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
326 if is_core_present(tempdir):
329 "VPP core detected in %s. Last test running was %s"
330 % (tempdir, core_crash_test)
332 print(single_line_delim)
333 spawn_gdb(vpp_binary, get_core_path(tempdir))
334 print(single_line_delim)
335 elif config.compress_core:
336 print("Compressing core-file in test directory `%s'" % tempdir)
337 os.system("gzip %s" % get_core_path(tempdir))
340 def handle_cores(failed_testcases):
341 for failed_testcase in failed_testcases:
342 tcs_with_core = failed_testcase.testclasess_with_core
344 for test, vpp_binary, tempdir in tcs_with_core.values():
345 check_and_handle_core(vpp_binary, tempdir, test)
348 def process_finished_testsuite(
349 wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
351 results.append(wrapped_testcase_suite.result)
352 finished_testcase_suites.add(wrapped_testcase_suite)
354 if config.failfast and not wrapped_testcase_suite.was_successful():
357 if not wrapped_testcase_suite.was_successful():
358 failed_wrapped_testcases.add(wrapped_testcase_suite)
360 wrapped_testcase_suite.logger,
361 wrapped_testcase_suite.last_test_temp_dir,
362 wrapped_testcase_suite.vpp_pid,
363 wrapped_testcase_suite.last_test_vpp_binary,
369 def run_forked(testcase_suites):
370 wrapped_testcase_suites = set()
371 solo_testcase_suites = []
373 # suites are unhashable, need to use list
375 unread_testcases = set()
376 finished_unread_testcases = set()
377 manager = StreamQueueManager()
380 free_cpus = list(available_cpus)
382 def on_suite_start(tc):
383 nonlocal tests_running
385 tests_running = tests_running + 1
387 def on_suite_finish(tc):
388 nonlocal tests_running
390 tests_running = tests_running - 1
391 assert tests_running >= 0
392 free_cpus.extend(tc.get_assigned_cpus())
394 def run_suite(suite):
396 nonlocal wrapped_testcase_suites
397 nonlocal unread_testcases
399 suite.assign_cpus(free_cpus[: suite.cpus_used])
400 free_cpus = free_cpus[suite.cpus_used :]
401 wrapper = TestCaseWrapper(suite, manager)
402 wrapped_testcase_suites.add(wrapper)
403 unread_testcases.add(wrapper)
404 on_suite_start(suite)
406 def can_run_suite(suite):
407 return tests_running < max_concurrent_tests and (
408 suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
411 while free_cpus and testcase_suites:
412 a_suite = testcase_suites[0]
413 if a_suite.is_tagged_run_solo:
414 a_suite = testcase_suites.pop(0)
415 solo_testcase_suites.append(a_suite)
417 if can_run_suite(a_suite):
418 a_suite = testcase_suites.pop(0)
423 if tests_running == 0 and solo_testcase_suites:
424 a_suite = solo_testcase_suites.pop(0)
427 read_from_testcases = threading.Event()
428 read_from_testcases.set()
429 stdouterr_thread = threading.Thread(
430 target=stdouterr_reader_wrapper,
431 args=(unread_testcases, finished_unread_testcases, read_from_testcases),
433 stdouterr_thread.start()
435 failed_wrapped_testcases = set()
439 while wrapped_testcase_suites or testcase_suites:
440 finished_testcase_suites = set()
441 for wrapped_testcase_suite in wrapped_testcase_suites:
442 while wrapped_testcase_suite.result_parent_end.poll():
443 wrapped_testcase_suite.result.process_result(
444 *wrapped_testcase_suite.result_parent_end.recv()
446 wrapped_testcase_suite.last_heard = time.time()
448 while wrapped_testcase_suite.keep_alive_parent_end.poll():
450 wrapped_testcase_suite.last_test,
451 wrapped_testcase_suite.last_test_vpp_binary,
452 wrapped_testcase_suite.last_test_temp_dir,
453 wrapped_testcase_suite.vpp_pid,
454 ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
455 wrapped_testcase_suite.last_heard = time.time()
457 if wrapped_testcase_suite.finished_parent_end.poll():
458 wrapped_testcase_suite.finished_parent_end.recv()
459 wrapped_testcase_suite.last_heard = time.time()
461 process_finished_testsuite(
462 wrapped_testcase_suite,
463 finished_testcase_suites,
464 failed_wrapped_testcases,
472 if wrapped_testcase_suite.last_heard + config.timeout < time.time():
474 wrapped_testcase_suite.logger.critical(
475 "Child test runner process timed out "
476 "(last test running was `%s' in `%s')!"
478 wrapped_testcase_suite.last_test,
479 wrapped_testcase_suite.last_test_temp_dir,
482 elif not wrapped_testcase_suite.child.is_alive():
484 wrapped_testcase_suite.logger.critical(
485 "Child test runner process unexpectedly died "
486 "(last test running was `%s' in `%s')!"
488 wrapped_testcase_suite.last_test,
489 wrapped_testcase_suite.last_test_temp_dir,
493 wrapped_testcase_suite.last_test_temp_dir
494 and wrapped_testcase_suite.last_test_vpp_binary
496 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
497 wrapped_testcase_suite.add_testclass_with_core()
498 if wrapped_testcase_suite.core_detected_at is None:
499 wrapped_testcase_suite.core_detected_at = time.time()
501 wrapped_testcase_suite.core_detected_at + core_timeout
504 wrapped_testcase_suite.logger.critical(
505 "Child test runner process unresponsive and "
506 "core-file exists in test temporary directory "
507 "(last test running was `%s' in `%s')!"
509 wrapped_testcase_suite.last_test,
510 wrapped_testcase_suite.last_test_temp_dir,
516 wrapped_testcase_suite.child.terminate()
518 # terminating the child process tends to leave orphan
520 if wrapped_testcase_suite.vpp_pid:
521 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
525 wrapped_testcase_suite.result.crashed = True
526 wrapped_testcase_suite.result.process_result(
527 wrapped_testcase_suite.last_test_id, TestResultCode.ERROR
530 process_finished_testsuite(
531 wrapped_testcase_suite,
532 finished_testcase_suites,
533 failed_wrapped_testcases,
539 for finished_testcase in finished_testcase_suites:
540 # Somewhat surprisingly, the join below may
541 # timeout, even if client signaled that
542 # it finished - so we note it just in case.
543 join_start = time.time()
544 finished_testcase.child.join(test_finished_join_timeout)
545 join_end = time.time()
546 if join_end - join_start >= test_finished_join_timeout:
547 finished_testcase.logger.error(
548 "Timeout joining finished test: %s (pid %d)"
549 % (finished_testcase.last_test, finished_testcase.child.pid)
551 finished_testcase.close_pipes()
552 wrapped_testcase_suites.remove(finished_testcase)
553 finished_unread_testcases.add(finished_testcase)
554 finished_testcase.stdouterr_queue.put(None)
555 on_suite_finish(finished_testcase)
557 while testcase_suites:
558 results.append(TestResult(testcase_suites.pop(0)))
559 elif testcase_suites:
560 a_suite = testcase_suites[0]
561 while a_suite and a_suite.is_tagged_run_solo:
562 testcase_suites.pop(0)
563 solo_testcase_suites.append(a_suite)
565 a_suite = testcase_suites[0]
568 if a_suite and can_run_suite(a_suite):
569 testcase_suites.pop(0)
571 if solo_testcase_suites and tests_running == 0:
572 a_suite = solo_testcase_suites.pop(0)
576 for wrapped_testcase_suite in wrapped_testcase_suites:
577 wrapped_testcase_suite.child.terminate()
578 wrapped_testcase_suite.stdouterr_queue.put(None)
581 read_from_testcases.clear()
582 stdouterr_thread.join(config.timeout)
585 handle_cores(failed_wrapped_testcases)
589 class TestSuiteWrapper(unittest.TestSuite):
593 return super().__init__()
def addTest(self, test):
    """Add ``test`` to the suite and track the suite's CPU requirement.

    ``self.cpus_used`` is raised to the number of CPUs ``test`` reports
    through ``get_cpus_required()`` whenever that exceeds the current
    value; the test is then added through the parent class.

    Args:
        test: test object providing ``get_cpus_required()``.
    """
    current = self.cpus_used
    required = test.get_cpus_required()
    self.cpus_used = max(current, required)
    super().addTest(test)
599 def assign_cpus(self, cpus):
def _handleClassSetUp(self, test, result):
    """Hand the suite's CPUs to ``test`` before the class-level set-up runs.

    CPU assignment is skipped when the test's class is flagged with
    ``skipped_due_to_cpu_lack``; the parent implementation is invoked
    either way.

    Args:
        test: test object about to have its class set up.
        result: result object forwarded to the parent implementation.
    """
    cpu_starved = test.__class__.skipped_due_to_cpu_lack
    if not cpu_starved:
        test.assign_cpus(self.cpus)
    super()._handleClassSetUp(test, result)
607 def get_assigned_cpus(self):
611 class SplitToSuitesCallback:
612 def __init__(self, filter_callback):
614 self.suite_name = "default"
615 self.filter_callback = filter_callback
616 self.filtered = TestSuiteWrapper()
618 def __call__(self, file_name, cls, method):
619 test_method = cls(method)
620 if self.filter_callback(file_name, cls.__name__, method):
621 self.suite_name = file_name + cls.__name__
622 if self.suite_name not in self.suites:
623 self.suites[self.suite_name] = TestSuiteWrapper()
624 self.suites[self.suite_name].is_tagged_run_solo = False
625 self.suites[self.suite_name].addTest(test_method)
626 if test_method.is_tagged_run_solo():
627 self.suites[self.suite_name].is_tagged_run_solo = True
630 self.filtered.addTest(test_method)
633 def parse_test_filter(test_filter):
635 filter_file_name = None
636 filter_class_name = None
637 filter_func_name = None
642 raise Exception(f"Invalid test filter: {test_filter}")
644 if parts[2] not in ("*", ""):
645 filter_func_name = parts[2]
646 if parts[1] not in ("*", ""):
647 filter_class_name = parts[1]
648 if parts[0] not in ("*", ""):
649 if parts[0].startswith("test_"):
650 filter_file_name = parts[0]
652 filter_file_name = "test_%s" % parts[0]
654 if f.startswith("test_"):
657 filter_file_name = "test_%s" % f
659 filter_file_name = "%s.py" % filter_file_name
660 return filter_file_name, filter_class_name, filter_func_name
663 def filter_tests(tests, filter_cb):
664 result = TestSuiteWrapper()
666 if isinstance(t, unittest.suite.TestSuite):
667 # this is a bunch of tests, recursively filter...
668 x = filter_tests(t, filter_cb)
669 if x.countTestCases() > 0:
671 elif isinstance(t, unittest.TestCase):
672 # this is a single test
673 parts = t.id().split(".")
674 # t.id() for common cases like this:
675 # test_classifier.TestClassifier.test_acl_ip
676 # apply filtering only if it is so
678 if not filter_cb(parts[0], parts[1], parts[2]):
682 # unexpected object, don't touch it
687 class FilterByTestOption:
688 def __init__(self, filters):
689 self.filters = filters
691 def __call__(self, file_name, class_name, func_name):
701 fn_match = fnmatch.fnmatch(file_name, filter_file_name)
704 if filter_class_name and class_name != filter_class_name:
706 if filter_func_name and func_name != filter_func_name:
710 for filter_file_name, filter_class_name, filter_func_name in self.filters:
class FilterByClassList:
    """Filter callback accepting tests by their ``<file_name>.<class_name>``."""

    def __init__(self, classes_with_filenames):
        """Store the container of accepted ``<file_name>.<class_name>`` strings.

        Args:
            classes_with_filenames: container supporting ``in`` lookups of
                dotted ``file.class`` strings.
        """
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Tell whether the given test belongs to an accepted class.

        Args:
            file_name: name of the test file (first component of the key).
            class_name: name of the test class (second component of the key).
            func_name: accepted for interface compatibility; not consulted.

        Returns:
            bool: True when ``"<file_name>.<class_name>"`` is in the stored
            container.
        """
        qualified_class = ".".join((file_name, class_name))
        accepted = self.classes_with_filenames
        return qualified_class in accepted
732 def suite_from_failed(suite, failed):
733 failed = {x.rsplit(".", 1)[0] for x in failed}
734 filter_cb = FilterByClassList(failed)
735 suite = filter_tests(suite, filter_cb)
739 class AllResults(dict):
741 super(AllResults, self).__init__()
742 self.all_testcases = 0
743 self.results_per_suite = []
744 for trc in list(TestResultCode):
747 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Fold one suite's result into the aggregated counters.

    The result object is remembered in ``self.results_per_suite`` and, for
    every ``TestResultCode`` member, the number of entries ``result`` holds
    under that code is added to this object's counter for the same code.

    Args:
        result: per-suite result, indexable by ``TestResultCode`` members.
    """
    self.results_per_suite.append(result)
    for code in TestResultCode:
        recorded = len(result[code])
        self[code] += recorded
754 def add_result(self, result):
756 self.all_testcases += result.testcase_suite.countTestCases()
757 self.add_results(result)
759 if result.no_tests_run():
760 self.testsuites_no_tests_run.append(result.testcase_suite)
765 elif not result.was_successful():
769 self.rerun.append(result.testcase_suite)
773 def print_results(self):
775 print(double_line_delim)
776 print("TEST RESULTS:")
778 def indent_results(lines):
779 lines = list(filter(None, lines))
780 maximum = max(lines, key=lambda x: x.index(":"))
781 maximum = 4 + maximum.index(":")
783 padding = " " * (maximum - l.index(":"))
784 print(f"{padding}{l}")
788 f"Scheduled tests: {self.all_testcases}",
789 f"Executed tests: {self[TestResultCode.TEST_RUN]}",
790 f"Passed tests: {colorize(self[TestResultCode.PASS], GREEN)}",
791 f"Expected failures: {colorize(self[TestResultCode.EXPECTED_FAIL], GREEN)}"
792 if self[TestResultCode.EXPECTED_FAIL]
794 f"Skipped tests: {colorize(self[TestResultCode.SKIP], YELLOW)}"
795 if self[TestResultCode.SKIP]
797 f"Not Executed tests: {colorize(self.not_executed, RED)}"
800 f"Failures: {colorize(self[TestResultCode.FAIL], RED)}"
801 if self[TestResultCode.FAIL]
803 f"Unexpected passes: {colorize(self[TestResultCode.UNEXPECTED_PASS], RED)}"
804 if self[TestResultCode.UNEXPECTED_PASS]
806 f"Errors: {colorize(self[TestResultCode.ERROR], RED)}"
807 if self[TestResultCode.ERROR]
809 "Tests skipped due to lack of CPUS: "
810 f"{colorize(self[TestResultCode.SKIP_CPU_SHORTAGE], YELLOW)}"
811 if self[TestResultCode.SKIP_CPU_SHORTAGE]
816 if self.all_failed > 0:
817 print("FAILURES AND ERRORS IN TESTS:")
818 for result in self.results_per_suite:
819 old_testcase_name = None
820 for tr_code, headline in (
821 (TestResultCode.FAIL, "FAILURE"),
822 (TestResultCode.ERROR, "ERROR"),
823 (TestResultCode.UNEXPECTED_PASS, "UNEXPECTED PASS"),
825 if not result[tr_code]:
828 for failed_test_id in result[tr_code]:
829 new_testcase_name, test_name = result.get_testcase_names(
832 if new_testcase_name != old_testcase_name:
834 f" Testcase name: {colorize(new_testcase_name, RED)}"
836 old_testcase_name = new_testcase_name
838 f" {headline}: {colorize(test_name, RED)} [{failed_test_id}]"
841 if self.testsuites_no_tests_run:
842 print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
844 for testsuite in self.testsuites_no_tests_run:
845 for testcase in testsuite:
846 tc_classes.add(get_testcase_doc_name(testcase))
847 for tc_class in tc_classes:
848 print(" {}".format(colorize(tc_class, RED)))
850 if self[TestResultCode.SKIP_CPU_SHORTAGE]:
854 " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
855 " ENOUGH CPUS AVAILABLE",
859 print(double_line_delim)
def not_executed(self):
    """Compute how many scheduled tests were not run.

    Returns:
        ``self.all_testcases`` minus the counter stored under
        ``TestResultCode.TEST_RUN``.
    """
    scheduled = self.all_testcases
    executed = self[TestResultCode.TEST_RUN]
    return scheduled - executed
867 def all_failed(self):
869 self[TestResultCode.FAIL]
870 + self[TestResultCode.ERROR]
871 + self[TestResultCode.UNEXPECTED_PASS]
875 def parse_results(results):
877 Prints the number of scheduled, executed, not executed, passed, failed,
878 errored and skipped tests and details about failed and errored tests.
880 Also returns all suites where any test failed.
886 results_per_suite = AllResults()
889 for result in results:
890 result_code = results_per_suite.add_result(result)
893 elif result_code == -1:
896 results_per_suite.print_results()
904 return return_code, results_per_suite.rerun
907 if __name__ == "__main__":
908 print(f"Config is: {config}")
910 if config.api_preload:
911 VPPApiJSONFiles.load_api(apidir=config.extern_apidir + [config.vpp_install_dir])
914 print("Running sanity test case.")
916 rc = sanity_run_vpp.main()
919 except Exception as e:
920 print(traceback.format_exc())
921 print("Couldn't run sanity test case.")
924 test_finished_join_timeout = 15
926 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
927 debug_core = config.debug == "core"
929 run_interactive = debug_gdb or config.step or config.force_foreground
931 max_concurrent_tests = 0
932 print(f"OS reports {num_cpus} available cpu(s).")
934 test_jobs = config.jobs
935 if test_jobs == "auto":
937 max_concurrent_tests = 1
938 print("Interactive mode required, running tests consecutively.")
940 max_concurrent_tests = num_cpus
942 f"Running at most {max_concurrent_tests} python test "
943 "processes concurrently."
946 max_concurrent_tests = test_jobs
948 f"Running at most {max_concurrent_tests} python test processes "
949 "concurrently as set by 'TEST_JOBS'."
952 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
954 if run_interactive and max_concurrent_tests > 1:
955 raise NotImplementedError(
956 "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
957 "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
963 print("Running tests using custom test runner.")
964 filters = [(parse_test_filter(f)) for f in config.filter.split(",")]
967 "Selected filters: ",
969 f"file={filter_file}, class={filter_class}, function={filter_func}"
970 for filter_file, filter_class, filter_func in filters
974 filter_cb = FilterByTestOption(filters)
976 cb = SplitToSuitesCallback(filter_cb)
977 for d in config.test_src_dir:
978 print("Adding tests from directory tree %s" % d)
979 discover_tests(d, cb)
981 # suites are not hashable, need to use list
984 for testcase_suite in cb.suites.values():
985 tests_amount += testcase_suite.countTestCases()
986 if testcase_suite.cpus_used > max_vpp_cpus:
987 # here we replace test functions with lambdas to just skip them
988 # but we also replace setUp/tearDown functions to do nothing
989 # so that the test can be "started" and "stopped", so that we can
990 # still keep those prints (test description - SKIP), which are done
991 # in stopTest() (for that to trigger, test function must run)
992 for t in testcase_suite:
994 if m.startswith("test_"):
995 setattr(t, m, lambda: t.skipTest("not enough cpus"))
996 setattr(t.__class__, "setUpClass", lambda: None)
997 setattr(t.__class__, "tearDownClass", lambda: None)
998 setattr(t, "setUp", lambda: None)
999 setattr(t, "tearDown", lambda: None)
1000 t.__class__.skipped_due_to_cpu_lack = True
1001 suites.append(testcase_suite)
1004 "%s out of %s tests match specified filters"
1005 % (tests_amount, tests_amount + cb.filtered.countTestCases())
1008 if not config.extended:
1009 print("Not running extended tests (some tests will be skipped)")
1011 attempts = config.retries + 1
1013 print("Perform %s attempts to pass the suite..." % attempts)
1015 if run_interactive and suites:
1016 # don't fork if requiring interactive terminal
1017 print("Running tests in foreground in the current process")
1018 full_suite = unittest.TestSuite()
1019 free_cpus = list(available_cpus)
1020 cpu_shortage = False
1021 for suite in suites:
1022 if suite.cpus_used <= max_vpp_cpus:
1023 suite.assign_cpus(free_cpus[: suite.cpus_used])
1025 suite.assign_cpus([])
1027 full_suite.addTests(suites)
1028 result = VppTestRunner(
1029 verbosity=config.verbose, failfast=config.failfast, print_summary=True
1031 was_successful = result.wasSuccessful()
1032 if not was_successful:
1033 for test_case_info in result.failed_test_cases_info:
1034 handle_failed_suite(
1035 test_case_info.logger,
1036 test_case_info.tempdir,
1037 test_case_info.vpp_pid,
1040 if test_case_info in result.core_crash_test_cases_info:
1041 check_and_handle_core(
1042 test_case_info.vpp_bin_path,
1043 test_case_info.tempdir,
1044 test_case_info.core_crash_test,
1051 "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
1052 " ENOUGH CPUS AVAILABLE",
1057 sys.exit(not was_successful)
1060 "Running each VPPTestCase in a separate background process"
1061 f" with at most {max_concurrent_tests} parallel python test "
1065 while suites and attempts > 0:
1066 for suite in suites:
1067 failed_link = get_failed_testcase_linkname(
1069 f"{get_testcase_dirname(suite._tests[0].__class__.__name__)}",
1071 if os.path.islink(failed_link):
1072 os.unlink(failed_link)
1073 results = run_forked(suites)
1074 exit_code, suites = parse_results(results)
1077 print("Test run was successful")
1079 print("%s attempt(s) left." % attempts)