14 from multiprocessing import Process, Pipe, get_context
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
18 from config import config, num_cpus, available_cpus, max_vpp_cpus
19 from framework import VppTestRunner, VppTestCase, \
20 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
21 TEST_RUN, SKIP_CPU_SHORTAGE
22 from debug import spawn_gdb, start_vpp_in_gdb
23 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
24 colorize, single_line_delim
25 from discover_tests import discover_tests
27 from subprocess import check_output, CalledProcessError
28 from util import check_core_path, get_core_path, is_core_present
# Timeout which controls how long the child has to finish after seeing
# a core dump in the test temporary directory. If this is exceeded, the parent
# assumes that the child process is stuck (e.g. waiting for an event from vpp)
# and kills
37 class StreamQueue(Queue):
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
46 return self._writer.fileno()
49 class StreamQueueManager(BaseManager):
53 StreamQueueManager.register('StreamQueue', StreamQueue)
56 class TestResult(dict):
57 def __init__(self, testcase_suite, testcases_by_id=None):
58 super(TestResult, self).__init__()
63 self[SKIP_CPU_SHORTAGE] = []
66 self.testcase_suite = testcase_suite
67 self.testcases = [testcase for testcase in testcase_suite]
68 self.testcases_by_id = testcases_by_id
def was_successful(self):
    """Tell whether the whole suite finished cleanly.

    Returns True only when nothing is recorded under FAIL or ERROR and the
    ids recorded under PASS, SKIP and SKIP_CPU_SHORTAGE together number
    exactly as many as the test cases counted by the wrapped suite.
    """
    # Any failure or error rules out success straight away.
    if len(self[FAIL]) != 0 or len(self[ERROR]) != 0:
        return False
    accounted_for = self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]
    return len(accounted_for) == self.testcase_suite.countTestCases()
def no_tests_run(self):
    """Return True when no test id is recorded under TEST_RUN."""
    executed = self[TEST_RUN]
    return len(executed) == 0
def process_result(self, test_id, result):
    """Record test_id in the list stored under the key `result`."""
    bucket = self[result]
    bucket.append(test_id)
81 def suite_from_failed(self):
83 for testcase in self.testcase_suite:
85 if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
88 return suite_from_failed(self.testcase_suite, rerun_ids)
90 def get_testcase_names(self, test_id):
91 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
92 setup_teardown_match = re.match(
93 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
94 if setup_teardown_match:
95 test_name, _, _, testcase_name = setup_teardown_match.groups()
96 if len(testcase_name.split('.')) == 2:
97 for key in self.testcases_by_id.keys():
98 if key.startswith(testcase_name):
101 testcase_name = self._get_testcase_doc_name(testcase_name)
103 test_name = self._get_test_description(test_id)
104 testcase_name = self._get_testcase_doc_name(test_id)
106 return testcase_name, test_name
108 def _get_test_description(self, test_id):
109 if test_id in self.testcases_by_id:
110 desc = get_test_description(descriptions,
111 self.testcases_by_id[test_id])
116 def _get_testcase_doc_name(self, test_id):
117 if test_id in self.testcases_by_id:
118 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
                        finished_pipe, result_pipe, logger):
    """Run `suite` with VppTestRunner and report the outcome over pipes.

    :param suite: test suite handed to the runner
    :param keep_alive_pipe: passed to the runner; closed before returning
    :param stdouterr_queue: object installed as both sys.stdout and sys.stderr
    :param finished_pipe: receives the runner result's wasSuccessful() value
                          and is then closed
    :param result_pipe: passed to the runner
    :param logger: its first handler becomes VppTestCase.parallel_handler
    """
    # Everything written to stdout/stderr from here on goes to the queue.
    sys.stdout = sys.stderr = stdouterr_queue
    VppTestCase.parallel_handler = logger.handlers[0]

    runner = VppTestRunner(
        keep_alive_pipe=keep_alive_pipe,
        descriptions=descriptions,
        verbosity=config.verbose,
        result_pipe=result_pipe,
        failfast=config.failfast,
        print_summary=False,
    )
    outcome = runner.run(suite)

    finished_pipe.send(outcome.wasSuccessful())
    for pipe in (finished_pipe, keep_alive_pipe):
        pipe.close()
