14 from multiprocessing import Process, Pipe, get_context
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
18 from config import config, num_cpus, available_cpus, max_vpp_cpus
19 from framework import VppTestRunner, VppTestCase, \
20 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
21 TEST_RUN, SKIP_CPU_SHORTAGE
22 from debug import spawn_gdb, start_vpp_in_gdb
23 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
24 colorize, single_line_delim
25 from discover_tests import discover_tests
27 from subprocess import check_output, CalledProcessError
28 from util import check_core_path, get_core_path, is_core_present
30 # timeout which controls how long the child has to finish after seeing
31 # a core dump in test temporary directory. If this is exceeded, parent assumes
32 # that child process is stuck (e.g. waiting for event from vpp) and kill
37 class StreamQueue(Queue):
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
46 return self._writer.fileno()
49 class StreamQueueManager(BaseManager):
53 StreamQueueManager.register('StreamQueue', StreamQueue)
56 class TestResult(dict):
57 def __init__(self, testcase_suite, testcases_by_id=None):
58 super(TestResult, self).__init__()
63 self[SKIP_CPU_SHORTAGE] = []
66 self.testcase_suite = testcase_suite
67 self.testcases = [testcase for testcase in testcase_suite]
68 self.testcases_by_id = testcases_by_id
70 def was_successful(self):
71 return 0 == len(self[FAIL]) == len(self[ERROR]) \
72 and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
73 == self.testcase_suite.countTestCases()
75 def no_tests_run(self):
76 return 0 == len(self[TEST_RUN])
78 def process_result(self, test_id, result):
79 self[result].append(test_id)
81 def suite_from_failed(self):
83 for testcase in self.testcase_suite:
85 if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
88 return suite_from_failed(self.testcase_suite, rerun_ids)
90 def get_testcase_names(self, test_id):
91 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
92 setup_teardown_match = re.match(
93 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
94 if setup_teardown_match:
95 test_name, _, _, testcase_name = setup_teardown_match.groups()
96 if len(testcase_name.split('.')) == 2:
97 for key in self.testcases_by_id.keys():
98 if key.startswith(testcase_name):
101 testcase_name = self._get_testcase_doc_name(testcase_name)
103 test_name = self._get_test_description(test_id)
104 testcase_name = self._get_testcase_doc_name(test_id)
106 return testcase_name, test_name
108 def _get_test_description(self, test_id):
109 if test_id in self.testcases_by_id:
110 desc = get_test_description(descriptions,
111 self.testcases_by_id[test_id])
116 def _get_testcase_doc_name(self, test_id):
117 if test_id in self.testcases_by_id:
118 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
124 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
125 finished_pipe, result_pipe, logger):
126 sys.stdout = stdouterr_queue
127 sys.stderr = stdouterr_queue
128 VppTestCase.parallel_handler = logger.handlers[0]
129 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
130 descriptions=descriptions,
131 verbosity=config.verbose,
132 result_pipe=result_pipe,
133 failfast=config.failfast,
134 print_summary=False).run(suite)
135 finished_pipe.send(result.wasSuccessful())
136 finished_pipe.close()
137 keep_alive_pipe.close()
140 class TestCaseWrapper(object):
141 def __init__(self, testcase_suite, manager):
142 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
144 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
145 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
146 self.testcase_suite = testcase_suite
147 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
148 self.logger = get_parallel_logger(self.stdouterr_queue)
149 self.child = Process(target=test_runner_wrapper,
150 args=(testcase_suite,
151 self.keep_alive_child_end,
152 self.stdouterr_queue,
153 self.finished_child_end,
154 self.result_child_end,
158 self.last_test_temp_dir = None
159 self.last_test_vpp_binary = None
160 self._last_test = None
161 self.last_test_id = None
163 self.last_heard = time.time()
164 self.core_detected_at = None
165 self.testcases_by_id = {}
166 self.testclasess_with_core = {}
167 for testcase in self.testcase_suite:
168 self.testcases_by_id[testcase.id()] = testcase
169 self.result = TestResult(testcase_suite, self.testcases_by_id)
173 return self._last_test
176 def last_test(self, test_id):
177 self.last_test_id = test_id
178 if test_id in self.testcases_by_id:
179 testcase = self.testcases_by_id[test_id]
180 self._last_test = testcase.shortDescription()
181 if not self._last_test:
182 self._last_test = str(testcase)
184 self._last_test = test_id
186 def add_testclass_with_core(self):
187 if self.last_test_id in self.testcases_by_id:
188 test = self.testcases_by_id[self.last_test_id]
189 class_name = unittest.util.strclass(test.__class__)
190 test_name = "'{}' ({})".format(get_test_description(descriptions,
194 test_name = self.last_test_id
195 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
196 r'\((.+\..+)\)', test_name).groups()[3]
197 if class_name not in self.testclasess_with_core:
198 self.testclasess_with_core[class_name] = (
200 self.last_test_vpp_binary,
201 self.last_test_temp_dir)
203 def close_pipes(self):
204 self.keep_alive_child_end.close()
205 self.finished_child_end.close()
206 self.result_child_end.close()
207 self.keep_alive_parent_end.close()
208 self.finished_parent_end.close()
209 self.result_parent_end.close()
211 def was_successful(self):
212 return self.result.was_successful()
216 return self.testcase_suite.cpus_used
218 def get_assigned_cpus(self):
219 return self.testcase_suite.get_assigned_cpus()
222 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
225 while read_testcases.is_set() or unread_testcases:
226 if finished_unread_testcases:
227 read_testcase = finished_unread_testcases.pop()
228 unread_testcases.remove(read_testcase)
229 elif unread_testcases:
230 read_testcase = unread_testcases.pop()
233 while data is not None:
234 sys.stdout.write(data)
235 data = read_testcase.stdouterr_queue.get()
237 read_testcase.stdouterr_queue.close()
238 finished_unread_testcases.discard(read_testcase)
242 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
243 if last_test_temp_dir:
244 # Need to create link in case of a timeout or core dump without failure
245 lttd = os.path.basename(last_test_temp_dir)
246 link_path = '%s%s-FAILED' % (config.failed_dir, lttd)
247 if not os.path.exists(link_path):
248 os.symlink(last_test_temp_dir, link_path)
249 logger.error("Symlink to failed testcase directory: %s -> %s"
252 # Report core existence
253 core_path = get_core_path(last_test_temp_dir)
254 if os.path.exists(core_path):
256 "Core-file exists in test temporary directory: %s!" %
258 check_core_path(logger, core_path)
259 logger.debug("Running 'file %s':" % core_path)
261 info = check_output(["file", core_path])
263 except CalledProcessError as e:
264 logger.error("Subprocess returned with return code "
265 "while running `file' utility on core-file "
267 "rc=%s", e.returncode)
269 logger.error("Subprocess returned with OS error while "
270 "running 'file' utility "
272 "(%s) %s", e.errno, e.strerror)
273 except Exception as e:
274 logger.exception("Unexpected error running `file' utility "
276 logger.error(f"gdb {vpp_binary} {core_path}")
279 # Copy api post mortem
280 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
281 if os.path.isfile(api_post_mortem_path):
282 logger.error("Copying api_post_mortem.%d to %s" %
283 (vpp_pid, last_test_temp_dir))
284 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
287 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
288 if is_core_present(tempdir):
290 print('VPP core detected in %s. Last test running was %s' %
291 (tempdir, core_crash_test))
292 print(single_line_delim)
293 spawn_gdb(vpp_binary, get_core_path(tempdir))
294 print(single_line_delim)
295 elif config.compress_core:
296 print("Compressing core-file in test directory `%s'" % tempdir)
297 os.system("gzip %s" % get_core_path(tempdir))
300 def handle_cores(failed_testcases):
301 for failed_testcase in failed_testcases:
302 tcs_with_core = failed_testcase.testclasess_with_core
304 for test, vpp_binary, tempdir in tcs_with_core.values():
305 check_and_handle_core(vpp_binary, tempdir, test)
308 def process_finished_testsuite(wrapped_testcase_suite,
309 finished_testcase_suites,
310 failed_wrapped_testcases,
312 results.append(wrapped_testcase_suite.result)
313 finished_testcase_suites.add(wrapped_testcase_suite)
315 if config.failfast and not wrapped_testcase_suite.was_successful():
318 if not wrapped_testcase_suite.was_successful():
319 failed_wrapped_testcases.add(wrapped_testcase_suite)
320 handle_failed_suite(wrapped_testcase_suite.logger,
321 wrapped_testcase_suite.last_test_temp_dir,
322 wrapped_testcase_suite.vpp_pid,
323 wrapped_testcase_suite.last_test_vpp_binary,)
328 def run_forked(testcase_suites):
329 wrapped_testcase_suites = set()
330 solo_testcase_suites = []
332 # suites are unhashable, need to use list
334 unread_testcases = set()
335 finished_unread_testcases = set()
336 manager = StreamQueueManager()
339 free_cpus = list(available_cpus)
341 def on_suite_start(tc):
342 nonlocal tests_running
344 tests_running = tests_running + 1
346 def on_suite_finish(tc):
347 nonlocal tests_running
349 tests_running = tests_running - 1
350 assert tests_running >= 0
351 free_cpus.extend(tc.get_assigned_cpus())
353 def run_suite(suite):
355 nonlocal wrapped_testcase_suites
356 nonlocal unread_testcases
358 suite.assign_cpus(free_cpus[:suite.cpus_used])
359 free_cpus = free_cpus[suite.cpus_used:]
360 wrapper = TestCaseWrapper(suite, manager)
361 wrapped_testcase_suites.add(wrapper)
362 unread_testcases.add(wrapper)
363 on_suite_start(suite)
365 def can_run_suite(suite):
366 return (tests_running < max_concurrent_tests and
367 (suite.cpus_used <= len(free_cpus) or
368 suite.cpus_used > max_vpp_cpus))
370 while free_cpus and testcase_suites:
371 a_suite = testcase_suites[0]
372 if a_suite.is_tagged_run_solo:
373 a_suite = testcase_suites.pop(0)
374 solo_testcase_suites.append(a_suite)
376 if can_run_suite(a_suite):
377 a_suite = testcase_suites.pop(0)
382 if tests_running == 0 and solo_testcase_suites:
383 a_suite = solo_testcase_suites.pop(0)
386 read_from_testcases = threading.Event()
387 read_from_testcases.set()
388 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
389 args=(unread_testcases,
390 finished_unread_testcases,
391 read_from_testcases))
392 stdouterr_thread.start()
394 failed_wrapped_testcases = set()
398 while wrapped_testcase_suites:
399 finished_testcase_suites = set()
400 for wrapped_testcase_suite in wrapped_testcase_suites:
401 while wrapped_testcase_suite.result_parent_end.poll():
402 wrapped_testcase_suite.result.process_result(
403 *wrapped_testcase_suite.result_parent_end.recv())
404 wrapped_testcase_suite.last_heard = time.time()
406 while wrapped_testcase_suite.keep_alive_parent_end.poll():
407 wrapped_testcase_suite.last_test, \
408 wrapped_testcase_suite.last_test_vpp_binary, \
409 wrapped_testcase_suite.last_test_temp_dir, \
410 wrapped_testcase_suite.vpp_pid = \
411 wrapped_testcase_suite.keep_alive_parent_end.recv()
412 wrapped_testcase_suite.last_heard = time.time()
414 if wrapped_testcase_suite.finished_parent_end.poll():
415 wrapped_testcase_suite.finished_parent_end.recv()
416 wrapped_testcase_suite.last_heard = time.time()
417 stop_run = process_finished_testsuite(
418 wrapped_testcase_suite,
419 finished_testcase_suites,
420 failed_wrapped_testcases,
425 if wrapped_testcase_suite.last_heard + config.timeout < \
428 wrapped_testcase_suite.logger.critical(
429 "Child test runner process timed out "
430 "(last test running was `%s' in `%s')!" %
431 (wrapped_testcase_suite.last_test,
432 wrapped_testcase_suite.last_test_temp_dir))
433 elif not wrapped_testcase_suite.child.is_alive():
435 wrapped_testcase_suite.logger.critical(
436 "Child test runner process unexpectedly died "
437 "(last test running was `%s' in `%s')!" %
438 (wrapped_testcase_suite.last_test,
439 wrapped_testcase_suite.last_test_temp_dir))
440 elif wrapped_testcase_suite.last_test_temp_dir and \
441 wrapped_testcase_suite.last_test_vpp_binary:
443 wrapped_testcase_suite.last_test_temp_dir):
444 wrapped_testcase_suite.add_testclass_with_core()
445 if wrapped_testcase_suite.core_detected_at is None:
446 wrapped_testcase_suite.core_detected_at = \
448 elif wrapped_testcase_suite.core_detected_at + \
449 core_timeout < time.time():
450 wrapped_testcase_suite.logger.critical(
451 "Child test runner process unresponsive and "
452 "core-file exists in test temporary directory "
453 "(last test running was `%s' in `%s')!" %
454 (wrapped_testcase_suite.last_test,
455 wrapped_testcase_suite.last_test_temp_dir))
459 wrapped_testcase_suite.child.terminate()
461 # terminating the child process tends to leave orphan
463 if wrapped_testcase_suite.vpp_pid:
464 os.kill(wrapped_testcase_suite.vpp_pid,
469 wrapped_testcase_suite.result.crashed = True
470 wrapped_testcase_suite.result.process_result(
471 wrapped_testcase_suite.last_test_id, ERROR)
472 stop_run = process_finished_testsuite(
473 wrapped_testcase_suite,
474 finished_testcase_suites,
475 failed_wrapped_testcases,
478 for finished_testcase in finished_testcase_suites:
479 # Somewhat surprisingly, the join below may
480 # timeout, even if client signaled that
481 # it finished - so we note it just in case.
482 join_start = time.time()
483 finished_testcase.child.join(test_finished_join_timeout)
484 join_end = time.time()
485 if join_end - join_start >= test_finished_join_timeout:
486 finished_testcase.logger.error(
487 "Timeout joining finished test: %s (pid %d)" %
488 (finished_testcase.last_test,
489 finished_testcase.child.pid))
490 finished_testcase.close_pipes()
491 wrapped_testcase_suites.remove(finished_testcase)
492 finished_unread_testcases.add(finished_testcase)
493 finished_testcase.stdouterr_queue.put(None)
494 on_suite_finish(finished_testcase)
496 while testcase_suites:
497 results.append(TestResult(testcase_suites.pop(0)))
498 elif testcase_suites:
499 a_suite = testcase_suites.pop(0)
500 while a_suite and a_suite.is_tagged_run_solo:
501 solo_testcase_suites.append(a_suite)
503 a_suite = testcase_suites.pop(0)
506 if a_suite and can_run_suite(a_suite):
508 if solo_testcase_suites and tests_running == 0:
509 a_suite = solo_testcase_suites.pop(0)
513 for wrapped_testcase_suite in wrapped_testcase_suites:
514 wrapped_testcase_suite.child.terminate()
515 wrapped_testcase_suite.stdouterr_queue.put(None)
518 read_from_testcases.clear()
519 stdouterr_thread.join(config.timeout)
522 handle_cores(failed_wrapped_testcases)
526 class TestSuiteWrapper(unittest.TestSuite):
530 return super().__init__()
532 def addTest(self, test):
533 self.cpus_used = max(self.cpus_used, test.get_cpus_required())
534 super().addTest(test)
536 def assign_cpus(self, cpus):
539 def _handleClassSetUp(self, test, result):
540 if not test.__class__.skipped_due_to_cpu_lack:
541 test.assign_cpus(self.cpus)
542 super()._handleClassSetUp(test, result)
544 def get_assigned_cpus(self):
548 class SplitToSuitesCallback:
549 def __init__(self, filter_callback):
551 self.suite_name = 'default'
552 self.filter_callback = filter_callback
553 self.filtered = TestSuiteWrapper()
555 def __call__(self, file_name, cls, method):
556 test_method = cls(method)
557 if self.filter_callback(file_name, cls.__name__, method):
558 self.suite_name = file_name + cls.__name__
559 if self.suite_name not in self.suites:
560 self.suites[self.suite_name] = TestSuiteWrapper()
561 self.suites[self.suite_name].is_tagged_run_solo = False
562 self.suites[self.suite_name].addTest(test_method)
563 if test_method.is_tagged_run_solo():
564 self.suites[self.suite_name].is_tagged_run_solo = True
567 self.filtered.addTest(test_method)
570 def parse_test_filter(test_filter):
572 filter_file_name = None
573 filter_class_name = None
574 filter_func_name = None
579 raise Exception("Unrecognized %s option: %s" %
582 if parts[2] not in ('*', ''):
583 filter_func_name = parts[2]
584 if parts[1] not in ('*', ''):
585 filter_class_name = parts[1]
586 if parts[0] not in ('*', ''):
587 if parts[0].startswith('test_'):
588 filter_file_name = parts[0]
590 filter_file_name = 'test_%s' % parts[0]
592 if f.startswith('test_'):
595 filter_file_name = 'test_%s' % f
597 filter_file_name = '%s.py' % filter_file_name
598 return filter_file_name, filter_class_name, filter_func_name
601 def filter_tests(tests, filter_cb):
602 result = TestSuiteWrapper()
604 if isinstance(t, unittest.suite.TestSuite):
605 # this is a bunch of tests, recursively filter...
606 x = filter_tests(t, filter_cb)
607 if x.countTestCases() > 0:
609 elif isinstance(t, unittest.TestCase):
610 # this is a single test
611 parts = t.id().split('.')
612 # t.id() for common cases like this:
613 # test_classifier.TestClassifier.test_acl_ip
614 # apply filtering only if it is so
616 if not filter_cb(parts[0], parts[1], parts[2]):
620 # unexpected object, don't touch it
625 class FilterByTestOption:
626 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
627 self.filter_file_name = filter_file_name
628 self.filter_class_name = filter_class_name
629 self.filter_func_name = filter_func_name
631 def __call__(self, file_name, class_name, func_name):
632 if self.filter_file_name:
633 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
636 if self.filter_class_name and class_name != self.filter_class_name:
638 if self.filter_func_name and func_name != self.filter_func_name:
643 class FilterByClassList:
644 def __init__(self, classes_with_filenames):
645 self.classes_with_filenames = classes_with_filenames
647 def __call__(self, file_name, class_name, func_name):
648 return '.'.join([file_name, class_name]) in self.classes_with_filenames
651 def suite_from_failed(suite, failed):
652 failed = {x.rsplit('.', 1)[0] for x in failed}
653 filter_cb = FilterByClassList(failed)
654 suite = filter_tests(suite, filter_cb)
658 class AllResults(dict):
660 super(AllResults, self).__init__()
661 self.all_testcases = 0
662 self.results_per_suite = []
667 self[SKIP_CPU_SHORTAGE] = 0
670 self.testsuites_no_tests_run = []
672 def add_results(self, result):
673 self.results_per_suite.append(result)
674 result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
675 for result_type in result_types:
676 self[result_type] += len(result[result_type])
678 def add_result(self, result):
680 self.all_testcases += result.testcase_suite.countTestCases()
681 self.add_results(result)
683 if result.no_tests_run():
684 self.testsuites_no_tests_run.append(result.testcase_suite)
689 elif not result.was_successful():
693 self.rerun.append(result.testcase_suite)
697 def print_results(self):
699 print(double_line_delim)
700 print('TEST RESULTS:')
702 def indent_results(lines):
703 lines = list(filter(None, lines))
704 maximum = max(lines, key=lambda x: x.index(":"))
705 maximum = 4 + maximum.index(":")
707 padding = " " * (maximum - l.index(":"))
708 print(f"{padding}{l}")
711 f'Scheduled tests: {self.all_testcases}',
712 f'Executed tests: {self[TEST_RUN]}',
713 f'Passed tests: {colorize(self[PASS], GREEN)}',
714 f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
715 if self[SKIP] else None,
716 f'Not Executed tests: {colorize(self.not_executed, RED)}'
717 if self.not_executed else None,
718 f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
719 f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
720 'Tests skipped due to lack of CPUS: '
721 f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
722 if self[SKIP_CPU_SHORTAGE] else None
725 if self.all_failed > 0:
726 print('FAILURES AND ERRORS IN TESTS:')
727 for result in self.results_per_suite:
728 failed_testcase_ids = result[FAIL]
729 errored_testcase_ids = result[ERROR]
730 old_testcase_name = None
731 if failed_testcase_ids:
732 for failed_test_id in failed_testcase_ids:
733 new_testcase_name, test_name = \
734 result.get_testcase_names(failed_test_id)
735 if new_testcase_name != old_testcase_name:
736 print(' Testcase name: {}'.format(
737 colorize(new_testcase_name, RED)))
738 old_testcase_name = new_testcase_name
739 print(' FAILURE: {} [{}]'.format(
740 colorize(test_name, RED), failed_test_id))
741 if errored_testcase_ids:
742 for errored_test_id in errored_testcase_ids:
743 new_testcase_name, test_name = \
744 result.get_testcase_names(errored_test_id)
745 if new_testcase_name != old_testcase_name:
746 print(' Testcase name: {}'.format(
747 colorize(new_testcase_name, RED)))
748 old_testcase_name = new_testcase_name
749 print(' ERROR: {} [{}]'.format(
750 colorize(test_name, RED), errored_test_id))
751 if self.testsuites_no_tests_run:
752 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
754 for testsuite in self.testsuites_no_tests_run:
755 for testcase in testsuite:
756 tc_classes.add(get_testcase_doc_name(testcase))
757 for tc_class in tc_classes:
758 print(' {}'.format(colorize(tc_class, RED)))
760 if self[SKIP_CPU_SHORTAGE]:
762 print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
763 ' ENOUGH CPUS AVAILABLE', YELLOW))
764 print(double_line_delim)
768 def not_executed(self):
769 return self.all_testcases - self[TEST_RUN]
772 def all_failed(self):
773 return self[FAIL] + self[ERROR]
776 def parse_results(results):
778 Prints the number of scheduled, executed, not executed, passed, failed,
779 errored and skipped tests and details about failed and errored tests.
781 Also returns all suites where any test failed.
787 results_per_suite = AllResults()
790 for result in results:
791 result_code = results_per_suite.add_result(result)
794 elif result_code == -1:
797 results_per_suite.print_results()
805 return return_code, results_per_suite.rerun
808 if __name__ == '__main__':
810 print(f"Config is: {config}")
813 print("Running sanity test case.")
815 rc = sanity_run_vpp.main()
818 except Exception as e:
819 print(traceback.format_exc())
820 print("Couldn't run sanity test case.")
823 test_finished_join_timeout = 15
825 debug_gdb = config.debug in ["gdb", "gdbserver", "attach"]
826 debug_core = config.debug == "core"
828 run_interactive = debug_gdb or config.step or config.force_foreground
830 max_concurrent_tests = 0
831 print(f"OS reports {num_cpus} available cpu(s).")
833 test_jobs = config.jobs
834 if test_jobs == 'auto':
836 max_concurrent_tests = 1
837 print('Interactive mode required, running tests consecutively.')
839 max_concurrent_tests = num_cpus
840 print(f"Running at most {max_concurrent_tests} python test "
841 "processes concurrently.")
843 max_concurrent_tests = test_jobs
844 print(f"Running at most {max_concurrent_tests} python test processes "
845 "concurrently as set by 'TEST_JOBS'.")
847 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
849 if run_interactive and max_concurrent_tests > 1:
850 raise NotImplementedError(
851 'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
852 'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
857 print("Running tests using custom test runner.")
858 filter_file, filter_class, filter_func = \
859 parse_test_filter(config.filter)
861 print("Selected filters: file=%s, class=%s, function=%s" % (
862 filter_file, filter_class, filter_func))
864 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
866 ignore_path = config.venv_dir
867 cb = SplitToSuitesCallback(filter_cb)
868 for d in config.test_src_dir:
869 print("Adding tests from directory tree %s" % d)
870 discover_tests(d, cb, ignore_path)
872 # suites are not hashable, need to use list
875 for testcase_suite in cb.suites.values():
876 tests_amount += testcase_suite.countTestCases()
877 if testcase_suite.cpus_used > max_vpp_cpus:
878 # here we replace test functions with lambdas to just skip them
879 # but we also replace setUp/tearDown functions to do nothing
880 # so that the test can be "started" and "stopped", so that we can
881 # still keep those prints (test description - SKIP), which are done
882 # in stopTest() (for that to trigger, test function must run)
883 for t in testcase_suite:
885 if m.startswith('test_'):
886 setattr(t, m, lambda: t.skipTest("not enough cpus"))
887 setattr(t.__class__, 'setUpClass', lambda: None)
888 setattr(t.__class__, 'tearDownClass', lambda: None)
889 setattr(t, 'setUp', lambda: None)
890 setattr(t, 'tearDown', lambda: None)
891 t.__class__.skipped_due_to_cpu_lack = True
892 suites.append(testcase_suite)
894 print("%s out of %s tests match specified filters" % (
895 tests_amount, tests_amount + cb.filtered.countTestCases()))
897 if not config.extended:
898 print("Not running extended tests (some tests will be skipped)")
900 attempts = config.retries + 1
902 print("Perform %s attempts to pass the suite..." % attempts)
904 if run_interactive and suites:
905 # don't fork if requiring interactive terminal
906 print('Running tests in foreground in the current process')
907 full_suite = unittest.TestSuite()
908 free_cpus = list(available_cpus)
911 if suite.cpus_used <= max_vpp_cpus:
912 suite.assign_cpus(free_cpus[:suite.cpus_used])
914 suite.assign_cpus([])
916 full_suite.addTests(suites)
917 result = VppTestRunner(verbosity=config.verbose,
918 failfast=config.failfast,
919 print_summary=True).run(full_suite)
920 was_successful = result.wasSuccessful()
921 if not was_successful:
922 for test_case_info in result.failed_test_cases_info:
923 handle_failed_suite(test_case_info.logger,
924 test_case_info.tempdir,
925 test_case_info.vpp_pid)
926 if test_case_info in result.core_crash_test_cases_info:
927 check_and_handle_core(test_case_info.vpp_bin_path,
928 test_case_info.tempdir,
929 test_case_info.core_crash_test)
933 print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
934 ' ENOUGH CPUS AVAILABLE', YELLOW))
936 sys.exit(not was_successful)
938 print('Running each VPPTestCase in a separate background process'
939 f' with at most {max_concurrent_tests} parallel python test '
942 while suites and attempts > 0:
943 results = run_forked(suites)
944 exit_code, suites = parse_results(results)
947 print('Test run was successful')
949 print('%s attempt(s) left.' % attempts)