13 from multiprocessing import Process, Pipe, get_context
14 from multiprocessing.queues import Queue
15 from multiprocessing.managers import BaseManager
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
19 TEST_RUN, SKIP_CPU_SHORTAGE
20 from debug import spawn_gdb, start_vpp_in_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22 colorize, single_line_delim
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path, get_core_path, is_core_present
26 from cpu_config import num_cpus, max_vpp_cpus, available_cpus
28 # timeout which controls how long the child has to finish after seeing
29 # a core dump in test temporary directory. If this is exceeded, parent assumes
30 # that child process is stuck (e.g. waiting for event from vpp) and kill
35 class StreamQueue(Queue):
40 sys.__stdout__.flush()
41 sys.__stderr__.flush()
44 return self._writer.fileno()
47 class StreamQueueManager(BaseManager):
51 StreamQueueManager.register('StreamQueue', StreamQueue)
54 class TestResult(dict):
55 def __init__(self, testcase_suite, testcases_by_id=None):
56 super(TestResult, self).__init__()
61 self[SKIP_CPU_SHORTAGE] = []
64 self.testcase_suite = testcase_suite
65 self.testcases = [testcase for testcase in testcase_suite]
66 self.testcases_by_id = testcases_by_id
68 def was_successful(self):
69 return 0 == len(self[FAIL]) == len(self[ERROR]) \
70 and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
71 == self.testcase_suite.countTestCases()
73 def no_tests_run(self):
74 return 0 == len(self[TEST_RUN])
76 def process_result(self, test_id, result):
77 self[result].append(test_id)
79 def suite_from_failed(self):
81 for testcase in self.testcase_suite:
83 if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
86 return suite_from_failed(self.testcase_suite, rerun_ids)
88 def get_testcase_names(self, test_id):
89 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
90 setup_teardown_match = re.match(
91 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
92 if setup_teardown_match:
93 test_name, _, _, testcase_name = setup_teardown_match.groups()
94 if len(testcase_name.split('.')) == 2:
95 for key in self.testcases_by_id.keys():
96 if key.startswith(testcase_name):
99 testcase_name = self._get_testcase_doc_name(testcase_name)
101 test_name = self._get_test_description(test_id)
102 testcase_name = self._get_testcase_doc_name(test_id)
104 return testcase_name, test_name
106 def _get_test_description(self, test_id):
107 if test_id in self.testcases_by_id:
108 desc = get_test_description(descriptions,
109 self.testcases_by_id[test_id])
114 def _get_testcase_doc_name(self, test_id):
115 if test_id in self.testcases_by_id:
116 doc_name = get_testcase_doc_name(self.testcases_by_id[test_id])
122 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
123 finished_pipe, result_pipe, logger):
124 sys.stdout = stdouterr_queue
125 sys.stderr = stdouterr_queue
126 VppTestCase.parallel_handler = logger.handlers[0]
127 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
128 descriptions=descriptions,
130 result_pipe=result_pipe,
132 print_summary=False).run(suite)
133 finished_pipe.send(result.wasSuccessful())
134 finished_pipe.close()
135 keep_alive_pipe.close()
138 class TestCaseWrapper(object):
139 def __init__(self, testcase_suite, manager):
140 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
142 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
143 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
144 self.testcase_suite = testcase_suite
145 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
146 self.logger = get_parallel_logger(self.stdouterr_queue)
147 self.child = Process(target=test_runner_wrapper,
148 args=(testcase_suite,
149 self.keep_alive_child_end,
150 self.stdouterr_queue,
151 self.finished_child_end,
152 self.result_child_end,
156 self.last_test_temp_dir = None
157 self.last_test_vpp_binary = None
158 self._last_test = None
159 self.last_test_id = None
161 self.last_heard = time.time()
162 self.core_detected_at = None
163 self.testcases_by_id = {}
164 self.testclasess_with_core = {}
165 for testcase in self.testcase_suite:
166 self.testcases_by_id[testcase.id()] = testcase
167 self.result = TestResult(testcase_suite, self.testcases_by_id)
171 return self._last_test
174 def last_test(self, test_id):
175 self.last_test_id = test_id
176 if test_id in self.testcases_by_id:
177 testcase = self.testcases_by_id[test_id]
178 self._last_test = testcase.shortDescription()
179 if not self._last_test:
180 self._last_test = str(testcase)
182 self._last_test = test_id
184 def add_testclass_with_core(self):
185 if self.last_test_id in self.testcases_by_id:
186 test = self.testcases_by_id[self.last_test_id]
187 class_name = unittest.util.strclass(test.__class__)
188 test_name = "'{}' ({})".format(get_test_description(descriptions,
192 test_name = self.last_test_id
193 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
194 r'\((.+\..+)\)', test_name).groups()[3]
195 if class_name not in self.testclasess_with_core:
196 self.testclasess_with_core[class_name] = (
198 self.last_test_vpp_binary,
199 self.last_test_temp_dir)
201 def close_pipes(self):
202 self.keep_alive_child_end.close()
203 self.finished_child_end.close()
204 self.result_child_end.close()
205 self.keep_alive_parent_end.close()
206 self.finished_parent_end.close()
207 self.result_parent_end.close()
209 def was_successful(self):
210 return self.result.was_successful()
214 return self.testcase_suite.cpus_used
216 def get_assigned_cpus(self):
217 return self.testcase_suite.get_assigned_cpus()
220 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
223 while read_testcases.is_set() or unread_testcases:
224 if finished_unread_testcases:
225 read_testcase = finished_unread_testcases.pop()
226 unread_testcases.remove(read_testcase)
227 elif unread_testcases:
228 read_testcase = unread_testcases.pop()
231 while data is not None:
232 sys.stdout.write(data)
233 data = read_testcase.stdouterr_queue.get()
235 read_testcase.stdouterr_queue.close()
236 finished_unread_testcases.discard(read_testcase)
240 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
241 if last_test_temp_dir:
242 # Need to create link in case of a timeout or core dump without failure
243 lttd = os.path.basename(last_test_temp_dir)
244 failed_dir = os.getenv('FAILED_DIR')
245 link_path = '%s%s-FAILED' % (failed_dir, lttd)
246 if not os.path.exists(link_path):
247 os.symlink(last_test_temp_dir, link_path)
248 logger.error("Symlink to failed testcase directory: %s -> %s"
251 # Report core existence
252 core_path = get_core_path(last_test_temp_dir)
253 if os.path.exists(core_path):
255 "Core-file exists in test temporary directory: %s!" %
257 check_core_path(logger, core_path)
258 logger.debug("Running 'file %s':" % core_path)
260 info = check_output(["file", core_path])
262 except CalledProcessError as e:
263 logger.error("Subprocess returned with return code "
264 "while running `file' utility on core-file "
266 "rc=%s", e.returncode)
268 logger.error("Subprocess returned with OS error while "
269 "running 'file' utility "
271 "(%s) %s", e.errno, e.strerror)
272 except Exception as e:
273 logger.exception("Unexpected error running `file' utility "
275 logger.error("gdb %s %s" %
276 (os.getenv('VPP_BIN', 'vpp'), core_path))
279 # Copy api post mortem
280 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
281 if os.path.isfile(api_post_mortem_path):
282 logger.error("Copying api_post_mortem.%d to %s" %
283 (vpp_pid, last_test_temp_dir))
284 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
287 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
288 if is_core_present(tempdir):
290 print('VPP core detected in %s. Last test running was %s' %
291 (tempdir, core_crash_test))
292 print(single_line_delim)
293 spawn_gdb(vpp_binary, get_core_path(tempdir))
294 print(single_line_delim)
296 print("Compressing core-file in test directory `%s'" % tempdir)
297 os.system("gzip %s" % get_core_path(tempdir))
300 def handle_cores(failed_testcases):
301 for failed_testcase in failed_testcases:
302 tcs_with_core = failed_testcase.testclasess_with_core
304 for test, vpp_binary, tempdir in tcs_with_core.values():
305 check_and_handle_core(vpp_binary, tempdir, test)
308 def process_finished_testsuite(wrapped_testcase_suite,
309 finished_testcase_suites,
310 failed_wrapped_testcases,
312 results.append(wrapped_testcase_suite.result)
313 finished_testcase_suites.add(wrapped_testcase_suite)
315 if failfast and not wrapped_testcase_suite.was_successful():
318 if not wrapped_testcase_suite.was_successful():
319 failed_wrapped_testcases.add(wrapped_testcase_suite)
320 handle_failed_suite(wrapped_testcase_suite.logger,
321 wrapped_testcase_suite.last_test_temp_dir,
322 wrapped_testcase_suite.vpp_pid)
327 def run_forked(testcase_suites):
328 wrapped_testcase_suites = set()
329 solo_testcase_suites = []
331 # suites are unhashable, need to use list
333 unread_testcases = set()
334 finished_unread_testcases = set()
335 manager = StreamQueueManager()
338 free_cpus = list(available_cpus)
340 def on_suite_start(tc):
341 nonlocal tests_running
343 tests_running = tests_running + 1
345 def on_suite_finish(tc):
346 nonlocal tests_running
348 tests_running = tests_running - 1
349 assert tests_running >= 0
350 free_cpus.extend(tc.get_assigned_cpus())
352 def run_suite(suite):
354 nonlocal wrapped_testcase_suites
355 nonlocal unread_testcases
357 suite.assign_cpus(free_cpus[:suite.cpus_used])
358 free_cpus = free_cpus[suite.cpus_used:]
359 wrapper = TestCaseWrapper(suite, manager)
360 wrapped_testcase_suites.add(wrapper)
361 unread_testcases.add(wrapper)
362 on_suite_start(suite)
364 def can_run_suite(suite):
365 return (tests_running < max_concurrent_tests and
366 (suite.cpus_used <= len(free_cpus) or
367 suite.cpus_used > max_vpp_cpus))
369 while free_cpus and testcase_suites:
370 a_suite = testcase_suites[0]
371 if a_suite.is_tagged_run_solo:
372 a_suite = testcase_suites.pop(0)
373 solo_testcase_suites.append(a_suite)
375 if can_run_suite(a_suite):
376 a_suite = testcase_suites.pop(0)
381 if tests_running == 0 and solo_testcase_suites:
382 a_suite = solo_testcase_suites.pop(0)
385 read_from_testcases = threading.Event()
386 read_from_testcases.set()
387 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
388 args=(unread_testcases,
389 finished_unread_testcases,
390 read_from_testcases))
391 stdouterr_thread.start()
393 failed_wrapped_testcases = set()
397 while wrapped_testcase_suites:
398 finished_testcase_suites = set()
399 for wrapped_testcase_suite in wrapped_testcase_suites:
400 while wrapped_testcase_suite.result_parent_end.poll():
401 wrapped_testcase_suite.result.process_result(
402 *wrapped_testcase_suite.result_parent_end.recv())
403 wrapped_testcase_suite.last_heard = time.time()
405 while wrapped_testcase_suite.keep_alive_parent_end.poll():
406 wrapped_testcase_suite.last_test, \
407 wrapped_testcase_suite.last_test_vpp_binary, \
408 wrapped_testcase_suite.last_test_temp_dir, \
409 wrapped_testcase_suite.vpp_pid = \
410 wrapped_testcase_suite.keep_alive_parent_end.recv()
411 wrapped_testcase_suite.last_heard = time.time()
413 if wrapped_testcase_suite.finished_parent_end.poll():
414 wrapped_testcase_suite.finished_parent_end.recv()
415 wrapped_testcase_suite.last_heard = time.time()
416 stop_run = process_finished_testsuite(
417 wrapped_testcase_suite,
418 finished_testcase_suites,
419 failed_wrapped_testcases,
424 if wrapped_testcase_suite.last_heard + test_timeout < \
427 wrapped_testcase_suite.logger.critical(
428 "Child test runner process timed out "
429 "(last test running was `%s' in `%s')!" %
430 (wrapped_testcase_suite.last_test,
431 wrapped_testcase_suite.last_test_temp_dir))
432 elif not wrapped_testcase_suite.child.is_alive():
434 wrapped_testcase_suite.logger.critical(
435 "Child test runner process unexpectedly died "
436 "(last test running was `%s' in `%s')!" %
437 (wrapped_testcase_suite.last_test,
438 wrapped_testcase_suite.last_test_temp_dir))
439 elif wrapped_testcase_suite.last_test_temp_dir and \
440 wrapped_testcase_suite.last_test_vpp_binary:
442 wrapped_testcase_suite.last_test_temp_dir):
443 wrapped_testcase_suite.add_testclass_with_core()
444 if wrapped_testcase_suite.core_detected_at is None:
445 wrapped_testcase_suite.core_detected_at = \
447 elif wrapped_testcase_suite.core_detected_at + \
448 core_timeout < time.time():
449 wrapped_testcase_suite.logger.critical(
450 "Child test runner process unresponsive and "
451 "core-file exists in test temporary directory "
452 "(last test running was `%s' in `%s')!" %
453 (wrapped_testcase_suite.last_test,
454 wrapped_testcase_suite.last_test_temp_dir))
458 wrapped_testcase_suite.child.terminate()
460 # terminating the child process tends to leave orphan
462 if wrapped_testcase_suite.vpp_pid:
463 os.kill(wrapped_testcase_suite.vpp_pid,
468 wrapped_testcase_suite.result.crashed = True
469 wrapped_testcase_suite.result.process_result(
470 wrapped_testcase_suite.last_test_id, ERROR)
471 stop_run = process_finished_testsuite(
472 wrapped_testcase_suite,
473 finished_testcase_suites,
474 failed_wrapped_testcases,
477 for finished_testcase in finished_testcase_suites:
478 # Somewhat surprisingly, the join below may
479 # timeout, even if client signaled that
480 # it finished - so we note it just in case.
481 join_start = time.time()
482 finished_testcase.child.join(test_finished_join_timeout)
483 join_end = time.time()
484 if join_end - join_start >= test_finished_join_timeout:
485 finished_testcase.logger.error(
486 "Timeout joining finished test: %s (pid %d)" %
487 (finished_testcase.last_test,
488 finished_testcase.child.pid))
489 finished_testcase.close_pipes()
490 wrapped_testcase_suites.remove(finished_testcase)
491 finished_unread_testcases.add(finished_testcase)
492 finished_testcase.stdouterr_queue.put(None)
493 on_suite_finish(finished_testcase)
495 while testcase_suites:
496 results.append(TestResult(testcase_suites.pop(0)))
497 elif testcase_suites:
498 a_suite = testcase_suites.pop(0)
499 while a_suite and a_suite.is_tagged_run_solo:
500 solo_testcase_suites.append(a_suite)
502 a_suite = testcase_suites.pop(0)
505 if a_suite and can_run_suite(a_suite):
507 if solo_testcase_suites and tests_running == 0:
508 a_suite = solo_testcase_suites.pop(0)
512 for wrapped_testcase_suite in wrapped_testcase_suites:
513 wrapped_testcase_suite.child.terminate()
514 wrapped_testcase_suite.stdouterr_queue.put(None)
517 read_from_testcases.clear()
518 stdouterr_thread.join(test_timeout)
521 handle_cores(failed_wrapped_testcases)
525 class TestSuiteWrapper(unittest.TestSuite):
529 return super().__init__()
531 def addTest(self, test):
532 self.cpus_used = max(self.cpus_used, test.get_cpus_required())
533 super().addTest(test)
535 def assign_cpus(self, cpus):
538 def _handleClassSetUp(self, test, result):
539 if not test.__class__.skipped_due_to_cpu_lack:
540 test.assign_cpus(self.cpus)
541 super()._handleClassSetUp(test, result)
543 def get_assigned_cpus(self):
547 class SplitToSuitesCallback:
548 def __init__(self, filter_callback):
550 self.suite_name = 'default'
551 self.filter_callback = filter_callback
552 self.filtered = TestSuiteWrapper()
554 def __call__(self, file_name, cls, method):
555 test_method = cls(method)
556 if self.filter_callback(file_name, cls.__name__, method):
557 self.suite_name = file_name + cls.__name__
558 if self.suite_name not in self.suites:
559 self.suites[self.suite_name] = TestSuiteWrapper()
560 self.suites[self.suite_name].is_tagged_run_solo = False
561 self.suites[self.suite_name].addTest(test_method)
562 if test_method.is_tagged_run_solo():
563 self.suites[self.suite_name].is_tagged_run_solo = True
566 self.filtered.addTest(test_method)
572 def parse_test_option():
573 f = os.getenv(test_option, None)
574 filter_file_name = None
575 filter_class_name = None
576 filter_func_name = None
581 raise Exception("Unrecognized %s option: %s" %
584 if parts[2] not in ('*', ''):
585 filter_func_name = parts[2]
586 if parts[1] not in ('*', ''):
587 filter_class_name = parts[1]
588 if parts[0] not in ('*', ''):
589 if parts[0].startswith('test_'):
590 filter_file_name = parts[0]
592 filter_file_name = 'test_%s' % parts[0]
594 if f.startswith('test_'):
597 filter_file_name = 'test_%s' % f
599 filter_file_name = '%s.py' % filter_file_name
600 return filter_file_name, filter_class_name, filter_func_name
603 def filter_tests(tests, filter_cb):
604 result = TestSuiteWrapper()
606 if isinstance(t, unittest.suite.TestSuite):
607 # this is a bunch of tests, recursively filter...
608 x = filter_tests(t, filter_cb)
609 if x.countTestCases() > 0:
611 elif isinstance(t, unittest.TestCase):
612 # this is a single test
613 parts = t.id().split('.')
614 # t.id() for common cases like this:
615 # test_classifier.TestClassifier.test_acl_ip
616 # apply filtering only if it is so
618 if not filter_cb(parts[0], parts[1], parts[2]):
622 # unexpected object, don't touch it
627 class FilterByTestOption:
628 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
629 self.filter_file_name = filter_file_name
630 self.filter_class_name = filter_class_name
631 self.filter_func_name = filter_func_name
633 def __call__(self, file_name, class_name, func_name):
634 if self.filter_file_name:
635 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
638 if self.filter_class_name and class_name != self.filter_class_name:
640 if self.filter_func_name and func_name != self.filter_func_name:
645 class FilterByClassList:
646 def __init__(self, classes_with_filenames):
647 self.classes_with_filenames = classes_with_filenames
649 def __call__(self, file_name, class_name, func_name):
650 return '.'.join([file_name, class_name]) in self.classes_with_filenames
653 def suite_from_failed(suite, failed):
654 failed = {x.rsplit('.', 1)[0] for x in failed}
655 filter_cb = FilterByClassList(failed)
656 suite = filter_tests(suite, filter_cb)
660 class AllResults(dict):
662 super(AllResults, self).__init__()
663 self.all_testcases = 0
664 self.results_per_suite = []
669 self[SKIP_CPU_SHORTAGE] = 0
672 self.testsuites_no_tests_run = []
674 def add_results(self, result):
675 self.results_per_suite.append(result)
676 result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
677 for result_type in result_types:
678 self[result_type] += len(result[result_type])
680 def add_result(self, result):
682 self.all_testcases += result.testcase_suite.countTestCases()
683 self.add_results(result)
685 if result.no_tests_run():
686 self.testsuites_no_tests_run.append(result.testcase_suite)
691 elif not result.was_successful():
695 self.rerun.append(result.testcase_suite)
699 def print_results(self):
701 print(double_line_delim)
702 print('TEST RESULTS:')
704 def indent_results(lines):
705 lines = list(filter(None, lines))
706 maximum = max(lines, key=lambda x: x.index(":"))
707 maximum = 4 + maximum.index(":")
709 padding = " " * (maximum - l.index(":"))
710 print(f"{padding}{l}")
713 f'Scheduled tests: {self.all_testcases}',
714 f'Executed tests: {self[TEST_RUN]}',
715 f'Passed tests: {colorize(self[PASS], GREEN)}',
716 f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
717 if self[SKIP] else None,
718 f'Not Executed tests: {colorize(self.not_executed, RED)}'
719 if self.not_executed else None,
720 f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
721 f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
722 'Tests skipped due to lack of CPUS: '
723 f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
724 if self[SKIP_CPU_SHORTAGE] else None
727 if self.all_failed > 0:
728 print('FAILURES AND ERRORS IN TESTS:')
729 for result in self.results_per_suite:
730 failed_testcase_ids = result[FAIL]
731 errored_testcase_ids = result[ERROR]
732 old_testcase_name = None
733 if failed_testcase_ids:
734 for failed_test_id in failed_testcase_ids:
735 new_testcase_name, test_name = \
736 result.get_testcase_names(failed_test_id)
737 if new_testcase_name != old_testcase_name:
738 print(' Testcase name: {}'.format(
739 colorize(new_testcase_name, RED)))
740 old_testcase_name = new_testcase_name
741 print(' FAILURE: {} [{}]'.format(
742 colorize(test_name, RED), failed_test_id))
743 if errored_testcase_ids:
744 for errored_test_id in errored_testcase_ids:
745 new_testcase_name, test_name = \
746 result.get_testcase_names(errored_test_id)
747 if new_testcase_name != old_testcase_name:
748 print(' Testcase name: {}'.format(
749 colorize(new_testcase_name, RED)))
750 old_testcase_name = new_testcase_name
751 print(' ERROR: {} [{}]'.format(
752 colorize(test_name, RED), errored_test_id))
753 if self.testsuites_no_tests_run:
754 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
756 for testsuite in self.testsuites_no_tests_run:
757 for testcase in testsuite:
758 tc_classes.add(get_testcase_doc_name(testcase))
759 for tc_class in tc_classes:
760 print(' {}'.format(colorize(tc_class, RED)))
762 if self[SKIP_CPU_SHORTAGE]:
764 print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
765 ' ENOUGH CPUS AVAILABLE', YELLOW))
766 print(double_line_delim)
770 def not_executed(self):
771 return self.all_testcases - self[TEST_RUN]
774 def all_failed(self):
775 return self[FAIL] + self[ERROR]
778 def parse_results(results):
780 Prints the number of scheduled, executed, not executed, passed, failed,
781 errored and skipped tests and details about failed and errored tests.
783 Also returns all suites where any test failed.
789 results_per_suite = AllResults()
792 for result in results:
793 result_code = results_per_suite.add_result(result)
796 elif result_code == -1:
799 results_per_suite.print_results()
807 return return_code, results_per_suite.rerun
810 def parse_digit_env(env_var, default):
811 value = os.getenv(env_var, default)
816 print('WARNING: unsupported value "%s" for env var "%s",'
817 'defaulting to %s' % (value, env_var, default))
822 if __name__ == '__main__':
824 verbose = parse_digit_env("V", 0)
826 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
828 test_finished_join_timeout = 15
830 retries = parse_digit_env("RETRIES", 0)
832 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver", "attach"]
834 debug_core = os.getenv("DEBUG", "").lower() == "core"
835 compress_core = framework.BoolEnvironmentVariable("CORE_COMPRESS")
837 if os.getenv("VPP_IN_GDB", "n").lower() in ["1", "y", "yes"]:
841 step = framework.BoolEnvironmentVariable("STEP")
842 force_foreground = framework.BoolEnvironmentVariable("FORCE_FOREGROUND")
844 run_interactive = debug or step or force_foreground
846 max_concurrent_tests = 0
847 print(f"OS reports {num_cpus} available cpu(s).")
849 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
850 if test_jobs == 'auto':
852 max_concurrent_tests = 1
853 print('Interactive mode required, running tests consecutively.')
855 max_concurrent_tests = num_cpus
856 print(f"Running at most {max_concurrent_tests} python test "
857 "processes concurrently.")
860 test_jobs = int(test_jobs)
861 except ValueError as e:
862 raise ValueError("Invalid TEST_JOBS value specified, valid "
863 "values are a positive integer or 'auto'") from e
865 raise ValueError("Invalid TEST_JOBS value specified, valid "
866 "values are a positive integer or 'auto'")
867 max_concurrent_tests = int(test_jobs)
868 print(f"Running at most {max_concurrent_tests} python test processes "
869 "concurrently as set by 'TEST_JOBS'.")
871 print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
873 if run_interactive and max_concurrent_tests > 1:
874 raise NotImplementedError(
875 'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
876 'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
879 parser = argparse.ArgumentParser(description="VPP unit tests")
880 parser.add_argument("-f", "--failfast", action='store_true',
881 help="fast failure flag")
882 parser.add_argument("-d", "--dir", action='append', type=str,
883 help="directory containing test files "
884 "(may be specified multiple times)")
885 args = parser.parse_args()
886 failfast = args.failfast
889 print("Running tests using custom test runner.")
890 filter_file, filter_class, filter_func = parse_test_option()
892 print("Active filters: file=%s, class=%s, function=%s" % (
893 filter_file, filter_class, filter_func))
895 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
897 ignore_path = os.getenv("VENV_PATH", None)
898 cb = SplitToSuitesCallback(filter_cb)
900 print("Adding tests from directory tree %s" % d)
901 discover_tests(d, cb, ignore_path)
903 # suites are not hashable, need to use list
906 for testcase_suite in cb.suites.values():
907 tests_amount += testcase_suite.countTestCases()
908 if testcase_suite.cpus_used > max_vpp_cpus:
909 # here we replace test functions with lambdas to just skip them
910 # but we also replace setUp/tearDown functions to do nothing
911 # so that the test can be "started" and "stopped", so that we can
912 # still keep those prints (test description - SKIP), which are done
913 # in stopTest() (for that to trigger, test function must run)
914 for t in testcase_suite:
916 if m.startswith('test_'):
917 setattr(t, m, lambda: t.skipTest("not enough cpus"))
918 setattr(t.__class__, 'setUpClass', lambda: None)
919 setattr(t.__class__, 'tearDownClass', lambda: None)
920 setattr(t, 'setUp', lambda: None)
921 setattr(t, 'tearDown', lambda: None)
922 t.__class__.skipped_due_to_cpu_lack = True
923 suites.append(testcase_suite)
925 print("%s out of %s tests match specified filters" % (
926 tests_amount, tests_amount + cb.filtered.countTestCases()))
928 if not running_extended_tests:
929 print("Not running extended tests (some tests will be skipped)")
931 attempts = retries + 1
933 print("Perform %s attempts to pass the suite..." % attempts)
935 if run_interactive and suites:
936 # don't fork if requiring interactive terminal
937 print('Running tests in foreground in the current process')
938 full_suite = unittest.TestSuite()
939 free_cpus = list(available_cpus)
942 if suite.cpus_used <= max_vpp_cpus:
943 suite.assign_cpus(free_cpus[:suite.cpus_used])
945 suite.assign_cpus([])
947 full_suite.addTests(suites)
948 result = VppTestRunner(verbosity=verbose,
950 print_summary=True).run(full_suite)
951 was_successful = result.wasSuccessful()
952 if not was_successful:
953 for test_case_info in result.failed_test_cases_info:
954 handle_failed_suite(test_case_info.logger,
955 test_case_info.tempdir,
956 test_case_info.vpp_pid)
957 if test_case_info in result.core_crash_test_cases_info:
958 check_and_handle_core(test_case_info.vpp_bin_path,
959 test_case_info.tempdir,
960 test_case_info.core_crash_test)
964 print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
965 ' ENOUGH CPUS AVAILABLE', YELLOW))
967 sys.exit(not was_successful)
969 print('Running each VPPTestCase in a separate background process'
970 f' with at most {max_concurrent_tests} parallel python test '
973 while suites and attempts > 0:
974 results = run_forked(suites)
975 exit_code, suites = parse_results(results)
978 print('Test run was successful')
980 print('%s attempt(s) left.' % attempts)