140 class TestCaseWrapper(object):
141 def __init__(self, testcase_suite, manager):
142 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
144 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
145 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
146 self.testcase_suite = testcase_suite
147 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
148 self.logger = get_parallel_logger(self.stdouterr_queue)
149 self.child = Process(target=test_runner_wrapper,
150 args=(testcase_suite,
151 self.keep_alive_child_end,
152 self.stdouterr_queue,
153 self.finished_child_end,
154 self.result_child_end,
158 self.last_test_temp_dir = None
159 self.last_test_vpp_binary = None
160 self._last_test = None
161 self.last_test_id = None
163 self.last_heard = time.time()
164 self.core_detected_at = None
165 self.testcases_by_id = {}
166 self.testclasess_with_core = {}
167 for testcase in self.testcase_suite:
168 self.testcases_by_id[testcase.id()] = testcase
169 self.result = TestResult(testcase_suite, self.testcases_by_id)
173 return self._last_test
176 def last_test(self, test_id):
177 self.last_test_id = test_id
178 if test_id in self.testcases_by_id:
179 testcase = self.testcases_by_id[test_id]
180 self._last_test = testcase.shortDescription()
181 if not self._last_test:
182 self._last_test = str(testcase)
184 self._last_test = test_id
186 def add_testclass_with_core(self):
187 if self.last_test_id in self.testcases_by_id:
188 test = self.testcases_by_id[self.last_test_id]
189 class_name = unittest.util.strclass(test.__class__)
190 test_name = "'{}' ({})".format(get_test_description(descriptions,
194 test_name = self.last_test_id
195 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
196 r'\((.+\..+)\)', test_name).groups()[3]
197 if class_name not in self.testclasess_with_core:
198 self.testclasess_with_core[class_name] = (
200 self.last_test_vpp_binary,
201 self.last_test_temp_dir)
def close_pipes(self):
    """Close all six pipe ends: the three child ends, then the parent ends."""
    pipe_ends = (
        self.keep_alive_child_end,
        self.finished_child_end,
        self.result_child_end,
        self.keep_alive_parent_end,
        self.finished_parent_end,
        self.result_parent_end,
    )
    for pipe_end in pipe_ends:
        pipe_end.close()
def was_successful(self):
    """Return whatever the collected result's was_successful() reports."""
    collected = self.result
    return collected.was_successful()
216 return self.testcase_suite.cpus_used
def get_assigned_cpus(self):
    """Return the cpus reported by the wrapped suite's get_assigned_cpus()."""
    suite = self.testcase_suite
    return suite.get_assigned_cpus()
222 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
225 while read_testcases.is_set() or unread_testcases:
226 if finished_unread_testcases:
227 read_testcase = finished_unread_testcases.pop()
228 unread_testcases.remove(read_testcase)
229 elif unread_testcases:
230 read_testcase = unread_testcases.pop()
233 while data is not None:
234 sys.stdout.write(data)
235 data = read_testcase.stdouterr_queue.get()
237 read_testcase.stdouterr_queue.close()
238 finished_unread_testcases.discard(read_testcase)
242 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
243 if last_test_temp_dir:
244 # Need to create link in case of a timeout or core dump without failure
245 lttd = os.path.basename(last_test_temp_dir)
246 link_path = '%s%s-FAILED' % (config.failed_dir, lttd)
247 if not os.path.exists(link_path):
248 os.symlink(last_test_temp_dir, link_path)
249 logger.error("Symlink to failed testcase directory: %s -> %s"
252 # Report core existence
253 core_path = get_core_path(last_test_temp_dir)
254 if os.path.exists(core_path):
256 "Core-file exists in test temporary directory: %s!" %
258 check_core_path(logger, core_path)
259 logger.debug("Running 'file %s':" % core_path)
261 info = check_output(["file", core_path])
263 except CalledProcessError as e:
264 logger.error("Subprocess returned with return code "
265 "while running `file' utility on core-file "
267 "rc=%s", e.returncode)
269 logger.error("Subprocess returned with OS error while "
270 "running 'file' utility "
272 "(%s) %s", e.errno, e.strerror)
273 except Exception as e:
274 logger.exception("Unexpected error running `file' utility "
276 logger.error(f"gdb {config.vpp_bin} {core_path}")
279 # Copy api post mortem
280 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
281 if os.path.isfile(api_post_mortem_path):
282 logger.error("Copying api_post_mortem.%d to %s" %
283 (vpp_pid, last_test_temp_dir))
284 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
287 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
288 if is_core_present(tempdir):
290 print('VPP core detected in %s. Last test running was %s' %
291 (tempdir, core_crash_test))
292 print(single_line_delim)
293 spawn_gdb(vpp_binary, get_core_path(tempdir))
294 print(single_line_delim)
295 elif config.compress_core:
296 print("Compressing core-file in test directory `%s'" % tempdir)
297 os.system("gzip %s" % get_core_path(tempdir))
300 def handle_cores(failed_testcases):
301 for failed_testcase in failed_testcases:
302 tcs_with_core = failed_testcase.testclasess_with_core
304 for test, vpp_binary, tempdir in tcs_with_core.values():
305 check_and_handle_core(vpp_binary, tempdir, test)
308 def process_finished_testsuite(wrapped_testcase_suite,
309 finished_testcase_suites,
310 failed_wrapped_testcases,
312 results.append(wrapped_testcase_suite.result)
313 finished_testcase_suites.add(wrapped_testcase_suite)
315 if config.failfast and not wrapped_testcase_suite.was_successful():
318 if not wrapped_testcase_suite.was_successful():
319 failed_wrapped_testcases.add(wrapped_testcase_suite)
320 handle_failed_suite(wrapped_testcase_suite.logger,
321 wrapped_testcase_suite.last_test_temp_dir,
322 wrapped_testcase_suite.vpp_pid)
327 def run_forked(testcase_suites):
328 wrapped_testcase_suites = set()
329 solo_testcase_suites = []
331 # suites are unhashable, need to use list
333 unread_testcases = set()
334 finished_unread_testcases = set()
335 manager = StreamQueueManager()
338 free_cpus = list(available_cpus)
340 def on_suite_start(tc):
341 nonlocal tests_running
343 tests_running = tests_running + 1
345 def on_suite_finish(tc):
346 nonlocal tests_running
348 tests_running = tests_running - 1
349 assert tests_running >= 0
350 free_cpus.extend(tc.get_assigned_cpus())
352 def run_suite(suite):
354 nonlocal wrapped_testcase_suites
355 nonlocal unread_testcases
357 suite.assign_cpus(free_cpus[:suite.cpus_used])
358 free_cpus = free_cpus[suite.cpus_used:]
359 wrapper = TestCaseWrapper(suite, manager)
360 wrapped_testcase_suites.add(wrapper)
361 unread_testcases.add(wrapper)
362 on_suite_start(suite)
364 def can_run_suite(suite):
365 return (tests_running < max_concurrent_tests and
366 (suite.cpus_used <= len(free_cpus) or
367 suite.cpus_used > max_vpp_cpus))
369 while free_cpus and testcase_suites:
370 a_suite = testcase_suites[0]
371 if a_suite.is_tagged_run_solo:
372 a_suite = testcase_suites.pop(0)
373 solo_testcase_suites.append(a_suite)
375 if can_run_suite(a_suite):
376 a_suite = testcase_suites.pop(0)
381 if tests_running == 0 and solo_testcase_suites:
382 a_suite = solo_testcase_suites.pop(0)
385 read_from_testcases = threading.Event()
386 read_from_testcases.set()
387 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
388 args=(unread_testcases,
389 finished_unread_testcases,
390 read_from_testcases))
391 stdouterr_thread.start()
393 failed_wrapped_testcases = set()
397 while wrapped_testcase_suites:
398 finished_testcase_suites = set()
399 for wrapped_testcase_suite in wrapped_testcase_suites:
400 while wrapped_testcase_suite.result_parent_end.poll():
401 wrapped_testcase_suite.result.process_result(
402 *wrapped_testcase_suite.result_parent_end.recv())
403 wrapped_testcase_suite.last_heard = time.time()
405 while wrapped_testcase_suite.keep_alive_parent_end.poll():
406 wrapped_testcase_suite.last_test, \
407 wrapped_testcase_suite.last_test_vpp_binary, \
408 wrapped_testcase_suite.last_test_temp_dir, \
409 wrapped_testcase_suite.vpp_pid = \
410 wrapped_testcase_suite.keep_alive_parent_end.recv()
411 wrapped_testcase_suite.last_heard = time.time()
413 if wrapped_testcase_suite.finished_parent_end.poll():
414 wrapped_testcase_suite.finished_parent_end.recv()
415 wrapped_testcase_suite.last_heard = time.time()
416 stop_run = process_finished_testsuite(
417 wrapped_testcase_suite,
418 finished_testcase_suites,
419 failed_wrapped_testcases,
424 if wrapped_testcase_suite.last_heard + config.timeout < \
427 wrapped_testcase_suite.logger.critical(
428 "Child test runner process timed out "
429 "(last test running was `%s' in `%s')!" %
430 (wrapped_testcase_suite.last_test,
431 wrapped_testcase_suite.last_test_temp_dir))
432 elif not wrapped_testcase_suite.child.is_alive():
434 wrapped_testcase_suite.logger.critical(
435 "Child test runner process unexpectedly died "
436 "(last test running was `%s' in `%s')!" %
437 (wrapped_testcase_suite.last_test,
438 wrapped_testcase_suite.last_test_temp_dir))
439 elif wrapped_testcase_suite.last_test_temp_dir and \
440 wrapped_testcase_suite.last_test_vpp_binary:
442 wrapped_testcase_suite.last_test_temp_dir):
443 wrapped_testcase_suite.add_testclass_with_core()
444 if wrapped_testcase_suite.core_detected_at is None:
445 wrapped_testcase_suite.core_detected_at = \
447 elif wrapped_testcase_suite.core_detected_at + \
448 core_timeout < time.time():
449 wrapped_testcase_suite.logger.critical(
450 "Child test runner process unresponsive and "
451 "core-file exists in test temporary directory "
452 "(last test running was `%s' in `%s')!" %
453 (wrapped_testcase_suite.last_test,
454 wrapped_testcase_suite.last_test_temp_dir))
458 wrapped_testcase_suite.child.terminate()
460 # terminating the child process tends to leave orphan
462 if wrapped_testcase_suite.vpp_pid:
463 os.kill(wrapped_testcase_suite.vpp_pid,
468 wrapped_testcase_suite.result.crashed = True
469 wrapped_testcase_suite.result.process_result(
470 wrapped_testcase_suite.last_test_id, ERROR)
471 stop_run = process_finished_testsuite(
472 wrapped_testcase_suite,
473 finished_testcase_suites,
474 failed_wrapped_testcases,
477 for finished_testcase in finished_testcase_suites:
478 # Somewhat surprisingly, the join below may
479 # timeout, even if client signaled that
480 # it finished - so we note it just in case.
481 join_start = time.time()
482 finished_testcase.child.join(test_finished_join_timeout)
483 join_end = time.time()
484 if join_end - join_start >= test_finished_join_timeout:
485 finished_testcase.logger.error(
486 "Timeout joining finished test: %s (pid %d)" %
487 (finished_testcase.last_test,
488 finished_testcase.child.pid))
489 finished_testcase.close_pipes()
490 wrapped_testcase_suites.remove(finished_testcase)
491 finished_unread_testcases.add(finished_testcase)
492 finished_testcase.stdouterr_queue.put(None)
493 on_suite_finish(finished_testcase)
495 while testcase_suites:
496 results.append(TestResult(testcase_suites.pop(0)))
497 elif testcase_suites:
498 a_suite = testcase_suites.pop(0)
499 while a_suite and a_suite.is_tagged_run_solo:
500 solo_testcase_suites.append(a_suite)
502 a_suite = testcase_suites.pop(0)
505 if a_suite and can_run_suite(a_suite):
507 if solo_testcase_suites and tests_running == 0:
508 a_suite = solo_testcase_suites.pop(0)
512 for wrapped_testcase_suite in wrapped_testcase_suites:
513 wrapped_testcase_suite.child.terminate()
514 wrapped_testcase_suite.stdouterr_queue.put(None)
517 read_from_testcases.clear()
518 stdouterr_thread.join(config.timeout)
521 handle_cores(failed_wrapped_testcases)
525 class TestSuiteWrapper(unittest.TestSuite):
529 return super().__init__()
def addTest(self, test):
    """Add `test` to the suite, keeping cpus_used at the largest requirement.

    cpus_used becomes the greater of its current value and the value
    returned by test.get_cpus_required(); the test is then added through
    the parent class.
    """
    current = self.cpus_used
    required = test.get_cpus_required()
    self.cpus_used = required if required > current else current
    super().addTest(test)
535 def assign_cpus(self, cpus):
def _handleClassSetUp(self, test, result):
    """Hand this suite's cpus to `test`, then run the parent's set-up step.

    The cpus are not assigned when the test's class has
    skipped_due_to_cpu_lack set to a true value.
    """
    test_class = test.__class__
    if not test_class.skipped_due_to_cpu_lack:
        test.assign_cpus(self.cpus)
    super()._handleClassSetUp(test, result)
543 def get_assigned_cpus(self):
547 class SplitToSuitesCallback:
548 def __init__(self, filter_callback):
550 self.suite_name = 'default'
551 self.filter_callback = filter_callback
552 self.filtered = TestSuiteWrapper()
554 def __call__(self, file_name, cls, method):
555 test_method = cls(method)
556 if self.filter_callback(file_name, cls.__name__, method):
557 self.suite_name = file_name + cls.__name__
558 if self.suite_name not in self.suites:
559 self.suites[self.suite_name] = TestSuiteWrapper()
560 self.suites[self.suite_name].is_tagged_run_solo = False
561 self.suites[self.suite_name].addTest(test_method)
562 if test_method.is_tagged_run_solo():
563 self.suites[self.suite_name].is_tagged_run_solo = True
566 self.filtered.addTest(test_method)
569 def parse_test_filter(test_filter):
571 filter_file_name = None
572 filter_class_name = None
573 filter_func_name = None
578 raise Exception("Unrecognized %s option: %s" %
581 if parts[2] not in ('*', ''):
582 filter_func_name = parts[2]
583 if parts[1] not in ('*', ''):
584 filter_class_name = parts[1]
585 if parts[0] not in ('*', ''):
586 if parts[0].startswith('test_'):
587 filter_file_name = parts[0]
589 filter_file_name = 'test_%s' % parts[0]
591 if f.startswith('test_'):
594 filter_file_name = 'test_%s' % f
596 filter_file_name = '%s.py' % filter_file_name
597 return filter_file_name, filter_class_name, filter_func_name
600 def filter_tests(tests, filter_cb):
601 result = TestSuiteWrapper()
603 if isinstance(t, unittest.suite.TestSuite):
604 # this is a bunch of tests, recursively filter...
605 x = filter_tests(t, filter_cb)
606 if x.countTestCases() > 0:
608 elif isinstance(t, unittest.TestCase):
609 # this is a single test
610 parts = t.id().split('.')
611 # t.id() for common cases like this:
612 # test_classifier.TestClassifier.test_acl_ip
613 # apply filtering only if it is so
615 if not filter_cb(parts[0], parts[1], parts[2]):
619 # unexpected object, don't touch it
624 class FilterByTestOption:
def __init__(self, filter_file_name, filter_class_name, filter_func_name):
    """Store the file, class and function filter values on the instance."""
    (self.filter_file_name,
     self.filter_class_name,
     self.filter_func_name) = (filter_file_name,
                               filter_class_name,
                               filter_func_name)
630 def __call__(self, file_name, class_name, func_name):
631 if self.filter_file_name:
632 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
635 if self.filter_class_name and class_name != self.filter_class_name:
637 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Filter callback accepting tests whose 'file.class' name is listed."""

    def __init__(self, classes_with_filenames):
        # Container of "<file_name>.<class_name>" strings; it is only ever
        # used for membership tests.
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return True when '<file_name>.<class_name>' is in the container.

        func_name is accepted to match the filter-callback signature but is
        not consulted.
        """
        qualified_name = '.'.join((file_name, class_name))
        return qualified_name in self.classes_with_filenames
650 def suite_from_failed(suite, failed):
651 failed = {x.rsplit('.', 1)[0] for x in failed}
652 filter_cb = FilterByClassList(failed)
653 suite = filter_tests(suite, filter_cb)
657 class AllResults(dict):
659 super(AllResults, self).__init__()
660 self.all_testcases = 0
661 self.results_per_suite = []
666 self[SKIP_CPU_SHORTAGE] = 0
669 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Keep `result` and add its per-type counts to the running totals.

    `result` is appended to results_per_suite; for each result type the
    length of result[type] is added to self[type].
    """
    self.results_per_suite.append(result)
    for kind in (PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE):
        self[kind] += len(result[kind])
677 def add_result(self, result):
679 self.all_testcases += result.testcase_suite.countTestCases()
680 self.add_results(result)
682 if result.no_tests_run():
683 self.testsuites_no_tests_run.append(result.testcase_suite)
688 elif not result.was_successful():
692 self.rerun.append(result.testcase_suite)
696 def print_results(self):
698 print(double_line_delim)
699 print('TEST RESULTS:')
701 def indent_results(lines):
702 lines = list(filter(None, lines))
703 maximum = max(lines, key=lambda x: x.index(":"))
704 maximum = 4 + maximum.index(":")
706 padding = " " * (maximum - l.index(":"))
707 print(f"{padding}{l}")
710 f'Scheduled tests: {self.all_testcases}',
711 f'Executed tests: {self[TEST_RUN]}',
712 f'Passed tests: {colorize(self[PASS], GREEN)}',
713 f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
714 if self[SKIP] else None,
715 f'Not Executed tests: {colorize(self.not_executed, RED)}'
716 if self.not_executed else None,
717 f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
718 f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
719 'Tests skipped due to lack of CPUS: '
720 f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
721 if self[SKIP_CPU_SHORTAGE] else None
724 if self.all_failed > 0:
725 print('FAILURES AND ERRORS IN TESTS:')
726 for result in self.results_per_suite:
727 failed_testcase_ids = result[FAIL]
728 errored_testcase_ids = result[ERROR]
729 old_testcase_name = None
730 if failed_testcase_ids:
731 for failed_test_id in failed_testcase_ids:
732 new_testcase_name, test_name = \
733 result.get_testcase_names(failed_test_id)
734 if new_testcase_name != old_testcase_name:
735 print(' Testcase name: {}'.format(
736 colorize(new_testcase_name, RED)))
737 old_testcase_name = new_testcase_name
738 print(' FAILURE: {} [{}]'.format(
739 colorize(test_name, RED), failed_test_id))
740 if errored_testcase_ids:
741 for errored_test_id in errored_testcase_ids:
742 new_testcase_name, test_name = \
743 result.get_testcase_names(errored_test_id)
744 if new_testcase_name != old_testcase_name:
745 print(' Testcase name: {}'.format(
746 colorize(new_testcase_name, RED)))
747 old_testcase_name = new_testcase_name
748 print(' ERROR: {} [{}]'.format(
749 colorize(test_name, RED), errored_test_id))
750 if self.testsuites_no_tests_run:
751 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
753 for testsuite in self.testsuites_no_tests_run:
754 for testcase in testsuite:
755 tc_classes.add(get_testcase_doc_name(testcase))
756 for tc_class in tc_classes:
757 print(' {}'.format(colorize(tc_class, RED)))
759 if self[SKIP_CPU_SHORTAGE]:
761 print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
762 ' ENOUGH CPUS AVAILABLE', YELLOW))
763 print(double_line_delim)
def not_executed(self):
    """Return all_testcases minus the count stored under TEST_RUN."""
    scheduled = self.all_testcases
    return scheduled - self[TEST_RUN]
def all_failed(self):
    """Return the sum of the counts stored under FAIL and ERROR."""
    failures = self[FAIL]
    return failures + self[ERROR]
775 def parse_results(results):
777 Prints the number of scheduled, executed, not executed, passed, failed,
778 errored and skipped tests and details about failed and errored tests.
780 Also returns all suites where any test failed.
786 results_per_suite = AllResults()
789 for result in results:
790 result_code = results_per_suite.add_result(result)
793 elif result_code == -1:
796 results_per_suite.print_results()
804 return return_code, results_per_suite.rerun
807 if __name__ == '__main__':
809 print(f"Config is: {config}")
812 print("Running sanity test case.")
814 rc = sanity_run_vpp.main()
817 except Exception as e:
818 print(traceback.format_exc())
819 print("Couldn't run sanity test case.")
822 test_finished_join_timeout = 15
824 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
825 debug_core = config.debug == "core"
827 run_interactive = debug_gdb or config.step or config.force_foreground
829 max_concurrent_tests = 0
830 print(f"OS reports {num_cpus} available cpu(s).")
832 test_jobs = config.jobs
833 if test_jobs == 'auto':
835 max_concurrent_tests = 1
836 print('Interactive mode required, running tests consecutively.')
838 max_concurrent_tests = num_cpus
839 print(f"Running at most {max_concurrent_tests} python test "
840 "processes concurrently.")
842 max_concurrent_tests = test_jobs
843 print(f"Running at most {max_concurrent_tests} python test processes "
844 "concurrently as set by 'TEST_JOBS'.")
846 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
848 if run_interactive and max_concurrent_tests > 1:
849 raise NotImplementedError(
850 'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
851 'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
856 print("Running tests using custom test runner.")
857 filter_file, filter_class, filter_func = \
858 parse_test_filter(config.filter)
860 print("Selected filters: file=%s, class=%s, function=%s" % (
861 filter_file, filter_class, filter_func))
863 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
865 ignore_path = config.venv_dir
866 cb = SplitToSuitesCallback(filter_cb)
867 for d in config.test_src_dir:
868 print("Adding tests from directory tree %s" % d)
869 discover_tests(d, cb, ignore_path)
871 # suites are not hashable, need to use list
874 for testcase_suite in cb.suites.values():
875 tests_amount += testcase_suite.countTestCases()
876 if testcase_suite.cpus_used > max_vpp_cpus:
877 # here we replace test functions with lambdas to just skip them
878 # but we also replace setUp/tearDown functions to do nothing
879 # so that the test can be "started" and "stopped", so that we can
880 # still keep those prints (test description - SKIP), which are done
881 # in stopTest() (for that to trigger, test function must run)
882 for t in testcase_suite:
884 if m.startswith('test_'):
885 setattr(t, m, lambda: t.skipTest("not enough cpus"))
886 setattr(t.__class__, 'setUpClass', lambda: None)
887 setattr(t.__class__, 'tearDownClass', lambda: None)
888 setattr(t, 'setUp', lambda: None)
889 setattr(t, 'tearDown', lambda: None)
890 t.__class__.skipped_due_to_cpu_lack = True
891 suites.append(testcase_suite)
893 print("%s out of %s tests match specified filters" % (
894 tests_amount, tests_amount + cb.filtered.countTestCases()))
896 if not config.extended:
897 print("Not running extended tests (some tests will be skipped)")
899 attempts = config.retries + 1
901 print("Perform %s attempts to pass the suite..." % attempts)
903 if run_interactive and suites:
904 # don't fork if requiring interactive terminal
905 print('Running tests in foreground in the current process')
906 full_suite = unittest.TestSuite()
907 free_cpus = list(available_cpus)
910 if suite.cpus_used <= max_vpp_cpus:
911 suite.assign_cpus(free_cpus[:suite.cpus_used])
913 suite.assign_cpus([])
915 full_suite.addTests(suites)
916 result = VppTestRunner(verbosity=config.verbose,
917 failfast=config.failfast,
918 print_summary=True).run(full_suite)
919 was_successful = result.wasSuccessful()
920 if not was_successful:
921 for test_case_info in result.failed_test_cases_info:
922 handle_failed_suite(test_case_info.logger,
923 test_case_info.tempdir,
924 test_case_info.vpp_pid)
925 if test_case_info in result.core_crash_test_cases_info:
926 check_and_handle_core(test_case_info.vpp_bin_path,
927 test_case_info.tempdir,
928 test_case_info.core_crash_test)
932 print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
933 ' ENOUGH CPUS AVAILABLE', YELLOW))
935 sys.exit(not was_successful)
937 print('Running each VPPTestCase in a separate background process'
938 f' with at most {max_concurrent_tests} parallel python test '
941 while suites and attempts > 0:
942 results = run_forked(suites)
943 exit_code, suites = parse_results(results)
946 print('Test run was successful')
948 print('%s attempt(s) left.' % attempts)