14 from multiprocessing import Process, Pipe, cpu_count
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
20 from debug import spawn_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22 colorize, single_line_delim
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path, get_core_path, is_core_present
27 # timeout which controls how long the child has to finish after seeing
28 # a core dump in test temporary directory. If this is exceeded, parent assumes
29 # that child process is stuck (e.g. waiting for shm mutex, which will never
30 # get unlocked) and kill the child
32 min_req_shm = 536870912 # min 512MB shm required
33 # 128MB per extra process
34 shm_per_process = 134217728
37 class StreamQueue(Queue):
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
46 return self._writer.fileno()
49 class StreamQueueManager(BaseManager):
53 StreamQueueManager.register('StreamQueue', StreamQueue)
56 class TestResult(dict):
57 def __init__(self, testcase_suite, testcases_by_id=None):
58 super(TestResult, self).__init__()
65 self.testcase_suite = testcase_suite
66 self.testcases = [testcase for testcase in testcase_suite]
67 self.testcases_by_id = testcases_by_id
def was_successful(self):
    """Tell whether the whole suite completed without any problem.

    The suite counts as successful only when there are no failures and
    no errors, and the number of passed plus skipped tests equals both
    the number of test cases in the suite and the number of tests that
    were actually run.
    """
    # any recorded failure or error disqualifies the suite right away
    if len(self[FAIL]) != 0:
        return False
    if len(self[ERROR]) != 0:
        return False
    finished = len(self[PASS] + self[SKIP])
    scheduled = self.testcase_suite.countTestCases()
    if finished != scheduled:
        return False
    # every scheduled test must also have been reported as run
    return scheduled == len(self[TEST_RUN])
def no_tests_run(self):
    """Return True when not a single test has been recorded as run."""
    executed = self[TEST_RUN]
    return len(executed) == 0
def process_result(self, test_id, result):
    """Record ``test_id`` under the ``result`` category.

    ``result`` is used as the key of an already existing list stored in
    this mapping; the test id is appended to that list.
    """
    bucket = self[result]
    bucket.append(test_id)
