14 from multiprocessing import Process, Pipe, cpu_count
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18 get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
20 from debug import spawn_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22 colorize, single_line_delim
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path, get_core_path, is_core_present
# timeout which controls how long the child has to finish after seeing
# a core dump in the test temporary directory. If this is exceeded, the parent
# assumes that the child process is stuck (e.g. waiting for a shm mutex which
# will never get unlocked) and kills the child
# Minimum free space required in /dev/shm: 512MB.
min_req_shm = 512 << 20
# Additional /dev/shm space budgeted for each extra process: 128MB.
shm_per_process = 128 << 20
37 class StreamQueue(Queue):
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
46 return self._writer.fileno()
49 class StreamQueueManager(BaseManager):
53 StreamQueueManager.register('StreamQueue', StreamQueue)
56 class TestResult(dict):
57 def __init__(self, testcase_suite, testcases_by_id=None):
58 super(TestResult, self).__init__()
65 self.testcase_suite = testcase_suite
66 self.testcases = [testcase for testcase in testcase_suite]
67 self.testcases_by_id = testcases_by_id
def was_successful(self):
    """Tell whether the whole suite ran with no failure and no error.

    True only when nothing is recorded under FAIL or ERROR and the number
    of PASS plus SKIP entries equals both the suite's test case count and
    the number of TEST_RUN entries.
    """
    if len(self[FAIL]) != 0:
        return False
    if len(self[ERROR]) != 0:
        return False
    accounted_for = len(self[PASS] + self[SKIP])
    scheduled = self.testcase_suite.countTestCases()
    if accounted_for != scheduled:
        return False
    return scheduled == len(self[TEST_RUN])
def no_tests_run(self):
    """Return True when nothing is recorded under TEST_RUN."""
    executed = self[TEST_RUN]
    return len(executed) == 0
def process_result(self, test_id, result):
    """Append test_id to the collection stored under the result key."""
    bucket = self[result]
    bucket.append(test_id)
