14 from multiprocessing import Process, Pipe, cpu_count
15 from multiprocessing.queues import Queue
16 from multiprocessing.managers import BaseManager
17 from framework import VppTestRunner, running_extended_tests, VppTestCase, \
18 get_testcase_doc_name, PASS, FAIL, ERROR, SKIP, \
20 from debug import spawn_gdb
21 from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
22 colorize, single_line_delim
23 from discover_tests import discover_tests
24 from subprocess import check_output, CalledProcessError
25 from util import check_core_path, get_core_path, is_core_present
27 # timeout which controls how long the child has to finish after seeing
28 # a core dump in test temporary directory. If this is exceeded, parent assumes
29 # that child process is stuck (e.g. waiting for shm mutex, which will never
30 # get unlocked) and kill the child
32 min_req_shm = 536870912 # min 512MB shm required
33 # 128MB per extra process
34 shm_per_process = 134217728
37 class StreamQueue(Queue):
42 sys.__stdout__.flush()
43 sys.__stderr__.flush()
46 return self._writer.fileno()
49 class StreamQueueManager(BaseManager):
53 StreamQueueManager.register('StreamQueue', StreamQueue)
56 class TestResult(dict):
57 def __init__(self, testcase_suite, testcases_by_id=None):
58 super(TestResult, self).__init__()
65 self.testcase_suite = testcase_suite
66 self.testcases = [testcase for testcase in testcase_suite]
67 self.testcases_by_id = testcases_by_id
69 def was_successful(self):
70 return 0 == len(self[FAIL]) == len(self[ERROR]) \
71 and len(self[PASS] + self[SKIP]) \
72 == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
74 def no_tests_run(self):
75 return 0 == len(self[TEST_RUN])
77 def process_result(self, test_id, result):
78 self[result].append(test_id)
80 def suite_from_failed(self):
82 for testcase in self.testcase_suite:
84 if tc_id not in self[PASS] and tc_id not in self[SKIP]:
86 if len(rerun_ids) > 0:
87 return suite_from_failed(self.testcase_suite, rerun_ids)
89 def get_testcase_names(self, test_id):
90 if re.match(r'.+\..+\..+', test_id):
91 test_name = test_id.getDescription()
92 testcase_name = self._get_testcase_doc_name(test_id)
94 # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
95 setup_teardown_match = re.match(
96 r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
97 if setup_teardown_match:
98 test_name, _, _, testcase_name = setup_teardown_match.groups()
99 if len(testcase_name.split('.')) == 2:
100 for key in self.testcases_by_id.keys():
101 if key.startswith(testcase_name):
104 testcase_name = self._get_testcase_doc_name(testcase_name)
107 testcase_name = test_id
109 return testcase_name, test_name
111 def _get_testcase_doc_name(self, test_id):
112 return get_testcase_doc_name(self.testcases_by_id[test_id])
115 def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
116 finished_pipe, result_pipe, logger):
117 sys.stdout = stdouterr_queue
118 sys.stderr = stdouterr_queue
119 VppTestCase.parallel_handler = logger.handlers[0]
120 result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
121 descriptions=descriptions,
123 result_pipe=result_pipe,
124 failfast=failfast).run(suite)
125 finished_pipe.send(result.wasSuccessful())
126 finished_pipe.close()
127 keep_alive_pipe.close()
130 class TestCaseWrapper(object):
131 def __init__(self, testcase_suite, manager):
132 self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
134 self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
135 self.result_parent_end, self.result_child_end = Pipe(duplex=False)
136 self.testcase_suite = testcase_suite
137 self.stdouterr_queue = manager.StreamQueue()
138 self.logger = get_parallel_logger(self.stdouterr_queue)
139 self.child = Process(target=test_runner_wrapper,
140 args=(testcase_suite,
141 self.keep_alive_child_end,
142 self.stdouterr_queue,
143 self.finished_child_end,
144 self.result_child_end,
148 self.last_test_temp_dir = None
149 self.last_test_vpp_binary = None
150 self._last_test = None
151 self.last_test_id = None
153 self.last_heard = time.time()
154 self.core_detected_at = None
155 self.testcases_by_id = {}
156 self.testclasess_with_core = {}
157 for testcase in self.testcase_suite:
158 self.testcases_by_id[testcase.id()] = testcase
159 self.result = TestResult(testcase_suite, self.testcases_by_id)
163 return self._last_test
166 def last_test(self, test_id):
167 self.last_test_id = test_id
168 if test_id in self.testcases_by_id:
169 testcase = self.testcases_by_id[test_id]
170 self._last_test = testcase.shortDescription()
171 if not self._last_test:
172 self._last_test = str(testcase)
174 self._last_test = test_id
176 def add_testclass_with_core(self):
177 if self.last_test_id in self.testcases_by_id:
178 test = self.testcases_by_id[self.last_test_id]
179 class_name = unittest.util.strclass(test.__class__)
180 test_name = "'{}' ({})".format(get_test_description(descriptions,
184 test_name = self.last_test_id
185 class_name = re.match(r'((tearDownClass)|(setUpClass)) '
186 r'\((.+\..+)\)', test_name).groups()[3]
187 if class_name not in self.testclasess_with_core:
188 self.testclasess_with_core[class_name] = (
190 self.last_test_vpp_binary,
191 self.last_test_temp_dir)
193 def close_pipes(self):
194 self.keep_alive_child_end.close()
195 self.finished_child_end.close()
196 self.result_child_end.close()
197 self.keep_alive_parent_end.close()
198 self.finished_parent_end.close()
199 self.result_parent_end.close()
201 def was_successful(self):
202 return self.result.was_successful()
205 def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
208 while read_testcases.is_set() or len(unread_testcases) > 0:
209 if not read_testcase:
210 if len(finished_unread_testcases) > 0:
211 read_testcase = finished_unread_testcases.pop()
212 unread_testcases.remove(read_testcase)
213 elif len(unread_testcases) > 0:
214 read_testcase = unread_testcases.pop()
217 while data is not None:
218 sys.stdout.write(data)
219 data = read_testcase.stdouterr_queue.get()
221 read_testcase.stdouterr_queue.close()
222 finished_unread_testcases.discard(read_testcase)
226 def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
227 if last_test_temp_dir:
228 # Need to create link in case of a timeout or core dump without failure
229 lttd = os.path.basename(last_test_temp_dir)
230 failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
231 link_path = '%s%s-FAILED' % (failed_dir, lttd)
232 if not os.path.exists(link_path):
233 logger.error("Creating a link to the failed test: %s -> %s" %
235 os.symlink(last_test_temp_dir, link_path)
237 logger.error("Link to the failed test already exists: %s -> %s" %
240 # Report core existence
241 core_path = get_core_path(last_test_temp_dir)
242 if os.path.exists(core_path):
244 "Core-file exists in test temporary directory: %s!" %
246 check_core_path(logger, core_path)
247 logger.debug("Running `file %s':" % core_path)
249 info = check_output(["file", core_path])
251 except CalledProcessError as e:
252 logger.error("Could not run `file' utility on core-file, "
253 "rc=%s" % e.returncode)
256 # Copy api post mortem
257 api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
258 if os.path.isfile(api_post_mortem_path):
259 logger.error("Copying api_post_mortem.%d to %s" %
260 (vpp_pid, last_test_temp_dir))
261 shutil.copy2(api_post_mortem_path, last_test_temp_dir)
264 def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
265 if is_core_present(tempdir):
266 print('VPP core detected in %s. Last test running was %s' %
267 (tempdir, core_crash_test))
268 print(single_line_delim)
269 spawn_gdb(vpp_binary, get_core_path(tempdir))
270 print(single_line_delim)
273 def handle_cores(failed_testcases):
275 for failed_testcase in failed_testcases:
276 tcs_with_core = failed_testcase.testclasess_with_core
277 if len(tcs_with_core) > 0:
278 for test, vpp_binary, tempdir in tcs_with_core.values():
279 check_and_handle_core(vpp_binary, tempdir, test)
282 def process_finished_testsuite(wrapped_testcase_suite,
283 finished_testcase_suites,
284 failed_wrapped_testcases,
286 results.append(wrapped_testcase_suite.result)
287 finished_testcase_suites.add(wrapped_testcase_suite)
289 if failfast and not wrapped_testcase_suite.was_successful():
292 if not wrapped_testcase_suite.was_successful():
293 failed_wrapped_testcases.add(wrapped_testcase_suite)
294 handle_failed_suite(wrapped_testcase_suite.logger,
295 wrapped_testcase_suite.last_test_temp_dir,
296 wrapped_testcase_suite.vpp_pid)
301 def run_forked(testcase_suites):
302 wrapped_testcase_suites = set()
304 # suites are unhashable, need to use list
306 unread_testcases = set()
307 finished_unread_testcases = set()
308 manager = StreamQueueManager()
310 for i in range(concurrent_tests):
311 if len(testcase_suites) > 0:
312 wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0),
314 wrapped_testcase_suites.add(wrapped_testcase_suite)
315 unread_testcases.add(wrapped_testcase_suite)
319 read_from_testcases = threading.Event()
320 read_from_testcases.set()
321 stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
322 args=(unread_testcases,
323 finished_unread_testcases,
324 read_from_testcases))
325 stdouterr_thread.start()
327 failed_wrapped_testcases = set()
329 while len(wrapped_testcase_suites) > 0:
330 finished_testcase_suites = set()
331 for wrapped_testcase_suite in wrapped_testcase_suites:
332 while wrapped_testcase_suite.result_parent_end.poll():
333 wrapped_testcase_suite.result.process_result(
334 *wrapped_testcase_suite.result_parent_end.recv())
335 wrapped_testcase_suite.last_heard = time.time()
337 while wrapped_testcase_suite.keep_alive_parent_end.poll():
338 wrapped_testcase_suite.last_test, \
339 wrapped_testcase_suite.last_test_vpp_binary, \
340 wrapped_testcase_suite.last_test_temp_dir, \
341 wrapped_testcase_suite.vpp_pid = \
342 wrapped_testcase_suite.keep_alive_parent_end.recv()
343 wrapped_testcase_suite.last_heard = time.time()
345 if wrapped_testcase_suite.finished_parent_end.poll():
346 wrapped_testcase_suite.finished_parent_end.recv()
347 wrapped_testcase_suite.last_heard = time.time()
348 stop_run = process_finished_testsuite(
349 wrapped_testcase_suite,
350 finished_testcase_suites,
351 failed_wrapped_testcases,
356 if wrapped_testcase_suite.last_heard + test_timeout < time.time():
358 wrapped_testcase_suite.logger.critical(
359 "Child test runner process timed out "
360 "(last test running was `%s' in `%s')!" %
361 (wrapped_testcase_suite.last_test,
362 wrapped_testcase_suite.last_test_temp_dir))
363 elif not wrapped_testcase_suite.child.is_alive():
365 wrapped_testcase_suite.logger.critical(
366 "Child test runner process unexpectedly died "
367 "(last test running was `%s' in `%s')!" %
368 (wrapped_testcase_suite.last_test,
369 wrapped_testcase_suite.last_test_temp_dir))
370 elif wrapped_testcase_suite.last_test_temp_dir and \
371 wrapped_testcase_suite.last_test_vpp_binary:
372 if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
373 wrapped_testcase_suite.add_testclass_with_core()
374 if wrapped_testcase_suite.core_detected_at is None:
375 wrapped_testcase_suite.core_detected_at = time.time()
376 elif wrapped_testcase_suite.core_detected_at + \
377 core_timeout < time.time():
378 wrapped_testcase_suite.logger.critical(
379 "Child test runner process unresponsive and core-"
380 "file exists in test temporary directory "
381 "(last test running was `%s' in `%s')!" %
382 (wrapped_testcase_suite.last_test,
383 wrapped_testcase_suite.last_test_temp_dir))
387 wrapped_testcase_suite.child.terminate()
389 # terminating the child process tends to leave orphan
391 if wrapped_testcase_suite.vpp_pid:
392 os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
396 wrapped_testcase_suite.result.crashed = True
397 wrapped_testcase_suite.result.process_result(
398 wrapped_testcase_suite.last_test_id, ERROR)
399 stop_run = process_finished_testsuite(
400 wrapped_testcase_suite,
401 finished_testcase_suites,
402 failed_wrapped_testcases,
405 for finished_testcase in finished_testcase_suites:
406 finished_testcase.child.join()
407 finished_testcase.close_pipes()
408 wrapped_testcase_suites.remove(finished_testcase)
409 finished_unread_testcases.add(finished_testcase)
410 finished_testcase.stdouterr_queue.put(None)
412 while len(testcase_suites) > 0:
413 results.append(TestResult(testcase_suites.pop(0)))
414 elif len(testcase_suites) > 0:
415 new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager)
416 wrapped_testcase_suites.add(new_testcase)
417 unread_testcases.add(new_testcase)
419 while len(unread_testcases) > 0:
420 # wait for reader thread to read everything in all loggers
423 read_from_testcases.clear()
424 stdouterr_thread.join(test_timeout)
426 handle_cores(failed_wrapped_testcases)
430 class SplitToSuitesCallback:
431 def __init__(self, filter_callback):
433 self.suite_name = 'default'
434 self.filter_callback = filter_callback
435 self.filtered = unittest.TestSuite()
437 def __call__(self, file_name, cls, method):
438 test_method = cls(method)
439 if self.filter_callback(file_name, cls.__name__, method):
440 self.suite_name = file_name + cls.__name__
441 if self.suite_name not in self.suites:
442 self.suites[self.suite_name] = unittest.TestSuite()
443 self.suites[self.suite_name].addTest(test_method)
446 self.filtered.addTest(test_method)
452 def parse_test_option():
453 f = os.getenv(test_option, None)
454 filter_file_name = None
455 filter_class_name = None
456 filter_func_name = None
461 raise Exception("Unrecognized %s option: %s" %
464 if parts[2] not in ('*', ''):
465 filter_func_name = parts[2]
466 if parts[1] not in ('*', ''):
467 filter_class_name = parts[1]
468 if parts[0] not in ('*', ''):
469 if parts[0].startswith('test_'):
470 filter_file_name = parts[0]
472 filter_file_name = 'test_%s' % parts[0]
474 if f.startswith('test_'):
477 filter_file_name = 'test_%s' % f
479 filter_file_name = '%s.py' % filter_file_name
480 return filter_file_name, filter_class_name, filter_func_name
483 def filter_tests(tests, filter_cb):
484 result = unittest.suite.TestSuite()
486 if isinstance(t, unittest.suite.TestSuite):
487 # this is a bunch of tests, recursively filter...
488 x = filter_tests(t, filter_cb)
489 if x.countTestCases() > 0:
491 elif isinstance(t, unittest.TestCase):
492 # this is a single test
493 parts = t.id().split('.')
494 # t.id() for common cases like this:
495 # test_classifier.TestClassifier.test_acl_ip
496 # apply filtering only if it is so
498 if not filter_cb(parts[0], parts[1], parts[2]):
502 # unexpected object, don't touch it
507 class FilterByTestOption:
508 def __init__(self, filter_file_name, filter_class_name, filter_func_name):
509 self.filter_file_name = filter_file_name
510 self.filter_class_name = filter_class_name
511 self.filter_func_name = filter_func_name
513 def __call__(self, file_name, class_name, func_name):
514 if self.filter_file_name:
515 fn_match = fnmatch.fnmatch(file_name, self.filter_file_name)
518 if self.filter_class_name and class_name != self.filter_class_name:
520 if self.filter_func_name and func_name != self.filter_func_name:
525 class FilterByClassList:
526 def __init__(self, classes_with_filenames):
527 self.classes_with_filenames = classes_with_filenames
529 def __call__(self, file_name, class_name, func_name):
530 return '.'.join([file_name, class_name]) in self.classes_with_filenames
533 def suite_from_failed(suite, failed):
534 failed = {x.rsplit('.', 1)[0] for x in failed}
535 filter_cb = FilterByClassList(failed)
536 suite = filter_tests(suite, filter_cb)
540 class AllResults(dict):
542 super(AllResults, self).__init__()
543 self.all_testcases = 0
544 self.results_per_suite = []
551 self.testsuites_no_tests_run = []
553 def add_results(self, result):
554 self.results_per_suite.append(result)
555 result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
556 for result_type in result_types:
557 self[result_type] += len(result[result_type])
559 def add_result(self, result):
561 self.all_testcases += result.testcase_suite.countTestCases()
562 self.add_results(result)
564 if result.no_tests_run():
565 self.testsuites_no_tests_run.append(result.testcase_suite)
570 elif not result.was_successful():
574 if concurrent_tests == 1:
575 self.rerun.append(result.suite_from_failed())
577 self.rerun.append(result.testcase_suite)
581 def print_results(self):
583 print(double_line_delim)
584 print('TEST RESULTS:')
585 print(' Scheduled tests: {}'.format(self.all_testcases))
586 print(' Executed tests: {}'.format(self[TEST_RUN]))
587 print(' Passed tests: {}'.format(
588 colorize(str(self[PASS]), GREEN)))
590 print(' Skipped tests: {}'.format(
591 colorize(str(self[SKIP]), YELLOW)))
592 if self.not_executed > 0:
593 print(' Not Executed tests: {}'.format(
594 colorize(str(self.not_executed), RED)))
596 print(' Failures: {}'.format(
597 colorize(str(self[FAIL]), RED)))
599 print(' Errors: {}'.format(
600 colorize(str(self[ERROR]), RED)))
602 if self.all_failed > 0:
603 print('FAILURES AND ERRORS IN TESTS:')
604 for result in self.results_per_suite:
605 failed_testcase_ids = result[FAIL]
606 errored_testcase_ids = result[ERROR]
607 old_testcase_name = None
608 if len(failed_testcase_ids) or len(errored_testcase_ids):
609 for failed_test_id in failed_testcase_ids:
610 new_testcase_name, test_name = \
611 result.get_testcase_names(failed_test_id)
612 if new_testcase_name != old_testcase_name:
613 print(' Testcase name: {}'.format(
614 colorize(new_testcase_name, RED)))
615 old_testcase_name = new_testcase_name
616 print(' FAILURE: {}'.format(
617 colorize(test_name, RED)))
618 for failed_test_id in errored_testcase_ids:
619 new_testcase_name, test_name = \
620 result.get_testcase_names(failed_test_id)
621 if new_testcase_name != old_testcase_name:
622 print(' Testcase name: {}'.format(
623 colorize(new_testcase_name, RED)))
624 old_testcase_name = new_testcase_name
625 print(' ERROR: {}'.format(
626 colorize(test_name, RED)))
627 if len(self.testsuites_no_tests_run) > 0:
628 print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
630 for testsuite in self.testsuites_no_tests_run:
631 for testcase in testsuite:
632 tc_classes.add(get_testcase_doc_name(testcase))
633 for tc_class in tc_classes:
634 print(' {}'.format(colorize(tc_class, RED)))
636 print(double_line_delim)
640 def not_executed(self):
641 return self.all_testcases - self[TEST_RUN]
644 def all_failed(self):
645 return self[FAIL] + self[ERROR]
648 def parse_results(results):
650 Prints the number of scheduled, executed, not executed, passed, failed,
651 errored and skipped tests and details about failed and errored tests.
653 Also returns all suites where any test failed.
659 results_per_suite = AllResults()
662 for result in results:
663 result_code = results_per_suite.add_result(result)
666 elif result_code == -1:
669 results_per_suite.print_results()
677 return return_code, results_per_suite.rerun
680 def parse_digit_env(env_var, default):
681 value = os.getenv(env_var, default)
686 print('WARNING: unsupported value "%s" for env var "%s",'
687 'defaulting to %s' % (value, env_var, default))
692 if __name__ == '__main__':
694 verbose = parse_digit_env("V", 0)
696 test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
698 retries = parse_digit_env("RETRIES", 0)
700 debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
702 debug_core = os.getenv("DEBUG", "").lower() == "core"
704 step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
706 run_interactive = debug or step
708 test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
709 if test_jobs == 'auto':
712 print('Interactive mode required, running on one core')
714 shm_free = psutil.disk_usage('/dev/shm').free
715 shm_max_processes = 1
716 if shm_free < min_req_shm:
717 raise Exception('Not enough free space in /dev/shm. Required '
718 'free space is at least %sM.'
719 % (min_req_shm >> 20))
721 extra_shm = shm_free - min_req_shm
722 shm_max_processes += extra_shm / shm_per_process
723 concurrent_tests = min(cpu_count(), shm_max_processes)
724 print('Found enough resources to run tests with %s cores'
726 elif test_jobs.isdigit():
727 concurrent_tests = int(test_jobs)
731 if run_interactive and concurrent_tests > 1:
732 raise NotImplementedError(
733 'Running tests interactively (DEBUG is gdb or gdbserver or STEP '
734 'is set) in parallel (TEST_JOBS is more than 1) is not supported')
736 parser = argparse.ArgumentParser(description="VPP unit tests")
737 parser.add_argument("-f", "--failfast", action='store_true',
738 help="fast failure flag")
739 parser.add_argument("-d", "--dir", action='append', type=str,
740 help="directory containing test files "
741 "(may be specified multiple times)")
742 args = parser.parse_args()
743 failfast = args.failfast
746 print("Running tests using custom test runner") # debug message
747 filter_file, filter_class, filter_func = parse_test_option()
749 print("Active filters: file=%s, class=%s, function=%s" % (
750 filter_file, filter_class, filter_func))
752 filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
754 cb = SplitToSuitesCallback(filter_cb)
756 print("Adding tests from directory tree %s" % d)
757 discover_tests(d, cb)
759 # suites are not hashable, need to use list
762 for testcase_suite in cb.suites.values():
763 tests_amount += testcase_suite.countTestCases()
764 suites.append(testcase_suite)
766 if concurrent_tests == 1:
767 new_suite = unittest.TestSuite()
769 new_suite.addTests(suite)
773 print("%s out of %s tests match specified filters" % (
774 tests_amount, tests_amount + cb.filtered.countTestCases()))
776 if not running_extended_tests():
777 print("Not running extended tests (some tests will be skipped)")
779 attempts = retries + 1
781 print("Perform %s attempts to pass the suite..." % attempts)
784 # don't fork if requiring interactive terminal
785 result = VppTestRunner(verbosity=verbose, failfast=failfast)\
787 was_successful = result.wasSuccessful()
788 if not was_successful:
789 for test_case_info in result.failed_test_cases_info:
790 handle_failed_suite(test_case_info.logger,
791 test_case_info.tempdir,
792 test_case_info.vpp_pid)
794 test_case_info in result.core_crash_test_cases_info:
795 check_and_handle_core(test_case_info.vpp_bin_path,
796 test_case_info.tempdir,
797 test_case_info.core_crash_test)
799 sys.exit(not was_successful)
802 while len(suites) > 0 and attempts > 0:
803 results = run_forked(suites)
804 exit_code, suites = parse_results(results)
807 print('Test run was successful')
809 print('%s attempt(s) left.' % attempts)