14 from multiprocessing import Process, Pipe, cpu_count
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
20 from debug import spawn_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22 colorize, single_line_delim
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path, get_core_path, is_core_present
27 # timeout which controls how long the child has to finish after seeing
28 # a core dump in test temporary directory. If this is exceeded, parent assumes
29 # that child process is stuck (e.g. waiting for shm mutex, which will never
30 # get unlocked) and kills the child
# Minimum free space in /dev/shm required to run at all: 512 MiB.
min_req_shm = 512 << 20
# Additional /dev/shm space budgeted for every extra process: 128 MiB.
shm_per_process = 128 << 20
# Queue subclass used as an output stream: elsewhere in this file an instance
# is assigned to sys.stdout/sys.stderr of the child and read back with get().
# NOTE(review): the method headers of this class are not visible in this view,
# so which method each statement below belongs to is an assumption.
37 class StreamQueue(Queue):
# Flush the interpreter's original stdout and stderr streams.
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
# Return the file descriptor of the queue's writer end.
46 return self._writer.fileno()
# Manager type used to create StreamQueue objects: run_forked() instantiates
# it and TestCaseWrapper calls manager.StreamQueue() on that instance.
# NOTE(review): the class body is not visible in this view.
49 class StreamQueueManager(BaseManager):
# Register StreamQueue with the manager under the type id 'StreamQueue'.
53 StreamQueueManager.register('StreamQueue', StreamQueue)
# Result container for one test suite: a dict keyed by result type (PASS,
# FAIL, ERROR, SKIP, TEST_RUN) whose values are lists of test ids.
# NOTE(review): several lines of this class are not visible in this view
# (for example where the per-type lists are created); the comments below
# describe only the visible statements.
56 class TestResult(dict):
# testcase_suite: the suite these results belong to.
# testcases_by_id: optional mapping from test id to test case object.
57 def __init__(self, testcase_suite, testcases_by_id=None):
58 super(TestResult, self).__init__()
65 self.testcase_suite = testcase_suite
66 self.testcases = [testcase for testcase in testcase_suite]
67 self.testcases_by_id = testcases_by_id
# True when nothing failed or errored and the number of passed plus skipped
# tests equals both the suite's test count and the number of tests run.
69 def was_successful(self):
70 return 0 == len(self[FAIL]) == len(self[ERROR]) \
71 and len(self[PASS] + self[SKIP]) \
72 == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
# True when no test has been recorded under TEST_RUN.
74 def no_tests_run(self):
75 return 0 == len(self[TEST_RUN])
# Append test_id to the list stored under the given result type.
77 def process_result(self, test_id, result):
78 self[result].append(test_id)
# Build a suite to re-run from the tests that neither passed nor were skipped,
# delegating to the module-level suite_from_failed().
# NOTE(review): the lines that create and fill rerun_ids are not visible here.
80 def suite_from_failed(self):
82 for testcase in self.testcase_suite:
84 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
86 if len(rerun_ids) > 0:
87 return suite_from_failed(self.testcase_suite, rerun_ids)
# Return (testcase_name, test_name) for a test id. Ids matching
# '<x>.<y>.<z>' are resolved through testcases_by_id; otherwise the id is
# matched against the "tearDownClass (...)" / "setUpClass (...)" form.
# NOTE(review): some branch headers of this method are not visible in this
# view, so the exact nesting of the statements below cannot be confirmed.
89 def get_testcase_names(self, test_id):
90 if re.match(r'.+\..+\..+', test_id):
91 test_name = self._get_test_description(test_id)
92 testcase_name = self._get_testcase_doc_name(test_id)
94 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
95 setup_teardown_match = re.match(
96 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
97 if setup_teardown_match:
98 test_name, _, _, testcase_name = setup_teardown_match.groups()
99 if len(testcase_name.split('.')) == 2:
100 for key in self.testcases_by_id.keys():
101 if key.startswith(testcase_name):
104 testcase_name = self._get_testcase_doc_name(testcase_name)
107 testcase_name = test_id
109 return testcase_name, test_name
# Return get_test_description() for the test case registered under test_id.
# Reads the global name `descriptions`, which is not defined in the visible
# lines of this file.
111 def _get_test_description(self, test_id):
112 return get_test_description(descriptions,
113 self.testcases_by_id[test_id])
# Return get_testcase_doc_name() for the test case registered under test_id.
115 def _get_testcase_doc_name(self, test_id):
116 return get_testcase_doc_name(self.testcases_by_id[test_id])
# Target function of the child process created in TestCaseWrapper: redirects
# sys.stdout/sys.stderr into stdouterr_queue, runs the suite with
# VppTestRunner and reports the outcome to the parent through finished_pipe.
# NOTE(review): some keyword arguments of the VppTestRunner call are not
# visible in this view.
119 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
120 finished_pipe, result_pipe, logger):
121 sys.stdout = stdouterr_queue
122 sys.stderr = stdouterr_queue
123 VppTestCase.parallel_handler = logger.handlers[0]
124 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
125 descriptions=descriptions,
127 result_pipe=result_pipe,
129 print_summary=False).run(suite)
# Send the overall success flag to the parent, then close the child-side pipes.
130 finished_pipe.send(result.wasSuccessful())
131 finished_pipe.close()
132 keep_alive_pipe.close()
# Bundles one test suite with the child Process that runs it, the three pipes
# and the output queue used to communicate with that child, and the state the
# parent tracks for it (last test seen, time last heard from, core detection,
# collected results).
# NOTE(review): several lines of this class are not visible in this view.
135 class TestCaseWrapper(object):
# testcase_suite: suite to run in the child process.
# manager: object providing StreamQueue() for the child's stdout/stderr.
136 def __init__(self, testcase_suite, manager):
137 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
139 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
140 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
141 self.testcase_suite = testcase_suite
142 self.stdouterr_queue = manager.StreamQueue()
143 self.logger = get_parallel_logger(self.stdouterr_queue)
144 self.child = Process(target=test_runner_wrapper,
145 args=(testcase_suite,
146 self.keep_alive_child_end,
147 self.stdouterr_queue,
148 self.finished_child_end,
149 self.result_child_end,
153 self.last_test_temp_dir = None
154 self.last_test_vpp_binary = None
155 self._last_test = None
156 self.last_test_id = None
158 self.last_heard = time.time()
159 self.core_detected_at = None
160 self.testcases_by_id = {}
161 self.testclasess_with_core = {}
# Index the suite's test cases by their id for later lookups.
162 for testcase in self.testcase_suite:
163 self.testcases_by_id[testcase.id()] = testcase
164 self.result = TestResult(testcase_suite, self.testcases_by_id)
# NOTE(review): the header and decorators of the last_test accessors are not
# visible; run_forked() reads and assigns `last_test` as a plain attribute,
# so these are presumably a property getter and setter.
168 return self._last_test
# Remember the raw test id and derive a printable name: the test's short
# description, or str(testcase) when that is empty; the raw id is used in the
# remaining visible assignment (its branch header is not visible).
171 def last_test(self, test_id):
172 self.last_test_id = test_id
173 if test_id in self.testcases_by_id:
174 testcase = self.testcases_by_id[test_id]
175 self._last_test = testcase.shortDescription()
176 if not self._last_test:
177 self._last_test = str(testcase)
179 self._last_test = test_id
# Record, once per test class, a tuple ending with the last test's VPP binary
# and temp dir in testclasess_with_core. For ids not found in testcases_by_id
# the class name is taken from a "setUpClass (...)"/"tearDownClass (...)" id.
# NOTE(review): parts of this method are not visible in this view.
181 def add_testclass_with_core(self):
182 if self.last_test_id in self.testcases_by_id:
183 test = self.testcases_by_id[self.last_test_id]
184 class_name = unittest.util.strclass(test.__class__)
185 test_name = "'{}' ({})".format(get_test_description(descriptions,
189 test_name = self.last_test_id
190 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
191 r'\((.+\..+)\)', test_name).groups()[3]
192 if class_name not in self.testclasess_with_core:
193 self.testclasess_with_core[class_name] = (
195 self.last_test_vpp_binary,
196 self.last_test_temp_dir)
# Close the child and parent ends of all three pipes.
198 def close_pipes(self):
199 self.keep_alive_child_end.close()
200 self.finished_child_end.close()
201 self.result_child_end.close()
202 self.keep_alive_parent_end.close()
203 self.finished_parent_end.close()
204 self.result_parent_end.close()
# Delegate to the wrapped TestResult.
206 def was_successful(self):
207 return self.result.was_successful()
# Output reader loop; run_forked() runs it in a thread. It picks one test case
# wrapper at a time, preferring finished ones, and writes the items read from
# that wrapper's stdouterr_queue to sys.stdout until a None item is read,
# then closes the queue.
# NOTE(review): the end of the signature and the initialisation of
# read_testcase and data are not visible in this view.
210 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
213 while read_testcases.is_set() or len(unread_testcases) > 0:
214 if not read_testcase:
215 if len(finished_unread_testcases) > 0:
216 read_testcase = finished_unread_testcases.pop()
217 unread_testcases.remove(read_testcase)
218 elif len(unread_testcases) > 0:
219 read_testcase = unread_testcases.pop()
222 while data is not None:
223 sys.stdout.write(data)
224 data = read_testcase.stdouterr_queue.get()
226 read_testcase.stdouterr_queue.close()
227 finished_unread_testcases.discard(read_testcase)
# Post-process a failed suite: symlink the last test's temp dir as
# '<FAILED_DIR><dirname>-FAILED', report a core file found in that temp dir,
# and copy /tmp/api_post_mortem.<vpp_pid> into the temp dir when it exists.
# NOTE(review): several lines of this function are not visible in this view.
231 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
232 if last_test_temp_dir:
233 # Need to create link in case of a timeout or core dump without failure
234 lttd = os.path.basename(last_test_temp_dir)
# NOTE(review): FAILED_DIR is used unchecked and is concatenated without a
# path separator; if the variable is unset the link path starts with 'None'.
# Presumably the caller always exports it with a trailing separator - confirm.
235 failed_dir = os.getenv('FAILED_DIR')
236 link_path = '%s%s-FAILED' % (failed_dir, lttd)
237 if not os.path.exists(link_path):
238 os.symlink(last_test_temp_dir, link_path)
239 logger.error("Symlink to failed testcase directory: %s -> %s"
242 # Report core existence
243 core_path = get_core_path(last_test_temp_dir)
244 if os.path.exists(core_path):
246 "Core-file exists in test temporary directory: %s!" %
248 check_core_path(logger, core_path)
249 logger.debug("Running `file %s':" % core_path)
251 info = check_output(["file", core_path])
253 except CalledProcessError as e:
254 logger.error("Could not run `file' utility on core-file, "
255 "rc=%s" % e.returncode)
258 # Copy api post mortem
259 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
260 if os.path.isfile(api_post_mortem_path):
261 logger.error("Copying api_post_mortem.%d to %s" %
262 (vpp_pid, last_test_temp_dir))
263 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Open gdb on the core file found in ``tempdir``, if there is one.

    :param vpp_binary: VPP binary passed to spawn_gdb()
    :param tempdir: test temporary directory checked for a core file
    :param core_crash_test: name of the last running test, used in the
        printed message only
    """
    if not is_core_present(tempdir):
        return

    message = 'VPP core detected in %s. Last test running was %s' % (
        tempdir, core_crash_test)
    print(message)
    # Frame the debugger session with delimiter lines.
    print(single_line_delim)
    core_file = get_core_path(tempdir)
    spawn_gdb(vpp_binary, core_file)
    print(single_line_delim)
# For every failed test case wrapper, call check_and_handle_core() for each
# (test, vpp_binary, tempdir) entry recorded in its testclasess_with_core.
# NOTE(review): a line between the def and the loop is not visible in this
# view, so the loop may be guarded by a condition not shown here.
275 def handle_cores(failed_testcases):
277 for failed_testcase in failed_testcases:
278 tcs_with_core = failed_testcase.testclasess_with_core
279 if len(tcs_with_core) > 0:
280 for test, vpp_binary, tempdir in tcs_with_core.values():
281 check_and_handle_core(vpp_binary, tempdir, test)
# Book-keeping for a suite whose child has finished or was stopped: append its
# result to `results`, add it to finished_testcase_suites and, when it was not
# successful, add it to failed_wrapped_testcases and run handle_failed_suite().
# run_forked() stores the return value in stop_run.
# NOTE(review): the end of the signature and the return statements are not
# visible in this view.
284 def process_finished_testsuite(wrapped_testcase_suite,
285 finished_testcase_suites,
286 failed_wrapped_testcases,
288 results.append(wrapped_testcase_suite.result)
289 finished_testcase_suites.add(wrapped_testcase_suite)
291 if failfast and not wrapped_testcase_suite.was_successful():
294 if not wrapped_testcase_suite.was_successful():
295 failed_wrapped_testcases.add(wrapped_testcase_suite)
296 handle_failed_suite(wrapped_testcase_suite.logger,
297 wrapped_testcase_suite.last_test_temp_dir,
298 wrapped_testcase_suite.vpp_pid)
# Run the given suites in child processes, up to `concurrent_tests` wrappers
# being created initially, while a reader thread forwards their output.
# The main loop polls each child's pipes, detects timeouts, dead children and
# core files, and replaces finished suites with queued ones.
# testcase_suites is consumed with pop(0).
# NOTE(review): many lines of this function are not visible in this view
# (including the creation of `results` and the return statement), so the
# comments below describe only the visible statements.
303 def run_forked(testcase_suites):
304 wrapped_testcase_suites = set()
306 # suites are unhashable, need to use list
308 unread_testcases = set()
309 finished_unread_testcases = set()
310 manager = StreamQueueManager()
# Create the initial batch of wrappers, one per available slot.
312 for i in range(concurrent_tests):
313 if len(testcase_suites) > 0:
314 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
316 wrapped_testcase_suites.add(wrapped_testcase_suite)
317 unread_testcases.add(wrapped_testcase_suite)
# Start the thread that copies the children's queued output to stdout; the
# event stays set while new test cases may still be added.
321 read_from_testcases = threading.Event()
322 read_from_testcases.set()
323 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
324 args=(unread_testcases,
325 finished_unread_testcases,
326 read_from_testcases))
327 stdouterr_thread.start()
329 failed_wrapped_testcases = set()
# Poll all running wrappers until none is left.
331 while len(wrapped_testcase_suites) > 0:
332 finished_testcase_suites = set()
333 for wrapped_testcase_suite in wrapped_testcase_suites:
# Drain per-test results sent by the child.
334 while wrapped_testcase_suite.result_parent_end.poll():
335 wrapped_testcase_suite.result.process_result(
336 *wrapped_testcase_suite.result_parent_end.recv())
337 wrapped_testcase_suite.last_heard = time.time()
# Drain keep-alive messages: (last test, VPP binary, temp dir, VPP pid).
339 while wrapped_testcase_suite.keep_alive_parent_end.poll():
340 wrapped_testcase_suite.last_test, \
341 wrapped_testcase_suite.last_test_vpp_binary, \
342 wrapped_testcase_suite.last_test_temp_dir, \
343 wrapped_testcase_suite.vpp_pid = \
344 wrapped_testcase_suite.keep_alive_parent_end.recv()
345 wrapped_testcase_suite.last_heard = time.time()
# The child reported that it finished its suite.
347 if wrapped_testcase_suite.finished_parent_end.poll():
348 wrapped_testcase_suite.finished_parent_end.recv()
349 wrapped_testcase_suite.last_heard = time.time()
350 stop_run = process_finished_testsuite(
351 wrapped_testcase_suite,
352 finished_testcase_suites,
353 failed_wrapped_testcases,
# Failure detection: no message within test_timeout, child process no longer
# alive, or a core file present for longer than core_timeout.
358 if wrapped_testcase_suite.last_heard + test_timeout < time.time():
360 wrapped_testcase_suite.logger.critical(
361 "Child test runner process timed out "
362 "(last test running was `%s' in `%s')!" %
363 (wrapped_testcase_suite.last_test,
364 wrapped_testcase_suite.last_test_temp_dir))
365 elif not wrapped_testcase_suite.child.is_alive():
367 wrapped_testcase_suite.logger.critical(
368 "Child test runner process unexpectedly died "
369 "(last test running was `%s' in `%s')!" %
370 (wrapped_testcase_suite.last_test,
371 wrapped_testcase_suite.last_test_temp_dir))
372 elif wrapped_testcase_suite.last_test_temp_dir and \
373 wrapped_testcase_suite.last_test_vpp_binary:
374 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
375 wrapped_testcase_suite.add_testclass_with_core()
376 if wrapped_testcase_suite.core_detected_at is None:
377 wrapped_testcase_suite.core_detected_at = time.time()
378 elif wrapped_testcase_suite.core_detected_at + \
379 core_timeout < time.time():
380 wrapped_testcase_suite.logger.critical(
381 "Child test runner process unresponsive and core-"
382 "file exists in test temporary directory "
383 "(last test running was `%s' in `%s')!" %
384 (wrapped_testcase_suite.last_test,
385 wrapped_testcase_suite.last_test_temp_dir))
# Stop the child (and the VPP process it started, if known), mark the result
# as crashed and record the last test as an ERROR.
# NOTE(review): the condition guarding this section is not visible.
389 wrapped_testcase_suite.child.terminate()
391 # terminating the child process tends to leave orphan
393 if wrapped_testcase_suite.vpp_pid:
394 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
398 wrapped_testcase_suite.result.crashed = True
399 wrapped_testcase_suite.result.process_result(
400 wrapped_testcase_suite.last_test_id, ERROR)
401 stop_run = process_finished_testsuite(
402 wrapped_testcase_suite,
403 finished_testcase_suites,
404 failed_wrapped_testcases,
# Clean up finished wrappers: join the child, close pipes, hand the wrapper to
# the reader thread and put None on its queue (the reader stops on None).
407 for finished_testcase in finished_testcase_suites:
408 finished_testcase.child.join()
409 finished_testcase.close_pipes()
410 wrapped_testcase_suites.remove(finished_testcase)
411 finished_unread_testcases.add(finished_testcase)
412 finished_testcase.stdouterr_queue.put(None)
# Either record empty results for all still-queued suites or start the next
# queued suite. NOTE(review): the condition selecting the first branch is not
# visible in this view.
414 while len(testcase_suites) > 0:
415 results.append(TestResult(testcase_suites.pop(0)))
416 elif len(testcase_suites) > 0:
417 new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
418 wrapped_testcase_suites.add(new_testcase)
419 unread_testcases.add(new_testcase)
421 while len(unread_testcases) > 0:
422 # wait for reader thread to read everything in all loggers
# Signal the reader thread that no more test cases will be added and wait for
# it, for at most test_timeout seconds.
425 read_from_testcases.clear()
426 stdouterr_thread.join(test_timeout)
428 handle_cores(failed_wrapped_testcases)
# Callback object handed to discover_tests() by the script section. For every
# (file_name, cls, method) it instantiates the test, adds tests accepted by
# filter_callback to a unittest.TestSuite keyed by file_name + class name in
# self.suites, and adds tests to self.filtered in the other visible branch.
# NOTE(review): the initialisation of self.suites and the header of the branch
# that adds to self.filtered are not visible in this view.
432 class SplitToSuitesCallback:
433 def __init__(self, filter_callback):
435 self.suite_name = 'default'
436 self.filter_callback = filter_callback
437 self.filtered = unittest.TestSuite()
439 def __call__(self, file_name, cls, method):
440 test_method = cls(method)
441 if self.filter_callback(file_name, cls.__name__, method):
442 self.suite_name = file_name + cls.__name__
443 if self.suite_name not in self.suites:
444 self.suites[self.suite_name] = unittest.TestSuite()
445 self.suites[self.suite_name].addTest(test_method)
448 self.filtered.addTest(test_method)
# Parse the environment variable whose name is held in `test_option` into a
# (file name, class name, function name) filter tuple. A part equal to '*' or
# '' leaves that filter as None; a file filter gets a 'test_' prefix when it
# lacks one and a '.py' suffix.
# NOTE(review): `test_option` is not defined in the visible lines, and the
# lines that split the value into `parts` are not visible either.
454 def parse_test_option():
455 f = os.getenv(test_option, None)
456 filter_file_name = None
457 filter_class_name = None
458 filter_func_name = None
463 raise Exception("Unrecognized %s option: %s" %
466 if parts[2] not in ('*', ''):
467 filter_func_name = parts[2]
468 if parts[1] not in ('*', ''):
469 filter_class_name = parts[1]
470 if parts[0] not in ('*', ''):
471 if parts[0].startswith('test_'):
472 filter_file_name = parts[0]
474 filter_file_name = 'test_%s' % parts[0]
476 if f.startswith('test_'):
479 filter_file_name = 'test_%s' % f
481 filter_file_name = '%s.py' % filter_file_name
482 return filter_file_name, filter_class_name, filter_func_name
# Build a new TestSuite from `tests`, recursing into nested suites and calling
# filter_cb(file, class, function) with the first three dot-separated parts of
# each TestCase id.
# NOTE(review): the loop header, the statements that add items to `result`
# and the return statement are not visible in this view.
485 def filter_tests(tests, filter_cb):
486 result = unittest.suite.TestSuite()
488 if isinstance(t, unittest.suite.TestSuite):
489 # this is a bunch of tests, recursively filter...
490 x = filter_tests(t, filter_cb)
491 if x.countTestCases() > 0:
493 elif isinstance(t, unittest.TestCase):
494 # this is a single test
495 parts = t.id().split('.')
496 # t.id() for common cases like this:
497 # test_classifier.TestClassifier.test_acl_ip
498 # apply filtering only if it is so
500 if not filter_cb(parts[0], parts[1], parts[2]):
504 # unexpected object, don't touch it
# Filter callback built from the (file, class, function) filters returned by
# parse_test_option(). The file filter is matched with fnmatch; the class and
# function filters are compared for equality. A filter that is None or empty
# is not applied.
# NOTE(review): the return statements of __call__ are not visible in this view.
509 class FilterByTestOption:
510 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
511 self.filter_file_name = filter_file_name
512 self.filter_class_name = filter_class_name
513 self.filter_func_name = filter_func_name
515 def __call__(self, file_name, class_name, func_name):
516 if self.filter_file_name:
517 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
520 if self.filter_class_name and class_name != self.filter_class_name:
522 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList(object):
    """Filter callback accepting tests whose 'file.Class' name is listed.

    Declared as a new-style class, like TestCaseWrapper in this file.
    """

    def __init__(self, classes_with_filenames):
        """Store the accepted names.

        :param classes_with_filenames: container of '<file_name>.<class_name>'
            strings; only membership tests (``in``) are performed on it
        """
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return True if '<file_name>.<class_name>' is an accepted name.

        ``func_name`` is part of the filter callback signature but is not
        used here: every test of a listed class matches.
        """
        qualified_name = '.'.join((file_name, class_name))
        return qualified_name in self.classes_with_filenames
# Reduce `suite` to the classes that contain a failed test: each failed id is
# cut at its last '.', and the resulting names are used with FilterByClassList
# to filter the suite.
# NOTE(review): the return statement is not visible in this view; the caller
# in TestResult.suite_from_failed() returns this function's result.
535 def suite_from_failed(suite, failed):
536 failed = {x.rsplit('.', 1)[0] for x in failed}
537 filter_cb = FilterByClassList(failed)
538 suite = filter_tests(suite, filter_cb)
# Aggregated results of a whole run: a dict of integer counters keyed by
# result type (PASS, FAIL, ERROR, SKIP, TEST_RUN), plus the list of per-suite
# results and the suites in which no test was run.
# NOTE(review): several lines of this class are not visible in this view,
# including the __init__ header, the counter initialisation, self.rerun and
# the decorators of not_executed/all_failed.
542 class AllResults(dict):
544 super(AllResults, self).__init__()
545 self.all_testcases = 0
546 self.results_per_suite = []
553 self.testsuites_no_tests_run = []
# Store the per-suite result and add its list lengths to the counters.
555 def add_results(self, result):
556 self.results_per_suite.append(result)
557 result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
558 for result_type in result_types:
559 self[result_type] += len(result[result_type])
# Account for one suite result: update the scheduled test count and the
# counters, remember suites with no tests run and suites to re-run.
# parse_results() uses the return value as a result code; the return
# statements are not visible in this view.
561 def add_result(self, result):
563 self.all_testcases += result.testcase_suite.countTestCases()
564 self.add_results(result)
566 if result.no_tests_run():
567 self.testsuites_no_tests_run.append(result.testcase_suite)
572 elif not result.was_successful():
576 self.rerun.append(result.testcase_suite)
# Print the summary: counters first, then failed and errored tests grouped by
# test case name, then the test cases in which no test was run.
580 def print_results(self):
582 print(double_line_delim)
583 print('TEST RESULTS:')
584 print(' Scheduled tests: {}'.format(self.all_testcases))
585 print(' Executed tests: {}'.format(self[TEST_RUN]))
586 print(' Passed tests: {}'.format(
587 colorize(str(self[PASS]), GREEN)))
589 print(' Skipped tests: {}'.format(
590 colorize(str(self[SKIP]), YELLOW)))
591 if self.not_executed > 0:
592 print(' Not Executed tests: {}'.format(
593 colorize(str(self.not_executed), RED)))
595 print(' Failures: {}'.format(
596 colorize(str(self[FAIL]), RED)))
598 print(' Errors: {}'.format(
599 colorize(str(self[ERROR]), RED)))
601 if self.all_failed > 0:
602 print('FAILURES AND ERRORS IN TESTS:')
603 for result in self.results_per_suite:
604 failed_testcase_ids = result[FAIL]
605 errored_testcase_ids = result[ERROR]
606 old_testcase_name = None
607 if len(failed_testcase_ids) or len(errored_testcase_ids):
608 for failed_test_id in failed_testcase_ids:
609 new_testcase_name, test_name = \
610 result.get_testcase_names(failed_test_id)
611 if new_testcase_name != old_testcase_name:
612 print(' Testcase name: {}'.format(
613 colorize(new_testcase_name, RED)))
614 old_testcase_name = new_testcase_name
615 print(' FAILURE: {}'.format(
616 colorize(test_name, RED)))
617 for failed_test_id in errored_testcase_ids:
618 new_testcase_name, test_name = \
619 result.get_testcase_names(failed_test_id)
620 if new_testcase_name != old_testcase_name:
621 print(' Testcase name: {}'.format(
622 colorize(new_testcase_name, RED)))
623 old_testcase_name = new_testcase_name
624 print(' ERROR: {}'.format(
625 colorize(test_name, RED)))
626 if len(self.testsuites_no_tests_run) > 0:
627 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
629 for testsuite in self.testsuites_no_tests_run:
630 for testcase in testsuite:
631 tc_classes.add(get_testcase_doc_name(testcase))
632 for tc_class in tc_classes:
633 print(' {}'.format(colorize(tc_class, RED)))
635 print(double_line_delim)
# Number of scheduled tests that were not run. print_results() reads this as
# an attribute, so it is presumably a property (decorator not visible).
639 def not_executed(self):
640 return self.all_testcases - self[TEST_RUN]
# Sum of the failure and error counters; also read as an attribute by
# print_results().
643 def all_failed(self):
644 return self[FAIL] + self[ERROR]
# Feed every per-suite result into an AllResults object, print the summary and
# return (return_code, suites to re-run).
# NOTE(review): the docstring delimiters and the lines that compute
# return_code are not visible in this view; the three text lines after the def
# are part of the original docstring.
647 def parse_results(results):
649 Prints the number of scheduled, executed, not executed, passed, failed,
650 errored and skipped tests and details about failed and errored tests.
652 Also returns all suites where any test failed.
658 results_per_suite = AllResults()
# add_result() yields a per-suite result code; -1 is one of the values
# checked below.
661 for result in results:
662 result_code = results_per_suite.add_result(result)
665 elif result_code == -1:
668 results_per_suite.print_results()
676 return return_code, results_per_suite.rerun
# Read the environment variable env_var, falling back to `default`, and warn
# when the value is unsupported, in which case the default is used.
# NOTE(review): the validation and return statements are not visible in this
# view. The two adjacent string literals in the warning are joined without a
# space, so the message reads '...env var "X",defaulting to ...'.
679 def parse_digit_env(env_var, default):
680 value = os.getenv(env_var, default)
685 print('WARNING: unsupported value "%s" for env var "%s",'
686 'defaulting to %s' % (value, env_var, default))
# Script entry point, first part: read the run configuration from the
# environment (V, TIMEOUT, RETRIES, DEBUG, STEP, TEST_JOBS) and, for
# TEST_JOBS=auto, derive the process count from the free space in /dev/shm.
# NOTE(review): several lines of this section are not visible in this view.
691 if __name__ == '__main__':
693 verbose = parse_digit_env("V", 0)
695 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
697 retries = parse_digit_env("RETRIES", 0)
# DEBUG=gdb/gdbserver and STEP both require an interactive terminal;
# DEBUG=core sets debug_core instead.
699 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
701 debug_core = os.getenv("DEBUG", "").lower() == "core"
703 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
705 run_interactive = debug or step
707 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
708 if test_jobs == 'auto':
711 print('Interactive mode required, running on one core')
# One process needs at least min_req_shm bytes free in /dev/shm; space beyond
# that is counted towards extra processes.
713 shm_free = psutil.disk_usage('/dev/shm').free
714 shm_max_processes = 1
715 if shm_free < min_req_shm:
716 raise Exception('Not enough free space in /dev/shm. Required '
717 'free space is at least %sM.'
718 % (min_req_shm >> 20))
720 extra_shm = shm_free - min_req_shm
# Floor division keeps the process count an integer. With true division
# (Python 3, or `from __future__ import division`) `/` yields a float, which
# would flow into min(cpu_count(), shm_max_processes) and from there into
# range(concurrent_tests), where a float raises TypeError.
shm_max_processes += extra_shm // shm_per_process
# Script entry point, remainder: finish choosing the process count, parse the
# command line, discover and filter tests, then run them either interactively
# in this process or through run_forked() with retries.
# NOTE(review): several lines of this section are not visible in this view.
722 concurrent_tests = min(cpu_count(), shm_max_processes)
723 print('Found enough resources to run tests with %s cores'
725 elif test_jobs.isdigit():
726 concurrent_tests = int(test_jobs)
# Interactive debugging cannot be combined with parallel execution.
730 if run_interactive and concurrent_tests > 1:
731 raise NotImplementedError(
732 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
733 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
735 parser = argparse.ArgumentParser(description="VPP unit tests")
736 parser.add_argument("-f", "--failfast", action='store_true',
737 help="fast failure flag")
738 parser.add_argument("-d", "--dir", action='append', type=str,
739 help="directory containing test files "
740 "(may be specified multiple times)")
741 args = parser.parse_args()
742 failfast = args.failfast
745 print("Running tests using custom test runner") # debug message
# Build the filter callback from the test option and split the discovered
# tests into suites.
746 filter_file, filter_class, filter_func = parse_test_option()
748 print("Active filters: file=%s, class=%s, function=%s" % (
749 filter_file, filter_class, filter_func))
751 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
753 ignore_path = os.getenv("VENV_PATH", None)
754 cb = SplitToSuitesCallback(filter_cb)
756 print("Adding tests from directory tree %s" % d)
757 discover_tests(d, cb, ignore_path)
759 # suites are not hashable, need to use list
762 for testcase_suite in cb.suites.values():
763 tests_amount += testcase_suite.countTestCases()
764 suites.append(testcase_suite)
766 print("%s out of %s tests match specified filters" % (
767 tests_amount, tests_amount + cb.filtered.countTestCases()))
769 if not running_extended_tests():
770 print("Not running extended tests (some tests will be skipped)")
# One initial attempt plus `retries` re-runs.
772 attempts = retries + 1
774 print("Perform %s attempts to pass the suite..." % attempts)
777 # don't fork if requiring interactive terminal
778 result = VppTestRunner(verbosity=verbose,
780 print_summary=True).run(suites[0])
781 was_successful = result.wasSuccessful()
782 if not was_successful:
783 for test_case_info in result.failed_test_cases_info:
784 handle_failed_suite(test_case_info.logger,
785 test_case_info.tempdir,
786 test_case_info.vpp_pid)
788 test_case_info in result.core_crash_test_cases_info:
789 check_and_handle_core(test_case_info.vpp_bin_path,
790 test_case_info.tempdir,
791 test_case_info.core_crash_test)
793 sys.exit(not was_successful)
# Forked mode: run the suites, then repeat with the suites returned by
# parse_results() while any remain and attempts are left.
796 while len(suites) > 0 and attempts > 0:
797 results = run_forked(suites)
798 exit_code, suites = parse_results(results)
801 print('Test run was successful')
803 print('%s attempt(s) left.' % attempts)