80 def suite_from_failed(self):
82 for testcase in self.testcase_suite:
84 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
86 if len(rerun_ids) > 0:
87 return suite_from_failed(self.testcase_suite, rerun_ids)
89 def get_testcase_names(self, test_id):
90 if re.match(r'.+\..+\..+', test_id):
91 test_name = self._get_test_description(test_id)
92 testcase_name = self._get_testcase_doc_name(test_id)
94 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
95 setup_teardown_match = re.match(
96 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
97 if setup_teardown_match:
98 test_name, _, _, testcase_name = setup_teardown_match.groups()
99 if len(testcase_name.split('.')) == 2:
100 for key in self.testcases_by_id.keys():
101 if key.startswith(testcase_name):
104 testcase_name = self._get_testcase_doc_name(testcase_name)
107 testcase_name = test_id
109 return testcase_name, test_name
def _get_test_description(self, test_id):
    """Return get_test_description() for the test case known as test_id.

    The test case object is looked up in self.testcases_by_id; the
    module-level `descriptions` value is passed through unchanged.
    """
    testcase = self.testcases_by_id[test_id]
    return get_test_description(descriptions, testcase)
def _get_testcase_doc_name(self, test_id):
    """Return get_testcase_doc_name() for the test case known as test_id."""
    testcase = self.testcases_by_id[test_id]
    return get_testcase_doc_name(testcase)
119 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
120 finished_pipe, result_pipe, logger):
121 sys.stdout = stdouterr_queue
122 sys.stderr = stdouterr_queue
123 VppTestCase.parallel_handler = logger.handlers[0]
124 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
125 descriptions=descriptions,
127 result_pipe=result_pipe,
129 print_summary=False).run(suite)
130 finished_pipe.send(result.wasSuccessful())
131 finished_pipe.close()
132 keep_alive_pipe.close()
135 class TestCaseWrapper(object):
136 def __init__(self, testcase_suite, manager):
137 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
139 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
140 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
141 self.testcase_suite = testcase_suite
142 if sys.version[0] == '2':
143 self.stdouterr_queue = manager.StreamQueue()
145 from multiprocessing import get_context
146 self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
147 self.logger = get_parallel_logger(self.stdouterr_queue)
148 self.child = Process(target=test_runner_wrapper,
149 args=(testcase_suite,
150 self.keep_alive_child_end,
151 self.stdouterr_queue,
152 self.finished_child_end,
153 self.result_child_end,
157 self.last_test_temp_dir = None
158 self.last_test_vpp_binary = None
159 self._last_test = None
160 self.last_test_id = None
162 self.last_heard = time.time()
163 self.core_detected_at = None
164 self.testcases_by_id = {}
165 self.testclasess_with_core = {}
166 for testcase in self.testcase_suite:
167 self.testcases_by_id[testcase.id()] = testcase
168 self.result = TestResult(testcase_suite, self.testcases_by_id)
172 return self._last_test
175 def last_test(self, test_id):
176 self.last_test_id = test_id
177 if test_id in self.testcases_by_id:
178 testcase = self.testcases_by_id[test_id]
179 self._last_test = testcase.shortDescription()
180 if not self._last_test:
181 self._last_test = str(testcase)
183 self._last_test = test_id
185 def add_testclass_with_core(self):
186 if self.last_test_id in self.testcases_by_id:
187 test = self.testcases_by_id[self.last_test_id]
188 class_name = unittest.util.strclass(test.__class__)
189 test_name = "'{}' ({})".format(get_test_description(descriptions,
193 test_name = self.last_test_id
194 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
195 r'\((.+\..+)\)', test_name).groups()[3]
196 if class_name not in self.testclasess_with_core:
197 self.testclasess_with_core[class_name] = (
199 self.last_test_vpp_binary,
200 self.last_test_temp_dir)
def close_pipes(self):
    """Close both ends of the keep-alive, finished and result pipes.

    The child-side ends are closed first, followed by the parent-side ends.
    """
    child_ends = (self.keep_alive_child_end,
                  self.finished_child_end,
                  self.result_child_end)
    parent_ends = (self.keep_alive_parent_end,
                   self.finished_parent_end,
                   self.result_parent_end)
    for pipe_end in child_ends + parent_ends:
        pipe_end.close()
def was_successful(self):
    """Report success by delegating to the wrapped result object."""
    outcome = self.result
    return outcome.was_successful()
214 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
217 while read_testcases.is_set() or len(unread_testcases):
218 if len(finished_unread_testcases):
219 read_testcase = finished_unread_testcases.pop()
220 unread_testcases.remove(read_testcase)
221 elif len(unread_testcases):
222 read_testcase = unread_testcases.pop()
225 while data is not None:
226 sys.stdout.write(data)
227 data = read_testcase.stdouterr_queue.get()
229 read_testcase.stdouterr_queue.close()
230 finished_unread_testcases.discard(read_testcase)
234 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
235 if last_test_temp_dir:
236 # Need to create link in case of a timeout or core dump without failure
237 lttd = os.path.basename(last_test_temp_dir)
238 failed_dir = os.getenv('FAILED_DIR')
239 link_path = '%s%s-FAILED' % (failed_dir, lttd)
240 if not os.path.exists(link_path):
241 os.symlink(last_test_temp_dir, link_path)
242 logger.error("Symlink to failed testcase directory: %s -> %s"
245 # Report core existence
246 core_path = get_core_path(last_test_temp_dir)
247 if os.path.exists(core_path):
249 "Core-file exists in test temporary directory: %s!" %
251 check_core_path(logger, core_path)
252 logger.debug("Running `file %s':" % core_path)
254 info = check_output(["file", core_path])
256 except CalledProcessError as e:
257 logger.error("Could not run `file' utility on core-file, "
258 "rc=%s" % e.returncode)
261 # Copy api post mortem
262 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
263 if os.path.isfile(api_post_mortem_path):
264 logger.error("Copying api_post_mortem.%d to %s" %
265 (vpp_pid, last_test_temp_dir))
266 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Report a VPP core found in tempdir and hand it to spawn_gdb.

    Does nothing when is_core_present(tempdir) is false. Otherwise prints
    the directory and the last running test, then calls spawn_gdb with the
    VPP binary and the core path, framed by single_line_delim lines.
    """
    if not is_core_present(tempdir):
        return
    message = 'VPP core detected in %s. Last test running was %s' % (
        tempdir, core_crash_test)
    print(message)
    print(single_line_delim)
    core_path = get_core_path(tempdir)
    spawn_gdb(vpp_binary, core_path)
    print(single_line_delim)
278 def handle_cores(failed_testcases):
280 for failed_testcase in failed_testcases:
281 tcs_with_core = failed_testcase.testclasess_with_core
282 if len(tcs_with_core) > 0:
283 for test, vpp_binary, tempdir in tcs_with_core.values():
284 check_and_handle_core(vpp_binary, tempdir, test)
287 def process_finished_testsuite(wrapped_testcase_suite,
288 finished_testcase_suites,
289 failed_wrapped_testcases,
291 results.append(wrapped_testcase_suite.result)
292 finished_testcase_suites.add(wrapped_testcase_suite)
294 if failfast and not wrapped_testcase_suite.was_successful():
297 if not wrapped_testcase_suite.was_successful():
298 failed_wrapped_testcases.add(wrapped_testcase_suite)
299 handle_failed_suite(wrapped_testcase_suite.logger,
300 wrapped_testcase_suite.last_test_temp_dir,
301 wrapped_testcase_suite.vpp_pid)
306 def run_forked(testcase_suites):
307 wrapped_testcase_suites = set()
309 # suites are unhashable, need to use list
311 unread_testcases = set()
312 finished_unread_testcases = set()
313 manager = StreamQueueManager()
315 for i in range(concurrent_tests):
316 if len(testcase_suites) > 0:
317 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
319 wrapped_testcase_suites.add(wrapped_testcase_suite)
320 unread_testcases.add(wrapped_testcase_suite)
324 read_from_testcases = threading.Event()
325 read_from_testcases.set()
326 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
327 args=(unread_testcases,
328 finished_unread_testcases,
329 read_from_testcases))
330 stdouterr_thread.start()
332 failed_wrapped_testcases = set()
336 while len(wrapped_testcase_suites) > 0:
337 finished_testcase_suites = set()
338 for wrapped_testcase_suite in wrapped_testcase_suites:
339 while wrapped_testcase_suite.result_parent_end.poll():
340 wrapped_testcase_suite.result.process_result(
341 *wrapped_testcase_suite.result_parent_end.recv())
342 wrapped_testcase_suite.last_heard = time.time()
344 while wrapped_testcase_suite.keep_alive_parent_end.poll():
345 wrapped_testcase_suite.last_test, \
346 wrapped_testcase_suite.last_test_vpp_binary, \
347 wrapped_testcase_suite.last_test_temp_dir, \
348 wrapped_testcase_suite.vpp_pid = \
349 wrapped_testcase_suite.keep_alive_parent_end.recv()
350 wrapped_testcase_suite.last_heard = time.time()
352 if wrapped_testcase_suite.finished_parent_end.poll():
353 wrapped_testcase_suite.finished_parent_end.recv()
354 wrapped_testcase_suite.last_heard = time.time()
355 stop_run = process_finished_testsuite(
356 wrapped_testcase_suite,
357 finished_testcase_suites,
358 failed_wrapped_testcases,
363 if wrapped_testcase_suite.last_heard + test_timeout < \
366 wrapped_testcase_suite.logger.critical(
367 "Child test runner process timed out "
368 "(last test running was `%s' in `%s')!" %
369 (wrapped_testcase_suite.last_test,
370 wrapped_testcase_suite.last_test_temp_dir))
371 elif not wrapped_testcase_suite.child.is_alive():
373 wrapped_testcase_suite.logger.critical(
374 "Child test runner process unexpectedly died "
375 "(last test running was `%s' in `%s')!" %
376 (wrapped_testcase_suite.last_test,
377 wrapped_testcase_suite.last_test_temp_dir))
378 elif wrapped_testcase_suite.last_test_temp_dir and \
379 wrapped_testcase_suite.last_test_vpp_binary:
381 wrapped_testcase_suite.last_test_temp_dir):
382 wrapped_testcase_suite.add_testclass_with_core()
383 if wrapped_testcase_suite.core_detected_at is None:
384 wrapped_testcase_suite.core_detected_at = \
386 elif wrapped_testcase_suite.core_detected_at + \
387 core_timeout < time.time():
388 wrapped_testcase_suite.logger.critical(
389 "Child test runner process unresponsive and "
390 "core-file exists in test temporary directory "
391 "(last test running was `%s' in `%s')!" %
392 (wrapped_testcase_suite.last_test,
393 wrapped_testcase_suite.last_test_temp_dir))
397 wrapped_testcase_suite.child.terminate()
399 # terminating the child process tends to leave orphan
401 if wrapped_testcase_suite.vpp_pid:
402 os.kill(wrapped_testcase_suite.vpp_pid,
407 wrapped_testcase_suite.result.crashed = True
408 wrapped_testcase_suite.result.process_result(
409 wrapped_testcase_suite.last_test_id, ERROR)
410 stop_run = process_finished_testsuite(
411 wrapped_testcase_suite,
412 finished_testcase_suites,
413 failed_wrapped_testcases,
416 for finished_testcase in finished_testcase_suites:
417 finished_testcase.child.join()
418 finished_testcase.close_pipes()
419 wrapped_testcase_suites.remove(finished_testcase)
420 finished_unread_testcases.add(finished_testcase)
421 finished_testcase.stdouterr_queue.put(None)
423 while len(testcase_suites) > 0:
424 results.append(TestResult(testcase_suites.pop(0)))
425 elif len(testcase_suites) > 0:
426 new_testcase = TestCaseWrapper(testcase_suites.pop(0),
428 wrapped_testcase_suites.add(new_testcase)
429 unread_testcases.add(new_testcase)
431 for wrapped_testcase_suite in wrapped_testcase_suites:
432 wrapped_testcase_suite.child.terminate()
433 wrapped_testcase_suite.stdouterr_queue.put(None)
436 read_from_testcases.clear()
437 stdouterr_thread.join(test_timeout)
440 handle_cores(failed_wrapped_testcases)
444 class SplitToSuitesCallback:
445 def __init__(self, filter_callback):
447 self.suite_name = 'default'
448 self.filter_callback = filter_callback
449 self.filtered = unittest.TestSuite()
451 def __call__(self, file_name, cls, method):
452 test_method = cls(method)
453 if self.filter_callback(file_name, cls.__name__, method):
454 self.suite_name = file_name + cls.__name__
455 if self.suite_name not in self.suites:
456 self.suites[self.suite_name] = unittest.TestSuite()
457 self.suites[self.suite_name].addTest(test_method)
460 self.filtered.addTest(test_method)
466 def parse_test_option():
467 f = os.getenv(test_option, None)
468 filter_file_name = None
469 filter_class_name = None
470 filter_func_name = None
475 raise Exception("Unrecognized %s option: %s" %
478 if parts[2] not in ('*', ''):
479 filter_func_name = parts[2]
480 if parts[1] not in ('*', ''):
481 filter_class_name = parts[1]
482 if parts[0] not in ('*', ''):
483 if parts[0].startswith('test_'):
484 filter_file_name = parts[0]
486 filter_file_name = 'test_%s' % parts[0]
488 if f.startswith('test_'):
491 filter_file_name = 'test_%s' % f
493 filter_file_name = '%s.py' % filter_file_name
494 return filter_file_name, filter_class_name, filter_func_name
497 def filter_tests(tests, filter_cb):
498 result = unittest.suite.TestSuite()
500 if isinstance(t, unittest.suite.TestSuite):
501 # this is a bunch of tests, recursively filter...
502 x = filter_tests(t, filter_cb)
503 if x.countTestCases() > 0:
505 elif isinstance(t, unittest.TestCase):
506 # this is a single test
507 parts = t.id().split('.')
508 # t.id() for common cases like this:
509 # test_classifier.TestClassifier.test_acl_ip
510 # apply filtering only if it is so
512 if not filter_cb(parts[0], parts[1], parts[2]):
516 # unexpected object, don't touch it
521 class FilterByTestOption:
def __init__(self, filter_file_name, filter_class_name, filter_func_name):
    """Store the file, class and function filters read when called."""
    (self.filter_file_name,
     self.filter_class_name,
     self.filter_func_name) = (filter_file_name,
                               filter_class_name,
                               filter_func_name)
527 def __call__(self, file_name, class_name, func_name):
528 if self.filter_file_name:
529 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
532 if self.filter_class_name and class_name != self.filter_class_name:
534 if self.filter_func_name and func_name != self.filter_func_name:
class FilterByClassList:
    """Filter callback accepting tests whose "file.class" key is listed."""

    def __init__(self, classes_with_filenames):
        """Remember the container of accepted "<file>.<class>" keys."""
        self.classes_with_filenames = classes_with_filenames

    def __call__(self, file_name, class_name, func_name):
        """Return whether "<file_name>.<class_name>" is in the container.

        func_name is part of the callback signature but is not inspected.
        """
        qualified_name = file_name + '.' + class_name
        return qualified_name in self.classes_with_filenames
547 def suite_from_failed(suite, failed):
548 failed = {x.rsplit('.', 1)[0] for x in failed}
549 filter_cb = FilterByClassList(failed)
550 suite = filter_tests(suite, filter_cb)
554 class AllResults(dict):
556 super(AllResults, self).__init__()
557 self.all_testcases = 0
558 self.results_per_suite = []
565 self.testsuites_no_tests_run = []
def add_results(self, result):
    """Keep the per-suite result and add its per-type counts to the totals."""
    self.results_per_suite.append(result)
    for kind in (PASS, FAIL, ERROR, SKIP, TEST_RUN):
        self[kind] = self[kind] + len(result[kind])
573 def add_result(self, result):
575 self.all_testcases += result.testcase_suite.countTestCases()
576 self.add_results(result)
578 if result.no_tests_run():
579 self.testsuites_no_tests_run.append(result.testcase_suite)
584 elif not result.was_successful():
588 self.rerun.append(result.testcase_suite)
592 def print_results(self):
594 print(double_line_delim)
595 print('TEST RESULTS:')
596 print(' Scheduled tests: {}'.format(self.all_testcases))
597 print(' Executed tests: {}'.format(self[TEST_RUN]))
598 print(' Passed tests: {}'.format(
599 colorize(str(self[PASS]), GREEN)))
601 print(' Skipped tests: {}'.format(
602 colorize(str(self[SKIP]), YELLOW)))
603 if self.not_executed > 0:
604 print(' Not Executed tests: {}'.format(
605 colorize(str(self.not_executed), RED)))
607 print(' Failures: {}'.format(
608 colorize(str(self[FAIL]), RED)))
610 print(' Errors: {}'.format(
611 colorize(str(self[ERROR]), RED)))
613 if self.all_failed > 0:
614 print('FAILURES AND ERRORS IN TESTS:')
615 for result in self.results_per_suite:
616 failed_testcase_ids = result[FAIL]
617 errored_testcase_ids = result[ERROR]
618 old_testcase_name = None
619 if len(failed_testcase_ids) or len(errored_testcase_ids):
620 for failed_test_id in failed_testcase_ids:
621 new_testcase_name, test_name = \
622 result.get_testcase_names(failed_test_id)
623 if new_testcase_name != old_testcase_name:
624 print(' Testcase name: {}'.format(
625 colorize(new_testcase_name, RED)))
626 old_testcase_name = new_testcase_name
627 print(' FAILURE: {} [{}]'.format(
628 colorize(test_name, RED), failed_test_id))
629 for failed_test_id in errored_testcase_ids:
630 new_testcase_name, test_name = \
631 result.get_testcase_names(failed_test_id)
632 if new_testcase_name != old_testcase_name:
633 print(' Testcase name: {}'.format(
634 colorize(new_testcase_name, RED)))
635 old_testcase_name = new_testcase_name
636 print(' ERROR: {} [{}]'.format(
637 colorize(test_name, RED), failed_test_id))
638 if len(self.testsuites_no_tests_run) > 0:
639 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
641 for testsuite in self.testsuites_no_tests_run:
642 for testcase in testsuite:
643 tc_classes.add(get_testcase_doc_name(testcase))
644 for tc_class in tc_classes:
645 print(' {}'.format(colorize(tc_class, RED)))
647 print(double_line_delim)
def not_executed(self):
    """Difference between all_testcases and the TEST_RUN total."""
    scheduled = self.all_testcases
    executed = self[TEST_RUN]
    return scheduled - executed
def all_failed(self):
    """Sum of the FAIL and ERROR totals."""
    failures = self[FAIL]
    errors = self[ERROR]
    return failures + errors
659 def parse_results(results):
661 Prints the number of scheduled, executed, not executed, passed, failed,
662 errored and skipped tests and details about failed and errored tests.
664 Also returns all suites where any test failed.
670 results_per_suite = AllResults()
673 for result in results:
674 result_code = results_per_suite.add_result(result)
677 elif result_code == -1:
680 results_per_suite.print_results()
688 return return_code, results_per_suite.rerun
691 def parse_digit_env(env_var, default):
692 value = os.getenv(env_var, default)
697 print('WARNING: unsupported value "%s" for env var "%s",'
698 'defaulting to %s' % (value, env_var, default))
703 if __name__ == '__main__':
705 verbose = parse_digit_env("V", 0)
707 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
709 retries = parse_digit_env("RETRIES", 0)
711 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
713 debug_core = os.getenv("DEBUG", "").lower() == "core"
715 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
717 run_interactive = debug or step
719 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
720 if test_jobs == 'auto':
723 print('Interactive mode required, running on one core')
725 shm_free = psutil.disk_usage('/dev/shm').free
726 shm_max_processes = 1
727 if shm_free < min_req_shm:
728 raise Exception('Not enough free space in /dev/shm. Required '
729 'free space is at least %sM.'
730 % (min_req_shm >> 20))
732 extra_shm = shm_free - min_req_shm
733 shm_max_processes += extra_shm / shm_per_process
734 concurrent_tests = min(cpu_count(), shm_max_processes)
735 print('Found enough resources to run tests with %s cores'
737 elif test_jobs.isdigit():
738 concurrent_tests = int(test_jobs)
742 if run_interactive and concurrent_tests > 1:
743 raise NotImplementedError(
744 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
745 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
747 parser = argparse.ArgumentParser(description="VPP unit tests")
748 parser.add_argument("-f", "--failfast", action='store_true',
749 help="fast failure flag")
750 parser.add_argument("-d", "--dir", action='append', type=str,
751 help="directory containing test files "
752 "(may be specified multiple times)")
753 args = parser.parse_args()
754 failfast = args.failfast
757 print("Running tests using custom test runner") # debug message
758 filter_file, filter_class, filter_func = parse_test_option()
760 print("Active filters: file=%s, class=%s, function=%s" % (
761 filter_file, filter_class, filter_func))
763 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
765 ignore_path = os.getenv("VENV_PATH", None)
766 cb = SplitToSuitesCallback(filter_cb)
768 print("Adding tests from directory tree %s" % d)
769 discover_tests(d, cb, ignore_path)
771 # suites are not hashable, need to use list
774 for testcase_suite in cb.suites.values():
775 tests_amount += testcase_suite.countTestCases()
776 suites.append(testcase_suite)
778 print("%s out of %s tests match specified filters" % (
779 tests_amount, tests_amount + cb.filtered.countTestCases()))
781 if not running_extended_tests():
782 print("Not running extended tests (some tests will be skipped)")
784 attempts = retries + 1
786 print("Perform %s attempts to pass the suite..." % attempts)
789 # don't fork if requiring interactive terminal
790 result = VppTestRunner(verbosity=verbose,
792 print_summary=True).run(suites[0])
793 was_successful = result.wasSuccessful()
794 if not was_successful:
795 for test_case_info in result.failed_test_cases_info:
796 handle_failed_suite(test_case_info.logger,
797 test_case_info.tempdir,
798 test_case_info.vpp_pid)
800 test_case_info in result.core_crash_test_cases_info:
801 check_and_handle_core(test_case_info.vpp_bin_path,
802 test_case_info.tempdir,
803 test_case_info.core_crash_test)
805 sys.exit(not was_successful)
808 while len(suites) > 0 and attempts > 0:
809 results = run_forked(suites)
810 exit_code, suites = parse_results(results)
813 print('Test run was successful')
815 print('%s attempt(s) left.' % attempts)