80 def suite_from_failed(self):
82 for testcase in self.testcase_suite:
84 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
86 if len(rerun_ids) > 0:
87 return suite_from_failed(self.testcase_suite, rerun_ids)
89 def get_testcase_names(self, test_id):
90 if re.match(r'.+\..+\..+', test_id):
91 test_name = self._get_test_description(test_id)
92 testcase_name = self._get_testcase_doc_name(test_id)
94 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
95 setup_teardown_match = re.match(
96 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
97 if setup_teardown_match:
98 test_name, _, _, testcase_name = setup_teardown_match.groups()
99 if len(testcase_name.split('.')) == 2:
100 for key in self.testcases_by_id.keys():
101 if key.startswith(testcase_name):
104 testcase_name = self._get_testcase_doc_name(testcase_name)
107 testcase_name = test_id
109 return testcase_name, test_name
def _get_test_description(self, test_id):
    """Return the description of the test registered under ``test_id``.

    The test case object is looked up in ``self.testcases_by_id`` and
    passed, together with the module-level ``descriptions`` setting, to
    ``get_test_description``.
    """
    testcase = self.testcases_by_id[test_id]
    return get_test_description(descriptions, testcase)
def _get_testcase_doc_name(self, test_id):
    """Return the documented test case name for ``test_id``.

    The test case object is looked up in ``self.testcases_by_id`` and
    handed to ``get_testcase_doc_name``.
    """
    testcase = self.testcases_by_id[test_id]
    return get_testcase_doc_name(testcase)
119 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
120 finished_pipe, result_pipe, logger):
121 sys.stdout = stdouterr_queue
122 sys.stderr = stdouterr_queue
123 VppTestCase.parallel_handler = logger.handlers[0]
124 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
125 descriptions=descriptions,
127 result_pipe=result_pipe,
129 print_summary=False).run(suite)
130 finished_pipe.send(result.wasSuccessful())
131 finished_pipe.close()
132 keep_alive_pipe.close()
135 class TestCaseWrapper(object):
136 def __init__(self, testcase_suite, manager):
137 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
139 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
140 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
141 self.testcase_suite = testcase_suite
142 self.stdouterr_queue = manager.StreamQueue()
143 self.logger = get_parallel_logger(self.stdouterr_queue)
144 self.child = Process(target=test_runner_wrapper,
145 args=(testcase_suite,
146 self.keep_alive_child_end,
147 self.stdouterr_queue,
148 self.finished_child_end,
149 self.result_child_end,
153 self.last_test_temp_dir = None
154 self.last_test_vpp_binary = None
155 self._last_test = None
156 self.last_test_id = None
158 self.last_heard = time.time()
159 self.core_detected_at = None
160 self.testcases_by_id = {}
161 self.testclasess_with_core = {}
162 for testcase in self.testcase_suite:
163 self.testcases_by_id[testcase.id()] = testcase
164 self.result = TestResult(testcase_suite, self.testcases_by_id)
168 return self._last_test
171 def last_test(self, test_id):
172 self.last_test_id = test_id
173 if test_id in self.testcases_by_id:
174 testcase = self.testcases_by_id[test_id]
175 self._last_test = testcase.shortDescription()
176 if not self._last_test:
177 self._last_test = str(testcase)
179 self._last_test = test_id
181 def add_testclass_with_core(self):
182 if self.last_test_id in self.testcases_by_id:
183 test = self.testcases_by_id[self.last_test_id]
184 class_name = unittest.util.strclass(test.__class__)
185 test_name = "'{}' ({})".format(get_test_description(descriptions,
189 test_name = self.last_test_id
190 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
191 r'\((.+\..+)\)', test_name).groups()[3]
192 if class_name not in self.testclasess_with_core:
193 self.testclasess_with_core[class_name] = (
195 self.last_test_vpp_binary,
196 self.last_test_temp_dir)
def close_pipes(self):
    """Close both ends of the keep-alive, finished and result pipes.

    The child ends are closed first, followed by the parent ends.
    """
    pipe_attributes = (
        'keep_alive_child_end',
        'finished_child_end',
        'result_child_end',
        'keep_alive_parent_end',
        'finished_parent_end',
        'result_parent_end',
    )
    for attribute in pipe_attributes:
        getattr(self, attribute).close()
def was_successful(self):
    """Report the outcome by delegating to ``self.result``."""
    suite_result = self.result
    return suite_result.was_successful()
210 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
213 while read_testcases.is_set() or len(unread_testcases):
214 if len(finished_unread_testcases):
215 read_testcase = finished_unread_testcases.pop()
216 unread_testcases.remove(read_testcase)
217 elif len(unread_testcases):
218 read_testcase = unread_testcases.pop()
221 while data is not None:
222 sys.stdout.write(data)
223 data = read_testcase.stdouterr_queue.get()
225 read_testcase.stdouterr_queue.close()
226 finished_unread_testcases.discard(read_testcase)
230 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
231 if last_test_temp_dir:
232 # Need to create link in case of a timeout or core dump without failure
233 lttd = os.path.basename(last_test_temp_dir)
234 failed_dir = os.getenv('FAILED_DIR')
235 link_path = '%s%s-FAILED' % (failed_dir, lttd)
236 if not os.path.exists(link_path):
237 os.symlink(last_test_temp_dir, link_path)
238 logger.error("Symlink to failed testcase directory: %s -> %s"
241 # Report core existence
242 core_path = get_core_path(last_test_temp_dir)
243 if os.path.exists(core_path):
245 "Core-file exists in test temporary directory: %s!" %
247 check_core_path(logger, core_path)
248 logger.debug("Running `file %s':" % core_path)
250 info = check_output(["file", core_path])
252 except CalledProcessError as e:
253 logger.error("Could not run `file' utility on core-file, "
254 "rc=%s" % e.returncode)
257 # Copy api post mortem
258 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
259 if os.path.isfile(api_post_mortem_path):
260 logger.error("Copying api_post_mortem.%d to %s" %
261 (vpp_pid, last_test_temp_dir))
262 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Report a core file found in ``tempdir`` and pass it to ``spawn_gdb``.

    Nothing happens when ``is_core_present(tempdir)`` is false.

    :param vpp_binary: binary handed to ``spawn_gdb`` together with the core
    :param tempdir: test temporary directory checked for a core file
    :param core_crash_test: name of the last running test, used only in
        the printed message
    """
    if not is_core_present(tempdir):
        return
    print('VPP core detected in %s. Last test running was %s' %
          (tempdir, core_crash_test))
    # the gdb invocation is framed by delimiter lines in the output
    print(single_line_delim)
    core_path = get_core_path(tempdir)
    spawn_gdb(vpp_binary, core_path)
    print(single_line_delim)
274 def handle_cores(failed_testcases):
276 for failed_testcase in failed_testcases:
277 tcs_with_core = failed_testcase.testclasess_with_core
278 if len(tcs_with_core) > 0:
279 for test, vpp_binary, tempdir in tcs_with_core.values():
280 check_and_handle_core(vpp_binary, tempdir, test)
283 def process_finished_testsuite(wrapped_testcase_suite,
284 finished_testcase_suites,
285 failed_wrapped_testcases,
287 results.append(wrapped_testcase_suite.result)
288 finished_testcase_suites.add(wrapped_testcase_suite)
290 if failfast and not wrapped_testcase_suite.was_successful():
293 if not wrapped_testcase_suite.was_successful():
294 failed_wrapped_testcases.add(wrapped_testcase_suite)
295 handle_failed_suite(wrapped_testcase_suite.logger,
296 wrapped_testcase_suite.last_test_temp_dir,
297 wrapped_testcase_suite.vpp_pid)
302 def run_forked(testcase_suites):
303 wrapped_testcase_suites = set()
305 # suites are unhashable, need to use list
307 unread_testcases = set()
308 finished_unread_testcases = set()
309 manager = StreamQueueManager()
311 for i in range(concurrent_tests):
312 if len(testcase_suites) > 0:
313 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
315 wrapped_testcase_suites.add(wrapped_testcase_suite)
316 unread_testcases.add(wrapped_testcase_suite)
320 read_from_testcases = threading.Event()
321 read_from_testcases.set()
322 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
323 args=(unread_testcases,
324 finished_unread_testcases,
325 read_from_testcases))
326 stdouterr_thread.start()
328 failed_wrapped_testcases = set()
332 while len(wrapped_testcase_suites) > 0:
333 finished_testcase_suites = set()
334 for wrapped_testcase_suite in wrapped_testcase_suites:
335 while wrapped_testcase_suite.result_parent_end.poll():
336 wrapped_testcase_suite.result.process_result(
337 *wrapped_testcase_suite.result_parent_end.recv())
338 wrapped_testcase_suite.last_heard = time.time()
340 while wrapped_testcase_suite.keep_alive_parent_end.poll():
341 wrapped_testcase_suite.last_test, \
342 wrapped_testcase_suite.last_test_vpp_binary, \
343 wrapped_testcase_suite.last_test_temp_dir, \
344 wrapped_testcase_suite.vpp_pid = \
345 wrapped_testcase_suite.keep_alive_parent_end.recv()
346 wrapped_testcase_suite.last_heard = time.time()
348 if wrapped_testcase_suite.finished_parent_end.poll():
349 wrapped_testcase_suite.finished_parent_end.recv()
350 wrapped_testcase_suite.last_heard = time.time()
351 stop_run = process_finished_testsuite(
352 wrapped_testcase_suite,
353 finished_testcase_suites,
354 failed_wrapped_testcases,
359 if wrapped_testcase_suite.last_heard + test_timeout < \
362 wrapped_testcase_suite.logger.critical(
363 "Child test runner process timed out "
364 "(last test running was `%s' in `%s')!" %
365 (wrapped_testcase_suite.last_test,
366 wrapped_testcase_suite.last_test_temp_dir))
367 elif not wrapped_testcase_suite.child.is_alive():
369 wrapped_testcase_suite.logger.critical(
370 "Child test runner process unexpectedly died "
371 "(last test running was `%s' in `%s')!" %
372 (wrapped_testcase_suite.last_test,
373 wrapped_testcase_suite.last_test_temp_dir))
374 elif wrapped_testcase_suite.last_test_temp_dir and \
375 wrapped_testcase_suite.last_test_vpp_binary:
377 wrapped_testcase_suite.last_test_temp_dir):
378 wrapped_testcase_suite.add_testclass_with_core()
379 if wrapped_testcase_suite.core_detected_at is None:
380 wrapped_testcase_suite.core_detected_at = \
382 elif wrapped_testcase_suite.core_detected_at + \
383 core_timeout < time.time():
384 wrapped_testcase_suite.logger.critical(
385 "Child test runner process unresponsive and "
386 "core-file exists in test temporary directory "
387 "(last test running was `%s' in `%s')!" %
388 (wrapped_testcase_suite.last_test,
389 wrapped_testcase_suite.last_test_temp_dir))
393 wrapped_testcase_suite.child.terminate()
395 # terminating the child process tends to leave orphan
397 if wrapped_testcase_suite.vpp_pid:
398 os.kill(wrapped_testcase_suite.vpp_pid,
403 wrapped_testcase_suite.result.crashed = True
404 wrapped_testcase_suite.result.process_result(
405 wrapped_testcase_suite.last_test_id, ERROR)
406 stop_run = process_finished_testsuite(
407 wrapped_testcase_suite,
408 finished_testcase_suites,
409 failed_wrapped_testcases,
412 for finished_testcase in finished_testcase_suites:
413 finished_testcase.child.join()
414 finished_testcase.close_pipes()
415 wrapped_testcase_suites.remove(finished_testcase)
416 finished_unread_testcases.add(finished_testcase)
417 finished_testcase.stdouterr_queue.put(None)
419 while len(testcase_suites) > 0:
420 results.append(TestResult(testcase_suites.pop(0)))
421 elif len(testcase_suites) > 0:
422 new_testcase = TestCaseWrapper(testcase_suites.pop(0),
424 wrapped_testcase_suites.add(new_testcase)
425 unread_testcases.add(new_testcase)
427 for wrapped_testcase_suite in wrapped_testcase_suites:
428 wrapped_testcase_suite.child.terminate()
429 wrapped_testcase_suite.stdouterr_queue.put(None)
432 read_from_testcases.clear()
433 stdouterr_thread.join(test_timeout)
436 handle_cores(failed_wrapped_testcases)
440 class SplitToSuitesCallback:
441 def __init__(self, filter_callback):
443 self.suite_name = 'default'
444 self.filter_callback = filter_callback
445 self.filtered = unittest.TestSuite()
447 def __call__(self, file_name, cls, method):
448 test_method = cls(method)
449 if self.filter_callback(file_name, cls.__name__, method):
450 self.suite_name = file_name + cls.__name__
451 if self.suite_name not in self.suites:
452 self.suites[self.suite_name] = unittest.TestSuite()
453 self.suites[self.suite_name].addTest(test_method)
456 self.filtered.addTest(test_method)
462 def parse_test_option():
463 f = os.getenv(test_option, None)
464 filter_file_name = None
465 filter_class_name = None
466 filter_func_name = None
471 raise Exception("Unrecognized %s option: %s" %
474 if parts[2] not in ('*', ''):
475 filter_func_name = parts[2]
476 if parts[1] not in ('*', ''):
477 filter_class_name = parts[1]
478 if parts[0] not in ('*', ''):
479 if parts[0].startswith('test_'):
480 filter_file_name = parts[0]
482 filter_file_name = 'test_%s' % parts[0]
484 if f.startswith('test_'):
487 filter_file_name = 'test_%s' % f
489 filter_file_name = '%s.py' % filter_file_name
490 return filter_file_name, filter_class_name, filter_func_name
493 def filter_tests(tests, filter_cb):
494 result = unittest.suite.TestSuite()
496 if isinstance(t, unittest.suite.TestSuite):
497 # this is a bunch of tests, recursively filter...
498 x = filter_tests(t, filter_cb)
499 if x.countTestCases() > 0:
501 elif isinstance(t, unittest.TestCase):
502 # this is a single test
503 parts = t.id().split('.')
504 # t.id() for common cases like this:
505 # test_classifier.TestClassifier.test_acl_ip
506 # apply filtering only if it is so
508 if not filter_cb(parts[0], parts[1], parts[2]):
512 # unexpected object, don't touch it
517 class FilterByTestOption:
518 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
519 self.filter_file_name = filter_file_name
520 self.filter_class_name = filter_class_name
521 self.filter_func_name = filter_func_name
523 def __call__(self, file_name, class_name, func_name):
524 if self.filter_file_name:
525 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
528 if self.filter_class_name and class_name != self.filter_class_name:
530 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Filter callback accepting only tests of explicitly listed classes."""

    def __init__(self, classes_with_filenames):
        # container of '<file_name>.<class_name>' strings to accept
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return True when '<file_name>.<class_name>' is in the list.

        ``func_name`` is accepted for interface compatibility and is not
        used in the decision.
        """
        qualified_name = '.'.join((file_name, class_name))
        return qualified_name in self.classes_with_filenames
543 def suite_from_failed(suite, failed):
544 failed = {x.rsplit('.', 1)[0] for x in failed}
545 filter_cb = FilterByClassList(failed)
546 suite = filter_tests(suite, filter_cb)
550 class AllResults(dict):
552 super(AllResults, self).__init__()
553 self.all_testcases = 0
554 self.results_per_suite = []
561 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Store ``result`` and add its per-category counts to the totals.

    The result is appended to ``self.results_per_suite``; then, for each
    of PASS, FAIL, ERROR, SKIP and TEST_RUN, the number of entries in
    ``result`` is added to the counter kept under the same key here.
    """
    self.results_per_suite.append(result)
    for category in (PASS, FAIL, ERROR, SKIP, TEST_RUN):
        self[category] += len(result[category])
569 def add_result(self, result):
571 self.all_testcases += result.testcase_suite.countTestCases()
572 self.add_results(result)
574 if result.no_tests_run():
575 self.testsuites_no_tests_run.append(result.testcase_suite)
580 elif not result.was_successful():
584 self.rerun.append(result.testcase_suite)
588 def print_results(self):
590 print(double_line_delim)
591 print('TEST RESULTS:')
592 print(' Scheduled tests: {}'.format(self.all_testcases))
593 print(' Executed tests: {}'.format(self[TEST_RUN]))
594 print(' Passed tests: {}'.format(
595 colorize(str(self[PASS]), GREEN)))
597 print(' Skipped tests: {}'.format(
598 colorize(str(self[SKIP]), YELLOW)))
599 if self.not_executed > 0:
600 print(' Not Executed tests: {}'.format(
601 colorize(str(self.not_executed), RED)))
603 print(' Failures: {}'.format(
604 colorize(str(self[FAIL]), RED)))
606 print(' Errors: {}'.format(
607 colorize(str(self[ERROR]), RED)))
609 if self.all_failed > 0:
610 print('FAILURES AND ERRORS IN TESTS:')
611 for result in self.results_per_suite:
612 failed_testcase_ids = result[FAIL]
613 errored_testcase_ids = result[ERROR]
614 old_testcase_name = None
615 if len(failed_testcase_ids) or len(errored_testcase_ids):
616 for failed_test_id in failed_testcase_ids:
617 new_testcase_name, test_name = \
618 result.get_testcase_names(failed_test_id)
619 if new_testcase_name != old_testcase_name:
620 print(' Testcase name: {}'.format(
621 colorize(new_testcase_name, RED)))
622 old_testcase_name = new_testcase_name
623 print(' FAILURE: {} [{}]'.format(
624 colorize(test_name, RED), failed_test_id))
625 for failed_test_id in errored_testcase_ids:
626 new_testcase_name, test_name = \
627 result.get_testcase_names(failed_test_id)
628 if new_testcase_name != old_testcase_name:
629 print(' Testcase name: {}'.format(
630 colorize(new_testcase_name, RED)))
631 old_testcase_name = new_testcase_name
632 print(' ERROR: {} [{}]'.format(
633 colorize(test_name, RED), failed_test_id))
634 if len(self.testsuites_no_tests_run) > 0:
635 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
637 for testsuite in self.testsuites_no_tests_run:
638 for testcase in testsuite:
639 tc_classes.add(get_testcase_doc_name(testcase))
640 for tc_class in tc_classes:
641 print(' {}'.format(colorize(tc_class, RED)))
643 print(double_line_delim)
def not_executed(self):
    """Return how many scheduled test cases were never run.

    Computed as ``self.all_testcases`` minus the TEST_RUN counter.
    """
    scheduled = self.all_testcases
    return scheduled - self[TEST_RUN]
def all_failed(self):
    """Return the sum of the FAIL and ERROR counters."""
    failures = self[FAIL]
    return failures + self[ERROR]
655 def parse_results(results):
657 Prints the number of scheduled, executed, not executed, passed, failed,
658 errored and skipped tests and details about failed and errored tests.
660 Also returns all suites where any test failed.
666 results_per_suite = AllResults()
669 for result in results:
670 result_code = results_per_suite.add_result(result)
673 elif result_code == -1:
676 results_per_suite.print_results()
684 return return_code, results_per_suite.rerun
687 def parse_digit_env(env_var, default):
688 value = os.getenv(env_var, default)
693 print('WARNING: unsupported value "%s" for env var "%s",'
694 'defaulting to %s' % (value, env_var, default))
699 if __name__ == '__main__':
701 verbose = parse_digit_env("V", 0)
703 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
705 retries = parse_digit_env("RETRIES", 0)
707 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
709 debug_core = os.getenv("DEBUG", "").lower() == "core"
711 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
713 run_interactive = debug or step
715 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
716 if test_jobs == 'auto':
719 print('Interactive mode required, running on one core')
721 shm_free = psutil.disk_usage('/dev/shm').free
722 shm_max_processes = 1
723 if shm_free < min_req_shm:
724 raise Exception('Not enough free space in /dev/shm. Required '
725 'free space is at least %sM.'
726 % (min_req_shm >> 20))
728 extra_shm = shm_free - min_req_shm
729 shm_max_processes += extra_shm / shm_per_process
730 concurrent_tests = min(cpu_count(), shm_max_processes)
731 print('Found enough resources to run tests with %s cores'
733 elif test_jobs.isdigit():
734 concurrent_tests = int(test_jobs)
738 if run_interactive and concurrent_tests > 1:
739 raise NotImplementedError(
740 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
741 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
743 parser = argparse.ArgumentParser(description="VPP unit tests")
744 parser.add_argument("-f", "--failfast", action='store_true',
745 help="fast failure flag")
746 parser.add_argument("-d", "--dir", action='append', type=str,
747 help="directory containing test files "
748 "(may be specified multiple times)")
749 args = parser.parse_args()
750 failfast = args.failfast
753 print("Running tests using custom test runner") # debug message
754 filter_file, filter_class, filter_func = parse_test_option()
756 print("Active filters: file=%s, class=%s, function=%s" % (
757 filter_file, filter_class, filter_func))
759 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
761 ignore_path = os.getenv("VENV_PATH", None)
762 cb = SplitToSuitesCallback(filter_cb)
764 print("Adding tests from directory tree %s" % d)
765 discover_tests(d, cb, ignore_path)
767 # suites are not hashable, need to use list
770 for testcase_suite in cb.suites.values():
771 tests_amount += testcase_suite.countTestCases()
772 suites.append(testcase_suite)
774 print("%s out of %s tests match specified filters" % (
775 tests_amount, tests_amount + cb.filtered.countTestCases()))
777 if not running_extended_tests():
778 print("Not running extended tests (some tests will be skipped)")
780 attempts = retries + 1
782 print("Perform %s attempts to pass the suite..." % attempts)
785 # don't fork if requiring interactive terminal
786 result = VppTestRunner(verbosity=verbose,
788 print_summary=True).run(suites[0])
789 was_successful = result.wasSuccessful()
790 if not was_successful:
791 for test_case_info in result.failed_test_cases_info:
792 handle_failed_suite(test_case_info.logger,
793 test_case_info.tempdir,
794 test_case_info.vpp_pid)
796 test_case_info in result.core_crash_test_cases_info:
797 check_and_handle_core(test_case_info.vpp_bin_path,
798 test_case_info.tempdir,
799 test_case_info.core_crash_test)
801 sys.exit(not was_successful)
804 while len(suites) > 0 and attempts > 0:
805 results = run_forked(suites)
806 exit_code, suites = parse_results(results)
809 print('Test run was successful')
811 print('%s attempt(s) left.' % attempts)