14 from multiprocessing import Process, Pipe, cpu_count
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
20 from debug import spawn_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22 colorize, single_line_delim
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path, get_core_path, is_core_present
27 # timeout which controls how long the child has to finish after seeing
28 # a core dump in test temporary directory. If this is exceeded, parent assumes
29 # that child process is stuck (e.g. waiting for shm mutex, which will never
30 # get unlocked) and kill the child
32 min_req_shm = 536870912 # min 512MB shm required
33 # 128MB per extra process
34 shm_per_process = 134217728
37 class StreamQueue(Queue):
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
46 return self._writer.fileno()
49 class StreamQueueManager(BaseManager):
53 StreamQueueManager.register('StreamQueue', StreamQueue)
56 class TestResult(dict):
57 def __init__(self, testcase_suite, testcases_by_id=None):
58 super(TestResult, self).__init__()
65 self.testcase_suite = testcase_suite
66 self.testcases = [testcase for testcase in testcase_suite]
67 self.testcases_by_id = testcases_by_id
69 def was_successful(self):
70 return 0 == len(self[FAIL]) == len(self[ERROR]) \
71 and len(self[PASS] + self[SKIP]) \
72 == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
74 def no_tests_run(self):
75 return 0 == len(self[TEST_RUN])
77 def process_result(self, test_id, result):
78 self[result].append(test_id)
80 def suite_from_failed(self):
82 for testcase in self.testcase_suite:
84 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
86 if len(rerun_ids) > 0:
87 return suite_from_failed(self.testcase_suite, rerun_ids)
89 def get_testcase_names(self, test_id):
90 if re.match(r'.+\..+\..+', test_id):
91 test_name = self._get_test_description(test_id)
92 testcase_name = self._get_testcase_doc_name(test_id)
94 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
95 setup_teardown_match = re.match(
96 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
97 if setup_teardown_match:
98 test_name, _, _, testcase_name = setup_teardown_match.groups()
99 if len(testcase_name.split('.')) == 2:
100 for key in self.testcases_by_id.keys():
101 if key.startswith(testcase_name):
104 testcase_name = self._get_testcase_doc_name(testcase_name)
107 testcase_name = test_id
109 return testcase_name, test_name
111 def _get_test_description(self, test_id):
112 return get_test_description(descriptions,
113 self.testcases_by_id[test_id])
115 def _get_testcase_doc_name(self, test_id):
116 return get_testcase_doc_name(self.testcases_by_id[test_id])
119 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
120 finished_pipe, result_pipe, logger):
121 sys.stdout = stdouterr_queue
122 sys.stderr = stdouterr_queue
123 VppTestCase.parallel_handler = logger.handlers[0]
124 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
125 descriptions=descriptions,
127 result_pipe=result_pipe,
128 failfast=failfast).run(suite)
129 finished_pipe.send(result.wasSuccessful())
130 finished_pipe.close()
131 keep_alive_pipe.close()
134 class TestCaseWrapper(object):
135 def __init__(self, testcase_suite, manager):
136 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
138 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
139 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
140 self.testcase_suite = testcase_suite
141 self.stdouterr_queue = manager.StreamQueue()
142 self.logger = get_parallel_logger(self.stdouterr_queue)
143 self.child = Process(target=test_runner_wrapper,
144 args=(testcase_suite,
145 self.keep_alive_child_end,
146 self.stdouterr_queue,
147 self.finished_child_end,
148 self.result_child_end,
152 self.last_test_temp_dir = None
153 self.last_test_vpp_binary = None
154 self._last_test = None
155 self.last_test_id = None
157 self.last_heard = time.time()
158 self.core_detected_at = None
159 self.testcases_by_id = {}
160 self.testclasess_with_core = {}
161 for testcase in self.testcase_suite:
162 self.testcases_by_id[testcase.id()] = testcase
163 self.result = TestResult(testcase_suite, self.testcases_by_id)
167 return self._last_test
170 def last_test(self, test_id):
171 self.last_test_id = test_id
172 if test_id in self.testcases_by_id:
173 testcase = self.testcases_by_id[test_id]
174 self._last_test = testcase.shortDescription()
175 if not self._last_test:
176 self._last_test = str(testcase)
178 self._last_test = test_id
180 def add_testclass_with_core(self):
181 if self.last_test_id in self.testcases_by_id:
182 test = self.testcases_by_id[self.last_test_id]
183 class_name = unittest.util.strclass(test.__class__)
184 test_name = "'{}' ({})".format(get_test_description(descriptions,
188 test_name = self.last_test_id
189 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
190 r'\((.+\..+)\)', test_name).groups()[3]
191 if class_name not in self.testclasess_with_core:
192 self.testclasess_with_core[class_name] = (
194 self.last_test_vpp_binary,
195 self.last_test_temp_dir)
197 def close_pipes(self):
198 self.keep_alive_child_end.close()
199 self.finished_child_end.close()
200 self.result_child_end.close()
201 self.keep_alive_parent_end.close()
202 self.finished_parent_end.close()
203 self.result_parent_end.close()
205 def was_successful(self):
206 return self.result.was_successful()
209 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
212 while read_testcases.is_set() or len(unread_testcases) > 0:
213 if not read_testcase:
214 if len(finished_unread_testcases) > 0:
215 read_testcase = finished_unread_testcases.pop()
216 unread_testcases.remove(read_testcase)
217 elif len(unread_testcases) > 0:
218 read_testcase = unread_testcases.pop()
221 while data is not None:
222 sys.stdout.write(data)
223 data = read_testcase.stdouterr_queue.get()
225 read_testcase.stdouterr_queue.close()
226 finished_unread_testcases.discard(read_testcase)
230 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
231 if last_test_temp_dir:
232 # Need to create link in case of a timeout or core dump without failure
233 lttd = os.path.basename(last_test_temp_dir)
234 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
235 link_path = '%s%s-FAILED' % (failed_dir, lttd)
236 if not os.path.exists(link_path):
237 logger.error("Creating a link to the failed test: %s -> %s" %
239 os.symlink(last_test_temp_dir, link_path)
241 logger.error("Link to the failed test already exists: %s -> %s" %
244 # Report core existence
245 core_path = get_core_path(last_test_temp_dir)
246 if os.path.exists(core_path):
248 "Core-file exists in test temporary directory: %s!" %
250 check_core_path(logger, core_path)
251 logger.debug("Running `file %s':" % core_path)
253 info = check_output(["file", core_path])
255 except CalledProcessError as e:
256 logger.error("Could not run `file' utility on core-file, "
257 "rc=%s" % e.returncode)
260 # Copy api post mortem
261 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
262 if os.path.isfile(api_post_mortem_path):
263 logger.error("Copying api_post_mortem.%d to %s" %
264 (vpp_pid, last_test_temp_dir))
265 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
268 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
269 if is_core_present(tempdir):
270 print('VPP core detected in %s. Last test running was %s' %
271 (tempdir, core_crash_test))
272 print(single_line_delim)
273 spawn_gdb(vpp_binary, get_core_path(tempdir))
274 print(single_line_delim)
277 def handle_cores(failed_testcases):
279 for failed_testcase in failed_testcases:
280 tcs_with_core = failed_testcase.testclasess_with_core
281 if len(tcs_with_core) > 0:
282 for test, vpp_binary, tempdir in tcs_with_core.values():
283 check_and_handle_core(vpp_binary, tempdir, test)
286 def process_finished_testsuite(wrapped_testcase_suite,
287 finished_testcase_suites,
288 failed_wrapped_testcases,
290 results.append(wrapped_testcase_suite.result)
291 finished_testcase_suites.add(wrapped_testcase_suite)
293 if failfast and not wrapped_testcase_suite.was_successful():
296 if not wrapped_testcase_suite.was_successful():
297 failed_wrapped_testcases.add(wrapped_testcase_suite)
298 handle_failed_suite(wrapped_testcase_suite.logger,
299 wrapped_testcase_suite.last_test_temp_dir,
300 wrapped_testcase_suite.vpp_pid)
305 def run_forked(testcase_suites):
306 wrapped_testcase_suites = set()
308 # suites are unhashable, need to use list
310 unread_testcases = set()
311 finished_unread_testcases = set()
312 manager = StreamQueueManager()
314 for i in range(concurrent_tests):
315 if len(testcase_suites) > 0:
316 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
318 wrapped_testcase_suites.add(wrapped_testcase_suite)
319 unread_testcases.add(wrapped_testcase_suite)
323 read_from_testcases = threading.Event()
324 read_from_testcases.set()
325 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
326 args=(unread_testcases,
327 finished_unread_testcases,
328 read_from_testcases))
329 stdouterr_thread.start()
331 failed_wrapped_testcases = set()
333 while len(wrapped_testcase_suites) > 0:
334 finished_testcase_suites = set()
335 for wrapped_testcase_suite in wrapped_testcase_suites:
336 while wrapped_testcase_suite.result_parent_end.poll():
337 wrapped_testcase_suite.result.process_result(
338 *wrapped_testcase_suite.result_parent_end.recv())
339 wrapped_testcase_suite.last_heard = time.time()
341 while wrapped_testcase_suite.keep_alive_parent_end.poll():
342 wrapped_testcase_suite.last_test, \
343 wrapped_testcase_suite.last_test_vpp_binary, \
344 wrapped_testcase_suite.last_test_temp_dir, \
345 wrapped_testcase_suite.vpp_pid = \
346 wrapped_testcase_suite.keep_alive_parent_end.recv()
347 wrapped_testcase_suite.last_heard = time.time()
349 if wrapped_testcase_suite.finished_parent_end.poll():
350 wrapped_testcase_suite.finished_parent_end.recv()
351 wrapped_testcase_suite.last_heard = time.time()
352 stop_run = process_finished_testsuite(
353 wrapped_testcase_suite,
354 finished_testcase_suites,
355 failed_wrapped_testcases,
360 if wrapped_testcase_suite.last_heard + test_timeout < time.time():
362 wrapped_testcase_suite.logger.critical(
363 "Child test runner process timed out "
364 "(last test running was `%s' in `%s')!" %
365 (wrapped_testcase_suite.last_test,
366 wrapped_testcase_suite.last_test_temp_dir))
367 elif not wrapped_testcase_suite.child.is_alive():
369 wrapped_testcase_suite.logger.critical(
370 "Child test runner process unexpectedly died "
371 "(last test running was `%s' in `%s')!" %
372 (wrapped_testcase_suite.last_test,
373 wrapped_testcase_suite.last_test_temp_dir))
374 elif wrapped_testcase_suite.last_test_temp_dir and \
375 wrapped_testcase_suite.last_test_vpp_binary:
376 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
377 wrapped_testcase_suite.add_testclass_with_core()
378 if wrapped_testcase_suite.core_detected_at is None:
379 wrapped_testcase_suite.core_detected_at = time.time()
380 elif wrapped_testcase_suite.core_detected_at + \
381 core_timeout < time.time():
382 wrapped_testcase_suite.logger.critical(
383 "Child test runner process unresponsive and core-"
384 "file exists in test temporary directory "
385 "(last test running was `%s' in `%s')!" %
386 (wrapped_testcase_suite.last_test,
387 wrapped_testcase_suite.last_test_temp_dir))
391 wrapped_testcase_suite.child.terminate()
393 # terminating the child process tends to leave orphan
395 if wrapped_testcase_suite.vpp_pid:
396 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
400 wrapped_testcase_suite.result.crashed = True
401 wrapped_testcase_suite.result.process_result(
402 wrapped_testcase_suite.last_test_id, ERROR)
403 stop_run = process_finished_testsuite(
404 wrapped_testcase_suite,
405 finished_testcase_suites,
406 failed_wrapped_testcases,
409 for finished_testcase in finished_testcase_suites:
410 finished_testcase.child.join()
411 finished_testcase.close_pipes()
412 wrapped_testcase_suites.remove(finished_testcase)
413 finished_unread_testcases.add(finished_testcase)
414 finished_testcase.stdouterr_queue.put(None)
416 while len(testcase_suites) > 0:
417 results.append(TestResult(testcase_suites.pop(0)))
418 elif len(testcase_suites) > 0:
419 new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
420 wrapped_testcase_suites.add(new_testcase)
421 unread_testcases.add(new_testcase)
423 while len(unread_testcases) > 0:
424 # wait for reader thread to read everything in all loggers
427 read_from_testcases.clear()
428 stdouterr_thread.join(test_timeout)
430 handle_cores(failed_wrapped_testcases)
434 class SplitToSuitesCallback:
435 def __init__(self, filter_callback):
437 self.suite_name = 'default'
438 self.filter_callback = filter_callback
439 self.filtered = unittest.TestSuite()
441 def __call__(self, file_name, cls, method):
442 test_method = cls(method)
443 if self.filter_callback(file_name, cls.__name__, method):
444 self.suite_name = file_name + cls.__name__
445 if self.suite_name not in self.suites:
446 self.suites[self.suite_name] = unittest.TestSuite()
447 self.suites[self.suite_name].addTest(test_method)
450 self.filtered.addTest(test_method)
456 def parse_test_option():
457 f = os.getenv(test_option, None)
458 filter_file_name = None
459 filter_class_name = None
460 filter_func_name = None
465 raise Exception("Unrecognized %s option: %s" %
468 if parts[2] not in ('*', ''):
469 filter_func_name = parts[2]
470 if parts[1] not in ('*', ''):
471 filter_class_name = parts[1]
472 if parts[0] not in ('*', ''):
473 if parts[0].startswith('test_'):
474 filter_file_name = parts[0]
476 filter_file_name = 'test_%s' % parts[0]
478 if f.startswith('test_'):
481 filter_file_name = 'test_%s' % f
483 filter_file_name = '%s.py' % filter_file_name
484 return filter_file_name, filter_class_name, filter_func_name
487 def filter_tests(tests, filter_cb):
488 result = unittest.suite.TestSuite()
490 if isinstance(t, unittest.suite.TestSuite):
491 # this is a bunch of tests, recursively filter...
492 x = filter_tests(t, filter_cb)
493 if x.countTestCases() > 0:
495 elif isinstance(t, unittest.TestCase):
496 # this is a single test
497 parts = t.id().split('.')
498 # t.id() for common cases like this:
499 # test_classifier.TestClassifier.test_acl_ip
500 # apply filtering only if it is so
502 if not filter_cb(parts[0], parts[1], parts[2]):
506 # unexpected object, don't touch it
511 class FilterByTestOption:
512 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
513 self.filter_file_name = filter_file_name
514 self.filter_class_name = filter_class_name
515 self.filter_func_name = filter_func_name
517 def __call__(self, file_name, class_name, func_name):
518 if self.filter_file_name:
519 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
522 if self.filter_class_name and class_name != self.filter_class_name:
524 if self.filter_func_name and func_name != self.filter_func_name:
529 class FilterByClassList:
530 def __init__(self, classes_with_filenames):
531 self.classes_with_filenames = classes_with_filenames
533 def __call__(self, file_name, class_name, func_name):
534 return '.'.join([file_name, class_name]) in self.classes_with_filenames
537 def suite_from_failed(suite, failed):
538 failed = {x.rsplit('.', 1)[0] for x in failed}
539 filter_cb = FilterByClassList(failed)
540 suite = filter_tests(suite, filter_cb)
544 class AllResults(dict):
546 super(AllResults, self).__init__()
547 self.all_testcases = 0
548 self.results_per_suite = []
555 self.testsuites_no_tests_run = []
557 def add_results(self, result):
558 self.results_per_suite.append(result)
559 result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
560 for result_type in result_types:
561 self[result_type] += len(result[result_type])
563 def add_result(self, result):
565 self.all_testcases += result.testcase_suite.countTestCases()
566 self.add_results(result)
568 if result.no_tests_run():
569 self.testsuites_no_tests_run.append(result.testcase_suite)
574 elif not result.was_successful():
578 if concurrent_tests == 1:
579 self.rerun.append(result.suite_from_failed())
581 self.rerun.append(result.testcase_suite)
585 def print_results(self):
587 print(double_line_delim)
588 print('TEST RESULTS:')
589 print(' Scheduled tests: {}'.format(self.all_testcases))
590 print(' Executed tests: {}'.format(self[TEST_RUN]))
591 print(' Passed tests: {}'.format(
592 colorize(str(self[PASS]), GREEN)))
594 print(' Skipped tests: {}'.format(
595 colorize(str(self[SKIP]), YELLOW)))
596 if self.not_executed > 0:
597 print(' Not Executed tests: {}'.format(
598 colorize(str(self.not_executed), RED)))
600 print(' Failures: {}'.format(
601 colorize(str(self[FAIL]), RED)))
603 print(' Errors: {}'.format(
604 colorize(str(self[ERROR]), RED)))
606 if self.all_failed > 0:
607 print('FAILURES AND ERRORS IN TESTS:')
608 for result in self.results_per_suite:
609 failed_testcase_ids = result[FAIL]
610 errored_testcase_ids = result[ERROR]
611 old_testcase_name = None
612 if len(failed_testcase_ids) or len(errored_testcase_ids):
613 for failed_test_id in failed_testcase_ids:
614 new_testcase_name, test_name = \
615 result.get_testcase_names(failed_test_id)
616 if new_testcase_name != old_testcase_name:
617 print(' Testcase name: {}'.format(
618 colorize(new_testcase_name, RED)))
619 old_testcase_name = new_testcase_name
620 print(' FAILURE: {}'.format(
621 colorize(test_name, RED)))
622 for failed_test_id in errored_testcase_ids:
623 new_testcase_name, test_name = \
624 result.get_testcase_names(failed_test_id)
625 if new_testcase_name != old_testcase_name:
626 print(' Testcase name: {}'.format(
627 colorize(new_testcase_name, RED)))
628 old_testcase_name = new_testcase_name
629 print(' ERROR: {}'.format(
630 colorize(test_name, RED)))
631 if len(self.testsuites_no_tests_run) > 0:
632 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
634 for testsuite in self.testsuites_no_tests_run:
635 for testcase in testsuite:
636 tc_classes.add(get_testcase_doc_name(testcase))
637 for tc_class in tc_classes:
638 print(' {}'.format(colorize(tc_class, RED)))
640 print(double_line_delim)
644 def not_executed(self):
645 return self.all_testcases - self[TEST_RUN]
648 def all_failed(self):
649 return self[FAIL] + self[ERROR]
652 def parse_results(results):
654 Prints the number of scheduled, executed, not executed, passed, failed,
655 errored and skipped tests and details about failed and errored tests.
657 Also returns all suites where any test failed.
663 results_per_suite = AllResults()
666 for result in results:
667 result_code = results_per_suite.add_result(result)
670 elif result_code == -1:
673 results_per_suite.print_results()
681 return return_code, results_per_suite.rerun
684 def parse_digit_env(env_var, default):
685 value = os.getenv(env_var, default)
690 print('WARNING: unsupported value "%s" for env var "%s",'
691 'defaulting to %s' % (value, env_var, default))
696 if __name__ == '__main__':
698 verbose = parse_digit_env("V", 0)
700 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
702 retries = parse_digit_env("RETRIES", 0)
704 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
706 debug_core = os.getenv("DEBUG", "").lower() == "core"
708 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
710 run_interactive = debug or step
712 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
713 if test_jobs == 'auto':
716 print('Interactive mode required, running on one core')
718 shm_free = psutil.disk_usage('/dev/shm').free
719 shm_max_processes = 1
720 if shm_free < min_req_shm:
721 raise Exception('Not enough free space in /dev/shm. Required '
722 'free space is at least %sM.'
723 % (min_req_shm >> 20))
725 extra_shm = shm_free - min_req_shm
726 shm_max_processes += extra_shm / shm_per_process
727 concurrent_tests = min(cpu_count(), shm_max_processes)
728 print('Found enough resources to run tests with %s cores'
730 elif test_jobs.isdigit():
731 concurrent_tests = int(test_jobs)
735 if run_interactive and concurrent_tests > 1:
736 raise NotImplementedError(
737 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
738 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
740 parser = argparse.ArgumentParser(description="VPP unit tests")
741 parser.add_argument("-f", "--failfast", action='store_true',
742 help="fast failure flag")
743 parser.add_argument("-d", "--dir", action='append', type=str,
744 help="directory containing test files "
745 "(may be specified multiple times)")
746 args = parser.parse_args()
747 failfast = args.failfast
750 print("Running tests using custom test runner") # debug message
751 filter_file, filter_class, filter_func = parse_test_option()
753 print("Active filters: file=%s, class=%s, function=%s" % (
754 filter_file, filter_class, filter_func))
756 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
758 cb = SplitToSuitesCallback(filter_cb)
760 print("Adding tests from directory tree %s" % d)
761 discover_tests(d, cb)
763 # suites are not hashable, need to use list
766 for testcase_suite in cb.suites.values():
767 tests_amount += testcase_suite.countTestCases()
768 suites.append(testcase_suite)
770 if concurrent_tests == 1:
771 new_suite = unittest.TestSuite()
773 new_suite.addTests(suite)
777 print("%s out of %s tests match specified filters" % (
778 tests_amount, tests_amount + cb.filtered.countTestCases()))
780 if not running_extended_tests():
781 print("Not running extended tests (some tests will be skipped)")
783 attempts = retries + 1
785 print("Perform %s attempts to pass the suite..." % attempts)
788 # don't fork if requiring interactive terminal
789 result = VppTestRunner(verbosity=verbose, failfast=failfast)\
791 was_successful = result.wasSuccessful()
792 if not was_successful:
793 for test_case_info in result.failed_test_cases_info:
794 handle_failed_suite(test_case_info.logger,
795 test_case_info.tempdir,
796 test_case_info.vpp_pid)
798 test_case_info in result.core_crash_test_cases_info:
799 check_and_handle_core(test_case_info.vpp_bin_path,
800 test_case_info.tempdir,
801 test_case_info.core_crash_test)
803 sys.exit(not was_successful)
806 while len(suites) > 0 and attempts > 0:
807 results = run_forked(suites)
808 exit_code, suites = parse_results(results)
811 print('Test run was successful')
813 print('%s attempt(s) left.' % attempts)